1use std::collections::{HashMap, HashSet, VecDeque};
2use std::fs;
3use std::io::Write;
4use std::path::{Path, PathBuf};
5use std::process::{Child, Command, Stdio};
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7#[cfg(unix)]
8use std::sync::OnceLock;
9use std::sync::{Arc, Mutex, RwLock};
10use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
11
12use rusqlite::Connection;
13use serde::Serialize;
14
15use crate::context::SharedProgressSender;
16use crate::harness::Harness;
17use crate::protocol::{BashCompletedFrame, BashLongRunningFrame, BashPatternMatchFrame, PushFrame};
18
19#[cfg(unix)]
20use std::os::unix::process::CommandExt;
21#[cfg(windows)]
22use std::os::windows::process::CommandExt;
23
24use super::buffer::{combine_streams, BgBuffer, TokenCountInput};
25use super::persistence::{
26 create_capture_file, delete_task_bundle, read_exit_marker, read_task, session_tasks_dir,
27 task_paths, unix_millis, update_task, write_kill_marker_if_absent, write_task, BgMode,
28 ExitMarker, PersistedTask, TaskPaths,
29};
30use super::process::is_process_alive;
31#[cfg(unix)]
32use super::process::terminate_pgid;
33#[cfg(windows)]
34use super::process::terminate_pid;
35use super::pty_process::spawn_pty_for_command;
36use super::pty_runtime::PtyRuntime;
37use super::watches::{PatternMatch, WatchPattern, WatchRegistry};
38use super::{BgTaskInfo, BgTaskStatus};
/// Timeout applied by `spawn` / `spawn_pty` when the caller passes `None` (30 minutes).
const DEFAULT_BG_TIMEOUT: Duration = Duration::from_secs(30 * 60);
/// 24 hours. NOTE(review): presumably the age after which a persisted task that still claims
/// to be running is treated as orphaned; the consumer is outside this view — confirm.
const STALE_RUNNING_AFTER: Duration = Duration::from_secs(24 * 60 * 60);
/// 24 hours. NOTE(review): presumably the grace period before persisted task bundles are
/// garbage-collected; the consumer is outside this view — confirm.
const PERSISTED_GC_GRACE: Duration = Duration::from_secs(24 * 60 * 60);
/// 30 days. NOTE(review): presumably the grace period before quarantined metadata is
/// garbage-collected; the consumer is outside this view — confirm.
const QUARANTINE_GC_GRACE: Duration = Duration::from_secs(30 * 24 * 60 * 60);

