1use std::collections::{HashMap, HashSet, VecDeque};
2use std::fs;
3use std::io::Write;
4use std::path::{Path, PathBuf};
5use std::process::{Child, Command, Stdio};
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7#[cfg(unix)]
8use std::sync::OnceLock;
9use std::sync::{Arc, Mutex, RwLock};
10use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
11
12use rusqlite::Connection;
13use serde::Serialize;
14
15use crate::context::SharedProgressSender;
16use crate::harness::Harness;
17use crate::protocol::{BashCompletedFrame, BashLongRunningFrame, BashPatternMatchFrame, PushFrame};
18
19#[cfg(unix)]
20use std::os::unix::process::CommandExt;
21#[cfg(windows)]
22use std::os::windows::process::CommandExt;
23
24use super::buffer::{combine_streams, BgBuffer, TokenCountInput};
25use super::persistence::{
26 create_capture_file, delete_task_bundle, read_exit_marker, read_task, session_tasks_dir,
27 task_paths, unix_millis, update_task, write_kill_marker_if_absent, write_task, BgMode,
28 ExitMarker, PersistedTask, TaskPaths,
29};
30use super::process::is_process_alive;
31#[cfg(unix)]
32use super::process::terminate_pgid;
33#[cfg(windows)]
34use super::process::terminate_pid;
35use super::pty_process::spawn_pty_for_command;
36use super::pty_runtime::PtyRuntime;
37use super::watches::{PatternMatch, WatchPattern, WatchRegistry};
38use super::{BgTaskInfo, BgTaskStatus};
/// Timeout (30 minutes) applied by the `spawn*` entry points when the caller passes no timeout.
const DEFAULT_BG_TIMEOUT: Duration = Duration::from_secs(30 * 60);
/// 24 hours.
/// NOTE(review): presumably the age after which a persisted "running" task is treated as
/// orphaned on replay (the replay reason string says ">24h"); the consumer is not visible
/// here — confirm.
const STALE_RUNNING_AFTER: Duration = Duration::from_secs(24 * 60 * 60);
/// 24 hours.
/// NOTE(review): presumably the grace period before persisted task bundles are
/// garbage-collected — confirm against `maybe_gc_persisted`.
const PERSISTED_GC_GRACE: Duration = Duration::from_secs(24 * 60 * 60);
/// 30 days.
/// NOTE(review): presumably the grace period before quarantined metadata is
/// garbage-collected — confirm.
const QUARANTINE_GC_GRACE: Duration = Duration::from_secs(30 * 24 * 60 * 60);

