1use std::collections::{HashMap, HashSet, VecDeque};
2use std::fs;
3use std::io::Write;
4use std::path::{Path, PathBuf};
5use std::process::{Child, Command, Stdio};
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7#[cfg(unix)]
8use std::sync::OnceLock;
9use std::sync::{Arc, Mutex, RwLock};
10use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
11
12use rusqlite::Connection;
13use serde::Serialize;
14
15use crate::context::SharedProgressSender;
16use crate::harness::Harness;
17use crate::protocol::{BashCompletedFrame, BashLongRunningFrame, BashPatternMatchFrame, PushFrame};
18
19#[cfg(unix)]
20use std::os::unix::process::CommandExt;
21#[cfg(windows)]
22use std::os::windows::process::CommandExt;
23
24use super::buffer::{combine_streams, BgBuffer, TokenCountInput};
25use super::persistence::{
26 create_capture_file, delete_task_bundle, read_exit_marker, read_task, session_tasks_dir,
27 task_paths, unix_millis, update_task, write_kill_marker_if_absent, write_task, BgMode,
28 ExitMarker, PersistedTask, TaskPaths,
29};
30use super::process::is_process_alive;
31#[cfg(unix)]
32use super::process::terminate_pgid;
33#[cfg(windows)]
34use super::process::terminate_pid;
35use super::pty_process::spawn_pty_for_command;
36use super::pty_runtime::PtyRuntime;
37use super::watches::{PatternMatch, WatchPattern, WatchRegistry};
38use super::{BgTaskInfo, BgTaskStatus};
/// Timeout applied by `spawn`/`spawn_pty` when the caller passes `None` (30 minutes).
const DEFAULT_BG_TIMEOUT: Duration = Duration::from_secs(60 * 30);
/// Age (24 hours) after which a `Running` task seen during replay is treated as orphaned.
/// Presumably consumed by `running_metadata_is_stale`; the replay path labels such
/// tasks "orphaned (>24h)" — TODO confirm.
const STALE_RUNNING_AFTER: Duration = Duration::from_secs(60 * 60 * 24);
/// Grace period (24 hours) for persisted-task GC — consumer not visible in this section.
const PERSISTED_GC_GRACE: Duration = Duration::from_secs(60 * 60 * 24);
/// Grace period (30 days) for quarantined-task GC — consumer not visible in this section.
const QUARANTINE_GC_GRACE: Duration = Duration::from_secs(60 * 60 * 24 * 30);

