1use std::collections::{HashMap, VecDeque};
2use std::fs;
3use std::path::{Path, PathBuf};
4use std::process::{Child, Command, Stdio};
5use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
6#[cfg(unix)]
7use std::sync::OnceLock;
8use std::sync::{Arc, Mutex};
9use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
10
11use serde::Serialize;
12
13use crate::context::SharedProgressSender;
14use crate::protocol::{BashCompletedFrame, BashLongRunningFrame, PushFrame};
15
16#[cfg(unix)]
17use std::os::unix::process::CommandExt;
18#[cfg(windows)]
19use std::os::windows::process::CommandExt;
20
21use super::buffer::BgBuffer;
22use super::persistence::{
23 create_capture_file, delete_task_bundle, read_exit_marker, read_task, session_tasks_dir,
24 task_paths, unix_millis, update_task, write_kill_marker_if_absent, write_task, ExitMarker,
25 PersistedTask, TaskPaths,
26};
27use super::process::is_process_alive;
28#[cfg(unix)]
29use super::process::terminate_pgid;
30#[cfg(windows)]
31use super::process::terminate_pid;
32use super::{BgTaskInfo, BgTaskStatus};
/// Timeout applied by `spawn` when the caller supplies none (30 minutes).
const DEFAULT_BG_TIMEOUT: Duration = Duration::from_secs(30 * 60);
/// Age since `started_at` (24 hours) beyond which `running_metadata_is_stale`
/// reports a persisted running task as stale.
const STALE_RUNNING_AFTER: Duration = Duration::from_secs(24 * 60 * 60);
/// Window (24 hours) passed to `modified_within` by `maybe_gc_persisted`;
/// task JSON files modified within it are skipped by the GC pass.
const PERSISTED_GC_GRACE: Duration = Duration::from_secs(24 * 60 * 60);
/// 30-day grace period for quarantined files.
/// NOTE(review): not referenced in the visible part of this file; presumably
/// consumed by `gc_quarantine` — confirm.
const QUARANTINE_GC_GRACE: Duration = Duration::from_secs(30 * 24 * 60 * 60);