/// Preview size (in bytes) requested when `register_watch` builds a completion snapshot.
const BG_COMPLETION_PREVIEW_BYTES: usize = 300;
/// 128 KiB. NOTE(review): the name suggests a per-stream cap on bytes fed to token counting;
/// the consumer is outside this view — confirm.
const TOKENIZE_CAP_BYTES_PER_STREAM: usize = 128 * 1024;
59
/// Outcome record for a background task; instances are queued in
/// `RegistryInner::completions`.
///
/// Serialization omits `session_id` entirely and drops the preview, truncation flag, and
/// token fields when they hold their "empty" value (see the per-field serde attributes).
#[derive(Debug, Clone, Serialize)]
pub struct BgCompletion {
    /// Identifier of the task this completion describes.
    pub task_id: String,
    /// Session the task belongs to; never serialized.
    #[serde(skip_serializing)]
    pub session_id: String,
    /// Status recorded for the task.
    pub status: BgTaskStatus,
    /// Exit code, when one is known.
    pub exit_code: Option<i32>,
    /// Command line the task was started with.
    pub command: String,
    /// Output preview; omitted from serialized output when empty.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub output_preview: String,
    /// Omitted from serialized output when `false`.
    /// NOTE(review): presumably set when the preview is shorter than the full output — confirm.
    #[serde(default, skip_serializing_if = "is_false")]
    pub output_truncated: bool,
    /// Omitted when `None`.
    /// NOTE(review): presumably the token count before compression — confirm with the producer.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub original_tokens: Option<u32>,
    /// Omitted when `None`.
    /// NOTE(review): presumably the token count after compression — confirm with the producer.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub compressed_tokens: Option<u32>,
    /// Omitted from serialized output when `false`.
    /// NOTE(review): presumably set when token counting was skipped — confirm.
    #[serde(default, skip_serializing_if = "is_false")]
    pub tokens_skipped: bool,
}
95
/// Predicate for serde's `skip_serializing_if`: returns `true` exactly when the flag is `false`.
fn is_false(v: &bool) -> bool {
    matches!(*v, false)
}
99
/// Point-in-time view of a background task, as returned by `BgTaskRegistry::status`.
///
/// The fields of `info` are flattened into the same serialized object as the other fields.
#[derive(Debug, Clone, Serialize)]
pub struct BgTaskSnapshot {
    /// Core task info; serialized inline (flattened) rather than as a nested object.
    #[serde(flatten)]
    pub info: BgTaskInfo,
    /// Exit code, when one is known.
    pub exit_code: Option<i32>,
    /// PID of the child process, when known.
    pub child_pid: Option<u32>,
    /// Working directory of the task, as a string.
    pub workdir: String,
    /// Preview of the captured output.
    pub output_preview: String,
    /// NOTE(review): presumably `true` when `output_preview` does not hold the full output.
    pub output_truncated: bool,
    /// Path of the captured output file, if any.
    pub output_path: Option<String>,
    /// Path of the captured stderr file, if any.
    pub stderr_path: Option<String>,
    /// PTY row count; omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub pty_rows: Option<u16>,
    /// PTY column count; omitted from serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub pty_cols: Option<u16>,
}
116
/// Handle to the background-task registry.
///
/// Cloning is cheap: every clone shares the same `RegistryInner` through an `Arc`.
#[derive(Clone)]
pub struct BgTaskRegistry {
    /// Shared registry state.
    pub(crate) inner: Arc<RegistryInner>,
}
121
/// Shared state behind every `BgTaskRegistry` handle.
pub(crate) struct RegistryInner {
    /// In-memory tasks, keyed by task id.
    pub(crate) tasks: Mutex<HashMap<String, Arc<BgTask>>>,
    /// Queue of completion records.
    pub(crate) completions: Mutex<VecDeque<BgCompletion>>,
    /// Progress sender supplied at construction.
    pub(crate) progress_sender: SharedProgressSender,
    /// Starts `false`. NOTE(review): presumably set once the watchdog has been started — confirm.
    watchdog_started: AtomicBool,
    /// Starts `false`. NOTE(review): presumably signals background workers to stop — confirm.
    pub(crate) shutdown: AtomicBool,
    /// Whether long-running reminders are enabled (default `true`); set by
    /// `configure_long_running_reminders`.
    pub(crate) long_running_reminder_enabled: AtomicBool,
    /// Reminder interval in milliseconds (default `600_000`); set by
    /// `configure_long_running_reminders`.
    pub(crate) long_running_reminder_interval_ms: AtomicU64,
    /// Flipped to `true` by the first session replay so persisted-task GC is attempted once.
    persisted_gc_started: AtomicBool,
    /// Test-only counter. NOTE(review): presumably counts persisted-GC runs — confirm.
    #[cfg(test)]
    persisted_gc_runs: AtomicU64,
    /// Optional output compressor, called as `compressor(command, output)`.
    pub(crate) compressor: Mutex<Option<Box<dyn Fn(&str, String) -> String + Send + Sync>>>,
    /// Optional shared database connection used to mirror task metadata.
    pub(crate) db_pool: RwLock<Option<Arc<Mutex<Connection>>>>,
    /// Harness name used for database rows; mirroring is skipped (with a warning) while unset.
    pub(crate) db_harness: RwLock<Option<String>>,
    /// Sender half of a capacity-1 wake channel; a clone is handed to each PTY spawn.
    pub(crate) wake_tx: crossbeam_channel::Sender<()>,
    /// Receiver half of the wake channel.
    pub(crate) wake_rx: crossbeam_channel::Receiver<()>,
    /// Output watches registered against tasks.
    pub(crate) watch_registry: Mutex<WatchRegistry>,
}
145
/// In-memory record of one background task.
pub(crate) struct BgTask {
    /// Unique task identifier (also the key in `RegistryInner::tasks`).
    pub(crate) task_id: String,
    /// Session that owns the task.
    pub(crate) session_id: String,
    /// On-disk locations belonging to the task (metadata JSON, capture files, ...).
    pub(crate) paths: TaskPaths,
    /// Set to `Instant::now()` when `spawn` / `spawn_pty` builds the record.
    pub(crate) started: Instant,
    /// Starts `None`. NOTE(review): presumably the time of the last long-running reminder.
    pub(crate) last_reminder_at: Mutex<Option<Instant>>,
    /// Starts `None`. NOTE(review): presumably the time the task reached a terminal status.
    pub(crate) terminal_at: Mutex<Option<Instant>>,
    /// Mutable task state (metadata, runtime handle, output buffer).
    pub(crate) state: Mutex<BgTaskState>,
}
155
/// Live process handle for a task, by spawn mode.
///
/// The inner `Option` allows the handle to be absent; `write_pty` reports `task_exited` when
/// a PTY task has no runtime.
pub(crate) enum TaskRuntime {
    /// Child process whose stdout/stderr go to capture files.
    Piped(Option<Child>),
    /// Process attached to a pseudo-terminal.
    Pty(Option<PtyRuntime>),
}
160
/// Lock-protected mutable state of a `BgTask`.
pub(crate) struct BgTaskState {
    /// Persisted metadata mirrored in memory.
    pub(crate) metadata: PersistedTask,
    /// Live process handle, if any.
    pub(crate) runtime: TaskRuntime,
    /// `false` for freshly spawned tasks.
    /// NOTE(review): presumably `true` for tasks rehydrated without a live handle — confirm.
    pub(crate) detached: bool,
    /// `false` at spawn. NOTE(review): presumably set once the child's exit has been seen.
    pub(crate) child_exit_observed: bool,
    /// Reader over the task's capture file(s).
    pub(crate) buffer: BgBuffer,
    /// `None` at spawn. NOTE(review): presumably a terminal status to apply in place of the
    /// observed one (e.g. after a kill or timeout) — confirm against the poll logic.
    pub(crate) pending_terminal_override: Option<BgTaskStatus>,
}
180
181impl BgTaskRegistry {
182 pub fn new(progress_sender: SharedProgressSender) -> Self {
183 let (wake_tx, wake_rx) = crossbeam_channel::bounded(1);
184 Self {
185 inner: Arc::new(RegistryInner {
186 tasks: Mutex::new(HashMap::new()),
187 completions: Mutex::new(VecDeque::new()),
188 progress_sender,
189 watchdog_started: AtomicBool::new(false),
190 shutdown: AtomicBool::new(false),
191 long_running_reminder_enabled: AtomicBool::new(true),
192 long_running_reminder_interval_ms: AtomicU64::new(600_000),
193 persisted_gc_started: AtomicBool::new(false),
194 #[cfg(test)]
195 persisted_gc_runs: AtomicU64::new(0),
196 compressor: Mutex::new(None),
197 db_pool: RwLock::new(None),
198 db_harness: RwLock::new(None),
199 wake_tx,
200 wake_rx,
201 watch_registry: Mutex::new(WatchRegistry::default()),
202 }),
203 }
204 }
205
206 pub fn set_harness(&self, harness: Harness) {
207 if let Ok(mut slot) = self.inner.db_harness.write() {
208 *slot = Some(harness.as_str().to_string());
209 }
210 }
211
212 pub fn set_db_pool(&self, conn: Arc<Mutex<Connection>>) {
213 if let Ok(mut slot) = self.inner.db_pool.write() {
214 *slot = Some(conn);
215 }
216 }
217
218 pub fn clear_db_pool(&self) {
219 if let Ok(mut slot) = self.inner.db_pool.write() {
220 *slot = None;
221 }
222 }
223
224 pub fn set_compressor<F>(&self, compressor: F)
229 where
230 F: Fn(&str, String) -> String + Send + Sync + 'static,
231 {
232 if let Ok(mut slot) = self.inner.compressor.lock() {
233 *slot = Some(Box::new(compressor));
234 }
235 }
236
237 pub(crate) fn compress_output(&self, command: &str, output: String) -> String {
240 let Ok(slot) = self.inner.compressor.lock() else {
241 return output;
242 };
243 match slot.as_ref() {
244 Some(compressor) => compressor(command, output),
245 None => output,
246 }
247 }
248
249 fn persist_task(&self, paths: &TaskPaths, metadata: &PersistedTask) -> std::io::Result<()> {
250 write_task(&paths.json, metadata)?;
251 self.dual_write_task(paths, metadata);
252 Ok(())
253 }
254
255 fn update_task_metadata<F>(
256 &self,
257 paths: &TaskPaths,
258 update: F,
259 ) -> std::io::Result<PersistedTask>
260 where
261 F: FnOnce(&mut PersistedTask),
262 {
263 let metadata = update_task(&paths.json, update)?;
264 self.dual_write_task(paths, &metadata);
265 Ok(metadata)
266 }
267
268 fn dual_write_task(&self, paths: &TaskPaths, metadata: &PersistedTask) {
269 let pool = self.inner.db_pool.read().ok().and_then(|slot| slot.clone());
270 let Some(pool) = pool else {
271 return;
272 };
273 let harness = self
274 .inner
275 .db_harness
276 .read()
277 .ok()
278 .and_then(|slot| slot.clone());
279 let Some(harness) = harness else {
280 crate::slog_warn!(
281 "dual-write bash_task to DB skipped for {}: harness not configured",
282 metadata.task_id
283 );
284 return;
285 };
286 let row = match metadata.to_bash_task_row(&harness, paths) {
287 Ok(row) => row,
288 Err(error) => {
289 crate::slog_warn!(
290 "dual-write bash_task to DB failed for {}: {}",
291 metadata.task_id,
292 error
293 );
294 return;
295 }
296 };
297 let conn = match pool.lock() {
298 Ok(conn) => conn,
299 Err(_) => {
300 crate::slog_warn!(
301 "dual-write bash_task to DB failed for {}: db mutex poisoned",
302 metadata.task_id
303 );
304 return;
305 }
306 };
307 if let Err(error) = crate::db::bash_tasks::upsert_bash_task(&conn, &row) {
308 crate::slog_warn!(
309 "dual-write bash_task to DB failed for {}: {}",
310 metadata.task_id,
311 error
312 );
313 }
314 }
315
316 pub fn configure_long_running_reminders(&self, enabled: bool, interval_ms: u64) {
317 self.inner
318 .long_running_reminder_enabled
319 .store(enabled, Ordering::SeqCst);
320 self.inner
321 .long_running_reminder_interval_ms
322 .store(interval_ms, Ordering::SeqCst);
323 }
324
325 #[cfg(unix)]
326 #[allow(clippy::too_many_arguments)]
327 pub fn spawn(
328 &self,
329 command: &str,
330 session_id: String,
331 workdir: PathBuf,
332 env: HashMap<String, String>,
333 timeout: Option<Duration>,
334 storage_dir: PathBuf,
335 max_running: usize,
336 notify_on_completion: bool,
337 compressed: bool,
338 project_root: Option<PathBuf>,
339 ) -> Result<String, String> {
340 self.start_watchdog();
341
342 let running = self.running_count();
343 if running >= max_running {
344 return Err(format!(
345 "background bash task limit exceeded: {running} running (max {max_running})"
346 ));
347 }
348
349 let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
350 let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
351 let task_id = self.generate_unique_task_id()?;
352 let paths = task_paths(&storage_dir, &session_id, &task_id);
353 fs::create_dir_all(&paths.dir)
354 .map_err(|e| format!("failed to create background task dir: {e}"))?;
355
356 let mut metadata = PersistedTask::starting(
357 task_id.clone(),
358 session_id.clone(),
359 command.to_string(),
360 workdir.clone(),
361 project_root,
362 timeout_ms,
363 notify_on_completion,
364 compressed,
365 );
366 self.persist_task(&paths, &metadata)
367 .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
368
369 create_capture_file(&paths.stdout)
373 .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
374 create_capture_file(&paths.stderr)
375 .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
376
377 let child = match spawn_detached_child(command, &paths, &workdir, &env) {
378 Ok(child) => child,
379 Err(error) => {
380 crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
381 let _ = delete_task_bundle(&paths);
382 return Err(error);
383 }
384 };
385
386 let child_pid = child.id();
387 metadata.mark_running(child_pid, child_pid as i32);
388 self.persist_task(&paths, &metadata)
389 .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
390
391 let task = Arc::new(BgTask {
392 task_id: task_id.clone(),
393 session_id,
394 paths: paths.clone(),
395 started: Instant::now(),
396 last_reminder_at: Mutex::new(None),
397 terminal_at: Mutex::new(None),
398 state: Mutex::new(BgTaskState {
399 metadata,
400 runtime: TaskRuntime::Piped(Some(child)),
401 detached: false,
402 child_exit_observed: false,
403 buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
404 pending_terminal_override: None,
405 }),
406 });
407
408 self.inner
409 .tasks
410 .lock()
411 .map_err(|_| "background task registry lock poisoned".to_string())?
412 .insert(task_id.clone(), task);
413
414 Ok(task_id)
415 }
416
417 #[allow(clippy::too_many_arguments)]
418 pub fn spawn_pty(
419 &self,
420 command: &str,
421 session_id: String,
422 workdir: PathBuf,
423 env: HashMap<String, String>,
424 timeout: Option<Duration>,
425 storage_dir: PathBuf,
426 max_running: usize,
427 notify_on_completion: bool,
428 compressed: bool,
429 project_root: Option<PathBuf>,
430 rows: u16,
431 cols: u16,
432 ) -> Result<String, String> {
433 self.start_watchdog();
434
435 let running = self.running_count();
436 if running >= max_running {
437 return Err(format!(
438 "background bash task limit exceeded: {running} running (max {max_running})"
439 ));
440 }
441
442 let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
443 let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
444 let task_id = self.generate_unique_task_id()?;
445 let paths = task_paths(&storage_dir, &session_id, &task_id);
446 fs::create_dir_all(&paths.dir)
447 .map_err(|e| format!("failed to create background task dir: {e}"))?;
448
449 let mut metadata = PersistedTask::starting(
450 task_id.clone(),
451 session_id.clone(),
452 command.to_string(),
453 workdir.clone(),
454 project_root,
455 timeout_ms,
456 notify_on_completion,
457 compressed,
458 );
459 metadata.mode = BgMode::Pty;
460 metadata.pty_rows = Some(rows);
461 metadata.pty_cols = Some(cols);
462 self.persist_task(&paths, &metadata)
463 .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
464 create_capture_file(&paths.pty)
465 .map_err(|e| format!("failed to create PTY capture file: {e}"))?;
466
467 let runtime = match spawn_pty_for_command(
468 &task_id,
469 &session_id,
470 command,
471 &paths,
472 &workdir,
473 &env,
474 rows,
475 cols,
476 self.inner.wake_tx.clone(),
477 ) {
478 Ok(runtime) => runtime,
479 Err(error) => {
480 crate::slog_warn!(
481 "failed to spawn PTY background bash task {task_id}; deleting partial bundle: {error}"
482 );
483 let _ = delete_task_bundle(&paths);
484 return Err(error);
485 }
486 };
487
488 if let Some(child_pid) = runtime.child_pid {
489 metadata.mark_running(child_pid, child_pid as i32);
490 } else {
491 metadata.status = BgTaskStatus::Running;
492 metadata.pgid = None;
493 }
494 self.persist_task(&paths, &metadata)
495 .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
496
497 let task = Arc::new(BgTask {
498 task_id: task_id.clone(),
499 session_id,
500 paths: paths.clone(),
501 started: Instant::now(),
502 last_reminder_at: Mutex::new(None),
503 terminal_at: Mutex::new(None),
504 state: Mutex::new(BgTaskState {
505 metadata,
506 runtime: TaskRuntime::Pty(Some(runtime)),
507 detached: false,
508 child_exit_observed: false,
509 buffer: BgBuffer::pty(paths.pty.clone()),
510 pending_terminal_override: None,
511 }),
512 });
513
514 self.inner
515 .tasks
516 .lock()
517 .map_err(|_| "background task registry lock poisoned".to_string())?
518 .insert(task_id.clone(), task);
519
520 Ok(task_id)
521 }
522
523 #[cfg(windows)]
524 #[allow(clippy::too_many_arguments)]
525 pub fn spawn(
526 &self,
527 command: &str,
528 session_id: String,
529 workdir: PathBuf,
530 env: HashMap<String, String>,
531 timeout: Option<Duration>,
532 storage_dir: PathBuf,
533 max_running: usize,
534 notify_on_completion: bool,
535 compressed: bool,
536 project_root: Option<PathBuf>,
537 ) -> Result<String, String> {
538 self.start_watchdog();
539
540 let running = self.running_count();
541 if running >= max_running {
542 return Err(format!(
543 "background bash task limit exceeded: {running} running (max {max_running})"
544 ));
545 }
546
547 let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
548 let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
549 let task_id = self.generate_unique_task_id()?;
550 let paths = task_paths(&storage_dir, &session_id, &task_id);
551 fs::create_dir_all(&paths.dir)
552 .map_err(|e| format!("failed to create background task dir: {e}"))?;
553
554 let mut metadata = PersistedTask::starting(
555 task_id.clone(),
556 session_id.clone(),
557 command.to_string(),
558 workdir.clone(),
559 project_root,
560 timeout_ms,
561 notify_on_completion,
562 compressed,
563 );
564 self.persist_task(&paths, &metadata)
565 .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
566
567 create_capture_file(&paths.stdout)
573 .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
574 create_capture_file(&paths.stderr)
575 .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
576
577 let child = match spawn_detached_child(command, &paths, &workdir, &env) {
578 Ok(child) => child,
579 Err(error) => {
580 crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
581 let _ = delete_task_bundle(&paths);
582 return Err(error);
583 }
584 };
585
586 let child_pid = child.id();
587 metadata.status = BgTaskStatus::Running;
588 metadata.child_pid = Some(child_pid);
589 metadata.pgid = None;
590 self.persist_task(&paths, &metadata)
591 .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
592
593 let task = Arc::new(BgTask {
594 task_id: task_id.clone(),
595 session_id,
596 paths: paths.clone(),
597 started: Instant::now(),
598 last_reminder_at: Mutex::new(None),
599 terminal_at: Mutex::new(None),
600 state: Mutex::new(BgTaskState {
601 metadata,
602 runtime: TaskRuntime::Piped(Some(child)),
603 detached: false,
604 child_exit_observed: false,
605 buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
606 pending_terminal_override: None,
607 }),
608 });
609
610 self.inner
611 .tasks
612 .lock()
613 .map_err(|_| "background task registry lock poisoned".to_string())?
614 .insert(task_id.clone(), task);
615
616 Ok(task_id)
617 }
618
619 pub fn write_pty(
620 &self,
621 task_id: &str,
622 session_id: &str,
623 input: &[u8],
624 ) -> Result<usize, String> {
625 let task = self
626 .task_for_session(task_id, session_id)
627 .ok_or_else(|| "task_not_found".to_string())?;
628
629 let writer = {
630 let state = task
631 .state
632 .lock()
633 .map_err(|_| "background task lock poisoned".to_string())?;
634 if state.metadata.mode != BgMode::Pty {
635 return Err("task_not_pty".to_string());
636 }
637 if state.metadata.status.is_terminal() {
638 return Err("task_exited".to_string());
639 }
640 match &state.runtime {
641 TaskRuntime::Pty(Some(runtime)) => Arc::clone(&runtime.writer),
642 TaskRuntime::Pty(None) => return Err("task_exited".to_string()),
643 TaskRuntime::Piped(_) => return Err("task_not_pty".to_string()),
644 }
645 };
646
647 let mut writer = writer
648 .lock()
649 .map_err(|_| "PTY writer lock poisoned".to_string())?;
650 writer
651 .write_all(input)
652 .map_err(|error| format!("failed to write to PTY: {error}"))?;
653 writer
654 .flush()
655 .map_err(|error| format!("failed to flush PTY writer: {error}"))?;
656 Ok(input.len())
657 }
658
659 pub fn replay_session(&self, storage_dir: &Path, session_id: &str) -> Result<(), String> {
660 self.replay_session_inner(storage_dir, session_id, None)
661 }
662
663 pub fn replay_session_for_project(
664 &self,
665 storage_dir: &Path,
666 session_id: &str,
667 project_root: &Path,
668 ) -> Result<(), String> {
669 self.replay_session_inner(storage_dir, session_id, Some(project_root))
670 }
671
672 fn replay_session_inner(
673 &self,
674 storage_dir: &Path,
675 session_id: &str,
676 project_root: Option<&Path>,
677 ) -> Result<(), String> {
678 self.start_watchdog();
679 if !self.inner.persisted_gc_started.swap(true, Ordering::SeqCst) {
680 if let Err(error) = self.maybe_gc_persisted(storage_dir) {
681 crate::slog_warn!("failed to GC persisted background bash tasks: {error}");
682 }
683 }
684
685 let canonical_project = project_root.map(canonicalized_path);
686 let tasks = match self.replay_session_from_db(session_id) {
698 Some(Ok(tasks)) if !tasks.is_empty() => tasks,
699 Some(Ok(_)) => {
700 let disk_tasks = self.replay_session_from_disk(storage_dir, session_id)?;
701 if !disk_tasks.is_empty() {
702 crate::slog_info!(
703 "bash task replay: 0 in DB for session {}, {} from disk fallback",
704 session_id,
705 disk_tasks.len()
706 );
707 }
708 disk_tasks
709 }
710 Some(Err(error)) => {
711 crate::slog_warn!(
712 "bash task replay DB lookup failed for session {}; falling back to disk: {}",
713 session_id,
714 error
715 );
716 self.replay_session_from_disk(storage_dir, session_id)?
717 }
718 None => {
719 self.replay_session_from_disk(storage_dir, session_id)?
721 }
722 };
723
724 for mut metadata in tasks {
725 if metadata.session_id != session_id {
726 continue;
727 }
728 if let Some(canonical_project) = canonical_project.as_deref() {
729 let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
730 if metadata_project.as_deref() != Some(canonical_project) {
731 continue;
732 }
733 }
734
735 let paths = task_paths(storage_dir, session_id, &metadata.task_id);
736 match metadata.status {
737 BgTaskStatus::Starting => {
738 let completion_was_delivered = metadata.completion_delivered;
739 metadata.mark_terminal(
740 BgTaskStatus::Failed,
741 None,
742 Some("spawn aborted".to_string()),
743 );
744 metadata.completion_delivered |= completion_was_delivered;
745 let _ = self.persist_task(&paths, &metadata);
746 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
747 self.insert_rehydrated_task(metadata, paths, true)?;
748 }
749 BgTaskStatus::Running | BgTaskStatus::Killing => {
750 if metadata.mode == BgMode::Pty {
751 if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
752 let completion_was_delivered = metadata.completion_delivered;
753 metadata = terminal_metadata_from_marker(metadata, marker, None);
754 metadata.completion_delivered |= completion_was_delivered;
755 let _ = self.persist_task(&paths, &metadata);
756 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
757 self.insert_rehydrated_task(metadata, paths, true)?;
758 } else if metadata.status.is_terminal() {
759 self.insert_rehydrated_task(metadata, paths, true)?;
760 } else {
761 let completion_was_delivered = metadata.completion_delivered;
762 metadata.mark_terminal(
763 BgTaskStatus::Killed,
764 None,
765 Some("pty_lost_on_bridge_restart".to_string()),
766 );
767 metadata.completion_delivered |= completion_was_delivered;
768 let _ = self.persist_task(&paths, &metadata);
769 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
770 self.insert_rehydrated_task(metadata, paths, true)?;
771 }
772 } else if self.running_metadata_is_stale(&metadata) {
773 let completion_was_delivered = metadata.completion_delivered;
774 metadata.mark_terminal(
775 BgTaskStatus::Killed,
776 None,
777 Some("orphaned (>24h)".to_string()),
778 );
779 metadata.completion_delivered |= completion_was_delivered;
780 if !paths.exit.exists() {
781 let _ = write_kill_marker_if_absent(&paths.exit);
782 }
783 let _ = self.persist_task(&paths, &metadata);
784 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
785 self.insert_rehydrated_task(metadata, paths, true)?;
786 } else if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
787 let reason = (metadata.status == BgTaskStatus::Killing).then(|| {
788 "recovered from inconsistent killing state on replay".to_string()
789 });
790 if reason.is_some() {
791 crate::slog_warn!("background task {} had killing state with exit marker; preferring marker",
792 metadata.task_id);
793 }
794 let completion_was_delivered = metadata.completion_delivered;
795 metadata = terminal_metadata_from_marker(metadata, marker, reason);
796 metadata.completion_delivered |= completion_was_delivered;
797 let _ = self.persist_task(&paths, &metadata);
798 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
799 self.insert_rehydrated_task(metadata, paths, true)?;
800 } else if metadata.status == BgTaskStatus::Killing {
801 if !paths.exit.exists() {
802 let _ = write_kill_marker_if_absent(&paths.exit);
803 }
804 let completion_was_delivered = metadata.completion_delivered;
805 metadata.mark_terminal(
806 BgTaskStatus::Killed,
807 None,
808 Some("recovered from inconsistent killing state on replay".to_string()),
809 );
810 metadata.completion_delivered |= completion_was_delivered;
811 let _ = self.persist_task(&paths, &metadata);
812 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
813 self.insert_rehydrated_task(metadata, paths, true)?;
814 } else if metadata.child_pid.is_some_and(|pid| !is_process_alive(pid)) {
815 let completion_was_delivered = metadata.completion_delivered;
816 metadata.mark_terminal(
817 BgTaskStatus::Failed,
818 None,
819 Some("process exited without exit marker".to_string()),
820 );
821 metadata.completion_delivered |= completion_was_delivered;
822 let _ = self.persist_task(&paths, &metadata);
823 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
824 self.insert_rehydrated_task(metadata, paths, true)?;
825 } else {
826 self.insert_rehydrated_task(metadata, paths, true)?;
827 }
828 }
829 _ if metadata.status.is_terminal() => {
830 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
836 self.insert_rehydrated_task(metadata, paths, true)?;
837 }
838 _ => {}
839 }
840 }
841
842 Ok(())
843 }
844
845 fn replay_session_from_db(
846 &self,
847 session_id: &str,
848 ) -> Option<Result<Vec<PersistedTask>, String>> {
849 let pool = self
850 .inner
851 .db_pool
852 .read()
853 .ok()
854 .and_then(|slot| slot.clone())?;
855 let harness = self
856 .inner
857 .db_harness
858 .read()
859 .ok()
860 .and_then(|slot| slot.clone())?;
861 let conn = match pool.lock() {
862 Ok(conn) => conn,
863 Err(_) => return Some(Err("db mutex poisoned".to_string())),
864 };
865 Some(
866 crate::db::bash_tasks::list_bash_tasks_for_session(&conn, &harness, session_id)
867 .map(|rows| rows.into_iter().map(PersistedTask::from).collect())
868 .map_err(|error| error.to_string()),
869 )
870 }
871
872 fn replay_session_from_disk(
873 &self,
874 storage_dir: &Path,
875 session_id: &str,
876 ) -> Result<Vec<PersistedTask>, String> {
877 let dir = session_tasks_dir(storage_dir, session_id);
878 if !dir.exists() {
879 return Ok(Vec::new());
880 }
881
882 let entries = fs::read_dir(&dir)
883 .map_err(|e| format!("failed to read background task dir {}: {e}", dir.display()))?;
884 let mut tasks = Vec::new();
885 for entry in entries.flatten() {
886 let path = entry.path();
887 if path.extension().and_then(|extension| extension.to_str()) != Some("json") {
888 continue;
889 }
890 match read_task(&path) {
891 Ok(metadata) => tasks.push(metadata),
892 Err(error) => {
893 crate::slog_warn!(
894 "quarantining invalid background task metadata {} during replay: {error}",
895 path.display()
896 );
897 if let Err(quarantine_error) =
898 quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
899 {
900 crate::slog_warn!(
901 "failed to quarantine invalid background task metadata {}: {quarantine_error}",
902 path.display()
903 );
904 }
905 }
906 }
907 }
908 Ok(tasks)
909 }
910
911 pub fn register_watch(
912 &self,
913 task_id: String,
914 pattern: WatchPattern,
915 once: bool,
916 ) -> Result<String, &'static str> {
917 let task = self.task(&task_id).ok_or("task_not_found")?;
918 let (mode, terminal_at_registration, stdout, stderr, pty) = task
919 .state
920 .lock()
921 .map(|state| {
922 (
923 state.metadata.mode.clone(),
924 state.metadata.status.is_terminal(),
925 task.paths.stdout.clone(),
926 task.paths.stderr.clone(),
927 task.paths.pty.clone(),
928 )
929 })
930 .map_err(|_| "background_task_lock_poisoned")?;
931
932 let mut terminal_matches = Vec::new();
933 let scanned_terminal = terminal_at_registration;
934 let watch_id = {
935 let mut registry = self
936 .inner
937 .watch_registry
938 .lock()
939 .map_err(|_| "watch_registry_poisoned")?;
940 let watch_id = registry.register(task_id.clone(), pattern, once)?;
941 match &mode {
942 BgMode::Pipes => {
943 let stdout_key = format!("{task_id}:stdout");
944 let stderr_key = format!("{task_id}:stderr");
945 if terminal_at_registration {
946 registry.set_file_cursor(&stdout_key, 0);
947 registry.set_file_cursor(&stderr_key, 0);
948 terminal_matches.extend(registry.scan_file_new_bytes(
949 &stdout_key,
950 &task_id,
951 &stdout,
952 ));
953 terminal_matches.extend(registry.scan_file_new_bytes(
954 &stderr_key,
955 &task_id,
956 &stderr,
957 ));
958 } else {
959 registry.prime_file_cursor(&stdout_key, &stdout);
960 registry.prime_file_cursor(&stderr_key, &stderr);
961 }
962 }
963 BgMode::Pty => {
964 let pty_key = format!("{task_id}:pty");
965 if terminal_at_registration {
966 registry.set_file_cursor(&pty_key, 0);
967 terminal_matches
968 .extend(registry.scan_file_new_bytes(&pty_key, &task_id, &pty));
969 } else {
970 registry.prime_file_cursor(&pty_key, &pty);
971 }
972 }
973 }
974 watch_id
975 };
976
977 if task.is_terminal() {
978 if !scanned_terminal {
979 terminal_matches = {
980 let mut registry = self
981 .inner
982 .watch_registry
983 .lock()
984 .map_err(|_| "watch_registry_poisoned")?;
985 match &mode {
986 BgMode::Pipes => {
987 let stdout_key = format!("{task_id}:stdout");
988 let stderr_key = format!("{task_id}:stderr");
989 registry.set_file_cursor(&stdout_key, 0);
990 registry.set_file_cursor(&stderr_key, 0);
991 let mut matches =
992 registry.scan_file_new_bytes(&stdout_key, &task_id, &stdout);
993 matches.extend(registry.scan_file_new_bytes(
994 &stderr_key,
995 &task_id,
996 &stderr,
997 ));
998 matches
999 }
1000 BgMode::Pty => {
1001 let pty_key = format!("{task_id}:pty");
1002 registry.set_file_cursor(&pty_key, 0);
1003 registry.scan_file_new_bytes(&pty_key, &task_id, &pty)
1004 }
1005 }
1006 };
1007 }
1008
1009 let (watch_controlled, watch_matched) = self.task_watch_state(&task_id);
1010 if terminal_matches.is_empty() && (!watch_controlled || watch_matched) {
1011 if watch_matched {
1012 let _ = task.set_completion_delivered(true, self);
1013 self.clear_task_watch_state(&task_id);
1014 }
1015 return Ok(watch_id);
1016 }
1017
1018 let completion = self
1019 .remove_pending_completion(&task_id)
1020 .or_else(|| self.completion_snapshot_for_task(&task, BG_COMPLETION_PREVIEW_BYTES));
1021 if terminal_matches.is_empty() {
1022 if let Some(completion) = completion.as_ref() {
1023 self.emit_bash_watch_exit(completion);
1024 }
1025 } else {
1026 for pattern_match in terminal_matches {
1027 self.emit_bash_pattern_match(&task.session_id, pattern_match);
1028 }
1029 }
1030 let _ = task.set_completion_delivered(true, self);
1031 self.clear_task_watch_state(&task_id);
1032 }
1033
1034 Ok(watch_id)
1035 }
1036
1037 pub fn unregister_watch(&self, task_id: &str, watch_id: &str) {
1038 if let Ok(mut registry) = self.inner.watch_registry.lock() {
1039 registry.unregister(task_id, watch_id);
1040 }
1041 }
1042
1043 pub fn active_watch_count(&self, task_id: &str) -> usize {
1044 self.inner
1045 .watch_registry
1046 .lock()
1047 .map(|registry| registry.active_count(task_id))
1048 .unwrap_or(0)
1049 }
1050
1051 fn task_watch_state(&self, task_id: &str) -> (bool, bool) {
1052 self.inner
1053 .watch_registry
1054 .lock()
1055 .map(|registry| {
1056 (
1057 registry.has_controlled_task(task_id),
1058 registry.has_matched_task(task_id),
1059 )
1060 })
1061 .unwrap_or((false, false))
1062 }
1063
1064 fn task_has_watch_control(&self, task_id: &str) -> bool {
1065 self.inner
1066 .watch_registry
1067 .lock()
1068 .map(|registry| registry.has_controlled_task(task_id))
1069 .unwrap_or(false)
1070 }
1071
1072 fn clear_task_watch_state(&self, task_id: &str) {
1073 if let Ok(mut registry) = self.inner.watch_registry.lock() {
1074 registry.clear_task(task_id);
1075 }
1076 }
1077
1078 pub(crate) fn scan_task_watch_output(&self, task: &Arc<BgTask>) {
1079 let (mode, stdout, stderr, pty) = match task.state.lock() {
1080 Ok(state) => (
1081 state.metadata.mode.clone(),
1082 task.paths.stdout.clone(),
1083 task.paths.stderr.clone(),
1084 task.paths.pty.clone(),
1085 ),
1086 Err(_) => return,
1087 };
1088 let mut matches = Vec::new();
1089 if let Ok(mut registry) = self.inner.watch_registry.lock() {
1090 match mode {
1091 BgMode::Pipes => {
1092 let stdout_key = format!("{}:stdout", task.task_id);
1093 let stderr_key = format!("{}:stderr", task.task_id);
1094 matches.extend(registry.scan_file_new_bytes(
1095 &stdout_key,
1096 &task.task_id,
1097 &stdout,
1098 ));
1099 matches.extend(registry.scan_file_new_bytes(
1100 &stderr_key,
1101 &task.task_id,
1102 &stderr,
1103 ));
1104 }
1105 BgMode::Pty => {
1106 let pty_key = format!("{}:pty", task.task_id);
1107 matches.extend(registry.scan_file_new_bytes(&pty_key, &task.task_id, &pty));
1108 }
1109 }
1110 }
1111 for pattern_match in matches {
1112 self.emit_bash_pattern_match(&task.session_id, pattern_match);
1113 }
1114 }
1115
1116 pub fn status(
1117 &self,
1118 task_id: &str,
1119 session_id: &str,
1120 project_root: Option<&Path>,
1121 storage_dir: Option<&Path>,
1122 preview_bytes: usize,
1123 ) -> Option<BgTaskSnapshot> {
1124 let mut task = self.task_for_session(task_id, session_id);
1125 if task.is_none() {
1126 if let Some(storage_dir) = storage_dir {
1127 let _ = self.replay_session(storage_dir, session_id);
1128 task = self.task_for_session(task_id, session_id);
1129 }
1130 }
1131 let Some(task) = task else {
1132 return self.status_relaxed(
1133 task_id,
1134 session_id,
1135 project_root?,
1136 storage_dir?,
1137 preview_bytes,
1138 );
1139 };
1140 let _ = self.poll_task(&task);
1141 let mut snapshot = task.snapshot(preview_bytes);
1142 self.maybe_compress_snapshot(&task, &mut snapshot);
1143 Some(snapshot)
1144 }
1145
1146 fn status_relaxed_task(
1147 &self,
1148 task_id: &str,
1149 project_root: &Path,
1150 storage_dir: &Path,
1151 ) -> Option<Arc<BgTask>> {
1152 let canonical_project = canonicalized_path(project_root);
1153 match self.lookup_relaxed_task_from_db(task_id, project_root) {
1154 Some(Ok(Some(metadata))) => {
1155 if let Some(task) = self.task(task_id) {
1156 let matches_project = task
1157 .state
1158 .lock()
1159 .map(|state| {
1160 state
1161 .metadata
1162 .project_root
1163 .as_deref()
1164 .map(canonicalized_path)
1165 .as_deref()
1166 == Some(canonical_project.as_path())
1167 })
1168 .unwrap_or(false);
1169 return matches_project.then_some(task);
1170 }
1171 let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1172 if self.insert_rehydrated_task(metadata, paths, true).is_err() {
1173 return None;
1174 }
1175 return self.task(task_id);
1176 }
1177 Some(Ok(None)) => {
1178 crate::slog_info!(
1179 "bash task relaxed DB miss for {}; falling back to disk",
1180 task_id
1181 );
1182 }
1183 Some(Err(error)) => {
1184 crate::slog_warn!(
1185 "bash task relaxed DB lookup failed for {}; falling back to disk: {}",
1186 task_id,
1187 error
1188 );
1189 }
1190 None => {
1191 crate::slog_info!(
1192 "bash task relaxed DB unavailable for {}; falling back to disk",
1193 task_id
1194 );
1195 }
1196 }
1197 let root = storage_dir.join("bash-tasks");
1198 let entries = fs::read_dir(&root).ok()?;
1199 for entry in entries.flatten() {
1200 let dir = entry.path();
1201 if !dir.is_dir() {
1202 continue;
1203 }
1204 let path = dir.join(format!("{task_id}.json"));
1205 if !path.exists() {
1206 continue;
1207 }
1208 let metadata = match read_task(&path) {
1209 Ok(metadata) => metadata,
1210 Err(error) => {
1211 crate::slog_warn!(
1212 "quarantining invalid background task metadata {} during relaxed lookup: {error}",
1213 path.display()
1214 );
1215 if let Err(quarantine_error) =
1216 quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
1217 {
1218 crate::slog_warn!(
1219 "failed to quarantine invalid background task metadata {}: {quarantine_error}",
1220 path.display()
1221 );
1222 }
1223 continue;
1224 }
1225 };
1226 let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
1227 if metadata_project.as_deref() != Some(canonical_project.as_path()) {
1228 continue;
1229 }
1230 if let Some(task) = self.task(task_id) {
1231 let matches_project = task
1232 .state
1233 .lock()
1234 .map(|state| {
1235 state
1236 .metadata
1237 .project_root
1238 .as_deref()
1239 .map(canonicalized_path)
1240 .as_deref()
1241 == Some(canonical_project.as_path())
1242 })
1243 .unwrap_or(false);
1244 return matches_project.then_some(task);
1245 }
1246 let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1247 if self.insert_rehydrated_task(metadata, paths, true).is_err() {
1248 return None;
1249 }
1250 return self.task(task_id);
1251 }
1252 None
1253 }
1254
1255 fn lookup_relaxed_task_from_db(
1256 &self,
1257 task_id: &str,
1258 project_root: &Path,
1259 ) -> Option<Result<Option<PersistedTask>, String>> {
1260 let pool = self
1261 .inner
1262 .db_pool
1263 .read()
1264 .ok()
1265 .and_then(|slot| slot.clone())?;
1266 let harness = self
1267 .inner
1268 .db_harness
1269 .read()
1270 .ok()
1271 .and_then(|slot| slot.clone())?;
1272 let conn = match pool.lock() {
1273 Ok(conn) => conn,
1274 Err(_) => return Some(Err("db mutex poisoned".to_string())),
1275 };
1276 let project_key = crate::search_index::project_cache_key(project_root);
1277 Some(
1278 crate::db::bash_tasks::find_bash_task_for_project(
1279 &conn,
1280 &harness,
1281 &project_key,
1282 task_id,
1283 )
1284 .map(|row| row.map(PersistedTask::from))
1285 .map_err(|error| error.to_string()),
1286 )
1287 }
1288
1289 pub(super) fn status_relaxed(
1290 &self,
1291 task_id: &str,
1292 _session_id: &str,
1293 project_root: &Path,
1294 storage_dir: &Path,
1295 preview_bytes: usize,
1296 ) -> Option<BgTaskSnapshot> {
1297 let task = self.status_relaxed_task(task_id, project_root, storage_dir)?;
1298 let _ = self.poll_task(&task);
1299 let mut snapshot = task.snapshot(preview_bytes);
1300 self.maybe_compress_snapshot(&task, &mut snapshot);
1301 Some(snapshot)
1302 }
1303
1304 pub fn kill_relaxed(
1305 &self,
1306 task_id: &str,
1307 project_root: &Path,
1308 storage_dir: &Path,
1309 ) -> Result<BgTaskSnapshot, String> {
1310 let task = self
1311 .status_relaxed_task(task_id, project_root, storage_dir)
1312 .ok_or_else(|| format!("background task not found: {task_id}"))?;
1313 self.kill_with_status(task_id, &task.session_id, BgTaskStatus::Killed)
1314 }
1315
1316 pub fn maybe_gc_persisted(&self, storage_dir: &Path) -> Result<usize, String> {
1317 #[cfg(test)]
1318 self.inner.persisted_gc_runs.fetch_add(1, Ordering::SeqCst);
1319
1320 let mut deleted = 0usize;
1321
1322 let root = storage_dir.join("bash-tasks");
1323 if root.exists() {
1324 let session_dirs = fs::read_dir(&root).map_err(|e| {
1325 format!(
1326 "failed to read background task root {}: {e}",
1327 root.display()
1328 )
1329 })?;
1330 for session_entry in session_dirs.flatten() {
1331 let session_dir = session_entry.path();
1332 if !session_dir.is_dir() {
1333 continue;
1334 }
1335 let task_entries = match fs::read_dir(&session_dir) {
1336 Ok(entries) => entries,
1337 Err(error) => {
1338 crate::slog_warn!(
1339 "failed to read background task session dir {}: {error}",
1340 session_dir.display()
1341 );
1342 continue;
1343 }
1344 };
1345 for task_entry in task_entries.flatten() {
1346 let json_path = task_entry.path();
1347 if json_path
1348 .extension()
1349 .and_then(|extension| extension.to_str())
1350 != Some("json")
1351 {
1352 continue;
1353 }
1354 if modified_within(&json_path, PERSISTED_GC_GRACE) {
1355 continue;
1356 }
1357 let metadata = match read_task(&json_path) {
1358 Ok(metadata) => metadata,
1359 Err(error) => {
1360 crate::slog_warn!(
1361 "quarantining corrupt background task metadata {}: {error}",
1362 json_path.display()
1363 );
1364 quarantine_task_json(
1365 storage_dir,
1366 &session_dir,
1367 &json_path,
1368 QuarantineKind::Corrupt,
1369 )?;
1370 continue;
1371 }
1372 };
1373 if !(metadata.status.is_terminal() && metadata.completion_delivered) {
1374 continue;
1375 }
1376 let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1377 match delete_task_bundle(&paths) {
1378 Ok(()) => {
1379 deleted += 1;
1380 log::debug!(
1381 "deleted persisted background task bundle {}",
1382 metadata.task_id
1383 );
1384 }
1385 Err(error) => {
1386 crate::slog_warn!(
1387 "failed to delete background task bundle {}: {error}",
1388 metadata.task_id
1389 );
1390 continue;
1391 }
1392 }
1393 }
1394 }
1395 }
1396 gc_quarantine(storage_dir);
1397 Ok(deleted)
1398 }
1399
1400 pub fn list(&self, preview_bytes: usize) -> Vec<BgTaskSnapshot> {
1401 let tasks = self
1402 .inner
1403 .tasks
1404 .lock()
1405 .map(|tasks| tasks.values().cloned().collect::<Vec<_>>())
1406 .unwrap_or_default();
1407 tasks
1408 .into_iter()
1409 .map(|task| {
1410 let _ = self.poll_task(&task);
1411 let mut snapshot = task.snapshot(preview_bytes);
1412 self.maybe_compress_snapshot(&task, &mut snapshot);
1413 snapshot
1414 })
1415 .collect()
1416 }
1417
1418 fn maybe_compress_snapshot(&self, task: &Arc<BgTask>, snapshot: &mut BgTaskSnapshot) {
1424 if !snapshot.info.status.is_terminal() {
1425 return;
1426 }
1427 let (compressed_flag, mode) = task
1428 .state
1429 .lock()
1430 .map(|state| (state.metadata.compressed, state.metadata.mode.clone()))
1431 .unwrap_or((true, BgMode::Pipes));
1432 if mode == BgMode::Pty {
1433 return;
1434 }
1435 if !compressed_flag {
1436 return;
1437 }
1438 let raw = std::mem::take(&mut snapshot.output_preview);
1439 snapshot.output_preview = self.compress_output(&snapshot.info.command, raw);
1440 }
1441
    /// Kills the background task `task_id` owned by `session_id`, requesting
    /// `BgTaskStatus::Killed` as the terminal status.
    ///
    /// Thin wrapper over `kill_with_status`; its result is returned unchanged.
    pub fn kill(&self, task_id: &str, session_id: &str) -> Result<BgTaskSnapshot, String> {
        self.kill_with_status(task_id, session_id, BgTaskStatus::Killed)
    }
