1use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
2use std::fs;
3use std::io::{Read, Write};
4use std::path::{Path, PathBuf};
5use std::process::{Child, Command, Stdio};
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7#[cfg(unix)]
8use std::sync::OnceLock;
9use std::sync::{Arc, Mutex, RwLock};
10use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
11
12use rusqlite::Connection;
13use serde::Serialize;
14
15use crate::compress::caps::DropClass;
16use crate::compress::CompressionResult;
17use crate::context::SharedProgressSender;
18use crate::harness::Harness;
19use crate::protocol::{BashCompletedFrame, BashLongRunningFrame, BashPatternMatchFrame, PushFrame};
20
21#[cfg(unix)]
22use std::os::unix::process::CommandExt;
23#[cfg(windows)]
24use std::os::windows::process::CommandExt;
25
26use super::buffer::{combine_streams, BgBuffer, DiskTruncation, StreamKind, TokenCountInput};
27use super::output::{
28 cap_completion_output, cap_completion_output_with_marker, cap_final_output,
29 cap_final_output_with_marker, completion_preview_threshold, json_output_pointer, quote_path,
30 retained_json_output_pointer, COMPRESS_INPUT_CAP_BYTES, COMPRESS_INPUT_HEAD_BYTES,
31 COMPRESS_INPUT_TAIL_BYTES, FINAL_OUTPUT_CAP_BYTES, RAW_PASSTHROUGH_CAP_BYTES,
32 RAW_PASSTHROUGH_HEAD_BYTES, RAW_PASSTHROUGH_TAIL_BYTES, RUNNING_OUTPUT_PREVIEW_BYTES,
33 STRUCTURED_OUTPUT_CAP_BYTES,
34};
35use super::persistence::{
36 create_capture_file, delete_task_bundle, read_exit_marker, read_task, session_tasks_dir,
37 task_paths, unix_millis, update_task, write_kill_marker_if_absent, write_task, BgMode,
38 ExitMarker, PersistedTask, TaskPaths,
39};
40use super::process::is_process_alive;
41#[cfg(unix)]
42use super::process::terminate_pgid;
43#[cfg(windows)]
44use super::process::terminate_pid;
45use super::pty_process::spawn_pty_for_command;
46use super::pty_runtime::PtyRuntime;
47use super::watches::{PatternMatch, WatchPattern, WatchRegistry};
48use super::{BgTaskInfo, BgTaskStatus};
49const DEFAULT_BG_TIMEOUT: Duration = Duration::from_secs(30 * 60);
52const STALE_RUNNING_AFTER: Duration = Duration::from_secs(24 * 60 * 60);
53const PERSISTED_GC_GRACE: Duration = Duration::from_secs(24 * 60 * 60);
54const QUARANTINE_GC_GRACE: Duration = Duration::from_secs(30 * 24 * 60 * 60);
55
56const TOKENIZE_CAP_BYTES_PER_STREAM: usize = 128 * 1024;
57
58#[derive(Debug, Clone, Serialize)]
59pub struct BgCompletion {
60 pub task_id: String,
61 #[serde(skip_serializing)]
64 pub session_id: String,
65 pub status: BgTaskStatus,
66 pub exit_code: Option<i32>,
67 pub command: String,
68 #[serde(default, skip_serializing_if = "String::is_empty")]
74 pub output_preview: String,
75 #[serde(default, skip_serializing_if = "is_false")]
80 pub output_truncated: bool,
81 #[serde(default, skip_serializing_if = "Option::is_none")]
84 pub original_tokens: Option<u32>,
85 #[serde(default, skip_serializing_if = "Option::is_none")]
88 pub compressed_tokens: Option<u32>,
89 #[serde(default, skip_serializing_if = "is_false")]
91 pub tokens_skipped: bool,
92}
93
94fn is_false(v: &bool) -> bool {
95 !*v
96}
97
98#[derive(Debug, Clone, Serialize)]
99pub struct BgTaskSnapshot {
100 #[serde(flatten)]
101 pub info: BgTaskInfo,
102 pub exit_code: Option<i32>,
103 pub child_pid: Option<u32>,
104 pub workdir: String,
105 pub output_preview: String,
106 pub output_truncated: bool,
107 pub output_path: Option<String>,
108 pub stderr_path: Option<String>,
109 #[serde(skip_serializing_if = "Option::is_none")]
110 pub pty_rows: Option<u16>,
111 #[serde(skip_serializing_if = "Option::is_none")]
112 pub pty_cols: Option<u16>,
113 #[serde(skip_serializing_if = "Option::is_none")]
114 pub pty_screen: Option<String>,
115}
116
117#[derive(Debug, Clone, Copy, PartialEq, Eq)]
118enum TerminalOutputKind {
119 Compressed,
120 Raw,
121 Structured,
122}
123
124#[derive(Debug, Clone, PartialEq, Eq)]
125struct TerminalOutputCache {
126 output_preview: String,
127 output_truncated: bool,
128 kind: TerminalOutputKind,
129 output_path: Option<String>,
130 stderr_path: Option<String>,
131 recovery: Option<RecoveryContext>,
132}
133
134#[derive(Debug, Clone, PartialEq, Eq)]
135struct RecoveryContext {
136 dropped_by_class: BTreeMap<DropClass, usize>,
137 had_inner_drop: bool,
138 offset_hint_eligible: bool,
139 offset_start_line: Option<usize>,
140 byte_truncated: bool,
141 disk_truncated_prefix_bytes: u64,
142 output_path: Option<String>,
143 stderr_path: Option<String>,
144 include_stderr_path: bool,
145}
146
147impl RecoveryContext {
148 fn has_visible_drop(&self) -> bool {
149 self.byte_truncated
150 || self.disk_truncated_prefix_bytes > 0
151 || self.had_inner_drop
152 || !self.dropped_by_class.is_empty()
153 }
154}
155
156#[derive(Clone)]
157pub struct BgTaskRegistry {
158 pub(crate) inner: Arc<RegistryInner>,
159}
160
161pub(crate) struct RegistryInner {
162 pub(crate) tasks: Mutex<HashMap<String, Arc<BgTask>>>,
163 pub(crate) completions: Mutex<VecDeque<BgCompletion>>,
164 pub(crate) progress_sender: SharedProgressSender,
165 watchdog_started: AtomicBool,
166 pub(crate) shutdown: AtomicBool,
167 pub(crate) long_running_reminder_enabled: AtomicBool,
168 pub(crate) long_running_reminder_interval_ms: AtomicU64,
169 persisted_gc_started: AtomicBool,
170 #[cfg(test)]
171 persisted_gc_runs: AtomicU64,
172 pub(crate) compressor:
178 Mutex<Option<Box<dyn Fn(&str, String, Option<i32>) -> CompressionResult + Send + Sync>>>,
179 pub(crate) db_pool: RwLock<Option<Arc<Mutex<Connection>>>>,
180 pub(crate) db_harness: RwLock<Option<String>>,
181 pub(crate) wake_tx: crossbeam_channel::Sender<()>,
182 pub(crate) wake_rx: crossbeam_channel::Receiver<()>,
183 pub(crate) watch_registry: Mutex<WatchRegistry>,
184}
185
186pub(crate) struct BgTask {
187 pub(crate) task_id: String,
188 pub(crate) session_id: String,
189 pub(crate) paths: TaskPaths,
190 pub(crate) started: Instant,
191 pub(crate) last_reminder_at: Mutex<Option<Instant>>,
192 pub(crate) terminal_at: Mutex<Option<Instant>>,
193 pub(crate) state: Mutex<BgTaskState>,
194}
195
196pub(crate) enum TaskRuntime {
197 Piped(Option<Child>),
198 Pty(Option<PtyRuntime>),
199}
200
201pub(crate) struct BgTaskState {
202 pub(crate) metadata: PersistedTask,
203 pub(crate) runtime: TaskRuntime,
204 pub(crate) detached: bool,
205 pub(crate) child_exit_observed: bool,
216 pub(crate) buffer: BgBuffer,
217 terminal_output_cache: Option<TerminalOutputCache>,
218 pub(crate) pending_terminal_override: Option<BgTaskStatus>,
220}
221
222fn completion_matches_session(completion: &BgCompletion, session_id: Option<&str>) -> bool {
223 session_id
224 .map(|session_id| completion.session_id == session_id)
225 .unwrap_or(true)
226}
227
228impl BgTaskRegistry {
229 pub fn new(progress_sender: SharedProgressSender) -> Self {
230 let (wake_tx, wake_rx) = crossbeam_channel::bounded(1);
231 Self {
232 inner: Arc::new(RegistryInner {
233 tasks: Mutex::new(HashMap::new()),
234 completions: Mutex::new(VecDeque::new()),
235 progress_sender,
236 watchdog_started: AtomicBool::new(false),
237 shutdown: AtomicBool::new(false),
238 long_running_reminder_enabled: AtomicBool::new(true),
239 long_running_reminder_interval_ms: AtomicU64::new(600_000),
240 persisted_gc_started: AtomicBool::new(false),
241 #[cfg(test)]
242 persisted_gc_runs: AtomicU64::new(0),
243 compressor: Mutex::new(None),
244 db_pool: RwLock::new(None),
245 db_harness: RwLock::new(None),
246 wake_tx,
247 wake_rx,
248 watch_registry: Mutex::new(WatchRegistry::default()),
249 }),
250 }
251 }
252
253 pub fn set_harness(&self, harness: Harness) {
254 if let Ok(mut slot) = self.inner.db_harness.write() {
255 *slot = Some(harness.storage_segment());
256 }
257 }
258
259 pub fn set_db_pool(&self, conn: Arc<Mutex<Connection>>) {
260 if let Ok(mut slot) = self.inner.db_pool.write() {
261 *slot = Some(conn);
262 }
263 }
264
265 pub fn clear_db_pool(&self) {
266 if let Ok(mut slot) = self.inner.db_pool.write() {
267 *slot = None;
268 }
269 }
270
271 pub fn set_compressor<F>(&self, compressor: F)
276 where
277 F: Fn(&str, String) -> CompressionResult + Send + Sync + 'static,
278 {
279 self.set_compressor_with_exit_code(move |command, output, _exit_code| {
280 compressor(command, output)
281 });
282 }
283
284 pub fn set_compressor_with_exit_code<F>(&self, compressor: F)
285 where
286 F: Fn(&str, String, Option<i32>) -> CompressionResult + Send + Sync + 'static,
287 {
288 if let Ok(mut slot) = self.inner.compressor.lock() {
289 *slot = Some(Box::new(compressor));
290 }
291 }
292
293 pub(crate) fn compress_output(
296 &self,
297 command: &str,
298 output: String,
299 exit_code: Option<i32>,
300 ) -> CompressionResult {
301 let Ok(slot) = self.inner.compressor.lock() else {
302 return CompressionResult::new(output);
303 };
304 match slot.as_ref() {
305 Some(compressor) => compressor(command, output, exit_code),
306 None => CompressionResult::new(output),
307 }
308 }
309
310 fn ensure_terminal_output_cache(&self, task: &Arc<BgTask>) -> Option<TerminalOutputCache> {
311 let (metadata, buffer) = {
312 let state = task.state.lock().ok()?;
313 if !state.metadata.status.is_terminal() || state.metadata.mode == BgMode::Pty {
314 return None;
315 }
316 if let Some(cache) = state.terminal_output_cache.clone() {
317 return Some(cache);
318 }
319 (state.metadata.clone(), state.buffer.clone())
320 };
321
322 let mut cap_buffer = buffer.clone();
323 let disk_truncation = cap_buffer.enforce_terminal_cap();
324 let cache = self.render_terminal_output(&metadata, &cap_buffer, disk_truncation);
325 let mut state = task.state.lock().ok()?;
326 if !state.metadata.status.is_terminal() || state.metadata.mode == BgMode::Pty {
327 return None;
328 }
329 if let Some(existing) = state.terminal_output_cache.clone() {
330 return Some(existing);
331 }
332 state.terminal_output_cache = Some(cache.clone());
333 Some(cache)
334 }
335
336 fn render_terminal_output(
337 &self,
338 metadata: &PersistedTask,
339 buffer: &BgBuffer,
340 disk_truncation: DiskTruncation,
341 ) -> TerminalOutputCache {
342 if metadata.mode == BgMode::Pty {
343 return TerminalOutputCache {
344 output_preview: String::new(),
345 output_truncated: false,
346 kind: TerminalOutputKind::Raw,
347 output_path: buffer.output_path().map(|path| path.display().to_string()),
348 stderr_path: buffer.stderr_path().map(|path| path.display().to_string()),
349 recovery: None,
350 };
351 }
352
353 if let Some(structured) =
354 render_structured_output(&metadata.command, buffer, disk_truncation)
355 {
356 return structured;
357 }
358
359 if !metadata.compressed {
360 return render_raw_passthrough(buffer, disk_truncation);
361 }
362
363 let raw = buffer.read_combined_head_tail(
364 COMPRESS_INPUT_CAP_BYTES,
365 COMPRESS_INPUT_HEAD_BYTES,
366 COMPRESS_INPUT_TAIL_BYTES,
367 );
368 let compressed = self.compress_output(&metadata.command, raw.text, metadata.exit_code);
369 render_compressed_with_recovery(buffer, compressed, raw.truncated, disk_truncation)
370 }
371
372 fn snapshot_with_terminal_cache(
373 &self,
374 task: &Arc<BgTask>,
375 preview_bytes: usize,
376 ) -> BgTaskSnapshot {
377 let mut snapshot = task.snapshot(preview_bytes);
378 self.maybe_compress_snapshot(task, &mut snapshot);
379 snapshot
380 }
381
382 fn post_terminal_transition(&self, task: &Arc<BgTask>, emit_frame: bool) -> Result<(), String> {
383 let (metadata, buffer) = {
384 let state = task
385 .state
386 .lock()
387 .map_err(|_| "background task lock poisoned".to_string())?;
388 if !state.metadata.status.is_terminal() {
389 return Ok(());
390 }
391 (state.metadata.clone(), state.buffer.clone())
392 };
393
394 let cache = self.ensure_terminal_output_cache(task);
395 self.enqueue_completion_from_parts(
396 &metadata,
397 Some(&buffer),
398 None,
399 emit_frame,
400 cache.as_ref(),
401 );
402 Ok(())
403 }
404
405 fn persist_task(&self, paths: &TaskPaths, metadata: &PersistedTask) -> std::io::Result<()> {
406 write_task(&paths.json, metadata)?;
407 self.dual_write_task(paths, metadata);
408 Ok(())
409 }
410
411 fn update_task_metadata<F>(
412 &self,
413 paths: &TaskPaths,
414 update: F,
415 ) -> std::io::Result<PersistedTask>
416 where
417 F: FnOnce(&mut PersistedTask),
418 {
419 let metadata = update_task(&paths.json, update)?;
420 self.dual_write_task(paths, &metadata);
421 Ok(metadata)
422 }
423
424 fn dual_write_task(&self, paths: &TaskPaths, metadata: &PersistedTask) {
425 let pool = self.inner.db_pool.read().ok().and_then(|slot| slot.clone());
426 let Some(pool) = pool else {
427 return;
428 };
429 let harness = self
430 .inner
431 .db_harness
432 .read()
433 .ok()
434 .and_then(|slot| slot.clone());
435 let Some(harness) = harness else {
436 crate::slog_warn!(
437 "dual-write bash_task to DB skipped for {}: harness not configured",
438 metadata.task_id
439 );
440 return;
441 };
442 let row = match metadata.to_bash_task_row(&harness, paths) {
443 Ok(row) => row,
444 Err(error) => {
445 crate::slog_warn!(
446 "dual-write bash_task to DB failed for {}: {}",
447 metadata.task_id,
448 error
449 );
450 return;
451 }
452 };
453 let conn = match pool.lock() {
454 Ok(conn) => conn,
455 Err(_) => {
456 crate::slog_warn!(
457 "dual-write bash_task to DB failed for {}: db mutex poisoned",
458 metadata.task_id
459 );
460 return;
461 }
462 };
463 if let Err(error) = crate::db::bash_tasks::upsert_bash_task(&conn, &row) {
464 crate::slog_warn!(
465 "dual-write bash_task to DB failed for {}: {}",
466 metadata.task_id,
467 error
468 );
469 }
470 }
471
472 pub fn configure_long_running_reminders(&self, enabled: bool, interval_ms: u64) {
473 self.inner
474 .long_running_reminder_enabled
475 .store(enabled, Ordering::SeqCst);
476 self.inner
477 .long_running_reminder_interval_ms
478 .store(interval_ms, Ordering::SeqCst);
479 }
480
481 #[cfg(unix)]
482 #[allow(clippy::too_many_arguments)]
483 pub fn spawn(
484 &self,
485 command: &str,
486 session_id: String,
487 workdir: PathBuf,
488 env: HashMap<String, String>,
489 timeout: Option<Duration>,
490 storage_dir: PathBuf,
491 max_running: usize,
492 notify_on_completion: bool,
493 compressed: bool,
494 project_root: Option<PathBuf>,
495 ) -> Result<String, String> {
496 self.start_watchdog();
497
498 let running = self.running_count();
499 if running >= max_running {
500 return Err(format!(
501 "background bash task limit exceeded: {running} running (max {max_running})"
502 ));
503 }
504
505 let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
506 let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
507 let task_id = self.generate_unique_task_id()?;
508 let paths = task_paths(&storage_dir, &session_id, &task_id);
509 fs::create_dir_all(&paths.dir)
510 .map_err(|e| format!("failed to create background task dir: {e}"))?;
511
512 let mut metadata = PersistedTask::starting(
513 task_id.clone(),
514 session_id.clone(),
515 command.to_string(),
516 workdir.clone(),
517 project_root,
518 timeout_ms,
519 notify_on_completion,
520 compressed,
521 );
522 self.persist_task(&paths, &metadata)
523 .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
524
525 create_capture_file(&paths.stdout)
529 .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
530 create_capture_file(&paths.stderr)
531 .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
532
533 let child = match spawn_detached_child(command, &paths, &workdir, &env) {
534 Ok(child) => child,
535 Err(error) => {
536 crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
537 let _ = delete_task_bundle(&paths);
538 return Err(error);
539 }
540 };
541
542 let child_pid = child.id();
543 metadata.mark_running(child_pid, child_pid as i32);
544 self.persist_task(&paths, &metadata)
545 .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
546
547 let task = Arc::new(BgTask {
548 task_id: task_id.clone(),
549 session_id,
550 paths: paths.clone(),
551 started: Instant::now(),
552 last_reminder_at: Mutex::new(None),
553 terminal_at: Mutex::new(None),
554 state: Mutex::new(BgTaskState {
555 metadata,
556 runtime: TaskRuntime::Piped(Some(child)),
557 detached: false,
558 child_exit_observed: false,
559 buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
560 terminal_output_cache: None,
561 pending_terminal_override: None,
562 }),
563 });
564
565 self.inner
566 .tasks
567 .lock()
568 .map_err(|_| "background task registry lock poisoned".to_string())?
569 .insert(task_id.clone(), task);
570
571 Ok(task_id)
572 }
573
574 #[allow(clippy::too_many_arguments)]
575 pub fn spawn_pty(
576 &self,
577 command: &str,
578 session_id: String,
579 workdir: PathBuf,
580 env: HashMap<String, String>,
581 timeout: Option<Duration>,
582 storage_dir: PathBuf,
583 max_running: usize,
584 notify_on_completion: bool,
585 compressed: bool,
586 project_root: Option<PathBuf>,
587 rows: u16,
588 cols: u16,
589 ) -> Result<String, String> {
590 self.start_watchdog();
591
592 let running = self.running_count();
593 if running >= max_running {
594 return Err(format!(
595 "background bash task limit exceeded: {running} running (max {max_running})"
596 ));
597 }
598
599 let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
600 let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
601 let task_id = self.generate_unique_task_id()?;
602 let paths = task_paths(&storage_dir, &session_id, &task_id);
603 fs::create_dir_all(&paths.dir)
604 .map_err(|e| format!("failed to create background task dir: {e}"))?;
605
606 let mut metadata = PersistedTask::starting(
607 task_id.clone(),
608 session_id.clone(),
609 command.to_string(),
610 workdir.clone(),
611 project_root,
612 timeout_ms,
613 notify_on_completion,
614 compressed,
615 );
616 metadata.mode = BgMode::Pty;
617 metadata.pty_rows = Some(rows);
618 metadata.pty_cols = Some(cols);
619 self.persist_task(&paths, &metadata)
620 .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
621 create_capture_file(&paths.pty)
622 .map_err(|e| format!("failed to create PTY capture file: {e}"))?;
623
624 let runtime = match spawn_pty_for_command(
625 &task_id,
626 &session_id,
627 command,
628 &paths,
629 &workdir,
630 &env,
631 rows,
632 cols,
633 self.inner.wake_tx.clone(),
634 ) {
635 Ok(runtime) => runtime,
636 Err(error) => {
637 crate::slog_warn!(
638 "failed to spawn PTY background bash task {task_id}; deleting partial bundle: {error}"
639 );
640 let _ = delete_task_bundle(&paths);
641 return Err(error);
642 }
643 };
644
645 if let Some(child_pid) = runtime.child_pid {
646 metadata.mark_running(child_pid, child_pid as i32);
647 } else {
648 metadata.status = BgTaskStatus::Running;
649 metadata.pgid = None;
650 }
651 self.persist_task(&paths, &metadata)
652 .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
653
654 let task = Arc::new(BgTask {
655 task_id: task_id.clone(),
656 session_id,
657 paths: paths.clone(),
658 started: Instant::now(),
659 last_reminder_at: Mutex::new(None),
660 terminal_at: Mutex::new(None),
661 state: Mutex::new(BgTaskState {
662 metadata,
663 runtime: TaskRuntime::Pty(Some(runtime)),
664 detached: false,
665 child_exit_observed: false,
666 buffer: BgBuffer::pty(paths.pty.clone()),
667 terminal_output_cache: None,
668 pending_terminal_override: None,
669 }),
670 });
671
672 self.inner
673 .tasks
674 .lock()
675 .map_err(|_| "background task registry lock poisoned".to_string())?
676 .insert(task_id.clone(), task);
677
678 Ok(task_id)
679 }
680
681 #[cfg(windows)]
682 #[allow(clippy::too_many_arguments)]
683 pub fn spawn(
684 &self,
685 command: &str,
686 session_id: String,
687 workdir: PathBuf,
688 env: HashMap<String, String>,
689 timeout: Option<Duration>,
690 storage_dir: PathBuf,
691 max_running: usize,
692 notify_on_completion: bool,
693 compressed: bool,
694 project_root: Option<PathBuf>,
695 ) -> Result<String, String> {
696 self.start_watchdog();
697
698 let running = self.running_count();
699 if running >= max_running {
700 return Err(format!(
701 "background bash task limit exceeded: {running} running (max {max_running})"
702 ));
703 }
704
705 let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
706 let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
707 let task_id = self.generate_unique_task_id()?;
708 let paths = task_paths(&storage_dir, &session_id, &task_id);
709 fs::create_dir_all(&paths.dir)
710 .map_err(|e| format!("failed to create background task dir: {e}"))?;
711
712 let mut metadata = PersistedTask::starting(
713 task_id.clone(),
714 session_id.clone(),
715 command.to_string(),
716 workdir.clone(),
717 project_root,
718 timeout_ms,
719 notify_on_completion,
720 compressed,
721 );
722 self.persist_task(&paths, &metadata)
723 .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
724
725 create_capture_file(&paths.stdout)
731 .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
732 create_capture_file(&paths.stderr)
733 .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
734
735 let child = match spawn_detached_child(command, &paths, &workdir, &env) {
736 Ok(child) => child,
737 Err(error) => {
738 crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
739 let _ = delete_task_bundle(&paths);
740 return Err(error);
741 }
742 };
743
744 let child_pid = child.id();
745 metadata.status = BgTaskStatus::Running;
746 metadata.child_pid = Some(child_pid);
747 metadata.pgid = None;
748 self.persist_task(&paths, &metadata)
749 .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
750
751 let task = Arc::new(BgTask {
752 task_id: task_id.clone(),
753 session_id,
754 paths: paths.clone(),
755 started: Instant::now(),
756 last_reminder_at: Mutex::new(None),
757 terminal_at: Mutex::new(None),
758 state: Mutex::new(BgTaskState {
759 metadata,
760 runtime: TaskRuntime::Piped(Some(child)),
761 detached: false,
762 child_exit_observed: false,
763 buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
764 terminal_output_cache: None,
765 pending_terminal_override: None,
766 }),
767 });
768
769 self.inner
770 .tasks
771 .lock()
772 .map_err(|_| "background task registry lock poisoned".to_string())?
773 .insert(task_id.clone(), task);
774
775 Ok(task_id)
776 }
777
778 pub fn write_pty(
779 &self,
780 task_id: &str,
781 session_id: &str,
782 input: &[u8],
783 ) -> Result<usize, String> {
784 let task = self
785 .task_for_session(task_id, session_id)
786 .ok_or_else(|| "task_not_found".to_string())?;
787
788 let writer = {
789 let state = task
790 .state
791 .lock()
792 .map_err(|_| "background task lock poisoned".to_string())?;
793 if state.metadata.mode != BgMode::Pty {
794 return Err("task_not_pty".to_string());
795 }
796 if state.metadata.status.is_terminal() {
797 return Err("task_exited".to_string());
798 }
799 match &state.runtime {
800 TaskRuntime::Pty(Some(runtime)) => Arc::clone(&runtime.writer),
801 TaskRuntime::Pty(None) => return Err("task_exited".to_string()),
802 TaskRuntime::Piped(_) => return Err("task_not_pty".to_string()),
803 }
804 };
805
806 let mut writer = writer
807 .lock()
808 .map_err(|_| "PTY writer lock poisoned".to_string())?;
809 writer
810 .write_all(input)
811 .map_err(|error| format!("failed to write to PTY: {error}"))?;
812 writer
813 .flush()
814 .map_err(|error| format!("failed to flush PTY writer: {error}"))?;
815 Ok(input.len())
816 }
817
818 pub fn replay_session(&self, storage_dir: &Path, session_id: &str) -> Result<(), String> {
819 self.replay_session_inner(storage_dir, session_id, None)
820 }
821
822 pub fn replay_session_for_project(
823 &self,
824 storage_dir: &Path,
825 session_id: &str,
826 project_root: &Path,
827 ) -> Result<(), String> {
828 self.replay_session_inner(storage_dir, session_id, Some(project_root))
829 }
830
831 fn replay_session_inner(
832 &self,
833 storage_dir: &Path,
834 session_id: &str,
835 project_root: Option<&Path>,
836 ) -> Result<(), String> {
837 self.start_watchdog();
838 if !self.inner.persisted_gc_started.swap(true, Ordering::SeqCst) {
839 if let Err(error) = self.maybe_gc_persisted(storage_dir) {
840 crate::slog_warn!("failed to GC persisted background bash tasks: {error}");
841 }
842 }
843
844 let canonical_project = project_root.map(canonicalized_path);
845 let tasks = match self.replay_session_from_db(session_id) {
857 Some(Ok(tasks)) if !tasks.is_empty() => tasks,
858 Some(Ok(_)) => {
859 let disk_tasks = self.replay_session_from_disk(storage_dir, session_id)?;
860 if !disk_tasks.is_empty() {
861 crate::slog_info!(
862 "bash task replay: 0 in DB for session {}, {} from disk fallback",
863 session_id,
864 disk_tasks.len()
865 );
866 }
867 disk_tasks
868 }
869 Some(Err(error)) => {
870 crate::slog_warn!(
871 "bash task replay DB lookup failed for session {}; falling back to disk: {}",
872 session_id,
873 error
874 );
875 self.replay_session_from_disk(storage_dir, session_id)?
876 }
877 None => {
878 self.replay_session_from_disk(storage_dir, session_id)?
880 }
881 };
882
883 for mut metadata in tasks {
884 if metadata.session_id != session_id {
885 continue;
886 }
887 if let Some(canonical_project) = canonical_project.as_deref() {
888 let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
889 if metadata_project.as_deref() != Some(canonical_project) {
890 continue;
891 }
892 }
893
894 let paths = task_paths(storage_dir, session_id, &metadata.task_id);
895 match metadata.status {
896 BgTaskStatus::Starting => {
897 let completion_was_delivered = metadata.completion_delivered;
898 metadata.mark_terminal(
899 BgTaskStatus::Failed,
900 None,
901 Some("spawn aborted".to_string()),
902 );
903 metadata.completion_delivered |= completion_was_delivered;
904 let _ = self.persist_task(&paths, &metadata);
905 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
906 self.insert_rehydrated_task(metadata, paths, true)?;
907 }
908 BgTaskStatus::Running | BgTaskStatus::Killing => {
909 if metadata.mode == BgMode::Pty {
910 if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
911 let completion_was_delivered = metadata.completion_delivered;
912 metadata = terminal_metadata_from_marker(metadata, marker, None);
913 metadata.completion_delivered |= completion_was_delivered;
914 let _ = self.persist_task(&paths, &metadata);
915 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
916 self.insert_rehydrated_task(metadata, paths, true)?;
917 } else if metadata.status.is_terminal() {
918 self.insert_rehydrated_task(metadata, paths, true)?;
919 } else {
920 let completion_was_delivered = metadata.completion_delivered;
921 metadata.mark_terminal(
922 BgTaskStatus::Killed,
923 None,
924 Some("pty_lost_on_bridge_restart".to_string()),
925 );
926 metadata.completion_delivered |= completion_was_delivered;
927 let _ = self.persist_task(&paths, &metadata);
928 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
929 self.insert_rehydrated_task(metadata, paths, true)?;
930 }
931 } else if self.running_metadata_is_stale(&metadata) {
932 let completion_was_delivered = metadata.completion_delivered;
933 metadata.mark_terminal(
934 BgTaskStatus::Killed,
935 None,
936 Some("orphaned (>24h)".to_string()),
937 );
938 metadata.completion_delivered |= completion_was_delivered;
939 if !paths.exit.exists() {
940 let _ = write_kill_marker_if_absent(&paths.exit);
941 }
942 let _ = self.persist_task(&paths, &metadata);
943 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
944 self.insert_rehydrated_task(metadata, paths, true)?;
945 } else if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
946 let reason = (metadata.status == BgTaskStatus::Killing).then(|| {
947 "recovered from inconsistent killing state on replay".to_string()
948 });
949 if reason.is_some() {
950 crate::slog_warn!("background task {} had killing state with exit marker; preferring marker",
951 metadata.task_id);
952 }
953 let completion_was_delivered = metadata.completion_delivered;
954 metadata = terminal_metadata_from_marker(metadata, marker, reason);
955 metadata.completion_delivered |= completion_was_delivered;
956 let _ = self.persist_task(&paths, &metadata);
957 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
958 self.insert_rehydrated_task(metadata, paths, true)?;
959 } else if metadata.status == BgTaskStatus::Killing {
960 if !paths.exit.exists() {
961 let _ = write_kill_marker_if_absent(&paths.exit);
962 }
963 let completion_was_delivered = metadata.completion_delivered;
964 metadata.mark_terminal(
965 BgTaskStatus::Killed,
966 None,
967 Some("recovered from inconsistent killing state on replay".to_string()),
968 );
969 metadata.completion_delivered |= completion_was_delivered;
970 let _ = self.persist_task(&paths, &metadata);
971 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
972 self.insert_rehydrated_task(metadata, paths, true)?;
973 } else if metadata.child_pid.is_some_and(|pid| !is_process_alive(pid)) {
974 let completion_was_delivered = metadata.completion_delivered;
975 metadata.mark_terminal(
976 BgTaskStatus::Failed,
977 None,
978 Some("process exited without exit marker".to_string()),
979 );
980 metadata.completion_delivered |= completion_was_delivered;
981 let _ = self.persist_task(&paths, &metadata);
982 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
983 self.insert_rehydrated_task(metadata, paths, true)?;
984 } else {
985 self.insert_rehydrated_task(metadata, paths, true)?;
986 }
987 }
988 _ if metadata.status.is_terminal() => {
989 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
995 self.insert_rehydrated_task(metadata, paths, true)?;
996 }
997 _ => {}
998 }
999 }
1000
1001 Ok(())
1002 }
1003
1004 fn replay_session_from_db(
1005 &self,
1006 session_id: &str,
1007 ) -> Option<Result<Vec<PersistedTask>, String>> {
1008 let pool = self
1009 .inner
1010 .db_pool
1011 .read()
1012 .ok()
1013 .and_then(|slot| slot.clone())?;
1014 let harness = self
1015 .inner
1016 .db_harness
1017 .read()
1018 .ok()
1019 .and_then(|slot| slot.clone())?;
1020 let conn = match pool.lock() {
1021 Ok(conn) => conn,
1022 Err(_) => return Some(Err("db mutex poisoned".to_string())),
1023 };
1024 Some(
1025 crate::db::bash_tasks::list_bash_tasks_for_session(&conn, &harness, session_id)
1026 .map(|rows| rows.into_iter().map(PersistedTask::from).collect())
1027 .map_err(|error| error.to_string()),
1028 )
1029 }
1030
1031 fn replay_session_from_disk(
1032 &self,
1033 storage_dir: &Path,
1034 session_id: &str,
1035 ) -> Result<Vec<PersistedTask>, String> {
1036 let dir = session_tasks_dir(storage_dir, session_id);
1037 if !dir.exists() {
1038 return Ok(Vec::new());
1039 }
1040
1041 let entries = fs::read_dir(&dir)
1042 .map_err(|e| format!("failed to read background task dir {}: {e}", dir.display()))?;
1043 let mut tasks = Vec::new();
1044 for entry in entries.flatten() {
1045 let path = entry.path();
1046 if path.extension().and_then(|extension| extension.to_str()) != Some("json") {
1047 continue;
1048 }
1049 match read_task(&path) {
1050 Ok(metadata) => tasks.push(metadata),
1051 Err(error) => {
1052 crate::slog_warn!(
1053 "quarantining invalid background task metadata {} during replay: {error}",
1054 path.display()
1055 );
1056 if let Err(quarantine_error) =
1057 quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
1058 {
1059 crate::slog_warn!(
1060 "failed to quarantine invalid background task metadata {}: {quarantine_error}",
1061 path.display()
1062 );
1063 }
1064 }
1065 }
1066 }
1067 Ok(tasks)
1068 }
1069
1070 pub fn register_watch(
1071 &self,
1072 task_id: String,
1073 pattern: WatchPattern,
1074 once: bool,
1075 ) -> Result<String, &'static str> {
1076 let task = self.task(&task_id).ok_or("task_not_found")?;
1077 let (mode, terminal_at_registration, stdout, stderr, pty) = task
1078 .state
1079 .lock()
1080 .map(|state| {
1081 (
1082 state.metadata.mode.clone(),
1083 state.metadata.status.is_terminal(),
1084 task.paths.stdout.clone(),
1085 task.paths.stderr.clone(),
1086 task.paths.pty.clone(),
1087 )
1088 })
1089 .map_err(|_| "background_task_lock_poisoned")?;
1090
1091 let mut terminal_matches = Vec::new();
1092 let scanned_terminal = terminal_at_registration;
1093 let watch_id = {
1094 let mut registry = self
1095 .inner
1096 .watch_registry
1097 .lock()
1098 .map_err(|_| "watch_registry_poisoned")?;
1099 let watch_id = registry.register(task_id.clone(), pattern, once)?;
1100 match &mode {
1101 BgMode::Pipes => {
1102 let stdout_key = format!("{task_id}:stdout");
1103 let stderr_key = format!("{task_id}:stderr");
1104 if terminal_at_registration {
1105 registry.set_file_cursor(&stdout_key, 0);
1106 registry.set_file_cursor(&stderr_key, 0);
1107 terminal_matches.extend(registry.scan_file_new_bytes(
1108 &stdout_key,
1109 &task_id,
1110 &stdout,
1111 ));
1112 terminal_matches.extend(registry.scan_file_new_bytes(
1113 &stderr_key,
1114 &task_id,
1115 &stderr,
1116 ));
1117 } else {
1118 registry.prime_file_cursor(&stdout_key, &stdout);
1119 registry.prime_file_cursor(&stderr_key, &stderr);
1120 }
1121 }
1122 BgMode::Pty => {
1123 let pty_key = format!("{task_id}:pty");
1124 if terminal_at_registration {
1125 registry.set_file_cursor(&pty_key, 0);
1126 terminal_matches
1127 .extend(registry.scan_file_new_bytes(&pty_key, &task_id, &pty));
1128 } else {
1129 registry.prime_file_cursor(&pty_key, &pty);
1130 }
1131 }
1132 }
1133 watch_id
1134 };
1135
1136 if task.is_terminal() {
1137 if !scanned_terminal {
1138 terminal_matches = {
1139 let mut registry = self
1140 .inner
1141 .watch_registry
1142 .lock()
1143 .map_err(|_| "watch_registry_poisoned")?;
1144 match &mode {
1145 BgMode::Pipes => {
1146 let stdout_key = format!("{task_id}:stdout");
1147 let stderr_key = format!("{task_id}:stderr");
1148 registry.set_file_cursor(&stdout_key, 0);
1149 registry.set_file_cursor(&stderr_key, 0);
1150 let mut matches =
1151 registry.scan_file_new_bytes(&stdout_key, &task_id, &stdout);
1152 matches.extend(registry.scan_file_new_bytes(
1153 &stderr_key,
1154 &task_id,
1155 &stderr,
1156 ));
1157 matches
1158 }
1159 BgMode::Pty => {
1160 let pty_key = format!("{task_id}:pty");
1161 registry.set_file_cursor(&pty_key, 0);
1162 registry.scan_file_new_bytes(&pty_key, &task_id, &pty)
1163 }
1164 }
1165 };
1166 }
1167
1168 let (watch_controlled, watch_matched) = self.task_watch_state(&task_id);
1169 if terminal_matches.is_empty() && (!watch_controlled || watch_matched) {
1170 if watch_matched {
1171 let _ = task.set_completion_delivered(true, self);
1172 self.clear_task_watch_state(&task_id);
1173 }
1174 return Ok(watch_id);
1175 }
1176
1177 let completion = self
1178 .remove_pending_completion(&task_id)
1179 .or_else(|| self.completion_snapshot_for_task(&task));
1180 if terminal_matches.is_empty() {
1181 if let Some(completion) = completion.as_ref() {
1182 self.emit_bash_watch_exit(completion);
1183 }
1184 } else {
1185 for pattern_match in terminal_matches {
1186 self.emit_bash_pattern_match(&task.session_id, pattern_match);
1187 }
1188 }
1189 let _ = task.set_completion_delivered(true, self);
1190 self.clear_task_watch_state(&task_id);
1191 }
1192
1193 Ok(watch_id)
1194 }
1195
1196 pub fn unregister_watch(&self, task_id: &str, watch_id: &str) {
1197 if let Ok(mut registry) = self.inner.watch_registry.lock() {
1198 registry.unregister(task_id, watch_id);
1199 }
1200 }
1201
1202 pub fn active_watch_count(&self, task_id: &str) -> usize {
1203 self.inner
1204 .watch_registry
1205 .lock()
1206 .map(|registry| registry.active_count(task_id))
1207 .unwrap_or(0)
1208 }
1209
1210 fn task_watch_state(&self, task_id: &str) -> (bool, bool) {
1211 self.inner
1212 .watch_registry
1213 .lock()
1214 .map(|registry| {
1215 (
1216 registry.has_controlled_task(task_id),
1217 registry.has_matched_task(task_id),
1218 )
1219 })
1220 .unwrap_or((false, false))
1221 }
1222
1223 fn task_has_watch_control(&self, task_id: &str) -> bool {
1224 self.inner
1225 .watch_registry
1226 .lock()
1227 .map(|registry| registry.has_controlled_task(task_id))
1228 .unwrap_or(false)
1229 }
1230
1231 fn clear_task_watch_state(&self, task_id: &str) {
1232 if let Ok(mut registry) = self.inner.watch_registry.lock() {
1233 registry.clear_task(task_id);
1234 }
1235 }
1236
1237 pub(crate) fn scan_task_watch_output(&self, task: &Arc<BgTask>) {
1238 let (mode, stdout, stderr, pty) = match task.state.lock() {
1239 Ok(state) => (
1240 state.metadata.mode.clone(),
1241 task.paths.stdout.clone(),
1242 task.paths.stderr.clone(),
1243 task.paths.pty.clone(),
1244 ),
1245 Err(_) => return,
1246 };
1247 let mut matches = Vec::new();
1248 if let Ok(mut registry) = self.inner.watch_registry.lock() {
1249 match mode {
1250 BgMode::Pipes => {
1251 let stdout_key = format!("{}:stdout", task.task_id);
1252 let stderr_key = format!("{}:stderr", task.task_id);
1253 matches.extend(registry.scan_file_new_bytes(
1254 &stdout_key,
1255 &task.task_id,
1256 &stdout,
1257 ));
1258 matches.extend(registry.scan_file_new_bytes(
1259 &stderr_key,
1260 &task.task_id,
1261 &stderr,
1262 ));
1263 }
1264 BgMode::Pty => {
1265 let pty_key = format!("{}:pty", task.task_id);
1266 matches.extend(registry.scan_file_new_bytes(&pty_key, &task.task_id, &pty));
1267 }
1268 }
1269 }
1270 for pattern_match in matches {
1271 self.emit_bash_pattern_match(&task.session_id, pattern_match);
1272 }
1273 }
1274
1275 pub fn status(
1276 &self,
1277 task_id: &str,
1278 session_id: &str,
1279 project_root: Option<&Path>,
1280 storage_dir: Option<&Path>,
1281 preview_bytes: usize,
1282 ) -> Option<BgTaskSnapshot> {
1283 let mut task = self.task_for_session(task_id, session_id);
1284 if task.is_none() {
1285 if let Some(storage_dir) = storage_dir {
1286 let _ = if let Some(project_root) = project_root {
1287 self.replay_session_for_project(storage_dir, session_id, project_root)
1288 } else {
1289 self.replay_session(storage_dir, session_id)
1290 };
1291 task = self.task_for_session(task_id, session_id);
1292 }
1293 }
1294 let Some(task) = task else {
1295 return self.status_relaxed(
1296 task_id,
1297 session_id,
1298 project_root?,
1299 storage_dir?,
1300 preview_bytes,
1301 );
1302 };
1303 let _ = self.poll_task(&task);
1304 Some(self.snapshot_with_terminal_cache(&task, preview_bytes))
1305 }
1306
1307 fn status_relaxed_task(
1308 &self,
1309 task_id: &str,
1310 project_root: &Path,
1311 storage_dir: &Path,
1312 ) -> Option<Arc<BgTask>> {
1313 let canonical_project = canonicalized_path(project_root);
1314 match self.lookup_relaxed_task_from_db(task_id, project_root) {
1315 Some(Ok(Some(metadata))) => {
1316 if let Some(task) = self.task(task_id) {
1317 let matches_project = task
1318 .state
1319 .lock()
1320 .map(|state| {
1321 state
1322 .metadata
1323 .project_root
1324 .as_deref()
1325 .map(canonicalized_path)
1326 .as_deref()
1327 == Some(canonical_project.as_path())
1328 })
1329 .unwrap_or(false);
1330 return matches_project.then_some(task);
1331 }
1332 let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1333 if self.insert_rehydrated_task(metadata, paths, true).is_err() {
1334 return None;
1335 }
1336 return self.task(task_id);
1337 }
1338 Some(Ok(None)) => {
1339 crate::slog_info!(
1340 "bash task relaxed DB miss for {}; falling back to disk",
1341 task_id
1342 );
1343 }
1344 Some(Err(error)) => {
1345 crate::slog_warn!(
1346 "bash task relaxed DB lookup failed for {}; falling back to disk: {}",
1347 task_id,
1348 error
1349 );
1350 }
1351 None => {
1352 crate::slog_info!(
1353 "bash task relaxed DB unavailable for {}; falling back to disk",
1354 task_id
1355 );
1356 }
1357 }
1358 let root = storage_dir.join("bash-tasks");
1359 let entries = fs::read_dir(&root).ok()?;
1360 for entry in entries.flatten() {
1361 let dir = entry.path();
1362 if !dir.is_dir() {
1363 continue;
1364 }
1365 let path = dir.join(format!("{task_id}.json"));
1366 if !path.exists() {
1367 continue;
1368 }
1369 let metadata = match read_task(&path) {
1370 Ok(metadata) => metadata,
1371 Err(error) => {
1372 crate::slog_warn!(
1373 "quarantining invalid background task metadata {} during relaxed lookup: {error}",
1374 path.display()
1375 );
1376 if let Err(quarantine_error) =
1377 quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
1378 {
1379 crate::slog_warn!(
1380 "failed to quarantine invalid background task metadata {}: {quarantine_error}",
1381 path.display()
1382 );
1383 }
1384 continue;
1385 }
1386 };
1387 let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
1388 if metadata_project.as_deref() != Some(canonical_project.as_path()) {
1389 continue;
1390 }
1391 if let Some(task) = self.task(task_id) {
1392 let matches_project = task
1393 .state
1394 .lock()
1395 .map(|state| {
1396 state
1397 .metadata
1398 .project_root
1399 .as_deref()
1400 .map(canonicalized_path)
1401 .as_deref()
1402 == Some(canonical_project.as_path())
1403 })
1404 .unwrap_or(false);
1405 return matches_project.then_some(task);
1406 }
1407 let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1408 if self.insert_rehydrated_task(metadata, paths, true).is_err() {
1409 return None;
1410 }
1411 return self.task(task_id);
1412 }
1413 None
1414 }
1415
1416 fn lookup_relaxed_task_from_db(
1417 &self,
1418 task_id: &str,
1419 project_root: &Path,
1420 ) -> Option<Result<Option<PersistedTask>, String>> {
1421 let pool = self
1422 .inner
1423 .db_pool
1424 .read()
1425 .ok()
1426 .and_then(|slot| slot.clone())?;
1427 let harness = self
1428 .inner
1429 .db_harness
1430 .read()
1431 .ok()
1432 .and_then(|slot| slot.clone())?;
1433 let conn = match pool.lock() {
1434 Ok(conn) => conn,
1435 Err(_) => return Some(Err("db mutex poisoned".to_string())),
1436 };
1437 let project_key = crate::path_identity::project_scope_key(project_root);
1438 Some(
1439 crate::db::bash_tasks::find_bash_task_for_project(
1440 &conn,
1441 &harness,
1442 &project_key,
1443 task_id,
1444 )
1445 .map(|row| row.map(PersistedTask::from))
1446 .map_err(|error| error.to_string()),
1447 )
1448 }
1449
1450 pub(super) fn status_relaxed(
1451 &self,
1452 task_id: &str,
1453 _session_id: &str,
1454 project_root: &Path,
1455 storage_dir: &Path,
1456 preview_bytes: usize,
1457 ) -> Option<BgTaskSnapshot> {
1458 let task = self.status_relaxed_task(task_id, project_root, storage_dir)?;
1459 let _ = self.poll_task(&task);
1460 Some(self.snapshot_with_terminal_cache(&task, preview_bytes))
1461 }
1462
1463 pub fn kill_relaxed(
1464 &self,
1465 task_id: &str,
1466 project_root: &Path,
1467 storage_dir: &Path,
1468 ) -> Result<BgTaskSnapshot, String> {
1469 let task = self
1470 .status_relaxed_task(task_id, project_root, storage_dir)
1471 .ok_or_else(|| format!("background task not found: {task_id}"))?;
1472 self.kill_with_status(task_id, &task.session_id, BgTaskStatus::Killed)
1473 }
1474
1475 pub fn maybe_gc_persisted(&self, storage_dir: &Path) -> Result<usize, String> {
1476 #[cfg(test)]
1477 self.inner.persisted_gc_runs.fetch_add(1, Ordering::SeqCst);
1478
1479 let mut deleted = 0usize;
1480
1481 let root = storage_dir.join("bash-tasks");
1482 if root.exists() {
1483 let session_dirs = fs::read_dir(&root).map_err(|e| {
1484 format!(
1485 "failed to read background task root {}: {e}",
1486 root.display()
1487 )
1488 })?;
1489 for session_entry in session_dirs.flatten() {
1490 let session_dir = session_entry.path();
1491 if !session_dir.is_dir() {
1492 continue;
1493 }
1494 let task_entries = match fs::read_dir(&session_dir) {
1495 Ok(entries) => entries,
1496 Err(error) => {
1497 crate::slog_warn!(
1498 "failed to read background task session dir {}: {error}",
1499 session_dir.display()
1500 );
1501 continue;
1502 }
1503 };
1504 for task_entry in task_entries.flatten() {
1505 let json_path = task_entry.path();
1506 if json_path
1507 .extension()
1508 .and_then(|extension| extension.to_str())
1509 != Some("json")
1510 {
1511 continue;
1512 }
1513 if modified_within(&json_path, PERSISTED_GC_GRACE) {
1514 continue;
1515 }
1516 let metadata = match read_task(&json_path) {
1517 Ok(metadata) => metadata,
1518 Err(error) => {
1519 crate::slog_warn!(
1520 "quarantining corrupt background task metadata {}: {error}",
1521 json_path.display()
1522 );
1523 quarantine_task_json(
1524 storage_dir,
1525 &session_dir,
1526 &json_path,
1527 QuarantineKind::Corrupt,
1528 )?;
1529 continue;
1530 }
1531 };
1532 if !(metadata.status.is_terminal() && metadata.completion_delivered) {
1533 continue;
1534 }
1535 let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1536 match delete_task_bundle(&paths) {
1537 Ok(()) => {
1538 deleted += 1;
1539 log::debug!(
1540 "deleted persisted background task bundle {}",
1541 metadata.task_id
1542 );
1543 }
1544 Err(error) => {
1545 crate::slog_warn!(
1546 "failed to delete background task bundle {}: {error}",
1547 metadata.task_id
1548 );
1549 continue;
1550 }
1551 }
1552 }
1553 }
1554 }
1555 gc_quarantine(storage_dir);
1556 Ok(deleted)
1557 }
1558
1559 pub fn list(&self, preview_bytes: usize) -> Vec<BgTaskSnapshot> {
1560 let tasks = self
1561 .inner
1562 .tasks
1563 .lock()
1564 .map(|tasks| tasks.values().cloned().collect::<Vec<_>>())
1565 .unwrap_or_default();
1566 tasks
1567 .into_iter()
1568 .map(|task| {
1569 let _ = self.poll_task(&task);
1570 self.snapshot_with_terminal_cache(&task, preview_bytes)
1571 })
1572 .collect()
1573 }
1574
1575 fn maybe_compress_snapshot(&self, task: &Arc<BgTask>, snapshot: &mut BgTaskSnapshot) {
1581 if !snapshot.info.status.is_terminal() || snapshot.info.mode == BgMode::Pty {
1582 return;
1583 }
1584 if let Some(cache) = self.ensure_terminal_output_cache(task) {
1585 snapshot.output_preview = cache.output_preview;
1586 snapshot.output_truncated = cache.output_truncated;
1587 }
1588 }
1589
1590 pub fn kill(&self, task_id: &str, session_id: &str) -> Result<BgTaskSnapshot, String> {
1591 self.kill_with_status(task_id, session_id, BgTaskStatus::Killed)
1592 }
1593
1594 pub fn promote(&self, task_id: &str, session_id: &str) -> Result<bool, String> {
1595 let task = self
1596 .task_for_session(task_id, session_id)
1597 .ok_or_else(|| format!("background task not found: {task_id}"))?;
1598 let terminal_after_promote = {
1599 let mut state = task
1600 .state
1601 .lock()
1602 .map_err(|_| "background task lock poisoned".to_string())?;
1603 let updated = self
1604 .update_task_metadata(&task.paths, |metadata| {
1605 metadata.notify_on_completion = true;
1606 metadata.completion_delivered = false;
1607 })
1608 .map_err(|e| format!("failed to promote background task: {e}"))?;
1609 state.metadata = updated;
1610 state.metadata.status.is_terminal()
1611 };
1612 if terminal_after_promote {
1613 self.post_terminal_transition(&task, true)?;
1614 }
1615 Ok(true)
1616 }
1617
1618 pub(crate) fn kill_for_timeout(&self, task_id: &str, session_id: &str) -> Result<(), String> {
1619 self.kill_with_status(task_id, session_id, BgTaskStatus::TimedOut)
1620 .map(|_| ())
1621 }
1622
1623 pub fn cleanup_finished(&self, older_than: Duration) {
1624 let cutoff = Instant::now().checked_sub(older_than);
1625 let removable_paths: Vec<(String, TaskPaths)> =
1626 if let Ok(mut tasks) = self.inner.tasks.lock() {
1627 let removable = tasks
1628 .iter()
1629 .filter_map(|(task_id, task)| {
1630 let delivered_terminal = task
1631 .state
1632 .lock()
1633 .map(|state| {
1634 state.metadata.status.is_terminal()
1635 && state.metadata.completion_delivered
1636 })
1637 .unwrap_or(false);
1638 if !delivered_terminal {
1639 return None;
1640 }
1641
1642 let terminal_at = task.terminal_at.lock().ok().and_then(|at| *at);
1643 let expired = match (terminal_at, cutoff) {
1644 (Some(terminal_at), Some(cutoff)) => terminal_at <= cutoff,
1645 (Some(_), None) => true,
1646 (None, _) => false,
1647 };
1648 expired.then(|| task_id.clone())
1649 })
1650 .collect::<Vec<_>>();
1651
1652 removable
1653 .into_iter()
1654 .filter_map(|task_id| {
1655 tasks
1656 .remove(&task_id)
1657 .map(|task| (task_id, task.paths.clone()))
1658 })
1659 .collect()
1660 } else {
1661 Vec::new()
1662 };
1663
1664 for (task_id, paths) in removable_paths {
1665 match delete_task_bundle(&paths) {
1666 Ok(()) => log::debug!("deleted persisted background task bundle {task_id}"),
1667 Err(error) => crate::slog_warn!(
1668 "failed to delete persisted background task bundle {task_id}: {error}"
1669 ),
1670 }
1671 }
1672 }
1673
1674 pub fn drain_completions(&self) -> Vec<BgCompletion> {
1675 self.drain_completions_for_session(None)
1676 }
1677
1678 pub fn drain_completions_for_session(&self, session_id: Option<&str>) -> Vec<BgCompletion> {
1679 let completions = match self.inner.completions.lock() {
1680 Ok(completions) => completions,
1681 Err(_) => return Vec::new(),
1682 };
1683
1684 completions
1685 .iter()
1686 .filter(|completion| completion_matches_session(completion, session_id))
1687 .cloned()
1688 .collect()
1689 }
1690
1691 pub fn has_completions_for_session(&self, session_id: Option<&str>) -> bool {
1692 match self.inner.completions.lock() {
1693 Ok(completions) => completions
1694 .iter()
1695 .any(|completion| completion_matches_session(completion, session_id)),
1696 Err(_) => true,
1700 }
1701 }
1702
1703 pub fn ack_completions_for_session(
1704 &self,
1705 session_id: Option<&str>,
1706 task_ids: &[String],
1707 ) -> Vec<String> {
1708 if task_ids.is_empty() {
1709 return Vec::new();
1710 }
1711 let requested_task_ids = task_ids.iter().map(String::as_str).collect::<HashSet<_>>();
1712 let mut completion_sessions = HashMap::new();
1713 if let Ok(mut completions) = self.inner.completions.lock() {
1714 completions.retain(|completion| {
1715 let session_matches = session_id
1716 .map(|session_id| completion.session_id == session_id)
1717 .unwrap_or(true);
1718 if session_matches && requested_task_ids.contains(completion.task_id.as_str()) {
1719 completion_sessions
1720 .insert(completion.task_id.clone(), completion.session_id.clone());
1721 false
1722 } else {
1723 true
1724 }
1725 });
1726 }
1727
1728 let mut delivered = Vec::new();
1729 for task_id in task_ids {
1730 let task = if let Some(session_id) = session_id {
1731 self.task_for_session(task_id, session_id)
1732 } else if let Some(completion_session_id) = completion_sessions.get(task_id) {
1733 self.task_for_session(task_id, completion_session_id)
1734 } else {
1735 self.task(task_id)
1736 };
1737 if let Some(task) = task {
1738 if task.set_completion_delivered(true, self).is_ok() {
1739 delivered.push(task_id.clone());
1740 }
1741 }
1742 }
1743
1744 delivered
1745 }
1746
1747 pub fn pending_completions_for_session(&self, session_id: &str) -> Vec<BgCompletion> {
1748 self.inner
1749 .completions
1750 .lock()
1751 .map(|completions| {
1752 completions
1753 .iter()
1754 .filter(|completion| completion.session_id == session_id)
1755 .cloned()
1756 .collect()
1757 })
1758 .unwrap_or_default()
1759 }
1760
1761 fn remove_pending_completion(&self, task_id: &str) -> Option<BgCompletion> {
1762 let mut completions = self.inner.completions.lock().ok()?;
1763 let idx = completions
1764 .iter()
1765 .position(|completion| completion.task_id == task_id)?;
1766 completions.remove(idx)
1767 }
1768
1769 fn completion_snapshot_for_task(&self, task: &Arc<BgTask>) -> Option<BgCompletion> {
1770 let snapshot = self.snapshot_with_terminal_cache(task, RUNNING_OUTPUT_PREVIEW_BYTES);
1771 if !snapshot.info.status.is_terminal() {
1772 return None;
1773 }
1774 let (output_preview, output_truncated) = if snapshot.info.mode == BgMode::Pty {
1775 (String::new(), false)
1776 } else {
1777 self.ensure_terminal_output_cache(task)
1778 .map(|cache| completion_preview_for_cache(&cache, snapshot.exit_code))
1779 .unwrap_or_else(|| (String::new(), false))
1780 };
1781 Some(BgCompletion {
1782 task_id: snapshot.info.task_id,
1783 session_id: task.session_id.clone(),
1784 status: snapshot.info.status,
1785 exit_code: snapshot.exit_code,
1786 command: snapshot.info.command,
1787 output_preview,
1788 output_truncated,
1789 original_tokens: None,
1790 compressed_tokens: None,
1791 tokens_skipped: false,
1792 })
1793 }
1794
1795 pub fn detach(&self) {
1796 self.inner.shutdown.store(true, Ordering::SeqCst);
1797 if let Ok(mut tasks) = self.inner.tasks.lock() {
1798 for task in tasks.values() {
1799 if let Ok(mut state) = task.state.lock() {
1800 match &mut state.runtime {
1801 TaskRuntime::Piped(child) => *child = None,
1802 TaskRuntime::Pty(runtime) => *runtime = None,
1803 }
1804 state.detached = true;
1805 }
1806 }
1807 tasks.clear();
1808 }
1809 }
1810
1811 pub fn shutdown(&self) {
1812 let tasks = self
1813 .inner
1814 .tasks
1815 .lock()
1816 .map(|tasks| {
1817 tasks
1818 .values()
1819 .map(|task| (task.task_id.clone(), task.session_id.clone()))
1820 .collect::<Vec<_>>()
1821 })
1822 .unwrap_or_default();
1823 for (task_id, session_id) in tasks {
1824 let _ = self.kill(&task_id, &session_id);
1825 }
1826 }
1827
1828 pub(crate) fn poll_task(&self, task: &Arc<BgTask>) -> Result<(), String> {
1829 if let Ok(state) = task.state.lock() {
1830 if let TaskRuntime::Pty(Some(pty)) = &state.runtime {
1831 if !pty.exit_observed.load(Ordering::SeqCst) {
1839 return Ok(());
1840 }
1841 }
1842 }
1843 let marker = match read_exit_marker(&task.paths.exit) {
1844 Ok(Some(marker)) => marker,
1845 Ok(None) => return Ok(()),
1846 Err(error) => return Err(format!("failed to read exit marker: {error}")),
1847 };
1848 self.finalize_from_marker(task, marker, None)
1849 }
1850
1851 pub(crate) fn reap_child(&self, task: &Arc<BgTask>) {
1852 let mut needs_completion = false;
1853 {
1854 let Ok(mut state) = task.state.lock() else {
1855 return;
1856 };
1857 match &mut state.runtime {
1858 TaskRuntime::Piped(child_slot) => {
1859 if let Some(child) = child_slot.as_mut() {
1860 if matches!(child.try_wait(), Ok(Some(_))) {
1861 *child_slot = None;
1862 state.detached = true;
1863 state.child_exit_observed = true;
1864 }
1865 } else if state.detached {
1866 let child_known_dead = state.child_exit_observed
1867 || state
1868 .metadata
1869 .child_pid
1870 .is_some_and(|pid| !is_process_alive(pid));
1871 if child_known_dead {
1872 needs_completion =
1873 self.fail_without_exit_marker_if_needed(task, &mut state);
1874 }
1875 }
1876 }
1877 TaskRuntime::Pty(Some(pty)) => {
1878 if pty.exit_observed.load(Ordering::SeqCst) {
1879 drop(state);
1880 let _ = self.poll_task(task);
1881 return;
1882 }
1883 }
1884 TaskRuntime::Pty(None) => {}
1885 }
1886 }
1887 if needs_completion {
1888 let _ = self.post_terminal_transition(task, true);
1889 }
1890 }
1891
1892 fn fail_without_exit_marker_if_needed(
1893 &self,
1894 task: &Arc<BgTask>,
1895 state: &mut BgTaskState,
1896 ) -> bool {
1897 if state.metadata.status.is_terminal() {
1898 return false;
1899 }
1900 if matches!(read_exit_marker(&task.paths.exit), Ok(Some(_))) {
1901 return false;
1902 }
1903 let watch_controlled = self.task_has_watch_control(&task.task_id);
1904 let updated = self.update_task_metadata(&task.paths, |metadata| {
1905 metadata.mark_terminal(
1906 BgTaskStatus::Failed,
1907 None,
1908 Some("process exited without exit marker".to_string()),
1909 );
1910 if watch_controlled {
1911 metadata.completion_delivered = true;
1912 }
1913 });
1914 if let Ok(metadata) = updated {
1915 state.pending_terminal_override = None;
1916 state.metadata = metadata;
1917 task.mark_terminal_now();
1918 return true;
1919 }
1920 false
1921 }
1922
1923 pub(crate) fn running_tasks(&self) -> Vec<Arc<BgTask>> {
1924 self.inner
1925 .tasks
1926 .lock()
1927 .map(|tasks| {
1928 tasks
1929 .values()
1930 .filter(|task| task.is_running())
1931 .cloned()
1932 .collect()
1933 })
1934 .unwrap_or_default()
1935 }
1936
1937 fn insert_rehydrated_task(
1938 &self,
1939 metadata: PersistedTask,
1940 paths: TaskPaths,
1941 detached: bool,
1942 ) -> Result<(), String> {
1943 let task_id = metadata.task_id.clone();
1944 let session_id = metadata.session_id.clone();
1945 let started = started_instant_from_unix_millis(metadata.started_at);
1946 let suppress_replayed_running_reminder = metadata.status == BgTaskStatus::Running;
1947 let mode = metadata.mode.clone();
1948 let task = Arc::new(BgTask {
1949 task_id: task_id.clone(),
1950 session_id,
1951 paths: paths.clone(),
1952 started,
1953 last_reminder_at: Mutex::new(suppress_replayed_running_reminder.then(Instant::now)),
1954 terminal_at: Mutex::new(metadata.status.is_terminal().then(Instant::now)),
1955 state: Mutex::new(BgTaskState {
1956 metadata,
1957 runtime: if mode == BgMode::Pty {
1958 TaskRuntime::Pty(None)
1959 } else {
1960 TaskRuntime::Piped(None)
1961 },
1962 detached,
1963 child_exit_observed: false,
1970 buffer: if mode == BgMode::Pty {
1971 BgBuffer::pty(paths.pty.clone())
1972 } else {
1973 BgBuffer::new(paths.stdout.clone(), paths.stderr.clone())
1974 },
1975 terminal_output_cache: None,
1976 pending_terminal_override: None,
1977 }),
1978 });
1979 self.inner
1980 .tasks
1981 .lock()
1982 .map_err(|_| "background task registry lock poisoned".to_string())?
1983 .insert(task_id, task);
1984 Ok(())
1985 }
1986
1987 fn kill_with_status(
1988 &self,
1989 task_id: &str,
1990 session_id: &str,
1991 terminal_status: BgTaskStatus,
1992 ) -> Result<BgTaskSnapshot, String> {
1993 let task = self
1994 .task_for_session(task_id, session_id)
1995 .ok_or_else(|| format!("background task not found: {task_id}"))?;
1996 let mut terminalized = false;
1997
1998 {
1999 let mut state = task
2000 .state
2001 .lock()
2002 .map_err(|_| "background task lock poisoned".to_string())?;
2003 if state.metadata.status.is_terminal() {
2004 state.pending_terminal_override = None;
2005 } else if let Ok(Some(marker)) = read_exit_marker(&task.paths.exit) {
2006 state.metadata =
2007 terminal_metadata_from_marker(state.metadata.clone(), marker, None);
2008 if self.task_has_watch_control(&task.task_id) {
2009 state.metadata.completion_delivered = true;
2010 }
2011 state.pending_terminal_override = None;
2012 task.mark_terminal_now();
2013 match &mut state.runtime {
2014 TaskRuntime::Piped(child_slot) => reap_piped_child(child_slot),
2021 TaskRuntime::Pty(runtime) => *runtime = None,
2022 }
2023 state.detached = true;
2024 self.persist_task(&task.paths, &state.metadata)
2025 .map_err(|e| format!("failed to persist terminal state: {e}"))?;
2026 terminalized = true;
2027 } else {
2028 let was_already_killing = state.metadata.status == BgTaskStatus::Killing;
2029 if !was_already_killing {
2030 state.metadata.status = BgTaskStatus::Killing;
2031 self.persist_task(&task.paths, &state.metadata)
2032 .map_err(|e| format!("failed to persist killing state: {e}"))?;
2033 }
2034
2035 #[cfg(unix)]
2036 let pgid = state.metadata.pgid;
2037 #[cfg(windows)]
2038 let child_pid = state.metadata.child_pid;
2039 if !was_already_killing
2040 && state.metadata.mode == BgMode::Pty
2041 && terminal_status == BgTaskStatus::TimedOut
2042 {
2043 state.pending_terminal_override = Some(BgTaskStatus::TimedOut);
2044 }
2045
2046 #[cfg(windows)]
2047 let mut pty_forced_terminal_status: Option<BgTaskStatus> = None;
2048
2049 match &mut state.runtime {
2050 TaskRuntime::Piped(child_slot) => {
2051 #[cfg(unix)]
2052 if let Some(pgid) = pgid {
2053 terminate_pgid(pgid, child_slot.as_mut());
2054 }
2055 #[cfg(windows)]
2056 if let Some(child) = child_slot.as_mut() {
2057 super::process::terminate_process(child);
2058 } else if let Some(pid) = child_pid {
2059 terminate_pid(pid);
2060 }
2061 if let Some(child) = child_slot.as_mut() {
2062 let _ = child.wait();
2063 }
2064 *child_slot = None;
2065 state.detached = true;
2066
2067 if !task.paths.exit.exists() {
2068 write_kill_marker_if_absent(&task.paths.exit)
2069 .map_err(|e| format!("failed to write kill marker: {e}"))?;
2070 }
2071
2072 let exit_code = terminal_exit_code_for_status(&terminal_status);
2073 state
2074 .metadata
2075 .mark_terminal(terminal_status, exit_code, None);
2076 if self.task_has_watch_control(&task.task_id) {
2077 state.metadata.completion_delivered = true;
2078 }
2079 state.pending_terminal_override = None;
2080 task.mark_terminal_now();
2081 self.persist_task(&task.paths, &state.metadata)
2082 .map_err(|e| format!("failed to persist killed state: {e}"))?;
2083 terminalized = true;
2084 }
2085 TaskRuntime::Pty(Some(pty)) => {
2086 pty.was_killed.store(true, Ordering::SeqCst);
2087 if let Err(error) = pty.killer.kill() {
2088 crate::slog_warn!(
2089 "[pty-kill] {task_id} ChildKiller::kill failed: {error}"
2090 );
2091 }
2092 if let Some(pid) = pty.child_pid {
2093 #[cfg(unix)]
2094 terminate_pgid(pid as i32, None);
2095 #[cfg(windows)]
2096 terminate_pid(pid);
2097 }
2098 drop(pty.master.take());
2099
2100 #[cfg(windows)]
2101 {
2102 let default_status = if terminal_status == BgTaskStatus::TimedOut {
2103 BgTaskStatus::TimedOut
2104 } else {
2105 BgTaskStatus::Killed
2106 };
2107 pty_forced_terminal_status = Some(
2108 state
2109 .pending_terminal_override
2110 .take()
2111 .unwrap_or(default_status),
2112 );
2113 }
2114 }
2115 TaskRuntime::Pty(None) => {}
2116 }
2117
2118 #[cfg(windows)]
2119 if let Some(target_status) = pty_forced_terminal_status {
2120 if !task.paths.exit.exists() {
2121 write_kill_marker_if_absent(&task.paths.exit)
2122 .map_err(|e| format!("failed to write kill marker: {e}"))?;
2123 }
2124
2125 let exit_code = terminal_exit_code_for_status(&target_status);
2126 state.metadata.mark_terminal(target_status, exit_code, None);
2127 if self.task_has_watch_control(&task.task_id) {
2128 state.metadata.completion_delivered = true;
2129 }
2130 state.pending_terminal_override = None;
2131 task.mark_terminal_now();
2132 if let TaskRuntime::Pty(runtime) = &mut state.runtime {
2133 *runtime = None;
2134 }
2135 state.detached = true;
2136 self.persist_task(&task.paths, &state.metadata)
2137 .map_err(|e| format!("failed to persist killed PTY state: {e}"))?;
2138 terminalized = true;
2139 }
2140 }
2141 }
2142
2143 if terminalized {
2144 self.post_terminal_transition(&task, true)?;
2145 }
2146 Ok(self.snapshot_with_terminal_cache(&task, RUNNING_OUTPUT_PREVIEW_BYTES))
2147 }
2148
2149 fn finalize_from_marker(
2150 &self,
2151 task: &Arc<BgTask>,
2152 marker: ExitMarker,
2153 reason: Option<String>,
2154 ) -> Result<(), String> {
2155 let watch_controlled = self.task_has_watch_control(&task.task_id);
2156 let mut pty_reader_done = None;
2157 {
2158 let mut state = task
2159 .state
2160 .lock()
2161 .map_err(|_| "background task lock poisoned".to_string())?;
2162 if state.metadata.status.is_terminal() {
2163 state.pending_terminal_override = None;
2164 return Ok(());
2165 }
2166
2167 let pending_override = state.pending_terminal_override.take();
2168 let is_pty = state.metadata.mode == BgMode::Pty;
2169 let updated = self
2170 .update_task_metadata(&task.paths, |metadata| {
2171 let mut new_metadata = if is_pty && marker == ExitMarker::Killed {
2172 let mut metadata = metadata.clone();
2173 let target_status = pending_override.unwrap_or(BgTaskStatus::Killed);
2174 let exit_code = terminal_exit_code_for_status(&target_status);
2175 metadata.mark_terminal(target_status, exit_code, reason);
2176 metadata
2177 } else {
2178 terminal_metadata_from_marker(metadata.clone(), marker, reason)
2179 };
2180 if watch_controlled {
2181 new_metadata.completion_delivered = true;
2182 }
2183 *metadata = new_metadata;
2184 })
2185 .map_err(|e| format!("failed to persist terminal state: {e}"))?;
2186 state.metadata = updated;
2187 task.mark_terminal_now();
2188 match &mut state.runtime {
2189 TaskRuntime::Piped(child_slot) => reap_piped_child(child_slot),
2194 TaskRuntime::Pty(runtime) => {
2195 pty_reader_done = runtime
2196 .as_ref()
2197 .map(|runtime| Arc::clone(&runtime.reader_done));
2198 *runtime = None;
2199 }
2200 }
2201 state.detached = true;
2202 }
2203
2204 if let Some(reader_done) = pty_reader_done {
2205 let deadline = Instant::now() + Duration::from_millis(200);
2206 while !reader_done.load(Ordering::SeqCst) && Instant::now() < deadline {
2207 std::thread::sleep(Duration::from_millis(10));
2208 }
2209 }
2210
2211 self.scan_task_watch_output(task);
2214
2215 self.post_terminal_transition(task, true)
2216 }
2217
2218 fn enqueue_completion_if_needed(
2219 &self,
2220 metadata: &PersistedTask,
2221 paths: Option<&TaskPaths>,
2222 emit_frame: bool,
2223 ) {
2224 if metadata.status.is_terminal() && !metadata.completion_delivered {
2225 let cache =
2226 paths.and_then(|paths| self.render_terminal_output_from_paths(metadata, paths));
2227 self.enqueue_completion_from_parts(metadata, None, paths, emit_frame, cache.as_ref());
2228 }
2229 }
2230
2231 fn render_terminal_output_from_paths(
2232 &self,
2233 metadata: &PersistedTask,
2234 paths: &TaskPaths,
2235 ) -> Option<TerminalOutputCache> {
2236 if metadata.mode == BgMode::Pty {
2237 return None;
2238 }
2239 let mut buffer = BgBuffer::new(paths.stdout.clone(), paths.stderr.clone());
2240 let disk_truncation = buffer.enforce_terminal_cap();
2241 Some(self.render_terminal_output(metadata, &buffer, disk_truncation))
2242 }
2243
2244 fn enqueue_completion_from_parts(
2245 &self,
2246 metadata: &PersistedTask,
2247 buffer: Option<&BgBuffer>,
2248 paths: Option<&TaskPaths>,
2249 emit_frame: bool,
2250 terminal_render: Option<&TerminalOutputCache>,
2251 ) {
2252 if !metadata.status.is_terminal() {
2263 return;
2264 }
2265
2266 let owned_buffer = if buffer.is_none() && metadata.mode != BgMode::Pty {
2267 paths.map(|paths| BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()))
2268 } else {
2269 None
2270 };
2271 let render_buffer = buffer.or(owned_buffer.as_ref());
2272 let owned_render = if terminal_render.is_none() {
2273 render_buffer.map(|buffer| {
2274 let mut capped_buffer = buffer.clone();
2275 let disk_truncation = capped_buffer.enforce_terminal_cap();
2276 self.render_terminal_output(metadata, &capped_buffer, disk_truncation)
2277 })
2278 } else {
2279 None
2280 };
2281 let render = terminal_render.or(owned_render.as_ref());
2282
2283 let (output_preview, output_truncated) = render
2287 .map(|cache| completion_preview_for_cache(cache, metadata.exit_code))
2288 .unwrap_or_else(|| (String::new(), false));
2289
2290 let token_counts = self.completion_token_counts(
2291 metadata,
2292 buffer,
2293 paths,
2294 render.map(|render| render.output_preview.as_str()),
2295 );
2296 let completion = BgCompletion {
2297 task_id: metadata.task_id.clone(),
2298 session_id: metadata.session_id.clone(),
2299 status: metadata.status.clone(),
2300 exit_code: metadata.exit_code,
2301 command: metadata.command.clone(),
2302 output_preview,
2303 output_truncated,
2304 original_tokens: token_counts.original_tokens,
2305 compressed_tokens: token_counts.compressed_tokens,
2306 tokens_skipped: token_counts.tokens_skipped,
2307 };
2308
2309 self.record_compression_event_if_applicable(metadata, &token_counts);
2320
2321 let (watch_controlled, watch_matched) = self.task_watch_state(&metadata.task_id);
2322 if watch_controlled {
2323 if emit_frame && !watch_matched {
2324 self.emit_bash_watch_exit(&completion);
2325 }
2326 self.clear_task_watch_state(&metadata.task_id);
2327 return;
2328 }
2329
2330 if metadata.completion_delivered {
2340 return;
2341 }
2342
2343 let pushed = if let Ok(mut completions) = self.inner.completions.lock() {
2346 if completions
2347 .iter()
2348 .any(|existing| existing.task_id == metadata.task_id)
2349 {
2350 false
2351 } else {
2352 completions.push_back(completion.clone());
2353 true
2354 }
2355 } else {
2356 false
2357 };
2358
2359 if pushed && emit_frame {
2360 self.emit_bash_completed(completion);
2361 }
2362 }
2363
2364 fn record_compression_event_if_applicable(
2365 &self,
2366 metadata: &PersistedTask,
2367 token_counts: &CompletionTokenCounts,
2368 ) {
2369 if metadata.mode == BgMode::Pty {
2370 return;
2371 }
2372
2373 let (original_tokens, compressed_tokens, original_bytes, compressed_bytes) = match (
2374 token_counts.original_tokens,
2375 token_counts.compressed_tokens,
2376 token_counts.original_bytes,
2377 token_counts.compressed_bytes,
2378 ) {
2379 (
2380 Some(original_tokens),
2381 Some(compressed_tokens),
2382 Some(original_bytes),
2383 Some(compressed_bytes),
2384 ) => (
2385 original_tokens,
2386 compressed_tokens,
2387 original_bytes,
2388 compressed_bytes,
2389 ),
2390 _ => {
2391 crate::slog_warn!(
2392 "compression event skipped for {}: token counts unavailable (likely spill file missing or unreadable)",
2393 metadata.task_id
2394 );
2395 return;
2396 }
2397 };
2398
2399 let pool = self.inner.db_pool.read().ok().and_then(|slot| slot.clone());
2400 let Some(pool) = pool else {
2401 crate::slog_warn!(
2402 "compression event skipped for {}: db_pool not initialized — was configure run?",
2403 metadata.task_id
2404 );
2405 return;
2406 };
2407 let harness = self
2408 .inner
2409 .db_harness
2410 .read()
2411 .ok()
2412 .and_then(|slot| slot.clone());
2413 let Some(harness) = harness else {
2414 crate::slog_warn!(
2415 "compression event insert skipped for {}: harness not configured",
2416 metadata.task_id
2417 );
2418 return;
2419 };
2420
2421 let project_root = metadata
2422 .project_root
2423 .as_deref()
2424 .unwrap_or(&metadata.workdir);
2425 let project_key = crate::path_identity::project_scope_key(project_root);
2426 let row = crate::db::compression_events::CompressionEventRow {
2427 harness: &harness,
2428 session_id: Some(&metadata.session_id),
2429 project_key: &project_key,
2430 tool: "bash",
2431 task_id: Some(&metadata.task_id),
2432 command: Some(&metadata.command),
2433 compressor: if metadata.compressed {
2434 "registry"
2435 } else {
2436 "none"
2437 },
2438 original_bytes,
2439 compressed_bytes,
2440 original_tokens,
2441 compressed_tokens,
2442 created_at: unix_millis() as i64,
2443 };
2444
2445 let conn = match pool.lock() {
2446 Ok(conn) => conn,
2447 Err(_) => {
2448 crate::slog_warn!(
2449 "compression event insert failed for {}: db mutex poisoned",
2450 metadata.task_id
2451 );
2452 return;
2453 }
2454 };
2455 match crate::db::compression_events::insert_compression_event(&conn, &row) {
2456 Ok(_) => {
2457 crate::slog_debug!(
2461 "compression event recorded for {} (project={}, session={}, {} → {} tokens)",
2462 metadata.task_id,
2463 project_key,
2464 metadata.session_id,
2465 original_tokens,
2466 compressed_tokens
2467 );
2468 }
2469 Err(error) => {
2470 crate::slog_warn!(
2471 "compression event insert failed for {}: {}",
2472 metadata.task_id,
2473 error
2474 );
2475 }
2476 }
2477 }
2478
2479 fn emit_bash_pattern_match(&self, session_id: &str, pattern_match: PatternMatch) {
2480 let Ok(progress_sender) = self
2481 .inner
2482 .progress_sender
2483 .lock()
2484 .map(|sender| sender.clone())
2485 else {
2486 return;
2487 };
2488 if let Some(sender) = progress_sender.as_ref() {
2489 sender(PushFrame::BashPatternMatch(BashPatternMatchFrame::new(
2490 pattern_match.task_id,
2491 session_id.to_string(),
2492 pattern_match.watch_id,
2493 pattern_match.match_text,
2494 pattern_match.match_offset,
2495 pattern_match.context,
2496 pattern_match.once,
2497 )));
2498 }
2499 }
2500
2501 fn emit_bash_watch_exit(&self, completion: &BgCompletion) {
2502 let Ok(progress_sender) = self
2503 .inner
2504 .progress_sender
2505 .lock()
2506 .map(|sender| sender.clone())
2507 else {
2508 return;
2509 };
2510 let Some(sender) = progress_sender.as_ref() else {
2511 return;
2512 };
2513 let status = completion_status_text(&completion.status, completion.exit_code);
2514 let preview = completion.output_preview.trim_end();
2515 let context = if preview.is_empty() {
2516 format!("task {} exited ({status})", completion.task_id)
2517 } else {
2518 format!(
2519 "task {} exited ({status})
2520{preview}",
2521 completion.task_id
2522 )
2523 };
2524 sender(PushFrame::BashPatternMatch(
2525 BashPatternMatchFrame::task_exit(
2526 completion.task_id.clone(),
2527 completion.session_id.clone(),
2528 format!("exited ({status})"),
2529 context,
2530 ),
2531 ));
2532 }
2533
2534 fn emit_bash_completed(&self, completion: BgCompletion) {
2535 let Ok(progress_sender) = self
2536 .inner
2537 .progress_sender
2538 .lock()
2539 .map(|sender| sender.clone())
2540 else {
2541 return;
2542 };
2543 let Some(sender) = progress_sender.as_ref() else {
2544 return;
2545 };
2546 sender(PushFrame::BashCompleted(BashCompletedFrame::new(
2554 completion.task_id,
2555 completion.session_id,
2556 completion.status,
2557 completion.exit_code,
2558 completion.command,
2559 completion.output_preview,
2560 completion.output_truncated,
2561 completion.original_tokens,
2562 completion.compressed_tokens,
2563 completion.tokens_skipped,
2564 )));
2565 }
2566
2567 fn completion_token_counts(
2568 &self,
2569 metadata: &PersistedTask,
2570 buffer: Option<&BgBuffer>,
2571 paths: Option<&TaskPaths>,
2572 rendered_output: Option<&str>,
2573 ) -> CompletionTokenCounts {
2574 if metadata.mode == BgMode::Pty {
2575 return CompletionTokenCounts::skipped();
2576 }
2577
2578 let raw = match buffer {
2579 Some(buffer) => buffer.read_for_token_count(TOKENIZE_CAP_BYTES_PER_STREAM),
2580 None => paths
2581 .map(|paths| {
2582 read_for_token_count_from_disk(metadata, paths, TOKENIZE_CAP_BYTES_PER_STREAM)
2583 })
2584 .unwrap_or(TokenCountInput::Skipped),
2585 };
2586
2587 let TokenCountInput::Text(raw_output) = raw else {
2588 return CompletionTokenCounts::skipped();
2589 };
2590
2591 let original_tokens = token_count_u32(&raw_output);
2592 let original_bytes = raw_output.len() as i64;
2593 let compressed_output = rendered_output.unwrap_or(&raw_output);
2594 let compressed_tokens = token_count_u32(compressed_output);
2595 let compressed_bytes = compressed_output.len() as i64;
2596 CompletionTokenCounts {
2597 original_tokens: Some(original_tokens),
2598 compressed_tokens: Some(compressed_tokens),
2599 original_bytes: Some(original_bytes),
2600 compressed_bytes: Some(compressed_bytes),
2601 tokens_skipped: false,
2602 }
2603 }
2604
2605 pub(crate) fn maybe_emit_long_running_reminder(&self, task: &Arc<BgTask>) {
2606 if !self
2607 .inner
2608 .long_running_reminder_enabled
2609 .load(Ordering::SeqCst)
2610 {
2611 return;
2612 }
2613 let interval_ms = self
2614 .inner
2615 .long_running_reminder_interval_ms
2616 .load(Ordering::SeqCst);
2617 if interval_ms == 0 {
2618 return;
2619 }
2620 let interval = Duration::from_millis(interval_ms);
2621 let now = Instant::now();
2622 let Ok(mut last_reminder_at) = task.last_reminder_at.lock() else {
2623 return;
2624 };
2625 let since = last_reminder_at.unwrap_or(task.started);
2626 if now.duration_since(since) < interval {
2627 return;
2628 }
2629 let command = task
2630 .state
2631 .lock()
2632 .map(|state| state.metadata.command.clone())
2633 .unwrap_or_default();
2634 *last_reminder_at = Some(now);
2635 self.emit_bash_long_running(BashLongRunningFrame::new(
2636 task.task_id.clone(),
2637 task.session_id.clone(),
2638 command,
2639 task.started.elapsed().as_millis() as u64,
2640 ));
2641 }
2642
2643 fn emit_bash_long_running(&self, frame: BashLongRunningFrame) {
2644 let Ok(progress_sender) = self
2645 .inner
2646 .progress_sender
2647 .lock()
2648 .map(|sender| sender.clone())
2649 else {
2650 return;
2651 };
2652 if let Some(sender) = progress_sender.as_ref() {
2653 sender(PushFrame::BashLongRunning(frame));
2654 }
2655 }
2656
2657 fn task(&self, task_id: &str) -> Option<Arc<BgTask>> {
2658 self.inner
2659 .tasks
2660 .lock()
2661 .ok()
2662 .and_then(|tasks| tasks.get(task_id).cloned())
2663 }
2664
2665 fn task_for_session(&self, task_id: &str, session_id: &str) -> Option<Arc<BgTask>> {
2666 self.task(task_id)
2667 .filter(|task| task.session_id == session_id)
2668 }
2669
2670 fn running_count(&self) -> usize {
2671 self.inner
2672 .tasks
2673 .lock()
2674 .map(|tasks| tasks.values().filter(|task| task.is_running()).count())
2675 .unwrap_or(0)
2676 }
2677
2678 fn start_watchdog(&self) {
2679 if !self.inner.watchdog_started.swap(true, Ordering::SeqCst) {
2680 super::watchdog::start(self.clone());
2681 }
2682 }
2683
2684 fn running_metadata_is_stale(&self, metadata: &PersistedTask) -> bool {
2685 unix_millis().saturating_sub(metadata.started_at) > STALE_RUNNING_AFTER.as_millis() as u64
2686 }
2687
2688 #[cfg(test)]
2689 pub fn task_json_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
2690 self.task_for_session(task_id, session_id)
2691 .map(|task| task.paths.json.clone())
2692 }
2693
2694 #[cfg(test)]
2695 pub fn task_exit_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
2696 self.task_for_session(task_id, session_id)
2697 .map(|task| task.paths.exit.clone())
2698 }
2699
2700 fn generate_unique_task_id(&self) -> Result<String, String> {
2702 for _ in 0..32 {
2703 let candidate = random_slug();
2704 let tasks = self
2705 .inner
2706 .tasks
2707 .lock()
2708 .map_err(|_| "background task registry lock poisoned".to_string())?;
2709 if tasks.contains_key(&candidate) {
2710 continue;
2711 }
2712 let completions = self
2713 .inner
2714 .completions
2715 .lock()
2716 .map_err(|_| "background completions lock poisoned".to_string())?;
2717 if completions
2718 .iter()
2719 .any(|completion| completion.task_id == candidate)
2720 {
2721 continue;
2722 }
2723 return Ok(candidate);
2724 }
2725 Err("failed to allocate unique background task id after 32 attempts".to_string())
2726 }
2727}
2728
2729fn render_compressed_with_recovery(
2730 buffer: &BgBuffer,
2731 mut compressed: CompressionResult,
2732 input_truncated: bool,
2733 disk_truncation: DiskTruncation,
2734) -> TerminalOutputCache {
2735 let had_trailing_newline = compressed.text.ends_with('\n');
2743 let mut text = strip_plain_truncation_marker_lines(&compressed.text)
2744 .trim_end()
2745 .to_string();
2746 if had_trailing_newline && !text.is_empty() {
2747 text.push('\n');
2748 }
2749 compressed.text = text;
2750
2751 let output_path = buffer.output_path().map(|path| path.display().to_string());
2752 let stderr_path = buffer.stderr_path().map(|path| path.display().to_string());
2753 let include_stderr_path = buffer.stream_len(StreamKind::Stderr) > 0;
2754 let mut recovery = RecoveryContext {
2755 dropped_by_class: compressed.dropped_by_class,
2756 had_inner_drop: compressed.had_inner_drop,
2757 offset_hint_eligible: compressed.offset_hint_eligible,
2758 offset_start_line: compressed.offset_start_line,
2759 byte_truncated: input_truncated,
2760 disk_truncated_prefix_bytes: disk_truncation.total_prefix_bytes(),
2761 output_path: output_path.clone(),
2762 stderr_path: stderr_path.clone(),
2763 include_stderr_path,
2764 };
2765
2766 let (output_preview, output_truncated) =
2767 render_body_with_recovery_marker(&compressed.text, &mut recovery);
2768 TerminalOutputCache {
2769 output_preview,
2770 output_truncated,
2771 kind: TerminalOutputKind::Compressed,
2772 output_path,
2773 stderr_path,
2774 recovery: Some(recovery),
2775 }
2776}
2777
2778fn render_body_with_recovery_marker(body: &str, recovery: &mut RecoveryContext) -> (String, bool) {
2779 render_body_with_recovery_marker_at_cap(
2780 body,
2781 recovery,
2782 FINAL_OUTPUT_CAP_BYTES,
2783 cap_final_output,
2784 cap_final_output_with_marker,
2785 )
2786}
2787
2788fn render_raw_body_with_recovery_marker(
2789 body: &str,
2790 recovery: &mut RecoveryContext,
2791) -> (String, bool) {
2792 render_body_with_recovery_marker_at_cap(
2793 body,
2794 recovery,
2795 RAW_PASSTHROUGH_CAP_BYTES,
2796 |input| {
2797 super::output::cap_head_tail(
2798 input,
2799 RAW_PASSTHROUGH_CAP_BYTES,
2800 RAW_PASSTHROUGH_HEAD_BYTES,
2801 RAW_PASSTHROUGH_TAIL_BYTES,
2802 )
2803 },
2804 |input, marker| {
2805 super::output::cap_head_tail_with_marker(
2806 input,
2807 RAW_PASSTHROUGH_CAP_BYTES,
2808 RAW_PASSTHROUGH_HEAD_BYTES,
2809 RAW_PASSTHROUGH_TAIL_BYTES,
2810 marker,
2811 )
2812 },
2813 )
2814}
2815
2816fn render_body_with_recovery_marker_at_cap<F, G>(
2817 body: &str,
2818 recovery: &mut RecoveryContext,
2819 cap_bytes: usize,
2820 cap_plain: F,
2821 cap_with_marker: G,
2822) -> (String, bool)
2823where
2824 F: Fn(&str) -> super::output::CappedText,
2825 G: Fn(&str, &str) -> super::output::CappedText,
2826{
2827 let needs_marker = recovery.has_visible_drop();
2828 if body.len() > cap_bytes {
2829 recovery.byte_truncated = true;
2830 if let Some(marker) = recovery_marker(recovery) {
2831 let capped = cap_with_marker(body, &marker);
2832 return (capped.text, true);
2833 }
2834 let capped = cap_plain(body);
2835 return (capped.text, capped.truncated || needs_marker);
2836 }
2837
2838 if !needs_marker {
2839 return (body.to_string(), false);
2840 }
2841
2842 let Some(marker) = recovery_marker(recovery) else {
2843 return (body.to_string(), true);
2844 };
2845 let with_marker = append_recovery_marker(body, &marker);
2846 if with_marker.len() <= cap_bytes {
2847 return (with_marker, true);
2848 }
2849
2850 recovery.byte_truncated = true;
2851 let marker = recovery_marker(recovery).unwrap_or(marker);
2852 let capped = cap_with_marker(body, &marker);
2853 (capped.text, true)
2854}
2855
2856fn append_recovery_marker(body: &str, marker: &str) -> String {
2857 if body.is_empty() {
2858 return marker.to_string();
2859 }
2860 let mut output = body.trim_end().to_string();
2861 output.push('\n');
2862 output.push_str(marker);
2863 output
2864}
2865
2866fn recovery_marker(recovery: &RecoveryContext) -> Option<String> {
2867 let mut parts = Vec::new();
2868 for (class, count) in &recovery.dropped_by_class {
2869 let label = if *count == 1 {
2870 class.singular()
2871 } else {
2872 class.plural()
2873 };
2874 parts.push(format!("+{count} more {label}"));
2875 }
2876 if recovery.byte_truncated {
2877 parts.push("truncated output".to_string());
2878 }
2879 let disk_truncated_prefix_bytes = recovery.disk_truncated_prefix_bytes;
2880 if disk_truncated_prefix_bytes > 0 {
2881 parts.push(format!(
2882 "truncated {disk_truncated_prefix_bytes} bytes from saved output prefix"
2883 ));
2884 } else if recovery.had_inner_drop && parts.is_empty() {
2885 parts.push("omitted output".to_string());
2886 }
2887
2888 if parts.is_empty() {
2889 return None;
2890 }
2891
2892 let hint = recovery_hint(recovery);
2893 Some(format!("[{}; {hint}]", parts.join(", ")))
2894}
2895
2896fn recovery_hint(recovery: &RecoveryContext) -> String {
2897 if recovery.offset_hint_eligible
2901 && !recovery.byte_truncated
2902 && recovery.dropped_by_class.is_empty()
2903 && !recovery.include_stderr_path
2904 {
2905 if let (Some(path), Some(line)) =
2906 (recovery.output_path.as_deref(), recovery.offset_start_line)
2907 {
2908 return format!("see remaining: tail -n +{line} {}", quote_path(path));
2909 }
2910 }
2911
2912 let mut paths = Vec::new();
2913 if let Some(path) = recovery.output_path.as_deref() {
2914 paths.push(path);
2915 }
2916 if recovery.include_stderr_path {
2917 if let Some(path) = recovery.stderr_path.as_deref() {
2918 if !paths.contains(&path) {
2919 paths.push(path);
2920 }
2921 }
2922 }
2923
2924 if paths.is_empty() {
2925 return "full output unavailable".to_string();
2926 }
2927
2928 let reads = paths
2929 .into_iter()
2930 .map(|path| format!("read {}", quote_path(path)))
2931 .collect::<Vec<_>>()
2932 .join(" and ");
2933 if recovery.disk_truncated_prefix_bytes > 0 {
2934 format!("retained output: {reads}")
2935 } else {
2936 format!("full output: {reads}")
2937 }
2938}
2939
2940fn strip_plain_truncation_marker_lines(input: &str) -> String {
2941 input
2942 .lines()
2943 .filter(|line| !is_plain_truncation_marker(line.trim()))
2944 .collect::<Vec<_>>()
2945 .join("\n")
2946}
2947
2948fn strip_recovery_marker_lines(input: &str) -> String {
2949 input
2950 .lines()
2951 .filter(|line| !is_recovery_marker(line.trim()))
2952 .collect::<Vec<_>>()
2953 .join("\n")
2954}
2955
2956fn is_plain_truncation_marker(line: &str) -> bool {
2957 let Some(rest) = line.strip_prefix("...<truncated ") else {
2958 return false;
2959 };
2960 let Some(bytes) = rest.strip_suffix(" bytes>...") else {
2961 return false;
2962 };
2963 !bytes.is_empty() && bytes.chars().all(|ch| ch.is_ascii_digit())
2964}
2965
2966fn is_recovery_marker(line: &str) -> bool {
2967 line.starts_with('[')
2968 && line.ends_with(']')
2969 && (line.contains("full output: read ")
2970 || line.contains("retained output: read ")
2971 || line.contains("see remaining: tail -n +")
2972 || line.contains("full output unavailable"))
2973}
2974
2975fn render_structured_output(
2976 command: &str,
2977 buffer: &BgBuffer,
2978 disk_truncation: DiskTruncation,
2979) -> Option<TerminalOutputCache> {
2980 if !is_gh_structured_command(command) {
2981 return None;
2982 }
2983
2984 let output_path = buffer
2985 .output_path()
2986 .map(|path| path.display().to_string())?;
2987 let stdout_bytes = buffer.stream_len(StreamKind::Stdout);
2988 if stdout_bytes == 0 {
2989 return None;
2990 }
2991
2992 if stdout_bytes > STRUCTURED_OUTPUT_CAP_BYTES as u64 {
2993 if !stream_starts_like_json(buffer, StreamKind::Stdout) {
2994 return None;
2995 }
2996 let output_preview = if disk_truncation.total_prefix_bytes() > 0 {
2997 retained_json_output_pointer(
2998 stdout_bytes,
2999 &output_path,
3000 disk_truncation.total_prefix_bytes(),
3001 )
3002 } else {
3003 json_output_pointer(stdout_bytes, &output_path)
3004 };
3005 return Some(TerminalOutputCache {
3006 output_preview,
3007 output_truncated: true,
3008 kind: TerminalOutputKind::Structured,
3009 output_path: Some(output_path),
3010 stderr_path: buffer.stderr_path().map(|path| path.display().to_string()),
3011 recovery: None,
3012 });
3013 }
3014
3015 let stdout = buffer.read_stream_bounded(StreamKind::Stdout, STRUCTURED_OUTPUT_CAP_BYTES);
3016 if stdout.truncated || !is_structured_body(&stdout.text) {
3017 return None;
3018 }
3019
3020 Some(TerminalOutputCache {
3021 output_preview: stdout.text,
3022 output_truncated: false,
3023 kind: TerminalOutputKind::Structured,
3024 output_path: Some(output_path),
3025 stderr_path: buffer.stderr_path().map(|path| path.display().to_string()),
3026 recovery: None,
3027 })
3028}
3029
3030fn render_raw_passthrough(
3031 buffer: &BgBuffer,
3032 disk_truncation: DiskTruncation,
3033) -> TerminalOutputCache {
3034 let raw = buffer.read_combined_head_tail(
3035 RAW_PASSTHROUGH_CAP_BYTES,
3036 RAW_PASSTHROUGH_HEAD_BYTES,
3037 RAW_PASSTHROUGH_TAIL_BYTES,
3038 );
3039 let output_path = buffer.output_path().map(|path| path.display().to_string());
3040 let stderr_path = buffer.stderr_path().map(|path| path.display().to_string());
3041 if !raw.truncated && disk_truncation.total_prefix_bytes() == 0 {
3042 return TerminalOutputCache {
3043 output_preview: raw.text,
3044 output_truncated: false,
3045 kind: TerminalOutputKind::Raw,
3046 output_path,
3047 stderr_path,
3048 recovery: None,
3049 };
3050 }
3051
3052 let include_stderr_path = buffer.stream_len(StreamKind::Stderr) > 0;
3053 let mut recovery = RecoveryContext {
3054 dropped_by_class: BTreeMap::new(),
3055 had_inner_drop: false,
3056 offset_hint_eligible: false,
3057 offset_start_line: None,
3058 byte_truncated: raw.truncated,
3059 disk_truncated_prefix_bytes: disk_truncation.total_prefix_bytes(),
3060 output_path: output_path.clone(),
3061 stderr_path: stderr_path.clone(),
3062 include_stderr_path,
3063 };
3064 let (output_preview, output_truncated) =
3065 render_raw_body_with_recovery_marker(&raw.text, &mut recovery);
3066 TerminalOutputCache {
3067 output_preview,
3068 output_truncated,
3069 kind: TerminalOutputKind::Raw,
3070 output_path,
3071 stderr_path,
3072 recovery: Some(recovery),
3073 }
3074}
3075
3076fn completion_preview_for_cache(
3077 cache: &TerminalOutputCache,
3078 exit_code: Option<i32>,
3079) -> (String, bool) {
3080 let exit_ok = exit_code == Some(0);
3083 let threshold = completion_preview_threshold(exit_ok);
3084 if cache.kind == TerminalOutputKind::Structured && cache.output_preview.len() > threshold {
3085 if let Some(path) = cache.output_path.as_deref() {
3086 return (
3087 json_output_pointer(cache.output_preview.len() as u64, path),
3088 true,
3089 );
3090 }
3091 return (cache.output_preview.clone(), cache.output_truncated);
3092 }
3093
3094 if let Some(recovery) = cache.recovery.as_ref() {
3095 if cache.output_preview.len() <= threshold {
3096 return (cache.output_preview.clone(), cache.output_truncated);
3097 }
3098 let body = strip_recovery_marker_lines(&cache.output_preview);
3099 let mut completion_recovery = recovery.clone();
3100 completion_recovery.byte_truncated = true;
3101 if let Some(marker) = recovery_marker(&completion_recovery) {
3102 let capped = cap_completion_output_with_marker(&body, &marker, exit_ok);
3103 return (capped.text, true);
3104 }
3105 }
3106
3107 let capped = cap_completion_output(&cache.output_preview, exit_ok);
3108 (capped.text, cache.output_truncated || capped.truncated)
3109}
3110
3111fn is_gh_structured_command(command: &str) -> bool {
3112 let Some(normalized) = crate::compress::plain_command_for_structured_output(command) else {
3113 return false;
3114 };
3115 let tokens = shell_words_for_flags(&normalized);
3116 let Some(head) = tokens.first() else {
3117 return false;
3118 };
3119 let head_name = Path::new(head)
3120 .file_name()
3121 .and_then(|name| name.to_str())
3122 .unwrap_or(head);
3123 if !(head_name == "gh" || head_name.eq_ignore_ascii_case("gh.exe")) {
3124 return false;
3125 }
3126 tokens.iter().any(|token| {
3127 matches!(token.as_str(), "--json" | "--jq" | "--template")
3128 || token.starts_with("--json=")
3129 || token.starts_with("--jq=")
3130 || token.starts_with("--template=")
3131 })
3132}
3133
3134fn shell_words_for_flags(command: &str) -> Vec<String> {
3135 let mut words = Vec::new();
3136 let mut current = String::new();
3137 let mut in_single = false;
3138 let mut in_double = false;
3139 let mut escaped = false;
3140
3141 for ch in command.chars() {
3142 if escaped {
3143 current.push(ch);
3144 escaped = false;
3145 continue;
3146 }
3147 if ch == '\\' && !in_single {
3148 escaped = true;
3149 continue;
3150 }
3151 if ch == '\'' && !in_double {
3152 in_single = !in_single;
3153 continue;
3154 }
3155 if ch == '"' && !in_single {
3156 in_double = !in_double;
3157 continue;
3158 }
3159 if ch.is_whitespace() && !in_single && !in_double {
3160 if !current.is_empty() {
3161 words.push(std::mem::take(&mut current));
3162 }
3163 continue;
3164 }
3165 if matches!(ch, ';' | '&' | '|') && !in_single && !in_double {
3166 if !current.is_empty() {
3167 words.push(std::mem::take(&mut current));
3168 }
3169 continue;
3170 }
3171 current.push(ch);
3172 }
3173 if !current.is_empty() {
3174 words.push(current);
3175 }
3176 words
3177}
3178
3179fn is_structured_body(body: &str) -> bool {
3180 let trimmed = body.trim();
3181 if trimmed.is_empty() {
3182 return false;
3183 }
3184 if serde_json::from_str::<serde_json::Value>(trimmed).is_ok() {
3185 return true;
3186 }
3187
3188 let mut saw_line = false;
3189 for line in trimmed
3190 .lines()
3191 .map(str::trim)
3192 .filter(|line| !line.is_empty())
3193 {
3194 saw_line = true;
3195 if serde_json::from_str::<serde_json::Value>(line).is_err() {
3196 return false;
3197 }
3198 }
3199 saw_line
3200}
3201
3202fn stream_starts_like_json(buffer: &BgBuffer, stream: StreamKind) -> bool {
3203 let path = match (buffer, stream) {
3204 (BgBuffer::Pipes { stdout_path, .. }, StreamKind::Stdout) => Some(stdout_path),
3205 (BgBuffer::Pipes { stderr_path, .. }, StreamKind::Stderr) => Some(stderr_path),
3206 (BgBuffer::Pty { combined_path }, _) => Some(combined_path),
3207 };
3208 let Some(path) = path else {
3209 return false;
3210 };
3211 let Ok(file) = std::fs::File::open(path) else {
3212 return false;
3213 };
3214 let mut limited = file.take(512);
3215 let mut bytes = Vec::new();
3216 if limited.read_to_end(&mut bytes).is_err() {
3217 return false;
3218 }
3219 String::from_utf8_lossy(&bytes)
3220 .chars()
3221 .find(|ch| !ch.is_whitespace())
3222 .is_some_and(|ch| matches!(ch, '{' | '[' | '"' | '-' | '0'..='9' | 't' | 'f' | 'n'))
3223}
3224
3225struct CompletionTokenCounts {
3226 original_tokens: Option<u32>,
3227 compressed_tokens: Option<u32>,
3228 original_bytes: Option<i64>,
3229 compressed_bytes: Option<i64>,
3230 tokens_skipped: bool,
3231}
3232
3233impl CompletionTokenCounts {
3234 fn skipped() -> Self {
3235 Self {
3236 original_tokens: None,
3237 compressed_tokens: None,
3238 original_bytes: None,
3239 compressed_bytes: None,
3240 tokens_skipped: true,
3241 }
3242 }
3243}
3244
3245fn completion_status_text(status: &BgTaskStatus, exit_code: Option<i32>) -> String {
3246 match status {
3247 BgTaskStatus::TimedOut => "timed out".to_string(),
3248 BgTaskStatus::Killed => "killed".to_string(),
3249 _ => exit_code
3250 .map(|code| format!("exit {code}"))
3251 .unwrap_or_else(|| format!("{status:?}").to_lowercase()),
3252 }
3253}
3254
3255fn token_count_u32(text: &str) -> u32 {
3256 aft_tokenizer::count_tokens(text)
3257 .try_into()
3258 .unwrap_or(u32::MAX)
3259}
3260
3261impl Default for BgTaskRegistry {
3262 fn default() -> Self {
3263 Self::new(Arc::new(Mutex::new(None)))
3264 }
3265}
3266
3267fn modified_within(path: &Path, grace: Duration) -> bool {
3268 fs::metadata(path)
3269 .and_then(|metadata| metadata.modified())
3270 .ok()
3271 .and_then(|modified| SystemTime::now().duration_since(modified).ok())
3272 .map(|age| age < grace)
3273 .unwrap_or(false)
3274}
3275
3276fn canonicalized_path(path: &Path) -> PathBuf {
3277 fs::canonicalize(path).unwrap_or_else(|_| path.to_path_buf())
3278}
3279
3280fn started_instant_from_unix_millis(started_at: u64) -> Instant {
3281 let now_ms = SystemTime::now()
3282 .duration_since(UNIX_EPOCH)
3283 .ok()
3284 .map(|duration| duration.as_millis() as u64)
3285 .unwrap_or(started_at);
3286 let elapsed_ms = now_ms.saturating_sub(started_at);
3287 Instant::now()
3288 .checked_sub(Duration::from_millis(elapsed_ms))
3289 .unwrap_or_else(Instant::now)
3290}
3291
3292fn gc_quarantine(storage_dir: &Path) {
3293 let quarantine_root = storage_dir.join("bash-tasks-quarantine");
3294 let Ok(session_dirs) = fs::read_dir(&quarantine_root) else {
3295 return;
3296 };
3297 for session_entry in session_dirs.flatten() {
3298 let session_quarantine_dir = session_entry.path();
3299 if !session_quarantine_dir.is_dir() {
3300 continue;
3301 }
3302 let entries = match fs::read_dir(&session_quarantine_dir) {
3303 Ok(entries) => entries,
3304 Err(error) => {
3305 crate::slog_warn!(
3306 "failed to read background task quarantine dir {}: {error}",
3307 session_quarantine_dir.display()
3308 );
3309 continue;
3310 }
3311 };
3312 for entry in entries.flatten() {
3313 let path = entry.path();
3314 if modified_within(&path, QUARANTINE_GC_GRACE) {
3315 continue;
3316 }
3317 let result = if path.is_dir() {
3318 fs::remove_dir_all(&path)
3319 } else {
3320 fs::remove_file(&path)
3321 };
3322 match result {
3323 Ok(()) => log::debug!(
3324 "deleted old background task quarantine entry {}",
3325 path.display()
3326 ),
3327 Err(error) => crate::slog_warn!(
3328 "failed to delete old background task quarantine entry {}: {error}",
3329 path.display()
3330 ),
3331 }
3332 }
3333 let _ = fs::remove_dir(&session_quarantine_dir);
3334 }
3335 let _ = fs::remove_dir(&quarantine_root);
3336}
3337
3338enum QuarantineKind {
3339 Corrupt,
3340 Invalid,
3341}
3342
3343fn quarantine_task_json(
3344 storage_dir: &Path,
3345 session_dir: &Path,
3346 json_path: &Path,
3347 kind: QuarantineKind,
3348) -> Result<(), String> {
3349 let session_hash = session_dir
3350 .file_name()
3351 .and_then(|name| name.to_str())
3352 .ok_or_else(|| {
3353 format!(
3354 "invalid background task session dir: {}",
3355 session_dir.display()
3356 )
3357 })?;
3358 let task_name = json_path
3359 .file_name()
3360 .and_then(|name| name.to_str())
3361 .ok_or_else(|| format!("invalid background task json path: {}", json_path.display()))?;
3362 let unix_ts = SystemTime::now()
3363 .duration_since(UNIX_EPOCH)
3364 .map(|duration| duration.as_secs())
3365 .unwrap_or(0);
3366 let quarantine_dir = storage_dir.join("bash-tasks-quarantine").join(session_hash);
3367 fs::create_dir_all(&quarantine_dir).map_err(|e| {
3368 format!(
3369 "failed to create background task quarantine dir {}: {e}",
3370 quarantine_dir.display()
3371 )
3372 })?;
3373 let target_name = quarantine_name(task_name, unix_ts, &kind);
3374 let target = quarantine_dir.join(target_name);
3375 fs::rename(json_path, &target).map_err(|e| {
3376 format!(
3377 "failed to quarantine background task metadata {} to {}: {e}",
3378 json_path.display(),
3379 target.display()
3380 )
3381 })?;
3382
3383 for sibling in task_sibling_paths(json_path) {
3384 if !sibling.exists() {
3385 continue;
3386 }
3387 let Some(sibling_name) = sibling.file_name().and_then(|name| name.to_str()) else {
3388 crate::slog_warn!(
3389 "skipping background task sibling with invalid name during quarantine: {}",
3390 sibling.display()
3391 );
3392 continue;
3393 };
3394 let sibling_target = quarantine_dir.join(quarantine_name(sibling_name, unix_ts, &kind));
3395 if let Err(error) = fs::rename(&sibling, &sibling_target) {
3396 crate::slog_warn!(
3397 "failed to quarantine background task sibling {} to {}: {error}",
3398 sibling.display(),
3399 sibling_target.display()
3400 );
3401 }
3402 }
3403
3404 let _ = fs::remove_dir(session_dir);
3405 Ok(())
3406}
3407
3408fn quarantine_name(file_name: &str, unix_ts: u64, kind: &QuarantineKind) -> String {
3409 match kind {
3410 QuarantineKind::Corrupt => format!("{file_name}.corrupt-{unix_ts}"),
3411 QuarantineKind::Invalid => {
3412 let path = Path::new(file_name);
3413 let stem = path.file_stem().and_then(|stem| stem.to_str());
3414 let extension = path.extension().and_then(|extension| extension.to_str());
3415 match (stem, extension) {
3416 (Some(stem), Some(extension)) => format!("{stem}.invalid.{unix_ts}.{extension}"),
3417 _ => format!("{file_name}.invalid.{unix_ts}"),
3418 }
3419 }
3420 }
3421}
3422
3423fn task_sibling_paths(json_path: &Path) -> Vec<PathBuf> {
3424 let Some(parent) = json_path.parent() else {
3425 return Vec::new();
3426 };
3427 let Some(stem) = json_path.file_stem().and_then(|stem| stem.to_str()) else {
3428 return Vec::new();
3429 };
3430 ["stdout", "stderr", "exit", "pty", "ps1", "bat", "sh"]
3431 .into_iter()
3432 .map(|extension| parent.join(format!("{stem}.{extension}")))
3433 .collect()
3434}
3435
3436fn read_for_token_count_from_disk(
3437 metadata: &PersistedTask,
3438 paths: &TaskPaths,
3439 max_bytes_per_stream: usize,
3440) -> TokenCountInput {
3441 if metadata.mode == BgMode::Pty {
3442 return TokenCountInput::Skipped;
3443 }
3444 let stdout = read_file_tail_capped(&paths.stdout, max_bytes_per_stream);
3451 let stderr = read_file_tail_capped(&paths.stderr, max_bytes_per_stream);
3452 match (stdout, stderr) {
3453 (Ok(stdout), Ok(stderr)) => TokenCountInput::Text(combine_streams(
3454 String::from_utf8_lossy(&stdout).as_ref(),
3455 String::from_utf8_lossy(&stderr).as_ref(),
3456 )),
3457 (Ok(stdout), Err(_)) => TokenCountInput::Text(combine_streams(
3458 String::from_utf8_lossy(&stdout).as_ref(),
3459 "",
3460 )),
3461 (Err(_), Ok(stderr)) => TokenCountInput::Text(combine_streams(
3462 "",
3463 String::from_utf8_lossy(&stderr).as_ref(),
3464 )),
3465 (Err(_), Err(_)) => TokenCountInput::Skipped,
3466 }
3467}
3468
3469fn read_file_tail_capped(path: &Path, max_bytes: usize) -> std::io::Result<Vec<u8>> {
3474 use std::io::{Read, Seek, SeekFrom};
3475 let mut file = std::fs::File::open(path)?;
3476 let len = file.metadata()?.len();
3477 let read_len = len.min(max_bytes as u64);
3478 if read_len > 0 && len > max_bytes as u64 {
3479 file.seek(SeekFrom::End(-(read_len as i64)))?;
3480 }
3481 let mut bytes = Vec::with_capacity(read_len as usize);
3482 file.read_to_end(&mut bytes)?;
3483 Ok(bytes)
3484}
3485
3486impl BgTask {
3487 fn snapshot(&self, preview_bytes: usize) -> BgTaskSnapshot {
3488 let state = self
3489 .state
3490 .lock()
3491 .unwrap_or_else(|poison| poison.into_inner());
3492 self.snapshot_locked(&state, preview_bytes)
3493 }
3494
3495 fn snapshot_locked(&self, state: &BgTaskState, preview_bytes: usize) -> BgTaskSnapshot {
3496 let metadata = &state.metadata;
3497 let duration_ms = metadata.duration_ms.or_else(|| {
3498 metadata
3499 .status
3500 .is_terminal()
3501 .then(|| self.started.elapsed().as_millis() as u64)
3502 });
3503 let (output_preview, output_truncated) = if metadata.mode == BgMode::Pty {
3504 (String::new(), false)
3505 } else if metadata.status.is_terminal() {
3506 state
3507 .terminal_output_cache
3508 .as_ref()
3509 .map(|cache| (cache.output_preview.clone(), cache.output_truncated))
3510 .unwrap_or_else(|| (String::new(), false))
3511 } else {
3512 state.buffer.read_tail(preview_bytes)
3513 };
3514 BgTaskSnapshot {
3515 info: BgTaskInfo {
3516 task_id: self.task_id.clone(),
3517 status: metadata.status.clone(),
3518 command: metadata.command.clone(),
3519 mode: metadata.mode.clone(),
3520 started_at: metadata.started_at,
3521 duration_ms,
3522 },
3523 exit_code: metadata.exit_code,
3524 child_pid: metadata.child_pid,
3525 workdir: metadata.workdir.display().to_string(),
3526 output_preview,
3527 output_truncated,
3528 output_path: state
3529 .buffer
3530 .output_path()
3531 .map(|path| path.display().to_string()),
3532 stderr_path: state
3533 .buffer
3534 .stderr_path()
3535 .map(|path| path.display().to_string()),
3536 pty_rows: (metadata.mode == BgMode::Pty).then_some(metadata.pty_rows.unwrap_or(24)),
3537 pty_cols: (metadata.mode == BgMode::Pty).then_some(metadata.pty_cols.unwrap_or(80)),
3538 pty_screen: None,
3539 }
3540 }
3541
3542 pub(crate) fn is_running(&self) -> bool {
3543 self.state
3544 .lock()
3545 .map(|state| {
3546 state.metadata.status == BgTaskStatus::Running
3547 || (state.metadata.mode == BgMode::Pty
3548 && state.metadata.status == BgTaskStatus::Killing)
3549 })
3550 .unwrap_or(false)
3551 }
3552
3553 fn is_terminal(&self) -> bool {
3554 self.state
3555 .lock()
3556 .map(|state| state.metadata.status.is_terminal())
3557 .unwrap_or(false)
3558 }
3559
3560 fn mark_terminal_now(&self) {
3561 if let Ok(mut terminal_at) = self.terminal_at.lock() {
3562 if terminal_at.is_none() {
3563 *terminal_at = Some(Instant::now());
3564 }
3565 }
3566 }
3567
3568 fn set_completion_delivered(
3569 &self,
3570 delivered: bool,
3571 registry: &BgTaskRegistry,
3572 ) -> Result<(), String> {
3573 let mut state = self
3574 .state
3575 .lock()
3576 .map_err(|_| "background task lock poisoned".to_string())?;
3577 let updated = registry
3578 .update_task_metadata(&self.paths, |metadata| {
3579 metadata.completion_delivered = delivered;
3580 })
3581 .map_err(|e| format!("failed to update completion delivery: {e}"))?;
3582 state.metadata = updated;
3583 Ok(())
3584 }
3585}
3586
3587#[cfg(unix)]
3608fn reap_piped_child(child_slot: &mut Option<Child>) {
3609 if let Some(mut child) = child_slot.take() {
3610 if matches!(child.try_wait(), Ok(None)) {
3611 let _ = child.wait();
3612 }
3613 }
3614}
3615
3616#[cfg(windows)]
3621fn reap_piped_child(child_slot: &mut Option<Child>) {
3622 *child_slot = None;
3623}
3624
3625fn terminal_metadata_from_marker(
3626 mut metadata: PersistedTask,
3627 marker: ExitMarker,
3628 reason: Option<String>,
3629) -> PersistedTask {
3630 match marker {
3631 ExitMarker::Code(code) => {
3632 let status = if code == 0 {
3633 BgTaskStatus::Completed
3634 } else {
3635 BgTaskStatus::Failed
3636 };
3637 metadata.mark_terminal(status, Some(code), reason);
3638 }
3639 ExitMarker::Killed => metadata.mark_terminal(
3640 BgTaskStatus::Killed,
3641 terminal_exit_code_for_status(&BgTaskStatus::Killed),
3642 reason,
3643 ),
3644 }
3645 metadata
3646}
3647
3648fn terminal_exit_code_for_status(status: &BgTaskStatus) -> Option<i32> {
3649 match status {
3650 BgTaskStatus::TimedOut => Some(124),
3651 BgTaskStatus::Killed => Some(137),
3652 _ => None,
3653 }
3654}
3655
3656#[cfg(unix)]
3657fn write_unix_command_script(command: &str, paths: &TaskPaths) -> Result<PathBuf, String> {
3658 let stem = paths
3659 .json
3660 .file_stem()
3661 .and_then(|stem| stem.to_str())
3662 .unwrap_or("wrapper");
3663 let script_path = paths.dir.join(format!("{stem}.sh"));
3664 fs::write(&script_path, command)
3665 .map_err(|e| format!("failed to write background bash command script: {e}"))?;
3666 Ok(script_path)
3667}
3668
3669#[cfg(unix)]
3670fn detached_shell_command(command_script: &Path, exit_path: &Path) -> Command {
3671 let shell = resolve_posix_shell();
3672 let mut cmd = Command::new(&shell);
3673 cmd.arg("-c")
3679 .arg(r#""$0" "$1"; code=$?; printf "%s" "$code" > "$2.tmp.$$"; mv -f "$2.tmp.$$" "$2""#)
3680 .arg(&shell)
3681 .arg(command_script)
3682 .arg(exit_path);
3683 unsafe {
3684 cmd.pre_exec(|| {
3685 if libc::setsid() == -1 {
3686 return Err(std::io::Error::last_os_error());
3687 }
3688 Ok(())
3689 });
3690 }
3691 cmd
3692}
3693
3694#[cfg(unix)]
3695fn resolve_posix_shell() -> PathBuf {
3696 static POSIX_SHELL: OnceLock<PathBuf> = OnceLock::new();
3697 POSIX_SHELL
3698 .get_or_init(|| {
3699 std::env::var_os("BASH")
3700 .filter(|value| !value.is_empty())
3701 .map(PathBuf::from)
3702 .filter(|path| path.exists())
3703 .or_else(|| which::which("bash").ok())
3704 .or_else(|| which::which("zsh").ok())
3705 .unwrap_or_else(|| PathBuf::from("/bin/sh"))
3706 })
3707 .clone()
3708}
3709
3710#[cfg(windows)]
3711fn detached_shell_command_for(
3712 shell: crate::windows_shell::WindowsShell,
3713 command: &str,
3714 exit_path: &Path,
3715 paths: &TaskPaths,
3716 creation_flags: u32,
3717) -> Result<Command, String> {
3718 use crate::windows_shell::WindowsShell;
3719 let wrapper_body = shell.wrapper_script_bytes(command, exit_path);
3732 let wrapper_ext = match shell {
3733 WindowsShell::Pwsh | WindowsShell::Powershell => "ps1",
3734 WindowsShell::Cmd => "bat",
3735 WindowsShell::Posix(_) => "sh",
3739 };
3740 let wrapper_path = paths.dir.join(format!(
3741 "{}.{}",
3742 paths
3743 .json
3744 .file_stem()
3745 .and_then(|s| s.to_str())
3746 .unwrap_or("wrapper"),
3747 wrapper_ext
3748 ));
3749 fs::write(&wrapper_path, wrapper_body)
3750 .map_err(|e| format!("failed to write background bash wrapper script: {e}"))?;
3751
3752 let mut cmd = Command::new(shell.binary().as_ref());
3753 match shell {
3754 WindowsShell::Pwsh | WindowsShell::Powershell => {
3755 cmd.args([
3758 "-NoLogo",
3759 "-NoProfile",
3760 "-NonInteractive",
3761 "-ExecutionPolicy",
3762 "Bypass",
3763 "-File",
3764 ]);
3765 cmd.arg(&wrapper_path);
3766 }
3767 WindowsShell::Cmd => {
3768 cmd.args(["/D", "/C"]);
3775 cmd.arg(&wrapper_path);
3776 }
3777 WindowsShell::Posix(_) => {
3778 cmd.arg(&wrapper_path);
3783 }
3784 }
3785
3786 cmd.creation_flags(creation_flags);
3790 Ok(cmd)
3791}
3792
3793fn spawn_detached_child(
3809 command: &str,
3810 paths: &TaskPaths,
3811 workdir: &Path,
3812 env: &HashMap<String, String>,
3813) -> Result<std::process::Child, String> {
3814 #[cfg(not(windows))]
3815 {
3816 let stdout = create_capture_file(&paths.stdout)
3817 .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
3818 let stderr = create_capture_file(&paths.stderr)
3819 .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
3820 let command_script = write_unix_command_script(command, paths)?;
3821 detached_shell_command(&command_script, &paths.exit)
3822 .current_dir(workdir)
3823 .envs(env)
3824 .stdin(Stdio::null())
3825 .stdout(Stdio::from(stdout))
3826 .stderr(Stdio::from(stderr))
3827 .spawn()
3828 .map_err(|e| format!("failed to spawn background bash command: {e}"))
3829 }
3830 #[cfg(windows)]
3831 {
3832 use crate::windows_shell::shell_candidates;
3833 let candidates: Vec<crate::windows_shell::WindowsShell> = shell_candidates();
3844 const FLAG_CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
3865 const FLAG_CREATE_BREAKAWAY_FROM_JOB: u32 = 0x0100_0000;
3866 const FLAG_CREATE_NO_WINDOW: u32 = 0x0800_0000;
3867 let with_breakaway =
3868 FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP | FLAG_CREATE_BREAKAWAY_FROM_JOB;
3869 let without_breakaway = FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP;
3870 let mut last_error: Option<String> = None;
3871 for (idx, shell) in candidates.iter().enumerate() {
3872 for &flags in &[with_breakaway, without_breakaway] {
3876 let stdout = create_capture_file(&paths.stdout)
3878 .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
3879 let stderr = create_capture_file(&paths.stderr)
3880 .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
3881 let mut cmd =
3882 detached_shell_command_for(shell.clone(), command, &paths.exit, paths, flags)?;
3883 cmd.current_dir(workdir)
3884 .envs(env)
3885 .stdin(Stdio::null())
3886 .stdout(Stdio::from(stdout))
3887 .stderr(Stdio::from(stderr));
3888 match cmd.spawn() {
3889 Ok(child) => {
3890 if idx > 0 {
3891 crate::slog_warn!("background bash spawn fell back to {} after {} earlier candidate(s) failed; \
3892 the cached PATH probe disagreed with runtime spawn — likely PATH \
3893 inheritance, antivirus / AppLocker / Defender ASR, or sandbox policy.",
3894 shell.binary(),
3895 idx);
3896 }
3897 if flags == without_breakaway {
3898 crate::slog_warn!(
3899 "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected \
3900 (likely a restrictive Job Object — CI sandbox or MDM policy). \
3901 Spawned without breakaway; the bg task will be torn down if the \
3902 AFT process group is killed."
3903 );
3904 }
3905 return Ok(child);
3906 }
3907 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
3908 crate::slog_warn!("background bash spawn: {} returned NotFound at runtime — trying next candidate",
3909 shell.binary());
3910 last_error = Some(format!("{}: {e}", shell.binary()));
3911 break;
3914 }
3915 Err(e) if flags == with_breakaway && e.raw_os_error() == Some(5) => {
3916 crate::slog_warn!(
3918 "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected with \
3919 Access Denied — retrying {} without breakaway",
3920 shell.binary()
3921 );
3922 last_error = Some(format!("{}: {e}", shell.binary()));
3923 continue;
3924 }
3925 Err(e) => {
3926 return Err(format!(
3927 "failed to spawn background bash command via {}: {e}",
3928 shell.binary()
3929 ));
3930 }
3931 }
3932 }
3933 }
3934 Err(format!(
3935 "failed to spawn background bash command: no Windows shell could be spawned. \
3936 Last error: {}. PATH-probed candidates: {:?}",
3937 last_error.unwrap_or_else(|| "no candidates were attempted".to_string()),
3938 candidates.iter().map(|s| s.binary()).collect::<Vec<_>>()
3939 ))
3940 }
3941}
3942
3943fn random_slug() -> String {
3944 let mut bytes = [0u8; 8];
3952 getrandom::fill(&mut bytes).unwrap_or_else(|_| {
3954 let t = SystemTime::now()
3956 .duration_since(UNIX_EPOCH)
3957 .map(|d| d.as_nanos() as u64)
3958 .unwrap_or(0);
3959 let p = u64::from(std::process::id());
3960 bytes.copy_from_slice(&(t ^ p.rotate_left(32)).to_le_bytes());
3961 });
3962 let hex: String = bytes.iter().map(|b| format!("{b:02x}")).collect();
3964 format!("bash-{hex}")
3965}
3966
3967#[cfg(test)]
3968mod tests {
3969 use std::collections::HashMap;
3970 use std::fs;
3971 use std::sync::atomic::{AtomicBool, AtomicUsize};
3972 use std::sync::{Arc, Mutex};
3973 use std::time::Duration;
3974 #[cfg(windows)]
3975 use std::time::Instant;
3976
3977 use super::*;
3978
3979 #[cfg(unix)]
3980 const QUICK_SUCCESS_COMMAND: &str = "true";
3981 #[cfg(windows)]
3982 const QUICK_SUCCESS_COMMAND: &str = "cmd /c exit 0";
3983
3984 #[cfg(unix)]
3985 const LONG_RUNNING_COMMAND: &str = "sleep 5";
3986 #[cfg(windows)]
3987 const LONG_RUNNING_COMMAND: &str = "cmd /c timeout /t 5 /nobreak > nul";
3988
3989 #[test]
3990 fn gh_structured_detection_rejects_piped_commands() {
3991 assert!(is_gh_structured_command(
3992 "gh issue list --json number,title"
3993 ));
3994 assert!(is_gh_structured_command(
3995 "cd repo && gh issue list --json number,title"
3996 ));
3997
3998 assert!(!is_gh_structured_command(
3999 "gh issue list --json number,title | jq '.[]'"
4000 ));
4001 assert!(!is_gh_structured_command(
4002 "gh issue list --json number,title |"
4003 ));
4004 }
4005
4006 fn insert_terminal_piped_task(
4007 registry: &BgTaskRegistry,
4008 dir: &tempfile::TempDir,
4009 command: &str,
4010 stdout: &str,
4011 stderr: &str,
4012 compressed: bool,
4013 ) -> (String, Arc<BgTask>) {
4014 let task_id = format!("bash-test-{}", random_slug());
4015 let paths = task_paths(dir.path(), "session", &task_id);
4016 fs::create_dir_all(&paths.dir).unwrap();
4017 fs::write(&paths.stdout, stdout).unwrap();
4018 fs::write(&paths.stderr, stderr).unwrap();
4019 let mut metadata = PersistedTask::starting(
4020 task_id.clone(),
4021 "session".to_string(),
4022 command.to_string(),
4023 dir.path().to_path_buf(),
4024 Some(dir.path().to_path_buf()),
4025 Some(30_000),
4026 true,
4027 compressed,
4028 );
4029 metadata.mark_terminal(BgTaskStatus::Completed, Some(0), None);
4030 write_task(&paths.json, &metadata).unwrap();
4031 registry
4032 .insert_rehydrated_task(metadata, paths, true)
4033 .expect("insert terminal task");
4034 let task = registry.task_for_session(&task_id, "session").unwrap();
4035 (task_id, task)
4036 }
4037
4038 fn insert_terminal_pty_task(
4039 registry: &BgTaskRegistry,
4040 dir: &tempfile::TempDir,
4041 pty_output: &str,
4042 ) -> (String, Arc<BgTask>) {
4043 let task_id = format!("bash-test-{}", random_slug());
4044 let paths = task_paths(dir.path(), "session", &task_id);
4045 fs::create_dir_all(&paths.dir).unwrap();
4046 fs::write(&paths.pty, pty_output).unwrap();
4047 let mut metadata = PersistedTask::starting(
4048 task_id.clone(),
4049 "session".to_string(),
4050 "python".to_string(),
4051 dir.path().to_path_buf(),
4052 Some(dir.path().to_path_buf()),
4053 Some(30_000),
4054 true,
4055 true,
4056 );
4057 metadata.mode = BgMode::Pty;
4058 metadata.mark_terminal(BgTaskStatus::Completed, Some(0), None);
4059 write_task(&paths.json, &metadata).unwrap();
4060 registry
4061 .insert_rehydrated_task(metadata, paths, true)
4062 .expect("insert terminal pty task");
4063 let task = registry.task_for_session(&task_id, "session").unwrap();
4064 (task_id, task)
4065 }
4066
4067 #[cfg(unix)]
4068 fn wait_for_terminal_snapshot(
4069 registry: &BgTaskRegistry,
4070 task_id: &str,
4071 session_id: &str,
4072 project: &Path,
4073 storage: &Path,
4074 ) -> BgTaskSnapshot {
4075 let started = Instant::now();
4076 loop {
4077 let snapshot = registry
4078 .status(task_id, session_id, Some(project), Some(storage), 4096)
4079 .expect("spawned task should be visible to status");
4080 if snapshot.info.status.is_terminal() {
4081 return snapshot;
4082 }
4083 assert!(
4084 started.elapsed() < Duration::from_secs(10),
4085 "timed out waiting for task {task_id} to finish; last status={:?}",
4086 snapshot.info.status
4087 );
4088 std::thread::sleep(Duration::from_millis(50));
4089 }
4090 }
4091
4092 fn write_running_project_task(storage: &Path, project: &Path, session: &str, task_id: &str) {
4093 let paths = task_paths(storage, session, task_id);
4094 let mut metadata = PersistedTask::starting(
4095 task_id.to_string(),
4096 session.to_string(),
4097 "sleep 60".to_string(),
4098 project.to_path_buf(),
4099 Some(project.to_path_buf()),
4100 Some(30_000),
4101 true,
4102 true,
4103 );
4104 metadata.status = BgTaskStatus::Running;
4105 write_task(&paths.json, &metadata).unwrap();
4106 fs::write(&paths.stdout, "still running\n").unwrap();
4107 fs::write(&paths.stderr, "").unwrap();
4108 }
4109
4110 #[test]
4111 fn status_replay_filters_same_session_by_project_root() {
4112 let project_a = tempfile::tempdir().unwrap();
4113 let project_b = tempfile::tempdir().unwrap();
4114 let storage = tempfile::tempdir().unwrap();
4115 let session = "shared-session";
4116 let task_id = "bash-project-a";
4117 write_running_project_task(storage.path(), project_a.path(), session, task_id);
4118
4119 let actor_b = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
4120 assert!(actor_b
4121 .status(
4122 task_id,
4123 session,
4124 Some(project_b.path()),
4125 Some(storage.path()),
4126 1024,
4127 )
4128 .is_none());
4129 assert!(actor_b.task_for_session(task_id, session).is_none());
4130
4131 let actor_a = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
4132 let snapshot = actor_a
4133 .status(
4134 task_id,
4135 session,
4136 Some(project_a.path()),
4137 Some(storage.path()),
4138 1024,
4139 )
4140 .expect("owning project should replay its task");
4141 assert_eq!(snapshot.info.status, BgTaskStatus::Running);
4142 }
4143
4144 #[cfg(unix)]
4145 #[test]
4146 fn multiline_pipeline_stdout_persists_all_lines_after_terminal_status() {
4147 let cases = [
4148 (
4149 "long-first",
4150 "sleep 0.5; printf 'one\\n' | cat\nprintf 'two\\n' | grep -c two\nprintf 'three\\n' | cat",
4151 vec!["one", "1", "three"],
4152 ),
4153 (
4154 "short-first",
4155 "printf 'one\\n' | cat\nsleep 0.2; printf 'two\\n' | grep -c two\nprintf 'three\\n' | cat",
4156 vec!["one", "1", "three"],
4157 ),
4158 (
4159 "failing-middle",
4160 "sleep 0.2; printf 'one\\n' | cat\nfalse; printf 'after-false\\n' | cat\nprintf 'three\\n' | cat",
4161 vec!["one", "after-false", "three"],
4162 ),
4163 ];
4164
4165 for (name, command, expected_lines) in cases {
4166 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
4167 let dir = tempfile::tempdir().unwrap();
4168 let session_id = format!("session-{name}");
4169 let task_id = registry
4170 .spawn(
4171 command,
4172 session_id.clone(),
4173 dir.path().to_path_buf(),
4174 HashMap::new(),
4175 Some(Duration::from_secs(30)),
4176 dir.path().to_path_buf(),
4177 10,
4178 true,
4179 true,
4180 Some(dir.path().to_path_buf()),
4181 )
4182 .unwrap();
4183
4184 let snapshot = wait_for_terminal_snapshot(
4185 ®istry,
4186 &task_id,
4187 &session_id,
4188 dir.path(),
4189 dir.path(),
4190 );
4191 assert_eq!(
4192 snapshot.info.status,
4193 BgTaskStatus::Completed,
4194 "{name}: task should complete; snapshot={snapshot:?}"
4195 );
4196 assert_eq!(
4197 snapshot.exit_code,
4198 Some(0),
4199 "{name}: script should use the final command's exit code"
4200 );
4201
4202 let stdout_path = task_paths(dir.path(), &session_id, &task_id).stdout;
4203 let stdout = fs::read_to_string(&stdout_path).unwrap_or_else(|error| {
4204 panic!(
4205 "{name}: failed to read raw stdout file {}: {error}",
4206 stdout_path.display()
4207 )
4208 });
4209 let lines: Vec<&str> = stdout.lines().collect();
4210 assert_eq!(
4211 lines,
4212 expected_lines,
4213 "{name}: raw stdout file must include every newline-separated command's output; stdout_path={}",
4214 stdout_path.display()
4215 );
4216 }
4217 }
4218
4219 #[test]
4220 fn recognizes_all_recovery_marker_forms() {
4221 assert!(is_recovery_marker(
4222 "[truncated output; full output: read \"/tmp/out\"]"
4223 ));
4224 assert!(is_recovery_marker(
4225 "[omitted output; see remaining: tail -n +42 \"/tmp/out\"]"
4226 ));
4227 assert!(is_recovery_marker(
4228 "[truncated output; full output unavailable]"
4229 ));
4230 assert!(is_recovery_marker(
4231 r#"[truncated 123 bytes from saved output prefix; retained output: read "/tmp/out"]"#
4232 ));
4233 }
4234
4235 #[test]
4236 fn recovery_marker_reports_disk_prefix_truncation_as_retained_output() {
4237 let recovery = RecoveryContext {
4238 dropped_by_class: BTreeMap::new(),
4239 had_inner_drop: false,
4240 offset_hint_eligible: false,
4241 offset_start_line: None,
4242 byte_truncated: false,
4243 disk_truncated_prefix_bytes: 4096,
4244 output_path: Some("/tmp/stdout".to_string()),
4245 stderr_path: None,
4246 include_stderr_path: false,
4247 };
4248
4249 let marker = recovery_marker(&recovery).expect("disk truncation must emit marker");
4250
4251 assert!(marker.contains("truncated 4096 bytes from saved output prefix"));
4252 assert!(marker.contains(r#"retained output: read "/tmp/stdout""#));
4253 assert!(!marker.contains("full output: read"));
4254 }
4255
4256 #[test]
4257 fn killed_exit_marker_sets_nonzero_sentinel_exit_code() {
4258 let metadata = PersistedTask::starting(
4259 "task".to_string(),
4260 "session".to_string(),
4261 "cargo test".to_string(),
4262 PathBuf::from("/tmp"),
4263 None,
4264 None,
4265 true,
4266 true,
4267 );
4268
4269 let terminal = terminal_metadata_from_marker(metadata, ExitMarker::Killed, None);
4270
4271 assert_eq!(terminal.status, BgTaskStatus::Killed);
4272 assert_eq!(terminal.exit_code, Some(137));
4273 }
4274
4275 #[test]
4276 fn terminal_status_polls_use_cached_render_once_and_off_lock() {
4277 let registry = BgTaskRegistry::default();
4278 let dir = tempfile::tempdir().unwrap();
4279 let (_task_id, task) = insert_terminal_piped_task(
4280 ®istry,
4281 &dir,
4282 "custom-tool --verbose",
4283 &"stdout line\n".repeat(200_000),
4284 "",
4285 true,
4286 );
4287 let calls = Arc::new(AtomicUsize::new(0));
4288 let saw_unlocked_state = Arc::new(AtomicBool::new(false));
4289 let task_holder = Arc::new(Mutex::new(Some(Arc::clone(&task))));
4290 let calls_for_closure = Arc::clone(&calls);
4291 let unlocked_for_closure = Arc::clone(&saw_unlocked_state);
4292 let task_for_closure = Arc::clone(&task_holder);
4293 registry.set_compressor_with_exit_code(move |_command, output, _exit_code| {
4294 calls_for_closure.fetch_add(1, Ordering::SeqCst);
4295 if let Some(task) = task_for_closure.lock().unwrap().as_ref() {
4296 if task.state.try_lock().is_ok() {
4297 unlocked_for_closure.store(true, Ordering::SeqCst);
4298 }
4299 }
4300 CompressionResult::new(format!("compressed {} bytes", output.len()))
4301 });
4302
4303 let first = registry
4304 .status(
4305 &task.task_id,
4306 "session",
4307 None,
4308 Some(dir.path()),
4309 RUNNING_OUTPUT_PREVIEW_BYTES,
4310 )
4311 .unwrap();
4312 let second = registry
4313 .status(
4314 &task.task_id,
4315 "session",
4316 None,
4317 Some(dir.path()),
4318 RUNNING_OUTPUT_PREVIEW_BYTES,
4319 )
4320 .unwrap();
4321 let listed = registry.list(RUNNING_OUTPUT_PREVIEW_BYTES);
4322
4323 assert_eq!(
4324 calls.load(Ordering::SeqCst),
4325 1,
4326 "terminal render must be cached"
4327 );
4328 assert!(
4329 saw_unlocked_state.load(Ordering::SeqCst),
4330 "compressor must run after releasing the task state lock"
4331 );
4332 assert!(first.output_preview.starts_with("compressed "));
4333 assert_eq!(second.output_preview, first.output_preview);
4334 assert_eq!(listed[0].output_preview, first.output_preview);
4335 }
4336
4337 #[test]
4338 fn completion_preview_success_keeps_tail_only() {
4339 let registry = BgTaskRegistry::default();
4344 let dir = tempfile::tempdir().unwrap();
4345 let output = format!("HEAD-SIGNAL\n{}TAIL-SIGNAL\n", "middle\n".repeat(2_000));
4346 let (_task_id, task) =
4347 insert_terminal_piped_task(®istry, &dir, "cat big.log", &output, "", false);
4348
4349 registry.post_terminal_transition(&task, true).unwrap();
4350 let completions = registry.drain_completions_for_session(Some("session"));
4351 assert_eq!(completions.len(), 1);
4352 let preview = &completions[0].output_preview;
4353 assert!(preview.contains("TAIL-SIGNAL"), "preview was {preview:?}");
4354 assert!(!preview.contains("HEAD-SIGNAL"), "preview was {preview:?}");
4355 assert!(completions[0].output_truncated);
4356 }
4357
4358 #[test]
4359 fn completion_preview_failure_keeps_head_and_tail() {
4360 let registry = BgTaskRegistry::default();
4363 let dir = tempfile::tempdir().unwrap();
4364 let output = format!("HEAD-SIGNAL\n{}TAIL-SIGNAL\n", "middle\n".repeat(2_000));
4365 let task_id = format!("bash-test-{}", random_slug());
4366 let paths = task_paths(dir.path(), "session", &task_id);
4367 fs::create_dir_all(&paths.dir).unwrap();
4368 fs::write(&paths.stdout, &output).unwrap();
4369 fs::write(&paths.stderr, "").unwrap();
4370 let mut metadata = PersistedTask::starting(
4371 task_id.clone(),
4372 "session".to_string(),
4373 "cat big.log".to_string(),
4374 dir.path().to_path_buf(),
4375 Some(dir.path().to_path_buf()),
4376 Some(30_000),
4377 true,
4378 false,
4379 );
4380 metadata.mark_terminal(BgTaskStatus::Failed, Some(1), None);
4381 write_task(&paths.json, &metadata).unwrap();
4382 registry
4383 .insert_rehydrated_task(metadata, paths, true)
4384 .expect("insert terminal task");
4385 let task = registry.task_for_session(&task_id, "session").unwrap();
4386
4387 registry.post_terminal_transition(&task, true).unwrap();
4388 let completions = registry.drain_completions_for_session(Some("session"));
4389 assert_eq!(completions.len(), 1);
4390 let preview = &completions[0].output_preview;
4391 assert!(preview.contains("HEAD-SIGNAL"), "preview was {preview:?}");
4392 assert!(preview.contains("TAIL-SIGNAL"), "preview was {preview:?}");
4393 }
4394
4395 #[test]
4396 fn has_completions_for_session_matches_pending_delivery() {
4397 let registry = BgTaskRegistry::default();
4398 assert!(!registry.has_completions_for_session(Some("session")));
4399 assert!(!registry.has_completions_for_session(None));
4400
4401 let dir = tempfile::tempdir().unwrap();
4402 let (_task_id, task) =
4403 insert_terminal_piped_task(®istry, &dir, QUICK_SUCCESS_COMMAND, "done\n", "", false);
4404 registry.post_terminal_transition(&task, true).unwrap();
4405
4406 assert!(registry.has_completions_for_session(Some("session")));
4407 assert!(registry.has_completions_for_session(None));
4408 assert!(!registry.has_completions_for_session(Some("other-session")));
4409
4410 let completions = registry.drain_completions_for_session(Some("session"));
4411 assert_eq!(completions.len(), 1);
4412 assert_eq!(completions[0].task_id, task.task_id);
4413 }
4414
4415 #[test]
4416 fn structured_gh_json_survives_intact_and_ignores_stderr() {
4417 let registry = BgTaskRegistry::default();
4418 let dir = tempfile::tempdir().unwrap();
4419 let calls = Arc::new(AtomicUsize::new(0));
4420 let calls_for_closure = Arc::clone(&calls);
4421 registry.set_compressor_with_exit_code(move |_command, output, _exit_code| {
4422 calls_for_closure.fetch_add(1, Ordering::SeqCst);
4423 CompressionResult::new(output)
4424 });
4425 let (task_id, _task) = insert_terminal_piped_task(
4426 ®istry,
4427 &dir,
4428 "gh pr view 123 --json body",
4429 "{\"body\":\"hello\"}",
4430 "warning: stderr must not join json",
4431 true,
4432 );
4433
4434 let snapshot = registry
4435 .status(
4436 &task_id,
4437 "session",
4438 None,
4439 Some(dir.path()),
4440 RUNNING_OUTPUT_PREVIEW_BYTES,
4441 )
4442 .unwrap();
4443
4444 assert_eq!(snapshot.output_preview, "{\"body\":\"hello\"}");
4445 assert!(!snapshot.output_preview.contains("warning"));
4446 assert!(!snapshot.output_truncated);
4447 assert_eq!(
4448 calls.load(Ordering::SeqCst),
4449 0,
4450 "structured JSON bypasses compression"
4451 );
4452 }
4453
4454 #[test]
4455 fn registry_emits_single_recovery_marker_for_class_drops() {
4456 let registry = BgTaskRegistry::default();
4457 let dir = tempfile::tempdir().unwrap();
4458 registry.set_compressor_with_exit_code(move |_command, _output, _exit_code| {
4459 let mut dropped = BTreeMap::new();
4460 dropped.insert(DropClass::Error, 18);
4461 dropped.insert(DropClass::Warning, 6);
4462 CompressionResult::with_class_drops("kept diagnostic", dropped)
4463 });
4464 let (task_id, task) =
4465 insert_terminal_piped_task(®istry, &dir, "custom-tool", "raw", "", true);
4466
4467 let snapshot = registry
4468 .status(
4469 &task_id,
4470 "session",
4471 None,
4472 Some(dir.path()),
4473 RUNNING_OUTPUT_PREVIEW_BYTES,
4474 )
4475 .unwrap();
4476
4477 assert_eq!(snapshot.output_preview.matches("full output:").count(), 1);
4478 assert!(snapshot.output_preview.contains("+18 more errors"));
4479 assert!(snapshot.output_preview.contains("+6 more warnings"));
4480 assert!(snapshot
4481 .output_preview
4482 .contains(&format!("read \"{}\"", task.paths.stdout.display())));
4483 assert!(!snapshot.output_preview.contains("tail -n +"));
4484 assert!(snapshot.output_truncated);
4485 }
4486
4487 #[test]
4488 fn registry_marker_reports_semantic_and_byte_drops_once() {
4489 let registry = BgTaskRegistry::default();
4490 let dir = tempfile::tempdir().unwrap();
4491 registry.set_compressor_with_exit_code(move |_command, _output, _exit_code| {
4492 let mut dropped = BTreeMap::new();
4493 dropped.insert(DropClass::Error, 1);
4494 CompressionResult::with_class_drops(
4495 format!("HEAD-SIGNAL\n{}TAIL-SIGNAL", "middle\n".repeat(8_000)),
4496 dropped,
4497 )
4498 });
4499 let (task_id, _task) =
4500 insert_terminal_piped_task(®istry, &dir, "custom-tool", "raw", "", true);
4501
4502 let snapshot = registry
4503 .status(
4504 &task_id,
4505 "session",
4506 None,
4507 Some(dir.path()),
4508 RUNNING_OUTPUT_PREVIEW_BYTES,
4509 )
4510 .unwrap();
4511
4512 assert_eq!(snapshot.output_preview.matches("full output:").count(), 1);
4513 assert!(snapshot.output_preview.contains("+1 more error"));
4514 assert!(snapshot.output_preview.contains("truncated output"));
4515 assert!(snapshot.output_preview.contains("HEAD-SIGNAL"));
4516 assert!(snapshot.output_preview.contains("TAIL-SIGNAL"));
4517 assert!(!snapshot.output_preview.contains("...<truncated"));
4518 assert!(snapshot.output_truncated);
4519 }
4520
4521 #[test]
4522 fn cargo_stderr_class_drops_name_both_capture_paths() {
4523 let registry = BgTaskRegistry::default();
4524 let dir = tempfile::tempdir().unwrap();
4525 let filter_registry = crate::compress::toml_filter::FilterRegistry::default();
4526 registry.set_compressor_with_exit_code(move |command, output, exit_code| {
4527 crate::compress::compress_with_registry_exit_code(
4528 command,
4529 &output,
4530 exit_code,
4531 &filter_registry,
4532 )
4533 });
4534 let stderr = (0..22)
4535 .map(|index| {
4536 format!(
4537 "error: cargo failure {index}\n --> src/lib.rs:{}:1\n |\n{} | boom\n",
4538 index + 1,
4539 index + 1
4540 )
4541 })
4542 .collect::<Vec<_>>()
4543 .join("\n");
4544 let (task_id, task) = insert_terminal_piped_task(
4545 ®istry,
4546 &dir,
4547 "cargo check",
4548 "Finished dev [unoptimized] target(s) in 0.01s\n",
4549 &stderr,
4550 true,
4551 );
4552
4553 let snapshot = registry
4554 .status(
4555 &task_id,
4556 "session",
4557 None,
4558 Some(dir.path()),
4559 RUNNING_OUTPUT_PREVIEW_BYTES,
4560 )
4561 .unwrap();
4562
4563 assert!(snapshot.output_preview.contains("+2 more errors"));
4564 assert!(snapshot
4565 .output_preview
4566 .contains(&format!("read \"{}\"", task.paths.stdout.display())));
4567 assert!(snapshot
4568 .output_preview
4569 .contains(&format!("read \"{}\"", task.paths.stderr.display())));
4570 assert!(!snapshot.output_preview.contains("tail -n +"));
4571 }
4572
4573 #[test]
4574 fn over_ceiling_structured_json_uses_pointer_not_partial_json() {
4575 let registry = BgTaskRegistry::default();
4576 let dir = tempfile::tempdir().unwrap();
4577 let body = format!("{{\"body\":\"{}\"}}", "x".repeat(60 * 1024));
4578 let (task_id, task) = insert_terminal_piped_task(
4579 ®istry,
4580 &dir,
4581 "cd /repo && gh pr view 123 --json body",
4582 &body,
4583 "",
4584 true,
4585 );
4586
4587 let snapshot = registry
4588 .status(
4589 &task_id,
4590 "session",
4591 None,
4592 Some(dir.path()),
4593 RUNNING_OUTPUT_PREVIEW_BYTES,
4594 )
4595 .unwrap();
4596
4597 assert!(snapshot.output_preview.starts_with("[JSON output "));
4598 assert!(snapshot
4599 .output_preview
4600 .contains(&task.paths.stdout.display().to_string()));
4601 assert!(!snapshot.output_preview.contains(&"x".repeat(1024)));
4602 assert!(snapshot.output_truncated);
4603 }
4604
4605 #[test]
4606 fn toml_strip_tail_cap_uses_full_output_hint_not_offset_hint() {
4607 let registry = BgTaskRegistry::default();
4608 let dir = tempfile::tempdir().unwrap();
4609 let filter_registry = crate::compress::toml_filter::build_registry(
4610 crate::compress::builtin_filters::ALL,
4611 None,
4612 None,
4613 );
4614 registry.set_compressor_with_exit_code(move |command, output, exit_code| {
4615 crate::compress::compress_with_registry_exit_code(
4616 command,
4617 &output,
4618 exit_code,
4619 &filter_registry,
4620 )
4621 });
4622 let stdout = format!(
4623 "make[1]: Entering directory `/tmp`\n{}",
4624 (0..100)
4625 .map(|index| format!("compile line {index}"))
4626 .collect::<Vec<_>>()
4627 .join("\n")
4628 );
4629 let (task_id, task) =
4630 insert_terminal_piped_task(®istry, &dir, "make all", &stdout, "", true);
4631
4632 let snapshot = registry
4633 .status(
4634 &task_id,
4635 "session",
4636 None,
4637 Some(dir.path()),
4638 RUNNING_OUTPUT_PREVIEW_BYTES,
4639 )
4640 .unwrap();
4641
4642 assert!(snapshot.output_preview.contains("compile line 99"));
4643 assert!(snapshot.output_preview.contains(&format!(
4644 "full output: read \"{}\"",
4645 task.paths.stdout.display()
4646 )));
4647 assert!(!snapshot
4648 .output_preview
4649 .contains(&format!("read \"{}\"", task.paths.stderr.display())));
4650 assert!(!snapshot.output_preview.contains("tail -n +"));
4651 }
4652
4653 #[test]
4654 fn compressed_false_raw_passthrough_uses_wider_head_tail_cap() {
4655 let registry = BgTaskRegistry::default();
4656 let dir = tempfile::tempdir().unwrap();
4657 let output = format!("RAW-HEAD\n{}RAW-TAIL\n", "raw-middle\n".repeat(8_000));
4658 let (task_id, task) =
4659 insert_terminal_piped_task(®istry, &dir, "cat raw.log", &output, "RAW-ERR\n", false);
4660
4661 let snapshot = registry
4662 .status(
4663 &task_id,
4664 "session",
4665 None,
4666 Some(dir.path()),
4667 RUNNING_OUTPUT_PREVIEW_BYTES,
4668 )
4669 .unwrap();
4670
4671 assert!(snapshot.output_preview.contains("RAW-HEAD"));
4672 assert!(snapshot.output_preview.contains("RAW-TAIL"));
4673 assert!(snapshot.output_preview.contains("truncated output"));
4674 assert!(snapshot
4675 .output_preview
4676 .contains(&format!("read \"{}\"", task.paths.stdout.display())));
4677 assert!(snapshot
4678 .output_preview
4679 .contains(&format!("read \"{}\"", task.paths.stderr.display())));
4680 assert!(!snapshot.output_preview.contains("tail -n +"));
4681 assert!(snapshot.output_preview.len() > 16 * 1024);
4682 assert!(snapshot.output_truncated);
4683 }
4684
4685 #[test]
4686 fn pty_terminal_snapshot_bypasses_line_compression() {
4687 let registry = BgTaskRegistry::default();
4688 let dir = tempfile::tempdir().unwrap();
4689 let calls = Arc::new(AtomicUsize::new(0));
4690 let calls_for_closure = Arc::clone(&calls);
4691 registry.set_compressor_with_exit_code(move |_command, output, _exit_code| {
4692 calls_for_closure.fetch_add(1, Ordering::SeqCst);
4693 CompressionResult::new(output)
4694 });
4695 let (task_id, _task) = insert_terminal_pty_task(®istry, &dir, "raw\u{1b}[31m pty bytes");
4696
4697 let snapshot = registry
4698 .status(
4699 &task_id,
4700 "session",
4701 None,
4702 Some(dir.path()),
4703 RUNNING_OUTPUT_PREVIEW_BYTES,
4704 )
4705 .unwrap();
4706
4707 assert_eq!(snapshot.info.mode, BgMode::Pty);
4708 assert_eq!(snapshot.output_preview, "");
4709 assert_eq!(calls.load(Ordering::SeqCst), 0);
4710 }
4711
4712 #[test]
4713 fn pty_dimensions_are_persisted_and_returned_in_snapshot() {
4714 let registry = BgTaskRegistry::default();
4715 let dir = tempfile::tempdir().unwrap();
4716 let task_id = registry
4717 .spawn_pty(
4718 QUICK_SUCCESS_COMMAND,
4719 "session".to_string(),
4720 dir.path().to_path_buf(),
4721 HashMap::new(),
4722 Some(Duration::from_secs(30)),
4723 dir.path().to_path_buf(),
4724 10,
4725 true,
4726 false,
4727 Some(dir.path().to_path_buf()),
4728 50,
4729 120,
4730 )
4731 .unwrap();
4732
4733 let paths = task_paths(dir.path(), "session", &task_id);
4734 let metadata = read_task(&paths.json).unwrap();
4735 assert_eq!(
4736 metadata.schema_version,
4737 crate::bash_background::persistence::SCHEMA_VERSION
4738 );
4739 assert_eq!(metadata.mode, BgMode::Pty);
4740 assert_eq!(metadata.pty_rows, Some(50));
4741 assert_eq!(metadata.pty_cols, Some(120));
4742
4743 let snapshot = registry
4744 .status(&task_id, "session", None, Some(dir.path()), 1024)
4745 .unwrap();
4746 assert_eq!(snapshot.pty_rows, Some(50));
4747 assert_eq!(snapshot.pty_cols, Some(120));
4748 }
4749
4750 fn spawn_dead_child() -> std::process::Child {
4755 #[cfg(unix)]
4756 let mut cmd = std::process::Command::new("true");
4757 #[cfg(windows)]
4758 let mut cmd = {
4759 let mut c = std::process::Command::new("cmd");
4760 c.args(["/c", "exit", "0"]);
4761 c
4762 };
4763 cmd.stdin(std::process::Stdio::null());
4764 cmd.stdout(std::process::Stdio::null());
4765 cmd.stderr(std::process::Stdio::null());
4766 let mut child = cmd.spawn().expect("spawn replacement child for reap test");
4767 let started = Instant::now();
4776 loop {
4777 match child.try_wait() {
4778 Ok(Some(_)) => break,
4779 Ok(None) => {
4780 if started.elapsed() > Duration::from_secs(5) {
4781 panic!("dead-child stand-in did not exit within 5s");
4782 }
4783 std::thread::sleep(Duration::from_millis(10));
4784 }
4785 Err(error) => panic!("dead-child try_wait failed: {error}"),
4786 }
4787 }
4788 child
4789 }
4790
4791 #[test]
4792 fn ack_marks_delivered_even_when_completion_was_already_consumed_locally() {
4793 let registry = BgTaskRegistry::default();
4794 let dir = tempfile::tempdir().unwrap();
4795 let task_id = registry
4796 .spawn(
4797 LONG_RUNNING_COMMAND,
4798 "session".to_string(),
4799 dir.path().to_path_buf(),
4800 HashMap::new(),
4801 Some(Duration::from_secs(30)),
4802 dir.path().to_path_buf(),
4803 10,
4804 true,
4805 false,
4806 Some(dir.path().to_path_buf()),
4807 )
4808 .unwrap();
4809 registry
4810 .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
4811 .unwrap();
4812 assert_eq!(
4813 registry
4814 .drain_completions_for_session(Some("session"))
4815 .len(),
4816 1
4817 );
4818
4819 registry.inner.completions.lock().unwrap().clear();
4822
4823 assert_eq!(
4824 registry.ack_completions_for_session(Some("session"), std::slice::from_ref(&task_id)),
4825 vec![task_id.clone()]
4826 );
4827 assert!(registry
4828 .drain_completions_for_session(Some("session"))
4829 .is_empty());
4830
4831 let paths = task_paths(dir.path(), "session", &task_id);
4832 let metadata = read_task(&paths.json).unwrap();
4833 assert!(metadata.completion_delivered);
4834
4835 let replayed = BgTaskRegistry::default();
4836 replayed
4837 .replay_session_inner(dir.path(), "session", None)
4838 .unwrap();
4839 assert!(replayed
4840 .drain_completions_for_session(Some("session"))
4841 .is_empty());
4842 }
4843
4844 #[test]
4845 fn register_watch_rejects_unknown_task() {
4846 let registry = BgTaskRegistry::default();
4847
4848 let result = registry.register_watch(
4849 "missing-task".to_string(),
4850 WatchPattern::Substring("READY".into()),
4851 true,
4852 );
4853
4854 assert_eq!(result, Err("task_not_found"));
4855 }
4856
4857 #[test]
4858 fn register_watch_on_terminal_task_scans_existing_output() {
4859 let frames = Arc::new(Mutex::new(Vec::new()));
4860 let captured = Arc::clone(&frames);
4861 let sender: crate::context::ProgressSender = Arc::new(Box::new(move |frame| {
4862 captured.lock().unwrap().push(frame);
4863 })
4864 as Box<dyn Fn(PushFrame) + Send + Sync>);
4865 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(Some(sender))));
4866 let dir = tempfile::tempdir().unwrap();
4867 let task_id = registry
4868 .spawn(
4869 LONG_RUNNING_COMMAND,
4870 "session".to_string(),
4871 dir.path().to_path_buf(),
4872 HashMap::new(),
4873 Some(Duration::from_secs(30)),
4874 dir.path().to_path_buf(),
4875 10,
4876 true,
4877 false,
4878 Some(dir.path().to_path_buf()),
4879 )
4880 .unwrap();
4881 registry
4882 .inner
4883 .shutdown
4884 .store(true, std::sync::atomic::Ordering::SeqCst);
4885 let task = registry.task_for_session(&task_id, "session").unwrap();
4886 std::fs::write(&task.paths.stdout, "READY\n").unwrap();
4887 registry
4888 .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
4889 .unwrap();
4890 frames.lock().unwrap().clear();
4891 registry.inner.completions.lock().unwrap().clear();
4892
4893 registry
4894 .register_watch(
4895 task_id.clone(),
4896 WatchPattern::Substring("READY".into()),
4897 true,
4898 )
4899 .unwrap();
4900
4901 let frames = frames.lock().unwrap();
4902 let frame = frames
4903 .iter()
4904 .find_map(|frame| match frame {
4905 PushFrame::BashPatternMatch(frame) => Some(frame),
4906 _ => None,
4907 })
4908 .expect("terminal watch registration should emit pattern frame");
4909 assert_eq!(frame.reason, "pattern_match");
4910 assert_eq!(frame.task_id, task_id);
4911 assert_eq!(frame.session_id, "session");
4912 assert_eq!(frame.match_text, "READY");
4913 assert_eq!(frame.match_offset, 0);
4914 assert_eq!(registry.active_watch_count(&frame.task_id), 0);
4915 let metadata = read_task(&task.paths.json).unwrap();
4916 assert!(metadata.completion_delivered);
4917 }
4918
4919 #[test]
4920 fn cleanup_finished_removes_terminal_tasks_older_than_threshold() {
4921 let registry = BgTaskRegistry::default();
4922 let dir = tempfile::tempdir().unwrap();
4923 let task_id = registry
4924 .spawn(
4925 QUICK_SUCCESS_COMMAND,
4926 "session".to_string(),
4927 dir.path().to_path_buf(),
4928 HashMap::new(),
4929 Some(Duration::from_secs(30)),
4930 dir.path().to_path_buf(),
4931 10,
4932 true,
4933 false,
4934 Some(dir.path().to_path_buf()),
4935 )
4936 .unwrap();
4937 registry
4938 .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
4939 .unwrap();
4940 let completions = registry.drain_completions_for_session(Some("session"));
4941 assert_eq!(completions.len(), 1);
4942 assert_eq!(
4943 registry.ack_completions_for_session(Some("session"), std::slice::from_ref(&task_id)),
4944 vec![task_id.clone()]
4945 );
4946
4947 registry.cleanup_finished(Duration::ZERO);
4948
4949 assert!(registry.inner.tasks.lock().unwrap().is_empty());
4950 }
4951
4952 #[test]
4953 fn cleanup_finished_retains_undelivered_terminals() {
4954 let registry = BgTaskRegistry::default();
4955 let dir = tempfile::tempdir().unwrap();
4956 let task_id = registry
4957 .spawn(
4958 QUICK_SUCCESS_COMMAND,
4959 "session".to_string(),
4960 dir.path().to_path_buf(),
4961 HashMap::new(),
4962 Some(Duration::from_secs(30)),
4963 dir.path().to_path_buf(),
4964 10,
4965 true,
4966 false,
4967 Some(dir.path().to_path_buf()),
4968 )
4969 .unwrap();
4970 registry
4971 .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
4972 .unwrap();
4973
4974 registry.cleanup_finished(Duration::ZERO);
4975
4976 assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
4977 }
4978
4979 #[test]
4987 fn reap_child_marks_failed_when_child_exits_without_exit_marker() {
4988 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
4989 let dir = tempfile::tempdir().unwrap();
4990 let task_id = registry
4991 .spawn(
4992 QUICK_SUCCESS_COMMAND,
4993 "session".to_string(),
4994 dir.path().to_path_buf(),
4995 HashMap::new(),
4996 Some(Duration::from_secs(30)),
4997 dir.path().to_path_buf(),
4998 10,
4999 true,
5000 false,
5001 Some(dir.path().to_path_buf()),
5002 )
5003 .unwrap();
5004
5005 let task = registry.task_for_session(&task_id, "session").unwrap();
5006
5007 let started = Instant::now();
5012 loop {
5013 let exited = {
5014 let mut state = task.state.lock().unwrap();
5015 match &mut state.runtime {
5016 TaskRuntime::Piped(Some(child)) => matches!(child.try_wait(), Ok(Some(_))),
5017 _ => true,
5018 }
5019 };
5020 if exited {
5021 break;
5022 }
5023 assert!(
5024 started.elapsed() < Duration::from_secs(5),
5025 "child should exit quickly"
5026 );
5027 std::thread::sleep(Duration::from_millis(20));
5028 }
5029
5030 registry
5038 .inner
5039 .shutdown
5040 .store(true, std::sync::atomic::Ordering::SeqCst);
5041 std::thread::sleep(Duration::from_millis(550));
5045
5046 let _ = std::fs::remove_file(&task.paths.exit);
5049
5050 {
5065 let mut state = task.state.lock().unwrap();
5066 state.metadata.status = BgTaskStatus::Running;
5067 state.metadata.status_reason = None;
5068 state.metadata.exit_code = None;
5069 state.metadata.finished_at = None;
5070 state.metadata.duration_ms = None;
5071 crate::bash_background::persistence::write_task(&task.paths.json, &state.metadata)
5074 .expect("persist reset Running metadata for reap_child test");
5075 if matches!(state.runtime, TaskRuntime::Piped(None)) {
5079 state.runtime = TaskRuntime::Piped(Some(spawn_dead_child()));
5080 }
5081 }
5082 *task.terminal_at.lock().unwrap() = None;
5085
5086 assert!(
5089 task.is_running(),
5090 "precondition: metadata.status == Running"
5091 );
5092 assert!(
5093 !task.paths.exit.exists(),
5094 "precondition: exit marker absent"
5095 );
5096
5097 registry.reap_child(&task);
5102
5103 {
5104 let state = task.state.lock().unwrap();
5105 assert_eq!(
5106 state.metadata.status,
5107 BgTaskStatus::Running,
5108 "first reap must leave status Running while waiting one pass for marker"
5109 );
5110 assert_eq!(
5111 state.metadata.status_reason, None,
5112 "first reap must not record a failure reason"
5113 );
5114 assert!(
5115 matches!(state.runtime, TaskRuntime::Piped(None)),
5116 "child handle must be released after first reap"
5117 );
5118 assert!(
5119 state.detached,
5120 "task must be marked detached after first reap"
5121 );
5122 }
5123
5124 registry.reap_child(&task);
5128
5129 let state = task.state.lock().unwrap();
5130 assert!(
5131 state.metadata.status.is_terminal(),
5132 "second reap must transition to terminal when PID dead and no marker. Got status={:?}",
5133 state.metadata.status
5134 );
5135 assert_eq!(
5136 state.metadata.status,
5137 BgTaskStatus::Failed,
5138 "must specifically be Failed (not Killed): status={:?}",
5139 state.metadata.status
5140 );
5141 assert_eq!(
5142 state.metadata.status_reason.as_deref(),
5143 Some("process exited without exit marker"),
5144 "reason must match replay path's wording: {:?}",
5145 state.metadata.status_reason
5146 );
5147 assert!(
5148 matches!(state.runtime, TaskRuntime::Piped(None)),
5149 "child handle must stay released after second reap"
5150 );
5151 assert!(
5152 state.detached,
5153 "task must remain detached after second reap"
5154 );
5155 }
5156
5157 #[test]
5162 fn reap_child_preserves_running_when_exit_marker_exists() {
5163 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
5164 let dir = tempfile::tempdir().unwrap();
5165 let task_id = registry
5166 .spawn(
5167 QUICK_SUCCESS_COMMAND,
5168 "session".to_string(),
5169 dir.path().to_path_buf(),
5170 HashMap::new(),
5171 Some(Duration::from_secs(30)),
5172 dir.path().to_path_buf(),
5173 10,
5174 true,
5175 false,
5176 Some(dir.path().to_path_buf()),
5177 )
5178 .unwrap();
5179
5180 let task = registry.task_for_session(&task_id, "session").unwrap();
5181
5182 let started = Instant::now();
5185 loop {
5186 let exited = {
5187 let mut state = task.state.lock().unwrap();
5188 match &mut state.runtime {
5189 TaskRuntime::Piped(Some(child)) => matches!(child.try_wait(), Ok(Some(_))),
5190 _ => true,
5191 }
5192 };
5193 if exited && task.paths.exit.exists() {
5194 break;
5195 }
5196 assert!(
5197 started.elapsed() < Duration::from_secs(5),
5198 "child should exit and write marker quickly"
5199 );
5200 std::thread::sleep(Duration::from_millis(20));
5201 }
5202
5203 registry
5209 .inner
5210 .shutdown
5211 .store(true, std::sync::atomic::Ordering::SeqCst);
5212 std::thread::sleep(Duration::from_millis(550));
5213
5214 {
5220 let mut state = task.state.lock().unwrap();
5221 state.metadata.status = BgTaskStatus::Running;
5222 state.metadata.status_reason = None;
5223 if matches!(state.runtime, TaskRuntime::Piped(None)) {
5224 state.runtime = TaskRuntime::Piped(Some(spawn_dead_child()));
5225 }
5226 }
5227 *task.terminal_at.lock().unwrap() = None;
5228 if !task.paths.exit.exists() {
5231 std::fs::write(&task.paths.exit, "0").expect("write replacement exit marker");
5232 }
5233
5234 registry.reap_child(&task);
5238
5239 let state = task.state.lock().unwrap();
5240 assert!(
5241 matches!(state.runtime, TaskRuntime::Piped(None)),
5242 "child handle still released even when marker exists"
5243 );
5244 assert!(
5245 state.detached,
5246 "task still marked detached even when marker exists"
5247 );
5248 assert_eq!(
5253 state.metadata.status,
5254 BgTaskStatus::Running,
5255 "reap_child must defer to poll_task when marker exists"
5256 );
5257 }
5258
5259 #[cfg(unix)]
5263 fn pid_stat(pid: u32) -> Option<String> {
5264 let output = std::process::Command::new("ps")
5265 .args(["-o", "stat=", "-p", &pid.to_string()])
5266 .output()
5267 .ok()?;
5268 if !output.status.success() {
5269 return None;
5270 }
5271 let stat = String::from_utf8_lossy(&output.stdout).trim().to_string();
5272 if stat.is_empty() {
5273 None
5274 } else {
5275 Some(stat)
5276 }
5277 }
5278
5279 #[cfg(unix)]
5281 fn is_zombie(pid: u32) -> bool {
5282 pid_stat(pid).is_some_and(|stat| stat.starts_with('Z'))
5283 }
5284
5285 #[cfg(unix)]
5291 fn spawn_unreaped_zombie() -> std::process::Child {
5292 let child = std::process::Command::new("true")
5293 .stdin(std::process::Stdio::null())
5294 .stdout(std::process::Stdio::null())
5295 .stderr(std::process::Stdio::null())
5296 .spawn()
5297 .expect("spawn zombie stand-in");
5298 let pid = child.id();
5299 let started = Instant::now();
5300 while !is_zombie(pid) {
5301 assert!(
5302 started.elapsed() < Duration::from_secs(5),
5303 "stand-in child should become a zombie within 5s"
5304 );
5305 std::thread::sleep(Duration::from_millis(10));
5306 }
5307 child
5309 }
5310
5311 #[cfg(unix)]
5321 #[test]
5322 fn finalize_from_marker_reaps_child_no_zombie() {
5323 use std::sync::atomic::Ordering;
5324
5325 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
5326 let dir = tempfile::tempdir().unwrap();
5327 let task_id = registry
5328 .spawn(
5329 QUICK_SUCCESS_COMMAND,
5330 "session".to_string(),
5331 dir.path().to_path_buf(),
5332 HashMap::new(),
5333 Some(Duration::from_secs(30)),
5334 dir.path().to_path_buf(),
5335 10,
5336 true,
5337 false,
5338 Some(dir.path().to_path_buf()),
5339 )
5340 .unwrap();
5341
5342 registry.inner.shutdown.store(true, Ordering::SeqCst);
5346 std::thread::sleep(Duration::from_millis(550));
5347
5348 let task = registry.task_for_session(&task_id, "session").unwrap();
5349
5350 let started = Instant::now();
5354 while !task.paths.exit.exists() {
5355 assert!(
5356 started.elapsed() < Duration::from_secs(5),
5357 "exit marker should land quickly for `true`"
5358 );
5359 std::thread::sleep(Duration::from_millis(20));
5360 }
5361
5362 let zombie_pid;
5368 {
5369 let mut state = task.state.lock().unwrap();
5370 state.metadata.status = BgTaskStatus::Running;
5371 state.metadata.status_reason = None;
5372 state.metadata.exit_code = None;
5373 state.metadata.finished_at = None;
5374 state.metadata.duration_ms = None;
5375 crate::bash_background::persistence::write_task(&task.paths.json, &state.metadata)
5376 .expect("persist reset Running metadata");
5377 let zombie = spawn_unreaped_zombie();
5378 zombie_pid = zombie.id();
5379 state.runtime = TaskRuntime::Piped(Some(zombie));
5380 }
5381 *task.terminal_at.lock().unwrap() = None;
5382
5383 assert!(
5385 is_zombie(zombie_pid),
5386 "precondition: stand-in child {zombie_pid} must be a zombie before finalize"
5387 );
5388
5389 registry.poll_task(&task).unwrap();
5392
5393 {
5394 let state = task.state.lock().unwrap();
5395 assert!(
5396 matches!(state.runtime, TaskRuntime::Piped(None)),
5397 "child handle must be released after marker finalize"
5398 );
5399 assert!(
5400 state.metadata.status.is_terminal(),
5401 "task must be terminal after marker finalize: {:?}",
5402 state.metadata.status
5403 );
5404 }
5405
5406 assert!(
5409 !is_zombie(zombie_pid),
5410 "issue #91 regression: child {zombie_pid} left as <defunct> zombie \
5411 after the exit-marker terminal transition"
5412 );
5413 }
5414
5415 #[cfg(unix)]
5419 #[test]
5420 fn kill_with_existing_marker_reaps_child_no_zombie() {
5421 use std::sync::atomic::Ordering;
5422
5423 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
5424 let dir = tempfile::tempdir().unwrap();
5425 let task_id = registry
5426 .spawn(
5427 QUICK_SUCCESS_COMMAND,
5428 "session".to_string(),
5429 dir.path().to_path_buf(),
5430 HashMap::new(),
5431 Some(Duration::from_secs(30)),
5432 dir.path().to_path_buf(),
5433 10,
5434 true,
5435 false,
5436 Some(dir.path().to_path_buf()),
5437 )
5438 .unwrap();
5439
5440 registry.inner.shutdown.store(true, Ordering::SeqCst);
5441 std::thread::sleep(Duration::from_millis(550));
5442
5443 let task = registry.task_for_session(&task_id, "session").unwrap();
5444
5445 let started = Instant::now();
5446 while !task.paths.exit.exists() {
5447 assert!(
5448 started.elapsed() < Duration::from_secs(5),
5449 "exit marker should land quickly for `true`"
5450 );
5451 std::thread::sleep(Duration::from_millis(20));
5452 }
5453
5454 let zombie_pid;
5455 {
5456 let mut state = task.state.lock().unwrap();
5457 state.metadata.status = BgTaskStatus::Running;
5458 state.metadata.status_reason = None;
5459 state.metadata.exit_code = None;
5460 state.metadata.finished_at = None;
5461 state.metadata.duration_ms = None;
5462 crate::bash_background::persistence::write_task(&task.paths.json, &state.metadata)
5463 .expect("persist reset Running metadata");
5464 let zombie = spawn_unreaped_zombie();
5465 zombie_pid = zombie.id();
5466 state.runtime = TaskRuntime::Piped(Some(zombie));
5467 }
5468 *task.terminal_at.lock().unwrap() = None;
5469
5470 assert!(
5471 is_zombie(zombie_pid),
5472 "precondition: stand-in child {zombie_pid} must be a zombie before kill"
5473 );
5474
5475 registry
5477 .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
5478 .expect("kill should succeed");
5479
5480 {
5481 let state = task.state.lock().unwrap();
5482 assert!(
5483 matches!(state.runtime, TaskRuntime::Piped(None)),
5484 "child handle must be released after marker-aware kill"
5485 );
5486 assert!(state.metadata.status.is_terminal());
5487 }
5488
5489 assert!(
5490 !is_zombie(zombie_pid),
5491 "issue #91 regression: child {zombie_pid} left as <defunct> zombie \
5492 after a marker-aware kill"
5493 );
5494 }
5495
5496 #[test]
5497 fn cleanup_finished_keeps_running_tasks() {
5498 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
5499 let dir = tempfile::tempdir().unwrap();
5500 let task_id = registry
5501 .spawn(
5502 LONG_RUNNING_COMMAND,
5503 "session".to_string(),
5504 dir.path().to_path_buf(),
5505 HashMap::new(),
5506 Some(Duration::from_secs(30)),
5507 dir.path().to_path_buf(),
5508 10,
5509 true,
5510 false,
5511 Some(dir.path().to_path_buf()),
5512 )
5513 .unwrap();
5514
5515 registry.cleanup_finished(Duration::ZERO);
5516
5517 assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
5518 let _ = registry.kill(&task_id, "session");
5519 }
5520
5521 #[cfg(windows)]
5522 fn wait_for_file(path: &Path) -> String {
5523 let started = Instant::now();
5524 loop {
5525 if path.exists() {
5526 return fs::read_to_string(path).expect("read file");
5527 }
5528 assert!(
5529 started.elapsed() < Duration::from_secs(30),
5530 "timed out waiting for {}",
5531 path.display()
5532 );
5533 std::thread::sleep(Duration::from_millis(100));
5534 }
5535 }
5536
5537 #[cfg(windows)]
5538 fn spawn_windows_registry_command(
5539 command: &str,
5540 ) -> (BgTaskRegistry, tempfile::TempDir, String) {
5541 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
5542 let dir = tempfile::tempdir().unwrap();
5543 let task_id = registry
5544 .spawn(
5545 command,
5546 "session".to_string(),
5547 dir.path().to_path_buf(),
5548 HashMap::new(),
5549 Some(Duration::from_secs(30)),
5550 dir.path().to_path_buf(),
5551 10,
5552 false,
5553 false,
5554 Some(dir.path().to_path_buf()),
5555 )
5556 .unwrap();
5557 (registry, dir, task_id)
5558 }
5559
5560 #[cfg(windows)]
5561 #[test]
5562 fn windows_spawn_writes_exit_marker_for_zero_exit() {
5563 let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 0");
5564 let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
5565
5566 let content = wait_for_file(&exit_path);
5567
5568 assert_eq!(content.trim(), "0");
5569 }
5570
5571 #[cfg(windows)]
5572 #[test]
5573 fn windows_spawn_writes_exit_marker_for_nonzero_exit() {
5574 let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 42");
5575 let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
5576
5577 let content = wait_for_file(&exit_path);
5578
5579 assert_eq!(content.trim(), "42");
5580 }
5581
5582 #[cfg(windows)]
5583 #[test]
5584 fn windows_spawn_captures_stdout_to_disk() {
5585 let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c echo hello");
5586 let task = registry.task_for_session(&task_id, "session").unwrap();
5587 let stdout_path = task.paths.stdout.clone();
5588 let exit_path = task.paths.exit.clone();
5589
5590 let _ = wait_for_file(&exit_path);
5591 let stdout = fs::read_to_string(stdout_path).expect("read stdout");
5592
5593 assert!(stdout.contains("hello"), "stdout was {stdout:?}");
5594 }
5595
5596 #[cfg(windows)]
5597 #[test]
5598 fn windows_spawn_uses_pwsh_when_available() {
5599 let candidates = crate::windows_shell::shell_candidates_with(
5603 |binary| match binary {
5604 "pwsh.exe" => Some(std::path::PathBuf::from(r"C:\pwsh\pwsh.exe")),
5605 "powershell.exe" => Some(std::path::PathBuf::from(r"C:\ps\powershell.exe")),
5606 _ => None,
5607 },
5608 || None,
5609 );
5610 let shell = candidates.first().expect("at least one candidate").clone();
5611 assert_eq!(shell, crate::windows_shell::WindowsShell::Pwsh);
5612 assert_eq!(shell.binary().as_ref(), "pwsh.exe");
5613 }
5614
5615 #[cfg(windows)]
5622 #[test]
5623 fn windows_shell_cmd_wrapper_writes_exit_marker_with_move() {
5624 let exit_path = Path::new(r"C:\Temp\bash-test.exit");
5625 let script =
5626 crate::windows_shell::WindowsShell::Cmd.wrapper_script("cmd /c exit 42", exit_path);
5627
5628 assert!(
5632 script.contains("set CODE=%ERRORLEVEL%"),
5633 "wrapper must capture exit code into CODE: {script}"
5634 );
5635 assert!(
5636 script.contains("echo %CODE% >"),
5637 "wrapper must echo CODE to a temp marker file: {script}"
5638 );
5639 assert!(
5640 script.contains("move /Y"),
5641 "wrapper must use atomic move to write the marker: {script}"
5642 );
5643 assert!(
5646 script.contains("> nul"),
5647 "wrapper must redirect move output to nul: {script}"
5648 );
5649 assert!(
5651 script.contains("exit /B %CODE%"),
5652 "wrapper must propagate the captured exit code: {script}"
5653 );
5654 assert!(script.contains(r#""C:\Temp\bash-test.exit.tmp""#));
5655 assert!(script.contains(r#""C:\Temp\bash-test.exit""#));
5656 }
5657
5658 #[cfg(windows)]
5664 #[test]
5665 fn windows_shell_cmd_bg_command_uses_minimal_cmd_flags() {
5666 use crate::windows_shell::WindowsShell;
5667 let cmd = WindowsShell::Cmd.bg_command("echo wrapped");
5668 let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
5669 let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
5670 assert_eq!(
5671 args_strs,
5672 vec!["/D", "/S", "/C", "echo wrapped"],
5673 "Cmd::bg_command must prepend /D /S /C"
5674 );
5675 }
5676
5677 #[cfg(windows)]
5680 #[test]
5681 fn windows_shell_pwsh_bg_command_uses_standard_args() {
5682 use crate::windows_shell::WindowsShell;
5683 let cmd = WindowsShell::Pwsh.bg_command("Get-Date");
5684 let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
5685 let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
5686 assert!(
5687 args_strs.contains(&"-Command"),
5688 "Pwsh::bg_command must use -Command: {args_strs:?}"
5689 );
5690 assert!(
5691 args_strs.contains(&"Get-Date"),
5692 "Pwsh::bg_command must include the user command body"
5693 );
5694 }
5695}