1use std::collections::{HashMap, HashSet, VecDeque};
2use std::fs;
3use std::io::Write;
4use std::path::{Path, PathBuf};
5use std::process::{Child, Command, Stdio};
6use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
7#[cfg(unix)]
8use std::sync::OnceLock;
9use std::sync::{Arc, Mutex, RwLock};
10use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
11
12use rusqlite::Connection;
13use serde::Serialize;
14
15use crate::context::SharedProgressSender;
16use crate::harness::Harness;
17use crate::protocol::{BashCompletedFrame, BashLongRunningFrame, BashPatternMatchFrame, PushFrame};
18
19#[cfg(unix)]
20use std::os::unix::process::CommandExt;
21#[cfg(windows)]
22use std::os::windows::process::CommandExt;
23
24use super::buffer::{combine_streams, BgBuffer, TokenCountInput};
25use super::persistence::{
26 create_capture_file, delete_task_bundle, read_exit_marker, read_task, session_tasks_dir,
27 task_paths, unix_millis, update_task, write_kill_marker_if_absent, write_task, BgMode,
28 ExitMarker, PersistedTask, TaskPaths,
29};
30use super::process::is_process_alive;
31#[cfg(unix)]
32use super::process::terminate_pgid;
33#[cfg(windows)]
34use super::process::terminate_pid;
35use super::pty_process::spawn_pty_for_command;
36use super::pty_runtime::PtyRuntime;
37use super::watches::{PatternMatch, WatchPattern, WatchRegistry};
38use super::{BgTaskInfo, BgTaskStatus};
/// Timeout applied to a background task when the caller does not supply one (30 minutes).
const DEFAULT_BG_TIMEOUT: Duration = Duration::from_secs(30 * 60);
/// 24 hours. NOTE(review): presumably the age after which a still-"running" persisted
/// task is treated as orphaned during replay; the consumer is outside this view.
const STALE_RUNNING_AFTER: Duration = Duration::from_secs(24 * 60 * 60);
/// 24 hours. NOTE(review): presumably the grace period before persisted task bundles
/// are garbage-collected; confirm against the GC routine.
const PERSISTED_GC_GRACE: Duration = Duration::from_secs(24 * 60 * 60);
/// 30 days. NOTE(review): presumably the grace period before quarantined metadata is
/// garbage-collected; confirm against the GC routine.
const QUARANTINE_GC_GRACE: Duration = Duration::from_secs(30 * 24 * 60 * 60);

/// Preview size, in bytes, requested when building a completion snapshot for a task.
const BG_COMPLETION_PREVIEW_BYTES: usize = 300;
/// 128 KiB. NOTE(review): by its name, a per-stream byte cap applied before token
/// counting; the consumer is outside this view.
const TOKENIZE_CAP_BYTES_PER_STREAM: usize = 128 * 1024;
59
/// Completion record for a background task, queued by the registry and serialized
/// for delivery.
///
/// Optional and empty fields are omitted from the serialized form (see the
/// per-field `skip_serializing_if` attributes).
#[derive(Debug, Clone, Serialize)]
pub struct BgCompletion {
    /// Identifier of the task this completion describes.
    pub task_id: String,
    /// Session that owns the task. Never serialized.
    #[serde(skip_serializing)]
    pub session_id: String,
    /// Status reported for the task.
    pub status: BgTaskStatus,
    /// Exit code, when one is known.
    pub exit_code: Option<i32>,
    /// Command line the task was started with.
    pub command: String,
    /// Preview of the task output; omitted from the serialized form when empty.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub output_preview: String,
    /// Whether `output_preview` was truncated; omitted when `false`.
    #[serde(default, skip_serializing_if = "is_false")]
    pub output_truncated: bool,
    /// Token count before compression, if computed; omitted when `None`.
    /// NOTE(review): exact semantics are defined where this is populated — confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub original_tokens: Option<u32>,
    /// Token count after compression, if computed; omitted when `None`.
    /// NOTE(review): exact semantics are defined where this is populated — confirm.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub compressed_tokens: Option<u32>,
    /// Set when token counting was skipped (by name); omitted when `false`.
    #[serde(default, skip_serializing_if = "is_false")]
    pub tokens_skipped: bool,
}
95
/// Serde `skip_serializing_if` predicate: returns `true` exactly when the flag is `false`.
fn is_false(v: &bool) -> bool {
    matches!(*v, false)
}
99
/// Point-in-time, serializable view of a background task as returned by status queries.
#[derive(Debug, Clone, Serialize)]
pub struct BgTaskSnapshot {
    /// Core task info; its fields are flattened into this object when serialized.
    #[serde(flatten)]
    pub info: BgTaskInfo,
    /// Exit code, when one is known.
    pub exit_code: Option<i32>,
    /// PID of the spawned child, when known.
    pub child_pid: Option<u32>,
    /// Working directory of the task, as a string.
    pub workdir: String,
    /// Preview of the captured output.
    pub output_preview: String,
    /// Whether `output_preview` was truncated.
    pub output_truncated: bool,
    /// Path of the captured output file, when there is one.
    pub output_path: Option<String>,
    /// Path of the captured stderr file, when there is one.
    pub stderr_path: Option<String>,
    /// PTY row count; omitted from the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub pty_rows: Option<u16>,
    /// PTY column count; omitted from the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub pty_cols: Option<u16>,
}
116
/// Handle to the background task registry.
///
/// Cloning is cheap: every clone shares the same [`RegistryInner`] through an `Arc`.
#[derive(Clone)]
pub struct BgTaskRegistry {
    /// Shared registry state.
    pub(crate) inner: Arc<RegistryInner>,
}
121
/// State shared by all clones of [`BgTaskRegistry`].
pub(crate) struct RegistryInner {
    /// In-memory tasks keyed by task id.
    pub(crate) tasks: Mutex<HashMap<String, Arc<BgTask>>>,
    /// Queue of completion records awaiting delivery.
    pub(crate) completions: Mutex<VecDeque<BgCompletion>>,
    /// Sender supplied at construction for pushing frames to the client.
    pub(crate) progress_sender: SharedProgressSender,
    /// NOTE(review): presumably guards one-time start of the watchdog; set outside this view.
    watchdog_started: AtomicBool,
    /// Shutdown flag. NOTE(review): readers and writers are outside this view.
    pub(crate) shutdown: AtomicBool,
    /// Whether long-running reminders are enabled (defaults to `true`).
    pub(crate) long_running_reminder_enabled: AtomicBool,
    /// Long-running reminder interval in milliseconds (defaults to 600 000).
    pub(crate) long_running_reminder_interval_ms: AtomicU64,
    /// Set on the first session replay so the persisted-task GC is attempted only once.
    persisted_gc_started: AtomicBool,
    /// Test-only counter. NOTE(review): presumably counts persisted-GC runs.
    #[cfg(test)]
    persisted_gc_runs: AtomicU64,
    /// Optional output compressor, called as `compressor(command, output)`.
    pub(crate) compressor: Mutex<Option<Box<dyn Fn(&str, String) -> String + Send + Sync>>>,
    /// Optional database connection that task metadata is mirrored to.
    pub(crate) db_pool: RwLock<Option<Arc<Mutex<Connection>>>>,
    /// Harness name used for database rows; DB mirroring is skipped while unset.
    pub(crate) db_harness: RwLock<Option<String>>,
    /// Sending half of the capacity-1 wake channel; a clone is handed to each PTY spawn.
    pub(crate) wake_tx: crossbeam_channel::Sender<()>,
    /// Receiving half of the wake channel. NOTE(review): the consumer is outside this view.
    pub(crate) wake_rx: crossbeam_channel::Receiver<()>,
    /// Registered output watches and their per-file scan cursors.
    pub(crate) watch_registry: Mutex<WatchRegistry>,
}
145
/// One background task tracked in memory by the registry.
pub(crate) struct BgTask {
    /// Unique identifier of the task.
    pub(crate) task_id: String,
    /// Session that owns the task.
    pub(crate) session_id: String,
    /// On-disk locations of the task bundle (metadata and capture files).
    pub(crate) paths: TaskPaths,
    /// Instant at which the spawn paths created this in-memory entry.
    pub(crate) started: Instant,
    /// NOTE(review): presumably when the last long-running reminder was sent; `None` at spawn.
    pub(crate) last_reminder_at: Mutex<Option<Instant>>,
    /// NOTE(review): presumably when the task was seen to reach a terminal state; `None` at spawn.
    pub(crate) terminal_at: Mutex<Option<Instant>>,
    /// Mutable task state, guarded by its own lock.
    pub(crate) state: Mutex<BgTaskState>,
}
155
/// Live process handle for a task, by capture mode.
///
/// The inner `Option` may be `None`; `write_pty` reports a `Pty(None)` task as exited.
pub(crate) enum TaskRuntime {
    /// Child process whose output is captured to stdout/stderr files.
    Piped(Option<Child>),
    /// PTY-backed runtime.
    Pty(Option<PtyRuntime>),
}
160
/// Mutable per-task state held behind `BgTask::state`.
pub(crate) struct BgTaskState {
    /// Persisted metadata as last written for this task.
    pub(crate) metadata: PersistedTask,
    /// Live process handle, if any.
    pub(crate) runtime: TaskRuntime,
    /// Initialised to `false` by the spawn paths.
    /// NOTE(review): presumably marks tasks not owned by this process — confirm.
    pub(crate) detached: bool,
    /// Initialised to `false` by the spawn paths.
    /// NOTE(review): presumably set once the child's exit has been seen — confirm.
    pub(crate) child_exit_observed: bool,
    /// Reader over the task's capture file(s).
    pub(crate) buffer: BgBuffer,
    /// Initialised to `None` by the spawn paths.
    /// NOTE(review): presumably a status to apply when the task becomes terminal — confirm.
    pub(crate) pending_terminal_override: Option<BgTaskStatus>,
}
180
181impl BgTaskRegistry {
182 pub fn new(progress_sender: SharedProgressSender) -> Self {
183 let (wake_tx, wake_rx) = crossbeam_channel::bounded(1);
184 Self {
185 inner: Arc::new(RegistryInner {
186 tasks: Mutex::new(HashMap::new()),
187 completions: Mutex::new(VecDeque::new()),
188 progress_sender,
189 watchdog_started: AtomicBool::new(false),
190 shutdown: AtomicBool::new(false),
191 long_running_reminder_enabled: AtomicBool::new(true),
192 long_running_reminder_interval_ms: AtomicU64::new(600_000),
193 persisted_gc_started: AtomicBool::new(false),
194 #[cfg(test)]
195 persisted_gc_runs: AtomicU64::new(0),
196 compressor: Mutex::new(None),
197 db_pool: RwLock::new(None),
198 db_harness: RwLock::new(None),
199 wake_tx,
200 wake_rx,
201 watch_registry: Mutex::new(WatchRegistry::default()),
202 }),
203 }
204 }
205
206 pub fn set_harness(&self, harness: Harness) {
207 if let Ok(mut slot) = self.inner.db_harness.write() {
208 *slot = Some(harness.as_str().to_string());
209 }
210 }
211
212 pub fn set_db_pool(&self, conn: Arc<Mutex<Connection>>) {
213 if let Ok(mut slot) = self.inner.db_pool.write() {
214 *slot = Some(conn);
215 }
216 }
217
218 pub fn clear_db_pool(&self) {
219 if let Ok(mut slot) = self.inner.db_pool.write() {
220 *slot = None;
221 }
222 }
223
224 pub fn set_compressor<F>(&self, compressor: F)
229 where
230 F: Fn(&str, String) -> String + Send + Sync + 'static,
231 {
232 if let Ok(mut slot) = self.inner.compressor.lock() {
233 *slot = Some(Box::new(compressor));
234 }
235 }
236
237 pub(crate) fn compress_output(&self, command: &str, output: String) -> String {
240 let Ok(slot) = self.inner.compressor.lock() else {
241 return output;
242 };
243 match slot.as_ref() {
244 Some(compressor) => compressor(command, output),
245 None => output,
246 }
247 }
248
249 fn persist_task(&self, paths: &TaskPaths, metadata: &PersistedTask) -> std::io::Result<()> {
250 write_task(&paths.json, metadata)?;
251 self.dual_write_task(paths, metadata);
252 Ok(())
253 }
254
255 fn update_task_metadata<F>(
256 &self,
257 paths: &TaskPaths,
258 update: F,
259 ) -> std::io::Result<PersistedTask>
260 where
261 F: FnOnce(&mut PersistedTask),
262 {
263 let metadata = update_task(&paths.json, update)?;
264 self.dual_write_task(paths, &metadata);
265 Ok(metadata)
266 }
267
268 fn dual_write_task(&self, paths: &TaskPaths, metadata: &PersistedTask) {
269 let pool = self.inner.db_pool.read().ok().and_then(|slot| slot.clone());
270 let Some(pool) = pool else {
271 return;
272 };
273 let harness = self
274 .inner
275 .db_harness
276 .read()
277 .ok()
278 .and_then(|slot| slot.clone());
279 let Some(harness) = harness else {
280 crate::slog_warn!(
281 "dual-write bash_task to DB skipped for {}: harness not configured",
282 metadata.task_id
283 );
284 return;
285 };
286 let row = match metadata.to_bash_task_row(&harness, paths) {
287 Ok(row) => row,
288 Err(error) => {
289 crate::slog_warn!(
290 "dual-write bash_task to DB failed for {}: {}",
291 metadata.task_id,
292 error
293 );
294 return;
295 }
296 };
297 let conn = match pool.lock() {
298 Ok(conn) => conn,
299 Err(_) => {
300 crate::slog_warn!(
301 "dual-write bash_task to DB failed for {}: db mutex poisoned",
302 metadata.task_id
303 );
304 return;
305 }
306 };
307 if let Err(error) = crate::db::bash_tasks::upsert_bash_task(&conn, &row) {
308 crate::slog_warn!(
309 "dual-write bash_task to DB failed for {}: {}",
310 metadata.task_id,
311 error
312 );
313 }
314 }
315
316 pub fn configure_long_running_reminders(&self, enabled: bool, interval_ms: u64) {
317 self.inner
318 .long_running_reminder_enabled
319 .store(enabled, Ordering::SeqCst);
320 self.inner
321 .long_running_reminder_interval_ms
322 .store(interval_ms, Ordering::SeqCst);
323 }
324
325 #[cfg(unix)]
326 #[allow(clippy::too_many_arguments)]
327 pub fn spawn(
328 &self,
329 command: &str,
330 session_id: String,
331 workdir: PathBuf,
332 env: HashMap<String, String>,
333 timeout: Option<Duration>,
334 storage_dir: PathBuf,
335 max_running: usize,
336 notify_on_completion: bool,
337 compressed: bool,
338 project_root: Option<PathBuf>,
339 ) -> Result<String, String> {
340 self.start_watchdog();
341
342 let running = self.running_count();
343 if running >= max_running {
344 return Err(format!(
345 "background bash task limit exceeded: {running} running (max {max_running})"
346 ));
347 }
348
349 let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
350 let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
351 let task_id = self.generate_unique_task_id()?;
352 let paths = task_paths(&storage_dir, &session_id, &task_id);
353 fs::create_dir_all(&paths.dir)
354 .map_err(|e| format!("failed to create background task dir: {e}"))?;
355
356 let mut metadata = PersistedTask::starting(
357 task_id.clone(),
358 session_id.clone(),
359 command.to_string(),
360 workdir.clone(),
361 project_root,
362 timeout_ms,
363 notify_on_completion,
364 compressed,
365 );
366 self.persist_task(&paths, &metadata)
367 .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
368
369 create_capture_file(&paths.stdout)
373 .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
374 create_capture_file(&paths.stderr)
375 .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
376
377 let child = match spawn_detached_child(command, &paths, &workdir, &env) {
378 Ok(child) => child,
379 Err(error) => {
380 crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
381 let _ = delete_task_bundle(&paths);
382 return Err(error);
383 }
384 };
385
386 let child_pid = child.id();
387 metadata.mark_running(child_pid, child_pid as i32);
388 self.persist_task(&paths, &metadata)
389 .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
390
391 let task = Arc::new(BgTask {
392 task_id: task_id.clone(),
393 session_id,
394 paths: paths.clone(),
395 started: Instant::now(),
396 last_reminder_at: Mutex::new(None),
397 terminal_at: Mutex::new(None),
398 state: Mutex::new(BgTaskState {
399 metadata,
400 runtime: TaskRuntime::Piped(Some(child)),
401 detached: false,
402 child_exit_observed: false,
403 buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
404 pending_terminal_override: None,
405 }),
406 });
407
408 self.inner
409 .tasks
410 .lock()
411 .map_err(|_| "background task registry lock poisoned".to_string())?
412 .insert(task_id.clone(), task);
413
414 Ok(task_id)
415 }
416
417 #[allow(clippy::too_many_arguments)]
418 pub fn spawn_pty(
419 &self,
420 command: &str,
421 session_id: String,
422 workdir: PathBuf,
423 env: HashMap<String, String>,
424 timeout: Option<Duration>,
425 storage_dir: PathBuf,
426 max_running: usize,
427 notify_on_completion: bool,
428 compressed: bool,
429 project_root: Option<PathBuf>,
430 rows: u16,
431 cols: u16,
432 ) -> Result<String, String> {
433 self.start_watchdog();
434
435 let running = self.running_count();
436 if running >= max_running {
437 return Err(format!(
438 "background bash task limit exceeded: {running} running (max {max_running})"
439 ));
440 }
441
442 let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
443 let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
444 let task_id = self.generate_unique_task_id()?;
445 let paths = task_paths(&storage_dir, &session_id, &task_id);
446 fs::create_dir_all(&paths.dir)
447 .map_err(|e| format!("failed to create background task dir: {e}"))?;
448
449 let mut metadata = PersistedTask::starting(
450 task_id.clone(),
451 session_id.clone(),
452 command.to_string(),
453 workdir.clone(),
454 project_root,
455 timeout_ms,
456 notify_on_completion,
457 compressed,
458 );
459 metadata.mode = BgMode::Pty;
460 metadata.pty_rows = Some(rows);
461 metadata.pty_cols = Some(cols);
462 self.persist_task(&paths, &metadata)
463 .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
464 create_capture_file(&paths.pty)
465 .map_err(|e| format!("failed to create PTY capture file: {e}"))?;
466
467 let runtime = match spawn_pty_for_command(
468 &task_id,
469 &session_id,
470 command,
471 &paths,
472 &workdir,
473 &env,
474 rows,
475 cols,
476 self.inner.wake_tx.clone(),
477 ) {
478 Ok(runtime) => runtime,
479 Err(error) => {
480 crate::slog_warn!(
481 "failed to spawn PTY background bash task {task_id}; deleting partial bundle: {error}"
482 );
483 let _ = delete_task_bundle(&paths);
484 return Err(error);
485 }
486 };
487
488 if let Some(child_pid) = runtime.child_pid {
489 metadata.mark_running(child_pid, child_pid as i32);
490 } else {
491 metadata.status = BgTaskStatus::Running;
492 metadata.pgid = None;
493 }
494 self.persist_task(&paths, &metadata)
495 .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
496
497 let task = Arc::new(BgTask {
498 task_id: task_id.clone(),
499 session_id,
500 paths: paths.clone(),
501 started: Instant::now(),
502 last_reminder_at: Mutex::new(None),
503 terminal_at: Mutex::new(None),
504 state: Mutex::new(BgTaskState {
505 metadata,
506 runtime: TaskRuntime::Pty(Some(runtime)),
507 detached: false,
508 child_exit_observed: false,
509 buffer: BgBuffer::pty(paths.pty.clone()),
510 pending_terminal_override: None,
511 }),
512 });
513
514 self.inner
515 .tasks
516 .lock()
517 .map_err(|_| "background task registry lock poisoned".to_string())?
518 .insert(task_id.clone(), task);
519
520 Ok(task_id)
521 }
522
523 #[cfg(windows)]
524 #[allow(clippy::too_many_arguments)]
525 pub fn spawn(
526 &self,
527 command: &str,
528 session_id: String,
529 workdir: PathBuf,
530 env: HashMap<String, String>,
531 timeout: Option<Duration>,
532 storage_dir: PathBuf,
533 max_running: usize,
534 notify_on_completion: bool,
535 compressed: bool,
536 project_root: Option<PathBuf>,
537 ) -> Result<String, String> {
538 self.start_watchdog();
539
540 let running = self.running_count();
541 if running >= max_running {
542 return Err(format!(
543 "background bash task limit exceeded: {running} running (max {max_running})"
544 ));
545 }
546
547 let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
548 let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
549 let task_id = self.generate_unique_task_id()?;
550 let paths = task_paths(&storage_dir, &session_id, &task_id);
551 fs::create_dir_all(&paths.dir)
552 .map_err(|e| format!("failed to create background task dir: {e}"))?;
553
554 let mut metadata = PersistedTask::starting(
555 task_id.clone(),
556 session_id.clone(),
557 command.to_string(),
558 workdir.clone(),
559 project_root,
560 timeout_ms,
561 notify_on_completion,
562 compressed,
563 );
564 self.persist_task(&paths, &metadata)
565 .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
566
567 create_capture_file(&paths.stdout)
573 .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
574 create_capture_file(&paths.stderr)
575 .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
576
577 let child = match spawn_detached_child(command, &paths, &workdir, &env) {
578 Ok(child) => child,
579 Err(error) => {
580 crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
581 let _ = delete_task_bundle(&paths);
582 return Err(error);
583 }
584 };
585
586 let child_pid = child.id();
587 metadata.status = BgTaskStatus::Running;
588 metadata.child_pid = Some(child_pid);
589 metadata.pgid = None;
590 self.persist_task(&paths, &metadata)
591 .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
592
593 let task = Arc::new(BgTask {
594 task_id: task_id.clone(),
595 session_id,
596 paths: paths.clone(),
597 started: Instant::now(),
598 last_reminder_at: Mutex::new(None),
599 terminal_at: Mutex::new(None),
600 state: Mutex::new(BgTaskState {
601 metadata,
602 runtime: TaskRuntime::Piped(Some(child)),
603 detached: false,
604 child_exit_observed: false,
605 buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
606 pending_terminal_override: None,
607 }),
608 });
609
610 self.inner
611 .tasks
612 .lock()
613 .map_err(|_| "background task registry lock poisoned".to_string())?
614 .insert(task_id.clone(), task);
615
616 Ok(task_id)
617 }
618
619 pub fn write_pty(
620 &self,
621 task_id: &str,
622 session_id: &str,
623 input: &[u8],
624 ) -> Result<usize, String> {
625 let task = self
626 .task_for_session(task_id, session_id)
627 .ok_or_else(|| "task_not_found".to_string())?;
628
629 let writer = {
630 let state = task
631 .state
632 .lock()
633 .map_err(|_| "background task lock poisoned".to_string())?;
634 if state.metadata.mode != BgMode::Pty {
635 return Err("task_not_pty".to_string());
636 }
637 if state.metadata.status.is_terminal() {
638 return Err("task_exited".to_string());
639 }
640 match &state.runtime {
641 TaskRuntime::Pty(Some(runtime)) => Arc::clone(&runtime.writer),
642 TaskRuntime::Pty(None) => return Err("task_exited".to_string()),
643 TaskRuntime::Piped(_) => return Err("task_not_pty".to_string()),
644 }
645 };
646
647 let mut writer = writer
648 .lock()
649 .map_err(|_| "PTY writer lock poisoned".to_string())?;
650 writer
651 .write_all(input)
652 .map_err(|error| format!("failed to write to PTY: {error}"))?;
653 writer
654 .flush()
655 .map_err(|error| format!("failed to flush PTY writer: {error}"))?;
656 Ok(input.len())
657 }
658
659 pub fn replay_session(&self, storage_dir: &Path, session_id: &str) -> Result<(), String> {
660 self.replay_session_inner(storage_dir, session_id, None)
661 }
662
663 pub fn replay_session_for_project(
664 &self,
665 storage_dir: &Path,
666 session_id: &str,
667 project_root: &Path,
668 ) -> Result<(), String> {
669 self.replay_session_inner(storage_dir, session_id, Some(project_root))
670 }
671
    /// Rehydrates persisted tasks for `session_id` into the in-memory registry,
    /// reconciling any task whose recorded status no longer matches reality.
    ///
    /// Task metadata comes from the database when it yields at least one row, and
    /// from the on-disk task directory otherwise (empty DB result, DB error, or no DB
    /// configured). When `project_root` is given, only tasks whose canonicalized
    /// project root equals it are replayed.
    ///
    /// Each task that is moved to a terminal state here is persisted (best effort),
    /// has a completion enqueued if needed, and is inserted as a rehydrated task. A
    /// `completion_delivered` flag that was already set is preserved across the
    /// transition.
    ///
    /// # Errors
    ///
    /// Propagates errors from the disk fallback and from `insert_rehydrated_task`.
    fn replay_session_inner(
        &self,
        storage_dir: &Path,
        session_id: &str,
        project_root: Option<&Path>,
    ) -> Result<(), String> {
        self.start_watchdog();
        // `swap` returns the previous value, so only the first replay attempts the GC.
        if !self.inner.persisted_gc_started.swap(true, Ordering::SeqCst) {
            if let Err(error) = self.maybe_gc_persisted(storage_dir) {
                crate::slog_warn!("failed to GC persisted background bash tasks: {error}");
            }
        }

        let canonical_project = project_root.map(canonicalized_path);
        // Prefer the database; fall back to disk when it has nothing, fails, or is absent.
        let tasks = match self.replay_session_from_db(session_id) {
            Some(Ok(tasks)) if !tasks.is_empty() => tasks,
            Some(Ok(_)) => {
                let disk_tasks = self.replay_session_from_disk(storage_dir, session_id)?;
                if !disk_tasks.is_empty() {
                    crate::slog_info!(
                        "bash task replay: 0 in DB for session {}, {} from disk fallback",
                        session_id,
                        disk_tasks.len()
                    );
                }
                disk_tasks
            }
            Some(Err(error)) => {
                crate::slog_warn!(
                    "bash task replay DB lookup failed for session {}; falling back to disk: {}",
                    session_id,
                    error
                );
                self.replay_session_from_disk(storage_dir, session_id)?
            }
            None => {
                self.replay_session_from_disk(storage_dir, session_id)?
            }
        };

        for mut metadata in tasks {
            // Skip records that belong to another session.
            if metadata.session_id != session_id {
                continue;
            }
            // Apply the optional project filter on canonicalized paths.
            if let Some(canonical_project) = canonical_project.as_deref() {
                let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
                if metadata_project.as_deref() != Some(canonical_project) {
                    continue;
                }
            }

            let paths = task_paths(storage_dir, session_id, &metadata.task_id);
            match metadata.status {
                // Still `Starting` at replay time: the spawn never completed.
                BgTaskStatus::Starting => {
                    let completion_was_delivered = metadata.completion_delivered;
                    metadata.mark_terminal(
                        BgTaskStatus::Failed,
                        None,
                        Some("spawn aborted".to_string()),
                    );
                    metadata.completion_delivered |= completion_was_delivered;
                    let _ = self.persist_task(&paths, &metadata);
                    self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                    self.insert_rehydrated_task(metadata, paths, true)?;
                }
                BgTaskStatus::Running | BgTaskStatus::Killing => {
                    if metadata.mode == BgMode::Pty {
                        // PTY task: use the exit marker when there is one; otherwise the
                        // task is marked killed with reason `pty_lost_on_bridge_restart`.
                        if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
                            let completion_was_delivered = metadata.completion_delivered;
                            metadata = terminal_metadata_from_marker(metadata, marker, None);
                            metadata.completion_delivered |= completion_was_delivered;
                            let _ = self.persist_task(&paths, &metadata);
                            self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                            self.insert_rehydrated_task(metadata, paths, true)?;
                        } else if metadata.status.is_terminal() {
                            // NOTE(review): the status here is `Running` or `Killing`; this
                            // branch looks unreachable unless `is_terminal` covers one of them.
                            self.insert_rehydrated_task(metadata, paths, true)?;
                        } else {
                            let completion_was_delivered = metadata.completion_delivered;
                            metadata.mark_terminal(
                                BgTaskStatus::Killed,
                                None,
                                Some("pty_lost_on_bridge_restart".to_string()),
                            );
                            metadata.completion_delivered |= completion_was_delivered;
                            let _ = self.persist_task(&paths, &metadata);
                            self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                            self.insert_rehydrated_task(metadata, paths, true)?;
                        }
                    } else if self.running_metadata_is_stale(&metadata) {
                        // Stale running record: mark it killed as orphaned and make sure
                        // a kill marker exists.
                        let completion_was_delivered = metadata.completion_delivered;
                        metadata.mark_terminal(
                            BgTaskStatus::Killed,
                            None,
                            Some("orphaned (>24h)".to_string()),
                        );
                        metadata.completion_delivered |= completion_was_delivered;
                        if !paths.exit.exists() {
                            let _ = write_kill_marker_if_absent(&paths.exit);
                        }
                        let _ = self.persist_task(&paths, &metadata);
                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                        self.insert_rehydrated_task(metadata, paths, true)?;
                    } else if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
                        // An exit marker exists: it wins over the recorded status. A
                        // `Killing` record with a marker is logged as inconsistent.
                        let reason = (metadata.status == BgTaskStatus::Killing).then(|| {
                            "recovered from inconsistent killing state on replay".to_string()
                        });
                        if reason.is_some() {
                            crate::slog_warn!("background task {} had killing state with exit marker; preferring marker",
                                metadata.task_id);
                        }
                        let completion_was_delivered = metadata.completion_delivered;
                        metadata = terminal_metadata_from_marker(metadata, marker, reason);
                        metadata.completion_delivered |= completion_was_delivered;
                        let _ = self.persist_task(&paths, &metadata);
                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                        self.insert_rehydrated_task(metadata, paths, true)?;
                    } else if metadata.status == BgTaskStatus::Killing {
                        // `Killing` with no exit marker: finish the kill bookkeeping.
                        if !paths.exit.exists() {
                            let _ = write_kill_marker_if_absent(&paths.exit);
                        }
                        let completion_was_delivered = metadata.completion_delivered;
                        metadata.mark_terminal(
                            BgTaskStatus::Killed,
                            None,
                            Some("recovered from inconsistent killing state on replay".to_string()),
                        );
                        metadata.completion_delivered |= completion_was_delivered;
                        let _ = self.persist_task(&paths, &metadata);
                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                        self.insert_rehydrated_task(metadata, paths, true)?;
                    } else if metadata.child_pid.is_some_and(|pid| !is_process_alive(pid)) {
                        // Recorded child is no longer alive and left no exit marker.
                        let completion_was_delivered = metadata.completion_delivered;
                        metadata.mark_terminal(
                            BgTaskStatus::Failed,
                            None,
                            Some("process exited without exit marker".to_string()),
                        );
                        metadata.completion_delivered |= completion_was_delivered;
                        let _ = self.persist_task(&paths, &metadata);
                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                        self.insert_rehydrated_task(metadata, paths, true)?;
                    } else {
                        // No evidence the task ended: rehydrate it with its recorded status.
                        self.insert_rehydrated_task(metadata, paths, true)?;
                    }
                }
                // Already terminal: re-offer the completion if needed and rehydrate.
                _ if metadata.status.is_terminal() => {
                    self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                    self.insert_rehydrated_task(metadata, paths, true)?;
                }
                // Any other status is left untouched.
                _ => {}
            }
        }

        Ok(())
    }
