1use std::collections::{HashMap, VecDeque};
2use std::fs;
3use std::path::{Path, PathBuf};
4use std::process::{Child, Command, Stdio};
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, Mutex};
7use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
8
9use serde::Serialize;
10
11use crate::context::SharedProgressSender;
12use crate::protocol::{BashCompletedFrame, PushFrame};
13
14#[cfg(unix)]
15use std::os::unix::process::CommandExt;
16#[cfg(windows)]
17use std::os::windows::process::CommandExt;
18
19use super::buffer::BgBuffer;
20use super::persistence::{
21 create_capture_file, read_exit_marker, read_task, session_tasks_dir, task_paths, unix_millis,
22 update_task, write_kill_marker_if_absent, write_task, ExitMarker, PersistedTask, TaskPaths,
23};
24use super::process::is_process_alive;
25#[cfg(unix)]
26use super::process::terminate_pgid;
27#[cfg(windows)]
28use super::process::terminate_pid;
29use super::{BgTaskInfo, BgTaskStatus};
/// Timeout applied by `spawn` when the caller does not supply one (30 minutes).
const DEFAULT_BG_TIMEOUT: Duration = Duration::from_secs(1_800);
/// Age after which a task still persisted as `Running` is treated as orphaned
/// when a session is replayed (24 hours).
const STALE_RUNNING_AFTER: Duration = Duration::from_secs(86_400);