1445
1446 pub fn promote(&self, task_id: &str, session_id: &str) -> Result<bool, String> {
1447 let task = self
1448 .task_for_session(task_id, session_id)
1449 .ok_or_else(|| format!("background task not found: {task_id}"))?;
1450 let mut state = task
1451 .state
1452 .lock()
1453 .map_err(|_| "background task lock poisoned".to_string())?;
1454 let updated = self
1455 .update_task_metadata(&task.paths, |metadata| {
1456 metadata.notify_on_completion = true;
1457 metadata.completion_delivered = false;
1458 })
1459 .map_err(|e| format!("failed to promote background task: {e}"))?;
1460 state.metadata = updated;
1461 if state.metadata.status.is_terminal() {
1462 state.buffer.enforce_terminal_cap();
1463 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1464 }
1465 Ok(true)
1466 }
1467
1468 pub(crate) fn kill_for_timeout(&self, task_id: &str, session_id: &str) -> Result<(), String> {
1469 self.kill_with_status(task_id, session_id, BgTaskStatus::TimedOut)
1470 .map(|_| ())
1471 }
1472
1473 pub fn cleanup_finished(&self, older_than: Duration) {
1474 let cutoff = Instant::now().checked_sub(older_than);
1475 let removable_paths: Vec<(String, TaskPaths)> =
1476 if let Ok(mut tasks) = self.inner.tasks.lock() {
1477 let removable = tasks
1478 .iter()
1479 .filter_map(|(task_id, task)| {
1480 let delivered_terminal = task
1481 .state
1482 .lock()
1483 .map(|state| {
1484 state.metadata.status.is_terminal()
1485 && state.metadata.completion_delivered
1486 })
1487 .unwrap_or(false);
1488 if !delivered_terminal {
1489 return None;
1490 }
1491
1492 let terminal_at = task.terminal_at.lock().ok().and_then(|at| *at);
1493 let expired = match (terminal_at, cutoff) {
1494 (Some(terminal_at), Some(cutoff)) => terminal_at <= cutoff,
1495 (Some(_), None) => true,
1496 (None, _) => false,
1497 };
1498 expired.then(|| task_id.clone())
1499 })
1500 .collect::<Vec<_>>();
1501
1502 removable
1503 .into_iter()
1504 .filter_map(|task_id| {
1505 tasks
1506 .remove(&task_id)
1507 .map(|task| (task_id, task.paths.clone()))
1508 })
1509 .collect()
1510 } else {
1511 Vec::new()
1512 };
1513
1514 for (task_id, paths) in removable_paths {
1515 match delete_task_bundle(&paths) {
1516 Ok(()) => log::debug!("deleted persisted background task bundle {task_id}"),
1517 Err(error) => crate::slog_warn!(
1518 "failed to delete persisted background task bundle {task_id}: {error}"
1519 ),
1520 }
1521 }
1522 }
1523
    /// Returns the queued completions for every session.
    ///
    /// Delegates to `drain_completions_for_session` with no session filter.
    pub fn drain_completions(&self) -> Vec<BgCompletion> {
        self.drain_completions_for_session(None)
    }