/// Preview size, in bytes, used when synthesizing a completion for a watch.
const BG_COMPLETION_PREVIEW_BYTES: usize = 300;
/// Per-stream byte cap (128 KiB); presumably limits how much output is tokenized — confirm.
const TOKENIZE_CAP_BYTES_PER_STREAM: usize = 1024 * 128;
59
/// Completion record for a background bash task.
///
/// Records are queued in `RegistryInner::completions`. Empty or absent optional
/// fields are left out of the serialized form via `skip_serializing_if`.
#[derive(Debug, Clone, Serialize)]
pub struct BgCompletion {
    pub task_id: String,
    /// Owning session. Kept for internal routing and never serialized.
    #[serde(skip_serializing)]
    pub session_id: String,
    pub status: BgTaskStatus,
    /// Process exit code; `None` when no exit code is known.
    pub exit_code: Option<i32>,
    pub command: String,
    /// Preview of the task output; omitted from the serialized form when empty.
    /// NOTE(review): `default` only matters for deserialization, which this type
    /// does not derive.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub output_preview: String,
    /// Whether `output_preview` is a truncated view; omitted when `false`.
    #[serde(default, skip_serializing_if = "is_false")]
    pub output_truncated: bool,
    /// Presumably the token count of the uncompressed output — confirm. Omitted when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub original_tokens: Option<u32>,
    /// Presumably the token count after compression — confirm. Omitted when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub compressed_tokens: Option<u32>,
    /// Presumably set when token counting was skipped — confirm. Omitted when `false`.
    #[serde(default, skip_serializing_if = "is_false")]
    pub tokens_skipped: bool,
}
95
/// `skip_serializing_if` predicate: true when the flag is unset.
///
/// Takes `&bool` because serde passes field references to these predicates.
fn is_false(v: &bool) -> bool {
    !v
}
99
/// Point-in-time view of one background task, as returned by `BgTaskRegistry::status`.
///
/// The fields of `info` are flattened into the top-level serialized object. The PTY
/// geometry fields are omitted when `None`.
#[derive(Debug, Clone, Serialize)]
pub struct BgTaskSnapshot {
    #[serde(flatten)]
    pub info: BgTaskInfo,
    pub exit_code: Option<i32>,
    pub child_pid: Option<u32>,
    pub workdir: String,
    /// Output preview; `status` passes its `preview_bytes` argument when building it.
    pub output_preview: String,
    pub output_truncated: bool,
    /// Presumably the on-disk capture paths for stdout/stderr — confirm.
    pub output_path: Option<String>,
    pub stderr_path: Option<String>,
    /// PTY geometry; presumably only set for PTY-mode tasks — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub pty_rows: Option<u16>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub pty_cols: Option<u16>,
}
116
/// Registry of background bash tasks.
///
/// Cloning is cheap: every clone shares the same `RegistryInner` through an `Arc`.
#[derive(Clone)]
pub struct BgTaskRegistry {
    pub(crate) inner: Arc<RegistryInner>,
}
121
/// Shared state behind every `BgTaskRegistry` handle.
pub(crate) struct RegistryInner {
    /// Live tasks keyed by task id.
    pub(crate) tasks: Mutex<HashMap<String, Arc<BgTask>>>,
    /// Completion records waiting to be delivered.
    pub(crate) completions: Mutex<VecDeque<BgCompletion>>,
    pub(crate) progress_sender: SharedProgressSender,
    /// Presumably guards a one-time watchdog start in `start_watchdog` — confirm.
    watchdog_started: AtomicBool,
    pub(crate) shutdown: AtomicBool,
    /// Long-running reminder settings. Defaults are enabled and 600 000 ms; see
    /// `configure_long_running_reminders`.
    pub(crate) long_running_reminder_enabled: AtomicBool,
    pub(crate) long_running_reminder_interval_ms: AtomicU64,
    /// Set by the first replay so that persisted-task GC runs at most once per registry.
    persisted_gc_started: AtomicBool,
    /// Test-only counter, presumably incremented per GC pass — confirm.
    #[cfg(test)]
    persisted_gc_runs: AtomicU64,
    /// Optional `(command, output) -> output` transform used by `compress_output`.
    pub(crate) compressor: Mutex<Option<Box<dyn Fn(&str, String) -> String + Send + Sync>>>,
    /// Optional shared DB connection used for dual-writes and DB-backed replay.
    pub(crate) db_pool: RwLock<Option<Arc<Mutex<Connection>>>>,
    /// Harness name that scopes DB rows; DB writes are skipped while it is `None`.
    pub(crate) db_harness: RwLock<Option<String>>,
    /// Capacity-1 wake-up channel. The sender is handed to PTY runtimes; presumably
    /// the receiver wakes the watchdog — confirm.
    pub(crate) wake_tx: crossbeam_channel::Sender<()>,
    pub(crate) wake_rx: crossbeam_channel::Receiver<()>,
    /// Output-pattern watches registered through `register_watch`.
    pub(crate) watch_registry: Mutex<WatchRegistry>,
}
145
/// One registered background task.
///
/// The identity and paths are immutable. Mutable state lives behind `state`.
pub(crate) struct BgTask {
    pub(crate) task_id: String,
    pub(crate) session_id: String,
    /// On-disk bundle locations (metadata JSON, capture files, exit marker).
    pub(crate) paths: TaskPaths,
    /// Captured when the task is registered, right after the process is spawned.
    pub(crate) started: Instant,
    /// Presumably the time of the last long-running reminder — confirm.
    pub(crate) last_reminder_at: Mutex<Option<Instant>>,
    /// Presumably the time the task was first seen terminal — confirm.
    pub(crate) terminal_at: Mutex<Option<Instant>>,
    pub(crate) state: Mutex<BgTaskState>,
}
155
/// Process handle for a task, by capture mode.
///
/// `write_pty` treats `Pty(None)` as an exited task. `None` presumably means the
/// handle has been taken or released, or was never attached (rehydrated task) — confirm.
pub(crate) enum TaskRuntime {
    Piped(Option<Child>),
    Pty(Option<PtyRuntime>),
}
160
/// Mutable per-task state guarded by `BgTask::state`.
pub(crate) struct BgTaskState {
    /// Persisted metadata (status, pids, mode, ...), mirrored to disk and to the DB.
    pub(crate) metadata: PersistedTask,
    pub(crate) runtime: TaskRuntime,
    /// `false` for freshly spawned tasks. Presumably `true` for rehydrated tasks with
    /// no attached process handle — confirm.
    pub(crate) detached: bool,
    /// `false` at spawn; presumably set once the child's exit has been reaped — confirm.
    pub(crate) child_exit_observed: bool,
    /// Output buffer backed by the task's capture file(s).
    pub(crate) buffer: BgBuffer,
    /// Presumably a terminal status (e.g. from kill/timeout) to apply on exit — confirm.
    pub(crate) pending_terminal_override: Option<BgTaskStatus>,
}
180
181impl BgTaskRegistry {
182 pub fn new(progress_sender: SharedProgressSender) -> Self {
183 let (wake_tx, wake_rx) = crossbeam_channel::bounded(1);
184 Self {
185 inner: Arc::new(RegistryInner {
186 tasks: Mutex::new(HashMap::new()),
187 completions: Mutex::new(VecDeque::new()),
188 progress_sender,
189 watchdog_started: AtomicBool::new(false),
190 shutdown: AtomicBool::new(false),
191 long_running_reminder_enabled: AtomicBool::new(true),
192 long_running_reminder_interval_ms: AtomicU64::new(600_000),
193 persisted_gc_started: AtomicBool::new(false),
194 #[cfg(test)]
195 persisted_gc_runs: AtomicU64::new(0),
196 compressor: Mutex::new(None),
197 db_pool: RwLock::new(None),
198 db_harness: RwLock::new(None),
199 wake_tx,
200 wake_rx,
201 watch_registry: Mutex::new(WatchRegistry::default()),
202 }),
203 }
204 }
205
206 pub fn set_harness(&self, harness: Harness) {
207 if let Ok(mut slot) = self.inner.db_harness.write() {
208 *slot = Some(harness.as_str().to_string());
209 }
210 }
211
212 pub fn set_db_pool(&self, conn: Arc<Mutex<Connection>>) {
213 if let Ok(mut slot) = self.inner.db_pool.write() {
214 *slot = Some(conn);
215 }
216 }
217
218 pub fn clear_db_pool(&self) {
219 if let Ok(mut slot) = self.inner.db_pool.write() {
220 *slot = None;
221 }
222 }
223
224 pub fn set_compressor<F>(&self, compressor: F)
229 where
230 F: Fn(&str, String) -> String + Send + Sync + 'static,
231 {
232 if let Ok(mut slot) = self.inner.compressor.lock() {
233 *slot = Some(Box::new(compressor));
234 }
235 }
236
237 pub(crate) fn compress_output(&self, command: &str, output: String) -> String {
240 let Ok(slot) = self.inner.compressor.lock() else {
241 return output;
242 };
243 match slot.as_ref() {
244 Some(compressor) => compressor(command, output),
245 None => output,
246 }
247 }
248
249 fn persist_task(&self, paths: &TaskPaths, metadata: &PersistedTask) -> std::io::Result<()> {
250 write_task(&paths.json, metadata)?;
251 self.dual_write_task(paths, metadata);
252 Ok(())
253 }
254
255 fn update_task_metadata<F>(
256 &self,
257 paths: &TaskPaths,
258 update: F,
259 ) -> std::io::Result<PersistedTask>
260 where
261 F: FnOnce(&mut PersistedTask),
262 {
263 let metadata = update_task(&paths.json, update)?;
264 self.dual_write_task(paths, &metadata);
265 Ok(metadata)
266 }
267
268 fn dual_write_task(&self, paths: &TaskPaths, metadata: &PersistedTask) {
269 let pool = self.inner.db_pool.read().ok().and_then(|slot| slot.clone());
270 let Some(pool) = pool else {
271 return;
272 };
273 let harness = self
274 .inner
275 .db_harness
276 .read()
277 .ok()
278 .and_then(|slot| slot.clone());
279 let Some(harness) = harness else {
280 crate::slog_warn!(
281 "dual-write bash_task to DB skipped for {}: harness not configured",
282 metadata.task_id
283 );
284 return;
285 };
286 let row = match metadata.to_bash_task_row(&harness, paths) {
287 Ok(row) => row,
288 Err(error) => {
289 crate::slog_warn!(
290 "dual-write bash_task to DB failed for {}: {}",
291 metadata.task_id,
292 error
293 );
294 return;
295 }
296 };
297 let conn = match pool.lock() {
298 Ok(conn) => conn,
299 Err(_) => {
300 crate::slog_warn!(
301 "dual-write bash_task to DB failed for {}: db mutex poisoned",
302 metadata.task_id
303 );
304 return;
305 }
306 };
307 if let Err(error) = crate::db::bash_tasks::upsert_bash_task(&conn, &row) {
308 crate::slog_warn!(
309 "dual-write bash_task to DB failed for {}: {}",
310 metadata.task_id,
311 error
312 );
313 }
314 }
315
316 pub fn configure_long_running_reminders(&self, enabled: bool, interval_ms: u64) {
317 self.inner
318 .long_running_reminder_enabled
319 .store(enabled, Ordering::SeqCst);
320 self.inner
321 .long_running_reminder_interval_ms
322 .store(interval_ms, Ordering::SeqCst);
323 }
324
325 #[cfg(unix)]
326 #[allow(clippy::too_many_arguments)]
327 pub fn spawn(
328 &self,
329 command: &str,
330 session_id: String,
331 workdir: PathBuf,
332 env: HashMap<String, String>,
333 timeout: Option<Duration>,
334 storage_dir: PathBuf,
335 max_running: usize,
336 notify_on_completion: bool,
337 compressed: bool,
338 project_root: Option<PathBuf>,
339 ) -> Result<String, String> {
340 self.start_watchdog();
341
342 let running = self.running_count();
343 if running >= max_running {
344 return Err(format!(
345 "background bash task limit exceeded: {running} running (max {max_running})"
346 ));
347 }
348
349 let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
350 let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
351 let task_id = self.generate_unique_task_id()?;
352 let paths = task_paths(&storage_dir, &session_id, &task_id);
353 fs::create_dir_all(&paths.dir)
354 .map_err(|e| format!("failed to create background task dir: {e}"))?;
355
356 let mut metadata = PersistedTask::starting(
357 task_id.clone(),
358 session_id.clone(),
359 command.to_string(),
360 workdir.clone(),
361 project_root,
362 timeout_ms,
363 notify_on_completion,
364 compressed,
365 );
366 self.persist_task(&paths, &metadata)
367 .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
368
369 create_capture_file(&paths.stdout)
373 .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
374 create_capture_file(&paths.stderr)
375 .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
376
377 let child = match spawn_detached_child(command, &paths, &workdir, &env) {
378 Ok(child) => child,
379 Err(error) => {
380 crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
381 let _ = delete_task_bundle(&paths);
382 return Err(error);
383 }
384 };
385
386 let child_pid = child.id();
387 metadata.mark_running(child_pid, child_pid as i32);
388 self.persist_task(&paths, &metadata)
389 .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
390
391 let task = Arc::new(BgTask {
392 task_id: task_id.clone(),
393 session_id,
394 paths: paths.clone(),
395 started: Instant::now(),
396 last_reminder_at: Mutex::new(None),
397 terminal_at: Mutex::new(None),
398 state: Mutex::new(BgTaskState {
399 metadata,
400 runtime: TaskRuntime::Piped(Some(child)),
401 detached: false,
402 child_exit_observed: false,
403 buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
404 pending_terminal_override: None,
405 }),
406 });
407
408 self.inner
409 .tasks
410 .lock()
411 .map_err(|_| "background task registry lock poisoned".to_string())?
412 .insert(task_id.clone(), task);
413
414 Ok(task_id)
415 }
416
417 #[allow(clippy::too_many_arguments)]
418 pub fn spawn_pty(
419 &self,
420 command: &str,
421 session_id: String,
422 workdir: PathBuf,
423 env: HashMap<String, String>,
424 timeout: Option<Duration>,
425 storage_dir: PathBuf,
426 max_running: usize,
427 notify_on_completion: bool,
428 compressed: bool,
429 project_root: Option<PathBuf>,
430 rows: u16,
431 cols: u16,
432 ) -> Result<String, String> {
433 self.start_watchdog();
434
435 let running = self.running_count();
436 if running >= max_running {
437 return Err(format!(
438 "background bash task limit exceeded: {running} running (max {max_running})"
439 ));
440 }
441
442 let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
443 let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
444 let task_id = self.generate_unique_task_id()?;
445 let paths = task_paths(&storage_dir, &session_id, &task_id);
446 fs::create_dir_all(&paths.dir)
447 .map_err(|e| format!("failed to create background task dir: {e}"))?;
448
449 let mut metadata = PersistedTask::starting(
450 task_id.clone(),
451 session_id.clone(),
452 command.to_string(),
453 workdir.clone(),
454 project_root,
455 timeout_ms,
456 notify_on_completion,
457 compressed,
458 );
459 metadata.mode = BgMode::Pty;
460 metadata.pty_rows = Some(rows);
461 metadata.pty_cols = Some(cols);
462 self.persist_task(&paths, &metadata)
463 .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
464 create_capture_file(&paths.pty)
465 .map_err(|e| format!("failed to create PTY capture file: {e}"))?;
466
467 let runtime = match spawn_pty_for_command(
468 &task_id,
469 &session_id,
470 command,
471 &paths,
472 &workdir,
473 &env,
474 rows,
475 cols,
476 self.inner.wake_tx.clone(),
477 ) {
478 Ok(runtime) => runtime,
479 Err(error) => {
480 crate::slog_warn!(
481 "failed to spawn PTY background bash task {task_id}; deleting partial bundle: {error}"
482 );
483 let _ = delete_task_bundle(&paths);
484 return Err(error);
485 }
486 };
487
488 if let Some(child_pid) = runtime.child_pid {
489 metadata.mark_running(child_pid, child_pid as i32);
490 } else {
491 metadata.status = BgTaskStatus::Running;
492 metadata.pgid = None;
493 }
494 self.persist_task(&paths, &metadata)
495 .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
496
497 let task = Arc::new(BgTask {
498 task_id: task_id.clone(),
499 session_id,
500 paths: paths.clone(),
501 started: Instant::now(),
502 last_reminder_at: Mutex::new(None),
503 terminal_at: Mutex::new(None),
504 state: Mutex::new(BgTaskState {
505 metadata,
506 runtime: TaskRuntime::Pty(Some(runtime)),
507 detached: false,
508 child_exit_observed: false,
509 buffer: BgBuffer::pty(paths.pty.clone()),
510 pending_terminal_override: None,
511 }),
512 });
513
514 self.inner
515 .tasks
516 .lock()
517 .map_err(|_| "background task registry lock poisoned".to_string())?
518 .insert(task_id.clone(), task);
519
520 Ok(task_id)
521 }
522
/// Spawns `command` as a piped (stdout/stderr) background task and registers it.
///
/// A task bundle is created under `storage_dir`. Metadata is persisted in the
/// `Starting` state before launch, then in the `Running` state with the child's PID
/// and no process-group id. When `timeout` is `None`, `DEFAULT_BG_TIMEOUT` is used.
/// The millisecond value saturates at `u64::MAX` instead of wrapping.
///
/// # Errors
///
/// Returns a human-readable message when the running-task limit is reached or when
/// bundle creation, metadata persistence, capture-file creation or the spawn fails.
/// Any failure after the bundle directory exists removes the partial bundle. If
/// persisting the `Running` state fails, the already-started child is killed and
/// reaped first, so the caller never gets `Err` for a process that keeps running
/// unregistered.
#[cfg(windows)]
#[allow(clippy::too_many_arguments)]
pub fn spawn(
    &self,
    command: &str,
    session_id: String,
    workdir: PathBuf,
    env: HashMap<String, String>,
    timeout: Option<Duration>,
    storage_dir: PathBuf,
    max_running: usize,
    notify_on_completion: bool,
    compressed: bool,
    project_root: Option<PathBuf>,
) -> Result<String, String> {
    self.start_watchdog();

    let running = self.running_count();
    if running >= max_running {
        return Err(format!(
            "background bash task limit exceeded: {running} running (max {max_running})"
        ));
    }

    let timeout = timeout.unwrap_or(DEFAULT_BG_TIMEOUT);
    // `as_millis` yields u128; saturate instead of silently truncating huge timeouts.
    let timeout_ms = Some(u64::try_from(timeout.as_millis()).unwrap_or(u64::MAX));
    let task_id = self.generate_unique_task_id()?;
    let paths = task_paths(&storage_dir, &session_id, &task_id);
    fs::create_dir_all(&paths.dir)
        .map_err(|e| format!("failed to create background task dir: {e}"))?;

    // The bundle directory now exists. Every early return below must remove the
    // partial bundle, as the spawn-failure path already did.
    let discard_bundle = |message: String| {
        let _ = delete_task_bundle(&paths);
        message
    };

    let mut metadata = PersistedTask::starting(
        task_id.clone(),
        session_id.clone(),
        command.to_string(),
        workdir.clone(),
        project_root,
        timeout_ms,
        notify_on_completion,
        compressed,
    );
    self.persist_task(&paths, &metadata).map_err(|e| {
        discard_bundle(format!("failed to persist background task metadata: {e}"))
    })?;

    create_capture_file(&paths.stdout)
        .map_err(|e| discard_bundle(format!("failed to create stdout capture file: {e}")))?;
    create_capture_file(&paths.stderr)
        .map_err(|e| discard_bundle(format!("failed to create stderr capture file: {e}")))?;

    let mut child = match spawn_detached_child(command, &paths, &workdir, &env) {
        Ok(child) => child,
        Err(error) => {
            crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
            return Err(discard_bundle(error));
        }
    };

    let child_pid = child.id();
    metadata.status = BgTaskStatus::Running;
    metadata.child_pid = Some(child_pid);
    metadata.pgid = None;
    if let Err(error) = self.persist_task(&paths, &metadata) {
        // The process is already running. Returning `Err` while it runs would leave
        // an untracked process, so stop it before reporting the failure.
        crate::slog_warn!("failed to persist running background bash task {task_id}; killing child and deleting partial bundle: {error}");
        let _ = child.kill();
        let _ = child.wait();
        return Err(discard_bundle(format!(
            "failed to persist running background task metadata: {error}"
        )));
    }

    let task = Arc::new(BgTask {
        task_id: task_id.clone(),
        session_id,
        paths: paths.clone(),
        started: Instant::now(),
        last_reminder_at: Mutex::new(None),
        terminal_at: Mutex::new(None),
        state: Mutex::new(BgTaskState {
            metadata,
            runtime: TaskRuntime::Piped(Some(child)),
            detached: false,
            child_exit_observed: false,
            buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
            pending_terminal_override: None,
        }),
    });

    // NOTE(review): if this lock is poisoned, the running child is dropped unregistered.
    self.inner
        .tasks
        .lock()
        .map_err(|_| "background task registry lock poisoned".to_string())?
        .insert(task_id.clone(), task);

    Ok(task_id)
}
618
619 pub fn write_pty(
620 &self,
621 task_id: &str,
622 session_id: &str,
623 input: &[u8],
624 ) -> Result<usize, String> {
625 let task = self
626 .task_for_session(task_id, session_id)
627 .ok_or_else(|| "task_not_found".to_string())?;
628
629 let writer = {
630 let state = task
631 .state
632 .lock()
633 .map_err(|_| "background task lock poisoned".to_string())?;
634 if state.metadata.mode != BgMode::Pty {
635 return Err("task_not_pty".to_string());
636 }
637 if state.metadata.status.is_terminal() {
638 return Err("task_exited".to_string());
639 }
640 match &state.runtime {
641 TaskRuntime::Pty(Some(runtime)) => Arc::clone(&runtime.writer),
642 TaskRuntime::Pty(None) => return Err("task_exited".to_string()),
643 TaskRuntime::Piped(_) => return Err("task_not_pty".to_string()),
644 }
645 };
646
647 let mut writer = writer
648 .lock()
649 .map_err(|_| "PTY writer lock poisoned".to_string())?;
650 writer
651 .write_all(input)
652 .map_err(|error| format!("failed to write to PTY: {error}"))?;
653 writer
654 .flush()
655 .map_err(|error| format!("failed to flush PTY writer: {error}"))?;
656 Ok(input.len())
657 }
658
659 pub fn replay_session(&self, storage_dir: &Path, session_id: &str) -> Result<(), String> {
660 self.replay_session_inner(storage_dir, session_id, None)
661 }
662
663 pub fn replay_session_for_project(
664 &self,
665 storage_dir: &Path,
666 session_id: &str,
667 project_root: &Path,
668 ) -> Result<(), String> {
669 self.replay_session_inner(storage_dir, session_id, Some(project_root))
670 }
671
/// Rehydrates persisted tasks for `session_id` into the in-memory registry.
///
/// The first replay on this registry also runs one persisted-task GC pass over
/// `storage_dir`. Task metadata comes from the DB when a pool and harness are set
/// and the lookup returns rows. Otherwise (empty result, lookup error, or DB not
/// configured) it is read from the on-disk bundles. Tasks from other sessions are
/// skipped. When `project_root` is given, tasks whose canonicalized project root
/// differs are skipped too. Non-terminal states found on disk are reconciled into
/// terminal ones where possible, persisted, and queued for completion.
///
/// Returns an error if disk replay fails or if inserting a rehydrated task fails.
/// In the latter case, the remaining tasks in this call are not processed.
fn replay_session_inner(
    &self,
    storage_dir: &Path,
    session_id: &str,
    project_root: Option<&Path>,
) -> Result<(), String> {
    self.start_watchdog();
    // `swap` returns the previous value, so only the first caller runs GC.
    if !self.inner.persisted_gc_started.swap(true, Ordering::SeqCst) {
        if let Err(error) = self.maybe_gc_persisted(storage_dir) {
            crate::slog_warn!("failed to GC persisted background bash tasks: {error}");
        }
    }

    let canonical_project = project_root.map(canonicalized_path);
    // Source selection: DB first, disk as the fallback.
    let tasks = match self.replay_session_from_db(session_id) {
        Some(Ok(tasks)) if !tasks.is_empty() => tasks,
        Some(Ok(_)) => {
            let disk_tasks = self.replay_session_from_disk(storage_dir, session_id)?;
            if !disk_tasks.is_empty() {
                crate::slog_info!(
                    "bash task replay: 0 in DB for session {}, {} from disk fallback",
                    session_id,
                    disk_tasks.len()
                );
            }
            disk_tasks
        }
        Some(Err(error)) => {
            crate::slog_warn!(
                "bash task replay DB lookup failed for session {}; falling back to disk: {}",
                session_id,
                error
            );
            self.replay_session_from_disk(storage_dir, session_id)?
        }
        // DB pool or harness not configured.
        None => {
            self.replay_session_from_disk(storage_dir, session_id)?
        }
    };

    for mut metadata in tasks {
        if metadata.session_id != session_id {
            continue;
        }
        if let Some(canonical_project) = canonical_project.as_deref() {
            let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
            if metadata_project.as_deref() != Some(canonical_project) {
                continue;
            }
        }

        let paths = task_paths(storage_dir, session_id, &metadata.task_id);
        // Each reconciling branch below saves `completion_delivered`, calls
        // `mark_terminal`, then ORs the saved value back in. This keeps an
        // already-delivered completion marked as delivered (presumably because
        // `mark_terminal` may reset the flag — confirm).
        match metadata.status {
            // Crashed between the `Starting` persist and the `Running` persist.
            BgTaskStatus::Starting => {
                let completion_was_delivered = metadata.completion_delivered;
                metadata.mark_terminal(
                    BgTaskStatus::Failed,
                    None,
                    Some("spawn aborted".to_string()),
                );
                metadata.completion_delivered |= completion_was_delivered;
                let _ = self.persist_task(&paths, &metadata);
                self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                self.insert_rehydrated_task(metadata, paths, true)?;
            }
            BgTaskStatus::Running | BgTaskStatus::Killing => {
                if metadata.mode == BgMode::Pty {
                    // PTY tasks: use an exit marker if one exists. Otherwise the
                    // task is marked killed, because the PTY is not reattached here.
                    if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
                        let completion_was_delivered = metadata.completion_delivered;
                        metadata = terminal_metadata_from_marker(metadata, marker, None);
                        metadata.completion_delivered |= completion_was_delivered;
                        let _ = self.persist_task(&paths, &metadata);
                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                        self.insert_rehydrated_task(metadata, paths, true)?;
                    } else if metadata.status.is_terminal() {
                        // NOTE(review): `metadata.status` is `Running` or `Killing`
                        // in this arm and has not been modified yet, so this branch
                        // looks unreachable unless `Killing` counts as terminal — confirm.
                        self.insert_rehydrated_task(metadata, paths, true)?;
                    } else {
                        let completion_was_delivered = metadata.completion_delivered;
                        metadata.mark_terminal(
                            BgTaskStatus::Killed,
                            None,
                            Some("pty_lost_on_bridge_restart".to_string()),
                        );
                        metadata.completion_delivered |= completion_was_delivered;
                        let _ = self.persist_task(&paths, &metadata);
                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                        self.insert_rehydrated_task(metadata, paths, true)?;
                    }
                } else if self.running_metadata_is_stale(&metadata) {
                    // Too old to trust: mark killed and write a kill marker if none exists.
                    let completion_was_delivered = metadata.completion_delivered;
                    metadata.mark_terminal(
                        BgTaskStatus::Killed,
                        None,
                        Some("orphaned (>24h)".to_string()),
                    );
                    metadata.completion_delivered |= completion_was_delivered;
                    if !paths.exit.exists() {
                        let _ = write_kill_marker_if_absent(&paths.exit);
                    }
                    let _ = self.persist_task(&paths, &metadata);
                    self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                    self.insert_rehydrated_task(metadata, paths, true)?;
                } else if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
                    // An exit marker takes precedence over the persisted status.
                    let reason = (metadata.status == BgTaskStatus::Killing).then(|| {
                        "recovered from inconsistent killing state on replay".to_string()
                    });
                    if reason.is_some() {
                        crate::slog_warn!("background task {} had killing state with exit marker; preferring marker",
                            metadata.task_id);
                    }
                    let completion_was_delivered = metadata.completion_delivered;
                    metadata = terminal_metadata_from_marker(metadata, marker, reason);
                    metadata.completion_delivered |= completion_was_delivered;
                    let _ = self.persist_task(&paths, &metadata);
                    self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                    self.insert_rehydrated_task(metadata, paths, true)?;
                } else if metadata.status == BgTaskStatus::Killing {
                    // A kill was in progress with no marker written: finish it as killed.
                    if !paths.exit.exists() {
                        let _ = write_kill_marker_if_absent(&paths.exit);
                    }
                    let completion_was_delivered = metadata.completion_delivered;
                    metadata.mark_terminal(
                        BgTaskStatus::Killed,
                        None,
                        Some("recovered from inconsistent killing state on replay".to_string()),
                    );
                    metadata.completion_delivered |= completion_was_delivered;
                    let _ = self.persist_task(&paths, &metadata);
                    self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                    self.insert_rehydrated_task(metadata, paths, true)?;
                } else if metadata.child_pid.is_some_and(|pid| !is_process_alive(pid)) {
                    // Process is gone but left no exit marker.
                    let completion_was_delivered = metadata.completion_delivered;
                    metadata.mark_terminal(
                        BgTaskStatus::Failed,
                        None,
                        Some("process exited without exit marker".to_string()),
                    );
                    metadata.completion_delivered |= completion_was_delivered;
                    let _ = self.persist_task(&paths, &metadata);
                    self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                    self.insert_rehydrated_task(metadata, paths, true)?;
                } else {
                    // Still running (or no PID recorded): rehydrate as-is.
                    self.insert_rehydrated_task(metadata, paths, true)?;
                }
            }
            _ if metadata.status.is_terminal() => {
                self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                self.insert_rehydrated_task(metadata, paths, true)?;
            }
            // Any other non-terminal status is not replayed.
            _ => {}
        }
    }

    Ok(())
}
844
845 fn replay_session_from_db(
846 &self,
847 session_id: &str,
848 ) -> Option<Result<Vec<PersistedTask>, String>> {
849 let pool = self
850 .inner
851 .db_pool
852 .read()
853 .ok()
854 .and_then(|slot| slot.clone())?;
855 let harness = self
856 .inner
857 .db_harness
858 .read()
859 .ok()
860 .and_then(|slot| slot.clone())?;
861 let conn = match pool.lock() {
862 Ok(conn) => conn,
863 Err(_) => return Some(Err("db mutex poisoned".to_string())),
864 };
865 Some(
866 crate::db::bash_tasks::list_bash_tasks_for_session(&conn, &harness, session_id)
867 .map(|rows| rows.into_iter().map(PersistedTask::from).collect())
868 .map_err(|error| error.to_string()),
869 )
870 }
871
872 fn replay_session_from_disk(
873 &self,
874 storage_dir: &Path,
875 session_id: &str,
876 ) -> Result<Vec<PersistedTask>, String> {
877 let dir = session_tasks_dir(storage_dir, session_id);
878 if !dir.exists() {
879 return Ok(Vec::new());
880 }
881
882 let entries = fs::read_dir(&dir)
883 .map_err(|e| format!("failed to read background task dir {}: {e}", dir.display()))?;
884 let mut tasks = Vec::new();
885 for entry in entries.flatten() {
886 let path = entry.path();
887 if path.extension().and_then(|extension| extension.to_str()) != Some("json") {
888 continue;
889 }
890 match read_task(&path) {
891 Ok(metadata) => tasks.push(metadata),
892 Err(error) => {
893 crate::slog_warn!(
894 "quarantining invalid background task metadata {} during replay: {error}",
895 path.display()
896 );
897 if let Err(quarantine_error) =
898 quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
899 {
900 crate::slog_warn!(
901 "failed to quarantine invalid background task metadata {}: {quarantine_error}",
902 path.display()
903 );
904 }
905 }
906 }
907 }
908 Ok(tasks)
909 }
910
/// Registers an output-pattern watch on `task_id` and returns the new watch id.
///
/// For a running task, the file cursors are primed so that only future output is
/// scanned (presumably `prime_file_cursor` positions them at the current end — confirm).
/// If the task is already terminal, its whole output is scanned right away. Any
/// matches, or otherwise the task's completion, are then emitted and the task's
/// watch state is cleared.
///
/// Errors: `task_not_found`, lock-poisoning messages, or any error returned by
/// `WatchRegistry::register`.
pub fn register_watch(
    &self,
    task_id: String,
    pattern: WatchPattern,
    once: bool,
) -> Result<String, &'static str> {
    let task = self.task(&task_id).ok_or("task_not_found")?;
    // Snapshot mode, terminal status and capture paths, then release the state lock.
    let (mode, terminal_at_registration, stdout, stderr, pty) = task
        .state
        .lock()
        .map(|state| {
            (
                state.metadata.mode.clone(),
                state.metadata.status.is_terminal(),
                task.paths.stdout.clone(),
                task.paths.stderr.clone(),
                task.paths.pty.clone(),
            )
        })
        .map_err(|_| "background_task_lock_poisoned")?;

    let mut terminal_matches = Vec::new();
    // NOTE(review): this is just a copy of `terminal_at_registration`.
    let scanned_terminal = terminal_at_registration;
    // Register and position the cursors under a single registry lock.
    let watch_id = {
        let mut registry = self
            .inner
            .watch_registry
            .lock()
            .map_err(|_| "watch_registry_poisoned")?;
        let watch_id = registry.register(task_id.clone(), pattern, once)?;
        match &mode {
            BgMode::Pipes => {
                let stdout_key = format!("{task_id}:stdout");
                let stderr_key = format!("{task_id}:stderr");
                if terminal_at_registration {
                    // Already finished: scan both streams from offset 0.
                    registry.set_file_cursor(&stdout_key, 0);
                    registry.set_file_cursor(&stderr_key, 0);
                    terminal_matches.extend(registry.scan_file_new_bytes(
                        &stdout_key,
                        &task_id,
                        &stdout,
                    ));
                    terminal_matches.extend(registry.scan_file_new_bytes(
                        &stderr_key,
                        &task_id,
                        &stderr,
                    ));
                } else {
                    registry.prime_file_cursor(&stdout_key, &stdout);
                    registry.prime_file_cursor(&stderr_key, &stderr);
                }
            }
            BgMode::Pty => {
                let pty_key = format!("{task_id}:pty");
                if terminal_at_registration {
                    registry.set_file_cursor(&pty_key, 0);
                    terminal_matches
                        .extend(registry.scan_file_new_bytes(&pty_key, &task_id, &pty));
                } else {
                    registry.prime_file_cursor(&pty_key, &pty);
                }
            }
        }
        watch_id
    };

    // Re-check: the task may have become terminal after the snapshot above.
    if task.is_terminal() {
        if !scanned_terminal {
            // It finished in between, so scan its whole output from offset 0 now.
            terminal_matches = {
                let mut registry = self
                    .inner
                    .watch_registry
                    .lock()
                    .map_err(|_| "watch_registry_poisoned")?;
                match &mode {
                    BgMode::Pipes => {
                        let stdout_key = format!("{task_id}:stdout");
                        let stderr_key = format!("{task_id}:stderr");
                        registry.set_file_cursor(&stdout_key, 0);
                        registry.set_file_cursor(&stderr_key, 0);
                        let mut matches =
                            registry.scan_file_new_bytes(&stdout_key, &task_id, &stdout);
                        matches.extend(registry.scan_file_new_bytes(
                            &stderr_key,
                            &task_id,
                            &stderr,
                        ));
                        matches
                    }
                    BgMode::Pty => {
                        let pty_key = format!("{task_id}:pty");
                        registry.set_file_cursor(&pty_key, 0);
                        registry.scan_file_new_bytes(&pty_key, &task_id, &pty)
                    }
                }
            };
        }

        let (watch_controlled, watch_matched) = self.task_watch_state(&task_id);
        // Nothing to emit: no new matches, and either no watch controls this task or
        // a watch already matched. In the latter case the completion counts as delivered.
        if terminal_matches.is_empty() && (!watch_controlled || watch_matched) {
            if watch_matched {
                let _ = task.set_completion_delivered(true, self);
                self.clear_task_watch_state(&task_id);
            }
            return Ok(watch_id);
        }

        // Prefer the queued completion; otherwise build one from the task's current state.
        let completion = self
            .remove_pending_completion(&task_id)
            .or_else(|| self.completion_snapshot_for_task(&task, BG_COMPLETION_PREVIEW_BYTES));
        if terminal_matches.is_empty() {
            if let Some(completion) = completion.as_ref() {
                self.emit_bash_watch_exit(completion);
            }
        } else {
            for pattern_match in terminal_matches {
                self.emit_bash_pattern_match(&task.session_id, pattern_match);
            }
        }
        let _ = task.set_completion_delivered(true, self);
        self.clear_task_watch_state(&task_id);
    }

    Ok(watch_id)
}
1036
1037 pub fn unregister_watch(&self, task_id: &str, watch_id: &str) {
1038 if let Ok(mut registry) = self.inner.watch_registry.lock() {
1039 registry.unregister(task_id, watch_id);
1040 }
1041 }
1042
1043 pub fn active_watch_count(&self, task_id: &str) -> usize {
1044 self.inner
1045 .watch_registry
1046 .lock()
1047 .map(|registry| registry.active_count(task_id))
1048 .unwrap_or(0)
1049 }
1050
1051 fn task_watch_state(&self, task_id: &str) -> (bool, bool) {
1052 self.inner
1053 .watch_registry
1054 .lock()
1055 .map(|registry| {
1056 (
1057 registry.has_controlled_task(task_id),
1058 registry.has_matched_task(task_id),
1059 )
1060 })
1061 .unwrap_or((false, false))
1062 }
1063
1064 fn task_has_watch_control(&self, task_id: &str) -> bool {
1065 self.inner
1066 .watch_registry
1067 .lock()
1068 .map(|registry| registry.has_controlled_task(task_id))
1069 .unwrap_or(false)
1070 }
1071
1072 fn clear_task_watch_state(&self, task_id: &str) {
1073 if let Ok(mut registry) = self.inner.watch_registry.lock() {
1074 registry.clear_task(task_id);
1075 }
1076 }
1077
1078 pub(crate) fn scan_task_watch_output(&self, task: &Arc<BgTask>) {
1079 let (mode, stdout, stderr, pty) = match task.state.lock() {
1080 Ok(state) => (
1081 state.metadata.mode.clone(),
1082 task.paths.stdout.clone(),
1083 task.paths.stderr.clone(),
1084 task.paths.pty.clone(),
1085 ),
1086 Err(_) => return,
1087 };
1088 let mut matches = Vec::new();
1089 if let Ok(mut registry) = self.inner.watch_registry.lock() {
1090 match mode {
1091 BgMode::Pipes => {
1092 let stdout_key = format!("{}:stdout", task.task_id);
1093 let stderr_key = format!("{}:stderr", task.task_id);
1094 matches.extend(registry.scan_file_new_bytes(
1095 &stdout_key,
1096 &task.task_id,
1097 &stdout,
1098 ));
1099 matches.extend(registry.scan_file_new_bytes(
1100 &stderr_key,
1101 &task.task_id,
1102 &stderr,
1103 ));
1104 }
1105 BgMode::Pty => {
1106 let pty_key = format!("{}:pty", task.task_id);
1107 matches.extend(registry.scan_file_new_bytes(&pty_key, &task.task_id, &pty));
1108 }
1109 }
1110 }
1111 for pattern_match in matches {
1112 self.emit_bash_pattern_match(&task.session_id, pattern_match);
1113 }
1114 }
1115
1116 pub fn status(
1117 &self,
1118 task_id: &str,
1119 session_id: &str,
1120 project_root: Option<&Path>,
1121 storage_dir: Option<&Path>,
1122 preview_bytes: usize,
1123 ) -> Option<BgTaskSnapshot> {
1124 let mut task = self.task_for_session(task_id, session_id);
1125 if task.is_none() {
1126 if let Some(storage_dir) = storage_dir {
1127 let _ = self.replay_session(storage_dir, session_id);
1128 task = self.task_for_session(task_id, session_id);
1129 }
1130 }
1131 let Some(task) = task else {
1132 return self.status_relaxed(
1133 task_id,
1134 session_id,
1135 project_root?,
1136 storage_dir?,
1137 preview_bytes,
1138 );
1139 };
1140 let _ = self.poll_task(&task);
1141 let mut snapshot = task.snapshot(preview_bytes);
1142 self.maybe_compress_snapshot(&task, &mut snapshot);
1143 Some(snapshot)
1144 }
1145
1146 fn status_relaxed_task(
1147 &self,
1148 task_id: &str,
1149 project_root: &Path,
1150 storage_dir: &Path,
1151 ) -> Option<Arc<BgTask>> {
1152 let canonical_project = canonicalized_path(project_root);
1153 match self.lookup_relaxed_task_from_db(task_id, project_root) {
1154 Some(Ok(Some(metadata))) => {
1155 if let Some(task) = self.task(task_id) {
1156 let matches_project = task
1157 .state
1158 .lock()
1159 .map(|state| {
1160 state
1161 .metadata
1162 .project_root
1163 .as_deref()
1164 .map(canonicalized_path)
1165 .as_deref()
1166 == Some(canonical_project.as_path())
1167 })
1168 .unwrap_or(false);
1169 return matches_project.then_some(task);
1170 }
1171 let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1172 if self.insert_rehydrated_task(metadata, paths, true).is_err() {
1173 return None;
1174 }
1175 return self.task(task_id);
1176 }
1177 Some(Ok(None)) => {
1178 crate::slog_info!(
1179 "bash task relaxed DB miss for {}; falling back to disk",
1180 task_id
1181 );
1182 }
1183 Some(Err(error)) => {
1184 crate::slog_warn!(
1185 "bash task relaxed DB lookup failed for {}; falling back to disk: {}",
1186 task_id,
1187 error
1188 );
1189 }
1190 None => {
1191 crate::slog_info!(
1192 "bash task relaxed DB unavailable for {}; falling back to disk",
1193 task_id
1194 );
1195 }
1196 }
1197 let root = storage_dir.join("bash-tasks");
1198 let entries = fs::read_dir(&root).ok()?;
1199 for entry in entries.flatten() {
1200 let dir = entry.path();
1201 if !dir.is_dir() {
1202 continue;
1203 }
1204 let path = dir.join(format!("{task_id}.json"));
1205 if !path.exists() {
1206 continue;
1207 }
1208 let metadata = match read_task(&path) {
1209 Ok(metadata) => metadata,
1210 Err(error) => {
1211 crate::slog_warn!(
1212 "quarantining invalid background task metadata {} during relaxed lookup: {error}",
1213 path.display()
1214 );
1215 if let Err(quarantine_error) =
1216 quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
1217 {
1218 crate::slog_warn!(
1219 "failed to quarantine invalid background task metadata {}: {quarantine_error}",
1220 path.display()
1221 );
1222 }
1223 continue;
1224 }
1225 };
1226 let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
1227 if metadata_project.as_deref() != Some(canonical_project.as_path()) {
1228 continue;
1229 }
1230 if let Some(task) = self.task(task_id) {
1231 let matches_project = task
1232 .state
1233 .lock()
1234 .map(|state| {
1235 state
1236 .metadata
1237 .project_root
1238 .as_deref()
1239 .map(canonicalized_path)
1240 .as_deref()
1241 == Some(canonical_project.as_path())
1242 })
1243 .unwrap_or(false);
1244 return matches_project.then_some(task);
1245 }
1246 let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1247 if self.insert_rehydrated_task(metadata, paths, true).is_err() {
1248 return None;
1249 }
1250 return self.task(task_id);
1251 }
1252 None
1253 }
1254
1255 fn lookup_relaxed_task_from_db(
1256 &self,
1257 task_id: &str,
1258 project_root: &Path,
1259 ) -> Option<Result<Option<PersistedTask>, String>> {
1260 let pool = self
1261 .inner
1262 .db_pool
1263 .read()
1264 .ok()
1265 .and_then(|slot| slot.clone())?;
1266 let harness = self
1267 .inner
1268 .db_harness
1269 .read()
1270 .ok()
1271 .and_then(|slot| slot.clone())?;
1272 let conn = match pool.lock() {
1273 Ok(conn) => conn,
1274 Err(_) => return Some(Err("db mutex poisoned".to_string())),
1275 };
1276 let project_key = crate::search_index::project_cache_key(project_root);
1277 Some(
1278 crate::db::bash_tasks::find_bash_task_for_project(
1279 &conn,
1280 &harness,
1281 &project_key,
1282 task_id,
1283 )
1284 .map(|row| row.map(PersistedTask::from))
1285 .map_err(|error| error.to_string()),
1286 )
1287 }
1288
1289 pub(super) fn status_relaxed(
1290 &self,
1291 task_id: &str,
1292 _session_id: &str,
1293 project_root: &Path,
1294 storage_dir: &Path,
1295 preview_bytes: usize,
1296 ) -> Option<BgTaskSnapshot> {
1297 let task = self.status_relaxed_task(task_id, project_root, storage_dir)?;
1298 let _ = self.poll_task(&task);
1299 let mut snapshot = task.snapshot(preview_bytes);
1300 self.maybe_compress_snapshot(&task, &mut snapshot);
1301 Some(snapshot)
1302 }
1303
1304 pub fn kill_relaxed(
1305 &self,
1306 task_id: &str,
1307 project_root: &Path,
1308 storage_dir: &Path,
1309 ) -> Result<BgTaskSnapshot, String> {
1310 let task = self
1311 .status_relaxed_task(task_id, project_root, storage_dir)
1312 .ok_or_else(|| format!("background task not found: {task_id}"))?;
1313 self.kill_with_status(task_id, &task.session_id, BgTaskStatus::Killed)
1314 }
1315
1316 pub fn maybe_gc_persisted(&self, storage_dir: &Path) -> Result<usize, String> {
1317 #[cfg(test)]
1318 self.inner.persisted_gc_runs.fetch_add(1, Ordering::SeqCst);
1319
1320 let mut deleted = 0usize;
1321
1322 let root = storage_dir.join("bash-tasks");
1323 if root.exists() {
1324 let session_dirs = fs::read_dir(&root).map_err(|e| {
1325 format!(
1326 "failed to read background task root {}: {e}",
1327 root.display()
1328 )
1329 })?;
1330 for session_entry in session_dirs.flatten() {
1331 let session_dir = session_entry.path();
1332 if !session_dir.is_dir() {
1333 continue;
1334 }
1335 let task_entries = match fs::read_dir(&session_dir) {
1336 Ok(entries) => entries,
1337 Err(error) => {
1338 crate::slog_warn!(
1339 "failed to read background task session dir {}: {error}",
1340 session_dir.display()
1341 );
1342 continue;
1343 }
1344 };
1345 for task_entry in task_entries.flatten() {
1346 let json_path = task_entry.path();
1347 if json_path
1348 .extension()
1349 .and_then(|extension| extension.to_str())
1350 != Some("json")
1351 {
1352 continue;
1353 }
1354 if modified_within(&json_path, PERSISTED_GC_GRACE) {
1355 continue;
1356 }
1357 let metadata = match read_task(&json_path) {
1358 Ok(metadata) => metadata,
1359 Err(error) => {
1360 crate::slog_warn!(
1361 "quarantining corrupt background task metadata {}: {error}",
1362 json_path.display()
1363 );
1364 quarantine_task_json(
1365 storage_dir,
1366 &session_dir,
1367 &json_path,
1368 QuarantineKind::Corrupt,
1369 )?;
1370 continue;
1371 }
1372 };
1373 if !(metadata.status.is_terminal() && metadata.completion_delivered) {
1374 continue;
1375 }
1376 let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1377 match delete_task_bundle(&paths) {
1378 Ok(()) => {
1379 deleted += 1;
1380 log::debug!(
1381 "deleted persisted background task bundle {}",
1382 metadata.task_id
1383 );
1384 }
1385 Err(error) => {
1386 crate::slog_warn!(
1387 "failed to delete background task bundle {}: {error}",
1388 metadata.task_id
1389 );
1390 continue;
1391 }
1392 }
1393 }
1394 }
1395 }
1396 gc_quarantine(storage_dir);
1397 Ok(deleted)
1398 }
1399
1400 pub fn list(&self, preview_bytes: usize) -> Vec<BgTaskSnapshot> {
1401 let tasks = self
1402 .inner
1403 .tasks
1404 .lock()
1405 .map(|tasks| tasks.values().cloned().collect::<Vec<_>>())
1406 .unwrap_or_default();
1407 tasks
1408 .into_iter()
1409 .map(|task| {
1410 let _ = self.poll_task(&task);
1411 let mut snapshot = task.snapshot(preview_bytes);
1412 self.maybe_compress_snapshot(&task, &mut snapshot);
1413 snapshot
1414 })
1415 .collect()
1416 }
1417
1418 fn maybe_compress_snapshot(&self, task: &Arc<BgTask>, snapshot: &mut BgTaskSnapshot) {
1424 if !snapshot.info.status.is_terminal() {
1425 return;
1426 }
1427 let (compressed_flag, mode) = task
1428 .state
1429 .lock()
1430 .map(|state| (state.metadata.compressed, state.metadata.mode.clone()))
1431 .unwrap_or((true, BgMode::Pipes));
1432 if mode == BgMode::Pty {
1433 return;
1434 }
1435 if !compressed_flag {
1436 return;
1437 }
1438 let raw = std::mem::take(&mut snapshot.output_preview);
1439 snapshot.output_preview = self.compress_output(&snapshot.info.command, raw);
1440 }
1441
1442 pub fn kill(&self, task_id: &str, session_id: &str) -> Result<BgTaskSnapshot, String> {
1443 self.kill_with_status(task_id, session_id, BgTaskStatus::Killed)
1444 }
1445
1446 pub fn promote(&self, task_id: &str, session_id: &str) -> Result<bool, String> {
1447 let task = self
1448 .task_for_session(task_id, session_id)
1449 .ok_or_else(|| format!("background task not found: {task_id}"))?;
1450 let mut state = task
1451 .state
1452 .lock()
1453 .map_err(|_| "background task lock poisoned".to_string())?;
1454 let updated = self
1455 .update_task_metadata(&task.paths, |metadata| {
1456 metadata.notify_on_completion = true;
1457 metadata.completion_delivered = false;
1458 })
1459 .map_err(|e| format!("failed to promote background task: {e}"))?;
1460 state.metadata = updated;
1461 if state.metadata.status.is_terminal() {
1462 state.buffer.enforce_terminal_cap();
1463 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1464 }
1465 Ok(true)
1466 }
1467
1468 pub(crate) fn kill_for_timeout(&self, task_id: &str, session_id: &str) -> Result<(), String> {
1469 self.kill_with_status(task_id, session_id, BgTaskStatus::TimedOut)
1470 .map(|_| ())
1471 }
1472
1473 pub fn cleanup_finished(&self, older_than: Duration) {
1474 let cutoff = Instant::now().checked_sub(older_than);
1475 let removable_paths: Vec<(String, TaskPaths)> =
1476 if let Ok(mut tasks) = self.inner.tasks.lock() {
1477 let removable = tasks
1478 .iter()
1479 .filter_map(|(task_id, task)| {
1480 let delivered_terminal = task
1481 .state
1482 .lock()
1483 .map(|state| {
1484 state.metadata.status.is_terminal()
1485 && state.metadata.completion_delivered
1486 })
1487 .unwrap_or(false);
1488 if !delivered_terminal {
1489 return None;
1490 }
1491
1492 let terminal_at = task.terminal_at.lock().ok().and_then(|at| *at);
1493 let expired = match (terminal_at, cutoff) {
1494 (Some(terminal_at), Some(cutoff)) => terminal_at <= cutoff,
1495 (Some(_), None) => true,
1496 (None, _) => false,
1497 };
1498 expired.then(|| task_id.clone())
1499 })
1500 .collect::<Vec<_>>();
1501
1502 removable
1503 .into_iter()
1504 .filter_map(|task_id| {
1505 tasks
1506 .remove(&task_id)
1507 .map(|task| (task_id, task.paths.clone()))
1508 })
1509 .collect()
1510 } else {
1511 Vec::new()
1512 };
1513
1514 for (task_id, paths) in removable_paths {
1515 match delete_task_bundle(&paths) {
1516 Ok(()) => log::debug!("deleted persisted background task bundle {task_id}"),
1517 Err(error) => crate::slog_warn!(
1518 "failed to delete persisted background task bundle {task_id}: {error}"
1519 ),
1520 }
1521 }
1522 }
1523
1524 pub fn drain_completions(&self) -> Vec<BgCompletion> {
1525 self.drain_completions_for_session(None)
1526 }
1527
1528 pub fn drain_completions_for_session(&self, session_id: Option<&str>) -> Vec<BgCompletion> {
1529 let completions = match self.inner.completions.lock() {
1530 Ok(completions) => completions,
1531 Err(_) => return Vec::new(),
1532 };
1533
1534 completions
1535 .iter()
1536 .filter(|completion| {
1537 session_id
1538 .map(|session_id| completion.session_id == session_id)
1539 .unwrap_or(true)
1540 })
1541 .cloned()
1542 .collect()
1543 }
1544
1545 pub fn ack_completions_for_session(
1546 &self,
1547 session_id: Option<&str>,
1548 task_ids: &[String],
1549 ) -> Vec<String> {
1550 if task_ids.is_empty() {
1551 return Vec::new();
1552 }
1553 let requested_task_ids = task_ids.iter().map(String::as_str).collect::<HashSet<_>>();
1554 let mut completion_sessions = HashMap::new();
1555 if let Ok(mut completions) = self.inner.completions.lock() {
1556 completions.retain(|completion| {
1557 let session_matches = session_id
1558 .map(|session_id| completion.session_id == session_id)
1559 .unwrap_or(true);
1560 if session_matches && requested_task_ids.contains(completion.task_id.as_str()) {
1561 completion_sessions
1562 .insert(completion.task_id.clone(), completion.session_id.clone());
1563 false
1564 } else {
1565 true
1566 }
1567 });
1568 }
1569
1570 let mut delivered = Vec::new();
1571 for task_id in task_ids {
1572 let task = if let Some(session_id) = session_id {
1573 self.task_for_session(task_id, session_id)
1574 } else if let Some(completion_session_id) = completion_sessions.get(task_id) {
1575 self.task_for_session(task_id, completion_session_id)
1576 } else {
1577 self.task(task_id)
1578 };
1579 if let Some(task) = task {
1580 if task.set_completion_delivered(true, self).is_ok() {
1581 delivered.push(task_id.clone());
1582 }
1583 }
1584 }
1585
1586 delivered
1587 }
1588
1589 pub fn pending_completions_for_session(&self, session_id: &str) -> Vec<BgCompletion> {
1590 self.inner
1591 .completions
1592 .lock()
1593 .map(|completions| {
1594 completions
1595 .iter()
1596 .filter(|completion| completion.session_id == session_id)
1597 .cloned()
1598 .collect()
1599 })
1600 .unwrap_or_default()
1601 }
1602
1603 fn remove_pending_completion(&self, task_id: &str) -> Option<BgCompletion> {
1604 let mut completions = self.inner.completions.lock().ok()?;
1605 let idx = completions
1606 .iter()
1607 .position(|completion| completion.task_id == task_id)?;
1608 completions.remove(idx)
1609 }
1610
1611 fn completion_snapshot_for_task(
1612 &self,
1613 task: &Arc<BgTask>,
1614 preview_bytes: usize,
1615 ) -> Option<BgCompletion> {
1616 let snapshot = task.snapshot(preview_bytes);
1617 if !snapshot.info.status.is_terminal() {
1618 return None;
1619 }
1620 let output_preview = if snapshot.info.mode == BgMode::Pty {
1621 String::new()
1622 } else {
1623 let compressed = task
1624 .state
1625 .lock()
1626 .map(|state| state.metadata.compressed)
1627 .unwrap_or(true);
1628 if compressed {
1629 self.compress_output(&snapshot.info.command, snapshot.output_preview)
1630 } else {
1631 snapshot.output_preview
1632 }
1633 };
1634 Some(BgCompletion {
1635 task_id: snapshot.info.task_id,
1636 session_id: task.session_id.clone(),
1637 status: snapshot.info.status,
1638 exit_code: snapshot.exit_code,
1639 command: snapshot.info.command,
1640 output_preview,
1641 output_truncated: snapshot.output_truncated,
1642 original_tokens: None,
1643 compressed_tokens: None,
1644 tokens_skipped: false,
1645 })
1646 }
1647
1648 pub fn detach(&self) {
1649 self.inner.shutdown.store(true, Ordering::SeqCst);
1650 if let Ok(mut tasks) = self.inner.tasks.lock() {
1651 for task in tasks.values() {
1652 if let Ok(mut state) = task.state.lock() {
1653 match &mut state.runtime {
1654 TaskRuntime::Piped(child) => *child = None,
1655 TaskRuntime::Pty(runtime) => *runtime = None,
1656 }
1657 state.detached = true;
1658 }
1659 }
1660 tasks.clear();
1661 }
1662 }
1663
1664 pub fn shutdown(&self) {
1665 let tasks = self
1666 .inner
1667 .tasks
1668 .lock()
1669 .map(|tasks| {
1670 tasks
1671 .values()
1672 .map(|task| (task.task_id.clone(), task.session_id.clone()))
1673 .collect::<Vec<_>>()
1674 })
1675 .unwrap_or_default();
1676 for (task_id, session_id) in tasks {
1677 let _ = self.kill(&task_id, &session_id);
1678 }
1679 }
1680
1681 pub(crate) fn poll_task(&self, task: &Arc<BgTask>) -> Result<(), String> {
1682 if let Ok(state) = task.state.lock() {
1683 if let TaskRuntime::Pty(Some(pty)) = &state.runtime {
1684 let complete = pty.reader_done.load(Ordering::SeqCst)
1685 && pty.exit_observed.load(Ordering::SeqCst);
1686 if !complete {
1687 return Ok(());
1688 }
1689 }
1690 }
1691 let marker = match read_exit_marker(&task.paths.exit) {
1692 Ok(Some(marker)) => marker,
1693 Ok(None) => return Ok(()),
1694 Err(error) => return Err(format!("failed to read exit marker: {error}")),
1695 };
1696 self.finalize_from_marker(task, marker, None)
1697 }
1698
    /// Non-blocking check for whether a task's process has exited; updates task state to
    /// match.
    ///
    /// Locks `task.state`, so it must not be called while that lock is held.
    ///
    /// - Piped runtime with a live `Child`: polls `try_wait`. Once it reports an exit, the
    ///   handle is dropped and the task is marked detached with `child_exit_observed` set.
    ///   The marker-less failure check happens on a later call, through the detached
    ///   branch below.
    /// - Piped runtime without a handle, but detached (e.g. a task registered by
    ///   `insert_rehydrated_task`): if the exit was already observed, or the recorded
    ///   `child_pid` is no longer alive, defers to `fail_without_exit_marker_if_needed`.
    ///   That call does nothing if the task is already terminal or a marker exists.
    /// - PTY runtime: once both `reader_done` and `exit_observed` are set, releases the
    ///   state lock and finalizes through `poll_task`.
    /// - PTY runtime already torn down (`None`): nothing to do.
    pub(crate) fn reap_child(&self, task: &Arc<BgTask>) {
        let Ok(mut state) = task.state.lock() else {
            return;
        };
        match &mut state.runtime {
            TaskRuntime::Piped(child_slot) => {
                if let Some(child) = child_slot.as_mut() {
                    // A `try_wait` error is treated the same as "still running".
                    if matches!(child.try_wait(), Ok(Some(_))) {
                        *child_slot = None;
                        state.detached = true;
                        state.child_exit_observed = true;
                    }
                } else if state.detached {
                    // Without a handle, fall back to the remembered exit or a pid liveness probe.
                    let child_known_dead = state.child_exit_observed
                        || state
                            .metadata
                            .child_pid
                            .is_some_and(|pid| !is_process_alive(pid));
                    if child_known_dead {
                        self.fail_without_exit_marker_if_needed(task, &mut state);
                    }
                }
            }
            TaskRuntime::Pty(Some(pty)) => {
                let complete = pty.reader_done.load(Ordering::SeqCst)
                    && pty.exit_observed.load(Ordering::SeqCst);
                if complete {
                    // poll_task locks task.state itself, so the guard must be released first.
                    drop(state);
                    let _ = self.poll_task(task);
                }
            }
            TaskRuntime::Pty(None) => {}
        }
    }