/// Number of trailing output bytes included in a completion notification.
const BG_COMPLETION_PREVIEW_BYTES: usize = 300;
47
48#[derive(Debug, Clone, Serialize)]
49pub struct BgCompletion {
50 pub task_id: String,
51 #[serde(skip_serializing)]
54 pub session_id: String,
55 pub status: BgTaskStatus,
56 pub exit_code: Option<i32>,
57 pub command: String,
58 #[serde(default, skip_serializing_if = "String::is_empty")]
64 pub output_preview: String,
65 #[serde(default, skip_serializing_if = "is_false")]
70 pub output_truncated: bool,
71}
72
/// Serde `skip_serializing_if` predicate: true when the flag is unset.
fn is_false(flag: &bool) -> bool {
    !flag
}
76
/// Point-in-time view of a background task, as returned by `status`, `list`
/// and `kill` on the registry.
#[derive(Debug, Clone, Serialize)]
pub struct BgTaskSnapshot {
    /// Common task fields (id, status, command, timing), serialized inline.
    #[serde(flatten)]
    pub info: BgTaskInfo,
    /// Exit code recorded in the task metadata, if any.
    pub exit_code: Option<i32>,
    /// PID of the spawned process recorded in the task metadata, if any.
    pub child_pid: Option<u32>,
    /// Working directory of the command, rendered for display.
    pub workdir: String,
    /// Captured output tail, as returned by the task buffer's `read_tail`.
    pub output_preview: String,
    /// Whether `output_preview` is a truncated view of the captured output.
    pub output_truncated: bool,
    /// Path of the captured output file, when the buffer reports one.
    pub output_path: Option<String>,
    /// Path of the captured stderr file.
    pub stderr_path: Option<String>,
}
89
/// Registry of background bash tasks.
///
/// This is a cheap handle: clones share the same state through an `Arc`.
#[derive(Clone)]
pub struct BgTaskRegistry {
    pub(crate) inner: Arc<RegistryInner>,
}
94
/// Shared state behind every clone of a `BgTaskRegistry`.
pub(crate) struct RegistryInner {
    /// Tracked tasks, keyed by task id.
    pub(crate) tasks: Mutex<HashMap<String, Arc<BgTask>>>,
    /// Completions waiting to be drained, in enqueue order.
    pub(crate) completions: Mutex<VecDeque<BgCompletion>>,
    /// Sink used to push `BashCompleted` frames.
    pub(crate) progress_sender: SharedProgressSender,
    /// Set once the watchdog has been started, so it is only started once.
    watchdog_started: AtomicBool,
    /// Raised by `detach`; presumably observed by the watchdog — TODO confirm.
    pub(crate) shutdown: AtomicBool,
}
102
/// A single tracked background task.
pub(crate) struct BgTask {
    pub(crate) task_id: String,
    pub(crate) session_id: String,
    /// On-disk locations of the task's metadata, captures and exit marker.
    pub(crate) paths: TaskPaths,
    /// When this registry started tracking the task (spawn or rehydration).
    pub(crate) started: Instant,
    /// First time the task was observed terminal; used by `cleanup_finished`.
    pub(crate) terminal_at: Mutex<Option<Instant>>,
    /// Mutable task state, guarded by a per-task lock.
    pub(crate) state: Mutex<BgTaskState>,
}
111
/// Lock-protected mutable part of a `BgTask`.
pub(crate) struct BgTaskState {
    /// In-memory copy of the persisted task record.
    pub(crate) metadata: PersistedTask,
    /// Handle to the spawned process while this registry owns it; `None` once
    /// it has been reaped or dropped, or for rehydrated tasks.
    pub(crate) child: Option<Child>,
    /// True once the registry no longer holds the child handle.
    pub(crate) detached: bool,
    /// Reader over the task's stdout/stderr capture files.
    pub(crate) buffer: BgBuffer,
}
118
119impl BgTaskRegistry {
120 pub fn new(progress_sender: SharedProgressSender) -> Self {
121 Self {
122 inner: Arc::new(RegistryInner {
123 tasks: Mutex::new(HashMap::new()),
124 completions: Mutex::new(VecDeque::new()),
125 progress_sender,
126 watchdog_started: AtomicBool::new(false),
127 shutdown: AtomicBool::new(false),
128 }),
129 }
130 }
131
132 #[cfg(unix)]
133 pub fn spawn(
134 &self,
135 command: &str,
136 session_id: String,
137 workdir: PathBuf,
138 env: HashMap<String, String>,
139 timeout: Option<Duration>,
140 storage_dir: PathBuf,
141 max_running: usize,
142 ) -> Result<String, String> {
143 self.start_watchdog();
144
145 let running = self.running_count();
146 if running >= max_running {
147 return Err(format!(
148 "background bash task limit exceeded: {running} running (max {max_running})"
149 ));
150 }
151
152 let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
153 let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
154 let task_id = self.generate_unique_task_id()?;
155 let paths = task_paths(&storage_dir, &session_id, &task_id);
156 fs::create_dir_all(&paths.dir)
157 .map_err(|e| format!("failed to create background task dir: {e}"))?;
158
159 let mut metadata = PersistedTask::starting(
160 task_id.clone(),
161 session_id.clone(),
162 command.to_string(),
163 workdir.clone(),
164 timeout_ms,
165 );
166 write_task(&paths.json, &metadata)
167 .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
168
169 create_capture_file(&paths.stdout)
173 .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
174 create_capture_file(&paths.stderr)
175 .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
176
177 let child = spawn_detached_child(command, &paths, &workdir, &env)?;
178
179 let child_pid = child.id();
180 metadata.mark_running(child_pid, child_pid as i32);
181 write_task(&paths.json, &metadata)
182 .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
183
184 let task = Arc::new(BgTask {
185 task_id: task_id.clone(),
186 session_id,
187 paths: paths.clone(),
188 started: Instant::now(),
189 terminal_at: Mutex::new(None),
190 state: Mutex::new(BgTaskState {
191 metadata,
192 child: Some(child),
193 detached: false,
194 buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
195 }),
196 });
197
198 self.inner
199 .tasks
200 .lock()
201 .map_err(|_| "background task registry lock poisoned".to_string())?
202 .insert(task_id.clone(), task);
203
204 Ok(task_id)
205 }
206
207 #[cfg(windows)]
208 pub fn spawn(
209 &self,
210 command: &str,
211 session_id: String,
212 workdir: PathBuf,
213 env: HashMap<String, String>,
214 timeout: Option<Duration>,
215 storage_dir: PathBuf,
216 max_running: usize,
217 ) -> Result<String, String> {
218 self.start_watchdog();
219
220 let running = self.running_count();
221 if running >= max_running {
222 return Err(format!(
223 "background bash task limit exceeded: {running} running (max {max_running})"
224 ));
225 }
226
227 let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
228 let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
229 let task_id = self.generate_unique_task_id()?;
230 let paths = task_paths(&storage_dir, &session_id, &task_id);
231 fs::create_dir_all(&paths.dir)
232 .map_err(|e| format!("failed to create background task dir: {e}"))?;
233
234 let mut metadata = PersistedTask::starting(
235 task_id.clone(),
236 session_id.clone(),
237 command.to_string(),
238 workdir.clone(),
239 timeout_ms,
240 );
241 write_task(&paths.json, &metadata)
242 .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
243
244 create_capture_file(&paths.stdout)
250 .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
251 create_capture_file(&paths.stderr)
252 .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
253
254 let child = spawn_detached_child(command, &paths, &workdir, &env)?;
255
256 let child_pid = child.id();
257 metadata.status = BgTaskStatus::Running;
258 metadata.child_pid = Some(child_pid);
259 metadata.pgid = None;
260 write_task(&paths.json, &metadata)
261 .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
262
263 let task = Arc::new(BgTask {
264 task_id: task_id.clone(),
265 session_id,
266 paths: paths.clone(),
267 started: Instant::now(),
268 terminal_at: Mutex::new(None),
269 state: Mutex::new(BgTaskState {
270 metadata,
271 child: Some(child),
272 detached: false,
273 buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
274 }),
275 });
276
277 self.inner
278 .tasks
279 .lock()
280 .map_err(|_| "background task registry lock poisoned".to_string())?
281 .insert(task_id.clone(), task);
282
283 Ok(task_id)
284 }
285
286 pub fn replay_session(&self, storage_dir: &Path, session_id: &str) -> Result<(), String> {
287 self.start_watchdog();
288 let dir = session_tasks_dir(storage_dir, session_id);
289 if !dir.exists() {
290 return Ok(());
291 }
292
293 let entries = fs::read_dir(&dir)
294 .map_err(|e| format!("failed to read background task dir {}: {e}", dir.display()))?;
295 for entry in entries.flatten() {
296 let path = entry.path();
297 if path.extension().and_then(|extension| extension.to_str()) != Some("json") {
298 continue;
299 }
300 let Ok(mut metadata) = read_task(&path) else {
301 continue;
302 };
303 if metadata.session_id != session_id {
304 continue;
305 }
306
307 let paths = task_paths(storage_dir, session_id, &metadata.task_id);
308 match metadata.status {
309 BgTaskStatus::Starting => {
310 metadata.mark_terminal(
311 BgTaskStatus::Failed,
312 None,
313 Some("spawn aborted".to_string()),
314 );
315 let _ = write_task(&paths.json, &metadata);
316 self.enqueue_completion_if_needed(&metadata, false);
317 }
318 BgTaskStatus::Running => {
319 if self.running_metadata_is_stale(&metadata) {
320 metadata.mark_terminal(
321 BgTaskStatus::Killed,
322 None,
323 Some("orphaned (>24h)".to_string()),
324 );
325 if !paths.exit.exists() {
326 let _ = write_kill_marker_if_absent(&paths.exit);
327 }
328 let _ = write_task(&paths.json, &metadata);
329 self.enqueue_completion_if_needed(&metadata, false);
330 } else if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
331 metadata = terminal_metadata_from_marker(metadata, marker, None);
332 let _ = write_task(&paths.json, &metadata);
333 self.enqueue_completion_if_needed(&metadata, false);
334 } else if metadata.child_pid.is_some_and(|pid| !is_process_alive(pid)) {
335 metadata.mark_terminal(
336 BgTaskStatus::Failed,
337 None,
338 Some("process exited without exit marker".to_string()),
339 );
340 let _ = write_task(&paths.json, &metadata);
341 self.enqueue_completion_if_needed(&metadata, false);
342 } else {
343 self.insert_rehydrated_task(metadata, paths, true)?;
344 }
345 }
346 _ if metadata.status.is_terminal() => {
347 self.insert_rehydrated_task(metadata.clone(), paths, true)?;
348 self.enqueue_completion_if_needed(&metadata, false);
349 }
350 _ => {}
351 }
352 }
353
354 Ok(())
355 }
356
357 pub fn status(
358 &self,
359 task_id: &str,
360 session_id: &str,
361 preview_bytes: usize,
362 ) -> Option<BgTaskSnapshot> {
363 let task = self.task_for_session(task_id, session_id)?;
364 let _ = self.poll_task(&task);
365 Some(task.snapshot(preview_bytes))
366 }
367
368 pub fn list(&self, preview_bytes: usize) -> Vec<BgTaskSnapshot> {
369 let tasks = self
370 .inner
371 .tasks
372 .lock()
373 .map(|tasks| tasks.values().cloned().collect::<Vec<_>>())
374 .unwrap_or_default();
375 tasks
376 .into_iter()
377 .map(|task| {
378 let _ = self.poll_task(&task);
379 task.snapshot(preview_bytes)
380 })
381 .collect()
382 }
383
384 pub fn kill(&self, task_id: &str, session_id: &str) -> Result<BgTaskSnapshot, String> {
385 self.kill_with_status(task_id, session_id, BgTaskStatus::Killed)
386 }
387
388 pub(crate) fn kill_for_timeout(&self, task_id: &str, session_id: &str) -> Result<(), String> {
389 self.kill_with_status(task_id, session_id, BgTaskStatus::TimedOut)
390 .map(|_| ())
391 }
392
393 pub fn cleanup_finished(&self, older_than: Duration) {
394 let cutoff = Instant::now().checked_sub(older_than);
395 if let Ok(mut tasks) = self.inner.tasks.lock() {
396 tasks.retain(|_, task| {
397 let is_terminal = task
398 .state
399 .lock()
400 .map(|state| state.metadata.status.is_terminal())
401 .unwrap_or(false);
402 if !is_terminal {
403 return true;
404 }
405
406 let terminal_at = task.terminal_at.lock().ok().and_then(|at| *at);
407 match (terminal_at, cutoff) {
408 (Some(terminal_at), Some(cutoff)) => terminal_at > cutoff,
409 (Some(_), None) => false,
410 (None, _) => true,
411 }
412 });
413 }
414 }
415
416 pub fn drain_completions(&self) -> Vec<BgCompletion> {
417 self.drain_completions_for_session(None)
418 }
419
420 pub fn drain_completions_for_session(&self, session_id: Option<&str>) -> Vec<BgCompletion> {
421 let mut completions = match self.inner.completions.lock() {
422 Ok(completions) => completions,
423 Err(_) => return Vec::new(),
424 };
425
426 let drained = if let Some(session_id) = session_id {
427 let mut matched = Vec::new();
428 let mut retained = VecDeque::new();
429 while let Some(completion) = completions.pop_front() {
430 if completion.session_id == session_id {
431 matched.push(completion);
432 } else {
433 retained.push_back(completion);
434 }
435 }
436 *completions = retained;
437 matched
438 } else {
439 completions.drain(..).collect()
440 };
441 drop(completions);
442
443 for completion in &drained {
444 if let Some(task) = self.task_for_session(&completion.task_id, &completion.session_id) {
445 let _ = task.set_completion_delivered(true);
446 }
447 }
448
449 drained
450 }
451
452 pub fn pending_completions_for_session(&self, session_id: &str) -> Vec<BgCompletion> {
453 self.inner
454 .completions
455 .lock()
456 .map(|completions| {
457 completions
458 .iter()
459 .filter(|completion| completion.session_id == session_id)
460 .cloned()
461 .collect()
462 })
463 .unwrap_or_default()
464 }
465
466 pub fn detach(&self) {
467 self.inner.shutdown.store(true, Ordering::SeqCst);
468 if let Ok(mut tasks) = self.inner.tasks.lock() {
469 for task in tasks.values() {
470 if let Ok(mut state) = task.state.lock() {
471 state.child = None;
472 state.detached = true;
473 }
474 }
475 tasks.clear();
476 }
477 }
478
479 pub fn shutdown(&self) {
480 let tasks = self
481 .inner
482 .tasks
483 .lock()
484 .map(|tasks| {
485 tasks
486 .values()
487 .map(|task| (task.task_id.clone(), task.session_id.clone()))
488 .collect::<Vec<_>>()
489 })
490 .unwrap_or_default();
491 for (task_id, session_id) in tasks {
492 let _ = self.kill(&task_id, &session_id);
493 }
494 }
495
496 pub(crate) fn poll_task(&self, task: &Arc<BgTask>) -> Result<(), String> {
497 let marker = match read_exit_marker(&task.paths.exit) {
498 Ok(Some(marker)) => marker,
499 Ok(None) => return Ok(()),
500 Err(error) => return Err(format!("failed to read exit marker: {error}")),
501 };
502 self.finalize_from_marker(task, marker, None)
503 }
504
505 pub(crate) fn reap_child(&self, task: &Arc<BgTask>) {
506 let Ok(mut state) = task.state.lock() else {
507 return;
508 };
509 if let Some(child) = state.child.as_mut() {
510 if matches!(child.try_wait(), Ok(Some(_))) {
511 state.child = None;
528 state.detached = true;
529 if state.metadata.status.is_terminal() {
530 return;
531 }
532 if matches!(read_exit_marker(&task.paths.exit), Ok(Some(_))) {
533 return;
534 }
535 let updated = update_task(&task.paths.json, |metadata| {
536 metadata.mark_terminal(
537 BgTaskStatus::Failed,
538 None,
539 Some("process exited without exit marker".to_string()),
540 );
541 });
542 if let Ok(metadata) = updated {
543 state.metadata = metadata;
544 task.mark_terminal_now();
545 state.buffer.enforce_terminal_cap();
546 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
547 }
548 }
549 }
550 }
551
552 pub(crate) fn running_tasks(&self) -> Vec<Arc<BgTask>> {
553 self.inner
554 .tasks
555 .lock()
556 .map(|tasks| {
557 tasks
558 .values()
559 .filter(|task| task.is_running())
560 .cloned()
561 .collect()
562 })
563 .unwrap_or_default()
564 }
565
566 fn insert_rehydrated_task(
567 &self,
568 metadata: PersistedTask,
569 paths: TaskPaths,
570 detached: bool,
571 ) -> Result<(), String> {
572 let task_id = metadata.task_id.clone();
573 let session_id = metadata.session_id.clone();
574 let task = Arc::new(BgTask {
575 task_id: task_id.clone(),
576 session_id,
577 paths: paths.clone(),
578 started: Instant::now(),
579 terminal_at: Mutex::new(metadata.status.is_terminal().then(Instant::now)),
580 state: Mutex::new(BgTaskState {
581 metadata,
582 child: None,
583 detached,
584 buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
585 }),
586 });
587 self.inner
588 .tasks
589 .lock()
590 .map_err(|_| "background task registry lock poisoned".to_string())?
591 .insert(task_id, task);
592 Ok(())
593 }
594
595 fn kill_with_status(
596 &self,
597 task_id: &str,
598 session_id: &str,
599 terminal_status: BgTaskStatus,
600 ) -> Result<BgTaskSnapshot, String> {
601 let task = self
602 .task_for_session(task_id, session_id)
603 .ok_or_else(|| format!("background task not found: {task_id}"))?;
604
605 {
606 let mut state = task
607 .state
608 .lock()
609 .map_err(|_| "background task lock poisoned".to_string())?;
610 if state.metadata.status.is_terminal() {
611 return Ok(task.snapshot_locked(&state, 5 * 1024));
612 }
613
614 state.metadata.status = BgTaskStatus::Killing;
615 write_task(&task.paths.json, &state.metadata)
616 .map_err(|e| format!("failed to persist killing state: {e}"))?;
617
618 #[cfg(unix)]
619 if let Some(pgid) = state.metadata.pgid {
620 terminate_pgid(pgid, state.child.as_mut());
621 }
622 #[cfg(windows)]
623 if let Some(child) = state.child.as_mut() {
624 super::process::terminate_process(child);
625 } else if let Some(pid) = state.metadata.child_pid {
626 terminate_pid(pid);
627 }
628 if let Some(child) = state.child.as_mut() {
629 let _ = child.wait();
630 }
631 state.child = None;
632 state.detached = true;
633
634 if !task.paths.exit.exists() {
635 write_kill_marker_if_absent(&task.paths.exit)
636 .map_err(|e| format!("failed to write kill marker: {e}"))?;
637 }
638
639 let exit_code = if terminal_status == BgTaskStatus::TimedOut {
640 Some(124)
641 } else {
642 None
643 };
644 state
645 .metadata
646 .mark_terminal(terminal_status, exit_code, None);
647 task.mark_terminal_now();
648 write_task(&task.paths.json, &state.metadata)
649 .map_err(|e| format!("failed to persist killed state: {e}"))?;
650 state.buffer.enforce_terminal_cap();
651 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
652 }
653
654 Ok(task.snapshot(5 * 1024))
655 }
656
657 fn finalize_from_marker(
658 &self,
659 task: &Arc<BgTask>,
660 marker: ExitMarker,
661 reason: Option<String>,
662 ) -> Result<(), String> {
663 let mut state = task
664 .state
665 .lock()
666 .map_err(|_| "background task lock poisoned".to_string())?;
667 if state.metadata.status.is_terminal() {
668 return Ok(());
669 }
670
671 let updated = update_task(&task.paths.json, |metadata| {
672 let new_metadata = terminal_metadata_from_marker(metadata.clone(), marker, reason);
673 *metadata = new_metadata;
674 })
675 .map_err(|e| format!("failed to persist terminal state: {e}"))?;
676 state.metadata = updated;
677 task.mark_terminal_now();
678 state.child = None;
679 state.detached = true;
680 state.buffer.enforce_terminal_cap();
681 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
682 Ok(())
683 }
684
685 fn enqueue_completion_if_needed(&self, metadata: &PersistedTask, emit_frame: bool) {
686 if metadata.status.is_terminal() && !metadata.completion_delivered {
687 self.enqueue_completion_locked(metadata, None, emit_frame);
688 }
689 }
690
691 fn enqueue_completion_locked(
692 &self,
693 metadata: &PersistedTask,
694 buffer: Option<&BgBuffer>,
695 emit_frame: bool,
696 ) {
697 if !metadata.status.is_terminal() || metadata.completion_delivered {
698 return;
699 }
700 let (output_preview, output_truncated) = match buffer {
705 Some(buf) => buf.read_tail(BG_COMPLETION_PREVIEW_BYTES),
706 None => (String::new(), false),
707 };
708 let completion = BgCompletion {
709 task_id: metadata.task_id.clone(),
710 session_id: metadata.session_id.clone(),
711 status: metadata.status.clone(),
712 exit_code: metadata.exit_code,
713 command: metadata.command.clone(),
714 output_preview,
715 output_truncated,
716 };
717 if let Ok(mut completions) = self.inner.completions.lock() {
718 if completions
719 .iter()
720 .any(|completion| completion.task_id == metadata.task_id)
721 {
722 return;
723 }
724 completions.push_back(completion.clone());
725 } else {
726 return;
727 }
728
729 if emit_frame {
730 self.emit_bash_completed(completion);
731 }
732 }
733
734 fn emit_bash_completed(&self, completion: BgCompletion) {
735 let Ok(progress_sender) = self
736 .inner
737 .progress_sender
738 .lock()
739 .map(|sender| sender.clone())
740 else {
741 return;
742 };
743 let Some(sender) = progress_sender.as_ref() else {
744 return;
745 };
746 sender(PushFrame::BashCompleted(BashCompletedFrame::new(
754 completion.task_id,
755 completion.session_id,
756 completion.status,
757 completion.exit_code,
758 completion.command,
759 completion.output_preview,
760 completion.output_truncated,
761 )));
762 }
763
764 fn task(&self, task_id: &str) -> Option<Arc<BgTask>> {
765 self.inner
766 .tasks
767 .lock()
768 .ok()
769 .and_then(|tasks| tasks.get(task_id).cloned())
770 }
771
772 fn task_for_session(&self, task_id: &str, session_id: &str) -> Option<Arc<BgTask>> {
773 self.task(task_id)
774 .filter(|task| task.session_id == session_id)
775 }
776
777 fn running_count(&self) -> usize {
778 self.inner
779 .tasks
780 .lock()
781 .map(|tasks| tasks.values().filter(|task| task.is_running()).count())
782 .unwrap_or(0)
783 }
784
785 fn start_watchdog(&self) {
786 if !self.inner.watchdog_started.swap(true, Ordering::SeqCst) {
787 super::watchdog::start(self.clone());
788 }
789 }
790
791 fn running_metadata_is_stale(&self, metadata: &PersistedTask) -> bool {
792 unix_millis().saturating_sub(metadata.started_at) > STALE_RUNNING_AFTER.as_millis() as u64
793 }
794
795 #[cfg(test)]
796 pub fn task_json_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
797 self.task_for_session(task_id, session_id)
798 .map(|task| task.paths.json.clone())
799 }
800
801 #[cfg(test)]
802 pub fn task_exit_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
803 self.task_for_session(task_id, session_id)
804 .map(|task| task.paths.exit.clone())
805 }
806
807 fn generate_unique_task_id(&self) -> Result<String, String> {
809 for _ in 0..32 {
810 let candidate = random_slug();
811 let tasks = self
812 .inner
813 .tasks
814 .lock()
815 .map_err(|_| "background task registry lock poisoned".to_string())?;
816 if tasks.contains_key(&candidate) {
817 continue;
818 }
819 let completions = self
820 .inner
821 .completions
822 .lock()
823 .map_err(|_| "background completions lock poisoned".to_string())?;
824 if completions
825 .iter()
826 .any(|completion| completion.task_id == candidate)
827 {
828 continue;
829 }
830 return Ok(candidate);
831 }
832 Err("failed to allocate unique background task id after 32 attempts".to_string())
833 }
834}
835
836impl Default for BgTaskRegistry {
837 fn default() -> Self {
838 Self::new(Arc::new(Mutex::new(None)))
839 }
840}
841
842impl BgTask {
843 fn snapshot(&self, preview_bytes: usize) -> BgTaskSnapshot {
844 let state = self
845 .state
846 .lock()
847 .unwrap_or_else(|poison| poison.into_inner());
848 self.snapshot_locked(&state, preview_bytes)
849 }
850
851 fn snapshot_locked(&self, state: &BgTaskState, preview_bytes: usize) -> BgTaskSnapshot {
852 let metadata = &state.metadata;
853 let duration_ms = metadata.duration_ms.or_else(|| {
854 metadata
855 .status
856 .is_terminal()
857 .then(|| self.started.elapsed().as_millis() as u64)
858 });
859 let (output_preview, output_truncated) = state.buffer.read_tail(preview_bytes);
860 BgTaskSnapshot {
861 info: BgTaskInfo {
862 task_id: self.task_id.clone(),
863 status: metadata.status.clone(),
864 command: metadata.command.clone(),
865 started_at: metadata.started_at,
866 duration_ms,
867 },
868 exit_code: metadata.exit_code,
869 child_pid: metadata.child_pid,
870 workdir: metadata.workdir.display().to_string(),
871 output_preview,
872 output_truncated,
873 output_path: state
874 .buffer
875 .output_path()
876 .map(|path| path.display().to_string()),
877 stderr_path: Some(state.buffer.stderr_path().display().to_string()),
878 }
879 }
880
881 pub(crate) fn is_running(&self) -> bool {
882 self.state
883 .lock()
884 .map(|state| state.metadata.status == BgTaskStatus::Running)
885 .unwrap_or(false)
886 }
887
888 fn mark_terminal_now(&self) {
889 if let Ok(mut terminal_at) = self.terminal_at.lock() {
890 if terminal_at.is_none() {
891 *terminal_at = Some(Instant::now());
892 }
893 }
894 }
895
896 fn set_completion_delivered(&self, delivered: bool) -> Result<(), String> {
897 let mut state = self
898 .state
899 .lock()
900 .map_err(|_| "background task lock poisoned".to_string())?;
901 let updated = update_task(&self.paths.json, |metadata| {
902 metadata.completion_delivered = delivered;
903 })
904 .map_err(|e| format!("failed to update completion delivery: {e}"))?;
905 state.metadata = updated;
906 Ok(())
907 }
908}
909
910fn terminal_metadata_from_marker(
911 mut metadata: PersistedTask,
912 marker: ExitMarker,
913 reason: Option<String>,
914) -> PersistedTask {
915 match marker {
916 ExitMarker::Code(code) => {
917 let status = if code == 0 {
918 BgTaskStatus::Completed
919 } else {
920 BgTaskStatus::Failed
921 };
922 metadata.mark_terminal(status, Some(code), reason);
923 }
924 ExitMarker::Killed => metadata.mark_terminal(BgTaskStatus::Killed, None, reason),
925 }
926 metadata
927}
928
929#[cfg(unix)]
930fn detached_shell_command(command: &str, exit_path: &Path) -> Command {
931 let mut cmd = Command::new("/bin/sh");
932 cmd.arg("-c")
933 .arg("\"$0\" -c \"$1\"; code=$?; printf \"%s\" \"$code\" > \"$2.tmp.$$\"; mv -f \"$2.tmp.$$\" \"$2\"")
934 .arg("/bin/sh")
935 .arg(command)
936 .arg(exit_path);
937 unsafe {
938 cmd.pre_exec(|| {
939 if libc::setsid() == -1 {
940 return Err(std::io::Error::last_os_error());
941 }
942 Ok(())
943 });
944 }
945 cmd
946}
947
/// Writes a wrapper script for `shell` into the task directory and returns a
/// `Command` that runs it with `creation_flags`.
///
/// The script is named after the task's metadata file stem (or `wrapper`),
/// with an extension matching the shell.
#[cfg(windows)]
fn detached_shell_command_for(
    shell: crate::windows_shell::WindowsShell,
    command: &str,
    exit_path: &Path,
    paths: &TaskPaths,
    creation_flags: u32,
) -> Result<Command, String> {
    use crate::windows_shell::WindowsShell;

    let script_ext = match shell {
        WindowsShell::Pwsh | WindowsShell::Powershell => "ps1",
        WindowsShell::Cmd => "bat",
        WindowsShell::Posix(_) => "sh",
    };
    let script_stem = paths
        .json
        .file_stem()
        .and_then(|stem| stem.to_str())
        .unwrap_or("wrapper");
    let script_path = paths.dir.join(format!("{script_stem}.{script_ext}"));
    fs::write(&script_path, shell.wrapper_script(command, exit_path))
        .map_err(|e| format!("failed to write background bash wrapper script: {e}"))?;

    // Arguments placed before the script path for each shell family.
    let launcher_args: &[&str] = match shell {
        WindowsShell::Pwsh | WindowsShell::Powershell => &[
            "-NoLogo",
            "-NoProfile",
            "-NonInteractive",
            "-ExecutionPolicy",
            "Bypass",
            "-File",
        ],
        WindowsShell::Cmd => &["/V:ON", "/D", "/C"],
        WindowsShell::Posix(_) => &[],
    };

    let mut cmd = Command::new(shell.binary().as_ref());
    cmd.args(launcher_args)
        .arg(&script_path)
        .creation_flags(creation_flags);
    Ok(cmd)
}
1030
/// Spawns `command` detached from this process, with stdin closed and
/// stdout/stderr redirected to the task's capture files.
///
/// Non-Windows: runs the `/bin/sh` wrapper from `detached_shell_command`.
///
/// Windows: tries each shell candidate, with `cmd` moved to the front. For
/// each candidate it first tries with `CREATE_BREAKAWAY_FROM_JOB`, then
/// retries without it after an Access Denied (raw OS error 5). A `NotFound`
/// error moves on to the next candidate; any other error aborts immediately.
///
/// NOTE(review): the capture files are (re)created here even though `spawn`
/// already created them.
fn spawn_detached_child(
    command: &str,
    paths: &TaskPaths,
    workdir: &Path,
    env: &HashMap<String, String>,
) -> Result<std::process::Child, String> {
    #[cfg(not(windows))]
    {
        let stdout = create_capture_file(&paths.stdout)
            .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
        let stderr = create_capture_file(&paths.stderr)
            .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
        detached_shell_command(command, &paths.exit)
            .current_dir(workdir)
            .envs(env)
            .stdin(Stdio::null())
            .stdout(Stdio::from(stdout))
            .stderr(Stdio::from(stderr))
            .spawn()
            .map_err(|e| format!("failed to spawn background bash command: {e}"))
    }
    #[cfg(windows)]
    {
        use crate::windows_shell::{shell_candidates, WindowsShell};
        let raw_candidates = shell_candidates();
        // Move `cmd` ahead of every other candidate.
        let mut candidates: Vec<WindowsShell> = Vec::with_capacity(raw_candidates.len());
        for shell in &raw_candidates {
            if matches!(shell, WindowsShell::Cmd) {
                candidates.insert(0, shell.clone());
            } else {
                candidates.push(shell.clone());
            }
        }
        // Win32 process creation flags.
        const FLAG_DETACHED_PROCESS: u32 = 0x0000_0008;
        const FLAG_CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
        const FLAG_CREATE_BREAKAWAY_FROM_JOB: u32 = 0x0100_0000;
        let with_breakaway =
            FLAG_DETACHED_PROCESS | FLAG_CREATE_NEW_PROCESS_GROUP | FLAG_CREATE_BREAKAWAY_FROM_JOB;
        let without_breakaway = FLAG_DETACHED_PROCESS | FLAG_CREATE_NEW_PROCESS_GROUP;
        let mut last_error: Option<String> = None;
        for (idx, shell) in candidates.iter().enumerate() {
            for &flags in &[with_breakaway, without_breakaway] {
                // Fresh handles for every attempt: `Stdio::from` consumes them.
                let stdout = create_capture_file(&paths.stdout)
                    .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
                let stderr = create_capture_file(&paths.stderr)
                    .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
                let mut cmd =
                    detached_shell_command_for(shell.clone(), command, &paths.exit, paths, flags)?;
                cmd.current_dir(workdir)
                    .envs(env)
                    .stdin(Stdio::null())
                    .stdout(Stdio::from(stdout))
                    .stderr(Stdio::from(stderr));
                match cmd.spawn() {
                    Ok(child) => {
                        if idx > 0 {
                            log::warn!(
                                "[aft] background bash spawn fell back to {} after {} earlier candidate(s) failed; \
                                 the cached PATH probe disagreed with runtime spawn — likely PATH \
                                 inheritance, antivirus / AppLocker / Defender ASR, or sandbox policy.",
                                shell.binary(),
                                idx
                            );
                        }
                        if flags == without_breakaway {
                            log::warn!(
                                "[aft] background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected \
                                 (likely a restrictive Job Object — CI sandbox or MDM policy). \
                                 Spawned without breakaway; the bg task will be torn down if the \
                                 AFT process group is killed."
                            );
                        }
                        return Ok(child);
                    }
                    // Shell binary missing at runtime: skip to the next candidate.
                    Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                        log::warn!(
                            "[aft] background bash spawn: {} returned NotFound at runtime — trying next candidate",
                            shell.binary()
                        );
                        last_error = Some(format!("{}: {e}", shell.binary()));
                        break;
                    }
                    // Access Denied (OS error 5) with breakaway: retry this shell without it.
                    Err(e) if flags == with_breakaway && e.raw_os_error() == Some(5) => {
                        log::warn!(
                            "[aft] background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected with \
                             Access Denied — retrying {} without breakaway",
                            shell.binary()
                        );
                        last_error = Some(format!("{}: {e}", shell.binary()));
                        continue;
                    }
                    // Any other error is treated as fatal.
                    Err(e) => {
                        return Err(format!(
                            "failed to spawn background bash command via {}: {e}",
                            shell.binary()
                        ));
                    }
                }
            }
        }
        Err(format!(
            "failed to spawn background bash command: no Windows shell could be spawned. \
             Last error: {}. PATH-probed candidates: {:?}",
            last_error.unwrap_or_else(|| "no candidates were attempted".to_string()),
            candidates.iter().map(|s| s.binary()).collect::<Vec<_>>()
        ))
    }
}
1186
1187fn random_slug() -> String {
1188 let mut bytes = [0u8; 4];
1189 getrandom::fill(&mut bytes).unwrap_or_else(|_| {
1191 let t = SystemTime::now()
1193 .duration_since(UNIX_EPOCH)
1194 .map(|d| d.subsec_nanos())
1195 .unwrap_or(0);
1196 let p = std::process::id();
1197 bytes.copy_from_slice(&(t ^ p).to_le_bytes());
1198 });
1199 let hex: String = bytes.iter().map(|b| format!("{b:02x}")).collect();
1201 format!("bgb-{hex}")
1202}
1203
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    #[cfg(windows)]
    use std::fs;
    use std::sync::{Arc, Mutex};
    use std::time::Duration;
    #[cfg(windows)]
    use std::time::Instant;

    use super::*;

    /// A command that exits with status 0 almost immediately.
    #[cfg(unix)]
    const QUICK_SUCCESS_COMMAND: &str = "true";
    #[cfg(windows)]
    const QUICK_SUCCESS_COMMAND: &str = "cmd /c exit 0";

    /// A command that keeps running for roughly five seconds unless killed.
    #[cfg(unix)]
    const LONG_RUNNING_COMMAND: &str = "sleep 5";
    #[cfg(windows)]
    const LONG_RUNNING_COMMAND: &str = "cmd /c timeout /t 5 /nobreak > nul";

    /// Spawns `command` for session `"session"` and returns the new task id.
    ///
    /// The task gets no extra environment and a 30 s timeout. `dir` is passed
    /// for both path arguments of `BgTaskRegistry::spawn`. Panics if the spawn
    /// fails.
    fn spawn_session_task(registry: &BgTaskRegistry, dir: &Path, command: &str) -> String {
        registry
            .spawn(
                command,
                "session".to_string(),
                dir.to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.to_path_buf(),
                // NOTE(review): the meaning of this argument isn't visible from
                // the tests; confirm against `BgTaskRegistry::spawn`.
                10,
            )
            .unwrap()
    }

    /// Polls `condition` every `poll_interval` until it returns `true`.
    ///
    /// Fails the test with `context` as the message if `timeout` elapses first.
    fn wait_until(
        timeout: Duration,
        poll_interval: Duration,
        mut condition: impl FnMut() -> bool,
        context: &str,
    ) {
        let started = Instant::now();
        while !condition() {
            assert!(started.elapsed() < timeout, "{context}");
            std::thread::sleep(poll_interval);
        }
    }

    /// A killed task must be dropped by `cleanup_finished` with a zero threshold.
    #[test]
    fn cleanup_finished_removes_terminal_tasks_older_than_threshold() {
        let registry = BgTaskRegistry::default();
        let dir = tempfile::tempdir().unwrap();
        let task_id = spawn_session_task(&registry, dir.path(), QUICK_SUCCESS_COMMAND);
        registry
            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
            .unwrap();

        registry.cleanup_finished(Duration::ZERO);

        assert!(registry.inner.tasks.lock().unwrap().is_empty());
    }

    /// When the child has exited but left no exit marker, `reap_child` must
    /// mark the task `Failed`, release the child handle, and detach the task.
    #[test]
    fn reap_child_marks_failed_when_child_exits_without_exit_marker() {
        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
        let dir = tempfile::tempdir().unwrap();
        let task_id = spawn_session_task(&registry, dir.path(), QUICK_SUCCESS_COMMAND);
        let task = registry.task_for_session(&task_id, "session").unwrap();

        // Exited = `try_wait` reports a status, or the handle is already gone.
        wait_until(
            Duration::from_secs(5),
            Duration::from_millis(20),
            || {
                let mut state = task.state.lock().unwrap();
                match state.child.as_mut() {
                    Some(child) => matches!(child.try_wait(), Ok(Some(_))),
                    None => true,
                }
            },
            "child should exit quickly",
        );

        // Simulate a wrapper that died before it could record an exit code.
        let _ = std::fs::remove_file(&task.paths.exit);

        assert!(
            task.is_running(),
            "precondition: metadata.status == Running"
        );
        assert!(
            !task.paths.exit.exists(),
            "precondition: exit marker absent"
        );

        registry.reap_child(&task);

        let state = task.state.lock().unwrap();
        assert!(
            state.metadata.status.is_terminal(),
            "reap_child must transition to terminal when PID dead and no marker. \
             Got status={:?}",
            state.metadata.status
        );
        assert_eq!(
            state.metadata.status,
            BgTaskStatus::Failed,
            "must specifically be Failed (not Killed): status={:?}",
            state.metadata.status
        );
        assert_eq!(
            state.metadata.status_reason.as_deref(),
            Some("process exited without exit marker"),
            "reason must match replay path's wording: {:?}",
            state.metadata.status_reason
        );
        assert!(
            state.child.is_none(),
            "child handle must be released after reap"
        );
        assert!(state.detached, "task must be marked detached after reap");
    }

    /// When the exit marker exists, `reap_child` still releases and detaches
    /// the child. It must leave the status `Running` for `poll_task` to resolve.
    #[test]
    fn reap_child_preserves_running_when_exit_marker_exists() {
        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
        let dir = tempfile::tempdir().unwrap();
        let task_id = spawn_session_task(&registry, dir.path(), QUICK_SUCCESS_COMMAND);
        let task = registry.task_for_session(&task_id, "session").unwrap();

        wait_until(
            Duration::from_secs(5),
            Duration::from_millis(20),
            || {
                let exited = {
                    let mut state = task.state.lock().unwrap();
                    match state.child.as_mut() {
                        Some(child) => matches!(child.try_wait(), Ok(Some(_))),
                        None => true,
                    }
                };
                exited && task.paths.exit.exists()
            },
            "child should exit and write marker quickly",
        );

        registry.reap_child(&task);

        let state = task.state.lock().unwrap();
        assert!(
            state.child.is_none(),
            "child handle still released even when marker exists"
        );
        assert!(
            state.detached,
            "task still marked detached even when marker exists"
        );
        assert_eq!(
            state.metadata.status,
            BgTaskStatus::Running,
            "reap_child must defer to poll_task when marker exists"
        );
    }

    /// `cleanup_finished` must never drop a task that is still running.
    #[test]
    fn cleanup_finished_keeps_running_tasks() {
        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
        let dir = tempfile::tempdir().unwrap();
        let task_id = spawn_session_task(&registry, dir.path(), LONG_RUNNING_COMMAND);

        registry.cleanup_finished(Duration::ZERO);

        let retained = registry.inner.tasks.lock().unwrap().contains_key(&task_id);
        // Kill before asserting so a failing assertion does not leave the
        // long-running child process behind.
        let _ = registry.kill(&task_id, "session");
        assert!(retained, "cleanup_finished must not remove a running task");
    }

    /// Waits up to 30 s for `path` to exist, then returns its contents.
    #[cfg(windows)]
    fn wait_for_file(path: &Path) -> String {
        wait_until(
            Duration::from_secs(30),
            Duration::from_millis(100),
            || path.exists(),
            &format!("timed out waiting for {}", path.display()),
        );
        fs::read_to_string(path).expect("read file")
    }

    /// Spawns `command` on a fresh registry rooted in a new temp dir.
    ///
    /// The `TempDir` is returned so the caller keeps it alive for the test.
    #[cfg(windows)]
    fn spawn_windows_registry_command(
        command: &str,
    ) -> (BgTaskRegistry, tempfile::TempDir, String) {
        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
        let dir = tempfile::tempdir().unwrap();
        let task_id = spawn_session_task(&registry, dir.path(), command);
        (registry, dir, task_id)
    }

    #[cfg(windows)]
    #[test]
    fn windows_spawn_writes_exit_marker_for_zero_exit() {
        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 0");
        let exit_path = registry.task_exit_path(&task_id, "session").unwrap();

        let content = wait_for_file(&exit_path);

        assert_eq!(content.trim(), "0");
    }

    #[cfg(windows)]
    #[test]
    fn windows_spawn_writes_exit_marker_for_nonzero_exit() {
        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 42");
        let exit_path = registry.task_exit_path(&task_id, "session").unwrap();

        let content = wait_for_file(&exit_path);

        assert_eq!(content.trim(), "42");
    }

    #[cfg(windows)]
    #[test]
    fn windows_spawn_captures_stdout_to_disk() {
        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c echo hello");
        let task = registry.task_for_session(&task_id, "session").unwrap();
        let stdout_path = task.paths.stdout.clone();
        let exit_path = task.paths.exit.clone();

        let _ = wait_for_file(&exit_path);
        let stdout = fs::read_to_string(stdout_path).expect("read stdout");

        assert!(stdout.contains("hello"), "stdout was {stdout:?}");
    }

    #[cfg(windows)]
    #[test]
    fn windows_spawn_uses_pwsh_when_available() {
        let candidates = crate::windows_shell::shell_candidates_with(
            |binary| match binary {
                "pwsh.exe" => Some(std::path::PathBuf::from(r"C:\pwsh\pwsh.exe")),
                "powershell.exe" => Some(std::path::PathBuf::from(r"C:\ps\powershell.exe")),
                _ => None,
            },
            || None,
        );
        let shell = candidates.first().expect("at least one candidate").clone();
        assert_eq!(shell, crate::windows_shell::WindowsShell::Pwsh);
        assert_eq!(shell.binary().as_ref(), "pwsh.exe");
    }

    /// The cmd wrapper must read `!ERRORLEVEL!` with delayed expansion, write
    /// it to `<exit>.tmp`, then move the file onto the exit path with output
    /// silenced.
    #[cfg(windows)]
    #[test]
    fn windows_shell_cmd_wrapper_writes_exit_marker_with_move() {
        let exit_path = Path::new(r"C:\Temp\bgb-test.exit");
        let script =
            crate::windows_shell::WindowsShell::Cmd.wrapper_script("cmd /c exit 42", exit_path);

        assert!(
            script.contains("& echo !ERRORLEVEL! >"),
            "wrapper must use delayed expansion: {script}"
        );
        assert!(
            !script.contains("%ERRORLEVEL%"),
            "wrapper must NOT use parse-time %ERRORLEVEL% expansion: {script}"
        );
        assert!(script.contains("& move /Y"));
        assert!(
            script.contains("> nul"),
            "wrapper must redirect move output to nul: {script}"
        );
        assert!(script.contains(r#""C:\Temp\bgb-test.exit.tmp""#));
        assert!(script.contains(r#""C:\Temp\bgb-test.exit""#));
    }

    #[cfg(windows)]
    #[test]
    fn windows_shell_cmd_bg_command_enables_delayed_expansion() {
        use crate::windows_shell::WindowsShell;
        let cmd = WindowsShell::Cmd.bg_command("echo wrapped");
        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
        let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
        assert_eq!(
            args_strs,
            vec!["/V:ON", "/D", "/S", "/C", "echo wrapped"],
            "Cmd::bg_command must prepend /V:ON /D /S /C"
        );
    }

    #[cfg(windows)]
    #[test]
    fn windows_shell_pwsh_bg_command_uses_standard_args() {
        use crate::windows_shell::WindowsShell;
        let cmd = WindowsShell::Pwsh.bg_command("Get-Date");
        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
        let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
        assert!(
            args_strs.contains(&"-Command"),
            "Pwsh::bg_command must use -Command: {args_strs:?}"
        );
        assert!(
            args_strs.contains(&"Get-Date"),
            "Pwsh::bg_command must include the user command body"
        );
    }
}