1527
1528 pub fn drain_completions_for_session(&self, session_id: Option<&str>) -> Vec<BgCompletion> {
1529 let completions = match self.inner.completions.lock() {
1530 Ok(completions) => completions,
1531 Err(_) => return Vec::new(),
1532 };
1533
1534 completions
1535 .iter()
1536 .filter(|completion| {
1537 session_id
1538 .map(|session_id| completion.session_id == session_id)
1539 .unwrap_or(true)
1540 })
1541 .cloned()
1542 .collect()
1543 }
1544
1545 pub fn ack_completions_for_session(
1546 &self,
1547 session_id: Option<&str>,
1548 task_ids: &[String],
1549 ) -> Vec<String> {
1550 if task_ids.is_empty() {
1551 return Vec::new();
1552 }
1553 let requested_task_ids = task_ids.iter().map(String::as_str).collect::<HashSet<_>>();
1554 let mut completion_sessions = HashMap::new();
1555 if let Ok(mut completions) = self.inner.completions.lock() {
1556 completions.retain(|completion| {
1557 let session_matches = session_id
1558 .map(|session_id| completion.session_id == session_id)
1559 .unwrap_or(true);
1560 if session_matches && requested_task_ids.contains(completion.task_id.as_str()) {
1561 completion_sessions
1562 .insert(completion.task_id.clone(), completion.session_id.clone());
1563 false
1564 } else {
1565 true
1566 }
1567 });
1568 }
1569
1570 let mut delivered = Vec::new();
1571 for task_id in task_ids {
1572 let task = if let Some(session_id) = session_id {
1573 self.task_for_session(task_id, session_id)
1574 } else if let Some(completion_session_id) = completion_sessions.get(task_id) {
1575 self.task_for_session(task_id, completion_session_id)
1576 } else {
1577 self.task(task_id)
1578 };
1579 if let Some(task) = task {
1580 if task.set_completion_delivered(true, self).is_ok() {
1581 delivered.push(task_id.clone());
1582 }
1583 }
1584 }
1585
1586 delivered
1587 }
1588
1589 pub fn pending_completions_for_session(&self, session_id: &str) -> Vec<BgCompletion> {
1590 self.inner
1591 .completions
1592 .lock()
1593 .map(|completions| {
1594 completions
1595 .iter()
1596 .filter(|completion| completion.session_id == session_id)
1597 .cloned()
1598 .collect()
1599 })
1600 .unwrap_or_default()
1601 }
1602
1603 fn remove_pending_completion(&self, task_id: &str) -> Option<BgCompletion> {
1604 let mut completions = self.inner.completions.lock().ok()?;
1605 let idx = completions
1606 .iter()
1607 .position(|completion| completion.task_id == task_id)?;
1608 completions.remove(idx)
1609 }
1610
1611 fn completion_snapshot_for_task(
1612 &self,
1613 task: &Arc<BgTask>,
1614 preview_bytes: usize,
1615 ) -> Option<BgCompletion> {
1616 let snapshot = task.snapshot(preview_bytes);
1617 if !snapshot.info.status.is_terminal() {
1618 return None;
1619 }
1620 let output_preview = if snapshot.info.mode == BgMode::Pty {
1621 String::new()
1622 } else {
1623 let compressed = task
1624 .state
1625 .lock()
1626 .map(|state| state.metadata.compressed)
1627 .unwrap_or(true);
1628 if compressed {
1629 self.compress_output(&snapshot.info.command, snapshot.output_preview)
1630 } else {
1631 snapshot.output_preview
1632 }
1633 };
1634 Some(BgCompletion {
1635 task_id: snapshot.info.task_id,
1636 session_id: task.session_id.clone(),
1637 status: snapshot.info.status,
1638 exit_code: snapshot.exit_code,
1639 command: snapshot.info.command,
1640 output_preview,
1641 output_truncated: snapshot.output_truncated,
1642 original_tokens: None,
1643 compressed_tokens: None,
1644 tokens_skipped: false,
1645 })
1646 }
1647
1648 pub fn detach(&self) {
1649 self.inner.shutdown.store(true, Ordering::SeqCst);
1650 if let Ok(mut tasks) = self.inner.tasks.lock() {
1651 for task in tasks.values() {
1652 if let Ok(mut state) = task.state.lock() {
1653 match &mut state.runtime {
1654 TaskRuntime::Piped(child) => *child = None,
1655 TaskRuntime::Pty(runtime) => *runtime = None,
1656 }
1657 state.detached = true;
1658 }
1659 }
1660 tasks.clear();
1661 }
1662 }
1663
1664 pub fn shutdown(&self) {
1665 let tasks = self
1666 .inner
1667 .tasks
1668 .lock()
1669 .map(|tasks| {
1670 tasks
1671 .values()
1672 .map(|task| (task.task_id.clone(), task.session_id.clone()))
1673 .collect::<Vec<_>>()
1674 })
1675 .unwrap_or_default();
1676 for (task_id, session_id) in tasks {
1677 let _ = self.kill(&task_id, &session_id);
1678 }
1679 }
1680
1681 pub(crate) fn poll_task(&self, task: &Arc<BgTask>) -> Result<(), String> {
1682 if let Ok(state) = task.state.lock() {
1683 if let TaskRuntime::Pty(Some(pty)) = &state.runtime {
1684 if !pty.exit_observed.load(Ordering::SeqCst) {
1692 return Ok(());
1693 }
1694 }
1695 }
1696 let marker = match read_exit_marker(&task.paths.exit) {
1697 Ok(Some(marker)) => marker,
1698 Ok(None) => return Ok(()),
1699 Err(error) => return Err(format!("failed to read exit marker: {error}")),
1700 };
1701 self.finalize_from_marker(task, marker, None)
1702 }
1703
1704 pub(crate) fn reap_child(&self, task: &Arc<BgTask>) {
1705 let Ok(mut state) = task.state.lock() else {
1706 return;
1707 };
1708 match &mut state.runtime {
1709 TaskRuntime::Piped(child_slot) => {
1710 if let Some(child) = child_slot.as_mut() {
1711 if matches!(child.try_wait(), Ok(Some(_))) {
1712 *child_slot = None;
1713 state.detached = true;
1714 state.child_exit_observed = true;
1715 }
1716 } else if state.detached {
1717 let child_known_dead = state.child_exit_observed
1718 || state
1719 .metadata
1720 .child_pid
1721 .is_some_and(|pid| !is_process_alive(pid));
1722 if child_known_dead {
1723 self.fail_without_exit_marker_if_needed(task, &mut state);
1724 }
1725 }
1726 }
1727 TaskRuntime::Pty(Some(pty)) => {
1728 if pty.exit_observed.load(Ordering::SeqCst) {
1729 drop(state);
1730 let _ = self.poll_task(task);
1731 }
1732 }
1733 TaskRuntime::Pty(None) => {}
1734 }
1735 }
1736
1737 fn fail_without_exit_marker_if_needed(&self, task: &Arc<BgTask>, state: &mut BgTaskState) {
1738 if state.metadata.status.is_terminal() {
1739 return;
1740 }
1741 if matches!(read_exit_marker(&task.paths.exit), Ok(Some(_))) {
1742 return;
1743 }
1744 let watch_controlled = self.task_has_watch_control(&task.task_id);
1745 let updated = self.update_task_metadata(&task.paths, |metadata| {
1746 metadata.mark_terminal(
1747 BgTaskStatus::Failed,
1748 None,
1749 Some("process exited without exit marker".to_string()),
1750 );
1751 if watch_controlled {
1752 metadata.completion_delivered = true;
1753 }
1754 });
1755 if let Ok(metadata) = updated {
1756 state.pending_terminal_override = None;
1757 state.metadata = metadata;
1758 task.mark_terminal_now();
1759 state.buffer.enforce_terminal_cap();
1760 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1761 }
1762 }
1763
1764 pub(crate) fn running_tasks(&self) -> Vec<Arc<BgTask>> {
1765 self.inner
1766 .tasks
1767 .lock()
1768 .map(|tasks| {
1769 tasks
1770 .values()
1771 .filter(|task| task.is_running())
1772 .cloned()
1773 .collect()
1774 })
1775 .unwrap_or_default()
1776 }
1777
1778 fn insert_rehydrated_task(
1779 &self,
1780 metadata: PersistedTask,
1781 paths: TaskPaths,
1782 detached: bool,
1783 ) -> Result<(), String> {
1784 let task_id = metadata.task_id.clone();
1785 let session_id = metadata.session_id.clone();
1786 let started = started_instant_from_unix_millis(metadata.started_at);
1787 let suppress_replayed_running_reminder = metadata.status == BgTaskStatus::Running;
1788 let mode = metadata.mode.clone();
1789 let task = Arc::new(BgTask {
1790 task_id: task_id.clone(),
1791 session_id,
1792 paths: paths.clone(),
1793 started,
1794 last_reminder_at: Mutex::new(suppress_replayed_running_reminder.then(Instant::now)),
1795 terminal_at: Mutex::new(metadata.status.is_terminal().then(Instant::now)),
1796 state: Mutex::new(BgTaskState {
1797 metadata,
1798 runtime: if mode == BgMode::Pty {
1799 TaskRuntime::Pty(None)
1800 } else {
1801 TaskRuntime::Piped(None)
1802 },
1803 detached,
1804 child_exit_observed: false,
1811 buffer: if mode == BgMode::Pty {
1812 BgBuffer::pty(paths.pty.clone())
1813 } else {
1814 BgBuffer::new(paths.stdout.clone(), paths.stderr.clone())
1815 },
1816 pending_terminal_override: None,
1817 }),
1818 });
1819 self.inner
1820 .tasks
1821 .lock()
1822 .map_err(|_| "background task registry lock poisoned".to_string())?
1823 .insert(task_id, task);
1824 Ok(())
1825 }
1826
    /// Terminates the task `task_id` owned by `session_id` and returns a snapshot of it
    /// (taken with a `5 * 1024` preview budget).
    ///
    /// `terminal_status` is the status recorded when this call finalizes the task
    /// itself; a `TimedOut` finalization records exit code `124`, any other records none.
    ///
    /// Flow, performed under the task-state lock:
    /// 1. Task already terminal: clear any pending override and return the snapshot.
    /// 2. An exit marker is already readable: derive the terminal metadata from the
    ///    marker instead of recording a kill, persist it, and enqueue the completion.
    /// 3. Otherwise: persist `Killing` (unless already in that state), signal the
    ///    process, and for piped tasks (and PTY tasks on Windows builds) write the kill
    ///    marker, persist the terminal state and enqueue the completion.
    ///
    /// # Errors
    /// Returns a message when the task is unknown for this session, the state lock is
    /// poisoned, persisting state fails, or the kill marker cannot be written.
    fn kill_with_status(
        &self,
        task_id: &str,
        session_id: &str,
        terminal_status: BgTaskStatus,
    ) -> Result<BgTaskSnapshot, String> {
        let task = self
            .task_for_session(task_id, session_id)
            .ok_or_else(|| format!("background task not found: {task_id}"))?;

        {
            let mut state = task
                .state
                .lock()
                .map_err(|_| "background task lock poisoned".to_string())?;
            // Nothing to kill: the task already reached a terminal status.
            if state.metadata.status.is_terminal() {
                state.pending_terminal_override = None;
                return Ok(task.snapshot_locked(&state, 5 * 1024));
            }

            // An exit marker already on disk takes precedence over the requested
            // terminal status: the outcome is derived from the marker.
            if let Ok(Some(marker)) = read_exit_marker(&task.paths.exit) {
                state.metadata =
                    terminal_metadata_from_marker(state.metadata.clone(), marker, None);
                // Watch-controlled tasks are flagged as already delivered.
                if self.task_has_watch_control(&task.task_id) {
                    state.metadata.completion_delivered = true;
                }
                state.pending_terminal_override = None;
                task.mark_terminal_now();
                match &mut state.runtime {
                    TaskRuntime::Piped(child_slot) => reap_piped_child(child_slot),
                    TaskRuntime::Pty(runtime) => *runtime = None,
                }
                state.detached = true;
                state.buffer.enforce_terminal_cap();
                self.persist_task(&task.paths, &state.metadata)
                    .map_err(|e| format!("failed to persist terminal state: {e}"))?;
                self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
                return Ok(task.snapshot_locked(&state, 5 * 1024));
            }

            // Persist the intermediate `Killing` status once; a repeated kill request
            // skips the write.
            let was_already_killing = state.metadata.status == BgTaskStatus::Killing;
            if !was_already_killing {
                state.metadata.status = BgTaskStatus::Killing;
                self.persist_task(&task.paths, &state.metadata)
                    .map_err(|e| format!("failed to persist killing state: {e}"))?;
            }

            // Copy the ids out now; `state.runtime` is mutably borrowed by the match below.
            #[cfg(unix)]
            let pgid = state.metadata.pgid;
            #[cfg(windows)]
            let child_pid = state.metadata.child_pid;
            // For a PTY task killed because of a timeout, remember `TimedOut` as a
            // pending override. `finalize_from_marker` consumes this override when it
            // sees a `Killed` marker for a PTY task.
            if !was_already_killing
                && state.metadata.mode == BgMode::Pty
                && terminal_status == BgTaskStatus::TimedOut
            {
                state.pending_terminal_override = Some(BgTaskStatus::TimedOut);
            }

            // Windows only: set by the PTY arm below to request finalization in this call.
            #[cfg(windows)]
            let mut pty_forced_terminal_status: Option<BgTaskStatus> = None;

            match &mut state.runtime {
                TaskRuntime::Piped(child_slot) => {
                    #[cfg(unix)]
                    if let Some(pgid) = pgid {
                        terminate_pgid(pgid, child_slot.as_mut());
                    }
                    #[cfg(windows)]
                    if let Some(child) = child_slot.as_mut() {
                        super::process::terminate_process(child);
                    } else if let Some(pid) = child_pid {
                        terminate_pid(pid);
                    }
                    // Blocking wait on the child handle (if we still own one) while the
                    // state lock is held; the wait result is ignored.
                    if let Some(child) = child_slot.as_mut() {
                        let _ = child.wait();
                    }
                    *child_slot = None;
                    state.detached = true;

                    if !task.paths.exit.exists() {
                        write_kill_marker_if_absent(&task.paths.exit)
                            .map_err(|e| format!("failed to write kill marker: {e}"))?;
                    }

                    // NOTE(review): 124 presumably mirrors the exit code of `timeout(1)` — confirm.
                    let exit_code = if terminal_status == BgTaskStatus::TimedOut {
                        Some(124)
                    } else {
                        None
                    };
                    state
                        .metadata
                        .mark_terminal(terminal_status, exit_code, None);
                    if self.task_has_watch_control(&task.task_id) {
                        state.metadata.completion_delivered = true;
                    }
                    state.pending_terminal_override = None;
                    task.mark_terminal_now();
                    self.persist_task(&task.paths, &state.metadata)
                        .map_err(|e| format!("failed to persist killed state: {e}"))?;
                    state.buffer.enforce_terminal_cap();
                    self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
                }
                TaskRuntime::Pty(Some(pty)) => {
                    // Flag the runtime as killed before signalling.
                    pty.was_killed.store(true, Ordering::SeqCst);
                    if let Err(error) = pty.killer.kill() {
                        crate::slog_warn!("[pty-kill] {task_id} ChildKiller::kill failed: {error}");
                    }
                    // In addition to the killer above, signal via the recorded child pid.
                    if let Some(pid) = pty.child_pid {
                        #[cfg(unix)]
                        terminate_pgid(pid as i32, None);
                        #[cfg(windows)]
                        terminate_pid(pid);
                    }
                    // Drop the PTY master handle.
                    drop(pty.master.take());

                    // On non-Windows builds this arm does not mark the task terminal; the
                    // status stays `Killing` when this function returns.
                    // NOTE(review): presumably a later poll finalizes it via the exit
                    // marker — confirm.
                    #[cfg(windows)]
                    {
                        let default_status = if terminal_status == BgTaskStatus::TimedOut {
                            BgTaskStatus::TimedOut
                        } else {
                            BgTaskStatus::Killed
                        };
                        pty_forced_terminal_status = Some(
                            state
                                .pending_terminal_override
                                .take()
                                .unwrap_or(default_status),
                        );
                    }
                }
                // PTY task with no runtime handle: nothing to signal here.
                TaskRuntime::Pty(None) => {}
            }

            // Windows only: finalize the PTY task in this call, mirroring the piped path
            // above (kill marker, terminal metadata, persist, enqueue).
            #[cfg(windows)]
            if let Some(target_status) = pty_forced_terminal_status {
                if !task.paths.exit.exists() {
                    write_kill_marker_if_absent(&task.paths.exit)
                        .map_err(|e| format!("failed to write kill marker: {e}"))?;
                }

                let exit_code = if target_status == BgTaskStatus::TimedOut {
                    Some(124)
                } else {
                    None
                };
                state.metadata.mark_terminal(target_status, exit_code, None);
                if self.task_has_watch_control(&task.task_id) {
                    state.metadata.completion_delivered = true;
                }
                state.pending_terminal_override = None;
                task.mark_terminal_now();
                if let TaskRuntime::Pty(runtime) = &mut state.runtime {
                    *runtime = None;
                }
                state.detached = true;
                self.persist_task(&task.paths, &state.metadata)
                    .map_err(|e| format!("failed to persist killed PTY state: {e}"))?;
                state.buffer.enforce_terminal_cap();
                self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
            }
        }

        // The state guard went out of scope with the block above; `snapshot` is called
        // without it held.
        Ok(task.snapshot(5 * 1024))
    }
1997
    /// Moves a task to its terminal state based on an exit marker and enqueues its
    /// completion.
    ///
    /// Phases:
    /// 1. Under the state lock: persist terminal metadata, record the terminal instant,
    ///    release the runtime handle and flag the task as detached. Returns early if the
    ///    task is already terminal.
    /// 2. Without the lock: for PTY tasks, wait up to 200 ms (polling every 10 ms) for
    ///    the runtime's `reader_done` flag, then run a watch scan over the task output.
    /// 3. Under the state lock again: enforce the buffer's terminal cap and enqueue the
    ///    completion.
    ///
    /// `reason` is forwarded into the terminal metadata.
    ///
    /// # Errors
    /// Returns a message when the state lock is poisoned or the metadata update fails.
    fn finalize_from_marker(
        &self,
        task: &Arc<BgTask>,
        marker: ExitMarker,
        reason: Option<String>,
    ) -> Result<(), String> {
        // Read before taking the state lock; this call locks the watch registry.
        let watch_controlled = self.task_has_watch_control(&task.task_id);
        let mut pty_reader_done = None;
        {
            let mut state = task
                .state
                .lock()
                .map_err(|_| "background task lock poisoned".to_string())?;
            // Already finalized: only clear a leftover override.
            if state.metadata.status.is_terminal() {
                state.pending_terminal_override = None;
                return Ok(());
            }

            let pending_override = state.pending_terminal_override.take();
            let is_pty = state.metadata.mode == BgMode::Pty;
            let updated = self
                .update_task_metadata(&task.paths, |metadata| {
                    // A `Killed` marker on a PTY task resolves to the pending override
                    // (set by `kill_with_status` for timeouts) or to `Killed`; every
                    // other case is derived from the marker itself.
                    let mut new_metadata = if is_pty && marker == ExitMarker::Killed {
                        let mut metadata = metadata.clone();
                        let target_status = pending_override.unwrap_or(BgTaskStatus::Killed);
                        let exit_code = if target_status == BgTaskStatus::TimedOut {
                            Some(124)
                        } else {
                            None
                        };
                        metadata.mark_terminal(target_status, exit_code, reason);
                        metadata
                    } else {
                        terminal_metadata_from_marker(metadata.clone(), marker, reason)
                    };
                    // Watch-controlled tasks are flagged as already delivered.
                    if watch_controlled {
                        new_metadata.completion_delivered = true;
                    }
                    *metadata = new_metadata;
                })
                .map_err(|e| format!("failed to persist terminal state: {e}"))?;
            state.metadata = updated;
            task.mark_terminal_now();
            match &mut state.runtime {
                TaskRuntime::Piped(child_slot) => reap_piped_child(child_slot),
                TaskRuntime::Pty(runtime) => {
                    // Keep a handle to the reader-done flag so it can still be polled
                    // after the runtime is dropped.
                    pty_reader_done = runtime
                        .as_ref()
                        .map(|runtime| Arc::clone(&runtime.reader_done));
                    *runtime = None;
                }
            }
            state.detached = true;
        }

        // Bounded wait (at most 200 ms), done without holding the state lock.
        // NOTE(review): presumably lets the PTY reader flush remaining output before the
        // final scan and preview — confirm.
        if let Some(reader_done) = pty_reader_done {
            let deadline = Instant::now() + Duration::from_millis(200);
            while !reader_done.load(Ordering::SeqCst) && Instant::now() < deadline {
                std::thread::sleep(Duration::from_millis(10));
            }
        }

        // Runs before the completion is enqueued; `scan_task_watch_output` takes the
        // state lock itself, so it must be called while the lock is not held.
        self.scan_task_watch_output(task);

        let mut state = task
            .state
            .lock()
            .map_err(|_| "background task lock poisoned".to_string())?;
        state.buffer.enforce_terminal_cap();
        self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
        Ok(())
    }