1733
1734 fn fail_without_exit_marker_if_needed(&self, task: &Arc<BgTask>, state: &mut BgTaskState) {
1735 if state.metadata.status.is_terminal() {
1736 return;
1737 }
1738 if matches!(read_exit_marker(&task.paths.exit), Ok(Some(_))) {
1739 return;
1740 }
1741 let watch_controlled = self.task_has_watch_control(&task.task_id);
1742 let updated = self.update_task_metadata(&task.paths, |metadata| {
1743 metadata.mark_terminal(
1744 BgTaskStatus::Failed,
1745 None,
1746 Some("process exited without exit marker".to_string()),
1747 );
1748 if watch_controlled {
1749 metadata.completion_delivered = true;
1750 }
1751 });
1752 if let Ok(metadata) = updated {
1753 state.pending_terminal_override = None;
1754 state.metadata = metadata;
1755 task.mark_terminal_now();
1756 state.buffer.enforce_terminal_cap();
1757 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1758 }
1759 }
1760
1761 pub(crate) fn running_tasks(&self) -> Vec<Arc<BgTask>> {
1762 self.inner
1763 .tasks
1764 .lock()
1765 .map(|tasks| {
1766 tasks
1767 .values()
1768 .filter(|task| task.is_running())
1769 .cloned()
1770 .collect()
1771 })
1772 .unwrap_or_default()
1773 }
1774
    /// Registers a task rebuilt from persisted `metadata`, with no live process handle.
    ///
    /// The runtime slot is empty (`Pty(None)` or `Piped(None)` according to the persisted
    /// mode), and the output buffer is backed by the on-disk capture files in `paths`.
    /// `detached` is stored as given.
    ///
    /// Returns an error only if the task registry lock is poisoned.
    fn insert_rehydrated_task(
        &self,
        metadata: PersistedTask,
        paths: TaskPaths,
        detached: bool,
    ) -> Result<(), String> {
        let task_id = metadata.task_id.clone();
        let session_id = metadata.session_id.clone();
        // Presumably maps the persisted wall-clock start (unix millis) onto an `Instant` — TODO confirm.
        let started = started_instant_from_unix_millis(metadata.started_at);
        // A task replayed while Running starts with `last_reminder_at = now`, which (per the
        // variable name) suppresses an immediate reminder after replay.
        let suppress_replayed_running_reminder = metadata.status == BgTaskStatus::Running;
        let mode = metadata.mode.clone();
        let task = Arc::new(BgTask {
            task_id: task_id.clone(),
            session_id,
            paths: paths.clone(),
            started,
            last_reminder_at: Mutex::new(suppress_replayed_running_reminder.then(Instant::now)),
            // Terminal tasks count as terminal "now", so `cleanup_finished` retention runs from
            // rehydration time rather than from the original finish.
            terminal_at: Mutex::new(metadata.status.is_terminal().then(Instant::now)),
            state: Mutex::new(BgTaskState {
                metadata,
                runtime: if mode == BgMode::Pty {
                    TaskRuntime::Pty(None)
                } else {
                    TaskRuntime::Piped(None)
                },
                detached,
                child_exit_observed: false,
                buffer: if mode == BgMode::Pty {
                    BgBuffer::pty(paths.pty.clone())
                } else {
                    BgBuffer::new(paths.stdout.clone(), paths.stderr.clone())
                },
                pending_terminal_override: None,
            }),
        });
        self.inner
            .tasks
            .lock()
            .map_err(|_| "background task registry lock poisoned".to_string())?
            .insert(task_id, task);
        Ok(())
    }
