1use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
2use std::fs;
3use std::io::{Read, Write};
4use std::path::{Path, PathBuf};
5use std::process::{Child, Command, Stdio};
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7#[cfg(unix)]
8use std::sync::OnceLock;
9use std::sync::{Arc, Mutex, RwLock};
10use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
11
12use rusqlite::Connection;
13use serde::Serialize;
14
15use crate::compress::caps::DropClass;
16use crate::compress::CompressionResult;
17use crate::context::SharedProgressSender;
18use crate::harness::Harness;
19use crate::protocol::{BashCompletedFrame, BashLongRunningFrame, BashPatternMatchFrame, PushFrame};
20
21#[cfg(unix)]
22use std::os::unix::process::CommandExt;
23#[cfg(windows)]
24use std::os::windows::process::CommandExt;
25
26use super::buffer::{combine_streams, BgBuffer, StreamKind, TokenCountInput};
27use super::output::{
28 cap_completion_output, cap_completion_output_with_marker, cap_final_output,
29 cap_final_output_with_marker, completion_preview_threshold, json_output_pointer, quote_path,
30 COMPRESS_INPUT_CAP_BYTES, COMPRESS_INPUT_HEAD_BYTES, COMPRESS_INPUT_TAIL_BYTES,
31 FINAL_OUTPUT_CAP_BYTES, RAW_PASSTHROUGH_CAP_BYTES, RAW_PASSTHROUGH_HEAD_BYTES,
32 RAW_PASSTHROUGH_TAIL_BYTES, RUNNING_OUTPUT_PREVIEW_BYTES, STRUCTURED_OUTPUT_CAP_BYTES,
33};
34use super::persistence::{
35 create_capture_file, delete_task_bundle, read_exit_marker, read_task, session_tasks_dir,
36 task_paths, unix_millis, update_task, write_kill_marker_if_absent, write_task, BgMode,
37 ExitMarker, PersistedTask, TaskPaths,
38};
39use super::process::is_process_alive;
40#[cfg(unix)]
41use super::process::terminate_pgid;
42#[cfg(windows)]
43use super::process::terminate_pid;
44use super::pty_process::spawn_pty_for_command;
45use super::pty_runtime::PtyRuntime;
46use super::watches::{PatternMatch, WatchPattern, WatchRegistry};
47use super::{BgTaskInfo, BgTaskStatus};
48const DEFAULT_BG_TIMEOUT: Duration = Duration::from_secs(30 * 60);
51const STALE_RUNNING_AFTER: Duration = Duration::from_secs(24 * 60 * 60);
52const PERSISTED_GC_GRACE: Duration = Duration::from_secs(24 * 60 * 60);
53const QUARANTINE_GC_GRACE: Duration = Duration::from_secs(30 * 24 * 60 * 60);
54
55const TOKENIZE_CAP_BYTES_PER_STREAM: usize = 128 * 1024;
56
57#[derive(Debug, Clone, Serialize)]
58pub struct BgCompletion {
59 pub task_id: String,
60 #[serde(skip_serializing)]
63 pub session_id: String,
64 pub status: BgTaskStatus,
65 pub exit_code: Option<i32>,
66 pub command: String,
67 #[serde(default, skip_serializing_if = "String::is_empty")]
73 pub output_preview: String,
74 #[serde(default, skip_serializing_if = "is_false")]
79 pub output_truncated: bool,
80 #[serde(default, skip_serializing_if = "Option::is_none")]
83 pub original_tokens: Option<u32>,
84 #[serde(default, skip_serializing_if = "Option::is_none")]
87 pub compressed_tokens: Option<u32>,
88 #[serde(default, skip_serializing_if = "is_false")]
90 pub tokens_skipped: bool,
91}
92
93fn is_false(v: &bool) -> bool {
94 !*v
95}
96
97#[derive(Debug, Clone, Serialize)]
98pub struct BgTaskSnapshot {
99 #[serde(flatten)]
100 pub info: BgTaskInfo,
101 pub exit_code: Option<i32>,
102 pub child_pid: Option<u32>,
103 pub workdir: String,
104 pub output_preview: String,
105 pub output_truncated: bool,
106 pub output_path: Option<String>,
107 pub stderr_path: Option<String>,
108 #[serde(skip_serializing_if = "Option::is_none")]
109 pub pty_rows: Option<u16>,
110 #[serde(skip_serializing_if = "Option::is_none")]
111 pub pty_cols: Option<u16>,
112}
113
114#[derive(Debug, Clone, Copy, PartialEq, Eq)]
115enum TerminalOutputKind {
116 Compressed,
117 Raw,
118 Structured,
119}
120
121#[derive(Debug, Clone, PartialEq, Eq)]
122struct TerminalOutputCache {
123 output_preview: String,
124 output_truncated: bool,
125 kind: TerminalOutputKind,
126 output_path: Option<String>,
127 stderr_path: Option<String>,
128 recovery: Option<RecoveryContext>,
129}
130
131#[derive(Debug, Clone, PartialEq, Eq)]
132struct RecoveryContext {
133 dropped_by_class: BTreeMap<DropClass, usize>,
134 had_inner_drop: bool,
135 offset_hint_eligible: bool,
136 offset_start_line: Option<usize>,
137 byte_truncated: bool,
138 output_path: Option<String>,
139 stderr_path: Option<String>,
140 include_stderr_path: bool,
141}
142
143impl RecoveryContext {
144 fn has_visible_drop(&self) -> bool {
145 self.byte_truncated || self.had_inner_drop || !self.dropped_by_class.is_empty()
146 }
147}
148
149#[derive(Clone)]
150pub struct BgTaskRegistry {
151 pub(crate) inner: Arc<RegistryInner>,
152}
153
154pub(crate) struct RegistryInner {
155 pub(crate) tasks: Mutex<HashMap<String, Arc<BgTask>>>,
156 pub(crate) completions: Mutex<VecDeque<BgCompletion>>,
157 pub(crate) progress_sender: SharedProgressSender,
158 watchdog_started: AtomicBool,
159 pub(crate) shutdown: AtomicBool,
160 pub(crate) long_running_reminder_enabled: AtomicBool,
161 pub(crate) long_running_reminder_interval_ms: AtomicU64,
162 persisted_gc_started: AtomicBool,
163 #[cfg(test)]
164 persisted_gc_runs: AtomicU64,
165 pub(crate) compressor:
171 Mutex<Option<Box<dyn Fn(&str, String, Option<i32>) -> CompressionResult + Send + Sync>>>,
172 pub(crate) db_pool: RwLock<Option<Arc<Mutex<Connection>>>>,
173 pub(crate) db_harness: RwLock<Option<String>>,
174 pub(crate) wake_tx: crossbeam_channel::Sender<()>,
175 pub(crate) wake_rx: crossbeam_channel::Receiver<()>,
176 pub(crate) watch_registry: Mutex<WatchRegistry>,
177}
178
179pub(crate) struct BgTask {
180 pub(crate) task_id: String,
181 pub(crate) session_id: String,
182 pub(crate) paths: TaskPaths,
183 pub(crate) started: Instant,
184 pub(crate) last_reminder_at: Mutex<Option<Instant>>,
185 pub(crate) terminal_at: Mutex<Option<Instant>>,
186 pub(crate) state: Mutex<BgTaskState>,
187}
188
189pub(crate) enum TaskRuntime {
190 Piped(Option<Child>),
191 Pty(Option<PtyRuntime>),
192}
193
194pub(crate) struct BgTaskState {
195 pub(crate) metadata: PersistedTask,
196 pub(crate) runtime: TaskRuntime,
197 pub(crate) detached: bool,
198 pub(crate) child_exit_observed: bool,
209 pub(crate) buffer: BgBuffer,
210 terminal_output_cache: Option<TerminalOutputCache>,
211 pub(crate) pending_terminal_override: Option<BgTaskStatus>,
213}
214
215fn completion_matches_session(completion: &BgCompletion, session_id: Option<&str>) -> bool {
216 session_id
217 .map(|session_id| completion.session_id == session_id)
218 .unwrap_or(true)
219}
220
221impl BgTaskRegistry {
222 pub fn new(progress_sender: SharedProgressSender) -> Self {
223 let (wake_tx, wake_rx) = crossbeam_channel::bounded(1);
224 Self {
225 inner: Arc::new(RegistryInner {
226 tasks: Mutex::new(HashMap::new()),
227 completions: Mutex::new(VecDeque::new()),
228 progress_sender,
229 watchdog_started: AtomicBool::new(false),
230 shutdown: AtomicBool::new(false),
231 long_running_reminder_enabled: AtomicBool::new(true),
232 long_running_reminder_interval_ms: AtomicU64::new(600_000),
233 persisted_gc_started: AtomicBool::new(false),
234 #[cfg(test)]
235 persisted_gc_runs: AtomicU64::new(0),
236 compressor: Mutex::new(None),
237 db_pool: RwLock::new(None),
238 db_harness: RwLock::new(None),
239 wake_tx,
240 wake_rx,
241 watch_registry: Mutex::new(WatchRegistry::default()),
242 }),
243 }
244 }
245
246 pub fn set_harness(&self, harness: Harness) {
247 if let Ok(mut slot) = self.inner.db_harness.write() {
248 *slot = Some(harness.as_str().to_string());
249 }
250 }
251
252 pub fn set_db_pool(&self, conn: Arc<Mutex<Connection>>) {
253 if let Ok(mut slot) = self.inner.db_pool.write() {
254 *slot = Some(conn);
255 }
256 }
257
258 pub fn clear_db_pool(&self) {
259 if let Ok(mut slot) = self.inner.db_pool.write() {
260 *slot = None;
261 }
262 }
263
264 pub fn set_compressor<F>(&self, compressor: F)
269 where
270 F: Fn(&str, String) -> CompressionResult + Send + Sync + 'static,
271 {
272 self.set_compressor_with_exit_code(move |command, output, _exit_code| {
273 compressor(command, output)
274 });
275 }
276
277 pub fn set_compressor_with_exit_code<F>(&self, compressor: F)
278 where
279 F: Fn(&str, String, Option<i32>) -> CompressionResult + Send + Sync + 'static,
280 {
281 if let Ok(mut slot) = self.inner.compressor.lock() {
282 *slot = Some(Box::new(compressor));
283 }
284 }
285
286 pub(crate) fn compress_output(
289 &self,
290 command: &str,
291 output: String,
292 exit_code: Option<i32>,
293 ) -> CompressionResult {
294 let Ok(slot) = self.inner.compressor.lock() else {
295 return CompressionResult::new(output);
296 };
297 match slot.as_ref() {
298 Some(compressor) => compressor(command, output, exit_code),
299 None => CompressionResult::new(output),
300 }
301 }
302
303 fn ensure_terminal_output_cache(&self, task: &Arc<BgTask>) -> Option<TerminalOutputCache> {
304 let (metadata, buffer) = {
305 let state = task.state.lock().ok()?;
306 if !state.metadata.status.is_terminal() || state.metadata.mode == BgMode::Pty {
307 return None;
308 }
309 if let Some(cache) = state.terminal_output_cache.clone() {
310 return Some(cache);
311 }
312 (state.metadata.clone(), state.buffer.clone())
313 };
314
315 let mut cap_buffer = buffer.clone();
316 cap_buffer.enforce_terminal_cap();
317 let cache = self.render_terminal_output(&metadata, &buffer);
318 let mut state = task.state.lock().ok()?;
319 if !state.metadata.status.is_terminal() || state.metadata.mode == BgMode::Pty {
320 return None;
321 }
322 if let Some(existing) = state.terminal_output_cache.clone() {
323 return Some(existing);
324 }
325 state.terminal_output_cache = Some(cache.clone());
326 Some(cache)
327 }
328
329 fn render_terminal_output(
330 &self,
331 metadata: &PersistedTask,
332 buffer: &BgBuffer,
333 ) -> TerminalOutputCache {
334 if metadata.mode == BgMode::Pty {
335 return TerminalOutputCache {
336 output_preview: String::new(),
337 output_truncated: false,
338 kind: TerminalOutputKind::Raw,
339 output_path: buffer.output_path().map(|path| path.display().to_string()),
340 stderr_path: buffer.stderr_path().map(|path| path.display().to_string()),
341 recovery: None,
342 };
343 }
344
345 if let Some(structured) = render_structured_output(&metadata.command, buffer) {
346 return structured;
347 }
348
349 if !metadata.compressed {
350 return render_raw_passthrough(buffer);
351 }
352
353 let raw = buffer.read_combined_head_tail(
354 COMPRESS_INPUT_CAP_BYTES,
355 COMPRESS_INPUT_HEAD_BYTES,
356 COMPRESS_INPUT_TAIL_BYTES,
357 );
358 let compressed = self.compress_output(&metadata.command, raw.text, metadata.exit_code);
359 render_compressed_with_recovery(buffer, compressed, raw.truncated)
360 }
361
362 fn snapshot_with_terminal_cache(
363 &self,
364 task: &Arc<BgTask>,
365 preview_bytes: usize,
366 ) -> BgTaskSnapshot {
367 let mut snapshot = task.snapshot(preview_bytes);
368 self.maybe_compress_snapshot(task, &mut snapshot);
369 snapshot
370 }
371
372 fn post_terminal_transition(&self, task: &Arc<BgTask>, emit_frame: bool) -> Result<(), String> {
373 let (metadata, buffer) = {
374 let state = task
375 .state
376 .lock()
377 .map_err(|_| "background task lock poisoned".to_string())?;
378 if !state.metadata.status.is_terminal() {
379 return Ok(());
380 }
381 (state.metadata.clone(), state.buffer.clone())
382 };
383
384 let mut cap_buffer = buffer.clone();
385 cap_buffer.enforce_terminal_cap();
386 let cache = self.ensure_terminal_output_cache(task);
387 self.enqueue_completion_from_parts(
388 &metadata,
389 Some(&buffer),
390 None,
391 emit_frame,
392 cache.as_ref(),
393 );
394 Ok(())
395 }
396
397 fn persist_task(&self, paths: &TaskPaths, metadata: &PersistedTask) -> std::io::Result<()> {
398 write_task(&paths.json, metadata)?;
399 self.dual_write_task(paths, metadata);
400 Ok(())
401 }
402
403 fn update_task_metadata<F>(
404 &self,
405 paths: &TaskPaths,
406 update: F,
407 ) -> std::io::Result<PersistedTask>
408 where
409 F: FnOnce(&mut PersistedTask),
410 {
411 let metadata = update_task(&paths.json, update)?;
412 self.dual_write_task(paths, &metadata);
413 Ok(metadata)
414 }
415
416 fn dual_write_task(&self, paths: &TaskPaths, metadata: &PersistedTask) {
417 let pool = self.inner.db_pool.read().ok().and_then(|slot| slot.clone());
418 let Some(pool) = pool else {
419 return;
420 };
421 let harness = self
422 .inner
423 .db_harness
424 .read()
425 .ok()
426 .and_then(|slot| slot.clone());
427 let Some(harness) = harness else {
428 crate::slog_warn!(
429 "dual-write bash_task to DB skipped for {}: harness not configured",
430 metadata.task_id
431 );
432 return;
433 };
434 let row = match metadata.to_bash_task_row(&harness, paths) {
435 Ok(row) => row,
436 Err(error) => {
437 crate::slog_warn!(
438 "dual-write bash_task to DB failed for {}: {}",
439 metadata.task_id,
440 error
441 );
442 return;
443 }
444 };
445 let conn = match pool.lock() {
446 Ok(conn) => conn,
447 Err(_) => {
448 crate::slog_warn!(
449 "dual-write bash_task to DB failed for {}: db mutex poisoned",
450 metadata.task_id
451 );
452 return;
453 }
454 };
455 if let Err(error) = crate::db::bash_tasks::upsert_bash_task(&conn, &row) {
456 crate::slog_warn!(
457 "dual-write bash_task to DB failed for {}: {}",
458 metadata.task_id,
459 error
460 );
461 }
462 }
463
464 pub fn configure_long_running_reminders(&self, enabled: bool, interval_ms: u64) {
465 self.inner
466 .long_running_reminder_enabled
467 .store(enabled, Ordering::SeqCst);
468 self.inner
469 .long_running_reminder_interval_ms
470 .store(interval_ms, Ordering::SeqCst);
471 }
472
473 #[cfg(unix)]
474 #[allow(clippy::too_many_arguments)]
475 pub fn spawn(
476 &self,
477 command: &str,
478 session_id: String,
479 workdir: PathBuf,
480 env: HashMap<String, String>,
481 timeout: Option<Duration>,
482 storage_dir: PathBuf,
483 max_running: usize,
484 notify_on_completion: bool,
485 compressed: bool,
486 project_root: Option<PathBuf>,
487 ) -> Result<String, String> {
488 self.start_watchdog();
489
490 let running = self.running_count();
491 if running >= max_running {
492 return Err(format!(
493 "background bash task limit exceeded: {running} running (max {max_running})"
494 ));
495 }
496
497 let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
498 let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
499 let task_id = self.generate_unique_task_id()?;
500 let paths = task_paths(&storage_dir, &session_id, &task_id);
501 fs::create_dir_all(&paths.dir)
502 .map_err(|e| format!("failed to create background task dir: {e}"))?;
503
504 let mut metadata = PersistedTask::starting(
505 task_id.clone(),
506 session_id.clone(),
507 command.to_string(),
508 workdir.clone(),
509 project_root,
510 timeout_ms,
511 notify_on_completion,
512 compressed,
513 );
514 self.persist_task(&paths, &metadata)
515 .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
516
517 create_capture_file(&paths.stdout)
521 .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
522 create_capture_file(&paths.stderr)
523 .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
524
525 let child = match spawn_detached_child(command, &paths, &workdir, &env) {
526 Ok(child) => child,
527 Err(error) => {
528 crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
529 let _ = delete_task_bundle(&paths);
530 return Err(error);
531 }
532 };
533
534 let child_pid = child.id();
535 metadata.mark_running(child_pid, child_pid as i32);
536 self.persist_task(&paths, &metadata)
537 .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
538
539 let task = Arc::new(BgTask {
540 task_id: task_id.clone(),
541 session_id,
542 paths: paths.clone(),
543 started: Instant::now(),
544 last_reminder_at: Mutex::new(None),
545 terminal_at: Mutex::new(None),
546 state: Mutex::new(BgTaskState {
547 metadata,
548 runtime: TaskRuntime::Piped(Some(child)),
549 detached: false,
550 child_exit_observed: false,
551 buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
552 terminal_output_cache: None,
553 pending_terminal_override: None,
554 }),
555 });
556
557 self.inner
558 .tasks
559 .lock()
560 .map_err(|_| "background task registry lock poisoned".to_string())?
561 .insert(task_id.clone(), task);
562
563 Ok(task_id)
564 }
565
566 #[allow(clippy::too_many_arguments)]
567 pub fn spawn_pty(
568 &self,
569 command: &str,
570 session_id: String,
571 workdir: PathBuf,
572 env: HashMap<String, String>,
573 timeout: Option<Duration>,
574 storage_dir: PathBuf,
575 max_running: usize,
576 notify_on_completion: bool,
577 compressed: bool,
578 project_root: Option<PathBuf>,
579 rows: u16,
580 cols: u16,
581 ) -> Result<String, String> {
582 self.start_watchdog();
583
584 let running = self.running_count();
585 if running >= max_running {
586 return Err(format!(
587 "background bash task limit exceeded: {running} running (max {max_running})"
588 ));
589 }
590
591 let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
592 let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
593 let task_id = self.generate_unique_task_id()?;
594 let paths = task_paths(&storage_dir, &session_id, &task_id);
595 fs::create_dir_all(&paths.dir)
596 .map_err(|e| format!("failed to create background task dir: {e}"))?;
597
598 let mut metadata = PersistedTask::starting(
599 task_id.clone(),
600 session_id.clone(),
601 command.to_string(),
602 workdir.clone(),
603 project_root,
604 timeout_ms,
605 notify_on_completion,
606 compressed,
607 );
608 metadata.mode = BgMode::Pty;
609 metadata.pty_rows = Some(rows);
610 metadata.pty_cols = Some(cols);
611 self.persist_task(&paths, &metadata)
612 .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
613 create_capture_file(&paths.pty)
614 .map_err(|e| format!("failed to create PTY capture file: {e}"))?;
615
616 let runtime = match spawn_pty_for_command(
617 &task_id,
618 &session_id,
619 command,
620 &paths,
621 &workdir,
622 &env,
623 rows,
624 cols,
625 self.inner.wake_tx.clone(),
626 ) {
627 Ok(runtime) => runtime,
628 Err(error) => {
629 crate::slog_warn!(
630 "failed to spawn PTY background bash task {task_id}; deleting partial bundle: {error}"
631 );
632 let _ = delete_task_bundle(&paths);
633 return Err(error);
634 }
635 };
636
637 if let Some(child_pid) = runtime.child_pid {
638 metadata.mark_running(child_pid, child_pid as i32);
639 } else {
640 metadata.status = BgTaskStatus::Running;
641 metadata.pgid = None;
642 }
643 self.persist_task(&paths, &metadata)
644 .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
645
646 let task = Arc::new(BgTask {
647 task_id: task_id.clone(),
648 session_id,
649 paths: paths.clone(),
650 started: Instant::now(),
651 last_reminder_at: Mutex::new(None),
652 terminal_at: Mutex::new(None),
653 state: Mutex::new(BgTaskState {
654 metadata,
655 runtime: TaskRuntime::Pty(Some(runtime)),
656 detached: false,
657 child_exit_observed: false,
658 buffer: BgBuffer::pty(paths.pty.clone()),
659 terminal_output_cache: None,
660 pending_terminal_override: None,
661 }),
662 });
663
664 self.inner
665 .tasks
666 .lock()
667 .map_err(|_| "background task registry lock poisoned".to_string())?
668 .insert(task_id.clone(), task);
669
670 Ok(task_id)
671 }
672
673 #[cfg(windows)]
674 #[allow(clippy::too_many_arguments)]
675 pub fn spawn(
676 &self,
677 command: &str,
678 session_id: String,
679 workdir: PathBuf,
680 env: HashMap<String, String>,
681 timeout: Option<Duration>,
682 storage_dir: PathBuf,
683 max_running: usize,
684 notify_on_completion: bool,
685 compressed: bool,
686 project_root: Option<PathBuf>,
687 ) -> Result<String, String> {
688 self.start_watchdog();
689
690 let running = self.running_count();
691 if running >= max_running {
692 return Err(format!(
693 "background bash task limit exceeded: {running} running (max {max_running})"
694 ));
695 }
696
697 let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
698 let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
699 let task_id = self.generate_unique_task_id()?;
700 let paths = task_paths(&storage_dir, &session_id, &task_id);
701 fs::create_dir_all(&paths.dir)
702 .map_err(|e| format!("failed to create background task dir: {e}"))?;
703
704 let mut metadata = PersistedTask::starting(
705 task_id.clone(),
706 session_id.clone(),
707 command.to_string(),
708 workdir.clone(),
709 project_root,
710 timeout_ms,
711 notify_on_completion,
712 compressed,
713 );
714 self.persist_task(&paths, &metadata)
715 .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
716
717 create_capture_file(&paths.stdout)
723 .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
724 create_capture_file(&paths.stderr)
725 .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
726
727 let child = match spawn_detached_child(command, &paths, &workdir, &env) {
728 Ok(child) => child,
729 Err(error) => {
730 crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
731 let _ = delete_task_bundle(&paths);
732 return Err(error);
733 }
734 };
735
736 let child_pid = child.id();
737 metadata.status = BgTaskStatus::Running;
738 metadata.child_pid = Some(child_pid);
739 metadata.pgid = None;
740 self.persist_task(&paths, &metadata)
741 .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
742
743 let task = Arc::new(BgTask {
744 task_id: task_id.clone(),
745 session_id,
746 paths: paths.clone(),
747 started: Instant::now(),
748 last_reminder_at: Mutex::new(None),
749 terminal_at: Mutex::new(None),
750 state: Mutex::new(BgTaskState {
751 metadata,
752 runtime: TaskRuntime::Piped(Some(child)),
753 detached: false,
754 child_exit_observed: false,
755 buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
756 terminal_output_cache: None,
757 pending_terminal_override: None,
758 }),
759 });
760
761 self.inner
762 .tasks
763 .lock()
764 .map_err(|_| "background task registry lock poisoned".to_string())?
765 .insert(task_id.clone(), task);
766
767 Ok(task_id)
768 }
769
770 pub fn write_pty(
771 &self,
772 task_id: &str,
773 session_id: &str,
774 input: &[u8],
775 ) -> Result<usize, String> {
776 let task = self
777 .task_for_session(task_id, session_id)
778 .ok_or_else(|| "task_not_found".to_string())?;
779
780 let writer = {
781 let state = task
782 .state
783 .lock()
784 .map_err(|_| "background task lock poisoned".to_string())?;
785 if state.metadata.mode != BgMode::Pty {
786 return Err("task_not_pty".to_string());
787 }
788 if state.metadata.status.is_terminal() {
789 return Err("task_exited".to_string());
790 }
791 match &state.runtime {
792 TaskRuntime::Pty(Some(runtime)) => Arc::clone(&runtime.writer),
793 TaskRuntime::Pty(None) => return Err("task_exited".to_string()),
794 TaskRuntime::Piped(_) => return Err("task_not_pty".to_string()),
795 }
796 };
797
798 let mut writer = writer
799 .lock()
800 .map_err(|_| "PTY writer lock poisoned".to_string())?;
801 writer
802 .write_all(input)
803 .map_err(|error| format!("failed to write to PTY: {error}"))?;
804 writer
805 .flush()
806 .map_err(|error| format!("failed to flush PTY writer: {error}"))?;
807 Ok(input.len())
808 }
809
810 pub fn replay_session(&self, storage_dir: &Path, session_id: &str) -> Result<(), String> {
811 self.replay_session_inner(storage_dir, session_id, None)
812 }
813
814 pub fn replay_session_for_project(
815 &self,
816 storage_dir: &Path,
817 session_id: &str,
818 project_root: &Path,
819 ) -> Result<(), String> {
820 self.replay_session_inner(storage_dir, session_id, Some(project_root))
821 }
822
823 fn replay_session_inner(
824 &self,
825 storage_dir: &Path,
826 session_id: &str,
827 project_root: Option<&Path>,
828 ) -> Result<(), String> {
829 self.start_watchdog();
830 if !self.inner.persisted_gc_started.swap(true, Ordering::SeqCst) {
831 if let Err(error) = self.maybe_gc_persisted(storage_dir) {
832 crate::slog_warn!("failed to GC persisted background bash tasks: {error}");
833 }
834 }
835
836 let canonical_project = project_root.map(canonicalized_path);
837 let tasks = match self.replay_session_from_db(session_id) {
849 Some(Ok(tasks)) if !tasks.is_empty() => tasks,
850 Some(Ok(_)) => {
851 let disk_tasks = self.replay_session_from_disk(storage_dir, session_id)?;
852 if !disk_tasks.is_empty() {
853 crate::slog_info!(
854 "bash task replay: 0 in DB for session {}, {} from disk fallback",
855 session_id,
856 disk_tasks.len()
857 );
858 }
859 disk_tasks
860 }
861 Some(Err(error)) => {
862 crate::slog_warn!(
863 "bash task replay DB lookup failed for session {}; falling back to disk: {}",
864 session_id,
865 error
866 );
867 self.replay_session_from_disk(storage_dir, session_id)?
868 }
869 None => {
870 self.replay_session_from_disk(storage_dir, session_id)?
872 }
873 };
874
875 for mut metadata in tasks {
876 if metadata.session_id != session_id {
877 continue;
878 }
879 if let Some(canonical_project) = canonical_project.as_deref() {
880 let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
881 if metadata_project.as_deref() != Some(canonical_project) {
882 continue;
883 }
884 }
885
886 let paths = task_paths(storage_dir, session_id, &metadata.task_id);
887 match metadata.status {
888 BgTaskStatus::Starting => {
889 let completion_was_delivered = metadata.completion_delivered;
890 metadata.mark_terminal(
891 BgTaskStatus::Failed,
892 None,
893 Some("spawn aborted".to_string()),
894 );
895 metadata.completion_delivered |= completion_was_delivered;
896 let _ = self.persist_task(&paths, &metadata);
897 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
898 self.insert_rehydrated_task(metadata, paths, true)?;
899 }
900 BgTaskStatus::Running | BgTaskStatus::Killing => {
901 if metadata.mode == BgMode::Pty {
902 if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
903 let completion_was_delivered = metadata.completion_delivered;
904 metadata = terminal_metadata_from_marker(metadata, marker, None);
905 metadata.completion_delivered |= completion_was_delivered;
906 let _ = self.persist_task(&paths, &metadata);
907 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
908 self.insert_rehydrated_task(metadata, paths, true)?;
909 } else if metadata.status.is_terminal() {
910 self.insert_rehydrated_task(metadata, paths, true)?;
911 } else {
912 let completion_was_delivered = metadata.completion_delivered;
913 metadata.mark_terminal(
914 BgTaskStatus::Killed,
915 None,
916 Some("pty_lost_on_bridge_restart".to_string()),
917 );
918 metadata.completion_delivered |= completion_was_delivered;
919 let _ = self.persist_task(&paths, &metadata);
920 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
921 self.insert_rehydrated_task(metadata, paths, true)?;
922 }
923 } else if self.running_metadata_is_stale(&metadata) {
924 let completion_was_delivered = metadata.completion_delivered;
925 metadata.mark_terminal(
926 BgTaskStatus::Killed,
927 None,
928 Some("orphaned (>24h)".to_string()),
929 );
930 metadata.completion_delivered |= completion_was_delivered;
931 if !paths.exit.exists() {
932 let _ = write_kill_marker_if_absent(&paths.exit);
933 }
934 let _ = self.persist_task(&paths, &metadata);
935 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
936 self.insert_rehydrated_task(metadata, paths, true)?;
937 } else if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
938 let reason = (metadata.status == BgTaskStatus::Killing).then(|| {
939 "recovered from inconsistent killing state on replay".to_string()
940 });
941 if reason.is_some() {
942 crate::slog_warn!("background task {} had killing state with exit marker; preferring marker",
943 metadata.task_id);
944 }
945 let completion_was_delivered = metadata.completion_delivered;
946 metadata = terminal_metadata_from_marker(metadata, marker, reason);
947 metadata.completion_delivered |= completion_was_delivered;
948 let _ = self.persist_task(&paths, &metadata);
949 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
950 self.insert_rehydrated_task(metadata, paths, true)?;
951 } else if metadata.status == BgTaskStatus::Killing {
952 if !paths.exit.exists() {
953 let _ = write_kill_marker_if_absent(&paths.exit);
954 }
955 let completion_was_delivered = metadata.completion_delivered;
956 metadata.mark_terminal(
957 BgTaskStatus::Killed,
958 None,
959 Some("recovered from inconsistent killing state on replay".to_string()),
960 );
961 metadata.completion_delivered |= completion_was_delivered;
962 let _ = self.persist_task(&paths, &metadata);
963 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
964 self.insert_rehydrated_task(metadata, paths, true)?;
965 } else if metadata.child_pid.is_some_and(|pid| !is_process_alive(pid)) {
966 let completion_was_delivered = metadata.completion_delivered;
967 metadata.mark_terminal(
968 BgTaskStatus::Failed,
969 None,
970 Some("process exited without exit marker".to_string()),
971 );
972 metadata.completion_delivered |= completion_was_delivered;
973 let _ = self.persist_task(&paths, &metadata);
974 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
975 self.insert_rehydrated_task(metadata, paths, true)?;
976 } else {
977 self.insert_rehydrated_task(metadata, paths, true)?;
978 }
979 }
980 _ if metadata.status.is_terminal() => {
981 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
987 self.insert_rehydrated_task(metadata, paths, true)?;
988 }
989 _ => {}
990 }
991 }
992
993 Ok(())
994 }
995
996 fn replay_session_from_db(
997 &self,
998 session_id: &str,
999 ) -> Option<Result<Vec<PersistedTask>, String>> {
1000 let pool = self
1001 .inner
1002 .db_pool
1003 .read()
1004 .ok()
1005 .and_then(|slot| slot.clone())?;
1006 let harness = self
1007 .inner
1008 .db_harness
1009 .read()
1010 .ok()
1011 .and_then(|slot| slot.clone())?;
1012 let conn = match pool.lock() {
1013 Ok(conn) => conn,
1014 Err(_) => return Some(Err("db mutex poisoned".to_string())),
1015 };
1016 Some(
1017 crate::db::bash_tasks::list_bash_tasks_for_session(&conn, &harness, session_id)
1018 .map(|rows| rows.into_iter().map(PersistedTask::from).collect())
1019 .map_err(|error| error.to_string()),
1020 )
1021 }
1022
1023 fn replay_session_from_disk(
1024 &self,
1025 storage_dir: &Path,
1026 session_id: &str,
1027 ) -> Result<Vec<PersistedTask>, String> {
1028 let dir = session_tasks_dir(storage_dir, session_id);
1029 if !dir.exists() {
1030 return Ok(Vec::new());
1031 }
1032
1033 let entries = fs::read_dir(&dir)
1034 .map_err(|e| format!("failed to read background task dir {}: {e}", dir.display()))?;
1035 let mut tasks = Vec::new();
1036 for entry in entries.flatten() {
1037 let path = entry.path();
1038 if path.extension().and_then(|extension| extension.to_str()) != Some("json") {
1039 continue;
1040 }
1041 match read_task(&path) {
1042 Ok(metadata) => tasks.push(metadata),
1043 Err(error) => {
1044 crate::slog_warn!(
1045 "quarantining invalid background task metadata {} during replay: {error}",
1046 path.display()
1047 );
1048 if let Err(quarantine_error) =
1049 quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
1050 {
1051 crate::slog_warn!(
1052 "failed to quarantine invalid background task metadata {}: {quarantine_error}",
1053 path.display()
1054 );
1055 }
1056 }
1057 }
1058 }
1059 Ok(tasks)
1060 }
1061
1062 pub fn register_watch(
1063 &self,
1064 task_id: String,
1065 pattern: WatchPattern,
1066 once: bool,
1067 ) -> Result<String, &'static str> {
1068 let task = self.task(&task_id).ok_or("task_not_found")?;
1069 let (mode, terminal_at_registration, stdout, stderr, pty) = task
1070 .state
1071 .lock()
1072 .map(|state| {
1073 (
1074 state.metadata.mode.clone(),
1075 state.metadata.status.is_terminal(),
1076 task.paths.stdout.clone(),
1077 task.paths.stderr.clone(),
1078 task.paths.pty.clone(),
1079 )
1080 })
1081 .map_err(|_| "background_task_lock_poisoned")?;
1082
1083 let mut terminal_matches = Vec::new();
1084 let scanned_terminal = terminal_at_registration;
1085 let watch_id = {
1086 let mut registry = self
1087 .inner
1088 .watch_registry
1089 .lock()
1090 .map_err(|_| "watch_registry_poisoned")?;
1091 let watch_id = registry.register(task_id.clone(), pattern, once)?;
1092 match &mode {
1093 BgMode::Pipes => {
1094 let stdout_key = format!("{task_id}:stdout");
1095 let stderr_key = format!("{task_id}:stderr");
1096 if terminal_at_registration {
1097 registry.set_file_cursor(&stdout_key, 0);
1098 registry.set_file_cursor(&stderr_key, 0);
1099 terminal_matches.extend(registry.scan_file_new_bytes(
1100 &stdout_key,
1101 &task_id,
1102 &stdout,
1103 ));
1104 terminal_matches.extend(registry.scan_file_new_bytes(
1105 &stderr_key,
1106 &task_id,
1107 &stderr,
1108 ));
1109 } else {
1110 registry.prime_file_cursor(&stdout_key, &stdout);
1111 registry.prime_file_cursor(&stderr_key, &stderr);
1112 }
1113 }
1114 BgMode::Pty => {
1115 let pty_key = format!("{task_id}:pty");
1116 if terminal_at_registration {
1117 registry.set_file_cursor(&pty_key, 0);
1118 terminal_matches
1119 .extend(registry.scan_file_new_bytes(&pty_key, &task_id, &pty));
1120 } else {
1121 registry.prime_file_cursor(&pty_key, &pty);
1122 }
1123 }
1124 }
1125 watch_id
1126 };
1127
1128 if task.is_terminal() {
1129 if !scanned_terminal {
1130 terminal_matches = {
1131 let mut registry = self
1132 .inner
1133 .watch_registry
1134 .lock()
1135 .map_err(|_| "watch_registry_poisoned")?;
1136 match &mode {
1137 BgMode::Pipes => {
1138 let stdout_key = format!("{task_id}:stdout");
1139 let stderr_key = format!("{task_id}:stderr");
1140 registry.set_file_cursor(&stdout_key, 0);
1141 registry.set_file_cursor(&stderr_key, 0);
1142 let mut matches =
1143 registry.scan_file_new_bytes(&stdout_key, &task_id, &stdout);
1144 matches.extend(registry.scan_file_new_bytes(
1145 &stderr_key,
1146 &task_id,
1147 &stderr,
1148 ));
1149 matches
1150 }
1151 BgMode::Pty => {
1152 let pty_key = format!("{task_id}:pty");
1153 registry.set_file_cursor(&pty_key, 0);
1154 registry.scan_file_new_bytes(&pty_key, &task_id, &pty)
1155 }
1156 }
1157 };
1158 }
1159
1160 let (watch_controlled, watch_matched) = self.task_watch_state(&task_id);
1161 if terminal_matches.is_empty() && (!watch_controlled || watch_matched) {
1162 if watch_matched {
1163 let _ = task.set_completion_delivered(true, self);
1164 self.clear_task_watch_state(&task_id);
1165 }
1166 return Ok(watch_id);
1167 }
1168
1169 let completion = self
1170 .remove_pending_completion(&task_id)
1171 .or_else(|| self.completion_snapshot_for_task(&task));
1172 if terminal_matches.is_empty() {
1173 if let Some(completion) = completion.as_ref() {
1174 self.emit_bash_watch_exit(completion);
1175 }
1176 } else {
1177 for pattern_match in terminal_matches {
1178 self.emit_bash_pattern_match(&task.session_id, pattern_match);
1179 }
1180 }
1181 let _ = task.set_completion_delivered(true, self);
1182 self.clear_task_watch_state(&task_id);
1183 }
1184
1185 Ok(watch_id)
1186 }
1187
1188 pub fn unregister_watch(&self, task_id: &str, watch_id: &str) {
1189 if let Ok(mut registry) = self.inner.watch_registry.lock() {
1190 registry.unregister(task_id, watch_id);
1191 }
1192 }
1193
1194 pub fn active_watch_count(&self, task_id: &str) -> usize {
1195 self.inner
1196 .watch_registry
1197 .lock()
1198 .map(|registry| registry.active_count(task_id))
1199 .unwrap_or(0)
1200 }
1201
1202 fn task_watch_state(&self, task_id: &str) -> (bool, bool) {
1203 self.inner
1204 .watch_registry
1205 .lock()
1206 .map(|registry| {
1207 (
1208 registry.has_controlled_task(task_id),
1209 registry.has_matched_task(task_id),
1210 )
1211 })
1212 .unwrap_or((false, false))
1213 }
1214
1215 fn task_has_watch_control(&self, task_id: &str) -> bool {
1216 self.inner
1217 .watch_registry
1218 .lock()
1219 .map(|registry| registry.has_controlled_task(task_id))
1220 .unwrap_or(false)
1221 }
1222
1223 fn clear_task_watch_state(&self, task_id: &str) {
1224 if let Ok(mut registry) = self.inner.watch_registry.lock() {
1225 registry.clear_task(task_id);
1226 }
1227 }
1228
1229 pub(crate) fn scan_task_watch_output(&self, task: &Arc<BgTask>) {
1230 let (mode, stdout, stderr, pty) = match task.state.lock() {
1231 Ok(state) => (
1232 state.metadata.mode.clone(),
1233 task.paths.stdout.clone(),
1234 task.paths.stderr.clone(),
1235 task.paths.pty.clone(),
1236 ),
1237 Err(_) => return,
1238 };
1239 let mut matches = Vec::new();
1240 if let Ok(mut registry) = self.inner.watch_registry.lock() {
1241 match mode {
1242 BgMode::Pipes => {
1243 let stdout_key = format!("{}:stdout", task.task_id);
1244 let stderr_key = format!("{}:stderr", task.task_id);
1245 matches.extend(registry.scan_file_new_bytes(
1246 &stdout_key,
1247 &task.task_id,
1248 &stdout,
1249 ));
1250 matches.extend(registry.scan_file_new_bytes(
1251 &stderr_key,
1252 &task.task_id,
1253 &stderr,
1254 ));
1255 }
1256 BgMode::Pty => {
1257 let pty_key = format!("{}:pty", task.task_id);
1258 matches.extend(registry.scan_file_new_bytes(&pty_key, &task.task_id, &pty));
1259 }
1260 }
1261 }
1262 for pattern_match in matches {
1263 self.emit_bash_pattern_match(&task.session_id, pattern_match);
1264 }
1265 }
1266
1267 pub fn status(
1268 &self,
1269 task_id: &str,
1270 session_id: &str,
1271 project_root: Option<&Path>,
1272 storage_dir: Option<&Path>,
1273 preview_bytes: usize,
1274 ) -> Option<BgTaskSnapshot> {
1275 let mut task = self.task_for_session(task_id, session_id);
1276 if task.is_none() {
1277 if let Some(storage_dir) = storage_dir {
1278 let _ = self.replay_session(storage_dir, session_id);
1279 task = self.task_for_session(task_id, session_id);
1280 }
1281 }
1282 let Some(task) = task else {
1283 return self.status_relaxed(
1284 task_id,
1285 session_id,
1286 project_root?,
1287 storage_dir?,
1288 preview_bytes,
1289 );
1290 };
1291 let _ = self.poll_task(&task);
1292 Some(self.snapshot_with_terminal_cache(&task, preview_bytes))
1293 }
1294
1295 fn status_relaxed_task(
1296 &self,
1297 task_id: &str,
1298 project_root: &Path,
1299 storage_dir: &Path,
1300 ) -> Option<Arc<BgTask>> {
1301 let canonical_project = canonicalized_path(project_root);
1302 match self.lookup_relaxed_task_from_db(task_id, project_root) {
1303 Some(Ok(Some(metadata))) => {
1304 if let Some(task) = self.task(task_id) {
1305 let matches_project = task
1306 .state
1307 .lock()
1308 .map(|state| {
1309 state
1310 .metadata
1311 .project_root
1312 .as_deref()
1313 .map(canonicalized_path)
1314 .as_deref()
1315 == Some(canonical_project.as_path())
1316 })
1317 .unwrap_or(false);
1318 return matches_project.then_some(task);
1319 }
1320 let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1321 if self.insert_rehydrated_task(metadata, paths, true).is_err() {
1322 return None;
1323 }
1324 return self.task(task_id);
1325 }
1326 Some(Ok(None)) => {
1327 crate::slog_info!(
1328 "bash task relaxed DB miss for {}; falling back to disk",
1329 task_id
1330 );
1331 }
1332 Some(Err(error)) => {
1333 crate::slog_warn!(
1334 "bash task relaxed DB lookup failed for {}; falling back to disk: {}",
1335 task_id,
1336 error
1337 );
1338 }
1339 None => {
1340 crate::slog_info!(
1341 "bash task relaxed DB unavailable for {}; falling back to disk",
1342 task_id
1343 );
1344 }
1345 }
1346 let root = storage_dir.join("bash-tasks");
1347 let entries = fs::read_dir(&root).ok()?;
1348 for entry in entries.flatten() {
1349 let dir = entry.path();
1350 if !dir.is_dir() {
1351 continue;
1352 }
1353 let path = dir.join(format!("{task_id}.json"));
1354 if !path.exists() {
1355 continue;
1356 }
1357 let metadata = match read_task(&path) {
1358 Ok(metadata) => metadata,
1359 Err(error) => {
1360 crate::slog_warn!(
1361 "quarantining invalid background task metadata {} during relaxed lookup: {error}",
1362 path.display()
1363 );
1364 if let Err(quarantine_error) =
1365 quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
1366 {
1367 crate::slog_warn!(
1368 "failed to quarantine invalid background task metadata {}: {quarantine_error}",
1369 path.display()
1370 );
1371 }
1372 continue;
1373 }
1374 };
1375 let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
1376 if metadata_project.as_deref() != Some(canonical_project.as_path()) {
1377 continue;
1378 }
1379 if let Some(task) = self.task(task_id) {
1380 let matches_project = task
1381 .state
1382 .lock()
1383 .map(|state| {
1384 state
1385 .metadata
1386 .project_root
1387 .as_deref()
1388 .map(canonicalized_path)
1389 .as_deref()
1390 == Some(canonical_project.as_path())
1391 })
1392 .unwrap_or(false);
1393 return matches_project.then_some(task);
1394 }
1395 let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1396 if self.insert_rehydrated_task(metadata, paths, true).is_err() {
1397 return None;
1398 }
1399 return self.task(task_id);
1400 }
1401 None
1402 }
1403
1404 fn lookup_relaxed_task_from_db(
1405 &self,
1406 task_id: &str,
1407 project_root: &Path,
1408 ) -> Option<Result<Option<PersistedTask>, String>> {
1409 let pool = self
1410 .inner
1411 .db_pool
1412 .read()
1413 .ok()
1414 .and_then(|slot| slot.clone())?;
1415 let harness = self
1416 .inner
1417 .db_harness
1418 .read()
1419 .ok()
1420 .and_then(|slot| slot.clone())?;
1421 let conn = match pool.lock() {
1422 Ok(conn) => conn,
1423 Err(_) => return Some(Err("db mutex poisoned".to_string())),
1424 };
1425 let project_key = crate::search_index::project_cache_key(project_root);
1426 Some(
1427 crate::db::bash_tasks::find_bash_task_for_project(
1428 &conn,
1429 &harness,
1430 &project_key,
1431 task_id,
1432 )
1433 .map(|row| row.map(PersistedTask::from))
1434 .map_err(|error| error.to_string()),
1435 )
1436 }
1437
1438 pub(super) fn status_relaxed(
1439 &self,
1440 task_id: &str,
1441 _session_id: &str,
1442 project_root: &Path,
1443 storage_dir: &Path,
1444 preview_bytes: usize,
1445 ) -> Option<BgTaskSnapshot> {
1446 let task = self.status_relaxed_task(task_id, project_root, storage_dir)?;
1447 let _ = self.poll_task(&task);
1448 Some(self.snapshot_with_terminal_cache(&task, preview_bytes))
1449 }
1450
1451 pub fn kill_relaxed(
1452 &self,
1453 task_id: &str,
1454 project_root: &Path,
1455 storage_dir: &Path,
1456 ) -> Result<BgTaskSnapshot, String> {
1457 let task = self
1458 .status_relaxed_task(task_id, project_root, storage_dir)
1459 .ok_or_else(|| format!("background task not found: {task_id}"))?;
1460 self.kill_with_status(task_id, &task.session_id, BgTaskStatus::Killed)
1461 }
1462
1463 pub fn maybe_gc_persisted(&self, storage_dir: &Path) -> Result<usize, String> {
1464 #[cfg(test)]
1465 self.inner.persisted_gc_runs.fetch_add(1, Ordering::SeqCst);
1466
1467 let mut deleted = 0usize;
1468
1469 let root = storage_dir.join("bash-tasks");
1470 if root.exists() {
1471 let session_dirs = fs::read_dir(&root).map_err(|e| {
1472 format!(
1473 "failed to read background task root {}: {e}",
1474 root.display()
1475 )
1476 })?;
1477 for session_entry in session_dirs.flatten() {
1478 let session_dir = session_entry.path();
1479 if !session_dir.is_dir() {
1480 continue;
1481 }
1482 let task_entries = match fs::read_dir(&session_dir) {
1483 Ok(entries) => entries,
1484 Err(error) => {
1485 crate::slog_warn!(
1486 "failed to read background task session dir {}: {error}",
1487 session_dir.display()
1488 );
1489 continue;
1490 }
1491 };
1492 for task_entry in task_entries.flatten() {
1493 let json_path = task_entry.path();
1494 if json_path
1495 .extension()
1496 .and_then(|extension| extension.to_str())
1497 != Some("json")
1498 {
1499 continue;
1500 }
1501 if modified_within(&json_path, PERSISTED_GC_GRACE) {
1502 continue;
1503 }
1504 let metadata = match read_task(&json_path) {
1505 Ok(metadata) => metadata,
1506 Err(error) => {
1507 crate::slog_warn!(
1508 "quarantining corrupt background task metadata {}: {error}",
1509 json_path.display()
1510 );
1511 quarantine_task_json(
1512 storage_dir,
1513 &session_dir,
1514 &json_path,
1515 QuarantineKind::Corrupt,
1516 )?;
1517 continue;
1518 }
1519 };
1520 if !(metadata.status.is_terminal() && metadata.completion_delivered) {
1521 continue;
1522 }
1523 let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1524 match delete_task_bundle(&paths) {
1525 Ok(()) => {
1526 deleted += 1;
1527 log::debug!(
1528 "deleted persisted background task bundle {}",
1529 metadata.task_id
1530 );
1531 }
1532 Err(error) => {
1533 crate::slog_warn!(
1534 "failed to delete background task bundle {}: {error}",
1535 metadata.task_id
1536 );
1537 continue;
1538 }
1539 }
1540 }
1541 }
1542 }
1543 gc_quarantine(storage_dir);
1544 Ok(deleted)
1545 }
1546
1547 pub fn list(&self, preview_bytes: usize) -> Vec<BgTaskSnapshot> {
1548 let tasks = self
1549 .inner
1550 .tasks
1551 .lock()
1552 .map(|tasks| tasks.values().cloned().collect::<Vec<_>>())
1553 .unwrap_or_default();
1554 tasks
1555 .into_iter()
1556 .map(|task| {
1557 let _ = self.poll_task(&task);
1558 self.snapshot_with_terminal_cache(&task, preview_bytes)
1559 })
1560 .collect()
1561 }
1562
1563 fn maybe_compress_snapshot(&self, task: &Arc<BgTask>, snapshot: &mut BgTaskSnapshot) {
1569 if !snapshot.info.status.is_terminal() || snapshot.info.mode == BgMode::Pty {
1570 return;
1571 }
1572 if let Some(cache) = self.ensure_terminal_output_cache(task) {
1573 snapshot.output_preview = cache.output_preview;
1574 snapshot.output_truncated = cache.output_truncated;
1575 }
1576 }
1577
1578 pub fn kill(&self, task_id: &str, session_id: &str) -> Result<BgTaskSnapshot, String> {
1579 self.kill_with_status(task_id, session_id, BgTaskStatus::Killed)
1580 }
1581
1582 pub fn promote(&self, task_id: &str, session_id: &str) -> Result<bool, String> {
1583 let task = self
1584 .task_for_session(task_id, session_id)
1585 .ok_or_else(|| format!("background task not found: {task_id}"))?;
1586 let terminal_after_promote = {
1587 let mut state = task
1588 .state
1589 .lock()
1590 .map_err(|_| "background task lock poisoned".to_string())?;
1591 let updated = self
1592 .update_task_metadata(&task.paths, |metadata| {
1593 metadata.notify_on_completion = true;
1594 metadata.completion_delivered = false;
1595 })
1596 .map_err(|e| format!("failed to promote background task: {e}"))?;
1597 state.metadata = updated;
1598 state.metadata.status.is_terminal()
1599 };
1600 if terminal_after_promote {
1601 self.post_terminal_transition(&task, true)?;
1602 }
1603 Ok(true)
1604 }
1605
1606 pub(crate) fn kill_for_timeout(&self, task_id: &str, session_id: &str) -> Result<(), String> {
1607 self.kill_with_status(task_id, session_id, BgTaskStatus::TimedOut)
1608 .map(|_| ())
1609 }
1610
1611 pub fn cleanup_finished(&self, older_than: Duration) {
1612 let cutoff = Instant::now().checked_sub(older_than);
1613 let removable_paths: Vec<(String, TaskPaths)> =
1614 if let Ok(mut tasks) = self.inner.tasks.lock() {
1615 let removable = tasks
1616 .iter()
1617 .filter_map(|(task_id, task)| {
1618 let delivered_terminal = task
1619 .state
1620 .lock()
1621 .map(|state| {
1622 state.metadata.status.is_terminal()
1623 && state.metadata.completion_delivered
1624 })
1625 .unwrap_or(false);
1626 if !delivered_terminal {
1627 return None;
1628 }
1629
1630 let terminal_at = task.terminal_at.lock().ok().and_then(|at| *at);
1631 let expired = match (terminal_at, cutoff) {
1632 (Some(terminal_at), Some(cutoff)) => terminal_at <= cutoff,
1633 (Some(_), None) => true,
1634 (None, _) => false,
1635 };
1636 expired.then(|| task_id.clone())
1637 })
1638 .collect::<Vec<_>>();
1639
1640 removable
1641 .into_iter()
1642 .filter_map(|task_id| {
1643 tasks
1644 .remove(&task_id)
1645 .map(|task| (task_id, task.paths.clone()))
1646 })
1647 .collect()
1648 } else {
1649 Vec::new()
1650 };
1651
1652 for (task_id, paths) in removable_paths {
1653 match delete_task_bundle(&paths) {
1654 Ok(()) => log::debug!("deleted persisted background task bundle {task_id}"),
1655 Err(error) => crate::slog_warn!(
1656 "failed to delete persisted background task bundle {task_id}: {error}"
1657 ),
1658 }
1659 }
1660 }
1661
1662 pub fn drain_completions(&self) -> Vec<BgCompletion> {
1663 self.drain_completions_for_session(None)
1664 }
1665
1666 pub fn drain_completions_for_session(&self, session_id: Option<&str>) -> Vec<BgCompletion> {
1667 let completions = match self.inner.completions.lock() {
1668 Ok(completions) => completions,
1669 Err(_) => return Vec::new(),
1670 };
1671
1672 completions
1673 .iter()
1674 .filter(|completion| completion_matches_session(completion, session_id))
1675 .cloned()
1676 .collect()
1677 }
1678
1679 pub fn has_completions_for_session(&self, session_id: Option<&str>) -> bool {
1680 match self.inner.completions.lock() {
1681 Ok(completions) => completions
1682 .iter()
1683 .any(|completion| completion_matches_session(completion, session_id)),
1684 Err(_) => true,
1688 }
1689 }
1690
1691 pub fn ack_completions_for_session(
1692 &self,
1693 session_id: Option<&str>,
1694 task_ids: &[String],
1695 ) -> Vec<String> {
1696 if task_ids.is_empty() {
1697 return Vec::new();
1698 }
1699 let requested_task_ids = task_ids.iter().map(String::as_str).collect::<HashSet<_>>();
1700 let mut completion_sessions = HashMap::new();
1701 if let Ok(mut completions) = self.inner.completions.lock() {
1702 completions.retain(|completion| {
1703 let session_matches = session_id
1704 .map(|session_id| completion.session_id == session_id)
1705 .unwrap_or(true);
1706 if session_matches && requested_task_ids.contains(completion.task_id.as_str()) {
1707 completion_sessions
1708 .insert(completion.task_id.clone(), completion.session_id.clone());
1709 false
1710 } else {
1711 true
1712 }
1713 });
1714 }
1715
1716 let mut delivered = Vec::new();
1717 for task_id in task_ids {
1718 let task = if let Some(session_id) = session_id {
1719 self.task_for_session(task_id, session_id)
1720 } else if let Some(completion_session_id) = completion_sessions.get(task_id) {
1721 self.task_for_session(task_id, completion_session_id)
1722 } else {
1723 self.task(task_id)
1724 };
1725 if let Some(task) = task {
1726 if task.set_completion_delivered(true, self).is_ok() {
1727 delivered.push(task_id.clone());
1728 }
1729 }
1730 }
1731
1732 delivered
1733 }
1734
1735 pub fn pending_completions_for_session(&self, session_id: &str) -> Vec<BgCompletion> {
1736 self.inner
1737 .completions
1738 .lock()
1739 .map(|completions| {
1740 completions
1741 .iter()
1742 .filter(|completion| completion.session_id == session_id)
1743 .cloned()
1744 .collect()
1745 })
1746 .unwrap_or_default()
1747 }
1748
1749 fn remove_pending_completion(&self, task_id: &str) -> Option<BgCompletion> {
1750 let mut completions = self.inner.completions.lock().ok()?;
1751 let idx = completions
1752 .iter()
1753 .position(|completion| completion.task_id == task_id)?;
1754 completions.remove(idx)
1755 }
1756
1757 fn completion_snapshot_for_task(&self, task: &Arc<BgTask>) -> Option<BgCompletion> {
1758 let snapshot = self.snapshot_with_terminal_cache(task, RUNNING_OUTPUT_PREVIEW_BYTES);
1759 if !snapshot.info.status.is_terminal() {
1760 return None;
1761 }
1762 let (output_preview, output_truncated) = if snapshot.info.mode == BgMode::Pty {
1763 (String::new(), false)
1764 } else {
1765 self.ensure_terminal_output_cache(task)
1766 .map(|cache| completion_preview_for_cache(&cache, snapshot.exit_code))
1767 .unwrap_or_else(|| (String::new(), false))
1768 };
1769 Some(BgCompletion {
1770 task_id: snapshot.info.task_id,
1771 session_id: task.session_id.clone(),
1772 status: snapshot.info.status,
1773 exit_code: snapshot.exit_code,
1774 command: snapshot.info.command,
1775 output_preview,
1776 output_truncated,
1777 original_tokens: None,
1778 compressed_tokens: None,
1779 tokens_skipped: false,
1780 })
1781 }
1782
1783 pub fn detach(&self) {
1784 self.inner.shutdown.store(true, Ordering::SeqCst);
1785 if let Ok(mut tasks) = self.inner.tasks.lock() {
1786 for task in tasks.values() {
1787 if let Ok(mut state) = task.state.lock() {
1788 match &mut state.runtime {
1789 TaskRuntime::Piped(child) => *child = None,
1790 TaskRuntime::Pty(runtime) => *runtime = None,
1791 }
1792 state.detached = true;
1793 }
1794 }
1795 tasks.clear();
1796 }
1797 }
1798
1799 pub fn shutdown(&self) {
1800 let tasks = self
1801 .inner
1802 .tasks
1803 .lock()
1804 .map(|tasks| {
1805 tasks
1806 .values()
1807 .map(|task| (task.task_id.clone(), task.session_id.clone()))
1808 .collect::<Vec<_>>()
1809 })
1810 .unwrap_or_default();
1811 for (task_id, session_id) in tasks {
1812 let _ = self.kill(&task_id, &session_id);
1813 }
1814 }
1815
1816 pub(crate) fn poll_task(&self, task: &Arc<BgTask>) -> Result<(), String> {
1817 if let Ok(state) = task.state.lock() {
1818 if let TaskRuntime::Pty(Some(pty)) = &state.runtime {
1819 if !pty.exit_observed.load(Ordering::SeqCst) {
1827 return Ok(());
1828 }
1829 }
1830 }
1831 let marker = match read_exit_marker(&task.paths.exit) {
1832 Ok(Some(marker)) => marker,
1833 Ok(None) => return Ok(()),
1834 Err(error) => return Err(format!("failed to read exit marker: {error}")),
1835 };
1836 self.finalize_from_marker(task, marker, None)
1837 }
1838
1839 pub(crate) fn reap_child(&self, task: &Arc<BgTask>) {
1840 let mut needs_completion = false;
1841 {
1842 let Ok(mut state) = task.state.lock() else {
1843 return;
1844 };
1845 match &mut state.runtime {
1846 TaskRuntime::Piped(child_slot) => {
1847 if let Some(child) = child_slot.as_mut() {
1848 if matches!(child.try_wait(), Ok(Some(_))) {
1849 *child_slot = None;
1850 state.detached = true;
1851 state.child_exit_observed = true;
1852 }
1853 } else if state.detached {
1854 let child_known_dead = state.child_exit_observed
1855 || state
1856 .metadata
1857 .child_pid
1858 .is_some_and(|pid| !is_process_alive(pid));
1859 if child_known_dead {
1860 needs_completion =
1861 self.fail_without_exit_marker_if_needed(task, &mut state);
1862 }
1863 }
1864 }
1865 TaskRuntime::Pty(Some(pty)) => {
1866 if pty.exit_observed.load(Ordering::SeqCst) {
1867 drop(state);
1868 let _ = self.poll_task(task);
1869 return;
1870 }
1871 }
1872 TaskRuntime::Pty(None) => {}
1873 }
1874 }
1875 if needs_completion {
1876 let _ = self.post_terminal_transition(task, true);
1877 }
1878 }
1879
1880 fn fail_without_exit_marker_if_needed(
1881 &self,
1882 task: &Arc<BgTask>,
1883 state: &mut BgTaskState,
1884 ) -> bool {
1885 if state.metadata.status.is_terminal() {
1886 return false;
1887 }
1888 if matches!(read_exit_marker(&task.paths.exit), Ok(Some(_))) {
1889 return false;
1890 }
1891 let watch_controlled = self.task_has_watch_control(&task.task_id);
1892 let updated = self.update_task_metadata(&task.paths, |metadata| {
1893 metadata.mark_terminal(
1894 BgTaskStatus::Failed,
1895 None,
1896 Some("process exited without exit marker".to_string()),
1897 );
1898 if watch_controlled {
1899 metadata.completion_delivered = true;
1900 }
1901 });
1902 if let Ok(metadata) = updated {
1903 state.pending_terminal_override = None;
1904 state.metadata = metadata;
1905 task.mark_terminal_now();
1906 return true;
1907 }
1908 false
1909 }
1910
1911 pub(crate) fn running_tasks(&self) -> Vec<Arc<BgTask>> {
1912 self.inner
1913 .tasks
1914 .lock()
1915 .map(|tasks| {
1916 tasks
1917 .values()
1918 .filter(|task| task.is_running())
1919 .cloned()
1920 .collect()
1921 })
1922 .unwrap_or_default()
1923 }
1924
1925 fn insert_rehydrated_task(
1926 &self,
1927 metadata: PersistedTask,
1928 paths: TaskPaths,
1929 detached: bool,
1930 ) -> Result<(), String> {
1931 let task_id = metadata.task_id.clone();
1932 let session_id = metadata.session_id.clone();
1933 let started = started_instant_from_unix_millis(metadata.started_at);
1934 let suppress_replayed_running_reminder = metadata.status == BgTaskStatus::Running;
1935 let mode = metadata.mode.clone();
1936 let task = Arc::new(BgTask {
1937 task_id: task_id.clone(),
1938 session_id,
1939 paths: paths.clone(),
1940 started,
1941 last_reminder_at: Mutex::new(suppress_replayed_running_reminder.then(Instant::now)),
1942 terminal_at: Mutex::new(metadata.status.is_terminal().then(Instant::now)),
1943 state: Mutex::new(BgTaskState {
1944 metadata,
1945 runtime: if mode == BgMode::Pty {
1946 TaskRuntime::Pty(None)
1947 } else {
1948 TaskRuntime::Piped(None)
1949 },
1950 detached,
1951 child_exit_observed: false,
1958 buffer: if mode == BgMode::Pty {
1959 BgBuffer::pty(paths.pty.clone())
1960 } else {
1961 BgBuffer::new(paths.stdout.clone(), paths.stderr.clone())
1962 },
1963 terminal_output_cache: None,
1964 pending_terminal_override: None,
1965 }),
1966 });
1967 self.inner
1968 .tasks
1969 .lock()
1970 .map_err(|_| "background task registry lock poisoned".to_string())?
1971 .insert(task_id, task);
1972 Ok(())
1973 }
1974
1975 fn kill_with_status(
1976 &self,
1977 task_id: &str,
1978 session_id: &str,
1979 terminal_status: BgTaskStatus,
1980 ) -> Result<BgTaskSnapshot, String> {
1981 let task = self
1982 .task_for_session(task_id, session_id)
1983 .ok_or_else(|| format!("background task not found: {task_id}"))?;
1984 let mut terminalized = false;
1985
1986 {
1987 let mut state = task
1988 .state
1989 .lock()
1990 .map_err(|_| "background task lock poisoned".to_string())?;
1991 if state.metadata.status.is_terminal() {
1992 state.pending_terminal_override = None;
1993 } else if let Ok(Some(marker)) = read_exit_marker(&task.paths.exit) {
1994 state.metadata =
1995 terminal_metadata_from_marker(state.metadata.clone(), marker, None);
1996 if self.task_has_watch_control(&task.task_id) {
1997 state.metadata.completion_delivered = true;
1998 }
1999 state.pending_terminal_override = None;
2000 task.mark_terminal_now();
2001 match &mut state.runtime {
2002 TaskRuntime::Piped(child_slot) => reap_piped_child(child_slot),
2009 TaskRuntime::Pty(runtime) => *runtime = None,
2010 }
2011 state.detached = true;
2012 self.persist_task(&task.paths, &state.metadata)
2013 .map_err(|e| format!("failed to persist terminal state: {e}"))?;
2014 terminalized = true;
2015 } else {
2016 let was_already_killing = state.metadata.status == BgTaskStatus::Killing;
2017 if !was_already_killing {
2018 state.metadata.status = BgTaskStatus::Killing;
2019 self.persist_task(&task.paths, &state.metadata)
2020 .map_err(|e| format!("failed to persist killing state: {e}"))?;
2021 }
2022
2023 #[cfg(unix)]
2024 let pgid = state.metadata.pgid;
2025 #[cfg(windows)]
2026 let child_pid = state.metadata.child_pid;
2027 if !was_already_killing
2028 && state.metadata.mode == BgMode::Pty
2029 && terminal_status == BgTaskStatus::TimedOut
2030 {
2031 state.pending_terminal_override = Some(BgTaskStatus::TimedOut);
2032 }
2033
2034 #[cfg(windows)]
2035 let mut pty_forced_terminal_status: Option<BgTaskStatus> = None;
2036
2037 match &mut state.runtime {
2038 TaskRuntime::Piped(child_slot) => {
2039 #[cfg(unix)]
2040 if let Some(pgid) = pgid {
2041 terminate_pgid(pgid, child_slot.as_mut());
2042 }
2043 #[cfg(windows)]
2044 if let Some(child) = child_slot.as_mut() {
2045 super::process::terminate_process(child);
2046 } else if let Some(pid) = child_pid {
2047 terminate_pid(pid);
2048 }
2049 if let Some(child) = child_slot.as_mut() {
2050 let _ = child.wait();
2051 }
2052 *child_slot = None;
2053 state.detached = true;
2054
2055 if !task.paths.exit.exists() {
2056 write_kill_marker_if_absent(&task.paths.exit)
2057 .map_err(|e| format!("failed to write kill marker: {e}"))?;
2058 }
2059
2060 let exit_code = if terminal_status == BgTaskStatus::TimedOut {
2061 Some(124)
2062 } else {
2063 None
2064 };
2065 state
2066 .metadata
2067 .mark_terminal(terminal_status, exit_code, None);
2068 if self.task_has_watch_control(&task.task_id) {
2069 state.metadata.completion_delivered = true;
2070 }
2071 state.pending_terminal_override = None;
2072 task.mark_terminal_now();
2073 self.persist_task(&task.paths, &state.metadata)
2074 .map_err(|e| format!("failed to persist killed state: {e}"))?;
2075 terminalized = true;
2076 }
2077 TaskRuntime::Pty(Some(pty)) => {
2078 pty.was_killed.store(true, Ordering::SeqCst);
2079 if let Err(error) = pty.killer.kill() {
2080 crate::slog_warn!(
2081 "[pty-kill] {task_id} ChildKiller::kill failed: {error}"
2082 );
2083 }
2084 if let Some(pid) = pty.child_pid {
2085 #[cfg(unix)]
2086 terminate_pgid(pid as i32, None);
2087 #[cfg(windows)]
2088 terminate_pid(pid);
2089 }
2090 drop(pty.master.take());
2091
2092 #[cfg(windows)]
2093 {
2094 let default_status = if terminal_status == BgTaskStatus::TimedOut {
2095 BgTaskStatus::TimedOut
2096 } else {
2097 BgTaskStatus::Killed
2098 };
2099 pty_forced_terminal_status = Some(
2100 state
2101 .pending_terminal_override
2102 .take()
2103 .unwrap_or(default_status),
2104 );
2105 }
2106 }
2107 TaskRuntime::Pty(None) => {}
2108 }
2109
2110 #[cfg(windows)]
2111 if let Some(target_status) = pty_forced_terminal_status {
2112 if !task.paths.exit.exists() {
2113 write_kill_marker_if_absent(&task.paths.exit)
2114 .map_err(|e| format!("failed to write kill marker: {e}"))?;
2115 }
2116
2117 let exit_code = if target_status == BgTaskStatus::TimedOut {
2118 Some(124)
2119 } else {
2120 None
2121 };
2122 state.metadata.mark_terminal(target_status, exit_code, None);
2123 if self.task_has_watch_control(&task.task_id) {
2124 state.metadata.completion_delivered = true;
2125 }
2126 state.pending_terminal_override = None;
2127 task.mark_terminal_now();
2128 if let TaskRuntime::Pty(runtime) = &mut state.runtime {
2129 *runtime = None;
2130 }
2131 state.detached = true;
2132 self.persist_task(&task.paths, &state.metadata)
2133 .map_err(|e| format!("failed to persist killed PTY state: {e}"))?;
2134 terminalized = true;
2135 }
2136 }
2137 }
2138
2139 if terminalized {
2140 self.post_terminal_transition(&task, true)?;
2141 }
2142 Ok(self.snapshot_with_terminal_cache(&task, RUNNING_OUTPUT_PREVIEW_BYTES))
2143 }
2144
2145 fn finalize_from_marker(
2146 &self,
2147 task: &Arc<BgTask>,
2148 marker: ExitMarker,
2149 reason: Option<String>,
2150 ) -> Result<(), String> {
2151 let watch_controlled = self.task_has_watch_control(&task.task_id);
2152 let mut pty_reader_done = None;
2153 {
2154 let mut state = task
2155 .state
2156 .lock()
2157 .map_err(|_| "background task lock poisoned".to_string())?;
2158 if state.metadata.status.is_terminal() {
2159 state.pending_terminal_override = None;
2160 return Ok(());
2161 }
2162
2163 let pending_override = state.pending_terminal_override.take();
2164 let is_pty = state.metadata.mode == BgMode::Pty;
2165 let updated = self
2166 .update_task_metadata(&task.paths, |metadata| {
2167 let mut new_metadata = if is_pty && marker == ExitMarker::Killed {
2168 let mut metadata = metadata.clone();
2169 let target_status = pending_override.unwrap_or(BgTaskStatus::Killed);
2170 let exit_code = if target_status == BgTaskStatus::TimedOut {
2171 Some(124)
2172 } else {
2173 None
2174 };
2175 metadata.mark_terminal(target_status, exit_code, reason);
2176 metadata
2177 } else {
2178 terminal_metadata_from_marker(metadata.clone(), marker, reason)
2179 };
2180 if watch_controlled {
2181 new_metadata.completion_delivered = true;
2182 }
2183 *metadata = new_metadata;
2184 })
2185 .map_err(|e| format!("failed to persist terminal state: {e}"))?;
2186 state.metadata = updated;
2187 task.mark_terminal_now();
2188 match &mut state.runtime {
2189 TaskRuntime::Piped(child_slot) => reap_piped_child(child_slot),
2194 TaskRuntime::Pty(runtime) => {
2195 pty_reader_done = runtime
2196 .as_ref()
2197 .map(|runtime| Arc::clone(&runtime.reader_done));
2198 *runtime = None;
2199 }
2200 }
2201 state.detached = true;
2202 }
2203
2204 if let Some(reader_done) = pty_reader_done {
2205 let deadline = Instant::now() + Duration::from_millis(200);
2206 while !reader_done.load(Ordering::SeqCst) && Instant::now() < deadline {
2207 std::thread::sleep(Duration::from_millis(10));
2208 }
2209 }
2210
2211 self.scan_task_watch_output(task);
2214
2215 self.post_terminal_transition(task, true)
2216 }
2217
2218 fn enqueue_completion_if_needed(
2219 &self,
2220 metadata: &PersistedTask,
2221 paths: Option<&TaskPaths>,
2222 emit_frame: bool,
2223 ) {
2224 if metadata.status.is_terminal() && !metadata.completion_delivered {
2225 let cache =
2226 paths.and_then(|paths| self.render_terminal_output_from_paths(metadata, paths));
2227 self.enqueue_completion_from_parts(metadata, None, paths, emit_frame, cache.as_ref());
2228 }
2229 }
2230
2231 fn render_terminal_output_from_paths(
2232 &self,
2233 metadata: &PersistedTask,
2234 paths: &TaskPaths,
2235 ) -> Option<TerminalOutputCache> {
2236 if metadata.mode == BgMode::Pty {
2237 return None;
2238 }
2239 let buffer = BgBuffer::new(paths.stdout.clone(), paths.stderr.clone());
2240 Some(self.render_terminal_output(metadata, &buffer))
2241 }
2242
2243 fn enqueue_completion_from_parts(
2244 &self,
2245 metadata: &PersistedTask,
2246 buffer: Option<&BgBuffer>,
2247 paths: Option<&TaskPaths>,
2248 emit_frame: bool,
2249 terminal_render: Option<&TerminalOutputCache>,
2250 ) {
2251 if !metadata.status.is_terminal() {
2262 return;
2263 }
2264
2265 let owned_buffer = if buffer.is_none() && metadata.mode != BgMode::Pty {
2266 paths.map(|paths| BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()))
2267 } else {
2268 None
2269 };
2270 let render_buffer = buffer.or(owned_buffer.as_ref());
2271 let owned_render = if terminal_render.is_none() {
2272 render_buffer.map(|buffer| self.render_terminal_output(metadata, buffer))
2273 } else {
2274 None
2275 };
2276 let render = terminal_render.or(owned_render.as_ref());
2277
2278 let (output_preview, output_truncated) = render
2282 .map(|cache| completion_preview_for_cache(cache, metadata.exit_code))
2283 .unwrap_or_else(|| (String::new(), false));
2284
2285 let token_counts = self.completion_token_counts(
2286 metadata,
2287 buffer,
2288 paths,
2289 render.map(|render| render.output_preview.as_str()),
2290 );
2291 let completion = BgCompletion {
2292 task_id: metadata.task_id.clone(),
2293 session_id: metadata.session_id.clone(),
2294 status: metadata.status.clone(),
2295 exit_code: metadata.exit_code,
2296 command: metadata.command.clone(),
2297 output_preview,
2298 output_truncated,
2299 original_tokens: token_counts.original_tokens,
2300 compressed_tokens: token_counts.compressed_tokens,
2301 tokens_skipped: token_counts.tokens_skipped,
2302 };
2303
2304 self.record_compression_event_if_applicable(metadata, &token_counts);
2315
2316 let (watch_controlled, watch_matched) = self.task_watch_state(&metadata.task_id);
2317 if watch_controlled {
2318 if emit_frame && !watch_matched {
2319 self.emit_bash_watch_exit(&completion);
2320 }
2321 self.clear_task_watch_state(&metadata.task_id);
2322 return;
2323 }
2324
2325 if metadata.completion_delivered {
2335 return;
2336 }
2337
2338 let pushed = if let Ok(mut completions) = self.inner.completions.lock() {
2341 if completions
2342 .iter()
2343 .any(|existing| existing.task_id == metadata.task_id)
2344 {
2345 false
2346 } else {
2347 completions.push_back(completion.clone());
2348 true
2349 }
2350 } else {
2351 false
2352 };
2353
2354 if pushed && emit_frame {
2355 self.emit_bash_completed(completion);
2356 }
2357 }
2358
2359 fn record_compression_event_if_applicable(
2360 &self,
2361 metadata: &PersistedTask,
2362 token_counts: &CompletionTokenCounts,
2363 ) {
2364 if metadata.mode == BgMode::Pty {
2365 return;
2366 }
2367
2368 let (original_tokens, compressed_tokens, original_bytes, compressed_bytes) = match (
2369 token_counts.original_tokens,
2370 token_counts.compressed_tokens,
2371 token_counts.original_bytes,
2372 token_counts.compressed_bytes,
2373 ) {
2374 (
2375 Some(original_tokens),
2376 Some(compressed_tokens),
2377 Some(original_bytes),
2378 Some(compressed_bytes),
2379 ) => (
2380 original_tokens,
2381 compressed_tokens,
2382 original_bytes,
2383 compressed_bytes,
2384 ),
2385 _ => {
2386 crate::slog_warn!(
2387 "compression event skipped for {}: token counts unavailable (likely spill file missing or unreadable)",
2388 metadata.task_id
2389 );
2390 return;
2391 }
2392 };
2393
2394 let pool = self.inner.db_pool.read().ok().and_then(|slot| slot.clone());
2395 let Some(pool) = pool else {
2396 crate::slog_warn!(
2397 "compression event skipped for {}: db_pool not initialized — was configure run?",
2398 metadata.task_id
2399 );
2400 return;
2401 };
2402 let harness = self
2403 .inner
2404 .db_harness
2405 .read()
2406 .ok()
2407 .and_then(|slot| slot.clone());
2408 let Some(harness) = harness else {
2409 crate::slog_warn!(
2410 "compression event insert skipped for {}: harness not configured",
2411 metadata.task_id
2412 );
2413 return;
2414 };
2415
2416 let project_root = metadata
2417 .project_root
2418 .as_deref()
2419 .unwrap_or(&metadata.workdir);
2420 let project_key = crate::search_index::project_cache_key(project_root);
2421 let row = crate::db::compression_events::CompressionEventRow {
2422 harness: &harness,
2423 session_id: Some(&metadata.session_id),
2424 project_key: &project_key,
2425 tool: "bash",
2426 task_id: Some(&metadata.task_id),
2427 command: Some(&metadata.command),
2428 compressor: if metadata.compressed {
2429 "registry"
2430 } else {
2431 "none"
2432 },
2433 original_bytes,
2434 compressed_bytes,
2435 original_tokens,
2436 compressed_tokens,
2437 created_at: unix_millis() as i64,
2438 };
2439
2440 let conn = match pool.lock() {
2441 Ok(conn) => conn,
2442 Err(_) => {
2443 crate::slog_warn!(
2444 "compression event insert failed for {}: db mutex poisoned",
2445 metadata.task_id
2446 );
2447 return;
2448 }
2449 };
2450 match crate::db::compression_events::insert_compression_event(&conn, &row) {
2451 Ok(_) => {
2452 crate::slog_debug!(
2456 "compression event recorded for {} (project={}, session={}, {} → {} tokens)",
2457 metadata.task_id,
2458 project_key,
2459 metadata.session_id,
2460 original_tokens,
2461 compressed_tokens
2462 );
2463 }
2464 Err(error) => {
2465 crate::slog_warn!(
2466 "compression event insert failed for {}: {}",
2467 metadata.task_id,
2468 error
2469 );
2470 }
2471 }
2472 }
2473
2474 fn emit_bash_pattern_match(&self, session_id: &str, pattern_match: PatternMatch) {
2475 let Ok(progress_sender) = self
2476 .inner
2477 .progress_sender
2478 .lock()
2479 .map(|sender| sender.clone())
2480 else {
2481 return;
2482 };
2483 if let Some(sender) = progress_sender.as_ref() {
2484 sender(PushFrame::BashPatternMatch(BashPatternMatchFrame::new(
2485 pattern_match.task_id,
2486 session_id.to_string(),
2487 pattern_match.watch_id,
2488 pattern_match.match_text,
2489 pattern_match.match_offset,
2490 pattern_match.context,
2491 pattern_match.once,
2492 )));
2493 }
2494 }
2495
2496 fn emit_bash_watch_exit(&self, completion: &BgCompletion) {
2497 let Ok(progress_sender) = self
2498 .inner
2499 .progress_sender
2500 .lock()
2501 .map(|sender| sender.clone())
2502 else {
2503 return;
2504 };
2505 let Some(sender) = progress_sender.as_ref() else {
2506 return;
2507 };
2508 let status = completion_status_text(&completion.status, completion.exit_code);
2509 let preview = completion.output_preview.trim_end();
2510 let context = if preview.is_empty() {
2511 format!("task {} exited ({status})", completion.task_id)
2512 } else {
2513 format!(
2514 "task {} exited ({status})
2515{preview}",
2516 completion.task_id
2517 )
2518 };
2519 sender(PushFrame::BashPatternMatch(
2520 BashPatternMatchFrame::task_exit(
2521 completion.task_id.clone(),
2522 completion.session_id.clone(),
2523 format!("exited ({status})"),
2524 context,
2525 ),
2526 ));
2527 }
2528
2529 fn emit_bash_completed(&self, completion: BgCompletion) {
2530 let Ok(progress_sender) = self
2531 .inner
2532 .progress_sender
2533 .lock()
2534 .map(|sender| sender.clone())
2535 else {
2536 return;
2537 };
2538 let Some(sender) = progress_sender.as_ref() else {
2539 return;
2540 };
2541 sender(PushFrame::BashCompleted(BashCompletedFrame::new(
2549 completion.task_id,
2550 completion.session_id,
2551 completion.status,
2552 completion.exit_code,
2553 completion.command,
2554 completion.output_preview,
2555 completion.output_truncated,
2556 completion.original_tokens,
2557 completion.compressed_tokens,
2558 completion.tokens_skipped,
2559 )));
2560 }
2561
2562 fn completion_token_counts(
2563 &self,
2564 metadata: &PersistedTask,
2565 buffer: Option<&BgBuffer>,
2566 paths: Option<&TaskPaths>,
2567 rendered_output: Option<&str>,
2568 ) -> CompletionTokenCounts {
2569 if metadata.mode == BgMode::Pty {
2570 return CompletionTokenCounts::skipped();
2571 }
2572
2573 let raw = match buffer {
2574 Some(buffer) => buffer.read_for_token_count(TOKENIZE_CAP_BYTES_PER_STREAM),
2575 None => paths
2576 .map(|paths| {
2577 read_for_token_count_from_disk(metadata, paths, TOKENIZE_CAP_BYTES_PER_STREAM)
2578 })
2579 .unwrap_or(TokenCountInput::Skipped),
2580 };
2581
2582 let TokenCountInput::Text(raw_output) = raw else {
2583 return CompletionTokenCounts::skipped();
2584 };
2585
2586 let original_tokens = token_count_u32(&raw_output);
2587 let original_bytes = raw_output.len() as i64;
2588 let compressed_output = rendered_output.unwrap_or(&raw_output);
2589 let compressed_tokens = token_count_u32(compressed_output);
2590 let compressed_bytes = compressed_output.len() as i64;
2591 CompletionTokenCounts {
2592 original_tokens: Some(original_tokens),
2593 compressed_tokens: Some(compressed_tokens),
2594 original_bytes: Some(original_bytes),
2595 compressed_bytes: Some(compressed_bytes),
2596 tokens_skipped: false,
2597 }
2598 }
2599
2600 pub(crate) fn maybe_emit_long_running_reminder(&self, task: &Arc<BgTask>) {
2601 if !self
2602 .inner
2603 .long_running_reminder_enabled
2604 .load(Ordering::SeqCst)
2605 {
2606 return;
2607 }
2608 let interval_ms = self
2609 .inner
2610 .long_running_reminder_interval_ms
2611 .load(Ordering::SeqCst);
2612 if interval_ms == 0 {
2613 return;
2614 }
2615 let interval = Duration::from_millis(interval_ms);
2616 let now = Instant::now();
2617 let Ok(mut last_reminder_at) = task.last_reminder_at.lock() else {
2618 return;
2619 };
2620 let since = last_reminder_at.unwrap_or(task.started);
2621 if now.duration_since(since) < interval {
2622 return;
2623 }
2624 let command = task
2625 .state
2626 .lock()
2627 .map(|state| state.metadata.command.clone())
2628 .unwrap_or_default();
2629 *last_reminder_at = Some(now);
2630 self.emit_bash_long_running(BashLongRunningFrame::new(
2631 task.task_id.clone(),
2632 task.session_id.clone(),
2633 command,
2634 task.started.elapsed().as_millis() as u64,
2635 ));
2636 }
2637
2638 fn emit_bash_long_running(&self, frame: BashLongRunningFrame) {
2639 let Ok(progress_sender) = self
2640 .inner
2641 .progress_sender
2642 .lock()
2643 .map(|sender| sender.clone())
2644 else {
2645 return;
2646 };
2647 if let Some(sender) = progress_sender.as_ref() {
2648 sender(PushFrame::BashLongRunning(frame));
2649 }
2650 }
2651
2652 fn task(&self, task_id: &str) -> Option<Arc<BgTask>> {
2653 self.inner
2654 .tasks
2655 .lock()
2656 .ok()
2657 .and_then(|tasks| tasks.get(task_id).cloned())
2658 }
2659
2660 fn task_for_session(&self, task_id: &str, session_id: &str) -> Option<Arc<BgTask>> {
2661 self.task(task_id)
2662 .filter(|task| task.session_id == session_id)
2663 }
2664
2665 fn running_count(&self) -> usize {
2666 self.inner
2667 .tasks
2668 .lock()
2669 .map(|tasks| tasks.values().filter(|task| task.is_running()).count())
2670 .unwrap_or(0)
2671 }
2672
2673 fn start_watchdog(&self) {
2674 if !self.inner.watchdog_started.swap(true, Ordering::SeqCst) {
2675 super::watchdog::start(self.clone());
2676 }
2677 }
2678
2679 fn running_metadata_is_stale(&self, metadata: &PersistedTask) -> bool {
2680 unix_millis().saturating_sub(metadata.started_at) > STALE_RUNNING_AFTER.as_millis() as u64
2681 }
2682
2683 #[cfg(test)]
2684 pub fn task_json_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
2685 self.task_for_session(task_id, session_id)
2686 .map(|task| task.paths.json.clone())
2687 }
2688
2689 #[cfg(test)]
2690 pub fn task_exit_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
2691 self.task_for_session(task_id, session_id)
2692 .map(|task| task.paths.exit.clone())
2693 }
2694
2695 fn generate_unique_task_id(&self) -> Result<String, String> {
2697 for _ in 0..32 {
2698 let candidate = random_slug();
2699 let tasks = self
2700 .inner
2701 .tasks
2702 .lock()
2703 .map_err(|_| "background task registry lock poisoned".to_string())?;
2704 if tasks.contains_key(&candidate) {
2705 continue;
2706 }
2707 let completions = self
2708 .inner
2709 .completions
2710 .lock()
2711 .map_err(|_| "background completions lock poisoned".to_string())?;
2712 if completions
2713 .iter()
2714 .any(|completion| completion.task_id == candidate)
2715 {
2716 continue;
2717 }
2718 return Ok(candidate);
2719 }
2720 Err("failed to allocate unique background task id after 32 attempts".to_string())
2721 }
2722}
2723
2724fn render_compressed_with_recovery(
2725 buffer: &BgBuffer,
2726 mut compressed: CompressionResult,
2727 input_truncated: bool,
2728) -> TerminalOutputCache {
2729 let had_trailing_newline = compressed.text.ends_with('\n');
2737 let mut text = strip_plain_truncation_marker_lines(&compressed.text)
2738 .trim_end()
2739 .to_string();
2740 if had_trailing_newline && !text.is_empty() {
2741 text.push('\n');
2742 }
2743 compressed.text = text;
2744
2745 let output_path = buffer.output_path().map(|path| path.display().to_string());
2746 let stderr_path = buffer.stderr_path().map(|path| path.display().to_string());
2747 let include_stderr_path = buffer.stream_len(StreamKind::Stderr) > 0;
2748 let mut recovery = RecoveryContext {
2749 dropped_by_class: compressed.dropped_by_class,
2750 had_inner_drop: compressed.had_inner_drop,
2751 offset_hint_eligible: compressed.offset_hint_eligible,
2752 offset_start_line: compressed.offset_start_line,
2753 byte_truncated: input_truncated,
2754 output_path: output_path.clone(),
2755 stderr_path: stderr_path.clone(),
2756 include_stderr_path,
2757 };
2758
2759 let (output_preview, output_truncated) =
2760 render_body_with_recovery_marker(&compressed.text, &mut recovery);
2761 TerminalOutputCache {
2762 output_preview,
2763 output_truncated,
2764 kind: TerminalOutputKind::Compressed,
2765 output_path,
2766 stderr_path,
2767 recovery: Some(recovery),
2768 }
2769}
2770
2771fn render_body_with_recovery_marker(body: &str, recovery: &mut RecoveryContext) -> (String, bool) {
2772 render_body_with_recovery_marker_at_cap(
2773 body,
2774 recovery,
2775 FINAL_OUTPUT_CAP_BYTES,
2776 cap_final_output,
2777 cap_final_output_with_marker,
2778 )
2779}
2780
2781fn render_raw_body_with_recovery_marker(
2782 body: &str,
2783 recovery: &mut RecoveryContext,
2784) -> (String, bool) {
2785 render_body_with_recovery_marker_at_cap(
2786 body,
2787 recovery,
2788 RAW_PASSTHROUGH_CAP_BYTES,
2789 |input| {
2790 super::output::cap_head_tail(
2791 input,
2792 RAW_PASSTHROUGH_CAP_BYTES,
2793 RAW_PASSTHROUGH_HEAD_BYTES,
2794 RAW_PASSTHROUGH_TAIL_BYTES,
2795 )
2796 },
2797 |input, marker| {
2798 super::output::cap_head_tail_with_marker(
2799 input,
2800 RAW_PASSTHROUGH_CAP_BYTES,
2801 RAW_PASSTHROUGH_HEAD_BYTES,
2802 RAW_PASSTHROUGH_TAIL_BYTES,
2803 marker,
2804 )
2805 },
2806 )
2807}
2808
2809fn render_body_with_recovery_marker_at_cap<F, G>(
2810 body: &str,
2811 recovery: &mut RecoveryContext,
2812 cap_bytes: usize,
2813 cap_plain: F,
2814 cap_with_marker: G,
2815) -> (String, bool)
2816where
2817 F: Fn(&str) -> super::output::CappedText,
2818 G: Fn(&str, &str) -> super::output::CappedText,
2819{
2820 let needs_marker = recovery.has_visible_drop();
2821 if body.len() > cap_bytes {
2822 recovery.byte_truncated = true;
2823 if let Some(marker) = recovery_marker(recovery) {
2824 let capped = cap_with_marker(body, &marker);
2825 return (capped.text, true);
2826 }
2827 let capped = cap_plain(body);
2828 return (capped.text, capped.truncated || needs_marker);
2829 }
2830
2831 if !needs_marker {
2832 return (body.to_string(), false);
2833 }
2834
2835 let Some(marker) = recovery_marker(recovery) else {
2836 return (body.to_string(), true);
2837 };
2838 let with_marker = append_recovery_marker(body, &marker);
2839 if with_marker.len() <= cap_bytes {
2840 return (with_marker, true);
2841 }
2842
2843 recovery.byte_truncated = true;
2844 let marker = recovery_marker(recovery).unwrap_or(marker);
2845 let capped = cap_with_marker(body, &marker);
2846 (capped.text, true)
2847}
2848
2849fn append_recovery_marker(body: &str, marker: &str) -> String {
2850 if body.is_empty() {
2851 return marker.to_string();
2852 }
2853 let mut output = body.trim_end().to_string();
2854 output.push('\n');
2855 output.push_str(marker);
2856 output
2857}
2858
2859fn recovery_marker(recovery: &RecoveryContext) -> Option<String> {
2860 let mut parts = Vec::new();
2861 for (class, count) in &recovery.dropped_by_class {
2862 let label = if *count == 1 {
2863 class.singular()
2864 } else {
2865 class.plural()
2866 };
2867 parts.push(format!("+{count} more {label}"));
2868 }
2869 if recovery.byte_truncated {
2870 parts.push("truncated output".to_string());
2871 } else if recovery.had_inner_drop && parts.is_empty() {
2872 parts.push("omitted output".to_string());
2873 }
2874
2875 if parts.is_empty() {
2876 return None;
2877 }
2878
2879 let hint = recovery_hint(recovery);
2880 Some(format!("[{}; {hint}]", parts.join(", ")))
2881}
2882
2883fn recovery_hint(recovery: &RecoveryContext) -> String {
2884 if recovery.offset_hint_eligible
2888 && !recovery.byte_truncated
2889 && recovery.dropped_by_class.is_empty()
2890 && !recovery.include_stderr_path
2891 {
2892 if let (Some(path), Some(line)) =
2893 (recovery.output_path.as_deref(), recovery.offset_start_line)
2894 {
2895 return format!("see remaining: tail -n +{line} {}", quote_path(path));
2896 }
2897 }
2898
2899 let mut paths = Vec::new();
2900 if let Some(path) = recovery.output_path.as_deref() {
2901 paths.push(path);
2902 }
2903 if recovery.include_stderr_path {
2904 if let Some(path) = recovery.stderr_path.as_deref() {
2905 if !paths.contains(&path) {
2906 paths.push(path);
2907 }
2908 }
2909 }
2910
2911 if paths.is_empty() {
2912 return "full output unavailable".to_string();
2913 }
2914
2915 let reads = paths
2916 .into_iter()
2917 .map(|path| format!("read {}", quote_path(path)))
2918 .collect::<Vec<_>>()
2919 .join(" and ");
2920 format!("full output: {reads}")
2921}
2922
2923fn strip_plain_truncation_marker_lines(input: &str) -> String {
2924 input
2925 .lines()
2926 .filter(|line| !is_plain_truncation_marker(line.trim()))
2927 .collect::<Vec<_>>()
2928 .join("\n")
2929}
2930
2931fn strip_recovery_marker_lines(input: &str) -> String {
2932 input
2933 .lines()
2934 .filter(|line| !is_recovery_marker(line.trim()))
2935 .collect::<Vec<_>>()
2936 .join("\n")
2937}
2938
2939fn is_plain_truncation_marker(line: &str) -> bool {
2940 let Some(rest) = line.strip_prefix("...<truncated ") else {
2941 return false;
2942 };
2943 let Some(bytes) = rest.strip_suffix(" bytes>...") else {
2944 return false;
2945 };
2946 !bytes.is_empty() && bytes.chars().all(|ch| ch.is_ascii_digit())
2947}
2948
2949fn is_recovery_marker(line: &str) -> bool {
2950 line.starts_with('[')
2951 && line.ends_with(']')
2952 && (line.contains("full output: read ")
2953 || line.contains("see remaining: tail -n +")
2954 || line.contains("full output unavailable"))
2955}
2956
2957fn render_structured_output(command: &str, buffer: &BgBuffer) -> Option<TerminalOutputCache> {
2958 if !is_gh_structured_command(command) {
2959 return None;
2960 }
2961
2962 let output_path = buffer
2963 .output_path()
2964 .map(|path| path.display().to_string())?;
2965 let stdout_bytes = buffer.stream_len(StreamKind::Stdout);
2966 if stdout_bytes == 0 {
2967 return None;
2968 }
2969
2970 if stdout_bytes > STRUCTURED_OUTPUT_CAP_BYTES as u64 {
2971 if !stream_starts_like_json(buffer, StreamKind::Stdout) {
2972 return None;
2973 }
2974 return Some(TerminalOutputCache {
2975 output_preview: json_output_pointer(stdout_bytes, &output_path),
2976 output_truncated: true,
2977 kind: TerminalOutputKind::Structured,
2978 output_path: Some(output_path),
2979 stderr_path: buffer.stderr_path().map(|path| path.display().to_string()),
2980 recovery: None,
2981 });
2982 }
2983
2984 let stdout = buffer.read_stream_bounded(StreamKind::Stdout, STRUCTURED_OUTPUT_CAP_BYTES);
2985 if stdout.truncated || !is_structured_body(&stdout.text) {
2986 return None;
2987 }
2988
2989 Some(TerminalOutputCache {
2990 output_preview: stdout.text,
2991 output_truncated: false,
2992 kind: TerminalOutputKind::Structured,
2993 output_path: Some(output_path),
2994 stderr_path: buffer.stderr_path().map(|path| path.display().to_string()),
2995 recovery: None,
2996 })
2997}
2998
2999fn render_raw_passthrough(buffer: &BgBuffer) -> TerminalOutputCache {
3000 let raw = buffer.read_combined_head_tail(
3001 RAW_PASSTHROUGH_CAP_BYTES,
3002 RAW_PASSTHROUGH_HEAD_BYTES,
3003 RAW_PASSTHROUGH_TAIL_BYTES,
3004 );
3005 let output_path = buffer.output_path().map(|path| path.display().to_string());
3006 let stderr_path = buffer.stderr_path().map(|path| path.display().to_string());
3007 if !raw.truncated {
3008 return TerminalOutputCache {
3009 output_preview: raw.text,
3010 output_truncated: false,
3011 kind: TerminalOutputKind::Raw,
3012 output_path,
3013 stderr_path,
3014 recovery: None,
3015 };
3016 }
3017
3018 let include_stderr_path = buffer.stream_len(StreamKind::Stderr) > 0;
3019 let mut recovery = RecoveryContext {
3020 dropped_by_class: BTreeMap::new(),
3021 had_inner_drop: false,
3022 offset_hint_eligible: false,
3023 offset_start_line: None,
3024 byte_truncated: true,
3025 output_path: output_path.clone(),
3026 stderr_path: stderr_path.clone(),
3027 include_stderr_path,
3028 };
3029 let (output_preview, output_truncated) =
3030 render_raw_body_with_recovery_marker(&raw.text, &mut recovery);
3031 TerminalOutputCache {
3032 output_preview,
3033 output_truncated,
3034 kind: TerminalOutputKind::Raw,
3035 output_path,
3036 stderr_path,
3037 recovery: Some(recovery),
3038 }
3039}
3040
3041fn completion_preview_for_cache(
3042 cache: &TerminalOutputCache,
3043 exit_code: Option<i32>,
3044) -> (String, bool) {
3045 let exit_ok = exit_code == Some(0);
3048 let threshold = completion_preview_threshold(exit_ok);
3049 if cache.kind == TerminalOutputKind::Structured && cache.output_preview.len() > threshold {
3050 if let Some(path) = cache.output_path.as_deref() {
3051 return (
3052 json_output_pointer(cache.output_preview.len() as u64, path),
3053 true,
3054 );
3055 }
3056 return (cache.output_preview.clone(), cache.output_truncated);
3057 }
3058
3059 if let Some(recovery) = cache.recovery.as_ref() {
3060 if cache.output_preview.len() <= threshold {
3061 return (cache.output_preview.clone(), cache.output_truncated);
3062 }
3063 let body = strip_recovery_marker_lines(&cache.output_preview);
3064 let mut completion_recovery = recovery.clone();
3065 completion_recovery.byte_truncated = true;
3066 if let Some(marker) = recovery_marker(&completion_recovery) {
3067 let capped = cap_completion_output_with_marker(&body, &marker, exit_ok);
3068 return (capped.text, true);
3069 }
3070 }
3071
3072 let capped = cap_completion_output(&cache.output_preview, exit_ok);
3073 (capped.text, cache.output_truncated || capped.truncated)
3074}
3075
3076fn is_gh_structured_command(command: &str) -> bool {
3077 let normalized = crate::compress::normalize_command_for_dispatch(command)
3078 .unwrap_or_else(|| command.trim_start().to_string());
3079 let tokens = shell_words_for_flags(&normalized);
3080 let Some(head) = tokens.first() else {
3081 return false;
3082 };
3083 let head_name = Path::new(head)
3084 .file_name()
3085 .and_then(|name| name.to_str())
3086 .unwrap_or(head);
3087 if !(head_name == "gh" || head_name.eq_ignore_ascii_case("gh.exe")) {
3088 return false;
3089 }
3090 tokens.iter().any(|token| {
3091 matches!(token.as_str(), "--json" | "--jq" | "--template")
3092 || token.starts_with("--json=")
3093 || token.starts_with("--jq=")
3094 || token.starts_with("--template=")
3095 })
3096}
3097
3098fn shell_words_for_flags(command: &str) -> Vec<String> {
3099 let mut words = Vec::new();
3100 let mut current = String::new();
3101 let mut in_single = false;
3102 let mut in_double = false;
3103 let mut escaped = false;
3104
3105 for ch in command.chars() {
3106 if escaped {
3107 current.push(ch);
3108 escaped = false;
3109 continue;
3110 }
3111 if ch == '\\' && !in_single {
3112 escaped = true;
3113 continue;
3114 }
3115 if ch == '\'' && !in_double {
3116 in_single = !in_single;
3117 continue;
3118 }
3119 if ch == '"' && !in_single {
3120 in_double = !in_double;
3121 continue;
3122 }
3123 if ch.is_whitespace() && !in_single && !in_double {
3124 if !current.is_empty() {
3125 words.push(std::mem::take(&mut current));
3126 }
3127 continue;
3128 }
3129 if matches!(ch, ';' | '&' | '|') && !in_single && !in_double {
3130 if !current.is_empty() {
3131 words.push(std::mem::take(&mut current));
3132 }
3133 continue;
3134 }
3135 current.push(ch);
3136 }
3137 if !current.is_empty() {
3138 words.push(current);
3139 }
3140 words
3141}
3142
3143fn is_structured_body(body: &str) -> bool {
3144 let trimmed = body.trim();
3145 if trimmed.is_empty() {
3146 return false;
3147 }
3148 if serde_json::from_str::<serde_json::Value>(trimmed).is_ok() {
3149 return true;
3150 }
3151
3152 let mut saw_line = false;
3153 for line in trimmed
3154 .lines()
3155 .map(str::trim)
3156 .filter(|line| !line.is_empty())
3157 {
3158 saw_line = true;
3159 if serde_json::from_str::<serde_json::Value>(line).is_err() {
3160 return false;
3161 }
3162 }
3163 saw_line
3164}
3165
3166fn stream_starts_like_json(buffer: &BgBuffer, stream: StreamKind) -> bool {
3167 let path = match (buffer, stream) {
3168 (BgBuffer::Pipes { stdout_path, .. }, StreamKind::Stdout) => Some(stdout_path),
3169 (BgBuffer::Pipes { stderr_path, .. }, StreamKind::Stderr) => Some(stderr_path),
3170 (BgBuffer::Pty { combined_path }, _) => Some(combined_path),
3171 };
3172 let Some(path) = path else {
3173 return false;
3174 };
3175 let Ok(file) = std::fs::File::open(path) else {
3176 return false;
3177 };
3178 let mut limited = file.take(512);
3179 let mut bytes = Vec::new();
3180 if limited.read_to_end(&mut bytes).is_err() {
3181 return false;
3182 }
3183 String::from_utf8_lossy(&bytes)
3184 .chars()
3185 .find(|ch| !ch.is_whitespace())
3186 .is_some_and(|ch| matches!(ch, '{' | '[' | '"' | '-' | '0'..='9' | 't' | 'f' | 'n'))
3187}
3188
3189struct CompletionTokenCounts {
3190 original_tokens: Option<u32>,
3191 compressed_tokens: Option<u32>,
3192 original_bytes: Option<i64>,
3193 compressed_bytes: Option<i64>,
3194 tokens_skipped: bool,
3195}
3196
3197impl CompletionTokenCounts {
3198 fn skipped() -> Self {
3199 Self {
3200 original_tokens: None,
3201 compressed_tokens: None,
3202 original_bytes: None,
3203 compressed_bytes: None,
3204 tokens_skipped: true,
3205 }
3206 }
3207}
3208
3209fn completion_status_text(status: &BgTaskStatus, exit_code: Option<i32>) -> String {
3210 match status {
3211 BgTaskStatus::TimedOut => "timed out".to_string(),
3212 BgTaskStatus::Killed => "killed".to_string(),
3213 _ => exit_code
3214 .map(|code| format!("exit {code}"))
3215 .unwrap_or_else(|| format!("{status:?}").to_lowercase()),
3216 }
3217}
3218
3219fn token_count_u32(text: &str) -> u32 {
3220 aft_tokenizer::count_tokens(text)
3221 .try_into()
3222 .unwrap_or(u32::MAX)
3223}
3224
3225impl Default for BgTaskRegistry {
3226 fn default() -> Self {
3227 Self::new(Arc::new(Mutex::new(None)))
3228 }
3229}
3230
3231fn modified_within(path: &Path, grace: Duration) -> bool {
3232 fs::metadata(path)
3233 .and_then(|metadata| metadata.modified())
3234 .ok()
3235 .and_then(|modified| SystemTime::now().duration_since(modified).ok())
3236 .map(|age| age < grace)
3237 .unwrap_or(false)
3238}
3239
3240fn canonicalized_path(path: &Path) -> PathBuf {
3241 fs::canonicalize(path).unwrap_or_else(|_| path.to_path_buf())
3242}
3243
3244fn started_instant_from_unix_millis(started_at: u64) -> Instant {
3245 let now_ms = SystemTime::now()
3246 .duration_since(UNIX_EPOCH)
3247 .ok()
3248 .map(|duration| duration.as_millis() as u64)
3249 .unwrap_or(started_at);
3250 let elapsed_ms = now_ms.saturating_sub(started_at);
3251 Instant::now()
3252 .checked_sub(Duration::from_millis(elapsed_ms))
3253 .unwrap_or_else(Instant::now)
3254}
3255
3256fn gc_quarantine(storage_dir: &Path) {
3257 let quarantine_root = storage_dir.join("bash-tasks-quarantine");
3258 let Ok(session_dirs) = fs::read_dir(&quarantine_root) else {
3259 return;
3260 };
3261 for session_entry in session_dirs.flatten() {
3262 let session_quarantine_dir = session_entry.path();
3263 if !session_quarantine_dir.is_dir() {
3264 continue;
3265 }
3266 let entries = match fs::read_dir(&session_quarantine_dir) {
3267 Ok(entries) => entries,
3268 Err(error) => {
3269 crate::slog_warn!(
3270 "failed to read background task quarantine dir {}: {error}",
3271 session_quarantine_dir.display()
3272 );
3273 continue;
3274 }
3275 };
3276 for entry in entries.flatten() {
3277 let path = entry.path();
3278 if modified_within(&path, QUARANTINE_GC_GRACE) {
3279 continue;
3280 }
3281 let result = if path.is_dir() {
3282 fs::remove_dir_all(&path)
3283 } else {
3284 fs::remove_file(&path)
3285 };
3286 match result {
3287 Ok(()) => log::debug!(
3288 "deleted old background task quarantine entry {}",
3289 path.display()
3290 ),
3291 Err(error) => crate::slog_warn!(
3292 "failed to delete old background task quarantine entry {}: {error}",
3293 path.display()
3294 ),
3295 }
3296 }
3297 let _ = fs::remove_dir(&session_quarantine_dir);
3298 }
3299 let _ = fs::remove_dir(&quarantine_root);
3300}
3301
3302enum QuarantineKind {
3303 Corrupt,
3304 Invalid,
3305}
3306
3307fn quarantine_task_json(
3308 storage_dir: &Path,
3309 session_dir: &Path,
3310 json_path: &Path,
3311 kind: QuarantineKind,
3312) -> Result<(), String> {
3313 let session_hash = session_dir
3314 .file_name()
3315 .and_then(|name| name.to_str())
3316 .ok_or_else(|| {
3317 format!(
3318 "invalid background task session dir: {}",
3319 session_dir.display()
3320 )
3321 })?;
3322 let task_name = json_path
3323 .file_name()
3324 .and_then(|name| name.to_str())
3325 .ok_or_else(|| format!("invalid background task json path: {}", json_path.display()))?;
3326 let unix_ts = SystemTime::now()
3327 .duration_since(UNIX_EPOCH)
3328 .map(|duration| duration.as_secs())
3329 .unwrap_or(0);
3330 let quarantine_dir = storage_dir.join("bash-tasks-quarantine").join(session_hash);
3331 fs::create_dir_all(&quarantine_dir).map_err(|e| {
3332 format!(
3333 "failed to create background task quarantine dir {}: {e}",
3334 quarantine_dir.display()
3335 )
3336 })?;
3337 let target_name = quarantine_name(task_name, unix_ts, &kind);
3338 let target = quarantine_dir.join(target_name);
3339 fs::rename(json_path, &target).map_err(|e| {
3340 format!(
3341 "failed to quarantine background task metadata {} to {}: {e}",
3342 json_path.display(),
3343 target.display()
3344 )
3345 })?;
3346
3347 for sibling in task_sibling_paths(json_path) {
3348 if !sibling.exists() {
3349 continue;
3350 }
3351 let Some(sibling_name) = sibling.file_name().and_then(|name| name.to_str()) else {
3352 crate::slog_warn!(
3353 "skipping background task sibling with invalid name during quarantine: {}",
3354 sibling.display()
3355 );
3356 continue;
3357 };
3358 let sibling_target = quarantine_dir.join(quarantine_name(sibling_name, unix_ts, &kind));
3359 if let Err(error) = fs::rename(&sibling, &sibling_target) {
3360 crate::slog_warn!(
3361 "failed to quarantine background task sibling {} to {}: {error}",
3362 sibling.display(),
3363 sibling_target.display()
3364 );
3365 }
3366 }
3367
3368 let _ = fs::remove_dir(session_dir);
3369 Ok(())
3370}
3371
3372fn quarantine_name(file_name: &str, unix_ts: u64, kind: &QuarantineKind) -> String {
3373 match kind {
3374 QuarantineKind::Corrupt => format!("{file_name}.corrupt-{unix_ts}"),
3375 QuarantineKind::Invalid => {
3376 let path = Path::new(file_name);
3377 let stem = path.file_stem().and_then(|stem| stem.to_str());
3378 let extension = path.extension().and_then(|extension| extension.to_str());
3379 match (stem, extension) {
3380 (Some(stem), Some(extension)) => format!("{stem}.invalid.{unix_ts}.{extension}"),
3381 _ => format!("{file_name}.invalid.{unix_ts}"),
3382 }
3383 }
3384 }
3385}
3386
3387fn task_sibling_paths(json_path: &Path) -> Vec<PathBuf> {
3388 let Some(parent) = json_path.parent() else {
3389 return Vec::new();
3390 };
3391 let Some(stem) = json_path.file_stem().and_then(|stem| stem.to_str()) else {
3392 return Vec::new();
3393 };
3394 ["stdout", "stderr", "exit", "pty", "ps1", "bat", "sh"]
3395 .into_iter()
3396 .map(|extension| parent.join(format!("{stem}.{extension}")))
3397 .collect()
3398}
3399
3400fn read_for_token_count_from_disk(
3401 metadata: &PersistedTask,
3402 paths: &TaskPaths,
3403 max_bytes_per_stream: usize,
3404) -> TokenCountInput {
3405 if metadata.mode == BgMode::Pty {
3406 return TokenCountInput::Skipped;
3407 }
3408 let stdout = read_file_tail_capped(&paths.stdout, max_bytes_per_stream);
3415 let stderr = read_file_tail_capped(&paths.stderr, max_bytes_per_stream);
3416 match (stdout, stderr) {
3417 (Ok(stdout), Ok(stderr)) => TokenCountInput::Text(combine_streams(
3418 String::from_utf8_lossy(&stdout).as_ref(),
3419 String::from_utf8_lossy(&stderr).as_ref(),
3420 )),
3421 (Ok(stdout), Err(_)) => TokenCountInput::Text(combine_streams(
3422 String::from_utf8_lossy(&stdout).as_ref(),
3423 "",
3424 )),
3425 (Err(_), Ok(stderr)) => TokenCountInput::Text(combine_streams(
3426 "",
3427 String::from_utf8_lossy(&stderr).as_ref(),
3428 )),
3429 (Err(_), Err(_)) => TokenCountInput::Skipped,
3430 }
3431}
3432
3433fn read_file_tail_capped(path: &Path, max_bytes: usize) -> std::io::Result<Vec<u8>> {
3438 use std::io::{Read, Seek, SeekFrom};
3439 let mut file = std::fs::File::open(path)?;
3440 let len = file.metadata()?.len();
3441 let read_len = len.min(max_bytes as u64);
3442 if read_len > 0 && len > max_bytes as u64 {
3443 file.seek(SeekFrom::End(-(read_len as i64)))?;
3444 }
3445 let mut bytes = Vec::with_capacity(read_len as usize);
3446 file.read_to_end(&mut bytes)?;
3447 Ok(bytes)
3448}
3449
3450impl BgTask {
3451 fn snapshot(&self, preview_bytes: usize) -> BgTaskSnapshot {
3452 let state = self
3453 .state
3454 .lock()
3455 .unwrap_or_else(|poison| poison.into_inner());
3456 self.snapshot_locked(&state, preview_bytes)
3457 }
3458
3459 fn snapshot_locked(&self, state: &BgTaskState, preview_bytes: usize) -> BgTaskSnapshot {
3460 let metadata = &state.metadata;
3461 let duration_ms = metadata.duration_ms.or_else(|| {
3462 metadata
3463 .status
3464 .is_terminal()
3465 .then(|| self.started.elapsed().as_millis() as u64)
3466 });
3467 let (output_preview, output_truncated) = if metadata.mode == BgMode::Pty {
3468 (String::new(), false)
3469 } else if metadata.status.is_terminal() {
3470 state
3471 .terminal_output_cache
3472 .as_ref()
3473 .map(|cache| (cache.output_preview.clone(), cache.output_truncated))
3474 .unwrap_or_else(|| (String::new(), false))
3475 } else {
3476 state.buffer.read_tail(preview_bytes)
3477 };
3478 BgTaskSnapshot {
3479 info: BgTaskInfo {
3480 task_id: self.task_id.clone(),
3481 status: metadata.status.clone(),
3482 command: metadata.command.clone(),
3483 mode: metadata.mode.clone(),
3484 started_at: metadata.started_at,
3485 duration_ms,
3486 },
3487 exit_code: metadata.exit_code,
3488 child_pid: metadata.child_pid,
3489 workdir: metadata.workdir.display().to_string(),
3490 output_preview,
3491 output_truncated,
3492 output_path: state
3493 .buffer
3494 .output_path()
3495 .map(|path| path.display().to_string()),
3496 stderr_path: state
3497 .buffer
3498 .stderr_path()
3499 .map(|path| path.display().to_string()),
3500 pty_rows: (metadata.mode == BgMode::Pty).then_some(metadata.pty_rows.unwrap_or(24)),
3501 pty_cols: (metadata.mode == BgMode::Pty).then_some(metadata.pty_cols.unwrap_or(80)),
3502 }
3503 }
3504
3505 pub(crate) fn is_running(&self) -> bool {
3506 self.state
3507 .lock()
3508 .map(|state| {
3509 state.metadata.status == BgTaskStatus::Running
3510 || (state.metadata.mode == BgMode::Pty
3511 && state.metadata.status == BgTaskStatus::Killing)
3512 })
3513 .unwrap_or(false)
3514 }
3515
3516 fn is_terminal(&self) -> bool {
3517 self.state
3518 .lock()
3519 .map(|state| state.metadata.status.is_terminal())
3520 .unwrap_or(false)
3521 }
3522
3523 fn mark_terminal_now(&self) {
3524 if let Ok(mut terminal_at) = self.terminal_at.lock() {
3525 if terminal_at.is_none() {
3526 *terminal_at = Some(Instant::now());
3527 }
3528 }
3529 }
3530
3531 fn set_completion_delivered(
3532 &self,
3533 delivered: bool,
3534 registry: &BgTaskRegistry,
3535 ) -> Result<(), String> {
3536 let mut state = self
3537 .state
3538 .lock()
3539 .map_err(|_| "background task lock poisoned".to_string())?;
3540 let updated = registry
3541 .update_task_metadata(&self.paths, |metadata| {
3542 metadata.completion_delivered = delivered;
3543 })
3544 .map_err(|e| format!("failed to update completion delivery: {e}"))?;
3545 state.metadata = updated;
3546 Ok(())
3547 }
3548}
3549
3550#[cfg(unix)]
3571fn reap_piped_child(child_slot: &mut Option<Child>) {
3572 if let Some(mut child) = child_slot.take() {
3573 if matches!(child.try_wait(), Ok(None)) {
3574 let _ = child.wait();
3575 }
3576 }
3577}
3578
3579#[cfg(windows)]
3584fn reap_piped_child(child_slot: &mut Option<Child>) {
3585 *child_slot = None;
3586}
3587
3588fn terminal_metadata_from_marker(
3589 mut metadata: PersistedTask,
3590 marker: ExitMarker,
3591 reason: Option<String>,
3592) -> PersistedTask {
3593 match marker {
3594 ExitMarker::Code(code) => {
3595 let status = if code == 0 {
3596 BgTaskStatus::Completed
3597 } else {
3598 BgTaskStatus::Failed
3599 };
3600 metadata.mark_terminal(status, Some(code), reason);
3601 }
3602 ExitMarker::Killed => metadata.mark_terminal(BgTaskStatus::Killed, None, reason),
3603 }
3604 metadata
3605}
3606
3607#[cfg(unix)]
3608fn write_unix_command_script(command: &str, paths: &TaskPaths) -> Result<PathBuf, String> {
3609 let stem = paths
3610 .json
3611 .file_stem()
3612 .and_then(|stem| stem.to_str())
3613 .unwrap_or("wrapper");
3614 let script_path = paths.dir.join(format!("{stem}.sh"));
3615 fs::write(&script_path, command)
3616 .map_err(|e| format!("failed to write background bash command script: {e}"))?;
3617 Ok(script_path)
3618}
3619
3620#[cfg(unix)]
3621fn detached_shell_command(command_script: &Path, exit_path: &Path) -> Command {
3622 let shell = resolve_posix_shell();
3623 let mut cmd = Command::new(&shell);
3624 cmd.arg("-c")
3630 .arg(r#""$0" "$1"; code=$?; printf "%s" "$code" > "$2.tmp.$$"; mv -f "$2.tmp.$$" "$2""#)
3631 .arg(&shell)
3632 .arg(command_script)
3633 .arg(exit_path);
3634 unsafe {
3635 cmd.pre_exec(|| {
3636 if libc::setsid() == -1 {
3637 return Err(std::io::Error::last_os_error());
3638 }
3639 Ok(())
3640 });
3641 }
3642 cmd
3643}
3644
3645#[cfg(unix)]
3646fn resolve_posix_shell() -> PathBuf {
3647 static POSIX_SHELL: OnceLock<PathBuf> = OnceLock::new();
3648 POSIX_SHELL
3649 .get_or_init(|| {
3650 std::env::var_os("BASH")
3651 .filter(|value| !value.is_empty())
3652 .map(PathBuf::from)
3653 .filter(|path| path.exists())
3654 .or_else(|| which::which("bash").ok())
3655 .or_else(|| which::which("zsh").ok())
3656 .unwrap_or_else(|| PathBuf::from("/bin/sh"))
3657 })
3658 .clone()
3659}
3660
3661#[cfg(windows)]
3662fn detached_shell_command_for(
3663 shell: crate::windows_shell::WindowsShell,
3664 command: &str,
3665 exit_path: &Path,
3666 paths: &TaskPaths,
3667 creation_flags: u32,
3668) -> Result<Command, String> {
3669 use crate::windows_shell::WindowsShell;
3670 let wrapper_body = shell.wrapper_script_bytes(command, exit_path);
3683 let wrapper_ext = match shell {
3684 WindowsShell::Pwsh | WindowsShell::Powershell => "ps1",
3685 WindowsShell::Cmd => "bat",
3686 WindowsShell::Posix(_) => "sh",
3690 };
3691 let wrapper_path = paths.dir.join(format!(
3692 "{}.{}",
3693 paths
3694 .json
3695 .file_stem()
3696 .and_then(|s| s.to_str())
3697 .unwrap_or("wrapper"),
3698 wrapper_ext
3699 ));
3700 fs::write(&wrapper_path, wrapper_body)
3701 .map_err(|e| format!("failed to write background bash wrapper script: {e}"))?;
3702
3703 let mut cmd = Command::new(shell.binary().as_ref());
3704 match shell {
3705 WindowsShell::Pwsh | WindowsShell::Powershell => {
3706 cmd.args([
3709 "-NoLogo",
3710 "-NoProfile",
3711 "-NonInteractive",
3712 "-ExecutionPolicy",
3713 "Bypass",
3714 "-File",
3715 ]);
3716 cmd.arg(&wrapper_path);
3717 }
3718 WindowsShell::Cmd => {
3719 cmd.args(["/D", "/C"]);
3726 cmd.arg(&wrapper_path);
3727 }
3728 WindowsShell::Posix(_) => {
3729 cmd.arg(&wrapper_path);
3734 }
3735 }
3736
3737 cmd.creation_flags(creation_flags);
3741 Ok(cmd)
3742}
3743
3744fn spawn_detached_child(
3760 command: &str,
3761 paths: &TaskPaths,
3762 workdir: &Path,
3763 env: &HashMap<String, String>,
3764) -> Result<std::process::Child, String> {
3765 #[cfg(not(windows))]
3766 {
3767 let stdout = create_capture_file(&paths.stdout)
3768 .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
3769 let stderr = create_capture_file(&paths.stderr)
3770 .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
3771 let command_script = write_unix_command_script(command, paths)?;
3772 detached_shell_command(&command_script, &paths.exit)
3773 .current_dir(workdir)
3774 .envs(env)
3775 .stdin(Stdio::null())
3776 .stdout(Stdio::from(stdout))
3777 .stderr(Stdio::from(stderr))
3778 .spawn()
3779 .map_err(|e| format!("failed to spawn background bash command: {e}"))
3780 }
3781 #[cfg(windows)]
3782 {
3783 use crate::windows_shell::shell_candidates;
3784 let candidates: Vec<crate::windows_shell::WindowsShell> = shell_candidates();
3795 const FLAG_CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
3816 const FLAG_CREATE_BREAKAWAY_FROM_JOB: u32 = 0x0100_0000;
3817 const FLAG_CREATE_NO_WINDOW: u32 = 0x0800_0000;
3818 let with_breakaway =
3819 FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP | FLAG_CREATE_BREAKAWAY_FROM_JOB;
3820 let without_breakaway = FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP;
3821 let mut last_error: Option<String> = None;
3822 for (idx, shell) in candidates.iter().enumerate() {
3823 for &flags in &[with_breakaway, without_breakaway] {
3827 let stdout = create_capture_file(&paths.stdout)
3829 .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
3830 let stderr = create_capture_file(&paths.stderr)
3831 .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
3832 let mut cmd =
3833 detached_shell_command_for(shell.clone(), command, &paths.exit, paths, flags)?;
3834 cmd.current_dir(workdir)
3835 .envs(env)
3836 .stdin(Stdio::null())
3837 .stdout(Stdio::from(stdout))
3838 .stderr(Stdio::from(stderr));
3839 match cmd.spawn() {
3840 Ok(child) => {
3841 if idx > 0 {
3842 crate::slog_warn!("background bash spawn fell back to {} after {} earlier candidate(s) failed; \
3843 the cached PATH probe disagreed with runtime spawn — likely PATH \
3844 inheritance, antivirus / AppLocker / Defender ASR, or sandbox policy.",
3845 shell.binary(),
3846 idx);
3847 }
3848 if flags == without_breakaway {
3849 crate::slog_warn!(
3850 "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected \
3851 (likely a restrictive Job Object — CI sandbox or MDM policy). \
3852 Spawned without breakaway; the bg task will be torn down if the \
3853 AFT process group is killed."
3854 );
3855 }
3856 return Ok(child);
3857 }
3858 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
3859 crate::slog_warn!("background bash spawn: {} returned NotFound at runtime — trying next candidate",
3860 shell.binary());
3861 last_error = Some(format!("{}: {e}", shell.binary()));
3862 break;
3865 }
3866 Err(e) if flags == with_breakaway && e.raw_os_error() == Some(5) => {
3867 crate::slog_warn!(
3869 "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected with \
3870 Access Denied — retrying {} without breakaway",
3871 shell.binary()
3872 );
3873 last_error = Some(format!("{}: {e}", shell.binary()));
3874 continue;
3875 }
3876 Err(e) => {
3877 return Err(format!(
3878 "failed to spawn background bash command via {}: {e}",
3879 shell.binary()
3880 ));
3881 }
3882 }
3883 }
3884 }
3885 Err(format!(
3886 "failed to spawn background bash command: no Windows shell could be spawned. \
3887 Last error: {}. PATH-probed candidates: {:?}",
3888 last_error.unwrap_or_else(|| "no candidates were attempted".to_string()),
3889 candidates.iter().map(|s| s.binary()).collect::<Vec<_>>()
3890 ))
3891 }
3892}
3893
3894fn random_slug() -> String {
3895 let mut bytes = [0u8; 4];
3896 getrandom::fill(&mut bytes).unwrap_or_else(|_| {
3898 let t = SystemTime::now()
3900 .duration_since(UNIX_EPOCH)
3901 .map(|d| d.subsec_nanos())
3902 .unwrap_or(0);
3903 let p = std::process::id();
3904 bytes.copy_from_slice(&(t ^ p).to_le_bytes());
3905 });
3906 let hex: String = bytes.iter().map(|b| format!("{b:02x}")).collect();
3908 format!("bash-{hex}")
3909}
3910
3911#[cfg(test)]
3912mod tests {
3913 use std::collections::HashMap;
3914 use std::fs;
3915 use std::sync::atomic::{AtomicBool, AtomicUsize};
3916 use std::sync::{Arc, Mutex};
3917 use std::time::Duration;
3918 #[cfg(windows)]
3919 use std::time::Instant;
3920
3921 use super::*;
3922
3923 #[cfg(unix)]
3924 const QUICK_SUCCESS_COMMAND: &str = "true";
3925 #[cfg(windows)]
3926 const QUICK_SUCCESS_COMMAND: &str = "cmd /c exit 0";
3927
3928 #[cfg(unix)]
3929 const LONG_RUNNING_COMMAND: &str = "sleep 5";
3930 #[cfg(windows)]
3931 const LONG_RUNNING_COMMAND: &str = "cmd /c timeout /t 5 /nobreak > nul";
3932
3933 fn insert_terminal_piped_task(
3934 registry: &BgTaskRegistry,
3935 dir: &tempfile::TempDir,
3936 command: &str,
3937 stdout: &str,
3938 stderr: &str,
3939 compressed: bool,
3940 ) -> (String, Arc<BgTask>) {
3941 let task_id = format!("bash-test-{}", random_slug());
3942 let paths = task_paths(dir.path(), "session", &task_id);
3943 fs::create_dir_all(&paths.dir).unwrap();
3944 fs::write(&paths.stdout, stdout).unwrap();
3945 fs::write(&paths.stderr, stderr).unwrap();
3946 let mut metadata = PersistedTask::starting(
3947 task_id.clone(),
3948 "session".to_string(),
3949 command.to_string(),
3950 dir.path().to_path_buf(),
3951 Some(dir.path().to_path_buf()),
3952 Some(30_000),
3953 true,
3954 compressed,
3955 );
3956 metadata.mark_terminal(BgTaskStatus::Completed, Some(0), None);
3957 write_task(&paths.json, &metadata).unwrap();
3958 registry
3959 .insert_rehydrated_task(metadata, paths, true)
3960 .expect("insert terminal task");
3961 let task = registry.task_for_session(&task_id, "session").unwrap();
3962 (task_id, task)
3963 }
3964
3965 fn insert_terminal_pty_task(
3966 registry: &BgTaskRegistry,
3967 dir: &tempfile::TempDir,
3968 pty_output: &str,
3969 ) -> (String, Arc<BgTask>) {
3970 let task_id = format!("bash-test-{}", random_slug());
3971 let paths = task_paths(dir.path(), "session", &task_id);
3972 fs::create_dir_all(&paths.dir).unwrap();
3973 fs::write(&paths.pty, pty_output).unwrap();
3974 let mut metadata = PersistedTask::starting(
3975 task_id.clone(),
3976 "session".to_string(),
3977 "python".to_string(),
3978 dir.path().to_path_buf(),
3979 Some(dir.path().to_path_buf()),
3980 Some(30_000),
3981 true,
3982 true,
3983 );
3984 metadata.mode = BgMode::Pty;
3985 metadata.mark_terminal(BgTaskStatus::Completed, Some(0), None);
3986 write_task(&paths.json, &metadata).unwrap();
3987 registry
3988 .insert_rehydrated_task(metadata, paths, true)
3989 .expect("insert terminal pty task");
3990 let task = registry.task_for_session(&task_id, "session").unwrap();
3991 (task_id, task)
3992 }
3993
3994 #[cfg(unix)]
3995 fn wait_for_terminal_snapshot(
3996 registry: &BgTaskRegistry,
3997 task_id: &str,
3998 session_id: &str,
3999 project: &Path,
4000 storage: &Path,
4001 ) -> BgTaskSnapshot {
4002 let started = Instant::now();
4003 loop {
4004 let snapshot = registry
4005 .status(task_id, session_id, Some(project), Some(storage), 4096)
4006 .expect("spawned task should be visible to status");
4007 if snapshot.info.status.is_terminal() {
4008 return snapshot;
4009 }
4010 assert!(
4011 started.elapsed() < Duration::from_secs(10),
4012 "timed out waiting for task {task_id} to finish; last status={:?}",
4013 snapshot.info.status
4014 );
4015 std::thread::sleep(Duration::from_millis(50));
4016 }
4017 }
4018
4019 #[cfg(unix)]
4020 #[test]
4021 fn multiline_pipeline_stdout_persists_all_lines_after_terminal_status() {
4022 let cases = [
4023 (
4024 "long-first",
4025 "sleep 0.5; printf 'one\\n' | cat\nprintf 'two\\n' | grep -c two\nprintf 'three\\n' | cat",
4026 vec!["one", "1", "three"],
4027 ),
4028 (
4029 "short-first",
4030 "printf 'one\\n' | cat\nsleep 0.2; printf 'two\\n' | grep -c two\nprintf 'three\\n' | cat",
4031 vec!["one", "1", "three"],
4032 ),
4033 (
4034 "failing-middle",
4035 "sleep 0.2; printf 'one\\n' | cat\nfalse; printf 'after-false\\n' | cat\nprintf 'three\\n' | cat",
4036 vec!["one", "after-false", "three"],
4037 ),
4038 ];
4039
4040 for (name, command, expected_lines) in cases {
4041 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
4042 let dir = tempfile::tempdir().unwrap();
4043 let session_id = format!("session-{name}");
4044 let task_id = registry
4045 .spawn(
4046 command,
4047 session_id.clone(),
4048 dir.path().to_path_buf(),
4049 HashMap::new(),
4050 Some(Duration::from_secs(30)),
4051 dir.path().to_path_buf(),
4052 10,
4053 true,
4054 true,
4055 Some(dir.path().to_path_buf()),
4056 )
4057 .unwrap();
4058
4059 let snapshot = wait_for_terminal_snapshot(
4060 ®istry,
4061 &task_id,
4062 &session_id,
4063 dir.path(),
4064 dir.path(),
4065 );
4066 assert_eq!(
4067 snapshot.info.status,
4068 BgTaskStatus::Completed,
4069 "{name}: task should complete; snapshot={snapshot:?}"
4070 );
4071 assert_eq!(
4072 snapshot.exit_code,
4073 Some(0),
4074 "{name}: script should use the final command's exit code"
4075 );
4076
4077 let stdout_path = task_paths(dir.path(), &session_id, &task_id).stdout;
4078 let stdout = fs::read_to_string(&stdout_path).unwrap_or_else(|error| {
4079 panic!(
4080 "{name}: failed to read raw stdout file {}: {error}",
4081 stdout_path.display()
4082 )
4083 });
4084 let lines: Vec<&str> = stdout.lines().collect();
4085 assert_eq!(
4086 lines,
4087 expected_lines,
4088 "{name}: raw stdout file must include every newline-separated command's output; stdout_path={}",
4089 stdout_path.display()
4090 );
4091 }
4092 }
4093
4094 #[test]
4095 fn recognizes_all_recovery_marker_forms() {
4096 assert!(is_recovery_marker(
4097 "[truncated output; full output: read \"/tmp/out\"]"
4098 ));
4099 assert!(is_recovery_marker(
4100 "[omitted output; see remaining: tail -n +42 \"/tmp/out\"]"
4101 ));
4102 assert!(is_recovery_marker(
4103 "[truncated output; full output unavailable]"
4104 ));
4105 }
4106
4107 #[test]
4108 fn terminal_status_polls_use_cached_render_once_and_off_lock() {
4109 let registry = BgTaskRegistry::default();
4110 let dir = tempfile::tempdir().unwrap();
4111 let (_task_id, task) = insert_terminal_piped_task(
4112 ®istry,
4113 &dir,
4114 "custom-tool --verbose",
4115 &"stdout line\n".repeat(200_000),
4116 "",
4117 true,
4118 );
4119 let calls = Arc::new(AtomicUsize::new(0));
4120 let saw_unlocked_state = Arc::new(AtomicBool::new(false));
4121 let task_holder = Arc::new(Mutex::new(Some(Arc::clone(&task))));
4122 let calls_for_closure = Arc::clone(&calls);
4123 let unlocked_for_closure = Arc::clone(&saw_unlocked_state);
4124 let task_for_closure = Arc::clone(&task_holder);
4125 registry.set_compressor_with_exit_code(move |_command, output, _exit_code| {
4126 calls_for_closure.fetch_add(1, Ordering::SeqCst);
4127 if let Some(task) = task_for_closure.lock().unwrap().as_ref() {
4128 if task.state.try_lock().is_ok() {
4129 unlocked_for_closure.store(true, Ordering::SeqCst);
4130 }
4131 }
4132 CompressionResult::new(format!("compressed {} bytes", output.len()))
4133 });
4134
4135 let first = registry
4136 .status(
4137 &task.task_id,
4138 "session",
4139 None,
4140 Some(dir.path()),
4141 RUNNING_OUTPUT_PREVIEW_BYTES,
4142 )
4143 .unwrap();
4144 let second = registry
4145 .status(
4146 &task.task_id,
4147 "session",
4148 None,
4149 Some(dir.path()),
4150 RUNNING_OUTPUT_PREVIEW_BYTES,
4151 )
4152 .unwrap();
4153 let listed = registry.list(RUNNING_OUTPUT_PREVIEW_BYTES);
4154
4155 assert_eq!(
4156 calls.load(Ordering::SeqCst),
4157 1,
4158 "terminal render must be cached"
4159 );
4160 assert!(
4161 saw_unlocked_state.load(Ordering::SeqCst),
4162 "compressor must run after releasing the task state lock"
4163 );
4164 assert!(first.output_preview.starts_with("compressed "));
4165 assert_eq!(second.output_preview, first.output_preview);
4166 assert_eq!(listed[0].output_preview, first.output_preview);
4167 }
4168
4169 #[test]
4170 fn completion_preview_success_keeps_tail_only() {
4171 let registry = BgTaskRegistry::default();
4176 let dir = tempfile::tempdir().unwrap();
4177 let output = format!("HEAD-SIGNAL\n{}TAIL-SIGNAL\n", "middle\n".repeat(2_000));
4178 let (_task_id, task) =
4179 insert_terminal_piped_task(®istry, &dir, "cat big.log", &output, "", false);
4180
4181 registry.post_terminal_transition(&task, true).unwrap();
4182 let completions = registry.drain_completions_for_session(Some("session"));
4183 assert_eq!(completions.len(), 1);
4184 let preview = &completions[0].output_preview;
4185 assert!(preview.contains("TAIL-SIGNAL"), "preview was {preview:?}");
4186 assert!(!preview.contains("HEAD-SIGNAL"), "preview was {preview:?}");
4187 assert!(completions[0].output_truncated);
4188 }
4189
4190 #[test]
4191 fn completion_preview_failure_keeps_head_and_tail() {
4192 let registry = BgTaskRegistry::default();
4195 let dir = tempfile::tempdir().unwrap();
4196 let output = format!("HEAD-SIGNAL\n{}TAIL-SIGNAL\n", "middle\n".repeat(2_000));
4197 let task_id = format!("bash-test-{}", random_slug());
4198 let paths = task_paths(dir.path(), "session", &task_id);
4199 fs::create_dir_all(&paths.dir).unwrap();
4200 fs::write(&paths.stdout, &output).unwrap();
4201 fs::write(&paths.stderr, "").unwrap();
4202 let mut metadata = PersistedTask::starting(
4203 task_id.clone(),
4204 "session".to_string(),
4205 "cat big.log".to_string(),
4206 dir.path().to_path_buf(),
4207 Some(dir.path().to_path_buf()),
4208 Some(30_000),
4209 true,
4210 false,
4211 );
4212 metadata.mark_terminal(BgTaskStatus::Failed, Some(1), None);
4213 write_task(&paths.json, &metadata).unwrap();
4214 registry
4215 .insert_rehydrated_task(metadata, paths, true)
4216 .expect("insert terminal task");
4217 let task = registry.task_for_session(&task_id, "session").unwrap();
4218
4219 registry.post_terminal_transition(&task, true).unwrap();
4220 let completions = registry.drain_completions_for_session(Some("session"));
4221 assert_eq!(completions.len(), 1);
4222 let preview = &completions[0].output_preview;
4223 assert!(preview.contains("HEAD-SIGNAL"), "preview was {preview:?}");
4224 assert!(preview.contains("TAIL-SIGNAL"), "preview was {preview:?}");
4225 }
4226
4227 #[test]
4228 fn has_completions_for_session_matches_pending_delivery() {
4229 let registry = BgTaskRegistry::default();
4230 assert!(!registry.has_completions_for_session(Some("session")));
4231 assert!(!registry.has_completions_for_session(None));
4232
4233 let dir = tempfile::tempdir().unwrap();
4234 let (_task_id, task) =
4235 insert_terminal_piped_task(®istry, &dir, QUICK_SUCCESS_COMMAND, "done\n", "", false);
4236 registry.post_terminal_transition(&task, true).unwrap();
4237
4238 assert!(registry.has_completions_for_session(Some("session")));
4239 assert!(registry.has_completions_for_session(None));
4240 assert!(!registry.has_completions_for_session(Some("other-session")));
4241
4242 let completions = registry.drain_completions_for_session(Some("session"));
4243 assert_eq!(completions.len(), 1);
4244 assert_eq!(completions[0].task_id, task.task_id);
4245 }
4246
4247 #[test]
4248 fn structured_gh_json_survives_intact_and_ignores_stderr() {
4249 let registry = BgTaskRegistry::default();
4250 let dir = tempfile::tempdir().unwrap();
4251 let calls = Arc::new(AtomicUsize::new(0));
4252 let calls_for_closure = Arc::clone(&calls);
4253 registry.set_compressor_with_exit_code(move |_command, output, _exit_code| {
4254 calls_for_closure.fetch_add(1, Ordering::SeqCst);
4255 CompressionResult::new(output)
4256 });
4257 let (task_id, _task) = insert_terminal_piped_task(
4258 ®istry,
4259 &dir,
4260 "gh pr view 123 --json body",
4261 "{\"body\":\"hello\"}",
4262 "warning: stderr must not join json",
4263 true,
4264 );
4265
4266 let snapshot = registry
4267 .status(
4268 &task_id,
4269 "session",
4270 None,
4271 Some(dir.path()),
4272 RUNNING_OUTPUT_PREVIEW_BYTES,
4273 )
4274 .unwrap();
4275
4276 assert_eq!(snapshot.output_preview, "{\"body\":\"hello\"}");
4277 assert!(!snapshot.output_preview.contains("warning"));
4278 assert!(!snapshot.output_truncated);
4279 assert_eq!(
4280 calls.load(Ordering::SeqCst),
4281 0,
4282 "structured JSON bypasses compression"
4283 );
4284 }
4285
4286 #[test]
4287 fn registry_emits_single_recovery_marker_for_class_drops() {
4288 let registry = BgTaskRegistry::default();
4289 let dir = tempfile::tempdir().unwrap();
4290 registry.set_compressor_with_exit_code(move |_command, _output, _exit_code| {
4291 let mut dropped = BTreeMap::new();
4292 dropped.insert(DropClass::Error, 18);
4293 dropped.insert(DropClass::Warning, 6);
4294 CompressionResult::with_class_drops("kept diagnostic", dropped)
4295 });
4296 let (task_id, task) =
4297 insert_terminal_piped_task(®istry, &dir, "custom-tool", "raw", "", true);
4298
4299 let snapshot = registry
4300 .status(
4301 &task_id,
4302 "session",
4303 None,
4304 Some(dir.path()),
4305 RUNNING_OUTPUT_PREVIEW_BYTES,
4306 )
4307 .unwrap();
4308
4309 assert_eq!(snapshot.output_preview.matches("full output:").count(), 1);
4310 assert!(snapshot.output_preview.contains("+18 more errors"));
4311 assert!(snapshot.output_preview.contains("+6 more warnings"));
4312 assert!(snapshot
4313 .output_preview
4314 .contains(&format!("read \"{}\"", task.paths.stdout.display())));
4315 assert!(!snapshot.output_preview.contains("tail -n +"));
4316 assert!(snapshot.output_truncated);
4317 }
4318
4319 #[test]
4320 fn registry_marker_reports_semantic_and_byte_drops_once() {
4321 let registry = BgTaskRegistry::default();
4322 let dir = tempfile::tempdir().unwrap();
4323 registry.set_compressor_with_exit_code(move |_command, _output, _exit_code| {
4324 let mut dropped = BTreeMap::new();
4325 dropped.insert(DropClass::Error, 1);
4326 CompressionResult::with_class_drops(
4327 format!("HEAD-SIGNAL\n{}TAIL-SIGNAL", "middle\n".repeat(8_000)),
4328 dropped,
4329 )
4330 });
4331 let (task_id, _task) =
4332 insert_terminal_piped_task(®istry, &dir, "custom-tool", "raw", "", true);
4333
4334 let snapshot = registry
4335 .status(
4336 &task_id,
4337 "session",
4338 None,
4339 Some(dir.path()),
4340 RUNNING_OUTPUT_PREVIEW_BYTES,
4341 )
4342 .unwrap();
4343
4344 assert_eq!(snapshot.output_preview.matches("full output:").count(), 1);
4345 assert!(snapshot.output_preview.contains("+1 more error"));
4346 assert!(snapshot.output_preview.contains("truncated output"));
4347 assert!(snapshot.output_preview.contains("HEAD-SIGNAL"));
4348 assert!(snapshot.output_preview.contains("TAIL-SIGNAL"));
4349 assert!(!snapshot.output_preview.contains("...<truncated"));
4350 assert!(snapshot.output_truncated);
4351 }
4352
4353 #[test]
4354 fn cargo_stderr_class_drops_name_both_capture_paths() {
4355 let registry = BgTaskRegistry::default();
4356 let dir = tempfile::tempdir().unwrap();
4357 let filter_registry = crate::compress::toml_filter::FilterRegistry::default();
4358 registry.set_compressor_with_exit_code(move |command, output, exit_code| {
4359 crate::compress::compress_with_registry_exit_code(
4360 command,
4361 &output,
4362 exit_code,
4363 &filter_registry,
4364 )
4365 });
4366 let stderr = (0..22)
4367 .map(|index| {
4368 format!(
4369 "error: cargo failure {index}\n --> src/lib.rs:{}:1\n |\n{} | boom\n",
4370 index + 1,
4371 index + 1
4372 )
4373 })
4374 .collect::<Vec<_>>()
4375 .join("\n");
4376 let (task_id, task) = insert_terminal_piped_task(
4377 ®istry,
4378 &dir,
4379 "cargo check",
4380 "Finished dev [unoptimized] target(s) in 0.01s\n",
4381 &stderr,
4382 true,
4383 );
4384
4385 let snapshot = registry
4386 .status(
4387 &task_id,
4388 "session",
4389 None,
4390 Some(dir.path()),
4391 RUNNING_OUTPUT_PREVIEW_BYTES,
4392 )
4393 .unwrap();
4394
4395 assert!(snapshot.output_preview.contains("+2 more errors"));
4396 assert!(snapshot
4397 .output_preview
4398 .contains(&format!("read \"{}\"", task.paths.stdout.display())));
4399 assert!(snapshot
4400 .output_preview
4401 .contains(&format!("read \"{}\"", task.paths.stderr.display())));
4402 assert!(!snapshot.output_preview.contains("tail -n +"));
4403 }
4404
4405 #[test]
4406 fn over_ceiling_structured_json_uses_pointer_not_partial_json() {
4407 let registry = BgTaskRegistry::default();
4408 let dir = tempfile::tempdir().unwrap();
4409 let body = format!("{{\"body\":\"{}\"}}", "x".repeat(60 * 1024));
4410 let (task_id, task) = insert_terminal_piped_task(
4411 ®istry,
4412 &dir,
4413 "cd /repo && gh pr view 123 --json body",
4414 &body,
4415 "",
4416 true,
4417 );
4418
4419 let snapshot = registry
4420 .status(
4421 &task_id,
4422 "session",
4423 None,
4424 Some(dir.path()),
4425 RUNNING_OUTPUT_PREVIEW_BYTES,
4426 )
4427 .unwrap();
4428
4429 assert!(snapshot.output_preview.starts_with("[JSON output "));
4430 assert!(snapshot
4431 .output_preview
4432 .contains(&task.paths.stdout.display().to_string()));
4433 assert!(!snapshot.output_preview.contains(&"x".repeat(1024)));
4434 assert!(snapshot.output_truncated);
4435 }
4436
4437 #[test]
4438 fn toml_strip_tail_cap_uses_full_output_hint_not_offset_hint() {
4439 let registry = BgTaskRegistry::default();
4440 let dir = tempfile::tempdir().unwrap();
4441 let filter_registry = crate::compress::toml_filter::build_registry(
4442 crate::compress::builtin_filters::ALL,
4443 None,
4444 None,
4445 );
4446 registry.set_compressor_with_exit_code(move |command, output, exit_code| {
4447 crate::compress::compress_with_registry_exit_code(
4448 command,
4449 &output,
4450 exit_code,
4451 &filter_registry,
4452 )
4453 });
4454 let stdout = format!(
4455 "make[1]: Entering directory `/tmp`\n{}",
4456 (0..100)
4457 .map(|index| format!("compile line {index}"))
4458 .collect::<Vec<_>>()
4459 .join("\n")
4460 );
4461 let (task_id, task) =
4462 insert_terminal_piped_task(®istry, &dir, "make all", &stdout, "", true);
4463
4464 let snapshot = registry
4465 .status(
4466 &task_id,
4467 "session",
4468 None,
4469 Some(dir.path()),
4470 RUNNING_OUTPUT_PREVIEW_BYTES,
4471 )
4472 .unwrap();
4473
4474 assert!(snapshot.output_preview.contains("compile line 99"));
4475 assert!(snapshot.output_preview.contains(&format!(
4476 "full output: read \"{}\"",
4477 task.paths.stdout.display()
4478 )));
4479 assert!(!snapshot
4480 .output_preview
4481 .contains(&format!("read \"{}\"", task.paths.stderr.display())));
4482 assert!(!snapshot.output_preview.contains("tail -n +"));
4483 }
4484
4485 #[test]
4486 fn compressed_false_raw_passthrough_uses_wider_head_tail_cap() {
4487 let registry = BgTaskRegistry::default();
4488 let dir = tempfile::tempdir().unwrap();
4489 let output = format!("RAW-HEAD\n{}RAW-TAIL\n", "raw-middle\n".repeat(8_000));
4490 let (task_id, task) =
4491 insert_terminal_piped_task(®istry, &dir, "cat raw.log", &output, "RAW-ERR\n", false);
4492
4493 let snapshot = registry
4494 .status(
4495 &task_id,
4496 "session",
4497 None,
4498 Some(dir.path()),
4499 RUNNING_OUTPUT_PREVIEW_BYTES,
4500 )
4501 .unwrap();
4502
4503 assert!(snapshot.output_preview.contains("RAW-HEAD"));
4504 assert!(snapshot.output_preview.contains("RAW-TAIL"));
4505 assert!(snapshot.output_preview.contains("truncated output"));
4506 assert!(snapshot
4507 .output_preview
4508 .contains(&format!("read \"{}\"", task.paths.stdout.display())));
4509 assert!(snapshot
4510 .output_preview
4511 .contains(&format!("read \"{}\"", task.paths.stderr.display())));
4512 assert!(!snapshot.output_preview.contains("tail -n +"));
4513 assert!(snapshot.output_preview.len() > 16 * 1024);
4514 assert!(snapshot.output_truncated);
4515 }
4516
4517 #[test]
4518 fn pty_terminal_snapshot_bypasses_line_compression() {
4519 let registry = BgTaskRegistry::default();
4520 let dir = tempfile::tempdir().unwrap();
4521 let calls = Arc::new(AtomicUsize::new(0));
4522 let calls_for_closure = Arc::clone(&calls);
4523 registry.set_compressor_with_exit_code(move |_command, output, _exit_code| {
4524 calls_for_closure.fetch_add(1, Ordering::SeqCst);
4525 CompressionResult::new(output)
4526 });
4527 let (task_id, _task) = insert_terminal_pty_task(®istry, &dir, "raw\u{1b}[31m pty bytes");
4528
4529 let snapshot = registry
4530 .status(
4531 &task_id,
4532 "session",
4533 None,
4534 Some(dir.path()),
4535 RUNNING_OUTPUT_PREVIEW_BYTES,
4536 )
4537 .unwrap();
4538
4539 assert_eq!(snapshot.info.mode, BgMode::Pty);
4540 assert_eq!(snapshot.output_preview, "");
4541 assert_eq!(calls.load(Ordering::SeqCst), 0);
4542 }
4543
4544 #[test]
4545 fn pty_dimensions_are_persisted_and_returned_in_snapshot() {
4546 let registry = BgTaskRegistry::default();
4547 let dir = tempfile::tempdir().unwrap();
4548 let task_id = registry
4549 .spawn_pty(
4550 QUICK_SUCCESS_COMMAND,
4551 "session".to_string(),
4552 dir.path().to_path_buf(),
4553 HashMap::new(),
4554 Some(Duration::from_secs(30)),
4555 dir.path().to_path_buf(),
4556 10,
4557 true,
4558 false,
4559 Some(dir.path().to_path_buf()),
4560 50,
4561 120,
4562 )
4563 .unwrap();
4564
4565 let paths = task_paths(dir.path(), "session", &task_id);
4566 let metadata = read_task(&paths.json).unwrap();
4567 assert_eq!(
4568 metadata.schema_version,
4569 crate::bash_background::persistence::SCHEMA_VERSION
4570 );
4571 assert_eq!(metadata.mode, BgMode::Pty);
4572 assert_eq!(metadata.pty_rows, Some(50));
4573 assert_eq!(metadata.pty_cols, Some(120));
4574
4575 let snapshot = registry
4576 .status(&task_id, "session", None, Some(dir.path()), 1024)
4577 .unwrap();
4578 assert_eq!(snapshot.pty_rows, Some(50));
4579 assert_eq!(snapshot.pty_cols, Some(120));
4580 }
4581
4582 fn spawn_dead_child() -> std::process::Child {
4587 #[cfg(unix)]
4588 let mut cmd = std::process::Command::new("true");
4589 #[cfg(windows)]
4590 let mut cmd = {
4591 let mut c = std::process::Command::new("cmd");
4592 c.args(["/c", "exit", "0"]);
4593 c
4594 };
4595 cmd.stdin(std::process::Stdio::null());
4596 cmd.stdout(std::process::Stdio::null());
4597 cmd.stderr(std::process::Stdio::null());
4598 let mut child = cmd.spawn().expect("spawn replacement child for reap test");
4599 let started = Instant::now();
4608 loop {
4609 match child.try_wait() {
4610 Ok(Some(_)) => break,
4611 Ok(None) => {
4612 if started.elapsed() > Duration::from_secs(5) {
4613 panic!("dead-child stand-in did not exit within 5s");
4614 }
4615 std::thread::sleep(Duration::from_millis(10));
4616 }
4617 Err(error) => panic!("dead-child try_wait failed: {error}"),
4618 }
4619 }
4620 child
4621 }
4622
4623 #[test]
4624 fn ack_marks_delivered_even_when_completion_was_already_consumed_locally() {
4625 let registry = BgTaskRegistry::default();
4626 let dir = tempfile::tempdir().unwrap();
4627 let task_id = registry
4628 .spawn(
4629 LONG_RUNNING_COMMAND,
4630 "session".to_string(),
4631 dir.path().to_path_buf(),
4632 HashMap::new(),
4633 Some(Duration::from_secs(30)),
4634 dir.path().to_path_buf(),
4635 10,
4636 true,
4637 false,
4638 Some(dir.path().to_path_buf()),
4639 )
4640 .unwrap();
4641 registry
4642 .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
4643 .unwrap();
4644 assert_eq!(
4645 registry
4646 .drain_completions_for_session(Some("session"))
4647 .len(),
4648 1
4649 );
4650
4651 registry.inner.completions.lock().unwrap().clear();
4654
4655 assert_eq!(
4656 registry.ack_completions_for_session(Some("session"), std::slice::from_ref(&task_id)),
4657 vec![task_id.clone()]
4658 );
4659 assert!(registry
4660 .drain_completions_for_session(Some("session"))
4661 .is_empty());
4662
4663 let paths = task_paths(dir.path(), "session", &task_id);
4664 let metadata = read_task(&paths.json).unwrap();
4665 assert!(metadata.completion_delivered);
4666
4667 let replayed = BgTaskRegistry::default();
4668 replayed
4669 .replay_session_inner(dir.path(), "session", None)
4670 .unwrap();
4671 assert!(replayed
4672 .drain_completions_for_session(Some("session"))
4673 .is_empty());
4674 }
4675
4676 #[test]
4677 fn register_watch_rejects_unknown_task() {
4678 let registry = BgTaskRegistry::default();
4679
4680 let result = registry.register_watch(
4681 "missing-task".to_string(),
4682 WatchPattern::Substring("READY".into()),
4683 true,
4684 );
4685
4686 assert_eq!(result, Err("task_not_found"));
4687 }
4688
4689 #[test]
4690 fn register_watch_on_terminal_task_scans_existing_output() {
4691 let frames = Arc::new(Mutex::new(Vec::new()));
4692 let captured = Arc::clone(&frames);
4693 let sender: crate::context::ProgressSender = Arc::new(Box::new(move |frame| {
4694 captured.lock().unwrap().push(frame);
4695 })
4696 as Box<dyn Fn(PushFrame) + Send + Sync>);
4697 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(Some(sender))));
4698 let dir = tempfile::tempdir().unwrap();
4699 let task_id = registry
4700 .spawn(
4701 LONG_RUNNING_COMMAND,
4702 "session".to_string(),
4703 dir.path().to_path_buf(),
4704 HashMap::new(),
4705 Some(Duration::from_secs(30)),
4706 dir.path().to_path_buf(),
4707 10,
4708 true,
4709 false,
4710 Some(dir.path().to_path_buf()),
4711 )
4712 .unwrap();
4713 registry
4714 .inner
4715 .shutdown
4716 .store(true, std::sync::atomic::Ordering::SeqCst);
4717 let task = registry.task_for_session(&task_id, "session").unwrap();
4718 std::fs::write(&task.paths.stdout, "READY\n").unwrap();
4719 registry
4720 .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
4721 .unwrap();
4722 frames.lock().unwrap().clear();
4723 registry.inner.completions.lock().unwrap().clear();
4724
4725 registry
4726 .register_watch(
4727 task_id.clone(),
4728 WatchPattern::Substring("READY".into()),
4729 true,
4730 )
4731 .unwrap();
4732
4733 let frames = frames.lock().unwrap();
4734 let frame = frames
4735 .iter()
4736 .find_map(|frame| match frame {
4737 PushFrame::BashPatternMatch(frame) => Some(frame),
4738 _ => None,
4739 })
4740 .expect("terminal watch registration should emit pattern frame");
4741 assert_eq!(frame.reason, "pattern_match");
4742 assert_eq!(frame.task_id, task_id);
4743 assert_eq!(frame.session_id, "session");
4744 assert_eq!(frame.match_text, "READY");
4745 assert_eq!(frame.match_offset, 0);
4746 assert_eq!(registry.active_watch_count(&frame.task_id), 0);
4747 let metadata = read_task(&task.paths.json).unwrap();
4748 assert!(metadata.completion_delivered);
4749 }
4750
4751 #[test]
4752 fn cleanup_finished_removes_terminal_tasks_older_than_threshold() {
4753 let registry = BgTaskRegistry::default();
4754 let dir = tempfile::tempdir().unwrap();
4755 let task_id = registry
4756 .spawn(
4757 QUICK_SUCCESS_COMMAND,
4758 "session".to_string(),
4759 dir.path().to_path_buf(),
4760 HashMap::new(),
4761 Some(Duration::from_secs(30)),
4762 dir.path().to_path_buf(),
4763 10,
4764 true,
4765 false,
4766 Some(dir.path().to_path_buf()),
4767 )
4768 .unwrap();
4769 registry
4770 .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
4771 .unwrap();
4772 let completions = registry.drain_completions_for_session(Some("session"));
4773 assert_eq!(completions.len(), 1);
4774 assert_eq!(
4775 registry.ack_completions_for_session(Some("session"), std::slice::from_ref(&task_id)),
4776 vec![task_id.clone()]
4777 );
4778
4779 registry.cleanup_finished(Duration::ZERO);
4780
4781 assert!(registry.inner.tasks.lock().unwrap().is_empty());
4782 }
4783
4784 #[test]
4785 fn cleanup_finished_retains_undelivered_terminals() {
4786 let registry = BgTaskRegistry::default();
4787 let dir = tempfile::tempdir().unwrap();
4788 let task_id = registry
4789 .spawn(
4790 QUICK_SUCCESS_COMMAND,
4791 "session".to_string(),
4792 dir.path().to_path_buf(),
4793 HashMap::new(),
4794 Some(Duration::from_secs(30)),
4795 dir.path().to_path_buf(),
4796 10,
4797 true,
4798 false,
4799 Some(dir.path().to_path_buf()),
4800 )
4801 .unwrap();
4802 registry
4803 .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
4804 .unwrap();
4805
4806 registry.cleanup_finished(Duration::ZERO);
4807
4808 assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
4809 }
4810
4811 #[test]
4819 fn reap_child_marks_failed_when_child_exits_without_exit_marker() {
4820 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
4821 let dir = tempfile::tempdir().unwrap();
4822 let task_id = registry
4823 .spawn(
4824 QUICK_SUCCESS_COMMAND,
4825 "session".to_string(),
4826 dir.path().to_path_buf(),
4827 HashMap::new(),
4828 Some(Duration::from_secs(30)),
4829 dir.path().to_path_buf(),
4830 10,
4831 true,
4832 false,
4833 Some(dir.path().to_path_buf()),
4834 )
4835 .unwrap();
4836
4837 let task = registry.task_for_session(&task_id, "session").unwrap();
4838
4839 let started = Instant::now();
4844 loop {
4845 let exited = {
4846 let mut state = task.state.lock().unwrap();
4847 match &mut state.runtime {
4848 TaskRuntime::Piped(Some(child)) => matches!(child.try_wait(), Ok(Some(_))),
4849 _ => true,
4850 }
4851 };
4852 if exited {
4853 break;
4854 }
4855 assert!(
4856 started.elapsed() < Duration::from_secs(5),
4857 "child should exit quickly"
4858 );
4859 std::thread::sleep(Duration::from_millis(20));
4860 }
4861
4862 registry
4870 .inner
4871 .shutdown
4872 .store(true, std::sync::atomic::Ordering::SeqCst);
4873 std::thread::sleep(Duration::from_millis(550));
4877
4878 let _ = std::fs::remove_file(&task.paths.exit);
4881
4882 {
4897 let mut state = task.state.lock().unwrap();
4898 state.metadata.status = BgTaskStatus::Running;
4899 state.metadata.status_reason = None;
4900 state.metadata.exit_code = None;
4901 state.metadata.finished_at = None;
4902 state.metadata.duration_ms = None;
4903 crate::bash_background::persistence::write_task(&task.paths.json, &state.metadata)
4906 .expect("persist reset Running metadata for reap_child test");
4907 if matches!(state.runtime, TaskRuntime::Piped(None)) {
4911 state.runtime = TaskRuntime::Piped(Some(spawn_dead_child()));
4912 }
4913 }
4914 *task.terminal_at.lock().unwrap() = None;
4917
4918 assert!(
4921 task.is_running(),
4922 "precondition: metadata.status == Running"
4923 );
4924 assert!(
4925 !task.paths.exit.exists(),
4926 "precondition: exit marker absent"
4927 );
4928
4929 registry.reap_child(&task);
4934
4935 {
4936 let state = task.state.lock().unwrap();
4937 assert_eq!(
4938 state.metadata.status,
4939 BgTaskStatus::Running,
4940 "first reap must leave status Running while waiting one pass for marker"
4941 );
4942 assert_eq!(
4943 state.metadata.status_reason, None,
4944 "first reap must not record a failure reason"
4945 );
4946 assert!(
4947 matches!(state.runtime, TaskRuntime::Piped(None)),
4948 "child handle must be released after first reap"
4949 );
4950 assert!(
4951 state.detached,
4952 "task must be marked detached after first reap"
4953 );
4954 }
4955
4956 registry.reap_child(&task);
4960
4961 let state = task.state.lock().unwrap();
4962 assert!(
4963 state.metadata.status.is_terminal(),
4964 "second reap must transition to terminal when PID dead and no marker. Got status={:?}",
4965 state.metadata.status
4966 );
4967 assert_eq!(
4968 state.metadata.status,
4969 BgTaskStatus::Failed,
4970 "must specifically be Failed (not Killed): status={:?}",
4971 state.metadata.status
4972 );
4973 assert_eq!(
4974 state.metadata.status_reason.as_deref(),
4975 Some("process exited without exit marker"),
4976 "reason must match replay path's wording: {:?}",
4977 state.metadata.status_reason
4978 );
4979 assert!(
4980 matches!(state.runtime, TaskRuntime::Piped(None)),
4981 "child handle must stay released after second reap"
4982 );
4983 assert!(
4984 state.detached,
4985 "task must remain detached after second reap"
4986 );
4987 }
4988
4989 #[test]
4994 fn reap_child_preserves_running_when_exit_marker_exists() {
4995 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
4996 let dir = tempfile::tempdir().unwrap();
4997 let task_id = registry
4998 .spawn(
4999 QUICK_SUCCESS_COMMAND,
5000 "session".to_string(),
5001 dir.path().to_path_buf(),
5002 HashMap::new(),
5003 Some(Duration::from_secs(30)),
5004 dir.path().to_path_buf(),
5005 10,
5006 true,
5007 false,
5008 Some(dir.path().to_path_buf()),
5009 )
5010 .unwrap();
5011
5012 let task = registry.task_for_session(&task_id, "session").unwrap();
5013
5014 let started = Instant::now();
5017 loop {
5018 let exited = {
5019 let mut state = task.state.lock().unwrap();
5020 match &mut state.runtime {
5021 TaskRuntime::Piped(Some(child)) => matches!(child.try_wait(), Ok(Some(_))),
5022 _ => true,
5023 }
5024 };
5025 if exited && task.paths.exit.exists() {
5026 break;
5027 }
5028 assert!(
5029 started.elapsed() < Duration::from_secs(5),
5030 "child should exit and write marker quickly"
5031 );
5032 std::thread::sleep(Duration::from_millis(20));
5033 }
5034
5035 registry
5041 .inner
5042 .shutdown
5043 .store(true, std::sync::atomic::Ordering::SeqCst);
5044 std::thread::sleep(Duration::from_millis(550));
5045
5046 {
5052 let mut state = task.state.lock().unwrap();
5053 state.metadata.status = BgTaskStatus::Running;
5054 state.metadata.status_reason = None;
5055 if matches!(state.runtime, TaskRuntime::Piped(None)) {
5056 state.runtime = TaskRuntime::Piped(Some(spawn_dead_child()));
5057 }
5058 }
5059 *task.terminal_at.lock().unwrap() = None;
5060 if !task.paths.exit.exists() {
5063 std::fs::write(&task.paths.exit, "0").expect("write replacement exit marker");
5064 }
5065
5066 registry.reap_child(&task);
5070
5071 let state = task.state.lock().unwrap();
5072 assert!(
5073 matches!(state.runtime, TaskRuntime::Piped(None)),
5074 "child handle still released even when marker exists"
5075 );
5076 assert!(
5077 state.detached,
5078 "task still marked detached even when marker exists"
5079 );
5080 assert_eq!(
5085 state.metadata.status,
5086 BgTaskStatus::Running,
5087 "reap_child must defer to poll_task when marker exists"
5088 );
5089 }
5090
5091 #[cfg(unix)]
5095 fn pid_stat(pid: u32) -> Option<String> {
5096 let output = std::process::Command::new("ps")
5097 .args(["-o", "stat=", "-p", &pid.to_string()])
5098 .output()
5099 .ok()?;
5100 if !output.status.success() {
5101 return None;
5102 }
5103 let stat = String::from_utf8_lossy(&output.stdout).trim().to_string();
5104 if stat.is_empty() {
5105 None
5106 } else {
5107 Some(stat)
5108 }
5109 }
5110
5111 #[cfg(unix)]
5113 fn is_zombie(pid: u32) -> bool {
5114 pid_stat(pid).is_some_and(|stat| stat.starts_with('Z'))
5115 }
5116
5117 #[cfg(unix)]
5123 fn spawn_unreaped_zombie() -> std::process::Child {
5124 let child = std::process::Command::new("true")
5125 .stdin(std::process::Stdio::null())
5126 .stdout(std::process::Stdio::null())
5127 .stderr(std::process::Stdio::null())
5128 .spawn()
5129 .expect("spawn zombie stand-in");
5130 let pid = child.id();
5131 let started = Instant::now();
5132 while !is_zombie(pid) {
5133 assert!(
5134 started.elapsed() < Duration::from_secs(5),
5135 "stand-in child should become a zombie within 5s"
5136 );
5137 std::thread::sleep(Duration::from_millis(10));
5138 }
5139 child
5141 }
5142
5143 #[cfg(unix)]
5153 #[test]
5154 fn finalize_from_marker_reaps_child_no_zombie() {
5155 use std::sync::atomic::Ordering;
5156
5157 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
5158 let dir = tempfile::tempdir().unwrap();
5159 let task_id = registry
5160 .spawn(
5161 QUICK_SUCCESS_COMMAND,
5162 "session".to_string(),
5163 dir.path().to_path_buf(),
5164 HashMap::new(),
5165 Some(Duration::from_secs(30)),
5166 dir.path().to_path_buf(),
5167 10,
5168 true,
5169 false,
5170 Some(dir.path().to_path_buf()),
5171 )
5172 .unwrap();
5173
5174 registry.inner.shutdown.store(true, Ordering::SeqCst);
5178 std::thread::sleep(Duration::from_millis(550));
5179
5180 let task = registry.task_for_session(&task_id, "session").unwrap();
5181
5182 let started = Instant::now();
5186 while !task.paths.exit.exists() {
5187 assert!(
5188 started.elapsed() < Duration::from_secs(5),
5189 "exit marker should land quickly for `true`"
5190 );
5191 std::thread::sleep(Duration::from_millis(20));
5192 }
5193
5194 let zombie_pid;
5200 {
5201 let mut state = task.state.lock().unwrap();
5202 state.metadata.status = BgTaskStatus::Running;
5203 state.metadata.status_reason = None;
5204 state.metadata.exit_code = None;
5205 state.metadata.finished_at = None;
5206 state.metadata.duration_ms = None;
5207 crate::bash_background::persistence::write_task(&task.paths.json, &state.metadata)
5208 .expect("persist reset Running metadata");
5209 let zombie = spawn_unreaped_zombie();
5210 zombie_pid = zombie.id();
5211 state.runtime = TaskRuntime::Piped(Some(zombie));
5212 }
5213 *task.terminal_at.lock().unwrap() = None;
5214
5215 assert!(
5217 is_zombie(zombie_pid),
5218 "precondition: stand-in child {zombie_pid} must be a zombie before finalize"
5219 );
5220
5221 registry.poll_task(&task).unwrap();
5224
5225 {
5226 let state = task.state.lock().unwrap();
5227 assert!(
5228 matches!(state.runtime, TaskRuntime::Piped(None)),
5229 "child handle must be released after marker finalize"
5230 );
5231 assert!(
5232 state.metadata.status.is_terminal(),
5233 "task must be terminal after marker finalize: {:?}",
5234 state.metadata.status
5235 );
5236 }
5237
5238 assert!(
5241 !is_zombie(zombie_pid),
5242 "issue #91 regression: child {zombie_pid} left as <defunct> zombie \
5243 after the exit-marker terminal transition"
5244 );
5245 }
5246
5247 #[cfg(unix)]
5251 #[test]
5252 fn kill_with_existing_marker_reaps_child_no_zombie() {
5253 use std::sync::atomic::Ordering;
5254
5255 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
5256 let dir = tempfile::tempdir().unwrap();
5257 let task_id = registry
5258 .spawn(
5259 QUICK_SUCCESS_COMMAND,
5260 "session".to_string(),
5261 dir.path().to_path_buf(),
5262 HashMap::new(),
5263 Some(Duration::from_secs(30)),
5264 dir.path().to_path_buf(),
5265 10,
5266 true,
5267 false,
5268 Some(dir.path().to_path_buf()),
5269 )
5270 .unwrap();
5271
5272 registry.inner.shutdown.store(true, Ordering::SeqCst);
5273 std::thread::sleep(Duration::from_millis(550));
5274
5275 let task = registry.task_for_session(&task_id, "session").unwrap();
5276
5277 let started = Instant::now();
5278 while !task.paths.exit.exists() {
5279 assert!(
5280 started.elapsed() < Duration::from_secs(5),
5281 "exit marker should land quickly for `true`"
5282 );
5283 std::thread::sleep(Duration::from_millis(20));
5284 }
5285
5286 let zombie_pid;
5287 {
5288 let mut state = task.state.lock().unwrap();
5289 state.metadata.status = BgTaskStatus::Running;
5290 state.metadata.status_reason = None;
5291 state.metadata.exit_code = None;
5292 state.metadata.finished_at = None;
5293 state.metadata.duration_ms = None;
5294 crate::bash_background::persistence::write_task(&task.paths.json, &state.metadata)
5295 .expect("persist reset Running metadata");
5296 let zombie = spawn_unreaped_zombie();
5297 zombie_pid = zombie.id();
5298 state.runtime = TaskRuntime::Piped(Some(zombie));
5299 }
5300 *task.terminal_at.lock().unwrap() = None;
5301
5302 assert!(
5303 is_zombie(zombie_pid),
5304 "precondition: stand-in child {zombie_pid} must be a zombie before kill"
5305 );
5306
5307 registry
5309 .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
5310 .expect("kill should succeed");
5311
5312 {
5313 let state = task.state.lock().unwrap();
5314 assert!(
5315 matches!(state.runtime, TaskRuntime::Piped(None)),
5316 "child handle must be released after marker-aware kill"
5317 );
5318 assert!(state.metadata.status.is_terminal());
5319 }
5320
5321 assert!(
5322 !is_zombie(zombie_pid),
5323 "issue #91 regression: child {zombie_pid} left as <defunct> zombie \
5324 after a marker-aware kill"
5325 );
5326 }
5327
5328 #[test]
5329 fn cleanup_finished_keeps_running_tasks() {
5330 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
5331 let dir = tempfile::tempdir().unwrap();
5332 let task_id = registry
5333 .spawn(
5334 LONG_RUNNING_COMMAND,
5335 "session".to_string(),
5336 dir.path().to_path_buf(),
5337 HashMap::new(),
5338 Some(Duration::from_secs(30)),
5339 dir.path().to_path_buf(),
5340 10,
5341 true,
5342 false,
5343 Some(dir.path().to_path_buf()),
5344 )
5345 .unwrap();
5346
5347 registry.cleanup_finished(Duration::ZERO);
5348
5349 assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
5350 let _ = registry.kill(&task_id, "session");
5351 }
5352
5353 #[cfg(windows)]
5354 fn wait_for_file(path: &Path) -> String {
5355 let started = Instant::now();
5356 loop {
5357 if path.exists() {
5358 return fs::read_to_string(path).expect("read file");
5359 }
5360 assert!(
5361 started.elapsed() < Duration::from_secs(30),
5362 "timed out waiting for {}",
5363 path.display()
5364 );
5365 std::thread::sleep(Duration::from_millis(100));
5366 }
5367 }
5368
5369 #[cfg(windows)]
5370 fn spawn_windows_registry_command(
5371 command: &str,
5372 ) -> (BgTaskRegistry, tempfile::TempDir, String) {
5373 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
5374 let dir = tempfile::tempdir().unwrap();
5375 let task_id = registry
5376 .spawn(
5377 command,
5378 "session".to_string(),
5379 dir.path().to_path_buf(),
5380 HashMap::new(),
5381 Some(Duration::from_secs(30)),
5382 dir.path().to_path_buf(),
5383 10,
5384 false,
5385 false,
5386 Some(dir.path().to_path_buf()),
5387 )
5388 .unwrap();
5389 (registry, dir, task_id)
5390 }
5391
5392 #[cfg(windows)]
5393 #[test]
5394 fn windows_spawn_writes_exit_marker_for_zero_exit() {
5395 let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 0");
5396 let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
5397
5398 let content = wait_for_file(&exit_path);
5399
5400 assert_eq!(content.trim(), "0");
5401 }
5402
5403 #[cfg(windows)]
5404 #[test]
5405 fn windows_spawn_writes_exit_marker_for_nonzero_exit() {
5406 let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 42");
5407 let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
5408
5409 let content = wait_for_file(&exit_path);
5410
5411 assert_eq!(content.trim(), "42");
5412 }
5413
5414 #[cfg(windows)]
5415 #[test]
5416 fn windows_spawn_captures_stdout_to_disk() {
5417 let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c echo hello");
5418 let task = registry.task_for_session(&task_id, "session").unwrap();
5419 let stdout_path = task.paths.stdout.clone();
5420 let exit_path = task.paths.exit.clone();
5421
5422 let _ = wait_for_file(&exit_path);
5423 let stdout = fs::read_to_string(stdout_path).expect("read stdout");
5424
5425 assert!(stdout.contains("hello"), "stdout was {stdout:?}");
5426 }
5427
5428 #[cfg(windows)]
5429 #[test]
5430 fn windows_spawn_uses_pwsh_when_available() {
5431 let candidates = crate::windows_shell::shell_candidates_with(
5435 |binary| match binary {
5436 "pwsh.exe" => Some(std::path::PathBuf::from(r"C:\pwsh\pwsh.exe")),
5437 "powershell.exe" => Some(std::path::PathBuf::from(r"C:\ps\powershell.exe")),
5438 _ => None,
5439 },
5440 || None,
5441 );
5442 let shell = candidates.first().expect("at least one candidate").clone();
5443 assert_eq!(shell, crate::windows_shell::WindowsShell::Pwsh);
5444 assert_eq!(shell.binary().as_ref(), "pwsh.exe");
5445 }
5446
5447 #[cfg(windows)]
5454 #[test]
5455 fn windows_shell_cmd_wrapper_writes_exit_marker_with_move() {
5456 let exit_path = Path::new(r"C:\Temp\bash-test.exit");
5457 let script =
5458 crate::windows_shell::WindowsShell::Cmd.wrapper_script("cmd /c exit 42", exit_path);
5459
5460 assert!(
5464 script.contains("set CODE=%ERRORLEVEL%"),
5465 "wrapper must capture exit code into CODE: {script}"
5466 );
5467 assert!(
5468 script.contains("echo %CODE% >"),
5469 "wrapper must echo CODE to a temp marker file: {script}"
5470 );
5471 assert!(
5472 script.contains("move /Y"),
5473 "wrapper must use atomic move to write the marker: {script}"
5474 );
5475 assert!(
5478 script.contains("> nul"),
5479 "wrapper must redirect move output to nul: {script}"
5480 );
5481 assert!(
5483 script.contains("exit /B %CODE%"),
5484 "wrapper must propagate the captured exit code: {script}"
5485 );
5486 assert!(script.contains(r#""C:\Temp\bash-test.exit.tmp""#));
5487 assert!(script.contains(r#""C:\Temp\bash-test.exit""#));
5488 }
5489
5490 #[cfg(windows)]
5496 #[test]
5497 fn windows_shell_cmd_bg_command_uses_minimal_cmd_flags() {
5498 use crate::windows_shell::WindowsShell;
5499 let cmd = WindowsShell::Cmd.bg_command("echo wrapped");
5500 let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
5501 let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
5502 assert_eq!(
5503 args_strs,
5504 vec!["/D", "/S", "/C", "echo wrapped"],
5505 "Cmd::bg_command must prepend /D /S /C"
5506 );
5507 }
5508
5509 #[cfg(windows)]
5512 #[test]
5513 fn windows_shell_pwsh_bg_command_uses_standard_args() {
5514 use crate::windows_shell::WindowsShell;
5515 let cmd = WindowsShell::Pwsh.bg_command("Get-Date");
5516 let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
5517 let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
5518 assert!(
5519 args_strs.contains(&"-Command"),
5520 "Pwsh::bg_command must use -Command: {args_strs:?}"
5521 );
5522 assert!(
5523 args_strs.contains(&"Get-Date"),
5524 "Pwsh::bg_command must include the user command body"
5525 );
5526 }
5527}