2076
2077 fn enqueue_completion_if_needed(
2078 &self,
2079 metadata: &PersistedTask,
2080 paths: Option<&TaskPaths>,
2081 emit_frame: bool,
2082 ) {
2083 if metadata.status.is_terminal() && !metadata.completion_delivered {
2084 self.enqueue_completion_from_parts(metadata, None, paths, emit_frame);
2085 }
2086 }
2087
    /// Enqueues a completion using an in-memory buffer as the preview source.
    ///
    /// Forwards to `enqueue_completion_from_parts` with no on-disk paths. The call sites
    /// visible in this part of the file pass fields borrowed from a locked task state.
    fn enqueue_completion_locked(
        &self,
        metadata: &PersistedTask,
        buffer: Option<&BgBuffer>,
        emit_frame: bool,
    ) {
        self.enqueue_completion_from_parts(metadata, buffer, None, emit_frame);
    }
2096
2097 fn enqueue_completion_from_parts(
2098 &self,
2099 metadata: &PersistedTask,
2100 buffer: Option<&BgBuffer>,
2101 paths: Option<&TaskPaths>,
2102 emit_frame: bool,
2103 ) {
2104 if !metadata.status.is_terminal() {
2115 return;
2116 }
2117 let (raw_preview, output_truncated) = if metadata.mode == BgMode::Pty {
2122 (String::new(), false)
2123 } else {
2124 match buffer {
2125 Some(buf) => buf.read_tail(BG_COMPLETION_PREVIEW_BYTES),
2126 None => paths
2127 .map(|paths| read_tail_from_disk(metadata, paths, BG_COMPLETION_PREVIEW_BYTES))
2128 .unwrap_or_else(|| (String::new(), false)),
2129 }
2130 };
2131 let output_preview = if metadata.compressed {
2136 self.compress_output(&metadata.command, raw_preview)
2137 } else {
2138 raw_preview
2139 };
2140 let token_counts = self.completion_token_counts(metadata, buffer, paths);
2141 let completion = BgCompletion {
2142 task_id: metadata.task_id.clone(),
2143 session_id: metadata.session_id.clone(),
2144 status: metadata.status.clone(),
2145 exit_code: metadata.exit_code,
2146 command: metadata.command.clone(),
2147 output_preview,
2148 output_truncated,
2149 original_tokens: token_counts.original_tokens,
2150 compressed_tokens: token_counts.compressed_tokens,
2151 tokens_skipped: token_counts.tokens_skipped,
2152 };
2153
2154 self.record_compression_event_if_applicable(metadata, &token_counts);
2165
2166 let (watch_controlled, watch_matched) = self.task_watch_state(&metadata.task_id);
2167 if watch_controlled {
2168 if emit_frame && !watch_matched {
2169 self.emit_bash_watch_exit(&completion);
2170 }
2171 self.clear_task_watch_state(&metadata.task_id);
2172 return;
2173 }
2174
2175 if metadata.completion_delivered {
2185 return;
2186 }
2187
2188 let pushed = if let Ok(mut completions) = self.inner.completions.lock() {
2191 if completions
2192 .iter()
2193 .any(|existing| existing.task_id == metadata.task_id)
2194 {
2195 false
2196 } else {
2197 completions.push_back(completion.clone());
2198 true
2199 }
2200 } else {
2201 false
2202 };
2203
2204 if pushed && emit_frame {
2205 self.emit_bash_completed(completion);
2206 }
2207 }
2208
2209 fn record_compression_event_if_applicable(
2210 &self,
2211 metadata: &PersistedTask,
2212 token_counts: &CompletionTokenCounts,
2213 ) {
2214 if metadata.mode == BgMode::Pty {
2215 return;
2216 }
2217
2218 let (original_tokens, compressed_tokens, original_bytes, compressed_bytes) = match (
2219 token_counts.original_tokens,
2220 token_counts.compressed_tokens,
2221 token_counts.original_bytes,
2222 token_counts.compressed_bytes,
2223 ) {
2224 (
2225 Some(original_tokens),
2226 Some(compressed_tokens),
2227 Some(original_bytes),
2228 Some(compressed_bytes),
2229 ) => (
2230 original_tokens,
2231 compressed_tokens,
2232 original_bytes,
2233 compressed_bytes,
2234 ),
2235 _ => {
2236 crate::slog_warn!(
2237 "compression event skipped for {}: token counts unavailable (likely spill file missing or unreadable)",
2238 metadata.task_id
2239 );
2240 return;
2241 }
2242 };
2243
2244 let pool = self.inner.db_pool.read().ok().and_then(|slot| slot.clone());
2245 let Some(pool) = pool else {
2246 crate::slog_warn!(
2247 "compression event skipped for {}: db_pool not initialized — was configure run?",
2248 metadata.task_id
2249 );
2250 return;
2251 };
2252 let harness = self
2253 .inner
2254 .db_harness
2255 .read()
2256 .ok()
2257 .and_then(|slot| slot.clone());
2258 let Some(harness) = harness else {
2259 crate::slog_warn!(
2260 "compression event insert skipped for {}: harness not configured",
2261 metadata.task_id
2262 );
2263 return;
2264 };
2265
2266 let project_root = metadata
2267 .project_root
2268 .as_deref()
2269 .unwrap_or(&metadata.workdir);
2270 let project_key = crate::search_index::project_cache_key(project_root);
2271 let row = crate::db::compression_events::CompressionEventRow {
2272 harness: &harness,
2273 session_id: Some(&metadata.session_id),
2274 project_key: &project_key,
2275 tool: "bash",
2276 task_id: Some(&metadata.task_id),
2277 command: Some(&metadata.command),
2278 compressor: if metadata.compressed {
2279 "registry"
2280 } else {
2281 "none"
2282 },
2283 original_bytes,
2284 compressed_bytes,
2285 original_tokens,
2286 compressed_tokens,
2287 created_at: unix_millis() as i64,
2288 };
2289
2290 let conn = match pool.lock() {
2291 Ok(conn) => conn,
2292 Err(_) => {
2293 crate::slog_warn!(
2294 "compression event insert failed for {}: db mutex poisoned",
2295 metadata.task_id
2296 );
2297 return;
2298 }
2299 };
2300 match crate::db::compression_events::insert_compression_event(&conn, &row) {
2301 Ok(_) => {
2302 crate::slog_debug!(
2306 "compression event recorded for {} (project={}, session={}, {} → {} tokens)",
2307 metadata.task_id,
2308 project_key,
2309 metadata.session_id,
2310 original_tokens,
2311 compressed_tokens
2312 );
2313 }
2314 Err(error) => {
2315 crate::slog_warn!(
2316 "compression event insert failed for {}: {}",
2317 metadata.task_id,
2318 error
2319 );
2320 }
2321 }
2322 }
2323
2324 fn emit_bash_pattern_match(&self, session_id: &str, pattern_match: PatternMatch) {
2325 let Ok(progress_sender) = self
2326 .inner
2327 .progress_sender
2328 .lock()
2329 .map(|sender| sender.clone())
2330 else {
2331 return;
2332 };
2333 if let Some(sender) = progress_sender.as_ref() {
2334 sender(PushFrame::BashPatternMatch(BashPatternMatchFrame::new(
2335 pattern_match.task_id,
2336 session_id.to_string(),
2337 pattern_match.watch_id,
2338 pattern_match.match_text,
2339 pattern_match.match_offset,
2340 pattern_match.context,
2341 pattern_match.once,
2342 )));
2343 }
2344 }
2345
2346 fn emit_bash_watch_exit(&self, completion: &BgCompletion) {
2347 let Ok(progress_sender) = self
2348 .inner
2349 .progress_sender
2350 .lock()
2351 .map(|sender| sender.clone())
2352 else {
2353 return;
2354 };
2355 let Some(sender) = progress_sender.as_ref() else {
2356 return;
2357 };
2358 let status = completion_status_text(&completion.status, completion.exit_code);
2359 let preview = completion.output_preview.trim_end();
2360 let context = if preview.is_empty() {
2361 format!("task {} exited ({status})", completion.task_id)
2362 } else {
2363 format!(
2364 "task {} exited ({status})
2365{preview}",
2366 completion.task_id
2367 )
2368 };
2369 sender(PushFrame::BashPatternMatch(
2370 BashPatternMatchFrame::task_exit(
2371 completion.task_id.clone(),
2372 completion.session_id.clone(),
2373 format!("exited ({status})"),
2374 context,
2375 ),
2376 ));
2377 }
2378
2379 fn emit_bash_completed(&self, completion: BgCompletion) {
2380 let Ok(progress_sender) = self
2381 .inner
2382 .progress_sender
2383 .lock()
2384 .map(|sender| sender.clone())
2385 else {
2386 return;
2387 };
2388 let Some(sender) = progress_sender.as_ref() else {
2389 return;
2390 };
2391 sender(PushFrame::BashCompleted(BashCompletedFrame::new(
2399 completion.task_id,
2400 completion.session_id,
2401 completion.status,
2402 completion.exit_code,
2403 completion.command,
2404 completion.output_preview,
2405 completion.output_truncated,
2406 completion.original_tokens,
2407 completion.compressed_tokens,
2408 completion.tokens_skipped,
2409 )));
2410 }
2411
2412 fn completion_token_counts(
2413 &self,
2414 metadata: &PersistedTask,
2415 buffer: Option<&BgBuffer>,
2416 paths: Option<&TaskPaths>,
2417 ) -> CompletionTokenCounts {
2418 if metadata.mode == BgMode::Pty {
2419 return CompletionTokenCounts::skipped();
2420 }
2421
2422 let raw = match buffer {
2423 Some(buffer) => buffer.read_for_token_count(TOKENIZE_CAP_BYTES_PER_STREAM),
2424 None => paths
2425 .map(|paths| {
2426 read_for_token_count_from_disk(metadata, paths, TOKENIZE_CAP_BYTES_PER_STREAM)
2427 })
2428 .unwrap_or(TokenCountInput::Skipped),
2429 };
2430
2431 let TokenCountInput::Text(raw_output) = raw else {
2432 return CompletionTokenCounts::skipped();
2433 };
2434
2435 let original_tokens = token_count_u32(&raw_output);
2436 let original_bytes = raw_output.len() as i64;
2437 let compressed_output = if metadata.compressed {
2438 self.compress_output(&metadata.command, raw_output)
2439 } else {
2440 raw_output
2441 };
2442 let compressed_tokens = token_count_u32(&compressed_output);
2443 let compressed_bytes = compressed_output.len() as i64;
2444 CompletionTokenCounts {
2445 original_tokens: Some(original_tokens),
2446 compressed_tokens: Some(compressed_tokens),
2447 original_bytes: Some(original_bytes),
2448 compressed_bytes: Some(compressed_bytes),
2449 tokens_skipped: false,
2450 }
2451 }
2452
2453 pub(crate) fn maybe_emit_long_running_reminder(&self, task: &Arc<BgTask>) {
2454 if !self
2455 .inner
2456 .long_running_reminder_enabled
2457 .load(Ordering::SeqCst)
2458 {
2459 return;
2460 }
2461 let interval_ms = self
2462 .inner
2463 .long_running_reminder_interval_ms
2464 .load(Ordering::SeqCst);
2465 if interval_ms == 0 {
2466 return;
2467 }
2468 let interval = Duration::from_millis(interval_ms);
2469 let now = Instant::now();
2470 let Ok(mut last_reminder_at) = task.last_reminder_at.lock() else {
2471 return;
2472 };
2473 let since = last_reminder_at.unwrap_or(task.started);
2474 if now.duration_since(since) < interval {
2475 return;
2476 }
2477 let command = task
2478 .state
2479 .lock()
2480 .map(|state| state.metadata.command.clone())
2481 .unwrap_or_default();
2482 *last_reminder_at = Some(now);
2483 self.emit_bash_long_running(BashLongRunningFrame::new(
2484 task.task_id.clone(),
2485 task.session_id.clone(),
2486 command,
2487 task.started.elapsed().as_millis() as u64,
2488 ));
2489 }
2490
2491 fn emit_bash_long_running(&self, frame: BashLongRunningFrame) {
2492 let Ok(progress_sender) = self
2493 .inner
2494 .progress_sender
2495 .lock()
2496 .map(|sender| sender.clone())
2497 else {
2498 return;
2499 };
2500 if let Some(sender) = progress_sender.as_ref() {
2501 sender(PushFrame::BashLongRunning(frame));
2502 }
2503 }
2504
2505 fn task(&self, task_id: &str) -> Option<Arc<BgTask>> {
2506 self.inner
2507 .tasks
2508 .lock()
2509 .ok()
2510 .and_then(|tasks| tasks.get(task_id).cloned())
2511 }
2512
2513 fn task_for_session(&self, task_id: &str, session_id: &str) -> Option<Arc<BgTask>> {
2514 self.task(task_id)
2515 .filter(|task| task.session_id == session_id)
2516 }
2517
2518 fn running_count(&self) -> usize {
2519 self.inner
2520 .tasks
2521 .lock()
2522 .map(|tasks| tasks.values().filter(|task| task.is_running()).count())
2523 .unwrap_or(0)
2524 }
2525
2526 fn start_watchdog(&self) {
2527 if !self.inner.watchdog_started.swap(true, Ordering::SeqCst) {
2528 super::watchdog::start(self.clone());
2529 }
2530 }
2531
2532 fn running_metadata_is_stale(&self, metadata: &PersistedTask) -> bool {
2533 unix_millis().saturating_sub(metadata.started_at) > STALE_RUNNING_AFTER.as_millis() as u64
2534 }
2535
2536 #[cfg(test)]
2537 pub fn task_json_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
2538 self.task_for_session(task_id, session_id)
2539 .map(|task| task.paths.json.clone())
2540 }
2541
2542 #[cfg(test)]
2543 pub fn task_exit_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
2544 self.task_for_session(task_id, session_id)
2545 .map(|task| task.paths.exit.clone())
2546 }
2547
2548 fn generate_unique_task_id(&self) -> Result<String, String> {
2550 for _ in 0..32 {
2551 let candidate = random_slug();
2552 let tasks = self
2553 .inner
2554 .tasks
2555 .lock()
2556 .map_err(|_| "background task registry lock poisoned".to_string())?;
2557 if tasks.contains_key(&candidate) {
2558 continue;
2559 }
2560 let completions = self
2561 .inner
2562 .completions
2563 .lock()
2564 .map_err(|_| "background completions lock poisoned".to_string())?;
2565 if completions
2566 .iter()
2567 .any(|completion| completion.task_id == candidate)
2568 {
2569 continue;
2570 }
2571 return Ok(candidate);
2572 }
2573 Err("failed to allocate unique background task id after 32 attempts".to_string())
2574 }
2575}
2576
/// Token and byte measurements of a task's output before and after
/// compression.
///
/// The four counts are `None` when measurement was skipped, in which case
/// `tokens_skipped` is `true`.
struct CompletionTokenCounts {
    /// Token count of the raw output.
    original_tokens: Option<u32>,
    /// Token count of the output after compression (or of the raw output when
    /// no compression was applied).
    compressed_tokens: Option<u32>,
    /// Byte length of the raw output.
    original_bytes: Option<i64>,
    /// Byte length of the output after compression.
    compressed_bytes: Option<i64>,
    /// Whether measurement was skipped entirely.
    tokens_skipped: bool,
}