/// Number of trailing output bytes requested for the preview attached to a
/// queued completion.
const BG_COMPLETION_PREVIEW_BYTES: usize = 300;
52
/// Completion notice queued when a background task reaches a terminal status
/// and its completion has not yet been delivered.
#[derive(Debug, Clone, Serialize)]
pub struct BgCompletion {
    /// Identifier of the finished task.
    pub task_id: String,
    /// Owning session; used to filter the queue and never serialized.
    #[serde(skip_serializing)]
    pub session_id: String,
    /// Terminal status copied from the task metadata.
    pub status: BgTaskStatus,
    /// Exit code copied from the task metadata, when one was recorded.
    pub exit_code: Option<i32>,
    /// Command line copied from the task metadata.
    pub command: String,
    /// Tail of the captured output (at most `BG_COMPLETION_PREVIEW_BYTES`
    /// requested), passed through the compressor when the task asked for it.
    /// Omitted from the serialized form when empty.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub output_preview: String,
    /// Truncation flag returned alongside the preview tail.
    /// Omitted from the serialized form when `false`.
    #[serde(default, skip_serializing_if = "is_false")]
    pub output_truncated: bool,
}
77
/// Serde `skip_serializing_if` predicate: returns `true` exactly when the
/// referenced flag is `false`.
fn is_false(v: &bool) -> bool {
    matches!(*v, false)
}
81
/// Point-in-time view of a background task as returned by `status`, `list`
/// and `kill`.
#[derive(Debug, Clone, Serialize)]
pub struct BgTaskSnapshot {
    /// Core task info; its fields are flattened into the serialized snapshot.
    #[serde(flatten)]
    pub info: BgTaskInfo,
    /// Exit code, when one is known.
    pub exit_code: Option<i32>,
    /// Process id of the child, when one is known.
    pub child_pid: Option<u32>,
    /// Working directory of the task, as a string.
    pub workdir: String,
    /// Preview of the captured output; `maybe_compress_snapshot` may replace
    /// it with compressed text once the task is terminal.
    pub output_preview: String,
    /// Whether the preview is a truncated view of the output.
    pub output_truncated: bool,
    /// Path of the stdout capture file, if available.
    pub output_path: Option<String>,
    /// Path of the stderr capture file, if available.
    pub stderr_path: Option<String>,
}
94
/// Cheaply clonable handle to the shared background-task registry; every
/// clone points at the same `RegistryInner`.
#[derive(Clone)]
pub struct BgTaskRegistry {
    /// Shared registry state.
    pub(crate) inner: Arc<RegistryInner>,
}
99
/// State shared by all clones of a `BgTaskRegistry`.
pub(crate) struct RegistryInner {
    /// In-memory tasks keyed by task id.
    pub(crate) tasks: Mutex<HashMap<String, Arc<BgTask>>>,
    /// Completions waiting to be drained; at most one entry per task id.
    pub(crate) completions: Mutex<VecDeque<BgCompletion>>,
    /// Sink used to push `PushFrame`s (completion and long-running frames).
    pub(crate) progress_sender: SharedProgressSender,
    /// Set by `start_watchdog` so the watchdog is started only once.
    watchdog_started: AtomicBool,
    /// Set to `true` by `detach`.
    pub(crate) shutdown: AtomicBool,
    /// Whether long-running reminder frames are emitted.
    pub(crate) long_running_reminder_enabled: AtomicBool,
    /// Minimum spacing between reminders, in milliseconds; 0 disables them.
    pub(crate) long_running_reminder_interval_ms: AtomicU64,
    /// Set by the first `replay_session` so persisted GC runs once per registry.
    persisted_gc_started: AtomicBool,
    /// Test-only counter incremented on every `maybe_gc_persisted` call.
    #[cfg(test)]
    persisted_gc_runs: AtomicU64,
    /// Optional output compressor, called as `compressor(command, output)`.
    pub(crate) compressor: Mutex<Option<Box<dyn Fn(&str, String) -> String + Send + Sync>>>,
}
118
/// One background task tracked by the registry.
pub(crate) struct BgTask {
    /// Unique task identifier (also the registry map key).
    pub(crate) task_id: String,
    /// Session that owns the task.
    pub(crate) session_id: String,
    /// On-disk locations of the task bundle (metadata JSON, capture files, exit marker).
    pub(crate) paths: TaskPaths,
    /// Monotonic start time; the baseline for long-running reminders.
    pub(crate) started: Instant,
    /// When the last long-running reminder was emitted, if any.
    pub(crate) last_reminder_at: Mutex<Option<Instant>>,
    /// When the task was observed terminal in this process; read by
    /// `cleanup_finished` to decide expiry.
    pub(crate) terminal_at: Mutex<Option<Instant>>,
    /// Mutable task state guarded by a single lock.
    pub(crate) state: Mutex<BgTaskState>,
}
128
/// Lock-protected mutable portion of a `BgTask`.
pub(crate) struct BgTaskState {
    /// In-memory copy of the persisted task metadata.
    pub(crate) metadata: PersistedTask,
    /// Handle to the spawned child; `None` for rehydrated tasks and after the
    /// child has been reaped, killed or detached.
    pub(crate) child: Option<Child>,
    /// `true` once the registry no longer holds a child handle for the task.
    pub(crate) detached: bool,
    /// Reader over the task's stdout/stderr capture files.
    pub(crate) buffer: BgBuffer,
}
135
136impl BgTaskRegistry {
137 pub fn new(progress_sender: SharedProgressSender) -> Self {
138 Self {
139 inner: Arc::new(RegistryInner {
140 tasks: Mutex::new(HashMap::new()),
141 completions: Mutex::new(VecDeque::new()),
142 progress_sender,
143 watchdog_started: AtomicBool::new(false),
144 shutdown: AtomicBool::new(false),
145 long_running_reminder_enabled: AtomicBool::new(true),
146 long_running_reminder_interval_ms: AtomicU64::new(600_000),
147 persisted_gc_started: AtomicBool::new(false),
148 #[cfg(test)]
149 persisted_gc_runs: AtomicU64::new(0),
150 compressor: Mutex::new(None),
151 }),
152 }
153 }
154
155 pub fn set_compressor<F>(&self, compressor: F)
160 where
161 F: Fn(&str, String) -> String + Send + Sync + 'static,
162 {
163 if let Ok(mut slot) = self.inner.compressor.lock() {
164 *slot = Some(Box::new(compressor));
165 }
166 }
167
168 pub(crate) fn compress_output(&self, command: &str, output: String) -> String {
171 let Ok(slot) = self.inner.compressor.lock() else {
172 return output;
173 };
174 match slot.as_ref() {
175 Some(compressor) => compressor(command, output),
176 None => output,
177 }
178 }
179
180 pub fn configure_long_running_reminders(&self, enabled: bool, interval_ms: u64) {
181 self.inner
182 .long_running_reminder_enabled
183 .store(enabled, Ordering::SeqCst);
184 self.inner
185 .long_running_reminder_interval_ms
186 .store(interval_ms, Ordering::SeqCst);
187 }
188
189 #[cfg(unix)]
190 #[allow(clippy::too_many_arguments)]
191 pub fn spawn(
192 &self,
193 command: &str,
194 session_id: String,
195 workdir: PathBuf,
196 env: HashMap<String, String>,
197 timeout: Option<Duration>,
198 storage_dir: PathBuf,
199 max_running: usize,
200 notify_on_completion: bool,
201 compressed: bool,
202 project_root: Option<PathBuf>,
203 ) -> Result<String, String> {
204 self.start_watchdog();
205
206 let running = self.running_count();
207 if running >= max_running {
208 return Err(format!(
209 "background bash task limit exceeded: {running} running (max {max_running})"
210 ));
211 }
212
213 let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
214 let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
215 let task_id = self.generate_unique_task_id()?;
216 let paths = task_paths(&storage_dir, &session_id, &task_id);
217 fs::create_dir_all(&paths.dir)
218 .map_err(|e| format!("failed to create background task dir: {e}"))?;
219
220 let mut metadata = PersistedTask::starting(
221 task_id.clone(),
222 session_id.clone(),
223 command.to_string(),
224 workdir.clone(),
225 project_root,
226 timeout_ms,
227 notify_on_completion,
228 compressed,
229 );
230 write_task(&paths.json, &metadata)
231 .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
232
233 create_capture_file(&paths.stdout)
237 .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
238 create_capture_file(&paths.stderr)
239 .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
240
241 let child = match spawn_detached_child(command, &paths, &workdir, &env) {
242 Ok(child) => child,
243 Err(error) => {
244 log::warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
245 let _ = delete_task_bundle(&paths);
246 return Err(error);
247 }
248 };
249
250 let child_pid = child.id();
251 metadata.mark_running(child_pid, child_pid as i32);
252 write_task(&paths.json, &metadata)
253 .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
254
255 let task = Arc::new(BgTask {
256 task_id: task_id.clone(),
257 session_id,
258 paths: paths.clone(),
259 started: Instant::now(),
260 last_reminder_at: Mutex::new(None),
261 terminal_at: Mutex::new(None),
262 state: Mutex::new(BgTaskState {
263 metadata,
264 child: Some(child),
265 detached: false,
266 buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
267 }),
268 });
269
270 self.inner
271 .tasks
272 .lock()
273 .map_err(|_| "background task registry lock poisoned".to_string())?
274 .insert(task_id.clone(), task);
275
276 Ok(task_id)
277 }
278
279 #[cfg(windows)]
280 #[allow(clippy::too_many_arguments)]
281 pub fn spawn(
282 &self,
283 command: &str,
284 session_id: String,
285 workdir: PathBuf,
286 env: HashMap<String, String>,
287 timeout: Option<Duration>,
288 storage_dir: PathBuf,
289 max_running: usize,
290 notify_on_completion: bool,
291 compressed: bool,
292 project_root: Option<PathBuf>,
293 ) -> Result<String, String> {
294 self.start_watchdog();
295
296 let running = self.running_count();
297 if running >= max_running {
298 return Err(format!(
299 "background bash task limit exceeded: {running} running (max {max_running})"
300 ));
301 }
302
303 let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
304 let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
305 let task_id = self.generate_unique_task_id()?;
306 let paths = task_paths(&storage_dir, &session_id, &task_id);
307 fs::create_dir_all(&paths.dir)
308 .map_err(|e| format!("failed to create background task dir: {e}"))?;
309
310 let mut metadata = PersistedTask::starting(
311 task_id.clone(),
312 session_id.clone(),
313 command.to_string(),
314 workdir.clone(),
315 project_root,
316 timeout_ms,
317 notify_on_completion,
318 compressed,
319 );
320 write_task(&paths.json, &metadata)
321 .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
322
323 create_capture_file(&paths.stdout)
329 .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
330 create_capture_file(&paths.stderr)
331 .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
332
333 let child = match spawn_detached_child(command, &paths, &workdir, &env) {
334 Ok(child) => child,
335 Err(error) => {
336 log::warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
337 let _ = delete_task_bundle(&paths);
338 return Err(error);
339 }
340 };
341
342 let child_pid = child.id();
343 metadata.status = BgTaskStatus::Running;
344 metadata.child_pid = Some(child_pid);
345 metadata.pgid = None;
346 write_task(&paths.json, &metadata)
347 .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
348
349 let task = Arc::new(BgTask {
350 task_id: task_id.clone(),
351 session_id,
352 paths: paths.clone(),
353 started: Instant::now(),
354 last_reminder_at: Mutex::new(None),
355 terminal_at: Mutex::new(None),
356 state: Mutex::new(BgTaskState {
357 metadata,
358 child: Some(child),
359 detached: false,
360 buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
361 }),
362 });
363
364 self.inner
365 .tasks
366 .lock()
367 .map_err(|_| "background task registry lock poisoned".to_string())?
368 .insert(task_id.clone(), task);
369
370 Ok(task_id)
371 }
372
    /// Reloads the persisted tasks of `session_id` from `storage_dir` and
    /// reconciles each one with what is on disk.
    ///
    /// Starts the watchdog, and on the first call per registry also runs
    /// `maybe_gc_persisted` (a GC failure is only logged). Every `*.json`
    /// file in the session's task directory that parses and belongs to
    /// `session_id` is then handled by persisted status:
    ///
    /// * `Starting` — marked `Failed` ("spawn aborted").
    /// * `Running` / `Killing` — first matching rule wins: stale by age →
    ///   `Killed`; exit marker present → terminal state taken from the marker;
    ///   still `Killing` → `Killed`; recorded pid no longer alive → `Failed`;
    ///   otherwise rehydrated into the registry as a detached task.
    /// * terminal — rehydrated into the registry.
    ///
    /// Each transition to a terminal state is written back on a best-effort
    /// basis and queued as a completion when not yet delivered; no frame is
    /// emitted for these replayed completions.
    ///
    /// # Errors
    /// Fails when the session directory cannot be read or when inserting a
    /// rehydrated task hits a poisoned registry lock. Unreadable entries and
    /// unparsable metadata files are skipped silently.
    pub fn replay_session(&self, storage_dir: &Path, session_id: &str) -> Result<(), String> {
        self.start_watchdog();
        // `swap` returns the previous value, so only the first caller runs GC.
        if !self.inner.persisted_gc_started.swap(true, Ordering::SeqCst) {
            if let Err(error) = self.maybe_gc_persisted(storage_dir) {
                log::warn!("failed to GC persisted background bash tasks: {error}");
            }
        }
        let dir = session_tasks_dir(storage_dir, session_id);
        if !dir.exists() {
            return Ok(());
        }

        let entries = fs::read_dir(&dir)
            .map_err(|e| format!("failed to read background task dir {}: {e}", dir.display()))?;
        for entry in entries.flatten() {
            let path = entry.path();
            // Only metadata files are considered; capture files and markers are skipped.
            if path.extension().and_then(|extension| extension.to_str()) != Some("json") {
                continue;
            }
            let Ok(mut metadata) = read_task(&path) else {
                continue;
            };
            if metadata.session_id != session_id {
                continue;
            }

            let paths = task_paths(storage_dir, session_id, &metadata.task_id);
            match metadata.status {
                BgTaskStatus::Starting => {
                    // Persisted as starting but never advanced to running.
                    metadata.mark_terminal(
                        BgTaskStatus::Failed,
                        None,
                        Some("spawn aborted".to_string()),
                    );
                    let _ = write_task(&paths.json, &metadata);
                    self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                }
                BgTaskStatus::Running | BgTaskStatus::Killing => {
                    if self.running_metadata_is_stale(&metadata) {
                        // Older than `STALE_RUNNING_AFTER`: treated as orphaned.
                        metadata.mark_terminal(
                            BgTaskStatus::Killed,
                            None,
                            Some("orphaned (>24h)".to_string()),
                        );
                        if !paths.exit.exists() {
                            let _ = write_kill_marker_if_absent(&paths.exit);
                        }
                        let _ = write_task(&paths.json, &metadata);
                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                    } else if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
                        // An exit marker exists: its contents decide the terminal
                        // state, even if the metadata was left in `Killing`.
                        let reason = (metadata.status == BgTaskStatus::Killing).then(|| {
                            "recovered from inconsistent killing state on replay".to_string()
                        });
                        if reason.is_some() {
                            log::warn!(
                                "background task {} had killing state with exit marker; preferring marker",
                                metadata.task_id
                            );
                        }
                        metadata = terminal_metadata_from_marker(metadata, marker, reason);
                        let _ = write_task(&paths.json, &metadata);
                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                    } else if metadata.status == BgTaskStatus::Killing {
                        // A kill was in progress and no marker was read: finish it.
                        if !paths.exit.exists() {
                            let _ = write_kill_marker_if_absent(&paths.exit);
                        }
                        metadata.mark_terminal(
                            BgTaskStatus::Killed,
                            None,
                            Some("recovered from inconsistent killing state on replay".to_string()),
                        );
                        let _ = write_task(&paths.json, &metadata);
                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                    } else if metadata.child_pid.is_some_and(|pid| !is_process_alive(pid)) {
                        // The recorded process is gone and left no exit marker.
                        metadata.mark_terminal(
                            BgTaskStatus::Failed,
                            None,
                            Some("process exited without exit marker".to_string()),
                        );
                        let _ = write_task(&paths.json, &metadata);
                        self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                    } else {
                        // Still running (or no pid recorded): track it without a child handle.
                        self.insert_rehydrated_task(metadata, paths, true)?;
                    }
                }
                _ if metadata.status.is_terminal() => {
                    self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                    self.insert_rehydrated_task(metadata, paths, true)?;
                }
                _ => {}
            }
        }

        Ok(())
    }