/// Preview size (bytes) requested when a completion snapshot is built for a task.
const BG_COMPLETION_PREVIEW_BYTES: usize = 300;
/// 128 KiB.
/// NOTE(review): presumably the per-stream byte cap applied before token counting — the
/// consumer is not visible here; confirm.
const TOKENIZE_CAP_BYTES_PER_STREAM: usize = 128 * 1024;
59
/// Completion record for a background task; instances are queued in the registry's
/// `completions` deque.
#[derive(Debug, Clone, Serialize)]
pub struct BgCompletion {
    /// Identifier of the task this record describes.
    pub task_id: String,
    /// Owning session; never written to the serialized form.
    #[serde(skip_serializing)]
    pub session_id: String,
    /// Status of the task at the time the record was produced.
    pub status: BgTaskStatus,
    /// Exit code, when one is known.
    pub exit_code: Option<i32>,
    /// Command line of the task.
    pub command: String,
    /// Output preview; omitted from the serialized form when empty.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub output_preview: String,
    /// Omitted from the serialized form when `false`.
    #[serde(default, skip_serializing_if = "is_false")]
    pub output_truncated: bool,
    /// Omitted from the serialized form when `None`.
    /// NOTE(review): presumably the token count before compression — confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub original_tokens: Option<u32>,
    /// Omitted from the serialized form when `None`.
    /// NOTE(review): presumably the token count after compression — confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub compressed_tokens: Option<u32>,
    /// Omitted from the serialized form when `false`.
    /// NOTE(review): presumably set when token counting was skipped — confirm.
    #[serde(default, skip_serializing_if = "is_false")]
    pub tokens_skipped: bool,
}
95
/// Serde `skip_serializing_if` predicate: returns `true` exactly when the flag is `false`.
///
/// Takes `&bool` because serde calls the predicate with a reference to the field.
fn is_false(v: &bool) -> bool {
    matches!(*v, false)
}
99
/// Serializable point-in-time view of a background task.
#[derive(Debug, Clone, Serialize)]
pub struct BgTaskSnapshot {
    /// Core task info; its fields are inlined into the serialized object.
    #[serde(flatten)]
    pub info: BgTaskInfo,
    /// Exit code, when one is known.
    pub exit_code: Option<i32>,
    /// PID of the spawned child, when known.
    pub child_pid: Option<u32>,
    /// Working directory of the task, as a string.
    pub workdir: String,
    /// Preview of the captured output.
    pub output_preview: String,
    /// NOTE(review): presumably true when `output_preview` does not hold the full output —
    /// confirm against `BgTask::snapshot`.
    pub output_truncated: bool,
    /// Path of the captured output file, if any.
    pub output_path: Option<String>,
    /// Path of the captured stderr file, if any.
    pub stderr_path: Option<String>,
    /// PTY height; omitted from the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub pty_rows: Option<u16>,
    /// PTY width; omitted from the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub pty_cols: Option<u16>,
}
116
/// Handle to the background-task registry.
///
/// Cloning the handle clones the `Arc`, so every clone shares the same [`RegistryInner`].
#[derive(Clone)]
pub struct BgTaskRegistry {
    /// Shared registry state.
    pub(crate) inner: Arc<RegistryInner>,
}
121
/// State shared by every clone of [`BgTaskRegistry`].
pub(crate) struct RegistryInner {
    /// In-memory tasks keyed by task id.
    pub(crate) tasks: Mutex<HashMap<String, Arc<BgTask>>>,
    /// Queue of completion records.
    pub(crate) completions: Mutex<VecDeque<BgCompletion>>,
    /// Progress sender handed to `BgTaskRegistry::new`.
    pub(crate) progress_sender: SharedProgressSender,
    /// NOTE(review): presumably guards one-time start of the watchdog — `start_watchdog`
    /// is not visible here; confirm.
    watchdog_started: AtomicBool,
    /// NOTE(review): presumably signals background threads to stop — confirm.
    pub(crate) shutdown: AtomicBool,
    /// Long-running reminder switch; `true` by default, set by
    /// `configure_long_running_reminders`.
    pub(crate) long_running_reminder_enabled: AtomicBool,
    /// Long-running reminder interval in milliseconds; 600_000 by default.
    pub(crate) long_running_reminder_interval_ms: AtomicU64,
    /// Swapped to `true` by the first session replay so persisted-task GC is attempted once.
    persisted_gc_started: AtomicBool,
    /// Test-only counter. NOTE(review): presumably counts persisted-GC runs — confirm.
    #[cfg(test)]
    persisted_gc_runs: AtomicU64,
    /// Optional `(command, output) -> output` hook applied by `compress_output`.
    pub(crate) compressor: Mutex<Option<Box<dyn Fn(&str, String) -> String + Send + Sync>>>,
    /// Optional shared database connection used for dual-writes and DB-backed replay.
    pub(crate) db_pool: RwLock<Option<Arc<Mutex<Connection>>>>,
    /// Harness name used for database rows; set by `set_harness`.
    pub(crate) db_harness: RwLock<Option<String>>,
    /// Sending half of the capacity-1 wake channel; a clone is passed to PTY spawns.
    pub(crate) wake_tx: crossbeam_channel::Sender<()>,
    /// Receiving half of the wake channel.
    pub(crate) wake_rx: crossbeam_channel::Receiver<()>,
    /// Output-pattern watches registered against tasks.
    pub(crate) watch_registry: Mutex<WatchRegistry>,
}
145
/// One background task tracked in memory by the registry.
pub(crate) struct BgTask {
    /// Identifier of the task.
    pub(crate) task_id: String,
    /// Session that owns the task.
    pub(crate) session_id: String,
    /// On-disk locations of the task bundle (metadata, capture files, exit marker).
    pub(crate) paths: TaskPaths,
    /// Instant at which this in-memory entry was created.
    pub(crate) started: Instant,
    /// `None` at creation. NOTE(review): presumably the time of the last long-running
    /// reminder — confirm.
    pub(crate) last_reminder_at: Mutex<Option<Instant>>,
    /// `None` at creation. NOTE(review): presumably the time the task was first seen in a
    /// terminal state — confirm.
    pub(crate) terminal_at: Mutex<Option<Instant>>,
    /// Mutable task state (metadata, runtime handle, buffers).
    pub(crate) state: Mutex<BgTaskState>,
}
155
/// Live process handle of a task; the inner `Option` is `None` when no handle is held.
pub(crate) enum TaskRuntime {
    /// Child process whose stdout/stderr are captured to files.
    Piped(Option<Child>),
    /// Child process attached to a pseudo-terminal; `write_pty` reports `None` as exited.
    Pty(Option<PtyRuntime>),
}
160
/// Lock-protected mutable state of a [`BgTask`].
pub(crate) struct BgTaskState {
    /// Persisted metadata mirrored in memory.
    pub(crate) metadata: PersistedTask,
    /// Handle to the running process, if any.
    pub(crate) runtime: TaskRuntime,
    /// `false` for freshly spawned tasks.
    /// NOTE(review): presumably `true` for tasks rehydrated without a process handle — confirm.
    pub(crate) detached: bool,
    /// `false` at spawn. NOTE(review): presumably set once the child's exit was seen — confirm.
    pub(crate) child_exit_observed: bool,
    /// Reader over the task's capture file(s).
    pub(crate) buffer: BgBuffer,
    /// `None` at spawn. NOTE(review): presumably a terminal status to apply instead of the
    /// one derived from the exit marker (e.g. timeout/kill) — confirm.
    pub(crate) pending_terminal_override: Option<BgTaskStatus>,
}
180
181impl BgTaskRegistry {
182 pub fn new(progress_sender: SharedProgressSender) -> Self {
183 let (wake_tx, wake_rx) = crossbeam_channel::bounded(1);
184 Self {
185 inner: Arc::new(RegistryInner {
186 tasks: Mutex::new(HashMap::new()),
187 completions: Mutex::new(VecDeque::new()),
188 progress_sender,
189 watchdog_started: AtomicBool::new(false),
190 shutdown: AtomicBool::new(false),
191 long_running_reminder_enabled: AtomicBool::new(true),
192 long_running_reminder_interval_ms: AtomicU64::new(600_000),
193 persisted_gc_started: AtomicBool::new(false),
194 #[cfg(test)]
195 persisted_gc_runs: AtomicU64::new(0),
196 compressor: Mutex::new(None),
197 db_pool: RwLock::new(None),
198 db_harness: RwLock::new(None),
199 wake_tx,
200 wake_rx,
201 watch_registry: Mutex::new(WatchRegistry::default()),
202 }),
203 }
204 }
205
206 pub fn set_harness(&self, harness: Harness) {
207 if let Ok(mut slot) = self.inner.db_harness.write() {
208 *slot = Some(harness.as_str().to_string());
209 }
210 }
211
212 pub fn set_db_pool(&self, conn: Arc<Mutex<Connection>>) {
213 if let Ok(mut slot) = self.inner.db_pool.write() {
214 *slot = Some(conn);
215 }
216 }
217
218 pub fn clear_db_pool(&self) {
219 if let Ok(mut slot) = self.inner.db_pool.write() {
220 *slot = None;
221 }
222 }
223
224 pub fn set_compressor<F>(&self, compressor: F)
229 where
230 F: Fn(&str, String) -> String + Send + Sync + 'static,
231 {
232 if let Ok(mut slot) = self.inner.compressor.lock() {
233 *slot = Some(Box::new(compressor));
234 }
235 }
236
237 pub(crate) fn compress_output(&self, command: &str, output: String) -> String {
240 let Ok(slot) = self.inner.compressor.lock() else {
241 return output;
242 };
243 match slot.as_ref() {
244 Some(compressor) => compressor(command, output),
245 None => output,
246 }
247 }
248
249 fn persist_task(&self, paths: &TaskPaths, metadata: &PersistedTask) -> std::io::Result<()> {
250 write_task(&paths.json, metadata)?;
251 self.dual_write_task(paths, metadata);
252 Ok(())
253 }
254
255 fn update_task_metadata<F>(
256 &self,
257 paths: &TaskPaths,
258 update: F,
259 ) -> std::io::Result<PersistedTask>
260 where
261 F: FnOnce(&mut PersistedTask),
262 {
263 let metadata = update_task(&paths.json, update)?;
264 self.dual_write_task(paths, &metadata);
265 Ok(metadata)
266 }
267
268 fn dual_write_task(&self, paths: &TaskPaths, metadata: &PersistedTask) {
269 let pool = self.inner.db_pool.read().ok().and_then(|slot| slot.clone());
270 let Some(pool) = pool else {
271 return;
272 };
273 let harness = self
274 .inner
275 .db_harness
276 .read()
277 .ok()
278 .and_then(|slot| slot.clone());
279 let Some(harness) = harness else {
280 crate::slog_warn!(
281 "dual-write bash_task to DB skipped for {}: harness not configured",
282 metadata.task_id
283 );
284 return;
285 };
286 let row = match metadata.to_bash_task_row(&harness, paths) {
287 Ok(row) => row,
288 Err(error) => {
289 crate::slog_warn!(
290 "dual-write bash_task to DB failed for {}: {}",
291 metadata.task_id,
292 error
293 );
294 return;
295 }
296 };
297 let conn = match pool.lock() {
298 Ok(conn) => conn,
299 Err(_) => {
300 crate::slog_warn!(
301 "dual-write bash_task to DB failed for {}: db mutex poisoned",
302 metadata.task_id
303 );
304 return;
305 }
306 };
307 if let Err(error) = crate::db::bash_tasks::upsert_bash_task(&conn, &row) {
308 crate::slog_warn!(
309 "dual-write bash_task to DB failed for {}: {}",
310 metadata.task_id,
311 error
312 );
313 }
314 }
315
316 pub fn configure_long_running_reminders(&self, enabled: bool, interval_ms: u64) {
317 self.inner
318 .long_running_reminder_enabled
319 .store(enabled, Ordering::SeqCst);
320 self.inner
321 .long_running_reminder_interval_ms
322 .store(interval_ms, Ordering::SeqCst);
323 }
324
325 #[cfg(unix)]
326 #[allow(clippy::too_many_arguments)]
327 pub fn spawn(
328 &self,
329 command: &str,
330 session_id: String,
331 workdir: PathBuf,
332 env: HashMap<String, String>,
333 timeout: Option<Duration>,
334 storage_dir: PathBuf,
335 max_running: usize,
336 notify_on_completion: bool,
337 compressed: bool,
338 project_root: Option<PathBuf>,
339 ) -> Result<String, String> {
340 self.start_watchdog();
341
342 let running = self.running_count();
343 if running >= max_running {
344 return Err(format!(
345 "background bash task limit exceeded: {running} running (max {max_running})"
346 ));
347 }
348
349 let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
350 let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
351 let task_id = self.generate_unique_task_id()?;
352 let paths = task_paths(&storage_dir, &session_id, &task_id);
353 fs::create_dir_all(&paths.dir)
354 .map_err(|e| format!("failed to create background task dir: {e}"))?;
355
356 let mut metadata = PersistedTask::starting(
357 task_id.clone(),
358 session_id.clone(),
359 command.to_string(),
360 workdir.clone(),
361 project_root,
362 timeout_ms,
363 notify_on_completion,
364 compressed,
365 );
366 self.persist_task(&paths, &metadata)
367 .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
368
369 create_capture_file(&paths.stdout)
373 .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
374 create_capture_file(&paths.stderr)
375 .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
376
377 let child = match spawn_detached_child(command, &paths, &workdir, &env) {
378 Ok(child) => child,
379 Err(error) => {
380 crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
381 let _ = delete_task_bundle(&paths);
382 return Err(error);
383 }
384 };
385
386 let child_pid = child.id();
387 metadata.mark_running(child_pid, child_pid as i32);
388 self.persist_task(&paths, &metadata)
389 .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
390
391 let task = Arc::new(BgTask {
392 task_id: task_id.clone(),
393 session_id,
394 paths: paths.clone(),
395 started: Instant::now(),
396 last_reminder_at: Mutex::new(None),
397 terminal_at: Mutex::new(None),
398 state: Mutex::new(BgTaskState {
399 metadata,
400 runtime: TaskRuntime::Piped(Some(child)),
401 detached: false,
402 child_exit_observed: false,
403 buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
404 pending_terminal_override: None,
405 }),
406 });
407
408 self.inner
409 .tasks
410 .lock()
411 .map_err(|_| "background task registry lock poisoned".to_string())?
412 .insert(task_id.clone(), task);
413
414 Ok(task_id)
415 }
416
417 #[allow(clippy::too_many_arguments)]
418 pub fn spawn_pty(
419 &self,
420 command: &str,
421 session_id: String,
422 workdir: PathBuf,
423 env: HashMap<String, String>,
424 timeout: Option<Duration>,
425 storage_dir: PathBuf,
426 max_running: usize,
427 notify_on_completion: bool,
428 compressed: bool,
429 project_root: Option<PathBuf>,
430 rows: u16,
431 cols: u16,
432 ) -> Result<String, String> {
433 self.start_watchdog();
434
435 let running = self.running_count();
436 if running >= max_running {
437 return Err(format!(
438 "background bash task limit exceeded: {running} running (max {max_running})"
439 ));
440 }
441
442 let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
443 let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
444 let task_id = self.generate_unique_task_id()?;
445 let paths = task_paths(&storage_dir, &session_id, &task_id);
446 fs::create_dir_all(&paths.dir)
447 .map_err(|e| format!("failed to create background task dir: {e}"))?;
448
449 let mut metadata = PersistedTask::starting(
450 task_id.clone(),
451 session_id.clone(),
452 command.to_string(),
453 workdir.clone(),
454 project_root,
455 timeout_ms,
456 notify_on_completion,
457 compressed,
458 );
459 metadata.mode = BgMode::Pty;
460 metadata.pty_rows = Some(rows);
461 metadata.pty_cols = Some(cols);
462 self.persist_task(&paths, &metadata)
463 .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
464 create_capture_file(&paths.pty)
465 .map_err(|e| format!("failed to create PTY capture file: {e}"))?;
466
467 let runtime = match spawn_pty_for_command(
468 &task_id,
469 &session_id,
470 command,
471 &paths,
472 &workdir,
473 &env,
474 rows,
475 cols,
476 self.inner.wake_tx.clone(),
477 ) {
478 Ok(runtime) => runtime,
479 Err(error) => {
480 crate::slog_warn!(
481 "failed to spawn PTY background bash task {task_id}; deleting partial bundle: {error}"
482 );
483 let _ = delete_task_bundle(&paths);
484 return Err(error);
485 }
486 };
487
488 if let Some(child_pid) = runtime.child_pid {
489 metadata.mark_running(child_pid, child_pid as i32);
490 } else {
491 metadata.status = BgTaskStatus::Running;
492 metadata.pgid = None;
493 }
494 self.persist_task(&paths, &metadata)
495 .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
496
497 let task = Arc::new(BgTask {
498 task_id: task_id.clone(),
499 session_id,
500 paths: paths.clone(),
501 started: Instant::now(),
502 last_reminder_at: Mutex::new(None),
503 terminal_at: Mutex::new(None),
504 state: Mutex::new(BgTaskState {
505 metadata,
506 runtime: TaskRuntime::Pty(Some(runtime)),
507 detached: false,
508 child_exit_observed: false,
509 buffer: BgBuffer::pty(paths.pty.clone()),
510 pending_terminal_override: None,
511 }),
512 });
513
514 self.inner
515 .tasks
516 .lock()
517 .map_err(|_| "background task registry lock poisoned".to_string())?
518 .insert(task_id.clone(), task);
519
520 Ok(task_id)
521 }
522
523 #[cfg(windows)]
524 #[allow(clippy::too_many_arguments)]
525 pub fn spawn(
526 &self,
527 command: &str,
528 session_id: String,
529 workdir: PathBuf,
530 env: HashMap<String, String>,
531 timeout: Option<Duration>,
532 storage_dir: PathBuf,
533 max_running: usize,
534 notify_on_completion: bool,
535 compressed: bool,
536 project_root: Option<PathBuf>,
537 ) -> Result<String, String> {
538 self.start_watchdog();
539
540 let running = self.running_count();
541 if running >= max_running {
542 return Err(format!(
543 "background bash task limit exceeded: {running} running (max {max_running})"
544 ));
545 }
546
547 let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
548 let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
549 let task_id = self.generate_unique_task_id()?;
550 let paths = task_paths(&storage_dir, &session_id, &task_id);
551 fs::create_dir_all(&paths.dir)
552 .map_err(|e| format!("failed to create background task dir: {e}"))?;
553
554 let mut metadata = PersistedTask::starting(
555 task_id.clone(),
556 session_id.clone(),
557 command.to_string(),
558 workdir.clone(),
559 project_root,
560 timeout_ms,
561 notify_on_completion,
562 compressed,
563 );
564 self.persist_task(&paths, &metadata)
565 .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
566
567 create_capture_file(&paths.stdout)
573 .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
574 create_capture_file(&paths.stderr)
575 .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
576
577 let child = match spawn_detached_child(command, &paths, &workdir, &env) {
578 Ok(child) => child,
579 Err(error) => {
580 crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
581 let _ = delete_task_bundle(&paths);
582 return Err(error);
583 }
584 };
585
586 let child_pid = child.id();
587 metadata.status = BgTaskStatus::Running;
588 metadata.child_pid = Some(child_pid);
589 metadata.pgid = None;
590 self.persist_task(&paths, &metadata)
591 .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
592
593 let task = Arc::new(BgTask {
594 task_id: task_id.clone(),
595 session_id,
596 paths: paths.clone(),
597 started: Instant::now(),
598 last_reminder_at: Mutex::new(None),
599 terminal_at: Mutex::new(None),
600 state: Mutex::new(BgTaskState {
601 metadata,
602 runtime: TaskRuntime::Piped(Some(child)),
603 detached: false,
604 child_exit_observed: false,
605 buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
606 pending_terminal_override: None,
607 }),
608 });
609
610 self.inner
611 .tasks
612 .lock()
613 .map_err(|_| "background task registry lock poisoned".to_string())?
614 .insert(task_id.clone(), task);
615
616 Ok(task_id)
617 }
618
619 pub fn write_pty(
620 &self,
621 task_id: &str,
622 session_id: &str,
623 input: &[u8],
624 ) -> Result<usize, String> {
625 let task = self
626 .task_for_session(task_id, session_id)
627 .ok_or_else(|| "task_not_found".to_string())?;
628
629 let writer = {
630 let state = task
631 .state
632 .lock()
633 .map_err(|_| "background task lock poisoned".to_string())?;
634 if state.metadata.mode != BgMode::Pty {
635 return Err("task_not_pty".to_string());
636 }
637 if state.metadata.status.is_terminal() {
638 return Err("task_exited".to_string());
639 }
640 match &state.runtime {
641 TaskRuntime::Pty(Some(runtime)) => Arc::clone(&runtime.writer),
642 TaskRuntime::Pty(None) => return Err("task_exited".to_string()),
643 TaskRuntime::Piped(_) => return Err("task_not_pty".to_string()),
644 }
645 };
646
647 let mut writer = writer
648 .lock()
649 .map_err(|_| "PTY writer lock poisoned".to_string())?;
650 writer
651 .write_all(input)
652 .map_err(|error| format!("failed to write to PTY: {error}"))?;
653 writer
654 .flush()
655 .map_err(|error| format!("failed to flush PTY writer: {error}"))?;
656 Ok(input.len())
657 }
658
659 pub fn replay_session(&self, storage_dir: &Path, session_id: &str) -> Result<(), String> {
660 self.replay_session_inner(storage_dir, session_id, None)
661 }
662
663 pub fn replay_session_for_project(
664 &self,
665 storage_dir: &Path,
666 session_id: &str,
667 project_root: &Path,
668 ) -> Result<(), String> {
669 self.replay_session_inner(storage_dir, session_id, Some(project_root))
670 }
671
672 fn replay_session_inner(
673 &self,
674 storage_dir: &Path,
675 session_id: &str,
676 project_root: Option<&Path>,
677 ) -> Result<(), String> {
678 self.start_watchdog();
679 if !self.inner.persisted_gc_started.swap(true, Ordering::SeqCst) {
680 if let Err(error) = self.maybe_gc_persisted(storage_dir) {
681 crate::slog_warn!("failed to GC persisted background bash tasks: {error}");
682 }
683 }
684
685 let canonical_project = project_root.map(canonicalized_path);
686 let tasks = match self.replay_session_from_db(session_id) {
698 Some(Ok(tasks)) if !tasks.is_empty() => tasks,
699 Some(Ok(_)) => {
700 let disk_tasks = self.replay_session_from_disk(storage_dir, session_id)?;
701 if !disk_tasks.is_empty() {
702 crate::slog_info!(
703 "bash task replay: 0 in DB for session {}, {} from disk fallback",
704 session_id,
705 disk_tasks.len()
706 );
707 }
708 disk_tasks
709 }
710 Some(Err(error)) => {
711 crate::slog_warn!(
712 "bash task replay DB lookup failed for session {}; falling back to disk: {}",
713 session_id,
714 error
715 );
716 self.replay_session_from_disk(storage_dir, session_id)?
717 }
718 None => {
719 self.replay_session_from_disk(storage_dir, session_id)?
721 }
722 };
723
724 for mut metadata in tasks {
725 if metadata.session_id != session_id {
726 continue;
727 }
728 if let Some(canonical_project) = canonical_project.as_deref() {
729 let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
730 if metadata_project.as_deref() != Some(canonical_project) {
731 continue;
732 }
733 }
734
735 let paths = task_paths(storage_dir, session_id, &metadata.task_id);
736 match metadata.status {
737 BgTaskStatus::Starting => {
738 let completion_was_delivered = metadata.completion_delivered;
739 metadata.mark_terminal(
740 BgTaskStatus::Failed,
741 None,
742 Some("spawn aborted".to_string()),
743 );
744 metadata.completion_delivered |= completion_was_delivered;
745 let _ = self.persist_task(&paths, &metadata);
746 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
747 self.insert_rehydrated_task(metadata, paths, true)?;
748 }
749 BgTaskStatus::Running | BgTaskStatus::Killing => {
750 if metadata.mode == BgMode::Pty {
751 if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
752 let completion_was_delivered = metadata.completion_delivered;
753 metadata = terminal_metadata_from_marker(metadata, marker, None);
754 metadata.completion_delivered |= completion_was_delivered;
755 let _ = self.persist_task(&paths, &metadata);
756 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
757 self.insert_rehydrated_task(metadata, paths, true)?;
758 } else if metadata.status.is_terminal() {
759 self.insert_rehydrated_task(metadata, paths, true)?;
760 } else {
761 let completion_was_delivered = metadata.completion_delivered;
762 metadata.mark_terminal(
763 BgTaskStatus::Killed,
764 None,
765 Some("pty_lost_on_bridge_restart".to_string()),
766 );
767 metadata.completion_delivered |= completion_was_delivered;
768 let _ = self.persist_task(&paths, &metadata);
769 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
770 self.insert_rehydrated_task(metadata, paths, true)?;
771 }
772 } else if self.running_metadata_is_stale(&metadata) {
773 let completion_was_delivered = metadata.completion_delivered;
774 metadata.mark_terminal(
775 BgTaskStatus::Killed,
776 None,
777 Some("orphaned (>24h)".to_string()),
778 );
779 metadata.completion_delivered |= completion_was_delivered;
780 if !paths.exit.exists() {
781 let _ = write_kill_marker_if_absent(&paths.exit);
782 }
783 let _ = self.persist_task(&paths, &metadata);
784 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
785 self.insert_rehydrated_task(metadata, paths, true)?;
786 } else if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
787 let reason = (metadata.status == BgTaskStatus::Killing).then(|| {
788 "recovered from inconsistent killing state on replay".to_string()
789 });
790 if reason.is_some() {
791 crate::slog_warn!("background task {} had killing state with exit marker; preferring marker",
792 metadata.task_id);
793 }
794 let completion_was_delivered = metadata.completion_delivered;
795 metadata = terminal_metadata_from_marker(metadata, marker, reason);
796 metadata.completion_delivered |= completion_was_delivered;
797 let _ = self.persist_task(&paths, &metadata);
798 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
799 self.insert_rehydrated_task(metadata, paths, true)?;
800 } else if metadata.status == BgTaskStatus::Killing {
801 if !paths.exit.exists() {
802 let _ = write_kill_marker_if_absent(&paths.exit);
803 }
804 let completion_was_delivered = metadata.completion_delivered;
805 metadata.mark_terminal(
806 BgTaskStatus::Killed,
807 None,
808 Some("recovered from inconsistent killing state on replay".to_string()),
809 );
810 metadata.completion_delivered |= completion_was_delivered;
811 let _ = self.persist_task(&paths, &metadata);
812 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
813 self.insert_rehydrated_task(metadata, paths, true)?;
814 } else if metadata.child_pid.is_some_and(|pid| !is_process_alive(pid)) {
815 let completion_was_delivered = metadata.completion_delivered;
816 metadata.mark_terminal(
817 BgTaskStatus::Failed,
818 None,
819 Some("process exited without exit marker".to_string()),
820 );
821 metadata.completion_delivered |= completion_was_delivered;
822 let _ = self.persist_task(&paths, &metadata);
823 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
824 self.insert_rehydrated_task(metadata, paths, true)?;
825 } else {
826 self.insert_rehydrated_task(metadata, paths, true)?;
827 }
828 }
829 _ if metadata.status.is_terminal() => {
830 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
836 self.insert_rehydrated_task(metadata, paths, true)?;
837 }
838 _ => {}
839 }
840 }
841
842 Ok(())
843 }
844
845 fn replay_session_from_db(
846 &self,
847 session_id: &str,
848 ) -> Option<Result<Vec<PersistedTask>, String>> {
849 let pool = self
850 .inner
851 .db_pool
852 .read()
853 .ok()
854 .and_then(|slot| slot.clone())?;
855 let harness = self
856 .inner
857 .db_harness
858 .read()
859 .ok()
860 .and_then(|slot| slot.clone())?;
861 let conn = match pool.lock() {
862 Ok(conn) => conn,
863 Err(_) => return Some(Err("db mutex poisoned".to_string())),
864 };
865 Some(
866 crate::db::bash_tasks::list_bash_tasks_for_session(&conn, &harness, session_id)
867 .map(|rows| rows.into_iter().map(PersistedTask::from).collect())
868 .map_err(|error| error.to_string()),
869 )
870 }
871
872 fn replay_session_from_disk(
873 &self,
874 storage_dir: &Path,
875 session_id: &str,
876 ) -> Result<Vec<PersistedTask>, String> {
877 let dir = session_tasks_dir(storage_dir, session_id);
878 if !dir.exists() {
879 return Ok(Vec::new());
880 }
881
882 let entries = fs::read_dir(&dir)
883 .map_err(|e| format!("failed to read background task dir {}: {e}", dir.display()))?;
884 let mut tasks = Vec::new();
885 for entry in entries.flatten() {
886 let path = entry.path();
887 if path.extension().and_then(|extension| extension.to_str()) != Some("json") {
888 continue;
889 }
890 match read_task(&path) {
891 Ok(metadata) => tasks.push(metadata),
892 Err(error) => {
893 crate::slog_warn!(
894 "quarantining invalid background task metadata {} during replay: {error}",
895 path.display()
896 );
897 if let Err(quarantine_error) =
898 quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
899 {
900 crate::slog_warn!(
901 "failed to quarantine invalid background task metadata {}: {quarantine_error}",
902 path.display()
903 );
904 }
905 }
906 }
907 }
908 Ok(tasks)
909 }
910
911 pub fn register_watch(
912 &self,
913 task_id: String,
914 pattern: WatchPattern,
915 once: bool,
916 ) -> Result<String, &'static str> {
917 let task = self.task(&task_id).ok_or("task_not_found")?;
918 let (mode, terminal_at_registration, stdout, stderr, pty) = task
919 .state
920 .lock()
921 .map(|state| {
922 (
923 state.metadata.mode.clone(),
924 state.metadata.status.is_terminal(),
925 task.paths.stdout.clone(),
926 task.paths.stderr.clone(),
927 task.paths.pty.clone(),
928 )
929 })
930 .map_err(|_| "background_task_lock_poisoned")?;
931
932 let mut terminal_matches = Vec::new();
933 let scanned_terminal = terminal_at_registration;
934 let watch_id = {
935 let mut registry = self
936 .inner
937 .watch_registry
938 .lock()
939 .map_err(|_| "watch_registry_poisoned")?;
940 let watch_id = registry.register(task_id.clone(), pattern, once)?;
941 match &mode {
942 BgMode::Pipes => {
943 let stdout_key = format!("{task_id}:stdout");
944 let stderr_key = format!("{task_id}:stderr");
945 if terminal_at_registration {
946 registry.set_file_cursor(&stdout_key, 0);
947 registry.set_file_cursor(&stderr_key, 0);
948 terminal_matches.extend(registry.scan_file_new_bytes(
949 &stdout_key,
950 &task_id,
951 &stdout,
952 ));
953 terminal_matches.extend(registry.scan_file_new_bytes(
954 &stderr_key,
955 &task_id,
956 &stderr,
957 ));
958 } else {
959 registry.prime_file_cursor(&stdout_key, &stdout);
960 registry.prime_file_cursor(&stderr_key, &stderr);
961 }
962 }
963 BgMode::Pty => {
964 let pty_key = format!("{task_id}:pty");
965 if terminal_at_registration {
966 registry.set_file_cursor(&pty_key, 0);
967 terminal_matches
968 .extend(registry.scan_file_new_bytes(&pty_key, &task_id, &pty));
969 } else {
970 registry.prime_file_cursor(&pty_key, &pty);
971 }
972 }
973 }
974 watch_id
975 };
976
977 if task.is_terminal() {
978 if !scanned_terminal {
979 terminal_matches = {
980 let mut registry = self
981 .inner
982 .watch_registry
983 .lock()
984 .map_err(|_| "watch_registry_poisoned")?;
985 match &mode {
986 BgMode::Pipes => {
987 let stdout_key = format!("{task_id}:stdout");
988 let stderr_key = format!("{task_id}:stderr");
989 registry.set_file_cursor(&stdout_key, 0);
990 registry.set_file_cursor(&stderr_key, 0);
991 let mut matches =
992 registry.scan_file_new_bytes(&stdout_key, &task_id, &stdout);
993 matches.extend(registry.scan_file_new_bytes(
994 &stderr_key,
995 &task_id,
996 &stderr,
997 ));
998 matches
999 }
1000 BgMode::Pty => {
1001 let pty_key = format!("{task_id}:pty");
1002 registry.set_file_cursor(&pty_key, 0);
1003 registry.scan_file_new_bytes(&pty_key, &task_id, &pty)
1004 }
1005 }
1006 };
1007 }
1008
1009 let (watch_controlled, watch_matched) = self.task_watch_state(&task_id);
1010 if terminal_matches.is_empty() && (!watch_controlled || watch_matched) {
1011 if watch_matched {
1012 let _ = task.set_completion_delivered(true, self);
1013 self.clear_task_watch_state(&task_id);
1014 }
1015 return Ok(watch_id);
1016 }
1017
1018 let completion = self
1019 .remove_pending_completion(&task_id)
1020 .or_else(|| self.completion_snapshot_for_task(&task, BG_COMPLETION_PREVIEW_BYTES));
1021 if terminal_matches.is_empty() {
1022 if let Some(completion) = completion.as_ref() {
1023 self.emit_bash_watch_exit(completion);
1024 }
1025 } else {
1026 for pattern_match in terminal_matches {
1027 self.emit_bash_pattern_match(&task.session_id, pattern_match);
1028 }
1029 }
1030 let _ = task.set_completion_delivered(true, self);
1031 self.clear_task_watch_state(&task_id);
1032 }
1033
1034 Ok(watch_id)
1035 }
1036
1037 pub fn unregister_watch(&self, task_id: &str, watch_id: &str) {
1038 if let Ok(mut registry) = self.inner.watch_registry.lock() {
1039 registry.unregister(task_id, watch_id);
1040 }
1041 }
1042
1043 pub fn active_watch_count(&self, task_id: &str) -> usize {
1044 self.inner
1045 .watch_registry
1046 .lock()
1047 .map(|registry| registry.active_count(task_id))
1048 .unwrap_or(0)
1049 }
1050
1051 fn task_watch_state(&self, task_id: &str) -> (bool, bool) {
1052 self.inner
1053 .watch_registry
1054 .lock()
1055 .map(|registry| {
1056 (
1057 registry.has_controlled_task(task_id),
1058 registry.has_matched_task(task_id),
1059 )
1060 })
1061 .unwrap_or((false, false))
1062 }
1063
1064 fn task_has_watch_control(&self, task_id: &str) -> bool {
1065 self.inner
1066 .watch_registry
1067 .lock()
1068 .map(|registry| registry.has_controlled_task(task_id))
1069 .unwrap_or(false)
1070 }
1071
1072 fn clear_task_watch_state(&self, task_id: &str) {
1073 if let Ok(mut registry) = self.inner.watch_registry.lock() {
1074 registry.clear_task(task_id);
1075 }
1076 }
1077
1078 pub(crate) fn scan_task_watch_output(&self, task: &Arc<BgTask>) {
1079 let (mode, stdout, stderr, pty) = match task.state.lock() {
1080 Ok(state) => (
1081 state.metadata.mode.clone(),
1082 task.paths.stdout.clone(),
1083 task.paths.stderr.clone(),
1084 task.paths.pty.clone(),
1085 ),
1086 Err(_) => return,
1087 };
1088 let mut matches = Vec::new();
1089 if let Ok(mut registry) = self.inner.watch_registry.lock() {
1090 match mode {
1091 BgMode::Pipes => {
1092 let stdout_key = format!("{}:stdout", task.task_id);
1093 let stderr_key = format!("{}:stderr", task.task_id);
1094 matches.extend(registry.scan_file_new_bytes(
1095 &stdout_key,
1096 &task.task_id,
1097 &stdout,
1098 ));
1099 matches.extend(registry.scan_file_new_bytes(
1100 &stderr_key,
1101 &task.task_id,
1102 &stderr,
1103 ));
1104 }
1105 BgMode::Pty => {
1106 let pty_key = format!("{}:pty", task.task_id);
1107 matches.extend(registry.scan_file_new_bytes(&pty_key, &task.task_id, &pty));
1108 }
1109 }
1110 }
1111 for pattern_match in matches {
1112 self.emit_bash_pattern_match(&task.session_id, pattern_match);
1113 }
1114 }
1115
1116 pub fn status(
1117 &self,
1118 task_id: &str,
1119 session_id: &str,
1120 project_root: Option<&Path>,
1121 storage_dir: Option<&Path>,
1122 preview_bytes: usize,
1123 ) -> Option<BgTaskSnapshot> {
1124 let mut task = self.task_for_session(task_id, session_id);
1125 if task.is_none() {
1126 if let Some(storage_dir) = storage_dir {
1127 let _ = self.replay_session(storage_dir, session_id);
1128 task = self.task_for_session(task_id, session_id);
1129 }
1130 }
1131 let Some(task) = task else {
1132 return self.status_relaxed(
1133 task_id,
1134 session_id,
1135 project_root?,
1136 storage_dir?,
1137 preview_bytes,
1138 );
1139 };
1140 let _ = self.poll_task(&task);
1141 let mut snapshot = task.snapshot(preview_bytes);
1142 self.maybe_compress_snapshot(&task, &mut snapshot);
1143 Some(snapshot)
1144 }
1145
1146 fn status_relaxed_task(
1147 &self,
1148 task_id: &str,
1149 project_root: &Path,
1150 storage_dir: &Path,
1151 ) -> Option<Arc<BgTask>> {
1152 let canonical_project = canonicalized_path(project_root);
1153 match self.lookup_relaxed_task_from_db(task_id, project_root) {
1154 Some(Ok(Some(metadata))) => {
1155 if let Some(task) = self.task(task_id) {
1156 let matches_project = task
1157 .state
1158 .lock()
1159 .map(|state| {
1160 state
1161 .metadata
1162 .project_root
1163 .as_deref()
1164 .map(canonicalized_path)
1165 .as_deref()
1166 == Some(canonical_project.as_path())
1167 })
1168 .unwrap_or(false);
1169 return matches_project.then_some(task);
1170 }
1171 let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1172 if self.insert_rehydrated_task(metadata, paths, true).is_err() {
1173 return None;
1174 }
1175 return self.task(task_id);
1176 }
1177 Some(Ok(None)) => {
1178 crate::slog_info!(
1179 "bash task relaxed DB miss for {}; falling back to disk",
1180 task_id
1181 );
1182 }
1183 Some(Err(error)) => {
1184 crate::slog_warn!(
1185 "bash task relaxed DB lookup failed for {}; falling back to disk: {}",
1186 task_id,
1187 error
1188 );
1189 }
1190 None => {
1191 crate::slog_info!(
1192 "bash task relaxed DB unavailable for {}; falling back to disk",
1193 task_id
1194 );
1195 }
1196 }
1197 let root = storage_dir.join("bash-tasks");
1198 let entries = fs::read_dir(&root).ok()?;
1199 for entry in entries.flatten() {
1200 let dir = entry.path();
1201 if !dir.is_dir() {
1202 continue;
1203 }
1204 let path = dir.join(format!("{task_id}.json"));
1205 if !path.exists() {
1206 continue;
1207 }
1208 let metadata = match read_task(&path) {
1209 Ok(metadata) => metadata,
1210 Err(error) => {
1211 crate::slog_warn!(
1212 "quarantining invalid background task metadata {} during relaxed lookup: {error}",
1213 path.display()
1214 );
1215 if let Err(quarantine_error) =
1216 quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
1217 {
1218 crate::slog_warn!(
1219 "failed to quarantine invalid background task metadata {}: {quarantine_error}",
1220 path.display()
1221 );
1222 }
1223 continue;
1224 }
1225 };
1226 let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
1227 if metadata_project.as_deref() != Some(canonical_project.as_path()) {
1228 continue;
1229 }
1230 if let Some(task) = self.task(task_id) {
1231 let matches_project = task
1232 .state
1233 .lock()
1234 .map(|state| {
1235 state
1236 .metadata
1237 .project_root
1238 .as_deref()
1239 .map(canonicalized_path)
1240 .as_deref()
1241 == Some(canonical_project.as_path())
1242 })
1243 .unwrap_or(false);
1244 return matches_project.then_some(task);
1245 }
1246 let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1247 if self.insert_rehydrated_task(metadata, paths, true).is_err() {
1248 return None;
1249 }
1250 return self.task(task_id);
1251 }
1252 None
1253 }
1254
1255 fn lookup_relaxed_task_from_db(
1256 &self,
1257 task_id: &str,
1258 project_root: &Path,
1259 ) -> Option<Result<Option<PersistedTask>, String>> {
1260 let pool = self
1261 .inner
1262 .db_pool
1263 .read()
1264 .ok()
1265 .and_then(|slot| slot.clone())?;
1266 let harness = self
1267 .inner
1268 .db_harness
1269 .read()
1270 .ok()
1271 .and_then(|slot| slot.clone())?;
1272 let conn = match pool.lock() {
1273 Ok(conn) => conn,
1274 Err(_) => return Some(Err("db mutex poisoned".to_string())),
1275 };
1276 let project_key = crate::search_index::project_cache_key(project_root);
1277 Some(
1278 crate::db::bash_tasks::find_bash_task_for_project(
1279 &conn,
1280 &harness,
1281 &project_key,
1282 task_id,
1283 )
1284 .map(|row| row.map(PersistedTask::from))
1285 .map_err(|error| error.to_string()),
1286 )
1287 }
1288
1289 pub(super) fn status_relaxed(
1290 &self,
1291 task_id: &str,
1292 _session_id: &str,
1293 project_root: &Path,
1294 storage_dir: &Path,
1295 preview_bytes: usize,
1296 ) -> Option<BgTaskSnapshot> {
1297 let task = self.status_relaxed_task(task_id, project_root, storage_dir)?;
1298 let _ = self.poll_task(&task);
1299 let mut snapshot = task.snapshot(preview_bytes);
1300 self.maybe_compress_snapshot(&task, &mut snapshot);
1301 Some(snapshot)
1302 }
1303
1304 pub fn kill_relaxed(
1305 &self,
1306 task_id: &str,
1307 project_root: &Path,
1308 storage_dir: &Path,
1309 ) -> Result<BgTaskSnapshot, String> {
1310 let task = self
1311 .status_relaxed_task(task_id, project_root, storage_dir)
1312 .ok_or_else(|| format!("background task not found: {task_id}"))?;
1313 self.kill_with_status(task_id, &task.session_id, BgTaskStatus::Killed)
1314 }
1315
1316 pub fn maybe_gc_persisted(&self, storage_dir: &Path) -> Result<usize, String> {
1317 #[cfg(test)]
1318 self.inner.persisted_gc_runs.fetch_add(1, Ordering::SeqCst);
1319
1320 let mut deleted = 0usize;
1321
1322 let root = storage_dir.join("bash-tasks");
1323 if root.exists() {
1324 let session_dirs = fs::read_dir(&root).map_err(|e| {
1325 format!(
1326 "failed to read background task root {}: {e}",
1327 root.display()
1328 )
1329 })?;
1330 for session_entry in session_dirs.flatten() {
1331 let session_dir = session_entry.path();
1332 if !session_dir.is_dir() {
1333 continue;
1334 }
1335 let task_entries = match fs::read_dir(&session_dir) {
1336 Ok(entries) => entries,
1337 Err(error) => {
1338 crate::slog_warn!(
1339 "failed to read background task session dir {}: {error}",
1340 session_dir.display()
1341 );
1342 continue;
1343 }
1344 };
1345 for task_entry in task_entries.flatten() {
1346 let json_path = task_entry.path();
1347 if json_path
1348 .extension()
1349 .and_then(|extension| extension.to_str())
1350 != Some("json")
1351 {
1352 continue;
1353 }
1354 if modified_within(&json_path, PERSISTED_GC_GRACE) {
1355 continue;
1356 }
1357 let metadata = match read_task(&json_path) {
1358 Ok(metadata) => metadata,
1359 Err(error) => {
1360 crate::slog_warn!(
1361 "quarantining corrupt background task metadata {}: {error}",
1362 json_path.display()
1363 );
1364 quarantine_task_json(
1365 storage_dir,
1366 &session_dir,
1367 &json_path,
1368 QuarantineKind::Corrupt,
1369 )?;
1370 continue;
1371 }
1372 };
1373 if !(metadata.status.is_terminal() && metadata.completion_delivered) {
1374 continue;
1375 }
1376 let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1377 match delete_task_bundle(&paths) {
1378 Ok(()) => {
1379 deleted += 1;
1380 log::debug!(
1381 "deleted persisted background task bundle {}",
1382 metadata.task_id
1383 );
1384 }
1385 Err(error) => {
1386 crate::slog_warn!(
1387 "failed to delete background task bundle {}: {error}",
1388 metadata.task_id
1389 );
1390 continue;
1391 }
1392 }
1393 }
1394 }
1395 }
1396 gc_quarantine(storage_dir);
1397 Ok(deleted)
1398 }
1399
1400 pub fn list(&self, preview_bytes: usize) -> Vec<BgTaskSnapshot> {
1401 let tasks = self
1402 .inner
1403 .tasks
1404 .lock()
1405 .map(|tasks| tasks.values().cloned().collect::<Vec<_>>())
1406 .unwrap_or_default();
1407 tasks
1408 .into_iter()
1409 .map(|task| {
1410 let _ = self.poll_task(&task);
1411 let mut snapshot = task.snapshot(preview_bytes);
1412 self.maybe_compress_snapshot(&task, &mut snapshot);
1413 snapshot
1414 })
1415 .collect()
1416 }
1417
1418 fn maybe_compress_snapshot(&self, task: &Arc<BgTask>, snapshot: &mut BgTaskSnapshot) {
1424 if !snapshot.info.status.is_terminal() {
1425 return;
1426 }
1427 let (compressed_flag, mode) = task
1428 .state
1429 .lock()
1430 .map(|state| (state.metadata.compressed, state.metadata.mode.clone()))
1431 .unwrap_or((true, BgMode::Pipes));
1432 if mode == BgMode::Pty {
1433 return;
1434 }
1435 if !compressed_flag {
1436 return;
1437 }
1438 let raw = std::mem::take(&mut snapshot.output_preview);
1439 snapshot.output_preview = self.compress_output(&snapshot.info.command, raw);
1440 }
1441
1442 pub fn kill(&self, task_id: &str, session_id: &str) -> Result<BgTaskSnapshot, String> {
1443 self.kill_with_status(task_id, session_id, BgTaskStatus::Killed)
1444 }
1445
1446 pub fn promote(&self, task_id: &str, session_id: &str) -> Result<bool, String> {
1447 let task = self
1448 .task_for_session(task_id, session_id)
1449 .ok_or_else(|| format!("background task not found: {task_id}"))?;
1450 let mut state = task
1451 .state
1452 .lock()
1453 .map_err(|_| "background task lock poisoned".to_string())?;
1454 let updated = self
1455 .update_task_metadata(&task.paths, |metadata| {
1456 metadata.notify_on_completion = true;
1457 metadata.completion_delivered = false;
1458 })
1459 .map_err(|e| format!("failed to promote background task: {e}"))?;
1460 state.metadata = updated;
1461 if state.metadata.status.is_terminal() {
1462 state.buffer.enforce_terminal_cap();
1463 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1464 }
1465 Ok(true)
1466 }
1467
1468 pub(crate) fn kill_for_timeout(&self, task_id: &str, session_id: &str) -> Result<(), String> {
1469 self.kill_with_status(task_id, session_id, BgTaskStatus::TimedOut)
1470 .map(|_| ())
1471 }
1472
1473 pub fn cleanup_finished(&self, older_than: Duration) {
1474 let cutoff = Instant::now().checked_sub(older_than);
1475 let removable_paths: Vec<(String, TaskPaths)> =
1476 if let Ok(mut tasks) = self.inner.tasks.lock() {
1477 let removable = tasks
1478 .iter()
1479 .filter_map(|(task_id, task)| {
1480 let delivered_terminal = task
1481 .state
1482 .lock()
1483 .map(|state| {
1484 state.metadata.status.is_terminal()
1485 && state.metadata.completion_delivered
1486 })
1487 .unwrap_or(false);
1488 if !delivered_terminal {
1489 return None;
1490 }
1491
1492 let terminal_at = task.terminal_at.lock().ok().and_then(|at| *at);
1493 let expired = match (terminal_at, cutoff) {
1494 (Some(terminal_at), Some(cutoff)) => terminal_at <= cutoff,
1495 (Some(_), None) => true,
1496 (None, _) => false,
1497 };
1498 expired.then(|| task_id.clone())
1499 })
1500 .collect::<Vec<_>>();
1501
1502 removable
1503 .into_iter()
1504 .filter_map(|task_id| {
1505 tasks
1506 .remove(&task_id)
1507 .map(|task| (task_id, task.paths.clone()))
1508 })
1509 .collect()
1510 } else {
1511 Vec::new()
1512 };
1513
1514 for (task_id, paths) in removable_paths {
1515 match delete_task_bundle(&paths) {
1516 Ok(()) => log::debug!("deleted persisted background task bundle {task_id}"),
1517 Err(error) => crate::slog_warn!(
1518 "failed to delete persisted background task bundle {task_id}: {error}"
1519 ),
1520 }
1521 }
1522 }
1523
1524 pub fn drain_completions(&self) -> Vec<BgCompletion> {
1525 self.drain_completions_for_session(None)
1526 }
1527
1528 pub fn drain_completions_for_session(&self, session_id: Option<&str>) -> Vec<BgCompletion> {
1529 let completions = match self.inner.completions.lock() {
1530 Ok(completions) => completions,
1531 Err(_) => return Vec::new(),
1532 };
1533
1534 completions
1535 .iter()
1536 .filter(|completion| {
1537 session_id
1538 .map(|session_id| completion.session_id == session_id)
1539 .unwrap_or(true)
1540 })
1541 .cloned()
1542 .collect()
1543 }
1544
1545 pub fn ack_completions_for_session(
1546 &self,
1547 session_id: Option<&str>,
1548 task_ids: &[String],
1549 ) -> Vec<String> {
1550 if task_ids.is_empty() {
1551 return Vec::new();
1552 }
1553 let requested_task_ids = task_ids.iter().map(String::as_str).collect::<HashSet<_>>();
1554 let mut completion_sessions = HashMap::new();
1555 if let Ok(mut completions) = self.inner.completions.lock() {
1556 completions.retain(|completion| {
1557 let session_matches = session_id
1558 .map(|session_id| completion.session_id == session_id)
1559 .unwrap_or(true);
1560 if session_matches && requested_task_ids.contains(completion.task_id.as_str()) {
1561 completion_sessions
1562 .insert(completion.task_id.clone(), completion.session_id.clone());
1563 false
1564 } else {
1565 true
1566 }
1567 });
1568 }
1569
1570 let mut delivered = Vec::new();
1571 for task_id in task_ids {
1572 let task = if let Some(session_id) = session_id {
1573 self.task_for_session(task_id, session_id)
1574 } else if let Some(completion_session_id) = completion_sessions.get(task_id) {
1575 self.task_for_session(task_id, completion_session_id)
1576 } else {
1577 self.task(task_id)
1578 };
1579 if let Some(task) = task {
1580 if task.set_completion_delivered(true, self).is_ok() {
1581 delivered.push(task_id.clone());
1582 }
1583 }
1584 }
1585
1586 delivered
1587 }
1588
1589 pub fn pending_completions_for_session(&self, session_id: &str) -> Vec<BgCompletion> {
1590 self.inner
1591 .completions
1592 .lock()
1593 .map(|completions| {
1594 completions
1595 .iter()
1596 .filter(|completion| completion.session_id == session_id)
1597 .cloned()
1598 .collect()
1599 })
1600 .unwrap_or_default()
1601 }
1602
1603 fn remove_pending_completion(&self, task_id: &str) -> Option<BgCompletion> {
1604 let mut completions = self.inner.completions.lock().ok()?;
1605 let idx = completions
1606 .iter()
1607 .position(|completion| completion.task_id == task_id)?;
1608 completions.remove(idx)
1609 }
1610
1611 fn completion_snapshot_for_task(
1612 &self,
1613 task: &Arc<BgTask>,
1614 preview_bytes: usize,
1615 ) -> Option<BgCompletion> {
1616 let snapshot = task.snapshot(preview_bytes);
1617 if !snapshot.info.status.is_terminal() {
1618 return None;
1619 }
1620 let output_preview = if snapshot.info.mode == BgMode::Pty {
1621 String::new()
1622 } else {
1623 let compressed = task
1624 .state
1625 .lock()
1626 .map(|state| state.metadata.compressed)
1627 .unwrap_or(true);
1628 if compressed {
1629 self.compress_output(&snapshot.info.command, snapshot.output_preview)
1630 } else {
1631 snapshot.output_preview
1632 }
1633 };
1634 Some(BgCompletion {
1635 task_id: snapshot.info.task_id,
1636 session_id: task.session_id.clone(),
1637 status: snapshot.info.status,
1638 exit_code: snapshot.exit_code,
1639 command: snapshot.info.command,
1640 output_preview,
1641 output_truncated: snapshot.output_truncated,
1642 original_tokens: None,
1643 compressed_tokens: None,
1644 tokens_skipped: false,
1645 })
1646 }
1647
1648 pub fn detach(&self) {
1649 self.inner.shutdown.store(true, Ordering::SeqCst);
1650 if let Ok(mut tasks) = self.inner.tasks.lock() {
1651 for task in tasks.values() {
1652 if let Ok(mut state) = task.state.lock() {
1653 match &mut state.runtime {
1654 TaskRuntime::Piped(child) => *child = None,
1655 TaskRuntime::Pty(runtime) => *runtime = None,
1656 }
1657 state.detached = true;
1658 }
1659 }
1660 tasks.clear();
1661 }
1662 }
1663
1664 pub fn shutdown(&self) {
1665 let tasks = self
1666 .inner
1667 .tasks
1668 .lock()
1669 .map(|tasks| {
1670 tasks
1671 .values()
1672 .map(|task| (task.task_id.clone(), task.session_id.clone()))
1673 .collect::<Vec<_>>()
1674 })
1675 .unwrap_or_default();
1676 for (task_id, session_id) in tasks {
1677 let _ = self.kill(&task_id, &session_id);
1678 }
1679 }
1680
1681 pub(crate) fn poll_task(&self, task: &Arc<BgTask>) -> Result<(), String> {
1682 if let Ok(state) = task.state.lock() {
1683 if let TaskRuntime::Pty(Some(pty)) = &state.runtime {
1684 let complete = pty.reader_done.load(Ordering::SeqCst)
1685 && pty.exit_observed.load(Ordering::SeqCst);
1686 if !complete {
1687 return Ok(());
1688 }
1689 }
1690 }
1691 let marker = match read_exit_marker(&task.paths.exit) {
1692 Ok(Some(marker)) => marker,
1693 Ok(None) => return Ok(()),
1694 Err(error) => return Err(format!("failed to read exit marker: {error}")),
1695 };
1696 self.finalize_from_marker(task, marker, None)
1697 }
1698
1699 pub(crate) fn reap_child(&self, task: &Arc<BgTask>) {
1700 let Ok(mut state) = task.state.lock() else {
1701 return;
1702 };
1703 match &mut state.runtime {
1704 TaskRuntime::Piped(child_slot) => {
1705 if let Some(child) = child_slot.as_mut() {
1706 if matches!(child.try_wait(), Ok(Some(_))) {
1707 *child_slot = None;
1708 state.detached = true;
1709 state.child_exit_observed = true;
1710 }
1711 } else if state.detached {
1712 let child_known_dead = state.child_exit_observed
1713 || state
1714 .metadata
1715 .child_pid
1716 .is_some_and(|pid| !is_process_alive(pid));
1717 if child_known_dead {
1718 self.fail_without_exit_marker_if_needed(task, &mut state);
1719 }
1720 }
1721 }
1722 TaskRuntime::Pty(Some(pty)) => {
1723 let complete = pty.reader_done.load(Ordering::SeqCst)
1724 && pty.exit_observed.load(Ordering::SeqCst);
1725 if complete {
1726 drop(state);
1727 let _ = self.poll_task(task);
1728 }
1729 }
1730 TaskRuntime::Pty(None) => {}
1731 }
1732 }
1733
1734 fn fail_without_exit_marker_if_needed(&self, task: &Arc<BgTask>, state: &mut BgTaskState) {
1735 if state.metadata.status.is_terminal() {
1736 return;
1737 }
1738 if matches!(read_exit_marker(&task.paths.exit), Ok(Some(_))) {
1739 return;
1740 }
1741 let watch_controlled = self.task_has_watch_control(&task.task_id);
1742 let updated = self.update_task_metadata(&task.paths, |metadata| {
1743 metadata.mark_terminal(
1744 BgTaskStatus::Failed,
1745 None,
1746 Some("process exited without exit marker".to_string()),
1747 );
1748 if watch_controlled {
1749 metadata.completion_delivered = true;
1750 }
1751 });
1752 if let Ok(metadata) = updated {
1753 state.pending_terminal_override = None;
1754 state.metadata = metadata;
1755 task.mark_terminal_now();
1756 state.buffer.enforce_terminal_cap();
1757 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1758 }
1759 }
1760
1761 pub(crate) fn running_tasks(&self) -> Vec<Arc<BgTask>> {
1762 self.inner
1763 .tasks
1764 .lock()
1765 .map(|tasks| {
1766 tasks
1767 .values()
1768 .filter(|task| task.is_running())
1769 .cloned()
1770 .collect()
1771 })
1772 .unwrap_or_default()
1773 }
1774
1775 fn insert_rehydrated_task(
1776 &self,
1777 metadata: PersistedTask,
1778 paths: TaskPaths,
1779 detached: bool,
1780 ) -> Result<(), String> {
1781 let task_id = metadata.task_id.clone();
1782 let session_id = metadata.session_id.clone();
1783 let started = started_instant_from_unix_millis(metadata.started_at);
1784 let suppress_replayed_running_reminder = metadata.status == BgTaskStatus::Running;
1785 let mode = metadata.mode.clone();
1786 let task = Arc::new(BgTask {
1787 task_id: task_id.clone(),
1788 session_id,
1789 paths: paths.clone(),
1790 started,
1791 last_reminder_at: Mutex::new(suppress_replayed_running_reminder.then(Instant::now)),
1792 terminal_at: Mutex::new(metadata.status.is_terminal().then(Instant::now)),
1793 state: Mutex::new(BgTaskState {
1794 metadata,
1795 runtime: if mode == BgMode::Pty {
1796 TaskRuntime::Pty(None)
1797 } else {
1798 TaskRuntime::Piped(None)
1799 },
1800 detached,
1801 child_exit_observed: false,
1808 buffer: if mode == BgMode::Pty {
1809 BgBuffer::pty(paths.pty.clone())
1810 } else {
1811 BgBuffer::new(paths.stdout.clone(), paths.stderr.clone())
1812 },
1813 pending_terminal_override: None,
1814 }),
1815 });
1816 self.inner
1817 .tasks
1818 .lock()
1819 .map_err(|_| "background task registry lock poisoned".to_string())?
1820 .insert(task_id, task);
1821 Ok(())
1822 }
1823
    /// Terminates a task and records `terminal_status` as its outcome.
    ///
    /// Returns a snapshot of the task (5 KiB preview) after the kill attempt.
    ///
    /// Behaviour by situation:
    /// - already terminal: clears any pending override and returns a snapshot;
    /// - exit marker already on disk: the task is finalized from the marker
    ///   rather than from `terminal_status`;
    /// - piped task: the process is terminated and waited on, a kill marker is
    ///   written if no exit marker exists, and the terminal state is persisted
    ///   (exit code 124 for `TimedOut`, none otherwise);
    /// - PTY task with a live runtime: the child is signalled and the PTY master
    ///   is dropped; the terminal state is not written in this function.
    ///
    /// # Errors
    /// Fails when the task is not registered for `session_id`, the state lock is
    /// poisoned, or persisting metadata / writing the kill marker fails.
    fn kill_with_status(
        &self,
        task_id: &str,
        session_id: &str,
        terminal_status: BgTaskStatus,
    ) -> Result<BgTaskSnapshot, String> {
        let task = self
            .task_for_session(task_id, session_id)
            .ok_or_else(|| format!("background task not found: {task_id}"))?;

        // The state guard lives only inside this block so that the final
        // `task.snapshot` call below runs without it.
        {
            let mut state = task
                .state
                .lock()
                .map_err(|_| "background task lock poisoned".to_string())?;
            // Already settled: nothing to kill.
            if state.metadata.status.is_terminal() {
                state.pending_terminal_override = None;
                return Ok(task.snapshot_locked(&state, 5 * 1024));
            }

            // An exit marker is already on disk: settle the task from the
            // marker instead of signalling anything.
            if let Ok(Some(marker)) = read_exit_marker(&task.paths.exit) {
                state.metadata =
                    terminal_metadata_from_marker(state.metadata.clone(), marker, None);
                // Watch-controlled tasks are flagged as delivered up front.
                if self.task_has_watch_control(&task.task_id) {
                    state.metadata.completion_delivered = true;
                }
                state.pending_terminal_override = None;
                task.mark_terminal_now();
                // Drop whichever runtime handle is still held.
                match &mut state.runtime {
                    TaskRuntime::Piped(child) => *child = None,
                    TaskRuntime::Pty(runtime) => *runtime = None,
                }
                state.detached = true;
                state.buffer.enforce_terminal_cap();
                self.persist_task(&task.paths, &state.metadata)
                    .map_err(|e| format!("failed to persist terminal state: {e}"))?;
                self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
                return Ok(task.snapshot_locked(&state, 5 * 1024));
            }

            // Persist the transitional `Killing` status once; a repeated kill
            // request skips the write.
            let was_already_killing = state.metadata.status == BgTaskStatus::Killing;
            if !was_already_killing {
                state.metadata.status = BgTaskStatus::Killing;
                self.persist_task(&task.paths, &state.metadata)
                    .map_err(|e| format!("failed to persist killing state: {e}"))?;
            }

            let pgid = state.metadata.pgid;
            #[cfg(windows)]
            let child_pid = state.metadata.child_pid;
            // For a PTY task being timed out, remember the requested status;
            // `finalize_from_marker` takes this override when it settles the task.
            // Only the first kill request records it.
            if !was_already_killing
                && state.metadata.mode == BgMode::Pty
                && terminal_status == BgTaskStatus::TimedOut
            {
                state.pending_terminal_override = Some(BgTaskStatus::TimedOut);
            }

            match &mut state.runtime {
                TaskRuntime::Piped(child_slot) => {
                    // Unix: terminate via the recorded process group id.
                    #[cfg(unix)]
                    if let Some(pgid) = pgid {
                        terminate_pgid(pgid, child_slot.as_mut());
                    }
                    // Windows: use the child handle when held, else the recorded pid.
                    #[cfg(windows)]
                    if let Some(child) = child_slot.as_mut() {
                        super::process::terminate_process(child);
                    } else if let Some(pid) = child_pid {
                        terminate_pid(pid);
                    }
                    // Blocks until the child has exited; the wait result is ignored.
                    if let Some(child) = child_slot.as_mut() {
                        let _ = child.wait();
                    }
                    *child_slot = None;
                    state.detached = true;

                    if !task.paths.exit.exists() {
                        write_kill_marker_if_absent(&task.paths.exit)
                            .map_err(|e| format!("failed to write kill marker: {e}"))?;
                    }

                    // Timeouts report exit code 124; other kills carry no exit code.
                    let exit_code = if terminal_status == BgTaskStatus::TimedOut {
                        Some(124)
                    } else {
                        None
                    };
                    state
                        .metadata
                        .mark_terminal(terminal_status, exit_code, None);
                    if self.task_has_watch_control(&task.task_id) {
                        state.metadata.completion_delivered = true;
                    }
                    state.pending_terminal_override = None;
                    task.mark_terminal_now();
                    self.persist_task(&task.paths, &state.metadata)
                        .map_err(|e| format!("failed to persist killed state: {e}"))?;
                    state.buffer.enforce_terminal_cap();
                    self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
                }
                TaskRuntime::Pty(Some(pty)) => {
                    // Flag the runtime as killed and signal the child. The task
                    // stays in `Killing` here.
                    // NOTE(review): presumably settled later by `poll_task` /
                    // `finalize_from_marker` once the exit marker appears — confirm.
                    pty.was_killed.store(true, Ordering::SeqCst);
                    if let Err(error) = pty.killer.kill() {
                        crate::slog_warn!("[pty-kill] {task_id} ChildKiller::kill failed: {error}");
                    }
                    if let Some(pid) = pty.child_pid {
                        // The child pid is passed as the process group id.
                        #[cfg(unix)]
                        terminate_pgid(pid as i32, None);
                        #[cfg(windows)]
                        terminate_pid(pid);
                    }
                    // Drop the PTY master handle.
                    drop(pty.master.take());
                }
                // No live PTY runtime is held, so there is nothing to signal.
                TaskRuntime::Pty(None) => {}
            }
        }

        Ok(task.snapshot(5 * 1024))
    }