impl CompletionTokenCounts {
    /// Builds the "nothing measured" value: every count is `None` and
    /// `tokens_skipped` is set.
    fn skipped() -> Self {
        let (original_tokens, compressed_tokens) = (None, None);
        let (original_bytes, compressed_bytes) = (None, None);
        Self {
            tokens_skipped: true,
            original_tokens,
            compressed_tokens,
            original_bytes,
            compressed_bytes,
        }
    }
}
2596
2597fn completion_status_text(status: &BgTaskStatus, exit_code: Option<i32>) -> String {
2598 match status {
2599 BgTaskStatus::TimedOut => "timed out".to_string(),
2600 BgTaskStatus::Killed => "killed".to_string(),
2601 _ => exit_code
2602 .map(|code| format!("exit {code}"))
2603 .unwrap_or_else(|| format!("{status:?}").to_lowercase()),
2604 }
2605}
2606
2607fn token_count_u32(text: &str) -> u32 {
2608 aft_tokenizer::count_tokens(text)
2609 .try_into()
2610 .unwrap_or(u32::MAX)
2611}
2612
2613impl Default for BgTaskRegistry {
2614 fn default() -> Self {
2615 Self::new(Arc::new(Mutex::new(None)))
2616 }
2617}
2618
/// Returns `true` when `path` was last modified less than `grace` ago.
///
/// Returns `false` when the file's metadata or modification time cannot be
/// read. A modification time that lies ahead of the local clock (clock skew,
/// network filesystems, a clock that was stepped backwards) is treated as
/// "just modified": such a file counts as within any non-zero grace period
/// instead of being reported as old.
fn modified_within(path: &Path, grace: Duration) -> bool {
    let Ok(modified) = fs::metadata(path).and_then(|metadata| metadata.modified()) else {
        return false;
    };
    match SystemTime::now().duration_since(modified) {
        Ok(age) => age < grace,
        // `duration_since` only fails when `modified` is later than now. A
        // zero grace period still means "nothing is within it".
        Err(_) => !grace.is_zero(),
    }
}
2627
/// Canonicalizes `path`, falling back to an unchanged copy of `path` when
/// canonicalization fails (for example because the path does not exist).
fn canonicalized_path(path: &Path) -> PathBuf {
    match fs::canonicalize(path) {
        Ok(resolved) => resolved,
        Err(_) => path.to_path_buf(),
    }
}
2631
/// Converts a wall-clock start time (milliseconds since the Unix epoch) into a
/// monotonic `Instant` lying the same distance in the past.
///
/// A start time in the future maps to "now", as does any case where the
/// wall clock cannot be read or the subtraction is not representable.
fn started_instant_from_unix_millis(started_at: u64) -> Instant {
    let now_ms = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => started_at,
    };
    let age = Duration::from_millis(now_ms.saturating_sub(started_at));
    match Instant::now().checked_sub(age) {
        Some(started) => started,
        None => Instant::now(),
    }
}
2643
2644fn gc_quarantine(storage_dir: &Path) {
2645 let quarantine_root = storage_dir.join("bash-tasks-quarantine");
2646 let Ok(session_dirs) = fs::read_dir(&quarantine_root) else {
2647 return;
2648 };
2649 for session_entry in session_dirs.flatten() {
2650 let session_quarantine_dir = session_entry.path();
2651 if !session_quarantine_dir.is_dir() {
2652 continue;
2653 }
2654 let entries = match fs::read_dir(&session_quarantine_dir) {
2655 Ok(entries) => entries,
2656 Err(error) => {
2657 crate::slog_warn!(
2658 "failed to read background task quarantine dir {}: {error}",
2659 session_quarantine_dir.display()
2660 );
2661 continue;
2662 }
2663 };
2664 for entry in entries.flatten() {
2665 let path = entry.path();
2666 if modified_within(&path, QUARANTINE_GC_GRACE) {
2667 continue;
2668 }
2669 let result = if path.is_dir() {
2670 fs::remove_dir_all(&path)
2671 } else {
2672 fs::remove_file(&path)
2673 };
2674 match result {
2675 Ok(()) => log::debug!(
2676 "deleted old background task quarantine entry {}",
2677 path.display()
2678 ),
2679 Err(error) => crate::slog_warn!(
2680 "failed to delete old background task quarantine entry {}: {error}",
2681 path.display()
2682 ),
2683 }
2684 }
2685 let _ = fs::remove_dir(&session_quarantine_dir);
2686 }
2687 let _ = fs::remove_dir(&quarantine_root);
2688}
2689
/// Selects the naming scheme `quarantine_name` applies to a quarantined task
/// file.
enum QuarantineKind {
    /// Renamed to `<file_name>.corrupt-<unix_ts>`.
    Corrupt,
    /// Renamed to `<stem>.invalid.<unix_ts>.<extension>`, or to
    /// `<file_name>.invalid.<unix_ts>` when the name has no stem/extension pair.
    Invalid,
}
2694
2695fn quarantine_task_json(
2696 storage_dir: &Path,
2697 session_dir: &Path,
2698 json_path: &Path,
2699 kind: QuarantineKind,
2700) -> Result<(), String> {
2701 let session_hash = session_dir
2702 .file_name()
2703 .and_then(|name| name.to_str())
2704 .ok_or_else(|| {
2705 format!(
2706 "invalid background task session dir: {}",
2707 session_dir.display()
2708 )
2709 })?;
2710 let task_name = json_path
2711 .file_name()
2712 .and_then(|name| name.to_str())
2713 .ok_or_else(|| format!("invalid background task json path: {}", json_path.display()))?;
2714 let unix_ts = SystemTime::now()
2715 .duration_since(UNIX_EPOCH)
2716 .map(|duration| duration.as_secs())
2717 .unwrap_or(0);
2718 let quarantine_dir = storage_dir.join("bash-tasks-quarantine").join(session_hash);
2719 fs::create_dir_all(&quarantine_dir).map_err(|e| {
2720 format!(
2721 "failed to create background task quarantine dir {}: {e}",
2722 quarantine_dir.display()
2723 )
2724 })?;
2725 let target_name = quarantine_name(task_name, unix_ts, &kind);
2726 let target = quarantine_dir.join(target_name);
2727 fs::rename(json_path, &target).map_err(|e| {
2728 format!(
2729 "failed to quarantine background task metadata {} to {}: {e}",
2730 json_path.display(),
2731 target.display()
2732 )
2733 })?;
2734
2735 for sibling in task_sibling_paths(json_path) {
2736 if !sibling.exists() {
2737 continue;
2738 }
2739 let Some(sibling_name) = sibling.file_name().and_then(|name| name.to_str()) else {
2740 crate::slog_warn!(
2741 "skipping background task sibling with invalid name during quarantine: {}",
2742 sibling.display()
2743 );
2744 continue;
2745 };
2746 let sibling_target = quarantine_dir.join(quarantine_name(sibling_name, unix_ts, &kind));
2747 if let Err(error) = fs::rename(&sibling, &sibling_target) {
2748 crate::slog_warn!(
2749 "failed to quarantine background task sibling {} to {}: {error}",
2750 sibling.display(),
2751 sibling_target.display()
2752 );
2753 }
2754 }
2755
2756 let _ = fs::remove_dir(session_dir);
2757 Ok(())
2758}
2759
2760fn quarantine_name(file_name: &str, unix_ts: u64, kind: &QuarantineKind) -> String {
2761 match kind {
2762 QuarantineKind::Corrupt => format!("{file_name}.corrupt-{unix_ts}"),
2763 QuarantineKind::Invalid => {
2764 let path = Path::new(file_name);
2765 let stem = path.file_stem().and_then(|stem| stem.to_str());
2766 let extension = path.extension().and_then(|extension| extension.to_str());
2767 match (stem, extension) {
2768 (Some(stem), Some(extension)) => format!("{stem}.invalid.{unix_ts}.{extension}"),
2769 _ => format!("{file_name}.invalid.{unix_ts}"),
2770 }
2771 }
2772 }
2773}
2774
/// Lists the paths of the files that may accompany a task's JSON metadata:
/// same directory, same stem, one per known sibling extension.
///
/// Returns an empty list when `json_path` has no parent or no UTF-8 stem. The
/// returned paths are not checked for existence.
fn task_sibling_paths(json_path: &Path) -> Vec<PathBuf> {
    const SIBLING_EXTENSIONS: [&str; 7] = ["stdout", "stderr", "exit", "pty", "ps1", "bat", "sh"];

    let parent = match json_path.parent() {
        Some(parent) => parent,
        None => return Vec::new(),
    };
    let stem = match json_path.file_stem().and_then(|stem| stem.to_str()) {
        Some(stem) => stem,
        None => return Vec::new(),
    };

    let mut siblings = Vec::with_capacity(SIBLING_EXTENSIONS.len());
    for extension in SIBLING_EXTENSIONS {
        siblings.push(parent.join(format!("{stem}.{extension}")));
    }
    siblings
}
2787
2788fn read_tail_from_disk(
2789 metadata: &PersistedTask,
2790 paths: &TaskPaths,
2791 max_bytes: usize,
2792) -> (String, bool) {
2793 if metadata.mode == BgMode::Pty {
2794 return read_file_tail_capped(&paths.pty, max_bytes)
2795 .map(|bytes| {
2796 let truncated = fs::metadata(&paths.pty)
2797 .map(|metadata| metadata.len() > max_bytes as u64)
2798 .unwrap_or(false);
2799 (String::from_utf8_lossy(&bytes).into_owned(), truncated)
2800 })
2801 .unwrap_or_else(|_| (String::new(), false));
2802 }
2803 let stdout = fs::read(&paths.stdout).unwrap_or_default();
2804 let stderr = fs::read(&paths.stderr).unwrap_or_default();
2805 let mut bytes = Vec::with_capacity(stdout.len().saturating_add(stderr.len()));
2806 bytes.extend_from_slice(&stdout);
2807 bytes.extend_from_slice(&stderr);
2808 if bytes.len() <= max_bytes {
2809 return (String::from_utf8_lossy(&bytes).into_owned(), false);
2810 }
2811 let start = bytes.len().saturating_sub(max_bytes);
2812 (String::from_utf8_lossy(&bytes[start..]).into_owned(), true)
2813}
2814
2815fn read_for_token_count_from_disk(
2816 metadata: &PersistedTask,
2817 paths: &TaskPaths,
2818 max_bytes_per_stream: usize,
2819) -> TokenCountInput {
2820 if metadata.mode == BgMode::Pty {
2821 return TokenCountInput::Skipped;
2822 }
2823 let stdout = read_file_tail_capped(&paths.stdout, max_bytes_per_stream);
2830 let stderr = read_file_tail_capped(&paths.stderr, max_bytes_per_stream);
2831 match (stdout, stderr) {
2832 (Ok(stdout), Ok(stderr)) => TokenCountInput::Text(combine_streams(
2833 String::from_utf8_lossy(&stdout).as_ref(),
2834 String::from_utf8_lossy(&stderr).as_ref(),
2835 )),
2836 (Ok(stdout), Err(_)) => TokenCountInput::Text(combine_streams(
2837 String::from_utf8_lossy(&stdout).as_ref(),
2838 "",
2839 )),
2840 (Err(_), Ok(stderr)) => TokenCountInput::Text(combine_streams(
2841 "",
2842 String::from_utf8_lossy(&stderr).as_ref(),
2843 )),
2844 (Err(_), Err(_)) => TokenCountInput::Skipped,
2845 }
2846}
2847
/// Reads at most the last `max_bytes` bytes of the file at `path`.
///
/// Files no longer than `max_bytes` are returned whole. `max_bytes == 0`
/// yields an empty buffer. The cap is enforced on the read itself, so a file
/// that is still being appended to can never produce more than `max_bytes`.
///
/// # Errors
///
/// Propagates any I/O error from opening, inspecting, seeking or reading the
/// file.
fn read_file_tail_capped(path: &Path, max_bytes: usize) -> std::io::Result<Vec<u8>> {
    use std::io::{Read, Seek, SeekFrom};
    let mut file = std::fs::File::open(path)?;
    let len = file.metadata()?.len();
    let cap = max_bytes as u64;
    if len > cap {
        // Skip everything except the final `cap` bytes.
        file.seek(SeekFrom::Start(len - cap))?;
    }
    let mut bytes = Vec::with_capacity(len.min(cap) as usize);
    // `take` bounds the read even if the file grew after `len` was sampled,
    // and makes a zero cap return nothing instead of the whole file.
    file.take(cap).read_to_end(&mut bytes)?;
    Ok(bytes)
}
2864
2865impl BgTask {
2866 fn snapshot(&self, preview_bytes: usize) -> BgTaskSnapshot {
2867 let state = self
2868 .state
2869 .lock()
2870 .unwrap_or_else(|poison| poison.into_inner());
2871 self.snapshot_locked(&state, preview_bytes)
2872 }
2873
2874 fn snapshot_locked(&self, state: &BgTaskState, preview_bytes: usize) -> BgTaskSnapshot {
2875 let metadata = &state.metadata;
2876 let duration_ms = metadata.duration_ms.or_else(|| {
2877 metadata
2878 .status
2879 .is_terminal()
2880 .then(|| self.started.elapsed().as_millis() as u64)
2881 });
2882 let (output_preview, output_truncated) = if metadata.mode == BgMode::Pty {
2883 (String::new(), false)
2884 } else {
2885 state.buffer.read_tail(preview_bytes)
2886 };
2887 BgTaskSnapshot {
2888 info: BgTaskInfo {
2889 task_id: self.task_id.clone(),
2890 status: metadata.status.clone(),
2891 command: metadata.command.clone(),
2892 mode: metadata.mode.clone(),
2893 started_at: metadata.started_at,
2894 duration_ms,
2895 },
2896 exit_code: metadata.exit_code,
2897 child_pid: metadata.child_pid,
2898 workdir: metadata.workdir.display().to_string(),
2899 output_preview,
2900 output_truncated,
2901 output_path: state
2902 .buffer
2903 .output_path()
2904 .map(|path| path.display().to_string()),
2905 stderr_path: state
2906 .buffer
2907 .stderr_path()
2908 .map(|path| path.display().to_string()),
2909 pty_rows: (metadata.mode == BgMode::Pty).then_some(metadata.pty_rows.unwrap_or(24)),
2910 pty_cols: (metadata.mode == BgMode::Pty).then_some(metadata.pty_cols.unwrap_or(80)),
2911 }
2912 }
2913
2914 pub(crate) fn is_running(&self) -> bool {
2915 self.state
2916 .lock()
2917 .map(|state| {
2918 state.metadata.status == BgTaskStatus::Running
2919 || (state.metadata.mode == BgMode::Pty
2920 && state.metadata.status == BgTaskStatus::Killing)
2921 })
2922 .unwrap_or(false)
2923 }
2924
2925 fn is_terminal(&self) -> bool {
2926 self.state
2927 .lock()
2928 .map(|state| state.metadata.status.is_terminal())
2929 .unwrap_or(false)
2930 }
2931
2932 fn mark_terminal_now(&self) {
2933 if let Ok(mut terminal_at) = self.terminal_at.lock() {
2934 if terminal_at.is_none() {
2935 *terminal_at = Some(Instant::now());
2936 }
2937 }
2938 }
2939
2940 fn set_completion_delivered(
2941 &self,
2942 delivered: bool,
2943 registry: &BgTaskRegistry,
2944 ) -> Result<(), String> {
2945 let mut state = self
2946 .state
2947 .lock()
2948 .map_err(|_| "background task lock poisoned".to_string())?;
2949 let updated = registry
2950 .update_task_metadata(&self.paths, |metadata| {
2951 metadata.completion_delivered = delivered;
2952 })
2953 .map_err(|e| format!("failed to update completion delivery: {e}"))?;
2954 state.metadata = updated;
2955 Ok(())
2956 }
2957}
2958
/// Takes the child out of `child_slot` (leaving `None`) and, if `try_wait`
/// reports it as still running, blocks in `wait` until it exits. The result of
/// `wait` is ignored.
#[cfg(unix)]
fn reap_piped_child(child_slot: &mut Option<Child>) {
    let Some(mut child) = child_slot.take() else {
        return;
    };
    // An already-exited child, or a `try_wait` error, skips the blocking wait.
    if let Ok(None) = child.try_wait() {
        let _ = child.wait();
    }
}
2987
/// Windows variant: clears `child_slot`, dropping any child handle it held,
/// without waiting on the process.
#[cfg(windows)]
fn reap_piped_child(child_slot: &mut Option<Child>) {
    drop(child_slot.take());
}
2996
2997fn terminal_metadata_from_marker(
2998 mut metadata: PersistedTask,
2999 marker: ExitMarker,
3000 reason: Option<String>,
3001) -> PersistedTask {
3002 match marker {
3003 ExitMarker::Code(code) => {
3004 let status = if code == 0 {
3005 BgTaskStatus::Completed
3006 } else {
3007 BgTaskStatus::Failed
3008 };
3009 metadata.mark_terminal(status, Some(code), reason);
3010 }
3011 ExitMarker::Killed => metadata.mark_terminal(BgTaskStatus::Killed, None, reason),
3012 }
3013 metadata
3014}
3015
3016#[cfg(unix)]
3017fn detached_shell_command(command: &str, exit_path: &Path) -> Command {
3018 let shell = resolve_posix_shell();
3019 let mut cmd = Command::new(&shell);
3020 cmd.arg("-c")
3021 .arg("\"$0\" -c \"$1\"; code=$?; printf \"%s\" \"$code\" > \"$2.tmp.$$\"; mv -f \"$2.tmp.$$\" \"$2\"")
3022 .arg(&shell)
3023 .arg(command)
3024 .arg(exit_path);
3025 unsafe {
3026 cmd.pre_exec(|| {
3027 if libc::setsid() == -1 {
3028 return Err(std::io::Error::last_os_error());
3029 }
3030 Ok(())
3031 });
3032 }
3033 cmd
3034}
3035
3036#[cfg(unix)]
3037fn resolve_posix_shell() -> PathBuf {
3038 static POSIX_SHELL: OnceLock<PathBuf> = OnceLock::new();
3039 POSIX_SHELL
3040 .get_or_init(|| {
3041 std::env::var_os("BASH")
3042 .filter(|value| !value.is_empty())
3043 .map(PathBuf::from)
3044 .filter(|path| path.exists())
3045 .or_else(|| which::which("bash").ok())
3046 .or_else(|| which::which("zsh").ok())
3047 .unwrap_or_else(|| PathBuf::from("/bin/sh"))
3048 })
3049 .clone()
3050}
3051
/// Builds the `Command` that runs `command` through `shell` on Windows.
///
/// The shell-specific wrapper script is written next to the task's JSON file
/// (same stem; extension `ps1`, `bat` or `sh` depending on the shell) and the
/// shell is invoked on that file with `creation_flags` applied. Stdio, working
/// directory and environment are left for the caller to configure.
///
/// # Errors
///
/// Returns an error when the wrapper script cannot be written.
#[cfg(windows)]
fn detached_shell_command_for(
    shell: crate::windows_shell::WindowsShell,
    command: &str,
    exit_path: &Path,
    paths: &TaskPaths,
    creation_flags: u32,
) -> Result<Command, String> {
    use crate::windows_shell::WindowsShell;

    const POWERSHELL_ARGS: &[&str] = &[
        "-NoLogo",
        "-NoProfile",
        "-NonInteractive",
        "-ExecutionPolicy",
        "Bypass",
        "-File",
    ];
    const CMD_ARGS: &[&str] = &["/D", "/C"];
    const POSIX_ARGS: &[&str] = &[];

    let wrapper_body = shell.wrapper_script_bytes(command, exit_path);

    // Script extension and the arguments that precede the script path.
    let (wrapper_ext, leading_args) = match shell {
        WindowsShell::Pwsh | WindowsShell::Powershell => ("ps1", POWERSHELL_ARGS),
        WindowsShell::Cmd => ("bat", CMD_ARGS),
        WindowsShell::Posix(_) => ("sh", POSIX_ARGS),
    };

    let wrapper_stem = paths
        .json
        .file_stem()
        .and_then(|s| s.to_str())
        .unwrap_or("wrapper");
    let wrapper_path = paths.dir.join(format!("{}.{}", wrapper_stem, wrapper_ext));
    if let Err(e) = fs::write(&wrapper_path, wrapper_body) {
        return Err(format!(
            "failed to write background bash wrapper script: {e}"
        ));
    }

    let mut cmd = Command::new(shell.binary().as_ref());
    cmd.args(leading_args);
    cmd.arg(&wrapper_path);
    cmd.creation_flags(creation_flags);
    Ok(cmd)
}
3134
/// Spawns `command` as a detached background process with stdout and stderr
/// redirected to the task's capture files and stdin set to null.
///
/// `workdir` becomes the child's working directory and `env` is added to its
/// environment.
///
/// On non-Windows targets the command is wrapped by `detached_shell_command`.
/// On Windows every shell returned by `shell_candidates()` is tried in order,
/// each first with and then without `CREATE_BREAKAWAY_FROM_JOB`.
///
/// # Errors
///
/// Returns an error when a capture file cannot be created, when the spawn
/// fails, or (Windows) when no candidate shell could be spawned.
fn spawn_detached_child(
    command: &str,
    paths: &TaskPaths,
    workdir: &Path,
    env: &HashMap<String, String>,
) -> Result<std::process::Child, String> {
    #[cfg(not(windows))]
    {
        let stdout = create_capture_file(&paths.stdout)
            .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
        let stderr = create_capture_file(&paths.stderr)
            .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
        detached_shell_command(command, &paths.exit)
            .current_dir(workdir)
            .envs(env)
            .stdin(Stdio::null())
            .stdout(Stdio::from(stdout))
            .stderr(Stdio::from(stderr))
            .spawn()
            .map_err(|e| format!("failed to spawn background bash command: {e}"))
    }
    #[cfg(windows)]
    {
        use crate::windows_shell::shell_candidates;
        let candidates: Vec<crate::windows_shell::WindowsShell> = shell_candidates();
        // Win32 process creation flags, spelled out as raw values.
        const FLAG_CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
        const FLAG_CREATE_BREAKAWAY_FROM_JOB: u32 = 0x0100_0000;
        const FLAG_CREATE_NO_WINDOW: u32 = 0x0800_0000;
        let with_breakaway =
            FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP | FLAG_CREATE_BREAKAWAY_FROM_JOB;
        let without_breakaway = FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP;
        // Most recent recoverable failure, reported if every candidate fails.
        let mut last_error: Option<String> = None;
        for (idx, shell) in candidates.iter().enumerate() {
            // Inner loop: breakaway first, then the fallback without it.
            for &flags in &[with_breakaway, without_breakaway] {
                // Capture files are opened afresh for every attempt because
                // the handles are consumed by `Stdio::from` below.
                let stdout = create_capture_file(&paths.stdout)
                    .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
                let stderr = create_capture_file(&paths.stderr)
                    .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
                let mut cmd =
                    detached_shell_command_for(shell.clone(), command, &paths.exit, paths, flags)?;
                cmd.current_dir(workdir)
                    .envs(env)
                    .stdin(Stdio::null())
                    .stdout(Stdio::from(stdout))
                    .stderr(Stdio::from(stderr));
                match cmd.spawn() {
                    Ok(child) => {
                        // Success: note any fallback that was needed to get here.
                        if idx > 0 {
                            crate::slog_warn!("background bash spawn fell back to {} after {} earlier candidate(s) failed; \
                                the cached PATH probe disagreed with runtime spawn — likely PATH \
                                inheritance, antivirus / AppLocker / Defender ASR, or sandbox policy.",
                                shell.binary(),
                                idx);
                        }
                        if flags == without_breakaway {
                            crate::slog_warn!(
                                "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected \
                                (likely a restrictive Job Object — CI sandbox or MDM policy). \
                                Spawned without breakaway; the bg task will be torn down if the \
                                AFT process group is killed."
                            );
                        }
                        return Ok(child);
                    }
                    // The shell binary is missing: leave the flag loop and
                    // move on to the next candidate shell.
                    Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                        crate::slog_warn!("background bash spawn: {} returned NotFound at runtime — trying next candidate",
                            shell.binary());
                        last_error = Some(format!("{}: {e}", shell.binary()));
                        break;
                    }
                    // OS error 5 on the breakaway attempt: retry the same
                    // shell with the next flag set (without breakaway).
                    Err(e) if flags == with_breakaway && e.raw_os_error() == Some(5) => {
                        crate::slog_warn!(
                            "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected with \
                            Access Denied — retrying {} without breakaway",
                            shell.binary()
                        );
                        last_error = Some(format!("{}: {e}", shell.binary()));
                        continue;
                    }
                    // Any other failure aborts immediately without trying
                    // further candidates.
                    Err(e) => {
                        return Err(format!(
                            "failed to spawn background bash command via {}: {e}",
                            shell.binary()
                        ));
                    }
                }
            }
        }
        Err(format!(
            "failed to spawn background bash command: no Windows shell could be spawned. \
            Last error: {}. PATH-probed candidates: {:?}",
            last_error.unwrap_or_else(|| "no candidates were attempted".to_string()),
            candidates.iter().map(|s| s.binary()).collect::<Vec<_>>()
        ))
    }
}
3283
3284fn random_slug() -> String {
3285 let mut bytes = [0u8; 4];
3286 getrandom::fill(&mut bytes).unwrap_or_else(|_| {
3288 let t = SystemTime::now()
3290 .duration_since(UNIX_EPOCH)
3291 .map(|d| d.subsec_nanos())
3292 .unwrap_or(0);
3293 let p = std::process::id();
3294 bytes.copy_from_slice(&(t ^ p).to_le_bytes());
3295 });
3296 let hex: String = bytes.iter().map(|b| format!("{b:02x}")).collect();
3298 format!("bash-{hex}")
3299}
3300
3301#[cfg(test)]
3302mod tests {
3303 use std::collections::HashMap;
3304 #[cfg(windows)]
3305 use std::fs;
3306 use std::sync::{Arc, Mutex};
3307 use std::time::Duration;
3308 #[cfg(windows)]
3309 use std::time::Instant;
3310
3311 use super::*;
3312
3313 #[cfg(unix)]
3314 const QUICK_SUCCESS_COMMAND: &str = "true";
3315 #[cfg(windows)]
3316 const QUICK_SUCCESS_COMMAND: &str = "cmd /c exit 0";
3317
3318 #[cfg(unix)]
3319 const LONG_RUNNING_COMMAND: &str = "sleep 5";
3320 #[cfg(windows)]
3321 const LONG_RUNNING_COMMAND: &str = "cmd /c timeout /t 5 /nobreak > nul";
3322
3323 #[test]
3324 fn pty_dimensions_are_persisted_and_returned_in_snapshot() {
3325 let registry = BgTaskRegistry::default();
3326 let dir = tempfile::tempdir().unwrap();
3327 let task_id = registry
3328 .spawn_pty(
3329 QUICK_SUCCESS_COMMAND,
3330 "session".to_string(),
3331 dir.path().to_path_buf(),
3332 HashMap::new(),
3333 Some(Duration::from_secs(30)),
3334 dir.path().to_path_buf(),
3335 10,
3336 true,
3337 false,
3338 Some(dir.path().to_path_buf()),
3339 50,
3340 120,
3341 )
3342 .unwrap();
3343
3344 let paths = task_paths(dir.path(), "session", &task_id);
3345 let metadata = read_task(&paths.json).unwrap();
3346 assert_eq!(
3347 metadata.schema_version,
3348 crate::bash_background::persistence::SCHEMA_VERSION
3349 );
3350 assert_eq!(metadata.mode, BgMode::Pty);
3351 assert_eq!(metadata.pty_rows, Some(50));
3352 assert_eq!(metadata.pty_cols, Some(120));
3353
3354 let snapshot = registry
3355 .status(&task_id, "session", None, Some(dir.path()), 1024)
3356 .unwrap();
3357 assert_eq!(snapshot.pty_rows, Some(50));
3358 assert_eq!(snapshot.pty_cols, Some(120));
3359 }
3360
3361 fn spawn_dead_child() -> std::process::Child {
3366 #[cfg(unix)]
3367 let mut cmd = std::process::Command::new("true");
3368 #[cfg(windows)]
3369 let mut cmd = {
3370 let mut c = std::process::Command::new("cmd");
3371 c.args(["/c", "exit", "0"]);
3372 c
3373 };
3374 cmd.stdin(std::process::Stdio::null());
3375 cmd.stdout(std::process::Stdio::null());
3376 cmd.stderr(std::process::Stdio::null());
3377 let mut child = cmd.spawn().expect("spawn replacement child for reap test");
3378 let started = Instant::now();
3387 loop {
3388 match child.try_wait() {
3389 Ok(Some(_)) => break,
3390 Ok(None) => {
3391 if started.elapsed() > Duration::from_secs(5) {
3392 panic!("dead-child stand-in did not exit within 5s");
3393 }
3394 std::thread::sleep(Duration::from_millis(10));
3395 }
3396 Err(error) => panic!("dead-child try_wait failed: {error}"),
3397 }
3398 }
3399 child
3400 }
3401
3402 #[test]
3403 fn ack_marks_delivered_even_when_completion_was_already_consumed_locally() {
3404 let registry = BgTaskRegistry::default();
3405 let dir = tempfile::tempdir().unwrap();
3406 let task_id = registry
3407 .spawn(
3408 LONG_RUNNING_COMMAND,
3409 "session".to_string(),
3410 dir.path().to_path_buf(),
3411 HashMap::new(),
3412 Some(Duration::from_secs(30)),
3413 dir.path().to_path_buf(),
3414 10,
3415 true,
3416 false,
3417 Some(dir.path().to_path_buf()),
3418 )
3419 .unwrap();
3420 registry
3421 .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
3422 .unwrap();
3423 assert_eq!(
3424 registry
3425 .drain_completions_for_session(Some("session"))
3426 .len(),
3427 1
3428 );
3429
3430 registry.inner.completions.lock().unwrap().clear();
3433
3434 assert_eq!(
3435 registry.ack_completions_for_session(Some("session"), std::slice::from_ref(&task_id)),
3436 vec![task_id.clone()]
3437 );
3438 assert!(registry
3439 .drain_completions_for_session(Some("session"))
3440 .is_empty());
3441
3442 let paths = task_paths(dir.path(), "session", &task_id);
3443 let metadata = read_task(&paths.json).unwrap();
3444 assert!(metadata.completion_delivered);
3445
3446 let replayed = BgTaskRegistry::default();
3447 replayed
3448 .replay_session_inner(dir.path(), "session", None)
3449 .unwrap();
3450 assert!(replayed
3451 .drain_completions_for_session(Some("session"))
3452 .is_empty());
3453 }
3454
3455 #[test]
3456 fn register_watch_rejects_unknown_task() {
3457 let registry = BgTaskRegistry::default();
3458
3459 let result = registry.register_watch(
3460 "missing-task".to_string(),
3461 WatchPattern::Substring("READY".into()),
3462 true,
3463 );
3464
3465 assert_eq!(result, Err("task_not_found"));
3466 }
3467
3468 #[test]
3469 fn register_watch_on_terminal_task_scans_existing_output() {
3470 let frames = Arc::new(Mutex::new(Vec::new()));
3471 let captured = Arc::clone(&frames);
3472 let sender: crate::context::ProgressSender = Arc::new(Box::new(move |frame| {
3473 captured.lock().unwrap().push(frame);
3474 })
3475 as Box<dyn Fn(PushFrame) + Send + Sync>);
3476 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(Some(sender))));
3477 let dir = tempfile::tempdir().unwrap();
3478 let task_id = registry
3479 .spawn(
3480 LONG_RUNNING_COMMAND,
3481 "session".to_string(),
3482 dir.path().to_path_buf(),
3483 HashMap::new(),
3484 Some(Duration::from_secs(30)),
3485 dir.path().to_path_buf(),
3486 10,
3487 true,
3488 false,
3489 Some(dir.path().to_path_buf()),
3490 )
3491 .unwrap();
3492 registry
3493 .inner
3494 .shutdown
3495 .store(true, std::sync::atomic::Ordering::SeqCst);
3496 let task = registry.task_for_session(&task_id, "session").unwrap();
3497 std::fs::write(&task.paths.stdout, "READY\n").unwrap();
3498 registry
3499 .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
3500 .unwrap();
3501 frames.lock().unwrap().clear();
3502 registry.inner.completions.lock().unwrap().clear();
3503
3504 registry
3505 .register_watch(
3506 task_id.clone(),
3507 WatchPattern::Substring("READY".into()),
3508 true,
3509 )
3510 .unwrap();
3511
3512 let frames = frames.lock().unwrap();
3513 let frame = frames
3514 .iter()
3515 .find_map(|frame| match frame {
3516 PushFrame::BashPatternMatch(frame) => Some(frame),
3517 _ => None,
3518 })
3519 .expect("terminal watch registration should emit pattern frame");
3520 assert_eq!(frame.reason, "pattern_match");
3521 assert_eq!(frame.task_id, task_id);
3522 assert_eq!(frame.session_id, "session");
3523 assert_eq!(frame.match_text, "READY");
3524 assert_eq!(frame.match_offset, 0);
3525 assert_eq!(registry.active_watch_count(&frame.task_id), 0);
3526 let metadata = read_task(&task.paths.json).unwrap();
3527 assert!(metadata.completion_delivered);
3528 }
3529
3530 #[test]
3531 fn cleanup_finished_removes_terminal_tasks_older_than_threshold() {
3532 let registry = BgTaskRegistry::default();
3533 let dir = tempfile::tempdir().unwrap();
3534 let task_id = registry
3535 .spawn(
3536 QUICK_SUCCESS_COMMAND,
3537 "session".to_string(),
3538 dir.path().to_path_buf(),
3539 HashMap::new(),
3540 Some(Duration::from_secs(30)),
3541 dir.path().to_path_buf(),
3542 10,
3543 true,
3544 false,
3545 Some(dir.path().to_path_buf()),
3546 )
3547 .unwrap();
3548 registry
3549 .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
3550 .unwrap();
3551 let completions = registry.drain_completions_for_session(Some("session"));
3552 assert_eq!(completions.len(), 1);
3553 assert_eq!(
3554 registry.ack_completions_for_session(Some("session"), std::slice::from_ref(&task_id)),
3555 vec![task_id.clone()]
3556 );
3557
3558 registry.cleanup_finished(Duration::ZERO);
3559
3560 assert!(registry.inner.tasks.lock().unwrap().is_empty());
3561 }
3562
3563 #[test]
3564 fn cleanup_finished_retains_undelivered_terminals() {
3565 let registry = BgTaskRegistry::default();
3566 let dir = tempfile::tempdir().unwrap();
3567 let task_id = registry
3568 .spawn(
3569 QUICK_SUCCESS_COMMAND,
3570 "session".to_string(),
3571 dir.path().to_path_buf(),
3572 HashMap::new(),
3573 Some(Duration::from_secs(30)),
3574 dir.path().to_path_buf(),
3575 10,
3576 true,
3577 false,
3578 Some(dir.path().to_path_buf()),
3579 )
3580 .unwrap();
3581 registry
3582 .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
3583 .unwrap();
3584
3585 registry.cleanup_finished(Duration::ZERO);
3586
3587 assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
3588 }
3589
    /// `reap_child` on a task whose child has exited but left no exit marker:
    /// the first call must release the child handle, mark the task detached
    /// and leave it `Running`; the second call must move it to `Failed` with
    /// the reason `"process exited without exit marker"`.
    #[test]
    fn reap_child_marks_failed_when_child_exits_without_exit_marker() {
        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                QUICK_SUCCESS_COMMAND,
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
                true,
                false,
                Some(dir.path().to_path_buf()),
            )
            .unwrap();

        let task = registry.task_for_session(&task_id, "session").unwrap();

        // Poll (up to 5s) until the piped child reports an exit status. Any
        // runtime state other than `Piped(Some(_))` is treated as "already
        // exited". The state lock is taken per iteration and released before
        // sleeping.
        let started = Instant::now();
        loop {
            let exited = {
                let mut state = task.state.lock().unwrap();
                match &mut state.runtime {
                    TaskRuntime::Piped(Some(child)) => matches!(child.try_wait(), Ok(Some(_))),
                    _ => true,
                }
            };
            if exited {
                break;
            }
            assert!(
                started.elapsed() < Duration::from_secs(5),
                "child should exit quickly"
            );
            std::thread::sleep(Duration::from_millis(20));
        }

        // Raise the registry's shutdown flag and wait.
        // NOTE(review): the 550ms pause presumably gives a background watcher
        // time to observe the flag and stop touching this task — confirm
        // against the watcher's poll interval.
        registry
            .inner
            .shutdown
            .store(true, std::sync::atomic::Ordering::SeqCst);
        std::thread::sleep(Duration::from_millis(550));

        // Best-effort removal of the exit marker; the error is ignored and the
        // marker's absence is asserted as a precondition below.
        let _ = std::fs::remove_file(&task.paths.exit);

        {
            // Rewind the task to look like it is still running: clear every
            // terminal field in memory, persist that metadata to the task's
            // JSON file, and make sure the runtime holds a child handle again.
            let mut state = task.state.lock().unwrap();
            state.metadata.status = BgTaskStatus::Running;
            state.metadata.status_reason = None;
            state.metadata.exit_code = None;
            state.metadata.finished_at = None;
            state.metadata.duration_ms = None;
            crate::bash_background::persistence::write_task(&task.paths.json, &state.metadata)
                .expect("persist reset Running metadata for reap_child test");
            if matches!(state.runtime, TaskRuntime::Piped(None)) {
                // The handle was already released; install a stand-in child.
                // (`spawn_dead_child` is defined elsewhere — presumably it
                // returns an already-exited process.)
                state.runtime = TaskRuntime::Piped(Some(spawn_dead_child()));
            }
        }
        // Also clear the recorded terminal timestamp.
        *task.terminal_at.lock().unwrap() = None;

        assert!(
            task.is_running(),
            "precondition: metadata.status == Running"
        );
        assert!(
            !task.paths.exit.exists(),
            "precondition: exit marker absent"
        );

        // First pass.
        registry.reap_child(&task);

        {
            // Scoped so the state lock is released before the second pass.
            let state = task.state.lock().unwrap();
            assert_eq!(
                state.metadata.status,
                BgTaskStatus::Running,
                "first reap must leave status Running while waiting one pass for marker"
            );
            assert_eq!(
                state.metadata.status_reason, None,
                "first reap must not record a failure reason"
            );
            assert!(
                matches!(state.runtime, TaskRuntime::Piped(None)),
                "child handle must be released after first reap"
            );
            assert!(
                state.detached,
                "task must be marked detached after first reap"
            );
        }

        // Second pass: still no marker, so the task must become terminal.
        registry.reap_child(&task);

        let state = task.state.lock().unwrap();
        assert!(
            state.metadata.status.is_terminal(),
            "second reap must transition to terminal when PID dead and no marker. Got status={:?}",
            state.metadata.status
        );
        assert_eq!(
            state.metadata.status,
            BgTaskStatus::Failed,
            "must specifically be Failed (not Killed): status={:?}",
            state.metadata.status
        );
        assert_eq!(
            state.metadata.status_reason.as_deref(),
            Some("process exited without exit marker"),
            "reason must match replay path's wording: {:?}",
            state.metadata.status_reason
        );
        assert!(
            matches!(state.runtime, TaskRuntime::Piped(None)),
            "child handle must stay released after second reap"
        );
        assert!(
            state.detached,
            "task must remain detached after second reap"
        );
    }