473
474 pub fn status(
475 &self,
476 task_id: &str,
477 session_id: &str,
478 project_root: Option<&Path>,
479 storage_dir: Option<&Path>,
480 preview_bytes: usize,
481 ) -> Option<BgTaskSnapshot> {
482 let mut task = self.task_for_session(task_id, session_id);
483 if task.is_none() {
484 if let Some(storage_dir) = storage_dir {
485 let _ = self.replay_session(storage_dir, session_id);
486 task = self.task_for_session(task_id, session_id);
487 }
488 }
489 let Some(task) = task else {
490 return self.status_relaxed(
491 task_id,
492 session_id,
493 project_root?,
494 storage_dir?,
495 preview_bytes,
496 );
497 };
498 let _ = self.poll_task(&task);
499 let mut snapshot = task.snapshot(preview_bytes);
500 self.maybe_compress_snapshot(&task, &mut snapshot);
501 Some(snapshot)
502 }
503
    /// Finds task `task_id` across all sessions, accepting it only when its
    /// persisted `project_root` canonicalizes to the same path as `project_root`.
    ///
    /// Scans every directory under `<storage_dir>/bash-tasks` for
    /// `<task_id>.json`. For the first metadata file whose project matches:
    /// if a task with that id is already in memory, it is returned only when
    /// its own project root matches too (otherwise `None`); if not, the task
    /// is rehydrated as detached and returned.
    ///
    /// Returns `None` when the root cannot be read, when nothing matches, or
    /// when rehydration fails.
    fn status_relaxed_task(
        &self,
        task_id: &str,
        project_root: &Path,
        storage_dir: &Path,
    ) -> Option<Arc<BgTask>> {
        let canonical_project = canonicalized_path(project_root);
        let root = storage_dir.join("bash-tasks");
        let entries = fs::read_dir(&root).ok()?;
        for entry in entries.flatten() {
            let dir = entry.path();
            if !dir.is_dir() {
                continue;
            }
            let path = dir.join(format!("{task_id}.json"));
            if !path.exists() {
                continue;
            }
            let Ok(metadata) = read_task(&path) else {
                continue;
            };
            // Tasks without a project root, or from another project, never match.
            let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
            if metadata_project.as_deref() != Some(canonical_project.as_path()) {
                continue;
            }
            // Prefer the in-memory task so it is not overwritten, but re-check
            // its project because the id lookup ignores sessions.
            if let Some(task) = self.task(task_id) {
                let matches_project = task
                    .state
                    .lock()
                    .map(|state| {
                        state
                            .metadata
                            .project_root
                            .as_deref()
                            .map(canonicalized_path)
                            .as_deref()
                            == Some(canonical_project.as_path())
                    })
                    .unwrap_or(false);
                return matches_project.then_some(task);
            }
            let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
            if self.insert_rehydrated_task(metadata, paths, true).is_err() {
                return None;
            }
            return self.task(task_id);
        }
        None
    }
553
554 pub(super) fn status_relaxed(
555 &self,
556 task_id: &str,
557 _session_id: &str,
558 project_root: &Path,
559 storage_dir: &Path,
560 preview_bytes: usize,
561 ) -> Option<BgTaskSnapshot> {
562 let task = self.status_relaxed_task(task_id, project_root, storage_dir)?;
563 let _ = self.poll_task(&task);
564 let mut snapshot = task.snapshot(preview_bytes);
565 self.maybe_compress_snapshot(&task, &mut snapshot);
566 Some(snapshot)
567 }
568
    /// Deletes persisted task bundles that are terminal and already delivered,
    /// then runs quarantine GC. Returns the number of bundles deleted.
    ///
    /// Walks `<storage_dir>/bash-tasks/<session>/*.json`. Metadata files
    /// modified within `PERSISTED_GC_GRACE` are left alone; files that fail
    /// to parse are quarantined; everything else is deleted only when its
    /// status is terminal and `completion_delivered` is set.
    ///
    /// # Errors
    /// Fails when the task root cannot be read or when quarantining a corrupt
    /// metadata file fails (which also skips the rest of the pass, including
    /// `gc_quarantine`). Unreadable session directories and failed bundle
    /// deletions are logged and skipped.
    pub fn maybe_gc_persisted(&self, storage_dir: &Path) -> Result<usize, String> {
        #[cfg(test)]
        self.inner.persisted_gc_runs.fetch_add(1, Ordering::SeqCst);

        let mut deleted = 0usize;

        let root = storage_dir.join("bash-tasks");
        if root.exists() {
            let session_dirs = fs::read_dir(&root).map_err(|e| {
                format!(
                    "failed to read background task root {}: {e}",
                    root.display()
                )
            })?;
            for session_entry in session_dirs.flatten() {
                let session_dir = session_entry.path();
                if !session_dir.is_dir() {
                    continue;
                }
                let task_entries = match fs::read_dir(&session_dir) {
                    Ok(entries) => entries,
                    Err(error) => {
                        log::warn!(
                            "failed to read background task session dir {}: {error}",
                            session_dir.display()
                        );
                        continue;
                    }
                };
                for task_entry in task_entries.flatten() {
                    let json_path = task_entry.path();
                    if json_path
                        .extension()
                        .and_then(|extension| extension.to_str())
                        != Some("json")
                    {
                        continue;
                    }
                    // Recently touched metadata is never collected.
                    if modified_within(&json_path, PERSISTED_GC_GRACE) {
                        continue;
                    }
                    let metadata = match read_task(&json_path) {
                        Ok(metadata) => metadata,
                        Err(error) => {
                            log::warn!(
                                "quarantining corrupt background task metadata {}: {error}",
                                json_path.display()
                            );
                            quarantine_corrupt_task_json(storage_dir, &session_dir, &json_path)?;
                            continue;
                        }
                    };
                    // Keep anything still running or whose completion is undelivered.
                    if !(metadata.status.is_terminal() && metadata.completion_delivered) {
                        continue;
                    }
                    // Paths are derived from the metadata's own session/task ids,
                    // not from the directory the file was found in.
                    let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
                    match delete_task_bundle(&paths) {
                        Ok(()) => {
                            deleted += 1;
                            log::debug!(
                                "deleted persisted background task bundle {}",
                                metadata.task_id
                            );
                        }
                        Err(error) => {
                            log::warn!(
                                "failed to delete background task bundle {}: {error}",
                                metadata.task_id
                            );
                            continue;
                        }
                    }
                }
            }
        }
        gc_quarantine(storage_dir);
        Ok(deleted)
    }