1941
    /// Moves a task to its terminal state based on an exit marker.
    ///
    /// Returns `Ok(())` without changes when the task is already terminal.
    /// Otherwise the persisted metadata is rewritten to a terminal state, the
    /// runtime handle is dropped, the task's output is scanned once more for
    /// watch matches, and a completion is enqueued.
    ///
    /// `reason` is forwarded to the terminal-metadata helpers.
    ///
    /// # Errors
    /// Fails when the task state lock is poisoned or the metadata update cannot
    /// be persisted.
    fn finalize_from_marker(
        &self,
        task: &Arc<BgTask>,
        marker: ExitMarker,
        reason: Option<String>,
    ) -> Result<(), String> {
        // Read before taking the state lock; the result is captured by the
        // update closure below.
        let watch_controlled = self.task_has_watch_control(&task.task_id);
        {
            let mut state = task
                .state
                .lock()
                .map_err(|_| "background task lock poisoned".to_string())?;
            if state.metadata.status.is_terminal() {
                state.pending_terminal_override = None;
                return Ok(());
            }

            // Take the override recorded by `kill_with_status` (if any); it is
            // cleared from the state regardless of whether it gets used.
            let pending_override = state.pending_terminal_override.take();
            let is_pty = state.metadata.mode == BgMode::Pty;
            let updated = self
                .update_task_metadata(&task.paths, |metadata| {
                    // A killed PTY task takes its status from the pending
                    // override (default `Killed`), with exit code 124 for
                    // timeouts. Every other case derives the terminal metadata
                    // from the marker.
                    let mut new_metadata = if is_pty && marker == ExitMarker::Killed {
                        let mut metadata = metadata.clone();
                        let target_status = pending_override.unwrap_or(BgTaskStatus::Killed);
                        let exit_code = if target_status == BgTaskStatus::TimedOut {
                            Some(124)
                        } else {
                            None
                        };
                        metadata.mark_terminal(target_status, exit_code, reason);
                        metadata
                    } else {
                        terminal_metadata_from_marker(metadata.clone(), marker, reason)
                    };
                    // Watch-controlled tasks are flagged as delivered up front.
                    if watch_controlled {
                        new_metadata.completion_delivered = true;
                    }
                    *metadata = new_metadata;
                })
                .map_err(|e| format!("failed to persist terminal state: {e}"))?;
            state.metadata = updated;
            task.mark_terminal_now();
            // Drop whichever runtime handle is still held.
            match &mut state.runtime {
                TaskRuntime::Piped(child) => *child = None,
                TaskRuntime::Pty(runtime) => *runtime = None,
            }
            state.detached = true;
        }

        // `scan_task_watch_output` locks the task state itself, so it must run
        // after the guard above has been released.
        self.scan_task_watch_output(task);

        // Re-acquire the state to cap the buffer and enqueue the completion.
        let mut state = task
            .state
            .lock()
            .map_err(|_| "background task lock poisoned".to_string())?;
        state.buffer.enforce_terminal_cap();
        self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
        Ok(())
    }
