1use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
2use std::fs;
3use std::io::{Read, Write};
4use std::path::{Path, PathBuf};
5use std::process::{Child, Command, Stdio};
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7#[cfg(unix)]
8use std::sync::OnceLock;
9use std::sync::{Arc, Mutex, RwLock};
10use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
11
12use rusqlite::Connection;
13use serde::Serialize;
14
15use crate::compress::caps::DropClass;
16use crate::compress::CompressionResult;
17use crate::context::SharedProgressSender;
18use crate::harness::Harness;
19use crate::protocol::{BashCompletedFrame, BashLongRunningFrame, BashPatternMatchFrame, PushFrame};
20
21#[cfg(unix)]
22use std::os::unix::process::CommandExt;
23#[cfg(windows)]
24use std::os::windows::process::CommandExt;
25
26use super::buffer::{combine_streams, BgBuffer, StreamKind, TokenCountInput};
27use super::output::{
28 cap_completion_output, cap_completion_output_with_marker, cap_final_output,
29 cap_final_output_with_marker, json_output_pointer, quote_path, COMPLETION_OUTPUT_PREVIEW_BYTES,
30 COMPRESS_INPUT_CAP_BYTES, COMPRESS_INPUT_HEAD_BYTES, COMPRESS_INPUT_TAIL_BYTES,
31 FINAL_OUTPUT_CAP_BYTES, RAW_PASSTHROUGH_CAP_BYTES, RAW_PASSTHROUGH_HEAD_BYTES,
32 RAW_PASSTHROUGH_TAIL_BYTES, RUNNING_OUTPUT_PREVIEW_BYTES, STRUCTURED_OUTPUT_CAP_BYTES,
33};
34use super::persistence::{
35 create_capture_file, delete_task_bundle, read_exit_marker, read_task, session_tasks_dir,
36 task_paths, unix_millis, update_task, write_kill_marker_if_absent, write_task, BgMode,
37 ExitMarker, PersistedTask, TaskPaths,
38};
39use super::process::is_process_alive;
40#[cfg(unix)]
41use super::process::terminate_pgid;
42#[cfg(windows)]
43use super::process::terminate_pid;
44use super::pty_process::spawn_pty_for_command;
45use super::pty_runtime::PtyRuntime;
46use super::watches::{PatternMatch, WatchPattern, WatchRegistry};
47use super::{BgTaskInfo, BgTaskStatus};
/// Timeout (30 minutes) applied to a background task when the caller of
/// `spawn` / `spawn_pty` passes `None`.
48const DEFAULT_BG_TIMEOUT: Duration = Duration::from_secs(30 * 60);
/// 24 hours. NOTE(review): presumably the age after which replayed `Running`
/// metadata is treated as orphaned (see the "orphaned (>24h)" replay reason) —
/// confirm against `running_metadata_is_stale`.
56const STALE_RUNNING_AFTER: Duration = Duration::from_secs(24 * 60 * 60);
/// 24 hours. NOTE(review): presumably the grace period before persisted task
/// bundles become eligible for garbage collection — confirm against
/// `maybe_gc_persisted`.
57const PERSISTED_GC_GRACE: Duration = Duration::from_secs(24 * 60 * 60);
/// 30 days. NOTE(review): presumably how long quarantined task metadata is
/// retained before garbage collection — usage is not visible here.
58const QUARANTINE_GC_GRACE: Duration = Duration::from_secs(30 * 24 * 60 * 60);
59
/// Preview size used for completion snapshots; an alias of the shared
/// `COMPLETION_OUTPUT_PREVIEW_BYTES` cap from the output module.
60const BG_COMPLETION_PREVIEW_BYTES: usize = COMPLETION_OUTPUT_PREVIEW_BYTES;
/// 128 KiB. NOTE(review): the name suggests a per-stream cap on the bytes fed
/// to token counting — usage is not visible here.
64const TOKENIZE_CAP_BYTES_PER_STREAM: usize = 128 * 1024;
65
/// Completion record for a background task.
///
/// Records of this type are queued in `RegistryInner::completions`. The serde
/// attributes below decide which fields appear in the serialized form.
66#[derive(Debug, Clone, Serialize)]
67pub struct BgCompletion {
 /// Identifier of the task this completion describes.
68 pub task_id: String,
 /// Session the task belongs to; never serialized (`skip_serializing`).
69 #[serde(skip_serializing)]
72 pub session_id: String,
 /// Status of the task at the time the completion was built.
73 pub status: BgTaskStatus,
 /// Exit code, when one is known.
74 pub exit_code: Option<i32>,
 /// Command line the task was started with.
75 pub command: String,
 /// Output preview text; omitted from the serialized form when empty.
76 #[serde(default, skip_serializing_if = "String::is_empty")]
82 pub output_preview: String,
 /// Whether the preview was truncated; omitted from the serialized form when `false`.
83 #[serde(default, skip_serializing_if = "is_false")]
88 pub output_truncated: bool,
 /// Omitted when `None`.
 /// NOTE(review): presumably the token count of the output before compression — confirm.
89 #[serde(default, skip_serializing_if = "Option::is_none")]
92 pub original_tokens: Option<u32>,
 /// Omitted when `None`.
 /// NOTE(review): presumably the token count of the output after compression — confirm.
93 #[serde(default, skip_serializing_if = "Option::is_none")]
96 pub compressed_tokens: Option<u32>,
 /// Omitted when `false`.
 /// NOTE(review): presumably set when token counting was skipped — confirm.
97 #[serde(default, skip_serializing_if = "is_false")]
99 pub tokens_skipped: bool,
100}
101
/// Serde `skip_serializing_if` predicate: `true` exactly when the flag is unset.
///
/// Takes a reference because serde passes the field by reference.
fn is_false(v: &bool) -> bool {
    // `Not` is implemented for `&bool`, so no explicit dereference is needed.
    !v
}
105
/// Serializable point-in-time view of a background task.
106#[derive(Debug, Clone, Serialize)]
107pub struct BgTaskSnapshot {
 /// Core task info; its fields are flattened into the snapshot's serialized form.
108 #[serde(flatten)]
109 pub info: BgTaskInfo,
 /// Exit code, when one is known.
110 pub exit_code: Option<i32>,
 /// Process id of the spawned child, when known.
111 pub child_pid: Option<u32>,
 /// Working directory, as a string.
112 pub workdir: String,
 /// Preview of the task output.
113 pub output_preview: String,
 /// Whether `output_preview` was truncated.
114 pub output_truncated: bool,
 /// Path of the captured output file, when available.
115 pub output_path: Option<String>,
 /// Path of the captured stderr file, when available.
116 pub stderr_path: Option<String>,
 /// PTY row count; omitted from the serialized form when `None`.
117 #[serde(skip_serializing_if = "Option::is_none")]
118 pub pty_rows: Option<u16>,
 /// PTY column count; omitted from the serialized form when `None`.
119 #[serde(skip_serializing_if = "Option::is_none")]
120 pub pty_cols: Option<u16>,
121}
122
/// Tags how the preview stored in a `TerminalOutputCache` was produced.
123#[derive(Debug, Clone, Copy, PartialEq, Eq)]
124enum TerminalOutputKind {
 // NOTE(review): presumably output that went through the registered compressor — confirm.
125 Compressed,
 // Used by `render_terminal_output` for PTY tasks (empty preview).
 // NOTE(review): presumably also used for raw passthrough output — confirm.
126 Raw,
 // NOTE(review): presumably output recognised by `render_structured_output` — confirm.
127 Structured,
128}
129
/// Rendered output of a terminal task, cached on `BgTaskState` so it is only
/// rendered once (see `ensure_terminal_output_cache`).
130#[derive(Debug, Clone, PartialEq, Eq)]
131struct TerminalOutputCache {
 // Rendered preview text (empty for PTY tasks).
132 output_preview: String,
 // Whether the preview was truncated.
133 output_truncated: bool,
 // How the preview was produced.
134 kind: TerminalOutputKind,
 // Display string of the buffer's output path, when it has one.
135 output_path: Option<String>,
 // Display string of the buffer's stderr path, when it has one.
136 stderr_path: Option<String>,
 // Optional recovery details; `None` for PTY tasks.
137 recovery: Option<RecoveryContext>,
138}
139
/// Details about what was dropped or truncated while rendering terminal output.
/// NOTE(review): presumably used to build hints pointing at the full output
/// files — the consumers are not visible here.
140#[derive(Debug, Clone, PartialEq, Eq)]
141struct RecoveryContext {
 // Count per drop class; a non-empty map counts as a visible drop.
142 dropped_by_class: BTreeMap<DropClass, usize>,
 // Counts as a visible drop when set.
143 had_inner_drop: bool,
 // NOTE(review): presumably whether a line-offset hint may be offered — confirm.
144 offset_hint_eligible: bool,
 // NOTE(review): presumably the first line such a hint would point at — confirm.
145 offset_start_line: Option<usize>,
 // Counts as a visible drop when set.
146 byte_truncated: bool,
 // Path of the captured output file, when available.
147 output_path: Option<String>,
 // Path of the captured stderr file, when available.
148 stderr_path: Option<String>,
 // NOTE(review): presumably whether the stderr path should be mentioned — confirm.
149 include_stderr_path: bool,
150}
151
152impl RecoveryContext {
153 fn has_visible_drop(&self) -> bool {
154 self.byte_truncated || self.had_inner_drop || !self.dropped_by_class.is_empty()
155 }
156}
157
/// Handle to the background task registry.
///
/// Cloning the handle clones the `Arc`, so every clone shares the same
/// `RegistryInner`.
158#[derive(Clone)]
159pub struct BgTaskRegistry {
 /// Shared registry state.
160 pub(crate) inner: Arc<RegistryInner>,
161}
162
/// State shared by every clone of `BgTaskRegistry`.
163pub(crate) struct RegistryInner {
 /// Registered tasks, keyed by task id.
164 pub(crate) tasks: Mutex<HashMap<String, Arc<BgTask>>>,
 /// Queue of completion records.
165 pub(crate) completions: Mutex<VecDeque<BgCompletion>>,
 /// Progress sender supplied to `BgTaskRegistry::new`.
166 pub(crate) progress_sender: SharedProgressSender,
 // Starts `false`. NOTE(review): presumably set once the watchdog is running — confirm in `start_watchdog`.
167 watchdog_started: AtomicBool,
 /// Starts `false`. NOTE(review): presumably signals background threads to stop — confirm.
168 pub(crate) shutdown: AtomicBool,
 /// Long-running reminder switch; starts `true`, set via `configure_long_running_reminders`.
169 pub(crate) long_running_reminder_enabled: AtomicBool,
 /// Reminder interval in milliseconds; starts at 600 000, set via `configure_long_running_reminders`.
170 pub(crate) long_running_reminder_interval_ms: AtomicU64,
 // Swapped to `true` by the first session replay so persisted-task GC is attempted once.
171 persisted_gc_started: AtomicBool,
 // Test-only counter. NOTE(review): presumably counts GC runs — confirm.
172 #[cfg(test)]
173 persisted_gc_runs: AtomicU64,
 /// Optional compressor, called as `compressor(command, output)`; installed by `set_compressor`.
174 pub(crate) compressor:
180 Mutex<Option<Box<dyn Fn(&str, String) -> CompressionResult + Send + Sync>>>,
 /// Optional shared database connection used to mirror task metadata.
181 pub(crate) db_pool: RwLock<Option<Arc<Mutex<Connection>>>>,
 /// Harness identifier used for database rows; set by `set_harness`.
182 pub(crate) db_harness: RwLock<Option<String>>,
 /// Sending half of the capacity-1 wake channel; a clone is handed to PTY spawns.
183 pub(crate) wake_tx: crossbeam_channel::Sender<()>,
 /// Receiving half of the wake channel.
184 pub(crate) wake_rx: crossbeam_channel::Receiver<()>,
 /// Registry of output watch patterns.
185 pub(crate) watch_registry: Mutex<WatchRegistry>,
186}
187
/// One background task tracked by the registry.
188pub(crate) struct BgTask {
 /// Unique task identifier.
189 pub(crate) task_id: String,
 /// Session that owns the task.
190 pub(crate) session_id: String,
 /// On-disk locations of the task's metadata and capture files.
191 pub(crate) paths: TaskPaths,
 /// Instant at which this in-memory task object was created.
192 pub(crate) started: Instant,
 /// Starts as `None`. NOTE(review): presumably the time of the last long-running reminder — confirm.
193 pub(crate) last_reminder_at: Mutex<Option<Instant>>,
 /// Starts as `None`. NOTE(review): presumably the time the task became terminal — confirm.
194 pub(crate) terminal_at: Mutex<Option<Instant>>,
 /// Mutable task state, guarded by its own lock.
195 pub(crate) state: Mutex<BgTaskState>,
196}
197
/// Live process handle of a task, by capture mode.
///
/// `write_pty` treats `Pty(None)` as "task exited".
/// NOTE(review): `None` presumably means the handle was released or never
/// existed (e.g. after rehydration) — confirm.
198pub(crate) enum TaskRuntime {
 /// Child process with file-captured stdout/stderr, as created by `spawn`.
199 Piped(Option<Child>),
 /// PTY-backed runtime, as created by `spawn_pty`.
200 Pty(Option<PtyRuntime>),
201}
202
/// Mutable state of a `BgTask`, held behind `BgTask::state`.
203pub(crate) struct BgTaskState {
 /// In-memory copy of the persisted task metadata.
204 pub(crate) metadata: PersistedTask,
 /// Live process handle, if any.
205 pub(crate) runtime: TaskRuntime,
 /// `false` for freshly spawned tasks. NOTE(review): presumably `true` for tasks not owned by this process — confirm.
206 pub(crate) detached: bool,
 /// `false` at spawn. NOTE(review): presumably set once the child's exit has been observed — confirm.
207 pub(crate) child_exit_observed: bool,
 /// Handle over the task's capture files.
218 pub(crate) buffer: BgBuffer,
 // Rendered terminal output, filled lazily by `ensure_terminal_output_cache`.
219 terminal_output_cache: Option<TerminalOutputCache>,
 /// `None` at spawn. NOTE(review): presumably a status that overrides the observed terminal status — confirm.
220 pub(crate) pending_terminal_override: Option<BgTaskStatus>,
222}
223
224impl BgTaskRegistry {
225 pub fn new(progress_sender: SharedProgressSender) -> Self {
226 let (wake_tx, wake_rx) = crossbeam_channel::bounded(1);
227 Self {
228 inner: Arc::new(RegistryInner {
229 tasks: Mutex::new(HashMap::new()),
230 completions: Mutex::new(VecDeque::new()),
231 progress_sender,
232 watchdog_started: AtomicBool::new(false),
233 shutdown: AtomicBool::new(false),
234 long_running_reminder_enabled: AtomicBool::new(true),
235 long_running_reminder_interval_ms: AtomicU64::new(600_000),
236 persisted_gc_started: AtomicBool::new(false),
237 #[cfg(test)]
238 persisted_gc_runs: AtomicU64::new(0),
239 compressor: Mutex::new(None),
240 db_pool: RwLock::new(None),
241 db_harness: RwLock::new(None),
242 wake_tx,
243 wake_rx,
244 watch_registry: Mutex::new(WatchRegistry::default()),
245 }),
246 }
247 }
248
249 pub fn set_harness(&self, harness: Harness) {
250 if let Ok(mut slot) = self.inner.db_harness.write() {
251 *slot = Some(harness.as_str().to_string());
252 }
253 }
254
255 pub fn set_db_pool(&self, conn: Arc<Mutex<Connection>>) {
256 if let Ok(mut slot) = self.inner.db_pool.write() {
257 *slot = Some(conn);
258 }
259 }
260
261 pub fn clear_db_pool(&self) {
262 if let Ok(mut slot) = self.inner.db_pool.write() {
263 *slot = None;
264 }
265 }
266
267 pub fn set_compressor<F>(&self, compressor: F)
272 where
273 F: Fn(&str, String) -> CompressionResult + Send + Sync + 'static,
274 {
275 if let Ok(mut slot) = self.inner.compressor.lock() {
276 *slot = Some(Box::new(compressor));
277 }
278 }
279
280 pub(crate) fn compress_output(&self, command: &str, output: String) -> CompressionResult {
283 let Ok(slot) = self.inner.compressor.lock() else {
284 return CompressionResult::new(output);
285 };
286 match slot.as_ref() {
287 Some(compressor) => compressor(command, output),
288 None => CompressionResult::new(output),
289 }
290 }
291
292 fn ensure_terminal_output_cache(&self, task: &Arc<BgTask>) -> Option<TerminalOutputCache> {
293 let (metadata, buffer) = {
294 let state = task.state.lock().ok()?;
295 if !state.metadata.status.is_terminal() || state.metadata.mode == BgMode::Pty {
296 return None;
297 }
298 if let Some(cache) = state.terminal_output_cache.clone() {
299 return Some(cache);
300 }
301 (state.metadata.clone(), state.buffer.clone())
302 };
303
304 let mut cap_buffer = buffer.clone();
305 cap_buffer.enforce_terminal_cap();
306 let cache = self.render_terminal_output(&metadata, &buffer);
307 let mut state = task.state.lock().ok()?;
308 if !state.metadata.status.is_terminal() || state.metadata.mode == BgMode::Pty {
309 return None;
310 }
311 if let Some(existing) = state.terminal_output_cache.clone() {
312 return Some(existing);
313 }
314 state.terminal_output_cache = Some(cache.clone());
315 Some(cache)
316 }
317
318 fn render_terminal_output(
319 &self,
320 metadata: &PersistedTask,
321 buffer: &BgBuffer,
322 ) -> TerminalOutputCache {
323 if metadata.mode == BgMode::Pty {
324 return TerminalOutputCache {
325 output_preview: String::new(),
326 output_truncated: false,
327 kind: TerminalOutputKind::Raw,
328 output_path: buffer.output_path().map(|path| path.display().to_string()),
329 stderr_path: buffer.stderr_path().map(|path| path.display().to_string()),
330 recovery: None,
331 };
332 }
333
334 if let Some(structured) = render_structured_output(&metadata.command, buffer) {
335 return structured;
336 }
337
338 if !metadata.compressed {
339 return render_raw_passthrough(buffer);
340 }
341
342 let raw = buffer.read_combined_head_tail(
343 COMPRESS_INPUT_CAP_BYTES,
344 COMPRESS_INPUT_HEAD_BYTES,
345 COMPRESS_INPUT_TAIL_BYTES,
346 );
347 let compressed = self.compress_output(&metadata.command, raw.text);
348 render_compressed_with_recovery(buffer, compressed, raw.truncated)
349 }
350
351 fn snapshot_with_terminal_cache(
352 &self,
353 task: &Arc<BgTask>,
354 preview_bytes: usize,
355 ) -> BgTaskSnapshot {
356 let mut snapshot = task.snapshot(preview_bytes);
357 self.maybe_compress_snapshot(task, &mut snapshot);
358 snapshot
359 }
360
361 fn post_terminal_transition(&self, task: &Arc<BgTask>, emit_frame: bool) -> Result<(), String> {
362 let (metadata, buffer) = {
363 let state = task
364 .state
365 .lock()
366 .map_err(|_| "background task lock poisoned".to_string())?;
367 if !state.metadata.status.is_terminal() {
368 return Ok(());
369 }
370 (state.metadata.clone(), state.buffer.clone())
371 };
372
373 let mut cap_buffer = buffer.clone();
374 cap_buffer.enforce_terminal_cap();
375 let cache = self.ensure_terminal_output_cache(task);
376 self.enqueue_completion_from_parts(
377 &metadata,
378 Some(&buffer),
379 None,
380 emit_frame,
381 cache.as_ref(),
382 );
383 Ok(())
384 }
385
386 fn persist_task(&self, paths: &TaskPaths, metadata: &PersistedTask) -> std::io::Result<()> {
387 write_task(&paths.json, metadata)?;
388 self.dual_write_task(paths, metadata);
389 Ok(())
390 }
391
392 fn update_task_metadata<F>(
393 &self,
394 paths: &TaskPaths,
395 update: F,
396 ) -> std::io::Result<PersistedTask>
397 where
398 F: FnOnce(&mut PersistedTask),
399 {
400 let metadata = update_task(&paths.json, update)?;
401 self.dual_write_task(paths, &metadata);
402 Ok(metadata)
403 }
404
405 fn dual_write_task(&self, paths: &TaskPaths, metadata: &PersistedTask) {
406 let pool = self.inner.db_pool.read().ok().and_then(|slot| slot.clone());
407 let Some(pool) = pool else {
408 return;
409 };
410 let harness = self
411 .inner
412 .db_harness
413 .read()
414 .ok()
415 .and_then(|slot| slot.clone());
416 let Some(harness) = harness else {
417 crate::slog_warn!(
418 "dual-write bash_task to DB skipped for {}: harness not configured",
419 metadata.task_id
420 );
421 return;
422 };
423 let row = match metadata.to_bash_task_row(&harness, paths) {
424 Ok(row) => row,
425 Err(error) => {
426 crate::slog_warn!(
427 "dual-write bash_task to DB failed for {}: {}",
428 metadata.task_id,
429 error
430 );
431 return;
432 }
433 };
434 let conn = match pool.lock() {
435 Ok(conn) => conn,
436 Err(_) => {
437 crate::slog_warn!(
438 "dual-write bash_task to DB failed for {}: db mutex poisoned",
439 metadata.task_id
440 );
441 return;
442 }
443 };
444 if let Err(error) = crate::db::bash_tasks::upsert_bash_task(&conn, &row) {
445 crate::slog_warn!(
446 "dual-write bash_task to DB failed for {}: {}",
447 metadata.task_id,
448 error
449 );
450 }
451 }
452
453 pub fn configure_long_running_reminders(&self, enabled: bool, interval_ms: u64) {
454 self.inner
455 .long_running_reminder_enabled
456 .store(enabled, Ordering::SeqCst);
457 self.inner
458 .long_running_reminder_interval_ms
459 .store(interval_ms, Ordering::SeqCst);
460 }
461
462 #[cfg(unix)]
463 #[allow(clippy::too_many_arguments)]
464 pub fn spawn(
465 &self,
466 command: &str,
467 session_id: String,
468 workdir: PathBuf,
469 env: HashMap<String, String>,
470 timeout: Option<Duration>,
471 storage_dir: PathBuf,
472 max_running: usize,
473 notify_on_completion: bool,
474 compressed: bool,
475 project_root: Option<PathBuf>,
476 ) -> Result<String, String> {
477 self.start_watchdog();
478
479 let running = self.running_count();
480 if running >= max_running {
481 return Err(format!(
482 "background bash task limit exceeded: {running} running (max {max_running})"
483 ));
484 }
485
486 let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
487 let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
488 let task_id = self.generate_unique_task_id()?;
489 let paths = task_paths(&storage_dir, &session_id, &task_id);
490 fs::create_dir_all(&paths.dir)
491 .map_err(|e| format!("failed to create background task dir: {e}"))?;
492
493 let mut metadata = PersistedTask::starting(
494 task_id.clone(),
495 session_id.clone(),
496 command.to_string(),
497 workdir.clone(),
498 project_root,
499 timeout_ms,
500 notify_on_completion,
501 compressed,
502 );
503 self.persist_task(&paths, &metadata)
504 .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
505
506 create_capture_file(&paths.stdout)
510 .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
511 create_capture_file(&paths.stderr)
512 .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
513
514 let child = match spawn_detached_child(command, &paths, &workdir, &env) {
515 Ok(child) => child,
516 Err(error) => {
517 crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
518 let _ = delete_task_bundle(&paths);
519 return Err(error);
520 }
521 };
522
523 let child_pid = child.id();
524 metadata.mark_running(child_pid, child_pid as i32);
525 self.persist_task(&paths, &metadata)
526 .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
527
528 let task = Arc::new(BgTask {
529 task_id: task_id.clone(),
530 session_id,
531 paths: paths.clone(),
532 started: Instant::now(),
533 last_reminder_at: Mutex::new(None),
534 terminal_at: Mutex::new(None),
535 state: Mutex::new(BgTaskState {
536 metadata,
537 runtime: TaskRuntime::Piped(Some(child)),
538 detached: false,
539 child_exit_observed: false,
540 buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
541 terminal_output_cache: None,
542 pending_terminal_override: None,
543 }),
544 });
545
546 self.inner
547 .tasks
548 .lock()
549 .map_err(|_| "background task registry lock poisoned".to_string())?
550 .insert(task_id.clone(), task);
551
552 Ok(task_id)
553 }
554
555 #[allow(clippy::too_many_arguments)]
556 pub fn spawn_pty(
557 &self,
558 command: &str,
559 session_id: String,
560 workdir: PathBuf,
561 env: HashMap<String, String>,
562 timeout: Option<Duration>,
563 storage_dir: PathBuf,
564 max_running: usize,
565 notify_on_completion: bool,
566 compressed: bool,
567 project_root: Option<PathBuf>,
568 rows: u16,
569 cols: u16,
570 ) -> Result<String, String> {
571 self.start_watchdog();
572
573 let running = self.running_count();
574 if running >= max_running {
575 return Err(format!(
576 "background bash task limit exceeded: {running} running (max {max_running})"
577 ));
578 }
579
580 let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
581 let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
582 let task_id = self.generate_unique_task_id()?;
583 let paths = task_paths(&storage_dir, &session_id, &task_id);
584 fs::create_dir_all(&paths.dir)
585 .map_err(|e| format!("failed to create background task dir: {e}"))?;
586
587 let mut metadata = PersistedTask::starting(
588 task_id.clone(),
589 session_id.clone(),
590 command.to_string(),
591 workdir.clone(),
592 project_root,
593 timeout_ms,
594 notify_on_completion,
595 compressed,
596 );
597 metadata.mode = BgMode::Pty;
598 metadata.pty_rows = Some(rows);
599 metadata.pty_cols = Some(cols);
600 self.persist_task(&paths, &metadata)
601 .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
602 create_capture_file(&paths.pty)
603 .map_err(|e| format!("failed to create PTY capture file: {e}"))?;
604
605 let runtime = match spawn_pty_for_command(
606 &task_id,
607 &session_id,
608 command,
609 &paths,
610 &workdir,
611 &env,
612 rows,
613 cols,
614 self.inner.wake_tx.clone(),
615 ) {
616 Ok(runtime) => runtime,
617 Err(error) => {
618 crate::slog_warn!(
619 "failed to spawn PTY background bash task {task_id}; deleting partial bundle: {error}"
620 );
621 let _ = delete_task_bundle(&paths);
622 return Err(error);
623 }
624 };
625
626 if let Some(child_pid) = runtime.child_pid {
627 metadata.mark_running(child_pid, child_pid as i32);
628 } else {
629 metadata.status = BgTaskStatus::Running;
630 metadata.pgid = None;
631 }
632 self.persist_task(&paths, &metadata)
633 .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
634
635 let task = Arc::new(BgTask {
636 task_id: task_id.clone(),
637 session_id,
638 paths: paths.clone(),
639 started: Instant::now(),
640 last_reminder_at: Mutex::new(None),
641 terminal_at: Mutex::new(None),
642 state: Mutex::new(BgTaskState {
643 metadata,
644 runtime: TaskRuntime::Pty(Some(runtime)),
645 detached: false,
646 child_exit_observed: false,
647 buffer: BgBuffer::pty(paths.pty.clone()),
648 terminal_output_cache: None,
649 pending_terminal_override: None,
650 }),
651 });
652
653 self.inner
654 .tasks
655 .lock()
656 .map_err(|_| "background task registry lock poisoned".to_string())?
657 .insert(task_id.clone(), task);
658
659 Ok(task_id)
660 }
661
/// Spawns `command` as a background task with file-captured stdout/stderr
/// (Windows).
///
/// Steps: check the running-task limit, persist `Starting` metadata, create the
/// capture files, spawn the detached child, persist the `Running` metadata
/// (child pid recorded, no process group id), then register the task.
///
/// `timeout` defaults to `DEFAULT_BG_TIMEOUT` when `None`.
///
/// # Errors
/// Returns an error string when the limit is reached, a file or metadata
/// operation fails, the child cannot be spawned (the partial bundle is deleted
/// in that case), or the registry lock is poisoned.
#[cfg(windows)]
#[allow(clippy::too_many_arguments)]
pub fn spawn(
    &self,
    command: &str,
    session_id: String,
    workdir: PathBuf,
    env: HashMap<String, String>,
    timeout: Option<Duration>,
    storage_dir: PathBuf,
    max_running: usize,
    notify_on_completion: bool,
    compressed: bool,
    project_root: Option<PathBuf>,
) -> Result<String, String> {
    self.start_watchdog();

    let running = self.running_count();
    if running >= max_running {
        return Err(format!(
            "background bash task limit exceeded: {running} running (max {max_running})"
        ));
    }

    let effective_timeout = timeout.unwrap_or(DEFAULT_BG_TIMEOUT);
    let timeout_ms = Some(effective_timeout.as_millis() as u64);
    let task_id = self.generate_unique_task_id()?;
    let paths = task_paths(&storage_dir, &session_id, &task_id);
    if let Err(e) = fs::create_dir_all(&paths.dir) {
        return Err(format!("failed to create background task dir: {e}"));
    }

    let mut metadata = PersistedTask::starting(
        task_id.clone(),
        session_id.clone(),
        command.to_string(),
        workdir.clone(),
        project_root,
        timeout_ms,
        notify_on_completion,
        compressed,
    );
    if let Err(e) = self.persist_task(&paths, &metadata) {
        return Err(format!("failed to persist background task metadata: {e}"));
    }

    create_capture_file(&paths.stdout)
        .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
    create_capture_file(&paths.stderr)
        .map_err(|e| format!("failed to create stderr capture file: {e}"))?;

    let child = spawn_detached_child(command, &paths, &workdir, &env).map_err(|error| {
        crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
        let _ = delete_task_bundle(&paths);
        error
    })?;

    let child_pid = child.id();
    metadata.status = BgTaskStatus::Running;
    metadata.child_pid = Some(child_pid);
    metadata.pgid = None;
    if let Err(e) = self.persist_task(&paths, &metadata) {
        return Err(format!(
            "failed to persist running background task metadata: {e}"
        ));
    }

    let started = Instant::now();
    let state = BgTaskState {
        metadata,
        runtime: TaskRuntime::Piped(Some(child)),
        detached: false,
        child_exit_observed: false,
        buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
        terminal_output_cache: None,
        pending_terminal_override: None,
    };
    let task = Arc::new(BgTask {
        task_id: task_id.clone(),
        session_id,
        paths,
        started,
        last_reminder_at: Mutex::new(None),
        terminal_at: Mutex::new(None),
        state: Mutex::new(state),
    });

    {
        let mut registered = self
            .inner
            .tasks
            .lock()
            .map_err(|_| "background task registry lock poisoned".to_string())?;
        registered.insert(task_id.clone(), task);
    }

    Ok(task_id)
}
758
759 pub fn write_pty(
760 &self,
761 task_id: &str,
762 session_id: &str,
763 input: &[u8],
764 ) -> Result<usize, String> {
765 let task = self
766 .task_for_session(task_id, session_id)
767 .ok_or_else(|| "task_not_found".to_string())?;
768
769 let writer = {
770 let state = task
771 .state
772 .lock()
773 .map_err(|_| "background task lock poisoned".to_string())?;
774 if state.metadata.mode != BgMode::Pty {
775 return Err("task_not_pty".to_string());
776 }
777 if state.metadata.status.is_terminal() {
778 return Err("task_exited".to_string());
779 }
780 match &state.runtime {
781 TaskRuntime::Pty(Some(runtime)) => Arc::clone(&runtime.writer),
782 TaskRuntime::Pty(None) => return Err("task_exited".to_string()),
783 TaskRuntime::Piped(_) => return Err("task_not_pty".to_string()),
784 }
785 };
786
787 let mut writer = writer
788 .lock()
789 .map_err(|_| "PTY writer lock poisoned".to_string())?;
790 writer
791 .write_all(input)
792 .map_err(|error| format!("failed to write to PTY: {error}"))?;
793 writer
794 .flush()
795 .map_err(|error| format!("failed to flush PTY writer: {error}"))?;
796 Ok(input.len())
797 }
798
799 pub fn replay_session(&self, storage_dir: &Path, session_id: &str) -> Result<(), String> {
800 self.replay_session_inner(storage_dir, session_id, None)
801 }
802
803 pub fn replay_session_for_project(
804 &self,
805 storage_dir: &Path,
806 session_id: &str,
807 project_root: &Path,
808 ) -> Result<(), String> {
809 self.replay_session_inner(storage_dir, session_id, Some(project_root))
810 }
811
812 fn replay_session_inner(
813 &self,
814 storage_dir: &Path,
815 session_id: &str,
816 project_root: Option<&Path>,
817 ) -> Result<(), String> {
818 self.start_watchdog();
819 if !self.inner.persisted_gc_started.swap(true, Ordering::SeqCst) {
820 if let Err(error) = self.maybe_gc_persisted(storage_dir) {
821 crate::slog_warn!("failed to GC persisted background bash tasks: {error}");
822 }
823 }
824
825 let canonical_project = project_root.map(canonicalized_path);
826 let tasks = match self.replay_session_from_db(session_id) {
838 Some(Ok(tasks)) if !tasks.is_empty() => tasks,
839 Some(Ok(_)) => {
840 let disk_tasks = self.replay_session_from_disk(storage_dir, session_id)?;
841 if !disk_tasks.is_empty() {
842 crate::slog_info!(
843 "bash task replay: 0 in DB for session {}, {} from disk fallback",
844 session_id,
845 disk_tasks.len()
846 );
847 }
848 disk_tasks
849 }
850 Some(Err(error)) => {
851 crate::slog_warn!(
852 "bash task replay DB lookup failed for session {}; falling back to disk: {}",
853 session_id,
854 error
855 );
856 self.replay_session_from_disk(storage_dir, session_id)?
857 }
858 None => {
859 self.replay_session_from_disk(storage_dir, session_id)?
861 }
862 };
863
864 for mut metadata in tasks {
865 if metadata.session_id != session_id {
866 continue;
867 }
868 if let Some(canonical_project) = canonical_project.as_deref() {
869 let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
870 if metadata_project.as_deref() != Some(canonical_project) {
871 continue;
872 }
873 }
874
875 let paths = task_paths(storage_dir, session_id, &metadata.task_id);
876 match metadata.status {
877 BgTaskStatus::Starting => {
878 let completion_was_delivered = metadata.completion_delivered;
879 metadata.mark_terminal(
880 BgTaskStatus::Failed,
881 None,
882 Some("spawn aborted".to_string()),
883 );
884 metadata.completion_delivered |= completion_was_delivered;
885 let _ = self.persist_task(&paths, &metadata);
886 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
887 self.insert_rehydrated_task(metadata, paths, true)?;
888 }
889 BgTaskStatus::Running | BgTaskStatus::Killing => {
890 if metadata.mode == BgMode::Pty {
891 if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
892 let completion_was_delivered = metadata.completion_delivered;
893 metadata = terminal_metadata_from_marker(metadata, marker, None);
894 metadata.completion_delivered |= completion_was_delivered;
895 let _ = self.persist_task(&paths, &metadata);
896 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
897 self.insert_rehydrated_task(metadata, paths, true)?;
898 } else if metadata.status.is_terminal() {
899 self.insert_rehydrated_task(metadata, paths, true)?;
900 } else {
901 let completion_was_delivered = metadata.completion_delivered;
902 metadata.mark_terminal(
903 BgTaskStatus::Killed,
904 None,
905 Some("pty_lost_on_bridge_restart".to_string()),
906 );
907 metadata.completion_delivered |= completion_was_delivered;
908 let _ = self.persist_task(&paths, &metadata);
909 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
910 self.insert_rehydrated_task(metadata, paths, true)?;
911 }
912 } else if self.running_metadata_is_stale(&metadata) {
913 let completion_was_delivered = metadata.completion_delivered;
914 metadata.mark_terminal(
915 BgTaskStatus::Killed,
916 None,
917 Some("orphaned (>24h)".to_string()),
918 );
919 metadata.completion_delivered |= completion_was_delivered;
920 if !paths.exit.exists() {
921 let _ = write_kill_marker_if_absent(&paths.exit);
922 }
923 let _ = self.persist_task(&paths, &metadata);
924 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
925 self.insert_rehydrated_task(metadata, paths, true)?;
926 } else if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
927 let reason = (metadata.status == BgTaskStatus::Killing).then(|| {
928 "recovered from inconsistent killing state on replay".to_string()
929 });
930 if reason.is_some() {
931 crate::slog_warn!("background task {} had killing state with exit marker; preferring marker",
932 metadata.task_id);
933 }
934 let completion_was_delivered = metadata.completion_delivered;
935 metadata = terminal_metadata_from_marker(metadata, marker, reason);
936 metadata.completion_delivered |= completion_was_delivered;
937 let _ = self.persist_task(&paths, &metadata);
938 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
939 self.insert_rehydrated_task(metadata, paths, true)?;
940 } else if metadata.status == BgTaskStatus::Killing {
941 if !paths.exit.exists() {
942 let _ = write_kill_marker_if_absent(&paths.exit);
943 }
944 let completion_was_delivered = metadata.completion_delivered;
945 metadata.mark_terminal(
946 BgTaskStatus::Killed,
947 None,
948 Some("recovered from inconsistent killing state on replay".to_string()),
949 );
950 metadata.completion_delivered |= completion_was_delivered;
951 let _ = self.persist_task(&paths, &metadata);
952 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
953 self.insert_rehydrated_task(metadata, paths, true)?;
954 } else if metadata.child_pid.is_some_and(|pid| !is_process_alive(pid)) {
955 let completion_was_delivered = metadata.completion_delivered;
956 metadata.mark_terminal(
957 BgTaskStatus::Failed,
958 None,
959 Some("process exited without exit marker".to_string()),
960 );
961 metadata.completion_delivered |= completion_was_delivered;
962 let _ = self.persist_task(&paths, &metadata);
963 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
964 self.insert_rehydrated_task(metadata, paths, true)?;
965 } else {
966 self.insert_rehydrated_task(metadata, paths, true)?;
967 }
968 }
969 _ if metadata.status.is_terminal() => {
970 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
976 self.insert_rehydrated_task(metadata, paths, true)?;
977 }
978 _ => {}
979 }
980 }
981
982 Ok(())
983 }
984
985 fn replay_session_from_db(
986 &self,
987 session_id: &str,
988 ) -> Option<Result<Vec<PersistedTask>, String>> {
989 let pool = self
990 .inner
991 .db_pool
992 .read()
993 .ok()
994 .and_then(|slot| slot.clone())?;
995 let harness = self
996 .inner
997 .db_harness
998 .read()
999 .ok()
1000 .and_then(|slot| slot.clone())?;
1001 let conn = match pool.lock() {
1002 Ok(conn) => conn,
1003 Err(_) => return Some(Err("db mutex poisoned".to_string())),
1004 };
1005 Some(
1006 crate::db::bash_tasks::list_bash_tasks_for_session(&conn, &harness, session_id)
1007 .map(|rows| rows.into_iter().map(PersistedTask::from).collect())
1008 .map_err(|error| error.to_string()),
1009 )
1010 }
1011
1012 fn replay_session_from_disk(
1013 &self,
1014 storage_dir: &Path,
1015 session_id: &str,
1016 ) -> Result<Vec<PersistedTask>, String> {
1017 let dir = session_tasks_dir(storage_dir, session_id);
1018 if !dir.exists() {
1019 return Ok(Vec::new());
1020 }
1021
1022 let entries = fs::read_dir(&dir)
1023 .map_err(|e| format!("failed to read background task dir {}: {e}", dir.display()))?;
1024 let mut tasks = Vec::new();
1025 for entry in entries.flatten() {
1026 let path = entry.path();
1027 if path.extension().and_then(|extension| extension.to_str()) != Some("json") {
1028 continue;
1029 }
1030 match read_task(&path) {
1031 Ok(metadata) => tasks.push(metadata),
1032 Err(error) => {
1033 crate::slog_warn!(
1034 "quarantining invalid background task metadata {} during replay: {error}",
1035 path.display()
1036 );
1037 if let Err(quarantine_error) =
1038 quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
1039 {
1040 crate::slog_warn!(
1041 "failed to quarantine invalid background task metadata {}: {quarantine_error}",
1042 path.display()
1043 );
1044 }
1045 }
1046 }
1047 }
1048 Ok(tasks)
1049 }
1050
/// Registers a watch `pattern` on `task_id` and returns the new watch id.
///
/// `once` is forwarded unchanged to the watch registry.
///
/// # Errors
///
/// Returns a static error code: `"task_not_found"` when the task is not in
/// the registry, `"background_task_lock_poisoned"` or
/// `"watch_registry_poisoned"` when the corresponding mutex is poisoned, or
/// whatever error the registry's `register` call reports.
///
/// For a task that is already terminal, the captured output is scanned from
/// cursor 0 and the resulting frames are emitted before returning.
pub fn register_watch(
    &self,
    task_id: String,
    pattern: WatchPattern,
    once: bool,
) -> Result<String, &'static str> {
    let task = self.task(&task_id).ok_or("task_not_found")?;
    // Snapshot the mode, the terminal flag and the capture paths while
    // holding the task state lock.
    let (mode, terminal_at_registration, stdout, stderr, pty) = task
        .state
        .lock()
        .map(|state| {
            (
                state.metadata.mode.clone(),
                state.metadata.status.is_terminal(),
                task.paths.stdout.clone(),
                task.paths.stderr.clone(),
                task.paths.pty.clone(),
            )
        })
        .map_err(|_| "background_task_lock_poisoned")?;

    let mut terminal_matches = Vec::new();
    // Remembers whether the scan from cursor 0 already happened inside the
    // registration step below.
    let scanned_terminal = terminal_at_registration;
    let watch_id = {
        let mut registry = self
            .inner
            .watch_registry
            .lock()
            .map_err(|_| "watch_registry_poisoned")?;
        let watch_id = registry.register(task_id.clone(), pattern, once)?;
        match &mode {
            BgMode::Pipes => {
                let stdout_key = format!("{task_id}:stdout");
                let stderr_key = format!("{task_id}:stderr");
                if terminal_at_registration {
                    // Task already finished: reset both cursors to 0 and scan.
                    registry.set_file_cursor(&stdout_key, 0);
                    registry.set_file_cursor(&stderr_key, 0);
                    terminal_matches.extend(registry.scan_file_new_bytes(
                        &stdout_key,
                        &task_id,
                        &stdout,
                    ));
                    terminal_matches.extend(registry.scan_file_new_bytes(
                        &stderr_key,
                        &task_id,
                        &stderr,
                    ));
                } else {
                    // Task still running: prime the cursors instead of scanning.
                    // NOTE(review): presumably this skips output written before
                    // registration — confirm against `WatchRegistry`.
                    registry.prime_file_cursor(&stdout_key, &stdout);
                    registry.prime_file_cursor(&stderr_key, &stderr);
                }
            }
            BgMode::Pty => {
                let pty_key = format!("{task_id}:pty");
                if terminal_at_registration {
                    registry.set_file_cursor(&pty_key, 0);
                    terminal_matches
                        .extend(registry.scan_file_new_bytes(&pty_key, &task_id, &pty));
                } else {
                    registry.prime_file_cursor(&pty_key, &pty);
                }
            }
        }
        watch_id
    };

    // Re-check after registering: the task may have become terminal between
    // the snapshot above and the registration.
    if task.is_terminal() {
        if !scanned_terminal {
            // The task finished during registration, so redo the scan from
            // cursor 0 and replace whatever was collected so far.
            terminal_matches = {
                let mut registry = self
                    .inner
                    .watch_registry
                    .lock()
                    .map_err(|_| "watch_registry_poisoned")?;
                match &mode {
                    BgMode::Pipes => {
                        let stdout_key = format!("{task_id}:stdout");
                        let stderr_key = format!("{task_id}:stderr");
                        registry.set_file_cursor(&stdout_key, 0);
                        registry.set_file_cursor(&stderr_key, 0);
                        let mut matches =
                            registry.scan_file_new_bytes(&stdout_key, &task_id, &stdout);
                        matches.extend(registry.scan_file_new_bytes(
                            &stderr_key,
                            &task_id,
                            &stderr,
                        ));
                        matches
                    }
                    BgMode::Pty => {
                        let pty_key = format!("{task_id}:pty");
                        registry.set_file_cursor(&pty_key, 0);
                        registry.scan_file_new_bytes(&pty_key, &task_id, &pty)
                    }
                }
            };
        }

        let (watch_controlled, watch_matched) = self.task_watch_state(&task_id);
        // Nothing to emit when the scan found no match and the task is either
        // not watch-controlled or already has a recorded match.
        if terminal_matches.is_empty() && (!watch_controlled || watch_matched) {
            if watch_matched {
                let _ = task.set_completion_delivered(true, self);
                self.clear_task_watch_state(&task_id);
            }
            return Ok(watch_id);
        }

        // Prefer a completion that is already queued; otherwise build one
        // from the task's current snapshot.
        let completion = self
            .remove_pending_completion(&task_id)
            .or_else(|| self.completion_snapshot_for_task(&task, BG_COMPLETION_PREVIEW_BYTES));
        if terminal_matches.is_empty() {
            // No pattern hit: report the exit itself through the watch channel.
            if let Some(completion) = completion.as_ref() {
                self.emit_bash_watch_exit(completion);
            }
        } else {
            for pattern_match in terminal_matches {
                self.emit_bash_pattern_match(&task.session_id, pattern_match);
            }
        }
        // Failure to persist the delivered flag is deliberately ignored.
        let _ = task.set_completion_delivered(true, self);
        self.clear_task_watch_state(&task_id);
    }

    Ok(watch_id)
}
1176
1177 pub fn unregister_watch(&self, task_id: &str, watch_id: &str) {
1178 if let Ok(mut registry) = self.inner.watch_registry.lock() {
1179 registry.unregister(task_id, watch_id);
1180 }
1181 }
1182
1183 pub fn active_watch_count(&self, task_id: &str) -> usize {
1184 self.inner
1185 .watch_registry
1186 .lock()
1187 .map(|registry| registry.active_count(task_id))
1188 .unwrap_or(0)
1189 }
1190
1191 fn task_watch_state(&self, task_id: &str) -> (bool, bool) {
1192 self.inner
1193 .watch_registry
1194 .lock()
1195 .map(|registry| {
1196 (
1197 registry.has_controlled_task(task_id),
1198 registry.has_matched_task(task_id),
1199 )
1200 })
1201 .unwrap_or((false, false))
1202 }
1203
1204 fn task_has_watch_control(&self, task_id: &str) -> bool {
1205 self.inner
1206 .watch_registry
1207 .lock()
1208 .map(|registry| registry.has_controlled_task(task_id))
1209 .unwrap_or(false)
1210 }
1211
1212 fn clear_task_watch_state(&self, task_id: &str) {
1213 if let Ok(mut registry) = self.inner.watch_registry.lock() {
1214 registry.clear_task(task_id);
1215 }
1216 }
1217
1218 pub(crate) fn scan_task_watch_output(&self, task: &Arc<BgTask>) {
1219 let (mode, stdout, stderr, pty) = match task.state.lock() {
1220 Ok(state) => (
1221 state.metadata.mode.clone(),
1222 task.paths.stdout.clone(),
1223 task.paths.stderr.clone(),
1224 task.paths.pty.clone(),
1225 ),
1226 Err(_) => return,
1227 };
1228 let mut matches = Vec::new();
1229 if let Ok(mut registry) = self.inner.watch_registry.lock() {
1230 match mode {
1231 BgMode::Pipes => {
1232 let stdout_key = format!("{}:stdout", task.task_id);
1233 let stderr_key = format!("{}:stderr", task.task_id);
1234 matches.extend(registry.scan_file_new_bytes(
1235 &stdout_key,
1236 &task.task_id,
1237 &stdout,
1238 ));
1239 matches.extend(registry.scan_file_new_bytes(
1240 &stderr_key,
1241 &task.task_id,
1242 &stderr,
1243 ));
1244 }
1245 BgMode::Pty => {
1246 let pty_key = format!("{}:pty", task.task_id);
1247 matches.extend(registry.scan_file_new_bytes(&pty_key, &task.task_id, &pty));
1248 }
1249 }
1250 }
1251 for pattern_match in matches {
1252 self.emit_bash_pattern_match(&task.session_id, pattern_match);
1253 }
1254 }
1255
1256 pub fn status(
1257 &self,
1258 task_id: &str,
1259 session_id: &str,
1260 project_root: Option<&Path>,
1261 storage_dir: Option<&Path>,
1262 preview_bytes: usize,
1263 ) -> Option<BgTaskSnapshot> {
1264 let mut task = self.task_for_session(task_id, session_id);
1265 if task.is_none() {
1266 if let Some(storage_dir) = storage_dir {
1267 let _ = self.replay_session(storage_dir, session_id);
1268 task = self.task_for_session(task_id, session_id);
1269 }
1270 }
1271 let Some(task) = task else {
1272 return self.status_relaxed(
1273 task_id,
1274 session_id,
1275 project_root?,
1276 storage_dir?,
1277 preview_bytes,
1278 );
1279 };
1280 let _ = self.poll_task(&task);
1281 Some(self.snapshot_with_terminal_cache(&task, preview_bytes))
1282 }
1283
1284 fn status_relaxed_task(
1285 &self,
1286 task_id: &str,
1287 project_root: &Path,
1288 storage_dir: &Path,
1289 ) -> Option<Arc<BgTask>> {
1290 let canonical_project = canonicalized_path(project_root);
1291 match self.lookup_relaxed_task_from_db(task_id, project_root) {
1292 Some(Ok(Some(metadata))) => {
1293 if let Some(task) = self.task(task_id) {
1294 let matches_project = task
1295 .state
1296 .lock()
1297 .map(|state| {
1298 state
1299 .metadata
1300 .project_root
1301 .as_deref()
1302 .map(canonicalized_path)
1303 .as_deref()
1304 == Some(canonical_project.as_path())
1305 })
1306 .unwrap_or(false);
1307 return matches_project.then_some(task);
1308 }
1309 let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1310 if self.insert_rehydrated_task(metadata, paths, true).is_err() {
1311 return None;
1312 }
1313 return self.task(task_id);
1314 }
1315 Some(Ok(None)) => {
1316 crate::slog_info!(
1317 "bash task relaxed DB miss for {}; falling back to disk",
1318 task_id
1319 );
1320 }
1321 Some(Err(error)) => {
1322 crate::slog_warn!(
1323 "bash task relaxed DB lookup failed for {}; falling back to disk: {}",
1324 task_id,
1325 error
1326 );
1327 }
1328 None => {
1329 crate::slog_info!(
1330 "bash task relaxed DB unavailable for {}; falling back to disk",
1331 task_id
1332 );
1333 }
1334 }
1335 let root = storage_dir.join("bash-tasks");
1336 let entries = fs::read_dir(&root).ok()?;
1337 for entry in entries.flatten() {
1338 let dir = entry.path();
1339 if !dir.is_dir() {
1340 continue;
1341 }
1342 let path = dir.join(format!("{task_id}.json"));
1343 if !path.exists() {
1344 continue;
1345 }
1346 let metadata = match read_task(&path) {
1347 Ok(metadata) => metadata,
1348 Err(error) => {
1349 crate::slog_warn!(
1350 "quarantining invalid background task metadata {} during relaxed lookup: {error}",
1351 path.display()
1352 );
1353 if let Err(quarantine_error) =
1354 quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
1355 {
1356 crate::slog_warn!(
1357 "failed to quarantine invalid background task metadata {}: {quarantine_error}",
1358 path.display()
1359 );
1360 }
1361 continue;
1362 }
1363 };
1364 let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
1365 if metadata_project.as_deref() != Some(canonical_project.as_path()) {
1366 continue;
1367 }
1368 if let Some(task) = self.task(task_id) {
1369 let matches_project = task
1370 .state
1371 .lock()
1372 .map(|state| {
1373 state
1374 .metadata
1375 .project_root
1376 .as_deref()
1377 .map(canonicalized_path)
1378 .as_deref()
1379 == Some(canonical_project.as_path())
1380 })
1381 .unwrap_or(false);
1382 return matches_project.then_some(task);
1383 }
1384 let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1385 if self.insert_rehydrated_task(metadata, paths, true).is_err() {
1386 return None;
1387 }
1388 return self.task(task_id);
1389 }
1390 None
1391 }
1392
1393 fn lookup_relaxed_task_from_db(
1394 &self,
1395 task_id: &str,
1396 project_root: &Path,
1397 ) -> Option<Result<Option<PersistedTask>, String>> {
1398 let pool = self
1399 .inner
1400 .db_pool
1401 .read()
1402 .ok()
1403 .and_then(|slot| slot.clone())?;
1404 let harness = self
1405 .inner
1406 .db_harness
1407 .read()
1408 .ok()
1409 .and_then(|slot| slot.clone())?;
1410 let conn = match pool.lock() {
1411 Ok(conn) => conn,
1412 Err(_) => return Some(Err("db mutex poisoned".to_string())),
1413 };
1414 let project_key = crate::search_index::project_cache_key(project_root);
1415 Some(
1416 crate::db::bash_tasks::find_bash_task_for_project(
1417 &conn,
1418 &harness,
1419 &project_key,
1420 task_id,
1421 )
1422 .map(|row| row.map(PersistedTask::from))
1423 .map_err(|error| error.to_string()),
1424 )
1425 }
1426
1427 pub(super) fn status_relaxed(
1428 &self,
1429 task_id: &str,
1430 _session_id: &str,
1431 project_root: &Path,
1432 storage_dir: &Path,
1433 preview_bytes: usize,
1434 ) -> Option<BgTaskSnapshot> {
1435 let task = self.status_relaxed_task(task_id, project_root, storage_dir)?;
1436 let _ = self.poll_task(&task);
1437 Some(self.snapshot_with_terminal_cache(&task, preview_bytes))
1438 }
1439
1440 pub fn kill_relaxed(
1441 &self,
1442 task_id: &str,
1443 project_root: &Path,
1444 storage_dir: &Path,
1445 ) -> Result<BgTaskSnapshot, String> {
1446 let task = self
1447 .status_relaxed_task(task_id, project_root, storage_dir)
1448 .ok_or_else(|| format!("background task not found: {task_id}"))?;
1449 self.kill_with_status(task_id, &task.session_id, BgTaskStatus::Killed)
1450 }
1451
1452 pub fn maybe_gc_persisted(&self, storage_dir: &Path) -> Result<usize, String> {
1453 #[cfg(test)]
1454 self.inner.persisted_gc_runs.fetch_add(1, Ordering::SeqCst);
1455
1456 let mut deleted = 0usize;
1457
1458 let root = storage_dir.join("bash-tasks");
1459 if root.exists() {
1460 let session_dirs = fs::read_dir(&root).map_err(|e| {
1461 format!(
1462 "failed to read background task root {}: {e}",
1463 root.display()
1464 )
1465 })?;
1466 for session_entry in session_dirs.flatten() {
1467 let session_dir = session_entry.path();
1468 if !session_dir.is_dir() {
1469 continue;
1470 }
1471 let task_entries = match fs::read_dir(&session_dir) {
1472 Ok(entries) => entries,
1473 Err(error) => {
1474 crate::slog_warn!(
1475 "failed to read background task session dir {}: {error}",
1476 session_dir.display()
1477 );
1478 continue;
1479 }
1480 };
1481 for task_entry in task_entries.flatten() {
1482 let json_path = task_entry.path();
1483 if json_path
1484 .extension()
1485 .and_then(|extension| extension.to_str())
1486 != Some("json")
1487 {
1488 continue;
1489 }
1490 if modified_within(&json_path, PERSISTED_GC_GRACE) {
1491 continue;
1492 }
1493 let metadata = match read_task(&json_path) {
1494 Ok(metadata) => metadata,
1495 Err(error) => {
1496 crate::slog_warn!(
1497 "quarantining corrupt background task metadata {}: {error}",
1498 json_path.display()
1499 );
1500 quarantine_task_json(
1501 storage_dir,
1502 &session_dir,
1503 &json_path,
1504 QuarantineKind::Corrupt,
1505 )?;
1506 continue;
1507 }
1508 };
1509 if !(metadata.status.is_terminal() && metadata.completion_delivered) {
1510 continue;
1511 }
1512 let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1513 match delete_task_bundle(&paths) {
1514 Ok(()) => {
1515 deleted += 1;
1516 log::debug!(
1517 "deleted persisted background task bundle {}",
1518 metadata.task_id
1519 );
1520 }
1521 Err(error) => {
1522 crate::slog_warn!(
1523 "failed to delete background task bundle {}: {error}",
1524 metadata.task_id
1525 );
1526 continue;
1527 }
1528 }
1529 }
1530 }
1531 }
1532 gc_quarantine(storage_dir);
1533 Ok(deleted)
1534 }
1535
1536 pub fn list(&self, preview_bytes: usize) -> Vec<BgTaskSnapshot> {
1537 let tasks = self
1538 .inner
1539 .tasks
1540 .lock()
1541 .map(|tasks| tasks.values().cloned().collect::<Vec<_>>())
1542 .unwrap_or_default();
1543 tasks
1544 .into_iter()
1545 .map(|task| {
1546 let _ = self.poll_task(&task);
1547 self.snapshot_with_terminal_cache(&task, preview_bytes)
1548 })
1549 .collect()
1550 }
1551
1552 fn maybe_compress_snapshot(&self, task: &Arc<BgTask>, snapshot: &mut BgTaskSnapshot) {
1558 if !snapshot.info.status.is_terminal() || snapshot.info.mode == BgMode::Pty {
1559 return;
1560 }
1561 if let Some(cache) = self.ensure_terminal_output_cache(task) {
1562 snapshot.output_preview = cache.output_preview;
1563 snapshot.output_truncated = cache.output_truncated;
1564 }
1565 }
1566
1567 pub fn kill(&self, task_id: &str, session_id: &str) -> Result<BgTaskSnapshot, String> {
1568 self.kill_with_status(task_id, session_id, BgTaskStatus::Killed)
1569 }
1570
1571 pub fn promote(&self, task_id: &str, session_id: &str) -> Result<bool, String> {
1572 let task = self
1573 .task_for_session(task_id, session_id)
1574 .ok_or_else(|| format!("background task not found: {task_id}"))?;
1575 let terminal_after_promote = {
1576 let mut state = task
1577 .state
1578 .lock()
1579 .map_err(|_| "background task lock poisoned".to_string())?;
1580 let updated = self
1581 .update_task_metadata(&task.paths, |metadata| {
1582 metadata.notify_on_completion = true;
1583 metadata.completion_delivered = false;
1584 })
1585 .map_err(|e| format!("failed to promote background task: {e}"))?;
1586 state.metadata = updated;
1587 state.metadata.status.is_terminal()
1588 };
1589 if terminal_after_promote {
1590 self.post_terminal_transition(&task, true)?;
1591 }
1592 Ok(true)
1593 }
1594
1595 pub(crate) fn kill_for_timeout(&self, task_id: &str, session_id: &str) -> Result<(), String> {
1596 self.kill_with_status(task_id, session_id, BgTaskStatus::TimedOut)
1597 .map(|_| ())
1598 }
1599
1600 pub fn cleanup_finished(&self, older_than: Duration) {
1601 let cutoff = Instant::now().checked_sub(older_than);
1602 let removable_paths: Vec<(String, TaskPaths)> =
1603 if let Ok(mut tasks) = self.inner.tasks.lock() {
1604 let removable = tasks
1605 .iter()
1606 .filter_map(|(task_id, task)| {
1607 let delivered_terminal = task
1608 .state
1609 .lock()
1610 .map(|state| {
1611 state.metadata.status.is_terminal()
1612 && state.metadata.completion_delivered
1613 })
1614 .unwrap_or(false);
1615 if !delivered_terminal {
1616 return None;
1617 }
1618
1619 let terminal_at = task.terminal_at.lock().ok().and_then(|at| *at);
1620 let expired = match (terminal_at, cutoff) {
1621 (Some(terminal_at), Some(cutoff)) => terminal_at <= cutoff,
1622 (Some(_), None) => true,
1623 (None, _) => false,
1624 };
1625 expired.then(|| task_id.clone())
1626 })
1627 .collect::<Vec<_>>();
1628
1629 removable
1630 .into_iter()
1631 .filter_map(|task_id| {
1632 tasks
1633 .remove(&task_id)
1634 .map(|task| (task_id, task.paths.clone()))
1635 })
1636 .collect()
1637 } else {
1638 Vec::new()
1639 };
1640
1641 for (task_id, paths) in removable_paths {
1642 match delete_task_bundle(&paths) {
1643 Ok(()) => log::debug!("deleted persisted background task bundle {task_id}"),
1644 Err(error) => crate::slog_warn!(
1645 "failed to delete persisted background task bundle {task_id}: {error}"
1646 ),
1647 }
1648 }
1649 }
1650
1651 pub fn drain_completions(&self) -> Vec<BgCompletion> {
1652 self.drain_completions_for_session(None)
1653 }
1654
1655 pub fn drain_completions_for_session(&self, session_id: Option<&str>) -> Vec<BgCompletion> {
1656 let completions = match self.inner.completions.lock() {
1657 Ok(completions) => completions,
1658 Err(_) => return Vec::new(),
1659 };
1660
1661 completions
1662 .iter()
1663 .filter(|completion| {
1664 session_id
1665 .map(|session_id| completion.session_id == session_id)
1666 .unwrap_or(true)
1667 })
1668 .cloned()
1669 .collect()
1670 }
1671
1672 pub fn ack_completions_for_session(
1673 &self,
1674 session_id: Option<&str>,
1675 task_ids: &[String],
1676 ) -> Vec<String> {
1677 if task_ids.is_empty() {
1678 return Vec::new();
1679 }
1680 let requested_task_ids = task_ids.iter().map(String::as_str).collect::<HashSet<_>>();
1681 let mut completion_sessions = HashMap::new();
1682 if let Ok(mut completions) = self.inner.completions.lock() {
1683 completions.retain(|completion| {
1684 let session_matches = session_id
1685 .map(|session_id| completion.session_id == session_id)
1686 .unwrap_or(true);
1687 if session_matches && requested_task_ids.contains(completion.task_id.as_str()) {
1688 completion_sessions
1689 .insert(completion.task_id.clone(), completion.session_id.clone());
1690 false
1691 } else {
1692 true
1693 }
1694 });
1695 }
1696
1697 let mut delivered = Vec::new();
1698 for task_id in task_ids {
1699 let task = if let Some(session_id) = session_id {
1700 self.task_for_session(task_id, session_id)
1701 } else if let Some(completion_session_id) = completion_sessions.get(task_id) {
1702 self.task_for_session(task_id, completion_session_id)
1703 } else {
1704 self.task(task_id)
1705 };
1706 if let Some(task) = task {
1707 if task.set_completion_delivered(true, self).is_ok() {
1708 delivered.push(task_id.clone());
1709 }
1710 }
1711 }
1712
1713 delivered
1714 }
1715
1716 pub fn pending_completions_for_session(&self, session_id: &str) -> Vec<BgCompletion> {
1717 self.inner
1718 .completions
1719 .lock()
1720 .map(|completions| {
1721 completions
1722 .iter()
1723 .filter(|completion| completion.session_id == session_id)
1724 .cloned()
1725 .collect()
1726 })
1727 .unwrap_or_default()
1728 }
1729
1730 fn remove_pending_completion(&self, task_id: &str) -> Option<BgCompletion> {
1731 let mut completions = self.inner.completions.lock().ok()?;
1732 let idx = completions
1733 .iter()
1734 .position(|completion| completion.task_id == task_id)?;
1735 completions.remove(idx)
1736 }
1737
1738 fn completion_snapshot_for_task(
1739 &self,
1740 task: &Arc<BgTask>,
1741 _preview_bytes: usize,
1742 ) -> Option<BgCompletion> {
1743 let snapshot = self.snapshot_with_terminal_cache(task, RUNNING_OUTPUT_PREVIEW_BYTES);
1744 if !snapshot.info.status.is_terminal() {
1745 return None;
1746 }
1747 let (output_preview, output_truncated) = if snapshot.info.mode == BgMode::Pty {
1748 (String::new(), false)
1749 } else {
1750 self.ensure_terminal_output_cache(task)
1751 .map(|cache| completion_preview_for_cache(&cache))
1752 .unwrap_or_else(|| (String::new(), false))
1753 };
1754 Some(BgCompletion {
1755 task_id: snapshot.info.task_id,
1756 session_id: task.session_id.clone(),
1757 status: snapshot.info.status,
1758 exit_code: snapshot.exit_code,
1759 command: snapshot.info.command,
1760 output_preview,
1761 output_truncated,
1762 original_tokens: None,
1763 compressed_tokens: None,
1764 tokens_skipped: false,
1765 })
1766 }
1767
1768 pub fn detach(&self) {
1769 self.inner.shutdown.store(true, Ordering::SeqCst);
1770 if let Ok(mut tasks) = self.inner.tasks.lock() {
1771 for task in tasks.values() {
1772 if let Ok(mut state) = task.state.lock() {
1773 match &mut state.runtime {
1774 TaskRuntime::Piped(child) => *child = None,
1775 TaskRuntime::Pty(runtime) => *runtime = None,
1776 }
1777 state.detached = true;
1778 }
1779 }
1780 tasks.clear();
1781 }
1782 }
1783
1784 pub fn shutdown(&self) {
1785 let tasks = self
1786 .inner
1787 .tasks
1788 .lock()
1789 .map(|tasks| {
1790 tasks
1791 .values()
1792 .map(|task| (task.task_id.clone(), task.session_id.clone()))
1793 .collect::<Vec<_>>()
1794 })
1795 .unwrap_or_default();
1796 for (task_id, session_id) in tasks {
1797 let _ = self.kill(&task_id, &session_id);
1798 }
1799 }
1800
1801 pub(crate) fn poll_task(&self, task: &Arc<BgTask>) -> Result<(), String> {
1802 if let Ok(state) = task.state.lock() {
1803 if let TaskRuntime::Pty(Some(pty)) = &state.runtime {
1804 if !pty.exit_observed.load(Ordering::SeqCst) {
1812 return Ok(());
1813 }
1814 }
1815 }
1816 let marker = match read_exit_marker(&task.paths.exit) {
1817 Ok(Some(marker)) => marker,
1818 Ok(None) => return Ok(()),
1819 Err(error) => return Err(format!("failed to read exit marker: {error}")),
1820 };
1821 self.finalize_from_marker(task, marker, None)
1822 }
1823
/// Checks whether `task`'s process has exited and updates its bookkeeping.
///
/// * Piped task with a child handle: a non-blocking `try_wait`; once it
///   reports an exit, the handle is dropped and the task is flagged as
///   detached with its exit observed.
/// * Piped task without a handle but detached: when the child is known to be
///   dead (exit observed earlier, or the recorded pid is no longer alive),
///   the task is marked failed if it has no exit marker.
/// * PTY task whose runtime has observed the exit: the state lock is
///   released and the task is polled.
///
/// Errors from the follow-up calls are ignored; a poisoned state lock makes
/// this a no-op.
pub(crate) fn reap_child(&self, task: &Arc<BgTask>) {
    let mut needs_completion = false;
    {
        let Ok(mut state) = task.state.lock() else {
            return;
        };
        match &mut state.runtime {
            TaskRuntime::Piped(child_slot) => {
                if let Some(child) = child_slot.as_mut() {
                    if matches!(child.try_wait(), Ok(Some(_))) {
                        *child_slot = None;
                        state.detached = true;
                        state.child_exit_observed = true;
                    }
                } else if state.detached {
                    let child_known_dead = state.child_exit_observed
                        || state
                            .metadata
                            .child_pid
                            .is_some_and(|pid| !is_process_alive(pid));
                    if child_known_dead {
                        needs_completion =
                            self.fail_without_exit_marker_if_needed(task, &mut state);
                    }
                }
            }
            TaskRuntime::Pty(Some(pty)) => {
                if pty.exit_observed.load(Ordering::SeqCst) {
                    // `poll_task` takes the state lock itself, so release
                    // the guard before calling it.
                    drop(state);
                    let _ = self.poll_task(task);
                    return;
                }
            }
            TaskRuntime::Pty(None) => {}
        }
    }
    // Runs after the state lock above has been released.
    if needs_completion {
        let _ = self.post_terminal_transition(task, true);
    }
}
1864
1865 fn fail_without_exit_marker_if_needed(
1866 &self,
1867 task: &Arc<BgTask>,
1868 state: &mut BgTaskState,
1869 ) -> bool {
1870 if state.metadata.status.is_terminal() {
1871 return false;
1872 }
1873 if matches!(read_exit_marker(&task.paths.exit), Ok(Some(_))) {
1874 return false;
1875 }
1876 let watch_controlled = self.task_has_watch_control(&task.task_id);
1877 let updated = self.update_task_metadata(&task.paths, |metadata| {
1878 metadata.mark_terminal(
1879 BgTaskStatus::Failed,
1880 None,
1881 Some("process exited without exit marker".to_string()),
1882 );
1883 if watch_controlled {
1884 metadata.completion_delivered = true;
1885 }
1886 });
1887 if let Ok(metadata) = updated {
1888 state.pending_terminal_override = None;
1889 state.metadata = metadata;
1890 task.mark_terminal_now();
1891 return true;
1892 }
1893 false
1894 }
1895
1896 pub(crate) fn running_tasks(&self) -> Vec<Arc<BgTask>> {
1897 self.inner
1898 .tasks
1899 .lock()
1900 .map(|tasks| {
1901 tasks
1902 .values()
1903 .filter(|task| task.is_running())
1904 .cloned()
1905 .collect()
1906 })
1907 .unwrap_or_default()
1908 }
1909
1910 fn insert_rehydrated_task(
1911 &self,
1912 metadata: PersistedTask,
1913 paths: TaskPaths,
1914 detached: bool,
1915 ) -> Result<(), String> {
1916 let task_id = metadata.task_id.clone();
1917 let session_id = metadata.session_id.clone();
1918 let started = started_instant_from_unix_millis(metadata.started_at);
1919 let suppress_replayed_running_reminder = metadata.status == BgTaskStatus::Running;
1920 let mode = metadata.mode.clone();
1921 let task = Arc::new(BgTask {
1922 task_id: task_id.clone(),
1923 session_id,
1924 paths: paths.clone(),
1925 started,
1926 last_reminder_at: Mutex::new(suppress_replayed_running_reminder.then(Instant::now)),
1927 terminal_at: Mutex::new(metadata.status.is_terminal().then(Instant::now)),
1928 state: Mutex::new(BgTaskState {
1929 metadata,
1930 runtime: if mode == BgMode::Pty {
1931 TaskRuntime::Pty(None)
1932 } else {
1933 TaskRuntime::Piped(None)
1934 },
1935 detached,
1936 child_exit_observed: false,
1943 buffer: if mode == BgMode::Pty {
1944 BgBuffer::pty(paths.pty.clone())
1945 } else {
1946 BgBuffer::new(paths.stdout.clone(), paths.stderr.clone())
1947 },
1948 terminal_output_cache: None,
1949 pending_terminal_override: None,
1950 }),
1951 });
1952 self.inner
1953 .tasks
1954 .lock()
1955 .map_err(|_| "background task registry lock poisoned".to_string())?
1956 .insert(task_id, task);
1957 Ok(())
1958 }
1959
/// Terminates a task and records `terminal_status` for it, returning a fresh
/// snapshot.
///
/// Three cases, decided under the task state lock:
/// 1. the task is already terminal — only the pending override is cleared;
/// 2. an exit marker already exists — the task is finalized from the marker
///    rather than from `terminal_status`;
/// 3. otherwise the task is moved to `Killing` and its process is
///    terminated. Piped tasks are made terminal here; for PTY tasks that is
///    done here only in the Windows-gated block.
///
/// # Errors
///
/// Fails when the task is unknown for the session, the state lock is
/// poisoned, persisting metadata or writing the kill marker fails, or the
/// post-terminal transition fails.
fn kill_with_status(
    &self,
    task_id: &str,
    session_id: &str,
    terminal_status: BgTaskStatus,
) -> Result<BgTaskSnapshot, String> {
    let task = self
        .task_for_session(task_id, session_id)
        .ok_or_else(|| format!("background task not found: {task_id}"))?;
    // Set once this call has moved the task into a terminal state.
    let mut terminalized = false;

    {
        let mut state = task
            .state
            .lock()
            .map_err(|_| "background task lock poisoned".to_string())?;
        if state.metadata.status.is_terminal() {
            state.pending_terminal_override = None;
        } else if let Ok(Some(marker)) = read_exit_marker(&task.paths.exit) {
            // An exit marker is already on disk: derive the terminal
            // metadata from it instead of recording a kill.
            state.metadata =
                terminal_metadata_from_marker(state.metadata.clone(), marker, None);
            if self.task_has_watch_control(&task.task_id) {
                state.metadata.completion_delivered = true;
            }
            state.pending_terminal_override = None;
            task.mark_terminal_now();
            match &mut state.runtime {
                TaskRuntime::Piped(child_slot) => reap_piped_child(child_slot),
                TaskRuntime::Pty(runtime) => *runtime = None,
            }
            state.detached = true;
            self.persist_task(&task.paths, &state.metadata)
                .map_err(|e| format!("failed to persist terminal state: {e}"))?;
            terminalized = true;
        } else {
            // Persist the `Killing` state once; a repeated kill request does
            // not persist it again.
            let was_already_killing = state.metadata.status == BgTaskStatus::Killing;
            if !was_already_killing {
                state.metadata.status = BgTaskStatus::Killing;
                self.persist_task(&task.paths, &state.metadata)
                    .map_err(|e| format!("failed to persist killing state: {e}"))?;
            }

            #[cfg(unix)]
            let pgid = state.metadata.pgid;
            #[cfg(windows)]
            let child_pid = state.metadata.child_pid;
            // For a PTY timeout, store `TimedOut` as a pending override so a
            // later finalization can apply it.
            if !was_already_killing
                && state.metadata.mode == BgMode::Pty
                && terminal_status == BgTaskStatus::TimedOut
            {
                state.pending_terminal_override = Some(BgTaskStatus::TimedOut);
            }

            #[cfg(windows)]
            let mut pty_forced_terminal_status: Option<BgTaskStatus> = None;

            match &mut state.runtime {
                TaskRuntime::Piped(child_slot) => {
                    #[cfg(unix)]
                    if let Some(pgid) = pgid {
                        terminate_pgid(pgid, child_slot.as_mut());
                    }
                    #[cfg(windows)]
                    if let Some(child) = child_slot.as_mut() {
                        super::process::terminate_process(child);
                    } else if let Some(pid) = child_pid {
                        terminate_pid(pid);
                    }
                    // Wait on the child handle, if any; the result is ignored.
                    if let Some(child) = child_slot.as_mut() {
                        let _ = child.wait();
                    }
                    *child_slot = None;
                    state.detached = true;

                    if !task.paths.exit.exists() {
                        write_kill_marker_if_absent(&task.paths.exit)
                            .map_err(|e| format!("failed to write kill marker: {e}"))?;
                    }

                    // Timeouts are recorded with exit code 124; other kills
                    // record no exit code.
                    let exit_code = if terminal_status == BgTaskStatus::TimedOut {
                        Some(124)
                    } else {
                        None
                    };
                    state
                        .metadata
                        .mark_terminal(terminal_status, exit_code, None);
                    if self.task_has_watch_control(&task.task_id) {
                        state.metadata.completion_delivered = true;
                    }
                    state.pending_terminal_override = None;
                    task.mark_terminal_now();
                    self.persist_task(&task.paths, &state.metadata)
                        .map_err(|e| format!("failed to persist killed state: {e}"))?;
                    terminalized = true;
                }
                TaskRuntime::Pty(Some(pty)) => {
                    // Flag the kill, ask the PTY's killer to stop the child,
                    // signal the recorded pid, and drop the master handle.
                    pty.was_killed.store(true, Ordering::SeqCst);
                    if let Err(error) = pty.killer.kill() {
                        crate::slog_warn!(
                            "[pty-kill] {task_id} ChildKiller::kill failed: {error}"
                        );
                    }
                    if let Some(pid) = pty.child_pid {
                        #[cfg(unix)]
                        terminate_pgid(pid as i32, None);
                        #[cfg(windows)]
                        terminate_pid(pid);
                    }
                    drop(pty.master.take());

                    #[cfg(windows)]
                    {
                        let default_status = if terminal_status == BgTaskStatus::TimedOut {
                            BgTaskStatus::TimedOut
                        } else {
                            BgTaskStatus::Killed
                        };
                        pty_forced_terminal_status = Some(
                            state
                                .pending_terminal_override
                                .take()
                                .unwrap_or(default_status),
                        );
                    }
                }
                // No live PTY runtime to signal.
                TaskRuntime::Pty(None) => {}
            }

            // Windows only: make the killed PTY task terminal right here,
            // mirroring the piped branch above.
            #[cfg(windows)]
            if let Some(target_status) = pty_forced_terminal_status {
                if !task.paths.exit.exists() {
                    write_kill_marker_if_absent(&task.paths.exit)
                        .map_err(|e| format!("failed to write kill marker: {e}"))?;
                }

                let exit_code = if target_status == BgTaskStatus::TimedOut {
                    Some(124)
                } else {
                    None
                };
                state.metadata.mark_terminal(target_status, exit_code, None);
                if self.task_has_watch_control(&task.task_id) {
                    state.metadata.completion_delivered = true;
                }
                state.pending_terminal_override = None;
                task.mark_terminal_now();
                if let TaskRuntime::Pty(runtime) = &mut state.runtime {
                    *runtime = None;
                }
                state.detached = true;
                self.persist_task(&task.paths, &state.metadata)
                    .map_err(|e| format!("failed to persist killed PTY state: {e}"))?;
                terminalized = true;
            }
        }
    }

    // The state lock is released by now.
    if terminalized {
        self.post_terminal_transition(&task, true)?;
    }
    Ok(self.snapshot_with_terminal_cache(&task, RUNNING_OUTPUT_PREVIEW_BYTES))
}
2129
/// Moves `task` into its terminal state based on an exit `marker`.
///
/// Does nothing except clear the pending override when the task is already
/// terminal. Otherwise the terminal metadata is persisted, the runtime
/// handle is released, watch output is scanned once more, and the
/// post-terminal transition runs.
///
/// `reason` is forwarded into the terminal metadata.
///
/// # Errors
///
/// Fails when the state lock is poisoned, persisting the terminal metadata
/// fails, or the post-terminal transition fails.
fn finalize_from_marker(
    &self,
    task: &Arc<BgTask>,
    marker: ExitMarker,
    reason: Option<String>,
) -> Result<(), String> {
    // Read before the task state lock is taken below.
    let watch_controlled = self.task_has_watch_control(&task.task_id);
    let mut pty_reader_done = None;
    {
        let mut state = task
            .state
            .lock()
            .map_err(|_| "background task lock poisoned".to_string())?;
        if state.metadata.status.is_terminal() {
            state.pending_terminal_override = None;
            return Ok(());
        }

        let pending_override = state.pending_terminal_override.take();
        let is_pty = state.metadata.mode == BgMode::Pty;
        let updated = self
            .update_task_metadata(&task.paths, |metadata| {
                // A killed PTY task takes the pending override (default
                // `Killed`); every other case is derived from the marker.
                let mut new_metadata = if is_pty && marker == ExitMarker::Killed {
                    let mut metadata = metadata.clone();
                    let target_status = pending_override.unwrap_or(BgTaskStatus::Killed);
                    // Timeouts are recorded with exit code 124.
                    let exit_code = if target_status == BgTaskStatus::TimedOut {
                        Some(124)
                    } else {
                        None
                    };
                    metadata.mark_terminal(target_status, exit_code, reason);
                    metadata
                } else {
                    terminal_metadata_from_marker(metadata.clone(), marker, reason)
                };
                if watch_controlled {
                    new_metadata.completion_delivered = true;
                }
                *metadata = new_metadata;
            })
            .map_err(|e| format!("failed to persist terminal state: {e}"))?;
        state.metadata = updated;
        task.mark_terminal_now();
        match &mut state.runtime {
            TaskRuntime::Piped(child_slot) => reap_piped_child(child_slot),
            TaskRuntime::Pty(runtime) => {
                // Keep the reader-done flag so it can be awaited after the
                // runtime itself has been dropped.
                pty_reader_done = runtime
                    .as_ref()
                    .map(|runtime| Arc::clone(&runtime.reader_done));
                *runtime = None;
            }
        }
        state.detached = true;
    }

    // Wait up to 200 ms, in 10 ms steps, for the PTY reader to report done.
    if let Some(reader_done) = pty_reader_done {
        let deadline = Instant::now() + Duration::from_millis(200);
        while !reader_done.load(Ordering::SeqCst) && Instant::now() < deadline {
            std::thread::sleep(Duration::from_millis(10));
        }
    }

    // Final scan of the captured output before the transition runs.
    self.scan_task_watch_output(task);

    self.post_terminal_transition(task, true)
}
2202
2203 fn enqueue_completion_if_needed(
2204 &self,
2205 metadata: &PersistedTask,
2206 paths: Option<&TaskPaths>,
2207 emit_frame: bool,
2208 ) {
2209 if metadata.status.is_terminal() && !metadata.completion_delivered {
2210 let cache =
2211 paths.and_then(|paths| self.render_terminal_output_from_paths(metadata, paths));
2212 self.enqueue_completion_from_parts(metadata, None, paths, emit_frame, cache.as_ref());
2213 }
2214 }
2215
2216 fn render_terminal_output_from_paths(
2217 &self,
2218 metadata: &PersistedTask,
2219 paths: &TaskPaths,
2220 ) -> Option<TerminalOutputCache> {
2221 if metadata.mode == BgMode::Pty {
2222 return None;
2223 }
2224 let buffer = BgBuffer::new(paths.stdout.clone(), paths.stderr.clone());
2225 Some(self.render_terminal_output(metadata, &buffer))
2226 }
2227
2228 fn enqueue_completion_from_parts(
2229 &self,
2230 metadata: &PersistedTask,
2231 buffer: Option<&BgBuffer>,
2232 paths: Option<&TaskPaths>,
2233 emit_frame: bool,
2234 terminal_render: Option<&TerminalOutputCache>,
2235 ) {
2236 if !metadata.status.is_terminal() {
2247 return;
2248 }
2249
2250 let owned_buffer = if buffer.is_none() && metadata.mode != BgMode::Pty {
2251 paths.map(|paths| BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()))
2252 } else {
2253 None
2254 };
2255 let render_buffer = buffer.or(owned_buffer.as_ref());
2256 let owned_render = if terminal_render.is_none() {
2257 render_buffer.map(|buffer| self.render_terminal_output(metadata, buffer))
2258 } else {
2259 None
2260 };
2261 let render = terminal_render.or(owned_render.as_ref());
2262
2263 let (output_preview, output_truncated) = render
2266 .map(completion_preview_for_cache)
2267 .unwrap_or_else(|| (String::new(), false));
2268
2269 let token_counts = self.completion_token_counts(
2270 metadata,
2271 buffer,
2272 paths,
2273 render.map(|render| render.output_preview.as_str()),
2274 );
2275 let completion = BgCompletion {
2276 task_id: metadata.task_id.clone(),
2277 session_id: metadata.session_id.clone(),
2278 status: metadata.status.clone(),
2279 exit_code: metadata.exit_code,
2280 command: metadata.command.clone(),
2281 output_preview,
2282 output_truncated,
2283 original_tokens: token_counts.original_tokens,
2284 compressed_tokens: token_counts.compressed_tokens,
2285 tokens_skipped: token_counts.tokens_skipped,
2286 };
2287
2288 self.record_compression_event_if_applicable(metadata, &token_counts);
2299
2300 let (watch_controlled, watch_matched) = self.task_watch_state(&metadata.task_id);
2301 if watch_controlled {
2302 if emit_frame && !watch_matched {
2303 self.emit_bash_watch_exit(&completion);
2304 }
2305 self.clear_task_watch_state(&metadata.task_id);
2306 return;
2307 }
2308
2309 if metadata.completion_delivered {
2319 return;
2320 }
2321
2322 let pushed = if let Ok(mut completions) = self.inner.completions.lock() {
2325 if completions
2326 .iter()
2327 .any(|existing| existing.task_id == metadata.task_id)
2328 {
2329 false
2330 } else {
2331 completions.push_back(completion.clone());
2332 true
2333 }
2334 } else {
2335 false
2336 };
2337
2338 if pushed && emit_frame {
2339 self.emit_bash_completed(completion);
2340 }
2341 }
2342
2343 fn record_compression_event_if_applicable(
2344 &self,
2345 metadata: &PersistedTask,
2346 token_counts: &CompletionTokenCounts,
2347 ) {
2348 if metadata.mode == BgMode::Pty {
2349 return;
2350 }
2351
2352 let (original_tokens, compressed_tokens, original_bytes, compressed_bytes) = match (
2353 token_counts.original_tokens,
2354 token_counts.compressed_tokens,
2355 token_counts.original_bytes,
2356 token_counts.compressed_bytes,
2357 ) {
2358 (
2359 Some(original_tokens),
2360 Some(compressed_tokens),
2361 Some(original_bytes),
2362 Some(compressed_bytes),
2363 ) => (
2364 original_tokens,
2365 compressed_tokens,
2366 original_bytes,
2367 compressed_bytes,
2368 ),
2369 _ => {
2370 crate::slog_warn!(
2371 "compression event skipped for {}: token counts unavailable (likely spill file missing or unreadable)",
2372 metadata.task_id
2373 );
2374 return;
2375 }
2376 };
2377
2378 let pool = self.inner.db_pool.read().ok().and_then(|slot| slot.clone());
2379 let Some(pool) = pool else {
2380 crate::slog_warn!(
2381 "compression event skipped for {}: db_pool not initialized — was configure run?",
2382 metadata.task_id
2383 );
2384 return;
2385 };
2386 let harness = self
2387 .inner
2388 .db_harness
2389 .read()
2390 .ok()
2391 .and_then(|slot| slot.clone());
2392 let Some(harness) = harness else {
2393 crate::slog_warn!(
2394 "compression event insert skipped for {}: harness not configured",
2395 metadata.task_id
2396 );
2397 return;
2398 };
2399
2400 let project_root = metadata
2401 .project_root
2402 .as_deref()
2403 .unwrap_or(&metadata.workdir);
2404 let project_key = crate::search_index::project_cache_key(project_root);
2405 let row = crate::db::compression_events::CompressionEventRow {
2406 harness: &harness,
2407 session_id: Some(&metadata.session_id),
2408 project_key: &project_key,
2409 tool: "bash",
2410 task_id: Some(&metadata.task_id),
2411 command: Some(&metadata.command),
2412 compressor: if metadata.compressed {
2413 "registry"
2414 } else {
2415 "none"
2416 },
2417 original_bytes,
2418 compressed_bytes,
2419 original_tokens,
2420 compressed_tokens,
2421 created_at: unix_millis() as i64,
2422 };
2423
2424 let conn = match pool.lock() {
2425 Ok(conn) => conn,
2426 Err(_) => {
2427 crate::slog_warn!(
2428 "compression event insert failed for {}: db mutex poisoned",
2429 metadata.task_id
2430 );
2431 return;
2432 }
2433 };
2434 match crate::db::compression_events::insert_compression_event(&conn, &row) {
2435 Ok(_) => {
2436 crate::slog_debug!(
2440 "compression event recorded for {} (project={}, session={}, {} → {} tokens)",
2441 metadata.task_id,
2442 project_key,
2443 metadata.session_id,
2444 original_tokens,
2445 compressed_tokens
2446 );
2447 }
2448 Err(error) => {
2449 crate::slog_warn!(
2450 "compression event insert failed for {}: {}",
2451 metadata.task_id,
2452 error
2453 );
2454 }
2455 }
2456 }
2457
2458 fn emit_bash_pattern_match(&self, session_id: &str, pattern_match: PatternMatch) {
2459 let Ok(progress_sender) = self
2460 .inner
2461 .progress_sender
2462 .lock()
2463 .map(|sender| sender.clone())
2464 else {
2465 return;
2466 };
2467 if let Some(sender) = progress_sender.as_ref() {
2468 sender(PushFrame::BashPatternMatch(BashPatternMatchFrame::new(
2469 pattern_match.task_id,
2470 session_id.to_string(),
2471 pattern_match.watch_id,
2472 pattern_match.match_text,
2473 pattern_match.match_offset,
2474 pattern_match.context,
2475 pattern_match.once,
2476 )));
2477 }
2478 }
2479
2480 fn emit_bash_watch_exit(&self, completion: &BgCompletion) {
2481 let Ok(progress_sender) = self
2482 .inner
2483 .progress_sender
2484 .lock()
2485 .map(|sender| sender.clone())
2486 else {
2487 return;
2488 };
2489 let Some(sender) = progress_sender.as_ref() else {
2490 return;
2491 };
2492 let status = completion_status_text(&completion.status, completion.exit_code);
2493 let preview = completion.output_preview.trim_end();
2494 let context = if preview.is_empty() {
2495 format!("task {} exited ({status})", completion.task_id)
2496 } else {
2497 format!(
2498 "task {} exited ({status})
2499{preview}",
2500 completion.task_id
2501 )
2502 };
2503 sender(PushFrame::BashPatternMatch(
2504 BashPatternMatchFrame::task_exit(
2505 completion.task_id.clone(),
2506 completion.session_id.clone(),
2507 format!("exited ({status})"),
2508 context,
2509 ),
2510 ));
2511 }
2512
2513 fn emit_bash_completed(&self, completion: BgCompletion) {
2514 let Ok(progress_sender) = self
2515 .inner
2516 .progress_sender
2517 .lock()
2518 .map(|sender| sender.clone())
2519 else {
2520 return;
2521 };
2522 let Some(sender) = progress_sender.as_ref() else {
2523 return;
2524 };
2525 sender(PushFrame::BashCompleted(BashCompletedFrame::new(
2533 completion.task_id,
2534 completion.session_id,
2535 completion.status,
2536 completion.exit_code,
2537 completion.command,
2538 completion.output_preview,
2539 completion.output_truncated,
2540 completion.original_tokens,
2541 completion.compressed_tokens,
2542 completion.tokens_skipped,
2543 )));
2544 }
2545
2546 fn completion_token_counts(
2547 &self,
2548 metadata: &PersistedTask,
2549 buffer: Option<&BgBuffer>,
2550 paths: Option<&TaskPaths>,
2551 rendered_output: Option<&str>,
2552 ) -> CompletionTokenCounts {
2553 if metadata.mode == BgMode::Pty {
2554 return CompletionTokenCounts::skipped();
2555 }
2556
2557 let raw = match buffer {
2558 Some(buffer) => buffer.read_for_token_count(TOKENIZE_CAP_BYTES_PER_STREAM),
2559 None => paths
2560 .map(|paths| {
2561 read_for_token_count_from_disk(metadata, paths, TOKENIZE_CAP_BYTES_PER_STREAM)
2562 })
2563 .unwrap_or(TokenCountInput::Skipped),
2564 };
2565
2566 let TokenCountInput::Text(raw_output) = raw else {
2567 return CompletionTokenCounts::skipped();
2568 };
2569
2570 let original_tokens = token_count_u32(&raw_output);
2571 let original_bytes = raw_output.len() as i64;
2572 let compressed_output = rendered_output.unwrap_or(&raw_output);
2573 let compressed_tokens = token_count_u32(compressed_output);
2574 let compressed_bytes = compressed_output.len() as i64;
2575 CompletionTokenCounts {
2576 original_tokens: Some(original_tokens),
2577 compressed_tokens: Some(compressed_tokens),
2578 original_bytes: Some(original_bytes),
2579 compressed_bytes: Some(compressed_bytes),
2580 tokens_skipped: false,
2581 }
2582 }
2583
2584 pub(crate) fn maybe_emit_long_running_reminder(&self, task: &Arc<BgTask>) {
2585 if !self
2586 .inner
2587 .long_running_reminder_enabled
2588 .load(Ordering::SeqCst)
2589 {
2590 return;
2591 }
2592 let interval_ms = self
2593 .inner
2594 .long_running_reminder_interval_ms
2595 .load(Ordering::SeqCst);
2596 if interval_ms == 0 {
2597 return;
2598 }
2599 let interval = Duration::from_millis(interval_ms);
2600 let now = Instant::now();
2601 let Ok(mut last_reminder_at) = task.last_reminder_at.lock() else {
2602 return;
2603 };
2604 let since = last_reminder_at.unwrap_or(task.started);
2605 if now.duration_since(since) < interval {
2606 return;
2607 }
2608 let command = task
2609 .state
2610 .lock()
2611 .map(|state| state.metadata.command.clone())
2612 .unwrap_or_default();
2613 *last_reminder_at = Some(now);
2614 self.emit_bash_long_running(BashLongRunningFrame::new(
2615 task.task_id.clone(),
2616 task.session_id.clone(),
2617 command,
2618 task.started.elapsed().as_millis() as u64,
2619 ));
2620 }
2621
2622 fn emit_bash_long_running(&self, frame: BashLongRunningFrame) {
2623 let Ok(progress_sender) = self
2624 .inner
2625 .progress_sender
2626 .lock()
2627 .map(|sender| sender.clone())
2628 else {
2629 return;
2630 };
2631 if let Some(sender) = progress_sender.as_ref() {
2632 sender(PushFrame::BashLongRunning(frame));
2633 }
2634 }
2635
2636 fn task(&self, task_id: &str) -> Option<Arc<BgTask>> {
2637 self.inner
2638 .tasks
2639 .lock()
2640 .ok()
2641 .and_then(|tasks| tasks.get(task_id).cloned())
2642 }
2643
2644 fn task_for_session(&self, task_id: &str, session_id: &str) -> Option<Arc<BgTask>> {
2645 self.task(task_id)
2646 .filter(|task| task.session_id == session_id)
2647 }
2648
2649 fn running_count(&self) -> usize {
2650 self.inner
2651 .tasks
2652 .lock()
2653 .map(|tasks| tasks.values().filter(|task| task.is_running()).count())
2654 .unwrap_or(0)
2655 }
2656
2657 fn start_watchdog(&self) {
2658 if !self.inner.watchdog_started.swap(true, Ordering::SeqCst) {
2659 super::watchdog::start(self.clone());
2660 }
2661 }
2662
2663 fn running_metadata_is_stale(&self, metadata: &PersistedTask) -> bool {
2664 unix_millis().saturating_sub(metadata.started_at) > STALE_RUNNING_AFTER.as_millis() as u64
2665 }
2666
2667 #[cfg(test)]
2668 pub fn task_json_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
2669 self.task_for_session(task_id, session_id)
2670 .map(|task| task.paths.json.clone())
2671 }
2672
2673 #[cfg(test)]
2674 pub fn task_exit_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
2675 self.task_for_session(task_id, session_id)
2676 .map(|task| task.paths.exit.clone())
2677 }
2678
2679 fn generate_unique_task_id(&self) -> Result<String, String> {
2681 for _ in 0..32 {
2682 let candidate = random_slug();
2683 let tasks = self
2684 .inner
2685 .tasks
2686 .lock()
2687 .map_err(|_| "background task registry lock poisoned".to_string())?;
2688 if tasks.contains_key(&candidate) {
2689 continue;
2690 }
2691 let completions = self
2692 .inner
2693 .completions
2694 .lock()
2695 .map_err(|_| "background completions lock poisoned".to_string())?;
2696 if completions
2697 .iter()
2698 .any(|completion| completion.task_id == candidate)
2699 {
2700 continue;
2701 }
2702 return Ok(candidate);
2703 }
2704 Err("failed to allocate unique background task id after 32 attempts".to_string())
2705 }
2706}
2707
2708fn render_compressed_with_recovery(
2709 buffer: &BgBuffer,
2710 mut compressed: CompressionResult,
2711 input_truncated: bool,
2712) -> TerminalOutputCache {
2713 let had_trailing_newline = compressed.text.ends_with('\n');
2721 let mut text = strip_plain_truncation_marker_lines(&compressed.text)
2722 .trim_end()
2723 .to_string();
2724 if had_trailing_newline && !text.is_empty() {
2725 text.push('\n');
2726 }
2727 compressed.text = text;
2728
2729 let output_path = buffer.output_path().map(|path| path.display().to_string());
2730 let stderr_path = buffer.stderr_path().map(|path| path.display().to_string());
2731 let include_stderr_path = buffer.stream_len(StreamKind::Stderr) > 0;
2732 let mut recovery = RecoveryContext {
2733 dropped_by_class: compressed.dropped_by_class,
2734 had_inner_drop: compressed.had_inner_drop,
2735 offset_hint_eligible: compressed.offset_hint_eligible,
2736 offset_start_line: compressed.offset_start_line,
2737 byte_truncated: input_truncated,
2738 output_path: output_path.clone(),
2739 stderr_path: stderr_path.clone(),
2740 include_stderr_path,
2741 };
2742
2743 let (output_preview, output_truncated) =
2744 render_body_with_recovery_marker(&compressed.text, &mut recovery);
2745 TerminalOutputCache {
2746 output_preview,
2747 output_truncated,
2748 kind: TerminalOutputKind::Compressed,
2749 output_path,
2750 stderr_path,
2751 recovery: Some(recovery),
2752 }
2753}
2754
2755fn render_body_with_recovery_marker(body: &str, recovery: &mut RecoveryContext) -> (String, bool) {
2756 render_body_with_recovery_marker_at_cap(
2757 body,
2758 recovery,
2759 FINAL_OUTPUT_CAP_BYTES,
2760 cap_final_output,
2761 cap_final_output_with_marker,
2762 )
2763}
2764
2765fn render_raw_body_with_recovery_marker(
2766 body: &str,
2767 recovery: &mut RecoveryContext,
2768) -> (String, bool) {
2769 render_body_with_recovery_marker_at_cap(
2770 body,
2771 recovery,
2772 RAW_PASSTHROUGH_CAP_BYTES,
2773 |input| {
2774 super::output::cap_head_tail(
2775 input,
2776 RAW_PASSTHROUGH_CAP_BYTES,
2777 RAW_PASSTHROUGH_HEAD_BYTES,
2778 RAW_PASSTHROUGH_TAIL_BYTES,
2779 )
2780 },
2781 |input, marker| {
2782 super::output::cap_head_tail_with_marker(
2783 input,
2784 RAW_PASSTHROUGH_CAP_BYTES,
2785 RAW_PASSTHROUGH_HEAD_BYTES,
2786 RAW_PASSTHROUGH_TAIL_BYTES,
2787 marker,
2788 )
2789 },
2790 )
2791}
2792
2793fn render_body_with_recovery_marker_at_cap<F, G>(
2794 body: &str,
2795 recovery: &mut RecoveryContext,
2796 cap_bytes: usize,
2797 cap_plain: F,
2798 cap_with_marker: G,
2799) -> (String, bool)
2800where
2801 F: Fn(&str) -> super::output::CappedText,
2802 G: Fn(&str, &str) -> super::output::CappedText,
2803{
2804 let needs_marker = recovery.has_visible_drop();
2805 if body.len() > cap_bytes {
2806 recovery.byte_truncated = true;
2807 if let Some(marker) = recovery_marker(recovery) {
2808 let capped = cap_with_marker(body, &marker);
2809 return (capped.text, true);
2810 }
2811 let capped = cap_plain(body);
2812 return (capped.text, capped.truncated || needs_marker);
2813 }
2814
2815 if !needs_marker {
2816 return (body.to_string(), false);
2817 }
2818
2819 let Some(marker) = recovery_marker(recovery) else {
2820 return (body.to_string(), true);
2821 };
2822 let with_marker = append_recovery_marker(body, &marker);
2823 if with_marker.len() <= cap_bytes {
2824 return (with_marker, true);
2825 }
2826
2827 recovery.byte_truncated = true;
2828 let marker = recovery_marker(recovery).unwrap_or(marker);
2829 let capped = cap_with_marker(body, &marker);
2830 (capped.text, true)
2831}
2832
/// Appends `marker` on its own line after `body`.
///
/// Trailing whitespace is trimmed from `body` first. When nothing remains
/// (empty or whitespace-only body) the marker is returned alone, so the result
/// never starts with a blank line.
fn append_recovery_marker(body: &str, marker: &str) -> String {
    let trimmed = body.trim_end();
    if trimmed.is_empty() {
        return marker.to_string();
    }
    let mut output = String::with_capacity(trimmed.len() + 1 + marker.len());
    output.push_str(trimmed);
    output.push('\n');
    output.push_str(marker);
    output
}
2842
2843fn recovery_marker(recovery: &RecoveryContext) -> Option<String> {
2844 let mut parts = Vec::new();
2845 for (class, count) in &recovery.dropped_by_class {
2846 let label = if *count == 1 {
2847 class.singular()
2848 } else {
2849 class.plural()
2850 };
2851 parts.push(format!("+{count} more {label}"));
2852 }
2853 if recovery.byte_truncated {
2854 parts.push("truncated output".to_string());
2855 } else if recovery.had_inner_drop && parts.is_empty() {
2856 parts.push("omitted output".to_string());
2857 }
2858
2859 if parts.is_empty() {
2860 return None;
2861 }
2862
2863 let hint = recovery_hint(recovery);
2864 Some(format!("[{}; {hint}]", parts.join(", ")))
2865}
2866
2867fn recovery_hint(recovery: &RecoveryContext) -> String {
2868 if recovery.offset_hint_eligible
2872 && !recovery.byte_truncated
2873 && recovery.dropped_by_class.is_empty()
2874 && !recovery.include_stderr_path
2875 {
2876 if let (Some(path), Some(line)) =
2877 (recovery.output_path.as_deref(), recovery.offset_start_line)
2878 {
2879 return format!("see remaining: tail -n +{line} {}", quote_path(path));
2880 }
2881 }
2882
2883 let mut paths = Vec::new();
2884 if let Some(path) = recovery.output_path.as_deref() {
2885 paths.push(path);
2886 }
2887 if recovery.include_stderr_path {
2888 if let Some(path) = recovery.stderr_path.as_deref() {
2889 if !paths.contains(&path) {
2890 paths.push(path);
2891 }
2892 }
2893 }
2894
2895 if paths.is_empty() {
2896 return "full output unavailable".to_string();
2897 }
2898
2899 let reads = paths
2900 .into_iter()
2901 .map(|path| format!("read {}", quote_path(path)))
2902 .collect::<Vec<_>>()
2903 .join(" and ");
2904 format!("full output: {reads}")
2905}
2906
2907fn strip_plain_truncation_marker_lines(input: &str) -> String {
2908 input
2909 .lines()
2910 .filter(|line| !is_plain_truncation_marker(line.trim()))
2911 .collect::<Vec<_>>()
2912 .join("\n")
2913}
2914
2915fn strip_recovery_marker_lines(input: &str) -> String {
2916 input
2917 .lines()
2918 .filter(|line| !is_recovery_marker(line.trim()))
2919 .collect::<Vec<_>>()
2920 .join("\n")
2921}
2922
/// Reports whether `line` is exactly `...<truncated N bytes>...` where `N` is
/// one or more ASCII digits.
fn is_plain_truncation_marker(line: &str) -> bool {
    line.strip_prefix("...<truncated ")
        .and_then(|rest| rest.strip_suffix(" bytes>..."))
        .is_some_and(|digits| {
            !digits.is_empty() && digits.bytes().all(|byte| byte.is_ascii_digit())
        })
}
2932
/// Reports whether `line` looks like a recovery marker: it is wrapped in
/// square brackets and contains one of the known recovery-hint phrases.
fn is_recovery_marker(line: &str) -> bool {
    const HINT_PHRASES: [&str; 3] = [
        "full output: read ",
        "see remaining: tail -n +",
        "full output unavailable",
    ];
    let bracketed = line.starts_with('[') && line.ends_with(']');
    bracketed && HINT_PHRASES.iter().any(|phrase| line.contains(*phrase))
}
2940
2941fn render_structured_output(command: &str, buffer: &BgBuffer) -> Option<TerminalOutputCache> {
2942 if !is_gh_structured_command(command) {
2943 return None;
2944 }
2945
2946 let output_path = buffer
2947 .output_path()
2948 .map(|path| path.display().to_string())?;
2949 let stdout_bytes = buffer.stream_len(StreamKind::Stdout);
2950 if stdout_bytes == 0 {
2951 return None;
2952 }
2953
2954 if stdout_bytes > STRUCTURED_OUTPUT_CAP_BYTES as u64 {
2955 if !stream_starts_like_json(buffer, StreamKind::Stdout) {
2956 return None;
2957 }
2958 return Some(TerminalOutputCache {
2959 output_preview: json_output_pointer(stdout_bytes, &output_path),
2960 output_truncated: true,
2961 kind: TerminalOutputKind::Structured,
2962 output_path: Some(output_path),
2963 stderr_path: buffer.stderr_path().map(|path| path.display().to_string()),
2964 recovery: None,
2965 });
2966 }
2967
2968 let stdout = buffer.read_stream_bounded(StreamKind::Stdout, STRUCTURED_OUTPUT_CAP_BYTES);
2969 if stdout.truncated || !is_structured_body(&stdout.text) {
2970 return None;
2971 }
2972
2973 Some(TerminalOutputCache {
2974 output_preview: stdout.text,
2975 output_truncated: false,
2976 kind: TerminalOutputKind::Structured,
2977 output_path: Some(output_path),
2978 stderr_path: buffer.stderr_path().map(|path| path.display().to_string()),
2979 recovery: None,
2980 })
2981}
2982
2983fn render_raw_passthrough(buffer: &BgBuffer) -> TerminalOutputCache {
2984 let raw = buffer.read_combined_head_tail(
2985 RAW_PASSTHROUGH_CAP_BYTES,
2986 RAW_PASSTHROUGH_HEAD_BYTES,
2987 RAW_PASSTHROUGH_TAIL_BYTES,
2988 );
2989 let output_path = buffer.output_path().map(|path| path.display().to_string());
2990 let stderr_path = buffer.stderr_path().map(|path| path.display().to_string());
2991 if !raw.truncated {
2992 return TerminalOutputCache {
2993 output_preview: raw.text,
2994 output_truncated: false,
2995 kind: TerminalOutputKind::Raw,
2996 output_path,
2997 stderr_path,
2998 recovery: None,
2999 };
3000 }
3001
3002 let include_stderr_path = buffer.stream_len(StreamKind::Stderr) > 0;
3003 let mut recovery = RecoveryContext {
3004 dropped_by_class: BTreeMap::new(),
3005 had_inner_drop: false,
3006 offset_hint_eligible: false,
3007 offset_start_line: None,
3008 byte_truncated: true,
3009 output_path: output_path.clone(),
3010 stderr_path: stderr_path.clone(),
3011 include_stderr_path,
3012 };
3013 let (output_preview, output_truncated) =
3014 render_raw_body_with_recovery_marker(&raw.text, &mut recovery);
3015 TerminalOutputCache {
3016 output_preview,
3017 output_truncated,
3018 kind: TerminalOutputKind::Raw,
3019 output_path,
3020 stderr_path,
3021 recovery: Some(recovery),
3022 }
3023}
3024
3025fn completion_preview_for_cache(cache: &TerminalOutputCache) -> (String, bool) {
3026 if cache.kind == TerminalOutputKind::Structured
3027 && cache.output_preview.len() > BG_COMPLETION_PREVIEW_BYTES
3028 {
3029 if let Some(path) = cache.output_path.as_deref() {
3030 return (
3031 json_output_pointer(cache.output_preview.len() as u64, path),
3032 true,
3033 );
3034 }
3035 return (cache.output_preview.clone(), cache.output_truncated);
3036 }
3037
3038 if let Some(recovery) = cache.recovery.as_ref() {
3039 if cache.output_preview.len() <= BG_COMPLETION_PREVIEW_BYTES {
3040 return (cache.output_preview.clone(), cache.output_truncated);
3041 }
3042 let body = strip_recovery_marker_lines(&cache.output_preview);
3043 let mut completion_recovery = recovery.clone();
3044 completion_recovery.byte_truncated = true;
3045 if let Some(marker) = recovery_marker(&completion_recovery) {
3046 let capped = cap_completion_output_with_marker(&body, &marker);
3047 return (capped.text, true);
3048 }
3049 }
3050
3051 let capped = cap_completion_output(&cache.output_preview);
3052 (capped.text, cache.output_truncated || capped.truncated)
3053}
3054
3055fn is_gh_structured_command(command: &str) -> bool {
3056 let normalized = crate::compress::normalize_command_for_dispatch(command)
3057 .unwrap_or_else(|| command.trim_start().to_string());
3058 let tokens = shell_words_for_flags(&normalized);
3059 let Some(head) = tokens.first() else {
3060 return false;
3061 };
3062 let head_name = Path::new(head)
3063 .file_name()
3064 .and_then(|name| name.to_str())
3065 .unwrap_or(head);
3066 if !(head_name == "gh" || head_name.eq_ignore_ascii_case("gh.exe")) {
3067 return false;
3068 }
3069 tokens.iter().any(|token| {
3070 matches!(token.as_str(), "--json" | "--jq" | "--template")
3071 || token.starts_with("--json=")
3072 || token.starts_with("--jq=")
3073 || token.starts_with("--template=")
3074 })
3075}
3076
/// Splits `command` into words using a simplified shell-like tokenizer.
///
/// * A backslash outside single quotes escapes the next character.
/// * Single and double quotes group text and are removed from the word.
/// * Unquoted whitespace and the characters `;`, `&`, `|` end the current
///   word and are discarded.
///
/// Empty words are never produced, and a trailing unpaired backslash is
/// dropped.
fn shell_words_for_flags(command: &str) -> Vec<String> {
    let mut words = Vec::new();
    let mut current = String::new();
    let mut in_single = false;
    let mut in_double = false;
    let mut escaped = false;

    for ch in command.chars() {
        let quoted = in_single || in_double;
        match ch {
            _ if escaped => {
                current.push(ch);
                escaped = false;
            }
            '\\' if !in_single => escaped = true,
            '\'' if !in_double => in_single = !in_single,
            '"' if !in_single => in_double = !in_double,
            separator
                if !quoted
                    && (separator.is_whitespace() || matches!(separator, ';' | '&' | '|')) =>
            {
                if !current.is_empty() {
                    words.push(std::mem::take(&mut current));
                }
            }
            other => current.push(other),
        }
    }

    if !current.is_empty() {
        words.push(current);
    }
    words
}
3121
3122fn is_structured_body(body: &str) -> bool {
3123 let trimmed = body.trim();
3124 if trimmed.is_empty() {
3125 return false;
3126 }
3127 if serde_json::from_str::<serde_json::Value>(trimmed).is_ok() {
3128 return true;
3129 }
3130
3131 let mut saw_line = false;
3132 for line in trimmed
3133 .lines()
3134 .map(str::trim)
3135 .filter(|line| !line.is_empty())
3136 {
3137 saw_line = true;
3138 if serde_json::from_str::<serde_json::Value>(line).is_err() {
3139 return false;
3140 }
3141 }
3142 saw_line
3143}
3144
3145fn stream_starts_like_json(buffer: &BgBuffer, stream: StreamKind) -> bool {
3146 let path = match (buffer, stream) {
3147 (BgBuffer::Pipes { stdout_path, .. }, StreamKind::Stdout) => Some(stdout_path),
3148 (BgBuffer::Pipes { stderr_path, .. }, StreamKind::Stderr) => Some(stderr_path),
3149 (BgBuffer::Pty { combined_path }, _) => Some(combined_path),
3150 };
3151 let Some(path) = path else {
3152 return false;
3153 };
3154 let Ok(file) = std::fs::File::open(path) else {
3155 return false;
3156 };
3157 let mut limited = file.take(512);
3158 let mut bytes = Vec::new();
3159 if limited.read_to_end(&mut bytes).is_err() {
3160 return false;
3161 }
3162 String::from_utf8_lossy(&bytes)
3163 .chars()
3164 .find(|ch| !ch.is_whitespace())
3165 .is_some_and(|ch| matches!(ch, '{' | '[' | '"' | '-' | '0'..='9' | 't' | 'f' | 'n'))
3166}
3167
/// Token and byte counts gathered for a completed task's output.
///
/// All four counts are `Some` when counting succeeded; `tokens_skipped` is
/// `true` (and all counts are `None`) when counting was skipped.
struct CompletionTokenCounts {
    /// Token count of the raw output.
    original_tokens: Option<u32>,
    /// Token count of the rendered ("compressed") output.
    compressed_tokens: Option<u32>,
    /// Byte length of the raw output.
    original_bytes: Option<i64>,
    /// Byte length of the rendered ("compressed") output.
    compressed_bytes: Option<i64>,
    /// Whether counting was skipped.
    tokens_skipped: bool,
}