844
845 fn replay_session_from_db(
846 &self,
847 session_id: &str,
848 ) -> Option<Result<Vec<PersistedTask>, String>> {
849 let pool = self
850 .inner
851 .db_pool
852 .read()
853 .ok()
854 .and_then(|slot| slot.clone())?;
855 let harness = self
856 .inner
857 .db_harness
858 .read()
859 .ok()
860 .and_then(|slot| slot.clone())?;
861 let conn = match pool.lock() {
862 Ok(conn) => conn,
863 Err(_) => return Some(Err("db mutex poisoned".to_string())),
864 };
865 Some(
866 crate::db::bash_tasks::list_bash_tasks_for_session(&conn, &harness, session_id)
867 .map(|rows| rows.into_iter().map(PersistedTask::from).collect())
868 .map_err(|error| error.to_string()),
869 )
870 }
871
872 fn replay_session_from_disk(
873 &self,
874 storage_dir: &Path,
875 session_id: &str,
876 ) -> Result<Vec<PersistedTask>, String> {
877 let dir = session_tasks_dir(storage_dir, session_id);
878 if !dir.exists() {
879 return Ok(Vec::new());
880 }
881
882 let entries = fs::read_dir(&dir)
883 .map_err(|e| format!("failed to read background task dir {}: {e}", dir.display()))?;
884 let mut tasks = Vec::new();
885 for entry in entries.flatten() {
886 let path = entry.path();
887 if path.extension().and_then(|extension| extension.to_str()) != Some("json") {
888 continue;
889 }
890 match read_task(&path) {
891 Ok(metadata) => tasks.push(metadata),
892 Err(error) => {
893 crate::slog_warn!(
894 "quarantining invalid background task metadata {} during replay: {error}",
895 path.display()
896 );
897 if let Err(quarantine_error) =
898 quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
899 {
900 crate::slog_warn!(
901 "failed to quarantine invalid background task metadata {}: {quarantine_error}",
902 path.display()
903 );
904 }
905 }
906 }
907 }
908 Ok(tasks)
909 }
910
    /// Registers an output watch for `task_id` and returns the new watch id.
    ///
    /// If the task is already terminal at registration, the capture file cursors are
    /// reset to 0 and the files are scanned immediately. Otherwise the cursors are
    /// primed (NOTE(review): presumably to the current end of file — confirm in
    /// `WatchRegistry::prime_file_cursor`). If the task turns terminal while the watch
    /// is being registered, the same from-zero scan runs afterwards.
    ///
    /// For a terminal task, any matches are emitted as pattern-match frames; with no
    /// matches, a watch-exit frame is emitted from the pending or freshly built
    /// completion unless the task's watch state says nothing is owed. In both emitting
    /// paths the completion is marked delivered and the task's watch state is cleared.
    ///
    /// # Errors
    ///
    /// Returns `"task_not_found"`, `"background_task_lock_poisoned"`, or
    /// `"watch_registry_poisoned"`, or whatever error `WatchRegistry::register` reports.
    pub fn register_watch(
        &self,
        task_id: String,
        pattern: WatchPattern,
        once: bool,
    ) -> Result<String, &'static str> {
        let task = self.task(&task_id).ok_or("task_not_found")?;
        // Snapshot the mode and terminal flag under the state lock; the paths are
        // cloned so nothing borrows the guard afterwards.
        let (mode, terminal_at_registration, stdout, stderr, pty) = task
            .state
            .lock()
            .map(|state| {
                (
                    state.metadata.mode.clone(),
                    state.metadata.status.is_terminal(),
                    task.paths.stdout.clone(),
                    task.paths.stderr.clone(),
                    task.paths.pty.clone(),
                )
            })
            .map_err(|_| "background_task_lock_poisoned")?;

        let mut terminal_matches = Vec::new();
        let scanned_terminal = terminal_at_registration;
        // Register and set up cursors under a single registry lock.
        let watch_id = {
            let mut registry = self
                .inner
                .watch_registry
                .lock()
                .map_err(|_| "watch_registry_poisoned")?;
            let watch_id = registry.register(task_id.clone(), pattern, once)?;
            match &mode {
                BgMode::Pipes => {
                    let stdout_key = format!("{task_id}:stdout");
                    let stderr_key = format!("{task_id}:stderr");
                    if terminal_at_registration {
                        // Terminal task: rescan both streams from the beginning.
                        registry.set_file_cursor(&stdout_key, 0);
                        registry.set_file_cursor(&stderr_key, 0);
                        terminal_matches.extend(registry.scan_file_new_bytes(
                            &stdout_key,
                            &task_id,
                            &stdout,
                        ));
                        terminal_matches.extend(registry.scan_file_new_bytes(
                            &stderr_key,
                            &task_id,
                            &stderr,
                        ));
                    } else {
                        registry.prime_file_cursor(&stdout_key, &stdout);
                        registry.prime_file_cursor(&stderr_key, &stderr);
                    }
                }
                BgMode::Pty => {
                    let pty_key = format!("{task_id}:pty");
                    if terminal_at_registration {
                        // Terminal task: rescan the PTY capture from the beginning.
                        registry.set_file_cursor(&pty_key, 0);
                        terminal_matches
                            .extend(registry.scan_file_new_bytes(&pty_key, &task_id, &pty));
                    } else {
                        registry.prime_file_cursor(&pty_key, &pty);
                    }
                }
            }
            watch_id
        };

        // Re-check after the registry lock is released: the task may have finished
        // while the watch was being registered.
        if task.is_terminal() {
            if !scanned_terminal {
                // The task became terminal during registration, so do the from-zero
                // scan now.
                terminal_matches = {
                    let mut registry = self
                        .inner
                        .watch_registry
                        .lock()
                        .map_err(|_| "watch_registry_poisoned")?;
                    match &mode {
                        BgMode::Pipes => {
                            let stdout_key = format!("{task_id}:stdout");
                            let stderr_key = format!("{task_id}:stderr");
                            registry.set_file_cursor(&stdout_key, 0);
                            registry.set_file_cursor(&stderr_key, 0);
                            let mut matches =
                                registry.scan_file_new_bytes(&stdout_key, &task_id, &stdout);
                            matches.extend(registry.scan_file_new_bytes(
                                &stderr_key,
                                &task_id,
                                &stderr,
                            ));
                            matches
                        }
                        BgMode::Pty => {
                            let pty_key = format!("{task_id}:pty");
                            registry.set_file_cursor(&pty_key, 0);
                            registry.scan_file_new_bytes(&pty_key, &task_id, &pty)
                        }
                    }
                };
            }

            let (watch_controlled, watch_matched) = self.task_watch_state(&task_id);
            // No matches and either the task is not watch-controlled or a match was
            // already recorded: no frame is emitted.
            if terminal_matches.is_empty() && (!watch_controlled || watch_matched) {
                if watch_matched {
                    let _ = task.set_completion_delivered(true, self);
                    self.clear_task_watch_state(&task_id);
                }
                return Ok(watch_id);
            }

            // Take the queued completion if there is one, else build a fresh snapshot.
            let completion = self
                .remove_pending_completion(&task_id)
                .or_else(|| self.completion_snapshot_for_task(&task, BG_COMPLETION_PREVIEW_BYTES));
            if terminal_matches.is_empty() {
                if let Some(completion) = completion.as_ref() {
                    self.emit_bash_watch_exit(completion);
                }
            } else {
                for pattern_match in terminal_matches {
                    self.emit_bash_pattern_match(&task.session_id, pattern_match);
                }
            }
            let _ = task.set_completion_delivered(true, self);
            self.clear_task_watch_state(&task_id);
        }

        Ok(watch_id)
    }