647
648 pub fn list(&self, preview_bytes: usize) -> Vec<BgTaskSnapshot> {
649 let tasks = self
650 .inner
651 .tasks
652 .lock()
653 .map(|tasks| tasks.values().cloned().collect::<Vec<_>>())
654 .unwrap_or_default();
655 tasks
656 .into_iter()
657 .map(|task| {
658 let _ = self.poll_task(&task);
659 let mut snapshot = task.snapshot(preview_bytes);
660 self.maybe_compress_snapshot(&task, &mut snapshot);
661 snapshot
662 })
663 .collect()
664 }
665
666 fn maybe_compress_snapshot(&self, task: &Arc<BgTask>, snapshot: &mut BgTaskSnapshot) {
672 if !snapshot.info.status.is_terminal() {
673 return;
674 }
675 let compressed_flag = task
676 .state
677 .lock()
678 .map(|state| state.metadata.compressed)
679 .unwrap_or(true);
680 if !compressed_flag {
681 return;
682 }
683 let raw = std::mem::take(&mut snapshot.output_preview);
684 snapshot.output_preview = self.compress_output(&snapshot.info.command, raw);
685 }
686
687 pub fn kill(&self, task_id: &str, session_id: &str) -> Result<BgTaskSnapshot, String> {
688 self.kill_with_status(task_id, session_id, BgTaskStatus::Killed)
689 }
690
691 pub fn promote(&self, task_id: &str, session_id: &str) -> Result<bool, String> {
692 let task = self
693 .task_for_session(task_id, session_id)
694 .ok_or_else(|| format!("background task not found: {task_id}"))?;
695 let mut state = task
696 .state
697 .lock()
698 .map_err(|_| "background task lock poisoned".to_string())?;
699 let updated = update_task(&task.paths.json, |metadata| {
700 metadata.notify_on_completion = true;
701 metadata.completion_delivered = false;
702 })
703 .map_err(|e| format!("failed to promote background task: {e}"))?;
704 state.metadata = updated;
705 if state.metadata.status.is_terminal() {
706 state.buffer.enforce_terminal_cap();
707 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
708 }
709 Ok(true)
710 }
711
712 pub(crate) fn kill_for_timeout(&self, task_id: &str, session_id: &str) -> Result<(), String> {
713 self.kill_with_status(task_id, session_id, BgTaskStatus::TimedOut)
714 .map(|_| ())
715 }
716
717 pub fn cleanup_finished(&self, older_than: Duration) {
718 let cutoff = Instant::now().checked_sub(older_than);
719 let removable_paths: Vec<(String, TaskPaths)> =
720 if let Ok(mut tasks) = self.inner.tasks.lock() {
721 let removable = tasks
722 .iter()
723 .filter_map(|(task_id, task)| {
724 let delivered_terminal = task
725 .state
726 .lock()
727 .map(|state| {
728 state.metadata.status.is_terminal()
729 && state.metadata.completion_delivered
730 })
731 .unwrap_or(false);
732 if !delivered_terminal {
733 return None;
734 }
735
736 let terminal_at = task.terminal_at.lock().ok().and_then(|at| *at);
737 let expired = match (terminal_at, cutoff) {
738 (Some(terminal_at), Some(cutoff)) => terminal_at <= cutoff,
739 (Some(_), None) => true,
740 (None, _) => false,
741 };
742 expired.then(|| task_id.clone())
743 })
744 .collect::<Vec<_>>();
745
746 removable
747 .into_iter()
748 .filter_map(|task_id| {
749 tasks
750 .remove(&task_id)
751 .map(|task| (task_id, task.paths.clone()))
752 })
753 .collect()
754 } else {
755 Vec::new()
756 };
757
758 for (task_id, paths) in removable_paths {
759 match delete_task_bundle(&paths) {
760 Ok(()) => log::debug!("deleted persisted background task bundle {task_id}"),
761 Err(error) => log::warn!(
762 "failed to delete persisted background task bundle {task_id}: {error}"
763 ),
764 }
765 }
766 }
767
768 pub fn drain_completions(&self) -> Vec<BgCompletion> {
769 self.drain_completions_for_session(None)
770 }
771
772 pub fn drain_completions_for_session(&self, session_id: Option<&str>) -> Vec<BgCompletion> {
773 let mut completions = match self.inner.completions.lock() {
774 Ok(completions) => completions,
775 Err(_) => return Vec::new(),
776 };
777
778 let drained = if let Some(session_id) = session_id {
779 let mut matched = Vec::new();
780 let mut retained = VecDeque::new();
781 while let Some(completion) = completions.pop_front() {
782 if completion.session_id == session_id {
783 matched.push(completion);
784 } else {
785 retained.push_back(completion);
786 }
787 }
788 *completions = retained;
789 matched
790 } else {
791 completions.drain(..).collect()
792 };
793 drop(completions);
794
795 for completion in &drained {
796 if let Some(task) = self.task_for_session(&completion.task_id, &completion.session_id) {
797 let _ = task.set_completion_delivered(true);
798 }
799 }
800
801 drained
802 }
803
804 pub fn pending_completions_for_session(&self, session_id: &str) -> Vec<BgCompletion> {
805 self.inner
806 .completions
807 .lock()
808 .map(|completions| {
809 completions
810 .iter()
811 .filter(|completion| completion.session_id == session_id)
812 .cloned()
813 .collect()
814 })
815 .unwrap_or_default()
816 }
817
818 pub fn detach(&self) {
819 self.inner.shutdown.store(true, Ordering::SeqCst);
820 if let Ok(mut tasks) = self.inner.tasks.lock() {
821 for task in tasks.values() {
822 if let Ok(mut state) = task.state.lock() {
823 state.child = None;
824 state.detached = true;
825 }
826 }
827 tasks.clear();
828 }
829 }
830
831 pub fn shutdown(&self) {
832 let tasks = self
833 .inner
834 .tasks
835 .lock()
836 .map(|tasks| {
837 tasks
838 .values()
839 .map(|task| (task.task_id.clone(), task.session_id.clone()))
840 .collect::<Vec<_>>()
841 })
842 .unwrap_or_default();
843 for (task_id, session_id) in tasks {
844 let _ = self.kill(&task_id, &session_id);
845 }
846 }
847
848 pub(crate) fn poll_task(&self, task: &Arc<BgTask>) -> Result<(), String> {
849 let marker = match read_exit_marker(&task.paths.exit) {
850 Ok(Some(marker)) => marker,
851 Ok(None) => return Ok(()),
852 Err(error) => return Err(format!("failed to read exit marker: {error}")),
853 };
854 self.finalize_from_marker(task, marker, None)
855 }
856
    /// Reaps the task's child if it has exited and, when no exit marker
    /// explains the exit, records the task as failed.
    ///
    /// Does nothing when the state lock is poisoned, when there is no child
    /// handle, or when `try_wait` does not report an exit. Once an exit is
    /// observed the handle is dropped and the task is flagged as detached. If
    /// the task is already terminal, or an exit marker exists, nothing more
    /// is done here. Otherwise the task is persisted as `Failed` and its
    /// completion is queued with a frame.
    pub(crate) fn reap_child(&self, task: &Arc<BgTask>) {
        let Ok(mut state) = task.state.lock() else {
            return;
        };
        if let Some(child) = state.child.as_mut() {
            if matches!(child.try_wait(), Ok(Some(_))) {
                state.child = None;
                state.detached = true;
                if state.metadata.status.is_terminal() {
                    return;
                }
                // A marker means the exit was recorded; this function does not finalize it.
                if matches!(read_exit_marker(&task.paths.exit), Ok(Some(_))) {
                    return;
                }
                let updated = update_task(&task.paths.json, |metadata| {
                    metadata.mark_terminal(
                        BgTaskStatus::Failed,
                        None,
                        Some("process exited without exit marker".to_string()),
                    );
                });
                // If persisting fails, the in-memory status stays non-terminal.
                if let Ok(metadata) = updated {
                    state.metadata = metadata;
                    task.mark_terminal_now();
                    state.buffer.enforce_terminal_cap();
                    self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
                }
            }
        }
    }
903
904 pub(crate) fn running_tasks(&self) -> Vec<Arc<BgTask>> {
905 self.inner
906 .tasks
907 .lock()
908 .map(|tasks| {
909 tasks
910 .values()
911 .filter(|task| task.is_running())
912 .cloned()
913 .collect()
914 })
915 .unwrap_or_default()
916 }
917
918 fn insert_rehydrated_task(
919 &self,
920 metadata: PersistedTask,
921 paths: TaskPaths,
922 detached: bool,
923 ) -> Result<(), String> {
924 let task_id = metadata.task_id.clone();
925 let session_id = metadata.session_id.clone();
926 let started = started_instant_from_unix_millis(metadata.started_at);
927 let task = Arc::new(BgTask {
928 task_id: task_id.clone(),
929 session_id,
930 paths: paths.clone(),
931 started,
932 last_reminder_at: Mutex::new(None),
933 terminal_at: Mutex::new(metadata.status.is_terminal().then(Instant::now)),
934 state: Mutex::new(BgTaskState {
935 metadata,
936 child: None,
937 detached,
938 buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
939 }),
940 });
941 self.inner
942 .tasks
943 .lock()
944 .map_err(|_| "background task registry lock poisoned".to_string())?
945 .insert(task_id, task);
946 Ok(())
947 }
948
    /// Terminates a task and records `terminal_status` as its final state,
    /// returning a snapshot (5 KiB preview requested).
    ///
    /// Under the task's state lock, in order:
    /// 1. An already-terminal task is returned as-is.
    /// 2. If an exit marker exists, the task is finalized from the marker
    ///    instead of being killed.
    /// 3. Otherwise the task is persisted as `Killing`, the process is
    ///    terminated (process group on Unix, child or pid on Windows), any
    ///    held child handle is waited on, a kill marker is written if no exit
    ///    file exists, and the terminal status is persisted. `TimedOut` is
    ///    recorded with exit code 124; other statuses with no exit code.
    ///
    /// A completion is queued (with a frame) in cases 2 and 3.
    ///
    /// # Errors
    /// Fails when the task is unknown for this session, when the task lock is
    /// poisoned, or when persisting state or writing the kill marker fails.
    fn kill_with_status(
        &self,
        task_id: &str,
        session_id: &str,
        terminal_status: BgTaskStatus,
    ) -> Result<BgTaskSnapshot, String> {
        let task = self
            .task_for_session(task_id, session_id)
            .ok_or_else(|| format!("background task not found: {task_id}"))?;

        {
            let mut state = task
                .state
                .lock()
                .map_err(|_| "background task lock poisoned".to_string())?;
            if state.metadata.status.is_terminal() {
                return Ok(task.snapshot_locked(&state, 5 * 1024));
            }

            // The process already finished: keep its real outcome.
            if let Ok(Some(marker)) = read_exit_marker(&task.paths.exit) {
                state.metadata =
                    terminal_metadata_from_marker(state.metadata.clone(), marker, None);
                task.mark_terminal_now();
                state.child = None;
                state.detached = true;
                state.buffer.enforce_terminal_cap();
                write_task(&task.paths.json, &state.metadata)
                    .map_err(|e| format!("failed to persist terminal state: {e}"))?;
                self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
                return Ok(task.snapshot_locked(&state, 5 * 1024));
            }

            // Persist the intermediate state before signalling the process.
            state.metadata.status = BgTaskStatus::Killing;
            write_task(&task.paths.json, &state.metadata)
                .map_err(|e| format!("failed to persist killing state: {e}"))?;

            #[cfg(unix)]
            if let Some(pgid) = state.metadata.pgid {
                terminate_pgid(pgid, state.child.as_mut());
            }
            #[cfg(windows)]
            if let Some(child) = state.child.as_mut() {
                super::process::terminate_process(child);
            } else if let Some(pid) = state.metadata.child_pid {
                terminate_pid(pid);
            }
            // Reap the child, when a handle is held, before dropping it.
            if let Some(child) = state.child.as_mut() {
                let _ = child.wait();
            }
            state.child = None;
            state.detached = true;

            if !task.paths.exit.exists() {
                write_kill_marker_if_absent(&task.paths.exit)
                    .map_err(|e| format!("failed to write kill marker: {e}"))?;
            }

            let exit_code = if terminal_status == BgTaskStatus::TimedOut {
                Some(124)
            } else {
                None
            };
            state
                .metadata
                .mark_terminal(terminal_status, exit_code, None);
            task.mark_terminal_now();
            write_task(&task.paths.json, &state.metadata)
                .map_err(|e| format!("failed to persist killed state: {e}"))?;
            state.buffer.enforce_terminal_cap();
            self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
        }

        // The state lock is released by the block above before this snapshot.
        Ok(task.snapshot(5 * 1024))
    }
