1use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
2use std::fs;
3use std::io::{Read, Write};
4use std::path::{Path, PathBuf};
5use std::process::{Child, Command, Stdio};
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7#[cfg(unix)]
8use std::sync::OnceLock;
9use std::sync::{Arc, Mutex, RwLock};
10use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
11
12use rusqlite::Connection;
13use serde::Serialize;
14
15use crate::compress::caps::DropClass;
16use crate::compress::CompressionResult;
17use crate::context::SharedProgressSender;
18use crate::harness::Harness;
19use crate::protocol::{BashCompletedFrame, BashLongRunningFrame, BashPatternMatchFrame, PushFrame};
20
21#[cfg(unix)]
22use std::os::unix::process::CommandExt;
23#[cfg(windows)]
24use std::os::windows::process::CommandExt;
25
26use super::buffer::{combine_streams, BgBuffer, StreamKind, TokenCountInput};
27use super::output::{
28 cap_completion_output, cap_completion_output_with_marker, cap_final_output,
29 cap_final_output_with_marker, completion_preview_threshold, json_output_pointer, quote_path,
30 COMPRESS_INPUT_CAP_BYTES, COMPRESS_INPUT_HEAD_BYTES, COMPRESS_INPUT_TAIL_BYTES,
31 FINAL_OUTPUT_CAP_BYTES, RAW_PASSTHROUGH_CAP_BYTES, RAW_PASSTHROUGH_HEAD_BYTES,
32 RAW_PASSTHROUGH_TAIL_BYTES, RUNNING_OUTPUT_PREVIEW_BYTES, STRUCTURED_OUTPUT_CAP_BYTES,
33};
34use super::persistence::{
35 create_capture_file, delete_task_bundle, read_exit_marker, read_task, session_tasks_dir,
36 task_paths, unix_millis, update_task, write_kill_marker_if_absent, write_task, BgMode,
37 ExitMarker, PersistedTask, TaskPaths,
38};
39use super::process::is_process_alive;
40#[cfg(unix)]
41use super::process::terminate_pgid;
42#[cfg(windows)]
43use super::process::terminate_pid;
44use super::pty_process::spawn_pty_for_command;
45use super::pty_runtime::PtyRuntime;
46use super::watches::{PatternMatch, WatchPattern, WatchRegistry};
47use super::{BgTaskInfo, BgTaskStatus};
48const DEFAULT_BG_TIMEOUT: Duration = Duration::from_secs(30 * 60);
56const STALE_RUNNING_AFTER: Duration = Duration::from_secs(24 * 60 * 60);
57const PERSISTED_GC_GRACE: Duration = Duration::from_secs(24 * 60 * 60);
58const QUARANTINE_GC_GRACE: Duration = Duration::from_secs(30 * 24 * 60 * 60);
59
60const TOKENIZE_CAP_BYTES_PER_STREAM: usize = 128 * 1024;
61
62#[derive(Debug, Clone, Serialize)]
63pub struct BgCompletion {
64 pub task_id: String,
65 #[serde(skip_serializing)]
68 pub session_id: String,
69 pub status: BgTaskStatus,
70 pub exit_code: Option<i32>,
71 pub command: String,
72 #[serde(default, skip_serializing_if = "String::is_empty")]
78 pub output_preview: String,
79 #[serde(default, skip_serializing_if = "is_false")]
84 pub output_truncated: bool,
85 #[serde(default, skip_serializing_if = "Option::is_none")]
88 pub original_tokens: Option<u32>,
89 #[serde(default, skip_serializing_if = "Option::is_none")]
92 pub compressed_tokens: Option<u32>,
93 #[serde(default, skip_serializing_if = "is_false")]
95 pub tokens_skipped: bool,
96}
97
98fn is_false(v: &bool) -> bool {
99 !*v
100}
101
102#[derive(Debug, Clone, Serialize)]
103pub struct BgTaskSnapshot {
104 #[serde(flatten)]
105 pub info: BgTaskInfo,
106 pub exit_code: Option<i32>,
107 pub child_pid: Option<u32>,
108 pub workdir: String,
109 pub output_preview: String,
110 pub output_truncated: bool,
111 pub output_path: Option<String>,
112 pub stderr_path: Option<String>,
113 #[serde(skip_serializing_if = "Option::is_none")]
114 pub pty_rows: Option<u16>,
115 #[serde(skip_serializing_if = "Option::is_none")]
116 pub pty_cols: Option<u16>,
117}
118
119#[derive(Debug, Clone, Copy, PartialEq, Eq)]
120enum TerminalOutputKind {
121 Compressed,
122 Raw,
123 Structured,
124}
125
126#[derive(Debug, Clone, PartialEq, Eq)]
127struct TerminalOutputCache {
128 output_preview: String,
129 output_truncated: bool,
130 kind: TerminalOutputKind,
131 output_path: Option<String>,
132 stderr_path: Option<String>,
133 recovery: Option<RecoveryContext>,
134}
135
136#[derive(Debug, Clone, PartialEq, Eq)]
137struct RecoveryContext {
138 dropped_by_class: BTreeMap<DropClass, usize>,
139 had_inner_drop: bool,
140 offset_hint_eligible: bool,
141 offset_start_line: Option<usize>,
142 byte_truncated: bool,
143 output_path: Option<String>,
144 stderr_path: Option<String>,
145 include_stderr_path: bool,
146}
147
148impl RecoveryContext {
149 fn has_visible_drop(&self) -> bool {
150 self.byte_truncated || self.had_inner_drop || !self.dropped_by_class.is_empty()
151 }
152}
153
154#[derive(Clone)]
155pub struct BgTaskRegistry {
156 pub(crate) inner: Arc<RegistryInner>,
157}
158
159pub(crate) struct RegistryInner {
160 pub(crate) tasks: Mutex<HashMap<String, Arc<BgTask>>>,
161 pub(crate) completions: Mutex<VecDeque<BgCompletion>>,
162 pub(crate) progress_sender: SharedProgressSender,
163 watchdog_started: AtomicBool,
164 pub(crate) shutdown: AtomicBool,
165 pub(crate) long_running_reminder_enabled: AtomicBool,
166 pub(crate) long_running_reminder_interval_ms: AtomicU64,
167 persisted_gc_started: AtomicBool,
168 #[cfg(test)]
169 persisted_gc_runs: AtomicU64,
170 pub(crate) compressor:
176 Mutex<Option<Box<dyn Fn(&str, String, Option<i32>) -> CompressionResult + Send + Sync>>>,
177 pub(crate) db_pool: RwLock<Option<Arc<Mutex<Connection>>>>,
178 pub(crate) db_harness: RwLock<Option<String>>,
179 pub(crate) wake_tx: crossbeam_channel::Sender<()>,
180 pub(crate) wake_rx: crossbeam_channel::Receiver<()>,
181 pub(crate) watch_registry: Mutex<WatchRegistry>,
182}
183
184pub(crate) struct BgTask {
185 pub(crate) task_id: String,
186 pub(crate) session_id: String,
187 pub(crate) paths: TaskPaths,
188 pub(crate) started: Instant,
189 pub(crate) last_reminder_at: Mutex<Option<Instant>>,
190 pub(crate) terminal_at: Mutex<Option<Instant>>,
191 pub(crate) state: Mutex<BgTaskState>,
192}
193
194pub(crate) enum TaskRuntime {
195 Piped(Option<Child>),
196 Pty(Option<PtyRuntime>),
197}
198
199pub(crate) struct BgTaskState {
200 pub(crate) metadata: PersistedTask,
201 pub(crate) runtime: TaskRuntime,
202 pub(crate) detached: bool,
203 pub(crate) child_exit_observed: bool,
214 pub(crate) buffer: BgBuffer,
215 terminal_output_cache: Option<TerminalOutputCache>,
216 pub(crate) pending_terminal_override: Option<BgTaskStatus>,
218}
219
220fn completion_matches_session(completion: &BgCompletion, session_id: Option<&str>) -> bool {
221 session_id
222 .map(|session_id| completion.session_id == session_id)
223 .unwrap_or(true)
224}
225
226impl BgTaskRegistry {
227 pub fn new(progress_sender: SharedProgressSender) -> Self {
228 let (wake_tx, wake_rx) = crossbeam_channel::bounded(1);
229 Self {
230 inner: Arc::new(RegistryInner {
231 tasks: Mutex::new(HashMap::new()),
232 completions: Mutex::new(VecDeque::new()),
233 progress_sender,
234 watchdog_started: AtomicBool::new(false),
235 shutdown: AtomicBool::new(false),
236 long_running_reminder_enabled: AtomicBool::new(true),
237 long_running_reminder_interval_ms: AtomicU64::new(600_000),
238 persisted_gc_started: AtomicBool::new(false),
239 #[cfg(test)]
240 persisted_gc_runs: AtomicU64::new(0),
241 compressor: Mutex::new(None),
242 db_pool: RwLock::new(None),
243 db_harness: RwLock::new(None),
244 wake_tx,
245 wake_rx,
246 watch_registry: Mutex::new(WatchRegistry::default()),
247 }),
248 }
249 }
250
251 pub fn set_harness(&self, harness: Harness) {
252 if let Ok(mut slot) = self.inner.db_harness.write() {
253 *slot = Some(harness.as_str().to_string());
254 }
255 }
256
257 pub fn set_db_pool(&self, conn: Arc<Mutex<Connection>>) {
258 if let Ok(mut slot) = self.inner.db_pool.write() {
259 *slot = Some(conn);
260 }
261 }
262
263 pub fn clear_db_pool(&self) {
264 if let Ok(mut slot) = self.inner.db_pool.write() {
265 *slot = None;
266 }
267 }
268
269 pub fn set_compressor<F>(&self, compressor: F)
274 where
275 F: Fn(&str, String) -> CompressionResult + Send + Sync + 'static,
276 {
277 self.set_compressor_with_exit_code(move |command, output, _exit_code| {
278 compressor(command, output)
279 });
280 }
281
282 pub fn set_compressor_with_exit_code<F>(&self, compressor: F)
283 where
284 F: Fn(&str, String, Option<i32>) -> CompressionResult + Send + Sync + 'static,
285 {
286 if let Ok(mut slot) = self.inner.compressor.lock() {
287 *slot = Some(Box::new(compressor));
288 }
289 }
290
291 pub(crate) fn compress_output(
294 &self,
295 command: &str,
296 output: String,
297 exit_code: Option<i32>,
298 ) -> CompressionResult {
299 let Ok(slot) = self.inner.compressor.lock() else {
300 return CompressionResult::new(output);
301 };
302 match slot.as_ref() {
303 Some(compressor) => compressor(command, output, exit_code),
304 None => CompressionResult::new(output),
305 }
306 }
307
308 fn ensure_terminal_output_cache(&self, task: &Arc<BgTask>) -> Option<TerminalOutputCache> {
309 let (metadata, buffer) = {
310 let state = task.state.lock().ok()?;
311 if !state.metadata.status.is_terminal() || state.metadata.mode == BgMode::Pty {
312 return None;
313 }
314 if let Some(cache) = state.terminal_output_cache.clone() {
315 return Some(cache);
316 }
317 (state.metadata.clone(), state.buffer.clone())
318 };
319
320 let mut cap_buffer = buffer.clone();
321 cap_buffer.enforce_terminal_cap();
322 let cache = self.render_terminal_output(&metadata, &buffer);
323 let mut state = task.state.lock().ok()?;
324 if !state.metadata.status.is_terminal() || state.metadata.mode == BgMode::Pty {
325 return None;
326 }
327 if let Some(existing) = state.terminal_output_cache.clone() {
328 return Some(existing);
329 }
330 state.terminal_output_cache = Some(cache.clone());
331 Some(cache)
332 }
333
334 fn render_terminal_output(
335 &self,
336 metadata: &PersistedTask,
337 buffer: &BgBuffer,
338 ) -> TerminalOutputCache {
339 if metadata.mode == BgMode::Pty {
340 return TerminalOutputCache {
341 output_preview: String::new(),
342 output_truncated: false,
343 kind: TerminalOutputKind::Raw,
344 output_path: buffer.output_path().map(|path| path.display().to_string()),
345 stderr_path: buffer.stderr_path().map(|path| path.display().to_string()),
346 recovery: None,
347 };
348 }
349
350 if let Some(structured) = render_structured_output(&metadata.command, buffer) {
351 return structured;
352 }
353
354 if !metadata.compressed {
355 return render_raw_passthrough(buffer);
356 }
357
358 let raw = buffer.read_combined_head_tail(
359 COMPRESS_INPUT_CAP_BYTES,
360 COMPRESS_INPUT_HEAD_BYTES,
361 COMPRESS_INPUT_TAIL_BYTES,
362 );
363 let compressed = self.compress_output(&metadata.command, raw.text, metadata.exit_code);
364 render_compressed_with_recovery(buffer, compressed, raw.truncated)
365 }
366
367 fn snapshot_with_terminal_cache(
368 &self,
369 task: &Arc<BgTask>,
370 preview_bytes: usize,
371 ) -> BgTaskSnapshot {
372 let mut snapshot = task.snapshot(preview_bytes);
373 self.maybe_compress_snapshot(task, &mut snapshot);
374 snapshot
375 }
376
377 fn post_terminal_transition(&self, task: &Arc<BgTask>, emit_frame: bool) -> Result<(), String> {
378 let (metadata, buffer) = {
379 let state = task
380 .state
381 .lock()
382 .map_err(|_| "background task lock poisoned".to_string())?;
383 if !state.metadata.status.is_terminal() {
384 return Ok(());
385 }
386 (state.metadata.clone(), state.buffer.clone())
387 };
388
389 let mut cap_buffer = buffer.clone();
390 cap_buffer.enforce_terminal_cap();
391 let cache = self.ensure_terminal_output_cache(task);
392 self.enqueue_completion_from_parts(
393 &metadata,
394 Some(&buffer),
395 None,
396 emit_frame,
397 cache.as_ref(),
398 );
399 Ok(())
400 }
401
402 fn persist_task(&self, paths: &TaskPaths, metadata: &PersistedTask) -> std::io::Result<()> {
403 write_task(&paths.json, metadata)?;
404 self.dual_write_task(paths, metadata);
405 Ok(())
406 }
407
408 fn update_task_metadata<F>(
409 &self,
410 paths: &TaskPaths,
411 update: F,
412 ) -> std::io::Result<PersistedTask>
413 where
414 F: FnOnce(&mut PersistedTask),
415 {
416 let metadata = update_task(&paths.json, update)?;
417 self.dual_write_task(paths, &metadata);
418 Ok(metadata)
419 }
420
421 fn dual_write_task(&self, paths: &TaskPaths, metadata: &PersistedTask) {
422 let pool = self.inner.db_pool.read().ok().and_then(|slot| slot.clone());
423 let Some(pool) = pool else {
424 return;
425 };
426 let harness = self
427 .inner
428 .db_harness
429 .read()
430 .ok()
431 .and_then(|slot| slot.clone());
432 let Some(harness) = harness else {
433 crate::slog_warn!(
434 "dual-write bash_task to DB skipped for {}: harness not configured",
435 metadata.task_id
436 );
437 return;
438 };
439 let row = match metadata.to_bash_task_row(&harness, paths) {
440 Ok(row) => row,
441 Err(error) => {
442 crate::slog_warn!(
443 "dual-write bash_task to DB failed for {}: {}",
444 metadata.task_id,
445 error
446 );
447 return;
448 }
449 };
450 let conn = match pool.lock() {
451 Ok(conn) => conn,
452 Err(_) => {
453 crate::slog_warn!(
454 "dual-write bash_task to DB failed for {}: db mutex poisoned",
455 metadata.task_id
456 );
457 return;
458 }
459 };
460 if let Err(error) = crate::db::bash_tasks::upsert_bash_task(&conn, &row) {
461 crate::slog_warn!(
462 "dual-write bash_task to DB failed for {}: {}",
463 metadata.task_id,
464 error
465 );
466 }
467 }
468
469 pub fn configure_long_running_reminders(&self, enabled: bool, interval_ms: u64) {
470 self.inner
471 .long_running_reminder_enabled
472 .store(enabled, Ordering::SeqCst);
473 self.inner
474 .long_running_reminder_interval_ms
475 .store(interval_ms, Ordering::SeqCst);
476 }
477
478 #[cfg(unix)]
479 #[allow(clippy::too_many_arguments)]
480 pub fn spawn(
481 &self,
482 command: &str,
483 session_id: String,
484 workdir: PathBuf,
485 env: HashMap<String, String>,
486 timeout: Option<Duration>,
487 storage_dir: PathBuf,
488 max_running: usize,
489 notify_on_completion: bool,
490 compressed: bool,
491 project_root: Option<PathBuf>,
492 ) -> Result<String, String> {
493 self.start_watchdog();
494
495 let running = self.running_count();
496 if running >= max_running {
497 return Err(format!(
498 "background bash task limit exceeded: {running} running (max {max_running})"
499 ));
500 }
501
502 let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
503 let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
504 let task_id = self.generate_unique_task_id()?;
505 let paths = task_paths(&storage_dir, &session_id, &task_id);
506 fs::create_dir_all(&paths.dir)
507 .map_err(|e| format!("failed to create background task dir: {e}"))?;
508
509 let mut metadata = PersistedTask::starting(
510 task_id.clone(),
511 session_id.clone(),
512 command.to_string(),
513 workdir.clone(),
514 project_root,
515 timeout_ms,
516 notify_on_completion,
517 compressed,
518 );
519 self.persist_task(&paths, &metadata)
520 .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
521
522 create_capture_file(&paths.stdout)
526 .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
527 create_capture_file(&paths.stderr)
528 .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
529
530 let child = match spawn_detached_child(command, &paths, &workdir, &env) {
531 Ok(child) => child,
532 Err(error) => {
533 crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
534 let _ = delete_task_bundle(&paths);
535 return Err(error);
536 }
537 };
538
539 let child_pid = child.id();
540 metadata.mark_running(child_pid, child_pid as i32);
541 self.persist_task(&paths, &metadata)
542 .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
543
544 let task = Arc::new(BgTask {
545 task_id: task_id.clone(),
546 session_id,
547 paths: paths.clone(),
548 started: Instant::now(),
549 last_reminder_at: Mutex::new(None),
550 terminal_at: Mutex::new(None),
551 state: Mutex::new(BgTaskState {
552 metadata,
553 runtime: TaskRuntime::Piped(Some(child)),
554 detached: false,
555 child_exit_observed: false,
556 buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
557 terminal_output_cache: None,
558 pending_terminal_override: None,
559 }),
560 });
561
562 self.inner
563 .tasks
564 .lock()
565 .map_err(|_| "background task registry lock poisoned".to_string())?
566 .insert(task_id.clone(), task);
567
568 Ok(task_id)
569 }
570
571 #[allow(clippy::too_many_arguments)]
572 pub fn spawn_pty(
573 &self,
574 command: &str,
575 session_id: String,
576 workdir: PathBuf,
577 env: HashMap<String, String>,
578 timeout: Option<Duration>,
579 storage_dir: PathBuf,
580 max_running: usize,
581 notify_on_completion: bool,
582 compressed: bool,
583 project_root: Option<PathBuf>,
584 rows: u16,
585 cols: u16,
586 ) -> Result<String, String> {
587 self.start_watchdog();
588
589 let running = self.running_count();
590 if running >= max_running {
591 return Err(format!(
592 "background bash task limit exceeded: {running} running (max {max_running})"
593 ));
594 }
595
596 let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
597 let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
598 let task_id = self.generate_unique_task_id()?;
599 let paths = task_paths(&storage_dir, &session_id, &task_id);
600 fs::create_dir_all(&paths.dir)
601 .map_err(|e| format!("failed to create background task dir: {e}"))?;
602
603 let mut metadata = PersistedTask::starting(
604 task_id.clone(),
605 session_id.clone(),
606 command.to_string(),
607 workdir.clone(),
608 project_root,
609 timeout_ms,
610 notify_on_completion,
611 compressed,
612 );
613 metadata.mode = BgMode::Pty;
614 metadata.pty_rows = Some(rows);
615 metadata.pty_cols = Some(cols);
616 self.persist_task(&paths, &metadata)
617 .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
618 create_capture_file(&paths.pty)
619 .map_err(|e| format!("failed to create PTY capture file: {e}"))?;
620
621 let runtime = match spawn_pty_for_command(
622 &task_id,
623 &session_id,
624 command,
625 &paths,
626 &workdir,
627 &env,
628 rows,
629 cols,
630 self.inner.wake_tx.clone(),
631 ) {
632 Ok(runtime) => runtime,
633 Err(error) => {
634 crate::slog_warn!(
635 "failed to spawn PTY background bash task {task_id}; deleting partial bundle: {error}"
636 );
637 let _ = delete_task_bundle(&paths);
638 return Err(error);
639 }
640 };
641
642 if let Some(child_pid) = runtime.child_pid {
643 metadata.mark_running(child_pid, child_pid as i32);
644 } else {
645 metadata.status = BgTaskStatus::Running;
646 metadata.pgid = None;
647 }
648 self.persist_task(&paths, &metadata)
649 .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
650
651 let task = Arc::new(BgTask {
652 task_id: task_id.clone(),
653 session_id,
654 paths: paths.clone(),
655 started: Instant::now(),
656 last_reminder_at: Mutex::new(None),
657 terminal_at: Mutex::new(None),
658 state: Mutex::new(BgTaskState {
659 metadata,
660 runtime: TaskRuntime::Pty(Some(runtime)),
661 detached: false,
662 child_exit_observed: false,
663 buffer: BgBuffer::pty(paths.pty.clone()),
664 terminal_output_cache: None,
665 pending_terminal_override: None,
666 }),
667 });
668
669 self.inner
670 .tasks
671 .lock()
672 .map_err(|_| "background task registry lock poisoned".to_string())?
673 .insert(task_id.clone(), task);
674
675 Ok(task_id)
676 }
677
678 #[cfg(windows)]
679 #[allow(clippy::too_many_arguments)]
680 pub fn spawn(
681 &self,
682 command: &str,
683 session_id: String,
684 workdir: PathBuf,
685 env: HashMap<String, String>,
686 timeout: Option<Duration>,
687 storage_dir: PathBuf,
688 max_running: usize,
689 notify_on_completion: bool,
690 compressed: bool,
691 project_root: Option<PathBuf>,
692 ) -> Result<String, String> {
693 self.start_watchdog();
694
695 let running = self.running_count();
696 if running >= max_running {
697 return Err(format!(
698 "background bash task limit exceeded: {running} running (max {max_running})"
699 ));
700 }
701
702 let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
703 let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
704 let task_id = self.generate_unique_task_id()?;
705 let paths = task_paths(&storage_dir, &session_id, &task_id);
706 fs::create_dir_all(&paths.dir)
707 .map_err(|e| format!("failed to create background task dir: {e}"))?;
708
709 let mut metadata = PersistedTask::starting(
710 task_id.clone(),
711 session_id.clone(),
712 command.to_string(),
713 workdir.clone(),
714 project_root,
715 timeout_ms,
716 notify_on_completion,
717 compressed,
718 );
719 self.persist_task(&paths, &metadata)
720 .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
721
722 create_capture_file(&paths.stdout)
728 .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
729 create_capture_file(&paths.stderr)
730 .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
731
732 let child = match spawn_detached_child(command, &paths, &workdir, &env) {
733 Ok(child) => child,
734 Err(error) => {
735 crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
736 let _ = delete_task_bundle(&paths);
737 return Err(error);
738 }
739 };
740
741 let child_pid = child.id();
742 metadata.status = BgTaskStatus::Running;
743 metadata.child_pid = Some(child_pid);
744 metadata.pgid = None;
745 self.persist_task(&paths, &metadata)
746 .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
747
748 let task = Arc::new(BgTask {
749 task_id: task_id.clone(),
750 session_id,
751 paths: paths.clone(),
752 started: Instant::now(),
753 last_reminder_at: Mutex::new(None),
754 terminal_at: Mutex::new(None),
755 state: Mutex::new(BgTaskState {
756 metadata,
757 runtime: TaskRuntime::Piped(Some(child)),
758 detached: false,
759 child_exit_observed: false,
760 buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
761 terminal_output_cache: None,
762 pending_terminal_override: None,
763 }),
764 });
765
766 self.inner
767 .tasks
768 .lock()
769 .map_err(|_| "background task registry lock poisoned".to_string())?
770 .insert(task_id.clone(), task);
771
772 Ok(task_id)
773 }
774
775 pub fn write_pty(
776 &self,
777 task_id: &str,
778 session_id: &str,
779 input: &[u8],
780 ) -> Result<usize, String> {
781 let task = self
782 .task_for_session(task_id, session_id)
783 .ok_or_else(|| "task_not_found".to_string())?;
784
785 let writer = {
786 let state = task
787 .state
788 .lock()
789 .map_err(|_| "background task lock poisoned".to_string())?;
790 if state.metadata.mode != BgMode::Pty {
791 return Err("task_not_pty".to_string());
792 }
793 if state.metadata.status.is_terminal() {
794 return Err("task_exited".to_string());
795 }
796 match &state.runtime {
797 TaskRuntime::Pty(Some(runtime)) => Arc::clone(&runtime.writer),
798 TaskRuntime::Pty(None) => return Err("task_exited".to_string()),
799 TaskRuntime::Piped(_) => return Err("task_not_pty".to_string()),
800 }
801 };
802
803 let mut writer = writer
804 .lock()
805 .map_err(|_| "PTY writer lock poisoned".to_string())?;
806 writer
807 .write_all(input)
808 .map_err(|error| format!("failed to write to PTY: {error}"))?;
809 writer
810 .flush()
811 .map_err(|error| format!("failed to flush PTY writer: {error}"))?;
812 Ok(input.len())
813 }
814
815 pub fn replay_session(&self, storage_dir: &Path, session_id: &str) -> Result<(), String> {
816 self.replay_session_inner(storage_dir, session_id, None)
817 }
818
819 pub fn replay_session_for_project(
820 &self,
821 storage_dir: &Path,
822 session_id: &str,
823 project_root: &Path,
824 ) -> Result<(), String> {
825 self.replay_session_inner(storage_dir, session_id, Some(project_root))
826 }
827
828 fn replay_session_inner(
829 &self,
830 storage_dir: &Path,
831 session_id: &str,
832 project_root: Option<&Path>,
833 ) -> Result<(), String> {
834 self.start_watchdog();
835 if !self.inner.persisted_gc_started.swap(true, Ordering::SeqCst) {
836 if let Err(error) = self.maybe_gc_persisted(storage_dir) {
837 crate::slog_warn!("failed to GC persisted background bash tasks: {error}");
838 }
839 }
840
841 let canonical_project = project_root.map(canonicalized_path);
842 let tasks = match self.replay_session_from_db(session_id) {
854 Some(Ok(tasks)) if !tasks.is_empty() => tasks,
855 Some(Ok(_)) => {
856 let disk_tasks = self.replay_session_from_disk(storage_dir, session_id)?;
857 if !disk_tasks.is_empty() {
858 crate::slog_info!(
859 "bash task replay: 0 in DB for session {}, {} from disk fallback",
860 session_id,
861 disk_tasks.len()
862 );
863 }
864 disk_tasks
865 }
866 Some(Err(error)) => {
867 crate::slog_warn!(
868 "bash task replay DB lookup failed for session {}; falling back to disk: {}",
869 session_id,
870 error
871 );
872 self.replay_session_from_disk(storage_dir, session_id)?
873 }
874 None => {
875 self.replay_session_from_disk(storage_dir, session_id)?
877 }
878 };
879
880 for mut metadata in tasks {
881 if metadata.session_id != session_id {
882 continue;
883 }
884 if let Some(canonical_project) = canonical_project.as_deref() {
885 let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
886 if metadata_project.as_deref() != Some(canonical_project) {
887 continue;
888 }
889 }
890
891 let paths = task_paths(storage_dir, session_id, &metadata.task_id);
892 match metadata.status {
893 BgTaskStatus::Starting => {
894 let completion_was_delivered = metadata.completion_delivered;
895 metadata.mark_terminal(
896 BgTaskStatus::Failed,
897 None,
898 Some("spawn aborted".to_string()),
899 );
900 metadata.completion_delivered |= completion_was_delivered;
901 let _ = self.persist_task(&paths, &metadata);
902 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
903 self.insert_rehydrated_task(metadata, paths, true)?;
904 }
905 BgTaskStatus::Running | BgTaskStatus::Killing => {
906 if metadata.mode == BgMode::Pty {
907 if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
908 let completion_was_delivered = metadata.completion_delivered;
909 metadata = terminal_metadata_from_marker(metadata, marker, None);
910 metadata.completion_delivered |= completion_was_delivered;
911 let _ = self.persist_task(&paths, &metadata);
912 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
913 self.insert_rehydrated_task(metadata, paths, true)?;
914 } else if metadata.status.is_terminal() {
915 self.insert_rehydrated_task(metadata, paths, true)?;
916 } else {
917 let completion_was_delivered = metadata.completion_delivered;
918 metadata.mark_terminal(
919 BgTaskStatus::Killed,
920 None,
921 Some("pty_lost_on_bridge_restart".to_string()),
922 );
923 metadata.completion_delivered |= completion_was_delivered;
924 let _ = self.persist_task(&paths, &metadata);
925 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
926 self.insert_rehydrated_task(metadata, paths, true)?;
927 }
928 } else if self.running_metadata_is_stale(&metadata) {
929 let completion_was_delivered = metadata.completion_delivered;
930 metadata.mark_terminal(
931 BgTaskStatus::Killed,
932 None,
933 Some("orphaned (>24h)".to_string()),
934 );
935 metadata.completion_delivered |= completion_was_delivered;
936 if !paths.exit.exists() {
937 let _ = write_kill_marker_if_absent(&paths.exit);
938 }
939 let _ = self.persist_task(&paths, &metadata);
940 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
941 self.insert_rehydrated_task(metadata, paths, true)?;
942 } else if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
943 let reason = (metadata.status == BgTaskStatus::Killing).then(|| {
944 "recovered from inconsistent killing state on replay".to_string()
945 });
946 if reason.is_some() {
947 crate::slog_warn!("background task {} had killing state with exit marker; preferring marker",
948 metadata.task_id);
949 }
950 let completion_was_delivered = metadata.completion_delivered;
951 metadata = terminal_metadata_from_marker(metadata, marker, reason);
952 metadata.completion_delivered |= completion_was_delivered;
953 let _ = self.persist_task(&paths, &metadata);
954 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
955 self.insert_rehydrated_task(metadata, paths, true)?;
956 } else if metadata.status == BgTaskStatus::Killing {
957 if !paths.exit.exists() {
958 let _ = write_kill_marker_if_absent(&paths.exit);
959 }
960 let completion_was_delivered = metadata.completion_delivered;
961 metadata.mark_terminal(
962 BgTaskStatus::Killed,
963 None,
964 Some("recovered from inconsistent killing state on replay".to_string()),
965 );
966 metadata.completion_delivered |= completion_was_delivered;
967 let _ = self.persist_task(&paths, &metadata);
968 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
969 self.insert_rehydrated_task(metadata, paths, true)?;
970 } else if metadata.child_pid.is_some_and(|pid| !is_process_alive(pid)) {
971 let completion_was_delivered = metadata.completion_delivered;
972 metadata.mark_terminal(
973 BgTaskStatus::Failed,
974 None,
975 Some("process exited without exit marker".to_string()),
976 );
977 metadata.completion_delivered |= completion_was_delivered;
978 let _ = self.persist_task(&paths, &metadata);
979 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
980 self.insert_rehydrated_task(metadata, paths, true)?;
981 } else {
982 self.insert_rehydrated_task(metadata, paths, true)?;
983 }
984 }
985 _ if metadata.status.is_terminal() => {
986 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
992 self.insert_rehydrated_task(metadata, paths, true)?;
993 }
994 _ => {}
995 }
996 }
997
998 Ok(())
999 }
1000
1001 fn replay_session_from_db(
1002 &self,
1003 session_id: &str,
1004 ) -> Option<Result<Vec<PersistedTask>, String>> {
1005 let pool = self
1006 .inner
1007 .db_pool
1008 .read()
1009 .ok()
1010 .and_then(|slot| slot.clone())?;
1011 let harness = self
1012 .inner
1013 .db_harness
1014 .read()
1015 .ok()
1016 .and_then(|slot| slot.clone())?;
1017 let conn = match pool.lock() {
1018 Ok(conn) => conn,
1019 Err(_) => return Some(Err("db mutex poisoned".to_string())),
1020 };
1021 Some(
1022 crate::db::bash_tasks::list_bash_tasks_for_session(&conn, &harness, session_id)
1023 .map(|rows| rows.into_iter().map(PersistedTask::from).collect())
1024 .map_err(|error| error.to_string()),
1025 )
1026 }
1027
1028 fn replay_session_from_disk(
1029 &self,
1030 storage_dir: &Path,
1031 session_id: &str,
1032 ) -> Result<Vec<PersistedTask>, String> {
1033 let dir = session_tasks_dir(storage_dir, session_id);
1034 if !dir.exists() {
1035 return Ok(Vec::new());
1036 }
1037
1038 let entries = fs::read_dir(&dir)
1039 .map_err(|e| format!("failed to read background task dir {}: {e}", dir.display()))?;
1040 let mut tasks = Vec::new();
1041 for entry in entries.flatten() {
1042 let path = entry.path();
1043 if path.extension().and_then(|extension| extension.to_str()) != Some("json") {
1044 continue;
1045 }
1046 match read_task(&path) {
1047 Ok(metadata) => tasks.push(metadata),
1048 Err(error) => {
1049 crate::slog_warn!(
1050 "quarantining invalid background task metadata {} during replay: {error}",
1051 path.display()
1052 );
1053 if let Err(quarantine_error) =
1054 quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
1055 {
1056 crate::slog_warn!(
1057 "failed to quarantine invalid background task metadata {}: {quarantine_error}",
1058 path.display()
1059 );
1060 }
1061 }
1062 }
1063 }
1064 Ok(tasks)
1065 }
1066
1067 pub fn register_watch(
1068 &self,
1069 task_id: String,
1070 pattern: WatchPattern,
1071 once: bool,
1072 ) -> Result<String, &'static str> {
1073 let task = self.task(&task_id).ok_or("task_not_found")?;
1074 let (mode, terminal_at_registration, stdout, stderr, pty) = task
1075 .state
1076 .lock()
1077 .map(|state| {
1078 (
1079 state.metadata.mode.clone(),
1080 state.metadata.status.is_terminal(),
1081 task.paths.stdout.clone(),
1082 task.paths.stderr.clone(),
1083 task.paths.pty.clone(),
1084 )
1085 })
1086 .map_err(|_| "background_task_lock_poisoned")?;
1087
1088 let mut terminal_matches = Vec::new();
1089 let scanned_terminal = terminal_at_registration;
1090 let watch_id = {
1091 let mut registry = self
1092 .inner
1093 .watch_registry
1094 .lock()
1095 .map_err(|_| "watch_registry_poisoned")?;
1096 let watch_id = registry.register(task_id.clone(), pattern, once)?;
1097 match &mode {
1098 BgMode::Pipes => {
1099 let stdout_key = format!("{task_id}:stdout");
1100 let stderr_key = format!("{task_id}:stderr");
1101 if terminal_at_registration {
1102 registry.set_file_cursor(&stdout_key, 0);
1103 registry.set_file_cursor(&stderr_key, 0);
1104 terminal_matches.extend(registry.scan_file_new_bytes(
1105 &stdout_key,
1106 &task_id,
1107 &stdout,
1108 ));
1109 terminal_matches.extend(registry.scan_file_new_bytes(
1110 &stderr_key,
1111 &task_id,
1112 &stderr,
1113 ));
1114 } else {
1115 registry.prime_file_cursor(&stdout_key, &stdout);
1116 registry.prime_file_cursor(&stderr_key, &stderr);
1117 }
1118 }
1119 BgMode::Pty => {
1120 let pty_key = format!("{task_id}:pty");
1121 if terminal_at_registration {
1122 registry.set_file_cursor(&pty_key, 0);
1123 terminal_matches
1124 .extend(registry.scan_file_new_bytes(&pty_key, &task_id, &pty));
1125 } else {
1126 registry.prime_file_cursor(&pty_key, &pty);
1127 }
1128 }
1129 }
1130 watch_id
1131 };
1132
1133 if task.is_terminal() {
1134 if !scanned_terminal {
1135 terminal_matches = {
1136 let mut registry = self
1137 .inner
1138 .watch_registry
1139 .lock()
1140 .map_err(|_| "watch_registry_poisoned")?;
1141 match &mode {
1142 BgMode::Pipes => {
1143 let stdout_key = format!("{task_id}:stdout");
1144 let stderr_key = format!("{task_id}:stderr");
1145 registry.set_file_cursor(&stdout_key, 0);
1146 registry.set_file_cursor(&stderr_key, 0);
1147 let mut matches =
1148 registry.scan_file_new_bytes(&stdout_key, &task_id, &stdout);
1149 matches.extend(registry.scan_file_new_bytes(
1150 &stderr_key,
1151 &task_id,
1152 &stderr,
1153 ));
1154 matches
1155 }
1156 BgMode::Pty => {
1157 let pty_key = format!("{task_id}:pty");
1158 registry.set_file_cursor(&pty_key, 0);
1159 registry.scan_file_new_bytes(&pty_key, &task_id, &pty)
1160 }
1161 }
1162 };
1163 }
1164
1165 let (watch_controlled, watch_matched) = self.task_watch_state(&task_id);
1166 if terminal_matches.is_empty() && (!watch_controlled || watch_matched) {
1167 if watch_matched {
1168 let _ = task.set_completion_delivered(true, self);
1169 self.clear_task_watch_state(&task_id);
1170 }
1171 return Ok(watch_id);
1172 }
1173
1174 let completion = self
1175 .remove_pending_completion(&task_id)
1176 .or_else(|| self.completion_snapshot_for_task(&task));
1177 if terminal_matches.is_empty() {
1178 if let Some(completion) = completion.as_ref() {
1179 self.emit_bash_watch_exit(completion);
1180 }
1181 } else {
1182 for pattern_match in terminal_matches {
1183 self.emit_bash_pattern_match(&task.session_id, pattern_match);
1184 }
1185 }
1186 let _ = task.set_completion_delivered(true, self);
1187 self.clear_task_watch_state(&task_id);
1188 }
1189
1190 Ok(watch_id)
1191 }
1192
1193 pub fn unregister_watch(&self, task_id: &str, watch_id: &str) {
1194 if let Ok(mut registry) = self.inner.watch_registry.lock() {
1195 registry.unregister(task_id, watch_id);
1196 }
1197 }
1198
1199 pub fn active_watch_count(&self, task_id: &str) -> usize {
1200 self.inner
1201 .watch_registry
1202 .lock()
1203 .map(|registry| registry.active_count(task_id))
1204 .unwrap_or(0)
1205 }
1206
1207 fn task_watch_state(&self, task_id: &str) -> (bool, bool) {
1208 self.inner
1209 .watch_registry
1210 .lock()
1211 .map(|registry| {
1212 (
1213 registry.has_controlled_task(task_id),
1214 registry.has_matched_task(task_id),
1215 )
1216 })
1217 .unwrap_or((false, false))
1218 }
1219
1220 fn task_has_watch_control(&self, task_id: &str) -> bool {
1221 self.inner
1222 .watch_registry
1223 .lock()
1224 .map(|registry| registry.has_controlled_task(task_id))
1225 .unwrap_or(false)
1226 }
1227
1228 fn clear_task_watch_state(&self, task_id: &str) {
1229 if let Ok(mut registry) = self.inner.watch_registry.lock() {
1230 registry.clear_task(task_id);
1231 }
1232 }
1233
1234 pub(crate) fn scan_task_watch_output(&self, task: &Arc<BgTask>) {
1235 let (mode, stdout, stderr, pty) = match task.state.lock() {
1236 Ok(state) => (
1237 state.metadata.mode.clone(),
1238 task.paths.stdout.clone(),
1239 task.paths.stderr.clone(),
1240 task.paths.pty.clone(),
1241 ),
1242 Err(_) => return,
1243 };
1244 let mut matches = Vec::new();
1245 if let Ok(mut registry) = self.inner.watch_registry.lock() {
1246 match mode {
1247 BgMode::Pipes => {
1248 let stdout_key = format!("{}:stdout", task.task_id);
1249 let stderr_key = format!("{}:stderr", task.task_id);
1250 matches.extend(registry.scan_file_new_bytes(
1251 &stdout_key,
1252 &task.task_id,
1253 &stdout,
1254 ));
1255 matches.extend(registry.scan_file_new_bytes(
1256 &stderr_key,
1257 &task.task_id,
1258 &stderr,
1259 ));
1260 }
1261 BgMode::Pty => {
1262 let pty_key = format!("{}:pty", task.task_id);
1263 matches.extend(registry.scan_file_new_bytes(&pty_key, &task.task_id, &pty));
1264 }
1265 }
1266 }
1267 for pattern_match in matches {
1268 self.emit_bash_pattern_match(&task.session_id, pattern_match);
1269 }
1270 }
1271
1272 pub fn status(
1273 &self,
1274 task_id: &str,
1275 session_id: &str,
1276 project_root: Option<&Path>,
1277 storage_dir: Option<&Path>,
1278 preview_bytes: usize,
1279 ) -> Option<BgTaskSnapshot> {
1280 let mut task = self.task_for_session(task_id, session_id);
1281 if task.is_none() {
1282 if let Some(storage_dir) = storage_dir {
1283 let _ = self.replay_session(storage_dir, session_id);
1284 task = self.task_for_session(task_id, session_id);
1285 }
1286 }
1287 let Some(task) = task else {
1288 return self.status_relaxed(
1289 task_id,
1290 session_id,
1291 project_root?,
1292 storage_dir?,
1293 preview_bytes,
1294 );
1295 };
1296 let _ = self.poll_task(&task);
1297 Some(self.snapshot_with_terminal_cache(&task, preview_bytes))
1298 }
1299
1300 fn status_relaxed_task(
1301 &self,
1302 task_id: &str,
1303 project_root: &Path,
1304 storage_dir: &Path,
1305 ) -> Option<Arc<BgTask>> {
1306 let canonical_project = canonicalized_path(project_root);
1307 match self.lookup_relaxed_task_from_db(task_id, project_root) {
1308 Some(Ok(Some(metadata))) => {
1309 if let Some(task) = self.task(task_id) {
1310 let matches_project = task
1311 .state
1312 .lock()
1313 .map(|state| {
1314 state
1315 .metadata
1316 .project_root
1317 .as_deref()
1318 .map(canonicalized_path)
1319 .as_deref()
1320 == Some(canonical_project.as_path())
1321 })
1322 .unwrap_or(false);
1323 return matches_project.then_some(task);
1324 }
1325 let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1326 if self.insert_rehydrated_task(metadata, paths, true).is_err() {
1327 return None;
1328 }
1329 return self.task(task_id);
1330 }
1331 Some(Ok(None)) => {
1332 crate::slog_info!(
1333 "bash task relaxed DB miss for {}; falling back to disk",
1334 task_id
1335 );
1336 }
1337 Some(Err(error)) => {
1338 crate::slog_warn!(
1339 "bash task relaxed DB lookup failed for {}; falling back to disk: {}",
1340 task_id,
1341 error
1342 );
1343 }
1344 None => {
1345 crate::slog_info!(
1346 "bash task relaxed DB unavailable for {}; falling back to disk",
1347 task_id
1348 );
1349 }
1350 }
1351 let root = storage_dir.join("bash-tasks");
1352 let entries = fs::read_dir(&root).ok()?;
1353 for entry in entries.flatten() {
1354 let dir = entry.path();
1355 if !dir.is_dir() {
1356 continue;
1357 }
1358 let path = dir.join(format!("{task_id}.json"));
1359 if !path.exists() {
1360 continue;
1361 }
1362 let metadata = match read_task(&path) {
1363 Ok(metadata) => metadata,
1364 Err(error) => {
1365 crate::slog_warn!(
1366 "quarantining invalid background task metadata {} during relaxed lookup: {error}",
1367 path.display()
1368 );
1369 if let Err(quarantine_error) =
1370 quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
1371 {
1372 crate::slog_warn!(
1373 "failed to quarantine invalid background task metadata {}: {quarantine_error}",
1374 path.display()
1375 );
1376 }
1377 continue;
1378 }
1379 };
1380 let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
1381 if metadata_project.as_deref() != Some(canonical_project.as_path()) {
1382 continue;
1383 }
1384 if let Some(task) = self.task(task_id) {
1385 let matches_project = task
1386 .state
1387 .lock()
1388 .map(|state| {
1389 state
1390 .metadata
1391 .project_root
1392 .as_deref()
1393 .map(canonicalized_path)
1394 .as_deref()
1395 == Some(canonical_project.as_path())
1396 })
1397 .unwrap_or(false);
1398 return matches_project.then_some(task);
1399 }
1400 let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1401 if self.insert_rehydrated_task(metadata, paths, true).is_err() {
1402 return None;
1403 }
1404 return self.task(task_id);
1405 }
1406 None
1407 }
1408
1409 fn lookup_relaxed_task_from_db(
1410 &self,
1411 task_id: &str,
1412 project_root: &Path,
1413 ) -> Option<Result<Option<PersistedTask>, String>> {
1414 let pool = self
1415 .inner
1416 .db_pool
1417 .read()
1418 .ok()
1419 .and_then(|slot| slot.clone())?;
1420 let harness = self
1421 .inner
1422 .db_harness
1423 .read()
1424 .ok()
1425 .and_then(|slot| slot.clone())?;
1426 let conn = match pool.lock() {
1427 Ok(conn) => conn,
1428 Err(_) => return Some(Err("db mutex poisoned".to_string())),
1429 };
1430 let project_key = crate::search_index::project_cache_key(project_root);
1431 Some(
1432 crate::db::bash_tasks::find_bash_task_for_project(
1433 &conn,
1434 &harness,
1435 &project_key,
1436 task_id,
1437 )
1438 .map(|row| row.map(PersistedTask::from))
1439 .map_err(|error| error.to_string()),
1440 )
1441 }
1442
1443 pub(super) fn status_relaxed(
1444 &self,
1445 task_id: &str,
1446 _session_id: &str,
1447 project_root: &Path,
1448 storage_dir: &Path,
1449 preview_bytes: usize,
1450 ) -> Option<BgTaskSnapshot> {
1451 let task = self.status_relaxed_task(task_id, project_root, storage_dir)?;
1452 let _ = self.poll_task(&task);
1453 Some(self.snapshot_with_terminal_cache(&task, preview_bytes))
1454 }
1455
1456 pub fn kill_relaxed(
1457 &self,
1458 task_id: &str,
1459 project_root: &Path,
1460 storage_dir: &Path,
1461 ) -> Result<BgTaskSnapshot, String> {
1462 let task = self
1463 .status_relaxed_task(task_id, project_root, storage_dir)
1464 .ok_or_else(|| format!("background task not found: {task_id}"))?;
1465 self.kill_with_status(task_id, &task.session_id, BgTaskStatus::Killed)
1466 }
1467
1468 pub fn maybe_gc_persisted(&self, storage_dir: &Path) -> Result<usize, String> {
1469 #[cfg(test)]
1470 self.inner.persisted_gc_runs.fetch_add(1, Ordering::SeqCst);
1471
1472 let mut deleted = 0usize;
1473
1474 let root = storage_dir.join("bash-tasks");
1475 if root.exists() {
1476 let session_dirs = fs::read_dir(&root).map_err(|e| {
1477 format!(
1478 "failed to read background task root {}: {e}",
1479 root.display()
1480 )
1481 })?;
1482 for session_entry in session_dirs.flatten() {
1483 let session_dir = session_entry.path();
1484 if !session_dir.is_dir() {
1485 continue;
1486 }
1487 let task_entries = match fs::read_dir(&session_dir) {
1488 Ok(entries) => entries,
1489 Err(error) => {
1490 crate::slog_warn!(
1491 "failed to read background task session dir {}: {error}",
1492 session_dir.display()
1493 );
1494 continue;
1495 }
1496 };
1497 for task_entry in task_entries.flatten() {
1498 let json_path = task_entry.path();
1499 if json_path
1500 .extension()
1501 .and_then(|extension| extension.to_str())
1502 != Some("json")
1503 {
1504 continue;
1505 }
1506 if modified_within(&json_path, PERSISTED_GC_GRACE) {
1507 continue;
1508 }
1509 let metadata = match read_task(&json_path) {
1510 Ok(metadata) => metadata,
1511 Err(error) => {
1512 crate::slog_warn!(
1513 "quarantining corrupt background task metadata {}: {error}",
1514 json_path.display()
1515 );
1516 quarantine_task_json(
1517 storage_dir,
1518 &session_dir,
1519 &json_path,
1520 QuarantineKind::Corrupt,
1521 )?;
1522 continue;
1523 }
1524 };
1525 if !(metadata.status.is_terminal() && metadata.completion_delivered) {
1526 continue;
1527 }
1528 let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1529 match delete_task_bundle(&paths) {
1530 Ok(()) => {
1531 deleted += 1;
1532 log::debug!(
1533 "deleted persisted background task bundle {}",
1534 metadata.task_id
1535 );
1536 }
1537 Err(error) => {
1538 crate::slog_warn!(
1539 "failed to delete background task bundle {}: {error}",
1540 metadata.task_id
1541 );
1542 continue;
1543 }
1544 }
1545 }
1546 }
1547 }
1548 gc_quarantine(storage_dir);
1549 Ok(deleted)
1550 }
1551
1552 pub fn list(&self, preview_bytes: usize) -> Vec<BgTaskSnapshot> {
1553 let tasks = self
1554 .inner
1555 .tasks
1556 .lock()
1557 .map(|tasks| tasks.values().cloned().collect::<Vec<_>>())
1558 .unwrap_or_default();
1559 tasks
1560 .into_iter()
1561 .map(|task| {
1562 let _ = self.poll_task(&task);
1563 self.snapshot_with_terminal_cache(&task, preview_bytes)
1564 })
1565 .collect()
1566 }
1567
1568 fn maybe_compress_snapshot(&self, task: &Arc<BgTask>, snapshot: &mut BgTaskSnapshot) {
1574 if !snapshot.info.status.is_terminal() || snapshot.info.mode == BgMode::Pty {
1575 return;
1576 }
1577 if let Some(cache) = self.ensure_terminal_output_cache(task) {
1578 snapshot.output_preview = cache.output_preview;
1579 snapshot.output_truncated = cache.output_truncated;
1580 }
1581 }
1582
1583 pub fn kill(&self, task_id: &str, session_id: &str) -> Result<BgTaskSnapshot, String> {
1584 self.kill_with_status(task_id, session_id, BgTaskStatus::Killed)
1585 }
1586
1587 pub fn promote(&self, task_id: &str, session_id: &str) -> Result<bool, String> {
1588 let task = self
1589 .task_for_session(task_id, session_id)
1590 .ok_or_else(|| format!("background task not found: {task_id}"))?;
1591 let terminal_after_promote = {
1592 let mut state = task
1593 .state
1594 .lock()
1595 .map_err(|_| "background task lock poisoned".to_string())?;
1596 let updated = self
1597 .update_task_metadata(&task.paths, |metadata| {
1598 metadata.notify_on_completion = true;
1599 metadata.completion_delivered = false;
1600 })
1601 .map_err(|e| format!("failed to promote background task: {e}"))?;
1602 state.metadata = updated;
1603 state.metadata.status.is_terminal()
1604 };
1605 if terminal_after_promote {
1606 self.post_terminal_transition(&task, true)?;
1607 }
1608 Ok(true)
1609 }
1610
1611 pub(crate) fn kill_for_timeout(&self, task_id: &str, session_id: &str) -> Result<(), String> {
1612 self.kill_with_status(task_id, session_id, BgTaskStatus::TimedOut)
1613 .map(|_| ())
1614 }
1615
1616 pub fn cleanup_finished(&self, older_than: Duration) {
1617 let cutoff = Instant::now().checked_sub(older_than);
1618 let removable_paths: Vec<(String, TaskPaths)> =
1619 if let Ok(mut tasks) = self.inner.tasks.lock() {
1620 let removable = tasks
1621 .iter()
1622 .filter_map(|(task_id, task)| {
1623 let delivered_terminal = task
1624 .state
1625 .lock()
1626 .map(|state| {
1627 state.metadata.status.is_terminal()
1628 && state.metadata.completion_delivered
1629 })
1630 .unwrap_or(false);
1631 if !delivered_terminal {
1632 return None;
1633 }
1634
1635 let terminal_at = task.terminal_at.lock().ok().and_then(|at| *at);
1636 let expired = match (terminal_at, cutoff) {
1637 (Some(terminal_at), Some(cutoff)) => terminal_at <= cutoff,
1638 (Some(_), None) => true,
1639 (None, _) => false,
1640 };
1641 expired.then(|| task_id.clone())
1642 })
1643 .collect::<Vec<_>>();
1644
1645 removable
1646 .into_iter()
1647 .filter_map(|task_id| {
1648 tasks
1649 .remove(&task_id)
1650 .map(|task| (task_id, task.paths.clone()))
1651 })
1652 .collect()
1653 } else {
1654 Vec::new()
1655 };
1656
1657 for (task_id, paths) in removable_paths {
1658 match delete_task_bundle(&paths) {
1659 Ok(()) => log::debug!("deleted persisted background task bundle {task_id}"),
1660 Err(error) => crate::slog_warn!(
1661 "failed to delete persisted background task bundle {task_id}: {error}"
1662 ),
1663 }
1664 }
1665 }
1666
1667 pub fn drain_completions(&self) -> Vec<BgCompletion> {
1668 self.drain_completions_for_session(None)
1669 }
1670
1671 pub fn drain_completions_for_session(&self, session_id: Option<&str>) -> Vec<BgCompletion> {
1672 let completions = match self.inner.completions.lock() {
1673 Ok(completions) => completions,
1674 Err(_) => return Vec::new(),
1675 };
1676
1677 completions
1678 .iter()
1679 .filter(|completion| completion_matches_session(completion, session_id))
1680 .cloned()
1681 .collect()
1682 }
1683
1684 pub fn has_completions_for_session(&self, session_id: Option<&str>) -> bool {
1685 match self.inner.completions.lock() {
1686 Ok(completions) => completions
1687 .iter()
1688 .any(|completion| completion_matches_session(completion, session_id)),
1689 Err(_) => true,
1693 }
1694 }
1695
1696 pub fn ack_completions_for_session(
1697 &self,
1698 session_id: Option<&str>,
1699 task_ids: &[String],
1700 ) -> Vec<String> {
1701 if task_ids.is_empty() {
1702 return Vec::new();
1703 }
1704 let requested_task_ids = task_ids.iter().map(String::as_str).collect::<HashSet<_>>();
1705 let mut completion_sessions = HashMap::new();
1706 if let Ok(mut completions) = self.inner.completions.lock() {
1707 completions.retain(|completion| {
1708 let session_matches = session_id
1709 .map(|session_id| completion.session_id == session_id)
1710 .unwrap_or(true);
1711 if session_matches && requested_task_ids.contains(completion.task_id.as_str()) {
1712 completion_sessions
1713 .insert(completion.task_id.clone(), completion.session_id.clone());
1714 false
1715 } else {
1716 true
1717 }
1718 });
1719 }
1720
1721 let mut delivered = Vec::new();
1722 for task_id in task_ids {
1723 let task = if let Some(session_id) = session_id {
1724 self.task_for_session(task_id, session_id)
1725 } else if let Some(completion_session_id) = completion_sessions.get(task_id) {
1726 self.task_for_session(task_id, completion_session_id)
1727 } else {
1728 self.task(task_id)
1729 };
1730 if let Some(task) = task {
1731 if task.set_completion_delivered(true, self).is_ok() {
1732 delivered.push(task_id.clone());
1733 }
1734 }
1735 }
1736
1737 delivered
1738 }
1739
1740 pub fn pending_completions_for_session(&self, session_id: &str) -> Vec<BgCompletion> {
1741 self.inner
1742 .completions
1743 .lock()
1744 .map(|completions| {
1745 completions
1746 .iter()
1747 .filter(|completion| completion.session_id == session_id)
1748 .cloned()
1749 .collect()
1750 })
1751 .unwrap_or_default()
1752 }
1753
1754 fn remove_pending_completion(&self, task_id: &str) -> Option<BgCompletion> {
1755 let mut completions = self.inner.completions.lock().ok()?;
1756 let idx = completions
1757 .iter()
1758 .position(|completion| completion.task_id == task_id)?;
1759 completions.remove(idx)
1760 }
1761
1762 fn completion_snapshot_for_task(&self, task: &Arc<BgTask>) -> Option<BgCompletion> {
1763 let snapshot = self.snapshot_with_terminal_cache(task, RUNNING_OUTPUT_PREVIEW_BYTES);
1764 if !snapshot.info.status.is_terminal() {
1765 return None;
1766 }
1767 let (output_preview, output_truncated) = if snapshot.info.mode == BgMode::Pty {
1768 (String::new(), false)
1769 } else {
1770 self.ensure_terminal_output_cache(task)
1771 .map(|cache| completion_preview_for_cache(&cache, snapshot.exit_code))
1772 .unwrap_or_else(|| (String::new(), false))
1773 };
1774 Some(BgCompletion {
1775 task_id: snapshot.info.task_id,
1776 session_id: task.session_id.clone(),
1777 status: snapshot.info.status,
1778 exit_code: snapshot.exit_code,
1779 command: snapshot.info.command,
1780 output_preview,
1781 output_truncated,
1782 original_tokens: None,
1783 compressed_tokens: None,
1784 tokens_skipped: false,
1785 })
1786 }
1787
1788 pub fn detach(&self) {
1789 self.inner.shutdown.store(true, Ordering::SeqCst);
1790 if let Ok(mut tasks) = self.inner.tasks.lock() {
1791 for task in tasks.values() {
1792 if let Ok(mut state) = task.state.lock() {
1793 match &mut state.runtime {
1794 TaskRuntime::Piped(child) => *child = None,
1795 TaskRuntime::Pty(runtime) => *runtime = None,
1796 }
1797 state.detached = true;
1798 }
1799 }
1800 tasks.clear();
1801 }
1802 }
1803
1804 pub fn shutdown(&self) {
1805 let tasks = self
1806 .inner
1807 .tasks
1808 .lock()
1809 .map(|tasks| {
1810 tasks
1811 .values()
1812 .map(|task| (task.task_id.clone(), task.session_id.clone()))
1813 .collect::<Vec<_>>()
1814 })
1815 .unwrap_or_default();
1816 for (task_id, session_id) in tasks {
1817 let _ = self.kill(&task_id, &session_id);
1818 }
1819 }
1820
1821 pub(crate) fn poll_task(&self, task: &Arc<BgTask>) -> Result<(), String> {
1822 if let Ok(state) = task.state.lock() {
1823 if let TaskRuntime::Pty(Some(pty)) = &state.runtime {
1824 if !pty.exit_observed.load(Ordering::SeqCst) {
1832 return Ok(());
1833 }
1834 }
1835 }
1836 let marker = match read_exit_marker(&task.paths.exit) {
1837 Ok(Some(marker)) => marker,
1838 Ok(None) => return Ok(()),
1839 Err(error) => return Err(format!("failed to read exit marker: {error}")),
1840 };
1841 self.finalize_from_marker(task, marker, None)
1842 }
1843
1844 pub(crate) fn reap_child(&self, task: &Arc<BgTask>) {
1845 let mut needs_completion = false;
1846 {
1847 let Ok(mut state) = task.state.lock() else {
1848 return;
1849 };
1850 match &mut state.runtime {
1851 TaskRuntime::Piped(child_slot) => {
1852 if let Some(child) = child_slot.as_mut() {
1853 if matches!(child.try_wait(), Ok(Some(_))) {
1854 *child_slot = None;
1855 state.detached = true;
1856 state.child_exit_observed = true;
1857 }
1858 } else if state.detached {
1859 let child_known_dead = state.child_exit_observed
1860 || state
1861 .metadata
1862 .child_pid
1863 .is_some_and(|pid| !is_process_alive(pid));
1864 if child_known_dead {
1865 needs_completion =
1866 self.fail_without_exit_marker_if_needed(task, &mut state);
1867 }
1868 }
1869 }
1870 TaskRuntime::Pty(Some(pty)) => {
1871 if pty.exit_observed.load(Ordering::SeqCst) {
1872 drop(state);
1873 let _ = self.poll_task(task);
1874 return;
1875 }
1876 }
1877 TaskRuntime::Pty(None) => {}
1878 }
1879 }
1880 if needs_completion {
1881 let _ = self.post_terminal_transition(task, true);
1882 }
1883 }
1884
1885 fn fail_without_exit_marker_if_needed(
1886 &self,
1887 task: &Arc<BgTask>,
1888 state: &mut BgTaskState,
1889 ) -> bool {
1890 if state.metadata.status.is_terminal() {
1891 return false;
1892 }
1893 if matches!(read_exit_marker(&task.paths.exit), Ok(Some(_))) {
1894 return false;
1895 }
1896 let watch_controlled = self.task_has_watch_control(&task.task_id);
1897 let updated = self.update_task_metadata(&task.paths, |metadata| {
1898 metadata.mark_terminal(
1899 BgTaskStatus::Failed,
1900 None,
1901 Some("process exited without exit marker".to_string()),
1902 );
1903 if watch_controlled {
1904 metadata.completion_delivered = true;
1905 }
1906 });
1907 if let Ok(metadata) = updated {
1908 state.pending_terminal_override = None;
1909 state.metadata = metadata;
1910 task.mark_terminal_now();
1911 return true;
1912 }
1913 false
1914 }
1915
1916 pub(crate) fn running_tasks(&self) -> Vec<Arc<BgTask>> {
1917 self.inner
1918 .tasks
1919 .lock()
1920 .map(|tasks| {
1921 tasks
1922 .values()
1923 .filter(|task| task.is_running())
1924 .cloned()
1925 .collect()
1926 })
1927 .unwrap_or_default()
1928 }
1929
1930 fn insert_rehydrated_task(
1931 &self,
1932 metadata: PersistedTask,
1933 paths: TaskPaths,
1934 detached: bool,
1935 ) -> Result<(), String> {
1936 let task_id = metadata.task_id.clone();
1937 let session_id = metadata.session_id.clone();
1938 let started = started_instant_from_unix_millis(metadata.started_at);
1939 let suppress_replayed_running_reminder = metadata.status == BgTaskStatus::Running;
1940 let mode = metadata.mode.clone();
1941 let task = Arc::new(BgTask {
1942 task_id: task_id.clone(),
1943 session_id,
1944 paths: paths.clone(),
1945 started,
1946 last_reminder_at: Mutex::new(suppress_replayed_running_reminder.then(Instant::now)),
1947 terminal_at: Mutex::new(metadata.status.is_terminal().then(Instant::now)),
1948 state: Mutex::new(BgTaskState {
1949 metadata,
1950 runtime: if mode == BgMode::Pty {
1951 TaskRuntime::Pty(None)
1952 } else {
1953 TaskRuntime::Piped(None)
1954 },
1955 detached,
1956 child_exit_observed: false,
1963 buffer: if mode == BgMode::Pty {
1964 BgBuffer::pty(paths.pty.clone())
1965 } else {
1966 BgBuffer::new(paths.stdout.clone(), paths.stderr.clone())
1967 },
1968 terminal_output_cache: None,
1969 pending_terminal_override: None,
1970 }),
1971 });
1972 self.inner
1973 .tasks
1974 .lock()
1975 .map_err(|_| "background task registry lock poisoned".to_string())?
1976 .insert(task_id, task);
1977 Ok(())
1978 }
1979
1980 fn kill_with_status(
1981 &self,
1982 task_id: &str,
1983 session_id: &str,
1984 terminal_status: BgTaskStatus,
1985 ) -> Result<BgTaskSnapshot, String> {
1986 let task = self
1987 .task_for_session(task_id, session_id)
1988 .ok_or_else(|| format!("background task not found: {task_id}"))?;
1989 let mut terminalized = false;
1990
1991 {
1992 let mut state = task
1993 .state
1994 .lock()
1995 .map_err(|_| "background task lock poisoned".to_string())?;
1996 if state.metadata.status.is_terminal() {
1997 state.pending_terminal_override = None;
1998 } else if let Ok(Some(marker)) = read_exit_marker(&task.paths.exit) {
1999 state.metadata =
2000 terminal_metadata_from_marker(state.metadata.clone(), marker, None);
2001 if self.task_has_watch_control(&task.task_id) {
2002 state.metadata.completion_delivered = true;
2003 }
2004 state.pending_terminal_override = None;
2005 task.mark_terminal_now();
2006 match &mut state.runtime {
2007 TaskRuntime::Piped(child_slot) => reap_piped_child(child_slot),
2014 TaskRuntime::Pty(runtime) => *runtime = None,
2015 }
2016 state.detached = true;
2017 self.persist_task(&task.paths, &state.metadata)
2018 .map_err(|e| format!("failed to persist terminal state: {e}"))?;
2019 terminalized = true;
2020 } else {
2021 let was_already_killing = state.metadata.status == BgTaskStatus::Killing;
2022 if !was_already_killing {
2023 state.metadata.status = BgTaskStatus::Killing;
2024 self.persist_task(&task.paths, &state.metadata)
2025 .map_err(|e| format!("failed to persist killing state: {e}"))?;
2026 }
2027
2028 #[cfg(unix)]
2029 let pgid = state.metadata.pgid;
2030 #[cfg(windows)]
2031 let child_pid = state.metadata.child_pid;
2032 if !was_already_killing
2033 && state.metadata.mode == BgMode::Pty
2034 && terminal_status == BgTaskStatus::TimedOut
2035 {
2036 state.pending_terminal_override = Some(BgTaskStatus::TimedOut);
2037 }
2038
2039 #[cfg(windows)]
2040 let mut pty_forced_terminal_status: Option<BgTaskStatus> = None;
2041
2042 match &mut state.runtime {
2043 TaskRuntime::Piped(child_slot) => {
2044 #[cfg(unix)]
2045 if let Some(pgid) = pgid {
2046 terminate_pgid(pgid, child_slot.as_mut());
2047 }
2048 #[cfg(windows)]
2049 if let Some(child) = child_slot.as_mut() {
2050 super::process::terminate_process(child);
2051 } else if let Some(pid) = child_pid {
2052 terminate_pid(pid);
2053 }
2054 if let Some(child) = child_slot.as_mut() {
2055 let _ = child.wait();
2056 }
2057 *child_slot = None;
2058 state.detached = true;
2059
2060 if !task.paths.exit.exists() {
2061 write_kill_marker_if_absent(&task.paths.exit)
2062 .map_err(|e| format!("failed to write kill marker: {e}"))?;
2063 }
2064
2065 let exit_code = if terminal_status == BgTaskStatus::TimedOut {
2066 Some(124)
2067 } else {
2068 None
2069 };
2070 state
2071 .metadata
2072 .mark_terminal(terminal_status, exit_code, None);
2073 if self.task_has_watch_control(&task.task_id) {
2074 state.metadata.completion_delivered = true;
2075 }
2076 state.pending_terminal_override = None;
2077 task.mark_terminal_now();
2078 self.persist_task(&task.paths, &state.metadata)
2079 .map_err(|e| format!("failed to persist killed state: {e}"))?;
2080 terminalized = true;
2081 }
2082 TaskRuntime::Pty(Some(pty)) => {
2083 pty.was_killed.store(true, Ordering::SeqCst);
2084 if let Err(error) = pty.killer.kill() {
2085 crate::slog_warn!(
2086 "[pty-kill] {task_id} ChildKiller::kill failed: {error}"
2087 );
2088 }
2089 if let Some(pid) = pty.child_pid {
2090 #[cfg(unix)]
2091 terminate_pgid(pid as i32, None);
2092 #[cfg(windows)]
2093 terminate_pid(pid);
2094 }
2095 drop(pty.master.take());
2096
2097 #[cfg(windows)]
2098 {
2099 let default_status = if terminal_status == BgTaskStatus::TimedOut {
2100 BgTaskStatus::TimedOut
2101 } else {
2102 BgTaskStatus::Killed
2103 };
2104 pty_forced_terminal_status = Some(
2105 state
2106 .pending_terminal_override
2107 .take()
2108 .unwrap_or(default_status),
2109 );
2110 }
2111 }
2112 TaskRuntime::Pty(None) => {}
2113 }
2114
2115 #[cfg(windows)]
2116 if let Some(target_status) = pty_forced_terminal_status {
2117 if !task.paths.exit.exists() {
2118 write_kill_marker_if_absent(&task.paths.exit)
2119 .map_err(|e| format!("failed to write kill marker: {e}"))?;
2120 }
2121
2122 let exit_code = if target_status == BgTaskStatus::TimedOut {
2123 Some(124)
2124 } else {
2125 None
2126 };
2127 state.metadata.mark_terminal(target_status, exit_code, None);
2128 if self.task_has_watch_control(&task.task_id) {
2129 state.metadata.completion_delivered = true;
2130 }
2131 state.pending_terminal_override = None;
2132 task.mark_terminal_now();
2133 if let TaskRuntime::Pty(runtime) = &mut state.runtime {
2134 *runtime = None;
2135 }
2136 state.detached = true;
2137 self.persist_task(&task.paths, &state.metadata)
2138 .map_err(|e| format!("failed to persist killed PTY state: {e}"))?;
2139 terminalized = true;
2140 }
2141 }
2142 }
2143
2144 if terminalized {
2145 self.post_terminal_transition(&task, true)?;
2146 }
2147 Ok(self.snapshot_with_terminal_cache(&task, RUNNING_OUTPUT_PREVIEW_BYTES))
2148 }
2149
2150 fn finalize_from_marker(
2151 &self,
2152 task: &Arc<BgTask>,
2153 marker: ExitMarker,
2154 reason: Option<String>,
2155 ) -> Result<(), String> {
2156 let watch_controlled = self.task_has_watch_control(&task.task_id);
2157 let mut pty_reader_done = None;
2158 {
2159 let mut state = task
2160 .state
2161 .lock()
2162 .map_err(|_| "background task lock poisoned".to_string())?;
2163 if state.metadata.status.is_terminal() {
2164 state.pending_terminal_override = None;
2165 return Ok(());
2166 }
2167
2168 let pending_override = state.pending_terminal_override.take();
2169 let is_pty = state.metadata.mode == BgMode::Pty;
2170 let updated = self
2171 .update_task_metadata(&task.paths, |metadata| {
2172 let mut new_metadata = if is_pty && marker == ExitMarker::Killed {
2173 let mut metadata = metadata.clone();
2174 let target_status = pending_override.unwrap_or(BgTaskStatus::Killed);
2175 let exit_code = if target_status == BgTaskStatus::TimedOut {
2176 Some(124)
2177 } else {
2178 None
2179 };
2180 metadata.mark_terminal(target_status, exit_code, reason);
2181 metadata
2182 } else {
2183 terminal_metadata_from_marker(metadata.clone(), marker, reason)
2184 };
2185 if watch_controlled {
2186 new_metadata.completion_delivered = true;
2187 }
2188 *metadata = new_metadata;
2189 })
2190 .map_err(|e| format!("failed to persist terminal state: {e}"))?;
2191 state.metadata = updated;
2192 task.mark_terminal_now();
2193 match &mut state.runtime {
2194 TaskRuntime::Piped(child_slot) => reap_piped_child(child_slot),
2199 TaskRuntime::Pty(runtime) => {
2200 pty_reader_done = runtime
2201 .as_ref()
2202 .map(|runtime| Arc::clone(&runtime.reader_done));
2203 *runtime = None;
2204 }
2205 }
2206 state.detached = true;
2207 }
2208
2209 if let Some(reader_done) = pty_reader_done {
2210 let deadline = Instant::now() + Duration::from_millis(200);
2211 while !reader_done.load(Ordering::SeqCst) && Instant::now() < deadline {
2212 std::thread::sleep(Duration::from_millis(10));
2213 }
2214 }
2215
2216 self.scan_task_watch_output(task);
2219
2220 self.post_terminal_transition(task, true)
2221 }
2222
2223 fn enqueue_completion_if_needed(
2224 &self,
2225 metadata: &PersistedTask,
2226 paths: Option<&TaskPaths>,
2227 emit_frame: bool,
2228 ) {
2229 if metadata.status.is_terminal() && !metadata.completion_delivered {
2230 let cache =
2231 paths.and_then(|paths| self.render_terminal_output_from_paths(metadata, paths));
2232 self.enqueue_completion_from_parts(metadata, None, paths, emit_frame, cache.as_ref());
2233 }
2234 }
2235
2236 fn render_terminal_output_from_paths(
2237 &self,
2238 metadata: &PersistedTask,
2239 paths: &TaskPaths,
2240 ) -> Option<TerminalOutputCache> {
2241 if metadata.mode == BgMode::Pty {
2242 return None;
2243 }
2244 let buffer = BgBuffer::new(paths.stdout.clone(), paths.stderr.clone());
2245 Some(self.render_terminal_output(metadata, &buffer))
2246 }
2247
2248 fn enqueue_completion_from_parts(
2249 &self,
2250 metadata: &PersistedTask,
2251 buffer: Option<&BgBuffer>,
2252 paths: Option<&TaskPaths>,
2253 emit_frame: bool,
2254 terminal_render: Option<&TerminalOutputCache>,
2255 ) {
2256 if !metadata.status.is_terminal() {
2267 return;
2268 }
2269
2270 let owned_buffer = if buffer.is_none() && metadata.mode != BgMode::Pty {
2271 paths.map(|paths| BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()))
2272 } else {
2273 None
2274 };
2275 let render_buffer = buffer.or(owned_buffer.as_ref());
2276 let owned_render = if terminal_render.is_none() {
2277 render_buffer.map(|buffer| self.render_terminal_output(metadata, buffer))
2278 } else {
2279 None
2280 };
2281 let render = terminal_render.or(owned_render.as_ref());
2282
2283 let (output_preview, output_truncated) = render
2287 .map(|cache| completion_preview_for_cache(cache, metadata.exit_code))
2288 .unwrap_or_else(|| (String::new(), false));
2289
2290 let token_counts = self.completion_token_counts(
2291 metadata,
2292 buffer,
2293 paths,
2294 render.map(|render| render.output_preview.as_str()),
2295 );
2296 let completion = BgCompletion {
2297 task_id: metadata.task_id.clone(),
2298 session_id: metadata.session_id.clone(),
2299 status: metadata.status.clone(),
2300 exit_code: metadata.exit_code,
2301 command: metadata.command.clone(),
2302 output_preview,
2303 output_truncated,
2304 original_tokens: token_counts.original_tokens,
2305 compressed_tokens: token_counts.compressed_tokens,
2306 tokens_skipped: token_counts.tokens_skipped,
2307 };
2308
2309 self.record_compression_event_if_applicable(metadata, &token_counts);
2320
2321 let (watch_controlled, watch_matched) = self.task_watch_state(&metadata.task_id);
2322 if watch_controlled {
2323 if emit_frame && !watch_matched {
2324 self.emit_bash_watch_exit(&completion);
2325 }
2326 self.clear_task_watch_state(&metadata.task_id);
2327 return;
2328 }
2329
2330 if metadata.completion_delivered {
2340 return;
2341 }
2342
2343 let pushed = if let Ok(mut completions) = self.inner.completions.lock() {
2346 if completions
2347 .iter()
2348 .any(|existing| existing.task_id == metadata.task_id)
2349 {
2350 false
2351 } else {
2352 completions.push_back(completion.clone());
2353 true
2354 }
2355 } else {
2356 false
2357 };
2358
2359 if pushed && emit_frame {
2360 self.emit_bash_completed(completion);
2361 }
2362 }
2363
2364 fn record_compression_event_if_applicable(
2365 &self,
2366 metadata: &PersistedTask,
2367 token_counts: &CompletionTokenCounts,
2368 ) {
2369 if metadata.mode == BgMode::Pty {
2370 return;
2371 }
2372
2373 let (original_tokens, compressed_tokens, original_bytes, compressed_bytes) = match (
2374 token_counts.original_tokens,
2375 token_counts.compressed_tokens,
2376 token_counts.original_bytes,
2377 token_counts.compressed_bytes,
2378 ) {
2379 (
2380 Some(original_tokens),
2381 Some(compressed_tokens),
2382 Some(original_bytes),
2383 Some(compressed_bytes),
2384 ) => (
2385 original_tokens,
2386 compressed_tokens,
2387 original_bytes,
2388 compressed_bytes,
2389 ),
2390 _ => {
2391 crate::slog_warn!(
2392 "compression event skipped for {}: token counts unavailable (likely spill file missing or unreadable)",
2393 metadata.task_id
2394 );
2395 return;
2396 }
2397 };
2398
2399 let pool = self.inner.db_pool.read().ok().and_then(|slot| slot.clone());
2400 let Some(pool) = pool else {
2401 crate::slog_warn!(
2402 "compression event skipped for {}: db_pool not initialized — was configure run?",
2403 metadata.task_id
2404 );
2405 return;
2406 };
2407 let harness = self
2408 .inner
2409 .db_harness
2410 .read()
2411 .ok()
2412 .and_then(|slot| slot.clone());
2413 let Some(harness) = harness else {
2414 crate::slog_warn!(
2415 "compression event insert skipped for {}: harness not configured",
2416 metadata.task_id
2417 );
2418 return;
2419 };
2420
2421 let project_root = metadata
2422 .project_root
2423 .as_deref()
2424 .unwrap_or(&metadata.workdir);
2425 let project_key = crate::search_index::project_cache_key(project_root);
2426 let row = crate::db::compression_events::CompressionEventRow {
2427 harness: &harness,
2428 session_id: Some(&metadata.session_id),
2429 project_key: &project_key,
2430 tool: "bash",
2431 task_id: Some(&metadata.task_id),
2432 command: Some(&metadata.command),
2433 compressor: if metadata.compressed {
2434 "registry"
2435 } else {
2436 "none"
2437 },
2438 original_bytes,
2439 compressed_bytes,
2440 original_tokens,
2441 compressed_tokens,
2442 created_at: unix_millis() as i64,
2443 };
2444
2445 let conn = match pool.lock() {
2446 Ok(conn) => conn,
2447 Err(_) => {
2448 crate::slog_warn!(
2449 "compression event insert failed for {}: db mutex poisoned",
2450 metadata.task_id
2451 );
2452 return;
2453 }
2454 };
2455 match crate::db::compression_events::insert_compression_event(&conn, &row) {
2456 Ok(_) => {
2457 crate::slog_debug!(
2461 "compression event recorded for {} (project={}, session={}, {} → {} tokens)",
2462 metadata.task_id,
2463 project_key,
2464 metadata.session_id,
2465 original_tokens,
2466 compressed_tokens
2467 );
2468 }
2469 Err(error) => {
2470 crate::slog_warn!(
2471 "compression event insert failed for {}: {}",
2472 metadata.task_id,
2473 error
2474 );
2475 }
2476 }
2477 }
2478
2479 fn emit_bash_pattern_match(&self, session_id: &str, pattern_match: PatternMatch) {
2480 let Ok(progress_sender) = self
2481 .inner
2482 .progress_sender
2483 .lock()
2484 .map(|sender| sender.clone())
2485 else {
2486 return;
2487 };
2488 if let Some(sender) = progress_sender.as_ref() {
2489 sender(PushFrame::BashPatternMatch(BashPatternMatchFrame::new(
2490 pattern_match.task_id,
2491 session_id.to_string(),
2492 pattern_match.watch_id,
2493 pattern_match.match_text,
2494 pattern_match.match_offset,
2495 pattern_match.context,
2496 pattern_match.once,
2497 )));
2498 }
2499 }
2500
2501 fn emit_bash_watch_exit(&self, completion: &BgCompletion) {
2502 let Ok(progress_sender) = self
2503 .inner
2504 .progress_sender
2505 .lock()
2506 .map(|sender| sender.clone())
2507 else {
2508 return;
2509 };
2510 let Some(sender) = progress_sender.as_ref() else {
2511 return;
2512 };
2513 let status = completion_status_text(&completion.status, completion.exit_code);
2514 let preview = completion.output_preview.trim_end();
2515 let context = if preview.is_empty() {
2516 format!("task {} exited ({status})", completion.task_id)
2517 } else {
2518 format!(
2519 "task {} exited ({status})
2520{preview}",
2521 completion.task_id
2522 )
2523 };
2524 sender(PushFrame::BashPatternMatch(
2525 BashPatternMatchFrame::task_exit(
2526 completion.task_id.clone(),
2527 completion.session_id.clone(),
2528 format!("exited ({status})"),
2529 context,
2530 ),
2531 ));
2532 }
2533
2534 fn emit_bash_completed(&self, completion: BgCompletion) {
2535 let Ok(progress_sender) = self
2536 .inner
2537 .progress_sender
2538 .lock()
2539 .map(|sender| sender.clone())
2540 else {
2541 return;
2542 };
2543 let Some(sender) = progress_sender.as_ref() else {
2544 return;
2545 };
2546 sender(PushFrame::BashCompleted(BashCompletedFrame::new(
2554 completion.task_id,
2555 completion.session_id,
2556 completion.status,
2557 completion.exit_code,
2558 completion.command,
2559 completion.output_preview,
2560 completion.output_truncated,
2561 completion.original_tokens,
2562 completion.compressed_tokens,
2563 completion.tokens_skipped,
2564 )));
2565 }
2566
2567 fn completion_token_counts(
2568 &self,
2569 metadata: &PersistedTask,
2570 buffer: Option<&BgBuffer>,
2571 paths: Option<&TaskPaths>,
2572 rendered_output: Option<&str>,
2573 ) -> CompletionTokenCounts {
2574 if metadata.mode == BgMode::Pty {
2575 return CompletionTokenCounts::skipped();
2576 }
2577
2578 let raw = match buffer {
2579 Some(buffer) => buffer.read_for_token_count(TOKENIZE_CAP_BYTES_PER_STREAM),
2580 None => paths
2581 .map(|paths| {
2582 read_for_token_count_from_disk(metadata, paths, TOKENIZE_CAP_BYTES_PER_STREAM)
2583 })
2584 .unwrap_or(TokenCountInput::Skipped),
2585 };
2586
2587 let TokenCountInput::Text(raw_output) = raw else {
2588 return CompletionTokenCounts::skipped();
2589 };
2590
2591 let original_tokens = token_count_u32(&raw_output);
2592 let original_bytes = raw_output.len() as i64;
2593 let compressed_output = rendered_output.unwrap_or(&raw_output);
2594 let compressed_tokens = token_count_u32(compressed_output);
2595 let compressed_bytes = compressed_output.len() as i64;
2596 CompletionTokenCounts {
2597 original_tokens: Some(original_tokens),
2598 compressed_tokens: Some(compressed_tokens),
2599 original_bytes: Some(original_bytes),
2600 compressed_bytes: Some(compressed_bytes),
2601 tokens_skipped: false,
2602 }
2603 }
2604
2605 pub(crate) fn maybe_emit_long_running_reminder(&self, task: &Arc<BgTask>) {
2606 if !self
2607 .inner
2608 .long_running_reminder_enabled
2609 .load(Ordering::SeqCst)
2610 {
2611 return;
2612 }
2613 let interval_ms = self
2614 .inner
2615 .long_running_reminder_interval_ms
2616 .load(Ordering::SeqCst);
2617 if interval_ms == 0 {
2618 return;
2619 }
2620 let interval = Duration::from_millis(interval_ms);
2621 let now = Instant::now();
2622 let Ok(mut last_reminder_at) = task.last_reminder_at.lock() else {
2623 return;
2624 };
2625 let since = last_reminder_at.unwrap_or(task.started);
2626 if now.duration_since(since) < interval {
2627 return;
2628 }
2629 let command = task
2630 .state
2631 .lock()
2632 .map(|state| state.metadata.command.clone())
2633 .unwrap_or_default();
2634 *last_reminder_at = Some(now);
2635 self.emit_bash_long_running(BashLongRunningFrame::new(
2636 task.task_id.clone(),
2637 task.session_id.clone(),
2638 command,
2639 task.started.elapsed().as_millis() as u64,
2640 ));
2641 }
2642
2643 fn emit_bash_long_running(&self, frame: BashLongRunningFrame) {
2644 let Ok(progress_sender) = self
2645 .inner
2646 .progress_sender
2647 .lock()
2648 .map(|sender| sender.clone())
2649 else {
2650 return;
2651 };
2652 if let Some(sender) = progress_sender.as_ref() {
2653 sender(PushFrame::BashLongRunning(frame));
2654 }
2655 }
2656
2657 fn task(&self, task_id: &str) -> Option<Arc<BgTask>> {
2658 self.inner
2659 .tasks
2660 .lock()
2661 .ok()
2662 .and_then(|tasks| tasks.get(task_id).cloned())
2663 }
2664
2665 fn task_for_session(&self, task_id: &str, session_id: &str) -> Option<Arc<BgTask>> {
2666 self.task(task_id)
2667 .filter(|task| task.session_id == session_id)
2668 }
2669
2670 fn running_count(&self) -> usize {
2671 self.inner
2672 .tasks
2673 .lock()
2674 .map(|tasks| tasks.values().filter(|task| task.is_running()).count())
2675 .unwrap_or(0)
2676 }
2677
2678 fn start_watchdog(&self) {
2679 if !self.inner.watchdog_started.swap(true, Ordering::SeqCst) {
2680 super::watchdog::start(self.clone());
2681 }
2682 }
2683
2684 fn running_metadata_is_stale(&self, metadata: &PersistedTask) -> bool {
2685 unix_millis().saturating_sub(metadata.started_at) > STALE_RUNNING_AFTER.as_millis() as u64
2686 }
2687
2688 #[cfg(test)]
2689 pub fn task_json_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
2690 self.task_for_session(task_id, session_id)
2691 .map(|task| task.paths.json.clone())
2692 }
2693
2694 #[cfg(test)]
2695 pub fn task_exit_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
2696 self.task_for_session(task_id, session_id)
2697 .map(|task| task.paths.exit.clone())
2698 }
2699
2700 fn generate_unique_task_id(&self) -> Result<String, String> {
2702 for _ in 0..32 {
2703 let candidate = random_slug();
2704 let tasks = self
2705 .inner
2706 .tasks
2707 .lock()
2708 .map_err(|_| "background task registry lock poisoned".to_string())?;
2709 if tasks.contains_key(&candidate) {
2710 continue;
2711 }
2712 let completions = self
2713 .inner
2714 .completions
2715 .lock()
2716 .map_err(|_| "background completions lock poisoned".to_string())?;
2717 if completions
2718 .iter()
2719 .any(|completion| completion.task_id == candidate)
2720 {
2721 continue;
2722 }
2723 return Ok(candidate);
2724 }
2725 Err("failed to allocate unique background task id after 32 attempts".to_string())
2726 }
2727}
2728
2729fn render_compressed_with_recovery(
2730 buffer: &BgBuffer,
2731 mut compressed: CompressionResult,
2732 input_truncated: bool,
2733) -> TerminalOutputCache {
2734 let had_trailing_newline = compressed.text.ends_with('\n');
2742 let mut text = strip_plain_truncation_marker_lines(&compressed.text)
2743 .trim_end()
2744 .to_string();
2745 if had_trailing_newline && !text.is_empty() {
2746 text.push('\n');
2747 }
2748 compressed.text = text;
2749
2750 let output_path = buffer.output_path().map(|path| path.display().to_string());
2751 let stderr_path = buffer.stderr_path().map(|path| path.display().to_string());
2752 let include_stderr_path = buffer.stream_len(StreamKind::Stderr) > 0;
2753 let mut recovery = RecoveryContext {
2754 dropped_by_class: compressed.dropped_by_class,
2755 had_inner_drop: compressed.had_inner_drop,
2756 offset_hint_eligible: compressed.offset_hint_eligible,
2757 offset_start_line: compressed.offset_start_line,
2758 byte_truncated: input_truncated,
2759 output_path: output_path.clone(),
2760 stderr_path: stderr_path.clone(),
2761 include_stderr_path,
2762 };
2763
2764 let (output_preview, output_truncated) =
2765 render_body_with_recovery_marker(&compressed.text, &mut recovery);
2766 TerminalOutputCache {
2767 output_preview,
2768 output_truncated,
2769 kind: TerminalOutputKind::Compressed,
2770 output_path,
2771 stderr_path,
2772 recovery: Some(recovery),
2773 }
2774}
2775
2776fn render_body_with_recovery_marker(body: &str, recovery: &mut RecoveryContext) -> (String, bool) {
2777 render_body_with_recovery_marker_at_cap(
2778 body,
2779 recovery,
2780 FINAL_OUTPUT_CAP_BYTES,
2781 cap_final_output,
2782 cap_final_output_with_marker,
2783 )
2784}
2785
2786fn render_raw_body_with_recovery_marker(
2787 body: &str,
2788 recovery: &mut RecoveryContext,
2789) -> (String, bool) {
2790 render_body_with_recovery_marker_at_cap(
2791 body,
2792 recovery,
2793 RAW_PASSTHROUGH_CAP_BYTES,
2794 |input| {
2795 super::output::cap_head_tail(
2796 input,
2797 RAW_PASSTHROUGH_CAP_BYTES,
2798 RAW_PASSTHROUGH_HEAD_BYTES,
2799 RAW_PASSTHROUGH_TAIL_BYTES,
2800 )
2801 },
2802 |input, marker| {
2803 super::output::cap_head_tail_with_marker(
2804 input,
2805 RAW_PASSTHROUGH_CAP_BYTES,
2806 RAW_PASSTHROUGH_HEAD_BYTES,
2807 RAW_PASSTHROUGH_TAIL_BYTES,
2808 marker,
2809 )
2810 },
2811 )
2812}
2813
2814fn render_body_with_recovery_marker_at_cap<F, G>(
2815 body: &str,
2816 recovery: &mut RecoveryContext,
2817 cap_bytes: usize,
2818 cap_plain: F,
2819 cap_with_marker: G,
2820) -> (String, bool)
2821where
2822 F: Fn(&str) -> super::output::CappedText,
2823 G: Fn(&str, &str) -> super::output::CappedText,
2824{
2825 let needs_marker = recovery.has_visible_drop();
2826 if body.len() > cap_bytes {
2827 recovery.byte_truncated = true;
2828 if let Some(marker) = recovery_marker(recovery) {
2829 let capped = cap_with_marker(body, &marker);
2830 return (capped.text, true);
2831 }
2832 let capped = cap_plain(body);
2833 return (capped.text, capped.truncated || needs_marker);
2834 }
2835
2836 if !needs_marker {
2837 return (body.to_string(), false);
2838 }
2839
2840 let Some(marker) = recovery_marker(recovery) else {
2841 return (body.to_string(), true);
2842 };
2843 let with_marker = append_recovery_marker(body, &marker);
2844 if with_marker.len() <= cap_bytes {
2845 return (with_marker, true);
2846 }
2847
2848 recovery.byte_truncated = true;
2849 let marker = recovery_marker(recovery).unwrap_or(marker);
2850 let capped = cap_with_marker(body, &marker);
2851 (capped.text, true)
2852}
2853
2854fn append_recovery_marker(body: &str, marker: &str) -> String {
2855 if body.is_empty() {
2856 return marker.to_string();
2857 }
2858 let mut output = body.trim_end().to_string();
2859 output.push('\n');
2860 output.push_str(marker);
2861 output
2862}
2863
2864fn recovery_marker(recovery: &RecoveryContext) -> Option<String> {
2865 let mut parts = Vec::new();
2866 for (class, count) in &recovery.dropped_by_class {
2867 let label = if *count == 1 {
2868 class.singular()
2869 } else {
2870 class.plural()
2871 };
2872 parts.push(format!("+{count} more {label}"));
2873 }
2874 if recovery.byte_truncated {
2875 parts.push("truncated output".to_string());
2876 } else if recovery.had_inner_drop && parts.is_empty() {
2877 parts.push("omitted output".to_string());
2878 }
2879
2880 if parts.is_empty() {
2881 return None;
2882 }
2883
2884 let hint = recovery_hint(recovery);
2885 Some(format!("[{}; {hint}]", parts.join(", ")))
2886}
2887
2888fn recovery_hint(recovery: &RecoveryContext) -> String {
2889 if recovery.offset_hint_eligible
2893 && !recovery.byte_truncated
2894 && recovery.dropped_by_class.is_empty()
2895 && !recovery.include_stderr_path
2896 {
2897 if let (Some(path), Some(line)) =
2898 (recovery.output_path.as_deref(), recovery.offset_start_line)
2899 {
2900 return format!("see remaining: tail -n +{line} {}", quote_path(path));
2901 }
2902 }
2903
2904 let mut paths = Vec::new();
2905 if let Some(path) = recovery.output_path.as_deref() {
2906 paths.push(path);
2907 }
2908 if recovery.include_stderr_path {
2909 if let Some(path) = recovery.stderr_path.as_deref() {
2910 if !paths.contains(&path) {
2911 paths.push(path);
2912 }
2913 }
2914 }
2915
2916 if paths.is_empty() {
2917 return "full output unavailable".to_string();
2918 }
2919
2920 let reads = paths
2921 .into_iter()
2922 .map(|path| format!("read {}", quote_path(path)))
2923 .collect::<Vec<_>>()
2924 .join(" and ");
2925 format!("full output: {reads}")
2926}
2927
2928fn strip_plain_truncation_marker_lines(input: &str) -> String {
2929 input
2930 .lines()
2931 .filter(|line| !is_plain_truncation_marker(line.trim()))
2932 .collect::<Vec<_>>()
2933 .join("\n")
2934}
2935
2936fn strip_recovery_marker_lines(input: &str) -> String {
2937 input
2938 .lines()
2939 .filter(|line| !is_recovery_marker(line.trim()))
2940 .collect::<Vec<_>>()
2941 .join("\n")
2942}
2943
2944fn is_plain_truncation_marker(line: &str) -> bool {
2945 let Some(rest) = line.strip_prefix("...<truncated ") else {
2946 return false;
2947 };
2948 let Some(bytes) = rest.strip_suffix(" bytes>...") else {
2949 return false;
2950 };
2951 !bytes.is_empty() && bytes.chars().all(|ch| ch.is_ascii_digit())
2952}
2953
2954fn is_recovery_marker(line: &str) -> bool {
2955 line.starts_with('[')
2956 && line.ends_with(']')
2957 && (line.contains("full output: read ")
2958 || line.contains("see remaining: tail -n +")
2959 || line.contains("full output unavailable"))
2960}
2961
2962fn render_structured_output(command: &str, buffer: &BgBuffer) -> Option<TerminalOutputCache> {
2963 if !is_gh_structured_command(command) {
2964 return None;
2965 }
2966
2967 let output_path = buffer
2968 .output_path()
2969 .map(|path| path.display().to_string())?;
2970 let stdout_bytes = buffer.stream_len(StreamKind::Stdout);
2971 if stdout_bytes == 0 {
2972 return None;
2973 }
2974
2975 if stdout_bytes > STRUCTURED_OUTPUT_CAP_BYTES as u64 {
2976 if !stream_starts_like_json(buffer, StreamKind::Stdout) {
2977 return None;
2978 }
2979 return Some(TerminalOutputCache {
2980 output_preview: json_output_pointer(stdout_bytes, &output_path),
2981 output_truncated: true,
2982 kind: TerminalOutputKind::Structured,
2983 output_path: Some(output_path),
2984 stderr_path: buffer.stderr_path().map(|path| path.display().to_string()),
2985 recovery: None,
2986 });
2987 }
2988
2989 let stdout = buffer.read_stream_bounded(StreamKind::Stdout, STRUCTURED_OUTPUT_CAP_BYTES);
2990 if stdout.truncated || !is_structured_body(&stdout.text) {
2991 return None;
2992 }
2993
2994 Some(TerminalOutputCache {
2995 output_preview: stdout.text,
2996 output_truncated: false,
2997 kind: TerminalOutputKind::Structured,
2998 output_path: Some(output_path),
2999 stderr_path: buffer.stderr_path().map(|path| path.display().to_string()),
3000 recovery: None,
3001 })
3002}
3003
3004fn render_raw_passthrough(buffer: &BgBuffer) -> TerminalOutputCache {
3005 let raw = buffer.read_combined_head_tail(
3006 RAW_PASSTHROUGH_CAP_BYTES,
3007 RAW_PASSTHROUGH_HEAD_BYTES,
3008 RAW_PASSTHROUGH_TAIL_BYTES,
3009 );
3010 let output_path = buffer.output_path().map(|path| path.display().to_string());
3011 let stderr_path = buffer.stderr_path().map(|path| path.display().to_string());
3012 if !raw.truncated {
3013 return TerminalOutputCache {
3014 output_preview: raw.text,
3015 output_truncated: false,
3016 kind: TerminalOutputKind::Raw,
3017 output_path,
3018 stderr_path,
3019 recovery: None,
3020 };
3021 }
3022
3023 let include_stderr_path = buffer.stream_len(StreamKind::Stderr) > 0;
3024 let mut recovery = RecoveryContext {
3025 dropped_by_class: BTreeMap::new(),
3026 had_inner_drop: false,
3027 offset_hint_eligible: false,
3028 offset_start_line: None,
3029 byte_truncated: true,
3030 output_path: output_path.clone(),
3031 stderr_path: stderr_path.clone(),
3032 include_stderr_path,
3033 };
3034 let (output_preview, output_truncated) =
3035 render_raw_body_with_recovery_marker(&raw.text, &mut recovery);
3036 TerminalOutputCache {
3037 output_preview,
3038 output_truncated,
3039 kind: TerminalOutputKind::Raw,
3040 output_path,
3041 stderr_path,
3042 recovery: Some(recovery),
3043 }
3044}
3045
3046fn completion_preview_for_cache(
3047 cache: &TerminalOutputCache,
3048 exit_code: Option<i32>,
3049) -> (String, bool) {
3050 let exit_ok = exit_code == Some(0);
3053 let threshold = completion_preview_threshold(exit_ok);
3054 if cache.kind == TerminalOutputKind::Structured && cache.output_preview.len() > threshold {
3055 if let Some(path) = cache.output_path.as_deref() {
3056 return (
3057 json_output_pointer(cache.output_preview.len() as u64, path),
3058 true,
3059 );
3060 }
3061 return (cache.output_preview.clone(), cache.output_truncated);
3062 }
3063
3064 if let Some(recovery) = cache.recovery.as_ref() {
3065 if cache.output_preview.len() <= threshold {
3066 return (cache.output_preview.clone(), cache.output_truncated);
3067 }
3068 let body = strip_recovery_marker_lines(&cache.output_preview);
3069 let mut completion_recovery = recovery.clone();
3070 completion_recovery.byte_truncated = true;
3071 if let Some(marker) = recovery_marker(&completion_recovery) {
3072 let capped = cap_completion_output_with_marker(&body, &marker, exit_ok);
3073 return (capped.text, true);
3074 }
3075 }
3076
3077 let capped = cap_completion_output(&cache.output_preview, exit_ok);
3078 (capped.text, cache.output_truncated || capped.truncated)
3079}
3080
3081fn is_gh_structured_command(command: &str) -> bool {
3082 let normalized = crate::compress::normalize_command_for_dispatch(command)
3083 .unwrap_or_else(|| command.trim_start().to_string());
3084 let tokens = shell_words_for_flags(&normalized);
3085 let Some(head) = tokens.first() else {
3086 return false;
3087 };
3088 let head_name = Path::new(head)
3089 .file_name()
3090 .and_then(|name| name.to_str())
3091 .unwrap_or(head);
3092 if !(head_name == "gh" || head_name.eq_ignore_ascii_case("gh.exe")) {
3093 return false;
3094 }
3095 tokens.iter().any(|token| {
3096 matches!(token.as_str(), "--json" | "--jq" | "--template")
3097 || token.starts_with("--json=")
3098 || token.starts_with("--jq=")
3099 || token.starts_with("--template=")
3100 })
3101}
3102
3103fn shell_words_for_flags(command: &str) -> Vec<String> {
3104 let mut words = Vec::new();
3105 let mut current = String::new();
3106 let mut in_single = false;
3107 let mut in_double = false;
3108 let mut escaped = false;
3109
3110 for ch in command.chars() {
3111 if escaped {
3112 current.push(ch);
3113 escaped = false;
3114 continue;
3115 }
3116 if ch == '\\' && !in_single {
3117 escaped = true;
3118 continue;
3119 }
3120 if ch == '\'' && !in_double {
3121 in_single = !in_single;
3122 continue;
3123 }
3124 if ch == '"' && !in_single {
3125 in_double = !in_double;
3126 continue;
3127 }
3128 if ch.is_whitespace() && !in_single && !in_double {
3129 if !current.is_empty() {
3130 words.push(std::mem::take(&mut current));
3131 }
3132 continue;
3133 }
3134 if matches!(ch, ';' | '&' | '|') && !in_single && !in_double {
3135 if !current.is_empty() {
3136 words.push(std::mem::take(&mut current));
3137 }
3138 continue;
3139 }
3140 current.push(ch);
3141 }
3142 if !current.is_empty() {
3143 words.push(current);
3144 }
3145 words
3146}
3147
3148fn is_structured_body(body: &str) -> bool {
3149 let trimmed = body.trim();
3150 if trimmed.is_empty() {
3151 return false;
3152 }
3153 if serde_json::from_str::<serde_json::Value>(trimmed).is_ok() {
3154 return true;
3155 }
3156
3157 let mut saw_line = false;
3158 for line in trimmed
3159 .lines()
3160 .map(str::trim)
3161 .filter(|line| !line.is_empty())
3162 {
3163 saw_line = true;
3164 if serde_json::from_str::<serde_json::Value>(line).is_err() {
3165 return false;
3166 }
3167 }
3168 saw_line
3169}
3170
3171fn stream_starts_like_json(buffer: &BgBuffer, stream: StreamKind) -> bool {
3172 let path = match (buffer, stream) {
3173 (BgBuffer::Pipes { stdout_path, .. }, StreamKind::Stdout) => Some(stdout_path),
3174 (BgBuffer::Pipes { stderr_path, .. }, StreamKind::Stderr) => Some(stderr_path),
3175 (BgBuffer::Pty { combined_path }, _) => Some(combined_path),
3176 };
3177 let Some(path) = path else {
3178 return false;
3179 };
3180 let Ok(file) = std::fs::File::open(path) else {
3181 return false;
3182 };
3183 let mut limited = file.take(512);
3184 let mut bytes = Vec::new();
3185 if limited.read_to_end(&mut bytes).is_err() {
3186 return false;
3187 }
3188 String::from_utf8_lossy(&bytes)
3189 .chars()
3190 .find(|ch| !ch.is_whitespace())
3191 .is_some_and(|ch| matches!(ch, '{' | '[' | '"' | '-' | '0'..='9' | 't' | 'f' | 'n'))
3192}
3193
3194struct CompletionTokenCounts {
3195 original_tokens: Option<u32>,
3196 compressed_tokens: Option<u32>,
3197 original_bytes: Option<i64>,
3198 compressed_bytes: Option<i64>,
3199 tokens_skipped: bool,
3200}
3201
3202impl CompletionTokenCounts {
3203 fn skipped() -> Self {
3204 Self {
3205 original_tokens: None,
3206 compressed_tokens: None,
3207 original_bytes: None,
3208 compressed_bytes: None,
3209 tokens_skipped: true,
3210 }
3211 }
3212}
3213
3214fn completion_status_text(status: &BgTaskStatus, exit_code: Option<i32>) -> String {
3215 match status {
3216 BgTaskStatus::TimedOut => "timed out".to_string(),
3217 BgTaskStatus::Killed => "killed".to_string(),
3218 _ => exit_code
3219 .map(|code| format!("exit {code}"))
3220 .unwrap_or_else(|| format!("{status:?}").to_lowercase()),
3221 }
3222}
3223
3224fn token_count_u32(text: &str) -> u32 {
3225 aft_tokenizer::count_tokens(text)
3226 .try_into()
3227 .unwrap_or(u32::MAX)
3228}
3229
3230impl Default for BgTaskRegistry {
3231 fn default() -> Self {
3232 Self::new(Arc::new(Mutex::new(None)))
3233 }
3234}
3235
3236fn modified_within(path: &Path, grace: Duration) -> bool {
3237 fs::metadata(path)
3238 .and_then(|metadata| metadata.modified())
3239 .ok()
3240 .and_then(|modified| SystemTime::now().duration_since(modified).ok())
3241 .map(|age| age < grace)
3242 .unwrap_or(false)
3243}
3244
3245fn canonicalized_path(path: &Path) -> PathBuf {
3246 fs::canonicalize(path).unwrap_or_else(|_| path.to_path_buf())
3247}
3248
3249fn started_instant_from_unix_millis(started_at: u64) -> Instant {
3250 let now_ms = SystemTime::now()
3251 .duration_since(UNIX_EPOCH)
3252 .ok()
3253 .map(|duration| duration.as_millis() as u64)
3254 .unwrap_or(started_at);
3255 let elapsed_ms = now_ms.saturating_sub(started_at);
3256 Instant::now()
3257 .checked_sub(Duration::from_millis(elapsed_ms))
3258 .unwrap_or_else(Instant::now)
3259}
3260
3261fn gc_quarantine(storage_dir: &Path) {
3262 let quarantine_root = storage_dir.join("bash-tasks-quarantine");
3263 let Ok(session_dirs) = fs::read_dir(&quarantine_root) else {
3264 return;
3265 };
3266 for session_entry in session_dirs.flatten() {
3267 let session_quarantine_dir = session_entry.path();
3268 if !session_quarantine_dir.is_dir() {
3269 continue;
3270 }
3271 let entries = match fs::read_dir(&session_quarantine_dir) {
3272 Ok(entries) => entries,
3273 Err(error) => {
3274 crate::slog_warn!(
3275 "failed to read background task quarantine dir {}: {error}",
3276 session_quarantine_dir.display()
3277 );
3278 continue;
3279 }
3280 };
3281 for entry in entries.flatten() {
3282 let path = entry.path();
3283 if modified_within(&path, QUARANTINE_GC_GRACE) {
3284 continue;
3285 }
3286 let result = if path.is_dir() {
3287 fs::remove_dir_all(&path)
3288 } else {
3289 fs::remove_file(&path)
3290 };
3291 match result {
3292 Ok(()) => log::debug!(
3293 "deleted old background task quarantine entry {}",
3294 path.display()
3295 ),
3296 Err(error) => crate::slog_warn!(
3297 "failed to delete old background task quarantine entry {}: {error}",
3298 path.display()
3299 ),
3300 }
3301 }
3302 let _ = fs::remove_dir(&session_quarantine_dir);
3303 }
3304 let _ = fs::remove_dir(&quarantine_root);
3305}
3306
3307enum QuarantineKind {
3308 Corrupt,
3309 Invalid,
3310}
3311
3312fn quarantine_task_json(
3313 storage_dir: &Path,
3314 session_dir: &Path,
3315 json_path: &Path,
3316 kind: QuarantineKind,
3317) -> Result<(), String> {
3318 let session_hash = session_dir
3319 .file_name()
3320 .and_then(|name| name.to_str())
3321 .ok_or_else(|| {
3322 format!(
3323 "invalid background task session dir: {}",
3324 session_dir.display()
3325 )
3326 })?;
3327 let task_name = json_path
3328 .file_name()
3329 .and_then(|name| name.to_str())
3330 .ok_or_else(|| format!("invalid background task json path: {}", json_path.display()))?;
3331 let unix_ts = SystemTime::now()
3332 .duration_since(UNIX_EPOCH)
3333 .map(|duration| duration.as_secs())
3334 .unwrap_or(0);
3335 let quarantine_dir = storage_dir.join("bash-tasks-quarantine").join(session_hash);
3336 fs::create_dir_all(&quarantine_dir).map_err(|e| {
3337 format!(
3338 "failed to create background task quarantine dir {}: {e}",
3339 quarantine_dir.display()
3340 )
3341 })?;
3342 let target_name = quarantine_name(task_name, unix_ts, &kind);
3343 let target = quarantine_dir.join(target_name);
3344 fs::rename(json_path, &target).map_err(|e| {
3345 format!(
3346 "failed to quarantine background task metadata {} to {}: {e}",
3347 json_path.display(),
3348 target.display()
3349 )
3350 })?;
3351
3352 for sibling in task_sibling_paths(json_path) {
3353 if !sibling.exists() {
3354 continue;
3355 }
3356 let Some(sibling_name) = sibling.file_name().and_then(|name| name.to_str()) else {
3357 crate::slog_warn!(
3358 "skipping background task sibling with invalid name during quarantine: {}",
3359 sibling.display()
3360 );
3361 continue;
3362 };
3363 let sibling_target = quarantine_dir.join(quarantine_name(sibling_name, unix_ts, &kind));
3364 if let Err(error) = fs::rename(&sibling, &sibling_target) {
3365 crate::slog_warn!(
3366 "failed to quarantine background task sibling {} to {}: {error}",
3367 sibling.display(),
3368 sibling_target.display()
3369 );
3370 }
3371 }
3372
3373 let _ = fs::remove_dir(session_dir);
3374 Ok(())
3375}
3376
3377fn quarantine_name(file_name: &str, unix_ts: u64, kind: &QuarantineKind) -> String {
3378 match kind {
3379 QuarantineKind::Corrupt => format!("{file_name}.corrupt-{unix_ts}"),
3380 QuarantineKind::Invalid => {
3381 let path = Path::new(file_name);
3382 let stem = path.file_stem().and_then(|stem| stem.to_str());
3383 let extension = path.extension().and_then(|extension| extension.to_str());
3384 match (stem, extension) {
3385 (Some(stem), Some(extension)) => format!("{stem}.invalid.{unix_ts}.{extension}"),
3386 _ => format!("{file_name}.invalid.{unix_ts}"),
3387 }
3388 }
3389 }
3390}
3391
3392fn task_sibling_paths(json_path: &Path) -> Vec<PathBuf> {
3393 let Some(parent) = json_path.parent() else {
3394 return Vec::new();
3395 };
3396 let Some(stem) = json_path.file_stem().and_then(|stem| stem.to_str()) else {
3397 return Vec::new();
3398 };
3399 ["stdout", "stderr", "exit", "pty", "ps1", "bat", "sh"]
3400 .into_iter()
3401 .map(|extension| parent.join(format!("{stem}.{extension}")))
3402 .collect()
3403}
3404
3405fn read_for_token_count_from_disk(
3406 metadata: &PersistedTask,
3407 paths: &TaskPaths,
3408 max_bytes_per_stream: usize,
3409) -> TokenCountInput {
3410 if metadata.mode == BgMode::Pty {
3411 return TokenCountInput::Skipped;
3412 }
3413 let stdout = read_file_tail_capped(&paths.stdout, max_bytes_per_stream);
3420 let stderr = read_file_tail_capped(&paths.stderr, max_bytes_per_stream);
3421 match (stdout, stderr) {
3422 (Ok(stdout), Ok(stderr)) => TokenCountInput::Text(combine_streams(
3423 String::from_utf8_lossy(&stdout).as_ref(),
3424 String::from_utf8_lossy(&stderr).as_ref(),
3425 )),
3426 (Ok(stdout), Err(_)) => TokenCountInput::Text(combine_streams(
3427 String::from_utf8_lossy(&stdout).as_ref(),
3428 "",
3429 )),
3430 (Err(_), Ok(stderr)) => TokenCountInput::Text(combine_streams(
3431 "",
3432 String::from_utf8_lossy(&stderr).as_ref(),
3433 )),
3434 (Err(_), Err(_)) => TokenCountInput::Skipped,
3435 }
3436}
3437
3438fn read_file_tail_capped(path: &Path, max_bytes: usize) -> std::io::Result<Vec<u8>> {
3443 use std::io::{Read, Seek, SeekFrom};
3444 let mut file = std::fs::File::open(path)?;
3445 let len = file.metadata()?.len();
3446 let read_len = len.min(max_bytes as u64);
3447 if read_len > 0 && len > max_bytes as u64 {
3448 file.seek(SeekFrom::End(-(read_len as i64)))?;
3449 }
3450 let mut bytes = Vec::with_capacity(read_len as usize);
3451 file.read_to_end(&mut bytes)?;
3452 Ok(bytes)
3453}
3454
3455impl BgTask {
3456 fn snapshot(&self, preview_bytes: usize) -> BgTaskSnapshot {
3457 let state = self
3458 .state
3459 .lock()
3460 .unwrap_or_else(|poison| poison.into_inner());
3461 self.snapshot_locked(&state, preview_bytes)
3462 }
3463
3464 fn snapshot_locked(&self, state: &BgTaskState, preview_bytes: usize) -> BgTaskSnapshot {
3465 let metadata = &state.metadata;
3466 let duration_ms = metadata.duration_ms.or_else(|| {
3467 metadata
3468 .status
3469 .is_terminal()
3470 .then(|| self.started.elapsed().as_millis() as u64)
3471 });
3472 let (output_preview, output_truncated) = if metadata.mode == BgMode::Pty {
3473 (String::new(), false)
3474 } else if metadata.status.is_terminal() {
3475 state
3476 .terminal_output_cache
3477 .as_ref()
3478 .map(|cache| (cache.output_preview.clone(), cache.output_truncated))
3479 .unwrap_or_else(|| (String::new(), false))
3480 } else {
3481 state.buffer.read_tail(preview_bytes)
3482 };
3483 BgTaskSnapshot {
3484 info: BgTaskInfo {
3485 task_id: self.task_id.clone(),
3486 status: metadata.status.clone(),
3487 command: metadata.command.clone(),
3488 mode: metadata.mode.clone(),
3489 started_at: metadata.started_at,
3490 duration_ms,
3491 },
3492 exit_code: metadata.exit_code,
3493 child_pid: metadata.child_pid,
3494 workdir: metadata.workdir.display().to_string(),
3495 output_preview,
3496 output_truncated,
3497 output_path: state
3498 .buffer
3499 .output_path()
3500 .map(|path| path.display().to_string()),
3501 stderr_path: state
3502 .buffer
3503 .stderr_path()
3504 .map(|path| path.display().to_string()),
3505 pty_rows: (metadata.mode == BgMode::Pty).then_some(metadata.pty_rows.unwrap_or(24)),
3506 pty_cols: (metadata.mode == BgMode::Pty).then_some(metadata.pty_cols.unwrap_or(80)),
3507 }
3508 }
3509
3510 pub(crate) fn is_running(&self) -> bool {
3511 self.state
3512 .lock()
3513 .map(|state| {
3514 state.metadata.status == BgTaskStatus::Running
3515 || (state.metadata.mode == BgMode::Pty
3516 && state.metadata.status == BgTaskStatus::Killing)
3517 })
3518 .unwrap_or(false)
3519 }
3520
3521 fn is_terminal(&self) -> bool {
3522 self.state
3523 .lock()
3524 .map(|state| state.metadata.status.is_terminal())
3525 .unwrap_or(false)
3526 }
3527
3528 fn mark_terminal_now(&self) {
3529 if let Ok(mut terminal_at) = self.terminal_at.lock() {
3530 if terminal_at.is_none() {
3531 *terminal_at = Some(Instant::now());
3532 }
3533 }
3534 }
3535
3536 fn set_completion_delivered(
3537 &self,
3538 delivered: bool,
3539 registry: &BgTaskRegistry,
3540 ) -> Result<(), String> {
3541 let mut state = self
3542 .state
3543 .lock()
3544 .map_err(|_| "background task lock poisoned".to_string())?;
3545 let updated = registry
3546 .update_task_metadata(&self.paths, |metadata| {
3547 metadata.completion_delivered = delivered;
3548 })
3549 .map_err(|e| format!("failed to update completion delivery: {e}"))?;
3550 state.metadata = updated;
3551 Ok(())
3552 }
3553}
3554
3555#[cfg(unix)]
3576fn reap_piped_child(child_slot: &mut Option<Child>) {
3577 if let Some(mut child) = child_slot.take() {
3578 if matches!(child.try_wait(), Ok(None)) {
3579 let _ = child.wait();
3580 }
3581 }
3582}
3583
3584#[cfg(windows)]
3589fn reap_piped_child(child_slot: &mut Option<Child>) {
3590 *child_slot = None;
3591}
3592
3593fn terminal_metadata_from_marker(
3594 mut metadata: PersistedTask,
3595 marker: ExitMarker,
3596 reason: Option<String>,
3597) -> PersistedTask {
3598 match marker {
3599 ExitMarker::Code(code) => {
3600 let status = if code == 0 {
3601 BgTaskStatus::Completed
3602 } else {
3603 BgTaskStatus::Failed
3604 };
3605 metadata.mark_terminal(status, Some(code), reason);
3606 }
3607 ExitMarker::Killed => metadata.mark_terminal(BgTaskStatus::Killed, None, reason),
3608 }
3609 metadata
3610}
3611
3612#[cfg(unix)]
3613fn write_unix_command_script(command: &str, paths: &TaskPaths) -> Result<PathBuf, String> {
3614 let stem = paths
3615 .json
3616 .file_stem()
3617 .and_then(|stem| stem.to_str())
3618 .unwrap_or("wrapper");
3619 let script_path = paths.dir.join(format!("{stem}.sh"));
3620 fs::write(&script_path, command)
3621 .map_err(|e| format!("failed to write background bash command script: {e}"))?;
3622 Ok(script_path)
3623}
3624
3625#[cfg(unix)]
3626fn detached_shell_command(command_script: &Path, exit_path: &Path) -> Command {
3627 let shell = resolve_posix_shell();
3628 let mut cmd = Command::new(&shell);
3629 cmd.arg("-c")
3635 .arg(r#""$0" "$1"; code=$?; printf "%s" "$code" > "$2.tmp.$$"; mv -f "$2.tmp.$$" "$2""#)
3636 .arg(&shell)
3637 .arg(command_script)
3638 .arg(exit_path);
3639 unsafe {
3640 cmd.pre_exec(|| {
3641 if libc::setsid() == -1 {
3642 return Err(std::io::Error::last_os_error());
3643 }
3644 Ok(())
3645 });
3646 }
3647 cmd
3648}
3649
3650#[cfg(unix)]
3651fn resolve_posix_shell() -> PathBuf {
3652 static POSIX_SHELL: OnceLock<PathBuf> = OnceLock::new();
3653 POSIX_SHELL
3654 .get_or_init(|| {
3655 std::env::var_os("BASH")
3656 .filter(|value| !value.is_empty())
3657 .map(PathBuf::from)
3658 .filter(|path| path.exists())
3659 .or_else(|| which::which("bash").ok())
3660 .or_else(|| which::which("zsh").ok())
3661 .unwrap_or_else(|| PathBuf::from("/bin/sh"))
3662 })
3663 .clone()
3664}
3665
3666#[cfg(windows)]
3667fn detached_shell_command_for(
3668 shell: crate::windows_shell::WindowsShell,
3669 command: &str,
3670 exit_path: &Path,
3671 paths: &TaskPaths,
3672 creation_flags: u32,
3673) -> Result<Command, String> {
3674 use crate::windows_shell::WindowsShell;
3675 let wrapper_body = shell.wrapper_script_bytes(command, exit_path);
3688 let wrapper_ext = match shell {
3689 WindowsShell::Pwsh | WindowsShell::Powershell => "ps1",
3690 WindowsShell::Cmd => "bat",
3691 WindowsShell::Posix(_) => "sh",
3695 };
3696 let wrapper_path = paths.dir.join(format!(
3697 "{}.{}",
3698 paths
3699 .json
3700 .file_stem()
3701 .and_then(|s| s.to_str())
3702 .unwrap_or("wrapper"),
3703 wrapper_ext
3704 ));
3705 fs::write(&wrapper_path, wrapper_body)
3706 .map_err(|e| format!("failed to write background bash wrapper script: {e}"))?;
3707
3708 let mut cmd = Command::new(shell.binary().as_ref());
3709 match shell {
3710 WindowsShell::Pwsh | WindowsShell::Powershell => {
3711 cmd.args([
3714 "-NoLogo",
3715 "-NoProfile",
3716 "-NonInteractive",
3717 "-ExecutionPolicy",
3718 "Bypass",
3719 "-File",
3720 ]);
3721 cmd.arg(&wrapper_path);
3722 }
3723 WindowsShell::Cmd => {
3724 cmd.args(["/D", "/C"]);
3731 cmd.arg(&wrapper_path);
3732 }
3733 WindowsShell::Posix(_) => {
3734 cmd.arg(&wrapper_path);
3739 }
3740 }
3741
3742 cmd.creation_flags(creation_flags);
3746 Ok(cmd)
3747}
3748
3749fn spawn_detached_child(
3765 command: &str,
3766 paths: &TaskPaths,
3767 workdir: &Path,
3768 env: &HashMap<String, String>,
3769) -> Result<std::process::Child, String> {
3770 #[cfg(not(windows))]
3771 {
3772 let stdout = create_capture_file(&paths.stdout)
3773 .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
3774 let stderr = create_capture_file(&paths.stderr)
3775 .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
3776 let command_script = write_unix_command_script(command, paths)?;
3777 detached_shell_command(&command_script, &paths.exit)
3778 .current_dir(workdir)
3779 .envs(env)
3780 .stdin(Stdio::null())
3781 .stdout(Stdio::from(stdout))
3782 .stderr(Stdio::from(stderr))
3783 .spawn()
3784 .map_err(|e| format!("failed to spawn background bash command: {e}"))
3785 }
3786 #[cfg(windows)]
3787 {
3788 use crate::windows_shell::shell_candidates;
3789 let candidates: Vec<crate::windows_shell::WindowsShell> = shell_candidates();
3800 const FLAG_CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
3821 const FLAG_CREATE_BREAKAWAY_FROM_JOB: u32 = 0x0100_0000;
3822 const FLAG_CREATE_NO_WINDOW: u32 = 0x0800_0000;
3823 let with_breakaway =
3824 FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP | FLAG_CREATE_BREAKAWAY_FROM_JOB;
3825 let without_breakaway = FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP;
3826 let mut last_error: Option<String> = None;
3827 for (idx, shell) in candidates.iter().enumerate() {
3828 for &flags in &[with_breakaway, without_breakaway] {
3832 let stdout = create_capture_file(&paths.stdout)
3834 .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
3835 let stderr = create_capture_file(&paths.stderr)
3836 .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
3837 let mut cmd =
3838 detached_shell_command_for(shell.clone(), command, &paths.exit, paths, flags)?;
3839 cmd.current_dir(workdir)
3840 .envs(env)
3841 .stdin(Stdio::null())
3842 .stdout(Stdio::from(stdout))
3843 .stderr(Stdio::from(stderr));
3844 match cmd.spawn() {
3845 Ok(child) => {
3846 if idx > 0 {
3847 crate::slog_warn!("background bash spawn fell back to {} after {} earlier candidate(s) failed; \
3848 the cached PATH probe disagreed with runtime spawn — likely PATH \
3849 inheritance, antivirus / AppLocker / Defender ASR, or sandbox policy.",
3850 shell.binary(),
3851 idx);
3852 }
3853 if flags == without_breakaway {
3854 crate::slog_warn!(
3855 "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected \
3856 (likely a restrictive Job Object — CI sandbox or MDM policy). \
3857 Spawned without breakaway; the bg task will be torn down if the \
3858 AFT process group is killed."
3859 );
3860 }
3861 return Ok(child);
3862 }
3863 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
3864 crate::slog_warn!("background bash spawn: {} returned NotFound at runtime — trying next candidate",
3865 shell.binary());
3866 last_error = Some(format!("{}: {e}", shell.binary()));
3867 break;
3870 }
3871 Err(e) if flags == with_breakaway && e.raw_os_error() == Some(5) => {
3872 crate::slog_warn!(
3874 "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected with \
3875 Access Denied — retrying {} without breakaway",
3876 shell.binary()
3877 );
3878 last_error = Some(format!("{}: {e}", shell.binary()));
3879 continue;
3880 }
3881 Err(e) => {
3882 return Err(format!(
3883 "failed to spawn background bash command via {}: {e}",
3884 shell.binary()
3885 ));
3886 }
3887 }
3888 }
3889 }
3890 Err(format!(
3891 "failed to spawn background bash command: no Windows shell could be spawned. \
3892 Last error: {}. PATH-probed candidates: {:?}",
3893 last_error.unwrap_or_else(|| "no candidates were attempted".to_string()),
3894 candidates.iter().map(|s| s.binary()).collect::<Vec<_>>()
3895 ))
3896 }
3897}
3898
3899fn random_slug() -> String {
3900 let mut bytes = [0u8; 4];
3901 getrandom::fill(&mut bytes).unwrap_or_else(|_| {
3903 let t = SystemTime::now()
3905 .duration_since(UNIX_EPOCH)
3906 .map(|d| d.subsec_nanos())
3907 .unwrap_or(0);
3908 let p = std::process::id();
3909 bytes.copy_from_slice(&(t ^ p).to_le_bytes());
3910 });
3911 let hex: String = bytes.iter().map(|b| format!("{b:02x}")).collect();
3913 format!("bash-{hex}")
3914}
3915
3916#[cfg(test)]
3917mod tests {
3918 use std::collections::HashMap;
3919 use std::fs;
3920 use std::sync::atomic::{AtomicBool, AtomicUsize};
3921 use std::sync::{Arc, Mutex};
3922 use std::time::Duration;
3923 #[cfg(windows)]
3924 use std::time::Instant;
3925
3926 use super::*;
3927
3928 #[cfg(unix)]
3929 const QUICK_SUCCESS_COMMAND: &str = "true";
3930 #[cfg(windows)]
3931 const QUICK_SUCCESS_COMMAND: &str = "cmd /c exit 0";
3932
3933 #[cfg(unix)]
3934 const LONG_RUNNING_COMMAND: &str = "sleep 5";
3935 #[cfg(windows)]
3936 const LONG_RUNNING_COMMAND: &str = "cmd /c timeout /t 5 /nobreak > nul";
3937
3938 fn insert_terminal_piped_task(
3939 registry: &BgTaskRegistry,
3940 dir: &tempfile::TempDir,
3941 command: &str,
3942 stdout: &str,
3943 stderr: &str,
3944 compressed: bool,
3945 ) -> (String, Arc<BgTask>) {
3946 let task_id = format!("bash-test-{}", random_slug());
3947 let paths = task_paths(dir.path(), "session", &task_id);
3948 fs::create_dir_all(&paths.dir).unwrap();
3949 fs::write(&paths.stdout, stdout).unwrap();
3950 fs::write(&paths.stderr, stderr).unwrap();
3951 let mut metadata = PersistedTask::starting(
3952 task_id.clone(),
3953 "session".to_string(),
3954 command.to_string(),
3955 dir.path().to_path_buf(),
3956 Some(dir.path().to_path_buf()),
3957 Some(30_000),
3958 true,
3959 compressed,
3960 );
3961 metadata.mark_terminal(BgTaskStatus::Completed, Some(0), None);
3962 write_task(&paths.json, &metadata).unwrap();
3963 registry
3964 .insert_rehydrated_task(metadata, paths, true)
3965 .expect("insert terminal task");
3966 let task = registry.task_for_session(&task_id, "session").unwrap();
3967 (task_id, task)
3968 }
3969
3970 fn insert_terminal_pty_task(
3971 registry: &BgTaskRegistry,
3972 dir: &tempfile::TempDir,
3973 pty_output: &str,
3974 ) -> (String, Arc<BgTask>) {
3975 let task_id = format!("bash-test-{}", random_slug());
3976 let paths = task_paths(dir.path(), "session", &task_id);
3977 fs::create_dir_all(&paths.dir).unwrap();
3978 fs::write(&paths.pty, pty_output).unwrap();
3979 let mut metadata = PersistedTask::starting(
3980 task_id.clone(),
3981 "session".to_string(),
3982 "python".to_string(),
3983 dir.path().to_path_buf(),
3984 Some(dir.path().to_path_buf()),
3985 Some(30_000),
3986 true,
3987 true,
3988 );
3989 metadata.mode = BgMode::Pty;
3990 metadata.mark_terminal(BgTaskStatus::Completed, Some(0), None);
3991 write_task(&paths.json, &metadata).unwrap();
3992 registry
3993 .insert_rehydrated_task(metadata, paths, true)
3994 .expect("insert terminal pty task");
3995 let task = registry.task_for_session(&task_id, "session").unwrap();
3996 (task_id, task)
3997 }
3998
3999 #[cfg(unix)]
4000 fn wait_for_terminal_snapshot(
4001 registry: &BgTaskRegistry,
4002 task_id: &str,
4003 session_id: &str,
4004 project: &Path,
4005 storage: &Path,
4006 ) -> BgTaskSnapshot {
4007 let started = Instant::now();
4008 loop {
4009 let snapshot = registry
4010 .status(task_id, session_id, Some(project), Some(storage), 4096)
4011 .expect("spawned task should be visible to status");
4012 if snapshot.info.status.is_terminal() {
4013 return snapshot;
4014 }
4015 assert!(
4016 started.elapsed() < Duration::from_secs(10),
4017 "timed out waiting for task {task_id} to finish; last status={:?}",
4018 snapshot.info.status
4019 );
4020 std::thread::sleep(Duration::from_millis(50));
4021 }
4022 }
4023
4024 #[cfg(unix)]
4025 #[test]
4026 fn multiline_pipeline_stdout_persists_all_lines_after_terminal_status() {
4027 let cases = [
4028 (
4029 "long-first",
4030 "sleep 0.5; printf 'one\\n' | cat\nprintf 'two\\n' | grep -c two\nprintf 'three\\n' | cat",
4031 vec!["one", "1", "three"],
4032 ),
4033 (
4034 "short-first",
4035 "printf 'one\\n' | cat\nsleep 0.2; printf 'two\\n' | grep -c two\nprintf 'three\\n' | cat",
4036 vec!["one", "1", "three"],
4037 ),
4038 (
4039 "failing-middle",
4040 "sleep 0.2; printf 'one\\n' | cat\nfalse; printf 'after-false\\n' | cat\nprintf 'three\\n' | cat",
4041 vec!["one", "after-false", "three"],
4042 ),
4043 ];
4044
4045 for (name, command, expected_lines) in cases {
4046 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
4047 let dir = tempfile::tempdir().unwrap();
4048 let session_id = format!("session-{name}");
4049 let task_id = registry
4050 .spawn(
4051 command,
4052 session_id.clone(),
4053 dir.path().to_path_buf(),
4054 HashMap::new(),
4055 Some(Duration::from_secs(30)),
4056 dir.path().to_path_buf(),
4057 10,
4058 true,
4059 true,
4060 Some(dir.path().to_path_buf()),
4061 )
4062 .unwrap();
4063
4064 let snapshot = wait_for_terminal_snapshot(
4065 ®istry,
4066 &task_id,
4067 &session_id,
4068 dir.path(),
4069 dir.path(),
4070 );
4071 assert_eq!(
4072 snapshot.info.status,
4073 BgTaskStatus::Completed,
4074 "{name}: task should complete; snapshot={snapshot:?}"
4075 );
4076 assert_eq!(
4077 snapshot.exit_code,
4078 Some(0),
4079 "{name}: script should use the final command's exit code"
4080 );
4081
4082 let stdout_path = task_paths(dir.path(), &session_id, &task_id).stdout;
4083 let stdout = fs::read_to_string(&stdout_path).unwrap_or_else(|error| {
4084 panic!(
4085 "{name}: failed to read raw stdout file {}: {error}",
4086 stdout_path.display()
4087 )
4088 });
4089 let lines: Vec<&str> = stdout.lines().collect();
4090 assert_eq!(
4091 lines,
4092 expected_lines,
4093 "{name}: raw stdout file must include every newline-separated command's output; stdout_path={}",
4094 stdout_path.display()
4095 );
4096 }
4097 }
4098
4099 #[test]
4100 fn recognizes_all_recovery_marker_forms() {
4101 assert!(is_recovery_marker(
4102 "[truncated output; full output: read \"/tmp/out\"]"
4103 ));
4104 assert!(is_recovery_marker(
4105 "[omitted output; see remaining: tail -n +42 \"/tmp/out\"]"
4106 ));
4107 assert!(is_recovery_marker(
4108 "[truncated output; full output unavailable]"
4109 ));
4110 }
4111
4112 #[test]
4113 fn terminal_status_polls_use_cached_render_once_and_off_lock() {
4114 let registry = BgTaskRegistry::default();
4115 let dir = tempfile::tempdir().unwrap();
4116 let (_task_id, task) = insert_terminal_piped_task(
4117 ®istry,
4118 &dir,
4119 "custom-tool --verbose",
4120 &"stdout line\n".repeat(200_000),
4121 "",
4122 true,
4123 );
4124 let calls = Arc::new(AtomicUsize::new(0));
4125 let saw_unlocked_state = Arc::new(AtomicBool::new(false));
4126 let task_holder = Arc::new(Mutex::new(Some(Arc::clone(&task))));
4127 let calls_for_closure = Arc::clone(&calls);
4128 let unlocked_for_closure = Arc::clone(&saw_unlocked_state);
4129 let task_for_closure = Arc::clone(&task_holder);
4130 registry.set_compressor_with_exit_code(move |_command, output, _exit_code| {
4131 calls_for_closure.fetch_add(1, Ordering::SeqCst);
4132 if let Some(task) = task_for_closure.lock().unwrap().as_ref() {
4133 if task.state.try_lock().is_ok() {
4134 unlocked_for_closure.store(true, Ordering::SeqCst);
4135 }
4136 }
4137 CompressionResult::new(format!("compressed {} bytes", output.len()))
4138 });
4139
4140 let first = registry
4141 .status(
4142 &task.task_id,
4143 "session",
4144 None,
4145 Some(dir.path()),
4146 RUNNING_OUTPUT_PREVIEW_BYTES,
4147 )
4148 .unwrap();
4149 let second = registry
4150 .status(
4151 &task.task_id,
4152 "session",
4153 None,
4154 Some(dir.path()),
4155 RUNNING_OUTPUT_PREVIEW_BYTES,
4156 )
4157 .unwrap();
4158 let listed = registry.list(RUNNING_OUTPUT_PREVIEW_BYTES);
4159
4160 assert_eq!(
4161 calls.load(Ordering::SeqCst),
4162 1,
4163 "terminal render must be cached"
4164 );
4165 assert!(
4166 saw_unlocked_state.load(Ordering::SeqCst),
4167 "compressor must run after releasing the task state lock"
4168 );
4169 assert!(first.output_preview.starts_with("compressed "));
4170 assert_eq!(second.output_preview, first.output_preview);
4171 assert_eq!(listed[0].output_preview, first.output_preview);
4172 }
4173
4174 #[test]
4175 fn completion_preview_success_keeps_tail_only() {
4176 let registry = BgTaskRegistry::default();
4181 let dir = tempfile::tempdir().unwrap();
4182 let output = format!("HEAD-SIGNAL\n{}TAIL-SIGNAL\n", "middle\n".repeat(2_000));
4183 let (_task_id, task) =
4184 insert_terminal_piped_task(®istry, &dir, "cat big.log", &output, "", false);
4185
4186 registry.post_terminal_transition(&task, true).unwrap();
4187 let completions = registry.drain_completions_for_session(Some("session"));
4188 assert_eq!(completions.len(), 1);
4189 let preview = &completions[0].output_preview;
4190 assert!(preview.contains("TAIL-SIGNAL"), "preview was {preview:?}");
4191 assert!(!preview.contains("HEAD-SIGNAL"), "preview was {preview:?}");
4192 assert!(completions[0].output_truncated);
4193 }
4194
4195 #[test]
4196 fn completion_preview_failure_keeps_head_and_tail() {
4197 let registry = BgTaskRegistry::default();
4200 let dir = tempfile::tempdir().unwrap();
4201 let output = format!("HEAD-SIGNAL\n{}TAIL-SIGNAL\n", "middle\n".repeat(2_000));
4202 let task_id = format!("bash-test-{}", random_slug());
4203 let paths = task_paths(dir.path(), "session", &task_id);
4204 fs::create_dir_all(&paths.dir).unwrap();
4205 fs::write(&paths.stdout, &output).unwrap();
4206 fs::write(&paths.stderr, "").unwrap();
4207 let mut metadata = PersistedTask::starting(
4208 task_id.clone(),
4209 "session".to_string(),
4210 "cat big.log".to_string(),
4211 dir.path().to_path_buf(),
4212 Some(dir.path().to_path_buf()),
4213 Some(30_000),
4214 true,
4215 false,
4216 );
4217 metadata.mark_terminal(BgTaskStatus::Failed, Some(1), None);
4218 write_task(&paths.json, &metadata).unwrap();
4219 registry
4220 .insert_rehydrated_task(metadata, paths, true)
4221 .expect("insert terminal task");
4222 let task = registry.task_for_session(&task_id, "session").unwrap();
4223
4224 registry.post_terminal_transition(&task, true).unwrap();
4225 let completions = registry.drain_completions_for_session(Some("session"));
4226 assert_eq!(completions.len(), 1);
4227 let preview = &completions[0].output_preview;
4228 assert!(preview.contains("HEAD-SIGNAL"), "preview was {preview:?}");
4229 assert!(preview.contains("TAIL-SIGNAL"), "preview was {preview:?}");
4230 }
4231
4232 #[test]
4233 fn has_completions_for_session_matches_pending_delivery() {
4234 let registry = BgTaskRegistry::default();
4235 assert!(!registry.has_completions_for_session(Some("session")));
4236 assert!(!registry.has_completions_for_session(None));
4237
4238 let dir = tempfile::tempdir().unwrap();
4239 let (_task_id, task) =
4240 insert_terminal_piped_task(®istry, &dir, QUICK_SUCCESS_COMMAND, "done\n", "", false);
4241 registry.post_terminal_transition(&task, true).unwrap();
4242
4243 assert!(registry.has_completions_for_session(Some("session")));
4244 assert!(registry.has_completions_for_session(None));
4245 assert!(!registry.has_completions_for_session(Some("other-session")));
4246
4247 let completions = registry.drain_completions_for_session(Some("session"));
4248 assert_eq!(completions.len(), 1);
4249 assert_eq!(completions[0].task_id, task.task_id);
4250 }
4251
4252 #[test]
4253 fn structured_gh_json_survives_intact_and_ignores_stderr() {
4254 let registry = BgTaskRegistry::default();
4255 let dir = tempfile::tempdir().unwrap();
4256 let calls = Arc::new(AtomicUsize::new(0));
4257 let calls_for_closure = Arc::clone(&calls);
4258 registry.set_compressor_with_exit_code(move |_command, output, _exit_code| {
4259 calls_for_closure.fetch_add(1, Ordering::SeqCst);
4260 CompressionResult::new(output)
4261 });
4262 let (task_id, _task) = insert_terminal_piped_task(
4263 ®istry,
4264 &dir,
4265 "gh pr view 123 --json body",
4266 "{\"body\":\"hello\"}",
4267 "warning: stderr must not join json",
4268 true,
4269 );
4270
4271 let snapshot = registry
4272 .status(
4273 &task_id,
4274 "session",
4275 None,
4276 Some(dir.path()),
4277 RUNNING_OUTPUT_PREVIEW_BYTES,
4278 )
4279 .unwrap();
4280
4281 assert_eq!(snapshot.output_preview, "{\"body\":\"hello\"}");
4282 assert!(!snapshot.output_preview.contains("warning"));
4283 assert!(!snapshot.output_truncated);
4284 assert_eq!(
4285 calls.load(Ordering::SeqCst),
4286 0,
4287 "structured JSON bypasses compression"
4288 );
4289 }
4290
4291 #[test]
4292 fn registry_emits_single_recovery_marker_for_class_drops() {
4293 let registry = BgTaskRegistry::default();
4294 let dir = tempfile::tempdir().unwrap();
4295 registry.set_compressor_with_exit_code(move |_command, _output, _exit_code| {
4296 let mut dropped = BTreeMap::new();
4297 dropped.insert(DropClass::Error, 18);
4298 dropped.insert(DropClass::Warning, 6);
4299 CompressionResult::with_class_drops("kept diagnostic", dropped)
4300 });
4301 let (task_id, task) =
4302 insert_terminal_piped_task(®istry, &dir, "custom-tool", "raw", "", true);
4303
4304 let snapshot = registry
4305 .status(
4306 &task_id,
4307 "session",
4308 None,
4309 Some(dir.path()),
4310 RUNNING_OUTPUT_PREVIEW_BYTES,
4311 )
4312 .unwrap();
4313
4314 assert_eq!(snapshot.output_preview.matches("full output:").count(), 1);
4315 assert!(snapshot.output_preview.contains("+18 more errors"));
4316 assert!(snapshot.output_preview.contains("+6 more warnings"));
4317 assert!(snapshot
4318 .output_preview
4319 .contains(&format!("read \"{}\"", task.paths.stdout.display())));
4320 assert!(!snapshot.output_preview.contains("tail -n +"));
4321 assert!(snapshot.output_truncated);
4322 }
4323
4324 #[test]
4325 fn registry_marker_reports_semantic_and_byte_drops_once() {
4326 let registry = BgTaskRegistry::default();
4327 let dir = tempfile::tempdir().unwrap();
4328 registry.set_compressor_with_exit_code(move |_command, _output, _exit_code| {
4329 let mut dropped = BTreeMap::new();
4330 dropped.insert(DropClass::Error, 1);
4331 CompressionResult::with_class_drops(
4332 format!("HEAD-SIGNAL\n{}TAIL-SIGNAL", "middle\n".repeat(8_000)),
4333 dropped,
4334 )
4335 });
4336 let (task_id, _task) =
4337 insert_terminal_piped_task(®istry, &dir, "custom-tool", "raw", "", true);
4338
4339 let snapshot = registry
4340 .status(
4341 &task_id,
4342 "session",
4343 None,
4344 Some(dir.path()),
4345 RUNNING_OUTPUT_PREVIEW_BYTES,
4346 )
4347 .unwrap();
4348
4349 assert_eq!(snapshot.output_preview.matches("full output:").count(), 1);
4350 assert!(snapshot.output_preview.contains("+1 more error"));
4351 assert!(snapshot.output_preview.contains("truncated output"));
4352 assert!(snapshot.output_preview.contains("HEAD-SIGNAL"));
4353 assert!(snapshot.output_preview.contains("TAIL-SIGNAL"));
4354 assert!(!snapshot.output_preview.contains("...<truncated"));
4355 assert!(snapshot.output_truncated);
4356 }
4357
4358 #[test]
4359 fn cargo_stderr_class_drops_name_both_capture_paths() {
4360 let registry = BgTaskRegistry::default();
4361 let dir = tempfile::tempdir().unwrap();
4362 let filter_registry = crate::compress::toml_filter::FilterRegistry::default();
4363 registry.set_compressor_with_exit_code(move |command, output, exit_code| {
4364 crate::compress::compress_with_registry_exit_code(
4365 command,
4366 &output,
4367 exit_code,
4368 &filter_registry,
4369 )
4370 });
4371 let stderr = (0..22)
4372 .map(|index| {
4373 format!(
4374 "error: cargo failure {index}\n --> src/lib.rs:{}:1\n |\n{} | boom\n",
4375 index + 1,
4376 index + 1
4377 )
4378 })
4379 .collect::<Vec<_>>()
4380 .join("\n");
4381 let (task_id, task) = insert_terminal_piped_task(
4382 ®istry,
4383 &dir,
4384 "cargo check",
4385 "Finished dev [unoptimized] target(s) in 0.01s\n",
4386 &stderr,
4387 true,
4388 );
4389
4390 let snapshot = registry
4391 .status(
4392 &task_id,
4393 "session",
4394 None,
4395 Some(dir.path()),
4396 RUNNING_OUTPUT_PREVIEW_BYTES,
4397 )
4398 .unwrap();
4399
4400 assert!(snapshot.output_preview.contains("+2 more errors"));
4401 assert!(snapshot
4402 .output_preview
4403 .contains(&format!("read \"{}\"", task.paths.stdout.display())));
4404 assert!(snapshot
4405 .output_preview
4406 .contains(&format!("read \"{}\"", task.paths.stderr.display())));
4407 assert!(!snapshot.output_preview.contains("tail -n +"));
4408 }
4409
4410 #[test]
4411 fn over_ceiling_structured_json_uses_pointer_not_partial_json() {
4412 let registry = BgTaskRegistry::default();
4413 let dir = tempfile::tempdir().unwrap();
4414 let body = format!("{{\"body\":\"{}\"}}", "x".repeat(60 * 1024));
4415 let (task_id, task) = insert_terminal_piped_task(
4416 ®istry,
4417 &dir,
4418 "cd /repo && gh pr view 123 --json body",
4419 &body,
4420 "",
4421 true,
4422 );
4423
4424 let snapshot = registry
4425 .status(
4426 &task_id,
4427 "session",
4428 None,
4429 Some(dir.path()),
4430 RUNNING_OUTPUT_PREVIEW_BYTES,
4431 )
4432 .unwrap();
4433
4434 assert!(snapshot.output_preview.starts_with("[JSON output "));
4435 assert!(snapshot
4436 .output_preview
4437 .contains(&task.paths.stdout.display().to_string()));
4438 assert!(!snapshot.output_preview.contains(&"x".repeat(1024)));
4439 assert!(snapshot.output_truncated);
4440 }
4441
4442 #[test]
4443 fn toml_strip_tail_cap_uses_full_output_hint_not_offset_hint() {
4444 let registry = BgTaskRegistry::default();
4445 let dir = tempfile::tempdir().unwrap();
4446 let filter_registry = crate::compress::toml_filter::build_registry(
4447 crate::compress::builtin_filters::ALL,
4448 None,
4449 None,
4450 );
4451 registry.set_compressor_with_exit_code(move |command, output, exit_code| {
4452 crate::compress::compress_with_registry_exit_code(
4453 command,
4454 &output,
4455 exit_code,
4456 &filter_registry,
4457 )
4458 });
4459 let stdout = format!(
4460 "make[1]: Entering directory `/tmp`\n{}",
4461 (0..100)
4462 .map(|index| format!("compile line {index}"))
4463 .collect::<Vec<_>>()
4464 .join("\n")
4465 );
4466 let (task_id, task) =
4467 insert_terminal_piped_task(®istry, &dir, "make all", &stdout, "", true);
4468
4469 let snapshot = registry
4470 .status(
4471 &task_id,
4472 "session",
4473 None,
4474 Some(dir.path()),
4475 RUNNING_OUTPUT_PREVIEW_BYTES,
4476 )
4477 .unwrap();
4478
4479 assert!(snapshot.output_preview.contains("compile line 99"));
4480 assert!(snapshot.output_preview.contains(&format!(
4481 "full output: read \"{}\"",
4482 task.paths.stdout.display()
4483 )));
4484 assert!(!snapshot
4485 .output_preview
4486 .contains(&format!("read \"{}\"", task.paths.stderr.display())));
4487 assert!(!snapshot.output_preview.contains("tail -n +"));
4488 }
4489
4490 #[test]
4491 fn compressed_false_raw_passthrough_uses_wider_head_tail_cap() {
4492 let registry = BgTaskRegistry::default();
4493 let dir = tempfile::tempdir().unwrap();
4494 let output = format!("RAW-HEAD\n{}RAW-TAIL\n", "raw-middle\n".repeat(8_000));
4495 let (task_id, task) =
4496 insert_terminal_piped_task(®istry, &dir, "cat raw.log", &output, "RAW-ERR\n", false);
4497
4498 let snapshot = registry
4499 .status(
4500 &task_id,
4501 "session",
4502 None,
4503 Some(dir.path()),
4504 RUNNING_OUTPUT_PREVIEW_BYTES,
4505 )
4506 .unwrap();
4507
4508 assert!(snapshot.output_preview.contains("RAW-HEAD"));
4509 assert!(snapshot.output_preview.contains("RAW-TAIL"));
4510 assert!(snapshot.output_preview.contains("truncated output"));
4511 assert!(snapshot
4512 .output_preview
4513 .contains(&format!("read \"{}\"", task.paths.stdout.display())));
4514 assert!(snapshot
4515 .output_preview
4516 .contains(&format!("read \"{}\"", task.paths.stderr.display())));
4517 assert!(!snapshot.output_preview.contains("tail -n +"));
4518 assert!(snapshot.output_preview.len() > 16 * 1024);
4519 assert!(snapshot.output_truncated);
4520 }
4521
4522 #[test]
4523 fn pty_terminal_snapshot_bypasses_line_compression() {
4524 let registry = BgTaskRegistry::default();
4525 let dir = tempfile::tempdir().unwrap();
4526 let calls = Arc::new(AtomicUsize::new(0));
4527 let calls_for_closure = Arc::clone(&calls);
4528 registry.set_compressor_with_exit_code(move |_command, output, _exit_code| {
4529 calls_for_closure.fetch_add(1, Ordering::SeqCst);
4530 CompressionResult::new(output)
4531 });
4532 let (task_id, _task) = insert_terminal_pty_task(®istry, &dir, "raw\u{1b}[31m pty bytes");
4533
4534 let snapshot = registry
4535 .status(
4536 &task_id,
4537 "session",
4538 None,
4539 Some(dir.path()),
4540 RUNNING_OUTPUT_PREVIEW_BYTES,
4541 )
4542 .unwrap();
4543
4544 assert_eq!(snapshot.info.mode, BgMode::Pty);
4545 assert_eq!(snapshot.output_preview, "");
4546 assert_eq!(calls.load(Ordering::SeqCst), 0);
4547 }
4548
4549 #[test]
4550 fn pty_dimensions_are_persisted_and_returned_in_snapshot() {
4551 let registry = BgTaskRegistry::default();
4552 let dir = tempfile::tempdir().unwrap();
4553 let task_id = registry
4554 .spawn_pty(
4555 QUICK_SUCCESS_COMMAND,
4556 "session".to_string(),
4557 dir.path().to_path_buf(),
4558 HashMap::new(),
4559 Some(Duration::from_secs(30)),
4560 dir.path().to_path_buf(),
4561 10,
4562 true,
4563 false,
4564 Some(dir.path().to_path_buf()),
4565 50,
4566 120,
4567 )
4568 .unwrap();
4569
4570 let paths = task_paths(dir.path(), "session", &task_id);
4571 let metadata = read_task(&paths.json).unwrap();
4572 assert_eq!(
4573 metadata.schema_version,
4574 crate::bash_background::persistence::SCHEMA_VERSION
4575 );
4576 assert_eq!(metadata.mode, BgMode::Pty);
4577 assert_eq!(metadata.pty_rows, Some(50));
4578 assert_eq!(metadata.pty_cols, Some(120));
4579
4580 let snapshot = registry
4581 .status(&task_id, "session", None, Some(dir.path()), 1024)
4582 .unwrap();
4583 assert_eq!(snapshot.pty_rows, Some(50));
4584 assert_eq!(snapshot.pty_cols, Some(120));
4585 }
4586
4587 fn spawn_dead_child() -> std::process::Child {
4592 #[cfg(unix)]
4593 let mut cmd = std::process::Command::new("true");
4594 #[cfg(windows)]
4595 let mut cmd = {
4596 let mut c = std::process::Command::new("cmd");
4597 c.args(["/c", "exit", "0"]);
4598 c
4599 };
4600 cmd.stdin(std::process::Stdio::null());
4601 cmd.stdout(std::process::Stdio::null());
4602 cmd.stderr(std::process::Stdio::null());
4603 let mut child = cmd.spawn().expect("spawn replacement child for reap test");
4604 let started = Instant::now();
4613 loop {
4614 match child.try_wait() {
4615 Ok(Some(_)) => break,
4616 Ok(None) => {
4617 if started.elapsed() > Duration::from_secs(5) {
4618 panic!("dead-child stand-in did not exit within 5s");
4619 }
4620 std::thread::sleep(Duration::from_millis(10));
4621 }
4622 Err(error) => panic!("dead-child try_wait failed: {error}"),
4623 }
4624 }
4625 child
4626 }
4627
4628 #[test]
4629 fn ack_marks_delivered_even_when_completion_was_already_consumed_locally() {
4630 let registry = BgTaskRegistry::default();
4631 let dir = tempfile::tempdir().unwrap();
4632 let task_id = registry
4633 .spawn(
4634 LONG_RUNNING_COMMAND,
4635 "session".to_string(),
4636 dir.path().to_path_buf(),
4637 HashMap::new(),
4638 Some(Duration::from_secs(30)),
4639 dir.path().to_path_buf(),
4640 10,
4641 true,
4642 false,
4643 Some(dir.path().to_path_buf()),
4644 )
4645 .unwrap();
4646 registry
4647 .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
4648 .unwrap();
4649 assert_eq!(
4650 registry
4651 .drain_completions_for_session(Some("session"))
4652 .len(),
4653 1
4654 );
4655
4656 registry.inner.completions.lock().unwrap().clear();
4659
4660 assert_eq!(
4661 registry.ack_completions_for_session(Some("session"), std::slice::from_ref(&task_id)),
4662 vec![task_id.clone()]
4663 );
4664 assert!(registry
4665 .drain_completions_for_session(Some("session"))
4666 .is_empty());
4667
4668 let paths = task_paths(dir.path(), "session", &task_id);
4669 let metadata = read_task(&paths.json).unwrap();
4670 assert!(metadata.completion_delivered);
4671
4672 let replayed = BgTaskRegistry::default();
4673 replayed
4674 .replay_session_inner(dir.path(), "session", None)
4675 .unwrap();
4676 assert!(replayed
4677 .drain_completions_for_session(Some("session"))
4678 .is_empty());
4679 }
4680
4681 #[test]
4682 fn register_watch_rejects_unknown_task() {
4683 let registry = BgTaskRegistry::default();
4684
4685 let result = registry.register_watch(
4686 "missing-task".to_string(),
4687 WatchPattern::Substring("READY".into()),
4688 true,
4689 );
4690
4691 assert_eq!(result, Err("task_not_found"));
4692 }
4693
4694 #[test]
4695 fn register_watch_on_terminal_task_scans_existing_output() {
4696 let frames = Arc::new(Mutex::new(Vec::new()));
4697 let captured = Arc::clone(&frames);
4698 let sender: crate::context::ProgressSender = Arc::new(Box::new(move |frame| {
4699 captured.lock().unwrap().push(frame);
4700 })
4701 as Box<dyn Fn(PushFrame) + Send + Sync>);
4702 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(Some(sender))));
4703 let dir = tempfile::tempdir().unwrap();
4704 let task_id = registry
4705 .spawn(
4706 LONG_RUNNING_COMMAND,
4707 "session".to_string(),
4708 dir.path().to_path_buf(),
4709 HashMap::new(),
4710 Some(Duration::from_secs(30)),
4711 dir.path().to_path_buf(),
4712 10,
4713 true,
4714 false,
4715 Some(dir.path().to_path_buf()),
4716 )
4717 .unwrap();
4718 registry
4719 .inner
4720 .shutdown
4721 .store(true, std::sync::atomic::Ordering::SeqCst);
4722 let task = registry.task_for_session(&task_id, "session").unwrap();
4723 std::fs::write(&task.paths.stdout, "READY\n").unwrap();
4724 registry
4725 .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
4726 .unwrap();
4727 frames.lock().unwrap().clear();
4728 registry.inner.completions.lock().unwrap().clear();
4729
4730 registry
4731 .register_watch(
4732 task_id.clone(),
4733 WatchPattern::Substring("READY".into()),
4734 true,
4735 )
4736 .unwrap();
4737
4738 let frames = frames.lock().unwrap();
4739 let frame = frames
4740 .iter()
4741 .find_map(|frame| match frame {
4742 PushFrame::BashPatternMatch(frame) => Some(frame),
4743 _ => None,
4744 })
4745 .expect("terminal watch registration should emit pattern frame");
4746 assert_eq!(frame.reason, "pattern_match");
4747 assert_eq!(frame.task_id, task_id);
4748 assert_eq!(frame.session_id, "session");
4749 assert_eq!(frame.match_text, "READY");
4750 assert_eq!(frame.match_offset, 0);
4751 assert_eq!(registry.active_watch_count(&frame.task_id), 0);
4752 let metadata = read_task(&task.paths.json).unwrap();
4753 assert!(metadata.completion_delivered);
4754 }
4755
4756 #[test]
4757 fn cleanup_finished_removes_terminal_tasks_older_than_threshold() {
4758 let registry = BgTaskRegistry::default();
4759 let dir = tempfile::tempdir().unwrap();
4760 let task_id = registry
4761 .spawn(
4762 QUICK_SUCCESS_COMMAND,
4763 "session".to_string(),
4764 dir.path().to_path_buf(),
4765 HashMap::new(),
4766 Some(Duration::from_secs(30)),
4767 dir.path().to_path_buf(),
4768 10,
4769 true,
4770 false,
4771 Some(dir.path().to_path_buf()),
4772 )
4773 .unwrap();
4774 registry
4775 .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
4776 .unwrap();
4777 let completions = registry.drain_completions_for_session(Some("session"));
4778 assert_eq!(completions.len(), 1);
4779 assert_eq!(
4780 registry.ack_completions_for_session(Some("session"), std::slice::from_ref(&task_id)),
4781 vec![task_id.clone()]
4782 );
4783
4784 registry.cleanup_finished(Duration::ZERO);
4785
4786 assert!(registry.inner.tasks.lock().unwrap().is_empty());
4787 }
4788
4789 #[test]
4790 fn cleanup_finished_retains_undelivered_terminals() {
4791 let registry = BgTaskRegistry::default();
4792 let dir = tempfile::tempdir().unwrap();
4793 let task_id = registry
4794 .spawn(
4795 QUICK_SUCCESS_COMMAND,
4796 "session".to_string(),
4797 dir.path().to_path_buf(),
4798 HashMap::new(),
4799 Some(Duration::from_secs(30)),
4800 dir.path().to_path_buf(),
4801 10,
4802 true,
4803 false,
4804 Some(dir.path().to_path_buf()),
4805 )
4806 .unwrap();
4807 registry
4808 .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
4809 .unwrap();
4810
4811 registry.cleanup_finished(Duration::ZERO);
4812
4813 assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
4814 }
4815
4816 #[test]
4824 fn reap_child_marks_failed_when_child_exits_without_exit_marker() {
4825 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
4826 let dir = tempfile::tempdir().unwrap();
4827 let task_id = registry
4828 .spawn(
4829 QUICK_SUCCESS_COMMAND,
4830 "session".to_string(),
4831 dir.path().to_path_buf(),
4832 HashMap::new(),
4833 Some(Duration::from_secs(30)),
4834 dir.path().to_path_buf(),
4835 10,
4836 true,
4837 false,
4838 Some(dir.path().to_path_buf()),
4839 )
4840 .unwrap();
4841
4842 let task = registry.task_for_session(&task_id, "session").unwrap();
4843
4844 let started = Instant::now();
4849 loop {
4850 let exited = {
4851 let mut state = task.state.lock().unwrap();
4852 match &mut state.runtime {
4853 TaskRuntime::Piped(Some(child)) => matches!(child.try_wait(), Ok(Some(_))),
4854 _ => true,
4855 }
4856 };
4857 if exited {
4858 break;
4859 }
4860 assert!(
4861 started.elapsed() < Duration::from_secs(5),
4862 "child should exit quickly"
4863 );
4864 std::thread::sleep(Duration::from_millis(20));
4865 }
4866
4867 registry
4875 .inner
4876 .shutdown
4877 .store(true, std::sync::atomic::Ordering::SeqCst);
4878 std::thread::sleep(Duration::from_millis(550));
4882
4883 let _ = std::fs::remove_file(&task.paths.exit);
4886
4887 {
4902 let mut state = task.state.lock().unwrap();
4903 state.metadata.status = BgTaskStatus::Running;
4904 state.metadata.status_reason = None;
4905 state.metadata.exit_code = None;
4906 state.metadata.finished_at = None;
4907 state.metadata.duration_ms = None;
4908 crate::bash_background::persistence::write_task(&task.paths.json, &state.metadata)
4911 .expect("persist reset Running metadata for reap_child test");
4912 if matches!(state.runtime, TaskRuntime::Piped(None)) {
4916 state.runtime = TaskRuntime::Piped(Some(spawn_dead_child()));
4917 }
4918 }
4919 *task.terminal_at.lock().unwrap() = None;
4922
4923 assert!(
4926 task.is_running(),
4927 "precondition: metadata.status == Running"
4928 );
4929 assert!(
4930 !task.paths.exit.exists(),
4931 "precondition: exit marker absent"
4932 );
4933
4934 registry.reap_child(&task);
4939
4940 {
4941 let state = task.state.lock().unwrap();
4942 assert_eq!(
4943 state.metadata.status,
4944 BgTaskStatus::Running,
4945 "first reap must leave status Running while waiting one pass for marker"
4946 );
4947 assert_eq!(
4948 state.metadata.status_reason, None,
4949 "first reap must not record a failure reason"
4950 );
4951 assert!(
4952 matches!(state.runtime, TaskRuntime::Piped(None)),
4953 "child handle must be released after first reap"
4954 );
4955 assert!(
4956 state.detached,
4957 "task must be marked detached after first reap"
4958 );
4959 }
4960
4961 registry.reap_child(&task);
4965
4966 let state = task.state.lock().unwrap();
4967 assert!(
4968 state.metadata.status.is_terminal(),
4969 "second reap must transition to terminal when PID dead and no marker. Got status={:?}",
4970 state.metadata.status
4971 );
4972 assert_eq!(
4973 state.metadata.status,
4974 BgTaskStatus::Failed,
4975 "must specifically be Failed (not Killed): status={:?}",
4976 state.metadata.status
4977 );
4978 assert_eq!(
4979 state.metadata.status_reason.as_deref(),
4980 Some("process exited without exit marker"),
4981 "reason must match replay path's wording: {:?}",
4982 state.metadata.status_reason
4983 );
4984 assert!(
4985 matches!(state.runtime, TaskRuntime::Piped(None)),
4986 "child handle must stay released after second reap"
4987 );
4988 assert!(
4989 state.detached,
4990 "task must remain detached after second reap"
4991 );
4992 }
4993
4994 #[test]
4999 fn reap_child_preserves_running_when_exit_marker_exists() {
5000 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
5001 let dir = tempfile::tempdir().unwrap();
5002 let task_id = registry
5003 .spawn(
5004 QUICK_SUCCESS_COMMAND,
5005 "session".to_string(),
5006 dir.path().to_path_buf(),
5007 HashMap::new(),
5008 Some(Duration::from_secs(30)),
5009 dir.path().to_path_buf(),
5010 10,
5011 true,
5012 false,
5013 Some(dir.path().to_path_buf()),
5014 )
5015 .unwrap();
5016
5017 let task = registry.task_for_session(&task_id, "session").unwrap();
5018
5019 let started = Instant::now();
5022 loop {
5023 let exited = {
5024 let mut state = task.state.lock().unwrap();
5025 match &mut state.runtime {
5026 TaskRuntime::Piped(Some(child)) => matches!(child.try_wait(), Ok(Some(_))),
5027 _ => true,
5028 }
5029 };
5030 if exited && task.paths.exit.exists() {
5031 break;
5032 }
5033 assert!(
5034 started.elapsed() < Duration::from_secs(5),
5035 "child should exit and write marker quickly"
5036 );
5037 std::thread::sleep(Duration::from_millis(20));
5038 }
5039
5040 registry
5046 .inner
5047 .shutdown
5048 .store(true, std::sync::atomic::Ordering::SeqCst);
5049 std::thread::sleep(Duration::from_millis(550));
5050
5051 {
5057 let mut state = task.state.lock().unwrap();
5058 state.metadata.status = BgTaskStatus::Running;
5059 state.metadata.status_reason = None;
5060 if matches!(state.runtime, TaskRuntime::Piped(None)) {
5061 state.runtime = TaskRuntime::Piped(Some(spawn_dead_child()));
5062 }
5063 }
5064 *task.terminal_at.lock().unwrap() = None;
5065 if !task.paths.exit.exists() {
5068 std::fs::write(&task.paths.exit, "0").expect("write replacement exit marker");
5069 }
5070
5071 registry.reap_child(&task);
5075
5076 let state = task.state.lock().unwrap();
5077 assert!(
5078 matches!(state.runtime, TaskRuntime::Piped(None)),
5079 "child handle still released even when marker exists"
5080 );
5081 assert!(
5082 state.detached,
5083 "task still marked detached even when marker exists"
5084 );
5085 assert_eq!(
5090 state.metadata.status,
5091 BgTaskStatus::Running,
5092 "reap_child must defer to poll_task when marker exists"
5093 );
5094 }
5095
5096 #[cfg(unix)]
5100 fn pid_stat(pid: u32) -> Option<String> {
5101 let output = std::process::Command::new("ps")
5102 .args(["-o", "stat=", "-p", &pid.to_string()])
5103 .output()
5104 .ok()?;
5105 if !output.status.success() {
5106 return None;
5107 }
5108 let stat = String::from_utf8_lossy(&output.stdout).trim().to_string();
5109 if stat.is_empty() {
5110 None
5111 } else {
5112 Some(stat)
5113 }
5114 }
5115
5116 #[cfg(unix)]
5118 fn is_zombie(pid: u32) -> bool {
5119 pid_stat(pid).is_some_and(|stat| stat.starts_with('Z'))
5120 }
5121
5122 #[cfg(unix)]
5128 fn spawn_unreaped_zombie() -> std::process::Child {
5129 let child = std::process::Command::new("true")
5130 .stdin(std::process::Stdio::null())
5131 .stdout(std::process::Stdio::null())
5132 .stderr(std::process::Stdio::null())
5133 .spawn()
5134 .expect("spawn zombie stand-in");
5135 let pid = child.id();
5136 let started = Instant::now();
5137 while !is_zombie(pid) {
5138 assert!(
5139 started.elapsed() < Duration::from_secs(5),
5140 "stand-in child should become a zombie within 5s"
5141 );
5142 std::thread::sleep(Duration::from_millis(10));
5143 }
5144 child
5146 }
5147
5148 #[cfg(unix)]
5158 #[test]
5159 fn finalize_from_marker_reaps_child_no_zombie() {
5160 use std::sync::atomic::Ordering;
5161
5162 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
5163 let dir = tempfile::tempdir().unwrap();
5164 let task_id = registry
5165 .spawn(
5166 QUICK_SUCCESS_COMMAND,
5167 "session".to_string(),
5168 dir.path().to_path_buf(),
5169 HashMap::new(),
5170 Some(Duration::from_secs(30)),
5171 dir.path().to_path_buf(),
5172 10,
5173 true,
5174 false,
5175 Some(dir.path().to_path_buf()),
5176 )
5177 .unwrap();
5178
5179 registry.inner.shutdown.store(true, Ordering::SeqCst);
5183 std::thread::sleep(Duration::from_millis(550));
5184
5185 let task = registry.task_for_session(&task_id, "session").unwrap();
5186
5187 let started = Instant::now();
5191 while !task.paths.exit.exists() {
5192 assert!(
5193 started.elapsed() < Duration::from_secs(5),
5194 "exit marker should land quickly for `true`"
5195 );
5196 std::thread::sleep(Duration::from_millis(20));
5197 }
5198
5199 let zombie_pid;
5205 {
5206 let mut state = task.state.lock().unwrap();
5207 state.metadata.status = BgTaskStatus::Running;
5208 state.metadata.status_reason = None;
5209 state.metadata.exit_code = None;
5210 state.metadata.finished_at = None;
5211 state.metadata.duration_ms = None;
5212 crate::bash_background::persistence::write_task(&task.paths.json, &state.metadata)
5213 .expect("persist reset Running metadata");
5214 let zombie = spawn_unreaped_zombie();
5215 zombie_pid = zombie.id();
5216 state.runtime = TaskRuntime::Piped(Some(zombie));
5217 }
5218 *task.terminal_at.lock().unwrap() = None;
5219
5220 assert!(
5222 is_zombie(zombie_pid),
5223 "precondition: stand-in child {zombie_pid} must be a zombie before finalize"
5224 );
5225
5226 registry.poll_task(&task).unwrap();
5229
5230 {
5231 let state = task.state.lock().unwrap();
5232 assert!(
5233 matches!(state.runtime, TaskRuntime::Piped(None)),
5234 "child handle must be released after marker finalize"
5235 );
5236 assert!(
5237 state.metadata.status.is_terminal(),
5238 "task must be terminal after marker finalize: {:?}",
5239 state.metadata.status
5240 );
5241 }
5242
5243 assert!(
5246 !is_zombie(zombie_pid),
5247 "issue #91 regression: child {zombie_pid} left as <defunct> zombie \
5248 after the exit-marker terminal transition"
5249 );
5250 }
5251
5252 #[cfg(unix)]
5256 #[test]
5257 fn kill_with_existing_marker_reaps_child_no_zombie() {
5258 use std::sync::atomic::Ordering;
5259
5260 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
5261 let dir = tempfile::tempdir().unwrap();
5262 let task_id = registry
5263 .spawn(
5264 QUICK_SUCCESS_COMMAND,
5265 "session".to_string(),
5266 dir.path().to_path_buf(),
5267 HashMap::new(),
5268 Some(Duration::from_secs(30)),
5269 dir.path().to_path_buf(),
5270 10,
5271 true,
5272 false,
5273 Some(dir.path().to_path_buf()),
5274 )
5275 .unwrap();
5276
5277 registry.inner.shutdown.store(true, Ordering::SeqCst);
5278 std::thread::sleep(Duration::from_millis(550));
5279
5280 let task = registry.task_for_session(&task_id, "session").unwrap();
5281
5282 let started = Instant::now();
5283 while !task.paths.exit.exists() {
5284 assert!(
5285 started.elapsed() < Duration::from_secs(5),
5286 "exit marker should land quickly for `true`"
5287 );
5288 std::thread::sleep(Duration::from_millis(20));
5289 }
5290
5291 let zombie_pid;
5292 {
5293 let mut state = task.state.lock().unwrap();
5294 state.metadata.status = BgTaskStatus::Running;
5295 state.metadata.status_reason = None;
5296 state.metadata.exit_code = None;
5297 state.metadata.finished_at = None;
5298 state.metadata.duration_ms = None;
5299 crate::bash_background::persistence::write_task(&task.paths.json, &state.metadata)
5300 .expect("persist reset Running metadata");
5301 let zombie = spawn_unreaped_zombie();
5302 zombie_pid = zombie.id();
5303 state.runtime = TaskRuntime::Piped(Some(zombie));
5304 }
5305 *task.terminal_at.lock().unwrap() = None;
5306
5307 assert!(
5308 is_zombie(zombie_pid),
5309 "precondition: stand-in child {zombie_pid} must be a zombie before kill"
5310 );
5311
5312 registry
5314 .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
5315 .expect("kill should succeed");
5316
5317 {
5318 let state = task.state.lock().unwrap();
5319 assert!(
5320 matches!(state.runtime, TaskRuntime::Piped(None)),
5321 "child handle must be released after marker-aware kill"
5322 );
5323 assert!(state.metadata.status.is_terminal());
5324 }
5325
5326 assert!(
5327 !is_zombie(zombie_pid),
5328 "issue #91 regression: child {zombie_pid} left as <defunct> zombie \
5329 after a marker-aware kill"
5330 );
5331 }
5332
5333 #[test]
5334 fn cleanup_finished_keeps_running_tasks() {
5335 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
5336 let dir = tempfile::tempdir().unwrap();
5337 let task_id = registry
5338 .spawn(
5339 LONG_RUNNING_COMMAND,
5340 "session".to_string(),
5341 dir.path().to_path_buf(),
5342 HashMap::new(),
5343 Some(Duration::from_secs(30)),
5344 dir.path().to_path_buf(),
5345 10,
5346 true,
5347 false,
5348 Some(dir.path().to_path_buf()),
5349 )
5350 .unwrap();
5351
5352 registry.cleanup_finished(Duration::ZERO);
5353
5354 assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
5355 let _ = registry.kill(&task_id, "session");
5356 }
5357
5358 #[cfg(windows)]
5359 fn wait_for_file(path: &Path) -> String {
5360 let started = Instant::now();
5361 loop {
5362 if path.exists() {
5363 return fs::read_to_string(path).expect("read file");
5364 }
5365 assert!(
5366 started.elapsed() < Duration::from_secs(30),
5367 "timed out waiting for {}",
5368 path.display()
5369 );
5370 std::thread::sleep(Duration::from_millis(100));
5371 }
5372 }
5373
5374 #[cfg(windows)]
5375 fn spawn_windows_registry_command(
5376 command: &str,
5377 ) -> (BgTaskRegistry, tempfile::TempDir, String) {
5378 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
5379 let dir = tempfile::tempdir().unwrap();
5380 let task_id = registry
5381 .spawn(
5382 command,
5383 "session".to_string(),
5384 dir.path().to_path_buf(),
5385 HashMap::new(),
5386 Some(Duration::from_secs(30)),
5387 dir.path().to_path_buf(),
5388 10,
5389 false,
5390 false,
5391 Some(dir.path().to_path_buf()),
5392 )
5393 .unwrap();
5394 (registry, dir, task_id)
5395 }
5396
5397 #[cfg(windows)]
5398 #[test]
5399 fn windows_spawn_writes_exit_marker_for_zero_exit() {
5400 let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 0");
5401 let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
5402
5403 let content = wait_for_file(&exit_path);
5404
5405 assert_eq!(content.trim(), "0");
5406 }
5407
5408 #[cfg(windows)]
5409 #[test]
5410 fn windows_spawn_writes_exit_marker_for_nonzero_exit() {
5411 let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 42");
5412 let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
5413
5414 let content = wait_for_file(&exit_path);
5415
5416 assert_eq!(content.trim(), "42");
5417 }
5418
5419 #[cfg(windows)]
5420 #[test]
5421 fn windows_spawn_captures_stdout_to_disk() {
5422 let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c echo hello");
5423 let task = registry.task_for_session(&task_id, "session").unwrap();
5424 let stdout_path = task.paths.stdout.clone();
5425 let exit_path = task.paths.exit.clone();
5426
5427 let _ = wait_for_file(&exit_path);
5428 let stdout = fs::read_to_string(stdout_path).expect("read stdout");
5429
5430 assert!(stdout.contains("hello"), "stdout was {stdout:?}");
5431 }
5432
5433 #[cfg(windows)]
5434 #[test]
5435 fn windows_spawn_uses_pwsh_when_available() {
5436 let candidates = crate::windows_shell::shell_candidates_with(
5440 |binary| match binary {
5441 "pwsh.exe" => Some(std::path::PathBuf::from(r"C:\pwsh\pwsh.exe")),
5442 "powershell.exe" => Some(std::path::PathBuf::from(r"C:\ps\powershell.exe")),
5443 _ => None,
5444 },
5445 || None,
5446 );
5447 let shell = candidates.first().expect("at least one candidate").clone();
5448 assert_eq!(shell, crate::windows_shell::WindowsShell::Pwsh);
5449 assert_eq!(shell.binary().as_ref(), "pwsh.exe");
5450 }
5451
5452 #[cfg(windows)]
5459 #[test]
5460 fn windows_shell_cmd_wrapper_writes_exit_marker_with_move() {
5461 let exit_path = Path::new(r"C:\Temp\bash-test.exit");
5462 let script =
5463 crate::windows_shell::WindowsShell::Cmd.wrapper_script("cmd /c exit 42", exit_path);
5464
5465 assert!(
5469 script.contains("set CODE=%ERRORLEVEL%"),
5470 "wrapper must capture exit code into CODE: {script}"
5471 );
5472 assert!(
5473 script.contains("echo %CODE% >"),
5474 "wrapper must echo CODE to a temp marker file: {script}"
5475 );
5476 assert!(
5477 script.contains("move /Y"),
5478 "wrapper must use atomic move to write the marker: {script}"
5479 );
5480 assert!(
5483 script.contains("> nul"),
5484 "wrapper must redirect move output to nul: {script}"
5485 );
5486 assert!(
5488 script.contains("exit /B %CODE%"),
5489 "wrapper must propagate the captured exit code: {script}"
5490 );
5491 assert!(script.contains(r#""C:\Temp\bash-test.exit.tmp""#));
5492 assert!(script.contains(r#""C:\Temp\bash-test.exit""#));
5493 }
5494
5495 #[cfg(windows)]
5501 #[test]
5502 fn windows_shell_cmd_bg_command_uses_minimal_cmd_flags() {
5503 use crate::windows_shell::WindowsShell;
5504 let cmd = WindowsShell::Cmd.bg_command("echo wrapped");
5505 let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
5506 let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
5507 assert_eq!(
5508 args_strs,
5509 vec!["/D", "/S", "/C", "echo wrapped"],
5510 "Cmd::bg_command must prepend /D /S /C"
5511 );
5512 }
5513
5514 #[cfg(windows)]
5518 #[test]
5519 fn windows_shell_pwsh_bg_command_uses_standard_args() {
5520 use crate::windows_shell::WindowsShell;
5521 let cmd = WindowsShell::Pwsh.bg_command("Get-Date");
5522 let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
5523 let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
5524 assert!(
5525 args_strs.contains(&"-Command"),
5526 "Pwsh::bg_command must use -Command: {args_strs:?}"
5527 );
5528 assert!(
5529 args_strs.contains(&"Get-Date"),
5530 "Pwsh::bg_command must include the user command body"
5531 );
5532 }
5533
5534 #[allow(dead_code)]
5565 #[cfg(any())] fn windows_cmd_wrapper_records_real_exit_code_disabled() {}
5567}