2003
2004 fn enqueue_completion_if_needed(
2005 &self,
2006 metadata: &PersistedTask,
2007 paths: Option<&TaskPaths>,
2008 emit_frame: bool,
2009 ) {
2010 if metadata.status.is_terminal() && !metadata.completion_delivered {
2011 self.enqueue_completion_from_parts(metadata, None, paths, emit_frame);
2012 }
2013 }
2014
2015 fn enqueue_completion_locked(
2016 &self,
2017 metadata: &PersistedTask,
2018 buffer: Option<&BgBuffer>,
2019 emit_frame: bool,
2020 ) {
2021 self.enqueue_completion_from_parts(metadata, buffer, None, emit_frame);
2022 }
2023
2024 fn enqueue_completion_from_parts(
2025 &self,
2026 metadata: &PersistedTask,
2027 buffer: Option<&BgBuffer>,
2028 paths: Option<&TaskPaths>,
2029 emit_frame: bool,
2030 ) {
2031 if !metadata.status.is_terminal() {
2042 return;
2043 }
2044 let (raw_preview, output_truncated) = if metadata.mode == BgMode::Pty {
2049 (String::new(), false)
2050 } else {
2051 match buffer {
2052 Some(buf) => buf.read_tail(BG_COMPLETION_PREVIEW_BYTES),
2053 None => paths
2054 .map(|paths| read_tail_from_disk(metadata, paths, BG_COMPLETION_PREVIEW_BYTES))
2055 .unwrap_or_else(|| (String::new(), false)),
2056 }
2057 };
2058 let output_preview = if metadata.compressed {
2063 self.compress_output(&metadata.command, raw_preview)
2064 } else {
2065 raw_preview
2066 };
2067 let token_counts = self.completion_token_counts(metadata, buffer, paths);
2068 let completion = BgCompletion {
2069 task_id: metadata.task_id.clone(),
2070 session_id: metadata.session_id.clone(),
2071 status: metadata.status.clone(),
2072 exit_code: metadata.exit_code,
2073 command: metadata.command.clone(),
2074 output_preview,
2075 output_truncated,
2076 original_tokens: token_counts.original_tokens,
2077 compressed_tokens: token_counts.compressed_tokens,
2078 tokens_skipped: token_counts.tokens_skipped,
2079 };
2080
2081 self.record_compression_event_if_applicable(metadata, &token_counts);
2092
2093 let (watch_controlled, watch_matched) = self.task_watch_state(&metadata.task_id);
2094 if watch_controlled {
2095 if emit_frame && !watch_matched {
2096 self.emit_bash_watch_exit(&completion);
2097 }
2098 self.clear_task_watch_state(&metadata.task_id);
2099 return;
2100 }
2101
2102 if metadata.completion_delivered {
2112 return;
2113 }
2114
2115 let pushed = if let Ok(mut completions) = self.inner.completions.lock() {
2118 if completions
2119 .iter()
2120 .any(|existing| existing.task_id == metadata.task_id)
2121 {
2122 false
2123 } else {
2124 completions.push_back(completion.clone());
2125 true
2126 }
2127 } else {
2128 false
2129 };
2130
2131 if pushed && emit_frame {
2132 self.emit_bash_completed(completion);
2133 }
2134 }
2135
2136 fn record_compression_event_if_applicable(
2137 &self,
2138 metadata: &PersistedTask,
2139 token_counts: &CompletionTokenCounts,
2140 ) {
2141 if metadata.mode == BgMode::Pty {
2142 return;
2143 }
2144
2145 let (original_tokens, compressed_tokens, original_bytes, compressed_bytes) = match (
2146 token_counts.original_tokens,
2147 token_counts.compressed_tokens,
2148 token_counts.original_bytes,
2149 token_counts.compressed_bytes,
2150 ) {
2151 (
2152 Some(original_tokens),
2153 Some(compressed_tokens),
2154 Some(original_bytes),
2155 Some(compressed_bytes),
2156 ) => (
2157 original_tokens,
2158 compressed_tokens,
2159 original_bytes,
2160 compressed_bytes,
2161 ),
2162 _ => {
2163 crate::slog_warn!(
2164 "compression event skipped for {}: token counts unavailable (likely spill file missing or unreadable)",
2165 metadata.task_id
2166 );
2167 return;
2168 }
2169 };
2170
2171 let pool = self.inner.db_pool.read().ok().and_then(|slot| slot.clone());
2172 let Some(pool) = pool else {
2173 crate::slog_warn!(
2174 "compression event skipped for {}: db_pool not initialized — was configure run?",
2175 metadata.task_id
2176 );
2177 return;
2178 };
2179 let harness = self
2180 .inner
2181 .db_harness
2182 .read()
2183 .ok()
2184 .and_then(|slot| slot.clone());
2185 let Some(harness) = harness else {
2186 crate::slog_warn!(
2187 "compression event insert skipped for {}: harness not configured",
2188 metadata.task_id
2189 );
2190 return;
2191 };
2192
2193 let project_root = metadata
2194 .project_root
2195 .as_deref()
2196 .unwrap_or(&metadata.workdir);
2197 let project_key = crate::search_index::project_cache_key(project_root);
2198 let row = crate::db::compression_events::CompressionEventRow {
2199 harness: &harness,
2200 session_id: Some(&metadata.session_id),
2201 project_key: &project_key,
2202 tool: "bash",
2203 task_id: Some(&metadata.task_id),
2204 command: Some(&metadata.command),
2205 compressor: if metadata.compressed {
2206 "registry"
2207 } else {
2208 "none"
2209 },
2210 original_bytes,
2211 compressed_bytes,
2212 original_tokens,
2213 compressed_tokens,
2214 created_at: unix_millis() as i64,
2215 };
2216
2217 let conn = match pool.lock() {
2218 Ok(conn) => conn,
2219 Err(_) => {
2220 crate::slog_warn!(
2221 "compression event insert failed for {}: db mutex poisoned",
2222 metadata.task_id
2223 );
2224 return;
2225 }
2226 };
2227 match crate::db::compression_events::insert_compression_event(&conn, &row) {
2228 Ok(_) => {
2229 crate::slog_debug!(
2233 "compression event recorded for {} (project={}, session={}, {} → {} tokens)",
2234 metadata.task_id,
2235 project_key,
2236 metadata.session_id,
2237 original_tokens,
2238 compressed_tokens
2239 );
2240 }
2241 Err(error) => {
2242 crate::slog_warn!(
2243 "compression event insert failed for {}: {}",
2244 metadata.task_id,
2245 error
2246 );
2247 }
2248 }
2249 }
2250
2251 fn emit_bash_pattern_match(&self, session_id: &str, pattern_match: PatternMatch) {
2252 let Ok(progress_sender) = self
2253 .inner
2254 .progress_sender
2255 .lock()
2256 .map(|sender| sender.clone())
2257 else {
2258 return;
2259 };
2260 if let Some(sender) = progress_sender.as_ref() {
2261 sender(PushFrame::BashPatternMatch(BashPatternMatchFrame::new(
2262 pattern_match.task_id,
2263 session_id.to_string(),
2264 pattern_match.watch_id,
2265 pattern_match.match_text,
2266 pattern_match.match_offset,
2267 pattern_match.context,
2268 pattern_match.once,
2269 )));
2270 }
2271 }
2272
2273 fn emit_bash_watch_exit(&self, completion: &BgCompletion) {
2274 let Ok(progress_sender) = self
2275 .inner
2276 .progress_sender
2277 .lock()
2278 .map(|sender| sender.clone())
2279 else {
2280 return;
2281 };
2282 let Some(sender) = progress_sender.as_ref() else {
2283 return;
2284 };
2285 let status = completion_status_text(&completion.status, completion.exit_code);
2286 let preview = completion.output_preview.trim_end();
2287 let context = if preview.is_empty() {
2288 format!("task {} exited ({status})", completion.task_id)
2289 } else {
2290 format!(
2291 "task {} exited ({status})
2292{preview}",
2293 completion.task_id
2294 )
2295 };
2296 sender(PushFrame::BashPatternMatch(
2297 BashPatternMatchFrame::task_exit(
2298 completion.task_id.clone(),
2299 completion.session_id.clone(),
2300 format!("exited ({status})"),
2301 context,
2302 ),
2303 ));
2304 }
2305
2306 fn emit_bash_completed(&self, completion: BgCompletion) {
2307 let Ok(progress_sender) = self
2308 .inner
2309 .progress_sender
2310 .lock()
2311 .map(|sender| sender.clone())
2312 else {
2313 return;
2314 };
2315 let Some(sender) = progress_sender.as_ref() else {
2316 return;
2317 };
2318 sender(PushFrame::BashCompleted(BashCompletedFrame::new(
2326 completion.task_id,
2327 completion.session_id,
2328 completion.status,
2329 completion.exit_code,
2330 completion.command,
2331 completion.output_preview,
2332 completion.output_truncated,
2333 completion.original_tokens,
2334 completion.compressed_tokens,
2335 completion.tokens_skipped,
2336 )));
2337 }
2338
2339 fn completion_token_counts(
2340 &self,
2341 metadata: &PersistedTask,
2342 buffer: Option<&BgBuffer>,
2343 paths: Option<&TaskPaths>,
2344 ) -> CompletionTokenCounts {
2345 if metadata.mode == BgMode::Pty {
2346 return CompletionTokenCounts::skipped();
2347 }
2348
2349 let raw = match buffer {
2350 Some(buffer) => buffer.read_for_token_count(TOKENIZE_CAP_BYTES_PER_STREAM),
2351 None => paths
2352 .map(|paths| {
2353 read_for_token_count_from_disk(metadata, paths, TOKENIZE_CAP_BYTES_PER_STREAM)
2354 })
2355 .unwrap_or(TokenCountInput::Skipped),
2356 };
2357
2358 let TokenCountInput::Text(raw_output) = raw else {
2359 return CompletionTokenCounts::skipped();
2360 };
2361
2362 let original_tokens = token_count_u32(&raw_output);
2363 let original_bytes = raw_output.len() as i64;
2364 let compressed_output = if metadata.compressed {
2365 self.compress_output(&metadata.command, raw_output)
2366 } else {
2367 raw_output
2368 };
2369 let compressed_tokens = token_count_u32(&compressed_output);
2370 let compressed_bytes = compressed_output.len() as i64;
2371 CompletionTokenCounts {
2372 original_tokens: Some(original_tokens),
2373 compressed_tokens: Some(compressed_tokens),
2374 original_bytes: Some(original_bytes),
2375 compressed_bytes: Some(compressed_bytes),
2376 tokens_skipped: false,
2377 }
2378 }
2379
2380 pub(crate) fn maybe_emit_long_running_reminder(&self, task: &Arc<BgTask>) {
2381 if !self
2382 .inner
2383 .long_running_reminder_enabled
2384 .load(Ordering::SeqCst)
2385 {
2386 return;
2387 }
2388 let interval_ms = self
2389 .inner
2390 .long_running_reminder_interval_ms
2391 .load(Ordering::SeqCst);
2392 if interval_ms == 0 {
2393 return;
2394 }
2395 let interval = Duration::from_millis(interval_ms);
2396 let now = Instant::now();
2397 let Ok(mut last_reminder_at) = task.last_reminder_at.lock() else {
2398 return;
2399 };
2400 let since = last_reminder_at.unwrap_or(task.started);
2401 if now.duration_since(since) < interval {
2402 return;
2403 }
2404 let command = task
2405 .state
2406 .lock()
2407 .map(|state| state.metadata.command.clone())
2408 .unwrap_or_default();
2409 *last_reminder_at = Some(now);
2410 self.emit_bash_long_running(BashLongRunningFrame::new(
2411 task.task_id.clone(),
2412 task.session_id.clone(),
2413 command,
2414 task.started.elapsed().as_millis() as u64,
2415 ));
2416 }
2417
2418 fn emit_bash_long_running(&self, frame: BashLongRunningFrame) {
2419 let Ok(progress_sender) = self
2420 .inner
2421 .progress_sender
2422 .lock()
2423 .map(|sender| sender.clone())
2424 else {
2425 return;
2426 };
2427 if let Some(sender) = progress_sender.as_ref() {
2428 sender(PushFrame::BashLongRunning(frame));
2429 }
2430 }
2431
2432 fn task(&self, task_id: &str) -> Option<Arc<BgTask>> {
2433 self.inner
2434 .tasks
2435 .lock()
2436 .ok()
2437 .and_then(|tasks| tasks.get(task_id).cloned())
2438 }
2439
2440 fn task_for_session(&self, task_id: &str, session_id: &str) -> Option<Arc<BgTask>> {
2441 self.task(task_id)
2442 .filter(|task| task.session_id == session_id)
2443 }
2444
2445 fn running_count(&self) -> usize {
2446 self.inner
2447 .tasks
2448 .lock()
2449 .map(|tasks| tasks.values().filter(|task| task.is_running()).count())
2450 .unwrap_or(0)
2451 }
2452
2453 fn start_watchdog(&self) {
2454 if !self.inner.watchdog_started.swap(true, Ordering::SeqCst) {
2455 super::watchdog::start(self.clone());
2456 }
2457 }
2458
2459 fn running_metadata_is_stale(&self, metadata: &PersistedTask) -> bool {
2460 unix_millis().saturating_sub(metadata.started_at) > STALE_RUNNING_AFTER.as_millis() as u64
2461 }
2462
2463 #[cfg(test)]
2464 pub fn task_json_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
2465 self.task_for_session(task_id, session_id)
2466 .map(|task| task.paths.json.clone())
2467 }
2468
2469 #[cfg(test)]
2470 pub fn task_exit_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
2471 self.task_for_session(task_id, session_id)
2472 .map(|task| task.paths.exit.clone())
2473 }
2474
2475 fn generate_unique_task_id(&self) -> Result<String, String> {
2477 for _ in 0..32 {
2478 let candidate = random_slug();
2479 let tasks = self
2480 .inner
2481 .tasks
2482 .lock()
2483 .map_err(|_| "background task registry lock poisoned".to_string())?;
2484 if tasks.contains_key(&candidate) {
2485 continue;
2486 }
2487 let completions = self
2488 .inner
2489 .completions
2490 .lock()
2491 .map_err(|_| "background completions lock poisoned".to_string())?;
2492 if completions
2493 .iter()
2494 .any(|completion| completion.task_id == candidate)
2495 {
2496 continue;
2497 }
2498 return Ok(candidate);
2499 }
2500 Err("failed to allocate unique background task id after 32 attempts".to_string())
2501 }
2502}
2503
/// Token and byte measurements of a task's output before and after
/// compression.
struct CompletionTokenCounts {
    /// Token count of the raw output; `None` when counting was skipped.
    original_tokens: Option<u32>,
    /// Token count after compression; `None` when counting was skipped.
    compressed_tokens: Option<u32>,
    /// Byte length of the raw output; `None` when counting was skipped.
    original_bytes: Option<i64>,
    /// Byte length after compression; `None` when counting was skipped.
    compressed_bytes: Option<i64>,
    /// `true` when no counts were computed.
    tokens_skipped: bool,
}