3767
    /// `reap_child` on a task whose child has exited and whose exit marker is
    /// present: the child handle must be released and the task marked
    /// detached, but the status must stay `Running`.
    #[test]
    fn reap_child_preserves_running_when_exit_marker_exists() {
        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                QUICK_SUCCESS_COMMAND,
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
                true,
                false,
                Some(dir.path().to_path_buf()),
            )
            .unwrap();

        let task = registry.task_for_session(&task_id, "session").unwrap();

        // Poll (up to 5s) until the child has exited AND the exit marker file
        // exists. Any runtime state other than `Piped(Some(_))` counts as
        // "already exited".
        let started = Instant::now();
        loop {
            let exited = {
                let mut state = task.state.lock().unwrap();
                match &mut state.runtime {
                    TaskRuntime::Piped(Some(child)) => matches!(child.try_wait(), Ok(Some(_))),
                    _ => true,
                }
            };
            if exited && task.paths.exit.exists() {
                break;
            }
            assert!(
                started.elapsed() < Duration::from_secs(5),
                "child should exit and write marker quickly"
            );
            std::thread::sleep(Duration::from_millis(20));
        }

        // Raise the registry's shutdown flag and wait.
        // NOTE(review): the 550ms pause presumably gives a background watcher
        // time to observe the flag and stop touching this task — confirm.
        registry
            .inner
            .shutdown
            .store(true, std::sync::atomic::Ordering::SeqCst);
        std::thread::sleep(Duration::from_millis(550));

        {
            // Put the in-memory state back to Running and make sure the
            // runtime holds a child handle for `reap_child` to release.
            let mut state = task.state.lock().unwrap();
            state.metadata.status = BgTaskStatus::Running;
            state.metadata.status_reason = None;
            if matches!(state.runtime, TaskRuntime::Piped(None)) {
                // (`spawn_dead_child` is defined elsewhere — presumably it
                // returns an already-exited process.)
                state.runtime = TaskRuntime::Piped(Some(spawn_dead_child()));
            }
        }
        // Also clear the recorded terminal timestamp.
        *task.terminal_at.lock().unwrap() = None;
        if !task.paths.exit.exists() {
            // The marker disappeared since the wait loop; recreate it so the
            // "marker exists" precondition holds.
            std::fs::write(&task.paths.exit, "0").expect("write replacement exit marker");
        }

        registry.reap_child(&task);

        let state = task.state.lock().unwrap();
        assert!(
            matches!(state.runtime, TaskRuntime::Piped(None)),
            "child handle still released even when marker exists"
        );
        assert!(
            state.detached,
            "task still marked detached even when marker exists"
        );
        assert_eq!(
            state.metadata.status,
            BgTaskStatus::Running,
            "reap_child must defer to poll_task when marker exists"
        );
    }