1036
1037 pub fn unregister_watch(&self, task_id: &str, watch_id: &str) {
1038 if let Ok(mut registry) = self.inner.watch_registry.lock() {
1039 registry.unregister(task_id, watch_id);
1040 }
1041 }
1042
1043 pub fn active_watch_count(&self, task_id: &str) -> usize {
1044 self.inner
1045 .watch_registry
1046 .lock()
1047 .map(|registry| registry.active_count(task_id))
1048 .unwrap_or(0)
1049 }
1050
1051 fn task_watch_state(&self, task_id: &str) -> (bool, bool) {
1052 self.inner
1053 .watch_registry
1054 .lock()
1055 .map(|registry| {
1056 (
1057 registry.has_controlled_task(task_id),
1058 registry.has_matched_task(task_id),
1059 )
1060 })
1061 .unwrap_or((false, false))
1062 }
1063
1064 fn task_has_watch_control(&self, task_id: &str) -> bool {
1065 self.inner
1066 .watch_registry
1067 .lock()
1068 .map(|registry| registry.has_controlled_task(task_id))
1069 .unwrap_or(false)
1070 }
1071
1072 fn clear_task_watch_state(&self, task_id: &str) {
1073 if let Ok(mut registry) = self.inner.watch_registry.lock() {
1074 registry.clear_task(task_id);
1075 }
1076 }
1077
1078 pub(crate) fn scan_task_watch_output(&self, task: &Arc<BgTask>) {
1079 let (mode, stdout, stderr, pty) = match task.state.lock() {
1080 Ok(state) => (
1081 state.metadata.mode.clone(),
1082 task.paths.stdout.clone(),
1083 task.paths.stderr.clone(),
1084 task.paths.pty.clone(),
1085 ),
1086 Err(_) => return,
1087 };
1088 let mut matches = Vec::new();
1089 if let Ok(mut registry) = self.inner.watch_registry.lock() {
1090 match mode {
1091 BgMode::Pipes => {
1092 let stdout_key = format!("{}:stdout", task.task_id);
1093 let stderr_key = format!("{}:stderr", task.task_id);
1094 matches.extend(registry.scan_file_new_bytes(
1095 &stdout_key,
1096 &task.task_id,
1097 &stdout,
1098 ));
1099 matches.extend(registry.scan_file_new_bytes(
1100 &stderr_key,
1101 &task.task_id,
1102 &stderr,
1103 ));
1104 }
1105 BgMode::Pty => {
1106 let pty_key = format!("{}:pty", task.task_id);
1107 matches.extend(registry.scan_file_new_bytes(&pty_key, &task.task_id, &pty));
1108 }
1109 }
1110 }
1111 for pattern_match in matches {
1112 self.emit_bash_pattern_match(&task.session_id, pattern_match);
1113 }
1114 }
1115
1116 pub fn status(
1117 &self,
1118 task_id: &str,
1119 session_id: &str,
1120 project_root: Option<&Path>,
1121 storage_dir: Option<&Path>,
1122 preview_bytes: usize,
1123 ) -> Option<BgTaskSnapshot> {
1124 let mut task = self.task_for_session(task_id, session_id);
1125 if task.is_none() {
1126 if let Some(storage_dir) = storage_dir {
1127 let _ = self.replay_session(storage_dir, session_id);
1128 task = self.task_for_session(task_id, session_id);
1129 }
1130 }
1131 let Some(task) = task else {
1132 return self.status_relaxed(
1133 task_id,
1134 session_id,
1135 project_root?,
1136 storage_dir?,
1137 preview_bytes,
1138 );
1139 };
1140 let _ = self.poll_task(&task);
1141 let mut snapshot = task.snapshot(preview_bytes);
1142 self.maybe_compress_snapshot(&task, &mut snapshot);
1143 Some(snapshot)
1144 }
1145
1146 fn status_relaxed_task(
1147 &self,
1148 task_id: &str,
1149 project_root: &Path,
1150 storage_dir: &Path,
1151 ) -> Option<Arc<BgTask>> {
1152 let canonical_project = canonicalized_path(project_root);
1153 match self.lookup_relaxed_task_from_db(task_id, project_root) {
1154 Some(Ok(Some(metadata))) => {
1155 if let Some(task) = self.task(task_id) {
1156 let matches_project = task
1157 .state
1158 .lock()
1159 .map(|state| {
1160 state
1161 .metadata
1162 .project_root
1163 .as_deref()
1164 .map(canonicalized_path)
1165 .as_deref()
1166 == Some(canonical_project.as_path())
1167 })
1168 .unwrap_or(false);
1169 return matches_project.then_some(task);
1170 }
1171 let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1172 if self.insert_rehydrated_task(metadata, paths, true).is_err() {
1173 return None;
1174 }
1175 return self.task(task_id);
1176 }
1177 Some(Ok(None)) => {
1178 crate::slog_info!(
1179 "bash task relaxed DB miss for {}; falling back to disk",
1180 task_id
1181 );
1182 }
1183 Some(Err(error)) => {
1184 crate::slog_warn!(
1185 "bash task relaxed DB lookup failed for {}; falling back to disk: {}",
1186 task_id,
1187 error
1188 );
1189 }
1190 None => {
1191 crate::slog_info!(
1192 "bash task relaxed DB unavailable for {}; falling back to disk",
1193 task_id
1194 );
1195 }
1196 }
1197 let root = storage_dir.join("bash-tasks");
1198 let entries = fs::read_dir(&root).ok()?;
1199 for entry in entries.flatten() {
1200 let dir = entry.path();
1201 if !dir.is_dir() {
1202 continue;
1203 }
1204 let path = dir.join(format!("{task_id}.json"));
1205 if !path.exists() {
1206 continue;
1207 }
1208 let metadata = match read_task(&path) {
1209 Ok(metadata) => metadata,
1210 Err(error) => {
1211 crate::slog_warn!(
1212 "quarantining invalid background task metadata {} during relaxed lookup: {error}",
1213 path.display()
1214 );
1215 if let Err(quarantine_error) =
1216 quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
1217 {
1218 crate::slog_warn!(
1219 "failed to quarantine invalid background task metadata {}: {quarantine_error}",
1220 path.display()
1221 );
1222 }
1223 continue;
1224 }
1225 };
1226 let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
1227 if metadata_project.as_deref() != Some(canonical_project.as_path()) {
1228 continue;
1229 }
1230 if let Some(task) = self.task(task_id) {
1231 let matches_project = task
1232 .state
1233 .lock()
1234 .map(|state| {
1235 state
1236 .metadata
1237 .project_root
1238 .as_deref()
1239 .map(canonicalized_path)
1240 .as_deref()
1241 == Some(canonical_project.as_path())
1242 })
1243 .unwrap_or(false);
1244 return matches_project.then_some(task);
1245 }
1246 let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1247 if self.insert_rehydrated_task(metadata, paths, true).is_err() {
1248 return None;
1249 }
1250 return self.task(task_id);
1251 }
1252 None
1253 }
1254
1255 fn lookup_relaxed_task_from_db(
1256 &self,
1257 task_id: &str,
1258 project_root: &Path,
1259 ) -> Option<Result<Option<PersistedTask>, String>> {
1260 let pool = self
1261 .inner
1262 .db_pool
1263 .read()
1264 .ok()
1265 .and_then(|slot| slot.clone())?;
1266 let harness = self
1267 .inner
1268 .db_harness
1269 .read()
1270 .ok()
1271 .and_then(|slot| slot.clone())?;
1272 let conn = match pool.lock() {
1273 Ok(conn) => conn,
1274 Err(_) => return Some(Err("db mutex poisoned".to_string())),
1275 };
1276 let project_key = crate::search_index::project_cache_key(project_root);
1277 Some(
1278 crate::db::bash_tasks::find_bash_task_for_project(
1279 &conn,
1280 &harness,
1281 &project_key,
1282 task_id,
1283 )
1284 .map(|row| row.map(PersistedTask::from))
1285 .map_err(|error| error.to_string()),
1286 )
1287 }
1288
1289 pub(super) fn status_relaxed(
1290 &self,
1291 task_id: &str,
1292 _session_id: &str,
1293 project_root: &Path,
1294 storage_dir: &Path,
1295 preview_bytes: usize,
1296 ) -> Option<BgTaskSnapshot> {
1297 let task = self.status_relaxed_task(task_id, project_root, storage_dir)?;
1298 let _ = self.poll_task(&task);
1299 let mut snapshot = task.snapshot(preview_bytes);
1300 self.maybe_compress_snapshot(&task, &mut snapshot);
1301 Some(snapshot)
1302 }
1303
1304 pub fn kill_relaxed(
1305 &self,
1306 task_id: &str,
1307 project_root: &Path,
1308 storage_dir: &Path,
1309 ) -> Result<BgTaskSnapshot, String> {
1310 let task = self
1311 .status_relaxed_task(task_id, project_root, storage_dir)
1312 .ok_or_else(|| format!("background task not found: {task_id}"))?;
1313 self.kill_with_status(task_id, &task.session_id, BgTaskStatus::Killed)
1314 }
1315
1316 pub fn maybe_gc_persisted(&self, storage_dir: &Path) -> Result<usize, String> {
1317 #[cfg(test)]
1318 self.inner.persisted_gc_runs.fetch_add(1, Ordering::SeqCst);
1319
1320 let mut deleted = 0usize;
1321
1322 let root = storage_dir.join("bash-tasks");
1323 if root.exists() {
1324 let session_dirs = fs::read_dir(&root).map_err(|e| {
1325 format!(
1326 "failed to read background task root {}: {e}",
1327 root.display()
1328 )
1329 })?;
1330 for session_entry in session_dirs.flatten() {
1331 let session_dir = session_entry.path();
1332 if !session_dir.is_dir() {
1333 continue;
1334 }
1335 let task_entries = match fs::read_dir(&session_dir) {
1336 Ok(entries) => entries,
1337 Err(error) => {
1338 crate::slog_warn!(
1339 "failed to read background task session dir {}: {error}",
1340 session_dir.display()
1341 );
1342 continue;
1343 }
1344 };
1345 for task_entry in task_entries.flatten() {
1346 let json_path = task_entry.path();
1347 if json_path
1348 .extension()
1349 .and_then(|extension| extension.to_str())
1350 != Some("json")
1351 {
1352 continue;
1353 }
1354 if modified_within(&json_path, PERSISTED_GC_GRACE) {
1355 continue;
1356 }
1357 let metadata = match read_task(&json_path) {
1358 Ok(metadata) => metadata,
1359 Err(error) => {
1360 crate::slog_warn!(
1361 "quarantining corrupt background task metadata {}: {error}",
1362 json_path.display()
1363 );
1364 quarantine_task_json(
1365 storage_dir,
1366 &session_dir,
1367 &json_path,
1368 QuarantineKind::Corrupt,
1369 )?;
1370 continue;
1371 }
1372 };
1373 if !(metadata.status.is_terminal() && metadata.completion_delivered) {
1374 continue;
1375 }
1376 let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
1377 match delete_task_bundle(&paths) {
1378 Ok(()) => {
1379 deleted += 1;
1380 log::debug!(
1381 "deleted persisted background task bundle {}",
1382 metadata.task_id
1383 );
1384 }
1385 Err(error) => {
1386 crate::slog_warn!(
1387 "failed to delete background task bundle {}: {error}",
1388 metadata.task_id
1389 );
1390 continue;
1391 }
1392 }
1393 }
1394 }
1395 }
1396 gc_quarantine(storage_dir);
1397 Ok(deleted)
1398 }
1399
1400 pub fn list(&self, preview_bytes: usize) -> Vec<BgTaskSnapshot> {
1401 let tasks = self
1402 .inner
1403 .tasks
1404 .lock()
1405 .map(|tasks| tasks.values().cloned().collect::<Vec<_>>())
1406 .unwrap_or_default();
1407 tasks
1408 .into_iter()
1409 .map(|task| {
1410 let _ = self.poll_task(&task);
1411 let mut snapshot = task.snapshot(preview_bytes);
1412 self.maybe_compress_snapshot(&task, &mut snapshot);
1413 snapshot
1414 })
1415 .collect()
1416 }
1417
1418 fn maybe_compress_snapshot(&self, task: &Arc<BgTask>, snapshot: &mut BgTaskSnapshot) {
1424 if !snapshot.info.status.is_terminal() {
1425 return;
1426 }
1427 let (compressed_flag, mode) = task
1428 .state
1429 .lock()
1430 .map(|state| (state.metadata.compressed, state.metadata.mode.clone()))
1431 .unwrap_or((true, BgMode::Pipes));
1432 if mode == BgMode::Pty {
1433 return;
1434 }
1435 if !compressed_flag {
1436 return;
1437 }
1438 let raw = std::mem::take(&mut snapshot.output_preview);
1439 snapshot.output_preview = self.compress_output(&snapshot.info.command, raw);
1440 }
1441
1442 pub fn kill(&self, task_id: &str, session_id: &str) -> Result<BgTaskSnapshot, String> {
1443 self.kill_with_status(task_id, session_id, BgTaskStatus::Killed)
1444 }
1445
1446 pub fn promote(&self, task_id: &str, session_id: &str) -> Result<bool, String> {
1447 let task = self
1448 .task_for_session(task_id, session_id)
1449 .ok_or_else(|| format!("background task not found: {task_id}"))?;
1450 let mut state = task
1451 .state
1452 .lock()
1453 .map_err(|_| "background task lock poisoned".to_string())?;
1454 let updated = self
1455 .update_task_metadata(&task.paths, |metadata| {
1456 metadata.notify_on_completion = true;
1457 metadata.completion_delivered = false;
1458 })
1459 .map_err(|e| format!("failed to promote background task: {e}"))?;
1460 state.metadata = updated;
1461 if state.metadata.status.is_terminal() {
1462 state.buffer.enforce_terminal_cap();
1463 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1464 }
1465 Ok(true)
1466 }
1467
1468 pub(crate) fn kill_for_timeout(&self, task_id: &str, session_id: &str) -> Result<(), String> {
1469 self.kill_with_status(task_id, session_id, BgTaskStatus::TimedOut)
1470 .map(|_| ())
1471 }
1472
1473 pub fn cleanup_finished(&self, older_than: Duration) {
1474 let cutoff = Instant::now().checked_sub(older_than);
1475 let removable_paths: Vec<(String, TaskPaths)> =
1476 if let Ok(mut tasks) = self.inner.tasks.lock() {
1477 let removable = tasks
1478 .iter()
1479 .filter_map(|(task_id, task)| {
1480 let delivered_terminal = task
1481 .state
1482 .lock()
1483 .map(|state| {
1484 state.metadata.status.is_terminal()
1485 && state.metadata.completion_delivered
1486 })
1487 .unwrap_or(false);
1488 if !delivered_terminal {
1489 return None;
1490 }
1491
1492 let terminal_at = task.terminal_at.lock().ok().and_then(|at| *at);
1493 let expired = match (terminal_at, cutoff) {
1494 (Some(terminal_at), Some(cutoff)) => terminal_at <= cutoff,
1495 (Some(_), None) => true,
1496 (None, _) => false,
1497 };
1498 expired.then(|| task_id.clone())
1499 })
1500 .collect::<Vec<_>>();
1501
1502 removable
1503 .into_iter()
1504 .filter_map(|task_id| {
1505 tasks
1506 .remove(&task_id)
1507 .map(|task| (task_id, task.paths.clone()))
1508 })
1509 .collect()
1510 } else {
1511 Vec::new()
1512 };
1513
1514 for (task_id, paths) in removable_paths {
1515 match delete_task_bundle(&paths) {
1516 Ok(()) => log::debug!("deleted persisted background task bundle {task_id}"),
1517 Err(error) => crate::slog_warn!(
1518 "failed to delete persisted background task bundle {task_id}: {error}"
1519 ),
1520 }
1521 }
1522 }
1523
1524 pub fn drain_completions(&self) -> Vec<BgCompletion> {
1525 self.drain_completions_for_session(None)
1526 }
1527
1528 pub fn drain_completions_for_session(&self, session_id: Option<&str>) -> Vec<BgCompletion> {
1529 let completions = match self.inner.completions.lock() {
1530 Ok(completions) => completions,
1531 Err(_) => return Vec::new(),
1532 };
1533
1534 completions
1535 .iter()
1536 .filter(|completion| {
1537 session_id
1538 .map(|session_id| completion.session_id == session_id)
1539 .unwrap_or(true)
1540 })
1541 .cloned()
1542 .collect()
1543 }
1544
1545 pub fn ack_completions_for_session(
1546 &self,
1547 session_id: Option<&str>,
1548 task_ids: &[String],
1549 ) -> Vec<String> {
1550 if task_ids.is_empty() {
1551 return Vec::new();
1552 }
1553 let requested_task_ids = task_ids.iter().map(String::as_str).collect::<HashSet<_>>();
1554 let mut completion_sessions = HashMap::new();
1555 if let Ok(mut completions) = self.inner.completions.lock() {
1556 completions.retain(|completion| {
1557 let session_matches = session_id
1558 .map(|session_id| completion.session_id == session_id)
1559 .unwrap_or(true);
1560 if session_matches && requested_task_ids.contains(completion.task_id.as_str()) {
1561 completion_sessions
1562 .insert(completion.task_id.clone(), completion.session_id.clone());
1563 false
1564 } else {
1565 true
1566 }
1567 });
1568 }
1569
1570 let mut delivered = Vec::new();
1571 for task_id in task_ids {
1572 let task = if let Some(session_id) = session_id {
1573 self.task_for_session(task_id, session_id)
1574 } else if let Some(completion_session_id) = completion_sessions.get(task_id) {
1575 self.task_for_session(task_id, completion_session_id)
1576 } else {
1577 self.task(task_id)
1578 };
1579 if let Some(task) = task {
1580 if task.set_completion_delivered(true, self).is_ok() {
1581 delivered.push(task_id.clone());
1582 }
1583 }
1584 }
1585
1586 delivered
1587 }
1588
1589 pub fn pending_completions_for_session(&self, session_id: &str) -> Vec<BgCompletion> {
1590 self.inner
1591 .completions
1592 .lock()
1593 .map(|completions| {
1594 completions
1595 .iter()
1596 .filter(|completion| completion.session_id == session_id)
1597 .cloned()
1598 .collect()
1599 })
1600 .unwrap_or_default()
1601 }
1602
1603 fn remove_pending_completion(&self, task_id: &str) -> Option<BgCompletion> {
1604 let mut completions = self.inner.completions.lock().ok()?;
1605 let idx = completions
1606 .iter()
1607 .position(|completion| completion.task_id == task_id)?;
1608 completions.remove(idx)
1609 }
1610
1611 fn completion_snapshot_for_task(
1612 &self,
1613 task: &Arc<BgTask>,
1614 preview_bytes: usize,
1615 ) -> Option<BgCompletion> {
1616 let snapshot = task.snapshot(preview_bytes);
1617 if !snapshot.info.status.is_terminal() {
1618 return None;
1619 }
1620 let output_preview = if snapshot.info.mode == BgMode::Pty {
1621 String::new()
1622 } else {
1623 let compressed = task
1624 .state
1625 .lock()
1626 .map(|state| state.metadata.compressed)
1627 .unwrap_or(true);
1628 if compressed {
1629 self.compress_output(&snapshot.info.command, snapshot.output_preview)
1630 } else {
1631 snapshot.output_preview
1632 }
1633 };
1634 Some(BgCompletion {
1635 task_id: snapshot.info.task_id,
1636 session_id: task.session_id.clone(),
1637 status: snapshot.info.status,
1638 exit_code: snapshot.exit_code,
1639 command: snapshot.info.command,
1640 output_preview,
1641 output_truncated: snapshot.output_truncated,
1642 original_tokens: None,
1643 compressed_tokens: None,
1644 tokens_skipped: false,
1645 })
1646 }
1647
1648 pub fn detach(&self) {
1649 self.inner.shutdown.store(true, Ordering::SeqCst);
1650 if let Ok(mut tasks) = self.inner.tasks.lock() {
1651 for task in tasks.values() {
1652 if let Ok(mut state) = task.state.lock() {
1653 match &mut state.runtime {
1654 TaskRuntime::Piped(child) => *child = None,
1655 TaskRuntime::Pty(runtime) => *runtime = None,
1656 }
1657 state.detached = true;
1658 }
1659 }
1660 tasks.clear();
1661 }
1662 }
1663
1664 pub fn shutdown(&self) {
1665 let tasks = self
1666 .inner
1667 .tasks
1668 .lock()
1669 .map(|tasks| {
1670 tasks
1671 .values()
1672 .map(|task| (task.task_id.clone(), task.session_id.clone()))
1673 .collect::<Vec<_>>()
1674 })
1675 .unwrap_or_default();
1676 for (task_id, session_id) in tasks {
1677 let _ = self.kill(&task_id, &session_id);
1678 }
1679 }
1680
1681 pub(crate) fn poll_task(&self, task: &Arc<BgTask>) -> Result<(), String> {
1682 if let Ok(state) = task.state.lock() {
1683 if let TaskRuntime::Pty(Some(pty)) = &state.runtime {
1684 if !pty.exit_observed.load(Ordering::SeqCst) {
1692 return Ok(());
1693 }
1694 }
1695 }
1696 let marker = match read_exit_marker(&task.paths.exit) {
1697 Ok(Some(marker)) => marker,
1698 Ok(None) => return Ok(()),
1699 Err(error) => return Err(format!("failed to read exit marker: {error}")),
1700 };
1701 self.finalize_from_marker(task, marker, None)
1702 }
1703
1704 pub(crate) fn reap_child(&self, task: &Arc<BgTask>) {
1705 let Ok(mut state) = task.state.lock() else {
1706 return;
1707 };
1708 match &mut state.runtime {
1709 TaskRuntime::Piped(child_slot) => {
1710 if let Some(child) = child_slot.as_mut() {
1711 if matches!(child.try_wait(), Ok(Some(_))) {
1712 *child_slot = None;
1713 state.detached = true;
1714 state.child_exit_observed = true;
1715 }
1716 } else if state.detached {
1717 let child_known_dead = state.child_exit_observed
1718 || state
1719 .metadata
1720 .child_pid
1721 .is_some_and(|pid| !is_process_alive(pid));
1722 if child_known_dead {
1723 self.fail_without_exit_marker_if_needed(task, &mut state);
1724 }
1725 }
1726 }
1727 TaskRuntime::Pty(Some(pty)) => {
1728 if pty.exit_observed.load(Ordering::SeqCst) {
1729 drop(state);
1730 let _ = self.poll_task(task);
1731 }
1732 }
1733 TaskRuntime::Pty(None) => {}
1734 }
1735 }
1736
1737 fn fail_without_exit_marker_if_needed(&self, task: &Arc<BgTask>, state: &mut BgTaskState) {
1738 if state.metadata.status.is_terminal() {
1739 return;
1740 }
1741 if matches!(read_exit_marker(&task.paths.exit), Ok(Some(_))) {
1742 return;
1743 }
1744 let watch_controlled = self.task_has_watch_control(&task.task_id);
1745 let updated = self.update_task_metadata(&task.paths, |metadata| {
1746 metadata.mark_terminal(
1747 BgTaskStatus::Failed,
1748 None,
1749 Some("process exited without exit marker".to_string()),
1750 );
1751 if watch_controlled {
1752 metadata.completion_delivered = true;
1753 }
1754 });
1755 if let Ok(metadata) = updated {
1756 state.pending_terminal_override = None;
1757 state.metadata = metadata;
1758 task.mark_terminal_now();
1759 state.buffer.enforce_terminal_cap();
1760 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1761 }
1762 }
1763
1764 pub(crate) fn running_tasks(&self) -> Vec<Arc<BgTask>> {
1765 self.inner
1766 .tasks
1767 .lock()
1768 .map(|tasks| {
1769 tasks
1770 .values()
1771 .filter(|task| task.is_running())
1772 .cloned()
1773 .collect()
1774 })
1775 .unwrap_or_default()
1776 }
1777
1778 fn insert_rehydrated_task(
1779 &self,
1780 metadata: PersistedTask,
1781 paths: TaskPaths,
1782 detached: bool,
1783 ) -> Result<(), String> {
1784 let task_id = metadata.task_id.clone();
1785 let session_id = metadata.session_id.clone();
1786 let started = started_instant_from_unix_millis(metadata.started_at);
1787 let suppress_replayed_running_reminder = metadata.status == BgTaskStatus::Running;
1788 let mode = metadata.mode.clone();
1789 let task = Arc::new(BgTask {
1790 task_id: task_id.clone(),
1791 session_id,
1792 paths: paths.clone(),
1793 started,
1794 last_reminder_at: Mutex::new(suppress_replayed_running_reminder.then(Instant::now)),
1795 terminal_at: Mutex::new(metadata.status.is_terminal().then(Instant::now)),
1796 state: Mutex::new(BgTaskState {
1797 metadata,
1798 runtime: if mode == BgMode::Pty {
1799 TaskRuntime::Pty(None)
1800 } else {
1801 TaskRuntime::Piped(None)
1802 },
1803 detached,
1804 child_exit_observed: false,
1811 buffer: if mode == BgMode::Pty {
1812 BgBuffer::pty(paths.pty.clone())
1813 } else {
1814 BgBuffer::new(paths.stdout.clone(), paths.stderr.clone())
1815 },
1816 pending_terminal_override: None,
1817 }),
1818 });
1819 self.inner
1820 .tasks
1821 .lock()
1822 .map_err(|_| "background task registry lock poisoned".to_string())?
1823 .insert(task_id, task);
1824 Ok(())
1825 }
1826
    /// Terminates a task and records `terminal_status` as its outcome.
    ///
    /// Flow, all under the task state lock:
    /// 1. An already-terminal task is returned as-is.
    /// 2. If an exit marker already exists, the task is finalized from the
    ///    marker instead of being killed.
    /// 3. Otherwise the task is persisted as `Killing` (once) and its runtime
    ///    is terminated:
    ///    * piped tasks are terminated, waited on, and marked terminal here;
    ///    * PTY tasks are signalled; on Windows they are also marked terminal
    ///      here, while on other platforms this function leaves them
    ///      non-terminal.
    ///
    /// # Errors
    /// The task is unknown for this session, the state lock is poisoned, or
    /// persisting state / writing the kill marker fails.
    fn kill_with_status(
        &self,
        task_id: &str,
        session_id: &str,
        terminal_status: BgTaskStatus,
    ) -> Result<BgTaskSnapshot, String> {
        let task = self
            .task_for_session(task_id, session_id)
            .ok_or_else(|| format!("background task not found: {task_id}"))?;

        {
            let mut state = task
                .state
                .lock()
                .map_err(|_| "background task lock poisoned".to_string())?;
            // Nothing to kill: just report the current terminal state.
            if state.metadata.status.is_terminal() {
                state.pending_terminal_override = None;
                return Ok(task.snapshot_locked(&state, 5 * 1024));
            }

            // An exit marker is already on disk: the marker decides the
            // terminal metadata, not `terminal_status`.
            if let Ok(Some(marker)) = read_exit_marker(&task.paths.exit) {
                state.metadata =
                    terminal_metadata_from_marker(state.metadata.clone(), marker, None);
                if self.task_has_watch_control(&task.task_id) {
                    state.metadata.completion_delivered = true;
                }
                state.pending_terminal_override = None;
                task.mark_terminal_now();
                match &mut state.runtime {
                    TaskRuntime::Piped(child) => *child = None,
                    TaskRuntime::Pty(runtime) => *runtime = None,
                }
                state.detached = true;
                state.buffer.enforce_terminal_cap();
                self.persist_task(&task.paths, &state.metadata)
                    .map_err(|e| format!("failed to persist terminal state: {e}"))?;
                self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
                return Ok(task.snapshot_locked(&state, 5 * 1024));
            }

            // Persist the `Killing` transition only the first time through.
            let was_already_killing = state.metadata.status == BgTaskStatus::Killing;
            if !was_already_killing {
                state.metadata.status = BgTaskStatus::Killing;
                self.persist_task(&task.paths, &state.metadata)
                    .map_err(|e| format!("failed to persist killing state: {e}"))?;
            }

            #[cfg(unix)]
            let pgid = state.metadata.pgid;
            #[cfg(windows)]
            let child_pid = state.metadata.child_pid;
            // For a PTY timeout, remember the intended status; it is consumed
            // later when the task is finalized from its `Killed` marker.
            if !was_already_killing
                && state.metadata.mode == BgMode::Pty
                && terminal_status == BgTaskStatus::TimedOut
            {
                state.pending_terminal_override = Some(BgTaskStatus::TimedOut);
            }

            #[cfg(windows)]
            let mut pty_forced_terminal_status: Option<BgTaskStatus> = None;

            match &mut state.runtime {
                TaskRuntime::Piped(child_slot) => {
                    #[cfg(unix)]
                    if let Some(pgid) = pgid {
                        terminate_pgid(pgid, child_slot.as_mut());
                    }
                    #[cfg(windows)]
                    if let Some(child) = child_slot.as_mut() {
                        super::process::terminate_process(child);
                    } else if let Some(pid) = child_pid {
                        terminate_pid(pid);
                    }
                    // Blocks until the child has exited (state lock is held).
                    if let Some(child) = child_slot.as_mut() {
                        let _ = child.wait();
                    }
                    *child_slot = None;
                    state.detached = true;

                    if !task.paths.exit.exists() {
                        write_kill_marker_if_absent(&task.paths.exit)
                            .map_err(|e| format!("failed to write kill marker: {e}"))?;
                    }

                    // Timed-out tasks report exit code 124; killed tasks none.
                    let exit_code = if terminal_status == BgTaskStatus::TimedOut {
                        Some(124)
                    } else {
                        None
                    };
                    state
                        .metadata
                        .mark_terminal(terminal_status, exit_code, None);
                    if self.task_has_watch_control(&task.task_id) {
                        state.metadata.completion_delivered = true;
                    }
                    state.pending_terminal_override = None;
                    task.mark_terminal_now();
                    self.persist_task(&task.paths, &state.metadata)
                        .map_err(|e| format!("failed to persist killed state: {e}"))?;
                    state.buffer.enforce_terminal_cap();
                    self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
                }
                TaskRuntime::Pty(Some(pty)) => {
                    // Flag the kill before signalling the child.
                    pty.was_killed.store(true, Ordering::SeqCst);
                    if let Err(error) = pty.killer.kill() {
                        crate::slog_warn!("[pty-kill] {task_id} ChildKiller::kill failed: {error}");
                    }
                    if let Some(pid) = pty.child_pid {
                        #[cfg(unix)]
                        terminate_pgid(pid as i32, None);
                        #[cfg(windows)]
                        terminate_pid(pid);
                    }
                    // Drop the PTY master handle.
                    drop(pty.master.take());

                    // Windows only: pick the terminal status to force below,
                    // preferring a pending override over the default.
                    #[cfg(windows)]
                    {
                        let default_status = if terminal_status == BgTaskStatus::TimedOut {
                            BgTaskStatus::TimedOut
                        } else {
                            BgTaskStatus::Killed
                        };
                        pty_forced_terminal_status = Some(
                            state
                                .pending_terminal_override
                                .take()
                                .unwrap_or(default_status),
                        );
                    }
                }
                TaskRuntime::Pty(None) => {}
            }

            // Windows only: finalize the PTY task here, mirroring the piped
            // path above.
            #[cfg(windows)]
            if let Some(target_status) = pty_forced_terminal_status {
                if !task.paths.exit.exists() {
                    write_kill_marker_if_absent(&task.paths.exit)
                        .map_err(|e| format!("failed to write kill marker: {e}"))?;
                }

                let exit_code = if target_status == BgTaskStatus::TimedOut {
                    Some(124)
                } else {
                    None
                };
                state.metadata.mark_terminal(target_status, exit_code, None);
                if self.task_has_watch_control(&task.task_id) {
                    state.metadata.completion_delivered = true;
                }
                state.pending_terminal_override = None;
                task.mark_terminal_now();
                if let TaskRuntime::Pty(runtime) = &mut state.runtime {
                    *runtime = None;
                }
                state.detached = true;
                self.persist_task(&task.paths, &state.metadata)
                    .map_err(|e| format!("failed to persist killed PTY state: {e}"))?;
                state.buffer.enforce_terminal_cap();
                self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
            }
        }

        // The state lock is released here; `snapshot` is called without it.
        Ok(task.snapshot(5 * 1024))
    }
