1use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
2use std::fs;
3use std::io::{Read, Write};
4use std::path::{Path, PathBuf};
5use std::process::{Child, Command, Stdio};
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7#[cfg(unix)]
8use std::sync::OnceLock;
9use std::sync::{Arc, Mutex, RwLock};
10use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
11
12use rusqlite::Connection;
13use serde::Serialize;
14
15use crate::compress::caps::DropClass;
16use crate::compress::CompressionResult;
17use crate::context::SharedProgressSender;
18use crate::harness::Harness;
19use crate::protocol::{BashCompletedFrame, BashLongRunningFrame, BashPatternMatchFrame, PushFrame};
20
21#[cfg(unix)]
22use std::os::unix::process::CommandExt;
23#[cfg(windows)]
24use std::os::windows::process::CommandExt;
25
26use super::buffer::{combine_streams, BgBuffer, StreamKind, TokenCountInput};
27use super::output::{
28 cap_completion_output, cap_completion_output_with_marker, cap_final_output,
29 cap_final_output_with_marker, completion_preview_threshold, json_output_pointer, quote_path,
30 COMPRESS_INPUT_CAP_BYTES, COMPRESS_INPUT_HEAD_BYTES, COMPRESS_INPUT_TAIL_BYTES,
31 FINAL_OUTPUT_CAP_BYTES, RAW_PASSTHROUGH_CAP_BYTES, RAW_PASSTHROUGH_HEAD_BYTES,
32 RAW_PASSTHROUGH_TAIL_BYTES, RUNNING_OUTPUT_PREVIEW_BYTES, STRUCTURED_OUTPUT_CAP_BYTES,
33};
34use super::persistence::{
35 create_capture_file, delete_task_bundle, read_exit_marker, read_task, session_tasks_dir,
36 task_paths, unix_millis, update_task, write_kill_marker_if_absent, write_task, BgMode,
37 ExitMarker, PersistedTask, TaskPaths,
38};
39use super::process::is_process_alive;
40#[cfg(unix)]
41use super::process::terminate_pgid;
42#[cfg(windows)]
43use super::process::terminate_pid;
44use super::pty_process::spawn_pty_for_command;
45use super::pty_runtime::PtyRuntime;
46use super::watches::{PatternMatch, WatchPattern, WatchRegistry};
47use super::{BgTaskInfo, BgTaskStatus};
/// Timeout applied to a background task when the caller does not supply one (30 minutes).
const DEFAULT_BG_TIMEOUT: Duration = Duration::from_secs(1_800);
/// 24 hours. NOTE(review): presumably the age after which a persisted running task is
/// treated as orphaned during replay — confirm against `running_metadata_is_stale`.
const STALE_RUNNING_AFTER: Duration = Duration::from_secs(24 * 3_600);
/// 24 hours. NOTE(review): presumably the grace period before persisted task bundles
/// become eligible for garbage collection — confirm against the GC code.
const PERSISTED_GC_GRACE: Duration = Duration::from_secs(24 * 3_600);
/// 30 days. NOTE(review): presumably the grace period before quarantined metadata is
/// garbage collected — confirm against the GC code.
const QUARANTINE_GC_GRACE: Duration = Duration::from_secs(30 * 24 * 3_600);