1823
    /// Kills a task owned by `session_id` and returns a snapshot with a 5 KiB preview.
    ///
    /// `terminal_status` is the status to record. A `TimedOut` status also records exit
    /// code 124 (presumably mirroring coreutils `timeout`). Steps, all under the task's
    /// state lock:
    ///
    /// 1. Already terminal: clear any pending override and return the current snapshot.
    /// 2. An exit marker already exists: finalize from the marker rather than from
    ///    `terminal_status`, drop the runtime, persist, enqueue the completion, and return.
    /// 3. Otherwise persist the intermediate `Killing` status. A repeated call that finds
    ///    the task already `Killing` skips this step.
    /// 4. Piped runtime:
    ///    - terminate the process group (Unix) or the process (Windows);
    ///    - reap the child with a blocking `wait`;
    ///    - write a kill marker if none exists;
    ///    - record `terminal_status`, persist, and enqueue the completion.
    /// 5. PTY with a live runtime: flag `was_killed`, kill via `ChildKiller`, signal the
    ///    child, and drop the master. On Windows the terminal transition is recorded here
    ///    immediately. Elsewhere it is left to the exit-marker path
    ///    (`poll_task` -> `finalize_from_marker`, which consumes `pending_terminal_override`).
    /// 6. PTY runtime already gone: nothing further happens here.
    ///
    /// # Errors
    /// Fails in any of these cases:
    /// - no task with this id is owned by `session_id`;
    /// - the state lock is poisoned;
    /// - writing the kill marker or persisting state fails.
    fn kill_with_status(
        &self,
        task_id: &str,
        session_id: &str,
        terminal_status: BgTaskStatus,
    ) -> Result<BgTaskSnapshot, String> {
        let task = self
            .task_for_session(task_id, session_id)
            .ok_or_else(|| format!("background task not found: {task_id}"))?;

        {
            let mut state = task
                .state
                .lock()
                .map_err(|_| "background task lock poisoned".to_string())?;
            if state.metadata.status.is_terminal() {
                state.pending_terminal_override = None;
                return Ok(task.snapshot_locked(&state, 5 * 1024));
            }

            // The process already produced a marker: record what it reports instead of a kill.
            if let Ok(Some(marker)) = read_exit_marker(&task.paths.exit) {
                state.metadata =
                    terminal_metadata_from_marker(state.metadata.clone(), marker, None);
                if self.task_has_watch_control(&task.task_id) {
                    state.metadata.completion_delivered = true;
                }
                state.pending_terminal_override = None;
                task.mark_terminal_now();
                match &mut state.runtime {
                    TaskRuntime::Piped(child) => *child = None,
                    TaskRuntime::Pty(runtime) => *runtime = None,
                }
                state.detached = true;
                state.buffer.enforce_terminal_cap();
                self.persist_task(&task.paths, &state.metadata)
                    .map_err(|e| format!("failed to persist terminal state: {e}"))?;
                self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
                return Ok(task.snapshot_locked(&state, 5 * 1024));
            }

            let was_already_killing = state.metadata.status == BgTaskStatus::Killing;
            if !was_already_killing {
                state.metadata.status = BgTaskStatus::Killing;
                self.persist_task(&task.paths, &state.metadata)
                    .map_err(|e| format!("failed to persist killing state: {e}"))?;
            }

            let pgid = state.metadata.pgid;
            #[cfg(windows)]
            let child_pid = state.metadata.child_pid;
            // Remember that a PTY kill was a timeout, so that finalization reports TimedOut
            // rather than Killed. Only the first kill request sets this.
            if !was_already_killing
                && state.metadata.mode == BgMode::Pty
                && terminal_status == BgTaskStatus::TimedOut
            {
                state.pending_terminal_override = Some(BgTaskStatus::TimedOut);
            }

            #[cfg(windows)]
            let mut pty_forced_terminal_status: Option<BgTaskStatus> = None;

            match &mut state.runtime {
                TaskRuntime::Piped(child_slot) => {
                    #[cfg(unix)]
                    if let Some(pgid) = pgid {
                        terminate_pgid(pgid, child_slot.as_mut());
                    }
                    #[cfg(windows)]
                    if let Some(child) = child_slot.as_mut() {
                        super::process::terminate_process(child);
                    } else if let Some(pid) = child_pid {
                        terminate_pid(pid);
                    }
                    // Blocking reap while the state lock is held.
                    if let Some(child) = child_slot.as_mut() {
                        let _ = child.wait();
                    }
                    *child_slot = None;
                    state.detached = true;

                    if !task.paths.exit.exists() {
                        write_kill_marker_if_absent(&task.paths.exit)
                            .map_err(|e| format!("failed to write kill marker: {e}"))?;
                    }

                    let exit_code = if terminal_status == BgTaskStatus::TimedOut {
                        Some(124)
                    } else {
                        None
                    };
                    state
                        .metadata
                        .mark_terminal(terminal_status, exit_code, None);
                    if self.task_has_watch_control(&task.task_id) {
                        state.metadata.completion_delivered = true;
                    }
                    state.pending_terminal_override = None;
                    task.mark_terminal_now();
                    // NOTE(review): if this persist fails, the in-memory metadata is already
                    // terminal, so later kill/finalize calls return early and no completion is
                    // ever enqueued for this task. Confirm that this is acceptable.
                    self.persist_task(&task.paths, &state.metadata)
                        .map_err(|e| format!("failed to persist killed state: {e}"))?;
                    state.buffer.enforce_terminal_cap();
                    self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
                }
                TaskRuntime::Pty(Some(pty)) => {
                    pty.was_killed.store(true, Ordering::SeqCst);
                    if let Err(error) = pty.killer.kill() {
                        crate::slog_warn!("[pty-kill] {task_id} ChildKiller::kill failed: {error}");
                    }
                    if let Some(pid) = pty.child_pid {
                        // NOTE(review): this passes the PTY child's pid as a process-group id,
                        // which assumes the child leads its own group. Confirm in
                        // spawn_pty_for_command.
                        #[cfg(unix)]
                        terminate_pgid(pid as i32, None);
                        #[cfg(windows)]
                        terminate_pid(pid);
                    }
                    // Release the PTY master handle.
                    drop(pty.master.take());

                    #[cfg(windows)]
                    {
                        let default_status = if terminal_status == BgTaskStatus::TimedOut {
                            BgTaskStatus::TimedOut
                        } else {
                            BgTaskStatus::Killed
                        };
                        pty_forced_terminal_status = Some(
                            state
                                .pending_terminal_override
                                .take()
                                .unwrap_or(default_status),
                        );
                    }
                }
                TaskRuntime::Pty(None) => {}
            }

            // Windows only: finalize a killed PTY task immediately instead of waiting for the
            // exit-marker path.
            #[cfg(windows)]
            if let Some(target_status) = pty_forced_terminal_status {
                if !task.paths.exit.exists() {
                    write_kill_marker_if_absent(&task.paths.exit)
                        .map_err(|e| format!("failed to write kill marker: {e}"))?;
                }

                let exit_code = if target_status == BgTaskStatus::TimedOut {
                    Some(124)
                } else {
                    None
                };
                state.metadata.mark_terminal(target_status, exit_code, None);
                if self.task_has_watch_control(&task.task_id) {
                    state.metadata.completion_delivered = true;
                }
                state.pending_terminal_override = None;
                task.mark_terminal_now();
                if let TaskRuntime::Pty(runtime) = &mut state.runtime {
                    *runtime = None;
                }
                state.detached = true;
                self.persist_task(&task.paths, &state.metadata)
                    .map_err(|e| format!("failed to persist killed PTY state: {e}"))?;
                state.buffer.enforce_terminal_cap();
                self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
            }
        }

        // Snapshot taken after the state lock above has been released.
        Ok(task.snapshot(5 * 1024))
    }