1991
    /// Moves a task to its terminal state based on an exit marker.
    ///
    /// Phase 1 (state lock held): return early if the task is already
    /// terminal; otherwise persist the terminal metadata, refresh the
    /// in-memory copy, and drop the runtime handle.
    ///
    /// Phase 2 (lock released): for PTY tasks, wait briefly for the reader to
    /// finish, then run a watch scan over the task's output.
    ///
    /// Phase 3 (lock re-taken): cap the buffer and enqueue the completion.
    ///
    /// `reason` is passed into the terminal metadata.
    ///
    /// # Errors
    /// The state lock is poisoned or persisting the terminal state fails.
    fn finalize_from_marker(
        &self,
        task: &Arc<BgTask>,
        marker: ExitMarker,
        reason: Option<String>,
    ) -> Result<(), String> {
        // Read before taking the state lock.
        let watch_controlled = self.task_has_watch_control(&task.task_id);
        let mut pty_reader_done = None;
        {
            let mut state = task
                .state
                .lock()
                .map_err(|_| "background task lock poisoned".to_string())?;
            if state.metadata.status.is_terminal() {
                state.pending_terminal_override = None;
                return Ok(());
            }

            let pending_override = state.pending_terminal_override.take();
            let is_pty = state.metadata.mode == BgMode::Pty;
            let updated = self
                .update_task_metadata(&task.paths, |metadata| {
                    // A killed PTY task takes its status from the pending
                    // override (default `Killed`); every other case is derived
                    // from the marker itself.
                    let mut new_metadata = if is_pty && marker == ExitMarker::Killed {
                        let mut metadata = metadata.clone();
                        let target_status = pending_override.unwrap_or(BgTaskStatus::Killed);
                        let exit_code = if target_status == BgTaskStatus::TimedOut {
                            Some(124)
                        } else {
                            None
                        };
                        metadata.mark_terminal(target_status, exit_code, reason);
                        metadata
                    } else {
                        terminal_metadata_from_marker(metadata.clone(), marker, reason)
                    };
                    if watch_controlled {
                        new_metadata.completion_delivered = true;
                    }
                    *metadata = new_metadata;
                })
                .map_err(|e| format!("failed to persist terminal state: {e}"))?;
            state.metadata = updated;
            task.mark_terminal_now();
            match &mut state.runtime {
                TaskRuntime::Piped(child) => *child = None,
                TaskRuntime::Pty(runtime) => {
                    // Keep the reader's done flag so it can be awaited after
                    // the runtime has been dropped.
                    pty_reader_done = runtime
                        .as_ref()
                        .map(|runtime| Arc::clone(&runtime.reader_done));
                    *runtime = None;
                }
            }
            state.detached = true;
        }

        // Wait up to 200 ms, polling every 10 ms, for the PTY reader to set
        // its done flag. Presumably this lets the scan below see trailing
        // output — confirm against the reader implementation.
        if let Some(reader_done) = pty_reader_done {
            let deadline = Instant::now() + Duration::from_millis(200);
            while !reader_done.load(Ordering::SeqCst) && Instant::now() < deadline {
                std::thread::sleep(Duration::from_millis(10));
            }
        }

        // Runs without the state lock; the scan takes it briefly itself.
        self.scan_task_watch_output(task);

        let mut state = task
            .state
            .lock()
            .map_err(|_| "background task lock poisoned".to_string())?;
        state.buffer.enforce_terminal_cap();
        self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
        Ok(())
    }