/// 128 KiB. NOTE(review): presumably the per-stream byte cap for token counting — confirm.
const TOKENIZE_CAP_BYTES_PER_STREAM: usize = 128 * 1_024;
61
62#[derive(Debug, Clone, Serialize)]
63pub struct BgCompletion {
64 pub task_id: String,
65 #[serde(skip_serializing)]
68 pub session_id: String,
69 pub status: BgTaskStatus,
70 pub exit_code: Option<i32>,
71 pub command: String,
72 #[serde(default, skip_serializing_if = "String::is_empty")]
78 pub output_preview: String,
79 #[serde(default, skip_serializing_if = "is_false")]
84 pub output_truncated: bool,
85 #[serde(default, skip_serializing_if = "Option::is_none")]
88 pub original_tokens: Option<u32>,
89 #[serde(default, skip_serializing_if = "Option::is_none")]
92 pub compressed_tokens: Option<u32>,
93 #[serde(default, skip_serializing_if = "is_false")]
95 pub tokens_skipped: bool,
96}
97
/// Serde `skip_serializing_if` predicate: returns `true` exactly when `*v` is `false`.
fn is_false(v: &bool) -> bool {
    matches!(*v, false)
}
101
102#[derive(Debug, Clone, Serialize)]
103pub struct BgTaskSnapshot {
104 #[serde(flatten)]
105 pub info: BgTaskInfo,
106 pub exit_code: Option<i32>,
107 pub child_pid: Option<u32>,
108 pub workdir: String,
109 pub output_preview: String,
110 pub output_truncated: bool,
111 pub output_path: Option<String>,
112 pub stderr_path: Option<String>,
113 #[serde(skip_serializing_if = "Option::is_none")]
114 pub pty_rows: Option<u16>,
115 #[serde(skip_serializing_if = "Option::is_none")]
116 pub pty_cols: Option<u16>,
117}
118
/// Which rendering path produced a cached terminal output.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TerminalOutputKind {
    /// Output that went through the registry's compressor.
    Compressed,
    /// Output passed through without compression (also used for PTY tasks).
    Raw,
    /// Output produced by the structured-output renderer.
    Structured,
}
125
126#[derive(Debug, Clone, PartialEq, Eq)]
127struct TerminalOutputCache {
128 output_preview: String,
129 output_truncated: bool,
130 kind: TerminalOutputKind,
131 output_path: Option<String>,
132 stderr_path: Option<String>,
133 recovery: Option<RecoveryContext>,
134}
135
136#[derive(Debug, Clone, PartialEq, Eq)]
137struct RecoveryContext {
138 dropped_by_class: BTreeMap<DropClass, usize>,
139 had_inner_drop: bool,
140 offset_hint_eligible: bool,
141 offset_start_line: Option<usize>,
142 byte_truncated: bool,
143 output_path: Option<String>,
144 stderr_path: Option<String>,
145 include_stderr_path: bool,
146}
147
148impl RecoveryContext {
149 fn has_visible_drop(&self) -> bool {
150 self.byte_truncated || self.had_inner_drop || !self.dropped_by_class.is_empty()
151 }
152}
153
154#[derive(Clone)]
155pub struct BgTaskRegistry {
156 pub(crate) inner: Arc<RegistryInner>,
157}
158
159pub(crate) struct RegistryInner {
160 pub(crate) tasks: Mutex<HashMap<String, Arc<BgTask>>>,
161 pub(crate) completions: Mutex<VecDeque<BgCompletion>>,
162 pub(crate) progress_sender: SharedProgressSender,
163 watchdog_started: AtomicBool,
164 pub(crate) shutdown: AtomicBool,
165 pub(crate) long_running_reminder_enabled: AtomicBool,
166 pub(crate) long_running_reminder_interval_ms: AtomicU64,
167 persisted_gc_started: AtomicBool,
168 #[cfg(test)]
169 persisted_gc_runs: AtomicU64,
170 pub(crate) compressor:
176 Mutex<Option<Box<dyn Fn(&str, String, Option<i32>) -> CompressionResult + Send + Sync>>>,
177 pub(crate) db_pool: RwLock<Option<Arc<Mutex<Connection>>>>,
178 pub(crate) db_harness: RwLock<Option<String>>,
179 pub(crate) wake_tx: crossbeam_channel::Sender<()>,
180 pub(crate) wake_rx: crossbeam_channel::Receiver<()>,
181 pub(crate) watch_registry: Mutex<WatchRegistry>,
182}
183
184pub(crate) struct BgTask {
185 pub(crate) task_id: String,
186 pub(crate) session_id: String,
187 pub(crate) paths: TaskPaths,
188 pub(crate) started: Instant,
189 pub(crate) last_reminder_at: Mutex<Option<Instant>>,
190 pub(crate) terminal_at: Mutex<Option<Instant>>,
191 pub(crate) state: Mutex<BgTaskState>,
192}
193
194pub(crate) enum TaskRuntime {
195 Piped(Option<Child>),
196 Pty(Option<PtyRuntime>),
197}
198
199pub(crate) struct BgTaskState {
200 pub(crate) metadata: PersistedTask,
201 pub(crate) runtime: TaskRuntime,
202 pub(crate) detached: bool,
203 pub(crate) child_exit_observed: bool,
214 pub(crate) buffer: BgBuffer,
215 terminal_output_cache: Option<TerminalOutputCache>,
216 pub(crate) pending_terminal_override: Option<BgTaskStatus>,
218}
219
220fn completion_matches_session(completion: &BgCompletion, session_id: Option<&str>) -> bool {
221 session_id
222 .map(|session_id| completion.session_id == session_id)
223 .unwrap_or(true)
224}
225
226impl BgTaskRegistry {
227 pub fn new(progress_sender: SharedProgressSender) -> Self {
228 let (wake_tx, wake_rx) = crossbeam_channel::bounded(1);
229 Self {
230 inner: Arc::new(RegistryInner {
231 tasks: Mutex::new(HashMap::new()),
232 completions: Mutex::new(VecDeque::new()),
233 progress_sender,
234 watchdog_started: AtomicBool::new(false),
235 shutdown: AtomicBool::new(false),
236 long_running_reminder_enabled: AtomicBool::new(true),
237 long_running_reminder_interval_ms: AtomicU64::new(600_000),
238 persisted_gc_started: AtomicBool::new(false),
239 #[cfg(test)]
240 persisted_gc_runs: AtomicU64::new(0),
241 compressor: Mutex::new(None),
242 db_pool: RwLock::new(None),
243 db_harness: RwLock::new(None),
244 wake_tx,
245 wake_rx,
246 watch_registry: Mutex::new(WatchRegistry::default()),
247 }),
248 }
249 }
250
251 pub fn set_harness(&self, harness: Harness) {
252 if let Ok(mut slot) = self.inner.db_harness.write() {
253 *slot = Some(harness.as_str().to_string());
254 }
255 }
256
257 pub fn set_db_pool(&self, conn: Arc<Mutex<Connection>>) {
258 if let Ok(mut slot) = self.inner.db_pool.write() {
259 *slot = Some(conn);
260 }
261 }
262
263 pub fn clear_db_pool(&self) {
264 if let Ok(mut slot) = self.inner.db_pool.write() {
265 *slot = None;
266 }
267 }
268
269 pub fn set_compressor<F>(&self, compressor: F)
274 where
275 F: Fn(&str, String) -> CompressionResult + Send + Sync + 'static,
276 {
277 self.set_compressor_with_exit_code(move |command, output, _exit_code| {
278 compressor(command, output)
279 });
280 }
281
282 pub fn set_compressor_with_exit_code<F>(&self, compressor: F)
283 where
284 F: Fn(&str, String, Option<i32>) -> CompressionResult + Send + Sync + 'static,
285 {
286 if let Ok(mut slot) = self.inner.compressor.lock() {
287 *slot = Some(Box::new(compressor));
288 }
289 }
290
291 pub(crate) fn compress_output(
294 &self,
295 command: &str,
296 output: String,
297 exit_code: Option<i32>,
298 ) -> CompressionResult {
299 let Ok(slot) = self.inner.compressor.lock() else {
300 return CompressionResult::new(output);
301 };
302 match slot.as_ref() {
303 Some(compressor) => compressor(command, output, exit_code),
304 None => CompressionResult::new(output),
305 }
306 }
307
308 fn ensure_terminal_output_cache(&self, task: &Arc<BgTask>) -> Option<TerminalOutputCache> {
309 let (metadata, buffer) = {
310 let state = task.state.lock().ok()?;
311 if !state.metadata.status.is_terminal() || state.metadata.mode == BgMode::Pty {
312 return None;
313 }
314 if let Some(cache) = state.terminal_output_cache.clone() {
315 return Some(cache);
316 }
317 (state.metadata.clone(), state.buffer.clone())
318 };
319
320 let mut cap_buffer = buffer.clone();
321 cap_buffer.enforce_terminal_cap();
322 let cache = self.render_terminal_output(&metadata, &buffer);
323 let mut state = task.state.lock().ok()?;
324 if !state.metadata.status.is_terminal() || state.metadata.mode == BgMode::Pty {
325 return None;
326 }
327 if let Some(existing) = state.terminal_output_cache.clone() {
328 return Some(existing);
329 }
330 state.terminal_output_cache = Some(cache.clone());
331 Some(cache)
332 }
333
334 fn render_terminal_output(
335 &self,
336 metadata: &PersistedTask,
337 buffer: &BgBuffer,
338 ) -> TerminalOutputCache {
339 if metadata.mode == BgMode::Pty {
340 return TerminalOutputCache {
341 output_preview: String::new(),
342 output_truncated: false,
343 kind: TerminalOutputKind::Raw,
344 output_path: buffer.output_path().map(|path| path.display().to_string()),
345 stderr_path: buffer.stderr_path().map(|path| path.display().to_string()),
346 recovery: None,
347 };
348 }
349
350 if let Some(structured) = render_structured_output(&metadata.command, buffer) {
351 return structured;
352 }
353
354 if !metadata.compressed {
355 return render_raw_passthrough(buffer);
356 }
357
358 let raw = buffer.read_combined_head_tail(
359 COMPRESS_INPUT_CAP_BYTES,
360 COMPRESS_INPUT_HEAD_BYTES,
361 COMPRESS_INPUT_TAIL_BYTES,
362 );
363 let compressed = self.compress_output(&metadata.command, raw.text, metadata.exit_code);
364 render_compressed_with_recovery(buffer, compressed, raw.truncated)
365 }
366
367 fn snapshot_with_terminal_cache(
368 &self,
369 task: &Arc<BgTask>,
370 preview_bytes: usize,
371 ) -> BgTaskSnapshot {
372 let mut snapshot = task.snapshot(preview_bytes);
373 self.maybe_compress_snapshot(task, &mut snapshot);
374 snapshot
375 }
376
377 fn post_terminal_transition(&self, task: &Arc<BgTask>, emit_frame: bool) -> Result<(), String> {
378 let (metadata, buffer) = {
379 let state = task
380 .state
381 .lock()
382 .map_err(|_| "background task lock poisoned".to_string())?;
383 if !state.metadata.status.is_terminal() {
384 return Ok(());
385 }
386 (state.metadata.clone(), state.buffer.clone())
387 };
388
389 let mut cap_buffer = buffer.clone();
390 cap_buffer.enforce_terminal_cap();
391 let cache = self.ensure_terminal_output_cache(task);
392 self.enqueue_completion_from_parts(
393 &metadata,
394 Some(&buffer),
395 None,
396 emit_frame,
397 cache.as_ref(),
398 );
399 Ok(())
400 }
401
402 fn persist_task(&self, paths: &TaskPaths, metadata: &PersistedTask) -> std::io::Result<()> {
403 write_task(&paths.json, metadata)?;
404 self.dual_write_task(paths, metadata);
405 Ok(())
406 }
407
408 fn update_task_metadata<F>(
409 &self,
410 paths: &TaskPaths,
411 update: F,
412 ) -> std::io::Result<PersistedTask>
413 where
414 F: FnOnce(&mut PersistedTask),
415 {
416 let metadata = update_task(&paths.json, update)?;
417 self.dual_write_task(paths, &metadata);
418 Ok(metadata)
419 }
420
421 fn dual_write_task(&self, paths: &TaskPaths, metadata: &PersistedTask) {
422 let pool = self.inner.db_pool.read().ok().and_then(|slot| slot.clone());
423 let Some(pool) = pool else {
424 return;
425 };
426 let harness = self
427 .inner
428 .db_harness
429 .read()
430 .ok()
431 .and_then(|slot| slot.clone());
432 let Some(harness) = harness else {
433 crate::slog_warn!(
434 "dual-write bash_task to DB skipped for {}: harness not configured",
435 metadata.task_id
436 );
437 return;
438 };
439 let row = match metadata.to_bash_task_row(&harness, paths) {
440 Ok(row) => row,
441 Err(error) => {
442 crate::slog_warn!(
443 "dual-write bash_task to DB failed for {}: {}",
444 metadata.task_id,
445 error
446 );
447 return;
448 }
449 };
450 let conn = match pool.lock() {
451 Ok(conn) => conn,
452 Err(_) => {
453 crate::slog_warn!(
454 "dual-write bash_task to DB failed for {}: db mutex poisoned",
455 metadata.task_id
456 );
457 return;
458 }
459 };
460 if let Err(error) = crate::db::bash_tasks::upsert_bash_task(&conn, &row) {
461 crate::slog_warn!(
462 "dual-write bash_task to DB failed for {}: {}",
463 metadata.task_id,
464 error
465 );
466 }
467 }
468
469 pub fn configure_long_running_reminders(&self, enabled: bool, interval_ms: u64) {
470 self.inner
471 .long_running_reminder_enabled
472 .store(enabled, Ordering::SeqCst);
473 self.inner
474 .long_running_reminder_interval_ms
475 .store(interval_ms, Ordering::SeqCst);
476 }
477
478 #[cfg(unix)]
479 #[allow(clippy::too_many_arguments)]
480 pub fn spawn(
481 &self,
482 command: &str,
483 session_id: String,
484 workdir: PathBuf,
485 env: HashMap<String, String>,
486 timeout: Option<Duration>,
487 storage_dir: PathBuf,
488 max_running: usize,
489 notify_on_completion: bool,
490 compressed: bool,
491 project_root: Option<PathBuf>,
492 ) -> Result<String, String> {
493 self.start_watchdog();
494
495 let running = self.running_count();
496 if running >= max_running {
497 return Err(format!(
498 "background bash task limit exceeded: {running} running (max {max_running})"
499 ));
500 }
501
502 let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
503 let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
504 let task_id = self.generate_unique_task_id()?;
505 let paths = task_paths(&storage_dir, &session_id, &task_id);
506 fs::create_dir_all(&paths.dir)
507 .map_err(|e| format!("failed to create background task dir: {e}"))?;
508
509 let mut metadata = PersistedTask::starting(
510 task_id.clone(),
511 session_id.clone(),
512 command.to_string(),
513 workdir.clone(),
514 project_root,
515 timeout_ms,
516 notify_on_completion,
517 compressed,
518 );
519 self.persist_task(&paths, &metadata)
520 .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
521
522 create_capture_file(&paths.stdout)
526 .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
527 create_capture_file(&paths.stderr)
528 .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
529
530 let child = match spawn_detached_child(command, &paths, &workdir, &env) {
531 Ok(child) => child,
532 Err(error) => {
533 crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
534 let _ = delete_task_bundle(&paths);
535 return Err(error);
536 }
537 };
538
539 let child_pid = child.id();
540 metadata.mark_running(child_pid, child_pid as i32);
541 self.persist_task(&paths, &metadata)
542 .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
543
544 let task = Arc::new(BgTask {
545 task_id: task_id.clone(),
546 session_id,
547 paths: paths.clone(),
548 started: Instant::now(),
549 last_reminder_at: Mutex::new(None),
550 terminal_at: Mutex::new(None),
551 state: Mutex::new(BgTaskState {
552 metadata,
553 runtime: TaskRuntime::Piped(Some(child)),
554 detached: false,
555 child_exit_observed: false,
556 buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
557 terminal_output_cache: None,
558 pending_terminal_override: None,
559 }),
560 });
561
562 self.inner
563 .tasks
564 .lock()
565 .map_err(|_| "background task registry lock poisoned".to_string())?
566 .insert(task_id.clone(), task);
567
568 Ok(task_id)
569 }
570
571 #[allow(clippy::too_many_arguments)]
572 pub fn spawn_pty(
573 &self,
574 command: &str,
575 session_id: String,
576 workdir: PathBuf,
577 env: HashMap<String, String>,
578 timeout: Option<Duration>,
579 storage_dir: PathBuf,
580 max_running: usize,
581 notify_on_completion: bool,
582 compressed: bool,
583 project_root: Option<PathBuf>,
584 rows: u16,
585 cols: u16,
586 ) -> Result<String, String> {
587 self.start_watchdog();
588
589 let running = self.running_count();
590 if running >= max_running {
591 return Err(format!(
592 "background bash task limit exceeded: {running} running (max {max_running})"
593 ));
594 }
595
596 let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
597 let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
598 let task_id = self.generate_unique_task_id()?;
599 let paths = task_paths(&storage_dir, &session_id, &task_id);
600 fs::create_dir_all(&paths.dir)
601 .map_err(|e| format!("failed to create background task dir: {e}"))?;
602
603 let mut metadata = PersistedTask::starting(
604 task_id.clone(),
605 session_id.clone(),
606 command.to_string(),
607 workdir.clone(),
608 project_root,
609 timeout_ms,
610 notify_on_completion,
611 compressed,
612 );
613 metadata.mode = BgMode::Pty;
614 metadata.pty_rows = Some(rows);
615 metadata.pty_cols = Some(cols);
616 self.persist_task(&paths, &metadata)
617 .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
618 create_capture_file(&paths.pty)
619 .map_err(|e| format!("failed to create PTY capture file: {e}"))?;
620
621 let runtime = match spawn_pty_for_command(
622 &task_id,
623 &session_id,
624 command,
625 &paths,
626 &workdir,
627 &env,
628 rows,
629 cols,
630 self.inner.wake_tx.clone(),
631 ) {
632 Ok(runtime) => runtime,
633 Err(error) => {
634 crate::slog_warn!(
635 "failed to spawn PTY background bash task {task_id}; deleting partial bundle: {error}"
636 );
637 let _ = delete_task_bundle(&paths);
638 return Err(error);
639 }
640 };
641
642 if let Some(child_pid) = runtime.child_pid {
643 metadata.mark_running(child_pid, child_pid as i32);
644 } else {
645 metadata.status = BgTaskStatus::Running;
646 metadata.pgid = None;
647 }
648 self.persist_task(&paths, &metadata)
649 .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
650
651 let task = Arc::new(BgTask {
652 task_id: task_id.clone(),
653 session_id,
654 paths: paths.clone(),
655 started: Instant::now(),
656 last_reminder_at: Mutex::new(None),
657 terminal_at: Mutex::new(None),
658 state: Mutex::new(BgTaskState {
659 metadata,
660 runtime: TaskRuntime::Pty(Some(runtime)),
661 detached: false,
662 child_exit_observed: false,
663 buffer: BgBuffer::pty(paths.pty.clone()),
664 terminal_output_cache: None,
665 pending_terminal_override: None,
666 }),
667 });
668
669 self.inner
670 .tasks
671 .lock()
672 .map_err(|_| "background task registry lock poisoned".to_string())?
673 .insert(task_id.clone(), task);
674
675 Ok(task_id)
676 }
677
/// Spawns `command` as a background task with stdout/stderr capture files (Windows).
///
/// Steps, in order: start the watchdog, enforce `max_running`, create the task
/// directory, persist `Starting` metadata, create the capture files, spawn the
/// child, persist `Running` metadata (with no process group id), and register
/// the task in memory. When `timeout` is `None`, `DEFAULT_BG_TIMEOUT` is used.
///
/// Returns the id of the new task.
///
/// # Errors
/// Returns a message when the running-task limit is reached, when any
/// directory/file/metadata step fails, when the child cannot be spawned, or
/// when the registry lock is poisoned. If the child cannot be spawned, or the
/// `Running` metadata cannot be persisted after the spawn, the partial task
/// bundle is deleted.
#[cfg(windows)]
#[allow(clippy::too_many_arguments)]
pub fn spawn(
    &self,
    command: &str,
    session_id: String,
    workdir: PathBuf,
    env: HashMap<String, String>,
    timeout: Option<Duration>,
    storage_dir: PathBuf,
    max_running: usize,
    notify_on_completion: bool,
    compressed: bool,
    project_root: Option<PathBuf>,
) -> Result<String, String> {
    self.start_watchdog();

    let running = self.running_count();
    if running >= max_running {
        return Err(format!(
            "background bash task limit exceeded: {running} running (max {max_running})"
        ));
    }

    let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
    let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
    let task_id = self.generate_unique_task_id()?;
    let paths = task_paths(&storage_dir, &session_id, &task_id);
    fs::create_dir_all(&paths.dir)
        .map_err(|e| format!("failed to create background task dir: {e}"))?;

    let mut metadata = PersistedTask::starting(
        task_id.clone(),
        session_id.clone(),
        command.to_string(),
        workdir.clone(),
        project_root,
        timeout_ms,
        notify_on_completion,
        compressed,
    );
    self.persist_task(&paths, &metadata)
        .map_err(|e| format!("failed to persist background task metadata: {e}"))?;

    create_capture_file(&paths.stdout)
        .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
    create_capture_file(&paths.stderr)
        .map_err(|e| format!("failed to create stderr capture file: {e}"))?;

    let mut child = match spawn_detached_child(command, &paths, &workdir, &env) {
        Ok(child) => child,
        Err(error) => {
            crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
            let _ = delete_task_bundle(&paths);
            return Err(error);
        }
    };

    let child_pid = child.id();
    metadata.status = BgTaskStatus::Running;
    metadata.child_pid = Some(child_pid);
    metadata.pgid = None;
    if let Err(error) = self.persist_task(&paths, &metadata) {
        // The caller is about to be told the spawn failed, so the child must
        // not keep running untracked: dropping a `Child` does not stop it.
        // Best effort only; descendants of the child are not handled here.
        if child.kill().is_ok() {
            let _ = child.wait();
        }
        // Same cleanup as the spawn-failure path, so no stale `Starting`
        // bundle is left behind for a task id the caller never received.
        let _ = delete_task_bundle(&paths);
        return Err(format!(
            "failed to persist running background task metadata: {error}"
        ));
    }

    let task = Arc::new(BgTask {
        task_id: task_id.clone(),
        session_id,
        paths: paths.clone(),
        started: Instant::now(),
        last_reminder_at: Mutex::new(None),
        terminal_at: Mutex::new(None),
        state: Mutex::new(BgTaskState {
            metadata,
            runtime: TaskRuntime::Piped(Some(child)),
            detached: false,
            child_exit_observed: false,
            buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
            terminal_output_cache: None,
            pending_terminal_override: None,
        }),
    });

    self.inner
        .tasks
        .lock()
        .map_err(|_| "background task registry lock poisoned".to_string())?
        .insert(task_id.clone(), task);

    Ok(task_id)
}
774
775 pub fn write_pty(
776 &self,
777 task_id: &str,
778 session_id: &str,
779 input: &[u8],
780 ) -> Result<usize, String> {
781 let task = self
782 .task_for_session(task_id, session_id)
783 .ok_or_else(|| "task_not_found".to_string())?;
784
785 let writer = {
786 let state = task
787 .state
788 .lock()
789 .map_err(|_| "background task lock poisoned".to_string())?;
790 if state.metadata.mode != BgMode::Pty {
791 return Err("task_not_pty".to_string());
792 }
793 if state.metadata.status.is_terminal() {
794 return Err("task_exited".to_string());
795 }
796 match &state.runtime {
797 TaskRuntime::Pty(Some(runtime)) => Arc::clone(&runtime.writer),
798 TaskRuntime::Pty(None) => return Err("task_exited".to_string()),
799 TaskRuntime::Piped(_) => return Err("task_not_pty".to_string()),
800 }
801 };
802
803 let mut writer = writer
804 .lock()
805 .map_err(|_| "PTY writer lock poisoned".to_string())?;
806 writer
807 .write_all(input)
808 .map_err(|error| format!("failed to write to PTY: {error}"))?;
809 writer
810 .flush()
811 .map_err(|error| format!("failed to flush PTY writer: {error}"))?;
812 Ok(input.len())
813 }
814
815 pub fn replay_session(&self, storage_dir: &Path, session_id: &str) -> Result<(), String> {
816 self.replay_session_inner(storage_dir, session_id, None)
817 }
818
819 pub fn replay_session_for_project(
820 &self,
821 storage_dir: &Path,
822 session_id: &str,
823 project_root: &Path,
824 ) -> Result<(), String> {
825 self.replay_session_inner(storage_dir, session_id, Some(project_root))
826 }
827
828 fn replay_session_inner(
829 &self,
830 storage_dir: &Path,
831 session_id: &str,
832 project_root: Option<&Path>,
833 ) -> Result<(), String> {
834 self.start_watchdog();
835 if !self.inner.persisted_gc_started.swap(true, Ordering::SeqCst) {
836 if let Err(error) = self.maybe_gc_persisted(storage_dir) {
837 crate::slog_warn!("failed to GC persisted background bash tasks: {error}");
838 }
839 }
840
841 let canonical_project = project_root.map(canonicalized_path);
842 let tasks = match self.replay_session_from_db(session_id) {
854 Some(Ok(tasks)) if !tasks.is_empty() => tasks,
855 Some(Ok(_)) => {
856 let disk_tasks = self.replay_session_from_disk(storage_dir, session_id)?;
857 if !disk_tasks.is_empty() {
858 crate::slog_info!(
859 "bash task replay: 0 in DB for session {}, {} from disk fallback",
860 session_id,
861 disk_tasks.len()
862 );
863 }
864 disk_tasks
865 }
866 Some(Err(error)) => {
867 crate::slog_warn!(
868 "bash task replay DB lookup failed for session {}; falling back to disk: {}",
869 session_id,
870 error
871 );
872 self.replay_session_from_disk(storage_dir, session_id)?
873 }
874 None => {
875 self.replay_session_from_disk(storage_dir, session_id)?
877 }
878 };
879
880 for mut metadata in tasks {
881 if metadata.session_id != session_id {
882 continue;
883 }
884 if let Some(canonical_project) = canonical_project.as_deref() {
885 let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
886 if metadata_project.as_deref() != Some(canonical_project) {
887 continue;
888 }
889 }
890
891 let paths = task_paths(storage_dir, session_id, &metadata.task_id);
892 match metadata.status {
893 BgTaskStatus::Starting => {
894 let completion_was_delivered = metadata.completion_delivered;
895 metadata.mark_terminal(
896 BgTaskStatus::Failed,
897 None,
898 Some("spawn aborted".to_string()),
899 );
900 metadata.completion_delivered |= completion_was_delivered;
901 let _ = self.persist_task(&paths, &metadata);
902 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
903 self.insert_rehydrated_task(metadata, paths, true)?;
904 }
905 BgTaskStatus::Running | BgTaskStatus::Killing => {
906 if metadata.mode == BgMode::Pty {
907 if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
908 let completion_was_delivered = metadata.completion_delivered;
909 metadata = terminal_metadata_from_marker(metadata, marker, None);
910 metadata.completion_delivered |= completion_was_delivered;
911 let _ = self.persist_task(&paths, &metadata);
912 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
913 self.insert_rehydrated_task(metadata, paths, true)?;
914 } else if metadata.status.is_terminal() {
915 self.insert_rehydrated_task(metadata, paths, true)?;
916 } else {
917 let completion_was_delivered = metadata.completion_delivered;
918 metadata.mark_terminal(
919 BgTaskStatus::Killed,
920 None,
921 Some("pty_lost_on_bridge_restart".to_string()),
922 );
923 metadata.completion_delivered |= completion_was_delivered;
924 let _ = self.persist_task(&paths, &metadata);
925 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
926 self.insert_rehydrated_task(metadata, paths, true)?;
927 }
928 } else if self.running_metadata_is_stale(&metadata) {
929 let completion_was_delivered = metadata.completion_delivered;
930 metadata.mark_terminal(
931 BgTaskStatus::Killed,
932 None,
933 Some("orphaned (>24h)".to_string()),
934 );
935 metadata.completion_delivered |= completion_was_delivered;
936 if !paths.exit.exists() {
937 let _ = write_kill_marker_if_absent(&paths.exit);
938 }
939 let _ = self.persist_task(&paths, &metadata);
940 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
941 self.insert_rehydrated_task(metadata, paths, true)?;
942 } else if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
943 let reason = (metadata.status == BgTaskStatus::Killing).then(|| {
944 "recovered from inconsistent killing state on replay".to_string()
945 });
946 if reason.is_some() {
947 crate::slog_warn!("background task {} had killing state with exit marker; preferring marker",
948 metadata.task_id);
949 }
950 let completion_was_delivered = metadata.completion_delivered;
951 metadata = terminal_metadata_from_marker(metadata, marker, reason);
952 metadata.completion_delivered |= completion_was_delivered;
953 let _ = self.persist_task(&paths, &metadata);
954 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
955 self.insert_rehydrated_task(metadata, paths, true)?;
956 } else if metadata.status == BgTaskStatus::Killing {
957 if !paths.exit.exists() {
958 let _ = write_kill_marker_if_absent(&paths.exit);
959 }
960 let completion_was_delivered = metadata.completion_delivered;
961 metadata.mark_terminal(
962 BgTaskStatus::Killed,
963 None,
964 Some("recovered from inconsistent killing state on replay".to_string()),
965 );
966 metadata.completion_delivered |= completion_was_delivered;
967 let _ = self.persist_task(&paths, &metadata);
968 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
969 self.insert_rehydrated_task(metadata, paths, true)?;
970 } else if metadata.child_pid.is_some_and(|pid| !is_process_alive(pid)) {
971 let completion_was_delivered = metadata.completion_delivered;
972 metadata.mark_terminal(
973 BgTaskStatus::Failed,
974 None,
975 Some("process exited without exit marker".to_string()),
976 );
977 metadata.completion_delivered |= completion_was_delivered;
978 let _ = self.persist_task(&paths, &metadata);
979 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
980 self.insert_rehydrated_task(metadata, paths, true)?;
981 } else {
982 self.insert_rehydrated_task(metadata, paths, true)?;
983 }
984 }
985 _ if metadata.status.is_terminal() => {
986 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
992 self.insert_rehydrated_task(metadata, paths, true)?;
993 }
994 _ => {}
995 }
996 }
997
998 Ok(())
999 }
1000
1001 fn replay_session_from_db(
1002 &self,
1003 session_id: &str,
1004 ) -> Option<Result<Vec<PersistedTask>, String>> {
1005 let pool = self
1006 .inner
1007 .db_pool
1008 .read()
1009 .ok()
1010 .and_then(|slot| slot.clone())?;
1011 let harness = self
1012 .inner
1013 .db_harness
1014 .read()
1015 .ok()
1016 .and_then(|slot| slot.clone())?;
1017 let conn = match pool.lock() {
1018 Ok(conn) => conn,
1019 Err(_) => return Some(Err("db mutex poisoned".to_string())),
1020 };
1021 Some(
1022 crate::db::bash_tasks::list_bash_tasks_for_session(&conn, &harness, session_id)
1023 .map(|rows| rows.into_iter().map(PersistedTask::from).collect())
1024 .map_err(|error| error.to_string()),
1025 )
1026 }
1027
1028 fn replay_session_from_disk(
1029 &self,
1030 storage_dir: &Path,
1031 session_id: &str,
1032 ) -> Result<Vec<PersistedTask>, String> {
1033 let dir = session_tasks_dir(storage_dir, session_id);
1034 if !dir.exists() {
1035 return Ok(Vec::new());
1036 }
1037
1038 let entries = fs::read_dir(&dir)
1039 .map_err(|e| format!("failed to read background task dir {}: {e}", dir.display()))?;
1040 let mut tasks = Vec::new();
1041 for entry in entries.flatten() {
1042 let path = entry.path();
1043 if path.extension().and_then(|extension| extension.to_str()) != Some("json") {
1044 continue;
1045 }
1046 match read_task(&path) {
1047 Ok(metadata) => tasks.push(metadata),
1048 Err(error) => {
1049 crate::slog_warn!(
1050 "quarantining invalid background task metadata {} during replay: {error}",
1051 path.display()
1052 );
1053 if let Err(quarantine_error) =
1054 quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
1055 {
1056 crate::slog_warn!(
1057 "failed to quarantine invalid background task metadata {}: {quarantine_error}",
1058 path.display()
1059 );
1060 }
1061 }
1062 }
1063 }
1064 Ok(tasks)
1065 }
1066
/// Registers a watch `pattern` on the output of task `task_id` and returns the new watch id.
///
/// For a task that is still running, the file cursors are primed via `prime_file_cursor`
/// (presumably so only output produced after registration is scanned — confirm against
/// `WatchRegistry`). For a task that is already terminal, the cursors are reset to 0 and the
/// captured output files are scanned immediately; the resulting pattern matches, or a
/// watch-exit notification when nothing matched, are emitted before returning.
///
/// # Errors
/// * `"task_not_found"` when no task with `task_id` is registered.
/// * `"background_task_lock_poisoned"` when the task state lock is poisoned.
/// * `"watch_registry_poisoned"` when the watch registry lock is poisoned.
/// * Any error returned by `WatchRegistry::register`.
1067 pub fn register_watch(
1068 &self,
1069 task_id: String,
1070 pattern: WatchPattern,
1071 once: bool,
1072 ) -> Result<String, &'static str> {
1073 let task = self.task(&task_id).ok_or("task_not_found")?;
// Snapshot the capture mode, the terminal flag and the output paths under the task state lock.
1074 let (mode, terminal_at_registration, stdout, stderr, pty) = task
1075 .state
1076 .lock()
1077 .map(|state| {
1078 (
1079 state.metadata.mode.clone(),
1080 state.metadata.status.is_terminal(),
1081 task.paths.stdout.clone(),
1082 task.paths.stderr.clone(),
1083 task.paths.pty.clone(),
1084 )
1085 })
1086 .map_err(|_| "background_task_lock_poisoned")?;
1087
1088 let mut terminal_matches = Vec::new();
// Records whether the full-output scan already happened during registration.
1089 let scanned_terminal = terminal_at_registration;
// Registration and cursor setup happen under a single registry lock; the lock is released
// at the end of this block, before anything is emitted.
1090 let watch_id = {
1091 let mut registry = self
1092 .inner
1093 .watch_registry
1094 .lock()
1095 .map_err(|_| "watch_registry_poisoned")?;
1096 let watch_id = registry.register(task_id.clone(), pattern, once)?;
1097 match &mode {
1098 BgMode::Pipes => {
1099 let stdout_key = format!("{task_id}:stdout");
1100 let stderr_key = format!("{task_id}:stderr");
1101 if terminal_at_registration {
// Already finished: rewind both cursors and scan the captured files from offset 0.
1102 registry.set_file_cursor(&stdout_key, 0);
1103 registry.set_file_cursor(&stderr_key, 0);
1104 terminal_matches.extend(registry.scan_file_new_bytes(
1105 &stdout_key,
1106 &task_id,
1107 &stdout,
1108 ));
1109 terminal_matches.extend(registry.scan_file_new_bytes(
1110 &stderr_key,
1111 &task_id,
1112 &stderr,
1113 ));
1114 } else {
1115 registry.prime_file_cursor(&stdout_key, &stdout);
1116 registry.prime_file_cursor(&stderr_key, &stderr);
1117 }
1118 }
1119 BgMode::Pty => {
1120 let pty_key = format!("{task_id}:pty");
1121 if terminal_at_registration {
1122 registry.set_file_cursor(&pty_key, 0);
1123 terminal_matches
1124 .extend(registry.scan_file_new_bytes(&pty_key, &task_id, &pty));
1125 } else {
1126 registry.prime_file_cursor(&pty_key, &pty);
1127 }
1128 }
1129 }
1130 watch_id
1131 };
1132
// Re-check the terminal state: the task may have finished between the snapshot above and
// the registration.
1133 if task.is_terminal() {
1134 if !scanned_terminal {
// The task became terminal after the cursors were primed, so rewind them and scan the
// whole output now.
1135 terminal_matches = {
1136 let mut registry = self
1137 .inner
1138 .watch_registry
1139 .lock()
1140 .map_err(|_| "watch_registry_poisoned")?;
1141 match &mode {
1142 BgMode::Pipes => {
1143 let stdout_key = format!("{task_id}:stdout");
1144 let stderr_key = format!("{task_id}:stderr");
1145 registry.set_file_cursor(&stdout_key, 0);
1146 registry.set_file_cursor(&stderr_key, 0);
1147 let mut matches =
1148 registry.scan_file_new_bytes(&stdout_key, &task_id, &stdout);
1149 matches.extend(registry.scan_file_new_bytes(
1150 &stderr_key,
1151 &task_id,
1152 &stderr,
1153 ));
1154 matches
1155 }
1156 BgMode::Pty => {
1157 let pty_key = format!("{task_id}:pty");
1158 registry.set_file_cursor(&pty_key, 0);
1159 registry.scan_file_new_bytes(&pty_key, &task_id, &pty)
1160 }
1161 }
1162 };
1163 }
1164
1165 let (watch_controlled, watch_matched) = self.task_watch_state(&task_id);
// Nothing new matched, and the task is either not watch-controlled or already has a recorded
// match: there is nothing to emit. A recorded match still marks the completion as delivered.
1166 if terminal_matches.is_empty() && (!watch_controlled || watch_matched) {
1167 if watch_matched {
1168 let _ = task.set_completion_delivered(true, self);
1169 self.clear_task_watch_state(&task_id);
1170 }
1171 return Ok(watch_id);
1172 }
1173
// Prefer a completion that is still queued; otherwise build one from the task's current state.
1174 let completion = self
1175 .remove_pending_completion(&task_id)
1176 .or_else(|| self.completion_snapshot_for_task(&task));
1177 if terminal_matches.is_empty() {
// The task ended without any match: report the exit to the watcher.
1178 if let Some(completion) = completion.as_ref() {
1179 self.emit_bash_watch_exit(completion);
1180 }
1181 } else {
1182 for pattern_match in terminal_matches {
1183 self.emit_bash_pattern_match(&task.session_id, pattern_match);
1184 }
1185 }
// Failing to record the delivered flag is ignored here.
1186 let _ = task.set_completion_delivered(true, self);
1187 self.clear_task_watch_state(&task_id);
1188 }
1189
1190 Ok(watch_id)
1191 }
1192
1193 pub fn unregister_watch(&self, task_id: &str, watch_id: &str) {
1194 if let Ok(mut registry) = self.inner.watch_registry.lock() {
1195 registry.unregister(task_id, watch_id);
1196 }
1197 }
1198
1199 pub fn active_watch_count(&self, task_id: &str) -> usize {
1200 self.inner
1201 .watch_registry
1202 .lock()
1203 .map(|registry| registry.active_count(task_id))
1204 .unwrap_or(0)
1205 }
1206
1207 fn task_watch_state(&self, task_id: &str) -> (bool, bool) {
1208 self.inner
1209 .watch_registry
1210 .lock()
1211 .map(|registry| {
1212 (
1213 registry.has_controlled_task(task_id),
1214 registry.has_matched_task(task_id),
1215 )
1216 })
1217 .unwrap_or((false, false))
1218 }
1219
1220 fn task_has_watch_control(&self, task_id: &str) -> bool {
1221 self.inner
1222 .watch_registry
1223 .lock()
1224 .map(|registry| registry.has_controlled_task(task_id))
1225 .unwrap_or(false)
1226 }
1227
1228 fn clear_task_watch_state(&self, task_id: &str) {
1229 if let Ok(mut registry) = self.inner.watch_registry.lock() {
1230 registry.clear_task(task_id);
1231 }
1232 }
1233
1234 pub(crate) fn scan_task_watch_output(&self, task: &Arc<BgTask>) {
1235 let (mode, stdout, stderr, pty) = match task.state.lock() {
1236 Ok(state) => (
1237 state.metadata.mode.clone(),
1238 task.paths.stdout.clone(),
1239 task.paths.stderr.clone(),
1240 task.paths.pty.clone(),
1241 ),
1242 Err(_) => return,
1243 };
1244 let mut matches = Vec::new();
1245 if let Ok(mut registry) = self.inner.watch_registry.lock() {
1246 match mode {
1247 BgMode::Pipes => {
1248 let stdout_key = format!("{}:stdout", task.task_id);
1249 let stderr_key = format!("{}:stderr", task.task_id);
1250 matches.extend(registry.scan_file_new_bytes(
1251 &stdout_key,
1252 &task.task_id,
1253 &stdout,
1254 ));
1255 matches.extend(registry.scan_file_new_bytes(
1256 &stderr_key,
1257 &task.task_id,
1258 &stderr,
1259 ));
1260 }
1261 BgMode::Pty => {
1262 let pty_key = format!("{}:pty", task.task_id);
1263 matches.extend(registry.scan_file_new_bytes(&pty_key, &task.task_id, &pty));
1264 }
1265 }
1266 }
1267 for pattern_match in matches {
1268 self.emit_bash_pattern_match(&task.session_id, pattern_match);
1269 }
1270 }
1271
1272 pub fn status(
1273 &self,
1274 task_id: &str,
1275 session_id: &str,
1276 project_root: Option<&Path>,
1277 storage_dir: Option<&Path>,
1278 preview_bytes: usize,
1279 ) -> Option<BgTaskSnapshot> {
1280 let mut task = self.task_for_session(task_id, session_id);
1281 if task.is_none() {
1282 if let Some(storage_dir) = storage_dir {
1283 let _ = self.replay_session(storage_dir, session_id);
1284 task = self.task_for_session(task_id, session_id);
1285 }
1286 }
1287 let Some(task) = task else {
1288 return self.status_relaxed(
1289 task_id,
1290 session_id,
1291 project_root?,
1292 storage_dir?,
1293 preview_bytes,
1294 );
1295 };
1296 let _ = self.poll_task(&task);
1297 Some(self.snapshot_with_terminal_cache(&task, preview_bytes))
1298 }
1299
1300 fn status_relaxed_task(
1301 &self,
1302 task_id: &str,
1303 project_root: &Path,
1304 storage_dir: &Path,
1305 ) -> Option<Arc<BgTask>> {
1306 let canonical_project = canonicalized_path(project_root);
1307 match self.lookup_relaxed_task_from_db(task_id, project_root) {
1308 Some(Ok(Some(metadata))) => {
1309 if let Some(task) = self.task(task_id) {
1310 let matches_project = task
1311 .state
1312 .lock()
1313 .map(|state| {
1314 state
1315 .metadata
1316 .project_root
1317 .as_deref()
1318 .map(canonicalized_path)
1319 .as_deref()
1320 == Some(canonical_project.as_path())
1321 })
1322 .unwrap_or(false);
1323 return matches_project.then_some(task);
1324 }
1325 let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1326 if self.insert_rehydrated_task(metadata, paths, true).is_err() {
1327 return None;
1328 }
1329 return self.task(task_id);
1330 }
1331 Some(Ok(None)) => {
1332 crate::slog_info!(
1333 "bash task relaxed DB miss for {}; falling back to disk",
1334 task_id
1335 );
1336 }
1337 Some(Err(error)) => {
1338 crate::slog_warn!(
1339 "bash task relaxed DB lookup failed for {}; falling back to disk: {}",
1340 task_id,
1341 error
1342 );
1343 }
1344 None => {
1345 crate::slog_info!(
1346 "bash task relaxed DB unavailable for {}; falling back to disk",
1347 task_id
1348 );
1349 }
1350 }
1351 let root = storage_dir.join("bash-tasks");
1352 let entries = fs::read_dir(&root).ok()?;
1353 for entry in entries.flatten() {
1354 let dir = entry.path();
1355 if !dir.is_dir() {
1356 continue;
1357 }
1358 let path = dir.join(format!("{task_id}.json"));
1359 if !path.exists() {
1360 continue;
1361 }
1362 let metadata = match read_task(&path) {
1363 Ok(metadata) => metadata,
1364 Err(error) => {
1365 crate::slog_warn!(
1366 "quarantining invalid background task metadata {} during relaxed lookup: {error}",
1367 path.display()
1368 );
1369 if let Err(quarantine_error) =
1370 quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
1371 {
1372 crate::slog_warn!(
1373 "failed to quarantine invalid background task metadata {}: {quarantine_error}",
1374 path.display()
1375 );
1376 }
1377 continue;
1378 }
1379 };
1380 let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
1381 if metadata_project.as_deref() != Some(canonical_project.as_path()) {
1382 continue;
1383 }
1384 if let Some(task) = self.task(task_id) {
1385 let matches_project = task
1386 .state
1387 .lock()
1388 .map(|state| {
1389 state
1390 .metadata
1391 .project_root
1392 .as_deref()
1393 .map(canonicalized_path)
1394 .as_deref()
1395 == Some(canonical_project.as_path())
1396 })
1397 .unwrap_or(false);
1398 return matches_project.then_some(task);
1399 }
1400 let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1401 if self.insert_rehydrated_task(metadata, paths, true).is_err() {
1402 return None;
1403 }
1404 return self.task(task_id);
1405 }
1406 None
1407 }
1408
1409 fn lookup_relaxed_task_from_db(
1410 &self,
1411 task_id: &str,
1412 project_root: &Path,
1413 ) -> Option<Result<Option<PersistedTask>, String>> {
1414 let pool = self
1415 .inner
1416 .db_pool
1417 .read()
1418 .ok()
1419 .and_then(|slot| slot.clone())?;
1420 let harness = self
1421 .inner
1422 .db_harness
1423 .read()
1424 .ok()
1425 .and_then(|slot| slot.clone())?;
1426 let conn = match pool.lock() {
1427 Ok(conn) => conn,
1428 Err(_) => return Some(Err("db mutex poisoned".to_string())),
1429 };
1430 let project_key = crate::search_index::project_cache_key(project_root);
1431 Some(
1432 crate::db::bash_tasks::find_bash_task_for_project(
1433 &conn,
1434 &harness,
1435 &project_key,
1436 task_id,
1437 )
1438 .map(|row| row.map(PersistedTask::from))
1439 .map_err(|error| error.to_string()),
1440 )
1441 }
1442
1443 pub(super) fn status_relaxed(
1444 &self,
1445 task_id: &str,
1446 _session_id: &str,
1447 project_root: &Path,
1448 storage_dir: &Path,
1449 preview_bytes: usize,
1450 ) -> Option<BgTaskSnapshot> {
1451 let task = self.status_relaxed_task(task_id, project_root, storage_dir)?;
1452 let _ = self.poll_task(&task);
1453 Some(self.snapshot_with_terminal_cache(&task, preview_bytes))
1454 }
1455
1456 pub fn kill_relaxed(
1457 &self,
1458 task_id: &str,
1459 project_root: &Path,
1460 storage_dir: &Path,
1461 ) -> Result<BgTaskSnapshot, String> {
1462 let task = self
1463 .status_relaxed_task(task_id, project_root, storage_dir)
1464 .ok_or_else(|| format!("background task not found: {task_id}"))?;
1465 self.kill_with_status(task_id, &task.session_id, BgTaskStatus::Killed)
1466 }
1467
1468 pub fn maybe_gc_persisted(&self, storage_dir: &Path) -> Result<usize, String> {
1469 #[cfg(test)]
1470 self.inner.persisted_gc_runs.fetch_add(1, Ordering::SeqCst);
1471
1472 let mut deleted = 0usize;
1473
1474 let root = storage_dir.join("bash-tasks");
1475 if root.exists() {
1476 let session_dirs = fs::read_dir(&root).map_err(|e| {
1477 format!(
1478 "failed to read background task root {}: {e}",
1479 root.display()
1480 )
1481 })?;
1482 for session_entry in session_dirs.flatten() {
1483 let session_dir = session_entry.path();
1484 if !session_dir.is_dir() {
1485 continue;
1486 }
1487 let task_entries = match fs::read_dir(&session_dir) {
1488 Ok(entries) => entries,
1489 Err(error) => {
1490 crate::slog_warn!(
1491 "failed to read background task session dir {}: {error}",
1492 session_dir.display()
1493 );
1494 continue;
1495 }
1496 };
1497 for task_entry in task_entries.flatten() {
1498 let json_path = task_entry.path();
1499 if json_path
1500 .extension()
1501 .and_then(|extension| extension.to_str())
1502 != Some("json")
1503 {
1504 continue;
1505 }
1506 if modified_within(&json_path, PERSISTED_GC_GRACE) {
1507 continue;
1508 }
1509 let metadata = match read_task(&json_path) {
1510 Ok(metadata) => metadata,
1511 Err(error) => {
1512 crate::slog_warn!(
1513 "quarantining corrupt background task metadata {}: {error}",
1514 json_path.display()
1515 );
1516 quarantine_task_json(
1517 storage_dir,
1518 &session_dir,
1519 &json_path,
1520 QuarantineKind::Corrupt,
1521 )?;
1522 continue;
1523 }
1524 };
1525 if !(metadata.status.is_terminal() && metadata.completion_delivered) {
1526 continue;
1527 }
1528 let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1529 match delete_task_bundle(&paths) {
1530 Ok(()) => {
1531 deleted += 1;
1532 log::debug!(
1533 "deleted persisted background task bundle {}",
1534 metadata.task_id
1535 );
1536 }
1537 Err(error) => {
1538 crate::slog_warn!(
1539 "failed to delete background task bundle {}: {error}",
1540 metadata.task_id
1541 );
1542 continue;
1543 }
1544 }
1545 }
1546 }
1547 }
1548 gc_quarantine(storage_dir);
1549 Ok(deleted)
1550 }
1551
1552 pub fn list(&self, preview_bytes: usize) -> Vec<BgTaskSnapshot> {
1553 let tasks = self
1554 .inner
1555 .tasks
1556 .lock()
1557 .map(|tasks| tasks.values().cloned().collect::<Vec<_>>())
1558 .unwrap_or_default();
1559 tasks
1560 .into_iter()
1561 .map(|task| {
1562 let _ = self.poll_task(&task);
1563 self.snapshot_with_terminal_cache(&task, preview_bytes)
1564 })
1565 .collect()
1566 }
1567
1568 fn maybe_compress_snapshot(&self, task: &Arc<BgTask>, snapshot: &mut BgTaskSnapshot) {
1574 if !snapshot.info.status.is_terminal() || snapshot.info.mode == BgMode::Pty {
1575 return;
1576 }
1577 if let Some(cache) = self.ensure_terminal_output_cache(task) {
1578 snapshot.output_preview = cache.output_preview;
1579 snapshot.output_truncated = cache.output_truncated;
1580 }
1581 }
1582
1583 pub fn kill(&self, task_id: &str, session_id: &str) -> Result<BgTaskSnapshot, String> {
1584 self.kill_with_status(task_id, session_id, BgTaskStatus::Killed)
1585 }
1586
1587 pub fn promote(&self, task_id: &str, session_id: &str) -> Result<bool, String> {
1588 let task = self
1589 .task_for_session(task_id, session_id)
1590 .ok_or_else(|| format!("background task not found: {task_id}"))?;
1591 let terminal_after_promote = {
1592 let mut state = task
1593 .state
1594 .lock()
1595 .map_err(|_| "background task lock poisoned".to_string())?;
1596 let updated = self
1597 .update_task_metadata(&task.paths, |metadata| {
1598 metadata.notify_on_completion = true;
1599 metadata.completion_delivered = false;
1600 })
1601 .map_err(|e| format!("failed to promote background task: {e}"))?;
1602 state.metadata = updated;
1603 state.metadata.status.is_terminal()
1604 };
1605 if terminal_after_promote {
1606 self.post_terminal_transition(&task, true)?;
1607 }
1608 Ok(true)
1609 }
1610
1611 pub(crate) fn kill_for_timeout(&self, task_id: &str, session_id: &str) -> Result<(), String> {
1612 self.kill_with_status(task_id, session_id, BgTaskStatus::TimedOut)
1613 .map(|_| ())
1614 }
1615
1616 pub fn cleanup_finished(&self, older_than: Duration) {
1617 let cutoff = Instant::now().checked_sub(older_than);
1618 let removable_paths: Vec<(String, TaskPaths)> =
1619 if let Ok(mut tasks) = self.inner.tasks.lock() {
1620 let removable = tasks
1621 .iter()
1622 .filter_map(|(task_id, task)| {
1623 let delivered_terminal = task
1624 .state
1625 .lock()
1626 .map(|state| {
1627 state.metadata.status.is_terminal()
1628 && state.metadata.completion_delivered
1629 })
1630 .unwrap_or(false);
1631 if !delivered_terminal {
1632 return None;
1633 }
1634
1635 let terminal_at = task.terminal_at.lock().ok().and_then(|at| *at);
1636 let expired = match (terminal_at, cutoff) {
1637 (Some(terminal_at), Some(cutoff)) => terminal_at <= cutoff,
1638 (Some(_), None) => true,
1639 (None, _) => false,
1640 };
1641 expired.then(|| task_id.clone())
1642 })
1643 .collect::<Vec<_>>();
1644
1645 removable
1646 .into_iter()
1647 .filter_map(|task_id| {
1648 tasks
1649 .remove(&task_id)
1650 .map(|task| (task_id, task.paths.clone()))
1651 })
1652 .collect()
1653 } else {
1654 Vec::new()
1655 };
1656
1657 for (task_id, paths) in removable_paths {
1658 match delete_task_bundle(&paths) {
1659 Ok(()) => log::debug!("deleted persisted background task bundle {task_id}"),
1660 Err(error) => crate::slog_warn!(
1661 "failed to delete persisted background task bundle {task_id}: {error}"
1662 ),
1663 }
1664 }
1665 }
1666
1667 pub fn drain_completions(&self) -> Vec<BgCompletion> {
1668 self.drain_completions_for_session(None)
1669 }
1670
1671 pub fn drain_completions_for_session(&self, session_id: Option<&str>) -> Vec<BgCompletion> {
1672 let completions = match self.inner.completions.lock() {
1673 Ok(completions) => completions,
1674 Err(_) => return Vec::new(),
1675 };
1676
1677 completions
1678 .iter()
1679 .filter(|completion| completion_matches_session(completion, session_id))
1680 .cloned()
1681 .collect()
1682 }
1683
1684 pub fn has_completions_for_session(&self, session_id: Option<&str>) -> bool {
1685 match self.inner.completions.lock() {
1686 Ok(completions) => completions
1687 .iter()
1688 .any(|completion| completion_matches_session(completion, session_id)),
1689 Err(_) => true,
1693 }
1694 }
1695
1696 pub fn ack_completions_for_session(
1697 &self,
1698 session_id: Option<&str>,
1699 task_ids: &[String],
1700 ) -> Vec<String> {
1701 if task_ids.is_empty() {
1702 return Vec::new();
1703 }
1704 let requested_task_ids = task_ids.iter().map(String::as_str).collect::<HashSet<_>>();
1705 let mut completion_sessions = HashMap::new();
1706 if let Ok(mut completions) = self.inner.completions.lock() {
1707 completions.retain(|completion| {
1708 let session_matches = session_id
1709 .map(|session_id| completion.session_id == session_id)
1710 .unwrap_or(true);
1711 if session_matches && requested_task_ids.contains(completion.task_id.as_str()) {
1712 completion_sessions
1713 .insert(completion.task_id.clone(), completion.session_id.clone());
1714 false
1715 } else {
1716 true
1717 }
1718 });
1719 }
1720
1721 let mut delivered = Vec::new();
1722 for task_id in task_ids {
1723 let task = if let Some(session_id) = session_id {
1724 self.task_for_session(task_id, session_id)
1725 } else if let Some(completion_session_id) = completion_sessions.get(task_id) {
1726 self.task_for_session(task_id, completion_session_id)
1727 } else {
1728 self.task(task_id)
1729 };
1730 if let Some(task) = task {
1731 if task.set_completion_delivered(true, self).is_ok() {
1732 delivered.push(task_id.clone());
1733 }
1734 }
1735 }
1736
1737 delivered
1738 }
1739
1740 pub fn pending_completions_for_session(&self, session_id: &str) -> Vec<BgCompletion> {
1741 self.inner
1742 .completions
1743 .lock()
1744 .map(|completions| {
1745 completions
1746 .iter()
1747 .filter(|completion| completion.session_id == session_id)
1748 .cloned()
1749 .collect()
1750 })
1751 .unwrap_or_default()
1752 }
1753
1754 fn remove_pending_completion(&self, task_id: &str) -> Option<BgCompletion> {
1755 let mut completions = self.inner.completions.lock().ok()?;
1756 let idx = completions
1757 .iter()
1758 .position(|completion| completion.task_id == task_id)?;
1759 completions.remove(idx)
1760 }
1761
1762 fn completion_snapshot_for_task(&self, task: &Arc<BgTask>) -> Option<BgCompletion> {
1763 let snapshot = self.snapshot_with_terminal_cache(task, RUNNING_OUTPUT_PREVIEW_BYTES);
1764 if !snapshot.info.status.is_terminal() {
1765 return None;
1766 }
1767 let (output_preview, output_truncated) = if snapshot.info.mode == BgMode::Pty {
1768 (String::new(), false)
1769 } else {
1770 self.ensure_terminal_output_cache(task)
1771 .map(|cache| completion_preview_for_cache(&cache, snapshot.exit_code))
1772 .unwrap_or_else(|| (String::new(), false))
1773 };
1774 Some(BgCompletion {
1775 task_id: snapshot.info.task_id,
1776 session_id: task.session_id.clone(),
1777 status: snapshot.info.status,
1778 exit_code: snapshot.exit_code,
1779 command: snapshot.info.command,
1780 output_preview,
1781 output_truncated,
1782 original_tokens: None,
1783 compressed_tokens: None,
1784 tokens_skipped: false,
1785 })
1786 }
1787
1788 pub fn detach(&self) {
1789 self.inner.shutdown.store(true, Ordering::SeqCst);
1790 if let Ok(mut tasks) = self.inner.tasks.lock() {
1791 for task in tasks.values() {
1792 if let Ok(mut state) = task.state.lock() {
1793 match &mut state.runtime {
1794 TaskRuntime::Piped(child) => *child = None,
1795 TaskRuntime::Pty(runtime) => *runtime = None,
1796 }
1797 state.detached = true;
1798 }
1799 }
1800 tasks.clear();
1801 }
1802 }
1803
1804 pub fn shutdown(&self) {
1805 let tasks = self
1806 .inner
1807 .tasks
1808 .lock()
1809 .map(|tasks| {
1810 tasks
1811 .values()
1812 .map(|task| (task.task_id.clone(), task.session_id.clone()))
1813 .collect::<Vec<_>>()
1814 })
1815 .unwrap_or_default();
1816 for (task_id, session_id) in tasks {
1817 let _ = self.kill(&task_id, &session_id);
1818 }
1819 }
1820
1821 pub(crate) fn poll_task(&self, task: &Arc<BgTask>) -> Result<(), String> {
1822 if let Ok(state) = task.state.lock() {
1823 if let TaskRuntime::Pty(Some(pty)) = &state.runtime {
1824 if !pty.exit_observed.load(Ordering::SeqCst) {
1832 return Ok(());
1833 }
1834 }
1835 }
1836 let marker = match read_exit_marker(&task.paths.exit) {
1837 Ok(Some(marker)) => marker,
1838 Ok(None) => return Ok(()),
1839 Err(error) => return Err(format!("failed to read exit marker: {error}")),
1840 };
1841 self.finalize_from_marker(task, marker, None)
1842 }
1843
1844 pub(crate) fn reap_child(&self, task: &Arc<BgTask>) {
1845 let mut needs_completion = false;
1846 {
1847 let Ok(mut state) = task.state.lock() else {
1848 return;
1849 };
1850 match &mut state.runtime {
1851 TaskRuntime::Piped(child_slot) => {
1852 if let Some(child) = child_slot.as_mut() {
1853 if matches!(child.try_wait(), Ok(Some(_))) {
1854 *child_slot = None;
1855 state.detached = true;
1856 state.child_exit_observed = true;
1857 }
1858 } else if state.detached {
1859 let child_known_dead = state.child_exit_observed
1860 || state
1861 .metadata
1862 .child_pid
1863 .is_some_and(|pid| !is_process_alive(pid));
1864 if child_known_dead {
1865 needs_completion =
1866 self.fail_without_exit_marker_if_needed(task, &mut state);
1867 }
1868 }
1869 }
1870 TaskRuntime::Pty(Some(pty)) => {
1871 if pty.exit_observed.load(Ordering::SeqCst) {
1872 drop(state);
1873 let _ = self.poll_task(task);
1874 return;
1875 }
1876 }
1877 TaskRuntime::Pty(None) => {}
1878 }
1879 }
1880 if needs_completion {
1881 let _ = self.post_terminal_transition(task, true);
1882 }
1883 }
1884
1885 fn fail_without_exit_marker_if_needed(
1886 &self,
1887 task: &Arc<BgTask>,
1888 state: &mut BgTaskState,
1889 ) -> bool {
1890 if state.metadata.status.is_terminal() {
1891 return false;
1892 }
1893 if matches!(read_exit_marker(&task.paths.exit), Ok(Some(_))) {
1894 return false;
1895 }
1896 let watch_controlled = self.task_has_watch_control(&task.task_id);
1897 let updated = self.update_task_metadata(&task.paths, |metadata| {
1898 metadata.mark_terminal(
1899 BgTaskStatus::Failed,
1900 None,
1901 Some("process exited without exit marker".to_string()),
1902 );
1903 if watch_controlled {
1904 metadata.completion_delivered = true;
1905 }
1906 });
1907 if let Ok(metadata) = updated {
1908 state.pending_terminal_override = None;
1909 state.metadata = metadata;
1910 task.mark_terminal_now();
1911 return true;
1912 }
1913 false
1914 }
1915
1916 pub(crate) fn running_tasks(&self) -> Vec<Arc<BgTask>> {
1917 self.inner
1918 .tasks
1919 .lock()
1920 .map(|tasks| {
1921 tasks
1922 .values()
1923 .filter(|task| task.is_running())
1924 .cloned()
1925 .collect()
1926 })
1927 .unwrap_or_default()
1928 }
1929
1930 fn insert_rehydrated_task(
1931 &self,
1932 metadata: PersistedTask,
1933 paths: TaskPaths,
1934 detached: bool,
1935 ) -> Result<(), String> {
1936 let task_id = metadata.task_id.clone();
1937 let session_id = metadata.session_id.clone();
1938 let started = started_instant_from_unix_millis(metadata.started_at);
1939 let suppress_replayed_running_reminder = metadata.status == BgTaskStatus::Running;
1940 let mode = metadata.mode.clone();
1941 let task = Arc::new(BgTask {
1942 task_id: task_id.clone(),
1943 session_id,
1944 paths: paths.clone(),
1945 started,
1946 last_reminder_at: Mutex::new(suppress_replayed_running_reminder.then(Instant::now)),
1947 terminal_at: Mutex::new(metadata.status.is_terminal().then(Instant::now)),
1948 state: Mutex::new(BgTaskState {
1949 metadata,
1950 runtime: if mode == BgMode::Pty {
1951 TaskRuntime::Pty(None)
1952 } else {
1953 TaskRuntime::Piped(None)
1954 },
1955 detached,
1956 child_exit_observed: false,
1963 buffer: if mode == BgMode::Pty {
1964 BgBuffer::pty(paths.pty.clone())
1965 } else {
1966 BgBuffer::new(paths.stdout.clone(), paths.stderr.clone())
1967 },
1968 terminal_output_cache: None,
1969 pending_terminal_override: None,
1970 }),
1971 });
1972 self.inner
1973 .tasks
1974 .lock()
1975 .map_err(|_| "background task registry lock poisoned".to_string())?
1976 .insert(task_id, task);
1977 Ok(())
1978 }
1979
/// Terminates task `task_id` of `session_id` and records `terminal_status` for it, returning
/// a snapshot of the task afterwards.
///
/// Under the task state lock one of three paths is taken:
/// * the task is already terminal — only the pending terminal override is cleared;
/// * an exit marker already exists — the terminal metadata is derived from the marker, not
///   from `terminal_status`, then persisted;
/// * otherwise — the task is moved to `Killing` (persisted once), its process is terminated,
///   and for piped tasks (and PTY tasks on Windows) the terminal state is persisted here.
///
/// On the terminal paths, a watch-controlled task has its completion flagged as delivered.
/// When this call made the task terminal, `post_terminal_transition` runs after the state
/// lock is released.
///
/// # Errors
/// Returns a message when the task is not registered for the session, when the state lock is
/// poisoned, when persisting or writing the kill marker fails, or when
/// `post_terminal_transition` fails.
1980 fn kill_with_status(
1981 &self,
1982 task_id: &str,
1983 session_id: &str,
1984 terminal_status: BgTaskStatus,
1985 ) -> Result<BgTaskSnapshot, String> {
1986 let task = self
1987 .task_for_session(task_id, session_id)
1988 .ok_or_else(|| format!("background task not found: {task_id}"))?;
// Set only when this call is the one that moved the task into a terminal state.
1989 let mut terminalized = false;
1990
1991 {
1992 let mut state = task
1993 .state
1994 .lock()
1995 .map_err(|_| "background task lock poisoned".to_string())?;
1996 if state.metadata.status.is_terminal() {
1997 state.pending_terminal_override = None;
1998 } else if let Ok(Some(marker)) = read_exit_marker(&task.paths.exit) {
// The process already wrote its exit marker: the marker decides the terminal metadata.
1999 state.metadata =
2000 terminal_metadata_from_marker(state.metadata.clone(), marker, None);
2001 if self.task_has_watch_control(&task.task_id) {
2002 state.metadata.completion_delivered = true;
2003 }
2004 state.pending_terminal_override = None;
2005 task.mark_terminal_now();
2006 match &mut state.runtime {
2007 TaskRuntime::Piped(child_slot) => reap_piped_child(child_slot),
2014 TaskRuntime::Pty(runtime) => *runtime = None,
2015 }
2016 state.detached = true;
2017 self.persist_task(&task.paths, &state.metadata)
2018 .map_err(|e| format!("failed to persist terminal state: {e}"))?;
2019 terminalized = true;
2020 } else {
// No exit marker yet: record the intermediate `Killing` status once, then terminate.
2021 let was_already_killing = state.metadata.status == BgTaskStatus::Killing;
2022 if !was_already_killing {
2023 state.metadata.status = BgTaskStatus::Killing;
2024 self.persist_task(&task.paths, &state.metadata)
2025 .map_err(|e| format!("failed to persist killing state: {e}"))?;
2026 }
2027
2028 #[cfg(unix)]
2029 let pgid = state.metadata.pgid;
2030 #[cfg(windows)]
2031 let child_pid = state.metadata.child_pid;
// For a PTY task killed for a timeout, remember `TimedOut` as the pending override; it is
// consumed by the Windows branch below or by `finalize_from_marker`.
2032 if !was_already_killing
2033 && state.metadata.mode == BgMode::Pty
2034 && terminal_status == BgTaskStatus::TimedOut
2035 {
2036 state.pending_terminal_override = Some(BgTaskStatus::TimedOut);
2037 }
2038
2039 #[cfg(windows)]
2040 let mut pty_forced_terminal_status: Option<BgTaskStatus> = None;
2041
2042 match &mut state.runtime {
2043 TaskRuntime::Piped(child_slot) => {
2044 #[cfg(unix)]
2045 if let Some(pgid) = pgid {
2046 terminate_pgid(pgid, child_slot.as_mut());
2047 }
2048 #[cfg(windows)]
2049 if let Some(child) = child_slot.as_mut() {
2050 super::process::terminate_process(child);
2051 } else if let Some(pid) = child_pid {
2052 terminate_pid(pid);
2053 }
// Wait on the child handle, if one is still held, before dropping it.
2054 if let Some(child) = child_slot.as_mut() {
2055 let _ = child.wait();
2056 }
2057 *child_slot = None;
2058 state.detached = true;
2059
2060 if !task.paths.exit.exists() {
2061 write_kill_marker_if_absent(&task.paths.exit)
2062 .map_err(|e| format!("failed to write kill marker: {e}"))?;
2063 }
2064
// A timeout is recorded with exit code 124; a plain kill records no exit code.
2065 let exit_code = if terminal_status == BgTaskStatus::TimedOut {
2066 Some(124)
2067 } else {
2068 None
2069 };
2070 state
2071 .metadata
2072 .mark_terminal(terminal_status, exit_code, None);
2073 if self.task_has_watch_control(&task.task_id) {
2074 state.metadata.completion_delivered = true;
2075 }
2076 state.pending_terminal_override = None;
2077 task.mark_terminal_now();
2078 self.persist_task(&task.paths, &state.metadata)
2079 .map_err(|e| format!("failed to persist killed state: {e}"))?;
2080 terminalized = true;
2081 }
2082 TaskRuntime::Pty(Some(pty)) => {
// Flag the kill on the runtime, ask the killer to terminate the child, then also signal by
// pid and drop the master handle. A failing `kill()` is only logged.
2083 pty.was_killed.store(true, Ordering::SeqCst);
2084 if let Err(error) = pty.killer.kill() {
2085 crate::slog_warn!(
2086 "[pty-kill] {task_id} ChildKiller::kill failed: {error}"
2087 );
2088 }
2089 if let Some(pid) = pty.child_pid {
2090 #[cfg(unix)]
2091 terminate_pgid(pid as i32, None);
2092 #[cfg(windows)]
2093 terminate_pid(pid);
2094 }
2095 drop(pty.master.take());
2096
// NOTE(review): outside Windows this arm does not make the task terminal; it stays `Killing`
// here. Presumably it is finalized later through the exit-marker path — confirm.
2097 #[cfg(windows)]
2098 {
2099 let default_status = if terminal_status == BgTaskStatus::TimedOut {
2100 BgTaskStatus::TimedOut
2101 } else {
2102 BgTaskStatus::Killed
2103 };
2104 pty_forced_terminal_status = Some(
2105 state
2106 .pending_terminal_override
2107 .take()
2108 .unwrap_or(default_status),
2109 );
2110 }
2111 }
2112 TaskRuntime::Pty(None) => {}
2113 }
2114
// Windows only: the PTY task is made terminal immediately, using the status chosen above.
2115 #[cfg(windows)]
2116 if let Some(target_status) = pty_forced_terminal_status {
2117 if !task.paths.exit.exists() {
2118 write_kill_marker_if_absent(&task.paths.exit)
2119 .map_err(|e| format!("failed to write kill marker: {e}"))?;
2120 }
2121
2122 let exit_code = if target_status == BgTaskStatus::TimedOut {
2123 Some(124)
2124 } else {
2125 None
2126 };
2127 state.metadata.mark_terminal(target_status, exit_code, None);
2128 if self.task_has_watch_control(&task.task_id) {
2129 state.metadata.completion_delivered = true;
2130 }
2131 state.pending_terminal_override = None;
2132 task.mark_terminal_now();
2133 if let TaskRuntime::Pty(runtime) = &mut state.runtime {
2134 *runtime = None;
2135 }
2136 state.detached = true;
2137 self.persist_task(&task.paths, &state.metadata)
2138 .map_err(|e| format!("failed to persist killed PTY state: {e}"))?;
2139 terminalized = true;
2140 }
2141 }
2142 }
2143
// The state lock is released here, before the transition handling and the snapshot.
2144 if terminalized {
2145 self.post_terminal_transition(&task, true)?;
2146 }
2147 Ok(self.snapshot_with_terminal_cache(&task, RUNNING_OUTPUT_PREVIEW_BYTES))
2148 }
2149
/// Moves a task into its terminal state based on the exit `marker` that was found for it.
///
/// Does nothing, apart from clearing a pending terminal override, when the task is already
/// terminal. Otherwise the persisted metadata is updated and copied into the in-memory state,
/// the runtime handle is released, the task is marked detached, the watch output is scanned
/// one more time, and `post_terminal_transition` is run.
///
/// `reason` is passed through to the terminal metadata.
///
/// # Errors
/// Returns a message when the state lock is poisoned, when persisting the terminal state
/// fails, or when `post_terminal_transition` fails.
2150 fn finalize_from_marker(
2151 &self,
2152 task: &Arc<BgTask>,
2153 marker: ExitMarker,
2154 reason: Option<String>,
2155 ) -> Result<(), String> {
// Read before the state lock is taken; the lookup locks the watch registry.
2156 let watch_controlled = self.task_has_watch_control(&task.task_id);
2157 let mut pty_reader_done = None;
2158 {
2159 let mut state = task
2160 .state
2161 .lock()
2162 .map_err(|_| "background task lock poisoned".to_string())?;
2163 if state.metadata.status.is_terminal() {
2164 state.pending_terminal_override = None;
2165 return Ok(());
2166 }
2167
2168 let pending_override = state.pending_terminal_override.take();
2169 let is_pty = state.metadata.mode == BgMode::Pty;
2170 let updated = self
2171 .update_task_metadata(&task.paths, |metadata| {
// A killed PTY task takes its status from the pending override (default `Killed`), with exit
// code 124 for a timeout. Every other case is derived from the marker.
2172 let mut new_metadata = if is_pty && marker == ExitMarker::Killed {
2173 let mut metadata = metadata.clone();
2174 let target_status = pending_override.unwrap_or(BgTaskStatus::Killed);
2175 let exit_code = if target_status == BgTaskStatus::TimedOut {
2176 Some(124)
2177 } else {
2178 None
2179 };
2180 metadata.mark_terminal(target_status, exit_code, reason);
2181 metadata
2182 } else {
2183 terminal_metadata_from_marker(metadata.clone(), marker, reason)
2184 };
2185 if watch_controlled {
2186 new_metadata.completion_delivered = true;
2187 }
2188 *metadata = new_metadata;
2189 })
2190 .map_err(|e| format!("failed to persist terminal state: {e}"))?;
2191 state.metadata = updated;
2192 task.mark_terminal_now();
2193 match &mut state.runtime {
2194 TaskRuntime::Piped(child_slot) => reap_piped_child(child_slot),
2199 TaskRuntime::Pty(runtime) => {
// Keep the reader-done flag so it can be waited on once the state lock is released.
2200 pty_reader_done = runtime
2201 .as_ref()
2202 .map(|runtime| Arc::clone(&runtime.reader_done));
2203 *runtime = None;
2204 }
2205 }
2206 state.detached = true;
2207 }
2208
// Wait up to 200 ms, in 10 ms steps, for the PTY reader to report it is done.
// NOTE(review): presumably this lets trailing PTY output reach the capture file before the
// final watch scan — confirm.
2209 if let Some(reader_done) = pty_reader_done {
2210 let deadline = Instant::now() + Duration::from_millis(200);
2211 while !reader_done.load(Ordering::SeqCst) && Instant::now() < deadline {
2212 std::thread::sleep(Duration::from_millis(10));
2213 }
2214 }
2215
// Final scan of the captured output for watch matches before the terminal transition.
2216 self.scan_task_watch_output(task);
2219
2220 self.post_terminal_transition(task, true)
2221 }
2222
2223 fn enqueue_completion_if_needed(
2224 &self,
2225 metadata: &PersistedTask,
2226 paths: Option<&TaskPaths>,
2227 emit_frame: bool,
2228 ) {
2229 if metadata.status.is_terminal() && !metadata.completion_delivered {
2230 let cache =
2231 paths.and_then(|paths| self.render_terminal_output_from_paths(metadata, paths));
2232 self.enqueue_completion_from_parts(metadata, None, paths, emit_frame, cache.as_ref());
2233 }
2234 }
2235
2236 fn render_terminal_output_from_paths(
2237 &self,
2238 metadata: &PersistedTask,
2239 paths: &TaskPaths,
2240 ) -> Option<TerminalOutputCache> {
2241 if metadata.mode == BgMode::Pty {
2242 return None;
2243 }
2244 let buffer = BgBuffer::new(paths.stdout.clone(), paths.stderr.clone());
2245 Some(self.render_terminal_output(metadata, &buffer))
2246 }
2247
    /// Builds a `BgCompletion` for a terminal task and delivers it, either as a
    /// watch-exit frame or as a queued completion (optionally accompanied by a
    /// pushed `BashCompleted` frame).
    ///
    /// * `buffer` – live buffer to render and count tokens from, when the
    ///   caller has one.
    /// * `paths` – on-disk capture paths, used as the fallback source when no
    ///   buffer is supplied.
    /// * `emit_frame` – whether a push frame may be sent in addition to
    ///   queueing.
    /// * `terminal_render` – an already-rendered output cache; rendering is
    ///   skipped when this is supplied.
    ///
    /// Does nothing when the task status is not terminal.
    fn enqueue_completion_from_parts(
        &self,
        metadata: &PersistedTask,
        buffer: Option<&BgBuffer>,
        paths: Option<&TaskPaths>,
        emit_frame: bool,
        terminal_render: Option<&TerminalOutputCache>,
    ) {
        if !metadata.status.is_terminal() {
            return;
        }

        // Without a caller-supplied buffer, non-PTY tasks fall back to a buffer
        // built over the capture files (only possible when `paths` is known).
        let owned_buffer = if buffer.is_none() && metadata.mode != BgMode::Pty {
            paths.map(|paths| BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()))
        } else {
            None
        };
        let render_buffer = buffer.or(owned_buffer.as_ref());
        // Render only when the caller did not hand in a finished render.
        let owned_render = if terminal_render.is_none() {
            render_buffer.map(|buffer| self.render_terminal_output(metadata, buffer))
        } else {
            None
        };
        let render = terminal_render.or(owned_render.as_ref());

        // No render available: the completion carries an empty, untruncated preview.
        let (output_preview, output_truncated) = render
            .map(|cache| completion_preview_for_cache(cache, metadata.exit_code))
            .unwrap_or_else(|| (String::new(), false));

        // Token counting receives the caller's `buffer` (not the disk fallback
        // built above); `completion_token_counts` reads from `paths` itself
        // when `buffer` is `None`.
        let token_counts = self.completion_token_counts(
            metadata,
            buffer,
            paths,
            render.map(|render| render.output_preview.as_str()),
        );
        let completion = BgCompletion {
            task_id: metadata.task_id.clone(),
            session_id: metadata.session_id.clone(),
            status: metadata.status.clone(),
            exit_code: metadata.exit_code,
            command: metadata.command.clone(),
            output_preview,
            output_truncated,
            original_tokens: token_counts.original_tokens,
            compressed_tokens: token_counts.compressed_tokens,
            tokens_skipped: token_counts.tokens_skipped,
        };

        // Recorded before the watch / already-delivered checks below, so those
        // early returns do not suppress the compression event.
        self.record_compression_event_if_applicable(metadata, &token_counts);

        let (watch_controlled, watch_matched) = self.task_watch_state(&metadata.task_id);
        if watch_controlled {
            // Watch-controlled tasks never enter the completion queue; they get
            // a task-exit frame only when no watch pattern has matched.
            if emit_frame && !watch_matched {
                self.emit_bash_watch_exit(&completion);
            }
            self.clear_task_watch_state(&metadata.task_id);
            return;
        }

        if metadata.completion_delivered {
            return;
        }

        // Queue at most one completion per task id; a poisoned queue lock is
        // treated as "not pushed".
        let pushed = if let Ok(mut completions) = self.inner.completions.lock() {
            if completions
                .iter()
                .any(|existing| existing.task_id == metadata.task_id)
            {
                false
            } else {
                completions.push_back(completion.clone());
                true
            }
        } else {
            false
        };

        // The push frame is sent only for a completion that was newly queued.
        if pushed && emit_frame {
            self.emit_bash_completed(completion);
        }
    }