1023
1024 fn finalize_from_marker(
1025 &self,
1026 task: &Arc<BgTask>,
1027 marker: ExitMarker,
1028 reason: Option<String>,
1029 ) -> Result<(), String> {
1030 let mut state = task
1031 .state
1032 .lock()
1033 .map_err(|_| "background task lock poisoned".to_string())?;
1034 if state.metadata.status.is_terminal() {
1035 return Ok(());
1036 }
1037
1038 let updated = update_task(&task.paths.json, |metadata| {
1039 let new_metadata = terminal_metadata_from_marker(metadata.clone(), marker, reason);
1040 *metadata = new_metadata;
1041 })
1042 .map_err(|e| format!("failed to persist terminal state: {e}"))?;
1043 state.metadata = updated;
1044 task.mark_terminal_now();
1045 state.child = None;
1046 state.detached = true;
1047 state.buffer.enforce_terminal_cap();
1048 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1049 Ok(())
1050 }
1051
1052 fn enqueue_completion_if_needed(
1053 &self,
1054 metadata: &PersistedTask,
1055 paths: Option<&TaskPaths>,
1056 emit_frame: bool,
1057 ) {
1058 if metadata.status.is_terminal() && !metadata.completion_delivered {
1059 self.enqueue_completion_from_parts(metadata, None, paths, emit_frame);
1060 }
1061 }
1062
1063 fn enqueue_completion_locked(
1064 &self,
1065 metadata: &PersistedTask,
1066 buffer: Option<&BgBuffer>,
1067 emit_frame: bool,
1068 ) {
1069 self.enqueue_completion_from_parts(metadata, buffer, None, emit_frame);
1070 }
1071
1072 fn enqueue_completion_from_parts(
1073 &self,
1074 metadata: &PersistedTask,
1075 buffer: Option<&BgBuffer>,
1076 paths: Option<&TaskPaths>,
1077 emit_frame: bool,
1078 ) {
1079 if !metadata.status.is_terminal() || metadata.completion_delivered {
1080 return;
1081 }
1082 let (raw_preview, output_truncated) = match buffer {
1087 Some(buf) => buf.read_tail(BG_COMPLETION_PREVIEW_BYTES),
1088 None => paths
1089 .map(|paths| read_tail_from_disk(paths, BG_COMPLETION_PREVIEW_BYTES))
1090 .unwrap_or_else(|| (String::new(), false)),
1091 };
1092 let output_preview = if metadata.compressed {
1097 self.compress_output(&metadata.command, raw_preview)
1098 } else {
1099 raw_preview
1100 };
1101 let completion = BgCompletion {
1102 task_id: metadata.task_id.clone(),
1103 session_id: metadata.session_id.clone(),
1104 status: metadata.status.clone(),
1105 exit_code: metadata.exit_code,
1106 command: metadata.command.clone(),
1107 output_preview,
1108 output_truncated,
1109 };
1110 if let Ok(mut completions) = self.inner.completions.lock() {
1111 if completions
1112 .iter()
1113 .any(|completion| completion.task_id == metadata.task_id)
1114 {
1115 return;
1116 }
1117 completions.push_back(completion.clone());
1118 } else {
1119 return;
1120 }
1121
1122 if emit_frame {
1123 self.emit_bash_completed(completion);
1124 }
1125 }
1126
1127 fn emit_bash_completed(&self, completion: BgCompletion) {
1128 let Ok(progress_sender) = self
1129 .inner
1130 .progress_sender
1131 .lock()
1132 .map(|sender| sender.clone())
1133 else {
1134 return;
1135 };
1136 let Some(sender) = progress_sender.as_ref() else {
1137 return;
1138 };
1139 sender(PushFrame::BashCompleted(BashCompletedFrame::new(
1147 completion.task_id,
1148 completion.session_id,
1149 completion.status,
1150 completion.exit_code,
1151 completion.command,
1152 completion.output_preview,
1153 completion.output_truncated,
1154 )));
1155 }
1156
    /// Emits a `BashLongRunning` push frame for `task` once the configured
    /// reminder interval has elapsed since the previous reminder (or since the
    /// task started, when no reminder has been sent yet).
    ///
    /// Does nothing when reminders are disabled, when the configured interval
    /// is zero, or when the task's `last_reminder_at` lock is poisoned.
    pub(crate) fn maybe_emit_long_running_reminder(&self, task: &Arc<BgTask>) {
        // Feature switch: bail out early when reminders are turned off.
        if !self
            .inner
            .long_running_reminder_enabled
            .load(Ordering::SeqCst)
        {
            return;
        }
        let interval_ms = self
            .inner
            .long_running_reminder_interval_ms
            .load(Ordering::SeqCst);
        // A zero interval is treated as "never remind".
        if interval_ms == 0 {
            return;
        }
        let interval = Duration::from_millis(interval_ms);
        let now = Instant::now();
        // The guard stays alive until the end of the function, so the
        // check-and-update of the timestamp below happens under one lock.
        let Ok(mut last_reminder_at) = task.last_reminder_at.lock() else {
            return;
        };
        // First reminder is measured from task start; later ones from the
        // previous reminder.
        let since = last_reminder_at.unwrap_or(task.started);
        if now.duration_since(since) < interval {
            return;
        }
        // A poisoned state lock degrades to an empty command string rather
        // than suppressing the reminder.
        let command = task
            .state
            .lock()
            .map(|state| state.metadata.command.clone())
            .unwrap_or_default();
        *last_reminder_at = Some(now);
        // The reported elapsed time is always measured from task start, not
        // from the previous reminder.
        self.emit_bash_long_running(BashLongRunningFrame::new(
            task.task_id.clone(),
            task.session_id.clone(),
            command,
            task.started.elapsed().as_millis() as u64,
        ));
    }