2066
2067 fn enqueue_completion_if_needed(
2068 &self,
2069 metadata: &PersistedTask,
2070 paths: Option<&TaskPaths>,
2071 emit_frame: bool,
2072 ) {
2073 if metadata.status.is_terminal() && !metadata.completion_delivered {
2074 self.enqueue_completion_from_parts(metadata, None, paths, emit_frame);
2075 }
2076 }
2077
2078 fn enqueue_completion_locked(
2079 &self,
2080 metadata: &PersistedTask,
2081 buffer: Option<&BgBuffer>,
2082 emit_frame: bool,
2083 ) {
2084 self.enqueue_completion_from_parts(metadata, buffer, None, emit_frame);
2085 }
2086
2087 fn enqueue_completion_from_parts(
2088 &self,
2089 metadata: &PersistedTask,
2090 buffer: Option<&BgBuffer>,
2091 paths: Option<&TaskPaths>,
2092 emit_frame: bool,
2093 ) {
2094 if !metadata.status.is_terminal() {
2105 return;
2106 }
2107 let (raw_preview, output_truncated) = if metadata.mode == BgMode::Pty {
2112 (String::new(), false)
2113 } else {
2114 match buffer {
2115 Some(buf) => buf.read_tail(BG_COMPLETION_PREVIEW_BYTES),
2116 None => paths
2117 .map(|paths| read_tail_from_disk(metadata, paths, BG_COMPLETION_PREVIEW_BYTES))
2118 .unwrap_or_else(|| (String::new(), false)),
2119 }
2120 };
2121 let output_preview = if metadata.compressed {
2126 self.compress_output(&metadata.command, raw_preview)
2127 } else {
2128 raw_preview
2129 };
2130 let token_counts = self.completion_token_counts(metadata, buffer, paths);
2131 let completion = BgCompletion {
2132 task_id: metadata.task_id.clone(),
2133 session_id: metadata.session_id.clone(),
2134 status: metadata.status.clone(),
2135 exit_code: metadata.exit_code,
2136 command: metadata.command.clone(),
2137 output_preview,
2138 output_truncated,
2139 original_tokens: token_counts.original_tokens,
2140 compressed_tokens: token_counts.compressed_tokens,
2141 tokens_skipped: token_counts.tokens_skipped,
2142 };
2143
2144 self.record_compression_event_if_applicable(metadata, &token_counts);
2155
2156 let (watch_controlled, watch_matched) = self.task_watch_state(&metadata.task_id);
2157 if watch_controlled {
2158 if emit_frame && !watch_matched {
2159 self.emit_bash_watch_exit(&completion);
2160 }
2161 self.clear_task_watch_state(&metadata.task_id);
2162 return;
2163 }
2164
2165 if metadata.completion_delivered {
2175 return;
2176 }
2177
2178 let pushed = if let Ok(mut completions) = self.inner.completions.lock() {
2181 if completions
2182 .iter()
2183 .any(|existing| existing.task_id == metadata.task_id)
2184 {
2185 false
2186 } else {
2187 completions.push_back(completion.clone());
2188 true
2189 }
2190 } else {
2191 false
2192 };
2193
2194 if pushed && emit_frame {
2195 self.emit_bash_completed(completion);
2196 }
2197 }
2198
2199 fn record_compression_event_if_applicable(
2200 &self,
2201 metadata: &PersistedTask,
2202 token_counts: &CompletionTokenCounts,
2203 ) {
2204 if metadata.mode == BgMode::Pty {
2205 return;
2206 }
2207
2208 let (original_tokens, compressed_tokens, original_bytes, compressed_bytes) = match (
2209 token_counts.original_tokens,
2210 token_counts.compressed_tokens,
2211 token_counts.original_bytes,
2212 token_counts.compressed_bytes,
2213 ) {
2214 (
2215 Some(original_tokens),
2216 Some(compressed_tokens),
2217 Some(original_bytes),
2218 Some(compressed_bytes),
2219 ) => (
2220 original_tokens,
2221 compressed_tokens,
2222 original_bytes,
2223 compressed_bytes,
2224 ),
2225 _ => {
2226 crate::slog_warn!(
2227 "compression event skipped for {}: token counts unavailable (likely spill file missing or unreadable)",
2228 metadata.task_id
2229 );
2230 return;
2231 }
2232 };
2233
2234 let pool = self.inner.db_pool.read().ok().and_then(|slot| slot.clone());
2235 let Some(pool) = pool else {
2236 crate::slog_warn!(
2237 "compression event skipped for {}: db_pool not initialized — was configure run?",
2238 metadata.task_id
2239 );
2240 return;
2241 };
2242 let harness = self
2243 .inner
2244 .db_harness
2245 .read()
2246 .ok()
2247 .and_then(|slot| slot.clone());
2248 let Some(harness) = harness else {
2249 crate::slog_warn!(
2250 "compression event insert skipped for {}: harness not configured",
2251 metadata.task_id
2252 );
2253 return;
2254 };
2255
2256 let project_root = metadata
2257 .project_root
2258 .as_deref()
2259 .unwrap_or(&metadata.workdir);
2260 let project_key = crate::search_index::project_cache_key(project_root);
2261 let row = crate::db::compression_events::CompressionEventRow {
2262 harness: &harness,
2263 session_id: Some(&metadata.session_id),
2264 project_key: &project_key,
2265 tool: "bash",
2266 task_id: Some(&metadata.task_id),
2267 command: Some(&metadata.command),
2268 compressor: if metadata.compressed {
2269 "registry"
2270 } else {
2271 "none"
2272 },
2273 original_bytes,
2274 compressed_bytes,
2275 original_tokens,
2276 compressed_tokens,
2277 created_at: unix_millis() as i64,
2278 };
2279
2280 let conn = match pool.lock() {
2281 Ok(conn) => conn,
2282 Err(_) => {
2283 crate::slog_warn!(
2284 "compression event insert failed for {}: db mutex poisoned",
2285 metadata.task_id
2286 );
2287 return;
2288 }
2289 };
2290 match crate::db::compression_events::insert_compression_event(&conn, &row) {
2291 Ok(_) => {
2292 crate::slog_debug!(
2296 "compression event recorded for {} (project={}, session={}, {} → {} tokens)",
2297 metadata.task_id,
2298 project_key,
2299 metadata.session_id,
2300 original_tokens,
2301 compressed_tokens
2302 );
2303 }
2304 Err(error) => {
2305 crate::slog_warn!(
2306 "compression event insert failed for {}: {}",
2307 metadata.task_id,
2308 error
2309 );
2310 }
2311 }
2312 }
2313
2314 fn emit_bash_pattern_match(&self, session_id: &str, pattern_match: PatternMatch) {
2315 let Ok(progress_sender) = self
2316 .inner
2317 .progress_sender
2318 .lock()
2319 .map(|sender| sender.clone())
2320 else {
2321 return;
2322 };
2323 if let Some(sender) = progress_sender.as_ref() {
2324 sender(PushFrame::BashPatternMatch(BashPatternMatchFrame::new(
2325 pattern_match.task_id,
2326 session_id.to_string(),
2327 pattern_match.watch_id,
2328 pattern_match.match_text,
2329 pattern_match.match_offset,
2330 pattern_match.context,
2331 pattern_match.once,
2332 )));
2333 }
2334 }
2335
2336 fn emit_bash_watch_exit(&self, completion: &BgCompletion) {
2337 let Ok(progress_sender) = self
2338 .inner
2339 .progress_sender
2340 .lock()
2341 .map(|sender| sender.clone())
2342 else {
2343 return;
2344 };
2345 let Some(sender) = progress_sender.as_ref() else {
2346 return;
2347 };
2348 let status = completion_status_text(&completion.status, completion.exit_code);
2349 let preview = completion.output_preview.trim_end();
2350 let context = if preview.is_empty() {
2351 format!("task {} exited ({status})", completion.task_id)
2352 } else {
2353 format!(
2354 "task {} exited ({status})
2355{preview}",
2356 completion.task_id
2357 )
2358 };
2359 sender(PushFrame::BashPatternMatch(
2360 BashPatternMatchFrame::task_exit(
2361 completion.task_id.clone(),
2362 completion.session_id.clone(),
2363 format!("exited ({status})"),
2364 context,
2365 ),
2366 ));
2367 }
2368
2369 fn emit_bash_completed(&self, completion: BgCompletion) {
2370 let Ok(progress_sender) = self
2371 .inner
2372 .progress_sender
2373 .lock()
2374 .map(|sender| sender.clone())
2375 else {
2376 return;
2377 };
2378 let Some(sender) = progress_sender.as_ref() else {
2379 return;
2380 };
2381 sender(PushFrame::BashCompleted(BashCompletedFrame::new(
2389 completion.task_id,
2390 completion.session_id,
2391 completion.status,
2392 completion.exit_code,
2393 completion.command,
2394 completion.output_preview,
2395 completion.output_truncated,
2396 completion.original_tokens,
2397 completion.compressed_tokens,
2398 completion.tokens_skipped,
2399 )));
2400 }
2401
2402 fn completion_token_counts(
2403 &self,
2404 metadata: &PersistedTask,
2405 buffer: Option<&BgBuffer>,
2406 paths: Option<&TaskPaths>,
2407 ) -> CompletionTokenCounts {
2408 if metadata.mode == BgMode::Pty {
2409 return CompletionTokenCounts::skipped();
2410 }
2411
2412 let raw = match buffer {
2413 Some(buffer) => buffer.read_for_token_count(TOKENIZE_CAP_BYTES_PER_STREAM),
2414 None => paths
2415 .map(|paths| {
2416 read_for_token_count_from_disk(metadata, paths, TOKENIZE_CAP_BYTES_PER_STREAM)
2417 })
2418 .unwrap_or(TokenCountInput::Skipped),
2419 };
2420
2421 let TokenCountInput::Text(raw_output) = raw else {
2422 return CompletionTokenCounts::skipped();
2423 };
2424
2425 let original_tokens = token_count_u32(&raw_output);
2426 let original_bytes = raw_output.len() as i64;
2427 let compressed_output = if metadata.compressed {
2428 self.compress_output(&metadata.command, raw_output)
2429 } else {
2430 raw_output
2431 };
2432 let compressed_tokens = token_count_u32(&compressed_output);
2433 let compressed_bytes = compressed_output.len() as i64;
2434 CompletionTokenCounts {
2435 original_tokens: Some(original_tokens),
2436 compressed_tokens: Some(compressed_tokens),
2437 original_bytes: Some(original_bytes),
2438 compressed_bytes: Some(compressed_bytes),
2439 tokens_skipped: false,
2440 }
2441 }
2442
2443 pub(crate) fn maybe_emit_long_running_reminder(&self, task: &Arc<BgTask>) {
2444 if !self
2445 .inner
2446 .long_running_reminder_enabled
2447 .load(Ordering::SeqCst)
2448 {
2449 return;
2450 }
2451 let interval_ms = self
2452 .inner
2453 .long_running_reminder_interval_ms
2454 .load(Ordering::SeqCst);
2455 if interval_ms == 0 {
2456 return;
2457 }
2458 let interval = Duration::from_millis(interval_ms);
2459 let now = Instant::now();
2460 let Ok(mut last_reminder_at) = task.last_reminder_at.lock() else {
2461 return;
2462 };
2463 let since = last_reminder_at.unwrap_or(task.started);
2464 if now.duration_since(since) < interval {
2465 return;
2466 }
2467 let command = task
2468 .state
2469 .lock()
2470 .map(|state| state.metadata.command.clone())
2471 .unwrap_or_default();
2472 *last_reminder_at = Some(now);
2473 self.emit_bash_long_running(BashLongRunningFrame::new(
2474 task.task_id.clone(),
2475 task.session_id.clone(),
2476 command,
2477 task.started.elapsed().as_millis() as u64,
2478 ));
2479 }
2480
2481 fn emit_bash_long_running(&self, frame: BashLongRunningFrame) {
2482 let Ok(progress_sender) = self
2483 .inner
2484 .progress_sender
2485 .lock()
2486 .map(|sender| sender.clone())
2487 else {
2488 return;
2489 };
2490 if let Some(sender) = progress_sender.as_ref() {
2491 sender(PushFrame::BashLongRunning(frame));
2492 }
2493 }
2494
2495 fn task(&self, task_id: &str) -> Option<Arc<BgTask>> {
2496 self.inner
2497 .tasks
2498 .lock()
2499 .ok()
2500 .and_then(|tasks| tasks.get(task_id).cloned())
2501 }
2502
2503 fn task_for_session(&self, task_id: &str, session_id: &str) -> Option<Arc<BgTask>> {
2504 self.task(task_id)
2505 .filter(|task| task.session_id == session_id)
2506 }
2507
2508 fn running_count(&self) -> usize {
2509 self.inner
2510 .tasks
2511 .lock()
2512 .map(|tasks| tasks.values().filter(|task| task.is_running()).count())
2513 .unwrap_or(0)
2514 }
2515
2516 fn start_watchdog(&self) {
2517 if !self.inner.watchdog_started.swap(true, Ordering::SeqCst) {
2518 super::watchdog::start(self.clone());
2519 }
2520 }
2521
2522 fn running_metadata_is_stale(&self, metadata: &PersistedTask) -> bool {
2523 unix_millis().saturating_sub(metadata.started_at) > STALE_RUNNING_AFTER.as_millis() as u64
2524 }
2525
2526 #[cfg(test)]
2527 pub fn task_json_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
2528 self.task_for_session(task_id, session_id)
2529 .map(|task| task.paths.json.clone())
2530 }
2531
2532 #[cfg(test)]
2533 pub fn task_exit_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
2534 self.task_for_session(task_id, session_id)
2535 .map(|task| task.paths.exit.clone())
2536 }
2537
2538 fn generate_unique_task_id(&self) -> Result<String, String> {
2540 for _ in 0..32 {
2541 let candidate = random_slug();
2542 let tasks = self
2543 .inner
2544 .tasks
2545 .lock()
2546 .map_err(|_| "background task registry lock poisoned".to_string())?;
2547 if tasks.contains_key(&candidate) {
2548 continue;
2549 }
2550 let completions = self
2551 .inner
2552 .completions
2553 .lock()
2554 .map_err(|_| "background completions lock poisoned".to_string())?;
2555 if completions
2556 .iter()
2557 .any(|completion| completion.task_id == candidate)
2558 {
2559 continue;
2560 }
2561 return Ok(candidate);
2562 }
2563 Err("failed to allocate unique background task id after 32 attempts".to_string())
2564 }
2565}
2566
/// Token and byte counts measured for a task's output before and after
/// compression.
///
/// All four counts are `None` when measurement was skipped, in which case
/// `tokens_skipped` is `true`.
struct CompletionTokenCounts {
    /// Token count of the raw output.
    original_tokens: Option<u32>,
    /// Token count of the output after compression.
    compressed_tokens: Option<u32>,
    /// Byte length of the raw output.
    original_bytes: Option<i64>,
    /// Byte length of the output after compression.
    compressed_bytes: Option<i64>,
    /// `true` when no counts were computed.
    tokens_skipped: bool,
}