1987
1988 fn finalize_from_marker(
1989 &self,
1990 task: &Arc<BgTask>,
1991 marker: ExitMarker,
1992 reason: Option<String>,
1993 ) -> Result<(), String> {
1994 let watch_controlled = self.task_has_watch_control(&task.task_id);
1995 {
1996 let mut state = task
1997 .state
1998 .lock()
1999 .map_err(|_| "background task lock poisoned".to_string())?;
2000 if state.metadata.status.is_terminal() {
2001 state.pending_terminal_override = None;
2002 return Ok(());
2003 }
2004
2005 let pending_override = state.pending_terminal_override.take();
2006 let is_pty = state.metadata.mode == BgMode::Pty;
2007 let updated = self
2008 .update_task_metadata(&task.paths, |metadata| {
2009 let mut new_metadata = if is_pty && marker == ExitMarker::Killed {
2010 let mut metadata = metadata.clone();
2011 let target_status = pending_override.unwrap_or(BgTaskStatus::Killed);
2012 let exit_code = if target_status == BgTaskStatus::TimedOut {
2013 Some(124)
2014 } else {
2015 None
2016 };
2017 metadata.mark_terminal(target_status, exit_code, reason);
2018 metadata
2019 } else {
2020 terminal_metadata_from_marker(metadata.clone(), marker, reason)
2021 };
2022 if watch_controlled {
2023 new_metadata.completion_delivered = true;
2024 }
2025 *metadata = new_metadata;
2026 })
2027 .map_err(|e| format!("failed to persist terminal state: {e}"))?;
2028 state.metadata = updated;
2029 task.mark_terminal_now();
2030 match &mut state.runtime {
2031 TaskRuntime::Piped(child) => *child = None,
2032 TaskRuntime::Pty(runtime) => *runtime = None,
2033 }
2034 state.detached = true;
2035 }
2036
2037 self.scan_task_watch_output(task);
2040
2041 let mut state = task
2042 .state
2043 .lock()
2044 .map_err(|_| "background task lock poisoned".to_string())?;
2045 state.buffer.enforce_terminal_cap();
2046 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
2047 Ok(())
2048 }
2049
2050 fn enqueue_completion_if_needed(
2051 &self,
2052 metadata: &PersistedTask,
2053 paths: Option<&TaskPaths>,
2054 emit_frame: bool,
2055 ) {
2056 if metadata.status.is_terminal() && !metadata.completion_delivered {
2057 self.enqueue_completion_from_parts(metadata, None, paths, emit_frame);
2058 }
2059 }
2060
2061 fn enqueue_completion_locked(
2062 &self,
2063 metadata: &PersistedTask,
2064 buffer: Option<&BgBuffer>,
2065 emit_frame: bool,
2066 ) {
2067 self.enqueue_completion_from_parts(metadata, buffer, None, emit_frame);
2068 }
2069
    /// Builds a completion for terminal `metadata` and routes it.
    ///
    /// Does nothing for non-terminal metadata. The preview is empty for PTY tasks.
    /// Otherwise it is the last `BG_COMPLETION_PREVIEW_BYTES` of output, taken from `buffer`
    /// when given, else from disk via `paths`, else empty. It is compressed when
    /// `metadata.compressed` is set. Token counts are computed and a compression event is
    /// recorded.
    ///
    /// Routing:
    /// - Watch-controlled task: a watch-exit frame is emitted (if `emit_frame` is set and no
    ///   watch has matched yet), the task's watch state is cleared, and nothing is queued.
    /// - Completion already delivered: nothing is queued.
    /// - Otherwise the completion is pushed unless one for the same task id is already
    ///   queued. A `BashCompleted` frame is emitted only when a new entry was pushed and
    ///   `emit_frame` is set.
    fn enqueue_completion_from_parts(
        &self,
        metadata: &PersistedTask,
        buffer: Option<&BgBuffer>,
        paths: Option<&TaskPaths>,
        emit_frame: bool,
    ) {
        if !metadata.status.is_terminal() {
            return;
        }
        let (raw_preview, output_truncated) = if metadata.mode == BgMode::Pty {
            (String::new(), false)
        } else {
            match buffer {
                Some(buf) => buf.read_tail(BG_COMPLETION_PREVIEW_BYTES),
                None => paths
                    .map(|paths| read_tail_from_disk(metadata, paths, BG_COMPLETION_PREVIEW_BYTES))
                    .unwrap_or_else(|| (String::new(), false)),
            }
        };
        let output_preview = if metadata.compressed {
            self.compress_output(&metadata.command, raw_preview)
        } else {
            raw_preview
        };
        let token_counts = self.completion_token_counts(metadata, buffer, paths);
        let completion = BgCompletion {
            task_id: metadata.task_id.clone(),
            session_id: metadata.session_id.clone(),
            status: metadata.status.clone(),
            exit_code: metadata.exit_code,
            command: metadata.command.clone(),
            output_preview,
            output_truncated,
            original_tokens: token_counts.original_tokens,
            compressed_tokens: token_counts.compressed_tokens,
            tokens_skipped: token_counts.tokens_skipped,
        };

        // NOTE(review): this runs before the watch, delivered, and dedup checks below. A task
        // enqueued more than once (e.g. finalized and later re-enqueued by `promote`) may
        // record duplicate compression events. Confirm that this is intended.
        self.record_compression_event_if_applicable(metadata, &token_counts);

        let (watch_controlled, watch_matched) = self.task_watch_state(&metadata.task_id);
        if watch_controlled {
            // A watch that already matched has reported to the caller, so skip the exit frame.
            if emit_frame && !watch_matched {
                self.emit_bash_watch_exit(&completion);
            }
            self.clear_task_watch_state(&metadata.task_id);
            return;
        }

        if metadata.completion_delivered {
            return;
        }

        // At most one queued completion per task id.
        let pushed = if let Ok(mut completions) = self.inner.completions.lock() {
            if completions
                .iter()
                .any(|existing| existing.task_id == metadata.task_id)
            {
                false
            } else {
                completions.push_back(completion.clone());
                true
            }
        } else {
            false
        };

        if pushed && emit_frame {
            self.emit_bash_completed(completion);
        }
    }
2181
2182 fn record_compression_event_if_applicable(
2183 &self,
2184 metadata: &PersistedTask,
2185 token_counts: &CompletionTokenCounts,
2186 ) {
2187 if metadata.mode == BgMode::Pty {
2188 return;
2189 }
2190
2191 let (original_tokens, compressed_tokens, original_bytes, compressed_bytes) = match (
2192 token_counts.original_tokens,
2193 token_counts.compressed_tokens,
2194 token_counts.original_bytes,
2195 token_counts.compressed_bytes,
2196 ) {
2197 (
2198 Some(original_tokens),
2199 Some(compressed_tokens),
2200 Some(original_bytes),
2201 Some(compressed_bytes),
2202 ) => (
2203 original_tokens,
2204 compressed_tokens,
2205 original_bytes,
2206 compressed_bytes,
2207 ),
2208 _ => {
2209 crate::slog_warn!(
2210 "compression event skipped for {}: token counts unavailable (likely spill file missing or unreadable)",
2211 metadata.task_id
2212 );
2213 return;
2214 }
2215 };
2216
2217 let pool = self.inner.db_pool.read().ok().and_then(|slot| slot.clone());
2218 let Some(pool) = pool else {
2219 crate::slog_warn!(
2220 "compression event skipped for {}: db_pool not initialized — was configure run?",
2221 metadata.task_id
2222 );
2223 return;
2224 };
2225 let harness = self
2226 .inner
2227 .db_harness
2228 .read()
2229 .ok()
2230 .and_then(|slot| slot.clone());
2231 let Some(harness) = harness else {
2232 crate::slog_warn!(
2233 "compression event insert skipped for {}: harness not configured",
2234 metadata.task_id
2235 );
2236 return;
2237 };
2238
2239 let project_root = metadata
2240 .project_root
2241 .as_deref()
2242 .unwrap_or(&metadata.workdir);
2243 let project_key = crate::search_index::project_cache_key(project_root);
2244 let row = crate::db::compression_events::CompressionEventRow {
2245 harness: &harness,
2246 session_id: Some(&metadata.session_id),
2247 project_key: &project_key,
2248 tool: "bash",
2249 task_id: Some(&metadata.task_id),
2250 command: Some(&metadata.command),
2251 compressor: if metadata.compressed {
2252 "registry"
2253 } else {
2254 "none"
2255 },
2256 original_bytes,
2257 compressed_bytes,
2258 original_tokens,
2259 compressed_tokens,
2260 created_at: unix_millis() as i64,
2261 };
2262
2263 let conn = match pool.lock() {
2264 Ok(conn) => conn,
2265 Err(_) => {
2266 crate::slog_warn!(
2267 "compression event insert failed for {}: db mutex poisoned",
2268 metadata.task_id
2269 );
2270 return;
2271 }
2272 };
2273 match crate::db::compression_events::insert_compression_event(&conn, &row) {
2274 Ok(_) => {
2275 crate::slog_debug!(
2279 "compression event recorded for {} (project={}, session={}, {} → {} tokens)",
2280 metadata.task_id,
2281 project_key,
2282 metadata.session_id,
2283 original_tokens,
2284 compressed_tokens
2285 );
2286 }
2287 Err(error) => {
2288 crate::slog_warn!(
2289 "compression event insert failed for {}: {}",
2290 metadata.task_id,
2291 error
2292 );
2293 }
2294 }
2295 }
2296
2297 fn emit_bash_pattern_match(&self, session_id: &str, pattern_match: PatternMatch) {
2298 let Ok(progress_sender) = self
2299 .inner
2300 .progress_sender
2301 .lock()
2302 .map(|sender| sender.clone())
2303 else {
2304 return;
2305 };
2306 if let Some(sender) = progress_sender.as_ref() {
2307 sender(PushFrame::BashPatternMatch(BashPatternMatchFrame::new(
2308 pattern_match.task_id,
2309 session_id.to_string(),
2310 pattern_match.watch_id,
2311 pattern_match.match_text,
2312 pattern_match.match_offset,
2313 pattern_match.context,
2314 pattern_match.once,
2315 )));
2316 }
2317 }
2318
2319 fn emit_bash_watch_exit(&self, completion: &BgCompletion) {
2320 let Ok(progress_sender) = self
2321 .inner
2322 .progress_sender
2323 .lock()
2324 .map(|sender| sender.clone())
2325 else {
2326 return;
2327 };
2328 let Some(sender) = progress_sender.as_ref() else {
2329 return;
2330 };
2331 let status = completion_status_text(&completion.status, completion.exit_code);
2332 let preview = completion.output_preview.trim_end();
2333 let context = if preview.is_empty() {
2334 format!("task {} exited ({status})", completion.task_id)
2335 } else {
2336 format!(
2337 "task {} exited ({status})
2338{preview}",
2339 completion.task_id
2340 )
2341 };
2342 sender(PushFrame::BashPatternMatch(
2343 BashPatternMatchFrame::task_exit(
2344 completion.task_id.clone(),
2345 completion.session_id.clone(),
2346 format!("exited ({status})"),
2347 context,
2348 ),
2349 ));
2350 }
2351
2352 fn emit_bash_completed(&self, completion: BgCompletion) {
2353 let Ok(progress_sender) = self
2354 .inner
2355 .progress_sender
2356 .lock()
2357 .map(|sender| sender.clone())
2358 else {
2359 return;
2360 };
2361 let Some(sender) = progress_sender.as_ref() else {
2362 return;
2363 };
2364 sender(PushFrame::BashCompleted(BashCompletedFrame::new(
2372 completion.task_id,
2373 completion.session_id,
2374 completion.status,
2375 completion.exit_code,
2376 completion.command,
2377 completion.output_preview,
2378 completion.output_truncated,
2379 completion.original_tokens,
2380 completion.compressed_tokens,
2381 completion.tokens_skipped,
2382 )));
2383 }
2384
2385 fn completion_token_counts(
2386 &self,
2387 metadata: &PersistedTask,
2388 buffer: Option<&BgBuffer>,
2389 paths: Option<&TaskPaths>,
2390 ) -> CompletionTokenCounts {
2391 if metadata.mode == BgMode::Pty {
2392 return CompletionTokenCounts::skipped();
2393 }
2394
2395 let raw = match buffer {
2396 Some(buffer) => buffer.read_for_token_count(TOKENIZE_CAP_BYTES_PER_STREAM),
2397 None => paths
2398 .map(|paths| {
2399 read_for_token_count_from_disk(metadata, paths, TOKENIZE_CAP_BYTES_PER_STREAM)
2400 })
2401 .unwrap_or(TokenCountInput::Skipped),
2402 };
2403
2404 let TokenCountInput::Text(raw_output) = raw else {
2405 return CompletionTokenCounts::skipped();
2406 };
2407
2408 let original_tokens = token_count_u32(&raw_output);
2409 let original_bytes = raw_output.len() as i64;
2410 let compressed_output = if metadata.compressed {
2411 self.compress_output(&metadata.command, raw_output)
2412 } else {
2413 raw_output
2414 };
2415 let compressed_tokens = token_count_u32(&compressed_output);
2416 let compressed_bytes = compressed_output.len() as i64;
2417 CompletionTokenCounts {
2418 original_tokens: Some(original_tokens),
2419 compressed_tokens: Some(compressed_tokens),
2420 original_bytes: Some(original_bytes),
2421 compressed_bytes: Some(compressed_bytes),
2422 tokens_skipped: false,
2423 }
2424 }
2425
2426 pub(crate) fn maybe_emit_long_running_reminder(&self, task: &Arc<BgTask>) {
2427 if !self
2428 .inner
2429 .long_running_reminder_enabled
2430 .load(Ordering::SeqCst)
2431 {
2432 return;
2433 }
2434 let interval_ms = self
2435 .inner
2436 .long_running_reminder_interval_ms
2437 .load(Ordering::SeqCst);
2438 if interval_ms == 0 {
2439 return;
2440 }
2441 let interval = Duration::from_millis(interval_ms);
2442 let now = Instant::now();
2443 let Ok(mut last_reminder_at) = task.last_reminder_at.lock() else {
2444 return;
2445 };
2446 let since = last_reminder_at.unwrap_or(task.started);
2447 if now.duration_since(since) < interval {
2448 return;
2449 }
2450 let command = task
2451 .state
2452 .lock()
2453 .map(|state| state.metadata.command.clone())
2454 .unwrap_or_default();
2455 *last_reminder_at = Some(now);
2456 self.emit_bash_long_running(BashLongRunningFrame::new(
2457 task.task_id.clone(),
2458 task.session_id.clone(),
2459 command,
2460 task.started.elapsed().as_millis() as u64,
2461 ));
2462 }
2463
2464 fn emit_bash_long_running(&self, frame: BashLongRunningFrame) {
2465 let Ok(progress_sender) = self
2466 .inner
2467 .progress_sender
2468 .lock()
2469 .map(|sender| sender.clone())
2470 else {
2471 return;
2472 };
2473 if let Some(sender) = progress_sender.as_ref() {
2474 sender(PushFrame::BashLongRunning(frame));
2475 }
2476 }
2477
2478 fn task(&self, task_id: &str) -> Option<Arc<BgTask>> {
2479 self.inner
2480 .tasks
2481 .lock()
2482 .ok()
2483 .and_then(|tasks| tasks.get(task_id).cloned())
2484 }
2485
2486 fn task_for_session(&self, task_id: &str, session_id: &str) -> Option<Arc<BgTask>> {
2487 self.task(task_id)
2488 .filter(|task| task.session_id == session_id)
2489 }
2490
2491 fn running_count(&self) -> usize {
2492 self.inner
2493 .tasks
2494 .lock()
2495 .map(|tasks| tasks.values().filter(|task| task.is_running()).count())
2496 .unwrap_or(0)
2497 }
2498
2499 fn start_watchdog(&self) {
2500 if !self.inner.watchdog_started.swap(true, Ordering::SeqCst) {
2501 super::watchdog::start(self.clone());
2502 }
2503 }
2504
2505 fn running_metadata_is_stale(&self, metadata: &PersistedTask) -> bool {
2506 unix_millis().saturating_sub(metadata.started_at) > STALE_RUNNING_AFTER.as_millis() as u64
2507 }
2508
2509 #[cfg(test)]
2510 pub fn task_json_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
2511 self.task_for_session(task_id, session_id)
2512 .map(|task| task.paths.json.clone())
2513 }
2514
2515 #[cfg(test)]
2516 pub fn task_exit_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
2517 self.task_for_session(task_id, session_id)
2518 .map(|task| task.paths.exit.clone())
2519 }
2520
2521 fn generate_unique_task_id(&self) -> Result<String, String> {
2523 for _ in 0..32 {
2524 let candidate = random_slug();
2525 let tasks = self
2526 .inner
2527 .tasks
2528 .lock()
2529 .map_err(|_| "background task registry lock poisoned".to_string())?;
2530 if tasks.contains_key(&candidate) {
2531 continue;
2532 }
2533 let completions = self
2534 .inner
2535 .completions
2536 .lock()
2537 .map_err(|_| "background completions lock poisoned".to_string())?;
2538 if completions
2539 .iter()
2540 .any(|completion| completion.task_id == candidate)
2541 {
2542 continue;
2543 }
2544 return Ok(candidate);
2545 }
2546 Err("failed to allocate unique background task id after 32 attempts".to_string())
2547 }
2548}
2549
/// Token and byte measurements of a finished task's output.
///
/// `original_*` describe the raw captured text; `compressed_*` describe the text after
/// the optional compression pass (identical to the original when compression is off).
/// When counting is skipped (PTY tasks, or no readable output), every measurement is
/// `None` and `tokens_skipped` is `true`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
struct CompletionTokenCounts {
    original_tokens: Option<u32>,
    compressed_tokens: Option<u32>,
    original_bytes: Option<i64>,
    compressed_bytes: Option<i64>,
    tokens_skipped: bool,
}

