1use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
2use std::fs;
3use std::io::{Read, Write};
4use std::path::{Path, PathBuf};
5use std::process::{Child, Command, Stdio};
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7#[cfg(unix)]
8use std::sync::OnceLock;
9use std::sync::{Arc, Mutex, RwLock};
10use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
11
12use rusqlite::Connection;
13use serde::Serialize;
14
15use crate::compress::caps::DropClass;
16use crate::compress::CompressionResult;
17use crate::context::SharedProgressSender;
18use crate::harness::Harness;
19use crate::protocol::{BashCompletedFrame, BashLongRunningFrame, BashPatternMatchFrame, PushFrame};
20
21#[cfg(unix)]
22use std::os::unix::process::CommandExt;
23#[cfg(windows)]
24use std::os::windows::process::CommandExt;
25
26use super::buffer::{combine_streams, BgBuffer, DiskTruncation, StreamKind, TokenCountInput};
27use super::output::{
28 cap_completion_output, cap_completion_output_with_marker, cap_final_output,
29 cap_final_output_with_marker, completion_preview_threshold, json_output_pointer, quote_path,
30 retained_json_output_pointer, COMPRESS_INPUT_CAP_BYTES, COMPRESS_INPUT_HEAD_BYTES,
31 COMPRESS_INPUT_TAIL_BYTES, FINAL_OUTPUT_CAP_BYTES, RAW_PASSTHROUGH_CAP_BYTES,
32 RAW_PASSTHROUGH_HEAD_BYTES, RAW_PASSTHROUGH_TAIL_BYTES, RUNNING_OUTPUT_PREVIEW_BYTES,
33 STRUCTURED_OUTPUT_CAP_BYTES,
34};
35use super::persistence::{
36 create_capture_file, delete_task_bundle, read_exit_marker, read_task, session_tasks_dir,
37 task_paths, unix_millis, update_task, write_kill_marker_if_absent, write_task, BgMode,
38 ExitMarker, PersistedTask, TaskPaths,
39};
40use super::process::is_process_alive;
41#[cfg(unix)]
42use super::process::terminate_pgid;
43#[cfg(windows)]
44use super::process::terminate_pid;
45use super::pty_process::spawn_pty_for_command;
46use super::pty_runtime::PtyRuntime;
47use super::watches::{PatternMatch, WatchPattern, WatchRegistry};
48use super::{BgTaskInfo, BgTaskStatus};
49const DEFAULT_BG_TIMEOUT: Duration = Duration::from_secs(30 * 60);
52const STALE_RUNNING_AFTER: Duration = Duration::from_secs(24 * 60 * 60);
53const PERSISTED_GC_GRACE: Duration = Duration::from_secs(24 * 60 * 60);
54const QUARANTINE_GC_GRACE: Duration = Duration::from_secs(30 * 24 * 60 * 60);
55
56const TOKENIZE_CAP_BYTES_PER_STREAM: usize = 128 * 1024;
57
58#[derive(Debug, Clone, Serialize)]
59pub struct BgCompletion {
60 pub task_id: String,
61 #[serde(skip_serializing)]
64 pub session_id: String,
65 pub status: BgTaskStatus,
66 pub exit_code: Option<i32>,
67 pub command: String,
68 #[serde(default, skip_serializing_if = "String::is_empty")]
74 pub output_preview: String,
75 #[serde(default, skip_serializing_if = "is_false")]
80 pub output_truncated: bool,
81 #[serde(default, skip_serializing_if = "Option::is_none")]
84 pub original_tokens: Option<u32>,
85 #[serde(default, skip_serializing_if = "Option::is_none")]
88 pub compressed_tokens: Option<u32>,
89 #[serde(default, skip_serializing_if = "is_false")]
91 pub tokens_skipped: bool,
92}
93
94fn is_false(v: &bool) -> bool {
95 !*v
96}
97
98#[derive(Debug, Clone, Serialize)]
99pub struct BgTaskSnapshot {
100 #[serde(flatten)]
101 pub info: BgTaskInfo,
102 pub exit_code: Option<i32>,
103 pub child_pid: Option<u32>,
104 pub workdir: String,
105 pub output_preview: String,
106 pub output_truncated: bool,
107 pub output_path: Option<String>,
108 pub stderr_path: Option<String>,
109 #[serde(skip_serializing_if = "Option::is_none")]
110 pub pty_rows: Option<u16>,
111 #[serde(skip_serializing_if = "Option::is_none")]
112 pub pty_cols: Option<u16>,
113}
114
115#[derive(Debug, Clone, Copy, PartialEq, Eq)]
116enum TerminalOutputKind {
117 Compressed,
118 Raw,
119 Structured,
120}
121
122#[derive(Debug, Clone, PartialEq, Eq)]
123struct TerminalOutputCache {
124 output_preview: String,
125 output_truncated: bool,
126 kind: TerminalOutputKind,
127 output_path: Option<String>,
128 stderr_path: Option<String>,
129 recovery: Option<RecoveryContext>,
130}
131
132#[derive(Debug, Clone, PartialEq, Eq)]
133struct RecoveryContext {
134 dropped_by_class: BTreeMap<DropClass, usize>,
135 had_inner_drop: bool,
136 offset_hint_eligible: bool,
137 offset_start_line: Option<usize>,
138 byte_truncated: bool,
139 disk_truncated_prefix_bytes: u64,
140 output_path: Option<String>,
141 stderr_path: Option<String>,
142 include_stderr_path: bool,
143}
144
145impl RecoveryContext {
146 fn has_visible_drop(&self) -> bool {
147 self.byte_truncated
148 || self.disk_truncated_prefix_bytes > 0
149 || self.had_inner_drop
150 || !self.dropped_by_class.is_empty()
151 }
152}
153
154#[derive(Clone)]
155pub struct BgTaskRegistry {
156 pub(crate) inner: Arc<RegistryInner>,
157}
158
159pub(crate) struct RegistryInner {
160 pub(crate) tasks: Mutex<HashMap<String, Arc<BgTask>>>,
161 pub(crate) completions: Mutex<VecDeque<BgCompletion>>,
162 pub(crate) progress_sender: SharedProgressSender,
163 watchdog_started: AtomicBool,
164 pub(crate) shutdown: AtomicBool,
165 pub(crate) long_running_reminder_enabled: AtomicBool,
166 pub(crate) long_running_reminder_interval_ms: AtomicU64,
167 persisted_gc_started: AtomicBool,
168 #[cfg(test)]
169 persisted_gc_runs: AtomicU64,
170 pub(crate) compressor:
176 Mutex<Option<Box<dyn Fn(&str, String, Option<i32>) -> CompressionResult + Send + Sync>>>,
177 pub(crate) db_pool: RwLock<Option<Arc<Mutex<Connection>>>>,
178 pub(crate) db_harness: RwLock<Option<String>>,
179 pub(crate) wake_tx: crossbeam_channel::Sender<()>,
180 pub(crate) wake_rx: crossbeam_channel::Receiver<()>,
181 pub(crate) watch_registry: Mutex<WatchRegistry>,
182}
183
184pub(crate) struct BgTask {
185 pub(crate) task_id: String,
186 pub(crate) session_id: String,
187 pub(crate) paths: TaskPaths,
188 pub(crate) started: Instant,
189 pub(crate) last_reminder_at: Mutex<Option<Instant>>,
190 pub(crate) terminal_at: Mutex<Option<Instant>>,
191 pub(crate) state: Mutex<BgTaskState>,
192}
193
194pub(crate) enum TaskRuntime {
195 Piped(Option<Child>),
196 Pty(Option<PtyRuntime>),
197}
198
199pub(crate) struct BgTaskState {
200 pub(crate) metadata: PersistedTask,
201 pub(crate) runtime: TaskRuntime,
202 pub(crate) detached: bool,
203 pub(crate) child_exit_observed: bool,
214 pub(crate) buffer: BgBuffer,
215 terminal_output_cache: Option<TerminalOutputCache>,
216 pub(crate) pending_terminal_override: Option<BgTaskStatus>,
218}
219
220fn completion_matches_session(completion: &BgCompletion, session_id: Option<&str>) -> bool {
221 session_id
222 .map(|session_id| completion.session_id == session_id)
223 .unwrap_or(true)
224}
225
226impl BgTaskRegistry {
227 pub fn new(progress_sender: SharedProgressSender) -> Self {
228 let (wake_tx, wake_rx) = crossbeam_channel::bounded(1);
229 Self {
230 inner: Arc::new(RegistryInner {
231 tasks: Mutex::new(HashMap::new()),
232 completions: Mutex::new(VecDeque::new()),
233 progress_sender,
234 watchdog_started: AtomicBool::new(false),
235 shutdown: AtomicBool::new(false),
236 long_running_reminder_enabled: AtomicBool::new(true),
237 long_running_reminder_interval_ms: AtomicU64::new(600_000),
238 persisted_gc_started: AtomicBool::new(false),
239 #[cfg(test)]
240 persisted_gc_runs: AtomicU64::new(0),
241 compressor: Mutex::new(None),
242 db_pool: RwLock::new(None),
243 db_harness: RwLock::new(None),
244 wake_tx,
245 wake_rx,
246 watch_registry: Mutex::new(WatchRegistry::default()),
247 }),
248 }
249 }
250
251 pub fn set_harness(&self, harness: Harness) {
252 if let Ok(mut slot) = self.inner.db_harness.write() {
253 *slot = Some(harness.storage_segment());
254 }
255 }
256
257 pub fn set_db_pool(&self, conn: Arc<Mutex<Connection>>) {
258 if let Ok(mut slot) = self.inner.db_pool.write() {
259 *slot = Some(conn);
260 }
261 }
262
263 pub fn clear_db_pool(&self) {
264 if let Ok(mut slot) = self.inner.db_pool.write() {
265 *slot = None;
266 }
267 }
268
269 pub fn set_compressor<F>(&self, compressor: F)
274 where
275 F: Fn(&str, String) -> CompressionResult + Send + Sync + 'static,
276 {
277 self.set_compressor_with_exit_code(move |command, output, _exit_code| {
278 compressor(command, output)
279 });
280 }
281
282 pub fn set_compressor_with_exit_code<F>(&self, compressor: F)
283 where
284 F: Fn(&str, String, Option<i32>) -> CompressionResult + Send + Sync + 'static,
285 {
286 if let Ok(mut slot) = self.inner.compressor.lock() {
287 *slot = Some(Box::new(compressor));
288 }
289 }
290
291 pub(crate) fn compress_output(
294 &self,
295 command: &str,
296 output: String,
297 exit_code: Option<i32>,
298 ) -> CompressionResult {
299 let Ok(slot) = self.inner.compressor.lock() else {
300 return CompressionResult::new(output);
301 };
302 match slot.as_ref() {
303 Some(compressor) => compressor(command, output, exit_code),
304 None => CompressionResult::new(output),
305 }
306 }
307
308 fn ensure_terminal_output_cache(&self, task: &Arc<BgTask>) -> Option<TerminalOutputCache> {
309 let (metadata, buffer) = {
310 let state = task.state.lock().ok()?;
311 if !state.metadata.status.is_terminal() || state.metadata.mode == BgMode::Pty {
312 return None;
313 }
314 if let Some(cache) = state.terminal_output_cache.clone() {
315 return Some(cache);
316 }
317 (state.metadata.clone(), state.buffer.clone())
318 };
319
320 let mut cap_buffer = buffer.clone();
321 let disk_truncation = cap_buffer.enforce_terminal_cap();
322 let cache = self.render_terminal_output(&metadata, &cap_buffer, disk_truncation);
323 let mut state = task.state.lock().ok()?;
324 if !state.metadata.status.is_terminal() || state.metadata.mode == BgMode::Pty {
325 return None;
326 }
327 if let Some(existing) = state.terminal_output_cache.clone() {
328 return Some(existing);
329 }
330 state.terminal_output_cache = Some(cache.clone());
331 Some(cache)
332 }
333
334 fn render_terminal_output(
335 &self,
336 metadata: &PersistedTask,
337 buffer: &BgBuffer,
338 disk_truncation: DiskTruncation,
339 ) -> TerminalOutputCache {
340 if metadata.mode == BgMode::Pty {
341 return TerminalOutputCache {
342 output_preview: String::new(),
343 output_truncated: false,
344 kind: TerminalOutputKind::Raw,
345 output_path: buffer.output_path().map(|path| path.display().to_string()),
346 stderr_path: buffer.stderr_path().map(|path| path.display().to_string()),
347 recovery: None,
348 };
349 }
350
351 if let Some(structured) =
352 render_structured_output(&metadata.command, buffer, disk_truncation)
353 {
354 return structured;
355 }
356
357 if !metadata.compressed {
358 return render_raw_passthrough(buffer, disk_truncation);
359 }
360
361 let raw = buffer.read_combined_head_tail(
362 COMPRESS_INPUT_CAP_BYTES,
363 COMPRESS_INPUT_HEAD_BYTES,
364 COMPRESS_INPUT_TAIL_BYTES,
365 );
366 let compressed = self.compress_output(&metadata.command, raw.text, metadata.exit_code);
367 render_compressed_with_recovery(buffer, compressed, raw.truncated, disk_truncation)
368 }
369
370 fn snapshot_with_terminal_cache(
371 &self,
372 task: &Arc<BgTask>,
373 preview_bytes: usize,
374 ) -> BgTaskSnapshot {
375 let mut snapshot = task.snapshot(preview_bytes);
376 self.maybe_compress_snapshot(task, &mut snapshot);
377 snapshot
378 }
379
380 fn post_terminal_transition(&self, task: &Arc<BgTask>, emit_frame: bool) -> Result<(), String> {
381 let (metadata, buffer) = {
382 let state = task
383 .state
384 .lock()
385 .map_err(|_| "background task lock poisoned".to_string())?;
386 if !state.metadata.status.is_terminal() {
387 return Ok(());
388 }
389 (state.metadata.clone(), state.buffer.clone())
390 };
391
392 let cache = self.ensure_terminal_output_cache(task);
393 self.enqueue_completion_from_parts(
394 &metadata,
395 Some(&buffer),
396 None,
397 emit_frame,
398 cache.as_ref(),
399 );
400 Ok(())
401 }
402
403 fn persist_task(&self, paths: &TaskPaths, metadata: &PersistedTask) -> std::io::Result<()> {
404 write_task(&paths.json, metadata)?;
405 self.dual_write_task(paths, metadata);
406 Ok(())
407 }
408
409 fn update_task_metadata<F>(
410 &self,
411 paths: &TaskPaths,
412 update: F,
413 ) -> std::io::Result<PersistedTask>
414 where
415 F: FnOnce(&mut PersistedTask),
416 {
417 let metadata = update_task(&paths.json, update)?;
418 self.dual_write_task(paths, &metadata);
419 Ok(metadata)
420 }
421
422 fn dual_write_task(&self, paths: &TaskPaths, metadata: &PersistedTask) {
423 let pool = self.inner.db_pool.read().ok().and_then(|slot| slot.clone());
424 let Some(pool) = pool else {
425 return;
426 };
427 let harness = self
428 .inner
429 .db_harness
430 .read()
431 .ok()
432 .and_then(|slot| slot.clone());
433 let Some(harness) = harness else {
434 crate::slog_warn!(
435 "dual-write bash_task to DB skipped for {}: harness not configured",
436 metadata.task_id
437 );
438 return;
439 };
440 let row = match metadata.to_bash_task_row(&harness, paths) {
441 Ok(row) => row,
442 Err(error) => {
443 crate::slog_warn!(
444 "dual-write bash_task to DB failed for {}: {}",
445 metadata.task_id,
446 error
447 );
448 return;
449 }
450 };
451 let conn = match pool.lock() {
452 Ok(conn) => conn,
453 Err(_) => {
454 crate::slog_warn!(
455 "dual-write bash_task to DB failed for {}: db mutex poisoned",
456 metadata.task_id
457 );
458 return;
459 }
460 };
461 if let Err(error) = crate::db::bash_tasks::upsert_bash_task(&conn, &row) {
462 crate::slog_warn!(
463 "dual-write bash_task to DB failed for {}: {}",
464 metadata.task_id,
465 error
466 );
467 }
468 }
469
470 pub fn configure_long_running_reminders(&self, enabled: bool, interval_ms: u64) {
471 self.inner
472 .long_running_reminder_enabled
473 .store(enabled, Ordering::SeqCst);
474 self.inner
475 .long_running_reminder_interval_ms
476 .store(interval_ms, Ordering::SeqCst);
477 }
478
479 #[cfg(unix)]
480 #[allow(clippy::too_many_arguments)]
481 pub fn spawn(
482 &self,
483 command: &str,
484 session_id: String,
485 workdir: PathBuf,
486 env: HashMap<String, String>,
487 timeout: Option<Duration>,
488 storage_dir: PathBuf,
489 max_running: usize,
490 notify_on_completion: bool,
491 compressed: bool,
492 project_root: Option<PathBuf>,
493 ) -> Result<String, String> {
494 self.start_watchdog();
495
496 let running = self.running_count();
497 if running >= max_running {
498 return Err(format!(
499 "background bash task limit exceeded: {running} running (max {max_running})"
500 ));
501 }
502
503 let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
504 let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
505 let task_id = self.generate_unique_task_id()?;
506 let paths = task_paths(&storage_dir, &session_id, &task_id);
507 fs::create_dir_all(&paths.dir)
508 .map_err(|e| format!("failed to create background task dir: {e}"))?;
509
510 let mut metadata = PersistedTask::starting(
511 task_id.clone(),
512 session_id.clone(),
513 command.to_string(),
514 workdir.clone(),
515 project_root,
516 timeout_ms,
517 notify_on_completion,
518 compressed,
519 );
520 self.persist_task(&paths, &metadata)
521 .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
522
523 create_capture_file(&paths.stdout)
527 .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
528 create_capture_file(&paths.stderr)
529 .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
530
531 let child = match spawn_detached_child(command, &paths, &workdir, &env) {
532 Ok(child) => child,
533 Err(error) => {
534 crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
535 let _ = delete_task_bundle(&paths);
536 return Err(error);
537 }
538 };
539
540 let child_pid = child.id();
541 metadata.mark_running(child_pid, child_pid as i32);
542 self.persist_task(&paths, &metadata)
543 .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
544
545 let task = Arc::new(BgTask {
546 task_id: task_id.clone(),
547 session_id,
548 paths: paths.clone(),
549 started: Instant::now(),
550 last_reminder_at: Mutex::new(None),
551 terminal_at: Mutex::new(None),
552 state: Mutex::new(BgTaskState {
553 metadata,
554 runtime: TaskRuntime::Piped(Some(child)),
555 detached: false,
556 child_exit_observed: false,
557 buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
558 terminal_output_cache: None,
559 pending_terminal_override: None,
560 }),
561 });
562
563 self.inner
564 .tasks
565 .lock()
566 .map_err(|_| "background task registry lock poisoned".to_string())?
567 .insert(task_id.clone(), task);
568
569 Ok(task_id)
570 }
571
572 #[allow(clippy::too_many_arguments)]
573 pub fn spawn_pty(
574 &self,
575 command: &str,
576 session_id: String,
577 workdir: PathBuf,
578 env: HashMap<String, String>,
579 timeout: Option<Duration>,
580 storage_dir: PathBuf,
581 max_running: usize,
582 notify_on_completion: bool,
583 compressed: bool,
584 project_root: Option<PathBuf>,
585 rows: u16,
586 cols: u16,
587 ) -> Result<String, String> {
588 self.start_watchdog();
589
590 let running = self.running_count();
591 if running >= max_running {
592 return Err(format!(
593 "background bash task limit exceeded: {running} running (max {max_running})"
594 ));
595 }
596
597 let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
598 let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
599 let task_id = self.generate_unique_task_id()?;
600 let paths = task_paths(&storage_dir, &session_id, &task_id);
601 fs::create_dir_all(&paths.dir)
602 .map_err(|e| format!("failed to create background task dir: {e}"))?;
603
604 let mut metadata = PersistedTask::starting(
605 task_id.clone(),
606 session_id.clone(),
607 command.to_string(),
608 workdir.clone(),
609 project_root,
610 timeout_ms,
611 notify_on_completion,
612 compressed,
613 );
614 metadata.mode = BgMode::Pty;
615 metadata.pty_rows = Some(rows);
616 metadata.pty_cols = Some(cols);
617 self.persist_task(&paths, &metadata)
618 .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
619 create_capture_file(&paths.pty)
620 .map_err(|e| format!("failed to create PTY capture file: {e}"))?;
621
622 let runtime = match spawn_pty_for_command(
623 &task_id,
624 &session_id,
625 command,
626 &paths,
627 &workdir,
628 &env,
629 rows,
630 cols,
631 self.inner.wake_tx.clone(),
632 ) {
633 Ok(runtime) => runtime,
634 Err(error) => {
635 crate::slog_warn!(
636 "failed to spawn PTY background bash task {task_id}; deleting partial bundle: {error}"
637 );
638 let _ = delete_task_bundle(&paths);
639 return Err(error);
640 }
641 };
642
643 if let Some(child_pid) = runtime.child_pid {
644 metadata.mark_running(child_pid, child_pid as i32);
645 } else {
646 metadata.status = BgTaskStatus::Running;
647 metadata.pgid = None;
648 }
649 self.persist_task(&paths, &metadata)
650 .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
651
652 let task = Arc::new(BgTask {
653 task_id: task_id.clone(),
654 session_id,
655 paths: paths.clone(),
656 started: Instant::now(),
657 last_reminder_at: Mutex::new(None),
658 terminal_at: Mutex::new(None),
659 state: Mutex::new(BgTaskState {
660 metadata,
661 runtime: TaskRuntime::Pty(Some(runtime)),
662 detached: false,
663 child_exit_observed: false,
664 buffer: BgBuffer::pty(paths.pty.clone()),
665 terminal_output_cache: None,
666 pending_terminal_override: None,
667 }),
668 });
669
670 self.inner
671 .tasks
672 .lock()
673 .map_err(|_| "background task registry lock poisoned".to_string())?
674 .insert(task_id.clone(), task);
675
676 Ok(task_id)
677 }
678
679 #[cfg(windows)]
680 #[allow(clippy::too_many_arguments)]
681 pub fn spawn(
682 &self,
683 command: &str,
684 session_id: String,
685 workdir: PathBuf,
686 env: HashMap<String, String>,
687 timeout: Option<Duration>,
688 storage_dir: PathBuf,
689 max_running: usize,
690 notify_on_completion: bool,
691 compressed: bool,
692 project_root: Option<PathBuf>,
693 ) -> Result<String, String> {
694 self.start_watchdog();
695
696 let running = self.running_count();
697 if running >= max_running {
698 return Err(format!(
699 "background bash task limit exceeded: {running} running (max {max_running})"
700 ));
701 }
702
703 let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
704 let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
705 let task_id = self.generate_unique_task_id()?;
706 let paths = task_paths(&storage_dir, &session_id, &task_id);
707 fs::create_dir_all(&paths.dir)
708 .map_err(|e| format!("failed to create background task dir: {e}"))?;
709
710 let mut metadata = PersistedTask::starting(
711 task_id.clone(),
712 session_id.clone(),
713 command.to_string(),
714 workdir.clone(),
715 project_root,
716 timeout_ms,
717 notify_on_completion,
718 compressed,
719 );
720 self.persist_task(&paths, &metadata)
721 .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
722
723 create_capture_file(&paths.stdout)
729 .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
730 create_capture_file(&paths.stderr)
731 .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
732
733 let child = match spawn_detached_child(command, &paths, &workdir, &env) {
734 Ok(child) => child,
735 Err(error) => {
736 crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
737 let _ = delete_task_bundle(&paths);
738 return Err(error);
739 }
740 };
741
742 let child_pid = child.id();
743 metadata.status = BgTaskStatus::Running;
744 metadata.child_pid = Some(child_pid);
745 metadata.pgid = None;
746 self.persist_task(&paths, &metadata)
747 .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
748
749 let task = Arc::new(BgTask {
750 task_id: task_id.clone(),
751 session_id,
752 paths: paths.clone(),
753 started: Instant::now(),
754 last_reminder_at: Mutex::new(None),
755 terminal_at: Mutex::new(None),
756 state: Mutex::new(BgTaskState {
757 metadata,
758 runtime: TaskRuntime::Piped(Some(child)),
759 detached: false,
760 child_exit_observed: false,
761 buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
762 terminal_output_cache: None,
763 pending_terminal_override: None,
764 }),
765 });
766
767 self.inner
768 .tasks
769 .lock()
770 .map_err(|_| "background task registry lock poisoned".to_string())?
771 .insert(task_id.clone(), task);
772
773 Ok(task_id)
774 }
775
776 pub fn write_pty(
777 &self,
778 task_id: &str,
779 session_id: &str,
780 input: &[u8],
781 ) -> Result<usize, String> {
782 let task = self
783 .task_for_session(task_id, session_id)
784 .ok_or_else(|| "task_not_found".to_string())?;
785
786 let writer = {
787 let state = task
788 .state
789 .lock()
790 .map_err(|_| "background task lock poisoned".to_string())?;
791 if state.metadata.mode != BgMode::Pty {
792 return Err("task_not_pty".to_string());
793 }
794 if state.metadata.status.is_terminal() {
795 return Err("task_exited".to_string());
796 }
797 match &state.runtime {
798 TaskRuntime::Pty(Some(runtime)) => Arc::clone(&runtime.writer),
799 TaskRuntime::Pty(None) => return Err("task_exited".to_string()),
800 TaskRuntime::Piped(_) => return Err("task_not_pty".to_string()),
801 }
802 };
803
804 let mut writer = writer
805 .lock()
806 .map_err(|_| "PTY writer lock poisoned".to_string())?;
807 writer
808 .write_all(input)
809 .map_err(|error| format!("failed to write to PTY: {error}"))?;
810 writer
811 .flush()
812 .map_err(|error| format!("failed to flush PTY writer: {error}"))?;
813 Ok(input.len())
814 }
815
816 pub fn replay_session(&self, storage_dir: &Path, session_id: &str) -> Result<(), String> {
817 self.replay_session_inner(storage_dir, session_id, None)
818 }
819
820 pub fn replay_session_for_project(
821 &self,
822 storage_dir: &Path,
823 session_id: &str,
824 project_root: &Path,
825 ) -> Result<(), String> {
826 self.replay_session_inner(storage_dir, session_id, Some(project_root))
827 }
828
829 fn replay_session_inner(
830 &self,
831 storage_dir: &Path,
832 session_id: &str,
833 project_root: Option<&Path>,
834 ) -> Result<(), String> {
835 self.start_watchdog();
836 if !self.inner.persisted_gc_started.swap(true, Ordering::SeqCst) {
837 if let Err(error) = self.maybe_gc_persisted(storage_dir) {
838 crate::slog_warn!("failed to GC persisted background bash tasks: {error}");
839 }
840 }
841
842 let canonical_project = project_root.map(canonicalized_path);
843 let tasks = match self.replay_session_from_db(session_id) {
855 Some(Ok(tasks)) if !tasks.is_empty() => tasks,
856 Some(Ok(_)) => {
857 let disk_tasks = self.replay_session_from_disk(storage_dir, session_id)?;
858 if !disk_tasks.is_empty() {
859 crate::slog_info!(
860 "bash task replay: 0 in DB for session {}, {} from disk fallback",
861 session_id,
862 disk_tasks.len()
863 );
864 }
865 disk_tasks
866 }
867 Some(Err(error)) => {
868 crate::slog_warn!(
869 "bash task replay DB lookup failed for session {}; falling back to disk: {}",
870 session_id,
871 error
872 );
873 self.replay_session_from_disk(storage_dir, session_id)?
874 }
875 None => {
876 self.replay_session_from_disk(storage_dir, session_id)?
878 }
879 };
880
881 for mut metadata in tasks {
882 if metadata.session_id != session_id {
883 continue;
884 }
885 if let Some(canonical_project) = canonical_project.as_deref() {
886 let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
887 if metadata_project.as_deref() != Some(canonical_project) {
888 continue;
889 }
890 }
891
892 let paths = task_paths(storage_dir, session_id, &metadata.task_id);
893 match metadata.status {
894 BgTaskStatus::Starting => {
895 let completion_was_delivered = metadata.completion_delivered;
896 metadata.mark_terminal(
897 BgTaskStatus::Failed,
898 None,
899 Some("spawn aborted".to_string()),
900 );
901 metadata.completion_delivered |= completion_was_delivered;
902 let _ = self.persist_task(&paths, &metadata);
903 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
904 self.insert_rehydrated_task(metadata, paths, true)?;
905 }
906 BgTaskStatus::Running | BgTaskStatus::Killing => {
907 if metadata.mode == BgMode::Pty {
908 if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
909 let completion_was_delivered = metadata.completion_delivered;
910 metadata = terminal_metadata_from_marker(metadata, marker, None);
911 metadata.completion_delivered |= completion_was_delivered;
912 let _ = self.persist_task(&paths, &metadata);
913 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
914 self.insert_rehydrated_task(metadata, paths, true)?;
915 } else if metadata.status.is_terminal() {
916 self.insert_rehydrated_task(metadata, paths, true)?;
917 } else {
918 let completion_was_delivered = metadata.completion_delivered;
919 metadata.mark_terminal(
920 BgTaskStatus::Killed,
921 None,
922 Some("pty_lost_on_bridge_restart".to_string()),
923 );
924 metadata.completion_delivered |= completion_was_delivered;
925 let _ = self.persist_task(&paths, &metadata);
926 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
927 self.insert_rehydrated_task(metadata, paths, true)?;
928 }
929 } else if self.running_metadata_is_stale(&metadata) {
930 let completion_was_delivered = metadata.completion_delivered;
931 metadata.mark_terminal(
932 BgTaskStatus::Killed,
933 None,
934 Some("orphaned (>24h)".to_string()),
935 );
936 metadata.completion_delivered |= completion_was_delivered;
937 if !paths.exit.exists() {
938 let _ = write_kill_marker_if_absent(&paths.exit);
939 }
940 let _ = self.persist_task(&paths, &metadata);
941 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
942 self.insert_rehydrated_task(metadata, paths, true)?;
943 } else if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
944 let reason = (metadata.status == BgTaskStatus::Killing).then(|| {
945 "recovered from inconsistent killing state on replay".to_string()
946 });
947 if reason.is_some() {
948 crate::slog_warn!("background task {} had killing state with exit marker; preferring marker",
949 metadata.task_id);
950 }
951 let completion_was_delivered = metadata.completion_delivered;
952 metadata = terminal_metadata_from_marker(metadata, marker, reason);
953 metadata.completion_delivered |= completion_was_delivered;
954 let _ = self.persist_task(&paths, &metadata);
955 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
956 self.insert_rehydrated_task(metadata, paths, true)?;
957 } else if metadata.status == BgTaskStatus::Killing {
958 if !paths.exit.exists() {
959 let _ = write_kill_marker_if_absent(&paths.exit);
960 }
961 let completion_was_delivered = metadata.completion_delivered;
962 metadata.mark_terminal(
963 BgTaskStatus::Killed,
964 None,
965 Some("recovered from inconsistent killing state on replay".to_string()),
966 );
967 metadata.completion_delivered |= completion_was_delivered;
968 let _ = self.persist_task(&paths, &metadata);
969 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
970 self.insert_rehydrated_task(metadata, paths, true)?;
971 } else if metadata.child_pid.is_some_and(|pid| !is_process_alive(pid)) {
972 let completion_was_delivered = metadata.completion_delivered;
973 metadata.mark_terminal(
974 BgTaskStatus::Failed,
975 None,
976 Some("process exited without exit marker".to_string()),
977 );
978 metadata.completion_delivered |= completion_was_delivered;
979 let _ = self.persist_task(&paths, &metadata);
980 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
981 self.insert_rehydrated_task(metadata, paths, true)?;
982 } else {
983 self.insert_rehydrated_task(metadata, paths, true)?;
984 }
985 }
986 _ if metadata.status.is_terminal() => {
987 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
993 self.insert_rehydrated_task(metadata, paths, true)?;
994 }
995 _ => {}
996 }
997 }
998
999 Ok(())
1000 }
1001
1002 fn replay_session_from_db(
1003 &self,
1004 session_id: &str,
1005 ) -> Option<Result<Vec<PersistedTask>, String>> {
1006 let pool = self
1007 .inner
1008 .db_pool
1009 .read()
1010 .ok()
1011 .and_then(|slot| slot.clone())?;
1012 let harness = self
1013 .inner
1014 .db_harness
1015 .read()
1016 .ok()
1017 .and_then(|slot| slot.clone())?;
1018 let conn = match pool.lock() {
1019 Ok(conn) => conn,
1020 Err(_) => return Some(Err("db mutex poisoned".to_string())),
1021 };
1022 Some(
1023 crate::db::bash_tasks::list_bash_tasks_for_session(&conn, &harness, session_id)
1024 .map(|rows| rows.into_iter().map(PersistedTask::from).collect())
1025 .map_err(|error| error.to_string()),
1026 )
1027 }
1028
1029 fn replay_session_from_disk(
1030 &self,
1031 storage_dir: &Path,
1032 session_id: &str,
1033 ) -> Result<Vec<PersistedTask>, String> {
1034 let dir = session_tasks_dir(storage_dir, session_id);
1035 if !dir.exists() {
1036 return Ok(Vec::new());
1037 }
1038
1039 let entries = fs::read_dir(&dir)
1040 .map_err(|e| format!("failed to read background task dir {}: {e}", dir.display()))?;
1041 let mut tasks = Vec::new();
1042 for entry in entries.flatten() {
1043 let path = entry.path();
1044 if path.extension().and_then(|extension| extension.to_str()) != Some("json") {
1045 continue;
1046 }
1047 match read_task(&path) {
1048 Ok(metadata) => tasks.push(metadata),
1049 Err(error) => {
1050 crate::slog_warn!(
1051 "quarantining invalid background task metadata {} during replay: {error}",
1052 path.display()
1053 );
1054 if let Err(quarantine_error) =
1055 quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
1056 {
1057 crate::slog_warn!(
1058 "failed to quarantine invalid background task metadata {}: {quarantine_error}",
1059 path.display()
1060 );
1061 }
1062 }
1063 }
1064 }
1065 Ok(tasks)
1066 }
1067
1068 pub fn register_watch(
1069 &self,
1070 task_id: String,
1071 pattern: WatchPattern,
1072 once: bool,
1073 ) -> Result<String, &'static str> {
1074 let task = self.task(&task_id).ok_or("task_not_found")?;
1075 let (mode, terminal_at_registration, stdout, stderr, pty) = task
1076 .state
1077 .lock()
1078 .map(|state| {
1079 (
1080 state.metadata.mode.clone(),
1081 state.metadata.status.is_terminal(),
1082 task.paths.stdout.clone(),
1083 task.paths.stderr.clone(),
1084 task.paths.pty.clone(),
1085 )
1086 })
1087 .map_err(|_| "background_task_lock_poisoned")?;
1088
1089 let mut terminal_matches = Vec::new();
1090 let scanned_terminal = terminal_at_registration;
1091 let watch_id = {
1092 let mut registry = self
1093 .inner
1094 .watch_registry
1095 .lock()
1096 .map_err(|_| "watch_registry_poisoned")?;
1097 let watch_id = registry.register(task_id.clone(), pattern, once)?;
1098 match &mode {
1099 BgMode::Pipes => {
1100 let stdout_key = format!("{task_id}:stdout");
1101 let stderr_key = format!("{task_id}:stderr");
1102 if terminal_at_registration {
1103 registry.set_file_cursor(&stdout_key, 0);
1104 registry.set_file_cursor(&stderr_key, 0);
1105 terminal_matches.extend(registry.scan_file_new_bytes(
1106 &stdout_key,
1107 &task_id,
1108 &stdout,
1109 ));
1110 terminal_matches.extend(registry.scan_file_new_bytes(
1111 &stderr_key,
1112 &task_id,
1113 &stderr,
1114 ));
1115 } else {
1116 registry.prime_file_cursor(&stdout_key, &stdout);
1117 registry.prime_file_cursor(&stderr_key, &stderr);
1118 }
1119 }
1120 BgMode::Pty => {
1121 let pty_key = format!("{task_id}:pty");
1122 if terminal_at_registration {
1123 registry.set_file_cursor(&pty_key, 0);
1124 terminal_matches
1125 .extend(registry.scan_file_new_bytes(&pty_key, &task_id, &pty));
1126 } else {
1127 registry.prime_file_cursor(&pty_key, &pty);
1128 }
1129 }
1130 }
1131 watch_id
1132 };
1133
1134 if task.is_terminal() {
1135 if !scanned_terminal {
1136 terminal_matches = {
1137 let mut registry = self
1138 .inner
1139 .watch_registry
1140 .lock()
1141 .map_err(|_| "watch_registry_poisoned")?;
1142 match &mode {
1143 BgMode::Pipes => {
1144 let stdout_key = format!("{task_id}:stdout");
1145 let stderr_key = format!("{task_id}:stderr");
1146 registry.set_file_cursor(&stdout_key, 0);
1147 registry.set_file_cursor(&stderr_key, 0);
1148 let mut matches =
1149 registry.scan_file_new_bytes(&stdout_key, &task_id, &stdout);
1150 matches.extend(registry.scan_file_new_bytes(
1151 &stderr_key,
1152 &task_id,
1153 &stderr,
1154 ));
1155 matches
1156 }
1157 BgMode::Pty => {
1158 let pty_key = format!("{task_id}:pty");
1159 registry.set_file_cursor(&pty_key, 0);
1160 registry.scan_file_new_bytes(&pty_key, &task_id, &pty)
1161 }
1162 }
1163 };
1164 }
1165
1166 let (watch_controlled, watch_matched) = self.task_watch_state(&task_id);
1167 if terminal_matches.is_empty() && (!watch_controlled || watch_matched) {
1168 if watch_matched {
1169 let _ = task.set_completion_delivered(true, self);
1170 self.clear_task_watch_state(&task_id);
1171 }
1172 return Ok(watch_id);
1173 }
1174
1175 let completion = self
1176 .remove_pending_completion(&task_id)
1177 .or_else(|| self.completion_snapshot_for_task(&task));
1178 if terminal_matches.is_empty() {
1179 if let Some(completion) = completion.as_ref() {
1180 self.emit_bash_watch_exit(completion);
1181 }
1182 } else {
1183 for pattern_match in terminal_matches {
1184 self.emit_bash_pattern_match(&task.session_id, pattern_match);
1185 }
1186 }
1187 let _ = task.set_completion_delivered(true, self);
1188 self.clear_task_watch_state(&task_id);
1189 }
1190
1191 Ok(watch_id)
1192 }
1193
1194 pub fn unregister_watch(&self, task_id: &str, watch_id: &str) {
1195 if let Ok(mut registry) = self.inner.watch_registry.lock() {
1196 registry.unregister(task_id, watch_id);
1197 }
1198 }
1199
1200 pub fn active_watch_count(&self, task_id: &str) -> usize {
1201 self.inner
1202 .watch_registry
1203 .lock()
1204 .map(|registry| registry.active_count(task_id))
1205 .unwrap_or(0)
1206 }
1207
1208 fn task_watch_state(&self, task_id: &str) -> (bool, bool) {
1209 self.inner
1210 .watch_registry
1211 .lock()
1212 .map(|registry| {
1213 (
1214 registry.has_controlled_task(task_id),
1215 registry.has_matched_task(task_id),
1216 )
1217 })
1218 .unwrap_or((false, false))
1219 }
1220
1221 fn task_has_watch_control(&self, task_id: &str) -> bool {
1222 self.inner
1223 .watch_registry
1224 .lock()
1225 .map(|registry| registry.has_controlled_task(task_id))
1226 .unwrap_or(false)
1227 }
1228
1229 fn clear_task_watch_state(&self, task_id: &str) {
1230 if let Ok(mut registry) = self.inner.watch_registry.lock() {
1231 registry.clear_task(task_id);
1232 }
1233 }
1234
1235 pub(crate) fn scan_task_watch_output(&self, task: &Arc<BgTask>) {
1236 let (mode, stdout, stderr, pty) = match task.state.lock() {
1237 Ok(state) => (
1238 state.metadata.mode.clone(),
1239 task.paths.stdout.clone(),
1240 task.paths.stderr.clone(),
1241 task.paths.pty.clone(),
1242 ),
1243 Err(_) => return,
1244 };
1245 let mut matches = Vec::new();
1246 if let Ok(mut registry) = self.inner.watch_registry.lock() {
1247 match mode {
1248 BgMode::Pipes => {
1249 let stdout_key = format!("{}:stdout", task.task_id);
1250 let stderr_key = format!("{}:stderr", task.task_id);
1251 matches.extend(registry.scan_file_new_bytes(
1252 &stdout_key,
1253 &task.task_id,
1254 &stdout,
1255 ));
1256 matches.extend(registry.scan_file_new_bytes(
1257 &stderr_key,
1258 &task.task_id,
1259 &stderr,
1260 ));
1261 }
1262 BgMode::Pty => {
1263 let pty_key = format!("{}:pty", task.task_id);
1264 matches.extend(registry.scan_file_new_bytes(&pty_key, &task.task_id, &pty));
1265 }
1266 }
1267 }
1268 for pattern_match in matches {
1269 self.emit_bash_pattern_match(&task.session_id, pattern_match);
1270 }
1271 }
1272
1273 pub fn status(
1274 &self,
1275 task_id: &str,
1276 session_id: &str,
1277 project_root: Option<&Path>,
1278 storage_dir: Option<&Path>,
1279 preview_bytes: usize,
1280 ) -> Option<BgTaskSnapshot> {
1281 let mut task = self.task_for_session(task_id, session_id);
1282 if task.is_none() {
1283 if let Some(storage_dir) = storage_dir {
1284 let _ = if let Some(project_root) = project_root {
1285 self.replay_session_for_project(storage_dir, session_id, project_root)
1286 } else {
1287 self.replay_session(storage_dir, session_id)
1288 };
1289 task = self.task_for_session(task_id, session_id);
1290 }
1291 }
1292 let Some(task) = task else {
1293 return self.status_relaxed(
1294 task_id,
1295 session_id,
1296 project_root?,
1297 storage_dir?,
1298 preview_bytes,
1299 );
1300 };
1301 let _ = self.poll_task(&task);
1302 Some(self.snapshot_with_terminal_cache(&task, preview_bytes))
1303 }
1304
1305 fn status_relaxed_task(
1306 &self,
1307 task_id: &str,
1308 project_root: &Path,
1309 storage_dir: &Path,
1310 ) -> Option<Arc<BgTask>> {
1311 let canonical_project = canonicalized_path(project_root);
1312 match self.lookup_relaxed_task_from_db(task_id, project_root) {
1313 Some(Ok(Some(metadata))) => {
1314 if let Some(task) = self.task(task_id) {
1315 let matches_project = task
1316 .state
1317 .lock()
1318 .map(|state| {
1319 state
1320 .metadata
1321 .project_root
1322 .as_deref()
1323 .map(canonicalized_path)
1324 .as_deref()
1325 == Some(canonical_project.as_path())
1326 })
1327 .unwrap_or(false);
1328 return matches_project.then_some(task);
1329 }
1330 let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1331 if self.insert_rehydrated_task(metadata, paths, true).is_err() {
1332 return None;
1333 }
1334 return self.task(task_id);
1335 }
1336 Some(Ok(None)) => {
1337 crate::slog_info!(
1338 "bash task relaxed DB miss for {}; falling back to disk",
1339 task_id
1340 );
1341 }
1342 Some(Err(error)) => {
1343 crate::slog_warn!(
1344 "bash task relaxed DB lookup failed for {}; falling back to disk: {}",
1345 task_id,
1346 error
1347 );
1348 }
1349 None => {
1350 crate::slog_info!(
1351 "bash task relaxed DB unavailable for {}; falling back to disk",
1352 task_id
1353 );
1354 }
1355 }
1356 let root = storage_dir.join("bash-tasks");
1357 let entries = fs::read_dir(&root).ok()?;
1358 for entry in entries.flatten() {
1359 let dir = entry.path();
1360 if !dir.is_dir() {
1361 continue;
1362 }
1363 let path = dir.join(format!("{task_id}.json"));
1364 if !path.exists() {
1365 continue;
1366 }
1367 let metadata = match read_task(&path) {
1368 Ok(metadata) => metadata,
1369 Err(error) => {
1370 crate::slog_warn!(
1371 "quarantining invalid background task metadata {} during relaxed lookup: {error}",
1372 path.display()
1373 );
1374 if let Err(quarantine_error) =
1375 quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
1376 {
1377 crate::slog_warn!(
1378 "failed to quarantine invalid background task metadata {}: {quarantine_error}",
1379 path.display()
1380 );
1381 }
1382 continue;
1383 }
1384 };
1385 let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
1386 if metadata_project.as_deref() != Some(canonical_project.as_path()) {
1387 continue;
1388 }
1389 if let Some(task) = self.task(task_id) {
1390 let matches_project = task
1391 .state
1392 .lock()
1393 .map(|state| {
1394 state
1395 .metadata
1396 .project_root
1397 .as_deref()
1398 .map(canonicalized_path)
1399 .as_deref()
1400 == Some(canonical_project.as_path())
1401 })
1402 .unwrap_or(false);
1403 return matches_project.then_some(task);
1404 }
1405 let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1406 if self.insert_rehydrated_task(metadata, paths, true).is_err() {
1407 return None;
1408 }
1409 return self.task(task_id);
1410 }
1411 None
1412 }
1413
1414 fn lookup_relaxed_task_from_db(
1415 &self,
1416 task_id: &str,
1417 project_root: &Path,
1418 ) -> Option<Result<Option<PersistedTask>, String>> {
1419 let pool = self
1420 .inner
1421 .db_pool
1422 .read()
1423 .ok()
1424 .and_then(|slot| slot.clone())?;
1425 let harness = self
1426 .inner
1427 .db_harness
1428 .read()
1429 .ok()
1430 .and_then(|slot| slot.clone())?;
1431 let conn = match pool.lock() {
1432 Ok(conn) => conn,
1433 Err(_) => return Some(Err("db mutex poisoned".to_string())),
1434 };
1435 let project_key = crate::path_identity::project_scope_key(project_root);
1436 Some(
1437 crate::db::bash_tasks::find_bash_task_for_project(
1438 &conn,
1439 &harness,
1440 &project_key,
1441 task_id,
1442 )
1443 .map(|row| row.map(PersistedTask::from))
1444 .map_err(|error| error.to_string()),
1445 )
1446 }
1447
1448 pub(super) fn status_relaxed(
1449 &self,
1450 task_id: &str,
1451 _session_id: &str,
1452 project_root: &Path,
1453 storage_dir: &Path,
1454 preview_bytes: usize,
1455 ) -> Option<BgTaskSnapshot> {
1456 let task = self.status_relaxed_task(task_id, project_root, storage_dir)?;
1457 let _ = self.poll_task(&task);
1458 Some(self.snapshot_with_terminal_cache(&task, preview_bytes))
1459 }
1460
1461 pub fn kill_relaxed(
1462 &self,
1463 task_id: &str,
1464 project_root: &Path,
1465 storage_dir: &Path,
1466 ) -> Result<BgTaskSnapshot, String> {
1467 let task = self
1468 .status_relaxed_task(task_id, project_root, storage_dir)
1469 .ok_or_else(|| format!("background task not found: {task_id}"))?;
1470 self.kill_with_status(task_id, &task.session_id, BgTaskStatus::Killed)
1471 }
1472
1473 pub fn maybe_gc_persisted(&self, storage_dir: &Path) -> Result<usize, String> {
1474 #[cfg(test)]
1475 self.inner.persisted_gc_runs.fetch_add(1, Ordering::SeqCst);
1476
1477 let mut deleted = 0usize;
1478
1479 let root = storage_dir.join("bash-tasks");
1480 if root.exists() {
1481 let session_dirs = fs::read_dir(&root).map_err(|e| {
1482 format!(
1483 "failed to read background task root {}: {e}",
1484 root.display()
1485 )
1486 })?;
1487 for session_entry in session_dirs.flatten() {
1488 let session_dir = session_entry.path();
1489 if !session_dir.is_dir() {
1490 continue;
1491 }
1492 let task_entries = match fs::read_dir(&session_dir) {
1493 Ok(entries) => entries,
1494 Err(error) => {
1495 crate::slog_warn!(
1496 "failed to read background task session dir {}: {error}",
1497 session_dir.display()
1498 );
1499 continue;
1500 }
1501 };
1502 for task_entry in task_entries.flatten() {
1503 let json_path = task_entry.path();
1504 if json_path
1505 .extension()
1506 .and_then(|extension| extension.to_str())
1507 != Some("json")
1508 {
1509 continue;
1510 }
1511 if modified_within(&json_path, PERSISTED_GC_GRACE) {
1512 continue;
1513 }
1514 let metadata = match read_task(&json_path) {
1515 Ok(metadata) => metadata,
1516 Err(error) => {
1517 crate::slog_warn!(
1518 "quarantining corrupt background task metadata {}: {error}",
1519 json_path.display()
1520 );
1521 quarantine_task_json(
1522 storage_dir,
1523 &session_dir,
1524 &json_path,
1525 QuarantineKind::Corrupt,
1526 )?;
1527 continue;
1528 }
1529 };
1530 if !(metadata.status.is_terminal() && metadata.completion_delivered) {
1531 continue;
1532 }
1533 let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1534 match delete_task_bundle(&paths) {
1535 Ok(()) => {
1536 deleted += 1;
1537 log::debug!(
1538 "deleted persisted background task bundle {}",
1539 metadata.task_id
1540 );
1541 }
1542 Err(error) => {
1543 crate::slog_warn!(
1544 "failed to delete background task bundle {}: {error}",
1545 metadata.task_id
1546 );
1547 continue;
1548 }
1549 }
1550 }
1551 }
1552 }
1553 gc_quarantine(storage_dir);
1554 Ok(deleted)
1555 }
1556
1557 pub fn list(&self, preview_bytes: usize) -> Vec<BgTaskSnapshot> {
1558 let tasks = self
1559 .inner
1560 .tasks
1561 .lock()
1562 .map(|tasks| tasks.values().cloned().collect::<Vec<_>>())
1563 .unwrap_or_default();
1564 tasks
1565 .into_iter()
1566 .map(|task| {
1567 let _ = self.poll_task(&task);
1568 self.snapshot_with_terminal_cache(&task, preview_bytes)
1569 })
1570 .collect()
1571 }
1572
1573 fn maybe_compress_snapshot(&self, task: &Arc<BgTask>, snapshot: &mut BgTaskSnapshot) {
1579 if !snapshot.info.status.is_terminal() || snapshot.info.mode == BgMode::Pty {
1580 return;
1581 }
1582 if let Some(cache) = self.ensure_terminal_output_cache(task) {
1583 snapshot.output_preview = cache.output_preview;
1584 snapshot.output_truncated = cache.output_truncated;
1585 }
1586 }
1587
1588 pub fn kill(&self, task_id: &str, session_id: &str) -> Result<BgTaskSnapshot, String> {
1589 self.kill_with_status(task_id, session_id, BgTaskStatus::Killed)
1590 }
1591
1592 pub fn promote(&self, task_id: &str, session_id: &str) -> Result<bool, String> {
1593 let task = self
1594 .task_for_session(task_id, session_id)
1595 .ok_or_else(|| format!("background task not found: {task_id}"))?;
1596 let terminal_after_promote = {
1597 let mut state = task
1598 .state
1599 .lock()
1600 .map_err(|_| "background task lock poisoned".to_string())?;
1601 let updated = self
1602 .update_task_metadata(&task.paths, |metadata| {
1603 metadata.notify_on_completion = true;
1604 metadata.completion_delivered = false;
1605 })
1606 .map_err(|e| format!("failed to promote background task: {e}"))?;
1607 state.metadata = updated;
1608 state.metadata.status.is_terminal()
1609 };
1610 if terminal_after_promote {
1611 self.post_terminal_transition(&task, true)?;
1612 }
1613 Ok(true)
1614 }
1615
1616 pub(crate) fn kill_for_timeout(&self, task_id: &str, session_id: &str) -> Result<(), String> {
1617 self.kill_with_status(task_id, session_id, BgTaskStatus::TimedOut)
1618 .map(|_| ())
1619 }
1620
1621 pub fn cleanup_finished(&self, older_than: Duration) {
1622 let cutoff = Instant::now().checked_sub(older_than);
1623 let removable_paths: Vec<(String, TaskPaths)> =
1624 if let Ok(mut tasks) = self.inner.tasks.lock() {
1625 let removable = tasks
1626 .iter()
1627 .filter_map(|(task_id, task)| {
1628 let delivered_terminal = task
1629 .state
1630 .lock()
1631 .map(|state| {
1632 state.metadata.status.is_terminal()
1633 && state.metadata.completion_delivered
1634 })
1635 .unwrap_or(false);
1636 if !delivered_terminal {
1637 return None;
1638 }
1639
1640 let terminal_at = task.terminal_at.lock().ok().and_then(|at| *at);
1641 let expired = match (terminal_at, cutoff) {
1642 (Some(terminal_at), Some(cutoff)) => terminal_at <= cutoff,
1643 (Some(_), None) => true,
1644 (None, _) => false,
1645 };
1646 expired.then(|| task_id.clone())
1647 })
1648 .collect::<Vec<_>>();
1649
1650 removable
1651 .into_iter()
1652 .filter_map(|task_id| {
1653 tasks
1654 .remove(&task_id)
1655 .map(|task| (task_id, task.paths.clone()))
1656 })
1657 .collect()
1658 } else {
1659 Vec::new()
1660 };
1661
1662 for (task_id, paths) in removable_paths {
1663 match delete_task_bundle(&paths) {
1664 Ok(()) => log::debug!("deleted persisted background task bundle {task_id}"),
1665 Err(error) => crate::slog_warn!(
1666 "failed to delete persisted background task bundle {task_id}: {error}"
1667 ),
1668 }
1669 }
1670 }
1671
1672 pub fn drain_completions(&self) -> Vec<BgCompletion> {
1673 self.drain_completions_for_session(None)
1674 }
1675
1676 pub fn drain_completions_for_session(&self, session_id: Option<&str>) -> Vec<BgCompletion> {
1677 let completions = match self.inner.completions.lock() {
1678 Ok(completions) => completions,
1679 Err(_) => return Vec::new(),
1680 };
1681
1682 completions
1683 .iter()
1684 .filter(|completion| completion_matches_session(completion, session_id))
1685 .cloned()
1686 .collect()
1687 }
1688
1689 pub fn has_completions_for_session(&self, session_id: Option<&str>) -> bool {
1690 match self.inner.completions.lock() {
1691 Ok(completions) => completions
1692 .iter()
1693 .any(|completion| completion_matches_session(completion, session_id)),
1694 Err(_) => true,
1698 }
1699 }
1700
1701 pub fn ack_completions_for_session(
1702 &self,
1703 session_id: Option<&str>,
1704 task_ids: &[String],
1705 ) -> Vec<String> {
1706 if task_ids.is_empty() {
1707 return Vec::new();
1708 }
1709 let requested_task_ids = task_ids.iter().map(String::as_str).collect::<HashSet<_>>();
1710 let mut completion_sessions = HashMap::new();
1711 if let Ok(mut completions) = self.inner.completions.lock() {
1712 completions.retain(|completion| {
1713 let session_matches = session_id
1714 .map(|session_id| completion.session_id == session_id)
1715 .unwrap_or(true);
1716 if session_matches && requested_task_ids.contains(completion.task_id.as_str()) {
1717 completion_sessions
1718 .insert(completion.task_id.clone(), completion.session_id.clone());
1719 false
1720 } else {
1721 true
1722 }
1723 });
1724 }
1725
1726 let mut delivered = Vec::new();
1727 for task_id in task_ids {
1728 let task = if let Some(session_id) = session_id {
1729 self.task_for_session(task_id, session_id)
1730 } else if let Some(completion_session_id) = completion_sessions.get(task_id) {
1731 self.task_for_session(task_id, completion_session_id)
1732 } else {
1733 self.task(task_id)
1734 };
1735 if let Some(task) = task {
1736 if task.set_completion_delivered(true, self).is_ok() {
1737 delivered.push(task_id.clone());
1738 }
1739 }
1740 }
1741
1742 delivered
1743 }
1744
1745 pub fn pending_completions_for_session(&self, session_id: &str) -> Vec<BgCompletion> {
1746 self.inner
1747 .completions
1748 .lock()
1749 .map(|completions| {
1750 completions
1751 .iter()
1752 .filter(|completion| completion.session_id == session_id)
1753 .cloned()
1754 .collect()
1755 })
1756 .unwrap_or_default()
1757 }
1758
1759 fn remove_pending_completion(&self, task_id: &str) -> Option<BgCompletion> {
1760 let mut completions = self.inner.completions.lock().ok()?;
1761 let idx = completions
1762 .iter()
1763 .position(|completion| completion.task_id == task_id)?;
1764 completions.remove(idx)
1765 }
1766
1767 fn completion_snapshot_for_task(&self, task: &Arc<BgTask>) -> Option<BgCompletion> {
1768 let snapshot = self.snapshot_with_terminal_cache(task, RUNNING_OUTPUT_PREVIEW_BYTES);
1769 if !snapshot.info.status.is_terminal() {
1770 return None;
1771 }
1772 let (output_preview, output_truncated) = if snapshot.info.mode == BgMode::Pty {
1773 (String::new(), false)
1774 } else {
1775 self.ensure_terminal_output_cache(task)
1776 .map(|cache| completion_preview_for_cache(&cache, snapshot.exit_code))
1777 .unwrap_or_else(|| (String::new(), false))
1778 };
1779 Some(BgCompletion {
1780 task_id: snapshot.info.task_id,
1781 session_id: task.session_id.clone(),
1782 status: snapshot.info.status,
1783 exit_code: snapshot.exit_code,
1784 command: snapshot.info.command,
1785 output_preview,
1786 output_truncated,
1787 original_tokens: None,
1788 compressed_tokens: None,
1789 tokens_skipped: false,
1790 })
1791 }
1792
1793 pub fn detach(&self) {
1794 self.inner.shutdown.store(true, Ordering::SeqCst);
1795 if let Ok(mut tasks) = self.inner.tasks.lock() {
1796 for task in tasks.values() {
1797 if let Ok(mut state) = task.state.lock() {
1798 match &mut state.runtime {
1799 TaskRuntime::Piped(child) => *child = None,
1800 TaskRuntime::Pty(runtime) => *runtime = None,
1801 }
1802 state.detached = true;
1803 }
1804 }
1805 tasks.clear();
1806 }
1807 }
1808
1809 pub fn shutdown(&self) {
1810 let tasks = self
1811 .inner
1812 .tasks
1813 .lock()
1814 .map(|tasks| {
1815 tasks
1816 .values()
1817 .map(|task| (task.task_id.clone(), task.session_id.clone()))
1818 .collect::<Vec<_>>()
1819 })
1820 .unwrap_or_default();
1821 for (task_id, session_id) in tasks {
1822 let _ = self.kill(&task_id, &session_id);
1823 }
1824 }
1825
1826 pub(crate) fn poll_task(&self, task: &Arc<BgTask>) -> Result<(), String> {
1827 if let Ok(state) = task.state.lock() {
1828 if let TaskRuntime::Pty(Some(pty)) = &state.runtime {
1829 if !pty.exit_observed.load(Ordering::SeqCst) {
1837 return Ok(());
1838 }
1839 }
1840 }
1841 let marker = match read_exit_marker(&task.paths.exit) {
1842 Ok(Some(marker)) => marker,
1843 Ok(None) => return Ok(()),
1844 Err(error) => return Err(format!("failed to read exit marker: {error}")),
1845 };
1846 self.finalize_from_marker(task, marker, None)
1847 }
1848
1849 pub(crate) fn reap_child(&self, task: &Arc<BgTask>) {
1850 let mut needs_completion = false;
1851 {
1852 let Ok(mut state) = task.state.lock() else {
1853 return;
1854 };
1855 match &mut state.runtime {
1856 TaskRuntime::Piped(child_slot) => {
1857 if let Some(child) = child_slot.as_mut() {
1858 if matches!(child.try_wait(), Ok(Some(_))) {
1859 *child_slot = None;
1860 state.detached = true;
1861 state.child_exit_observed = true;
1862 }
1863 } else if state.detached {
1864 let child_known_dead = state.child_exit_observed
1865 || state
1866 .metadata
1867 .child_pid
1868 .is_some_and(|pid| !is_process_alive(pid));
1869 if child_known_dead {
1870 needs_completion =
1871 self.fail_without_exit_marker_if_needed(task, &mut state);
1872 }
1873 }
1874 }
1875 TaskRuntime::Pty(Some(pty)) => {
1876 if pty.exit_observed.load(Ordering::SeqCst) {
1877 drop(state);
1878 let _ = self.poll_task(task);
1879 return;
1880 }
1881 }
1882 TaskRuntime::Pty(None) => {}
1883 }
1884 }
1885 if needs_completion {
1886 let _ = self.post_terminal_transition(task, true);
1887 }
1888 }
1889
1890 fn fail_without_exit_marker_if_needed(
1891 &self,
1892 task: &Arc<BgTask>,
1893 state: &mut BgTaskState,
1894 ) -> bool {
1895 if state.metadata.status.is_terminal() {
1896 return false;
1897 }
1898 if matches!(read_exit_marker(&task.paths.exit), Ok(Some(_))) {
1899 return false;
1900 }
1901 let watch_controlled = self.task_has_watch_control(&task.task_id);
1902 let updated = self.update_task_metadata(&task.paths, |metadata| {
1903 metadata.mark_terminal(
1904 BgTaskStatus::Failed,
1905 None,
1906 Some("process exited without exit marker".to_string()),
1907 );
1908 if watch_controlled {
1909 metadata.completion_delivered = true;
1910 }
1911 });
1912 if let Ok(metadata) = updated {
1913 state.pending_terminal_override = None;
1914 state.metadata = metadata;
1915 task.mark_terminal_now();
1916 return true;
1917 }
1918 false
1919 }
1920
1921 pub(crate) fn running_tasks(&self) -> Vec<Arc<BgTask>> {
1922 self.inner
1923 .tasks
1924 .lock()
1925 .map(|tasks| {
1926 tasks
1927 .values()
1928 .filter(|task| task.is_running())
1929 .cloned()
1930 .collect()
1931 })
1932 .unwrap_or_default()
1933 }
1934
1935 fn insert_rehydrated_task(
1936 &self,
1937 metadata: PersistedTask,
1938 paths: TaskPaths,
1939 detached: bool,
1940 ) -> Result<(), String> {
1941 let task_id = metadata.task_id.clone();
1942 let session_id = metadata.session_id.clone();
1943 let started = started_instant_from_unix_millis(metadata.started_at);
1944 let suppress_replayed_running_reminder = metadata.status == BgTaskStatus::Running;
1945 let mode = metadata.mode.clone();
1946 let task = Arc::new(BgTask {
1947 task_id: task_id.clone(),
1948 session_id,
1949 paths: paths.clone(),
1950 started,
1951 last_reminder_at: Mutex::new(suppress_replayed_running_reminder.then(Instant::now)),
1952 terminal_at: Mutex::new(metadata.status.is_terminal().then(Instant::now)),
1953 state: Mutex::new(BgTaskState {
1954 metadata,
1955 runtime: if mode == BgMode::Pty {
1956 TaskRuntime::Pty(None)
1957 } else {
1958 TaskRuntime::Piped(None)
1959 },
1960 detached,
1961 child_exit_observed: false,
1968 buffer: if mode == BgMode::Pty {
1969 BgBuffer::pty(paths.pty.clone())
1970 } else {
1971 BgBuffer::new(paths.stdout.clone(), paths.stderr.clone())
1972 },
1973 terminal_output_cache: None,
1974 pending_terminal_override: None,
1975 }),
1976 });
1977 self.inner
1978 .tasks
1979 .lock()
1980 .map_err(|_| "background task registry lock poisoned".to_string())?
1981 .insert(task_id, task);
1982 Ok(())
1983 }
1984
1985 fn kill_with_status(
1986 &self,
1987 task_id: &str,
1988 session_id: &str,
1989 terminal_status: BgTaskStatus,
1990 ) -> Result<BgTaskSnapshot, String> {
1991 let task = self
1992 .task_for_session(task_id, session_id)
1993 .ok_or_else(|| format!("background task not found: {task_id}"))?;
1994 let mut terminalized = false;
1995
1996 {
1997 let mut state = task
1998 .state
1999 .lock()
2000 .map_err(|_| "background task lock poisoned".to_string())?;
2001 if state.metadata.status.is_terminal() {
2002 state.pending_terminal_override = None;
2003 } else if let Ok(Some(marker)) = read_exit_marker(&task.paths.exit) {
2004 state.metadata =
2005 terminal_metadata_from_marker(state.metadata.clone(), marker, None);
2006 if self.task_has_watch_control(&task.task_id) {
2007 state.metadata.completion_delivered = true;
2008 }
2009 state.pending_terminal_override = None;
2010 task.mark_terminal_now();
2011 match &mut state.runtime {
2012 TaskRuntime::Piped(child_slot) => reap_piped_child(child_slot),
2019 TaskRuntime::Pty(runtime) => *runtime = None,
2020 }
2021 state.detached = true;
2022 self.persist_task(&task.paths, &state.metadata)
2023 .map_err(|e| format!("failed to persist terminal state: {e}"))?;
2024 terminalized = true;
2025 } else {
2026 let was_already_killing = state.metadata.status == BgTaskStatus::Killing;
2027 if !was_already_killing {
2028 state.metadata.status = BgTaskStatus::Killing;
2029 self.persist_task(&task.paths, &state.metadata)
2030 .map_err(|e| format!("failed to persist killing state: {e}"))?;
2031 }
2032
2033 #[cfg(unix)]
2034 let pgid = state.metadata.pgid;
2035 #[cfg(windows)]
2036 let child_pid = state.metadata.child_pid;
2037 if !was_already_killing
2038 && state.metadata.mode == BgMode::Pty
2039 && terminal_status == BgTaskStatus::TimedOut
2040 {
2041 state.pending_terminal_override = Some(BgTaskStatus::TimedOut);
2042 }
2043
2044 #[cfg(windows)]
2045 let mut pty_forced_terminal_status: Option<BgTaskStatus> = None;
2046
2047 match &mut state.runtime {
2048 TaskRuntime::Piped(child_slot) => {
2049 #[cfg(unix)]
2050 if let Some(pgid) = pgid {
2051 terminate_pgid(pgid, child_slot.as_mut());
2052 }
2053 #[cfg(windows)]
2054 if let Some(child) = child_slot.as_mut() {
2055 super::process::terminate_process(child);
2056 } else if let Some(pid) = child_pid {
2057 terminate_pid(pid);
2058 }
2059 if let Some(child) = child_slot.as_mut() {
2060 let _ = child.wait();
2061 }
2062 *child_slot = None;
2063 state.detached = true;
2064
2065 if !task.paths.exit.exists() {
2066 write_kill_marker_if_absent(&task.paths.exit)
2067 .map_err(|e| format!("failed to write kill marker: {e}"))?;
2068 }
2069
2070 let exit_code = terminal_exit_code_for_status(&terminal_status);
2071 state
2072 .metadata
2073 .mark_terminal(terminal_status, exit_code, None);
2074 if self.task_has_watch_control(&task.task_id) {
2075 state.metadata.completion_delivered = true;
2076 }
2077 state.pending_terminal_override = None;
2078 task.mark_terminal_now();
2079 self.persist_task(&task.paths, &state.metadata)
2080 .map_err(|e| format!("failed to persist killed state: {e}"))?;
2081 terminalized = true;
2082 }
2083 TaskRuntime::Pty(Some(pty)) => {
2084 pty.was_killed.store(true, Ordering::SeqCst);
2085 if let Err(error) = pty.killer.kill() {
2086 crate::slog_warn!(
2087 "[pty-kill] {task_id} ChildKiller::kill failed: {error}"
2088 );
2089 }
2090 if let Some(pid) = pty.child_pid {
2091 #[cfg(unix)]
2092 terminate_pgid(pid as i32, None);
2093 #[cfg(windows)]
2094 terminate_pid(pid);
2095 }
2096 drop(pty.master.take());
2097
2098 #[cfg(windows)]
2099 {
2100 let default_status = if terminal_status == BgTaskStatus::TimedOut {
2101 BgTaskStatus::TimedOut
2102 } else {
2103 BgTaskStatus::Killed
2104 };
2105 pty_forced_terminal_status = Some(
2106 state
2107 .pending_terminal_override
2108 .take()
2109 .unwrap_or(default_status),
2110 );
2111 }
2112 }
2113 TaskRuntime::Pty(None) => {}
2114 }
2115
2116 #[cfg(windows)]
2117 if let Some(target_status) = pty_forced_terminal_status {
2118 if !task.paths.exit.exists() {
2119 write_kill_marker_if_absent(&task.paths.exit)
2120 .map_err(|e| format!("failed to write kill marker: {e}"))?;
2121 }
2122
2123 let exit_code = terminal_exit_code_for_status(&target_status);
2124 state.metadata.mark_terminal(target_status, exit_code, None);
2125 if self.task_has_watch_control(&task.task_id) {
2126 state.metadata.completion_delivered = true;
2127 }
2128 state.pending_terminal_override = None;
2129 task.mark_terminal_now();
2130 if let TaskRuntime::Pty(runtime) = &mut state.runtime {
2131 *runtime = None;
2132 }
2133 state.detached = true;
2134 self.persist_task(&task.paths, &state.metadata)
2135 .map_err(|e| format!("failed to persist killed PTY state: {e}"))?;
2136 terminalized = true;
2137 }
2138 }
2139 }
2140
2141 if terminalized {
2142 self.post_terminal_transition(&task, true)?;
2143 }
2144 Ok(self.snapshot_with_terminal_cache(&task, RUNNING_OUTPUT_PREVIEW_BYTES))
2145 }
2146
2147 fn finalize_from_marker(
2148 &self,
2149 task: &Arc<BgTask>,
2150 marker: ExitMarker,
2151 reason: Option<String>,
2152 ) -> Result<(), String> {
2153 let watch_controlled = self.task_has_watch_control(&task.task_id);
2154 let mut pty_reader_done = None;
2155 {
2156 let mut state = task
2157 .state
2158 .lock()
2159 .map_err(|_| "background task lock poisoned".to_string())?;
2160 if state.metadata.status.is_terminal() {
2161 state.pending_terminal_override = None;
2162 return Ok(());
2163 }
2164
2165 let pending_override = state.pending_terminal_override.take();
2166 let is_pty = state.metadata.mode == BgMode::Pty;
2167 let updated = self
2168 .update_task_metadata(&task.paths, |metadata| {
2169 let mut new_metadata = if is_pty && marker == ExitMarker::Killed {
2170 let mut metadata = metadata.clone();
2171 let target_status = pending_override.unwrap_or(BgTaskStatus::Killed);
2172 let exit_code = terminal_exit_code_for_status(&target_status);
2173 metadata.mark_terminal(target_status, exit_code, reason);
2174 metadata
2175 } else {
2176 terminal_metadata_from_marker(metadata.clone(), marker, reason)
2177 };
2178 if watch_controlled {
2179 new_metadata.completion_delivered = true;
2180 }
2181 *metadata = new_metadata;
2182 })
2183 .map_err(|e| format!("failed to persist terminal state: {e}"))?;
2184 state.metadata = updated;
2185 task.mark_terminal_now();
2186 match &mut state.runtime {
2187 TaskRuntime::Piped(child_slot) => reap_piped_child(child_slot),
2192 TaskRuntime::Pty(runtime) => {
2193 pty_reader_done = runtime
2194 .as_ref()
2195 .map(|runtime| Arc::clone(&runtime.reader_done));
2196 *runtime = None;
2197 }
2198 }
2199 state.detached = true;
2200 }
2201
2202 if let Some(reader_done) = pty_reader_done {
2203 let deadline = Instant::now() + Duration::from_millis(200);
2204 while !reader_done.load(Ordering::SeqCst) && Instant::now() < deadline {
2205 std::thread::sleep(Duration::from_millis(10));
2206 }
2207 }
2208
2209 self.scan_task_watch_output(task);
2212
2213 self.post_terminal_transition(task, true)
2214 }
2215
2216 fn enqueue_completion_if_needed(
2217 &self,
2218 metadata: &PersistedTask,
2219 paths: Option<&TaskPaths>,
2220 emit_frame: bool,
2221 ) {
2222 if metadata.status.is_terminal() && !metadata.completion_delivered {
2223 let cache =
2224 paths.and_then(|paths| self.render_terminal_output_from_paths(metadata, paths));
2225 self.enqueue_completion_from_parts(metadata, None, paths, emit_frame, cache.as_ref());
2226 }
2227 }
2228
2229 fn render_terminal_output_from_paths(
2230 &self,
2231 metadata: &PersistedTask,
2232 paths: &TaskPaths,
2233 ) -> Option<TerminalOutputCache> {
2234 if metadata.mode == BgMode::Pty {
2235 return None;
2236 }
2237 let mut buffer = BgBuffer::new(paths.stdout.clone(), paths.stderr.clone());
2238 let disk_truncation = buffer.enforce_terminal_cap();
2239 Some(self.render_terminal_output(metadata, &buffer, disk_truncation))
2240 }
2241
2242 fn enqueue_completion_from_parts(
2243 &self,
2244 metadata: &PersistedTask,
2245 buffer: Option<&BgBuffer>,
2246 paths: Option<&TaskPaths>,
2247 emit_frame: bool,
2248 terminal_render: Option<&TerminalOutputCache>,
2249 ) {
2250 if !metadata.status.is_terminal() {
2261 return;
2262 }
2263
2264 let owned_buffer = if buffer.is_none() && metadata.mode != BgMode::Pty {
2265 paths.map(|paths| BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()))
2266 } else {
2267 None
2268 };
2269 let render_buffer = buffer.or(owned_buffer.as_ref());
2270 let owned_render = if terminal_render.is_none() {
2271 render_buffer.map(|buffer| {
2272 let mut capped_buffer = buffer.clone();
2273 let disk_truncation = capped_buffer.enforce_terminal_cap();
2274 self.render_terminal_output(metadata, &capped_buffer, disk_truncation)
2275 })
2276 } else {
2277 None
2278 };
2279 let render = terminal_render.or(owned_render.as_ref());
2280
2281 let (output_preview, output_truncated) = render
2285 .map(|cache| completion_preview_for_cache(cache, metadata.exit_code))
2286 .unwrap_or_else(|| (String::new(), false));
2287
2288 let token_counts = self.completion_token_counts(
2289 metadata,
2290 buffer,
2291 paths,
2292 render.map(|render| render.output_preview.as_str()),
2293 );
2294 let completion = BgCompletion {
2295 task_id: metadata.task_id.clone(),
2296 session_id: metadata.session_id.clone(),
2297 status: metadata.status.clone(),
2298 exit_code: metadata.exit_code,
2299 command: metadata.command.clone(),
2300 output_preview,
2301 output_truncated,
2302 original_tokens: token_counts.original_tokens,
2303 compressed_tokens: token_counts.compressed_tokens,
2304 tokens_skipped: token_counts.tokens_skipped,
2305 };
2306
2307 self.record_compression_event_if_applicable(metadata, &token_counts);
2318
2319 let (watch_controlled, watch_matched) = self.task_watch_state(&metadata.task_id);
2320 if watch_controlled {
2321 if emit_frame && !watch_matched {
2322 self.emit_bash_watch_exit(&completion);
2323 }
2324 self.clear_task_watch_state(&metadata.task_id);
2325 return;
2326 }
2327
2328 if metadata.completion_delivered {
2338 return;
2339 }
2340
2341 let pushed = if let Ok(mut completions) = self.inner.completions.lock() {
2344 if completions
2345 .iter()
2346 .any(|existing| existing.task_id == metadata.task_id)
2347 {
2348 false
2349 } else {
2350 completions.push_back(completion.clone());
2351 true
2352 }
2353 } else {
2354 false
2355 };
2356
2357 if pushed && emit_frame {
2358 self.emit_bash_completed(completion);
2359 }
2360 }
2361
2362 fn record_compression_event_if_applicable(
2363 &self,
2364 metadata: &PersistedTask,
2365 token_counts: &CompletionTokenCounts,
2366 ) {
2367 if metadata.mode == BgMode::Pty {
2368 return;
2369 }
2370
2371 let (original_tokens, compressed_tokens, original_bytes, compressed_bytes) = match (
2372 token_counts.original_tokens,
2373 token_counts.compressed_tokens,
2374 token_counts.original_bytes,
2375 token_counts.compressed_bytes,
2376 ) {
2377 (
2378 Some(original_tokens),
2379 Some(compressed_tokens),
2380 Some(original_bytes),
2381 Some(compressed_bytes),
2382 ) => (
2383 original_tokens,
2384 compressed_tokens,
2385 original_bytes,
2386 compressed_bytes,
2387 ),
2388 _ => {
2389 crate::slog_warn!(
2390 "compression event skipped for {}: token counts unavailable (likely spill file missing or unreadable)",
2391 metadata.task_id
2392 );
2393 return;
2394 }
2395 };
2396
2397 let pool = self.inner.db_pool.read().ok().and_then(|slot| slot.clone());
2398 let Some(pool) = pool else {
2399 crate::slog_warn!(
2400 "compression event skipped for {}: db_pool not initialized — was configure run?",
2401 metadata.task_id
2402 );
2403 return;
2404 };
2405 let harness = self
2406 .inner
2407 .db_harness
2408 .read()
2409 .ok()
2410 .and_then(|slot| slot.clone());
2411 let Some(harness) = harness else {
2412 crate::slog_warn!(
2413 "compression event insert skipped for {}: harness not configured",
2414 metadata.task_id
2415 );
2416 return;
2417 };
2418
2419 let project_root = metadata
2420 .project_root
2421 .as_deref()
2422 .unwrap_or(&metadata.workdir);
2423 let project_key = crate::path_identity::project_scope_key(project_root);
2424 let row = crate::db::compression_events::CompressionEventRow {
2425 harness: &harness,
2426 session_id: Some(&metadata.session_id),
2427 project_key: &project_key,
2428 tool: "bash",
2429 task_id: Some(&metadata.task_id),
2430 command: Some(&metadata.command),
2431 compressor: if metadata.compressed {
2432 "registry"
2433 } else {
2434 "none"
2435 },
2436 original_bytes,
2437 compressed_bytes,
2438 original_tokens,
2439 compressed_tokens,
2440 created_at: unix_millis() as i64,
2441 };
2442
2443 let conn = match pool.lock() {
2444 Ok(conn) => conn,
2445 Err(_) => {
2446 crate::slog_warn!(
2447 "compression event insert failed for {}: db mutex poisoned",
2448 metadata.task_id
2449 );
2450 return;
2451 }
2452 };
2453 match crate::db::compression_events::insert_compression_event(&conn, &row) {
2454 Ok(_) => {
2455 crate::slog_debug!(
2459 "compression event recorded for {} (project={}, session={}, {} → {} tokens)",
2460 metadata.task_id,
2461 project_key,
2462 metadata.session_id,
2463 original_tokens,
2464 compressed_tokens
2465 );
2466 }
2467 Err(error) => {
2468 crate::slog_warn!(
2469 "compression event insert failed for {}: {}",
2470 metadata.task_id,
2471 error
2472 );
2473 }
2474 }
2475 }
2476
2477 fn emit_bash_pattern_match(&self, session_id: &str, pattern_match: PatternMatch) {
2478 let Ok(progress_sender) = self
2479 .inner
2480 .progress_sender
2481 .lock()
2482 .map(|sender| sender.clone())
2483 else {
2484 return;
2485 };
2486 if let Some(sender) = progress_sender.as_ref() {
2487 sender(PushFrame::BashPatternMatch(BashPatternMatchFrame::new(
2488 pattern_match.task_id,
2489 session_id.to_string(),
2490 pattern_match.watch_id,
2491 pattern_match.match_text,
2492 pattern_match.match_offset,
2493 pattern_match.context,
2494 pattern_match.once,
2495 )));
2496 }
2497 }
2498
2499 fn emit_bash_watch_exit(&self, completion: &BgCompletion) {
2500 let Ok(progress_sender) = self
2501 .inner
2502 .progress_sender
2503 .lock()
2504 .map(|sender| sender.clone())
2505 else {
2506 return;
2507 };
2508 let Some(sender) = progress_sender.as_ref() else {
2509 return;
2510 };
2511 let status = completion_status_text(&completion.status, completion.exit_code);
2512 let preview = completion.output_preview.trim_end();
2513 let context = if preview.is_empty() {
2514 format!("task {} exited ({status})", completion.task_id)
2515 } else {
2516 format!(
2517 "task {} exited ({status})
2518{preview}",
2519 completion.task_id
2520 )
2521 };
2522 sender(PushFrame::BashPatternMatch(
2523 BashPatternMatchFrame::task_exit(
2524 completion.task_id.clone(),
2525 completion.session_id.clone(),
2526 format!("exited ({status})"),
2527 context,
2528 ),
2529 ));
2530 }
2531
2532 fn emit_bash_completed(&self, completion: BgCompletion) {
2533 let Ok(progress_sender) = self
2534 .inner
2535 .progress_sender
2536 .lock()
2537 .map(|sender| sender.clone())
2538 else {
2539 return;
2540 };
2541 let Some(sender) = progress_sender.as_ref() else {
2542 return;
2543 };
2544 sender(PushFrame::BashCompleted(BashCompletedFrame::new(
2552 completion.task_id,
2553 completion.session_id,
2554 completion.status,
2555 completion.exit_code,
2556 completion.command,
2557 completion.output_preview,
2558 completion.output_truncated,
2559 completion.original_tokens,
2560 completion.compressed_tokens,
2561 completion.tokens_skipped,
2562 )));
2563 }
2564
2565 fn completion_token_counts(
2566 &self,
2567 metadata: &PersistedTask,
2568 buffer: Option<&BgBuffer>,
2569 paths: Option<&TaskPaths>,
2570 rendered_output: Option<&str>,
2571 ) -> CompletionTokenCounts {
2572 if metadata.mode == BgMode::Pty {
2573 return CompletionTokenCounts::skipped();
2574 }
2575
2576 let raw = match buffer {
2577 Some(buffer) => buffer.read_for_token_count(TOKENIZE_CAP_BYTES_PER_STREAM),
2578 None => paths
2579 .map(|paths| {
2580 read_for_token_count_from_disk(metadata, paths, TOKENIZE_CAP_BYTES_PER_STREAM)
2581 })
2582 .unwrap_or(TokenCountInput::Skipped),
2583 };
2584
2585 let TokenCountInput::Text(raw_output) = raw else {
2586 return CompletionTokenCounts::skipped();
2587 };
2588
2589 let original_tokens = token_count_u32(&raw_output);
2590 let original_bytes = raw_output.len() as i64;
2591 let compressed_output = rendered_output.unwrap_or(&raw_output);
2592 let compressed_tokens = token_count_u32(compressed_output);
2593 let compressed_bytes = compressed_output.len() as i64;
2594 CompletionTokenCounts {
2595 original_tokens: Some(original_tokens),
2596 compressed_tokens: Some(compressed_tokens),
2597 original_bytes: Some(original_bytes),
2598 compressed_bytes: Some(compressed_bytes),
2599 tokens_skipped: false,
2600 }
2601 }
2602
2603 pub(crate) fn maybe_emit_long_running_reminder(&self, task: &Arc<BgTask>) {
2604 if !self
2605 .inner
2606 .long_running_reminder_enabled
2607 .load(Ordering::SeqCst)
2608 {
2609 return;
2610 }
2611 let interval_ms = self
2612 .inner
2613 .long_running_reminder_interval_ms
2614 .load(Ordering::SeqCst);
2615 if interval_ms == 0 {
2616 return;
2617 }
2618 let interval = Duration::from_millis(interval_ms);
2619 let now = Instant::now();
2620 let Ok(mut last_reminder_at) = task.last_reminder_at.lock() else {
2621 return;
2622 };
2623 let since = last_reminder_at.unwrap_or(task.started);
2624 if now.duration_since(since) < interval {
2625 return;
2626 }
2627 let command = task
2628 .state
2629 .lock()
2630 .map(|state| state.metadata.command.clone())
2631 .unwrap_or_default();
2632 *last_reminder_at = Some(now);
2633 self.emit_bash_long_running(BashLongRunningFrame::new(
2634 task.task_id.clone(),
2635 task.session_id.clone(),
2636 command,
2637 task.started.elapsed().as_millis() as u64,
2638 ));
2639 }
2640
2641 fn emit_bash_long_running(&self, frame: BashLongRunningFrame) {
2642 let Ok(progress_sender) = self
2643 .inner
2644 .progress_sender
2645 .lock()
2646 .map(|sender| sender.clone())
2647 else {
2648 return;
2649 };
2650 if let Some(sender) = progress_sender.as_ref() {
2651 sender(PushFrame::BashLongRunning(frame));
2652 }
2653 }
2654
2655 fn task(&self, task_id: &str) -> Option<Arc<BgTask>> {
2656 self.inner
2657 .tasks
2658 .lock()
2659 .ok()
2660 .and_then(|tasks| tasks.get(task_id).cloned())
2661 }
2662
2663 fn task_for_session(&self, task_id: &str, session_id: &str) -> Option<Arc<BgTask>> {
2664 self.task(task_id)
2665 .filter(|task| task.session_id == session_id)
2666 }
2667
2668 fn running_count(&self) -> usize {
2669 self.inner
2670 .tasks
2671 .lock()
2672 .map(|tasks| tasks.values().filter(|task| task.is_running()).count())
2673 .unwrap_or(0)
2674 }
2675
2676 fn start_watchdog(&self) {
2677 if !self.inner.watchdog_started.swap(true, Ordering::SeqCst) {
2678 super::watchdog::start(self.clone());
2679 }
2680 }
2681
2682 fn running_metadata_is_stale(&self, metadata: &PersistedTask) -> bool {
2683 unix_millis().saturating_sub(metadata.started_at) > STALE_RUNNING_AFTER.as_millis() as u64
2684 }
2685
2686 #[cfg(test)]
2687 pub fn task_json_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
2688 self.task_for_session(task_id, session_id)
2689 .map(|task| task.paths.json.clone())
2690 }
2691
2692 #[cfg(test)]
2693 pub fn task_exit_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
2694 self.task_for_session(task_id, session_id)
2695 .map(|task| task.paths.exit.clone())
2696 }
2697
2698 fn generate_unique_task_id(&self) -> Result<String, String> {
2700 for _ in 0..32 {
2701 let candidate = random_slug();
2702 let tasks = self
2703 .inner
2704 .tasks
2705 .lock()
2706 .map_err(|_| "background task registry lock poisoned".to_string())?;
2707 if tasks.contains_key(&candidate) {
2708 continue;
2709 }
2710 let completions = self
2711 .inner
2712 .completions
2713 .lock()
2714 .map_err(|_| "background completions lock poisoned".to_string())?;
2715 if completions
2716 .iter()
2717 .any(|completion| completion.task_id == candidate)
2718 {
2719 continue;
2720 }
2721 return Ok(candidate);
2722 }
2723 Err("failed to allocate unique background task id after 32 attempts".to_string())
2724 }
2725}
2726
2727fn render_compressed_with_recovery(
2728 buffer: &BgBuffer,
2729 mut compressed: CompressionResult,
2730 input_truncated: bool,
2731 disk_truncation: DiskTruncation,
2732) -> TerminalOutputCache {
2733 let had_trailing_newline = compressed.text.ends_with('\n');
2741 let mut text = strip_plain_truncation_marker_lines(&compressed.text)
2742 .trim_end()
2743 .to_string();
2744 if had_trailing_newline && !text.is_empty() {
2745 text.push('\n');
2746 }
2747 compressed.text = text;
2748
2749 let output_path = buffer.output_path().map(|path| path.display().to_string());
2750 let stderr_path = buffer.stderr_path().map(|path| path.display().to_string());
2751 let include_stderr_path = buffer.stream_len(StreamKind::Stderr) > 0;
2752 let mut recovery = RecoveryContext {
2753 dropped_by_class: compressed.dropped_by_class,
2754 had_inner_drop: compressed.had_inner_drop,
2755 offset_hint_eligible: compressed.offset_hint_eligible,
2756 offset_start_line: compressed.offset_start_line,
2757 byte_truncated: input_truncated,
2758 disk_truncated_prefix_bytes: disk_truncation.total_prefix_bytes(),
2759 output_path: output_path.clone(),
2760 stderr_path: stderr_path.clone(),
2761 include_stderr_path,
2762 };
2763
2764 let (output_preview, output_truncated) =
2765 render_body_with_recovery_marker(&compressed.text, &mut recovery);
2766 TerminalOutputCache {
2767 output_preview,
2768 output_truncated,
2769 kind: TerminalOutputKind::Compressed,
2770 output_path,
2771 stderr_path,
2772 recovery: Some(recovery),
2773 }
2774}
2775
2776fn render_body_with_recovery_marker(body: &str, recovery: &mut RecoveryContext) -> (String, bool) {
2777 render_body_with_recovery_marker_at_cap(
2778 body,
2779 recovery,
2780 FINAL_OUTPUT_CAP_BYTES,
2781 cap_final_output,
2782 cap_final_output_with_marker,
2783 )
2784}
2785
2786fn render_raw_body_with_recovery_marker(
2787 body: &str,
2788 recovery: &mut RecoveryContext,
2789) -> (String, bool) {
2790 render_body_with_recovery_marker_at_cap(
2791 body,
2792 recovery,
2793 RAW_PASSTHROUGH_CAP_BYTES,
2794 |input| {
2795 super::output::cap_head_tail(
2796 input,
2797 RAW_PASSTHROUGH_CAP_BYTES,
2798 RAW_PASSTHROUGH_HEAD_BYTES,
2799 RAW_PASSTHROUGH_TAIL_BYTES,
2800 )
2801 },
2802 |input, marker| {
2803 super::output::cap_head_tail_with_marker(
2804 input,
2805 RAW_PASSTHROUGH_CAP_BYTES,
2806 RAW_PASSTHROUGH_HEAD_BYTES,
2807 RAW_PASSTHROUGH_TAIL_BYTES,
2808 marker,
2809 )
2810 },
2811 )
2812}
2813
2814fn render_body_with_recovery_marker_at_cap<F, G>(
2815 body: &str,
2816 recovery: &mut RecoveryContext,
2817 cap_bytes: usize,
2818 cap_plain: F,
2819 cap_with_marker: G,
2820) -> (String, bool)
2821where
2822 F: Fn(&str) -> super::output::CappedText,
2823 G: Fn(&str, &str) -> super::output::CappedText,
2824{
2825 let needs_marker = recovery.has_visible_drop();
2826 if body.len() > cap_bytes {
2827 recovery.byte_truncated = true;
2828 if let Some(marker) = recovery_marker(recovery) {
2829 let capped = cap_with_marker(body, &marker);
2830 return (capped.text, true);
2831 }
2832 let capped = cap_plain(body);
2833 return (capped.text, capped.truncated || needs_marker);
2834 }
2835
2836 if !needs_marker {
2837 return (body.to_string(), false);
2838 }
2839
2840 let Some(marker) = recovery_marker(recovery) else {
2841 return (body.to_string(), true);
2842 };
2843 let with_marker = append_recovery_marker(body, &marker);
2844 if with_marker.len() <= cap_bytes {
2845 return (with_marker, true);
2846 }
2847
2848 recovery.byte_truncated = true;
2849 let marker = recovery_marker(recovery).unwrap_or(marker);
2850 let capped = cap_with_marker(body, &marker);
2851 (capped.text, true)
2852}
2853
2854fn append_recovery_marker(body: &str, marker: &str) -> String {
2855 if body.is_empty() {
2856 return marker.to_string();
2857 }
2858 let mut output = body.trim_end().to_string();
2859 output.push('\n');
2860 output.push_str(marker);
2861 output
2862}
2863
2864fn recovery_marker(recovery: &RecoveryContext) -> Option<String> {
2865 let mut parts = Vec::new();
2866 for (class, count) in &recovery.dropped_by_class {
2867 let label = if *count == 1 {
2868 class.singular()
2869 } else {
2870 class.plural()
2871 };
2872 parts.push(format!("+{count} more {label}"));
2873 }
2874 if recovery.byte_truncated {
2875 parts.push("truncated output".to_string());
2876 }
2877 let disk_truncated_prefix_bytes = recovery.disk_truncated_prefix_bytes;
2878 if disk_truncated_prefix_bytes > 0 {
2879 parts.push(format!(
2880 "truncated {disk_truncated_prefix_bytes} bytes from saved output prefix"
2881 ));
2882 } else if recovery.had_inner_drop && parts.is_empty() {
2883 parts.push("omitted output".to_string());
2884 }
2885
2886 if parts.is_empty() {
2887 return None;
2888 }
2889
2890 let hint = recovery_hint(recovery);
2891 Some(format!("[{}; {hint}]", parts.join(", ")))
2892}
2893
2894fn recovery_hint(recovery: &RecoveryContext) -> String {
2895 if recovery.offset_hint_eligible
2899 && !recovery.byte_truncated
2900 && recovery.dropped_by_class.is_empty()
2901 && !recovery.include_stderr_path
2902 {
2903 if let (Some(path), Some(line)) =
2904 (recovery.output_path.as_deref(), recovery.offset_start_line)
2905 {
2906 return format!("see remaining: tail -n +{line} {}", quote_path(path));
2907 }
2908 }
2909
2910 let mut paths = Vec::new();
2911 if let Some(path) = recovery.output_path.as_deref() {
2912 paths.push(path);
2913 }
2914 if recovery.include_stderr_path {
2915 if let Some(path) = recovery.stderr_path.as_deref() {
2916 if !paths.contains(&path) {
2917 paths.push(path);
2918 }
2919 }
2920 }
2921
2922 if paths.is_empty() {
2923 return "full output unavailable".to_string();
2924 }
2925
2926 let reads = paths
2927 .into_iter()
2928 .map(|path| format!("read {}", quote_path(path)))
2929 .collect::<Vec<_>>()
2930 .join(" and ");
2931 if recovery.disk_truncated_prefix_bytes > 0 {
2932 format!("retained output: {reads}")
2933 } else {
2934 format!("full output: {reads}")
2935 }
2936}
2937
2938fn strip_plain_truncation_marker_lines(input: &str) -> String {
2939 input
2940 .lines()
2941 .filter(|line| !is_plain_truncation_marker(line.trim()))
2942 .collect::<Vec<_>>()
2943 .join("\n")
2944}
2945
2946fn strip_recovery_marker_lines(input: &str) -> String {
2947 input
2948 .lines()
2949 .filter(|line| !is_recovery_marker(line.trim()))
2950 .collect::<Vec<_>>()
2951 .join("\n")
2952}
2953
2954fn is_plain_truncation_marker(line: &str) -> bool {
2955 let Some(rest) = line.strip_prefix("...<truncated ") else {
2956 return false;
2957 };
2958 let Some(bytes) = rest.strip_suffix(" bytes>...") else {
2959 return false;
2960 };
2961 !bytes.is_empty() && bytes.chars().all(|ch| ch.is_ascii_digit())
2962}
2963
2964fn is_recovery_marker(line: &str) -> bool {
2965 line.starts_with('[')
2966 && line.ends_with(']')
2967 && (line.contains("full output: read ")
2968 || line.contains("retained output: read ")
2969 || line.contains("see remaining: tail -n +")
2970 || line.contains("full output unavailable"))
2971}
2972
2973fn render_structured_output(
2974 command: &str,
2975 buffer: &BgBuffer,
2976 disk_truncation: DiskTruncation,
2977) -> Option<TerminalOutputCache> {
2978 if !is_gh_structured_command(command) {
2979 return None;
2980 }
2981
2982 let output_path = buffer
2983 .output_path()
2984 .map(|path| path.display().to_string())?;
2985 let stdout_bytes = buffer.stream_len(StreamKind::Stdout);
2986 if stdout_bytes == 0 {
2987 return None;
2988 }
2989
2990 if stdout_bytes > STRUCTURED_OUTPUT_CAP_BYTES as u64 {
2991 if !stream_starts_like_json(buffer, StreamKind::Stdout) {
2992 return None;
2993 }
2994 let output_preview = if disk_truncation.total_prefix_bytes() > 0 {
2995 retained_json_output_pointer(
2996 stdout_bytes,
2997 &output_path,
2998 disk_truncation.total_prefix_bytes(),
2999 )
3000 } else {
3001 json_output_pointer(stdout_bytes, &output_path)
3002 };
3003 return Some(TerminalOutputCache {
3004 output_preview,
3005 output_truncated: true,
3006 kind: TerminalOutputKind::Structured,
3007 output_path: Some(output_path),
3008 stderr_path: buffer.stderr_path().map(|path| path.display().to_string()),
3009 recovery: None,
3010 });
3011 }
3012
3013 let stdout = buffer.read_stream_bounded(StreamKind::Stdout, STRUCTURED_OUTPUT_CAP_BYTES);
3014 if stdout.truncated || !is_structured_body(&stdout.text) {
3015 return None;
3016 }
3017
3018 Some(TerminalOutputCache {
3019 output_preview: stdout.text,
3020 output_truncated: false,
3021 kind: TerminalOutputKind::Structured,
3022 output_path: Some(output_path),
3023 stderr_path: buffer.stderr_path().map(|path| path.display().to_string()),
3024 recovery: None,
3025 })
3026}
3027
3028fn render_raw_passthrough(
3029 buffer: &BgBuffer,
3030 disk_truncation: DiskTruncation,
3031) -> TerminalOutputCache {
3032 let raw = buffer.read_combined_head_tail(
3033 RAW_PASSTHROUGH_CAP_BYTES,
3034 RAW_PASSTHROUGH_HEAD_BYTES,
3035 RAW_PASSTHROUGH_TAIL_BYTES,
3036 );
3037 let output_path = buffer.output_path().map(|path| path.display().to_string());
3038 let stderr_path = buffer.stderr_path().map(|path| path.display().to_string());
3039 if !raw.truncated && disk_truncation.total_prefix_bytes() == 0 {
3040 return TerminalOutputCache {
3041 output_preview: raw.text,
3042 output_truncated: false,
3043 kind: TerminalOutputKind::Raw,
3044 output_path,
3045 stderr_path,
3046 recovery: None,
3047 };
3048 }
3049
3050 let include_stderr_path = buffer.stream_len(StreamKind::Stderr) > 0;
3051 let mut recovery = RecoveryContext {
3052 dropped_by_class: BTreeMap::new(),
3053 had_inner_drop: false,
3054 offset_hint_eligible: false,
3055 offset_start_line: None,
3056 byte_truncated: raw.truncated,
3057 disk_truncated_prefix_bytes: disk_truncation.total_prefix_bytes(),
3058 output_path: output_path.clone(),
3059 stderr_path: stderr_path.clone(),
3060 include_stderr_path,
3061 };
3062 let (output_preview, output_truncated) =
3063 render_raw_body_with_recovery_marker(&raw.text, &mut recovery);
3064 TerminalOutputCache {
3065 output_preview,
3066 output_truncated,
3067 kind: TerminalOutputKind::Raw,
3068 output_path,
3069 stderr_path,
3070 recovery: Some(recovery),
3071 }
3072}
3073
3074fn completion_preview_for_cache(
3075 cache: &TerminalOutputCache,
3076 exit_code: Option<i32>,
3077) -> (String, bool) {
3078 let exit_ok = exit_code == Some(0);
3081 let threshold = completion_preview_threshold(exit_ok);
3082 if cache.kind == TerminalOutputKind::Structured && cache.output_preview.len() > threshold {
3083 if let Some(path) = cache.output_path.as_deref() {
3084 return (
3085 json_output_pointer(cache.output_preview.len() as u64, path),
3086 true,
3087 );
3088 }
3089 return (cache.output_preview.clone(), cache.output_truncated);
3090 }
3091
3092 if let Some(recovery) = cache.recovery.as_ref() {
3093 if cache.output_preview.len() <= threshold {
3094 return (cache.output_preview.clone(), cache.output_truncated);
3095 }
3096 let body = strip_recovery_marker_lines(&cache.output_preview);
3097 let mut completion_recovery = recovery.clone();
3098 completion_recovery.byte_truncated = true;
3099 if let Some(marker) = recovery_marker(&completion_recovery) {
3100 let capped = cap_completion_output_with_marker(&body, &marker, exit_ok);
3101 return (capped.text, true);
3102 }
3103 }
3104
3105 let capped = cap_completion_output(&cache.output_preview, exit_ok);
3106 (capped.text, cache.output_truncated || capped.truncated)
3107}
3108
3109fn is_gh_structured_command(command: &str) -> bool {
3110 let Some(normalized) = crate::compress::plain_command_for_structured_output(command) else {
3111 return false;
3112 };
3113 let tokens = shell_words_for_flags(&normalized);
3114 let Some(head) = tokens.first() else {
3115 return false;
3116 };
3117 let head_name = Path::new(head)
3118 .file_name()
3119 .and_then(|name| name.to_str())
3120 .unwrap_or(head);
3121 if !(head_name == "gh" || head_name.eq_ignore_ascii_case("gh.exe")) {
3122 return false;
3123 }
3124 tokens.iter().any(|token| {
3125 matches!(token.as_str(), "--json" | "--jq" | "--template")
3126 || token.starts_with("--json=")
3127 || token.starts_with("--jq=")
3128 || token.starts_with("--template=")
3129 })
3130}
3131
3132fn shell_words_for_flags(command: &str) -> Vec<String> {
3133 let mut words = Vec::new();
3134 let mut current = String::new();
3135 let mut in_single = false;
3136 let mut in_double = false;
3137 let mut escaped = false;
3138
3139 for ch in command.chars() {
3140 if escaped {
3141 current.push(ch);
3142 escaped = false;
3143 continue;
3144 }
3145 if ch == '\\' && !in_single {
3146 escaped = true;
3147 continue;
3148 }
3149 if ch == '\'' && !in_double {
3150 in_single = !in_single;
3151 continue;
3152 }
3153 if ch == '"' && !in_single {
3154 in_double = !in_double;
3155 continue;
3156 }
3157 if ch.is_whitespace() && !in_single && !in_double {
3158 if !current.is_empty() {
3159 words.push(std::mem::take(&mut current));
3160 }
3161 continue;
3162 }
3163 if matches!(ch, ';' | '&' | '|') && !in_single && !in_double {
3164 if !current.is_empty() {
3165 words.push(std::mem::take(&mut current));
3166 }
3167 continue;
3168 }
3169 current.push(ch);
3170 }
3171 if !current.is_empty() {
3172 words.push(current);
3173 }
3174 words
3175}
3176
3177fn is_structured_body(body: &str) -> bool {
3178 let trimmed = body.trim();
3179 if trimmed.is_empty() {
3180 return false;
3181 }
3182 if serde_json::from_str::<serde_json::Value>(trimmed).is_ok() {
3183 return true;
3184 }
3185
3186 let mut saw_line = false;
3187 for line in trimmed
3188 .lines()
3189 .map(str::trim)
3190 .filter(|line| !line.is_empty())
3191 {
3192 saw_line = true;
3193 if serde_json::from_str::<serde_json::Value>(line).is_err() {
3194 return false;
3195 }
3196 }
3197 saw_line
3198}
3199
3200fn stream_starts_like_json(buffer: &BgBuffer, stream: StreamKind) -> bool {
3201 let path = match (buffer, stream) {
3202 (BgBuffer::Pipes { stdout_path, .. }, StreamKind::Stdout) => Some(stdout_path),
3203 (BgBuffer::Pipes { stderr_path, .. }, StreamKind::Stderr) => Some(stderr_path),
3204 (BgBuffer::Pty { combined_path }, _) => Some(combined_path),
3205 };
3206 let Some(path) = path else {
3207 return false;
3208 };
3209 let Ok(file) = std::fs::File::open(path) else {
3210 return false;
3211 };
3212 let mut limited = file.take(512);
3213 let mut bytes = Vec::new();
3214 if limited.read_to_end(&mut bytes).is_err() {
3215 return false;
3216 }
3217 String::from_utf8_lossy(&bytes)
3218 .chars()
3219 .find(|ch| !ch.is_whitespace())
3220 .is_some_and(|ch| matches!(ch, '{' | '[' | '"' | '-' | '0'..='9' | 't' | 'f' | 'n'))
3221}
3222
3223struct CompletionTokenCounts {
3224 original_tokens: Option<u32>,
3225 compressed_tokens: Option<u32>,
3226 original_bytes: Option<i64>,
3227 compressed_bytes: Option<i64>,
3228 tokens_skipped: bool,
3229}
3230
3231impl CompletionTokenCounts {
3232 fn skipped() -> Self {
3233 Self {
3234 original_tokens: None,
3235 compressed_tokens: None,
3236 original_bytes: None,
3237 compressed_bytes: None,
3238 tokens_skipped: true,
3239 }
3240 }
3241}
3242
3243fn completion_status_text(status: &BgTaskStatus, exit_code: Option<i32>) -> String {
3244 match status {
3245 BgTaskStatus::TimedOut => "timed out".to_string(),
3246 BgTaskStatus::Killed => "killed".to_string(),
3247 _ => exit_code
3248 .map(|code| format!("exit {code}"))
3249 .unwrap_or_else(|| format!("{status:?}").to_lowercase()),
3250 }
3251}
3252
3253fn token_count_u32(text: &str) -> u32 {
3254 aft_tokenizer::count_tokens(text)
3255 .try_into()
3256 .unwrap_or(u32::MAX)
3257}
3258
3259impl Default for BgTaskRegistry {
3260 fn default() -> Self {
3261 Self::new(Arc::new(Mutex::new(None)))
3262 }
3263}
3264
3265fn modified_within(path: &Path, grace: Duration) -> bool {
3266 fs::metadata(path)
3267 .and_then(|metadata| metadata.modified())
3268 .ok()
3269 .and_then(|modified| SystemTime::now().duration_since(modified).ok())
3270 .map(|age| age < grace)
3271 .unwrap_or(false)
3272}
3273
3274fn canonicalized_path(path: &Path) -> PathBuf {
3275 fs::canonicalize(path).unwrap_or_else(|_| path.to_path_buf())
3276}
3277
3278fn started_instant_from_unix_millis(started_at: u64) -> Instant {
3279 let now_ms = SystemTime::now()
3280 .duration_since(UNIX_EPOCH)
3281 .ok()
3282 .map(|duration| duration.as_millis() as u64)
3283 .unwrap_or(started_at);
3284 let elapsed_ms = now_ms.saturating_sub(started_at);
3285 Instant::now()
3286 .checked_sub(Duration::from_millis(elapsed_ms))
3287 .unwrap_or_else(Instant::now)
3288}
3289
3290fn gc_quarantine(storage_dir: &Path) {
3291 let quarantine_root = storage_dir.join("bash-tasks-quarantine");
3292 let Ok(session_dirs) = fs::read_dir(&quarantine_root) else {
3293 return;
3294 };
3295 for session_entry in session_dirs.flatten() {
3296 let session_quarantine_dir = session_entry.path();
3297 if !session_quarantine_dir.is_dir() {
3298 continue;
3299 }
3300 let entries = match fs::read_dir(&session_quarantine_dir) {
3301 Ok(entries) => entries,
3302 Err(error) => {
3303 crate::slog_warn!(
3304 "failed to read background task quarantine dir {}: {error}",
3305 session_quarantine_dir.display()
3306 );
3307 continue;
3308 }
3309 };
3310 for entry in entries.flatten() {
3311 let path = entry.path();
3312 if modified_within(&path, QUARANTINE_GC_GRACE) {
3313 continue;
3314 }
3315 let result = if path.is_dir() {
3316 fs::remove_dir_all(&path)
3317 } else {
3318 fs::remove_file(&path)
3319 };
3320 match result {
3321 Ok(()) => log::debug!(
3322 "deleted old background task quarantine entry {}",
3323 path.display()
3324 ),
3325 Err(error) => crate::slog_warn!(
3326 "failed to delete old background task quarantine entry {}: {error}",
3327 path.display()
3328 ),
3329 }
3330 }
3331 let _ = fs::remove_dir(&session_quarantine_dir);
3332 }
3333 let _ = fs::remove_dir(&quarantine_root);
3334}
3335
3336enum QuarantineKind {
3337 Corrupt,
3338 Invalid,
3339}
3340
3341fn quarantine_task_json(
3342 storage_dir: &Path,
3343 session_dir: &Path,
3344 json_path: &Path,
3345 kind: QuarantineKind,
3346) -> Result<(), String> {
3347 let session_hash = session_dir
3348 .file_name()
3349 .and_then(|name| name.to_str())
3350 .ok_or_else(|| {
3351 format!(
3352 "invalid background task session dir: {}",
3353 session_dir.display()
3354 )
3355 })?;
3356 let task_name = json_path
3357 .file_name()
3358 .and_then(|name| name.to_str())
3359 .ok_or_else(|| format!("invalid background task json path: {}", json_path.display()))?;
3360 let unix_ts = SystemTime::now()
3361 .duration_since(UNIX_EPOCH)
3362 .map(|duration| duration.as_secs())
3363 .unwrap_or(0);
3364 let quarantine_dir = storage_dir.join("bash-tasks-quarantine").join(session_hash);
3365 fs::create_dir_all(&quarantine_dir).map_err(|e| {
3366 format!(
3367 "failed to create background task quarantine dir {}: {e}",
3368 quarantine_dir.display()
3369 )
3370 })?;
3371 let target_name = quarantine_name(task_name, unix_ts, &kind);
3372 let target = quarantine_dir.join(target_name);
3373 fs::rename(json_path, &target).map_err(|e| {
3374 format!(
3375 "failed to quarantine background task metadata {} to {}: {e}",
3376 json_path.display(),
3377 target.display()
3378 )
3379 })?;
3380
3381 for sibling in task_sibling_paths(json_path) {
3382 if !sibling.exists() {
3383 continue;
3384 }
3385 let Some(sibling_name) = sibling.file_name().and_then(|name| name.to_str()) else {
3386 crate::slog_warn!(
3387 "skipping background task sibling with invalid name during quarantine: {}",
3388 sibling.display()
3389 );
3390 continue;
3391 };
3392 let sibling_target = quarantine_dir.join(quarantine_name(sibling_name, unix_ts, &kind));
3393 if let Err(error) = fs::rename(&sibling, &sibling_target) {
3394 crate::slog_warn!(
3395 "failed to quarantine background task sibling {} to {}: {error}",
3396 sibling.display(),
3397 sibling_target.display()
3398 );
3399 }
3400 }
3401
3402 let _ = fs::remove_dir(session_dir);
3403 Ok(())
3404}
3405
3406fn quarantine_name(file_name: &str, unix_ts: u64, kind: &QuarantineKind) -> String {
3407 match kind {
3408 QuarantineKind::Corrupt => format!("{file_name}.corrupt-{unix_ts}"),
3409 QuarantineKind::Invalid => {
3410 let path = Path::new(file_name);
3411 let stem = path.file_stem().and_then(|stem| stem.to_str());
3412 let extension = path.extension().and_then(|extension| extension.to_str());
3413 match (stem, extension) {
3414 (Some(stem), Some(extension)) => format!("{stem}.invalid.{unix_ts}.{extension}"),
3415 _ => format!("{file_name}.invalid.{unix_ts}"),
3416 }
3417 }
3418 }
3419}
3420
3421fn task_sibling_paths(json_path: &Path) -> Vec<PathBuf> {
3422 let Some(parent) = json_path.parent() else {
3423 return Vec::new();
3424 };
3425 let Some(stem) = json_path.file_stem().and_then(|stem| stem.to_str()) else {
3426 return Vec::new();
3427 };
3428 ["stdout", "stderr", "exit", "pty", "ps1", "bat", "sh"]
3429 .into_iter()
3430 .map(|extension| parent.join(format!("{stem}.{extension}")))
3431 .collect()
3432}
3433
3434fn read_for_token_count_from_disk(
3435 metadata: &PersistedTask,
3436 paths: &TaskPaths,
3437 max_bytes_per_stream: usize,
3438) -> TokenCountInput {
3439 if metadata.mode == BgMode::Pty {
3440 return TokenCountInput::Skipped;
3441 }
3442 let stdout = read_file_tail_capped(&paths.stdout, max_bytes_per_stream);
3449 let stderr = read_file_tail_capped(&paths.stderr, max_bytes_per_stream);
3450 match (stdout, stderr) {
3451 (Ok(stdout), Ok(stderr)) => TokenCountInput::Text(combine_streams(
3452 String::from_utf8_lossy(&stdout).as_ref(),
3453 String::from_utf8_lossy(&stderr).as_ref(),
3454 )),
3455 (Ok(stdout), Err(_)) => TokenCountInput::Text(combine_streams(
3456 String::from_utf8_lossy(&stdout).as_ref(),
3457 "",
3458 )),
3459 (Err(_), Ok(stderr)) => TokenCountInput::Text(combine_streams(
3460 "",
3461 String::from_utf8_lossy(&stderr).as_ref(),
3462 )),
3463 (Err(_), Err(_)) => TokenCountInput::Skipped,
3464 }
3465}
3466
3467fn read_file_tail_capped(path: &Path, max_bytes: usize) -> std::io::Result<Vec<u8>> {
3472 use std::io::{Read, Seek, SeekFrom};
3473 let mut file = std::fs::File::open(path)?;
3474 let len = file.metadata()?.len();
3475 let read_len = len.min(max_bytes as u64);
3476 if read_len > 0 && len > max_bytes as u64 {
3477 file.seek(SeekFrom::End(-(read_len as i64)))?;
3478 }
3479 let mut bytes = Vec::with_capacity(read_len as usize);
3480 file.read_to_end(&mut bytes)?;
3481 Ok(bytes)
3482}
3483
3484impl BgTask {
3485 fn snapshot(&self, preview_bytes: usize) -> BgTaskSnapshot {
3486 let state = self
3487 .state
3488 .lock()
3489 .unwrap_or_else(|poison| poison.into_inner());
3490 self.snapshot_locked(&state, preview_bytes)
3491 }
3492
3493 fn snapshot_locked(&self, state: &BgTaskState, preview_bytes: usize) -> BgTaskSnapshot {
3494 let metadata = &state.metadata;
3495 let duration_ms = metadata.duration_ms.or_else(|| {
3496 metadata
3497 .status
3498 .is_terminal()
3499 .then(|| self.started.elapsed().as_millis() as u64)
3500 });
3501 let (output_preview, output_truncated) = if metadata.mode == BgMode::Pty {
3502 (String::new(), false)
3503 } else if metadata.status.is_terminal() {
3504 state
3505 .terminal_output_cache
3506 .as_ref()
3507 .map(|cache| (cache.output_preview.clone(), cache.output_truncated))
3508 .unwrap_or_else(|| (String::new(), false))
3509 } else {
3510 state.buffer.read_tail(preview_bytes)
3511 };
3512 BgTaskSnapshot {
3513 info: BgTaskInfo {
3514 task_id: self.task_id.clone(),
3515 status: metadata.status.clone(),
3516 command: metadata.command.clone(),
3517 mode: metadata.mode.clone(),
3518 started_at: metadata.started_at,
3519 duration_ms,
3520 },
3521 exit_code: metadata.exit_code,
3522 child_pid: metadata.child_pid,
3523 workdir: metadata.workdir.display().to_string(),
3524 output_preview,
3525 output_truncated,
3526 output_path: state
3527 .buffer
3528 .output_path()
3529 .map(|path| path.display().to_string()),
3530 stderr_path: state
3531 .buffer
3532 .stderr_path()
3533 .map(|path| path.display().to_string()),
3534 pty_rows: (metadata.mode == BgMode::Pty).then_some(metadata.pty_rows.unwrap_or(24)),
3535 pty_cols: (metadata.mode == BgMode::Pty).then_some(metadata.pty_cols.unwrap_or(80)),
3536 }
3537 }
3538
3539 pub(crate) fn is_running(&self) -> bool {
3540 self.state
3541 .lock()
3542 .map(|state| {
3543 state.metadata.status == BgTaskStatus::Running
3544 || (state.metadata.mode == BgMode::Pty
3545 && state.metadata.status == BgTaskStatus::Killing)
3546 })
3547 .unwrap_or(false)
3548 }
3549
3550 fn is_terminal(&self) -> bool {
3551 self.state
3552 .lock()
3553 .map(|state| state.metadata.status.is_terminal())
3554 .unwrap_or(false)
3555 }
3556
3557 fn mark_terminal_now(&self) {
3558 if let Ok(mut terminal_at) = self.terminal_at.lock() {
3559 if terminal_at.is_none() {
3560 *terminal_at = Some(Instant::now());
3561 }
3562 }
3563 }
3564
3565 fn set_completion_delivered(
3566 &self,
3567 delivered: bool,
3568 registry: &BgTaskRegistry,
3569 ) -> Result<(), String> {
3570 let mut state = self
3571 .state
3572 .lock()
3573 .map_err(|_| "background task lock poisoned".to_string())?;
3574 let updated = registry
3575 .update_task_metadata(&self.paths, |metadata| {
3576 metadata.completion_delivered = delivered;
3577 })
3578 .map_err(|e| format!("failed to update completion delivery: {e}"))?;
3579 state.metadata = updated;
3580 Ok(())
3581 }
3582}
3583
3584#[cfg(unix)]
3605fn reap_piped_child(child_slot: &mut Option<Child>) {
3606 if let Some(mut child) = child_slot.take() {
3607 if matches!(child.try_wait(), Ok(None)) {
3608 let _ = child.wait();
3609 }
3610 }
3611}
3612
3613#[cfg(windows)]
3618fn reap_piped_child(child_slot: &mut Option<Child>) {
3619 *child_slot = None;
3620}
3621
3622fn terminal_metadata_from_marker(
3623 mut metadata: PersistedTask,
3624 marker: ExitMarker,
3625 reason: Option<String>,
3626) -> PersistedTask {
3627 match marker {
3628 ExitMarker::Code(code) => {
3629 let status = if code == 0 {
3630 BgTaskStatus::Completed
3631 } else {
3632 BgTaskStatus::Failed
3633 };
3634 metadata.mark_terminal(status, Some(code), reason);
3635 }
3636 ExitMarker::Killed => metadata.mark_terminal(
3637 BgTaskStatus::Killed,
3638 terminal_exit_code_for_status(&BgTaskStatus::Killed),
3639 reason,
3640 ),
3641 }
3642 metadata
3643}
3644
3645fn terminal_exit_code_for_status(status: &BgTaskStatus) -> Option<i32> {
3646 match status {
3647 BgTaskStatus::TimedOut => Some(124),
3648 BgTaskStatus::Killed => Some(137),
3649 _ => None,
3650 }
3651}
3652
3653#[cfg(unix)]
3654fn write_unix_command_script(command: &str, paths: &TaskPaths) -> Result<PathBuf, String> {
3655 let stem = paths
3656 .json
3657 .file_stem()
3658 .and_then(|stem| stem.to_str())
3659 .unwrap_or("wrapper");
3660 let script_path = paths.dir.join(format!("{stem}.sh"));
3661 fs::write(&script_path, command)
3662 .map_err(|e| format!("failed to write background bash command script: {e}"))?;
3663 Ok(script_path)
3664}
3665
3666#[cfg(unix)]
3667fn detached_shell_command(command_script: &Path, exit_path: &Path) -> Command {
3668 let shell = resolve_posix_shell();
3669 let mut cmd = Command::new(&shell);
3670 cmd.arg("-c")
3676 .arg(r#""$0" "$1"; code=$?; printf "%s" "$code" > "$2.tmp.$$"; mv -f "$2.tmp.$$" "$2""#)
3677 .arg(&shell)
3678 .arg(command_script)
3679 .arg(exit_path);
3680 unsafe {
3681 cmd.pre_exec(|| {
3682 if libc::setsid() == -1 {
3683 return Err(std::io::Error::last_os_error());
3684 }
3685 Ok(())
3686 });
3687 }
3688 cmd
3689}
3690
3691#[cfg(unix)]
3692fn resolve_posix_shell() -> PathBuf {
3693 static POSIX_SHELL: OnceLock<PathBuf> = OnceLock::new();
3694 POSIX_SHELL
3695 .get_or_init(|| {
3696 std::env::var_os("BASH")
3697 .filter(|value| !value.is_empty())
3698 .map(PathBuf::from)
3699 .filter(|path| path.exists())
3700 .or_else(|| which::which("bash").ok())
3701 .or_else(|| which::which("zsh").ok())
3702 .unwrap_or_else(|| PathBuf::from("/bin/sh"))
3703 })
3704 .clone()
3705}
3706
3707#[cfg(windows)]
3708fn detached_shell_command_for(
3709 shell: crate::windows_shell::WindowsShell,
3710 command: &str,
3711 exit_path: &Path,
3712 paths: &TaskPaths,
3713 creation_flags: u32,
3714) -> Result<Command, String> {
3715 use crate::windows_shell::WindowsShell;
3716 let wrapper_body = shell.wrapper_script_bytes(command, exit_path);
3729 let wrapper_ext = match shell {
3730 WindowsShell::Pwsh | WindowsShell::Powershell => "ps1",
3731 WindowsShell::Cmd => "bat",
3732 WindowsShell::Posix(_) => "sh",
3736 };
3737 let wrapper_path = paths.dir.join(format!(
3738 "{}.{}",
3739 paths
3740 .json
3741 .file_stem()
3742 .and_then(|s| s.to_str())
3743 .unwrap_or("wrapper"),
3744 wrapper_ext
3745 ));
3746 fs::write(&wrapper_path, wrapper_body)
3747 .map_err(|e| format!("failed to write background bash wrapper script: {e}"))?;
3748
3749 let mut cmd = Command::new(shell.binary().as_ref());
3750 match shell {
3751 WindowsShell::Pwsh | WindowsShell::Powershell => {
3752 cmd.args([
3755 "-NoLogo",
3756 "-NoProfile",
3757 "-NonInteractive",
3758 "-ExecutionPolicy",
3759 "Bypass",
3760 "-File",
3761 ]);
3762 cmd.arg(&wrapper_path);
3763 }
3764 WindowsShell::Cmd => {
3765 cmd.args(["/D", "/C"]);
3772 cmd.arg(&wrapper_path);
3773 }
3774 WindowsShell::Posix(_) => {
3775 cmd.arg(&wrapper_path);
3780 }
3781 }
3782
3783 cmd.creation_flags(creation_flags);
3787 Ok(cmd)
3788}
3789
3790fn spawn_detached_child(
3806 command: &str,
3807 paths: &TaskPaths,
3808 workdir: &Path,
3809 env: &HashMap<String, String>,
3810) -> Result<std::process::Child, String> {
3811 #[cfg(not(windows))]
3812 {
3813 let stdout = create_capture_file(&paths.stdout)
3814 .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
3815 let stderr = create_capture_file(&paths.stderr)
3816 .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
3817 let command_script = write_unix_command_script(command, paths)?;
3818 detached_shell_command(&command_script, &paths.exit)
3819 .current_dir(workdir)
3820 .envs(env)
3821 .stdin(Stdio::null())
3822 .stdout(Stdio::from(stdout))
3823 .stderr(Stdio::from(stderr))
3824 .spawn()
3825 .map_err(|e| format!("failed to spawn background bash command: {e}"))
3826 }
3827 #[cfg(windows)]
3828 {
3829 use crate::windows_shell::shell_candidates;
3830 let candidates: Vec<crate::windows_shell::WindowsShell> = shell_candidates();
3841 const FLAG_CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
3862 const FLAG_CREATE_BREAKAWAY_FROM_JOB: u32 = 0x0100_0000;
3863 const FLAG_CREATE_NO_WINDOW: u32 = 0x0800_0000;
3864 let with_breakaway =
3865 FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP | FLAG_CREATE_BREAKAWAY_FROM_JOB;
3866 let without_breakaway = FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP;
3867 let mut last_error: Option<String> = None;
3868 for (idx, shell) in candidates.iter().enumerate() {
3869 for &flags in &[with_breakaway, without_breakaway] {
3873 let stdout = create_capture_file(&paths.stdout)
3875 .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
3876 let stderr = create_capture_file(&paths.stderr)
3877 .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
3878 let mut cmd =
3879 detached_shell_command_for(shell.clone(), command, &paths.exit, paths, flags)?;
3880 cmd.current_dir(workdir)
3881 .envs(env)
3882 .stdin(Stdio::null())
3883 .stdout(Stdio::from(stdout))
3884 .stderr(Stdio::from(stderr));
3885 match cmd.spawn() {
3886 Ok(child) => {
3887 if idx > 0 {
3888 crate::slog_warn!("background bash spawn fell back to {} after {} earlier candidate(s) failed; \
3889 the cached PATH probe disagreed with runtime spawn — likely PATH \
3890 inheritance, antivirus / AppLocker / Defender ASR, or sandbox policy.",
3891 shell.binary(),
3892 idx);
3893 }
3894 if flags == without_breakaway {
3895 crate::slog_warn!(
3896 "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected \
3897 (likely a restrictive Job Object — CI sandbox or MDM policy). \
3898 Spawned without breakaway; the bg task will be torn down if the \
3899 AFT process group is killed."
3900 );
3901 }
3902 return Ok(child);
3903 }
3904 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
3905 crate::slog_warn!("background bash spawn: {} returned NotFound at runtime — trying next candidate",
3906 shell.binary());
3907 last_error = Some(format!("{}: {e}", shell.binary()));
3908 break;
3911 }
3912 Err(e) if flags == with_breakaway && e.raw_os_error() == Some(5) => {
3913 crate::slog_warn!(
3915 "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected with \
3916 Access Denied — retrying {} without breakaway",
3917 shell.binary()
3918 );
3919 last_error = Some(format!("{}: {e}", shell.binary()));
3920 continue;
3921 }
3922 Err(e) => {
3923 return Err(format!(
3924 "failed to spawn background bash command via {}: {e}",
3925 shell.binary()
3926 ));
3927 }
3928 }
3929 }
3930 }
3931 Err(format!(
3932 "failed to spawn background bash command: no Windows shell could be spawned. \
3933 Last error: {}. PATH-probed candidates: {:?}",
3934 last_error.unwrap_or_else(|| "no candidates were attempted".to_string()),
3935 candidates.iter().map(|s| s.binary()).collect::<Vec<_>>()
3936 ))
3937 }
3938}
3939
3940fn random_slug() -> String {
3941 let mut bytes = [0u8; 4];
3942 getrandom::fill(&mut bytes).unwrap_or_else(|_| {
3944 let t = SystemTime::now()
3946 .duration_since(UNIX_EPOCH)
3947 .map(|d| d.subsec_nanos())
3948 .unwrap_or(0);
3949 let p = std::process::id();
3950 bytes.copy_from_slice(&(t ^ p).to_le_bytes());
3951 });
3952 let hex: String = bytes.iter().map(|b| format!("{b:02x}")).collect();
3954 format!("bash-{hex}")
3955}
3956
3957#[cfg(test)]
3958mod tests {
3959 use std::collections::HashMap;
3960 use std::fs;
3961 use std::sync::atomic::{AtomicBool, AtomicUsize};
3962 use std::sync::{Arc, Mutex};
3963 use std::time::Duration;
3964 #[cfg(windows)]
3965 use std::time::Instant;
3966
3967 use super::*;
3968
3969 #[cfg(unix)]
3970 const QUICK_SUCCESS_COMMAND: &str = "true";
3971 #[cfg(windows)]
3972 const QUICK_SUCCESS_COMMAND: &str = "cmd /c exit 0";
3973
3974 #[cfg(unix)]
3975 const LONG_RUNNING_COMMAND: &str = "sleep 5";
3976 #[cfg(windows)]
3977 const LONG_RUNNING_COMMAND: &str = "cmd /c timeout /t 5 /nobreak > nul";
3978
3979 #[test]
3980 fn gh_structured_detection_rejects_piped_commands() {
3981 assert!(is_gh_structured_command(
3982 "gh issue list --json number,title"
3983 ));
3984 assert!(is_gh_structured_command(
3985 "cd repo && gh issue list --json number,title"
3986 ));
3987
3988 assert!(!is_gh_structured_command(
3989 "gh issue list --json number,title | jq '.[]'"
3990 ));
3991 assert!(!is_gh_structured_command(
3992 "gh issue list --json number,title |"
3993 ));
3994 }
3995
3996 fn insert_terminal_piped_task(
3997 registry: &BgTaskRegistry,
3998 dir: &tempfile::TempDir,
3999 command: &str,
4000 stdout: &str,
4001 stderr: &str,
4002 compressed: bool,
4003 ) -> (String, Arc<BgTask>) {
4004 let task_id = format!("bash-test-{}", random_slug());
4005 let paths = task_paths(dir.path(), "session", &task_id);
4006 fs::create_dir_all(&paths.dir).unwrap();
4007 fs::write(&paths.stdout, stdout).unwrap();
4008 fs::write(&paths.stderr, stderr).unwrap();
4009 let mut metadata = PersistedTask::starting(
4010 task_id.clone(),
4011 "session".to_string(),
4012 command.to_string(),
4013 dir.path().to_path_buf(),
4014 Some(dir.path().to_path_buf()),
4015 Some(30_000),
4016 true,
4017 compressed,
4018 );
4019 metadata.mark_terminal(BgTaskStatus::Completed, Some(0), None);
4020 write_task(&paths.json, &metadata).unwrap();
4021 registry
4022 .insert_rehydrated_task(metadata, paths, true)
4023 .expect("insert terminal task");
4024 let task = registry.task_for_session(&task_id, "session").unwrap();
4025 (task_id, task)
4026 }
4027
4028 fn insert_terminal_pty_task(
4029 registry: &BgTaskRegistry,
4030 dir: &tempfile::TempDir,
4031 pty_output: &str,
4032 ) -> (String, Arc<BgTask>) {
4033 let task_id = format!("bash-test-{}", random_slug());
4034 let paths = task_paths(dir.path(), "session", &task_id);
4035 fs::create_dir_all(&paths.dir).unwrap();
4036 fs::write(&paths.pty, pty_output).unwrap();
4037 let mut metadata = PersistedTask::starting(
4038 task_id.clone(),
4039 "session".to_string(),
4040 "python".to_string(),
4041 dir.path().to_path_buf(),
4042 Some(dir.path().to_path_buf()),
4043 Some(30_000),
4044 true,
4045 true,
4046 );
4047 metadata.mode = BgMode::Pty;
4048 metadata.mark_terminal(BgTaskStatus::Completed, Some(0), None);
4049 write_task(&paths.json, &metadata).unwrap();
4050 registry
4051 .insert_rehydrated_task(metadata, paths, true)
4052 .expect("insert terminal pty task");
4053 let task = registry.task_for_session(&task_id, "session").unwrap();
4054 (task_id, task)
4055 }
4056
4057 #[cfg(unix)]
4058 fn wait_for_terminal_snapshot(
4059 registry: &BgTaskRegistry,
4060 task_id: &str,
4061 session_id: &str,
4062 project: &Path,
4063 storage: &Path,
4064 ) -> BgTaskSnapshot {
4065 let started = Instant::now();
4066 loop {
4067 let snapshot = registry
4068 .status(task_id, session_id, Some(project), Some(storage), 4096)
4069 .expect("spawned task should be visible to status");
4070 if snapshot.info.status.is_terminal() {
4071 return snapshot;
4072 }
4073 assert!(
4074 started.elapsed() < Duration::from_secs(10),
4075 "timed out waiting for task {task_id} to finish; last status={:?}",
4076 snapshot.info.status
4077 );
4078 std::thread::sleep(Duration::from_millis(50));
4079 }
4080 }
4081
4082 fn write_running_project_task(storage: &Path, project: &Path, session: &str, task_id: &str) {
4083 let paths = task_paths(storage, session, task_id);
4084 let mut metadata = PersistedTask::starting(
4085 task_id.to_string(),
4086 session.to_string(),
4087 "sleep 60".to_string(),
4088 project.to_path_buf(),
4089 Some(project.to_path_buf()),
4090 Some(30_000),
4091 true,
4092 true,
4093 );
4094 metadata.status = BgTaskStatus::Running;
4095 write_task(&paths.json, &metadata).unwrap();
4096 fs::write(&paths.stdout, "still running\n").unwrap();
4097 fs::write(&paths.stderr, "").unwrap();
4098 }
4099
4100 #[test]
4101 fn status_replay_filters_same_session_by_project_root() {
4102 let project_a = tempfile::tempdir().unwrap();
4103 let project_b = tempfile::tempdir().unwrap();
4104 let storage = tempfile::tempdir().unwrap();
4105 let session = "shared-session";
4106 let task_id = "bash-project-a";
4107 write_running_project_task(storage.path(), project_a.path(), session, task_id);
4108
4109 let actor_b = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
4110 assert!(actor_b
4111 .status(
4112 task_id,
4113 session,
4114 Some(project_b.path()),
4115 Some(storage.path()),
4116 1024,
4117 )
4118 .is_none());
4119 assert!(actor_b.task_for_session(task_id, session).is_none());
4120
4121 let actor_a = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
4122 let snapshot = actor_a
4123 .status(
4124 task_id,
4125 session,
4126 Some(project_a.path()),
4127 Some(storage.path()),
4128 1024,
4129 )
4130 .expect("owning project should replay its task");
4131 assert_eq!(snapshot.info.status, BgTaskStatus::Running);
4132 }
4133
4134 #[cfg(unix)]
4135 #[test]
4136 fn multiline_pipeline_stdout_persists_all_lines_after_terminal_status() {
4137 let cases = [
4138 (
4139 "long-first",
4140 "sleep 0.5; printf 'one\\n' | cat\nprintf 'two\\n' | grep -c two\nprintf 'three\\n' | cat",
4141 vec!["one", "1", "three"],
4142 ),
4143 (
4144 "short-first",
4145 "printf 'one\\n' | cat\nsleep 0.2; printf 'two\\n' | grep -c two\nprintf 'three\\n' | cat",
4146 vec!["one", "1", "three"],
4147 ),
4148 (
4149 "failing-middle",
4150 "sleep 0.2; printf 'one\\n' | cat\nfalse; printf 'after-false\\n' | cat\nprintf 'three\\n' | cat",
4151 vec!["one", "after-false", "three"],
4152 ),
4153 ];
4154
4155 for (name, command, expected_lines) in cases {
4156 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
4157 let dir = tempfile::tempdir().unwrap();
4158 let session_id = format!("session-{name}");
4159 let task_id = registry
4160 .spawn(
4161 command,
4162 session_id.clone(),
4163 dir.path().to_path_buf(),
4164 HashMap::new(),
4165 Some(Duration::from_secs(30)),
4166 dir.path().to_path_buf(),
4167 10,
4168 true,
4169 true,
4170 Some(dir.path().to_path_buf()),
4171 )
4172 .unwrap();
4173
4174 let snapshot = wait_for_terminal_snapshot(
4175 ®istry,
4176 &task_id,
4177 &session_id,
4178 dir.path(),
4179 dir.path(),
4180 );
4181 assert_eq!(
4182 snapshot.info.status,
4183 BgTaskStatus::Completed,
4184 "{name}: task should complete; snapshot={snapshot:?}"
4185 );
4186 assert_eq!(
4187 snapshot.exit_code,
4188 Some(0),
4189 "{name}: script should use the final command's exit code"
4190 );
4191
4192 let stdout_path = task_paths(dir.path(), &session_id, &task_id).stdout;
4193 let stdout = fs::read_to_string(&stdout_path).unwrap_or_else(|error| {
4194 panic!(
4195 "{name}: failed to read raw stdout file {}: {error}",
4196 stdout_path.display()
4197 )
4198 });
4199 let lines: Vec<&str> = stdout.lines().collect();
4200 assert_eq!(
4201 lines,
4202 expected_lines,
4203 "{name}: raw stdout file must include every newline-separated command's output; stdout_path={}",
4204 stdout_path.display()
4205 );
4206 }
4207 }
4208
4209 #[test]
4210 fn recognizes_all_recovery_marker_forms() {
4211 assert!(is_recovery_marker(
4212 "[truncated output; full output: read \"/tmp/out\"]"
4213 ));
4214 assert!(is_recovery_marker(
4215 "[omitted output; see remaining: tail -n +42 \"/tmp/out\"]"
4216 ));
4217 assert!(is_recovery_marker(
4218 "[truncated output; full output unavailable]"
4219 ));
4220 assert!(is_recovery_marker(
4221 r#"[truncated 123 bytes from saved output prefix; retained output: read "/tmp/out"]"#
4222 ));
4223 }
4224
4225 #[test]
4226 fn recovery_marker_reports_disk_prefix_truncation_as_retained_output() {
4227 let recovery = RecoveryContext {
4228 dropped_by_class: BTreeMap::new(),
4229 had_inner_drop: false,
4230 offset_hint_eligible: false,
4231 offset_start_line: None,
4232 byte_truncated: false,
4233 disk_truncated_prefix_bytes: 4096,
4234 output_path: Some("/tmp/stdout".to_string()),
4235 stderr_path: None,
4236 include_stderr_path: false,
4237 };
4238
4239 let marker = recovery_marker(&recovery).expect("disk truncation must emit marker");
4240
4241 assert!(marker.contains("truncated 4096 bytes from saved output prefix"));
4242 assert!(marker.contains(r#"retained output: read "/tmp/stdout""#));
4243 assert!(!marker.contains("full output: read"));
4244 }
4245
4246 #[test]
4247 fn killed_exit_marker_sets_nonzero_sentinel_exit_code() {
4248 let metadata = PersistedTask::starting(
4249 "task".to_string(),
4250 "session".to_string(),
4251 "cargo test".to_string(),
4252 PathBuf::from("/tmp"),
4253 None,
4254 None,
4255 true,
4256 true,
4257 );
4258
4259 let terminal = terminal_metadata_from_marker(metadata, ExitMarker::Killed, None);
4260
4261 assert_eq!(terminal.status, BgTaskStatus::Killed);
4262 assert_eq!(terminal.exit_code, Some(137));
4263 }
4264
4265 #[test]
4266 fn terminal_status_polls_use_cached_render_once_and_off_lock() {
4267 let registry = BgTaskRegistry::default();
4268 let dir = tempfile::tempdir().unwrap();
4269 let (_task_id, task) = insert_terminal_piped_task(
4270 ®istry,
4271 &dir,
4272 "custom-tool --verbose",
4273 &"stdout line\n".repeat(200_000),
4274 "",
4275 true,
4276 );
4277 let calls = Arc::new(AtomicUsize::new(0));
4278 let saw_unlocked_state = Arc::new(AtomicBool::new(false));
4279 let task_holder = Arc::new(Mutex::new(Some(Arc::clone(&task))));
4280 let calls_for_closure = Arc::clone(&calls);
4281 let unlocked_for_closure = Arc::clone(&saw_unlocked_state);
4282 let task_for_closure = Arc::clone(&task_holder);
4283 registry.set_compressor_with_exit_code(move |_command, output, _exit_code| {
4284 calls_for_closure.fetch_add(1, Ordering::SeqCst);
4285 if let Some(task) = task_for_closure.lock().unwrap().as_ref() {
4286 if task.state.try_lock().is_ok() {
4287 unlocked_for_closure.store(true, Ordering::SeqCst);
4288 }
4289 }
4290 CompressionResult::new(format!("compressed {} bytes", output.len()))
4291 });
4292
4293 let first = registry
4294 .status(
4295 &task.task_id,
4296 "session",
4297 None,
4298 Some(dir.path()),
4299 RUNNING_OUTPUT_PREVIEW_BYTES,
4300 )
4301 .unwrap();
4302 let second = registry
4303 .status(
4304 &task.task_id,
4305 "session",
4306 None,
4307 Some(dir.path()),
4308 RUNNING_OUTPUT_PREVIEW_BYTES,
4309 )
4310 .unwrap();
4311 let listed = registry.list(RUNNING_OUTPUT_PREVIEW_BYTES);
4312
4313 assert_eq!(
4314 calls.load(Ordering::SeqCst),
4315 1,
4316 "terminal render must be cached"
4317 );
4318 assert!(
4319 saw_unlocked_state.load(Ordering::SeqCst),
4320 "compressor must run after releasing the task state lock"
4321 );
4322 assert!(first.output_preview.starts_with("compressed "));
4323 assert_eq!(second.output_preview, first.output_preview);
4324 assert_eq!(listed[0].output_preview, first.output_preview);
4325 }
4326
4327 #[test]
4328 fn completion_preview_success_keeps_tail_only() {
4329 let registry = BgTaskRegistry::default();
4334 let dir = tempfile::tempdir().unwrap();
4335 let output = format!("HEAD-SIGNAL\n{}TAIL-SIGNAL\n", "middle\n".repeat(2_000));
4336 let (_task_id, task) =
4337 insert_terminal_piped_task(®istry, &dir, "cat big.log", &output, "", false);
4338
4339 registry.post_terminal_transition(&task, true).unwrap();
4340 let completions = registry.drain_completions_for_session(Some("session"));
4341 assert_eq!(completions.len(), 1);
4342 let preview = &completions[0].output_preview;
4343 assert!(preview.contains("TAIL-SIGNAL"), "preview was {preview:?}");
4344 assert!(!preview.contains("HEAD-SIGNAL"), "preview was {preview:?}");
4345 assert!(completions[0].output_truncated);
4346 }
4347
4348 #[test]
4349 fn completion_preview_failure_keeps_head_and_tail() {
4350 let registry = BgTaskRegistry::default();
4353 let dir = tempfile::tempdir().unwrap();
4354 let output = format!("HEAD-SIGNAL\n{}TAIL-SIGNAL\n", "middle\n".repeat(2_000));
4355 let task_id = format!("bash-test-{}", random_slug());
4356 let paths = task_paths(dir.path(), "session", &task_id);
4357 fs::create_dir_all(&paths.dir).unwrap();
4358 fs::write(&paths.stdout, &output).unwrap();
4359 fs::write(&paths.stderr, "").unwrap();
4360 let mut metadata = PersistedTask::starting(
4361 task_id.clone(),
4362 "session".to_string(),
4363 "cat big.log".to_string(),
4364 dir.path().to_path_buf(),
4365 Some(dir.path().to_path_buf()),
4366 Some(30_000),
4367 true,
4368 false,
4369 );
4370 metadata.mark_terminal(BgTaskStatus::Failed, Some(1), None);
4371 write_task(&paths.json, &metadata).unwrap();
4372 registry
4373 .insert_rehydrated_task(metadata, paths, true)
4374 .expect("insert terminal task");
4375 let task = registry.task_for_session(&task_id, "session").unwrap();
4376
4377 registry.post_terminal_transition(&task, true).unwrap();
4378 let completions = registry.drain_completions_for_session(Some("session"));
4379 assert_eq!(completions.len(), 1);
4380 let preview = &completions[0].output_preview;
4381 assert!(preview.contains("HEAD-SIGNAL"), "preview was {preview:?}");
4382 assert!(preview.contains("TAIL-SIGNAL"), "preview was {preview:?}");
4383 }
4384
4385 #[test]
4386 fn has_completions_for_session_matches_pending_delivery() {
4387 let registry = BgTaskRegistry::default();
4388 assert!(!registry.has_completions_for_session(Some("session")));
4389 assert!(!registry.has_completions_for_session(None));
4390
4391 let dir = tempfile::tempdir().unwrap();
4392 let (_task_id, task) =
4393 insert_terminal_piped_task(®istry, &dir, QUICK_SUCCESS_COMMAND, "done\n", "", false);
4394 registry.post_terminal_transition(&task, true).unwrap();
4395
4396 assert!(registry.has_completions_for_session(Some("session")));
4397 assert!(registry.has_completions_for_session(None));
4398 assert!(!registry.has_completions_for_session(Some("other-session")));
4399
4400 let completions = registry.drain_completions_for_session(Some("session"));
4401 assert_eq!(completions.len(), 1);
4402 assert_eq!(completions[0].task_id, task.task_id);
4403 }
4404
4405 #[test]
4406 fn structured_gh_json_survives_intact_and_ignores_stderr() {
4407 let registry = BgTaskRegistry::default();
4408 let dir = tempfile::tempdir().unwrap();
4409 let calls = Arc::new(AtomicUsize::new(0));
4410 let calls_for_closure = Arc::clone(&calls);
4411 registry.set_compressor_with_exit_code(move |_command, output, _exit_code| {
4412 calls_for_closure.fetch_add(1, Ordering::SeqCst);
4413 CompressionResult::new(output)
4414 });
4415 let (task_id, _task) = insert_terminal_piped_task(
4416 ®istry,
4417 &dir,
4418 "gh pr view 123 --json body",
4419 "{\"body\":\"hello\"}",
4420 "warning: stderr must not join json",
4421 true,
4422 );
4423
4424 let snapshot = registry
4425 .status(
4426 &task_id,
4427 "session",
4428 None,
4429 Some(dir.path()),
4430 RUNNING_OUTPUT_PREVIEW_BYTES,
4431 )
4432 .unwrap();
4433
4434 assert_eq!(snapshot.output_preview, "{\"body\":\"hello\"}");
4435 assert!(!snapshot.output_preview.contains("warning"));
4436 assert!(!snapshot.output_truncated);
4437 assert_eq!(
4438 calls.load(Ordering::SeqCst),
4439 0,
4440 "structured JSON bypasses compression"
4441 );
4442 }
4443
4444 #[test]
4445 fn registry_emits_single_recovery_marker_for_class_drops() {
4446 let registry = BgTaskRegistry::default();
4447 let dir = tempfile::tempdir().unwrap();
4448 registry.set_compressor_with_exit_code(move |_command, _output, _exit_code| {
4449 let mut dropped = BTreeMap::new();
4450 dropped.insert(DropClass::Error, 18);
4451 dropped.insert(DropClass::Warning, 6);
4452 CompressionResult::with_class_drops("kept diagnostic", dropped)
4453 });
4454 let (task_id, task) =
4455 insert_terminal_piped_task(®istry, &dir, "custom-tool", "raw", "", true);
4456
4457 let snapshot = registry
4458 .status(
4459 &task_id,
4460 "session",
4461 None,
4462 Some(dir.path()),
4463 RUNNING_OUTPUT_PREVIEW_BYTES,
4464 )
4465 .unwrap();
4466
4467 assert_eq!(snapshot.output_preview.matches("full output:").count(), 1);
4468 assert!(snapshot.output_preview.contains("+18 more errors"));
4469 assert!(snapshot.output_preview.contains("+6 more warnings"));
4470 assert!(snapshot
4471 .output_preview
4472 .contains(&format!("read \"{}\"", task.paths.stdout.display())));
4473 assert!(!snapshot.output_preview.contains("tail -n +"));
4474 assert!(snapshot.output_truncated);
4475 }
4476
4477 #[test]
4478 fn registry_marker_reports_semantic_and_byte_drops_once() {
4479 let registry = BgTaskRegistry::default();
4480 let dir = tempfile::tempdir().unwrap();
4481 registry.set_compressor_with_exit_code(move |_command, _output, _exit_code| {
4482 let mut dropped = BTreeMap::new();
4483 dropped.insert(DropClass::Error, 1);
4484 CompressionResult::with_class_drops(
4485 format!("HEAD-SIGNAL\n{}TAIL-SIGNAL", "middle\n".repeat(8_000)),
4486 dropped,
4487 )
4488 });
4489 let (task_id, _task) =
4490 insert_terminal_piped_task(®istry, &dir, "custom-tool", "raw", "", true);
4491
4492 let snapshot = registry
4493 .status(
4494 &task_id,
4495 "session",
4496 None,
4497 Some(dir.path()),
4498 RUNNING_OUTPUT_PREVIEW_BYTES,
4499 )
4500 .unwrap();
4501
4502 assert_eq!(snapshot.output_preview.matches("full output:").count(), 1);
4503 assert!(snapshot.output_preview.contains("+1 more error"));
4504 assert!(snapshot.output_preview.contains("truncated output"));
4505 assert!(snapshot.output_preview.contains("HEAD-SIGNAL"));
4506 assert!(snapshot.output_preview.contains("TAIL-SIGNAL"));
4507 assert!(!snapshot.output_preview.contains("...<truncated"));
4508 assert!(snapshot.output_truncated);
4509 }
4510
4511 #[test]
4512 fn cargo_stderr_class_drops_name_both_capture_paths() {
4513 let registry = BgTaskRegistry::default();
4514 let dir = tempfile::tempdir().unwrap();
4515 let filter_registry = crate::compress::toml_filter::FilterRegistry::default();
4516 registry.set_compressor_with_exit_code(move |command, output, exit_code| {
4517 crate::compress::compress_with_registry_exit_code(
4518 command,
4519 &output,
4520 exit_code,
4521 &filter_registry,
4522 )
4523 });
4524 let stderr = (0..22)
4525 .map(|index| {
4526 format!(
4527 "error: cargo failure {index}\n --> src/lib.rs:{}:1\n |\n{} | boom\n",
4528 index + 1,
4529 index + 1
4530 )
4531 })
4532 .collect::<Vec<_>>()
4533 .join("\n");
4534 let (task_id, task) = insert_terminal_piped_task(
4535 ®istry,
4536 &dir,
4537 "cargo check",
4538 "Finished dev [unoptimized] target(s) in 0.01s\n",
4539 &stderr,
4540 true,
4541 );
4542
4543 let snapshot = registry
4544 .status(
4545 &task_id,
4546 "session",
4547 None,
4548 Some(dir.path()),
4549 RUNNING_OUTPUT_PREVIEW_BYTES,
4550 )
4551 .unwrap();
4552
4553 assert!(snapshot.output_preview.contains("+2 more errors"));
4554 assert!(snapshot
4555 .output_preview
4556 .contains(&format!("read \"{}\"", task.paths.stdout.display())));
4557 assert!(snapshot
4558 .output_preview
4559 .contains(&format!("read \"{}\"", task.paths.stderr.display())));
4560 assert!(!snapshot.output_preview.contains("tail -n +"));
4561 }
4562
4563 #[test]
4564 fn over_ceiling_structured_json_uses_pointer_not_partial_json() {
4565 let registry = BgTaskRegistry::default();
4566 let dir = tempfile::tempdir().unwrap();
4567 let body = format!("{{\"body\":\"{}\"}}", "x".repeat(60 * 1024));
4568 let (task_id, task) = insert_terminal_piped_task(
4569 ®istry,
4570 &dir,
4571 "cd /repo && gh pr view 123 --json body",
4572 &body,
4573 "",
4574 true,
4575 );
4576
4577 let snapshot = registry
4578 .status(
4579 &task_id,
4580 "session",
4581 None,
4582 Some(dir.path()),
4583 RUNNING_OUTPUT_PREVIEW_BYTES,
4584 )
4585 .unwrap();
4586
4587 assert!(snapshot.output_preview.starts_with("[JSON output "));
4588 assert!(snapshot
4589 .output_preview
4590 .contains(&task.paths.stdout.display().to_string()));
4591 assert!(!snapshot.output_preview.contains(&"x".repeat(1024)));
4592 assert!(snapshot.output_truncated);
4593 }
4594
4595 #[test]
4596 fn toml_strip_tail_cap_uses_full_output_hint_not_offset_hint() {
4597 let registry = BgTaskRegistry::default();
4598 let dir = tempfile::tempdir().unwrap();
4599 let filter_registry = crate::compress::toml_filter::build_registry(
4600 crate::compress::builtin_filters::ALL,
4601 None,
4602 None,
4603 );
4604 registry.set_compressor_with_exit_code(move |command, output, exit_code| {
4605 crate::compress::compress_with_registry_exit_code(
4606 command,
4607 &output,
4608 exit_code,
4609 &filter_registry,
4610 )
4611 });
4612 let stdout = format!(
4613 "make[1]: Entering directory `/tmp`\n{}",
4614 (0..100)
4615 .map(|index| format!("compile line {index}"))
4616 .collect::<Vec<_>>()
4617 .join("\n")
4618 );
4619 let (task_id, task) =
4620 insert_terminal_piped_task(®istry, &dir, "make all", &stdout, "", true);
4621
4622 let snapshot = registry
4623 .status(
4624 &task_id,
4625 "session",
4626 None,
4627 Some(dir.path()),
4628 RUNNING_OUTPUT_PREVIEW_BYTES,
4629 )
4630 .unwrap();
4631
4632 assert!(snapshot.output_preview.contains("compile line 99"));
4633 assert!(snapshot.output_preview.contains(&format!(
4634 "full output: read \"{}\"",
4635 task.paths.stdout.display()
4636 )));
4637 assert!(!snapshot
4638 .output_preview
4639 .contains(&format!("read \"{}\"", task.paths.stderr.display())));
4640 assert!(!snapshot.output_preview.contains("tail -n +"));
4641 }
4642
4643 #[test]
4644 fn compressed_false_raw_passthrough_uses_wider_head_tail_cap() {
4645 let registry = BgTaskRegistry::default();
4646 let dir = tempfile::tempdir().unwrap();
4647 let output = format!("RAW-HEAD\n{}RAW-TAIL\n", "raw-middle\n".repeat(8_000));
4648 let (task_id, task) =
4649 insert_terminal_piped_task(®istry, &dir, "cat raw.log", &output, "RAW-ERR\n", false);
4650
4651 let snapshot = registry
4652 .status(
4653 &task_id,
4654 "session",
4655 None,
4656 Some(dir.path()),
4657 RUNNING_OUTPUT_PREVIEW_BYTES,
4658 )
4659 .unwrap();
4660
4661 assert!(snapshot.output_preview.contains("RAW-HEAD"));
4662 assert!(snapshot.output_preview.contains("RAW-TAIL"));
4663 assert!(snapshot.output_preview.contains("truncated output"));
4664 assert!(snapshot
4665 .output_preview
4666 .contains(&format!("read \"{}\"", task.paths.stdout.display())));
4667 assert!(snapshot
4668 .output_preview
4669 .contains(&format!("read \"{}\"", task.paths.stderr.display())));
4670 assert!(!snapshot.output_preview.contains("tail -n +"));
4671 assert!(snapshot.output_preview.len() > 16 * 1024);
4672 assert!(snapshot.output_truncated);
4673 }
4674
4675 #[test]
4676 fn pty_terminal_snapshot_bypasses_line_compression() {
4677 let registry = BgTaskRegistry::default();
4678 let dir = tempfile::tempdir().unwrap();
4679 let calls = Arc::new(AtomicUsize::new(0));
4680 let calls_for_closure = Arc::clone(&calls);
4681 registry.set_compressor_with_exit_code(move |_command, output, _exit_code| {
4682 calls_for_closure.fetch_add(1, Ordering::SeqCst);
4683 CompressionResult::new(output)
4684 });
4685 let (task_id, _task) = insert_terminal_pty_task(®istry, &dir, "raw\u{1b}[31m pty bytes");
4686
4687 let snapshot = registry
4688 .status(
4689 &task_id,
4690 "session",
4691 None,
4692 Some(dir.path()),
4693 RUNNING_OUTPUT_PREVIEW_BYTES,
4694 )
4695 .unwrap();
4696
4697 assert_eq!(snapshot.info.mode, BgMode::Pty);
4698 assert_eq!(snapshot.output_preview, "");
4699 assert_eq!(calls.load(Ordering::SeqCst), 0);
4700 }
4701
4702 #[test]
4703 fn pty_dimensions_are_persisted_and_returned_in_snapshot() {
4704 let registry = BgTaskRegistry::default();
4705 let dir = tempfile::tempdir().unwrap();
4706 let task_id = registry
4707 .spawn_pty(
4708 QUICK_SUCCESS_COMMAND,
4709 "session".to_string(),
4710 dir.path().to_path_buf(),
4711 HashMap::new(),
4712 Some(Duration::from_secs(30)),
4713 dir.path().to_path_buf(),
4714 10,
4715 true,
4716 false,
4717 Some(dir.path().to_path_buf()),
4718 50,
4719 120,
4720 )
4721 .unwrap();
4722
4723 let paths = task_paths(dir.path(), "session", &task_id);
4724 let metadata = read_task(&paths.json).unwrap();
4725 assert_eq!(
4726 metadata.schema_version,
4727 crate::bash_background::persistence::SCHEMA_VERSION
4728 );
4729 assert_eq!(metadata.mode, BgMode::Pty);
4730 assert_eq!(metadata.pty_rows, Some(50));
4731 assert_eq!(metadata.pty_cols, Some(120));
4732
4733 let snapshot = registry
4734 .status(&task_id, "session", None, Some(dir.path()), 1024)
4735 .unwrap();
4736 assert_eq!(snapshot.pty_rows, Some(50));
4737 assert_eq!(snapshot.pty_cols, Some(120));
4738 }
4739
4740 fn spawn_dead_child() -> std::process::Child {
4745 #[cfg(unix)]
4746 let mut cmd = std::process::Command::new("true");
4747 #[cfg(windows)]
4748 let mut cmd = {
4749 let mut c = std::process::Command::new("cmd");
4750 c.args(["/c", "exit", "0"]);
4751 c
4752 };
4753 cmd.stdin(std::process::Stdio::null());
4754 cmd.stdout(std::process::Stdio::null());
4755 cmd.stderr(std::process::Stdio::null());
4756 let mut child = cmd.spawn().expect("spawn replacement child for reap test");
4757 let started = Instant::now();
4766 loop {
4767 match child.try_wait() {
4768 Ok(Some(_)) => break,
4769 Ok(None) => {
4770 if started.elapsed() > Duration::from_secs(5) {
4771 panic!("dead-child stand-in did not exit within 5s");
4772 }
4773 std::thread::sleep(Duration::from_millis(10));
4774 }
4775 Err(error) => panic!("dead-child try_wait failed: {error}"),
4776 }
4777 }
4778 child
4779 }
4780
4781 #[test]
4782 fn ack_marks_delivered_even_when_completion_was_already_consumed_locally() {
4783 let registry = BgTaskRegistry::default();
4784 let dir = tempfile::tempdir().unwrap();
4785 let task_id = registry
4786 .spawn(
4787 LONG_RUNNING_COMMAND,
4788 "session".to_string(),
4789 dir.path().to_path_buf(),
4790 HashMap::new(),
4791 Some(Duration::from_secs(30)),
4792 dir.path().to_path_buf(),
4793 10,
4794 true,
4795 false,
4796 Some(dir.path().to_path_buf()),
4797 )
4798 .unwrap();
4799 registry
4800 .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
4801 .unwrap();
4802 assert_eq!(
4803 registry
4804 .drain_completions_for_session(Some("session"))
4805 .len(),
4806 1
4807 );
4808
4809 registry.inner.completions.lock().unwrap().clear();
4812
4813 assert_eq!(
4814 registry.ack_completions_for_session(Some("session"), std::slice::from_ref(&task_id)),
4815 vec![task_id.clone()]
4816 );
4817 assert!(registry
4818 .drain_completions_for_session(Some("session"))
4819 .is_empty());
4820
4821 let paths = task_paths(dir.path(), "session", &task_id);
4822 let metadata = read_task(&paths.json).unwrap();
4823 assert!(metadata.completion_delivered);
4824
4825 let replayed = BgTaskRegistry::default();
4826 replayed
4827 .replay_session_inner(dir.path(), "session", None)
4828 .unwrap();
4829 assert!(replayed
4830 .drain_completions_for_session(Some("session"))
4831 .is_empty());
4832 }
4833
4834 #[test]
4835 fn register_watch_rejects_unknown_task() {
4836 let registry = BgTaskRegistry::default();
4837
4838 let result = registry.register_watch(
4839 "missing-task".to_string(),
4840 WatchPattern::Substring("READY".into()),
4841 true,
4842 );
4843
4844 assert_eq!(result, Err("task_not_found"));
4845 }
4846
4847 #[test]
4848 fn register_watch_on_terminal_task_scans_existing_output() {
4849 let frames = Arc::new(Mutex::new(Vec::new()));
4850 let captured = Arc::clone(&frames);
4851 let sender: crate::context::ProgressSender = Arc::new(Box::new(move |frame| {
4852 captured.lock().unwrap().push(frame);
4853 })
4854 as Box<dyn Fn(PushFrame) + Send + Sync>);
4855 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(Some(sender))));
4856 let dir = tempfile::tempdir().unwrap();
4857 let task_id = registry
4858 .spawn(
4859 LONG_RUNNING_COMMAND,
4860 "session".to_string(),
4861 dir.path().to_path_buf(),
4862 HashMap::new(),
4863 Some(Duration::from_secs(30)),
4864 dir.path().to_path_buf(),
4865 10,
4866 true,
4867 false,
4868 Some(dir.path().to_path_buf()),
4869 )
4870 .unwrap();
4871 registry
4872 .inner
4873 .shutdown
4874 .store(true, std::sync::atomic::Ordering::SeqCst);
4875 let task = registry.task_for_session(&task_id, "session").unwrap();
4876 std::fs::write(&task.paths.stdout, "READY\n").unwrap();
4877 registry
4878 .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
4879 .unwrap();
4880 frames.lock().unwrap().clear();
4881 registry.inner.completions.lock().unwrap().clear();
4882
4883 registry
4884 .register_watch(
4885 task_id.clone(),
4886 WatchPattern::Substring("READY".into()),
4887 true,
4888 )
4889 .unwrap();
4890
4891 let frames = frames.lock().unwrap();
4892 let frame = frames
4893 .iter()
4894 .find_map(|frame| match frame {
4895 PushFrame::BashPatternMatch(frame) => Some(frame),
4896 _ => None,
4897 })
4898 .expect("terminal watch registration should emit pattern frame");
4899 assert_eq!(frame.reason, "pattern_match");
4900 assert_eq!(frame.task_id, task_id);
4901 assert_eq!(frame.session_id, "session");
4902 assert_eq!(frame.match_text, "READY");
4903 assert_eq!(frame.match_offset, 0);
4904 assert_eq!(registry.active_watch_count(&frame.task_id), 0);
4905 let metadata = read_task(&task.paths.json).unwrap();
4906 assert!(metadata.completion_delivered);
4907 }
4908
4909 #[test]
4910 fn cleanup_finished_removes_terminal_tasks_older_than_threshold() {
4911 let registry = BgTaskRegistry::default();
4912 let dir = tempfile::tempdir().unwrap();
4913 let task_id = registry
4914 .spawn(
4915 QUICK_SUCCESS_COMMAND,
4916 "session".to_string(),
4917 dir.path().to_path_buf(),
4918 HashMap::new(),
4919 Some(Duration::from_secs(30)),
4920 dir.path().to_path_buf(),
4921 10,
4922 true,
4923 false,
4924 Some(dir.path().to_path_buf()),
4925 )
4926 .unwrap();
4927 registry
4928 .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
4929 .unwrap();
4930 let completions = registry.drain_completions_for_session(Some("session"));
4931 assert_eq!(completions.len(), 1);
4932 assert_eq!(
4933 registry.ack_completions_for_session(Some("session"), std::slice::from_ref(&task_id)),
4934 vec![task_id.clone()]
4935 );
4936
4937 registry.cleanup_finished(Duration::ZERO);
4938
4939 assert!(registry.inner.tasks.lock().unwrap().is_empty());
4940 }
4941
4942 #[test]
4943 fn cleanup_finished_retains_undelivered_terminals() {
4944 let registry = BgTaskRegistry::default();
4945 let dir = tempfile::tempdir().unwrap();
4946 let task_id = registry
4947 .spawn(
4948 QUICK_SUCCESS_COMMAND,
4949 "session".to_string(),
4950 dir.path().to_path_buf(),
4951 HashMap::new(),
4952 Some(Duration::from_secs(30)),
4953 dir.path().to_path_buf(),
4954 10,
4955 true,
4956 false,
4957 Some(dir.path().to_path_buf()),
4958 )
4959 .unwrap();
4960 registry
4961 .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
4962 .unwrap();
4963
4964 registry.cleanup_finished(Duration::ZERO);
4965
4966 assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
4967 }
4968
4969 #[test]
4977 fn reap_child_marks_failed_when_child_exits_without_exit_marker() {
4978 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
4979 let dir = tempfile::tempdir().unwrap();
4980 let task_id = registry
4981 .spawn(
4982 QUICK_SUCCESS_COMMAND,
4983 "session".to_string(),
4984 dir.path().to_path_buf(),
4985 HashMap::new(),
4986 Some(Duration::from_secs(30)),
4987 dir.path().to_path_buf(),
4988 10,
4989 true,
4990 false,
4991 Some(dir.path().to_path_buf()),
4992 )
4993 .unwrap();
4994
4995 let task = registry.task_for_session(&task_id, "session").unwrap();
4996
4997 let started = Instant::now();
5002 loop {
5003 let exited = {
5004 let mut state = task.state.lock().unwrap();
5005 match &mut state.runtime {
5006 TaskRuntime::Piped(Some(child)) => matches!(child.try_wait(), Ok(Some(_))),
5007 _ => true,
5008 }
5009 };
5010 if exited {
5011 break;
5012 }
5013 assert!(
5014 started.elapsed() < Duration::from_secs(5),
5015 "child should exit quickly"
5016 );
5017 std::thread::sleep(Duration::from_millis(20));
5018 }
5019
5020 registry
5028 .inner
5029 .shutdown
5030 .store(true, std::sync::atomic::Ordering::SeqCst);
5031 std::thread::sleep(Duration::from_millis(550));
5035
5036 let _ = std::fs::remove_file(&task.paths.exit);
5039
5040 {
5055 let mut state = task.state.lock().unwrap();
5056 state.metadata.status = BgTaskStatus::Running;
5057 state.metadata.status_reason = None;
5058 state.metadata.exit_code = None;
5059 state.metadata.finished_at = None;
5060 state.metadata.duration_ms = None;
5061 crate::bash_background::persistence::write_task(&task.paths.json, &state.metadata)
5064 .expect("persist reset Running metadata for reap_child test");
5065 if matches!(state.runtime, TaskRuntime::Piped(None)) {
5069 state.runtime = TaskRuntime::Piped(Some(spawn_dead_child()));
5070 }
5071 }
5072 *task.terminal_at.lock().unwrap() = None;
5075
5076 assert!(
5079 task.is_running(),
5080 "precondition: metadata.status == Running"
5081 );
5082 assert!(
5083 !task.paths.exit.exists(),
5084 "precondition: exit marker absent"
5085 );
5086
5087 registry.reap_child(&task);
5092
5093 {
5094 let state = task.state.lock().unwrap();
5095 assert_eq!(
5096 state.metadata.status,
5097 BgTaskStatus::Running,
5098 "first reap must leave status Running while waiting one pass for marker"
5099 );
5100 assert_eq!(
5101 state.metadata.status_reason, None,
5102 "first reap must not record a failure reason"
5103 );
5104 assert!(
5105 matches!(state.runtime, TaskRuntime::Piped(None)),
5106 "child handle must be released after first reap"
5107 );
5108 assert!(
5109 state.detached,
5110 "task must be marked detached after first reap"
5111 );
5112 }
5113
5114 registry.reap_child(&task);
5118
5119 let state = task.state.lock().unwrap();
5120 assert!(
5121 state.metadata.status.is_terminal(),
5122 "second reap must transition to terminal when PID dead and no marker. Got status={:?}",
5123 state.metadata.status
5124 );
5125 assert_eq!(
5126 state.metadata.status,
5127 BgTaskStatus::Failed,
5128 "must specifically be Failed (not Killed): status={:?}",
5129 state.metadata.status
5130 );
5131 assert_eq!(
5132 state.metadata.status_reason.as_deref(),
5133 Some("process exited without exit marker"),
5134 "reason must match replay path's wording: {:?}",
5135 state.metadata.status_reason
5136 );
5137 assert!(
5138 matches!(state.runtime, TaskRuntime::Piped(None)),
5139 "child handle must stay released after second reap"
5140 );
5141 assert!(
5142 state.detached,
5143 "task must remain detached after second reap"
5144 );
5145 }
5146
5147 #[test]
5152 fn reap_child_preserves_running_when_exit_marker_exists() {
5153 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
5154 let dir = tempfile::tempdir().unwrap();
5155 let task_id = registry
5156 .spawn(
5157 QUICK_SUCCESS_COMMAND,
5158 "session".to_string(),
5159 dir.path().to_path_buf(),
5160 HashMap::new(),
5161 Some(Duration::from_secs(30)),
5162 dir.path().to_path_buf(),
5163 10,
5164 true,
5165 false,
5166 Some(dir.path().to_path_buf()),
5167 )
5168 .unwrap();
5169
5170 let task = registry.task_for_session(&task_id, "session").unwrap();
5171
5172 let started = Instant::now();
5175 loop {
5176 let exited = {
5177 let mut state = task.state.lock().unwrap();
5178 match &mut state.runtime {
5179 TaskRuntime::Piped(Some(child)) => matches!(child.try_wait(), Ok(Some(_))),
5180 _ => true,
5181 }
5182 };
5183 if exited && task.paths.exit.exists() {
5184 break;
5185 }
5186 assert!(
5187 started.elapsed() < Duration::from_secs(5),
5188 "child should exit and write marker quickly"
5189 );
5190 std::thread::sleep(Duration::from_millis(20));
5191 }
5192
5193 registry
5199 .inner
5200 .shutdown
5201 .store(true, std::sync::atomic::Ordering::SeqCst);
5202 std::thread::sleep(Duration::from_millis(550));
5203
5204 {
5210 let mut state = task.state.lock().unwrap();
5211 state.metadata.status = BgTaskStatus::Running;
5212 state.metadata.status_reason = None;
5213 if matches!(state.runtime, TaskRuntime::Piped(None)) {
5214 state.runtime = TaskRuntime::Piped(Some(spawn_dead_child()));
5215 }
5216 }
5217 *task.terminal_at.lock().unwrap() = None;
5218 if !task.paths.exit.exists() {
5221 std::fs::write(&task.paths.exit, "0").expect("write replacement exit marker");
5222 }
5223
5224 registry.reap_child(&task);
5228
5229 let state = task.state.lock().unwrap();
5230 assert!(
5231 matches!(state.runtime, TaskRuntime::Piped(None)),
5232 "child handle still released even when marker exists"
5233 );
5234 assert!(
5235 state.detached,
5236 "task still marked detached even when marker exists"
5237 );
5238 assert_eq!(
5243 state.metadata.status,
5244 BgTaskStatus::Running,
5245 "reap_child must defer to poll_task when marker exists"
5246 );
5247 }
5248
5249 #[cfg(unix)]
5253 fn pid_stat(pid: u32) -> Option<String> {
5254 let output = std::process::Command::new("ps")
5255 .args(["-o", "stat=", "-p", &pid.to_string()])
5256 .output()
5257 .ok()?;
5258 if !output.status.success() {
5259 return None;
5260 }
5261 let stat = String::from_utf8_lossy(&output.stdout).trim().to_string();
5262 if stat.is_empty() {
5263 None
5264 } else {
5265 Some(stat)
5266 }
5267 }
5268
5269 #[cfg(unix)]
5271 fn is_zombie(pid: u32) -> bool {
5272 pid_stat(pid).is_some_and(|stat| stat.starts_with('Z'))
5273 }
5274
5275 #[cfg(unix)]
5281 fn spawn_unreaped_zombie() -> std::process::Child {
5282 let child = std::process::Command::new("true")
5283 .stdin(std::process::Stdio::null())
5284 .stdout(std::process::Stdio::null())
5285 .stderr(std::process::Stdio::null())
5286 .spawn()
5287 .expect("spawn zombie stand-in");
5288 let pid = child.id();
5289 let started = Instant::now();
5290 while !is_zombie(pid) {
5291 assert!(
5292 started.elapsed() < Duration::from_secs(5),
5293 "stand-in child should become a zombie within 5s"
5294 );
5295 std::thread::sleep(Duration::from_millis(10));
5296 }
5297 child
5299 }
5300
5301 #[cfg(unix)]
5311 #[test]
5312 fn finalize_from_marker_reaps_child_no_zombie() {
5313 use std::sync::atomic::Ordering;
5314
5315 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
5316 let dir = tempfile::tempdir().unwrap();
5317 let task_id = registry
5318 .spawn(
5319 QUICK_SUCCESS_COMMAND,
5320 "session".to_string(),
5321 dir.path().to_path_buf(),
5322 HashMap::new(),
5323 Some(Duration::from_secs(30)),
5324 dir.path().to_path_buf(),
5325 10,
5326 true,
5327 false,
5328 Some(dir.path().to_path_buf()),
5329 )
5330 .unwrap();
5331
5332 registry.inner.shutdown.store(true, Ordering::SeqCst);
5336 std::thread::sleep(Duration::from_millis(550));
5337
5338 let task = registry.task_for_session(&task_id, "session").unwrap();
5339
5340 let started = Instant::now();
5344 while !task.paths.exit.exists() {
5345 assert!(
5346 started.elapsed() < Duration::from_secs(5),
5347 "exit marker should land quickly for `true`"
5348 );
5349 std::thread::sleep(Duration::from_millis(20));
5350 }
5351
5352 let zombie_pid;
5358 {
5359 let mut state = task.state.lock().unwrap();
5360 state.metadata.status = BgTaskStatus::Running;
5361 state.metadata.status_reason = None;
5362 state.metadata.exit_code = None;
5363 state.metadata.finished_at = None;
5364 state.metadata.duration_ms = None;
5365 crate::bash_background::persistence::write_task(&task.paths.json, &state.metadata)
5366 .expect("persist reset Running metadata");
5367 let zombie = spawn_unreaped_zombie();
5368 zombie_pid = zombie.id();
5369 state.runtime = TaskRuntime::Piped(Some(zombie));
5370 }
5371 *task.terminal_at.lock().unwrap() = None;
5372
5373 assert!(
5375 is_zombie(zombie_pid),
5376 "precondition: stand-in child {zombie_pid} must be a zombie before finalize"
5377 );
5378
5379 registry.poll_task(&task).unwrap();
5382
5383 {
5384 let state = task.state.lock().unwrap();
5385 assert!(
5386 matches!(state.runtime, TaskRuntime::Piped(None)),
5387 "child handle must be released after marker finalize"
5388 );
5389 assert!(
5390 state.metadata.status.is_terminal(),
5391 "task must be terminal after marker finalize: {:?}",
5392 state.metadata.status
5393 );
5394 }
5395
5396 assert!(
5399 !is_zombie(zombie_pid),
5400 "issue #91 regression: child {zombie_pid} left as <defunct> zombie \
5401 after the exit-marker terminal transition"
5402 );
5403 }
5404
5405 #[cfg(unix)]
5409 #[test]
5410 fn kill_with_existing_marker_reaps_child_no_zombie() {
5411 use std::sync::atomic::Ordering;
5412
5413 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
5414 let dir = tempfile::tempdir().unwrap();
5415 let task_id = registry
5416 .spawn(
5417 QUICK_SUCCESS_COMMAND,
5418 "session".to_string(),
5419 dir.path().to_path_buf(),
5420 HashMap::new(),
5421 Some(Duration::from_secs(30)),
5422 dir.path().to_path_buf(),
5423 10,
5424 true,
5425 false,
5426 Some(dir.path().to_path_buf()),
5427 )
5428 .unwrap();
5429
5430 registry.inner.shutdown.store(true, Ordering::SeqCst);
5431 std::thread::sleep(Duration::from_millis(550));
5432
5433 let task = registry.task_for_session(&task_id, "session").unwrap();
5434
5435 let started = Instant::now();
5436 while !task.paths.exit.exists() {
5437 assert!(
5438 started.elapsed() < Duration::from_secs(5),
5439 "exit marker should land quickly for `true`"
5440 );
5441 std::thread::sleep(Duration::from_millis(20));
5442 }
5443
5444 let zombie_pid;
5445 {
5446 let mut state = task.state.lock().unwrap();
5447 state.metadata.status = BgTaskStatus::Running;
5448 state.metadata.status_reason = None;
5449 state.metadata.exit_code = None;
5450 state.metadata.finished_at = None;
5451 state.metadata.duration_ms = None;
5452 crate::bash_background::persistence::write_task(&task.paths.json, &state.metadata)
5453 .expect("persist reset Running metadata");
5454 let zombie = spawn_unreaped_zombie();
5455 zombie_pid = zombie.id();
5456 state.runtime = TaskRuntime::Piped(Some(zombie));
5457 }
5458 *task.terminal_at.lock().unwrap() = None;
5459
5460 assert!(
5461 is_zombie(zombie_pid),
5462 "precondition: stand-in child {zombie_pid} must be a zombie before kill"
5463 );
5464
5465 registry
5467 .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
5468 .expect("kill should succeed");
5469
5470 {
5471 let state = task.state.lock().unwrap();
5472 assert!(
5473 matches!(state.runtime, TaskRuntime::Piped(None)),
5474 "child handle must be released after marker-aware kill"
5475 );
5476 assert!(state.metadata.status.is_terminal());
5477 }
5478
5479 assert!(
5480 !is_zombie(zombie_pid),
5481 "issue #91 regression: child {zombie_pid} left as <defunct> zombie \
5482 after a marker-aware kill"
5483 );
5484 }
5485
5486 #[test]
5487 fn cleanup_finished_keeps_running_tasks() {
5488 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
5489 let dir = tempfile::tempdir().unwrap();
5490 let task_id = registry
5491 .spawn(
5492 LONG_RUNNING_COMMAND,
5493 "session".to_string(),
5494 dir.path().to_path_buf(),
5495 HashMap::new(),
5496 Some(Duration::from_secs(30)),
5497 dir.path().to_path_buf(),
5498 10,
5499 true,
5500 false,
5501 Some(dir.path().to_path_buf()),
5502 )
5503 .unwrap();
5504
5505 registry.cleanup_finished(Duration::ZERO);
5506
5507 assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
5508 let _ = registry.kill(&task_id, "session");
5509 }
5510
5511 #[cfg(windows)]
5512 fn wait_for_file(path: &Path) -> String {
5513 let started = Instant::now();
5514 loop {
5515 if path.exists() {
5516 return fs::read_to_string(path).expect("read file");
5517 }
5518 assert!(
5519 started.elapsed() < Duration::from_secs(30),
5520 "timed out waiting for {}",
5521 path.display()
5522 );
5523 std::thread::sleep(Duration::from_millis(100));
5524 }
5525 }
5526
5527 #[cfg(windows)]
5528 fn spawn_windows_registry_command(
5529 command: &str,
5530 ) -> (BgTaskRegistry, tempfile::TempDir, String) {
5531 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
5532 let dir = tempfile::tempdir().unwrap();
5533 let task_id = registry
5534 .spawn(
5535 command,
5536 "session".to_string(),
5537 dir.path().to_path_buf(),
5538 HashMap::new(),
5539 Some(Duration::from_secs(30)),
5540 dir.path().to_path_buf(),
5541 10,
5542 false,
5543 false,
5544 Some(dir.path().to_path_buf()),
5545 )
5546 .unwrap();
5547 (registry, dir, task_id)
5548 }
5549
5550 #[cfg(windows)]
5551 #[test]
5552 fn windows_spawn_writes_exit_marker_for_zero_exit() {
5553 let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 0");
5554 let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
5555
5556 let content = wait_for_file(&exit_path);
5557
5558 assert_eq!(content.trim(), "0");
5559 }
5560
5561 #[cfg(windows)]
5562 #[test]
5563 fn windows_spawn_writes_exit_marker_for_nonzero_exit() {
5564 let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 42");
5565 let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
5566
5567 let content = wait_for_file(&exit_path);
5568
5569 assert_eq!(content.trim(), "42");
5570 }
5571
5572 #[cfg(windows)]
5573 #[test]
5574 fn windows_spawn_captures_stdout_to_disk() {
5575 let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c echo hello");
5576 let task = registry.task_for_session(&task_id, "session").unwrap();
5577 let stdout_path = task.paths.stdout.clone();
5578 let exit_path = task.paths.exit.clone();
5579
5580 let _ = wait_for_file(&exit_path);
5581 let stdout = fs::read_to_string(stdout_path).expect("read stdout");
5582
5583 assert!(stdout.contains("hello"), "stdout was {stdout:?}");
5584 }
5585
5586 #[cfg(windows)]
5587 #[test]
5588 fn windows_spawn_uses_pwsh_when_available() {
5589 let candidates = crate::windows_shell::shell_candidates_with(
5593 |binary| match binary {
5594 "pwsh.exe" => Some(std::path::PathBuf::from(r"C:\pwsh\pwsh.exe")),
5595 "powershell.exe" => Some(std::path::PathBuf::from(r"C:\ps\powershell.exe")),
5596 _ => None,
5597 },
5598 || None,
5599 );
5600 let shell = candidates.first().expect("at least one candidate").clone();
5601 assert_eq!(shell, crate::windows_shell::WindowsShell::Pwsh);
5602 assert_eq!(shell.binary().as_ref(), "pwsh.exe");
5603 }
5604
5605 #[cfg(windows)]
5612 #[test]
5613 fn windows_shell_cmd_wrapper_writes_exit_marker_with_move() {
5614 let exit_path = Path::new(r"C:\Temp\bash-test.exit");
5615 let script =
5616 crate::windows_shell::WindowsShell::Cmd.wrapper_script("cmd /c exit 42", exit_path);
5617
5618 assert!(
5622 script.contains("set CODE=%ERRORLEVEL%"),
5623 "wrapper must capture exit code into CODE: {script}"
5624 );
5625 assert!(
5626 script.contains("echo %CODE% >"),
5627 "wrapper must echo CODE to a temp marker file: {script}"
5628 );
5629 assert!(
5630 script.contains("move /Y"),
5631 "wrapper must use atomic move to write the marker: {script}"
5632 );
5633 assert!(
5636 script.contains("> nul"),
5637 "wrapper must redirect move output to nul: {script}"
5638 );
5639 assert!(
5641 script.contains("exit /B %CODE%"),
5642 "wrapper must propagate the captured exit code: {script}"
5643 );
5644 assert!(script.contains(r#""C:\Temp\bash-test.exit.tmp""#));
5645 assert!(script.contains(r#""C:\Temp\bash-test.exit""#));
5646 }
5647
5648 #[cfg(windows)]
5654 #[test]
5655 fn windows_shell_cmd_bg_command_uses_minimal_cmd_flags() {
5656 use crate::windows_shell::WindowsShell;
5657 let cmd = WindowsShell::Cmd.bg_command("echo wrapped");
5658 let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
5659 let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
5660 assert_eq!(
5661 args_strs,
5662 vec!["/D", "/S", "/C", "echo wrapped"],
5663 "Cmd::bg_command must prepend /D /S /C"
5664 );
5665 }
5666
5667 #[cfg(windows)]
5670 #[test]
5671 fn windows_shell_pwsh_bg_command_uses_standard_args() {
5672 use crate::windows_shell::WindowsShell;
5673 let cmd = WindowsShell::Pwsh.bg_command("Get-Date");
5674 let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
5675 let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
5676 assert!(
5677 args_strs.contains(&"-Command"),
5678 "Pwsh::bg_command must use -Command: {args_strs:?}"
5679 );
5680 assert!(
5681 args_strs.contains(&"Get-Date"),
5682 "Pwsh::bg_command must include the user command body"
5683 );
5684 }
5685}