impl CompletionTokenCounts {
    /// Builds the "measurement skipped" value: every count is `None` and
    /// `tokens_skipped` is set.
    fn skipped() -> Self {
        let (original_tokens, compressed_tokens) = (None, None);
        let (original_bytes, compressed_bytes) = (None, None);
        Self {
            original_tokens,
            compressed_tokens,
            original_bytes,
            compressed_bytes,
            tokens_skipped: true,
        }
    }
}
2586
2587fn completion_status_text(status: &BgTaskStatus, exit_code: Option<i32>) -> String {
2588 match status {
2589 BgTaskStatus::TimedOut => "timed out".to_string(),
2590 BgTaskStatus::Killed => "killed".to_string(),
2591 _ => exit_code
2592 .map(|code| format!("exit {code}"))
2593 .unwrap_or_else(|| format!("{status:?}").to_lowercase()),
2594 }
2595}
2596
2597fn token_count_u32(text: &str) -> u32 {
2598 aft_tokenizer::count_tokens(text)
2599 .try_into()
2600 .unwrap_or(u32::MAX)
2601}
2602
2603impl Default for BgTaskRegistry {
2604 fn default() -> Self {
2605 Self::new(Arc::new(Mutex::new(None)))
2606 }
2607}
2608
/// Reports whether `path` was last modified less than `grace` ago.
///
/// Returns `false` when the file's metadata or modification time cannot be
/// read (for example, the path does not exist).
///
/// A modification time that lies ahead of the current clock (clock skew, a
/// clock correction, restored files) is treated as an age of zero, so the
/// entry counts as recent for any non-zero `grace` instead of being reported
/// as expired.
fn modified_within(path: &Path, grace: Duration) -> bool {
    let Ok(modified) = fs::metadata(path).and_then(|metadata| metadata.modified()) else {
        return false;
    };
    // `duration_since` fails only when `modified` is later than "now".
    let age = SystemTime::now()
        .duration_since(modified)
        .unwrap_or(Duration::ZERO);
    age < grace
}
2617
/// Returns the canonical form of `path`, or an unchanged copy of `path` when
/// canonicalization fails (for example, because the path does not exist).
fn canonicalized_path(path: &Path) -> PathBuf {
    match fs::canonicalize(path) {
        Ok(resolved) => resolved,
        Err(_) => path.to_path_buf(),
    }
}
2621
/// Converts a wall-clock start time (milliseconds since the Unix epoch) into an
/// `Instant` that lies the same distance in the past.
///
/// A start time in the future yields an elapsed time of zero. If the system
/// clock reads before the Unix epoch, or the subtraction cannot be represented
/// as an `Instant`, the current instant is returned.
fn started_instant_from_unix_millis(started_at: u64) -> Instant {
    let now_ms = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        // Clock before the epoch: pretend no time has passed.
        Err(_) => started_at,
    };
    let age = Duration::from_millis(now_ms.saturating_sub(started_at));
    match Instant::now().checked_sub(age) {
        Some(started) => started,
        None => Instant::now(),
    }
}
2633
2634fn gc_quarantine(storage_dir: &Path) {
2635 let quarantine_root = storage_dir.join("bash-tasks-quarantine");
2636 let Ok(session_dirs) = fs::read_dir(&quarantine_root) else {
2637 return;
2638 };
2639 for session_entry in session_dirs.flatten() {
2640 let session_quarantine_dir = session_entry.path();
2641 if !session_quarantine_dir.is_dir() {
2642 continue;
2643 }
2644 let entries = match fs::read_dir(&session_quarantine_dir) {
2645 Ok(entries) => entries,
2646 Err(error) => {
2647 crate::slog_warn!(
2648 "failed to read background task quarantine dir {}: {error}",
2649 session_quarantine_dir.display()
2650 );
2651 continue;
2652 }
2653 };
2654 for entry in entries.flatten() {
2655 let path = entry.path();
2656 if modified_within(&path, QUARANTINE_GC_GRACE) {
2657 continue;
2658 }
2659 let result = if path.is_dir() {
2660 fs::remove_dir_all(&path)
2661 } else {
2662 fs::remove_file(&path)
2663 };
2664 match result {
2665 Ok(()) => log::debug!(
2666 "deleted old background task quarantine entry {}",
2667 path.display()
2668 ),
2669 Err(error) => crate::slog_warn!(
2670 "failed to delete old background task quarantine entry {}: {error}",
2671 path.display()
2672 ),
2673 }
2674 }
2675 let _ = fs::remove_dir(&session_quarantine_dir);
2676 }
2677 let _ = fs::remove_dir(&quarantine_root);
2678}
2679
/// Reason a task file is being quarantined; it selects the naming scheme that
/// `quarantine_name` applies to the moved file.
enum QuarantineKind {
    /// Quarantined files are named `<file_name>.corrupt-<unix_ts>`.
    Corrupt,
    /// Quarantined files are named `<stem>.invalid.<unix_ts>.<extension>`, or
    /// `<file_name>.invalid.<unix_ts>` when the name has no stem/extension pair.
    Invalid,
}
2684
2685fn quarantine_task_json(
2686 storage_dir: &Path,
2687 session_dir: &Path,
2688 json_path: &Path,
2689 kind: QuarantineKind,
2690) -> Result<(), String> {
2691 let session_hash = session_dir
2692 .file_name()
2693 .and_then(|name| name.to_str())
2694 .ok_or_else(|| {
2695 format!(
2696 "invalid background task session dir: {}",
2697 session_dir.display()
2698 )
2699 })?;
2700 let task_name = json_path
2701 .file_name()
2702 .and_then(|name| name.to_str())
2703 .ok_or_else(|| format!("invalid background task json path: {}", json_path.display()))?;
2704 let unix_ts = SystemTime::now()
2705 .duration_since(UNIX_EPOCH)
2706 .map(|duration| duration.as_secs())
2707 .unwrap_or(0);
2708 let quarantine_dir = storage_dir.join("bash-tasks-quarantine").join(session_hash);
2709 fs::create_dir_all(&quarantine_dir).map_err(|e| {
2710 format!(
2711 "failed to create background task quarantine dir {}: {e}",
2712 quarantine_dir.display()
2713 )
2714 })?;
2715 let target_name = quarantine_name(task_name, unix_ts, &kind);
2716 let target = quarantine_dir.join(target_name);
2717 fs::rename(json_path, &target).map_err(|e| {
2718 format!(
2719 "failed to quarantine background task metadata {} to {}: {e}",
2720 json_path.display(),
2721 target.display()
2722 )
2723 })?;
2724
2725 for sibling in task_sibling_paths(json_path) {
2726 if !sibling.exists() {
2727 continue;
2728 }
2729 let Some(sibling_name) = sibling.file_name().and_then(|name| name.to_str()) else {
2730 crate::slog_warn!(
2731 "skipping background task sibling with invalid name during quarantine: {}",
2732 sibling.display()
2733 );
2734 continue;
2735 };
2736 let sibling_target = quarantine_dir.join(quarantine_name(sibling_name, unix_ts, &kind));
2737 if let Err(error) = fs::rename(&sibling, &sibling_target) {
2738 crate::slog_warn!(
2739 "failed to quarantine background task sibling {} to {}: {error}",
2740 sibling.display(),
2741 sibling_target.display()
2742 );
2743 }
2744 }
2745
2746 let _ = fs::remove_dir(session_dir);
2747 Ok(())
2748}
2749
2750fn quarantine_name(file_name: &str, unix_ts: u64, kind: &QuarantineKind) -> String {
2751 match kind {
2752 QuarantineKind::Corrupt => format!("{file_name}.corrupt-{unix_ts}"),
2753 QuarantineKind::Invalid => {
2754 let path = Path::new(file_name);
2755 let stem = path.file_stem().and_then(|stem| stem.to_str());
2756 let extension = path.extension().and_then(|extension| extension.to_str());
2757 match (stem, extension) {
2758 (Some(stem), Some(extension)) => format!("{stem}.invalid.{unix_ts}.{extension}"),
2759 _ => format!("{file_name}.invalid.{unix_ts}"),
2760 }
2761 }
2762 }
2763}
2764
/// Lists the candidate sibling files of a task metadata file: paths in the same
/// directory that share its stem and carry one of the extensions `stdout`,
/// `stderr`, `exit`, `pty`, `ps1`, `bat` or `sh`.
///
/// The paths are not checked for existence. Returns an empty list when
/// `json_path` has no parent or no UTF-8 file stem.
fn task_sibling_paths(json_path: &Path) -> Vec<PathBuf> {
    const SIBLING_EXTENSIONS: [&str; 7] = ["stdout", "stderr", "exit", "pty", "ps1", "bat", "sh"];

    let parent = json_path.parent();
    let stem = json_path.file_stem().and_then(|stem| stem.to_str());
    let (Some(parent), Some(stem)) = (parent, stem) else {
        return Vec::new();
    };

    let mut siblings = Vec::with_capacity(SIBLING_EXTENSIONS.len());
    for extension in SIBLING_EXTENSIONS.iter() {
        siblings.push(parent.join(format!("{stem}.{extension}")));
    }
    siblings
}
2777
2778fn read_tail_from_disk(
2779 metadata: &PersistedTask,
2780 paths: &TaskPaths,
2781 max_bytes: usize,
2782) -> (String, bool) {
2783 if metadata.mode == BgMode::Pty {
2784 return read_file_tail_capped(&paths.pty, max_bytes)
2785 .map(|bytes| {
2786 let truncated = fs::metadata(&paths.pty)
2787 .map(|metadata| metadata.len() > max_bytes as u64)
2788 .unwrap_or(false);
2789 (String::from_utf8_lossy(&bytes).into_owned(), truncated)
2790 })
2791 .unwrap_or_else(|_| (String::new(), false));
2792 }
2793 let stdout = fs::read(&paths.stdout).unwrap_or_default();
2794 let stderr = fs::read(&paths.stderr).unwrap_or_default();
2795 let mut bytes = Vec::with_capacity(stdout.len().saturating_add(stderr.len()));
2796 bytes.extend_from_slice(&stdout);
2797 bytes.extend_from_slice(&stderr);
2798 if bytes.len() <= max_bytes {
2799 return (String::from_utf8_lossy(&bytes).into_owned(), false);
2800 }
2801 let start = bytes.len().saturating_sub(max_bytes);
2802 (String::from_utf8_lossy(&bytes[start..]).into_owned(), true)
2803}
2804
2805fn read_for_token_count_from_disk(
2806 metadata: &PersistedTask,
2807 paths: &TaskPaths,
2808 max_bytes_per_stream: usize,
2809) -> TokenCountInput {
2810 if metadata.mode == BgMode::Pty {
2811 return TokenCountInput::Skipped;
2812 }
2813 let stdout = read_file_tail_capped(&paths.stdout, max_bytes_per_stream);
2820 let stderr = read_file_tail_capped(&paths.stderr, max_bytes_per_stream);
2821 match (stdout, stderr) {
2822 (Ok(stdout), Ok(stderr)) => TokenCountInput::Text(combine_streams(
2823 String::from_utf8_lossy(&stdout).as_ref(),
2824 String::from_utf8_lossy(&stderr).as_ref(),
2825 )),
2826 (Ok(stdout), Err(_)) => TokenCountInput::Text(combine_streams(
2827 String::from_utf8_lossy(&stdout).as_ref(),
2828 "",
2829 )),
2830 (Err(_), Ok(stderr)) => TokenCountInput::Text(combine_streams(
2831 "",
2832 String::from_utf8_lossy(&stderr).as_ref(),
2833 )),
2834 (Err(_), Err(_)) => TokenCountInput::Skipped,
2835 }
2836}
2837
/// Reads at most the last `max_bytes` bytes of the file at `path`.
///
/// Files no longer than `max_bytes` are returned whole. With `max_bytes == 0`
/// the result is always empty. The read itself is bounded, so a file that
/// grows after its length was sampled cannot push the result past the cap.
///
/// # Errors
///
/// Propagates any I/O error from opening, inspecting, seeking or reading the
/// file.
fn read_file_tail_capped(path: &Path, max_bytes: usize) -> std::io::Result<Vec<u8>> {
    use std::io::{Read, Seek, SeekFrom};
    let mut file = std::fs::File::open(path)?;
    let len = file.metadata()?.len();
    let cap = max_bytes as u64;
    let read_len = len.min(cap);
    if read_len > 0 && len > cap {
        // Position the cursor `read_len` bytes before the end of the file.
        file.seek(SeekFrom::End(-(read_len as i64)))?;
    }
    let mut bytes = Vec::with_capacity(read_len as usize);
    // `take` enforces the cap even when no seek happened (`max_bytes == 0`) or
    // when the file has grown since `len` was sampled.
    file.take(cap).read_to_end(&mut bytes)?;
    Ok(bytes)
}
2854
2855impl BgTask {
2856 fn snapshot(&self, preview_bytes: usize) -> BgTaskSnapshot {
2857 let state = self
2858 .state
2859 .lock()
2860 .unwrap_or_else(|poison| poison.into_inner());
2861 self.snapshot_locked(&state, preview_bytes)
2862 }
2863
2864 fn snapshot_locked(&self, state: &BgTaskState, preview_bytes: usize) -> BgTaskSnapshot {
2865 let metadata = &state.metadata;
2866 let duration_ms = metadata.duration_ms.or_else(|| {
2867 metadata
2868 .status
2869 .is_terminal()
2870 .then(|| self.started.elapsed().as_millis() as u64)
2871 });
2872 let (output_preview, output_truncated) = if metadata.mode == BgMode::Pty {
2873 (String::new(), false)
2874 } else {
2875 state.buffer.read_tail(preview_bytes)
2876 };
2877 BgTaskSnapshot {
2878 info: BgTaskInfo {
2879 task_id: self.task_id.clone(),
2880 status: metadata.status.clone(),
2881 command: metadata.command.clone(),
2882 mode: metadata.mode.clone(),
2883 started_at: metadata.started_at,
2884 duration_ms,
2885 },
2886 exit_code: metadata.exit_code,
2887 child_pid: metadata.child_pid,
2888 workdir: metadata.workdir.display().to_string(),
2889 output_preview,
2890 output_truncated,
2891 output_path: state
2892 .buffer
2893 .output_path()
2894 .map(|path| path.display().to_string()),
2895 stderr_path: state
2896 .buffer
2897 .stderr_path()
2898 .map(|path| path.display().to_string()),
2899 pty_rows: (metadata.mode == BgMode::Pty).then_some(metadata.pty_rows.unwrap_or(24)),
2900 pty_cols: (metadata.mode == BgMode::Pty).then_some(metadata.pty_cols.unwrap_or(80)),
2901 }
2902 }
2903
2904 pub(crate) fn is_running(&self) -> bool {
2905 self.state
2906 .lock()
2907 .map(|state| {
2908 state.metadata.status == BgTaskStatus::Running
2909 || (state.metadata.mode == BgMode::Pty
2910 && state.metadata.status == BgTaskStatus::Killing)
2911 })
2912 .unwrap_or(false)
2913 }
2914
2915 fn is_terminal(&self) -> bool {
2916 self.state
2917 .lock()
2918 .map(|state| state.metadata.status.is_terminal())
2919 .unwrap_or(false)
2920 }
2921
2922 fn mark_terminal_now(&self) {
2923 if let Ok(mut terminal_at) = self.terminal_at.lock() {
2924 if terminal_at.is_none() {
2925 *terminal_at = Some(Instant::now());
2926 }
2927 }
2928 }
2929
2930 fn set_completion_delivered(
2931 &self,
2932 delivered: bool,
2933 registry: &BgTaskRegistry,
2934 ) -> Result<(), String> {
2935 let mut state = self
2936 .state
2937 .lock()
2938 .map_err(|_| "background task lock poisoned".to_string())?;
2939 let updated = registry
2940 .update_task_metadata(&self.paths, |metadata| {
2941 metadata.completion_delivered = delivered;
2942 })
2943 .map_err(|e| format!("failed to update completion delivery: {e}"))?;
2944 state.metadata = updated;
2945 Ok(())
2946 }
2947}
2948
2949fn terminal_metadata_from_marker(
2950 mut metadata: PersistedTask,
2951 marker: ExitMarker,
2952 reason: Option<String>,
2953) -> PersistedTask {
2954 match marker {
2955 ExitMarker::Code(code) => {
2956 let status = if code == 0 {
2957 BgTaskStatus::Completed
2958 } else {
2959 BgTaskStatus::Failed
2960 };
2961 metadata.mark_terminal(status, Some(code), reason);
2962 }
2963 ExitMarker::Killed => metadata.mark_terminal(BgTaskStatus::Killed, None, reason),
2964 }
2965 metadata
2966}
2967
2968#[cfg(unix)]
2969fn detached_shell_command(command: &str, exit_path: &Path) -> Command {
2970 let shell = resolve_posix_shell();
2971 let mut cmd = Command::new(&shell);
2972 cmd.arg("-c")
2973 .arg("\"$0\" -c \"$1\"; code=$?; printf \"%s\" \"$code\" > \"$2.tmp.$$\"; mv -f \"$2.tmp.$$\" \"$2\"")
2974 .arg(&shell)
2975 .arg(command)
2976 .arg(exit_path);
2977 unsafe {
2978 cmd.pre_exec(|| {
2979 if libc::setsid() == -1 {
2980 return Err(std::io::Error::last_os_error());
2981 }
2982 Ok(())
2983 });
2984 }
2985 cmd
2986}
2987
2988#[cfg(unix)]
2989fn resolve_posix_shell() -> PathBuf {
2990 static POSIX_SHELL: OnceLock<PathBuf> = OnceLock::new();
2991 POSIX_SHELL
2992 .get_or_init(|| {
2993 std::env::var_os("BASH")
2994 .filter(|value| !value.is_empty())
2995 .map(PathBuf::from)
2996 .filter(|path| path.exists())
2997 .or_else(|| which::which("bash").ok())
2998 .or_else(|| which::which("zsh").ok())
2999 .unwrap_or_else(|| PathBuf::from("/bin/sh"))
3000 })
3001 .clone()
3002}
3003
/// Builds the Windows command that runs `command` through `shell` by way of a
/// wrapper script written next to the task files.
///
/// The wrapper body comes from `shell.wrapper_script_bytes` and is written to
/// `<paths.dir>/<json stem or "wrapper">.<ps1|bat|sh>`. The returned command
/// invokes the shell binary on that script with shell-specific leading
/// arguments and has `creation_flags` applied.
///
/// # Errors
///
/// Returns an error string when the wrapper script cannot be written.
#[cfg(windows)]
fn detached_shell_command_for(
    shell: crate::windows_shell::WindowsShell,
    command: &str,
    exit_path: &Path,
    paths: &TaskPaths,
    creation_flags: u32,
) -> Result<Command, String> {
    use crate::windows_shell::WindowsShell;

    let wrapper_body = shell.wrapper_script_bytes(command, exit_path);
    let wrapper_ext = match shell {
        WindowsShell::Pwsh | WindowsShell::Powershell => "ps1",
        WindowsShell::Cmd => "bat",
        WindowsShell::Posix(_) => "sh",
    };
    let wrapper_stem = paths
        .json
        .file_stem()
        .and_then(|s| s.to_str())
        .unwrap_or("wrapper");
    let wrapper_path = paths.dir.join(format!("{}.{}", wrapper_stem, wrapper_ext));
    if let Err(e) = fs::write(&wrapper_path, wrapper_body) {
        return Err(format!(
            "failed to write background bash wrapper script: {e}"
        ));
    }

    // Arguments that precede the wrapper script path for each shell family.
    let leading_args: &[&str] = match shell {
        WindowsShell::Pwsh | WindowsShell::Powershell => &[
            "-NoLogo",
            "-NoProfile",
            "-NonInteractive",
            "-ExecutionPolicy",
            "Bypass",
            "-File",
        ],
        WindowsShell::Cmd => &["/D", "/C"],
        WindowsShell::Posix(_) => &[],
    };

    let mut cmd = Command::new(shell.binary().as_ref());
    cmd.args(leading_args);
    cmd.arg(&wrapper_path);
    cmd.creation_flags(creation_flags);
    Ok(cmd)
}
3086
3087fn spawn_detached_child(
3103 command: &str,
3104 paths: &TaskPaths,
3105 workdir: &Path,
3106 env: &HashMap<String, String>,
3107) -> Result<std::process::Child, String> {
3108 #[cfg(not(windows))]
3109 {
3110 let stdout = create_capture_file(&paths.stdout)
3111 .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
3112 let stderr = create_capture_file(&paths.stderr)
3113 .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
3114 detached_shell_command(command, &paths.exit)
3115 .current_dir(workdir)
3116 .envs(env)
3117 .stdin(Stdio::null())
3118 .stdout(Stdio::from(stdout))
3119 .stderr(Stdio::from(stderr))
3120 .spawn()
3121 .map_err(|e| format!("failed to spawn background bash command: {e}"))
3122 }
3123 #[cfg(windows)]
3124 {
3125 use crate::windows_shell::shell_candidates;
3126 let candidates: Vec<crate::windows_shell::WindowsShell> = shell_candidates();
3137 const FLAG_CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
3158 const FLAG_CREATE_BREAKAWAY_FROM_JOB: u32 = 0x0100_0000;
3159 const FLAG_CREATE_NO_WINDOW: u32 = 0x0800_0000;
3160 let with_breakaway =
3161 FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP | FLAG_CREATE_BREAKAWAY_FROM_JOB;
3162 let without_breakaway = FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP;
3163 let mut last_error: Option<String> = None;
3164 for (idx, shell) in candidates.iter().enumerate() {
3165 for &flags in &[with_breakaway, without_breakaway] {
3169 let stdout = create_capture_file(&paths.stdout)
3171 .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
3172 let stderr = create_capture_file(&paths.stderr)
3173 .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
3174 let mut cmd =
3175 detached_shell_command_for(shell.clone(), command, &paths.exit, paths, flags)?;
3176 cmd.current_dir(workdir)
3177 .envs(env)
3178 .stdin(Stdio::null())
3179 .stdout(Stdio::from(stdout))
3180 .stderr(Stdio::from(stderr));
3181 match cmd.spawn() {
3182 Ok(child) => {
3183 if idx > 0 {
3184 crate::slog_warn!("background bash spawn fell back to {} after {} earlier candidate(s) failed; \
3185 the cached PATH probe disagreed with runtime spawn — likely PATH \
3186 inheritance, antivirus / AppLocker / Defender ASR, or sandbox policy.",
3187 shell.binary(),
3188 idx);
3189 }
3190 if flags == without_breakaway {
3191 crate::slog_warn!(
3192 "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected \
3193 (likely a restrictive Job Object — CI sandbox or MDM policy). \
3194 Spawned without breakaway; the bg task will be torn down if the \
3195 AFT process group is killed."
3196 );
3197 }
3198 return Ok(child);
3199 }
3200 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
3201 crate::slog_warn!("background bash spawn: {} returned NotFound at runtime — trying next candidate",
3202 shell.binary());
3203 last_error = Some(format!("{}: {e}", shell.binary()));
3204 break;
3207 }
3208 Err(e) if flags == with_breakaway && e.raw_os_error() == Some(5) => {
3209 crate::slog_warn!(
3211 "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected with \
3212 Access Denied — retrying {} without breakaway",
3213 shell.binary()
3214 );
3215 last_error = Some(format!("{}: {e}", shell.binary()));
3216 continue;
3217 }
3218 Err(e) => {
3219 return Err(format!(
3220 "failed to spawn background bash command via {}: {e}",
3221 shell.binary()
3222 ));
3223 }
3224 }
3225 }
3226 }
3227 Err(format!(
3228 "failed to spawn background bash command: no Windows shell could be spawned. \
3229 Last error: {}. PATH-probed candidates: {:?}",
3230 last_error.unwrap_or_else(|| "no candidates were attempted".to_string()),
3231 candidates.iter().map(|s| s.binary()).collect::<Vec<_>>()
3232 ))
3233 }
3234}
3235
3236fn random_slug() -> String {
3237 let mut bytes = [0u8; 4];
3238 getrandom::fill(&mut bytes).unwrap_or_else(|_| {
3240 let t = SystemTime::now()
3242 .duration_since(UNIX_EPOCH)
3243 .map(|d| d.subsec_nanos())
3244 .unwrap_or(0);
3245 let p = std::process::id();
3246 bytes.copy_from_slice(&(t ^ p).to_le_bytes());
3247 });
3248 let hex: String = bytes.iter().map(|b| format!("{b:02x}")).collect();
3250 format!("bash-{hex}")
3251}
3252
3253#[cfg(test)]
3254mod tests {
3255 use std::collections::HashMap;
3256 #[cfg(windows)]
3257 use std::fs;
3258 use std::sync::{Arc, Mutex};
3259 use std::time::Duration;
3260 #[cfg(windows)]
3261 use std::time::Instant;
3262
3263 use super::*;
3264
3265 #[cfg(unix)]
3266 const QUICK_SUCCESS_COMMAND: &str = "true";
3267 #[cfg(windows)]
3268 const QUICK_SUCCESS_COMMAND: &str = "cmd /c exit 0";
3269
3270 #[cfg(unix)]
3271 const LONG_RUNNING_COMMAND: &str = "sleep 5";
3272 #[cfg(windows)]
3273 const LONG_RUNNING_COMMAND: &str = "cmd /c timeout /t 5 /nobreak > nul";
3274
3275 #[test]
3276 fn pty_dimensions_are_persisted_and_returned_in_snapshot() {
3277 let registry = BgTaskRegistry::default();
3278 let dir = tempfile::tempdir().unwrap();
3279 let task_id = registry
3280 .spawn_pty(
3281 QUICK_SUCCESS_COMMAND,
3282 "session".to_string(),
3283 dir.path().to_path_buf(),
3284 HashMap::new(),
3285 Some(Duration::from_secs(30)),
3286 dir.path().to_path_buf(),
3287 10,
3288 true,
3289 false,
3290 Some(dir.path().to_path_buf()),
3291 50,
3292 120,
3293 )
3294 .unwrap();
3295
3296 let paths = task_paths(dir.path(), "session", &task_id);
3297 let metadata = read_task(&paths.json).unwrap();
3298 assert_eq!(
3299 metadata.schema_version,
3300 crate::bash_background::persistence::SCHEMA_VERSION
3301 );
3302 assert_eq!(metadata.mode, BgMode::Pty);
3303 assert_eq!(metadata.pty_rows, Some(50));
3304 assert_eq!(metadata.pty_cols, Some(120));
3305
3306 let snapshot = registry
3307 .status(&task_id, "session", None, Some(dir.path()), 1024)
3308 .unwrap();
3309 assert_eq!(snapshot.pty_rows, Some(50));
3310 assert_eq!(snapshot.pty_cols, Some(120));
3311 }
3312
3313 fn spawn_dead_child() -> std::process::Child {
3318 #[cfg(unix)]
3319 let mut cmd = std::process::Command::new("true");
3320 #[cfg(windows)]
3321 let mut cmd = {
3322 let mut c = std::process::Command::new("cmd");
3323 c.args(["/c", "exit", "0"]);
3324 c
3325 };
3326 cmd.stdin(std::process::Stdio::null());
3327 cmd.stdout(std::process::Stdio::null());
3328 cmd.stderr(std::process::Stdio::null());
3329 let mut child = cmd.spawn().expect("spawn replacement child for reap test");
3330 let started = Instant::now();
3339 loop {
3340 match child.try_wait() {
3341 Ok(Some(_)) => break,
3342 Ok(None) => {
3343 if started.elapsed() > Duration::from_secs(5) {
3344 panic!("dead-child stand-in did not exit within 5s");
3345 }
3346 std::thread::sleep(Duration::from_millis(10));
3347 }
3348 Err(error) => panic!("dead-child try_wait failed: {error}"),
3349 }
3350 }
3351 child
3352 }
3353
3354 #[test]
3355 fn ack_marks_delivered_even_when_completion_was_already_consumed_locally() {
3356 let registry = BgTaskRegistry::default();
3357 let dir = tempfile::tempdir().unwrap();
3358 let task_id = registry
3359 .spawn(
3360 LONG_RUNNING_COMMAND,
3361 "session".to_string(),
3362 dir.path().to_path_buf(),
3363 HashMap::new(),
3364 Some(Duration::from_secs(30)),
3365 dir.path().to_path_buf(),
3366 10,
3367 true,
3368 false,
3369 Some(dir.path().to_path_buf()),
3370 )
3371 .unwrap();
3372 registry
3373 .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
3374 .unwrap();
3375 assert_eq!(
3376 registry
3377 .drain_completions_for_session(Some("session"))
3378 .len(),
3379 1
3380 );
3381
3382 registry.inner.completions.lock().unwrap().clear();
3385
3386 assert_eq!(
3387 registry.ack_completions_for_session(Some("session"), std::slice::from_ref(&task_id)),
3388 vec![task_id.clone()]
3389 );
3390 assert!(registry
3391 .drain_completions_for_session(Some("session"))
3392 .is_empty());
3393
3394 let paths = task_paths(dir.path(), "session", &task_id);
3395 let metadata = read_task(&paths.json).unwrap();
3396 assert!(metadata.completion_delivered);
3397
3398 let replayed = BgTaskRegistry::default();
3399 replayed
3400 .replay_session_inner(dir.path(), "session", None)
3401 .unwrap();
3402 assert!(replayed
3403 .drain_completions_for_session(Some("session"))
3404 .is_empty());
3405 }
3406
3407 #[test]
3408 fn register_watch_rejects_unknown_task() {
3409 let registry = BgTaskRegistry::default();
3410
3411 let result = registry.register_watch(
3412 "missing-task".to_string(),
3413 WatchPattern::Substring("READY".into()),
3414 true,
3415 );
3416
3417 assert_eq!(result, Err("task_not_found"));
3418 }
3419
/// A watch registered after its task was killed must scan the stdout capture that is
/// already on disk and push a pattern-match frame straight away.
#[test]
fn register_watch_on_terminal_task_scans_existing_output() {
    // Collect every frame pushed through the progress sender so it can be inspected.
    let recorded: Arc<Mutex<Vec<PushFrame>>> = Arc::new(Mutex::new(Vec::new()));
    let sink = Arc::clone(&recorded);
    let callback: Box<dyn Fn(PushFrame) + Send + Sync> = Box::new(move |frame: PushFrame| {
        sink.lock().unwrap().push(frame);
    });
    let sender: crate::context::ProgressSender = Arc::new(callback);
    let registry = BgTaskRegistry::new(Arc::new(Mutex::new(Some(sender))));

    let scratch = tempfile::tempdir().unwrap();
    let workdir = scratch.path().to_path_buf();
    let task_id = registry
        .spawn(
            LONG_RUNNING_COMMAND,
            String::from("session"),
            workdir.clone(),
            HashMap::new(),
            Some(Duration::from_secs(30)),
            workdir.clone(),
            10,
            true,
            false,
            Some(workdir),
        )
        .unwrap();

    // Raise the shutdown flag before touching the capture file and killing the task.
    registry
        .inner
        .shutdown
        .store(true, std::sync::atomic::Ordering::SeqCst);
    let task = registry.task_for_session(&task_id, "session").unwrap();
    std::fs::write(&task.paths.stdout, "READY\n").unwrap();
    registry
        .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
        .unwrap();

    // Discard anything produced by the kill itself; only the watch registration matters.
    recorded.lock().unwrap().clear();
    registry.inner.completions.lock().unwrap().clear();

    let pattern = WatchPattern::Substring("READY".into());
    registry
        .register_watch(task_id.clone(), pattern, true)
        .unwrap();

    let delivered = recorded.lock().unwrap();
    let hit = delivered
        .iter()
        .find_map(|candidate| {
            if let PushFrame::BashPatternMatch(inner) = candidate {
                Some(inner)
            } else {
                None
            }
        })
        .expect("terminal watch registration should emit pattern frame");

    assert_eq!(hit.reason, "pattern_match");
    assert_eq!(hit.task_id, task_id);
    assert_eq!(hit.session_id, "session");
    assert_eq!(hit.match_text, "READY");
    assert_eq!(hit.match_offset, 0);
    assert_eq!(registry.active_watch_count(&hit.task_id), 0);

    let metadata = read_task(&task.paths.json).unwrap();
    assert!(metadata.completion_delivered);
}
3481
/// Once a killed task's completion has been drained and acknowledged, a cleanup pass
/// with a zero threshold must drop it from the in-memory task table.
#[test]
fn cleanup_finished_removes_terminal_tasks_older_than_threshold() {
    let registry = BgTaskRegistry::default();
    let scratch = tempfile::tempdir().unwrap();
    let workdir = scratch.path().to_path_buf();
    let task_id = registry
        .spawn(
            QUICK_SUCCESS_COMMAND,
            String::from("session"),
            workdir.clone(),
            HashMap::new(),
            Some(Duration::from_secs(30)),
            workdir.clone(),
            10,
            true,
            false,
            Some(workdir),
        )
        .unwrap();

    registry
        .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
        .unwrap();

    // Deliver the completion: drain it, then acknowledge it by id.
    let drained = registry.drain_completions_for_session(Some("session"));
    assert_eq!(drained.len(), 1);
    let acknowledged =
        registry.ack_completions_for_session(Some("session"), std::slice::from_ref(&task_id));
    assert_eq!(acknowledged, vec![task_id.clone()]);

    registry.cleanup_finished(Duration::ZERO);

    assert!(registry.inner.tasks.lock().unwrap().is_empty());
}
3514
/// A killed task whose completion was never drained or acknowledged must survive a
/// cleanup pass, even with a zero age threshold.
#[test]
fn cleanup_finished_retains_undelivered_terminals() {
    let registry = BgTaskRegistry::default();
    let scratch = tempfile::tempdir().unwrap();
    let workdir = scratch.path().to_path_buf();
    let task_id = registry
        .spawn(
            QUICK_SUCCESS_COMMAND,
            String::from("session"),
            workdir.clone(),
            HashMap::new(),
            Some(Duration::from_secs(30)),
            workdir.clone(),
            10,
            true,
            false,
            Some(workdir),
        )
        .unwrap();

    // Terminal, but deliberately left undelivered.
    registry
        .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
        .unwrap();

    registry.cleanup_finished(Duration::ZERO);

    assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
}
3541
/// Drives `reap_child` twice over a task whose child has exited but whose exit marker
/// file is absent: the first pass must only release the child handle and mark the task
/// detached (status stays `Running`); the second pass must mark the task `Failed` with
/// the reason "process exited without exit marker".
#[test]
fn reap_child_marks_failed_when_child_exits_without_exit_marker() {
    let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
    let dir = tempfile::tempdir().unwrap();
    let task_id = registry
        .spawn(
            QUICK_SUCCESS_COMMAND,
            "session".to_string(),
            dir.path().to_path_buf(),
            HashMap::new(),
            Some(Duration::from_secs(30)),
            dir.path().to_path_buf(),
            10,
            true,
            false,
            Some(dir.path().to_path_buf()),
        )
        .unwrap();

    let task = registry.task_for_session(&task_id, "session").unwrap();

    // Poll every 20ms (up to 5s) until the piped child reports an exit status. Any
    // runtime state other than `Piped(Some(child))` is treated as "already exited".
    let started = Instant::now();
    loop {
        let exited = {
            let mut state = task.state.lock().unwrap();
            match &mut state.runtime {
                TaskRuntime::Piped(Some(child)) => matches!(child.try_wait(), Ok(Some(_))),
                _ => true,
            }
        };
        if exited {
            break;
        }
        assert!(
            started.elapsed() < Duration::from_secs(5),
            "child should exit quickly"
        );
        std::thread::sleep(Duration::from_millis(20));
    }

    // Raise the registry's shutdown flag, then pause.
    // NOTE(review): presumably the 550ms pause gives a background poller time to observe
    // the flag and stop touching this task before the state is rewritten below; confirm
    // against the poller's interval.
    registry
        .inner
        .shutdown
        .store(true, std::sync::atomic::Ordering::SeqCst);
    std::thread::sleep(Duration::from_millis(550));

    // Best-effort removal: the marker may or may not have been written by now, so the
    // result is ignored; its absence is asserted as a precondition further down.
    let _ = std::fs::remove_file(&task.paths.exit);

    {
        // Force the task back into a "still running" shape, both in memory and in the
        // persisted JSON, regardless of what happened to it before this point.
        let mut state = task.state.lock().unwrap();
        state.metadata.status = BgTaskStatus::Running;
        state.metadata.status_reason = None;
        state.metadata.exit_code = None;
        state.metadata.finished_at = None;
        state.metadata.duration_ms = None;
        crate::bash_background::persistence::write_task(&task.paths.json, &state.metadata)
            .expect("persist reset Running metadata for reap_child test");
        // If the child handle was already released, install a replacement handle from
        // `spawn_dead_child()` so `reap_child` has a child to inspect.
        if matches!(state.runtime, TaskRuntime::Piped(None)) {
            state.runtime = TaskRuntime::Piped(Some(spawn_dead_child()));
        }
    }
    // Clear any recorded terminal timestamp to match the reset metadata.
    *task.terminal_at.lock().unwrap() = None;

    assert!(
        task.is_running(),
        "precondition: metadata.status == Running"
    );
    assert!(
        !task.paths.exit.exists(),
        "precondition: exit marker absent"
    );

    // First pass: expected to release the handle and detach, without failing the task.
    registry.reap_child(&task);

    {
        let state = task.state.lock().unwrap();
        assert_eq!(
            state.metadata.status,
            BgTaskStatus::Running,
            "first reap must leave status Running while waiting one pass for marker"
        );
        assert_eq!(
            state.metadata.status_reason, None,
            "first reap must not record a failure reason"
        );
        assert!(
            matches!(state.runtime, TaskRuntime::Piped(None)),
            "child handle must be released after first reap"
        );
        assert!(
            state.detached,
            "task must be marked detached after first reap"
        );
    }

    // Second pass: still no marker, so the task must now become terminal.
    registry.reap_child(&task);

    let state = task.state.lock().unwrap();
    assert!(
        state.metadata.status.is_terminal(),
        "second reap must transition to terminal when PID dead and no marker. Got status={:?}",
        state.metadata.status
    );
    assert_eq!(
        state.metadata.status,
        BgTaskStatus::Failed,
        "must specifically be Failed (not Killed): status={:?}",
        state.metadata.status
    );
    assert_eq!(
        state.metadata.status_reason.as_deref(),
        Some("process exited without exit marker"),
        "reason must match replay path's wording: {:?}",
        state.metadata.status_reason
    );
    assert!(
        matches!(state.runtime, TaskRuntime::Piped(None)),
        "child handle must stay released after second reap"
    );
    assert!(
        state.detached,
        "task must remain detached after second reap"
    );
}
3719
/// Checks that a single `reap_child` pass over a task whose child has exited AND whose
/// exit marker file exists releases the child handle and marks the task detached, but
/// leaves `metadata.status` at `Running`.
#[test]
fn reap_child_preserves_running_when_exit_marker_exists() {
    let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
    let dir = tempfile::tempdir().unwrap();
    let task_id = registry
        .spawn(
            QUICK_SUCCESS_COMMAND,
            "session".to_string(),
            dir.path().to_path_buf(),
            HashMap::new(),
            Some(Duration::from_secs(30)),
            dir.path().to_path_buf(),
            10,
            true,
            false,
            Some(dir.path().to_path_buf()),
        )
        .unwrap();

    let task = registry.task_for_session(&task_id, "session").unwrap();

    // Poll every 20ms (up to 5s) until the child has exited AND the exit marker file is
    // on disk. Any runtime state other than `Piped(Some(child))` counts as exited.
    let started = Instant::now();
    loop {
        let exited = {
            let mut state = task.state.lock().unwrap();
            match &mut state.runtime {
                TaskRuntime::Piped(Some(child)) => matches!(child.try_wait(), Ok(Some(_))),
                _ => true,
            }
        };
        if exited && task.paths.exit.exists() {
            break;
        }
        assert!(
            started.elapsed() < Duration::from_secs(5),
            "child should exit and write marker quickly"
        );
        std::thread::sleep(Duration::from_millis(20));
    }

    // Raise the registry's shutdown flag, then pause.
    // NOTE(review): presumably the 550ms pause gives a background poller time to observe
    // the flag and stop touching this task before the state is rewritten below; confirm
    // against the poller's interval.
    registry
        .inner
        .shutdown
        .store(true, std::sync::atomic::Ordering::SeqCst);
    std::thread::sleep(Duration::from_millis(550));

    {
        // Reset the in-memory status to Running (the persisted JSON is not rewritten
        // here) and, if the handle was already released, install a replacement handle
        // from `spawn_dead_child()` so `reap_child` has a child to inspect.
        let mut state = task.state.lock().unwrap();
        state.metadata.status = BgTaskStatus::Running;
        state.metadata.status_reason = None;
        if matches!(state.runtime, TaskRuntime::Piped(None)) {
            state.runtime = TaskRuntime::Piped(Some(spawn_dead_child()));
        }
    }
    // Clear any recorded terminal timestamp to match the reset status.
    *task.terminal_at.lock().unwrap() = None;
    // The marker was present when the wait loop ended; if it has disappeared since,
    // write a replacement containing "0" so the scenario under test still holds.
    if !task.paths.exit.exists() {
        std::fs::write(&task.paths.exit, "0").expect("write replacement exit marker");
    }

    registry.reap_child(&task);

    let state = task.state.lock().unwrap();
    assert!(
        matches!(state.runtime, TaskRuntime::Piped(None)),
        "child handle still released even when marker exists"
    );
    assert!(
        state.detached,
        "task still marked detached even when marker exists"
    );
    // With a marker present, `reap_child` itself must not change the status.
    assert_eq!(
        state.metadata.status,
        BgTaskStatus::Running,
        "reap_child must defer to poll_task when marker exists"
    );
}
3821
/// A task that is still running must never be removed by a cleanup pass, even when the
/// age threshold is zero.
#[test]
fn cleanup_finished_keeps_running_tasks() {
    let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
    let scratch = tempfile::tempdir().unwrap();
    let workdir = scratch.path().to_path_buf();
    let task_id = registry
        .spawn(
            LONG_RUNNING_COMMAND,
            String::from("session"),
            workdir.clone(),
            HashMap::new(),
            Some(Duration::from_secs(30)),
            workdir.clone(),
            10,
            true,
            false,
            Some(workdir),
        )
        .unwrap();

    registry.cleanup_finished(Duration::ZERO);

    assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));

    // Best-effort teardown of the long-running process; the outcome is irrelevant here.
    let _ = registry.kill(&task_id, "session");
}
3846
/// Blocks until `path` exists, then returns its contents as a `String`.
///
/// Polls every 100ms and panics if the file has not appeared within 30 seconds, or if
/// it exists but cannot be read as UTF-8 text.
#[cfg(windows)]
fn wait_for_file(path: &Path) -> String {
    const TIMEOUT: Duration = Duration::from_secs(30);
    const POLL_INTERVAL: Duration = Duration::from_millis(100);

    let began = Instant::now();
    while !path.exists() {
        assert!(
            began.elapsed() < TIMEOUT,
            "timed out waiting for {}",
            path.display()
        );
        std::thread::sleep(POLL_INTERVAL);
    }
    fs::read_to_string(path).expect("read file")
}
3862
/// Spawns `command` for session `"session"` on a fresh registry (no progress sender),
/// using a new temporary directory for every path argument.
///
/// Returns the registry, the temp-dir guard (which the caller must keep alive for as
/// long as the task's files are needed) and the new task id.
#[cfg(windows)]
fn spawn_windows_registry_command(
    command: &str,
) -> (BgTaskRegistry, tempfile::TempDir, String) {
    let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
    let scratch = tempfile::tempdir().unwrap();
    let workdir = scratch.path().to_path_buf();

    let spawned = registry.spawn(
        command,
        String::from("session"),
        workdir.clone(),
        HashMap::new(),
        Some(Duration::from_secs(30)),
        workdir.clone(),
        10,
        false,
        false,
        Some(workdir),
    );
    let task_id = spawned.unwrap();

    (registry, scratch, task_id)
}
3885
/// A command that exits with code 0 must produce an exit marker file containing `0`.
#[cfg(windows)]
#[test]
fn windows_spawn_writes_exit_marker_for_zero_exit() {
    // `_workdir` keeps the temporary directory alive until the end of the test.
    let (registry, _workdir, id) = spawn_windows_registry_command("cmd /c exit 0");
    let marker = registry.task_exit_path(&id, "session").unwrap();

    let recorded = wait_for_file(&marker);

    assert_eq!(recorded.trim(), "0");
}
3896
/// A command that exits with code 42 must produce an exit marker file containing `42`.
#[cfg(windows)]
#[test]
fn windows_spawn_writes_exit_marker_for_nonzero_exit() {
    // `_workdir` keeps the temporary directory alive until the end of the test.
    let (registry, _workdir, id) = spawn_windows_registry_command("cmd /c exit 42");
    let marker = registry.task_exit_path(&id, "session").unwrap();

    let recorded = wait_for_file(&marker);

    assert_eq!(recorded.trim(), "42");
}
3907
/// Output written by the spawned command must end up in the task's stdout capture file.
#[cfg(windows)]
#[test]
fn windows_spawn_captures_stdout_to_disk() {
    // `_workdir` keeps the temporary directory alive until the end of the test.
    let (registry, _workdir, id) = spawn_windows_registry_command("cmd /c echo hello");
    let task = registry.task_for_session(&id, "session").unwrap();

    // Wait for the exit marker before reading; its contents are not needed here.
    let _ = wait_for_file(&task.paths.exit);
    let stdout = fs::read_to_string(&task.paths.stdout).expect("read stdout");

    assert!(stdout.contains("hello"), "stdout was {stdout:?}");
}
3921
/// When the lookup resolves both `pwsh.exe` and `powershell.exe`, the first shell
/// candidate must be `Pwsh`, whose binary name is `pwsh.exe`.
#[cfg(windows)]
#[test]
fn windows_spawn_uses_pwsh_when_available() {
    use crate::windows_shell::{shell_candidates_with, WindowsShell};

    let candidates = shell_candidates_with(
        // Fake binary lookup: both PowerShell flavours resolve, everything else is missing.
        |binary| match binary {
            "pwsh.exe" => Some(std::path::PathBuf::from(r"C:\pwsh\pwsh.exe")),
            "powershell.exe" => Some(std::path::PathBuf::from(r"C:\ps\powershell.exe")),
            _ => None,
        },
        || None,
    );

    let shell = candidates
        .first()
        .cloned()
        .expect("at least one candidate");

    assert_eq!(shell, WindowsShell::Pwsh);
    assert_eq!(shell.binary().as_ref(), "pwsh.exe");
}
3940
/// The `cmd` wrapper script must capture the exit code, echo it to a `.tmp` marker,
/// `move /Y` that file onto the real marker path with output sent to `nul`, and then
/// exit with the captured code.
#[cfg(windows)]
#[test]
fn windows_shell_cmd_wrapper_writes_exit_marker_with_move() {
    use crate::windows_shell::WindowsShell;

    let marker = Path::new(r"C:\Temp\bash-test.exit");
    let script = WindowsShell::Cmd.wrapper_script("cmd /c exit 42", marker);
    let has = |needle: &str| script.contains(needle);

    assert!(
        has("set CODE=%ERRORLEVEL%"),
        "wrapper must capture exit code into CODE: {script}"
    );
    assert!(
        has("echo %CODE% >"),
        "wrapper must echo CODE to a temp marker file: {script}"
    );
    assert!(
        has("move /Y"),
        "wrapper must use atomic move to write the marker: {script}"
    );
    assert!(
        has("> nul"),
        "wrapper must redirect move output to nul: {script}"
    );
    assert!(
        has("exit /B %CODE%"),
        "wrapper must propagate the captured exit code: {script}"
    );

    // Both the temporary and the final marker path must appear double-quoted.
    assert!(script.contains(r#""C:\Temp\bash-test.exit.tmp""#));
    assert!(script.contains(r#""C:\Temp\bash-test.exit""#));
}
3983
/// `Cmd::bg_command` must pass exactly `/D /S /C` followed by the command text.
#[cfg(windows)]
#[test]
fn windows_shell_cmd_bg_command_uses_minimal_cmd_flags() {
    use crate::windows_shell::WindowsShell;

    let cmd = WindowsShell::Cmd.bg_command("echo wrapped");
    // Arguments that are not valid UTF-8 are skipped rather than failing the test.
    let args_strs: Vec<&str> = cmd
        .get_args()
        .filter_map(std::ffi::OsStr::to_str)
        .collect();

    assert_eq!(
        args_strs,
        ["/D", "/S", "/C", "echo wrapped"],
        "Cmd::bg_command must prepend /D /S /C"
    );
}
4002
/// `Pwsh::bg_command` must invoke the shell with `-Command` and include the command body
/// as one of its arguments.
#[cfg(windows)]
#[test]
fn windows_shell_pwsh_bg_command_uses_standard_args() {
    use crate::windows_shell::WindowsShell;

    let cmd = WindowsShell::Pwsh.bg_command("Get-Date");
    // Arguments that are not valid UTF-8 are skipped rather than failing the test.
    let args_strs: Vec<&str> = cmd
        .get_args()
        .filter_map(std::ffi::OsStr::to_str)
        .collect();

    assert!(
        args_strs.iter().any(|arg| *arg == "-Command"),
        "Pwsh::bg_command must use -Command: {args_strs:?}"
    );
    assert!(
        args_strs.iter().any(|arg| *arg == "Get-Date"),
        "Pwsh::bg_command must include the user command body"
    );
}
4022
// Disabled placeholder. `#[cfg(any())]` has an empty predicate list and is therefore
// always false, so this function is never compiled on any target; it only keeps the
// name of a retired test visible in the source.
// NOTE(review): `#[allow(dead_code)]` looks redundant for an item that is compiled out.
// Confirm, then either drop the attribute or delete the stub entirely.
#[allow(dead_code)]
#[cfg(any())] fn windows_cmd_wrapper_records_real_exit_code_disabled() {}
4056}