impl CompletionTokenCounts {
    /// Builds the "nothing was counted" value: every measurement is `None`
    /// and `tokens_skipped` is set.
    fn skipped() -> Self {
        let (original_tokens, compressed_tokens) = (None, None);
        let (original_bytes, compressed_bytes) = (None, None);
        Self {
            original_tokens,
            compressed_tokens,
            original_bytes,
            compressed_bytes,
            tokens_skipped: true,
        }
    }
}
2523
2524fn completion_status_text(status: &BgTaskStatus, exit_code: Option<i32>) -> String {
2525 match status {
2526 BgTaskStatus::TimedOut => "timed out".to_string(),
2527 BgTaskStatus::Killed => "killed".to_string(),
2528 _ => exit_code
2529 .map(|code| format!("exit {code}"))
2530 .unwrap_or_else(|| format!("{status:?}").to_lowercase()),
2531 }
2532}
2533
2534fn token_count_u32(text: &str) -> u32 {
2535 aft_tokenizer::count_tokens(text)
2536 .try_into()
2537 .unwrap_or(u32::MAX)
2538}
2539
2540impl Default for BgTaskRegistry {
2541 fn default() -> Self {
2542 Self::new(Arc::new(Mutex::new(None)))
2543 }
2544}
2545
/// Reports whether `path` was modified less than `grace` ago.
///
/// Returns `false` when the metadata or modification time cannot be read, or
/// when the modification time lies after the current system time.
fn modified_within(path: &Path, grace: Duration) -> bool {
    let Ok(modified) = fs::metadata(path).and_then(|meta| meta.modified()) else {
        return false;
    };
    match SystemTime::now().duration_since(modified) {
        Ok(age) => age < grace,
        Err(_) => false,
    }
}
2554
/// Canonicalizes `path`, falling back to an owned copy of the input when
/// canonicalization fails.
fn canonicalized_path(path: &Path) -> PathBuf {
    match fs::canonicalize(path) {
        Ok(resolved) => resolved,
        Err(_) => path.to_path_buf(),
    }
}
2558
/// Converts a wall-clock start time (milliseconds since the Unix epoch) into
/// an `Instant` lying the same distance in the past.
///
/// A start time at or after the current wall clock yields roughly
/// `Instant::now()`. The same fallback applies when the system clock reads
/// before the epoch or when the subtraction cannot be represented.
fn started_instant_from_unix_millis(started_at: u64) -> Instant {
    let now_ms = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => started_at,
    };
    let age = Duration::from_millis(now_ms.saturating_sub(started_at));
    Instant::now()
        .checked_sub(age)
        .unwrap_or_else(Instant::now)
}
2570
2571fn gc_quarantine(storage_dir: &Path) {
2572 let quarantine_root = storage_dir.join("bash-tasks-quarantine");
2573 let Ok(session_dirs) = fs::read_dir(&quarantine_root) else {
2574 return;
2575 };
2576 for session_entry in session_dirs.flatten() {
2577 let session_quarantine_dir = session_entry.path();
2578 if !session_quarantine_dir.is_dir() {
2579 continue;
2580 }
2581 let entries = match fs::read_dir(&session_quarantine_dir) {
2582 Ok(entries) => entries,
2583 Err(error) => {
2584 crate::slog_warn!(
2585 "failed to read background task quarantine dir {}: {error}",
2586 session_quarantine_dir.display()
2587 );
2588 continue;
2589 }
2590 };
2591 for entry in entries.flatten() {
2592 let path = entry.path();
2593 if modified_within(&path, QUARANTINE_GC_GRACE) {
2594 continue;
2595 }
2596 let result = if path.is_dir() {
2597 fs::remove_dir_all(&path)
2598 } else {
2599 fs::remove_file(&path)
2600 };
2601 match result {
2602 Ok(()) => log::debug!(
2603 "deleted old background task quarantine entry {}",
2604 path.display()
2605 ),
2606 Err(error) => crate::slog_warn!(
2607 "failed to delete old background task quarantine entry {}: {error}",
2608 path.display()
2609 ),
2610 }
2611 }
2612 let _ = fs::remove_dir(&session_quarantine_dir);
2613 }
2614 let _ = fs::remove_dir(&quarantine_root);
2615}
2616
/// Selects the naming scheme `quarantine_name` applies to a quarantined file.
enum QuarantineKind {
    /// File is renamed to `<file_name>.corrupt-<unix_ts>`.
    Corrupt,
    /// File is renamed to `<stem>.invalid.<unix_ts>.<extension>`, or to
    /// `<file_name>.invalid.<unix_ts>` when it has no stem/extension pair.
    Invalid,
}
2621
2622fn quarantine_task_json(
2623 storage_dir: &Path,
2624 session_dir: &Path,
2625 json_path: &Path,
2626 kind: QuarantineKind,
2627) -> Result<(), String> {
2628 let session_hash = session_dir
2629 .file_name()
2630 .and_then(|name| name.to_str())
2631 .ok_or_else(|| {
2632 format!(
2633 "invalid background task session dir: {}",
2634 session_dir.display()
2635 )
2636 })?;
2637 let task_name = json_path
2638 .file_name()
2639 .and_then(|name| name.to_str())
2640 .ok_or_else(|| format!("invalid background task json path: {}", json_path.display()))?;
2641 let unix_ts = SystemTime::now()
2642 .duration_since(UNIX_EPOCH)
2643 .map(|duration| duration.as_secs())
2644 .unwrap_or(0);
2645 let quarantine_dir = storage_dir.join("bash-tasks-quarantine").join(session_hash);
2646 fs::create_dir_all(&quarantine_dir).map_err(|e| {
2647 format!(
2648 "failed to create background task quarantine dir {}: {e}",
2649 quarantine_dir.display()
2650 )
2651 })?;
2652 let target_name = quarantine_name(task_name, unix_ts, &kind);
2653 let target = quarantine_dir.join(target_name);
2654 fs::rename(json_path, &target).map_err(|e| {
2655 format!(
2656 "failed to quarantine background task metadata {} to {}: {e}",
2657 json_path.display(),
2658 target.display()
2659 )
2660 })?;
2661
2662 for sibling in task_sibling_paths(json_path) {
2663 if !sibling.exists() {
2664 continue;
2665 }
2666 let Some(sibling_name) = sibling.file_name().and_then(|name| name.to_str()) else {
2667 crate::slog_warn!(
2668 "skipping background task sibling with invalid name during quarantine: {}",
2669 sibling.display()
2670 );
2671 continue;
2672 };
2673 let sibling_target = quarantine_dir.join(quarantine_name(sibling_name, unix_ts, &kind));
2674 if let Err(error) = fs::rename(&sibling, &sibling_target) {
2675 crate::slog_warn!(
2676 "failed to quarantine background task sibling {} to {}: {error}",
2677 sibling.display(),
2678 sibling_target.display()
2679 );
2680 }
2681 }
2682
2683 let _ = fs::remove_dir(session_dir);
2684 Ok(())
2685}
2686
2687fn quarantine_name(file_name: &str, unix_ts: u64, kind: &QuarantineKind) -> String {
2688 match kind {
2689 QuarantineKind::Corrupt => format!("{file_name}.corrupt-{unix_ts}"),
2690 QuarantineKind::Invalid => {
2691 let path = Path::new(file_name);
2692 let stem = path.file_stem().and_then(|stem| stem.to_str());
2693 let extension = path.extension().and_then(|extension| extension.to_str());
2694 match (stem, extension) {
2695 (Some(stem), Some(extension)) => format!("{stem}.invalid.{unix_ts}.{extension}"),
2696 _ => format!("{file_name}.invalid.{unix_ts}"),
2697 }
2698 }
2699 }
2700}
2701
/// Lists the paths that share `json_path`'s directory and stem but carry one
/// of the extensions `stdout`, `stderr`, `exit`, `pty`, `ps1`, `bat`, `sh`.
///
/// Returns an empty list when the path has no parent or no UTF-8 file stem.
/// The listed paths are not checked for existence.
fn task_sibling_paths(json_path: &Path) -> Vec<PathBuf> {
    const SIBLING_EXTENSIONS: [&str; 7] = ["stdout", "stderr", "exit", "pty", "ps1", "bat", "sh"];

    let (Some(parent), Some(stem)) = (
        json_path.parent(),
        json_path.file_stem().and_then(|stem| stem.to_str()),
    ) else {
        return Vec::new();
    };

    let mut siblings = Vec::with_capacity(SIBLING_EXTENSIONS.len());
    for extension in SIBLING_EXTENSIONS {
        siblings.push(parent.join(format!("{stem}.{extension}")));
    }
    siblings
}
2714
2715fn read_tail_from_disk(
2716 metadata: &PersistedTask,
2717 paths: &TaskPaths,
2718 max_bytes: usize,
2719) -> (String, bool) {
2720 if metadata.mode == BgMode::Pty {
2721 return read_file_tail_capped(&paths.pty, max_bytes)
2722 .map(|bytes| {
2723 let truncated = fs::metadata(&paths.pty)
2724 .map(|metadata| metadata.len() > max_bytes as u64)
2725 .unwrap_or(false);
2726 (String::from_utf8_lossy(&bytes).into_owned(), truncated)
2727 })
2728 .unwrap_or_else(|_| (String::new(), false));
2729 }
2730 let stdout = fs::read(&paths.stdout).unwrap_or_default();
2731 let stderr = fs::read(&paths.stderr).unwrap_or_default();
2732 let mut bytes = Vec::with_capacity(stdout.len().saturating_add(stderr.len()));
2733 bytes.extend_from_slice(&stdout);
2734 bytes.extend_from_slice(&stderr);
2735 if bytes.len() <= max_bytes {
2736 return (String::from_utf8_lossy(&bytes).into_owned(), false);
2737 }
2738 let start = bytes.len().saturating_sub(max_bytes);
2739 (String::from_utf8_lossy(&bytes[start..]).into_owned(), true)
2740}
2741
2742fn read_for_token_count_from_disk(
2743 metadata: &PersistedTask,
2744 paths: &TaskPaths,
2745 max_bytes_per_stream: usize,
2746) -> TokenCountInput {
2747 if metadata.mode == BgMode::Pty {
2748 return TokenCountInput::Skipped;
2749 }
2750 let stdout = read_file_tail_capped(&paths.stdout, max_bytes_per_stream);
2757 let stderr = read_file_tail_capped(&paths.stderr, max_bytes_per_stream);
2758 match (stdout, stderr) {
2759 (Ok(stdout), Ok(stderr)) => TokenCountInput::Text(combine_streams(
2760 String::from_utf8_lossy(&stdout).as_ref(),
2761 String::from_utf8_lossy(&stderr).as_ref(),
2762 )),
2763 (Ok(stdout), Err(_)) => TokenCountInput::Text(combine_streams(
2764 String::from_utf8_lossy(&stdout).as_ref(),
2765 "",
2766 )),
2767 (Err(_), Ok(stderr)) => TokenCountInput::Text(combine_streams(
2768 "",
2769 String::from_utf8_lossy(&stderr).as_ref(),
2770 )),
2771 (Err(_), Err(_)) => TokenCountInput::Skipped,
2772 }
2773}
2774
/// Reads at most the last `max_bytes` bytes of the file at `path`.
///
/// Files no longer than `max_bytes` are returned whole. The cap is enforced
/// on the read itself, so `max_bytes == 0` yields an empty buffer and a file
/// that grows after its length was sampled cannot push the result past the
/// cap.
///
/// # Errors
///
/// Propagates any I/O error from opening, inspecting, seeking, or reading
/// the file.
fn read_file_tail_capped(path: &Path, max_bytes: usize) -> std::io::Result<Vec<u8>> {
    use std::io::{Read, Seek, SeekFrom};

    let mut file = std::fs::File::open(path)?;
    let len = file.metadata()?.len();
    let read_len = len.min(max_bytes as u64);
    if len > read_len {
        // Absolute offset of the tail; avoids a signed cast for `SeekFrom::End`.
        file.seek(SeekFrom::Start(len - read_len))?;
    }

    // `read_len <= max_bytes`, so it always fits in `usize`.
    let mut bytes = Vec::with_capacity(usize::try_from(read_len).unwrap_or(max_bytes));
    file.take(read_len).read_to_end(&mut bytes)?;
    Ok(bytes)
}
2791
2792impl BgTask {
2793 fn snapshot(&self, preview_bytes: usize) -> BgTaskSnapshot {
2794 let state = self
2795 .state
2796 .lock()
2797 .unwrap_or_else(|poison| poison.into_inner());
2798 self.snapshot_locked(&state, preview_bytes)
2799 }
2800
2801 fn snapshot_locked(&self, state: &BgTaskState, preview_bytes: usize) -> BgTaskSnapshot {
2802 let metadata = &state.metadata;
2803 let duration_ms = metadata.duration_ms.or_else(|| {
2804 metadata
2805 .status
2806 .is_terminal()
2807 .then(|| self.started.elapsed().as_millis() as u64)
2808 });
2809 let (output_preview, output_truncated) = if metadata.mode == BgMode::Pty {
2810 (String::new(), false)
2811 } else {
2812 state.buffer.read_tail(preview_bytes)
2813 };
2814 BgTaskSnapshot {
2815 info: BgTaskInfo {
2816 task_id: self.task_id.clone(),
2817 status: metadata.status.clone(),
2818 command: metadata.command.clone(),
2819 mode: metadata.mode.clone(),
2820 started_at: metadata.started_at,
2821 duration_ms,
2822 },
2823 exit_code: metadata.exit_code,
2824 child_pid: metadata.child_pid,
2825 workdir: metadata.workdir.display().to_string(),
2826 output_preview,
2827 output_truncated,
2828 output_path: state
2829 .buffer
2830 .output_path()
2831 .map(|path| path.display().to_string()),
2832 stderr_path: state
2833 .buffer
2834 .stderr_path()
2835 .map(|path| path.display().to_string()),
2836 pty_rows: (metadata.mode == BgMode::Pty).then_some(metadata.pty_rows.unwrap_or(24)),
2837 pty_cols: (metadata.mode == BgMode::Pty).then_some(metadata.pty_cols.unwrap_or(80)),
2838 }
2839 }
2840
2841 pub(crate) fn is_running(&self) -> bool {
2842 self.state
2843 .lock()
2844 .map(|state| {
2845 state.metadata.status == BgTaskStatus::Running
2846 || (state.metadata.mode == BgMode::Pty
2847 && state.metadata.status == BgTaskStatus::Killing)
2848 })
2849 .unwrap_or(false)
2850 }
2851
2852 fn is_terminal(&self) -> bool {
2853 self.state
2854 .lock()
2855 .map(|state| state.metadata.status.is_terminal())
2856 .unwrap_or(false)
2857 }
2858
2859 fn mark_terminal_now(&self) {
2860 if let Ok(mut terminal_at) = self.terminal_at.lock() {
2861 if terminal_at.is_none() {
2862 *terminal_at = Some(Instant::now());
2863 }
2864 }
2865 }
2866
2867 fn set_completion_delivered(
2868 &self,
2869 delivered: bool,
2870 registry: &BgTaskRegistry,
2871 ) -> Result<(), String> {
2872 let mut state = self
2873 .state
2874 .lock()
2875 .map_err(|_| "background task lock poisoned".to_string())?;
2876 let updated = registry
2877 .update_task_metadata(&self.paths, |metadata| {
2878 metadata.completion_delivered = delivered;
2879 })
2880 .map_err(|e| format!("failed to update completion delivery: {e}"))?;
2881 state.metadata = updated;
2882 Ok(())
2883 }
2884}
2885
2886fn terminal_metadata_from_marker(
2887 mut metadata: PersistedTask,
2888 marker: ExitMarker,
2889 reason: Option<String>,
2890) -> PersistedTask {
2891 match marker {
2892 ExitMarker::Code(code) => {
2893 let status = if code == 0 {
2894 BgTaskStatus::Completed
2895 } else {
2896 BgTaskStatus::Failed
2897 };
2898 metadata.mark_terminal(status, Some(code), reason);
2899 }
2900 ExitMarker::Killed => metadata.mark_terminal(BgTaskStatus::Killed, None, reason),
2901 }
2902 metadata
2903}
2904
2905#[cfg(unix)]
2906fn detached_shell_command(command: &str, exit_path: &Path) -> Command {
2907 let shell = resolve_posix_shell();
2908 let mut cmd = Command::new(&shell);
2909 cmd.arg("-c")
2910 .arg("\"$0\" -c \"$1\"; code=$?; printf \"%s\" \"$code\" > \"$2.tmp.$$\"; mv -f \"$2.tmp.$$\" \"$2\"")
2911 .arg(&shell)
2912 .arg(command)
2913 .arg(exit_path);
2914 unsafe {
2915 cmd.pre_exec(|| {
2916 if libc::setsid() == -1 {
2917 return Err(std::io::Error::last_os_error());
2918 }
2919 Ok(())
2920 });
2921 }
2922 cmd
2923}
2924
2925#[cfg(unix)]
2926fn resolve_posix_shell() -> PathBuf {
2927 static POSIX_SHELL: OnceLock<PathBuf> = OnceLock::new();
2928 POSIX_SHELL
2929 .get_or_init(|| {
2930 std::env::var_os("BASH")
2931 .filter(|value| !value.is_empty())
2932 .map(PathBuf::from)
2933 .filter(|path| path.exists())
2934 .or_else(|| which::which("bash").ok())
2935 .or_else(|| which::which("zsh").ok())
2936 .unwrap_or_else(|| PathBuf::from("/bin/sh"))
2937 })
2938 .clone()
2939}
2940
/// Writes a wrapper script for `command` beside the task's files and builds
/// the Windows command that runs it under `shell`.
///
/// The script body comes from `shell.wrapper_script`. Its file name is the
/// stem of `paths.json` (or `wrapper`) plus a shell-specific extension:
/// `ps1`, `bat` or `sh`. `creation_flags` is applied to the command as-is.
///
/// # Errors
///
/// Returns an error string when the wrapper script cannot be written.
#[cfg(windows)]
fn detached_shell_command_for(
    shell: crate::windows_shell::WindowsShell,
    command: &str,
    exit_path: &Path,
    paths: &TaskPaths,
    creation_flags: u32,
) -> Result<Command, String> {
    use crate::windows_shell::WindowsShell;

    let wrapper_body = shell.wrapper_script(command, exit_path);
    let wrapper_ext = match shell {
        WindowsShell::Pwsh | WindowsShell::Powershell => "ps1",
        WindowsShell::Cmd => "bat",
        WindowsShell::Posix(_) => "sh",
    };
    let wrapper_stem = paths
        .json
        .file_stem()
        .and_then(|s| s.to_str())
        .unwrap_or("wrapper");
    let wrapper_path = paths.dir.join(format!("{}.{}", wrapper_stem, wrapper_ext));
    if let Err(e) = fs::write(&wrapper_path, wrapper_body) {
        return Err(format!(
            "failed to write background bash wrapper script: {e}"
        ));
    }

    // Arguments each shell needs ahead of the script path.
    let leading_args: &[&str] = match shell {
        WindowsShell::Pwsh | WindowsShell::Powershell => &[
            "-NoLogo",
            "-NoProfile",
            "-NonInteractive",
            "-ExecutionPolicy",
            "Bypass",
            "-File",
        ],
        WindowsShell::Cmd => &["/D", "/C"],
        WindowsShell::Posix(_) => &[],
    };

    let mut cmd = Command::new(shell.binary().as_ref());
    cmd.args(leading_args);
    cmd.arg(&wrapper_path);
    cmd.creation_flags(creation_flags);
    Ok(cmd)
}
3023
/// Spawns `command` as a detached background process whose stdout and stderr
/// are captured to the files named in `paths`.
///
/// The child runs in `workdir` with `env` added to its environment and with
/// stdin connected to the null device.
///
/// On non-Windows targets the command comes from `detached_shell_command`.
/// On Windows each candidate from `shell_candidates()` is tried in order,
/// first with `CREATE_BREAKAWAY_FROM_JOB` and then without it.
///
/// # Errors
///
/// Returns an error string when a capture file cannot be created, the
/// wrapper command cannot be built, the spawn fails with an error that is
/// not retried, or (Windows) every candidate shell has been exhausted.
fn spawn_detached_child(
    command: &str,
    paths: &TaskPaths,
    workdir: &Path,
    env: &HashMap<String, String>,
) -> Result<std::process::Child, String> {
    #[cfg(not(windows))]
    {
        let stdout = create_capture_file(&paths.stdout)
            .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
        let stderr = create_capture_file(&paths.stderr)
            .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
        detached_shell_command(command, &paths.exit)
            .current_dir(workdir)
            .envs(env)
            .stdin(Stdio::null())
            .stdout(Stdio::from(stdout))
            .stderr(Stdio::from(stderr))
            .spawn()
            .map_err(|e| format!("failed to spawn background bash command: {e}"))
    }
    #[cfg(windows)]
    {
        use crate::windows_shell::shell_candidates;
        let candidates: Vec<crate::windows_shell::WindowsShell> = shell_candidates();
        // Process-creation flag bits passed to `creation_flags`.
        const FLAG_CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
        const FLAG_CREATE_BREAKAWAY_FROM_JOB: u32 = 0x0100_0000;
        const FLAG_CREATE_NO_WINDOW: u32 = 0x0800_0000;
        let with_breakaway =
            FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP | FLAG_CREATE_BREAKAWAY_FROM_JOB;
        let without_breakaway = FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP;
        let mut last_error: Option<String> = None;
        for (idx, shell) in candidates.iter().enumerate() {
            // Per shell: try with breakaway first, then without it.
            for &flags in &[with_breakaway, without_breakaway] {
                // Capture files are created anew for every attempt because each
                // handle is moved into the command's `Stdio`.
                let stdout = create_capture_file(&paths.stdout)
                    .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
                let stderr = create_capture_file(&paths.stderr)
                    .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
                let mut cmd =
                    detached_shell_command_for(shell.clone(), command, &paths.exit, paths, flags)?;
                cmd.current_dir(workdir)
                    .envs(env)
                    .stdin(Stdio::null())
                    .stdout(Stdio::from(stdout))
                    .stderr(Stdio::from(stderr));
                match cmd.spawn() {
                    Ok(child) => {
                        // Success; warn if a fallback was needed to get here.
                        if idx > 0 {
                            crate::slog_warn!("background bash spawn fell back to {} after {} earlier candidate(s) failed; \
                                the cached PATH probe disagreed with runtime spawn — likely PATH \
                                inheritance, antivirus / AppLocker / Defender ASR, or sandbox policy.",
                                shell.binary(),
                                idx);
                        }
                        if flags == without_breakaway {
                            crate::slog_warn!(
                                "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected \
                                (likely a restrictive Job Object — CI sandbox or MDM policy). \
                                Spawned without breakaway; the bg task will be torn down if the \
                                AFT process group is killed."
                            );
                        }
                        return Ok(child);
                    }
                    // The shell binary is missing at runtime: stop trying flag
                    // variants for it and move on to the next candidate.
                    Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                        crate::slog_warn!("background bash spawn: {} returned NotFound at runtime — trying next candidate",
                            shell.binary());
                        last_error = Some(format!("{}: {e}", shell.binary()));
                        break;
                    }
                    // OS error 5 on the breakaway attempt: retry the same
                    // shell without the breakaway flag.
                    Err(e) if flags == with_breakaway && e.raw_os_error() == Some(5) => {
                        crate::slog_warn!(
                            "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected with \
                            Access Denied — retrying {} without breakaway",
                            shell.binary()
                        );
                        last_error = Some(format!("{}: {e}", shell.binary()));
                        continue;
                    }
                    // Any other failure aborts immediately; no further
                    // candidates are tried.
                    Err(e) => {
                        return Err(format!(
                            "failed to spawn background bash command via {}: {e}",
                            shell.binary()
                        ));
                    }
                }
            }
        }
        Err(format!(
            "failed to spawn background bash command: no Windows shell could be spawned. \
            Last error: {}. PATH-probed candidates: {:?}",
            last_error.unwrap_or_else(|| "no candidates were attempted".to_string()),
            candidates.iter().map(|s| s.binary()).collect::<Vec<_>>()
        ))
    }
}
3172
3173fn random_slug() -> String {
3174 let mut bytes = [0u8; 4];
3175 getrandom::fill(&mut bytes).unwrap_or_else(|_| {
3177 let t = SystemTime::now()
3179 .duration_since(UNIX_EPOCH)
3180 .map(|d| d.subsec_nanos())
3181 .unwrap_or(0);
3182 let p = std::process::id();
3183 bytes.copy_from_slice(&(t ^ p).to_le_bytes());
3184 });
3185 let hex: String = bytes.iter().map(|b| format!("{b:02x}")).collect();
3187 format!("bash-{hex}")
3188}
3189
3190#[cfg(test)]
3191mod tests {
3192 use std::collections::HashMap;
3193 #[cfg(windows)]
3194 use std::fs;
3195 use std::sync::{Arc, Mutex};
3196 use std::time::Duration;
3197 #[cfg(windows)]
3198 use std::time::Instant;
3199
3200 use super::*;
3201
3202 #[cfg(unix)]
3203 const QUICK_SUCCESS_COMMAND: &str = "true";
3204 #[cfg(windows)]
3205 const QUICK_SUCCESS_COMMAND: &str = "cmd /c exit 0";
3206
3207 #[cfg(unix)]
3208 const LONG_RUNNING_COMMAND: &str = "sleep 5";
3209 #[cfg(windows)]
3210 const LONG_RUNNING_COMMAND: &str = "cmd /c timeout /t 5 /nobreak > nul";
3211
3212 #[test]
3213 fn pty_dimensions_are_persisted_and_returned_in_snapshot() {
3214 let registry = BgTaskRegistry::default();
3215 let dir = tempfile::tempdir().unwrap();
3216 let task_id = registry
3217 .spawn_pty(
3218 QUICK_SUCCESS_COMMAND,
3219 "session".to_string(),
3220 dir.path().to_path_buf(),
3221 HashMap::new(),
3222 Some(Duration::from_secs(30)),
3223 dir.path().to_path_buf(),
3224 10,
3225 true,
3226 false,
3227 Some(dir.path().to_path_buf()),
3228 50,
3229 120,
3230 )
3231 .unwrap();
3232
3233 let paths = task_paths(dir.path(), "session", &task_id);
3234 let metadata = read_task(&paths.json).unwrap();
3235 assert_eq!(
3236 metadata.schema_version,
3237 crate::bash_background::persistence::SCHEMA_VERSION
3238 );
3239 assert_eq!(metadata.mode, BgMode::Pty);
3240 assert_eq!(metadata.pty_rows, Some(50));
3241 assert_eq!(metadata.pty_cols, Some(120));
3242
3243 let snapshot = registry
3244 .status(&task_id, "session", None, Some(dir.path()), 1024)
3245 .unwrap();
3246 assert_eq!(snapshot.pty_rows, Some(50));
3247 assert_eq!(snapshot.pty_cols, Some(120));
3248 }
3249
3250 fn spawn_dead_child() -> std::process::Child {
3255 #[cfg(unix)]
3256 let mut cmd = std::process::Command::new("true");
3257 #[cfg(windows)]
3258 let mut cmd = {
3259 let mut c = std::process::Command::new("cmd");
3260 c.args(["/c", "exit", "0"]);
3261 c
3262 };
3263 cmd.stdin(std::process::Stdio::null());
3264 cmd.stdout(std::process::Stdio::null());
3265 cmd.stderr(std::process::Stdio::null());
3266 let mut child = cmd.spawn().expect("spawn replacement child for reap test");
3267 let started = Instant::now();
3276 loop {
3277 match child.try_wait() {
3278 Ok(Some(_)) => break,
3279 Ok(None) => {
3280 if started.elapsed() > Duration::from_secs(5) {
3281 panic!("dead-child stand-in did not exit within 5s");
3282 }
3283 std::thread::sleep(Duration::from_millis(10));
3284 }
3285 Err(error) => panic!("dead-child try_wait failed: {error}"),
3286 }
3287 }
3288 child
3289 }
3290
3291 #[test]
3292 fn ack_marks_delivered_even_when_completion_was_already_consumed_locally() {
3293 let registry = BgTaskRegistry::default();
3294 let dir = tempfile::tempdir().unwrap();
3295 let task_id = registry
3296 .spawn(
3297 LONG_RUNNING_COMMAND,
3298 "session".to_string(),
3299 dir.path().to_path_buf(),
3300 HashMap::new(),
3301 Some(Duration::from_secs(30)),
3302 dir.path().to_path_buf(),
3303 10,
3304 true,
3305 false,
3306 Some(dir.path().to_path_buf()),
3307 )
3308 .unwrap();
3309 registry
3310 .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
3311 .unwrap();
3312 assert_eq!(
3313 registry
3314 .drain_completions_for_session(Some("session"))
3315 .len(),
3316 1
3317 );
3318
3319 registry.inner.completions.lock().unwrap().clear();
3322
3323 assert_eq!(
3324 registry.ack_completions_for_session(Some("session"), std::slice::from_ref(&task_id)),
3325 vec![task_id.clone()]
3326 );
3327 assert!(registry
3328 .drain_completions_for_session(Some("session"))
3329 .is_empty());
3330
3331 let paths = task_paths(dir.path(), "session", &task_id);
3332 let metadata = read_task(&paths.json).unwrap();
3333 assert!(metadata.completion_delivered);
3334
3335 let replayed = BgTaskRegistry::default();
3336 replayed
3337 .replay_session_inner(dir.path(), "session", None)
3338 .unwrap();
3339 assert!(replayed
3340 .drain_completions_for_session(Some("session"))
3341 .is_empty());
3342 }
3343
3344 #[test]
3345 fn register_watch_rejects_unknown_task() {
3346 let registry = BgTaskRegistry::default();
3347
3348 let result = registry.register_watch(
3349 "missing-task".to_string(),
3350 WatchPattern::Substring("READY".into()),
3351 true,
3352 );
3353
3354 assert_eq!(result, Err("task_not_found"));
3355 }
3356
3357 #[test]
3358 fn register_watch_on_terminal_task_scans_existing_output() {
3359 let frames = Arc::new(Mutex::new(Vec::new()));
3360 let captured = Arc::clone(&frames);
3361 let sender: crate::context::ProgressSender = Arc::new(Box::new(move |frame| {
3362 captured.lock().unwrap().push(frame);
3363 })
3364 as Box<dyn Fn(PushFrame) + Send + Sync>);
3365 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(Some(sender))));
3366 let dir = tempfile::tempdir().unwrap();
3367 let task_id = registry
3368 .spawn(
3369 LONG_RUNNING_COMMAND,
3370 "session".to_string(),
3371 dir.path().to_path_buf(),
3372 HashMap::new(),
3373 Some(Duration::from_secs(30)),
3374 dir.path().to_path_buf(),
3375 10,
3376 true,
3377 false,
3378 Some(dir.path().to_path_buf()),
3379 )
3380 .unwrap();
3381 registry
3382 .inner
3383 .shutdown
3384 .store(true, std::sync::atomic::Ordering::SeqCst);
3385 let task = registry.task_for_session(&task_id, "session").unwrap();
3386 std::fs::write(&task.paths.stdout, "READY\n").unwrap();
3387 registry
3388 .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
3389 .unwrap();
3390 frames.lock().unwrap().clear();
3391 registry.inner.completions.lock().unwrap().clear();
3392
3393 registry
3394 .register_watch(
3395 task_id.clone(),
3396 WatchPattern::Substring("READY".into()),
3397 true,
3398 )
3399 .unwrap();
3400
3401 let frames = frames.lock().unwrap();
3402 let frame = frames
3403 .iter()
3404 .find_map(|frame| match frame {
3405 PushFrame::BashPatternMatch(frame) => Some(frame),
3406 _ => None,
3407 })
3408 .expect("terminal watch registration should emit pattern frame");
3409 assert_eq!(frame.reason, "pattern_match");
3410 assert_eq!(frame.task_id, task_id);
3411 assert_eq!(frame.session_id, "session");
3412 assert_eq!(frame.match_text, "READY");
3413 assert_eq!(frame.match_offset, 0);
3414 assert_eq!(registry.active_watch_count(&frame.task_id), 0);
3415 let metadata = read_task(&task.paths.json).unwrap();
3416 assert!(metadata.completion_delivered);
3417 }
3418
3419 #[test]
3420 fn cleanup_finished_removes_terminal_tasks_older_than_threshold() {
3421 let registry = BgTaskRegistry::default();
3422 let dir = tempfile::tempdir().unwrap();
3423 let task_id = registry
3424 .spawn(
3425 QUICK_SUCCESS_COMMAND,
3426 "session".to_string(),
3427 dir.path().to_path_buf(),
3428 HashMap::new(),
3429 Some(Duration::from_secs(30)),
3430 dir.path().to_path_buf(),
3431 10,
3432 true,
3433 false,
3434 Some(dir.path().to_path_buf()),
3435 )
3436 .unwrap();
3437 registry
3438 .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
3439 .unwrap();
3440 let completions = registry.drain_completions_for_session(Some("session"));
3441 assert_eq!(completions.len(), 1);
3442 assert_eq!(
3443 registry.ack_completions_for_session(Some("session"), std::slice::from_ref(&task_id)),
3444 vec![task_id.clone()]
3445 );
3446
3447 registry.cleanup_finished(Duration::ZERO);
3448
3449 assert!(registry.inner.tasks.lock().unwrap().is_empty());
3450 }
3451
/// A killed task whose completion was never drained or acknowledged must
/// still be present in the task map after `cleanup_finished(Duration::ZERO)`.
#[test]
fn cleanup_finished_retains_undelivered_terminals() {
    const SESSION: &str = "session";

    let registry = BgTaskRegistry::default();
    let dir = tempfile::tempdir().unwrap();
    let workdir = dir.path().to_path_buf();
    let task_id = registry
        .spawn(
            QUICK_SUCCESS_COMMAND,
            SESSION.to_string(),
            workdir.clone(),
            HashMap::new(),
            Some(Duration::from_secs(30)),
            workdir.clone(),
            10,
            true,
            false,
            Some(workdir),
        )
        .unwrap();

    // Terminal, but deliberately not drained/acked before cleanup runs.
    registry
        .kill_with_status(&task_id, SESSION, BgTaskStatus::Killed)
        .unwrap();

    registry.cleanup_finished(Duration::ZERO);

    let tasks = registry.inner.tasks.lock().unwrap();
    assert!(tasks.contains_key(&task_id));
}
3478
/// Two-pass `reap_child` behaviour when the child is gone and no exit marker
/// exists: the first call leaves the task `Running` but releases the child
/// handle and marks the task detached; the second call transitions it to
/// `Failed` with the reason "process exited without exit marker".
#[test]
fn reap_child_marks_failed_when_child_exits_without_exit_marker() {
    const SESSION: &str = "session";

    let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
    let dir = tempfile::tempdir().unwrap();
    let workdir = dir.path().to_path_buf();
    let task_id = registry
        .spawn(
            QUICK_SUCCESS_COMMAND,
            SESSION.to_string(),
            workdir.clone(),
            HashMap::new(),
            Some(Duration::from_secs(30)),
            workdir.clone(),
            10,
            true,
            false,
            Some(workdir),
        )
        .unwrap();
    let task = registry.task_for_session(&task_id, SESSION).unwrap();

    // Wait (bounded to 5s) until the piped child reports an exit status; any
    // runtime other than a live piped child counts as already exited.
    let child_has_exited = || {
        let mut state = task.state.lock().unwrap();
        match &mut state.runtime {
            TaskRuntime::Piped(Some(child)) => matches!(child.try_wait(), Ok(Some(_))),
            _ => true,
        }
    };
    let poll_started = Instant::now();
    while !child_has_exited() {
        assert!(
            poll_started.elapsed() < Duration::from_secs(5),
            "child should exit quickly"
        );
        std::thread::sleep(Duration::from_millis(20));
    }

    // Raise the shutdown flag and pause.
    // NOTE(review): presumably this lets background workers notice the flag
    // before the test rewrites task state by hand; confirm the 550ms value
    // against the worker's poll interval.
    registry
        .inner
        .shutdown
        .store(true, std::sync::atomic::Ordering::SeqCst);
    std::thread::sleep(Duration::from_millis(550));

    // Best-effort removal: the marker may or may not have been written.
    let _ = std::fs::remove_file(&task.paths.exit);

    // Rewind the task to a pristine `Running` state (in memory and on disk)
    // and make sure a child handle is installed for `reap_child` to inspect.
    let mut state = task.state.lock().unwrap();
    state.metadata.status = BgTaskStatus::Running;
    state.metadata.status_reason = None;
    state.metadata.exit_code = None;
    state.metadata.finished_at = None;
    state.metadata.duration_ms = None;
    crate::bash_background::persistence::write_task(&task.paths.json, &state.metadata)
        .expect("persist reset Running metadata for reap_child test");
    if let TaskRuntime::Piped(None) = &state.runtime {
        state.runtime = TaskRuntime::Piped(Some(spawn_dead_child()));
    }
    drop(state);
    *task.terminal_at.lock().unwrap() = None;

    assert!(
        task.is_running(),
        "precondition: metadata.status == Running"
    );
    assert!(
        !task.paths.exit.exists(),
        "precondition: exit marker absent"
    );

    // First pass: handle released, task detached, status untouched.
    registry.reap_child(&task);

    let after_first = task.state.lock().unwrap();
    assert_eq!(
        after_first.metadata.status,
        BgTaskStatus::Running,
        "first reap must leave status Running while waiting one pass for marker"
    );
    assert_eq!(
        after_first.metadata.status_reason, None,
        "first reap must not record a failure reason"
    );
    assert!(
        matches!(after_first.runtime, TaskRuntime::Piped(None)),
        "child handle must be released after first reap"
    );
    assert!(
        after_first.detached,
        "task must be marked detached after first reap"
    );
    drop(after_first);

    // Second pass: still no marker, so the task must become `Failed`.
    registry.reap_child(&task);

    let after_second = task.state.lock().unwrap();
    assert!(
        after_second.metadata.status.is_terminal(),
        "second reap must transition to terminal when PID dead and no marker. Got status={:?}",
        after_second.metadata.status
    );
    assert_eq!(
        after_second.metadata.status,
        BgTaskStatus::Failed,
        "must specifically be Failed (not Killed): status={:?}",
        after_second.metadata.status
    );
    assert_eq!(
        after_second.metadata.status_reason.as_deref(),
        Some("process exited without exit marker"),
        "reason must match replay path's wording: {:?}",
        after_second.metadata.status_reason
    );
    assert!(
        matches!(after_second.runtime, TaskRuntime::Piped(None)),
        "child handle must stay released after second reap"
    );
    assert!(
        after_second.detached,
        "task must remain detached after second reap"
    );
}
3656
/// When an exit marker is present, a single `reap_child` call must release
/// the child handle and mark the task detached while leaving the status at
/// `Running`.
#[test]
fn reap_child_preserves_running_when_exit_marker_exists() {
    const SESSION: &str = "session";

    let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
    let dir = tempfile::tempdir().unwrap();
    let workdir = dir.path().to_path_buf();
    let task_id = registry
        .spawn(
            QUICK_SUCCESS_COMMAND,
            SESSION.to_string(),
            workdir.clone(),
            HashMap::new(),
            Some(Duration::from_secs(30)),
            workdir.clone(),
            10,
            true,
            false,
            Some(workdir),
        )
        .unwrap();
    let task = registry.task_for_session(&task_id, SESSION).unwrap();

    // Wait (bounded to 5s) for both conditions: the child has exited AND the
    // exit marker file is on disk.
    let child_has_exited = || {
        let mut state = task.state.lock().unwrap();
        match &mut state.runtime {
            TaskRuntime::Piped(Some(child)) => matches!(child.try_wait(), Ok(Some(_))),
            _ => true,
        }
    };
    let poll_started = Instant::now();
    while !(child_has_exited() && task.paths.exit.exists()) {
        assert!(
            poll_started.elapsed() < Duration::from_secs(5),
            "child should exit and write marker quickly"
        );
        std::thread::sleep(Duration::from_millis(20));
    }

    // Raise the shutdown flag and pause.
    // NOTE(review): presumably this lets background workers notice the flag
    // before the test rewrites task state by hand; confirm the 550ms value.
    registry
        .inner
        .shutdown
        .store(true, std::sync::atomic::Ordering::SeqCst);
    std::thread::sleep(Duration::from_millis(550));

    // Put the in-memory task back to `Running` and make sure a child handle
    // is installed for `reap_child` to inspect.
    let mut state = task.state.lock().unwrap();
    state.metadata.status = BgTaskStatus::Running;
    state.metadata.status_reason = None;
    if let TaskRuntime::Piped(None) = &state.runtime {
        state.runtime = TaskRuntime::Piped(Some(spawn_dead_child()));
    }
    drop(state);
    *task.terminal_at.lock().unwrap() = None;

    // Recreate the marker if it disappeared since the poll loop saw it.
    if !task.paths.exit.exists() {
        std::fs::write(&task.paths.exit, "0").expect("write replacement exit marker");
    }

    registry.reap_child(&task);

    let after_reap = task.state.lock().unwrap();
    assert!(
        matches!(after_reap.runtime, TaskRuntime::Piped(None)),
        "child handle still released even when marker exists"
    );
    assert!(
        after_reap.detached,
        "task still marked detached even when marker exists"
    );
    assert_eq!(
        after_reap.metadata.status,
        BgTaskStatus::Running,
        "reap_child must defer to poll_task when marker exists"
    );
}
3758
/// A freshly spawned long-running task must still be in the task map after
/// `cleanup_finished(Duration::ZERO)`.
#[test]
fn cleanup_finished_keeps_running_tasks() {
    const SESSION: &str = "session";

    let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
    let dir = tempfile::tempdir().unwrap();
    let workdir = dir.path().to_path_buf();
    let task_id = registry
        .spawn(
            LONG_RUNNING_COMMAND,
            SESSION.to_string(),
            workdir.clone(),
            HashMap::new(),
            Some(Duration::from_secs(30)),
            workdir.clone(),
            10,
            true,
            false,
            Some(workdir),
        )
        .unwrap();

    registry.cleanup_finished(Duration::ZERO);

    let still_tracked = registry.inner.tasks.lock().unwrap().contains_key(&task_id);
    assert!(still_tracked);

    // Best-effort teardown of the long-running child; the result is ignored.
    let _ = registry.kill(&task_id, SESSION);
}
3783
/// Polls `path` every 100ms until it can be read, then returns its contents.
///
/// The file is read directly instead of being probed with `exists()` first:
/// that removes the window between "file observed" and "file opened" and
/// saves one filesystem call per poll. Only `NotFound` is treated as "not
/// there yet"; any other read error is a real failure and panics at once.
///
/// # Panics
///
/// Panics with "read file" if reading fails for a reason other than the file
/// being absent, or with a timeout message if the file has not appeared
/// within 30 seconds.
#[cfg(windows)]
fn wait_for_file(path: &Path) -> String {
    const TIMEOUT: Duration = Duration::from_secs(30);
    const POLL_INTERVAL: Duration = Duration::from_millis(100);

    let started = Instant::now();
    loop {
        match fs::read_to_string(path) {
            Ok(content) => return content,
            // Not created yet: keep polling until the deadline.
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
            // Same panic text `.expect("read file")` would have produced.
            Err(err) => panic!("read file: {err:?}"),
        }
        assert!(
            started.elapsed() < TIMEOUT,
            "timed out waiting for {}",
            path.display()
        );
        std::thread::sleep(POLL_INTERVAL);
    }
}
3799
/// Test helper: spawns `command` for session "session" in a fresh temporary
/// directory and returns the registry, the temp-dir guard and the task id.
///
/// The caller must keep the returned `TempDir` alive for as long as it needs
/// the directory, because dropping the guard removes it.
#[cfg(windows)]
fn spawn_windows_registry_command(command: &str) -> (BgTaskRegistry, tempfile::TempDir, String) {
    let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
    let dir = tempfile::tempdir().unwrap();
    let workdir = dir.path().to_path_buf();
    let spawned = registry.spawn(
        command,
        String::from("session"),
        workdir.clone(),
        HashMap::new(),
        Some(Duration::from_secs(30)),
        workdir.clone(),
        10,
        false,
        false,
        Some(workdir),
    );
    let task_id = spawned.unwrap();
    (registry, dir, task_id)
}
3822
/// Spawning `cmd /c exit 0` must produce an exit marker whose trimmed
/// content is "0".
#[cfg(windows)]
#[test]
fn windows_spawn_writes_exit_marker_for_zero_exit() {
    // `_workdir` keeps the temporary directory alive until the test ends.
    let (registry, _workdir, task_id) = spawn_windows_registry_command("cmd /c exit 0");
    let marker_path = registry.task_exit_path(&task_id, "session").unwrap();

    let recorded = wait_for_file(&marker_path);

    assert_eq!(recorded.trim(), "0");
}
3833
/// Spawning `cmd /c exit 42` must produce an exit marker whose trimmed
/// content is "42".
#[cfg(windows)]
#[test]
fn windows_spawn_writes_exit_marker_for_nonzero_exit() {
    // `_workdir` keeps the temporary directory alive until the test ends.
    let (registry, _workdir, task_id) = spawn_windows_registry_command("cmd /c exit 42");
    let marker_path = registry.task_exit_path(&task_id, "session").unwrap();

    let recorded = wait_for_file(&marker_path);

    assert_eq!(recorded.trim(), "42");
}
3844
/// After `cmd /c echo hello` has written its exit marker, the task's stdout
/// capture file must contain "hello".
#[cfg(windows)]
#[test]
fn windows_spawn_captures_stdout_to_disk() {
    // `_workdir` keeps the temporary directory alive until the test ends.
    let (registry, _workdir, task_id) = spawn_windows_registry_command("cmd /c echo hello");
    let task = registry.task_for_session(&task_id, "session").unwrap();

    // Block until the exit marker is readable; its content is not needed.
    let _marker = wait_for_file(&task.paths.exit);
    let stdout = fs::read_to_string(&task.paths.stdout).expect("read stdout");

    assert!(stdout.contains("hello"), "stdout was {stdout:?}");
}
3858
/// When the lookup closure resolves both `pwsh.exe` and `powershell.exe`,
/// the first shell candidate must be `WindowsShell::Pwsh` with binary name
/// "pwsh.exe".
#[cfg(windows)]
#[test]
fn windows_spawn_uses_pwsh_when_available() {
    use crate::windows_shell::{shell_candidates_with, WindowsShell};

    let candidates = shell_candidates_with(
        // Fake binary lookup: only the two PowerShell flavours resolve.
        |binary| {
            let resolved = match binary {
                "pwsh.exe" => r"C:\pwsh\pwsh.exe",
                "powershell.exe" => r"C:\ps\powershell.exe",
                _ => return None,
            };
            Some(std::path::PathBuf::from(resolved))
        },
        || None,
    );

    let preferred = candidates
        .first()
        .cloned()
        .expect("at least one candidate");
    assert_eq!(preferred, WindowsShell::Pwsh);
    assert_eq!(preferred.binary().as_ref(), "pwsh.exe");
}
3877
/// The `Cmd` wrapper script must capture the exit code, echo it to a `.tmp`
/// marker, `move /Y` it into place with output sent to `nul`, and finally
/// propagate the captured code.
#[cfg(windows)]
#[test]
fn windows_shell_cmd_wrapper_writes_exit_marker_with_move() {
    use crate::windows_shell::WindowsShell;

    let exit_path = Path::new(r"C:\Temp\bash-test.exit");
    let script = WindowsShell::Cmd.wrapper_script("cmd /c exit 42", exit_path);

    // (fragment that must appear, explanation printed on failure)
    let required_fragments = [
        (
            "set CODE=%ERRORLEVEL%",
            "wrapper must capture exit code into CODE",
        ),
        (
            "echo %CODE% >",
            "wrapper must echo CODE to a temp marker file",
        ),
        (
            "move /Y",
            "wrapper must use atomic move to write the marker",
        ),
        ("> nul", "wrapper must redirect move output to nul"),
        (
            "exit /B %CODE%",
            "wrapper must propagate the captured exit code",
        ),
    ];
    for &(fragment, why) in &required_fragments {
        assert!(script.contains(fragment), "{why}: {script}");
    }

    // Both the temporary and the final marker paths must appear quoted.
    assert!(script.contains(r#""C:\Temp\bash-test.exit.tmp""#));
    assert!(script.contains(r#""C:\Temp\bash-test.exit""#));
}
3920
/// `WindowsShell::Cmd.bg_command` must pass exactly `/D /S /C <command>`.
#[cfg(windows)]
#[test]
fn windows_shell_cmd_bg_command_uses_minimal_cmd_flags() {
    use crate::windows_shell::WindowsShell;

    let cmd = WindowsShell::Cmd.bg_command("echo wrapped");
    // Arguments that are not valid UTF-8 are dropped from the comparison.
    let rendered: Vec<&str> = cmd.get_args().filter_map(|arg| arg.to_str()).collect();

    assert_eq!(
        rendered,
        vec!["/D", "/S", "/C", "echo wrapped"],
        "Cmd::bg_command must prepend /D /S /C"
    );
}
3939
/// `WindowsShell::Pwsh.bg_command` must include both `-Command` and the
/// user-supplied command body among its arguments.
#[cfg(windows)]
#[test]
fn windows_shell_pwsh_bg_command_uses_standard_args() {
    use crate::windows_shell::WindowsShell;

    let cmd = WindowsShell::Pwsh.bg_command("Get-Date");
    // Arguments that are not valid UTF-8 are dropped from the comparison.
    let rendered: Vec<&str> = cmd.get_args().filter_map(|arg| arg.to_str()).collect();
    let has_arg = |wanted: &str| rendered.iter().any(|arg| *arg == wanted);

    assert!(
        has_arg("-Command"),
        "Pwsh::bg_command must use -Command: {:?}",
        rendered
    );
    assert!(
        has_arg("Get-Date"),
        "Pwsh::bg_command must include the user command body"
    );
}
3959
// Disabled placeholder: `cfg(any())` with an empty predicate list is never
// satisfied, so this item is always compiled out.
#[cfg(any())]
#[allow(dead_code)]
fn windows_cmd_wrapper_records_real_exit_code_disabled() {}
3993}