impl CompletionTokenCounts {
    /// Counts for output that was not tokenized: all measurements absent, flag set.
    fn skipped() -> Self {
        Self {
            tokens_skipped: true,
            ..Self::default()
        }
    }
}
2569
2570fn completion_status_text(status: &BgTaskStatus, exit_code: Option<i32>) -> String {
2571 match status {
2572 BgTaskStatus::TimedOut => "timed out".to_string(),
2573 BgTaskStatus::Killed => "killed".to_string(),
2574 _ => exit_code
2575 .map(|code| format!("exit {code}"))
2576 .unwrap_or_else(|| format!("{status:?}").to_lowercase()),
2577 }
2578}
2579
2580fn token_count_u32(text: &str) -> u32 {
2581 aft_tokenizer::count_tokens(text)
2582 .try_into()
2583 .unwrap_or(u32::MAX)
2584}
2585
2586impl Default for BgTaskRegistry {
2587 fn default() -> Self {
2588 Self::new(Arc::new(Mutex::new(None)))
2589 }
2590}
2591
/// Whether `path` was modified less than `grace` ago.
///
/// Returns `false` if the metadata or mtime cannot be read. A modification time
/// in the future, which happens with clock skew or a backwards NTP step, counts
/// as "within grace". Such an entry is by definition not older than `grace`, and
/// the GC callers must not delete freshly written files just because the clock moved.
fn modified_within(path: &Path, grace: Duration) -> bool {
    let Ok(modified) = fs::metadata(path).and_then(|metadata| metadata.modified()) else {
        return false;
    };
    match SystemTime::now().duration_since(modified) {
        Ok(age) => age < grace,
        // mtime is ahead of the wall clock: treat as brand new rather than expired.
        Err(_) => true,
    }
}
2600
/// Canonical form of `path`, or `path` unchanged if it cannot be canonicalized
/// (for example, because it does not exist).
fn canonicalized_path(path: &Path) -> PathBuf {
    match fs::canonicalize(path) {
        Ok(resolved) => resolved,
        Err(_) => path.to_path_buf(),
    }
}
2604
/// Converts a wall-clock start time (Unix milliseconds) into an approximate `Instant`.
///
/// The elapsed wall time is subtracted from `Instant::now()`. A start time in the
/// future, or an unreadable system clock, yields roughly "now". The same happens
/// when the monotonic clock cannot go back far enough.
fn started_instant_from_unix_millis(started_at: u64) -> Instant {
    let now_ms = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => started_at,
    };
    let elapsed = Duration::from_millis(now_ms.saturating_sub(started_at));
    Instant::now()
        .checked_sub(elapsed)
        .unwrap_or_else(Instant::now)
}
2616
2617fn gc_quarantine(storage_dir: &Path) {
2618 let quarantine_root = storage_dir.join("bash-tasks-quarantine");
2619 let Ok(session_dirs) = fs::read_dir(&quarantine_root) else {
2620 return;
2621 };
2622 for session_entry in session_dirs.flatten() {
2623 let session_quarantine_dir = session_entry.path();
2624 if !session_quarantine_dir.is_dir() {
2625 continue;
2626 }
2627 let entries = match fs::read_dir(&session_quarantine_dir) {
2628 Ok(entries) => entries,
2629 Err(error) => {
2630 crate::slog_warn!(
2631 "failed to read background task quarantine dir {}: {error}",
2632 session_quarantine_dir.display()
2633 );
2634 continue;
2635 }
2636 };
2637 for entry in entries.flatten() {
2638 let path = entry.path();
2639 if modified_within(&path, QUARANTINE_GC_GRACE) {
2640 continue;
2641 }
2642 let result = if path.is_dir() {
2643 fs::remove_dir_all(&path)
2644 } else {
2645 fs::remove_file(&path)
2646 };
2647 match result {
2648 Ok(()) => log::debug!(
2649 "deleted old background task quarantine entry {}",
2650 path.display()
2651 ),
2652 Err(error) => crate::slog_warn!(
2653 "failed to delete old background task quarantine entry {}: {error}",
2654 path.display()
2655 ),
2656 }
2657 }
2658 let _ = fs::remove_dir(&session_quarantine_dir);
2659 }
2660 let _ = fs::remove_dir(&quarantine_root);
2661}
2662
/// Why a task metadata file is being quarantined. The reason selects the renamed
/// file's naming scheme.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum QuarantineKind {
    /// Named `<file>.corrupt-<unix_ts>`.
    Corrupt,
    /// Named `<stem>.invalid.<unix_ts>.<ext>`.
    Invalid,
}
2667
/// Moves a task's metadata JSON, and any existing sibling capture/wrapper files, into
/// `<storage_dir>/bash-tasks-quarantine/<session dir name>/`.
///
/// The JSON is renamed first. Any failure up to and including that rename is returned
/// as an error, and the siblings are then left where they were. Sibling renames are
/// best-effort: failures are logged and skipped. All quarantined names share one
/// timestamp and are built by `quarantine_name` for `kind`. Finally the session
/// directory is removed if it is now empty.
///
/// # Errors
/// Returns a message if either path has no UTF-8 file name, the quarantine directory
/// cannot be created, or the JSON rename fails.
fn quarantine_task_json(
    storage_dir: &Path,
    session_dir: &Path,
    json_path: &Path,
    kind: QuarantineKind,
) -> Result<(), String> {
    let session_hash = session_dir
        .file_name()
        .and_then(|name| name.to_str())
        .ok_or_else(|| {
            format!(
                "invalid background task session dir: {}",
                session_dir.display()
            )
        })?;
    let task_name = json_path
        .file_name()
        .and_then(|name| name.to_str())
        .ok_or_else(|| format!("invalid background task json path: {}", json_path.display()))?;
    // One timestamp for the whole bundle so the JSON and its siblings stay grouped.
    let unix_ts = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|duration| duration.as_secs())
        .unwrap_or(0);
    let quarantine_dir = storage_dir.join("bash-tasks-quarantine").join(session_hash);
    fs::create_dir_all(&quarantine_dir).map_err(|e| {
        format!(
            "failed to create background task quarantine dir {}: {e}",
            quarantine_dir.display()
        )
    })?;
    let target_name = quarantine_name(task_name, unix_ts, &kind);
    let target = quarantine_dir.join(target_name);
    fs::rename(json_path, &target).map_err(|e| {
        format!(
            "failed to quarantine background task metadata {} to {}: {e}",
            json_path.display(),
            target.display()
        )
    })?;

    // Siblings are moved best-effort: the metadata is already out of the live tree.
    for sibling in task_sibling_paths(json_path) {
        if !sibling.exists() {
            continue;
        }
        let Some(sibling_name) = sibling.file_name().and_then(|name| name.to_str()) else {
            crate::slog_warn!(
                "skipping background task sibling with invalid name during quarantine: {}",
                sibling.display()
            );
            continue;
        };
        let sibling_target = quarantine_dir.join(quarantine_name(sibling_name, unix_ts, &kind));
        if let Err(error) = fs::rename(&sibling, &sibling_target) {
            crate::slog_warn!(
                "failed to quarantine background task sibling {} to {}: {error}",
                sibling.display(),
                sibling_target.display()
            );
        }
    }

    // `remove_dir` only succeeds on an empty directory; other tasks' files keep it alive.
    let _ = fs::remove_dir(session_dir);
    Ok(())
}
2732
2733fn quarantine_name(file_name: &str, unix_ts: u64, kind: &QuarantineKind) -> String {
2734 match kind {
2735 QuarantineKind::Corrupt => format!("{file_name}.corrupt-{unix_ts}"),
2736 QuarantineKind::Invalid => {
2737 let path = Path::new(file_name);
2738 let stem = path.file_stem().and_then(|stem| stem.to_str());
2739 let extension = path.extension().and_then(|extension| extension.to_str());
2740 match (stem, extension) {
2741 (Some(stem), Some(extension)) => format!("{stem}.invalid.{unix_ts}.{extension}"),
2742 _ => format!("{file_name}.invalid.{unix_ts}"),
2743 }
2744 }
2745 }
2746}
2747
/// Paths of every file that may accompany a task's metadata JSON: captures, the exit
/// marker, the PTY log and the shell wrapper scripts. Each is `<dir>/<stem>.<ext>`.
///
/// The files may or may not exist. Returns an empty list when `json_path` has no
/// parent or no UTF-8 stem.
fn task_sibling_paths(json_path: &Path) -> Vec<PathBuf> {
    const SIBLING_EXTENSIONS: [&str; 7] = ["stdout", "stderr", "exit", "pty", "ps1", "bat", "sh"];
    let parent = json_path.parent();
    let stem = json_path.file_stem().and_then(|raw| raw.to_str());
    match (parent, stem) {
        (Some(dir), Some(stem)) => SIBLING_EXTENSIONS
            .iter()
            .map(|ext| dir.join(format!("{stem}.{ext}")))
            .collect(),
        _ => Vec::new(),
    }
}
2760
2761fn read_tail_from_disk(
2762 metadata: &PersistedTask,
2763 paths: &TaskPaths,
2764 max_bytes: usize,
2765) -> (String, bool) {
2766 if metadata.mode == BgMode::Pty {
2767 return read_file_tail_capped(&paths.pty, max_bytes)
2768 .map(|bytes| {
2769 let truncated = fs::metadata(&paths.pty)
2770 .map(|metadata| metadata.len() > max_bytes as u64)
2771 .unwrap_or(false);
2772 (String::from_utf8_lossy(&bytes).into_owned(), truncated)
2773 })
2774 .unwrap_or_else(|_| (String::new(), false));
2775 }
2776 let stdout = fs::read(&paths.stdout).unwrap_or_default();
2777 let stderr = fs::read(&paths.stderr).unwrap_or_default();
2778 let mut bytes = Vec::with_capacity(stdout.len().saturating_add(stderr.len()));
2779 bytes.extend_from_slice(&stdout);
2780 bytes.extend_from_slice(&stderr);
2781 if bytes.len() <= max_bytes {
2782 return (String::from_utf8_lossy(&bytes).into_owned(), false);
2783 }
2784 let start = bytes.len().saturating_sub(max_bytes);
2785 (String::from_utf8_lossy(&bytes[start..]).into_owned(), true)
2786}
2787
2788fn read_for_token_count_from_disk(
2789 metadata: &PersistedTask,
2790 paths: &TaskPaths,
2791 max_bytes_per_stream: usize,
2792) -> TokenCountInput {
2793 if metadata.mode == BgMode::Pty {
2794 return TokenCountInput::Skipped;
2795 }
2796 let stdout = read_file_tail_capped(&paths.stdout, max_bytes_per_stream);
2803 let stderr = read_file_tail_capped(&paths.stderr, max_bytes_per_stream);
2804 match (stdout, stderr) {
2805 (Ok(stdout), Ok(stderr)) => TokenCountInput::Text(combine_streams(
2806 String::from_utf8_lossy(&stdout).as_ref(),
2807 String::from_utf8_lossy(&stderr).as_ref(),
2808 )),
2809 (Ok(stdout), Err(_)) => TokenCountInput::Text(combine_streams(
2810 String::from_utf8_lossy(&stdout).as_ref(),
2811 "",
2812 )),
2813 (Err(_), Ok(stderr)) => TokenCountInput::Text(combine_streams(
2814 "",
2815 String::from_utf8_lossy(&stderr).as_ref(),
2816 )),
2817 (Err(_), Err(_)) => TokenCountInput::Skipped,
2818 }
2819}
2820
/// Reads at most the last `max_bytes` bytes of the file at `path`.
///
/// The window is fixed when the file length is sampled. Bytes appended afterwards,
/// for example by a still-running task, are not read, so the result never exceeds
/// `max_bytes`. A cap of 0 yields an empty vector.
///
/// # Errors
/// Propagates I/O errors from opening, stat-ing, seeking or reading the file.
fn read_file_tail_capped(path: &Path, max_bytes: usize) -> std::io::Result<Vec<u8>> {
    use std::io::{Read, Seek, SeekFrom};
    let mut file = std::fs::File::open(path)?;
    let len = file.metadata()?.len();
    let read_len = len.min(u64::try_from(max_bytes).unwrap_or(u64::MAX));
    if read_len == 0 {
        return Ok(Vec::new());
    }
    if len > read_len {
        // Seek from the start using the sampled length, so growth after the stat
        // cannot shift the window.
        file.seek(SeekFrom::Start(len - read_len))?;
    }
    // read_len <= max_bytes, so it fits in usize.
    let mut bytes = Vec::with_capacity(read_len as usize);
    // `take` enforces the cap even if the file keeps growing underneath us.
    file.take(read_len).read_to_end(&mut bytes)?;
    Ok(bytes)
}
2837
2838impl BgTask {
2839 fn snapshot(&self, preview_bytes: usize) -> BgTaskSnapshot {
2840 let state = self
2841 .state
2842 .lock()
2843 .unwrap_or_else(|poison| poison.into_inner());
2844 self.snapshot_locked(&state, preview_bytes)
2845 }
2846
2847 fn snapshot_locked(&self, state: &BgTaskState, preview_bytes: usize) -> BgTaskSnapshot {
2848 let metadata = &state.metadata;
2849 let duration_ms = metadata.duration_ms.or_else(|| {
2850 metadata
2851 .status
2852 .is_terminal()
2853 .then(|| self.started.elapsed().as_millis() as u64)
2854 });
2855 let (output_preview, output_truncated) = if metadata.mode == BgMode::Pty {
2856 (String::new(), false)
2857 } else {
2858 state.buffer.read_tail(preview_bytes)
2859 };
2860 BgTaskSnapshot {
2861 info: BgTaskInfo {
2862 task_id: self.task_id.clone(),
2863 status: metadata.status.clone(),
2864 command: metadata.command.clone(),
2865 mode: metadata.mode.clone(),
2866 started_at: metadata.started_at,
2867 duration_ms,
2868 },
2869 exit_code: metadata.exit_code,
2870 child_pid: metadata.child_pid,
2871 workdir: metadata.workdir.display().to_string(),
2872 output_preview,
2873 output_truncated,
2874 output_path: state
2875 .buffer
2876 .output_path()
2877 .map(|path| path.display().to_string()),
2878 stderr_path: state
2879 .buffer
2880 .stderr_path()
2881 .map(|path| path.display().to_string()),
2882 pty_rows: (metadata.mode == BgMode::Pty).then_some(metadata.pty_rows.unwrap_or(24)),
2883 pty_cols: (metadata.mode == BgMode::Pty).then_some(metadata.pty_cols.unwrap_or(80)),
2884 }
2885 }
2886
2887 pub(crate) fn is_running(&self) -> bool {
2888 self.state
2889 .lock()
2890 .map(|state| {
2891 state.metadata.status == BgTaskStatus::Running
2892 || (state.metadata.mode == BgMode::Pty
2893 && state.metadata.status == BgTaskStatus::Killing)
2894 })
2895 .unwrap_or(false)
2896 }
2897
2898 fn is_terminal(&self) -> bool {
2899 self.state
2900 .lock()
2901 .map(|state| state.metadata.status.is_terminal())
2902 .unwrap_or(false)
2903 }
2904
2905 fn mark_terminal_now(&self) {
2906 if let Ok(mut terminal_at) = self.terminal_at.lock() {
2907 if terminal_at.is_none() {
2908 *terminal_at = Some(Instant::now());
2909 }
2910 }
2911 }
2912
2913 fn set_completion_delivered(
2914 &self,
2915 delivered: bool,
2916 registry: &BgTaskRegistry,
2917 ) -> Result<(), String> {
2918 let mut state = self
2919 .state
2920 .lock()
2921 .map_err(|_| "background task lock poisoned".to_string())?;
2922 let updated = registry
2923 .update_task_metadata(&self.paths, |metadata| {
2924 metadata.completion_delivered = delivered;
2925 })
2926 .map_err(|e| format!("failed to update completion delivery: {e}"))?;
2927 state.metadata = updated;
2928 Ok(())
2929 }
2930}
2931
2932fn terminal_metadata_from_marker(
2933 mut metadata: PersistedTask,
2934 marker: ExitMarker,
2935 reason: Option<String>,
2936) -> PersistedTask {
2937 match marker {
2938 ExitMarker::Code(code) => {
2939 let status = if code == 0 {
2940 BgTaskStatus::Completed
2941 } else {
2942 BgTaskStatus::Failed
2943 };
2944 metadata.mark_terminal(status, Some(code), reason);
2945 }
2946 ExitMarker::Killed => metadata.mark_terminal(BgTaskStatus::Killed, None, reason),
2947 }
2948 metadata
2949}
2950
/// Builds a `Command` that runs `command` in a new session and records its exit code.
///
/// An outer shell runs a small wrapper script. The wrapper executes `command` via
/// `<shell> -c`, then writes the numeric status to a temp file beside `exit_path` and
/// `mv`s it into place, so readers should not observe a half-written marker.
/// The caller configures cwd, env and stdio before spawning.
#[cfg(unix)]
fn detached_shell_command(command: &str, exit_path: &Path) -> Command {
    let shell = resolve_posix_shell();
    let mut cmd = Command::new(&shell);
    // Wrapper positional args: $0 = shell path, $1 = user command, $2 = exit marker path.
    // `$$` is the wrapper shell's PID, which keeps the temp name unique per wrapper.
    cmd.arg("-c")
        .arg("\"$0\" -c \"$1\"; code=$?; printf \"%s\" \"$code\" > \"$2.tmp.$$\"; mv -f \"$2.tmp.$$\" \"$2\"")
        .arg(&shell)
        .arg(command)
        .arg(exit_path);
    // SAFETY: the pre_exec closure runs in the forked child before exec, where only
    // async-signal-safe work is allowed. It calls `setsid(2)` and, on failure, reads
    // errno via `last_os_error`; neither allocates. setsid puts the child in a new
    // session and process group, so it is detached from the parent's group.
    unsafe {
        cmd.pre_exec(|| {
            if libc::setsid() == -1 {
                return Err(std::io::Error::last_os_error());
            }
            Ok(())
        });
    }
    cmd
}
2970
2971#[cfg(unix)]
2972fn resolve_posix_shell() -> PathBuf {
2973 static POSIX_SHELL: OnceLock<PathBuf> = OnceLock::new();
2974 POSIX_SHELL
2975 .get_or_init(|| {
2976 std::env::var_os("BASH")
2977 .filter(|value| !value.is_empty())
2978 .map(PathBuf::from)
2979 .filter(|path| path.exists())
2980 .or_else(|| which::which("bash").ok())
2981 .or_else(|| which::which("zsh").ok())
2982 .unwrap_or_else(|| PathBuf::from("/bin/sh"))
2983 })
2984 .clone()
2985}
2986
/// Builds the Windows `Command` that runs `command` through `shell` via a wrapper script.
///
/// The wrapper body comes from `shell.wrapper_script(command, exit_path)`. It is
/// written next to the task metadata as `<json stem>.<ext>`, where the extension is
/// `ps1` for PowerShell, `bat` for cmd and `sh` for a POSIX shell. These are the same
/// extensions `task_sibling_paths` lists. The shell is launched on that file with
/// non-interactive flags, and `creation_flags` is applied as the process creation flags.
///
/// # Errors
/// Returns a message if the wrapper script cannot be written.
#[cfg(windows)]
fn detached_shell_command_for(
    shell: crate::windows_shell::WindowsShell,
    command: &str,
    exit_path: &Path,
    paths: &TaskPaths,
    creation_flags: u32,
) -> Result<Command, String> {
    use crate::windows_shell::WindowsShell;
    let wrapper_body = shell.wrapper_script(command, exit_path);
    let wrapper_ext = match shell {
        WindowsShell::Pwsh | WindowsShell::Powershell => "ps1",
        WindowsShell::Cmd => "bat",
        WindowsShell::Posix(_) => "sh",
    };
    // Falls back to "wrapper" if the metadata file name has no UTF-8 stem.
    let wrapper_path = paths.dir.join(format!(
        "{}.{}",
        paths
            .json
            .file_stem()
            .and_then(|s| s.to_str())
            .unwrap_or("wrapper"),
        wrapper_ext
    ));
    fs::write(&wrapper_path, wrapper_body)
        .map_err(|e| format!("failed to write background bash wrapper script: {e}"))?;

    let mut cmd = Command::new(shell.binary().as_ref());
    match shell {
        WindowsShell::Pwsh | WindowsShell::Powershell => {
            // Skip profiles and execution-policy prompts; run the script file directly.
            cmd.args([
                "-NoLogo",
                "-NoProfile",
                "-NonInteractive",
                "-ExecutionPolicy",
                "Bypass",
                "-File",
            ]);
            cmd.arg(&wrapper_path);
        }
        WindowsShell::Cmd => {
            // /D skips AutoRun commands from the registry; /C runs the script then exits.
            cmd.args(["/D", "/C"]);
            cmd.arg(&wrapper_path);
        }
        WindowsShell::Posix(_) => {
            cmd.arg(&wrapper_path);
        }
    }

    cmd.creation_flags(creation_flags);
    Ok(cmd)
}
3069
/// Spawns `command` as a detached background process whose stdout/stderr go to the
/// task's capture files and whose exit status is written to `paths.exit`.
///
/// Unix: one attempt through `detached_shell_command` (new session via setsid).
///
/// Windows: each shell from `shell_candidates()` is tried in order. Every shell is
/// first tried with `CREATE_BREAKAWAY_FROM_JOB`, and without it if that yields
/// Access Denied. `NotFound` moves on to the next candidate. Any other spawn error
/// aborts immediately.
///
/// # Errors
/// Returns a message if a capture file cannot be opened, the wrapper cannot be written
/// (Windows), the spawn fails, or no Windows candidate could be spawned.
fn spawn_detached_child(
    command: &str,
    paths: &TaskPaths,
    workdir: &Path,
    env: &HashMap<String, String>,
) -> Result<std::process::Child, String> {
    #[cfg(not(windows))]
    {
        let stdout = create_capture_file(&paths.stdout)
            .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
        let stderr = create_capture_file(&paths.stderr)
            .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
        detached_shell_command(command, &paths.exit)
            .current_dir(workdir)
            .envs(env)
            .stdin(Stdio::null())
            .stdout(Stdio::from(stdout))
            .stderr(Stdio::from(stderr))
            .spawn()
            .map_err(|e| format!("failed to spawn background bash command: {e}"))
    }
    #[cfg(windows)]
    {
        use crate::windows_shell::shell_candidates;
        let candidates: Vec<crate::windows_shell::WindowsShell> = shell_candidates();
        // Win32 process creation flags (see CreateProcess "Process Creation Flags").
        const FLAG_CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
        const FLAG_CREATE_BREAKAWAY_FROM_JOB: u32 = 0x0100_0000;
        const FLAG_CREATE_NO_WINDOW: u32 = 0x0800_0000;
        let with_breakaway =
            FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP | FLAG_CREATE_BREAKAWAY_FROM_JOB;
        let without_breakaway = FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP;
        let mut last_error: Option<String> = None;
        for (idx, shell) in candidates.iter().enumerate() {
            for &flags in &[with_breakaway, without_breakaway] {
                // Fresh capture handles per attempt: the previous Command consumed them.
                let stdout = create_capture_file(&paths.stdout)
                    .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
                let stderr = create_capture_file(&paths.stderr)
                    .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
                let mut cmd =
                    detached_shell_command_for(shell.clone(), command, &paths.exit, paths, flags)?;
                cmd.current_dir(workdir)
                    .envs(env)
                    .stdin(Stdio::null())
                    .stdout(Stdio::from(stdout))
                    .stderr(Stdio::from(stderr));
                match cmd.spawn() {
                    Ok(child) => {
                        if idx > 0 {
                            crate::slog_warn!("background bash spawn fell back to {} after {} earlier candidate(s) failed; \
                                the cached PATH probe disagreed with runtime spawn — likely PATH \
                                inheritance, antivirus / AppLocker / Defender ASR, or sandbox policy.",
                                shell.binary(),
                                idx);
                        }
                        if flags == without_breakaway {
                            crate::slog_warn!(
                                "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected \
                                (likely a restrictive Job Object — CI sandbox or MDM policy). \
                                Spawned without breakaway; the bg task will be torn down if the \
                                AFT process group is killed."
                            );
                        }
                        return Ok(child);
                    }
                    // Binary missing at runtime: retrying other flags won't help, so
                    // leave the flags loop and move to the next candidate shell.
                    Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                        crate::slog_warn!("background bash spawn: {} returned NotFound at runtime — trying next candidate",
                            shell.binary());
                        last_error = Some(format!("{}: {e}", shell.binary()));
                        break;
                    }
                    // OS error 5 is ERROR_ACCESS_DENIED: retry this shell without breakaway.
                    Err(e) if flags == with_breakaway && e.raw_os_error() == Some(5) => {
                        crate::slog_warn!(
                            "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected with \
                            Access Denied — retrying {} without breakaway",
                            shell.binary()
                        );
                        last_error = Some(format!("{}: {e}", shell.binary()));
                        continue;
                    }
                    Err(e) => {
                        return Err(format!(
                            "failed to spawn background bash command via {}: {e}",
                            shell.binary()
                        ));
                    }
                }
            }
        }
        Err(format!(
            "failed to spawn background bash command: no Windows shell could be spawned. \
            Last error: {}. PATH-probed candidates: {:?}",
            last_error.unwrap_or_else(|| "no candidates were attempted".to_string()),
            candidates.iter().map(|s| s.binary()).collect::<Vec<_>>()
        ))
    }
}
3218
3219fn random_slug() -> String {
3220 let mut bytes = [0u8; 4];
3221 getrandom::fill(&mut bytes).unwrap_or_else(|_| {
3223 let t = SystemTime::now()
3225 .duration_since(UNIX_EPOCH)
3226 .map(|d| d.subsec_nanos())
3227 .unwrap_or(0);
3228 let p = std::process::id();
3229 bytes.copy_from_slice(&(t ^ p).to_le_bytes());
3230 });
3231 let hex: String = bytes.iter().map(|b| format!("{b:02x}")).collect();
3233 format!("bash-{hex}")
3234}
3235
3236#[cfg(test)]
3237mod tests {
3238 use std::collections::HashMap;
3239 #[cfg(windows)]
3240 use std::fs;
3241 use std::sync::{Arc, Mutex};
3242 use std::time::Duration;
3243 #[cfg(windows)]
3244 use std::time::Instant;
3245
3246 use super::*;
3247
3248 #[cfg(unix)]
3249 const QUICK_SUCCESS_COMMAND: &str = "true";
3250 #[cfg(windows)]
3251 const QUICK_SUCCESS_COMMAND: &str = "cmd /c exit 0";
3252
3253 #[cfg(unix)]
3254 const LONG_RUNNING_COMMAND: &str = "sleep 5";
3255 #[cfg(windows)]
3256 const LONG_RUNNING_COMMAND: &str = "cmd /c timeout /t 5 /nobreak > nul";
3257
3258 #[test]
3259 fn pty_dimensions_are_persisted_and_returned_in_snapshot() {
3260 let registry = BgTaskRegistry::default();
3261 let dir = tempfile::tempdir().unwrap();
3262 let task_id = registry
3263 .spawn_pty(
3264 QUICK_SUCCESS_COMMAND,
3265 "session".to_string(),
3266 dir.path().to_path_buf(),
3267 HashMap::new(),
3268 Some(Duration::from_secs(30)),
3269 dir.path().to_path_buf(),
3270 10,
3271 true,
3272 false,
3273 Some(dir.path().to_path_buf()),
3274 50,
3275 120,
3276 )
3277 .unwrap();
3278
3279 let paths = task_paths(dir.path(), "session", &task_id);
3280 let metadata = read_task(&paths.json).unwrap();
3281 assert_eq!(
3282 metadata.schema_version,
3283 crate::bash_background::persistence::SCHEMA_VERSION
3284 );
3285 assert_eq!(metadata.mode, BgMode::Pty);
3286 assert_eq!(metadata.pty_rows, Some(50));
3287 assert_eq!(metadata.pty_cols, Some(120));
3288
3289 let snapshot = registry
3290 .status(&task_id, "session", None, Some(dir.path()), 1024)
3291 .unwrap();
3292 assert_eq!(snapshot.pty_rows, Some(50));
3293 assert_eq!(snapshot.pty_cols, Some(120));
3294 }
3295
3296 fn spawn_dead_child() -> std::process::Child {
3301 #[cfg(unix)]
3302 let mut cmd = std::process::Command::new("true");
3303 #[cfg(windows)]
3304 let mut cmd = {
3305 let mut c = std::process::Command::new("cmd");
3306 c.args(["/c", "exit", "0"]);
3307 c
3308 };
3309 cmd.stdin(std::process::Stdio::null());
3310 cmd.stdout(std::process::Stdio::null());
3311 cmd.stderr(std::process::Stdio::null());
3312 let mut child = cmd.spawn().expect("spawn replacement child for reap test");
3313 let started = Instant::now();
3322 loop {
3323 match child.try_wait() {
3324 Ok(Some(_)) => break,
3325 Ok(None) => {
3326 if started.elapsed() > Duration::from_secs(5) {
3327 panic!("dead-child stand-in did not exit within 5s");
3328 }
3329 std::thread::sleep(Duration::from_millis(10));
3330 }
3331 Err(error) => panic!("dead-child try_wait failed: {error}"),
3332 }
3333 }
3334 child
3335 }
3336
3337 #[test]
3338 fn ack_marks_delivered_even_when_completion_was_already_consumed_locally() {
3339 let registry = BgTaskRegistry::default();
3340 let dir = tempfile::tempdir().unwrap();
3341 let task_id = registry
3342 .spawn(
3343 LONG_RUNNING_COMMAND,
3344 "session".to_string(),
3345 dir.path().to_path_buf(),
3346 HashMap::new(),
3347 Some(Duration::from_secs(30)),
3348 dir.path().to_path_buf(),
3349 10,
3350 true,
3351 false,
3352 Some(dir.path().to_path_buf()),
3353 )
3354 .unwrap();
3355 registry
3356 .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
3357 .unwrap();
3358 assert_eq!(
3359 registry
3360 .drain_completions_for_session(Some("session"))
3361 .len(),
3362 1
3363 );
3364
3365 registry.inner.completions.lock().unwrap().clear();
3368
3369 assert_eq!(
3370 registry.ack_completions_for_session(Some("session"), std::slice::from_ref(&task_id)),
3371 vec![task_id.clone()]
3372 );
3373 assert!(registry
3374 .drain_completions_for_session(Some("session"))
3375 .is_empty());
3376
3377 let paths = task_paths(dir.path(), "session", &task_id);
3378 let metadata = read_task(&paths.json).unwrap();
3379 assert!(metadata.completion_delivered);
3380
3381 let replayed = BgTaskRegistry::default();
3382 replayed
3383 .replay_session_inner(dir.path(), "session", None)
3384 .unwrap();
3385 assert!(replayed
3386 .drain_completions_for_session(Some("session"))
3387 .is_empty());
3388 }
3389
3390 #[test]
3391 fn register_watch_rejects_unknown_task() {
3392 let registry = BgTaskRegistry::default();
3393
3394 let result = registry.register_watch(
3395 "missing-task".to_string(),
3396 WatchPattern::Substring("READY".into()),
3397 true,
3398 );
3399
3400 assert_eq!(result, Err("task_not_found"));
3401 }
3402
3403 #[test]
3404 fn register_watch_on_terminal_task_scans_existing_output() {
3405 let frames = Arc::new(Mutex::new(Vec::new()));
3406 let captured = Arc::clone(&frames);
3407 let sender: crate::context::ProgressSender = Arc::new(Box::new(move |frame| {
3408 captured.lock().unwrap().push(frame);
3409 })
3410 as Box<dyn Fn(PushFrame) + Send + Sync>);
3411 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(Some(sender))));
3412 let dir = tempfile::tempdir().unwrap();
3413 let task_id = registry
3414 .spawn(
3415 LONG_RUNNING_COMMAND,
3416 "session".to_string(),
3417 dir.path().to_path_buf(),
3418 HashMap::new(),
3419 Some(Duration::from_secs(30)),
3420 dir.path().to_path_buf(),
3421 10,
3422 true,
3423 false,
3424 Some(dir.path().to_path_buf()),
3425 )
3426 .unwrap();
3427 registry
3428 .inner
3429 .shutdown
3430 .store(true, std::sync::atomic::Ordering::SeqCst);
3431 let task = registry.task_for_session(&task_id, "session").unwrap();
3432 std::fs::write(&task.paths.stdout, "READY\n").unwrap();
3433 registry
3434 .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
3435 .unwrap();
3436 frames.lock().unwrap().clear();
3437 registry.inner.completions.lock().unwrap().clear();
3438
3439 registry
3440 .register_watch(
3441 task_id.clone(),
3442 WatchPattern::Substring("READY".into()),
3443 true,
3444 )
3445 .unwrap();
3446
3447 let frames = frames.lock().unwrap();
3448 let frame = frames
3449 .iter()
3450 .find_map(|frame| match frame {
3451 PushFrame::BashPatternMatch(frame) => Some(frame),
3452 _ => None,
3453 })
3454 .expect("terminal watch registration should emit pattern frame");
3455 assert_eq!(frame.reason, "pattern_match");
3456 assert_eq!(frame.task_id, task_id);
3457 assert_eq!(frame.session_id, "session");
3458 assert_eq!(frame.match_text, "READY");
3459 assert_eq!(frame.match_offset, 0);
3460 assert_eq!(registry.active_watch_count(&frame.task_id), 0);
3461 let metadata = read_task(&task.paths.json).unwrap();
3462 assert!(metadata.completion_delivered);
3463 }
3464
3465 #[test]
3466 fn cleanup_finished_removes_terminal_tasks_older_than_threshold() {
3467 let registry = BgTaskRegistry::default();
3468 let dir = tempfile::tempdir().unwrap();
3469 let task_id = registry
3470 .spawn(
3471 QUICK_SUCCESS_COMMAND,
3472 "session".to_string(),
3473 dir.path().to_path_buf(),
3474 HashMap::new(),
3475 Some(Duration::from_secs(30)),
3476 dir.path().to_path_buf(),
3477 10,
3478 true,
3479 false,
3480 Some(dir.path().to_path_buf()),
3481 )
3482 .unwrap();
3483 registry
3484 .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
3485 .unwrap();
3486 let completions = registry.drain_completions_for_session(Some("session"));
3487 assert_eq!(completions.len(), 1);
3488 assert_eq!(
3489 registry.ack_completions_for_session(Some("session"), std::slice::from_ref(&task_id)),
3490 vec![task_id.clone()]
3491 );
3492
3493 registry.cleanup_finished(Duration::ZERO);
3494
3495 assert!(registry.inner.tasks.lock().unwrap().is_empty());
3496 }
3497
3498 #[test]
3499 fn cleanup_finished_retains_undelivered_terminals() {
3500 let registry = BgTaskRegistry::default();
3501 let dir = tempfile::tempdir().unwrap();
3502 let task_id = registry
3503 .spawn(
3504 QUICK_SUCCESS_COMMAND,
3505 "session".to_string(),
3506 dir.path().to_path_buf(),
3507 HashMap::new(),
3508 Some(Duration::from_secs(30)),
3509 dir.path().to_path_buf(),
3510 10,
3511 true,
3512 false,
3513 Some(dir.path().to_path_buf()),
3514 )
3515 .unwrap();
3516 registry
3517 .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
3518 .unwrap();
3519
3520 registry.cleanup_finished(Duration::ZERO);
3521
3522 assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
3523 }
3524
    // Checks how `reap_child` behaves when the child process has exited but no exit
    // marker exists on disk. The assertions below pin a two-pass behavior:
    //   1st reap: status stays `Running` with no status_reason; the child handle is
    //             released (`Piped(None)`); the task is marked `detached`.
    //   2nd reap: status becomes `Failed` with reason
    //             "process exited without exit marker"; the handle stays released
    //             and the task stays detached.
    #[test]
    fn reap_child_marks_failed_when_child_exits_without_exit_marker() {
        // No progress sender is installed; the test only inspects task state.
        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                QUICK_SUCCESS_COMMAND,
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
                true,
                false,
                Some(dir.path().to_path_buf()),
            )
            .unwrap();

        let task = registry.task_for_session(&task_id, "session").unwrap();

        // Poll every 20 ms, for at most 5 s, until the piped child has exited. Any
        // runtime other than `Piped(Some(_))` counts as exited, because there is no
        // handle left to poll.
        let started = Instant::now();
        loop {
            let exited = {
                let mut state = task.state.lock().unwrap();
                match &mut state.runtime {
                    TaskRuntime::Piped(Some(child)) => matches!(child.try_wait(), Ok(Some(_))),
                    _ => true,
                }
            };
            if exited {
                break;
            }
            assert!(
                started.elapsed() < Duration::from_secs(5),
                "child should exit quickly"
            );
            std::thread::sleep(Duration::from_millis(20));
        }

        // Raise the shutdown flag, then sleep.
        // NOTE(review): the 550 ms sleep presumably lets any in-flight background
        // pass observe the flag and settle. Confirm it exceeds the poll interval.
        registry
            .inner
            .shutdown
            .store(true, std::sync::atomic::Ordering::SeqCst);
        std::thread::sleep(Duration::from_millis(550));

        // Simulate a process that died without writing its exit marker.
        // A missing file is fine here, so the result is ignored.
        let _ = std::fs::remove_file(&task.paths.exit);

        // Rewind the task to a non-terminal `Running` state, both in memory and in
        // the persisted JSON. This undoes anything a background pass may already
        // have recorded.
        {
            let mut state = task.state.lock().unwrap();
            state.metadata.status = BgTaskStatus::Running;
            state.metadata.status_reason = None;
            state.metadata.exit_code = None;
            state.metadata.finished_at = None;
            state.metadata.duration_ms = None;
            crate::bash_background::persistence::write_task(&task.paths.json, &state.metadata)
                .expect("persist reset Running metadata for reap_child test");
            // If the child handle was already taken, install a replacement from
            // `spawn_dead_child()`, presumably an already-exited process. This gives
            // `reap_child` a handle to release.
            if matches!(state.runtime, TaskRuntime::Piped(None)) {
                state.runtime = TaskRuntime::Piped(Some(spawn_dead_child()));
            }
        }
        *task.terminal_at.lock().unwrap() = None;

        assert!(
            task.is_running(),
            "precondition: metadata.status == Running"
        );
        assert!(
            !task.paths.exit.exists(),
            "precondition: exit marker absent"
        );

        // First pass: the handle is released, but the task is not yet declared failed.
        registry.reap_child(&task);

        {
            let state = task.state.lock().unwrap();
            assert_eq!(
                state.metadata.status,
                BgTaskStatus::Running,
                "first reap must leave status Running while waiting one pass for marker"
            );
            assert_eq!(
                state.metadata.status_reason, None,
                "first reap must not record a failure reason"
            );
            assert!(
                matches!(state.runtime, TaskRuntime::Piped(None)),
                "child handle must be released after first reap"
            );
            assert!(
                state.detached,
                "task must be marked detached after first reap"
            );
        }

        // Second pass: still no marker, so the task must now be terminal and `Failed`.
        registry.reap_child(&task);

        let state = task.state.lock().unwrap();
        assert!(
            state.metadata.status.is_terminal(),
            "second reap must transition to terminal when PID dead and no marker. Got status={:?}",
            state.metadata.status
        );
        assert_eq!(
            state.metadata.status,
            BgTaskStatus::Failed,
            "must specifically be Failed (not Killed): status={:?}",
            state.metadata.status
        );
        assert_eq!(
            state.metadata.status_reason.as_deref(),
            Some("process exited without exit marker"),
            "reason must match replay path's wording: {:?}",
            state.metadata.status_reason
        );
        assert!(
            matches!(state.runtime, TaskRuntime::Piped(None)),
            "child handle must stay released after second reap"
        );
        assert!(
            state.detached,
            "task must remain detached after second reap"
        );
    }