2363
2364 fn record_compression_event_if_applicable(
2365 &self,
2366 metadata: &PersistedTask,
2367 token_counts: &CompletionTokenCounts,
2368 ) {
2369 if metadata.mode == BgMode::Pty {
2370 return;
2371 }
2372
2373 let (original_tokens, compressed_tokens, original_bytes, compressed_bytes) = match (
2374 token_counts.original_tokens,
2375 token_counts.compressed_tokens,
2376 token_counts.original_bytes,
2377 token_counts.compressed_bytes,
2378 ) {
2379 (
2380 Some(original_tokens),
2381 Some(compressed_tokens),
2382 Some(original_bytes),
2383 Some(compressed_bytes),
2384 ) => (
2385 original_tokens,
2386 compressed_tokens,
2387 original_bytes,
2388 compressed_bytes,
2389 ),
2390 _ => {
2391 crate::slog_warn!(
2392 "compression event skipped for {}: token counts unavailable (likely spill file missing or unreadable)",
2393 metadata.task_id
2394 );
2395 return;
2396 }
2397 };
2398
2399 let pool = self.inner.db_pool.read().ok().and_then(|slot| slot.clone());
2400 let Some(pool) = pool else {
2401 crate::slog_warn!(
2402 "compression event skipped for {}: db_pool not initialized — was configure run?",
2403 metadata.task_id
2404 );
2405 return;
2406 };
2407 let harness = self
2408 .inner
2409 .db_harness
2410 .read()
2411 .ok()
2412 .and_then(|slot| slot.clone());
2413 let Some(harness) = harness else {
2414 crate::slog_warn!(
2415 "compression event insert skipped for {}: harness not configured",
2416 metadata.task_id
2417 );
2418 return;
2419 };
2420
2421 let project_root = metadata
2422 .project_root
2423 .as_deref()
2424 .unwrap_or(&metadata.workdir);
2425 let project_key = crate::search_index::project_cache_key(project_root);
2426 let row = crate::db::compression_events::CompressionEventRow {
2427 harness: &harness,
2428 session_id: Some(&metadata.session_id),
2429 project_key: &project_key,
2430 tool: "bash",
2431 task_id: Some(&metadata.task_id),
2432 command: Some(&metadata.command),
2433 compressor: if metadata.compressed {
2434 "registry"
2435 } else {
2436 "none"
2437 },
2438 original_bytes,
2439 compressed_bytes,
2440 original_tokens,
2441 compressed_tokens,
2442 created_at: unix_millis() as i64,
2443 };
2444
2445 let conn = match pool.lock() {
2446 Ok(conn) => conn,
2447 Err(_) => {
2448 crate::slog_warn!(
2449 "compression event insert failed for {}: db mutex poisoned",
2450 metadata.task_id
2451 );
2452 return;
2453 }
2454 };
2455 match crate::db::compression_events::insert_compression_event(&conn, &row) {
2456 Ok(_) => {
2457 crate::slog_debug!(
2461 "compression event recorded for {} (project={}, session={}, {} → {} tokens)",
2462 metadata.task_id,
2463 project_key,
2464 metadata.session_id,
2465 original_tokens,
2466 compressed_tokens
2467 );
2468 }
2469 Err(error) => {
2470 crate::slog_warn!(
2471 "compression event insert failed for {}: {}",
2472 metadata.task_id,
2473 error
2474 );
2475 }
2476 }
2477 }
2478
2479 fn emit_bash_pattern_match(&self, session_id: &str, pattern_match: PatternMatch) {
2480 let Ok(progress_sender) = self
2481 .inner
2482 .progress_sender
2483 .lock()
2484 .map(|sender| sender.clone())
2485 else {
2486 return;
2487 };
2488 if let Some(sender) = progress_sender.as_ref() {
2489 sender(PushFrame::BashPatternMatch(BashPatternMatchFrame::new(
2490 pattern_match.task_id,
2491 session_id.to_string(),
2492 pattern_match.watch_id,
2493 pattern_match.match_text,
2494 pattern_match.match_offset,
2495 pattern_match.context,
2496 pattern_match.once,
2497 )));
2498 }
2499 }
2500
2501 fn emit_bash_watch_exit(&self, completion: &BgCompletion) {
2502 let Ok(progress_sender) = self
2503 .inner
2504 .progress_sender
2505 .lock()
2506 .map(|sender| sender.clone())
2507 else {
2508 return;
2509 };
2510 let Some(sender) = progress_sender.as_ref() else {
2511 return;
2512 };
2513 let status = completion_status_text(&completion.status, completion.exit_code);
2514 let preview = completion.output_preview.trim_end();
2515 let context = if preview.is_empty() {
2516 format!("task {} exited ({status})", completion.task_id)
2517 } else {
2518 format!(
2519 "task {} exited ({status})
2520{preview}",
2521 completion.task_id
2522 )
2523 };
2524 sender(PushFrame::BashPatternMatch(
2525 BashPatternMatchFrame::task_exit(
2526 completion.task_id.clone(),
2527 completion.session_id.clone(),
2528 format!("exited ({status})"),
2529 context,
2530 ),
2531 ));
2532 }
2533
2534 fn emit_bash_completed(&self, completion: BgCompletion) {
2535 let Ok(progress_sender) = self
2536 .inner
2537 .progress_sender
2538 .lock()
2539 .map(|sender| sender.clone())
2540 else {
2541 return;
2542 };
2543 let Some(sender) = progress_sender.as_ref() else {
2544 return;
2545 };
2546 sender(PushFrame::BashCompleted(BashCompletedFrame::new(
2554 completion.task_id,
2555 completion.session_id,
2556 completion.status,
2557 completion.exit_code,
2558 completion.command,
2559 completion.output_preview,
2560 completion.output_truncated,
2561 completion.original_tokens,
2562 completion.compressed_tokens,
2563 completion.tokens_skipped,
2564 )));
2565 }
2566
2567 fn completion_token_counts(
2568 &self,
2569 metadata: &PersistedTask,
2570 buffer: Option<&BgBuffer>,
2571 paths: Option<&TaskPaths>,
2572 rendered_output: Option<&str>,
2573 ) -> CompletionTokenCounts {
2574 if metadata.mode == BgMode::Pty {
2575 return CompletionTokenCounts::skipped();
2576 }
2577
2578 let raw = match buffer {
2579 Some(buffer) => buffer.read_for_token_count(TOKENIZE_CAP_BYTES_PER_STREAM),
2580 None => paths
2581 .map(|paths| {
2582 read_for_token_count_from_disk(metadata, paths, TOKENIZE_CAP_BYTES_PER_STREAM)
2583 })
2584 .unwrap_or(TokenCountInput::Skipped),
2585 };
2586
2587 let TokenCountInput::Text(raw_output) = raw else {
2588 return CompletionTokenCounts::skipped();
2589 };
2590
2591 let original_tokens = token_count_u32(&raw_output);
2592 let original_bytes = raw_output.len() as i64;
2593 let compressed_output = rendered_output.unwrap_or(&raw_output);
2594 let compressed_tokens = token_count_u32(compressed_output);
2595 let compressed_bytes = compressed_output.len() as i64;
2596 CompletionTokenCounts {
2597 original_tokens: Some(original_tokens),
2598 compressed_tokens: Some(compressed_tokens),
2599 original_bytes: Some(original_bytes),
2600 compressed_bytes: Some(compressed_bytes),
2601 tokens_skipped: false,
2602 }
2603 }
2604
2605 pub(crate) fn maybe_emit_long_running_reminder(&self, task: &Arc<BgTask>) {
2606 if !self
2607 .inner
2608 .long_running_reminder_enabled
2609 .load(Ordering::SeqCst)
2610 {
2611 return;
2612 }
2613 let interval_ms = self
2614 .inner
2615 .long_running_reminder_interval_ms
2616 .load(Ordering::SeqCst);
2617 if interval_ms == 0 {
2618 return;
2619 }
2620 let interval = Duration::from_millis(interval_ms);
2621 let now = Instant::now();
2622 let Ok(mut last_reminder_at) = task.last_reminder_at.lock() else {
2623 return;
2624 };
2625 let since = last_reminder_at.unwrap_or(task.started);
2626 if now.duration_since(since) < interval {
2627 return;
2628 }
2629 let command = task
2630 .state
2631 .lock()
2632 .map(|state| state.metadata.command.clone())
2633 .unwrap_or_default();
2634 *last_reminder_at = Some(now);
2635 self.emit_bash_long_running(BashLongRunningFrame::new(
2636 task.task_id.clone(),
2637 task.session_id.clone(),
2638 command,
2639 task.started.elapsed().as_millis() as u64,
2640 ));
2641 }
2642
2643 fn emit_bash_long_running(&self, frame: BashLongRunningFrame) {
2644 let Ok(progress_sender) = self
2645 .inner
2646 .progress_sender
2647 .lock()
2648 .map(|sender| sender.clone())
2649 else {
2650 return;
2651 };
2652 if let Some(sender) = progress_sender.as_ref() {
2653 sender(PushFrame::BashLongRunning(frame));
2654 }
2655 }
2656
2657 fn task(&self, task_id: &str) -> Option<Arc<BgTask>> {
2658 self.inner
2659 .tasks
2660 .lock()
2661 .ok()
2662 .and_then(|tasks| tasks.get(task_id).cloned())
2663 }
2664
2665 fn task_for_session(&self, task_id: &str, session_id: &str) -> Option<Arc<BgTask>> {
2666 self.task(task_id)
2667 .filter(|task| task.session_id == session_id)
2668 }
2669
2670 fn running_count(&self) -> usize {
2671 self.inner
2672 .tasks
2673 .lock()
2674 .map(|tasks| tasks.values().filter(|task| task.is_running()).count())
2675 .unwrap_or(0)
2676 }
2677
2678 fn start_watchdog(&self) {
2679 if !self.inner.watchdog_started.swap(true, Ordering::SeqCst) {
2680 super::watchdog::start(self.clone());
2681 }
2682 }
2683
2684 fn running_metadata_is_stale(&self, metadata: &PersistedTask) -> bool {
2685 unix_millis().saturating_sub(metadata.started_at) > STALE_RUNNING_AFTER.as_millis() as u64
2686 }
2687
2688 #[cfg(test)]
2689 pub fn task_json_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
2690 self.task_for_session(task_id, session_id)
2691 .map(|task| task.paths.json.clone())
2692 }
2693
2694 #[cfg(test)]
2695 pub fn task_exit_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
2696 self.task_for_session(task_id, session_id)
2697 .map(|task| task.paths.exit.clone())
2698 }
2699
2700 fn generate_unique_task_id(&self) -> Result<String, String> {
2702 for _ in 0..32 {
2703 let candidate = random_slug();
2704 let tasks = self
2705 .inner
2706 .tasks
2707 .lock()
2708 .map_err(|_| "background task registry lock poisoned".to_string())?;
2709 if tasks.contains_key(&candidate) {
2710 continue;
2711 }
2712 let completions = self
2713 .inner
2714 .completions
2715 .lock()
2716 .map_err(|_| "background completions lock poisoned".to_string())?;
2717 if completions
2718 .iter()
2719 .any(|completion| completion.task_id == candidate)
2720 {
2721 continue;
2722 }
2723 return Ok(candidate);
2724 }
2725 Err("failed to allocate unique background task id after 32 attempts".to_string())
2726 }
2727}
2728
2729fn render_compressed_with_recovery(
2730 buffer: &BgBuffer,
2731 mut compressed: CompressionResult,
2732 input_truncated: bool,
2733) -> TerminalOutputCache {
2734 let had_trailing_newline = compressed.text.ends_with('\n');
2742 let mut text = strip_plain_truncation_marker_lines(&compressed.text)
2743 .trim_end()
2744 .to_string();
2745 if had_trailing_newline && !text.is_empty() {
2746 text.push('\n');
2747 }
2748 compressed.text = text;
2749
2750 let output_path = buffer.output_path().map(|path| path.display().to_string());
2751 let stderr_path = buffer.stderr_path().map(|path| path.display().to_string());
2752 let include_stderr_path = buffer.stream_len(StreamKind::Stderr) > 0;
2753 let mut recovery = RecoveryContext {
2754 dropped_by_class: compressed.dropped_by_class,
2755 had_inner_drop: compressed.had_inner_drop,
2756 offset_hint_eligible: compressed.offset_hint_eligible,
2757 offset_start_line: compressed.offset_start_line,
2758 byte_truncated: input_truncated,
2759 output_path: output_path.clone(),
2760 stderr_path: stderr_path.clone(),
2761 include_stderr_path,
2762 };
2763
2764 let (output_preview, output_truncated) =
2765 render_body_with_recovery_marker(&compressed.text, &mut recovery);
2766 TerminalOutputCache {
2767 output_preview,
2768 output_truncated,
2769 kind: TerminalOutputKind::Compressed,
2770 output_path,
2771 stderr_path,
2772 recovery: Some(recovery),
2773 }
2774}
2775
/// Renders `body` with a recovery marker using the final-output cap.
///
/// Thin wrapper over `render_body_with_recovery_marker_at_cap` that plugs in
/// `FINAL_OUTPUT_CAP_BYTES` together with `cap_final_output` and
/// `cap_final_output_with_marker`. Returns the rendered text and whether it
/// should be reported as truncated; `recovery` may be updated by the callee.
fn render_body_with_recovery_marker(body: &str, recovery: &mut RecoveryContext) -> (String, bool) {
    render_body_with_recovery_marker_at_cap(
        body,
        recovery,
        FINAL_OUTPUT_CAP_BYTES,
        cap_final_output,
        cap_final_output_with_marker,
    )
}
2785
2786fn render_raw_body_with_recovery_marker(
2787 body: &str,
2788 recovery: &mut RecoveryContext,
2789) -> (String, bool) {
2790 render_body_with_recovery_marker_at_cap(
2791 body,
2792 recovery,
2793 RAW_PASSTHROUGH_CAP_BYTES,
2794 |input| {
2795 super::output::cap_head_tail(
2796 input,
2797 RAW_PASSTHROUGH_CAP_BYTES,
2798 RAW_PASSTHROUGH_HEAD_BYTES,
2799 RAW_PASSTHROUGH_TAIL_BYTES,
2800 )
2801 },
2802 |input, marker| {
2803 super::output::cap_head_tail_with_marker(
2804 input,
2805 RAW_PASSTHROUGH_CAP_BYTES,
2806 RAW_PASSTHROUGH_HEAD_BYTES,
2807 RAW_PASSTHROUGH_TAIL_BYTES,
2808 marker,
2809 )
2810 },
2811 )
2812}
2813
/// Fits `body` under `cap_bytes` and attaches a recovery marker when output
/// was dropped or truncated.
///
/// * `cap_plain` – caps the body without a marker.
/// * `cap_with_marker` – caps the body and embeds the given marker.
///
/// Returns the rendered text and a flag saying whether it should be reported
/// as truncated. `recovery.byte_truncated` is set to `true` whenever the body
/// (or the body plus marker) has to be cut to fit.
fn render_body_with_recovery_marker_at_cap<F, G>(
    body: &str,
    recovery: &mut RecoveryContext,
    cap_bytes: usize,
    cap_plain: F,
    cap_with_marker: G,
) -> (String, bool)
where
    F: Fn(&str) -> super::output::CappedText,
    G: Fn(&str, &str) -> super::output::CappedText,
{
    // Sampled before `byte_truncated` is touched below.
    // NOTE(review): `has_visible_drop` is defined outside this chunk;
    // presumably it reports drops that need a marker — confirm.
    let needs_marker = recovery.has_visible_drop();
    if body.len() > cap_bytes {
        // The body alone is over the cap: it will be cut, so say so in the marker.
        recovery.byte_truncated = true;
        if let Some(marker) = recovery_marker(recovery) {
            let capped = cap_with_marker(body, &marker);
            return (capped.text, true);
        }
        // Fallback for a context that produces no marker. With the
        // `recovery_marker` in this file a `byte_truncated` context always
        // yields one, so this path is defensive.
        let capped = cap_plain(body);
        return (capped.text, capped.truncated || needs_marker);
    }

    // Body fits and nothing was dropped: pass it through untouched.
    if !needs_marker {
        return (body.to_string(), false);
    }

    let Some(marker) = recovery_marker(recovery) else {
        return (body.to_string(), true);
    };
    let with_marker = append_recovery_marker(body, &marker);
    if with_marker.len() <= cap_bytes {
        return (with_marker, true);
    }

    // Appending the marker pushed the text over the cap: rebuild the marker
    // with the truncation flag set and let the capper make room for it.
    recovery.byte_truncated = true;
    let marker = recovery_marker(recovery).unwrap_or(marker);
    let capped = cap_with_marker(body, &marker);
    (capped.text, true)
}
2853
/// Appends `marker` on its own line after `body`.
///
/// Trailing whitespace of `body` is removed first so that exactly one newline
/// separates the body from the marker. A body that is empty or consists only
/// of whitespace yields the bare marker, without a leading blank line.
fn append_recovery_marker(body: &str, marker: &str) -> String {
    let trimmed = body.trim_end();
    // Check emptiness after trimming: a body such as "\n" has nothing to show
    // above the marker and must not leave a stray newline in front of it.
    if trimmed.is_empty() {
        return marker.to_string();
    }
    let mut output = String::with_capacity(trimmed.len() + 1 + marker.len());
    output.push_str(trimmed);
    output.push('\n');
    output.push_str(marker);
    output
}
2863
2864fn recovery_marker(recovery: &RecoveryContext) -> Option<String> {
2865 let mut parts = Vec::new();
2866 for (class, count) in &recovery.dropped_by_class {
2867 let label = if *count == 1 {
2868 class.singular()
2869 } else {
2870 class.plural()
2871 };
2872 parts.push(format!("+{count} more {label}"));
2873 }
2874 if recovery.byte_truncated {
2875 parts.push("truncated output".to_string());
2876 } else if recovery.had_inner_drop && parts.is_empty() {
2877 parts.push("omitted output".to_string());
2878 }
2879
2880 if parts.is_empty() {
2881 return None;
2882 }
2883
2884 let hint = recovery_hint(recovery);
2885 Some(format!("[{}; {hint}]", parts.join(", ")))
2886}
2887
2888fn recovery_hint(recovery: &RecoveryContext) -> String {
2889 if recovery.offset_hint_eligible
2893 && !recovery.byte_truncated
2894 && recovery.dropped_by_class.is_empty()
2895 && !recovery.include_stderr_path
2896 {
2897 if let (Some(path), Some(line)) =
2898 (recovery.output_path.as_deref(), recovery.offset_start_line)
2899 {
2900 return format!("see remaining: tail -n +{line} {}", quote_path(path));
2901 }
2902 }
2903
2904 let mut paths = Vec::new();
2905 if let Some(path) = recovery.output_path.as_deref() {
2906 paths.push(path);
2907 }
2908 if recovery.include_stderr_path {
2909 if let Some(path) = recovery.stderr_path.as_deref() {
2910 if !paths.contains(&path) {
2911 paths.push(path);
2912 }
2913 }
2914 }
2915
2916 if paths.is_empty() {
2917 return "full output unavailable".to_string();
2918 }
2919
2920 let reads = paths
2921 .into_iter()
2922 .map(|path| format!("read {}", quote_path(path)))
2923 .collect::<Vec<_>>()
2924 .join(" and ");
2925 format!("full output: {reads}")
2926}
2927
2928fn strip_plain_truncation_marker_lines(input: &str) -> String {
2929 input
2930 .lines()
2931 .filter(|line| !is_plain_truncation_marker(line.trim()))
2932 .collect::<Vec<_>>()
2933 .join("\n")
2934}
2935
2936fn strip_recovery_marker_lines(input: &str) -> String {
2937 input
2938 .lines()
2939 .filter(|line| !is_recovery_marker(line.trim()))
2940 .collect::<Vec<_>>()
2941 .join("\n")
2942}
2943
/// Reports whether `line` is exactly `...<truncated N bytes>...`, where `N`
/// is one or more ASCII digits.
fn is_plain_truncation_marker(line: &str) -> bool {
    line.strip_prefix("...<truncated ")
        .and_then(|rest| rest.strip_suffix(" bytes>..."))
        .is_some_and(|count| !count.is_empty() && count.bytes().all(|byte| byte.is_ascii_digit()))
}
2953
/// Reports whether `line` is a bracketed recovery marker: it starts with `[`,
/// ends with `]`, and contains one of the known recovery hint phrases.
fn is_recovery_marker(line: &str) -> bool {
    const HINT_PHRASES: [&str; 3] = [
        "full output: read ",
        "see remaining: tail -n +",
        "full output unavailable",
    ];

    let bracketed = line.starts_with('[') && line.ends_with(']');
    bracketed && HINT_PHRASES.iter().any(|phrase| line.contains(*phrase))
}
2961
2962fn render_structured_output(command: &str, buffer: &BgBuffer) -> Option<TerminalOutputCache> {
2963 if !is_gh_structured_command(command) {
2964 return None;
2965 }
2966
2967 let output_path = buffer
2968 .output_path()
2969 .map(|path| path.display().to_string())?;
2970 let stdout_bytes = buffer.stream_len(StreamKind::Stdout);
2971 if stdout_bytes == 0 {
2972 return None;
2973 }
2974
2975 if stdout_bytes > STRUCTURED_OUTPUT_CAP_BYTES as u64 {
2976 if !stream_starts_like_json(buffer, StreamKind::Stdout) {
2977 return None;
2978 }
2979 return Some(TerminalOutputCache {
2980 output_preview: json_output_pointer(stdout_bytes, &output_path),
2981 output_truncated: true,
2982 kind: TerminalOutputKind::Structured,
2983 output_path: Some(output_path),
2984 stderr_path: buffer.stderr_path().map(|path| path.display().to_string()),
2985 recovery: None,
2986 });
2987 }
2988
2989 let stdout = buffer.read_stream_bounded(StreamKind::Stdout, STRUCTURED_OUTPUT_CAP_BYTES);
2990 if stdout.truncated || !is_structured_body(&stdout.text) {
2991 return None;
2992 }
2993
2994 Some(TerminalOutputCache {
2995 output_preview: stdout.text,
2996 output_truncated: false,
2997 kind: TerminalOutputKind::Structured,
2998 output_path: Some(output_path),
2999 stderr_path: buffer.stderr_path().map(|path| path.display().to_string()),
3000 recovery: None,
3001 })
3002}
3003
3004fn render_raw_passthrough(buffer: &BgBuffer) -> TerminalOutputCache {
3005 let raw = buffer.read_combined_head_tail(
3006 RAW_PASSTHROUGH_CAP_BYTES,
3007 RAW_PASSTHROUGH_HEAD_BYTES,
3008 RAW_PASSTHROUGH_TAIL_BYTES,
3009 );
3010 let output_path = buffer.output_path().map(|path| path.display().to_string());
3011 let stderr_path = buffer.stderr_path().map(|path| path.display().to_string());
3012 if !raw.truncated {
3013 return TerminalOutputCache {
3014 output_preview: raw.text,
3015 output_truncated: false,
3016 kind: TerminalOutputKind::Raw,
3017 output_path,
3018 stderr_path,
3019 recovery: None,
3020 };
3021 }
3022
3023 let include_stderr_path = buffer.stream_len(StreamKind::Stderr) > 0;
3024 let mut recovery = RecoveryContext {
3025 dropped_by_class: BTreeMap::new(),
3026 had_inner_drop: false,
3027 offset_hint_eligible: false,
3028 offset_start_line: None,
3029 byte_truncated: true,
3030 output_path: output_path.clone(),
3031 stderr_path: stderr_path.clone(),
3032 include_stderr_path,
3033 };
3034 let (output_preview, output_truncated) =
3035 render_raw_body_with_recovery_marker(&raw.text, &mut recovery);
3036 TerminalOutputCache {
3037 output_preview,
3038 output_truncated,
3039 kind: TerminalOutputKind::Raw,
3040 output_path,
3041 stderr_path,
3042 recovery: Some(recovery),
3043 }
3044}
3045
3046fn completion_preview_for_cache(
3047 cache: &TerminalOutputCache,
3048 exit_code: Option<i32>,
3049) -> (String, bool) {
3050 let exit_ok = exit_code == Some(0);
3053 let threshold = completion_preview_threshold(exit_ok);
3054 if cache.kind == TerminalOutputKind::Structured && cache.output_preview.len() > threshold {
3055 if let Some(path) = cache.output_path.as_deref() {
3056 return (
3057 json_output_pointer(cache.output_preview.len() as u64, path),
3058 true,
3059 );
3060 }
3061 return (cache.output_preview.clone(), cache.output_truncated);
3062 }
3063
3064 if let Some(recovery) = cache.recovery.as_ref() {
3065 if cache.output_preview.len() <= threshold {
3066 return (cache.output_preview.clone(), cache.output_truncated);
3067 }
3068 let body = strip_recovery_marker_lines(&cache.output_preview);
3069 let mut completion_recovery = recovery.clone();
3070 completion_recovery.byte_truncated = true;
3071 if let Some(marker) = recovery_marker(&completion_recovery) {
3072 let capped = cap_completion_output_with_marker(&body, &marker, exit_ok);
3073 return (capped.text, true);
3074 }
3075 }
3076
3077 let capped = cap_completion_output(&cache.output_preview, exit_ok);
3078 (capped.text, cache.output_truncated || capped.truncated)
3079}
3080
3081fn is_gh_structured_command(command: &str) -> bool {
3082 let normalized = crate::compress::normalize_command_for_dispatch(command)
3083 .unwrap_or_else(|| command.trim_start().to_string());
3084 let tokens = shell_words_for_flags(&normalized);
3085 let Some(head) = tokens.first() else {
3086 return false;
3087 };
3088 let head_name = Path::new(head)
3089 .file_name()
3090 .and_then(|name| name.to_str())
3091 .unwrap_or(head);
3092 if !(head_name == "gh" || head_name.eq_ignore_ascii_case("gh.exe")) {
3093 return false;
3094 }
3095 tokens.iter().any(|token| {
3096 matches!(token.as_str(), "--json" | "--jq" | "--template")
3097 || token.starts_with("--json=")
3098 || token.starts_with("--jq=")
3099 || token.starts_with("--template=")
3100 })
3101}
3102
/// Splits a command line into words for flag detection.
///
/// This is a simplified tokenizer, not a full shell parser:
/// * a backslash outside single quotes makes the next character literal;
/// * single and double quotes group characters and are removed;
/// * unquoted whitespace and the operators `;`, `&`, `|` end the current word;
/// * empty words are never produced.
fn shell_words_for_flags(command: &str) -> Vec<String> {
    let mut words = Vec::new();
    let mut current = String::new();
    let mut in_single = false;
    let mut in_double = false;
    let mut escaped = false;

    for ch in command.chars() {
        if escaped {
            current.push(ch);
            escaped = false;
            continue;
        }

        let unquoted = !in_single && !in_double;
        match ch {
            '\\' if !in_single => escaped = true,
            '\'' if !in_double => in_single = !in_single,
            '"' if !in_single => in_double = !in_double,
            separator
                if unquoted
                    && (separator.is_whitespace() || matches!(separator, ';' | '&' | '|')) =>
            {
                if !current.is_empty() {
                    words.push(std::mem::take(&mut current));
                }
            }
            literal => current.push(literal),
        }
    }

    if !current.is_empty() {
        words.push(current);
    }
    words
}
3147
3148fn is_structured_body(body: &str) -> bool {
3149 let trimmed = body.trim();
3150 if trimmed.is_empty() {
3151 return false;
3152 }
3153 if serde_json::from_str::<serde_json::Value>(trimmed).is_ok() {
3154 return true;
3155 }
3156
3157 let mut saw_line = false;
3158 for line in trimmed
3159 .lines()
3160 .map(str::trim)
3161 .filter(|line| !line.is_empty())
3162 {
3163 saw_line = true;
3164 if serde_json::from_str::<serde_json::Value>(line).is_err() {
3165 return false;
3166 }
3167 }
3168 saw_line
3169}
3170
3171fn stream_starts_like_json(buffer: &BgBuffer, stream: StreamKind) -> bool {
3172 let path = match (buffer, stream) {
3173 (BgBuffer::Pipes { stdout_path, .. }, StreamKind::Stdout) => Some(stdout_path),
3174 (BgBuffer::Pipes { stderr_path, .. }, StreamKind::Stderr) => Some(stderr_path),
3175 (BgBuffer::Pty { combined_path }, _) => Some(combined_path),
3176 };
3177 let Some(path) = path else {
3178 return false;
3179 };
3180 let Ok(file) = std::fs::File::open(path) else {
3181 return false;
3182 };
3183 let mut limited = file.take(512);
3184 let mut bytes = Vec::new();
3185 if limited.read_to_end(&mut bytes).is_err() {
3186 return false;
3187 }
3188 String::from_utf8_lossy(&bytes)
3189 .chars()
3190 .find(|ch| !ch.is_whitespace())
3191 .is_some_and(|ch| matches!(ch, '{' | '[' | '"' | '-' | '0'..='9' | 't' | 'f' | 'n'))
3192}
3193
/// Token and byte measurements attached to a task completion.
///
/// Either all four counts are present, or all are `None` and `tokens_skipped`
/// is `true` (see [`CompletionTokenCounts::skipped`]).
struct CompletionTokenCounts {
    /// Token count of the raw task output.
    original_tokens: Option<u32>,
    /// Token count of the output as shown (rendered/compressed).
    compressed_tokens: Option<u32>,
    /// Byte length of the raw task output.
    original_bytes: Option<i64>,
    /// Byte length of the output as shown (rendered/compressed).
    compressed_bytes: Option<i64>,
    /// `true` when no counting was performed.
    tokens_skipped: bool,
}