3869
3870 #[cfg(unix)]
3874 fn pid_stat(pid: u32) -> Option<String> {
3875 let output = std::process::Command::new("ps")
3876 .args(["-o", "stat=", "-p", &pid.to_string()])
3877 .output()
3878 .ok()?;
3879 if !output.status.success() {
3880 return None;
3881 }
3882 let stat = String::from_utf8_lossy(&output.stdout).trim().to_string();
3883 if stat.is_empty() {
3884 None
3885 } else {
3886 Some(stat)
3887 }
3888 }
3889
3890 #[cfg(unix)]
3892 fn is_zombie(pid: u32) -> bool {
3893 pid_stat(pid).is_some_and(|stat| stat.starts_with('Z'))
3894 }
3895
3896 #[cfg(unix)]
3902 fn spawn_unreaped_zombie() -> std::process::Child {
3903 let child = std::process::Command::new("true")
3904 .stdin(std::process::Stdio::null())
3905 .stdout(std::process::Stdio::null())
3906 .stderr(std::process::Stdio::null())
3907 .spawn()
3908 .expect("spawn zombie stand-in");
3909 let pid = child.id();
3910 let started = Instant::now();
3911 while !is_zombie(pid) {
3912 assert!(
3913 started.elapsed() < Duration::from_secs(5),
3914 "stand-in child should become a zombie within 5s"
3915 );
3916 std::thread::sleep(Duration::from_millis(10));
3917 }
3918 child
3920 }
3921
    /// Regression test for issue #91 (per the final assertion message): when
    /// `poll_task` finalizes a task whose exit marker exists, the child handle
    /// must be released and the child must no longer show up as a zombie.
    #[cfg(unix)]
    #[test]
    fn finalize_from_marker_reaps_child_no_zombie() {
        use std::sync::atomic::Ordering;

        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                QUICK_SUCCESS_COMMAND,
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
                true,
                false,
                Some(dir.path().to_path_buf()),
            )
            .unwrap();

        // Raise the registry's shutdown flag and wait.
        // NOTE(review): the 550ms pause presumably gives a background watcher
        // time to observe the flag so that only this test drives the task —
        // confirm.
        registry.inner.shutdown.store(true, Ordering::SeqCst);
        std::thread::sleep(Duration::from_millis(550));

        let task = registry.task_for_session(&task_id, "session").unwrap();

        // Wait (up to 5s) for the exit marker file to appear.
        let started = Instant::now();
        while !task.paths.exit.exists() {
            assert!(
                started.elapsed() < Duration::from_secs(5),
                "exit marker should land quickly for `true`"
            );
            std::thread::sleep(Duration::from_millis(20));
        }

        let zombie_pid;
        {
            // Rewind the task to Running (in memory and in the persisted
            // JSON), then replace the runtime's child handle with a process
            // that has exited but has not been waited on.
            let mut state = task.state.lock().unwrap();
            state.metadata.status = BgTaskStatus::Running;
            state.metadata.status_reason = None;
            state.metadata.exit_code = None;
            state.metadata.finished_at = None;
            state.metadata.duration_ms = None;
            crate::bash_background::persistence::write_task(&task.paths.json, &state.metadata)
                .expect("persist reset Running metadata");
            let zombie = spawn_unreaped_zombie();
            zombie_pid = zombie.id();
            state.runtime = TaskRuntime::Piped(Some(zombie));
        }
        // Also clear the recorded terminal timestamp.
        *task.terminal_at.lock().unwrap() = None;

        assert!(
            is_zombie(zombie_pid),
            "precondition: stand-in child {zombie_pid} must be a zombie before finalize"
        );

        // The exit marker is still on disk, so this poll is expected to take
        // the marker-driven terminal transition.
        registry.poll_task(&task).unwrap();

        {
            let state = task.state.lock().unwrap();
            assert!(
                matches!(state.runtime, TaskRuntime::Piped(None)),
                "child handle must be released after marker finalize"
            );
            assert!(
                state.metadata.status.is_terminal(),
                "task must be terminal after marker finalize: {:?}",
                state.metadata.status
            );
        }

        // The stand-in must no longer be reported in a `Z` state.
        assert!(
            !is_zombie(zombie_pid),
            "issue #91 regression: child {zombie_pid} left as <defunct> zombie \
             after the exit-marker terminal transition"
        );
    }
4025
    /// Regression test for issue #91 (per the final assertion message): when
    /// `kill_with_status` is called on a task whose exit marker already
    /// exists, the child handle must be released and the child must no longer
    /// show up as a zombie.
    #[cfg(unix)]
    #[test]
    fn kill_with_existing_marker_reaps_child_no_zombie() {
        use std::sync::atomic::Ordering;

        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                QUICK_SUCCESS_COMMAND,
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
                true,
                false,
                Some(dir.path().to_path_buf()),
            )
            .unwrap();

        // Raise the registry's shutdown flag and wait.
        // NOTE(review): the 550ms pause presumably gives a background watcher
        // time to observe the flag so that only this test drives the task —
        // confirm.
        registry.inner.shutdown.store(true, Ordering::SeqCst);
        std::thread::sleep(Duration::from_millis(550));

        let task = registry.task_for_session(&task_id, "session").unwrap();

        // Wait (up to 5s) for the exit marker file to appear.
        let started = Instant::now();
        while !task.paths.exit.exists() {
            assert!(
                started.elapsed() < Duration::from_secs(5),
                "exit marker should land quickly for `true`"
            );
            std::thread::sleep(Duration::from_millis(20));
        }

        let zombie_pid;
        {
            // Rewind the task to Running (in memory and in the persisted
            // JSON), then replace the runtime's child handle with a process
            // that has exited but has not been waited on.
            let mut state = task.state.lock().unwrap();
            state.metadata.status = BgTaskStatus::Running;
            state.metadata.status_reason = None;
            state.metadata.exit_code = None;
            state.metadata.finished_at = None;
            state.metadata.duration_ms = None;
            crate::bash_background::persistence::write_task(&task.paths.json, &state.metadata)
                .expect("persist reset Running metadata");
            let zombie = spawn_unreaped_zombie();
            zombie_pid = zombie.id();
            state.runtime = TaskRuntime::Piped(Some(zombie));
        }
        // Also clear the recorded terminal timestamp.
        *task.terminal_at.lock().unwrap() = None;

        assert!(
            is_zombie(zombie_pid),
            "precondition: stand-in child {zombie_pid} must be a zombie before kill"
        );

        // Kill while the exit marker is still on disk.
        registry
            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
            .expect("kill should succeed");

        {
            let state = task.state.lock().unwrap();
            assert!(
                matches!(state.runtime, TaskRuntime::Piped(None)),
                "child handle must be released after marker-aware kill"
            );
            assert!(state.metadata.status.is_terminal());
        }

        // The stand-in must no longer be reported in a `Z` state.
        assert!(
            !is_zombie(zombie_pid),
            "issue #91 regression: child {zombie_pid} left as <defunct> zombie \
             after a marker-aware kill"
        );
    }
4106
4107 #[test]
4108 fn cleanup_finished_keeps_running_tasks() {
4109 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
4110 let dir = tempfile::tempdir().unwrap();
4111 let task_id = registry
4112 .spawn(
4113 LONG_RUNNING_COMMAND,
4114 "session".to_string(),
4115 dir.path().to_path_buf(),
4116 HashMap::new(),
4117 Some(Duration::from_secs(30)),
4118 dir.path().to_path_buf(),
4119 10,
4120 true,
4121 false,
4122 Some(dir.path().to_path_buf()),
4123 )
4124 .unwrap();
4125
4126 registry.cleanup_finished(Duration::ZERO);
4127
4128 assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
4129 let _ = registry.kill(&task_id, "session");
4130 }
4131
4132 #[cfg(windows)]
4133 fn wait_for_file(path: &Path) -> String {
4134 let started = Instant::now();
4135 loop {
4136 if path.exists() {
4137 return fs::read_to_string(path).expect("read file");
4138 }
4139 assert!(
4140 started.elapsed() < Duration::from_secs(30),
4141 "timed out waiting for {}",
4142 path.display()
4143 );
4144 std::thread::sleep(Duration::from_millis(100));
4145 }
4146 }
4147
4148 #[cfg(windows)]
4149 fn spawn_windows_registry_command(
4150 command: &str,
4151 ) -> (BgTaskRegistry, tempfile::TempDir, String) {
4152 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
4153 let dir = tempfile::tempdir().unwrap();
4154 let task_id = registry
4155 .spawn(
4156 command,
4157 "session".to_string(),
4158 dir.path().to_path_buf(),
4159 HashMap::new(),
4160 Some(Duration::from_secs(30)),
4161 dir.path().to_path_buf(),
4162 10,
4163 false,
4164 false,
4165 Some(dir.path().to_path_buf()),
4166 )
4167 .unwrap();
4168 (registry, dir, task_id)
4169 }
4170
4171 #[cfg(windows)]
4172 #[test]
4173 fn windows_spawn_writes_exit_marker_for_zero_exit() {
4174 let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 0");
4175 let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
4176
4177 let content = wait_for_file(&exit_path);
4178
4179 assert_eq!(content.trim(), "0");
4180 }
4181
4182 #[cfg(windows)]
4183 #[test]
4184 fn windows_spawn_writes_exit_marker_for_nonzero_exit() {
4185 let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 42");
4186 let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
4187
4188 let content = wait_for_file(&exit_path);
4189
4190 assert_eq!(content.trim(), "42");
4191 }
4192
4193 #[cfg(windows)]
4194 #[test]
4195 fn windows_spawn_captures_stdout_to_disk() {
4196 let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c echo hello");
4197 let task = registry.task_for_session(&task_id, "session").unwrap();
4198 let stdout_path = task.paths.stdout.clone();
4199 let exit_path = task.paths.exit.clone();
4200
4201 let _ = wait_for_file(&exit_path);
4202 let stdout = fs::read_to_string(stdout_path).expect("read stdout");
4203
4204 assert!(stdout.contains("hello"), "stdout was {stdout:?}");
4205 }
4206
4207 #[cfg(windows)]
4208 #[test]
4209 fn windows_spawn_uses_pwsh_when_available() {
4210 let candidates = crate::windows_shell::shell_candidates_with(
4214 |binary| match binary {
4215 "pwsh.exe" => Some(std::path::PathBuf::from(r"C:\pwsh\pwsh.exe")),
4216 "powershell.exe" => Some(std::path::PathBuf::from(r"C:\ps\powershell.exe")),
4217 _ => None,
4218 },
4219 || None,
4220 );
4221 let shell = candidates.first().expect("at least one candidate").clone();
4222 assert_eq!(shell, crate::windows_shell::WindowsShell::Pwsh);
4223 assert_eq!(shell.binary().as_ref(), "pwsh.exe");
4224 }
4225
4226 #[cfg(windows)]
4233 #[test]
4234 fn windows_shell_cmd_wrapper_writes_exit_marker_with_move() {
4235 let exit_path = Path::new(r"C:\Temp\bash-test.exit");
4236 let script =
4237 crate::windows_shell::WindowsShell::Cmd.wrapper_script("cmd /c exit 42", exit_path);
4238
4239 assert!(
4243 script.contains("set CODE=%ERRORLEVEL%"),
4244 "wrapper must capture exit code into CODE: {script}"
4245 );
4246 assert!(
4247 script.contains("echo %CODE% >"),
4248 "wrapper must echo CODE to a temp marker file: {script}"
4249 );
4250 assert!(
4251 script.contains("move /Y"),
4252 "wrapper must use atomic move to write the marker: {script}"
4253 );
4254 assert!(
4257 script.contains("> nul"),
4258 "wrapper must redirect move output to nul: {script}"
4259 );
4260 assert!(
4262 script.contains("exit /B %CODE%"),
4263 "wrapper must propagate the captured exit code: {script}"
4264 );
4265 assert!(script.contains(r#""C:\Temp\bash-test.exit.tmp""#));
4266 assert!(script.contains(r#""C:\Temp\bash-test.exit""#));
4267 }
4268
4269 #[cfg(windows)]
4275 #[test]
4276 fn windows_shell_cmd_bg_command_uses_minimal_cmd_flags() {
4277 use crate::windows_shell::WindowsShell;
4278 let cmd = WindowsShell::Cmd.bg_command("echo wrapped");
4279 let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
4280 let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
4281 assert_eq!(
4282 args_strs,
4283 vec!["/D", "/S", "/C", "echo wrapped"],
4284 "Cmd::bg_command must prepend /D /S /C"
4285 );
4286 }
4287
4288 #[cfg(windows)]
4292 #[test]
4293 fn windows_shell_pwsh_bg_command_uses_standard_args() {
4294 use crate::windows_shell::WindowsShell;
4295 let cmd = WindowsShell::Pwsh.bg_command("Get-Date");
4296 let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
4297 let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
4298 assert!(
4299 args_strs.contains(&"-Command"),
4300 "Pwsh::bg_command must use -Command: {args_strs:?}"
4301 );
4302 assert!(
4303 args_strs.contains(&"Get-Date"),
4304 "Pwsh::bg_command must include the user command body"
4305 );
4306 }
4307
    // Disabled placeholder: `#[cfg(any())]` has an empty predicate list, which
    // never holds, so this empty function is removed from every build and
    // never runs as a test.
    #[allow(dead_code)]
    #[cfg(any())] fn windows_cmd_wrapper_records_real_exit_code_disabled() {}
4341}