1194
1195 fn emit_bash_long_running(&self, frame: BashLongRunningFrame) {
1196 let Ok(progress_sender) = self
1197 .inner
1198 .progress_sender
1199 .lock()
1200 .map(|sender| sender.clone())
1201 else {
1202 return;
1203 };
1204 if let Some(sender) = progress_sender.as_ref() {
1205 sender(PushFrame::BashLongRunning(frame));
1206 }
1207 }
1208
1209 fn task(&self, task_id: &str) -> Option<Arc<BgTask>> {
1210 self.inner
1211 .tasks
1212 .lock()
1213 .ok()
1214 .and_then(|tasks| tasks.get(task_id).cloned())
1215 }
1216
1217 fn task_for_session(&self, task_id: &str, session_id: &str) -> Option<Arc<BgTask>> {
1218 self.task(task_id)
1219 .filter(|task| task.session_id == session_id)
1220 }
1221
1222 fn running_count(&self) -> usize {
1223 self.inner
1224 .tasks
1225 .lock()
1226 .map(|tasks| tasks.values().filter(|task| task.is_running()).count())
1227 .unwrap_or(0)
1228 }
1229
1230 fn start_watchdog(&self) {
1231 if !self.inner.watchdog_started.swap(true, Ordering::SeqCst) {
1232 super::watchdog::start(self.clone());
1233 }
1234 }
1235
1236 fn running_metadata_is_stale(&self, metadata: &PersistedTask) -> bool {
1237 unix_millis().saturating_sub(metadata.started_at) > STALE_RUNNING_AFTER.as_millis() as u64
1238 }
1239
1240 #[cfg(test)]
1241 pub fn task_json_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
1242 self.task_for_session(task_id, session_id)
1243 .map(|task| task.paths.json.clone())
1244 }
1245
1246 #[cfg(test)]
1247 pub fn task_exit_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
1248 self.task_for_session(task_id, session_id)
1249 .map(|task| task.paths.exit.clone())
1250 }
1251
1252 fn generate_unique_task_id(&self) -> Result<String, String> {
1254 for _ in 0..32 {
1255 let candidate = random_slug();
1256 let tasks = self
1257 .inner
1258 .tasks
1259 .lock()
1260 .map_err(|_| "background task registry lock poisoned".to_string())?;
1261 if tasks.contains_key(&candidate) {
1262 continue;
1263 }
1264 let completions = self
1265 .inner
1266 .completions
1267 .lock()
1268 .map_err(|_| "background completions lock poisoned".to_string())?;
1269 if completions
1270 .iter()
1271 .any(|completion| completion.task_id == candidate)
1272 {
1273 continue;
1274 }
1275 return Ok(candidate);
1276 }
1277 Err("failed to allocate unique background task id after 32 attempts".to_string())
1278 }
1279}
1280
1281impl Default for BgTaskRegistry {
1282 fn default() -> Self {
1283 Self::new(Arc::new(Mutex::new(None)))
1284 }
1285}
1286
/// Returns `true` when `path` was last modified less than `grace` ago.
///
/// Any failure — missing file, unavailable modification time, or a
/// modification time later than the current system time — yields `false`.
fn modified_within(path: &Path, grace: Duration) -> bool {
    let Ok(modified) = fs::metadata(path).and_then(|meta| meta.modified()) else {
        return false;
    };
    match SystemTime::now().duration_since(modified) {
        Ok(age) => age < grace,
        Err(_) => false,
    }
}
1295
/// Canonicalizes `path`, falling back to an unchanged copy of it when
/// canonicalization fails (for example because the path does not exist).
fn canonicalized_path(path: &Path) -> PathBuf {
    match fs::canonicalize(path) {
        Ok(resolved) => resolved,
        Err(_) => path.to_path_buf(),
    }
}
1299
/// Converts a wall-clock start time (milliseconds since the Unix epoch) into
/// an `Instant` that lies the same distance in the past.
///
/// Start times in the future, an unreadable system clock, and an `Instant`
/// subtraction that is out of range all collapse to "now".
fn started_instant_from_unix_millis(started_at: u64) -> Instant {
    let wall_now_ms = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => started_at,
    };
    let age = Duration::from_millis(wall_now_ms.saturating_sub(started_at));
    match Instant::now().checked_sub(age) {
        Some(instant) => instant,
        None => Instant::now(),
    }
}
1311
1312fn gc_quarantine(storage_dir: &Path) {
1313 let quarantine_root = storage_dir.join("bash-tasks-quarantine");
1314 let Ok(session_dirs) = fs::read_dir(&quarantine_root) else {
1315 return;
1316 };
1317 for session_entry in session_dirs.flatten() {
1318 let session_quarantine_dir = session_entry.path();
1319 if !session_quarantine_dir.is_dir() {
1320 continue;
1321 }
1322 let entries = match fs::read_dir(&session_quarantine_dir) {
1323 Ok(entries) => entries,
1324 Err(error) => {
1325 log::warn!(
1326 "failed to read background task quarantine dir {}: {error}",
1327 session_quarantine_dir.display()
1328 );
1329 continue;
1330 }
1331 };
1332 for entry in entries.flatten() {
1333 let path = entry.path();
1334 if modified_within(&path, QUARANTINE_GC_GRACE) {
1335 continue;
1336 }
1337 let result = if path.is_dir() {
1338 fs::remove_dir_all(&path)
1339 } else {
1340 fs::remove_file(&path)
1341 };
1342 match result {
1343 Ok(()) => log::debug!(
1344 "deleted old background task quarantine entry {}",
1345 path.display()
1346 ),
1347 Err(error) => log::warn!(
1348 "failed to delete old background task quarantine entry {}: {error}",
1349 path.display()
1350 ),
1351 }
1352 }
1353 let _ = fs::remove_dir(&session_quarantine_dir);
1354 }
1355 let _ = fs::remove_dir(&quarantine_root);
1356}
1357
1358fn quarantine_corrupt_task_json(
1359 storage_dir: &Path,
1360 session_dir: &Path,
1361 json_path: &Path,
1362) -> Result<(), String> {
1363 let session_hash = session_dir
1364 .file_name()
1365 .and_then(|name| name.to_str())
1366 .ok_or_else(|| {
1367 format!(
1368 "invalid background task session dir: {}",
1369 session_dir.display()
1370 )
1371 })?;
1372 let task_name = json_path
1373 .file_name()
1374 .and_then(|name| name.to_str())
1375 .ok_or_else(|| format!("invalid background task json path: {}", json_path.display()))?;
1376 let unix_ts = SystemTime::now()
1377 .duration_since(UNIX_EPOCH)
1378 .map(|duration| duration.as_secs())
1379 .unwrap_or(0);
1380 let quarantine_dir = storage_dir.join("bash-tasks-quarantine").join(session_hash);
1381 fs::create_dir_all(&quarantine_dir).map_err(|e| {
1382 format!(
1383 "failed to create background task quarantine dir {}: {e}",
1384 quarantine_dir.display()
1385 )
1386 })?;
1387 let target = quarantine_dir.join(format!("{task_name}.corrupt-{unix_ts}"));
1388 fs::rename(json_path, &target).map_err(|e| {
1389 format!(
1390 "failed to quarantine corrupt background task metadata {} to {}: {e}",
1391 json_path.display(),
1392 target.display()
1393 )
1394 })?;
1395
1396 for sibling in task_sibling_paths(json_path) {
1397 if !sibling.exists() {
1398 continue;
1399 }
1400 let Some(sibling_name) = sibling.file_name().and_then(|name| name.to_str()) else {
1401 log::warn!(
1402 "skipping background task sibling with invalid name during quarantine: {}",
1403 sibling.display()
1404 );
1405 continue;
1406 };
1407 let sibling_target = quarantine_dir.join(format!("{sibling_name}.corrupt-{unix_ts}"));
1408 if let Err(error) = fs::rename(&sibling, &sibling_target) {
1409 log::warn!(
1410 "failed to quarantine background task sibling {} to {}: {error}",
1411 sibling.display(),
1412 sibling_target.display()
1413 );
1414 }
1415 }
1416
1417 let _ = fs::remove_dir(session_dir);
1418 Ok(())
1419}
1420
/// Lists the sibling files that share the stem of `json_path`: one path per
/// known extension (`stdout`, `stderr`, `exit`, `ps1`, `bat`, `sh`), in that
/// order, in the same directory.
///
/// The paths are not checked for existence. Returns an empty list when
/// `json_path` has no parent or no UTF-8 file stem.
fn task_sibling_paths(json_path: &Path) -> Vec<PathBuf> {
    const SIBLING_EXTENSIONS: [&str; 6] = ["stdout", "stderr", "exit", "ps1", "bat", "sh"];

    let (Some(parent), Some(stem)) = (
        json_path.parent(),
        json_path.file_stem().and_then(|stem| stem.to_str()),
    ) else {
        return Vec::new();
    };

    let mut siblings = Vec::with_capacity(SIBLING_EXTENSIONS.len());
    for extension in SIBLING_EXTENSIONS {
        siblings.push(parent.join(format!("{stem}.{extension}")));
    }
    siblings
}
1433
1434fn read_tail_from_disk(paths: &TaskPaths, max_bytes: usize) -> (String, bool) {
1435 let stdout = fs::read(&paths.stdout).unwrap_or_default();
1436 let stderr = fs::read(&paths.stderr).unwrap_or_default();
1437 let mut bytes = Vec::with_capacity(stdout.len().saturating_add(stderr.len()));
1438 bytes.extend_from_slice(&stdout);
1439 bytes.extend_from_slice(&stderr);
1440 if bytes.len() <= max_bytes {
1441 return (String::from_utf8_lossy(&bytes).into_owned(), false);
1442 }
1443 let start = bytes.len().saturating_sub(max_bytes);
1444 (String::from_utf8_lossy(&bytes[start..]).into_owned(), true)
1445}
1446
1447impl BgTask {
1448 fn snapshot(&self, preview_bytes: usize) -> BgTaskSnapshot {
1449 let state = self
1450 .state
1451 .lock()
1452 .unwrap_or_else(|poison| poison.into_inner());
1453 self.snapshot_locked(&state, preview_bytes)
1454 }
1455
1456 fn snapshot_locked(&self, state: &BgTaskState, preview_bytes: usize) -> BgTaskSnapshot {
1457 let metadata = &state.metadata;
1458 let duration_ms = metadata.duration_ms.or_else(|| {
1459 metadata
1460 .status
1461 .is_terminal()
1462 .then(|| self.started.elapsed().as_millis() as u64)
1463 });
1464 let (output_preview, output_truncated) = state.buffer.read_tail(preview_bytes);
1465 BgTaskSnapshot {
1466 info: BgTaskInfo {
1467 task_id: self.task_id.clone(),
1468 status: metadata.status.clone(),
1469 command: metadata.command.clone(),
1470 started_at: metadata.started_at,
1471 duration_ms,
1472 },
1473 exit_code: metadata.exit_code,
1474 child_pid: metadata.child_pid,
1475 workdir: metadata.workdir.display().to_string(),
1476 output_preview,
1477 output_truncated,
1478 output_path: state
1479 .buffer
1480 .output_path()
1481 .map(|path| path.display().to_string()),
1482 stderr_path: Some(state.buffer.stderr_path().display().to_string()),
1483 }
1484 }
1485
1486 pub(crate) fn is_running(&self) -> bool {
1487 self.state
1488 .lock()
1489 .map(|state| state.metadata.status == BgTaskStatus::Running)
1490 .unwrap_or(false)
1491 }
1492
1493 fn mark_terminal_now(&self) {
1494 if let Ok(mut terminal_at) = self.terminal_at.lock() {
1495 if terminal_at.is_none() {
1496 *terminal_at = Some(Instant::now());
1497 }
1498 }
1499 }
1500
1501 fn set_completion_delivered(&self, delivered: bool) -> Result<(), String> {
1502 let mut state = self
1503 .state
1504 .lock()
1505 .map_err(|_| "background task lock poisoned".to_string())?;
1506 let updated = update_task(&self.paths.json, |metadata| {
1507 metadata.completion_delivered = delivered;
1508 })
1509 .map_err(|e| format!("failed to update completion delivery: {e}"))?;
1510 state.metadata = updated;
1511 Ok(())
1512 }
1513}
1514
1515fn terminal_metadata_from_marker(
1516 mut metadata: PersistedTask,
1517 marker: ExitMarker,
1518 reason: Option<String>,
1519) -> PersistedTask {
1520 match marker {
1521 ExitMarker::Code(code) => {
1522 let status = if code == 0 {
1523 BgTaskStatus::Completed
1524 } else {
1525 BgTaskStatus::Failed
1526 };
1527 metadata.mark_terminal(status, Some(code), reason);
1528 }
1529 ExitMarker::Killed => metadata.mark_terminal(BgTaskStatus::Killed, None, reason),
1530 }
1531 metadata
1532}
1533
/// Builds the (not yet spawned) command that runs `command` in a detached
/// POSIX shell and records its exit status in `exit_path`.
///
/// The outer shell is invoked as `<shell> -c <script> <shell> <command>
/// <exit_path>`, so inside the script `$0` is the shell, `$1` the user command
/// and `$2` the exit-marker path. The script runs the command in a nested
/// shell, writes the exit code to `$2.tmp.<pid>` and then `mv`s it over `$2`,
/// so the marker is written to a temporary file first and moved into place
/// afterwards.
#[cfg(unix)]
fn detached_shell_command(command: &str, exit_path: &Path) -> Command {
    let shell = resolve_posix_shell();
    let mut cmd = Command::new(&shell);
    cmd.arg("-c")
        .arg("\"$0\" -c \"$1\"; code=$?; printf \"%s\" \"$code\" > \"$2.tmp.$$\"; mv -f \"$2.tmp.$$\" \"$2\"")
        .arg(&shell)
        .arg(command)
        .arg(exit_path);
    // SAFETY: `pre_exec` runs the closure in the child between fork and exec.
    // The closure only calls `setsid` and, on failure, reads the last OS
    // error. NOTE(review): confirm this satisfies the async-signal-safety
    // requirements documented for `CommandExt::pre_exec`.
    unsafe {
        cmd.pre_exec(|| {
            // Put the child in its own session so it is detached from ours.
            if libc::setsid() == -1 {
                return Err(std::io::Error::last_os_error());
            }
            Ok(())
        });
    }
    cmd
}
1553
1554#[cfg(unix)]
1555fn resolve_posix_shell() -> PathBuf {
1556 static POSIX_SHELL: OnceLock<PathBuf> = OnceLock::new();
1557 POSIX_SHELL
1558 .get_or_init(|| {
1559 std::env::var_os("BASH")
1560 .filter(|value| !value.is_empty())
1561 .map(PathBuf::from)
1562 .filter(|path| path.exists())
1563 .or_else(|| which::which("bash").ok())
1564 .or_else(|| which::which("zsh").ok())
1565 .unwrap_or_else(|| PathBuf::from("/bin/sh"))
1566 })
1567 .clone()
1568}
1569
/// Builds the (not yet spawned) Windows command that runs `command` through
/// `shell` by way of a wrapper script written next to the task files.
///
/// The wrapper body comes from `shell.wrapper_script`; it is written to
/// `<paths.dir>/<json stem>.<ext>`, where the extension depends on the shell
/// (`ps1`, `bat` or `sh`). The returned command invokes the shell binary with
/// the shell-specific launch flags followed by the wrapper path, and carries
/// `creation_flags`.
///
/// # Errors
///
/// Returns an error string when the wrapper script cannot be written.
#[cfg(windows)]
fn detached_shell_command_for(
    shell: crate::windows_shell::WindowsShell,
    command: &str,
    exit_path: &Path,
    paths: &TaskPaths,
    creation_flags: u32,
) -> Result<Command, String> {
    use crate::windows_shell::WindowsShell;

    let wrapper_body = shell.wrapper_script(command, exit_path);
    let wrapper_ext = match &shell {
        WindowsShell::Posix(_) => "sh",
        WindowsShell::Cmd => "bat",
        WindowsShell::Pwsh | WindowsShell::Powershell => "ps1",
    };
    // Fall back to a fixed stem when the JSON path has no UTF-8 file stem.
    let stem = paths
        .json
        .file_stem()
        .and_then(|s| s.to_str())
        .unwrap_or("wrapper");
    let wrapper_path = paths.dir.join(format!("{}.{}", stem, wrapper_ext));
    if let Err(e) = fs::write(&wrapper_path, wrapper_body) {
        return Err(format!(
            "failed to write background bash wrapper script: {e}"
        ));
    }

    // Flags that precede the wrapper path on the shell's command line.
    let launch_flags: &[&str] = match &shell {
        WindowsShell::Pwsh | WindowsShell::Powershell => &[
            "-NoLogo",
            "-NoProfile",
            "-NonInteractive",
            "-ExecutionPolicy",
            "Bypass",
            "-File",
        ],
        WindowsShell::Cmd => &["/D", "/C"],
        WindowsShell::Posix(_) => &[],
    };

    let mut cmd = Command::new(shell.binary().as_ref());
    cmd.args(launch_flags);
    cmd.arg(&wrapper_path);
    cmd.creation_flags(creation_flags);
    Ok(cmd)
}
1652
/// Spawns `command` as a detached background child with stdin closed and
/// stdout/stderr redirected to the capture files in `paths`.
///
/// On non-Windows targets the child is built by `detached_shell_command` and
/// spawned once. On Windows each shell candidate is tried in order, first with
/// `CREATE_BREAKAWAY_FROM_JOB` and then without it.
///
/// # Errors
///
/// Returns an error string when a capture file cannot be created, when the
/// wrapper command cannot be built, when spawning fails with an error that is
/// not retried, or (Windows) when no candidate shell could be spawned.
fn spawn_detached_child(
    command: &str,
    paths: &TaskPaths,
    workdir: &Path,
    env: &HashMap<String, String>,
) -> Result<std::process::Child, String> {
    #[cfg(not(windows))]
    {
        let stdout = create_capture_file(&paths.stdout)
            .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
        let stderr = create_capture_file(&paths.stderr)
            .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
        detached_shell_command(command, &paths.exit)
            .current_dir(workdir)
            .envs(env)
            .stdin(Stdio::null())
            .stdout(Stdio::from(stdout))
            .stderr(Stdio::from(stderr))
            .spawn()
            .map_err(|e| format!("failed to spawn background bash command: {e}"))
    }
    #[cfg(windows)]
    {
        use crate::windows_shell::shell_candidates;
        let candidates: Vec<crate::windows_shell::WindowsShell> = shell_candidates();
        // Win32 process creation flag values.
        const FLAG_CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
        const FLAG_CREATE_BREAKAWAY_FROM_JOB: u32 = 0x0100_0000;
        const FLAG_CREATE_NO_WINDOW: u32 = 0x0800_0000;
        let with_breakaway =
            FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP | FLAG_CREATE_BREAKAWAY_FROM_JOB;
        let without_breakaway = FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP;
        let mut last_error: Option<String> = None;
        // Outer loop: shell candidates. Inner loop: flag sets, breakaway first.
        for (idx, shell) in candidates.iter().enumerate() {
            for &flags in &[with_breakaway, without_breakaway] {
                // The capture files are consumed by `Stdio::from`, so they are
                // recreated for every spawn attempt.
                let stdout = create_capture_file(&paths.stdout)
                    .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
                let stderr = create_capture_file(&paths.stderr)
                    .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
                let mut cmd =
                    detached_shell_command_for(shell.clone(), command, &paths.exit, paths, flags)?;
                cmd.current_dir(workdir)
                    .envs(env)
                    .stdin(Stdio::null())
                    .stdout(Stdio::from(stdout))
                    .stderr(Stdio::from(stderr));
                match cmd.spawn() {
                    Ok(child) => {
                        // Success on a later candidate means earlier ones
                        // failed at runtime; surface that in the log.
                        if idx > 0 {
                            log::warn!(
                                "[aft] background bash spawn fell back to {} after {} earlier candidate(s) failed; \
                                 the cached PATH probe disagreed with runtime spawn — likely PATH \
                                 inheritance, antivirus / AppLocker / Defender ASR, or sandbox policy.",
                                shell.binary(),
                                idx
                            );
                        }
                        if flags == without_breakaway {
                            log::warn!(
                                "[aft] background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected \
                                 (likely a restrictive Job Object — CI sandbox or MDM policy). \
                                 Spawned without breakaway; the bg task will be torn down if the \
                                 AFT process group is killed."
                            );
                        }
                        return Ok(child);
                    }
                    // Shell binary missing at runtime: skip the remaining flag
                    // set and move on to the next candidate.
                    Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                        log::warn!(
                            "[aft] background bash spawn: {} returned NotFound at runtime — trying next candidate",
                            shell.binary()
                        );
                        last_error = Some(format!("{}: {e}", shell.binary()));
                        break;
                    }
                    // OS error 5 (logged as Access Denied) on the breakaway
                    // attempt: retry the same shell without breakaway.
                    Err(e) if flags == with_breakaway && e.raw_os_error() == Some(5) => {
                        log::warn!(
                            "[aft] background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected with \
                             Access Denied — retrying {} without breakaway",
                            shell.binary()
                        );
                        last_error = Some(format!("{}: {e}", shell.binary()));
                        continue;
                    }
                    // Any other failure is not retried.
                    Err(e) => {
                        return Err(format!(
                            "failed to spawn background bash command via {}: {e}",
                            shell.binary()
                        ));
                    }
                }
            }
        }
        Err(format!(
            "failed to spawn background bash command: no Windows shell could be spawned. \
             Last error: {}. PATH-probed candidates: {:?}",
            last_error.unwrap_or_else(|| "no candidates were attempted".to_string()),
            candidates.iter().map(|s| s.binary()).collect::<Vec<_>>()
        ))
    }
}
1805
1806fn random_slug() -> String {
1807 let mut bytes = [0u8; 4];
1808 getrandom::fill(&mut bytes).unwrap_or_else(|_| {
1810 let t = SystemTime::now()
1812 .duration_since(UNIX_EPOCH)
1813 .map(|d| d.subsec_nanos())
1814 .unwrap_or(0);
1815 let p = std::process::id();
1816 bytes.copy_from_slice(&(t ^ p).to_le_bytes());
1817 });
1818 let hex: String = bytes.iter().map(|b| format!("{b:02x}")).collect();
1820 format!("bash-{hex}")
1821}
1822
/// Tests for the background task registry: cleanup of finished tasks, child
/// reaping with and without an exit marker, and Windows-specific spawning and
/// shell-wrapper behavior.
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    #[cfg(windows)]
    use std::fs;
    use std::sync::{Arc, Mutex};
    use std::time::Duration;
    #[cfg(windows)]
    use std::time::Instant;

    use super::*;

    // A command that exits successfully almost immediately.
    #[cfg(unix)]
    const QUICK_SUCCESS_COMMAND: &str = "true";
    #[cfg(windows)]
    const QUICK_SUCCESS_COMMAND: &str = "cmd /c exit 0";

    // A command that stays alive for several seconds.
    #[cfg(unix)]
    const LONG_RUNNING_COMMAND: &str = "sleep 5";
    #[cfg(windows)]
    const LONG_RUNNING_COMMAND: &str = "cmd /c timeout /t 5 /nobreak > nul";

    /// A killed task whose completion has been drained is removed by
    /// `cleanup_finished` with a zero threshold.
    #[test]
    fn cleanup_finished_removes_terminal_tasks_older_than_threshold() {
        let registry = BgTaskRegistry::default();
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                QUICK_SUCCESS_COMMAND,
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
                true,
                false,
                Some(dir.path().to_path_buf()),
            )
            .unwrap();
        registry
            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
            .unwrap();
        // Draining is what marks the completion as delivered for this test.
        let completions = registry.drain_completions_for_session(Some("session"));
        assert_eq!(completions.len(), 1);

        registry.cleanup_finished(Duration::ZERO);

        assert!(registry.inner.tasks.lock().unwrap().is_empty());
    }

    /// A killed task whose completion was never drained stays registered even
    /// after `cleanup_finished` with a zero threshold.
    #[test]
    fn cleanup_finished_retains_undelivered_terminals() {
        let registry = BgTaskRegistry::default();
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                QUICK_SUCCESS_COMMAND,
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
                true,
                false,
                Some(dir.path().to_path_buf()),
            )
            .unwrap();
        registry
            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
            .unwrap();

        registry.cleanup_finished(Duration::ZERO);

        assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
    }

    /// When the child has exited and its exit marker is missing, `reap_child`
    /// must mark the task `Failed` with the "exited without exit marker"
    /// reason, release the child handle and mark the task detached.
    #[test]
    fn reap_child_marks_failed_when_child_exits_without_exit_marker() {
        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                QUICK_SUCCESS_COMMAND,
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
                true,
                false,
                Some(dir.path().to_path_buf()),
            )
            .unwrap();

        let task = registry.task_for_session(&task_id, "session").unwrap();

        // Poll until the child process has exited (bounded to five seconds).
        let started = Instant::now();
        loop {
            let exited = {
                let mut state = task.state.lock().unwrap();
                if let Some(child) = state.child.as_mut() {
                    matches!(child.try_wait(), Ok(Some(_)))
                } else {
                    true
                }
            };
            if exited {
                break;
            }
            assert!(
                started.elapsed() < Duration::from_secs(5),
                "child should exit quickly"
            );
            std::thread::sleep(Duration::from_millis(20));
        }

        // Simulate the "no marker" situation by deleting whatever was written.
        let _ = std::fs::remove_file(&task.paths.exit);

        assert!(
            task.is_running(),
            "precondition: metadata.status == Running"
        );
        assert!(
            !task.paths.exit.exists(),
            "precondition: exit marker absent"
        );

        registry.reap_child(&task);

        let state = task.state.lock().unwrap();
        assert!(
            state.metadata.status.is_terminal(),
            "reap_child must transition to terminal when PID dead and no marker. \
             Got status={:?}",
            state.metadata.status
        );
        assert_eq!(
            state.metadata.status,
            BgTaskStatus::Failed,
            "must specifically be Failed (not Killed): status={:?}",
            state.metadata.status
        );
        assert_eq!(
            state.metadata.status_reason.as_deref(),
            Some("process exited without exit marker"),
            "reason must match replay path's wording: {:?}",
            state.metadata.status_reason
        );
        assert!(
            state.child.is_none(),
            "child handle must be released after reap"
        );
        assert!(state.detached, "task must be marked detached after reap");
    }

    /// When the exit marker exists, `reap_child` releases the child handle and
    /// marks the task detached but leaves the status as `Running`.
    #[test]
    fn reap_child_preserves_running_when_exit_marker_exists() {
        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                QUICK_SUCCESS_COMMAND,
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
                true,
                false,
                Some(dir.path().to_path_buf()),
            )
            .unwrap();

        let task = registry.task_for_session(&task_id, "session").unwrap();

        // Poll until the child has exited AND its exit marker is on disk.
        let started = Instant::now();
        loop {
            let exited = {
                let mut state = task.state.lock().unwrap();
                if let Some(child) = state.child.as_mut() {
                    matches!(child.try_wait(), Ok(Some(_)))
                } else {
                    true
                }
            };
            if exited && task.paths.exit.exists() {
                break;
            }
            assert!(
                started.elapsed() < Duration::from_secs(5),
                "child should exit and write marker quickly"
            );
            std::thread::sleep(Duration::from_millis(20));
        }

        registry.reap_child(&task);

        let state = task.state.lock().unwrap();
        assert!(
            state.child.is_none(),
            "child handle still released even when marker exists"
        );
        assert!(
            state.detached,
            "task still marked detached even when marker exists"
        );
        assert_eq!(
            state.metadata.status,
            BgTaskStatus::Running,
            "reap_child must defer to poll_task when marker exists"
        );
    }

    /// A still-running task is never removed by `cleanup_finished`.
    #[test]
    fn cleanup_finished_keeps_running_tasks() {
        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                LONG_RUNNING_COMMAND,
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
                true,
                false,
                Some(dir.path().to_path_buf()),
            )
            .unwrap();

        registry.cleanup_finished(Duration::ZERO);

        assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
        // Best-effort teardown so the long-running child does not linger.
        let _ = registry.kill(&task_id, "session");
    }

    /// Polls until `path` exists, then returns its contents. Panics after 30
    /// seconds.
    #[cfg(windows)]
    fn wait_for_file(path: &Path) -> String {
        let started = Instant::now();
        loop {
            if path.exists() {
                return fs::read_to_string(path).expect("read file");
            }
            assert!(
                started.elapsed() < Duration::from_secs(30),
                "timed out waiting for {}",
                path.display()
            );
            std::thread::sleep(Duration::from_millis(100));
        }
    }

    /// Spawns `command` in a fresh registry and temp dir, returning all three
    /// so the temp dir outlives the assertions.
    #[cfg(windows)]
    fn spawn_windows_registry_command(
        command: &str,
    ) -> (BgTaskRegistry, tempfile::TempDir, String) {
        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                command,
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
                false,
                false,
                Some(dir.path().to_path_buf()),
            )
            .unwrap();
        (registry, dir, task_id)
    }

    /// A zero exit code is written to the exit marker.
    #[cfg(windows)]
    #[test]
    fn windows_spawn_writes_exit_marker_for_zero_exit() {
        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 0");
        let exit_path = registry.task_exit_path(&task_id, "session").unwrap();

        let content = wait_for_file(&exit_path);

        assert_eq!(content.trim(), "0");
    }

    /// A non-zero exit code is written to the exit marker unchanged.
    #[cfg(windows)]
    #[test]
    fn windows_spawn_writes_exit_marker_for_nonzero_exit() {
        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 42");
        let exit_path = registry.task_exit_path(&task_id, "session").unwrap();

        let content = wait_for_file(&exit_path);

        assert_eq!(content.trim(), "42");
    }

    /// Child stdout ends up in the stdout capture file.
    #[cfg(windows)]
    #[test]
    fn windows_spawn_captures_stdout_to_disk() {
        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c echo hello");
        let task = registry.task_for_session(&task_id, "session").unwrap();
        let stdout_path = task.paths.stdout.clone();
        let exit_path = task.paths.exit.clone();

        // The exit marker appearing is used as the "command finished" signal.
        let _ = wait_for_file(&exit_path);
        let stdout = fs::read_to_string(stdout_path).expect("read stdout");

        assert!(stdout.contains("hello"), "stdout was {stdout:?}");
    }

    /// With both `pwsh.exe` and `powershell.exe` resolvable, the first shell
    /// candidate is `Pwsh`.
    #[cfg(windows)]
    #[test]
    fn windows_spawn_uses_pwsh_when_available() {
        let candidates = crate::windows_shell::shell_candidates_with(
            |binary| match binary {
                "pwsh.exe" => Some(std::path::PathBuf::from(r"C:\pwsh\pwsh.exe")),
                "powershell.exe" => Some(std::path::PathBuf::from(r"C:\ps\powershell.exe")),
                _ => None,
            },
            || None,
        );
        let shell = candidates.first().expect("at least one candidate").clone();
        assert_eq!(shell, crate::windows_shell::WindowsShell::Pwsh);
        assert_eq!(shell.binary().as_ref(), "pwsh.exe");
    }

    /// The cmd wrapper script records the exit code with delayed expansion and
    /// moves a `.tmp` file over the final exit marker.
    #[cfg(windows)]
    #[test]
    fn windows_shell_cmd_wrapper_writes_exit_marker_with_move() {
        let exit_path = Path::new(r"C:\Temp\bash-test.exit");
        let script =
            crate::windows_shell::WindowsShell::Cmd.wrapper_script("cmd /c exit 42", exit_path);

        assert!(
            script.contains("& echo !ERRORLEVEL! >"),
            "wrapper must use delayed expansion: {script}"
        );
        assert!(
            !script.contains("%ERRORLEVEL%"),
            "wrapper must NOT use parse-time %ERRORLEVEL% expansion: {script}"
        );
        assert!(script.contains("& move /Y"));
        assert!(
            script.contains("> nul"),
            "wrapper must redirect move output to nul: {script}"
        );
        assert!(script.contains(r#""C:\Temp\bash-test.exit.tmp""#));
        assert!(script.contains(r#""C:\Temp\bash-test.exit""#));
    }

    /// `Cmd::bg_command` passes `/V:ON /D /S /C` ahead of the command body.
    #[cfg(windows)]
    #[test]
    fn windows_shell_cmd_bg_command_enables_delayed_expansion() {
        use crate::windows_shell::WindowsShell;
        let cmd = WindowsShell::Cmd.bg_command("echo wrapped");
        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
        let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
        assert_eq!(
            args_strs,
            vec!["/V:ON", "/D", "/S", "/C", "echo wrapped"],
            "Cmd::bg_command must prepend /V:ON /D /S /C"
        );
    }

    /// `Pwsh::bg_command` uses `-Command` and includes the command body.
    #[cfg(windows)]
    #[test]
    fn windows_shell_pwsh_bg_command_uses_standard_args() {
        use crate::windows_shell::WindowsShell;
        let cmd = WindowsShell::Pwsh.bg_command("Get-Date");
        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
        let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
        assert!(
            args_strs.contains(&"-Command"),
            "Pwsh::bg_command must use -Command: {args_strs:?}"
        );
        assert!(
            args_strs.contains(&"Get-Date"),
            "Pwsh::bg_command must include the user command body"
        );
    }

    // `cfg(any())` is never true, so this placeholder is compiled out on
    // every target.
    #[allow(dead_code)]
    #[cfg(any())] fn windows_cmd_wrapper_records_real_exit_code_disabled() {}
}