3702
    // Checks that `reap_child` does not decide the final status when an exit marker
    // is present. It still releases the child handle and marks the task detached,
    // but leaves the status as `Running`. Per the final assertion message, it defers
    // to `poll_task` in that case.
    #[test]
    fn reap_child_preserves_running_when_exit_marker_exists() {
        // No progress sender is installed; the test only inspects task state.
        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                QUICK_SUCCESS_COMMAND,
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
                true,
                false,
                Some(dir.path().to_path_buf()),
            )
            .unwrap();

        let task = registry.task_for_session(&task_id, "session").unwrap();

        // Poll every 20 ms, for at most 5 s, until the child has exited AND its exit
        // marker exists on disk.
        let started = Instant::now();
        loop {
            let exited = {
                let mut state = task.state.lock().unwrap();
                match &mut state.runtime {
                    TaskRuntime::Piped(Some(child)) => matches!(child.try_wait(), Ok(Some(_))),
                    _ => true,
                }
            };
            if exited && task.paths.exit.exists() {
                break;
            }
            assert!(
                started.elapsed() < Duration::from_secs(5),
                "child should exit and write marker quickly"
            );
            std::thread::sleep(Duration::from_millis(20));
        }

        // Raise the shutdown flag, then sleep.
        // NOTE(review): the 550 ms sleep presumably lets any in-flight background
        // pass observe the flag and settle. Confirm it exceeds the poll interval.
        registry
            .inner
            .shutdown
            .store(true, std::sync::atomic::Ordering::SeqCst);
        std::thread::sleep(Duration::from_millis(550));

        // Rewind status to `Running` in memory only.
        // NOTE(review): unlike the marker-less reap_child test, this does not persist
        // the metadata and does not clear exit_code/finished_at/duration_ms. That is
        // presumably fine because `reap_child` is expected to defer here; confirm.
        {
            let mut state = task.state.lock().unwrap();
            state.metadata.status = BgTaskStatus::Running;
            state.metadata.status_reason = None;
            // If the child handle was already taken, install a replacement from
            // `spawn_dead_child()`, presumably an already-exited process. This gives
            // `reap_child` a handle to release.
            if matches!(state.runtime, TaskRuntime::Piped(None)) {
                state.runtime = TaskRuntime::Piped(Some(spawn_dead_child()));
            }
        }
        *task.terminal_at.lock().unwrap() = None;
        // Make sure a marker is present. If something removed it, write a stand-in
        // recording exit code 0.
        if !task.paths.exit.exists() {
            std::fs::write(&task.paths.exit, "0").expect("write replacement exit marker");
        }

        registry.reap_child(&task);

        let state = task.state.lock().unwrap();
        assert!(
            matches!(state.runtime, TaskRuntime::Piped(None)),
            "child handle still released even when marker exists"
        );
        assert!(
            state.detached,
            "task still marked detached even when marker exists"
        );
        assert_eq!(
            state.metadata.status,
            BgTaskStatus::Running,
            "reap_child must defer to poll_task when marker exists"
        );
    }