impl CompletionTokenCounts {
    /// Returns the "nothing was counted" value: every count is `None` and
    /// `tokens_skipped` is set.
    fn skipped() -> Self {
        Self {
            original_tokens: None,
            compressed_tokens: None,
            original_bytes: None,
            compressed_bytes: None,
            tokens_skipped: true,
        }
    }
}
3213
3214fn completion_status_text(status: &BgTaskStatus, exit_code: Option<i32>) -> String {
3215 match status {
3216 BgTaskStatus::TimedOut => "timed out".to_string(),
3217 BgTaskStatus::Killed => "killed".to_string(),
3218 _ => exit_code
3219 .map(|code| format!("exit {code}"))
3220 .unwrap_or_else(|| format!("{status:?}").to_lowercase()),
3221 }
3222}
3223
3224fn token_count_u32(text: &str) -> u32 {
3225 aft_tokenizer::count_tokens(text)
3226 .try_into()
3227 .unwrap_or(u32::MAX)
3228}
3229
/// Default registry: built through `BgTaskRegistry::new` with a freshly
/// allocated shared slot that holds `None`.
impl Default for BgTaskRegistry {
    fn default() -> Self {
        // NOTE(review): the slot is presumably the shared progress sender
        // (nothing installed yet) — confirm against `BgTaskRegistry::new`.
        Self::new(Arc::new(Mutex::new(None)))
    }
}
3235
/// Reports whether `path` was last modified less than `grace` ago.
///
/// Returns `false` when the metadata or modification time cannot be read, or
/// when the modification time lies in the future.
fn modified_within(path: &Path, grace: Duration) -> bool {
    let Ok(modified) = fs::metadata(path).and_then(|metadata| metadata.modified()) else {
        return false;
    };
    match SystemTime::now().duration_since(modified) {
        Ok(age) => age < grace,
        Err(_) => false,
    }
}
3244
/// Returns the canonical form of `path`, or an unchanged copy of `path` when
/// canonicalization fails (for example because it does not exist).
fn canonicalized_path(path: &Path) -> PathBuf {
    match fs::canonicalize(path) {
        Ok(canonical) => canonical,
        Err(_) => path.to_path_buf(),
    }
}
3248
/// Converts a wall-clock start time (milliseconds since the Unix epoch) into
/// an `Instant` that lies the same distance in the past.
///
/// A start time in the future maps to "now". "Now" is also used when the
/// shifted instant cannot be represented.
fn started_instant_from_unix_millis(started_at: u64) -> Instant {
    // If the wall clock is before the epoch, treat the task as just started.
    let now_ms = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => started_at,
    };
    let age = Duration::from_millis(now_ms.saturating_sub(started_at));
    match Instant::now().checked_sub(age) {
        Some(started) => started,
        None => Instant::now(),
    }
}
3260
3261fn gc_quarantine(storage_dir: &Path) {
3262 let quarantine_root = storage_dir.join("bash-tasks-quarantine");
3263 let Ok(session_dirs) = fs::read_dir(&quarantine_root) else {
3264 return;
3265 };
3266 for session_entry in session_dirs.flatten() {
3267 let session_quarantine_dir = session_entry.path();
3268 if !session_quarantine_dir.is_dir() {
3269 continue;
3270 }
3271 let entries = match fs::read_dir(&session_quarantine_dir) {
3272 Ok(entries) => entries,
3273 Err(error) => {
3274 crate::slog_warn!(
3275 "failed to read background task quarantine dir {}: {error}",
3276 session_quarantine_dir.display()
3277 );
3278 continue;
3279 }
3280 };
3281 for entry in entries.flatten() {
3282 let path = entry.path();
3283 if modified_within(&path, QUARANTINE_GC_GRACE) {
3284 continue;
3285 }
3286 let result = if path.is_dir() {
3287 fs::remove_dir_all(&path)
3288 } else {
3289 fs::remove_file(&path)
3290 };
3291 match result {
3292 Ok(()) => log::debug!(
3293 "deleted old background task quarantine entry {}",
3294 path.display()
3295 ),
3296 Err(error) => crate::slog_warn!(
3297 "failed to delete old background task quarantine entry {}: {error}",
3298 path.display()
3299 ),
3300 }
3301 }
3302 let _ = fs::remove_dir(&session_quarantine_dir);
3303 }
3304 let _ = fs::remove_dir(&quarantine_root);
3305}
3306
/// Why a task file is being moved to quarantine; selects the naming scheme
/// used by `quarantine_name`.
enum QuarantineKind {
    /// Quarantined files are named `<file>.corrupt-<unix_ts>`.
    Corrupt,
    /// Quarantined files are named `<stem>.invalid.<unix_ts>.<ext>`, or
    /// `<file>.invalid.<unix_ts>` when the name has no stem/extension pair.
    Invalid,
}
3311
3312fn quarantine_task_json(
3313 storage_dir: &Path,
3314 session_dir: &Path,
3315 json_path: &Path,
3316 kind: QuarantineKind,
3317) -> Result<(), String> {
3318 let session_hash = session_dir
3319 .file_name()
3320 .and_then(|name| name.to_str())
3321 .ok_or_else(|| {
3322 format!(
3323 "invalid background task session dir: {}",
3324 session_dir.display()
3325 )
3326 })?;
3327 let task_name = json_path
3328 .file_name()
3329 .and_then(|name| name.to_str())
3330 .ok_or_else(|| format!("invalid background task json path: {}", json_path.display()))?;
3331 let unix_ts = SystemTime::now()
3332 .duration_since(UNIX_EPOCH)
3333 .map(|duration| duration.as_secs())
3334 .unwrap_or(0);
3335 let quarantine_dir = storage_dir.join("bash-tasks-quarantine").join(session_hash);
3336 fs::create_dir_all(&quarantine_dir).map_err(|e| {
3337 format!(
3338 "failed to create background task quarantine dir {}: {e}",
3339 quarantine_dir.display()
3340 )
3341 })?;
3342 let target_name = quarantine_name(task_name, unix_ts, &kind);
3343 let target = quarantine_dir.join(target_name);
3344 fs::rename(json_path, &target).map_err(|e| {
3345 format!(
3346 "failed to quarantine background task metadata {} to {}: {e}",
3347 json_path.display(),
3348 target.display()
3349 )
3350 })?;
3351
3352 for sibling in task_sibling_paths(json_path) {
3353 if !sibling.exists() {
3354 continue;
3355 }
3356 let Some(sibling_name) = sibling.file_name().and_then(|name| name.to_str()) else {
3357 crate::slog_warn!(
3358 "skipping background task sibling with invalid name during quarantine: {}",
3359 sibling.display()
3360 );
3361 continue;
3362 };
3363 let sibling_target = quarantine_dir.join(quarantine_name(sibling_name, unix_ts, &kind));
3364 if let Err(error) = fs::rename(&sibling, &sibling_target) {
3365 crate::slog_warn!(
3366 "failed to quarantine background task sibling {} to {}: {error}",
3367 sibling.display(),
3368 sibling_target.display()
3369 );
3370 }
3371 }
3372
3373 let _ = fs::remove_dir(session_dir);
3374 Ok(())
3375}
3376
3377fn quarantine_name(file_name: &str, unix_ts: u64, kind: &QuarantineKind) -> String {
3378 match kind {
3379 QuarantineKind::Corrupt => format!("{file_name}.corrupt-{unix_ts}"),
3380 QuarantineKind::Invalid => {
3381 let path = Path::new(file_name);
3382 let stem = path.file_stem().and_then(|stem| stem.to_str());
3383 let extension = path.extension().and_then(|extension| extension.to_str());
3384 match (stem, extension) {
3385 (Some(stem), Some(extension)) => format!("{stem}.invalid.{unix_ts}.{extension}"),
3386 _ => format!("{file_name}.invalid.{unix_ts}"),
3387 }
3388 }
3389 }
3390}
3391
/// Lists the candidate sibling files of a task's JSON metadata file.
///
/// Every returned path lives in the same directory as `json_path` and shares
/// its file stem, differing only in extension. Existence is not checked here.
/// An empty list is returned when the path has no parent or no UTF-8 stem.
fn task_sibling_paths(json_path: &Path) -> Vec<PathBuf> {
    const SIBLING_EXTENSIONS: [&str; 7] = ["stdout", "stderr", "exit", "pty", "ps1", "bat", "sh"];

    let directory = json_path.parent();
    let base_name = json_path.file_stem().and_then(|stem| stem.to_str());
    match (directory, base_name) {
        (Some(directory), Some(base_name)) => SIBLING_EXTENSIONS
            .iter()
            .map(|extension| directory.join(format!("{base_name}.{extension}")))
            .collect(),
        _ => Vec::new(),
    }
}
3404
3405fn read_for_token_count_from_disk(
3406 metadata: &PersistedTask,
3407 paths: &TaskPaths,
3408 max_bytes_per_stream: usize,
3409) -> TokenCountInput {
3410 if metadata.mode == BgMode::Pty {
3411 return TokenCountInput::Skipped;
3412 }
3413 let stdout = read_file_tail_capped(&paths.stdout, max_bytes_per_stream);
3420 let stderr = read_file_tail_capped(&paths.stderr, max_bytes_per_stream);
3421 match (stdout, stderr) {
3422 (Ok(stdout), Ok(stderr)) => TokenCountInput::Text(combine_streams(
3423 String::from_utf8_lossy(&stdout).as_ref(),
3424 String::from_utf8_lossy(&stderr).as_ref(),
3425 )),
3426 (Ok(stdout), Err(_)) => TokenCountInput::Text(combine_streams(
3427 String::from_utf8_lossy(&stdout).as_ref(),
3428 "",
3429 )),
3430 (Err(_), Ok(stderr)) => TokenCountInput::Text(combine_streams(
3431 "",
3432 String::from_utf8_lossy(&stderr).as_ref(),
3433 )),
3434 (Err(_), Err(_)) => TokenCountInput::Skipped,
3435 }
3436}
3437
/// Reads at most the last `max_bytes` bytes of the file at `path`.
///
/// The cap is a hard upper bound on the returned length:
/// * `max_bytes == 0` yields an empty buffer (the file is still opened, so a
///   missing file keeps reporting an error);
/// * if the file grows between the length probe and the read, the extra bytes
///   are not returned.
///
/// # Errors
/// Propagates any I/O error from opening, inspecting, seeking, or reading the
/// file.
fn read_file_tail_capped(path: &Path, max_bytes: usize) -> std::io::Result<Vec<u8>> {
    use std::io::{Read, Seek, SeekFrom};

    let mut file = std::fs::File::open(path)?;
    if max_bytes == 0 {
        return Ok(Vec::new());
    }

    let cap = u64::try_from(max_bytes).unwrap_or(u64::MAX);
    let len = file.metadata()?.len();
    if len > cap {
        // Position on the first byte of the tail; an absolute offset avoids a
        // signed cast of the cap.
        file.seek(SeekFrom::Start(len - cap))?;
    }

    // `len.min(cap)` never exceeds `max_bytes`, so the conversion cannot fail in practice.
    let expected = usize::try_from(len.min(cap)).unwrap_or(max_bytes);
    let mut bytes = Vec::with_capacity(expected);
    // `take` enforces the cap even if the file is appended to while being read.
    file.take(cap).read_to_end(&mut bytes)?;
    Ok(bytes)
}
3454
3455impl BgTask {
3456 fn snapshot(&self, preview_bytes: usize) -> BgTaskSnapshot {
3457 let state = self
3458 .state
3459 .lock()
3460 .unwrap_or_else(|poison| poison.into_inner());
3461 self.snapshot_locked(&state, preview_bytes)
3462 }
3463
3464 fn snapshot_locked(&self, state: &BgTaskState, preview_bytes: usize) -> BgTaskSnapshot {
3465 let metadata = &state.metadata;
3466 let duration_ms = metadata.duration_ms.or_else(|| {
3467 metadata
3468 .status
3469 .is_terminal()
3470 .then(|| self.started.elapsed().as_millis() as u64)
3471 });
3472 let (output_preview, output_truncated) = if metadata.mode == BgMode::Pty {
3473 (String::new(), false)
3474 } else if metadata.status.is_terminal() {
3475 state
3476 .terminal_output_cache
3477 .as_ref()
3478 .map(|cache| (cache.output_preview.clone(), cache.output_truncated))
3479 .unwrap_or_else(|| (String::new(), false))
3480 } else {
3481 state.buffer.read_tail(preview_bytes)
3482 };
3483 BgTaskSnapshot {
3484 info: BgTaskInfo {
3485 task_id: self.task_id.clone(),
3486 status: metadata.status.clone(),
3487 command: metadata.command.clone(),
3488 mode: metadata.mode.clone(),
3489 started_at: metadata.started_at,
3490 duration_ms,
3491 },
3492 exit_code: metadata.exit_code,
3493 child_pid: metadata.child_pid,
3494 workdir: metadata.workdir.display().to_string(),
3495 output_preview,
3496 output_truncated,
3497 output_path: state
3498 .buffer
3499 .output_path()
3500 .map(|path| path.display().to_string()),
3501 stderr_path: state
3502 .buffer
3503 .stderr_path()
3504 .map(|path| path.display().to_string()),
3505 pty_rows: (metadata.mode == BgMode::Pty).then_some(metadata.pty_rows.unwrap_or(24)),
3506 pty_cols: (metadata.mode == BgMode::Pty).then_some(metadata.pty_cols.unwrap_or(80)),
3507 }
3508 }
3509
3510 pub(crate) fn is_running(&self) -> bool {
3511 self.state
3512 .lock()
3513 .map(|state| {
3514 state.metadata.status == BgTaskStatus::Running
3515 || (state.metadata.mode == BgMode::Pty
3516 && state.metadata.status == BgTaskStatus::Killing)
3517 })
3518 .unwrap_or(false)
3519 }
3520
3521 fn is_terminal(&self) -> bool {
3522 self.state
3523 .lock()
3524 .map(|state| state.metadata.status.is_terminal())
3525 .unwrap_or(false)
3526 }
3527
3528 fn mark_terminal_now(&self) {
3529 if let Ok(mut terminal_at) = self.terminal_at.lock() {
3530 if terminal_at.is_none() {
3531 *terminal_at = Some(Instant::now());
3532 }
3533 }
3534 }
3535
3536 fn set_completion_delivered(
3537 &self,
3538 delivered: bool,
3539 registry: &BgTaskRegistry,
3540 ) -> Result<(), String> {
3541 let mut state = self
3542 .state
3543 .lock()
3544 .map_err(|_| "background task lock poisoned".to_string())?;
3545 let updated = registry
3546 .update_task_metadata(&self.paths, |metadata| {
3547 metadata.completion_delivered = delivered;
3548 })
3549 .map_err(|e| format!("failed to update completion delivery: {e}"))?;
3550 state.metadata = updated;
3551 Ok(())
3552 }
3553}
3554
/// Takes the child out of `child_slot` and waits on it if it has not been
/// reaped yet.
///
/// The slot is always left empty. A blocking `wait` is issued only when
/// `try_wait` reports the child as still running; an already-collected child
/// or a `try_wait` error results in no further action. The result of `wait`
/// is ignored.
#[cfg(unix)]
fn reap_piped_child(child_slot: &mut Option<Child>) {
    let Some(mut child) = child_slot.take() else {
        return;
    };
    if let Ok(None) = child.try_wait() {
        let _ = child.wait();
    }
}
3583
/// Windows counterpart of the reaper: releases the child handle by emptying
/// `child_slot`, without waiting on the process.
#[cfg(windows)]
fn reap_piped_child(child_slot: &mut Option<Child>) {
    drop(child_slot.take());
}
3592
3593fn terminal_metadata_from_marker(
3594 mut metadata: PersistedTask,
3595 marker: ExitMarker,
3596 reason: Option<String>,
3597) -> PersistedTask {
3598 match marker {
3599 ExitMarker::Code(code) => {
3600 let status = if code == 0 {
3601 BgTaskStatus::Completed
3602 } else {
3603 BgTaskStatus::Failed
3604 };
3605 metadata.mark_terminal(status, Some(code), reason);
3606 }
3607 ExitMarker::Killed => metadata.mark_terminal(BgTaskStatus::Killed, None, reason),
3608 }
3609 metadata
3610}
3611
3612#[cfg(unix)]
3613fn detached_shell_command(command: &str, exit_path: &Path) -> Command {
3614 let shell = resolve_posix_shell();
3615 let mut cmd = Command::new(&shell);
3616 cmd.arg("-c")
3617 .arg("\"$0\" -c \"$1\"; code=$?; printf \"%s\" \"$code\" > \"$2.tmp.$$\"; mv -f \"$2.tmp.$$\" \"$2\"")
3618 .arg(&shell)
3619 .arg(command)
3620 .arg(exit_path);
3621 unsafe {
3622 cmd.pre_exec(|| {
3623 if libc::setsid() == -1 {
3624 return Err(std::io::Error::last_os_error());
3625 }
3626 Ok(())
3627 });
3628 }
3629 cmd
3630}
3631
3632#[cfg(unix)]
3633fn resolve_posix_shell() -> PathBuf {
3634 static POSIX_SHELL: OnceLock<PathBuf> = OnceLock::new();
3635 POSIX_SHELL
3636 .get_or_init(|| {
3637 std::env::var_os("BASH")
3638 .filter(|value| !value.is_empty())
3639 .map(PathBuf::from)
3640 .filter(|path| path.exists())
3641 .or_else(|| which::which("bash").ok())
3642 .or_else(|| which::which("zsh").ok())
3643 .unwrap_or_else(|| PathBuf::from("/bin/sh"))
3644 })
3645 .clone()
3646}
3647
/// Writes a wrapper script for `shell` next to the task files and returns a
/// command that launches it with the given process creation flags.
///
/// The wrapper is named `<json stem>.<ext>` inside `paths.dir` (falling back
/// to the stem `wrapper`), where `<ext>` is `ps1`, `bat` or `sh` depending on
/// the shell. PowerShell variants run the script via `-File` with a
/// non-interactive, profile-less invocation; `cmd` runs it via `/D /C`; POSIX
/// shells receive the script path as their only argument.
///
/// # Errors
/// Returns a message when the wrapper script cannot be written.
#[cfg(windows)]
fn detached_shell_command_for(
    shell: crate::windows_shell::WindowsShell,
    command: &str,
    exit_path: &Path,
    paths: &TaskPaths,
    creation_flags: u32,
) -> Result<Command, String> {
    use crate::windows_shell::WindowsShell;

    let script_bytes = shell.wrapper_script_bytes(command, exit_path);
    let script_extension = match &shell {
        WindowsShell::Pwsh | WindowsShell::Powershell => "ps1",
        WindowsShell::Cmd => "bat",
        WindowsShell::Posix(_) => "sh",
    };
    let script_stem = paths
        .json
        .file_stem()
        .and_then(|s| s.to_str())
        .unwrap_or("wrapper");
    let wrapper_path = paths
        .dir
        .join(format!("{script_stem}.{script_extension}"));
    fs::write(&wrapper_path, script_bytes)
        .map_err(|e| format!("failed to write background bash wrapper script: {e}"))?;

    // Arguments that precede the wrapper path on the shell's command line.
    let leading_args: &[&str] = match &shell {
        WindowsShell::Pwsh | WindowsShell::Powershell => &[
            "-NoLogo",
            "-NoProfile",
            "-NonInteractive",
            "-ExecutionPolicy",
            "Bypass",
            "-File",
        ],
        WindowsShell::Cmd => &["/D", "/C"],
        WindowsShell::Posix(_) => &[],
    };

    let mut cmd = Command::new(shell.binary().as_ref());
    cmd.args(leading_args);
    cmd.arg(&wrapper_path);
    cmd.creation_flags(creation_flags);
    Ok(cmd)
}
3730
/// Spawns `command` as a detached background child whose stdout and stderr
/// are redirected into the task's capture files and whose stdin is null.
///
/// * Non-Windows: runs the command through `detached_shell_command`, passing
///   `paths.exit` as the exit-marker path.
/// * Windows: tries each shell from `shell_candidates()` in order, first with
///   `CREATE_BREAKAWAY_FROM_JOB` and then without it.
///
/// # Errors
/// Returns a message when a capture file cannot be created, when the wrapper
/// command cannot be built, when a spawn fails for a reason that is not
/// retried, or (Windows) when every candidate has been exhausted.
fn spawn_detached_child(
    command: &str,
    paths: &TaskPaths,
    workdir: &Path,
    env: &HashMap<String, String>,
) -> Result<std::process::Child, String> {
    #[cfg(not(windows))]
    {
        let stdout = create_capture_file(&paths.stdout)
            .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
        let stderr = create_capture_file(&paths.stderr)
            .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
        detached_shell_command(command, &paths.exit)
            .current_dir(workdir)
            .envs(env)
            .stdin(Stdio::null())
            .stdout(Stdio::from(stdout))
            .stderr(Stdio::from(stderr))
            .spawn()
            .map_err(|e| format!("failed to spawn background bash command: {e}"))
    }
    #[cfg(windows)]
    {
        use crate::windows_shell::shell_candidates;
        let candidates: Vec<crate::windows_shell::WindowsShell> = shell_candidates();
        // Raw process creation flag values passed to `creation_flags`.
        const FLAG_CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
        const FLAG_CREATE_BREAKAWAY_FROM_JOB: u32 = 0x0100_0000;
        const FLAG_CREATE_NO_WINDOW: u32 = 0x0800_0000;
        let with_breakaway =
            FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP | FLAG_CREATE_BREAKAWAY_FROM_JOB;
        let without_breakaway = FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP;
        // Remembered only for the final "nothing could be spawned" message.
        let mut last_error: Option<String> = None;
        for (idx, shell) in candidates.iter().enumerate() {
            // Per candidate: attempt with breakaway first, then without.
            for &flags in &[with_breakaway, without_breakaway] {
                // Capture files are re-created for every attempt because each
                // `Stdio::from` below consumes the handle.
                let stdout = create_capture_file(&paths.stdout)
                    .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
                let stderr = create_capture_file(&paths.stderr)
                    .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
                let mut cmd =
                    detached_shell_command_for(shell.clone(), command, &paths.exit, paths, flags)?;
                cmd.current_dir(workdir)
                    .envs(env)
                    .stdin(Stdio::null())
                    .stdout(Stdio::from(stdout))
                    .stderr(Stdio::from(stderr));
                match cmd.spawn() {
                    Ok(child) => {
                        // A non-zero index means at least one earlier candidate failed.
                        if idx > 0 {
                            crate::slog_warn!("background bash spawn fell back to {} after {} earlier candidate(s) failed; \
                                the cached PATH probe disagreed with runtime spawn — likely PATH \
                                inheritance, antivirus / AppLocker / Defender ASR, or sandbox policy.",
                                shell.binary(),
                                idx);
                        }
                        if flags == without_breakaway {
                            crate::slog_warn!(
                                "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected \
                                (likely a restrictive Job Object — CI sandbox or MDM policy). \
                                Spawned without breakaway; the bg task will be torn down if the \
                                AFT process group is killed."
                            );
                        }
                        return Ok(child);
                    }
                    // Shell binary missing at runtime: skip the remaining flag
                    // variants and move on to the next candidate.
                    Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                        crate::slog_warn!("background bash spawn: {} returned NotFound at runtime — trying next candidate",
                            shell.binary());
                        last_error = Some(format!("{}: {e}", shell.binary()));
                        break;
                    }
                    // OS error 5 (reported as Access Denied in the log below)
                    // on the breakaway attempt: retry the same shell without
                    // the breakaway flag.
                    Err(e) if flags == with_breakaway && e.raw_os_error() == Some(5) => {
                        crate::slog_warn!(
                            "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected with \
                            Access Denied — retrying {} without breakaway",
                            shell.binary()
                        );
                        last_error = Some(format!("{}: {e}", shell.binary()));
                        continue;
                    }
                    // Any other failure aborts immediately; no further
                    // candidates are tried.
                    Err(e) => {
                        return Err(format!(
                            "failed to spawn background bash command via {}: {e}",
                            shell.binary()
                        ));
                    }
                }
            }
        }
        Err(format!(
            "failed to spawn background bash command: no Windows shell could be spawned. \
            Last error: {}. PATH-probed candidates: {:?}",
            last_error.unwrap_or_else(|| "no candidates were attempted".to_string()),
            candidates.iter().map(|s| s.binary()).collect::<Vec<_>>()
        ))
    }
}
3879
3880fn random_slug() -> String {
3881 let mut bytes = [0u8; 4];
3882 getrandom::fill(&mut bytes).unwrap_or_else(|_| {
3884 let t = SystemTime::now()
3886 .duration_since(UNIX_EPOCH)
3887 .map(|d| d.subsec_nanos())
3888 .unwrap_or(0);
3889 let p = std::process::id();
3890 bytes.copy_from_slice(&(t ^ p).to_le_bytes());
3891 });
3892 let hex: String = bytes.iter().map(|b| format!("{b:02x}")).collect();
3894 format!("bash-{hex}")
3895}
3896
3897#[cfg(test)]
3898mod tests {
3899 use std::collections::HashMap;
3900 use std::fs;
3901 use std::sync::atomic::{AtomicBool, AtomicUsize};
3902 use std::sync::{Arc, Mutex};
3903 use std::time::Duration;
3904 #[cfg(windows)]
3905 use std::time::Instant;
3906
3907 use super::*;
3908
    // Platform-specific command that exits right away with status 0.
    #[cfg(unix)]
    const QUICK_SUCCESS_COMMAND: &str = "true";
    #[cfg(windows)]
    const QUICK_SUCCESS_COMMAND: &str = "cmd /c exit 0";

    // Platform-specific command that keeps running for about five seconds,
    // used by tests that need a task which is still alive.
    #[cfg(unix)]
    const LONG_RUNNING_COMMAND: &str = "sleep 5";
    #[cfg(windows)]
    const LONG_RUNNING_COMMAND: &str = "cmd /c timeout /t 5 /nobreak > nul";
3918
3919 fn insert_terminal_piped_task(
3920 registry: &BgTaskRegistry,
3921 dir: &tempfile::TempDir,
3922 command: &str,
3923 stdout: &str,
3924 stderr: &str,
3925 compressed: bool,
3926 ) -> (String, Arc<BgTask>) {
3927 let task_id = format!("bash-test-{}", random_slug());
3928 let paths = task_paths(dir.path(), "session", &task_id);
3929 fs::create_dir_all(&paths.dir).unwrap();
3930 fs::write(&paths.stdout, stdout).unwrap();
3931 fs::write(&paths.stderr, stderr).unwrap();
3932 let mut metadata = PersistedTask::starting(
3933 task_id.clone(),
3934 "session".to_string(),
3935 command.to_string(),
3936 dir.path().to_path_buf(),
3937 Some(dir.path().to_path_buf()),
3938 Some(30_000),
3939 true,
3940 compressed,
3941 );
3942 metadata.mark_terminal(BgTaskStatus::Completed, Some(0), None);
3943 write_task(&paths.json, &metadata).unwrap();
3944 registry
3945 .insert_rehydrated_task(metadata, paths, true)
3946 .expect("insert terminal task");
3947 let task = registry.task_for_session(&task_id, "session").unwrap();
3948 (task_id, task)
3949 }
3950
3951 fn insert_terminal_pty_task(
3952 registry: &BgTaskRegistry,
3953 dir: &tempfile::TempDir,
3954 pty_output: &str,
3955 ) -> (String, Arc<BgTask>) {
3956 let task_id = format!("bash-test-{}", random_slug());
3957 let paths = task_paths(dir.path(), "session", &task_id);
3958 fs::create_dir_all(&paths.dir).unwrap();
3959 fs::write(&paths.pty, pty_output).unwrap();
3960 let mut metadata = PersistedTask::starting(
3961 task_id.clone(),
3962 "session".to_string(),
3963 "python".to_string(),
3964 dir.path().to_path_buf(),
3965 Some(dir.path().to_path_buf()),
3966 Some(30_000),
3967 true,
3968 true,
3969 );
3970 metadata.mode = BgMode::Pty;
3971 metadata.mark_terminal(BgTaskStatus::Completed, Some(0), None);
3972 write_task(&paths.json, &metadata).unwrap();
3973 registry
3974 .insert_rehydrated_task(metadata, paths, true)
3975 .expect("insert terminal pty task");
3976 let task = registry.task_for_session(&task_id, "session").unwrap();
3977 (task_id, task)
3978 }
3979
3980 #[test]
3981 fn recognizes_all_recovery_marker_forms() {
3982 assert!(is_recovery_marker(
3983 "[truncated output; full output: read \"/tmp/out\"]"
3984 ));
3985 assert!(is_recovery_marker(
3986 "[omitted output; see remaining: tail -n +42 \"/tmp/out\"]"
3987 ));
3988 assert!(is_recovery_marker(
3989 "[truncated output; full output unavailable]"
3990 ));
3991 }
3992
3993 #[test]
3994 fn terminal_status_polls_use_cached_render_once_and_off_lock() {
3995 let registry = BgTaskRegistry::default();
3996 let dir = tempfile::tempdir().unwrap();
3997 let (_task_id, task) = insert_terminal_piped_task(
3998 ®istry,
3999 &dir,
4000 "custom-tool --verbose",
4001 &"stdout line\n".repeat(200_000),
4002 "",
4003 true,
4004 );
4005 let calls = Arc::new(AtomicUsize::new(0));
4006 let saw_unlocked_state = Arc::new(AtomicBool::new(false));
4007 let task_holder = Arc::new(Mutex::new(Some(Arc::clone(&task))));
4008 let calls_for_closure = Arc::clone(&calls);
4009 let unlocked_for_closure = Arc::clone(&saw_unlocked_state);
4010 let task_for_closure = Arc::clone(&task_holder);
4011 registry.set_compressor_with_exit_code(move |_command, output, _exit_code| {
4012 calls_for_closure.fetch_add(1, Ordering::SeqCst);
4013 if let Some(task) = task_for_closure.lock().unwrap().as_ref() {
4014 if task.state.try_lock().is_ok() {
4015 unlocked_for_closure.store(true, Ordering::SeqCst);
4016 }
4017 }
4018 CompressionResult::new(format!("compressed {} bytes", output.len()))
4019 });
4020
4021 let first = registry
4022 .status(
4023 &task.task_id,
4024 "session",
4025 None,
4026 Some(dir.path()),
4027 RUNNING_OUTPUT_PREVIEW_BYTES,
4028 )
4029 .unwrap();
4030 let second = registry
4031 .status(
4032 &task.task_id,
4033 "session",
4034 None,
4035 Some(dir.path()),
4036 RUNNING_OUTPUT_PREVIEW_BYTES,
4037 )
4038 .unwrap();
4039 let listed = registry.list(RUNNING_OUTPUT_PREVIEW_BYTES);
4040
4041 assert_eq!(
4042 calls.load(Ordering::SeqCst),
4043 1,
4044 "terminal render must be cached"
4045 );
4046 assert!(
4047 saw_unlocked_state.load(Ordering::SeqCst),
4048 "compressor must run after releasing the task state lock"
4049 );
4050 assert!(first.output_preview.starts_with("compressed "));
4051 assert_eq!(second.output_preview, first.output_preview);
4052 assert_eq!(listed[0].output_preview, first.output_preview);
4053 }
4054
4055 #[test]
4056 fn completion_preview_success_keeps_tail_only() {
4057 let registry = BgTaskRegistry::default();
4062 let dir = tempfile::tempdir().unwrap();
4063 let output = format!("HEAD-SIGNAL\n{}TAIL-SIGNAL\n", "middle\n".repeat(2_000));
4064 let (_task_id, task) =
4065 insert_terminal_piped_task(®istry, &dir, "cat big.log", &output, "", false);
4066
4067 registry.post_terminal_transition(&task, true).unwrap();
4068 let completions = registry.drain_completions_for_session(Some("session"));
4069 assert_eq!(completions.len(), 1);
4070 let preview = &completions[0].output_preview;
4071 assert!(preview.contains("TAIL-SIGNAL"), "preview was {preview:?}");
4072 assert!(!preview.contains("HEAD-SIGNAL"), "preview was {preview:?}");
4073 assert!(completions[0].output_truncated);
4074 }
4075
4076 #[test]
4077 fn completion_preview_failure_keeps_head_and_tail() {
4078 let registry = BgTaskRegistry::default();
4081 let dir = tempfile::tempdir().unwrap();
4082 let output = format!("HEAD-SIGNAL\n{}TAIL-SIGNAL\n", "middle\n".repeat(2_000));
4083 let task_id = format!("bash-test-{}", random_slug());
4084 let paths = task_paths(dir.path(), "session", &task_id);
4085 fs::create_dir_all(&paths.dir).unwrap();
4086 fs::write(&paths.stdout, &output).unwrap();
4087 fs::write(&paths.stderr, "").unwrap();
4088 let mut metadata = PersistedTask::starting(
4089 task_id.clone(),
4090 "session".to_string(),
4091 "cat big.log".to_string(),
4092 dir.path().to_path_buf(),
4093 Some(dir.path().to_path_buf()),
4094 Some(30_000),
4095 true,
4096 false,
4097 );
4098 metadata.mark_terminal(BgTaskStatus::Failed, Some(1), None);
4099 write_task(&paths.json, &metadata).unwrap();
4100 registry
4101 .insert_rehydrated_task(metadata, paths, true)
4102 .expect("insert terminal task");
4103 let task = registry.task_for_session(&task_id, "session").unwrap();
4104
4105 registry.post_terminal_transition(&task, true).unwrap();
4106 let completions = registry.drain_completions_for_session(Some("session"));
4107 assert_eq!(completions.len(), 1);
4108 let preview = &completions[0].output_preview;
4109 assert!(preview.contains("HEAD-SIGNAL"), "preview was {preview:?}");
4110 assert!(preview.contains("TAIL-SIGNAL"), "preview was {preview:?}");
4111 }
4112
4113 #[test]
4114 fn has_completions_for_session_matches_pending_delivery() {
4115 let registry = BgTaskRegistry::default();
4116 assert!(!registry.has_completions_for_session(Some("session")));
4117 assert!(!registry.has_completions_for_session(None));
4118
4119 let dir = tempfile::tempdir().unwrap();
4120 let (_task_id, task) =
4121 insert_terminal_piped_task(®istry, &dir, QUICK_SUCCESS_COMMAND, "done\n", "", false);
4122 registry.post_terminal_transition(&task, true).unwrap();
4123
4124 assert!(registry.has_completions_for_session(Some("session")));
4125 assert!(registry.has_completions_for_session(None));
4126 assert!(!registry.has_completions_for_session(Some("other-session")));
4127
4128 let completions = registry.drain_completions_for_session(Some("session"));
4129 assert_eq!(completions.len(), 1);
4130 assert_eq!(completions[0].task_id, task.task_id);
4131 }
4132
4133 #[test]
4134 fn structured_gh_json_survives_intact_and_ignores_stderr() {
4135 let registry = BgTaskRegistry::default();
4136 let dir = tempfile::tempdir().unwrap();
4137 let calls = Arc::new(AtomicUsize::new(0));
4138 let calls_for_closure = Arc::clone(&calls);
4139 registry.set_compressor_with_exit_code(move |_command, output, _exit_code| {
4140 calls_for_closure.fetch_add(1, Ordering::SeqCst);
4141 CompressionResult::new(output)
4142 });
4143 let (task_id, _task) = insert_terminal_piped_task(
4144 ®istry,
4145 &dir,
4146 "gh pr view 123 --json body",
4147 "{\"body\":\"hello\"}",
4148 "warning: stderr must not join json",
4149 true,
4150 );
4151
4152 let snapshot = registry
4153 .status(
4154 &task_id,
4155 "session",
4156 None,
4157 Some(dir.path()),
4158 RUNNING_OUTPUT_PREVIEW_BYTES,
4159 )
4160 .unwrap();
4161
4162 assert_eq!(snapshot.output_preview, "{\"body\":\"hello\"}");
4163 assert!(!snapshot.output_preview.contains("warning"));
4164 assert!(!snapshot.output_truncated);
4165 assert_eq!(
4166 calls.load(Ordering::SeqCst),
4167 0,
4168 "structured JSON bypasses compression"
4169 );
4170 }
4171
4172 #[test]
4173 fn registry_emits_single_recovery_marker_for_class_drops() {
4174 let registry = BgTaskRegistry::default();
4175 let dir = tempfile::tempdir().unwrap();
4176 registry.set_compressor_with_exit_code(move |_command, _output, _exit_code| {
4177 let mut dropped = BTreeMap::new();
4178 dropped.insert(DropClass::Error, 18);
4179 dropped.insert(DropClass::Warning, 6);
4180 CompressionResult::with_class_drops("kept diagnostic", dropped)
4181 });
4182 let (task_id, task) =
4183 insert_terminal_piped_task(®istry, &dir, "custom-tool", "raw", "", true);
4184
4185 let snapshot = registry
4186 .status(
4187 &task_id,
4188 "session",
4189 None,
4190 Some(dir.path()),
4191 RUNNING_OUTPUT_PREVIEW_BYTES,
4192 )
4193 .unwrap();
4194
4195 assert_eq!(snapshot.output_preview.matches("full output:").count(), 1);
4196 assert!(snapshot.output_preview.contains("+18 more errors"));
4197 assert!(snapshot.output_preview.contains("+6 more warnings"));
4198 assert!(snapshot
4199 .output_preview
4200 .contains(&format!("read \"{}\"", task.paths.stdout.display())));
4201 assert!(!snapshot.output_preview.contains("tail -n +"));
4202 assert!(snapshot.output_truncated);
4203 }
4204
4205 #[test]
4206 fn registry_marker_reports_semantic_and_byte_drops_once() {
4207 let registry = BgTaskRegistry::default();
4208 let dir = tempfile::tempdir().unwrap();
4209 registry.set_compressor_with_exit_code(move |_command, _output, _exit_code| {
4210 let mut dropped = BTreeMap::new();
4211 dropped.insert(DropClass::Error, 1);
4212 CompressionResult::with_class_drops(
4213 format!("HEAD-SIGNAL\n{}TAIL-SIGNAL", "middle\n".repeat(8_000)),
4214 dropped,
4215 )
4216 });
4217 let (task_id, _task) =
4218 insert_terminal_piped_task(®istry, &dir, "custom-tool", "raw", "", true);
4219
4220 let snapshot = registry
4221 .status(
4222 &task_id,
4223 "session",
4224 None,
4225 Some(dir.path()),
4226 RUNNING_OUTPUT_PREVIEW_BYTES,
4227 )
4228 .unwrap();
4229
4230 assert_eq!(snapshot.output_preview.matches("full output:").count(), 1);
4231 assert!(snapshot.output_preview.contains("+1 more error"));
4232 assert!(snapshot.output_preview.contains("truncated output"));
4233 assert!(snapshot.output_preview.contains("HEAD-SIGNAL"));
4234 assert!(snapshot.output_preview.contains("TAIL-SIGNAL"));
4235 assert!(!snapshot.output_preview.contains("...<truncated"));
4236 assert!(snapshot.output_truncated);
4237 }
4238
4239 #[test]
4240 fn cargo_stderr_class_drops_name_both_capture_paths() {
4241 let registry = BgTaskRegistry::default();
4242 let dir = tempfile::tempdir().unwrap();
4243 let filter_registry = crate::compress::toml_filter::FilterRegistry::default();
4244 registry.set_compressor_with_exit_code(move |command, output, exit_code| {
4245 crate::compress::compress_with_registry_exit_code(
4246 command,
4247 &output,
4248 exit_code,
4249 &filter_registry,
4250 )
4251 });
4252 let stderr = (0..22)
4253 .map(|index| {
4254 format!(
4255 "error: cargo failure {index}\n --> src/lib.rs:{}:1\n |\n{} | boom\n",
4256 index + 1,
4257 index + 1
4258 )
4259 })
4260 .collect::<Vec<_>>()
4261 .join("\n");
4262 let (task_id, task) = insert_terminal_piped_task(
4263 ®istry,
4264 &dir,
4265 "cargo check",
4266 "Finished dev [unoptimized] target(s) in 0.01s\n",
4267 &stderr,
4268 true,
4269 );
4270
4271 let snapshot = registry
4272 .status(
4273 &task_id,
4274 "session",
4275 None,
4276 Some(dir.path()),
4277 RUNNING_OUTPUT_PREVIEW_BYTES,
4278 )
4279 .unwrap();
4280
4281 assert!(snapshot.output_preview.contains("+2 more errors"));
4282 assert!(snapshot
4283 .output_preview
4284 .contains(&format!("read \"{}\"", task.paths.stdout.display())));
4285 assert!(snapshot
4286 .output_preview
4287 .contains(&format!("read \"{}\"", task.paths.stderr.display())));
4288 assert!(!snapshot.output_preview.contains("tail -n +"));
4289 }
4290
4291 #[test]
4292 fn over_ceiling_structured_json_uses_pointer_not_partial_json() {
4293 let registry = BgTaskRegistry::default();
4294 let dir = tempfile::tempdir().unwrap();
4295 let body = format!("{{\"body\":\"{}\"}}", "x".repeat(60 * 1024));
4296 let (task_id, task) = insert_terminal_piped_task(
4297 ®istry,
4298 &dir,
4299 "cd /repo && gh pr view 123 --json body",
4300 &body,
4301 "",
4302 true,
4303 );
4304
4305 let snapshot = registry
4306 .status(
4307 &task_id,
4308 "session",
4309 None,
4310 Some(dir.path()),
4311 RUNNING_OUTPUT_PREVIEW_BYTES,
4312 )
4313 .unwrap();
4314
4315 assert!(snapshot.output_preview.starts_with("[JSON output "));
4316 assert!(snapshot
4317 .output_preview
4318 .contains(&task.paths.stdout.display().to_string()));
4319 assert!(!snapshot.output_preview.contains(&"x".repeat(1024)));
4320 assert!(snapshot.output_truncated);
4321 }
4322
4323 #[test]
4324 fn toml_strip_tail_cap_uses_full_output_hint_not_offset_hint() {
4325 let registry = BgTaskRegistry::default();
4326 let dir = tempfile::tempdir().unwrap();
4327 let filter_registry = crate::compress::toml_filter::build_registry(
4328 crate::compress::builtin_filters::ALL,
4329 None,
4330 None,
4331 );
4332 registry.set_compressor_with_exit_code(move |command, output, exit_code| {
4333 crate::compress::compress_with_registry_exit_code(
4334 command,
4335 &output,
4336 exit_code,
4337 &filter_registry,
4338 )
4339 });
4340 let stdout = format!(
4341 "make[1]: Entering directory `/tmp`\n{}",
4342 (0..100)
4343 .map(|index| format!("compile line {index}"))
4344 .collect::<Vec<_>>()
4345 .join("\n")
4346 );
4347 let (task_id, task) =
4348 insert_terminal_piped_task(®istry, &dir, "make all", &stdout, "", true);
4349
4350 let snapshot = registry
4351 .status(
4352 &task_id,
4353 "session",
4354 None,
4355 Some(dir.path()),
4356 RUNNING_OUTPUT_PREVIEW_BYTES,
4357 )
4358 .unwrap();
4359
4360 assert!(snapshot.output_preview.contains("compile line 99"));
4361 assert!(snapshot.output_preview.contains(&format!(
4362 "full output: read \"{}\"",
4363 task.paths.stdout.display()
4364 )));
4365 assert!(!snapshot
4366 .output_preview
4367 .contains(&format!("read \"{}\"", task.paths.stderr.display())));
4368 assert!(!snapshot.output_preview.contains("tail -n +"));
4369 }
4370
4371 #[test]
4372 fn compressed_false_raw_passthrough_uses_wider_head_tail_cap() {
4373 let registry = BgTaskRegistry::default();
4374 let dir = tempfile::tempdir().unwrap();
4375 let output = format!("RAW-HEAD\n{}RAW-TAIL\n", "raw-middle\n".repeat(8_000));
4376 let (task_id, task) =
4377 insert_terminal_piped_task(®istry, &dir, "cat raw.log", &output, "RAW-ERR\n", false);
4378
4379 let snapshot = registry
4380 .status(
4381 &task_id,
4382 "session",
4383 None,
4384 Some(dir.path()),
4385 RUNNING_OUTPUT_PREVIEW_BYTES,
4386 )
4387 .unwrap();
4388
4389 assert!(snapshot.output_preview.contains("RAW-HEAD"));
4390 assert!(snapshot.output_preview.contains("RAW-TAIL"));
4391 assert!(snapshot.output_preview.contains("truncated output"));
4392 assert!(snapshot
4393 .output_preview
4394 .contains(&format!("read \"{}\"", task.paths.stdout.display())));
4395 assert!(snapshot
4396 .output_preview
4397 .contains(&format!("read \"{}\"", task.paths.stderr.display())));
4398 assert!(!snapshot.output_preview.contains("tail -n +"));
4399 assert!(snapshot.output_preview.len() > 16 * 1024);
4400 assert!(snapshot.output_truncated);
4401 }
4402
4403 #[test]
4404 fn pty_terminal_snapshot_bypasses_line_compression() {
4405 let registry = BgTaskRegistry::default();
4406 let dir = tempfile::tempdir().unwrap();
4407 let calls = Arc::new(AtomicUsize::new(0));
4408 let calls_for_closure = Arc::clone(&calls);
4409 registry.set_compressor_with_exit_code(move |_command, output, _exit_code| {
4410 calls_for_closure.fetch_add(1, Ordering::SeqCst);
4411 CompressionResult::new(output)
4412 });
4413 let (task_id, _task) = insert_terminal_pty_task(®istry, &dir, "raw\u{1b}[31m pty bytes");
4414
4415 let snapshot = registry
4416 .status(
4417 &task_id,
4418 "session",
4419 None,
4420 Some(dir.path()),
4421 RUNNING_OUTPUT_PREVIEW_BYTES,
4422 )
4423 .unwrap();
4424
4425 assert_eq!(snapshot.info.mode, BgMode::Pty);
4426 assert_eq!(snapshot.output_preview, "");
4427 assert_eq!(calls.load(Ordering::SeqCst), 0);
4428 }
4429
4430 #[test]
4431 fn pty_dimensions_are_persisted_and_returned_in_snapshot() {
4432 let registry = BgTaskRegistry::default();
4433 let dir = tempfile::tempdir().unwrap();
4434 let task_id = registry
4435 .spawn_pty(
4436 QUICK_SUCCESS_COMMAND,
4437 "session".to_string(),
4438 dir.path().to_path_buf(),
4439 HashMap::new(),
4440 Some(Duration::from_secs(30)),
4441 dir.path().to_path_buf(),
4442 10,
4443 true,
4444 false,
4445 Some(dir.path().to_path_buf()),
4446 50,
4447 120,
4448 )
4449 .unwrap();
4450
4451 let paths = task_paths(dir.path(), "session", &task_id);
4452 let metadata = read_task(&paths.json).unwrap();
4453 assert_eq!(
4454 metadata.schema_version,
4455 crate::bash_background::persistence::SCHEMA_VERSION
4456 );
4457 assert_eq!(metadata.mode, BgMode::Pty);
4458 assert_eq!(metadata.pty_rows, Some(50));
4459 assert_eq!(metadata.pty_cols, Some(120));
4460
4461 let snapshot = registry
4462 .status(&task_id, "session", None, Some(dir.path()), 1024)
4463 .unwrap();
4464 assert_eq!(snapshot.pty_rows, Some(50));
4465 assert_eq!(snapshot.pty_cols, Some(120));
4466 }
4467
4468 fn spawn_dead_child() -> std::process::Child {
4473 #[cfg(unix)]
4474 let mut cmd = std::process::Command::new("true");
4475 #[cfg(windows)]
4476 let mut cmd = {
4477 let mut c = std::process::Command::new("cmd");
4478 c.args(["/c", "exit", "0"]);
4479 c
4480 };
4481 cmd.stdin(std::process::Stdio::null());
4482 cmd.stdout(std::process::Stdio::null());
4483 cmd.stderr(std::process::Stdio::null());
4484 let mut child = cmd.spawn().expect("spawn replacement child for reap test");
4485 let started = Instant::now();
4494 loop {
4495 match child.try_wait() {
4496 Ok(Some(_)) => break,
4497 Ok(None) => {
4498 if started.elapsed() > Duration::from_secs(5) {
4499 panic!("dead-child stand-in did not exit within 5s");
4500 }
4501 std::thread::sleep(Duration::from_millis(10));
4502 }
4503 Err(error) => panic!("dead-child try_wait failed: {error}"),
4504 }
4505 }
4506 child
4507 }
4508
/// Acknowledging a task id must persist `completion_delivered` to the task's
/// JSON file even after the in-memory completion queue has been cleared, and
/// a fresh registry replaying the session must not surface the completion.
#[test]
fn ack_marks_delivered_even_when_completion_was_already_consumed_locally() {
    let registry = BgTaskRegistry::default();
    let dir = tempfile::tempdir().unwrap();
    let root = dir.path().to_path_buf();
    let session = "session";

    let task_id = registry
        .spawn(
            LONG_RUNNING_COMMAND,
            session.to_string(),
            root.clone(),
            HashMap::new(),
            Some(Duration::from_secs(30)),
            root.clone(),
            10,
            true,
            false,
            Some(root),
        )
        .unwrap();
    registry
        .kill_with_status(&task_id, session, BgTaskStatus::Killed)
        .unwrap();

    // The kill is expected to have produced exactly one completion.
    assert_eq!(
        registry.drain_completions_for_session(Some(session)).len(),
        1
    );

    // Empty the in-memory queue so the ack below finds nothing queued locally.
    registry.inner.completions.lock().unwrap().clear();

    assert_eq!(
        registry.ack_completions_for_session(Some(session), std::slice::from_ref(&task_id)),
        vec![task_id.clone()]
    );
    assert!(registry
        .drain_completions_for_session(Some(session))
        .is_empty());

    // The delivered flag must be visible in the persisted metadata.
    let persisted = read_task(&task_paths(dir.path(), session, &task_id).json).unwrap();
    assert!(persisted.completion_delivered);

    // A brand-new registry replaying the same directory sees nothing pending.
    let replayed = BgTaskRegistry::default();
    replayed
        .replay_session_inner(dir.path(), session, None)
        .unwrap();
    assert!(replayed
        .drain_completions_for_session(Some(session))
        .is_empty());
}
4561
/// Registering a watch against a task id the registry does not know must
/// return the `"task_not_found"` error.
#[test]
fn register_watch_rejects_unknown_task() {
    let registry = BgTaskRegistry::default();
    let pattern = WatchPattern::Substring("READY".into());

    let outcome = registry.register_watch("missing-task".to_string(), pattern, true);

    assert_eq!(outcome, Err("task_not_found"));
}
4574
/// Registering a watch on a task that is already terminal must emit a
/// `BashPatternMatch` frame for output already on disk, leave no active
/// watch behind, and result in `completion_delivered` being persisted.
#[test]
fn register_watch_on_terminal_task_scans_existing_output() {
    // Collect every pushed frame so the test can inspect them afterwards.
    let frames = Arc::new(Mutex::new(Vec::new()));
    let recorder = Arc::clone(&frames);
    let callback: Box<dyn Fn(PushFrame) + Send + Sync> = Box::new(move |frame| {
        recorder.lock().unwrap().push(frame);
    });
    let sender: crate::context::ProgressSender = Arc::new(callback);
    let registry = BgTaskRegistry::new(Arc::new(Mutex::new(Some(sender))));
    let dir = tempfile::tempdir().unwrap();
    let root = dir.path().to_path_buf();
    let task_id = registry
        .spawn(
            LONG_RUNNING_COMMAND,
            "session".to_string(),
            root.clone(),
            HashMap::new(),
            Some(Duration::from_secs(30)),
            root.clone(),
            10,
            true,
            false,
            Some(root),
        )
        .unwrap();
    registry
        .inner
        .shutdown
        .store(true, std::sync::atomic::Ordering::SeqCst);

    // Plant the text the watch will look for, then make the task terminal.
    let task = registry.task_for_session(&task_id, "session").unwrap();
    std::fs::write(&task.paths.stdout, "READY\n").unwrap();
    registry
        .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
        .unwrap();

    // Discard anything recorded so far; only the watch registration matters.
    frames.lock().unwrap().clear();
    registry.inner.completions.lock().unwrap().clear();

    registry
        .register_watch(
            task_id.clone(),
            WatchPattern::Substring("READY".into()),
            true,
        )
        .unwrap();

    let frames = frames.lock().unwrap();
    let frame = frames
        .iter()
        .filter_map(|candidate| match candidate {
            PushFrame::BashPatternMatch(inner) => Some(inner),
            _ => None,
        })
        .next()
        .expect("terminal watch registration should emit pattern frame");
    assert_eq!(frame.reason, "pattern_match");
    assert_eq!(frame.task_id, task_id);
    assert_eq!(frame.session_id, "session");
    assert_eq!(frame.match_text, "READY");
    assert_eq!(frame.match_offset, 0);
    assert_eq!(registry.active_watch_count(&frame.task_id), 0);

    let metadata = read_task(&task.paths.json).unwrap();
    assert!(metadata.completion_delivered);
}
4636
/// A killed task whose completion has been drained and acknowledged must be
/// dropped from the registry by `cleanup_finished` with a zero threshold.
#[test]
fn cleanup_finished_removes_terminal_tasks_older_than_threshold() {
    let registry = BgTaskRegistry::default();
    let dir = tempfile::tempdir().unwrap();
    let root = dir.path().to_path_buf();
    let task_id = registry
        .spawn(
            QUICK_SUCCESS_COMMAND,
            "session".to_string(),
            root.clone(),
            HashMap::new(),
            Some(Duration::from_secs(30)),
            root.clone(),
            10,
            true,
            false,
            Some(root),
        )
        .unwrap();
    registry
        .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
        .unwrap();

    // Deliver and acknowledge the single completion before cleaning up.
    let completions = registry.drain_completions_for_session(Some("session"));
    assert_eq!(completions.len(), 1);
    let acknowledged =
        registry.ack_completions_for_session(Some("session"), std::slice::from_ref(&task_id));
    assert_eq!(acknowledged, vec![task_id.clone()]);

    registry.cleanup_finished(Duration::ZERO);

    let no_tasks_left = registry.inner.tasks.lock().unwrap().is_empty();
    assert!(no_tasks_left);
}
4669
/// A killed task whose completion was never drained or acknowledged must
/// still be tracked after `cleanup_finished`, even with a zero threshold.
#[test]
fn cleanup_finished_retains_undelivered_terminals() {
    let registry = BgTaskRegistry::default();
    let dir = tempfile::tempdir().unwrap();
    let root = dir.path().to_path_buf();
    let task_id = registry
        .spawn(
            QUICK_SUCCESS_COMMAND,
            "session".to_string(),
            root.clone(),
            HashMap::new(),
            Some(Duration::from_secs(30)),
            root.clone(),
            10,
            true,
            false,
            Some(root),
        )
        .unwrap();
    registry
        .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
        .unwrap();

    registry.cleanup_finished(Duration::ZERO);

    let still_tracked = registry.inner.tasks.lock().unwrap().contains_key(&task_id);
    assert!(still_tracked);
}
4696
/// Drives `reap_child` twice against a task whose child has exited and whose
/// exit marker is missing.
///
/// The first call is expected to release the child handle and mark the task
/// detached while leaving the status `Running`; the second call is expected
/// to mark the task `Failed` with the reason
/// `"process exited without exit marker"`.
#[test]
fn reap_child_marks_failed_when_child_exits_without_exit_marker() {
    let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
    let dir = tempfile::tempdir().unwrap();
    let root = dir.path().to_path_buf();
    let task_id = registry
        .spawn(
            QUICK_SUCCESS_COMMAND,
            "session".to_string(),
            root.clone(),
            HashMap::new(),
            Some(Duration::from_secs(30)),
            root.clone(),
            10,
            true,
            false,
            Some(root),
        )
        .unwrap();

    let task = registry.task_for_session(&task_id, "session").unwrap();

    // True once the piped child has exited, or when there is no piped child
    // handle to ask. The state lock is released before returning.
    let child_is_gone = || {
        let mut state = task.state.lock().unwrap();
        let gone = match &mut state.runtime {
            TaskRuntime::Piped(Some(child)) => matches!(child.try_wait(), Ok(Some(_))),
            _ => true,
        };
        gone
    };
    let started = Instant::now();
    while !child_is_gone() {
        assert!(
            started.elapsed() < Duration::from_secs(5),
            "child should exit quickly"
        );
        std::thread::sleep(Duration::from_millis(20));
    }

    // Raise the shutdown flag and pause.
    // NOTE(review): the pause presumably lets background polling observe the
    // flag and stop before the state is rewritten below; confirm.
    registry
        .inner
        .shutdown
        .store(true, std::sync::atomic::Ordering::SeqCst);
    std::thread::sleep(Duration::from_millis(550));

    // Best effort: the marker may or may not exist at this point.
    let _ = std::fs::remove_file(&task.paths.exit);

    // Rewind the task to a Running state with a dead child attached.
    {
        let mut state = task.state.lock().unwrap();
        let metadata = &mut state.metadata;
        metadata.status = BgTaskStatus::Running;
        metadata.status_reason = None;
        metadata.exit_code = None;
        metadata.finished_at = None;
        metadata.duration_ms = None;
        crate::bash_background::persistence::write_task(&task.paths.json, metadata)
            .expect("persist reset Running metadata for reap_child test");
        if matches!(state.runtime, TaskRuntime::Piped(None)) {
            state.runtime = TaskRuntime::Piped(Some(spawn_dead_child()));
        }
    }
    *task.terminal_at.lock().unwrap() = None;

    assert!(
        task.is_running(),
        "precondition: metadata.status == Running"
    );
    assert!(
        !task.paths.exit.exists(),
        "precondition: exit marker absent"
    );

    // First pass: handle released, task detached, status untouched.
    registry.reap_child(&task);

    {
        let state = task.state.lock().unwrap();
        assert_eq!(
            state.metadata.status,
            BgTaskStatus::Running,
            "first reap must leave status Running while waiting one pass for marker"
        );
        assert_eq!(
            state.metadata.status_reason, None,
            "first reap must not record a failure reason"
        );
        assert!(
            matches!(state.runtime, TaskRuntime::Piped(None)),
            "child handle must be released after first reap"
        );
        assert!(
            state.detached,
            "task must be marked detached after first reap"
        );
    }

    // Second pass: still no marker, so the task must become Failed.
    registry.reap_child(&task);

    let state = task.state.lock().unwrap();
    assert!(
        state.metadata.status.is_terminal(),
        "second reap must transition to terminal when PID dead and no marker. Got status={:?}",
        state.metadata.status
    );
    assert_eq!(
        state.metadata.status,
        BgTaskStatus::Failed,
        "must specifically be Failed (not Killed): status={:?}",
        state.metadata.status
    );
    assert_eq!(
        state.metadata.status_reason.as_deref(),
        Some("process exited without exit marker"),
        "reason must match replay path's wording: {:?}",
        state.metadata.status_reason
    );
    assert!(
        matches!(state.runtime, TaskRuntime::Piped(None)),
        "child handle must stay released after second reap"
    );
    assert!(
        state.detached,
        "task must remain detached after second reap"
    );
}
4874
/// When the exit marker is present, a single `reap_child` call must release
/// the child handle and mark the task detached while leaving the status
/// `Running`.
#[test]
fn reap_child_preserves_running_when_exit_marker_exists() {
    let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
    let dir = tempfile::tempdir().unwrap();
    let root = dir.path().to_path_buf();
    let task_id = registry
        .spawn(
            QUICK_SUCCESS_COMMAND,
            "session".to_string(),
            root.clone(),
            HashMap::new(),
            Some(Duration::from_secs(30)),
            root.clone(),
            10,
            true,
            false,
            Some(root),
        )
        .unwrap();

    let task = registry.task_for_session(&task_id, "session").unwrap();

    // True once the child has exited (or no piped handle remains) and the
    // exit marker is on disk. The state lock is dropped before the filesystem
    // check.
    let exited_with_marker = || {
        let mut state = task.state.lock().unwrap();
        let exited = match &mut state.runtime {
            TaskRuntime::Piped(Some(child)) => matches!(child.try_wait(), Ok(Some(_))),
            _ => true,
        };
        drop(state);
        exited && task.paths.exit.exists()
    };
    let started = Instant::now();
    while !exited_with_marker() {
        assert!(
            started.elapsed() < Duration::from_secs(5),
            "child should exit and write marker quickly"
        );
        std::thread::sleep(Duration::from_millis(20));
    }

    // Raise the shutdown flag and pause.
    // NOTE(review): the pause presumably lets background polling observe the
    // flag and stop before the state is rewritten below; confirm.
    registry
        .inner
        .shutdown
        .store(true, std::sync::atomic::Ordering::SeqCst);
    std::thread::sleep(Duration::from_millis(550));

    // Rewind the in-memory status to Running and attach a dead child if the
    // original handle has already been taken.
    {
        let mut state = task.state.lock().unwrap();
        state.metadata.status = BgTaskStatus::Running;
        state.metadata.status_reason = None;
        let handle_missing = matches!(state.runtime, TaskRuntime::Piped(None));
        if handle_missing {
            state.runtime = TaskRuntime::Piped(Some(spawn_dead_child()));
        }
    }
    *task.terminal_at.lock().unwrap() = None;

    // Guarantee the marker is present for the call under test.
    let marker_present = task.paths.exit.exists();
    if !marker_present {
        std::fs::write(&task.paths.exit, "0").expect("write replacement exit marker");
    }

    registry.reap_child(&task);

    let state = task.state.lock().unwrap();
    assert!(
        matches!(state.runtime, TaskRuntime::Piped(None)),
        "child handle still released even when marker exists"
    );
    assert!(
        state.detached,
        "task still marked detached even when marker exists"
    );
    assert_eq!(
        state.metadata.status,
        BgTaskStatus::Running,
        "reap_child must defer to poll_task when marker exists"
    );
}
4976
/// Returns the trimmed output of `ps -o stat= -p <pid>`.
///
/// Yields `None` when `ps` cannot be run, exits unsuccessfully, or prints
/// nothing but whitespace.
#[cfg(unix)]
fn pid_stat(pid: u32) -> Option<String> {
    let pid_arg = pid.to_string();
    let ps = std::process::Command::new("ps")
        .args(["-o", "stat=", "-p", pid_arg.as_str()])
        .output()
        .ok()
        .filter(|out| out.status.success())?;

    let stat = String::from_utf8_lossy(&ps.stdout).trim().to_string();
    (!stat.is_empty()).then_some(stat)
}
4996
4997 #[cfg(unix)]
4999 fn is_zombie(pid: u32) -> bool {
5000 pid_stat(pid).is_some_and(|stat| stat.starts_with('Z'))
5001 }
5002
5003 #[cfg(unix)]
5009 fn spawn_unreaped_zombie() -> std::process::Child {
5010 let child = std::process::Command::new("true")
5011 .stdin(std::process::Stdio::null())
5012 .stdout(std::process::Stdio::null())
5013 .stderr(std::process::Stdio::null())
5014 .spawn()
5015 .expect("spawn zombie stand-in");
5016 let pid = child.id();
5017 let started = Instant::now();
5018 while !is_zombie(pid) {
5019 assert!(
5020 started.elapsed() < Duration::from_secs(5),
5021 "stand-in child should become a zombie within 5s"
5022 );
5023 std::thread::sleep(Duration::from_millis(10));
5024 }
5025 child
5027 }
5028
/// With an exit marker on disk and an unreaped zombie attached as the task's
/// child, `poll_task` must release the handle, make the task terminal, and
/// leave no zombie behind.
#[cfg(unix)]
#[test]
fn finalize_from_marker_reaps_child_no_zombie() {
    let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
    let dir = tempfile::tempdir().unwrap();
    let root = dir.path().to_path_buf();
    let task_id = registry
        .spawn(
            QUICK_SUCCESS_COMMAND,
            "session".to_string(),
            root.clone(),
            HashMap::new(),
            Some(Duration::from_secs(30)),
            root.clone(),
            10,
            true,
            false,
            Some(root),
        )
        .unwrap();

    // Raise the shutdown flag and pause.
    // NOTE(review): the pause presumably lets background polling stop before
    // the state is rewritten below; confirm.
    registry
        .inner
        .shutdown
        .store(true, std::sync::atomic::Ordering::SeqCst);
    std::thread::sleep(Duration::from_millis(550));

    let task = registry.task_for_session(&task_id, "session").unwrap();

    // Wait for the exit marker of the original command.
    let started = Instant::now();
    loop {
        if task.paths.exit.exists() {
            break;
        }
        assert!(
            started.elapsed() < Duration::from_secs(5),
            "exit marker should land quickly for `true`"
        );
        std::thread::sleep(Duration::from_millis(20));
    }

    // Rewind the task to Running and swap in a zombie child handle.
    let zombie_pid = {
        let mut state = task.state.lock().unwrap();
        let metadata = &mut state.metadata;
        metadata.status = BgTaskStatus::Running;
        metadata.status_reason = None;
        metadata.exit_code = None;
        metadata.finished_at = None;
        metadata.duration_ms = None;
        crate::bash_background::persistence::write_task(&task.paths.json, metadata)
            .expect("persist reset Running metadata");
        let zombie = spawn_unreaped_zombie();
        let pid = zombie.id();
        state.runtime = TaskRuntime::Piped(Some(zombie));
        pid
    };
    *task.terminal_at.lock().unwrap() = None;

    assert!(
        is_zombie(zombie_pid),
        "precondition: stand-in child {zombie_pid} must be a zombie before finalize"
    );

    registry.poll_task(&task).unwrap();

    {
        let state = task.state.lock().unwrap();
        assert!(
            matches!(state.runtime, TaskRuntime::Piped(None)),
            "child handle must be released after marker finalize"
        );
        assert!(
            state.metadata.status.is_terminal(),
            "task must be terminal after marker finalize: {:?}",
            state.metadata.status
        );
    }

    assert!(
        !is_zombie(zombie_pid),
        "issue #91 regression: child {zombie_pid} left as <defunct> zombie \
         after the exit-marker terminal transition"
    );
}
5132
/// With an exit marker on disk and an unreaped zombie attached as the task's
/// child, `kill_with_status` must release the handle, make the task terminal,
/// and leave no zombie behind.
#[cfg(unix)]
#[test]
fn kill_with_existing_marker_reaps_child_no_zombie() {
    let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
    let dir = tempfile::tempdir().unwrap();
    let root = dir.path().to_path_buf();
    let task_id = registry
        .spawn(
            QUICK_SUCCESS_COMMAND,
            "session".to_string(),
            root.clone(),
            HashMap::new(),
            Some(Duration::from_secs(30)),
            root.clone(),
            10,
            true,
            false,
            Some(root),
        )
        .unwrap();

    // Raise the shutdown flag and pause.
    // NOTE(review): the pause presumably lets background polling stop before
    // the state is rewritten below; confirm.
    registry
        .inner
        .shutdown
        .store(true, std::sync::atomic::Ordering::SeqCst);
    std::thread::sleep(Duration::from_millis(550));

    let task = registry.task_for_session(&task_id, "session").unwrap();

    // Wait for the exit marker of the original command.
    let started = Instant::now();
    loop {
        if task.paths.exit.exists() {
            break;
        }
        assert!(
            started.elapsed() < Duration::from_secs(5),
            "exit marker should land quickly for `true`"
        );
        std::thread::sleep(Duration::from_millis(20));
    }

    // Rewind the task to Running and swap in a zombie child handle.
    let zombie_pid = {
        let mut state = task.state.lock().unwrap();
        let metadata = &mut state.metadata;
        metadata.status = BgTaskStatus::Running;
        metadata.status_reason = None;
        metadata.exit_code = None;
        metadata.finished_at = None;
        metadata.duration_ms = None;
        crate::bash_background::persistence::write_task(&task.paths.json, metadata)
            .expect("persist reset Running metadata");
        let zombie = spawn_unreaped_zombie();
        let pid = zombie.id();
        state.runtime = TaskRuntime::Piped(Some(zombie));
        pid
    };
    *task.terminal_at.lock().unwrap() = None;

    assert!(
        is_zombie(zombie_pid),
        "precondition: stand-in child {zombie_pid} must be a zombie before kill"
    );

    registry
        .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
        .expect("kill should succeed");

    {
        let state = task.state.lock().unwrap();
        assert!(
            matches!(state.runtime, TaskRuntime::Piped(None)),
            "child handle must be released after marker-aware kill"
        );
        assert!(state.metadata.status.is_terminal());
    }

    assert!(
        !is_zombie(zombie_pid),
        "issue #91 regression: child {zombie_pid} left as <defunct> zombie \
         after a marker-aware kill"
    );
}
5213
/// A freshly spawned long-running task must survive `cleanup_finished`, even
/// with a zero threshold. The task is killed at the end on a best-effort
/// basis.
#[test]
fn cleanup_finished_keeps_running_tasks() {
    let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
    let dir = tempfile::tempdir().unwrap();
    let root = dir.path().to_path_buf();
    let task_id = registry
        .spawn(
            LONG_RUNNING_COMMAND,
            "session".to_string(),
            root.clone(),
            HashMap::new(),
            Some(Duration::from_secs(30)),
            root.clone(),
            10,
            true,
            false,
            Some(root),
        )
        .unwrap();

    registry.cleanup_finished(Duration::ZERO);

    let still_tracked = registry.inner.tasks.lock().unwrap().contains_key(&task_id);
    assert!(still_tracked);

    // The result of the kill is deliberately ignored.
    let _ = registry.kill(&task_id, "session");
}
5238
/// Waits for `path` to exist, checking every 100 ms, and returns its contents
/// as a string.
///
/// # Panics
///
/// Panics if the path still does not exist after 30 seconds, or if the file
/// cannot be read once it exists.
#[cfg(windows)]
fn wait_for_file(path: &Path) -> String {
    let limit = Duration::from_secs(30);
    let began = Instant::now();
    while !path.exists() {
        assert!(
            began.elapsed() < limit,
            "timed out waiting for {}",
            path.display()
        );
        std::thread::sleep(Duration::from_millis(100));
    }
    fs::read_to_string(path).expect("read file")
}
5254
/// Spawns `command` under the `"session"` session in a fresh registry rooted
/// at a new temporary directory.
///
/// Returns the registry, the temporary directory (so the caller controls how
/// long it lives), and the id of the spawned task.
#[cfg(windows)]
fn spawn_windows_registry_command(
    command: &str,
) -> (BgTaskRegistry, tempfile::TempDir, String) {
    let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
    let dir = tempfile::tempdir().unwrap();
    let root = dir.path().to_path_buf();
    let spawned = registry.spawn(
        command,
        "session".to_string(),
        root.clone(),
        HashMap::new(),
        Some(Duration::from_secs(30)),
        root.clone(),
        10,
        false,
        false,
        Some(root),
    );
    let task_id = spawned.unwrap();
    (registry, dir, task_id)
}
5277
/// `cmd /c exit 0` must produce an exit marker whose trimmed content is `0`.
#[cfg(windows)]
#[test]
fn windows_spawn_writes_exit_marker_for_zero_exit() {
    let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 0");
    let marker = registry.task_exit_path(&task_id, "session").unwrap();

    let content = wait_for_file(&marker);

    assert_eq!(content.trim(), "0");
}
5288
/// `cmd /c exit 42` must produce an exit marker whose trimmed content is `42`.
#[cfg(windows)]
#[test]
fn windows_spawn_writes_exit_marker_for_nonzero_exit() {
    let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 42");
    let marker = registry.task_exit_path(&task_id, "session").unwrap();

    let content = wait_for_file(&marker);

    assert_eq!(content.trim(), "42");
}
5299
/// After `cmd /c echo hello` has written its exit marker, the task's stdout
/// capture file must contain `hello`.
#[cfg(windows)]
#[test]
fn windows_spawn_captures_stdout_to_disk() {
    let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c echo hello");
    let task = registry.task_for_session(&task_id, "session").unwrap();
    let (stdout_path, exit_path) = (task.paths.stdout.clone(), task.paths.exit.clone());

    // Only the marker's appearance matters here; its content is discarded.
    let _ = wait_for_file(&exit_path);

    let stdout = fs::read_to_string(stdout_path).expect("read stdout");
    assert!(stdout.contains("hello"), "stdout was {stdout:?}");
}
5313
/// When the lookup closure resolves both `pwsh.exe` and `powershell.exe`,
/// the first shell candidate must be `WindowsShell::Pwsh` with binary
/// `pwsh.exe`.
#[cfg(windows)]
#[test]
fn windows_spawn_uses_pwsh_when_available() {
    let candidates = crate::windows_shell::shell_candidates_with(
        |binary| {
            // Fake lookup: only the two PowerShell binaries resolve.
            let resolved = match binary {
                "pwsh.exe" => r"C:\pwsh\pwsh.exe",
                "powershell.exe" => r"C:\ps\powershell.exe",
                _ => return None,
            };
            Some(std::path::PathBuf::from(resolved))
        },
        || None,
    );

    let shell = candidates
        .first()
        .cloned()
        .expect("at least one candidate");
    assert_eq!(shell, crate::windows_shell::WindowsShell::Pwsh);
    assert_eq!(shell.binary().as_ref(), "pwsh.exe");
}
5332
/// The wrapper script generated for `WindowsShell::Cmd` must capture the exit
/// code into `CODE`, echo it to a temporary marker, `move /Y` it into place
/// with output sent to `nul`, exit with `%CODE%`, and quote both marker paths.
#[cfg(windows)]
#[test]
fn windows_shell_cmd_wrapper_writes_exit_marker_with_move() {
    let exit_path = Path::new(r"C:\Temp\bash-test.exit");
    let script =
        crate::windows_shell::WindowsShell::Cmd.wrapper_script("cmd /c exit 42", exit_path);
    let has = |needle: &str| script.contains(needle);

    assert!(
        has("set CODE=%ERRORLEVEL%"),
        "wrapper must capture exit code into CODE: {script}"
    );
    assert!(
        has("echo %CODE% >"),
        "wrapper must echo CODE to a temp marker file: {script}"
    );
    assert!(
        has("move /Y"),
        "wrapper must use atomic move to write the marker: {script}"
    );
    assert!(
        has("> nul"),
        "wrapper must redirect move output to nul: {script}"
    );
    assert!(
        has("exit /B %CODE%"),
        "wrapper must propagate the captured exit code: {script}"
    );

    // Both the temporary and the final marker path must appear quoted.
    assert!(has(r#""C:\Temp\bash-test.exit.tmp""#));
    assert!(has(r#""C:\Temp\bash-test.exit""#));
}
5375
/// `WindowsShell::Cmd.bg_command` must pass exactly `/D /S /C` followed by
/// the command body as its arguments.
#[cfg(windows)]
#[test]
fn windows_shell_cmd_bg_command_uses_minimal_cmd_flags() {
    let cmd = crate::windows_shell::WindowsShell::Cmd.bg_command("echo wrapped");

    // Arguments that are not valid UTF-8 are left out of the comparison.
    let args_strs: Vec<&str> = cmd.get_args().filter_map(|arg| arg.to_str()).collect();

    assert_eq!(
        args_strs,
        vec!["/D", "/S", "/C", "echo wrapped"],
        "Cmd::bg_command must prepend /D /S /C"
    );
}
5394
/// `WindowsShell::Pwsh.bg_command` must include both `-Command` and the
/// command body among its arguments.
#[cfg(windows)]
#[test]
fn windows_shell_pwsh_bg_command_uses_standard_args() {
    let cmd = crate::windows_shell::WindowsShell::Pwsh.bg_command("Get-Date");

    // Arguments that are not valid UTF-8 are left out of the checks.
    let args_strs: Vec<&str> = cmd.get_args().filter_map(|arg| arg.to_str()).collect();
    let has_arg = |wanted: &str| args_strs.contains(&wanted);

    assert!(
        has_arg("-Command"),
        "Pwsh::bg_command must use -Command: {args_strs:?}"
    );
    assert!(
        has_arg("Get-Date"),
        "Pwsh::bg_command must include the user command body"
    );
}
5414
// Disabled placeholder: `cfg(any())` has no predicates and is never true, so
// this function is excluded from every build configuration.
#[cfg(any())]
#[allow(dead_code)]
fn windows_cmd_wrapper_records_real_exit_code_disabled() {}
5448}