impl CompletionTokenCounts {
    /// Returns the value used when counting was skipped: no counts, and
    /// `tokens_skipped` set.
    fn skipped() -> Self {
        CompletionTokenCounts {
            tokens_skipped: true,
            original_tokens: None,
            compressed_tokens: None,
            original_bytes: None,
            compressed_bytes: None,
        }
    }
}
3187
3188fn completion_status_text(status: &BgTaskStatus, exit_code: Option<i32>) -> String {
3189 match status {
3190 BgTaskStatus::TimedOut => "timed out".to_string(),
3191 BgTaskStatus::Killed => "killed".to_string(),
3192 _ => exit_code
3193 .map(|code| format!("exit {code}"))
3194 .unwrap_or_else(|| format!("{status:?}").to_lowercase()),
3195 }
3196}
3197
3198fn token_count_u32(text: &str) -> u32 {
3199 aft_tokenizer::count_tokens(text)
3200 .try_into()
3201 .unwrap_or(u32::MAX)
3202}
3203
3204impl Default for BgTaskRegistry {
3205 fn default() -> Self {
3206 Self::new(Arc::new(Mutex::new(None)))
3207 }
3208}
3209
/// Reports whether `path` was modified less than `grace` ago.
///
/// Returns `false` when the metadata or modification time cannot be read, or
/// when the modification time lies in the future.
fn modified_within(path: &Path, grace: Duration) -> bool {
    let Ok(metadata) = fs::metadata(path) else {
        return false;
    };
    let Ok(modified) = metadata.modified() else {
        return false;
    };
    match SystemTime::now().duration_since(modified) {
        Ok(age) => age < grace,
        Err(_) => false,
    }
}
3218
/// Canonicalizes `path`, falling back to an unchanged copy of `path` when
/// canonicalization fails.
fn canonicalized_path(path: &Path) -> PathBuf {
    match fs::canonicalize(path) {
        Ok(resolved) => resolved,
        Err(_) => path.to_path_buf(),
    }
}
3222
/// Converts a wall-clock start time (milliseconds since the Unix epoch) into
/// an `Instant` that lies the same distance in the past.
///
/// A start time in the future counts as "now". If the system clock is before
/// the epoch, or the `Instant` cannot be moved back that far, the current
/// instant is returned.
fn started_instant_from_unix_millis(started_at: u64) -> Instant {
    let now_ms = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => started_at,
    };
    let elapsed = Duration::from_millis(now_ms.saturating_sub(started_at));
    let now = Instant::now();
    now.checked_sub(elapsed).unwrap_or_else(Instant::now)
}
3234
3235fn gc_quarantine(storage_dir: &Path) {
3236 let quarantine_root = storage_dir.join("bash-tasks-quarantine");
3237 let Ok(session_dirs) = fs::read_dir(&quarantine_root) else {
3238 return;
3239 };
3240 for session_entry in session_dirs.flatten() {
3241 let session_quarantine_dir = session_entry.path();
3242 if !session_quarantine_dir.is_dir() {
3243 continue;
3244 }
3245 let entries = match fs::read_dir(&session_quarantine_dir) {
3246 Ok(entries) => entries,
3247 Err(error) => {
3248 crate::slog_warn!(
3249 "failed to read background task quarantine dir {}: {error}",
3250 session_quarantine_dir.display()
3251 );
3252 continue;
3253 }
3254 };
3255 for entry in entries.flatten() {
3256 let path = entry.path();
3257 if modified_within(&path, QUARANTINE_GC_GRACE) {
3258 continue;
3259 }
3260 let result = if path.is_dir() {
3261 fs::remove_dir_all(&path)
3262 } else {
3263 fs::remove_file(&path)
3264 };
3265 match result {
3266 Ok(()) => log::debug!(
3267 "deleted old background task quarantine entry {}",
3268 path.display()
3269 ),
3270 Err(error) => crate::slog_warn!(
3271 "failed to delete old background task quarantine entry {}: {error}",
3272 path.display()
3273 ),
3274 }
3275 }
3276 let _ = fs::remove_dir(&session_quarantine_dir);
3277 }
3278 let _ = fs::remove_dir(&quarantine_root);
3279}
3280
/// Selects the naming scheme `quarantine_name` applies to a quarantined file.
enum QuarantineKind {
    /// Named `<file_name>.corrupt-<unix_ts>`.
    Corrupt,
    /// Named `<stem>.invalid.<unix_ts>.<extension>`, or
    /// `<file_name>.invalid.<unix_ts>` when the name has no stem/extension
    /// pair.
    Invalid,
}
3285
3286fn quarantine_task_json(
3287 storage_dir: &Path,
3288 session_dir: &Path,
3289 json_path: &Path,
3290 kind: QuarantineKind,
3291) -> Result<(), String> {
3292 let session_hash = session_dir
3293 .file_name()
3294 .and_then(|name| name.to_str())
3295 .ok_or_else(|| {
3296 format!(
3297 "invalid background task session dir: {}",
3298 session_dir.display()
3299 )
3300 })?;
3301 let task_name = json_path
3302 .file_name()
3303 .and_then(|name| name.to_str())
3304 .ok_or_else(|| format!("invalid background task json path: {}", json_path.display()))?;
3305 let unix_ts = SystemTime::now()
3306 .duration_since(UNIX_EPOCH)
3307 .map(|duration| duration.as_secs())
3308 .unwrap_or(0);
3309 let quarantine_dir = storage_dir.join("bash-tasks-quarantine").join(session_hash);
3310 fs::create_dir_all(&quarantine_dir).map_err(|e| {
3311 format!(
3312 "failed to create background task quarantine dir {}: {e}",
3313 quarantine_dir.display()
3314 )
3315 })?;
3316 let target_name = quarantine_name(task_name, unix_ts, &kind);
3317 let target = quarantine_dir.join(target_name);
3318 fs::rename(json_path, &target).map_err(|e| {
3319 format!(
3320 "failed to quarantine background task metadata {} to {}: {e}",
3321 json_path.display(),
3322 target.display()
3323 )
3324 })?;
3325
3326 for sibling in task_sibling_paths(json_path) {
3327 if !sibling.exists() {
3328 continue;
3329 }
3330 let Some(sibling_name) = sibling.file_name().and_then(|name| name.to_str()) else {
3331 crate::slog_warn!(
3332 "skipping background task sibling with invalid name during quarantine: {}",
3333 sibling.display()
3334 );
3335 continue;
3336 };
3337 let sibling_target = quarantine_dir.join(quarantine_name(sibling_name, unix_ts, &kind));
3338 if let Err(error) = fs::rename(&sibling, &sibling_target) {
3339 crate::slog_warn!(
3340 "failed to quarantine background task sibling {} to {}: {error}",
3341 sibling.display(),
3342 sibling_target.display()
3343 );
3344 }
3345 }
3346
3347 let _ = fs::remove_dir(session_dir);
3348 Ok(())
3349}
3350
3351fn quarantine_name(file_name: &str, unix_ts: u64, kind: &QuarantineKind) -> String {
3352 match kind {
3353 QuarantineKind::Corrupt => format!("{file_name}.corrupt-{unix_ts}"),
3354 QuarantineKind::Invalid => {
3355 let path = Path::new(file_name);
3356 let stem = path.file_stem().and_then(|stem| stem.to_str());
3357 let extension = path.extension().and_then(|extension| extension.to_str());
3358 match (stem, extension) {
3359 (Some(stem), Some(extension)) => format!("{stem}.invalid.{unix_ts}.{extension}"),
3360 _ => format!("{file_name}.invalid.{unix_ts}"),
3361 }
3362 }
3363 }
3364}
3365
/// Lists the candidate sibling files of a task JSON file: same directory and
/// stem, with the extensions `stdout`, `stderr`, `exit`, `pty`, `ps1`, `bat`
/// and `sh` (in that order).
///
/// Returns an empty list when `json_path` has no parent or no UTF-8 stem. The
/// paths are not checked for existence.
fn task_sibling_paths(json_path: &Path) -> Vec<PathBuf> {
    const SIBLING_EXTENSIONS: [&str; 7] = ["stdout", "stderr", "exit", "pty", "ps1", "bat", "sh"];

    let (Some(parent), Some(stem)) = (
        json_path.parent(),
        json_path.file_stem().and_then(|stem| stem.to_str()),
    ) else {
        return Vec::new();
    };

    let mut siblings = Vec::with_capacity(SIBLING_EXTENSIONS.len());
    for extension in SIBLING_EXTENSIONS {
        siblings.push(parent.join(format!("{stem}.{extension}")));
    }
    siblings
}
3378
3379fn read_for_token_count_from_disk(
3380 metadata: &PersistedTask,
3381 paths: &TaskPaths,
3382 max_bytes_per_stream: usize,
3383) -> TokenCountInput {
3384 if metadata.mode == BgMode::Pty {
3385 return TokenCountInput::Skipped;
3386 }
3387 let stdout = read_file_tail_capped(&paths.stdout, max_bytes_per_stream);
3394 let stderr = read_file_tail_capped(&paths.stderr, max_bytes_per_stream);
3395 match (stdout, stderr) {
3396 (Ok(stdout), Ok(stderr)) => TokenCountInput::Text(combine_streams(
3397 String::from_utf8_lossy(&stdout).as_ref(),
3398 String::from_utf8_lossy(&stderr).as_ref(),
3399 )),
3400 (Ok(stdout), Err(_)) => TokenCountInput::Text(combine_streams(
3401 String::from_utf8_lossy(&stdout).as_ref(),
3402 "",
3403 )),
3404 (Err(_), Ok(stderr)) => TokenCountInput::Text(combine_streams(
3405 "",
3406 String::from_utf8_lossy(&stderr).as_ref(),
3407 )),
3408 (Err(_), Err(_)) => TokenCountInput::Skipped,
3409 }
3410}
3411
/// Reads at most `max_bytes` bytes from the end of the file at `path`.
///
/// The whole file is returned when it is no larger than `max_bytes`; otherwise only
/// the trailing `max_bytes` bytes (as measured when the file length was sampled) are
/// returned.
///
/// The read itself is bounded by `max_bytes`, so the cap holds when `max_bytes` is
/// zero (the result is empty) and when the file grows between the length query and
/// the read.
///
/// # Errors
///
/// Propagates any I/O error from opening, inspecting, seeking, or reading the file.
fn read_file_tail_capped(path: &Path, max_bytes: usize) -> std::io::Result<Vec<u8>> {
    use std::io::{Read, Seek, SeekFrom};

    let mut file = std::fs::File::open(path)?;
    let len = file.metadata()?.len();
    let cap = max_bytes as u64;
    let read_len = len.min(cap);
    if len > read_len {
        // Skip everything except the trailing `read_len` bytes.
        file.seek(SeekFrom::Start(len - read_len))?;
    }
    // `read_len <= max_bytes`, so narrowing back to `usize` cannot truncate.
    let mut bytes = Vec::with_capacity(read_len as usize);
    // `take` enforces the cap even if the file has grown or `max_bytes` is 0.
    file.take(cap).read_to_end(&mut bytes)?;
    Ok(bytes)
}
3428
3429impl BgTask {
3430 fn snapshot(&self, preview_bytes: usize) -> BgTaskSnapshot {
3431 let state = self
3432 .state
3433 .lock()
3434 .unwrap_or_else(|poison| poison.into_inner());
3435 self.snapshot_locked(&state, preview_bytes)
3436 }
3437
3438 fn snapshot_locked(&self, state: &BgTaskState, preview_bytes: usize) -> BgTaskSnapshot {
3439 let metadata = &state.metadata;
3440 let duration_ms = metadata.duration_ms.or_else(|| {
3441 metadata
3442 .status
3443 .is_terminal()
3444 .then(|| self.started.elapsed().as_millis() as u64)
3445 });
3446 let (output_preview, output_truncated) = if metadata.mode == BgMode::Pty {
3447 (String::new(), false)
3448 } else if metadata.status.is_terminal() {
3449 state
3450 .terminal_output_cache
3451 .as_ref()
3452 .map(|cache| (cache.output_preview.clone(), cache.output_truncated))
3453 .unwrap_or_else(|| (String::new(), false))
3454 } else {
3455 state.buffer.read_tail(preview_bytes)
3456 };
3457 BgTaskSnapshot {
3458 info: BgTaskInfo {
3459 task_id: self.task_id.clone(),
3460 status: metadata.status.clone(),
3461 command: metadata.command.clone(),
3462 mode: metadata.mode.clone(),
3463 started_at: metadata.started_at,
3464 duration_ms,
3465 },
3466 exit_code: metadata.exit_code,
3467 child_pid: metadata.child_pid,
3468 workdir: metadata.workdir.display().to_string(),
3469 output_preview,
3470 output_truncated,
3471 output_path: state
3472 .buffer
3473 .output_path()
3474 .map(|path| path.display().to_string()),
3475 stderr_path: state
3476 .buffer
3477 .stderr_path()
3478 .map(|path| path.display().to_string()),
3479 pty_rows: (metadata.mode == BgMode::Pty).then_some(metadata.pty_rows.unwrap_or(24)),
3480 pty_cols: (metadata.mode == BgMode::Pty).then_some(metadata.pty_cols.unwrap_or(80)),
3481 }
3482 }
3483
3484 pub(crate) fn is_running(&self) -> bool {
3485 self.state
3486 .lock()
3487 .map(|state| {
3488 state.metadata.status == BgTaskStatus::Running
3489 || (state.metadata.mode == BgMode::Pty
3490 && state.metadata.status == BgTaskStatus::Killing)
3491 })
3492 .unwrap_or(false)
3493 }
3494
3495 fn is_terminal(&self) -> bool {
3496 self.state
3497 .lock()
3498 .map(|state| state.metadata.status.is_terminal())
3499 .unwrap_or(false)
3500 }
3501
3502 fn mark_terminal_now(&self) {
3503 if let Ok(mut terminal_at) = self.terminal_at.lock() {
3504 if terminal_at.is_none() {
3505 *terminal_at = Some(Instant::now());
3506 }
3507 }
3508 }
3509
3510 fn set_completion_delivered(
3511 &self,
3512 delivered: bool,
3513 registry: &BgTaskRegistry,
3514 ) -> Result<(), String> {
3515 let mut state = self
3516 .state
3517 .lock()
3518 .map_err(|_| "background task lock poisoned".to_string())?;
3519 let updated = registry
3520 .update_task_metadata(&self.paths, |metadata| {
3521 metadata.completion_delivered = delivered;
3522 })
3523 .map_err(|e| format!("failed to update completion delivery: {e}"))?;
3524 state.metadata = updated;
3525 Ok(())
3526 }
3527}
3528
/// Takes the child out of `child_slot` (leaving `None`) and waits for it if it has
/// not exited yet.
///
/// The blocking `wait` is issued only when `try_wait` reports that the child is
/// still running; errors from either call are ignored.
#[cfg(unix)]
fn reap_piped_child(child_slot: &mut Option<Child>) {
    let Some(mut child) = child_slot.take() else {
        return;
    };
    if let Ok(None) = child.try_wait() {
        let _ = child.wait();
    }
}
3557
/// Windows variant: empties `child_slot`, dropping any stored `Child` without
/// waiting on it.
#[cfg(windows)]
fn reap_piped_child(child_slot: &mut Option<Child>) {
    drop(child_slot.take());
}
3566
3567fn terminal_metadata_from_marker(
3568 mut metadata: PersistedTask,
3569 marker: ExitMarker,
3570 reason: Option<String>,
3571) -> PersistedTask {
3572 match marker {
3573 ExitMarker::Code(code) => {
3574 let status = if code == 0 {
3575 BgTaskStatus::Completed
3576 } else {
3577 BgTaskStatus::Failed
3578 };
3579 metadata.mark_terminal(status, Some(code), reason);
3580 }
3581 ExitMarker::Killed => metadata.mark_terminal(BgTaskStatus::Killed, None, reason),
3582 }
3583 metadata
3584}
3585
3586#[cfg(unix)]
3587fn detached_shell_command(command: &str, exit_path: &Path) -> Command {
3588 let shell = resolve_posix_shell();
3589 let mut cmd = Command::new(&shell);
3590 cmd.arg("-c")
3591 .arg("\"$0\" -c \"$1\"; code=$?; printf \"%s\" \"$code\" > \"$2.tmp.$$\"; mv -f \"$2.tmp.$$\" \"$2\"")
3592 .arg(&shell)
3593 .arg(command)
3594 .arg(exit_path);
3595 unsafe {
3596 cmd.pre_exec(|| {
3597 if libc::setsid() == -1 {
3598 return Err(std::io::Error::last_os_error());
3599 }
3600 Ok(())
3601 });
3602 }
3603 cmd
3604}
3605
3606#[cfg(unix)]
3607fn resolve_posix_shell() -> PathBuf {
3608 static POSIX_SHELL: OnceLock<PathBuf> = OnceLock::new();
3609 POSIX_SHELL
3610 .get_or_init(|| {
3611 std::env::var_os("BASH")
3612 .filter(|value| !value.is_empty())
3613 .map(PathBuf::from)
3614 .filter(|path| path.exists())
3615 .or_else(|| which::which("bash").ok())
3616 .or_else(|| which::which("zsh").ok())
3617 .unwrap_or_else(|| PathBuf::from("/bin/sh"))
3618 })
3619 .clone()
3620}
3621
/// Writes a wrapper script for `shell` into the task directory and builds the
/// `Command` that launches it with the given process creation flags.
///
/// The script body comes from `shell.wrapper_script_bytes(command, exit_path)`. The
/// script file is named after the task JSON's stem (falling back to `wrapper`) with
/// a shell-specific extension: `ps1`, `bat`, or `sh`.
///
/// # Errors
///
/// Returns an error string when the wrapper script cannot be written.
#[cfg(windows)]
fn detached_shell_command_for(
    shell: crate::windows_shell::WindowsShell,
    command: &str,
    exit_path: &Path,
    paths: &TaskPaths,
    creation_flags: u32,
) -> Result<Command, String> {
    use crate::windows_shell::WindowsShell;

    let script_bytes = shell.wrapper_script_bytes(command, exit_path);
    let script_ext = match shell {
        WindowsShell::Pwsh | WindowsShell::Powershell => "ps1",
        WindowsShell::Cmd => "bat",
        WindowsShell::Posix(_) => "sh",
    };
    let script_stem = paths
        .json
        .file_stem()
        .and_then(|s| s.to_str())
        .unwrap_or("wrapper");
    let wrapper_path = paths.dir.join(format!("{script_stem}.{script_ext}"));
    fs::write(&wrapper_path, script_bytes)
        .map_err(|e| format!("failed to write background bash wrapper script: {e}"))?;

    // Arguments that precede the script path for each shell flavour.
    let leading_args: &[&str] = match shell {
        WindowsShell::Pwsh | WindowsShell::Powershell => &[
            "-NoLogo",
            "-NoProfile",
            "-NonInteractive",
            "-ExecutionPolicy",
            "Bypass",
            "-File",
        ],
        WindowsShell::Cmd => &["/D", "/C"],
        WindowsShell::Posix(_) => &[],
    };

    let mut cmd = Command::new(shell.binary().as_ref());
    cmd.args(leading_args);
    cmd.arg(&wrapper_path);
    cmd.creation_flags(creation_flags);
    Ok(cmd)
}
3704
/// Spawns `command` as a detached background child whose stdout and stderr are
/// redirected to the task's capture files and whose stdin is null.
///
/// * Non-Windows: a single spawn through `detached_shell_command`.
/// * Windows: every candidate from `shell_candidates()` is tried in order. Each
///   candidate is first attempted with `CREATE_BREAKAWAY_FROM_JOB` and, if that
///   attempt fails with OS error 5, once more without it.
///
/// # Errors
///
/// Returns an error string when a capture file cannot be opened, when the wrapper
/// command cannot be built, when a spawn fails with an error that is not handled by
/// one of the fallbacks, or (Windows) when no candidate shell could be spawned.
fn spawn_detached_child(
    command: &str,
    paths: &TaskPaths,
    workdir: &Path,
    env: &HashMap<String, String>,
) -> Result<std::process::Child, String> {
    #[cfg(not(windows))]
    {
        let stdout = create_capture_file(&paths.stdout)
            .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
        let stderr = create_capture_file(&paths.stderr)
            .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
        detached_shell_command(command, &paths.exit)
            .current_dir(workdir)
            .envs(env)
            .stdin(Stdio::null())
            .stdout(Stdio::from(stdout))
            .stderr(Stdio::from(stderr))
            .spawn()
            .map_err(|e| format!("failed to spawn background bash command: {e}"))
    }
    #[cfg(windows)]
    {
        use crate::windows_shell::shell_candidates;
        let candidates: Vec<crate::windows_shell::WindowsShell> = shell_candidates();
        // Process creation flag values combined below.
        const FLAG_CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
        const FLAG_CREATE_BREAKAWAY_FROM_JOB: u32 = 0x0100_0000;
        const FLAG_CREATE_NO_WINDOW: u32 = 0x0800_0000;
        let with_breakaway =
            FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP | FLAG_CREATE_BREAKAWAY_FROM_JOB;
        let without_breakaway = FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP;
        // Most recent handled failure; reported if every candidate is exhausted.
        let mut last_error: Option<String> = None;
        for (idx, shell) in candidates.iter().enumerate() {
            // Breakaway is attempted first; the second pass is the fallback.
            for &flags in &[with_breakaway, without_breakaway] {
                // The capture files are opened again for every attempt because each
                // `Stdio::from` below takes ownership of the handle.
                let stdout = create_capture_file(&paths.stdout)
                    .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
                let stderr = create_capture_file(&paths.stderr)
                    .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
                let mut cmd =
                    detached_shell_command_for(shell.clone(), command, &paths.exit, paths, flags)?;
                cmd.current_dir(workdir)
                    .envs(env)
                    .stdin(Stdio::null())
                    .stdout(Stdio::from(stdout))
                    .stderr(Stdio::from(stderr));
                match cmd.spawn() {
                    Ok(child) => {
                        // A non-zero index means at least one earlier candidate failed.
                        if idx > 0 {
                            crate::slog_warn!("background bash spawn fell back to {} after {} earlier candidate(s) failed; \
                                the cached PATH probe disagreed with runtime spawn — likely PATH \
                                inheritance, antivirus / AppLocker / Defender ASR, or sandbox policy.",
                                shell.binary(),
                                idx);
                        }
                        if flags == without_breakaway {
                            crate::slog_warn!(
                                "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected \
                                (likely a restrictive Job Object — CI sandbox or MDM policy). \
                                Spawned without breakaway; the bg task will be torn down if the \
                                AFT process group is killed."
                            );
                        }
                        return Ok(child);
                    }
                    // Shell binary missing at runtime: skip the remaining flag
                    // variants for this shell and move on to the next candidate.
                    Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                        crate::slog_warn!("background bash spawn: {} returned NotFound at runtime — trying next candidate",
                            shell.binary());
                        last_error = Some(format!("{}: {e}", shell.binary()));
                        break;
                    }
                    // OS error 5 on the breakaway attempt: retry the same shell
                    // without the breakaway flag.
                    Err(e) if flags == with_breakaway && e.raw_os_error() == Some(5) => {
                        crate::slog_warn!(
                            "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected with \
                            Access Denied — retrying {} without breakaway",
                            shell.binary()
                        );
                        last_error = Some(format!("{}: {e}", shell.binary()));
                        continue;
                    }
                    // Any other spawn error aborts immediately.
                    Err(e) => {
                        return Err(format!(
                            "failed to spawn background bash command via {}: {e}",
                            shell.binary()
                        ));
                    }
                }
            }
        }
        Err(format!(
            "failed to spawn background bash command: no Windows shell could be spawned. \
            Last error: {}. PATH-probed candidates: {:?}",
            last_error.unwrap_or_else(|| "no candidates were attempted".to_string()),
            candidates.iter().map(|s| s.binary()).collect::<Vec<_>>()
        ))
    }
}
3853
3854fn random_slug() -> String {
3855 let mut bytes = [0u8; 4];
3856 getrandom::fill(&mut bytes).unwrap_or_else(|_| {
3858 let t = SystemTime::now()
3860 .duration_since(UNIX_EPOCH)
3861 .map(|d| d.subsec_nanos())
3862 .unwrap_or(0);
3863 let p = std::process::id();
3864 bytes.copy_from_slice(&(t ^ p).to_le_bytes());
3865 });
3866 let hex: String = bytes.iter().map(|b| format!("{b:02x}")).collect();
3868 format!("bash-{hex}")
3869}
3870
3871#[cfg(test)]
3872mod tests {
3873 use std::collections::HashMap;
3874 use std::fs;
3875 use std::sync::atomic::{AtomicBool, AtomicUsize};
3876 use std::sync::{Arc, Mutex};
3877 use std::time::Duration;
3878 #[cfg(windows)]
3879 use std::time::Instant;
3880
3881 use super::*;
3882
// Per-platform command used by tests that need a task which succeeds right away.
#[cfg(unix)]
const QUICK_SUCCESS_COMMAND: &str = "true";
#[cfg(windows)]
const QUICK_SUCCESS_COMMAND: &str = "cmd /c exit 0";

// Per-platform command used by tests that need a task which is still running
// when they act on it (a five-second sleep / timeout).
#[cfg(unix)]
const LONG_RUNNING_COMMAND: &str = "sleep 5";
#[cfg(windows)]
const LONG_RUNNING_COMMAND: &str = "cmd /c timeout /t 5 /nobreak > nul";
3892
3893 fn insert_terminal_piped_task(
3894 registry: &BgTaskRegistry,
3895 dir: &tempfile::TempDir,
3896 command: &str,
3897 stdout: &str,
3898 stderr: &str,
3899 compressed: bool,
3900 ) -> (String, Arc<BgTask>) {
3901 let task_id = format!("bash-test-{}", random_slug());
3902 let paths = task_paths(dir.path(), "session", &task_id);
3903 fs::create_dir_all(&paths.dir).unwrap();
3904 fs::write(&paths.stdout, stdout).unwrap();
3905 fs::write(&paths.stderr, stderr).unwrap();
3906 let mut metadata = PersistedTask::starting(
3907 task_id.clone(),
3908 "session".to_string(),
3909 command.to_string(),
3910 dir.path().to_path_buf(),
3911 Some(dir.path().to_path_buf()),
3912 Some(30_000),
3913 true,
3914 compressed,
3915 );
3916 metadata.mark_terminal(BgTaskStatus::Completed, Some(0), None);
3917 write_task(&paths.json, &metadata).unwrap();
3918 registry
3919 .insert_rehydrated_task(metadata, paths, true)
3920 .expect("insert terminal task");
3921 let task = registry.task_for_session(&task_id, "session").unwrap();
3922 (task_id, task)
3923 }
3924
3925 fn insert_terminal_pty_task(
3926 registry: &BgTaskRegistry,
3927 dir: &tempfile::TempDir,
3928 pty_output: &str,
3929 ) -> (String, Arc<BgTask>) {
3930 let task_id = format!("bash-test-{}", random_slug());
3931 let paths = task_paths(dir.path(), "session", &task_id);
3932 fs::create_dir_all(&paths.dir).unwrap();
3933 fs::write(&paths.pty, pty_output).unwrap();
3934 let mut metadata = PersistedTask::starting(
3935 task_id.clone(),
3936 "session".to_string(),
3937 "python".to_string(),
3938 dir.path().to_path_buf(),
3939 Some(dir.path().to_path_buf()),
3940 Some(30_000),
3941 true,
3942 true,
3943 );
3944 metadata.mode = BgMode::Pty;
3945 metadata.mark_terminal(BgTaskStatus::Completed, Some(0), None);
3946 write_task(&paths.json, &metadata).unwrap();
3947 registry
3948 .insert_rehydrated_task(metadata, paths, true)
3949 .expect("insert terminal pty task");
3950 let task = registry.task_for_session(&task_id, "session").unwrap();
3951 (task_id, task)
3952 }
3953
/// `is_recovery_marker` must accept each of the three marker shapes listed below.
#[test]
fn recognizes_all_recovery_marker_forms() {
    let markers = [
        "[truncated output; full output: read \"/tmp/out\"]",
        "[omitted output; see remaining: tail -n +42 \"/tmp/out\"]",
        "[truncated output; full output unavailable]",
    ];
    for marker in markers {
        assert!(is_recovery_marker(marker), "not recognised: {marker}");
    }
}
3966
/// Repeated status polls and a `list` call on a terminal task must invoke the
/// compressor exactly once, and the compressor must run while the task state lock
/// is free.
///
/// The compressor closure counts its invocations and probes the task's state lock
/// with `try_lock`; a successful probe shows the lock was not held at that moment.
#[test]
fn terminal_status_polls_use_cached_render_once_and_off_lock() {
    let registry = BgTaskRegistry::default();
    let dir = tempfile::tempdir().unwrap();
    let (_task_id, task) = insert_terminal_piped_task(
        &registry,
        &dir,
        "custom-tool --verbose",
        &"stdout line\n".repeat(200_000),
        "",
        true,
    );
    let calls = Arc::new(AtomicUsize::new(0));
    let saw_unlocked_state = Arc::new(AtomicBool::new(false));
    let task_holder = Arc::new(Mutex::new(Some(Arc::clone(&task))));
    let calls_for_closure = Arc::clone(&calls);
    let unlocked_for_closure = Arc::clone(&saw_unlocked_state);
    let task_for_closure = Arc::clone(&task_holder);
    registry.set_compressor(move |_command, output| {
        calls_for_closure.fetch_add(1, Ordering::SeqCst);
        if let Some(task) = task_for_closure.lock().unwrap().as_ref() {
            if task.state.try_lock().is_ok() {
                unlocked_for_closure.store(true, Ordering::SeqCst);
            }
        }
        CompressionResult::new(format!("compressed {} bytes", output.len()))
    });

    let first = registry
        .status(
            &task.task_id,
            "session",
            None,
            Some(dir.path()),
            RUNNING_OUTPUT_PREVIEW_BYTES,
        )
        .unwrap();
    let second = registry
        .status(
            &task.task_id,
            "session",
            None,
            Some(dir.path()),
            RUNNING_OUTPUT_PREVIEW_BYTES,
        )
        .unwrap();
    let listed = registry.list(RUNNING_OUTPUT_PREVIEW_BYTES);

    assert_eq!(
        calls.load(Ordering::SeqCst),
        1,
        "terminal render must be cached"
    );
    assert!(
        saw_unlocked_state.load(Ordering::SeqCst),
        "compressor must run after releasing the task state lock"
    );
    assert!(first.output_preview.starts_with("compressed "));
    assert_eq!(second.output_preview, first.output_preview);
    assert_eq!(listed[0].output_preview, first.output_preview);
}
4028
/// The completion preview of a large uncompressed output must keep both the first
/// and the last marker line and contain a `...<truncated ` marker.
#[test]
fn completion_preview_uses_head_and_tail_not_blind_tail() {
    let registry = BgTaskRegistry::default();
    let dir = tempfile::tempdir().unwrap();
    let output = format!("HEAD-SIGNAL\n{}TAIL-SIGNAL\n", "middle\n".repeat(2_000));
    let (_task_id, task) =
        insert_terminal_piped_task(&registry, &dir, "cat big.log", &output, "", false);

    registry.post_terminal_transition(&task, true).unwrap();
    let completions = registry.drain_completions_for_session(Some("session"));
    assert_eq!(completions.len(), 1);
    let preview = &completions[0].output_preview;
    assert!(preview.contains("HEAD-SIGNAL"), "preview was {preview:?}");
    assert!(preview.contains("TAIL-SIGNAL"), "preview was {preview:?}");
    assert!(
        preview.contains("...<truncated "),
        "preview was {preview:?}"
    );
}
4048
/// For a `gh ... --json` command the preview must be exactly the stdout JSON:
/// stderr is not mixed in, nothing is truncated, and the compressor is never
/// called.
#[test]
fn structured_gh_json_survives_intact_and_ignores_stderr() {
    let registry = BgTaskRegistry::default();
    let dir = tempfile::tempdir().unwrap();
    let calls = Arc::new(AtomicUsize::new(0));
    let calls_for_closure = Arc::clone(&calls);
    registry.set_compressor(move |_command, output| {
        calls_for_closure.fetch_add(1, Ordering::SeqCst);
        CompressionResult::new(output)
    });
    let (task_id, _task) = insert_terminal_piped_task(
        &registry,
        &dir,
        "gh pr view 123 --json body",
        "{\"body\":\"hello\"}",
        "warning: stderr must not join json",
        true,
    );

    let snapshot = registry
        .status(
            &task_id,
            "session",
            None,
            Some(dir.path()),
            RUNNING_OUTPUT_PREVIEW_BYTES,
        )
        .unwrap();

    assert_eq!(snapshot.output_preview, "{\"body\":\"hello\"}");
    assert!(!snapshot.output_preview.contains("warning"));
    assert!(!snapshot.output_truncated);
    assert_eq!(
        calls.load(Ordering::SeqCst),
        0,
        "structured JSON bypasses compression"
    );
}
4087
/// When the compressor reports dropped errors and warnings, the preview must carry
/// a single `full output:` marker that names both drop counts and points at the
/// stdout capture file with `read`, not with a `tail -n +` offset hint.
#[test]
fn registry_emits_single_recovery_marker_for_class_drops() {
    let registry = BgTaskRegistry::default();
    let dir = tempfile::tempdir().unwrap();
    registry.set_compressor(move |_command, _output| {
        let mut dropped = BTreeMap::new();
        dropped.insert(DropClass::Error, 18);
        dropped.insert(DropClass::Warning, 6);
        CompressionResult::with_class_drops("kept diagnostic", dropped)
    });
    let (task_id, task) =
        insert_terminal_piped_task(&registry, &dir, "custom-tool", "raw", "", true);

    let snapshot = registry
        .status(
            &task_id,
            "session",
            None,
            Some(dir.path()),
            RUNNING_OUTPUT_PREVIEW_BYTES,
        )
        .unwrap();

    assert_eq!(snapshot.output_preview.matches("full output:").count(), 1);
    assert!(snapshot.output_preview.contains("+18 more errors"));
    assert!(snapshot.output_preview.contains("+6 more warnings"));
    assert!(snapshot
        .output_preview
        .contains(&format!("read \"{}\"", task.paths.stdout.display())));
    assert!(!snapshot.output_preview.contains("tail -n +"));
    assert!(snapshot.output_truncated);
}
4120
/// When the compressor both reports a dropped error and returns a very large
/// output, the preview must contain exactly one `full output:` marker covering the
/// class drop and the truncation, keep the head and tail marker lines, and not
/// contain an inline `...<truncated` marker.
#[test]
fn registry_marker_reports_semantic_and_byte_drops_once() {
    let registry = BgTaskRegistry::default();
    let dir = tempfile::tempdir().unwrap();
    registry.set_compressor(move |_command, _output| {
        let mut dropped = BTreeMap::new();
        dropped.insert(DropClass::Error, 1);
        CompressionResult::with_class_drops(
            format!("HEAD-SIGNAL\n{}TAIL-SIGNAL", "middle\n".repeat(8_000)),
            dropped,
        )
    });
    let (task_id, _task) =
        insert_terminal_piped_task(&registry, &dir, "custom-tool", "raw", "", true);

    let snapshot = registry
        .status(
            &task_id,
            "session",
            None,
            Some(dir.path()),
            RUNNING_OUTPUT_PREVIEW_BYTES,
        )
        .unwrap();

    assert_eq!(snapshot.output_preview.matches("full output:").count(), 1);
    assert!(snapshot.output_preview.contains("+1 more error"));
    assert!(snapshot.output_preview.contains("truncated output"));
    assert!(snapshot.output_preview.contains("HEAD-SIGNAL"));
    assert!(snapshot.output_preview.contains("TAIL-SIGNAL"));
    assert!(!snapshot.output_preview.contains("...<truncated"));
    assert!(snapshot.output_truncated);
}
4154
/// With 22 cargo-style errors on stderr and the default filter registry, the
/// preview must report `+2 more errors` and name both the stdout and the stderr
/// capture file with `read` hints, without a `tail -n +` offset hint.
#[test]
fn cargo_stderr_class_drops_name_both_capture_paths() {
    let registry = BgTaskRegistry::default();
    let dir = tempfile::tempdir().unwrap();
    let filter_registry = crate::compress::toml_filter::FilterRegistry::default();
    registry.set_compressor(move |command, output| {
        crate::compress::compress_with_registry(command, &output, &filter_registry)
    });
    let stderr = (0..22)
        .map(|index| {
            format!(
                "error: cargo failure {index}\n --> src/lib.rs:{}:1\n |\n{} | boom\n",
                index + 1,
                index + 1
            )
        })
        .collect::<Vec<_>>()
        .join("\n");
    let (task_id, task) = insert_terminal_piped_task(
        &registry,
        &dir,
        "cargo check",
        "Finished dev [unoptimized] target(s) in 0.01s\n",
        &stderr,
        true,
    );

    let snapshot = registry
        .status(
            &task_id,
            "session",
            None,
            Some(dir.path()),
            RUNNING_OUTPUT_PREVIEW_BYTES,
        )
        .unwrap();

    assert!(snapshot.output_preview.contains("+2 more errors"));
    assert!(snapshot
        .output_preview
        .contains(&format!("read \"{}\"", task.paths.stdout.display())));
    assert!(snapshot
        .output_preview
        .contains(&format!("read \"{}\"", task.paths.stderr.display())));
    assert!(!snapshot.output_preview.contains("tail -n +"));
}
4201
/// A 60 KiB JSON body from a `gh ... --json` command must be replaced by a
/// `[JSON output ` pointer that names the stdout capture file; the preview must not
/// contain a slice of the raw body.
#[test]
fn over_ceiling_structured_json_uses_pointer_not_partial_json() {
    let registry = BgTaskRegistry::default();
    let dir = tempfile::tempdir().unwrap();
    let body = format!("{{\"body\":\"{}\"}}", "x".repeat(60 * 1024));
    let (task_id, task) = insert_terminal_piped_task(
        &registry,
        &dir,
        "cd /repo && gh pr view 123 --json body",
        &body,
        "",
        true,
    );

    let snapshot = registry
        .status(
            &task_id,
            "session",
            None,
            Some(dir.path()),
            RUNNING_OUTPUT_PREVIEW_BYTES,
        )
        .unwrap();

    assert!(snapshot.output_preview.starts_with("[JSON output "));
    assert!(snapshot
        .output_preview
        .contains(&task.paths.stdout.display().to_string()));
    assert!(!snapshot.output_preview.contains(&"x".repeat(1024)));
    assert!(snapshot.output_truncated);
}
4233
/// With the built-in filters applied to `make all` output, the preview must keep
/// the last compile line and carry a `full output: read "<stdout>"` hint; it must
/// not name the stderr capture file or use a `tail -n +` offset hint.
#[test]
fn toml_strip_tail_cap_uses_full_output_hint_not_offset_hint() {
    let registry = BgTaskRegistry::default();
    let dir = tempfile::tempdir().unwrap();
    let filter_registry = crate::compress::toml_filter::build_registry(
        crate::compress::builtin_filters::ALL,
        None,
        None,
    );
    registry.set_compressor(move |command, output| {
        crate::compress::compress_with_registry(command, &output, &filter_registry)
    });
    let stdout = format!(
        "make[1]: Entering directory `/tmp`\n{}",
        (0..100)
            .map(|index| format!("compile line {index}"))
            .collect::<Vec<_>>()
            .join("\n")
    );
    let (task_id, task) =
        insert_terminal_piped_task(&registry, &dir, "make all", &stdout, "", true);

    let snapshot = registry
        .status(
            &task_id,
            "session",
            None,
            Some(dir.path()),
            RUNNING_OUTPUT_PREVIEW_BYTES,
        )
        .unwrap();

    assert!(snapshot.output_preview.contains("compile line 99"));
    assert!(snapshot.output_preview.contains(&format!(
        "full output: read \"{}\"",
        task.paths.stdout.display()
    )));
    assert!(!snapshot
        .output_preview
        .contains(&format!("read \"{}\"", task.paths.stderr.display())));
    assert!(!snapshot.output_preview.contains("tail -n +"));
}
4276
/// With `compressed = false`, a large raw output must keep its head and tail marker
/// lines, be flagged as truncated with `read` hints for both capture files, and
/// yield a preview longer than 16 KiB.
#[test]
fn compressed_false_raw_passthrough_uses_wider_head_tail_cap() {
    let registry = BgTaskRegistry::default();
    let dir = tempfile::tempdir().unwrap();
    let output = format!("RAW-HEAD\n{}RAW-TAIL\n", "raw-middle\n".repeat(8_000));
    let (task_id, task) =
        insert_terminal_piped_task(&registry, &dir, "cat raw.log", &output, "RAW-ERR\n", false);

    let snapshot = registry
        .status(
            &task_id,
            "session",
            None,
            Some(dir.path()),
            RUNNING_OUTPUT_PREVIEW_BYTES,
        )
        .unwrap();

    assert!(snapshot.output_preview.contains("RAW-HEAD"));
    assert!(snapshot.output_preview.contains("RAW-TAIL"));
    assert!(snapshot.output_preview.contains("truncated output"));
    assert!(snapshot
        .output_preview
        .contains(&format!("read \"{}\"", task.paths.stdout.display())));
    assert!(snapshot
        .output_preview
        .contains(&format!("read \"{}\"", task.paths.stderr.display())));
    assert!(!snapshot.output_preview.contains("tail -n +"));
    assert!(snapshot.output_preview.len() > 16 * 1024);
    assert!(snapshot.output_truncated);
}
4308
/// A terminal PTY task must report PTY mode with an empty output preview, and the
/// compressor must never be called for it.
#[test]
fn pty_terminal_snapshot_bypasses_line_compression() {
    let registry = BgTaskRegistry::default();
    let dir = tempfile::tempdir().unwrap();
    let calls = Arc::new(AtomicUsize::new(0));
    let calls_for_closure = Arc::clone(&calls);
    registry.set_compressor(move |_command, output| {
        calls_for_closure.fetch_add(1, Ordering::SeqCst);
        CompressionResult::new(output)
    });
    let (task_id, _task) = insert_terminal_pty_task(&registry, &dir, "raw\u{1b}[31m pty bytes");

    let snapshot = registry
        .status(
            &task_id,
            "session",
            None,
            Some(dir.path()),
            RUNNING_OUTPUT_PREVIEW_BYTES,
        )
        .unwrap();

    assert_eq!(snapshot.info.mode, BgMode::Pty);
    assert_eq!(snapshot.output_preview, "");
    assert_eq!(calls.load(Ordering::SeqCst), 0);
}
4335
/// A PTY task spawned with 50 rows and 120 columns must persist those dimensions
/// (together with the current schema version and PTY mode) and report them back in
/// its status snapshot.
#[test]
fn pty_dimensions_are_persisted_and_returned_in_snapshot() {
    let registry = BgTaskRegistry::default();
    let dir = tempfile::tempdir().unwrap();
    let root = dir.path();
    let task_id = registry
        .spawn_pty(
            QUICK_SUCCESS_COMMAND,
            String::from("session"),
            root.to_path_buf(),
            HashMap::new(),
            Some(Duration::from_secs(30)),
            root.to_path_buf(),
            10,
            true,
            false,
            Some(root.to_path_buf()),
            50,
            120,
        )
        .unwrap();

    // What was written to disk.
    let persisted = read_task(&task_paths(root, "session", &task_id).json).unwrap();
    assert_eq!(
        persisted.schema_version,
        crate::bash_background::persistence::SCHEMA_VERSION
    );
    assert_eq!(persisted.mode, BgMode::Pty);
    assert_eq!(persisted.pty_rows, Some(50));
    assert_eq!(persisted.pty_cols, Some(120));

    // What a status poll reports.
    let snapshot = registry
        .status(&task_id, "session", None, Some(root), 1024)
        .unwrap();
    assert_eq!(snapshot.pty_rows, Some(50));
    assert_eq!(snapshot.pty_cols, Some(120));
}
4373
/// Test helper: spawns a trivially short-lived process with all standard streams
/// set to null, polls every 10 ms until `try_wait` reports that it has exited, and
/// returns the handle.
///
/// Panics if the process has not exited after five seconds or if `try_wait` fails.
fn spawn_dead_child() -> std::process::Child {
    #[cfg(unix)]
    let mut cmd = std::process::Command::new("true");
    #[cfg(windows)]
    let mut cmd = {
        let mut c = std::process::Command::new("cmd");
        c.args(["/c", "exit", "0"]);
        c
    };
    cmd.stdin(std::process::Stdio::null())
        .stdout(std::process::Stdio::null())
        .stderr(std::process::Stdio::null());
    let mut child = cmd.spawn().expect("spawn replacement child for reap test");

    let poll_started = Instant::now();
    while child
        .try_wait()
        .unwrap_or_else(|error| panic!("dead-child try_wait failed: {error}"))
        .is_none()
    {
        if poll_started.elapsed() > Duration::from_secs(5) {
            panic!("dead-child stand-in did not exit within 5s");
        }
        std::thread::sleep(Duration::from_millis(10));
    }
    child
}
4414
/// Acknowledging a completion must mark it delivered on disk even after the
/// in-memory completion queue has been emptied, and a fresh registry replaying
/// the session must not surface that completion again.
#[test]
fn ack_marks_delivered_even_when_completion_was_already_consumed_locally() {
    let registry = BgTaskRegistry::default();
    let dir = tempfile::tempdir().unwrap();
    let task_id = registry
        .spawn(
            LONG_RUNNING_COMMAND,
            "session".to_string(),
            dir.path().to_path_buf(),
            HashMap::new(),
            Some(Duration::from_secs(30)),
            dir.path().to_path_buf(),
            10,
            true,
            false,
            Some(dir.path().to_path_buf()),
        )
        .unwrap();
    // Kill the task; exactly one completion should be drainable afterwards.
    registry
        .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
        .unwrap();
    assert_eq!(
        registry
            .drain_completions_for_session(Some("session"))
            .len(),
        1
    );

    // Empty the in-memory completion queue before acknowledging.
    registry.inner.completions.lock().unwrap().clear();

    // The ack must still report the task id even though the queue is empty.
    assert_eq!(
        registry.ack_completions_for_session(Some("session"), std::slice::from_ref(&task_id)),
        vec![task_id.clone()]
    );
    assert!(registry
        .drain_completions_for_session(Some("session"))
        .is_empty());

    // The delivered flag must have reached the persisted metadata.
    let paths = task_paths(dir.path(), "session", &task_id);
    let metadata = read_task(&paths.json).unwrap();
    assert!(metadata.completion_delivered);

    // A brand-new registry replaying the same session must have nothing to drain.
    let replayed = BgTaskRegistry::default();
    replayed
        .replay_session_inner(dir.path(), "session", None)
        .unwrap();
    assert!(replayed
        .drain_completions_for_session(Some("session"))
        .is_empty());
}
4467
/// Registering a watch for a task id the registry does not know must fail with
/// `"task_not_found"`.
#[test]
fn register_watch_rejects_unknown_task() {
    let registry = BgTaskRegistry::default();
    let pattern = WatchPattern::Substring("READY".into());

    let outcome = registry.register_watch(String::from("missing-task"), pattern, true);

    assert_eq!(outcome, Err("task_not_found"));
}
4480
/// Registering a watch on a task that is already terminal must match against the
/// output already on disk: a `pattern_match` frame is pushed, no watch stays
/// active for the task, and the completion is marked delivered.
#[test]
fn register_watch_on_terminal_task_scans_existing_output() {
    // Progress sender that records every pushed frame for later inspection.
    let frames = Arc::new(Mutex::new(Vec::new()));
    let captured = Arc::clone(&frames);
    let sender: crate::context::ProgressSender = Arc::new(Box::new(move |frame| {
        captured.lock().unwrap().push(frame);
    })
        as Box<dyn Fn(PushFrame) + Send + Sync>);
    let registry = BgTaskRegistry::new(Arc::new(Mutex::new(Some(sender))));
    let dir = tempfile::tempdir().unwrap();
    let task_id = registry
        .spawn(
            LONG_RUNNING_COMMAND,
            "session".to_string(),
            dir.path().to_path_buf(),
            HashMap::new(),
            Some(Duration::from_secs(30)),
            dir.path().to_path_buf(),
            10,
            true,
            false,
            Some(dir.path().to_path_buf()),
        )
        .unwrap();
    // Set the registry's shutdown flag before the stdout capture is overwritten
    // below. NOTE(review): presumably this stops background workers from touching
    // the task concurrently — confirm against the registry implementation.
    registry
        .inner
        .shutdown
        .store(true, std::sync::atomic::Ordering::SeqCst);
    let task = registry.task_for_session(&task_id, "session").unwrap();
    // Plant the text the watch should find, then make the task terminal.
    std::fs::write(&task.paths.stdout, "READY\n").unwrap();
    registry
        .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
        .unwrap();
    // Discard frames and completions produced so far so that only the effects of
    // the watch registration are observed.
    frames.lock().unwrap().clear();
    registry.inner.completions.lock().unwrap().clear();

    registry
        .register_watch(
            task_id.clone(),
            WatchPattern::Substring("READY".into()),
            true,
        )
        .unwrap();

    let frames = frames.lock().unwrap();
    let frame = frames
        .iter()
        .find_map(|frame| match frame {
            PushFrame::BashPatternMatch(frame) => Some(frame),
            _ => None,
        })
        .expect("terminal watch registration should emit pattern frame");
    assert_eq!(frame.reason, "pattern_match");
    assert_eq!(frame.task_id, task_id);
    assert_eq!(frame.session_id, "session");
    assert_eq!(frame.match_text, "READY");
    assert_eq!(frame.match_offset, 0);
    assert_eq!(registry.active_watch_count(&frame.task_id), 0);
    let metadata = read_task(&task.paths.json).unwrap();
    assert!(metadata.completion_delivered);
}
4542
/// A killed task whose completion has been drained and acknowledged is
/// dropped from the registry by `cleanup_finished(Duration::ZERO)`.
#[test]
fn cleanup_finished_removes_terminal_tasks_older_than_threshold() {
    let registry = BgTaskRegistry::default();
    let dir = tempfile::tempdir().unwrap();
    let workdir = dir.path().to_path_buf();
    let task_id = registry
        .spawn(
            QUICK_SUCCESS_COMMAND,
            "session".to_string(),
            workdir.clone(),
            HashMap::new(),
            Some(Duration::from_secs(30)),
            workdir.clone(),
            10,
            true,
            false,
            Some(workdir),
        )
        .unwrap();
    registry
        .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
        .unwrap();

    // Drain, then acknowledge, the single completion produced by the kill.
    let drained = registry.drain_completions_for_session(Some("session"));
    assert_eq!(drained.len(), 1);
    let acked =
        registry.ack_completions_for_session(Some("session"), std::slice::from_ref(&task_id));
    assert_eq!(acked, vec![task_id.clone()]);

    registry.cleanup_finished(Duration::ZERO);

    let registry_is_empty = registry.inner.tasks.lock().unwrap().is_empty();
    assert!(registry_is_empty);
}
4575
/// A killed task whose completion was never drained or acknowledged must
/// still be tracked after `cleanup_finished(Duration::ZERO)`.
#[test]
fn cleanup_finished_retains_undelivered_terminals() {
    let registry = BgTaskRegistry::default();
    let dir = tempfile::tempdir().unwrap();
    let workdir = dir.path().to_path_buf();
    let task_id = registry
        .spawn(
            QUICK_SUCCESS_COMMAND,
            "session".to_string(),
            workdir.clone(),
            HashMap::new(),
            Some(Duration::from_secs(30)),
            workdir.clone(),
            10,
            true,
            false,
            Some(workdir),
        )
        .unwrap();
    registry
        .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
        .unwrap();

    // No drain/ack happens here, unlike the removal test.
    registry.cleanup_finished(Duration::ZERO);

    let still_tracked = registry.inner.tasks.lock().unwrap().contains_key(&task_id);
    assert!(still_tracked);
}
4602
/// `reap_child` on a task whose child has exited and whose exit marker is
/// absent: the first call releases the child handle and marks the task
/// detached while leaving it `Running`; the second call turns it into
/// `Failed` with the reason "process exited without exit marker".
#[test]
fn reap_child_marks_failed_when_child_exits_without_exit_marker() {
    let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
    let dir = tempfile::tempdir().unwrap();
    let workdir = dir.path().to_path_buf();
    let task_id = registry
        .spawn(
            QUICK_SUCCESS_COMMAND,
            "session".to_string(),
            workdir.clone(),
            HashMap::new(),
            Some(Duration::from_secs(30)),
            workdir.clone(),
            10,
            true,
            false,
            Some(workdir),
        )
        .unwrap();

    let task = registry.task_for_session(&task_id, "session").unwrap();

    // Poll (for at most 5s) until the piped child reports an exit status.
    // Any runtime without a live child handle counts as "already exited".
    let wait_began = Instant::now();
    let child_has_exited = || {
        let mut guard = task.state.lock().unwrap();
        if let TaskRuntime::Piped(Some(child)) = &mut guard.runtime {
            matches!(child.try_wait(), Ok(Some(_)))
        } else {
            true
        }
    };
    while !child_has_exited() {
        assert!(
            wait_began.elapsed() < Duration::from_secs(5),
            "child should exit quickly"
        );
        std::thread::sleep(Duration::from_millis(20));
    }

    // NOTE(review): the shutdown flag plus the 550ms pause presumably lets
    // background polling wind down before the state is edited by hand — confirm.
    registry
        .inner
        .shutdown
        .store(true, std::sync::atomic::Ordering::SeqCst);
    std::thread::sleep(Duration::from_millis(550));

    // Best-effort removal: the marker may or may not have been written.
    let _ = std::fs::remove_file(&task.paths.exit);

    {
        // Force the task back to a pristine Running state, in memory and on disk.
        let mut guard = task.state.lock().unwrap();
        guard.metadata.status = BgTaskStatus::Running;
        guard.metadata.status_reason = None;
        guard.metadata.exit_code = None;
        guard.metadata.finished_at = None;
        guard.metadata.duration_ms = None;
        crate::bash_background::persistence::write_task(&task.paths.json, &guard.metadata)
            .expect("persist reset Running metadata for reap_child test");
        // If the handle was already released, substitute a dead stand-in child.
        let handle_released = matches!(guard.runtime, TaskRuntime::Piped(None));
        if handle_released {
            guard.runtime = TaskRuntime::Piped(Some(spawn_dead_child()));
        }
    }
    *task.terminal_at.lock().unwrap() = None;

    assert!(
        task.is_running(),
        "precondition: metadata.status == Running"
    );
    assert!(
        !task.paths.exit.exists(),
        "precondition: exit marker absent"
    );

    // First pass.
    registry.reap_child(&task);

    {
        let after_first = task.state.lock().unwrap();
        assert_eq!(
            after_first.metadata.status,
            BgTaskStatus::Running,
            "first reap must leave status Running while waiting one pass for marker"
        );
        assert_eq!(
            after_first.metadata.status_reason, None,
            "first reap must not record a failure reason"
        );
        assert!(
            matches!(after_first.runtime, TaskRuntime::Piped(None)),
            "child handle must be released after first reap"
        );
        assert!(
            after_first.detached,
            "task must be marked detached after first reap"
        );
    }

    // Second pass.
    registry.reap_child(&task);

    let after_second = task.state.lock().unwrap();
    assert!(
        after_second.metadata.status.is_terminal(),
        "second reap must transition to terminal when PID dead and no marker. Got status={:?}",
        after_second.metadata.status
    );
    assert_eq!(
        after_second.metadata.status,
        BgTaskStatus::Failed,
        "must specifically be Failed (not Killed): status={:?}",
        after_second.metadata.status
    );
    assert_eq!(
        after_second.metadata.status_reason.as_deref(),
        Some("process exited without exit marker"),
        "reason must match replay path's wording: {:?}",
        after_second.metadata.status_reason
    );
    assert!(
        matches!(after_second.runtime, TaskRuntime::Piped(None)),
        "child handle must stay released after second reap"
    );
    assert!(
        after_second.detached,
        "task must remain detached after second reap"
    );
}
4780
/// When an exit marker is present, `reap_child` still releases the child
/// handle and marks the task detached, but leaves the status `Running`.
#[test]
fn reap_child_preserves_running_when_exit_marker_exists() {
    let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
    let dir = tempfile::tempdir().unwrap();
    let workdir = dir.path().to_path_buf();
    let task_id = registry
        .spawn(
            QUICK_SUCCESS_COMMAND,
            "session".to_string(),
            workdir.clone(),
            HashMap::new(),
            Some(Duration::from_secs(30)),
            workdir.clone(),
            10,
            true,
            false,
            Some(workdir),
        )
        .unwrap();

    let task = registry.task_for_session(&task_id, "session").unwrap();

    // Poll (for at most 5s) until the child has exited AND its marker exists.
    let wait_began = Instant::now();
    let child_has_exited = || {
        let mut guard = task.state.lock().unwrap();
        if let TaskRuntime::Piped(Some(child)) = &mut guard.runtime {
            matches!(child.try_wait(), Ok(Some(_)))
        } else {
            true
        }
    };
    while !(child_has_exited() && task.paths.exit.exists()) {
        assert!(
            wait_began.elapsed() < Duration::from_secs(5),
            "child should exit and write marker quickly"
        );
        std::thread::sleep(Duration::from_millis(20));
    }

    // NOTE(review): the shutdown flag plus the 550ms pause presumably lets
    // background polling wind down before the state is edited by hand — confirm.
    registry
        .inner
        .shutdown
        .store(true, std::sync::atomic::Ordering::SeqCst);
    std::thread::sleep(Duration::from_millis(550));

    {
        // Put the in-memory state back to Running; nothing is persisted here.
        let mut guard = task.state.lock().unwrap();
        guard.metadata.status = BgTaskStatus::Running;
        guard.metadata.status_reason = None;
        let handle_released = matches!(guard.runtime, TaskRuntime::Piped(None));
        if handle_released {
            guard.runtime = TaskRuntime::Piped(Some(spawn_dead_child()));
        }
    }
    *task.terminal_at.lock().unwrap() = None;
    // Recreate the marker if it disappeared while the test was waiting.
    if !task.paths.exit.exists() {
        std::fs::write(&task.paths.exit, "0").expect("write replacement exit marker");
    }

    registry.reap_child(&task);

    let after_reap = task.state.lock().unwrap();
    assert!(
        matches!(after_reap.runtime, TaskRuntime::Piped(None)),
        "child handle still released even when marker exists"
    );
    assert!(
        after_reap.detached,
        "task still marked detached even when marker exists"
    );
    assert_eq!(
        after_reap.metadata.status,
        BgTaskStatus::Running,
        "reap_child must defer to poll_task when marker exists"
    );
}
4882
/// Asks `ps` for the `stat` column of `pid`.
///
/// Returns `None` when `ps` cannot be run, exits unsuccessfully, or prints
/// nothing but whitespace; otherwise returns the trimmed output.
#[cfg(unix)]
fn pid_stat(pid: u32) -> Option<String> {
    let pid_arg = pid.to_string();
    let ps = std::process::Command::new("ps")
        .args(["-o", "stat=", "-p", pid_arg.as_str()])
        .output()
        .ok()
        .filter(|ps| ps.status.success())?;
    let state = String::from_utf8_lossy(&ps.stdout).trim().to_owned();
    (!state.is_empty()).then_some(state)
}
4902
4903 #[cfg(unix)]
4905 fn is_zombie(pid: u32) -> bool {
4906 pid_stat(pid).is_some_and(|stat| stat.starts_with('Z'))
4907 }
4908
4909 #[cfg(unix)]
4915 fn spawn_unreaped_zombie() -> std::process::Child {
4916 let child = std::process::Command::new("true")
4917 .stdin(std::process::Stdio::null())
4918 .stdout(std::process::Stdio::null())
4919 .stderr(std::process::Stdio::null())
4920 .spawn()
4921 .expect("spawn zombie stand-in");
4922 let pid = child.id();
4923 let started = Instant::now();
4924 while !is_zombie(pid) {
4925 assert!(
4926 started.elapsed() < Duration::from_secs(5),
4927 "stand-in child should become a zombie within 5s"
4928 );
4929 std::thread::sleep(Duration::from_millis(10));
4930 }
4931 child
4933 }
4934
/// Regression test for issue #91: when `poll_task` finalizes a task from an
/// existing exit marker, the child handle must be released and the child
/// must no longer show up as a zombie.
#[cfg(unix)]
#[test]
fn finalize_from_marker_reaps_child_no_zombie() {
    use std::sync::atomic::Ordering;

    let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
    let dir = tempfile::tempdir().unwrap();
    let workdir = dir.path().to_path_buf();
    let task_id = registry
        .spawn(
            QUICK_SUCCESS_COMMAND,
            "session".to_string(),
            workdir.clone(),
            HashMap::new(),
            Some(Duration::from_secs(30)),
            workdir.clone(),
            10,
            true,
            false,
            Some(workdir),
        )
        .unwrap();

    // NOTE(review): the shutdown flag plus the 550ms pause presumably lets
    // background polling wind down before the state is edited by hand — confirm.
    registry.inner.shutdown.store(true, Ordering::SeqCst);
    std::thread::sleep(Duration::from_millis(550));

    let task = registry.task_for_session(&task_id, "session").unwrap();

    // Wait (for at most 5s) for the exit marker to appear on disk.
    let wait_began = Instant::now();
    loop {
        if task.paths.exit.exists() {
            break;
        }
        assert!(
            wait_began.elapsed() < Duration::from_secs(5),
            "exit marker should land quickly for `true`"
        );
        std::thread::sleep(Duration::from_millis(20));
    }

    // Reset the task to Running (in memory and on disk) and install an
    // unreaped zombie as its child, all under one hold of the state lock.
    let zombie_pid = {
        let mut guard = task.state.lock().unwrap();
        guard.metadata.status = BgTaskStatus::Running;
        guard.metadata.status_reason = None;
        guard.metadata.exit_code = None;
        guard.metadata.finished_at = None;
        guard.metadata.duration_ms = None;
        crate::bash_background::persistence::write_task(&task.paths.json, &guard.metadata)
            .expect("persist reset Running metadata");
        let stand_in = spawn_unreaped_zombie();
        let pid = stand_in.id();
        guard.runtime = TaskRuntime::Piped(Some(stand_in));
        pid
    };
    *task.terminal_at.lock().unwrap() = None;

    assert!(
        is_zombie(zombie_pid),
        "precondition: stand-in child {zombie_pid} must be a zombie before finalize"
    );

    registry.poll_task(&task).unwrap();

    {
        let after_poll = task.state.lock().unwrap();
        assert!(
            matches!(after_poll.runtime, TaskRuntime::Piped(None)),
            "child handle must be released after marker finalize"
        );
        assert!(
            after_poll.metadata.status.is_terminal(),
            "task must be terminal after marker finalize: {:?}",
            after_poll.metadata.status
        );
    }

    assert!(
        !is_zombie(zombie_pid),
        "issue #91 regression: child {zombie_pid} left as <defunct> zombie \
         after the exit-marker terminal transition"
    );
}
5038
/// Regression test for issue #91: `kill_with_status` on a task that already
/// has an exit marker must release the child handle and leave no zombie.
#[cfg(unix)]
#[test]
fn kill_with_existing_marker_reaps_child_no_zombie() {
    use std::sync::atomic::Ordering;

    let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
    let dir = tempfile::tempdir().unwrap();
    let workdir = dir.path().to_path_buf();
    let task_id = registry
        .spawn(
            QUICK_SUCCESS_COMMAND,
            "session".to_string(),
            workdir.clone(),
            HashMap::new(),
            Some(Duration::from_secs(30)),
            workdir.clone(),
            10,
            true,
            false,
            Some(workdir),
        )
        .unwrap();

    // NOTE(review): the shutdown flag plus the 550ms pause presumably lets
    // background polling wind down before the state is edited by hand — confirm.
    registry.inner.shutdown.store(true, Ordering::SeqCst);
    std::thread::sleep(Duration::from_millis(550));

    let task = registry.task_for_session(&task_id, "session").unwrap();

    // Wait (for at most 5s) for the exit marker to appear on disk.
    let wait_began = Instant::now();
    loop {
        if task.paths.exit.exists() {
            break;
        }
        assert!(
            wait_began.elapsed() < Duration::from_secs(5),
            "exit marker should land quickly for `true`"
        );
        std::thread::sleep(Duration::from_millis(20));
    }

    // Reset the task to Running (in memory and on disk) and install an
    // unreaped zombie as its child, all under one hold of the state lock.
    let zombie_pid = {
        let mut guard = task.state.lock().unwrap();
        guard.metadata.status = BgTaskStatus::Running;
        guard.metadata.status_reason = None;
        guard.metadata.exit_code = None;
        guard.metadata.finished_at = None;
        guard.metadata.duration_ms = None;
        crate::bash_background::persistence::write_task(&task.paths.json, &guard.metadata)
            .expect("persist reset Running metadata");
        let stand_in = spawn_unreaped_zombie();
        let pid = stand_in.id();
        guard.runtime = TaskRuntime::Piped(Some(stand_in));
        pid
    };
    *task.terminal_at.lock().unwrap() = None;

    assert!(
        is_zombie(zombie_pid),
        "precondition: stand-in child {zombie_pid} must be a zombie before kill"
    );

    registry
        .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
        .expect("kill should succeed");

    {
        let after_kill = task.state.lock().unwrap();
        assert!(
            matches!(after_kill.runtime, TaskRuntime::Piped(None)),
            "child handle must be released after marker-aware kill"
        );
        assert!(after_kill.metadata.status.is_terminal());
    }

    assert!(
        !is_zombie(zombie_pid),
        "issue #91 regression: child {zombie_pid} left as <defunct> zombie \
         after a marker-aware kill"
    );
}
5119
/// `cleanup_finished(Duration::ZERO)` must not evict a task that was just
/// spawned with the long-running command and never killed.
#[test]
fn cleanup_finished_keeps_running_tasks() {
    let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
    let dir = tempfile::tempdir().unwrap();
    let workdir = dir.path().to_path_buf();
    let task_id = registry
        .spawn(
            LONG_RUNNING_COMMAND,
            "session".to_string(),
            workdir.clone(),
            HashMap::new(),
            Some(Duration::from_secs(30)),
            workdir.clone(),
            10,
            true,
            false,
            Some(workdir),
        )
        .unwrap();

    registry.cleanup_finished(Duration::ZERO);

    let still_tracked = registry.inner.tasks.lock().unwrap().contains_key(&task_id);
    assert!(still_tracked);
    // Best-effort teardown of the long-running command; the result is ignored.
    let _ = registry.kill(&task_id, "session");
}
5144
/// Polls every 100ms until `path` exists, then returns its contents.
///
/// Panics if the file has not appeared within 30s or cannot be read.
#[cfg(windows)]
fn wait_for_file(path: &Path) -> String {
    let waiting_since = Instant::now();
    while !path.exists() {
        assert!(
            waiting_since.elapsed() < Duration::from_secs(30),
            "timed out waiting for {}",
            path.display()
        );
        std::thread::sleep(Duration::from_millis(100));
    }
    fs::read_to_string(path).expect("read file")
}
5160
/// Builds a registry with no progress sender, spawns `command` for the
/// "session" session inside a fresh temp dir, and returns the registry, the
/// temp dir guard, and the new task id.
#[cfg(windows)]
fn spawn_windows_registry_command(
    command: &str,
) -> (BgTaskRegistry, tempfile::TempDir, String) {
    let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
    let dir = tempfile::tempdir().unwrap();
    let workdir = dir.path().to_path_buf();
    let spawned = registry.spawn(
        command,
        "session".to_string(),
        workdir.clone(),
        HashMap::new(),
        Some(Duration::from_secs(30)),
        workdir.clone(),
        10,
        false,
        false,
        Some(workdir),
    );
    let task_id = spawned.unwrap();
    (registry, dir, task_id)
}
5183
/// A command exiting with status 0 produces an exit marker containing "0".
#[cfg(windows)]
#[test]
fn windows_spawn_writes_exit_marker_for_zero_exit() {
    let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 0");
    let marker_path = registry.task_exit_path(&task_id, "session").unwrap();

    let marker = wait_for_file(&marker_path);

    assert_eq!(marker.trim(), "0");
}
5194
/// A command exiting with status 42 produces an exit marker containing "42".
#[cfg(windows)]
#[test]
fn windows_spawn_writes_exit_marker_for_nonzero_exit() {
    let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 42");
    let marker_path = registry.task_exit_path(&task_id, "session").unwrap();

    let marker = wait_for_file(&marker_path);

    assert_eq!(marker.trim(), "42");
}
5205
/// Output echoed by the command ends up in the task's stdout capture file.
#[cfg(windows)]
#[test]
fn windows_spawn_captures_stdout_to_disk() {
    let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c echo hello");
    let task = registry.task_for_session(&task_id, "session").unwrap();

    // Only the marker's appearance matters here; its contents are ignored.
    let _ = wait_for_file(&task.paths.exit);
    let captured = fs::read_to_string(&task.paths.stdout).expect("read stdout");

    assert!(captured.contains("hello"), "stdout was {captured:?}");
}
5219
/// With both `pwsh.exe` and `powershell.exe` resolvable, the first shell
/// candidate is `WindowsShell::Pwsh`, whose binary is `pwsh.exe`.
#[cfg(windows)]
#[test]
fn windows_spawn_uses_pwsh_when_available() {
    use crate::windows_shell::WindowsShell;

    let candidates = crate::windows_shell::shell_candidates_with(
        // Stub lookup: only the two PowerShell binaries resolve.
        |name| match name {
            "pwsh.exe" => Some(std::path::PathBuf::from(r"C:\pwsh\pwsh.exe")),
            "powershell.exe" => Some(std::path::PathBuf::from(r"C:\ps\powershell.exe")),
            _ => None,
        },
        || None,
    );
    let preferred = candidates
        .first()
        .cloned()
        .expect("at least one candidate");
    assert_eq!(preferred, WindowsShell::Pwsh);
    assert_eq!(preferred.binary().as_ref(), "pwsh.exe");
}
5238
/// The `Cmd` wrapper script must capture the exit code, echo it to a
/// temporary marker, move that over the real marker path, silence the move,
/// and exit with the captured code.
#[cfg(windows)]
#[test]
fn windows_shell_cmd_wrapper_writes_exit_marker_with_move() {
    let marker = Path::new(r"C:\Temp\bash-test.exit");
    let script =
        crate::windows_shell::WindowsShell::Cmd.wrapper_script("cmd /c exit 42", marker);

    // (required fragment, failure explanation), checked in this order.
    let required = [
        (
            "set CODE=%ERRORLEVEL%",
            "wrapper must capture exit code into CODE",
        ),
        (
            "echo %CODE% >",
            "wrapper must echo CODE to a temp marker file",
        ),
        (
            "move /Y",
            "wrapper must use atomic move to write the marker",
        ),
        ("> nul", "wrapper must redirect move output to nul"),
        (
            "exit /B %CODE%",
            "wrapper must propagate the captured exit code",
        ),
    ];
    for (fragment, why) in required {
        assert!(script.contains(fragment), "{why}: {script}");
    }

    // Both the temporary and the final marker path appear quoted.
    assert!(script.contains(r#""C:\Temp\bash-test.exit.tmp""#));
    assert!(script.contains(r#""C:\Temp\bash-test.exit""#));
}
5281
/// `WindowsShell::Cmd.bg_command` passes exactly `/D /S /C <body>`.
#[cfg(windows)]
#[test]
fn windows_shell_cmd_bg_command_uses_minimal_cmd_flags() {
    use crate::windows_shell::WindowsShell;

    let command = WindowsShell::Cmd.bg_command("echo wrapped");
    // Arguments that are not valid UTF-8 are skipped, as before.
    let rendered: Vec<&str> = command
        .get_args()
        .filter_map(|arg| arg.to_str())
        .collect();
    assert_eq!(
        rendered,
        vec!["/D", "/S", "/C", "echo wrapped"],
        "Cmd::bg_command must prepend /D /S /C"
    );
}
5300
/// `WindowsShell::Pwsh.bg_command` includes `-Command` and the command body
/// among its arguments.
#[cfg(windows)]
#[test]
fn windows_shell_pwsh_bg_command_uses_standard_args() {
    use crate::windows_shell::WindowsShell;

    let command = WindowsShell::Pwsh.bg_command("Get-Date");
    // Arguments that are not valid UTF-8 are skipped, as before.
    let rendered: Vec<&str> = command
        .get_args()
        .filter_map(|arg| arg.to_str())
        .collect();
    assert!(
        rendered.contains(&"-Command"),
        "Pwsh::bg_command must use -Command: {rendered:?}"
    );
    assert!(
        rendered.contains(&"Get-Date"),
        "Pwsh::bg_command must include the user command body"
    );
}
5320
// Empty placeholder that is never compiled: `cfg(any())` with no predicates
// is always false, so this item is removed from every build.
// NOTE(review): the name suggests it stands in for a disabled test of the
// cmd wrapper's real exit code — confirm whether it should be restored or
// deleted. `allow(dead_code)` looks redundant on an item that is compiled out.
#[allow(dead_code)]
#[cfg(any())] fn windows_cmd_wrapper_records_real_exit_code_disabled() {}
5354}