3804
3805 #[test]
3806 fn cleanup_finished_keeps_running_tasks() {
3807 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
3808 let dir = tempfile::tempdir().unwrap();
3809 let task_id = registry
3810 .spawn(
3811 LONG_RUNNING_COMMAND,
3812 "session".to_string(),
3813 dir.path().to_path_buf(),
3814 HashMap::new(),
3815 Some(Duration::from_secs(30)),
3816 dir.path().to_path_buf(),
3817 10,
3818 true,
3819 false,
3820 Some(dir.path().to_path_buf()),
3821 )
3822 .unwrap();
3823
3824 registry.cleanup_finished(Duration::ZERO);
3825
3826 assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
3827 let _ = registry.kill(&task_id, "session");
3828 }
3829
3830 #[cfg(windows)]
3831 fn wait_for_file(path: &Path) -> String {
3832 let started = Instant::now();
3833 loop {
3834 if path.exists() {
3835 return fs::read_to_string(path).expect("read file");
3836 }
3837 assert!(
3838 started.elapsed() < Duration::from_secs(30),
3839 "timed out waiting for {}",
3840 path.display()
3841 );
3842 std::thread::sleep(Duration::from_millis(100));
3843 }
3844 }
3845
3846 #[cfg(windows)]
3847 fn spawn_windows_registry_command(
3848 command: &str,
3849 ) -> (BgTaskRegistry, tempfile::TempDir, String) {
3850 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
3851 let dir = tempfile::tempdir().unwrap();
3852 let task_id = registry
3853 .spawn(
3854 command,
3855 "session".to_string(),
3856 dir.path().to_path_buf(),
3857 HashMap::new(),
3858 Some(Duration::from_secs(30)),
3859 dir.path().to_path_buf(),
3860 10,
3861 false,
3862 false,
3863 Some(dir.path().to_path_buf()),
3864 )
3865 .unwrap();
3866 (registry, dir, task_id)
3867 }
3868
3869 #[cfg(windows)]
3870 #[test]
3871 fn windows_spawn_writes_exit_marker_for_zero_exit() {
3872 let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 0");
3873 let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
3874
3875 let content = wait_for_file(&exit_path);
3876
3877 assert_eq!(content.trim(), "0");
3878 }
3879
3880 #[cfg(windows)]
3881 #[test]
3882 fn windows_spawn_writes_exit_marker_for_nonzero_exit() {
3883 let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 42");
3884 let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
3885
3886 let content = wait_for_file(&exit_path);
3887
3888 assert_eq!(content.trim(), "42");
3889 }
3890
3891 #[cfg(windows)]
3892 #[test]
3893 fn windows_spawn_captures_stdout_to_disk() {
3894 let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c echo hello");
3895 let task = registry.task_for_session(&task_id, "session").unwrap();
3896 let stdout_path = task.paths.stdout.clone();
3897 let exit_path = task.paths.exit.clone();
3898
3899 let _ = wait_for_file(&exit_path);
3900 let stdout = fs::read_to_string(stdout_path).expect("read stdout");
3901
3902 assert!(stdout.contains("hello"), "stdout was {stdout:?}");
3903 }
3904
3905 #[cfg(windows)]
3906 #[test]
3907 fn windows_spawn_uses_pwsh_when_available() {
3908 let candidates = crate::windows_shell::shell_candidates_with(
3912 |binary| match binary {
3913 "pwsh.exe" => Some(std::path::PathBuf::from(r"C:\pwsh\pwsh.exe")),
3914 "powershell.exe" => Some(std::path::PathBuf::from(r"C:\ps\powershell.exe")),
3915 _ => None,
3916 },
3917 || None,
3918 );
3919 let shell = candidates.first().expect("at least one candidate").clone();
3920 assert_eq!(shell, crate::windows_shell::WindowsShell::Pwsh);
3921 assert_eq!(shell.binary().as_ref(), "pwsh.exe");
3922 }
3923
3924 #[cfg(windows)]
3931 #[test]
3932 fn windows_shell_cmd_wrapper_writes_exit_marker_with_move() {
3933 let exit_path = Path::new(r"C:\Temp\bash-test.exit");
3934 let script =
3935 crate::windows_shell::WindowsShell::Cmd.wrapper_script("cmd /c exit 42", exit_path);
3936
3937 assert!(
3941 script.contains("set CODE=%ERRORLEVEL%"),
3942 "wrapper must capture exit code into CODE: {script}"
3943 );
3944 assert!(
3945 script.contains("echo %CODE% >"),
3946 "wrapper must echo CODE to a temp marker file: {script}"
3947 );
3948 assert!(
3949 script.contains("move /Y"),
3950 "wrapper must use atomic move to write the marker: {script}"
3951 );
3952 assert!(
3955 script.contains("> nul"),
3956 "wrapper must redirect move output to nul: {script}"
3957 );
3958 assert!(
3960 script.contains("exit /B %CODE%"),
3961 "wrapper must propagate the captured exit code: {script}"
3962 );
3963 assert!(script.contains(r#""C:\Temp\bash-test.exit.tmp""#));
3964 assert!(script.contains(r#""C:\Temp\bash-test.exit""#));
3965 }
3966
3967 #[cfg(windows)]
3973 #[test]
3974 fn windows_shell_cmd_bg_command_uses_minimal_cmd_flags() {
3975 use crate::windows_shell::WindowsShell;
3976 let cmd = WindowsShell::Cmd.bg_command("echo wrapped");
3977 let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
3978 let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
3979 assert_eq!(
3980 args_strs,
3981 vec!["/D", "/S", "/C", "echo wrapped"],
3982 "Cmd::bg_command must prepend /D /S /C"
3983 );
3984 }
3985
3986 #[cfg(windows)]
3990 #[test]
3991 fn windows_shell_pwsh_bg_command_uses_standard_args() {
3992 use crate::windows_shell::WindowsShell;
3993 let cmd = WindowsShell::Pwsh.bg_command("Get-Date");
3994 let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
3995 let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
3996 assert!(
3997 args_strs.contains(&"-Command"),
3998 "Pwsh::bg_command must use -Command: {args_strs:?}"
3999 );
4000 assert!(
4001 args_strs.contains(&"Get-Date"),
4002 "Pwsh::bg_command must include the user command body"
4003 );
4004 }
4005
    // Judging by its name, a placeholder for a disabled Windows test.
    // `#[cfg(any())]` has an empty predicate list, which is always false, so this
    // item is never compiled. The `#[allow(dead_code)]` therefore has no effect.
    #[allow(dead_code)]
    #[cfg(any())] fn windows_cmd_wrapper_records_real_exit_code_disabled() {}
4039}