1use std::collections::{HashMap, VecDeque};
2use std::fs;
3use std::path::{Path, PathBuf};
4use std::process::{Child, Command, Stdio};
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, Mutex};
7use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
8
9use serde::Serialize;
10
11use crate::context::SharedProgressSender;
12use crate::protocol::{BashCompletedFrame, PushFrame};
13
14#[cfg(unix)]
15use std::os::unix::process::CommandExt;
16#[cfg(windows)]
17use std::os::windows::process::CommandExt;
18
19use super::buffer::BgBuffer;
20use super::persistence::{
21 create_capture_file, read_exit_marker, read_task, session_tasks_dir, task_paths, unix_millis,
22 update_task, write_kill_marker_if_absent, write_task, ExitMarker, PersistedTask, TaskPaths,
23};
24use super::process::is_process_alive;
25#[cfg(unix)]
26use super::process::terminate_pgid;
27#[cfg(windows)]
28use super::process::terminate_pid;
29use super::{BgTaskInfo, BgTaskStatus};
/// Timeout applied to a background task when the caller does not supply one (30 minutes).
const DEFAULT_BG_TIMEOUT: Duration = Duration::from_secs(1_800);
/// Age after which a task still persisted as `Running` is treated as orphaned on replay (24 hours).
const STALE_RUNNING_AFTER: Duration = Duration::from_secs(86_400);

/// Size passed to `read_tail` when building the output preview of a queued completion.
const BG_COMPLETION_PREVIEW_BYTES: usize = 300;
47
/// Outcome of a background task that reached a terminal status, queued for delivery.
///
/// Completions are pushed by the registry when a task finishes and handed out by
/// `drain_completions` / `drain_completions_for_session`. `session_id` is used only
/// for routing and is never serialized.
#[derive(Debug, Clone, Serialize)]
pub struct BgCompletion {
    /// Identifier of the finished task.
    pub task_id: String,
    /// Session that owns the task; used to filter completions, not serialized.
    #[serde(skip_serializing)]
    pub session_id: String,
    /// Terminal status the task ended in.
    pub status: BgTaskStatus,
    /// Exit code, when one was recorded.
    pub exit_code: Option<i32>,
    /// The shell command that was run.
    pub command: String,
    /// Tail of the captured output (read with `BG_COMPLETION_PREVIEW_BYTES`); empty
    /// when the completion was built without an output buffer. Omitted when empty.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub output_preview: String,
    /// Whether `output_preview` was cut short. Omitted when false.
    #[serde(default, skip_serializing_if = "is_false")]
    pub output_truncated: bool,
}
72
/// Serde `skip_serializing_if` predicate: true when the flag is unset.
fn is_false(v: &bool) -> bool {
    !v
}
76
/// Point-in-time view of a background task, as returned by `status`, `list` and `kill`.
#[derive(Debug, Clone, Serialize)]
pub struct BgTaskSnapshot {
    /// Task id, status, command, start time and duration; serialized inline (flattened).
    #[serde(flatten)]
    pub info: BgTaskInfo,
    /// Exit code, when one was recorded.
    pub exit_code: Option<i32>,
    /// Process id of the spawned wrapper shell, when recorded.
    pub child_pid: Option<u32>,
    /// Working directory the command was started in, rendered for display.
    pub workdir: String,
    /// Tail of the captured output, bounded by the caller-provided preview size.
    pub output_preview: String,
    /// Whether `output_preview` was cut short.
    pub output_truncated: bool,
    /// Path of the captured output file, when the buffer exposes one.
    pub output_path: Option<String>,
    /// Path of the captured stderr file.
    pub stderr_path: Option<String>,
}
89
/// Cheaply cloneable handle to the set of background shell tasks.
///
/// All clones share the same `RegistryInner` through an `Arc`.
#[derive(Clone)]
pub struct BgTaskRegistry {
    pub(crate) inner: Arc<RegistryInner>,
}
94
/// Shared state behind a `BgTaskRegistry`.
pub(crate) struct RegistryInner {
    /// Tracked tasks keyed by task id.
    pub(crate) tasks: Mutex<HashMap<String, Arc<BgTask>>>,
    /// Completions waiting to be drained, in the order they were queued.
    pub(crate) completions: Mutex<VecDeque<BgCompletion>>,
    /// Optional sink for `BashCompleted` push frames.
    pub(crate) progress_sender: SharedProgressSender,
    /// Set once the watchdog has been started, so it is started at most once.
    watchdog_started: AtomicBool,
    /// Set by `detach()`; presumably observed by the watchdog — TODO confirm.
    pub(crate) shutdown: AtomicBool,
}
102
/// One tracked background task.
pub(crate) struct BgTask {
    /// Identifier of the task (unique within the registry).
    pub(crate) task_id: String,
    /// Session that owns the task.
    pub(crate) session_id: String,
    /// On-disk locations of the task's metadata, capture files and exit marker.
    pub(crate) paths: TaskPaths,
    /// When this in-memory entry was created (spawn or rehydration).
    pub(crate) started: Instant,
    /// When the task was first observed in a terminal state; drives `cleanup_finished`.
    pub(crate) terminal_at: Mutex<Option<Instant>>,
    /// Mutable task state; guards metadata, child handle and output buffer together.
    pub(crate) state: Mutex<BgTaskState>,
}
111
/// Lock-protected portion of a `BgTask`.
pub(crate) struct BgTaskState {
    /// In-memory copy of the persisted task metadata.
    pub(crate) metadata: PersistedTask,
    /// Handle to the spawned wrapper process, while this registry still owns one.
    pub(crate) child: Option<Child>,
    /// True once the child handle has been released, or when the task was
    /// rehydrated without one.
    pub(crate) detached: bool,
    /// Reader over the task's stdout/stderr capture files.
    pub(crate) buffer: BgBuffer,
}
118
119impl BgTaskRegistry {
120 pub fn new(progress_sender: SharedProgressSender) -> Self {
121 Self {
122 inner: Arc::new(RegistryInner {
123 tasks: Mutex::new(HashMap::new()),
124 completions: Mutex::new(VecDeque::new()),
125 progress_sender,
126 watchdog_started: AtomicBool::new(false),
127 shutdown: AtomicBool::new(false),
128 }),
129 }
130 }
131
132 #[cfg(unix)]
133 pub fn spawn(
134 &self,
135 command: &str,
136 session_id: String,
137 workdir: PathBuf,
138 env: HashMap<String, String>,
139 timeout: Option<Duration>,
140 storage_dir: PathBuf,
141 max_running: usize,
142 ) -> Result<String, String> {
143 self.start_watchdog();
144
145 let running = self.running_count();
146 if running >= max_running {
147 return Err(format!(
148 "background bash task limit exceeded: {running} running (max {max_running})"
149 ));
150 }
151
152 let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
153 let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
154 let task_id = self.generate_unique_task_id()?;
155 let paths = task_paths(&storage_dir, &session_id, &task_id);
156 fs::create_dir_all(&paths.dir)
157 .map_err(|e| format!("failed to create background task dir: {e}"))?;
158
159 let mut metadata = PersistedTask::starting(
160 task_id.clone(),
161 session_id.clone(),
162 command.to_string(),
163 workdir.clone(),
164 timeout_ms,
165 );
166 write_task(&paths.json, &metadata)
167 .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
168
169 create_capture_file(&paths.stdout)
173 .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
174 create_capture_file(&paths.stderr)
175 .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
176
177 let child = spawn_detached_child(command, &paths, &workdir, &env)?;
178
179 let child_pid = child.id();
180 metadata.mark_running(child_pid, child_pid as i32);
181 write_task(&paths.json, &metadata)
182 .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
183
184 let task = Arc::new(BgTask {
185 task_id: task_id.clone(),
186 session_id,
187 paths: paths.clone(),
188 started: Instant::now(),
189 terminal_at: Mutex::new(None),
190 state: Mutex::new(BgTaskState {
191 metadata,
192 child: Some(child),
193 detached: false,
194 buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
195 }),
196 });
197
198 self.inner
199 .tasks
200 .lock()
201 .map_err(|_| "background task registry lock poisoned".to_string())?
202 .insert(task_id.clone(), task);
203
204 Ok(task_id)
205 }
206
207 #[cfg(windows)]
208 pub fn spawn(
209 &self,
210 command: &str,
211 session_id: String,
212 workdir: PathBuf,
213 env: HashMap<String, String>,
214 timeout: Option<Duration>,
215 storage_dir: PathBuf,
216 max_running: usize,
217 ) -> Result<String, String> {
218 self.start_watchdog();
219
220 let running = self.running_count();
221 if running >= max_running {
222 return Err(format!(
223 "background bash task limit exceeded: {running} running (max {max_running})"
224 ));
225 }
226
227 let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
228 let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
229 let task_id = self.generate_unique_task_id()?;
230 let paths = task_paths(&storage_dir, &session_id, &task_id);
231 fs::create_dir_all(&paths.dir)
232 .map_err(|e| format!("failed to create background task dir: {e}"))?;
233
234 let mut metadata = PersistedTask::starting(
235 task_id.clone(),
236 session_id.clone(),
237 command.to_string(),
238 workdir.clone(),
239 timeout_ms,
240 );
241 write_task(&paths.json, &metadata)
242 .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
243
244 create_capture_file(&paths.stdout)
250 .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
251 create_capture_file(&paths.stderr)
252 .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
253
254 let child = spawn_detached_child(command, &paths, &workdir, &env)?;
255
256 let child_pid = child.id();
257 metadata.status = BgTaskStatus::Running;
258 metadata.child_pid = Some(child_pid);
259 metadata.pgid = None;
260 write_task(&paths.json, &metadata)
261 .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
262
263 let task = Arc::new(BgTask {
264 task_id: task_id.clone(),
265 session_id,
266 paths: paths.clone(),
267 started: Instant::now(),
268 terminal_at: Mutex::new(None),
269 state: Mutex::new(BgTaskState {
270 metadata,
271 child: Some(child),
272 detached: false,
273 buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
274 }),
275 });
276
277 self.inner
278 .tasks
279 .lock()
280 .map_err(|_| "background task registry lock poisoned".to_string())?
281 .insert(task_id.clone(), task);
282
283 Ok(task_id)
284 }
285
286 pub fn replay_session(&self, storage_dir: &Path, session_id: &str) -> Result<(), String> {
287 self.start_watchdog();
288 let dir = session_tasks_dir(storage_dir, session_id);
289 if !dir.exists() {
290 return Ok(());
291 }
292
293 let entries = fs::read_dir(&dir)
294 .map_err(|e| format!("failed to read background task dir {}: {e}", dir.display()))?;
295 for entry in entries.flatten() {
296 let path = entry.path();
297 if path.extension().and_then(|extension| extension.to_str()) != Some("json") {
298 continue;
299 }
300 let Ok(mut metadata) = read_task(&path) else {
301 continue;
302 };
303 if metadata.session_id != session_id {
304 continue;
305 }
306
307 let paths = task_paths(storage_dir, session_id, &metadata.task_id);
308 match metadata.status {
309 BgTaskStatus::Starting => {
310 metadata.mark_terminal(
311 BgTaskStatus::Failed,
312 None,
313 Some("spawn aborted".to_string()),
314 );
315 let _ = write_task(&paths.json, &metadata);
316 self.enqueue_completion_if_needed(&metadata, false);
317 }
318 BgTaskStatus::Running => {
319 if self.running_metadata_is_stale(&metadata) {
320 metadata.mark_terminal(
321 BgTaskStatus::Killed,
322 None,
323 Some("orphaned (>24h)".to_string()),
324 );
325 if !paths.exit.exists() {
326 let _ = write_kill_marker_if_absent(&paths.exit);
327 }
328 let _ = write_task(&paths.json, &metadata);
329 self.enqueue_completion_if_needed(&metadata, false);
330 } else if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
331 metadata = terminal_metadata_from_marker(metadata, marker, None);
332 let _ = write_task(&paths.json, &metadata);
333 self.enqueue_completion_if_needed(&metadata, false);
334 } else if metadata.child_pid.is_some_and(|pid| !is_process_alive(pid)) {
335 metadata.mark_terminal(
336 BgTaskStatus::Failed,
337 None,
338 Some("process exited without exit marker".to_string()),
339 );
340 let _ = write_task(&paths.json, &metadata);
341 self.enqueue_completion_if_needed(&metadata, false);
342 } else {
343 self.insert_rehydrated_task(metadata, paths, true)?;
344 }
345 }
346 _ if metadata.status.is_terminal() => {
347 self.insert_rehydrated_task(metadata.clone(), paths, true)?;
348 self.enqueue_completion_if_needed(&metadata, false);
349 }
350 _ => {}
351 }
352 }
353
354 Ok(())
355 }
356
357 pub fn status(
358 &self,
359 task_id: &str,
360 session_id: &str,
361 preview_bytes: usize,
362 ) -> Option<BgTaskSnapshot> {
363 let task = self.task_for_session(task_id, session_id)?;
364 let _ = self.poll_task(&task);
365 Some(task.snapshot(preview_bytes))
366 }
367
368 pub fn list(&self, preview_bytes: usize) -> Vec<BgTaskSnapshot> {
369 let tasks = self
370 .inner
371 .tasks
372 .lock()
373 .map(|tasks| tasks.values().cloned().collect::<Vec<_>>())
374 .unwrap_or_default();
375 tasks
376 .into_iter()
377 .map(|task| {
378 let _ = self.poll_task(&task);
379 task.snapshot(preview_bytes)
380 })
381 .collect()
382 }
383
384 pub fn kill(&self, task_id: &str, session_id: &str) -> Result<BgTaskSnapshot, String> {
385 self.kill_with_status(task_id, session_id, BgTaskStatus::Killed)
386 }
387
388 pub(crate) fn kill_for_timeout(&self, task_id: &str, session_id: &str) -> Result<(), String> {
389 self.kill_with_status(task_id, session_id, BgTaskStatus::TimedOut)
390 .map(|_| ())
391 }
392
393 pub fn cleanup_finished(&self, older_than: Duration) {
394 let cutoff = Instant::now().checked_sub(older_than);
395 if let Ok(mut tasks) = self.inner.tasks.lock() {
396 tasks.retain(|_, task| {
397 let is_terminal = task
398 .state
399 .lock()
400 .map(|state| state.metadata.status.is_terminal())
401 .unwrap_or(false);
402 if !is_terminal {
403 return true;
404 }
405
406 let terminal_at = task.terminal_at.lock().ok().and_then(|at| *at);
407 match (terminal_at, cutoff) {
408 (Some(terminal_at), Some(cutoff)) => terminal_at > cutoff,
409 (Some(_), None) => false,
410 (None, _) => true,
411 }
412 });
413 }
414 }
415
416 pub fn drain_completions(&self) -> Vec<BgCompletion> {
417 self.drain_completions_for_session(None)
418 }
419
420 pub fn drain_completions_for_session(&self, session_id: Option<&str>) -> Vec<BgCompletion> {
421 let mut completions = match self.inner.completions.lock() {
422 Ok(completions) => completions,
423 Err(_) => return Vec::new(),
424 };
425
426 let drained = if let Some(session_id) = session_id {
427 let mut matched = Vec::new();
428 let mut retained = VecDeque::new();
429 while let Some(completion) = completions.pop_front() {
430 if completion.session_id == session_id {
431 matched.push(completion);
432 } else {
433 retained.push_back(completion);
434 }
435 }
436 *completions = retained;
437 matched
438 } else {
439 completions.drain(..).collect()
440 };
441 drop(completions);
442
443 for completion in &drained {
444 if let Some(task) = self.task_for_session(&completion.task_id, &completion.session_id) {
445 let _ = task.set_completion_delivered(true);
446 }
447 }
448
449 drained
450 }
451
452 pub fn pending_completions_for_session(&self, session_id: &str) -> Vec<BgCompletion> {
453 self.inner
454 .completions
455 .lock()
456 .map(|completions| {
457 completions
458 .iter()
459 .filter(|completion| completion.session_id == session_id)
460 .cloned()
461 .collect()
462 })
463 .unwrap_or_default()
464 }
465
466 pub fn detach(&self) {
467 self.inner.shutdown.store(true, Ordering::SeqCst);
468 if let Ok(mut tasks) = self.inner.tasks.lock() {
469 for task in tasks.values() {
470 if let Ok(mut state) = task.state.lock() {
471 state.child = None;
472 state.detached = true;
473 }
474 }
475 tasks.clear();
476 }
477 }
478
479 pub fn shutdown(&self) {
480 let tasks = self
481 .inner
482 .tasks
483 .lock()
484 .map(|tasks| {
485 tasks
486 .values()
487 .map(|task| (task.task_id.clone(), task.session_id.clone()))
488 .collect::<Vec<_>>()
489 })
490 .unwrap_or_default();
491 for (task_id, session_id) in tasks {
492 let _ = self.kill(&task_id, &session_id);
493 }
494 }
495
496 pub(crate) fn poll_task(&self, task: &Arc<BgTask>) -> Result<(), String> {
497 let marker = match read_exit_marker(&task.paths.exit) {
498 Ok(Some(marker)) => marker,
499 Ok(None) => return Ok(()),
500 Err(error) => return Err(format!("failed to read exit marker: {error}")),
501 };
502 self.finalize_from_marker(task, marker, None)
503 }
504
505 pub(crate) fn reap_child(&self, task: &Arc<BgTask>) {
506 let Ok(mut state) = task.state.lock() else {
507 return;
508 };
509 if let Some(child) = state.child.as_mut() {
510 if matches!(child.try_wait(), Ok(Some(_))) {
511 state.child = None;
528 state.detached = true;
529 if state.metadata.status.is_terminal() {
530 return;
531 }
532 if matches!(read_exit_marker(&task.paths.exit), Ok(Some(_))) {
533 return;
534 }
535 let updated = update_task(&task.paths.json, |metadata| {
536 metadata.mark_terminal(
537 BgTaskStatus::Failed,
538 None,
539 Some("process exited without exit marker".to_string()),
540 );
541 });
542 if let Ok(metadata) = updated {
543 state.metadata = metadata;
544 task.mark_terminal_now();
545 state.buffer.enforce_terminal_cap();
546 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
547 }
548 }
549 }
550 }
551
552 pub(crate) fn running_tasks(&self) -> Vec<Arc<BgTask>> {
553 self.inner
554 .tasks
555 .lock()
556 .map(|tasks| {
557 tasks
558 .values()
559 .filter(|task| task.is_running())
560 .cloned()
561 .collect()
562 })
563 .unwrap_or_default()
564 }
565
566 fn insert_rehydrated_task(
567 &self,
568 metadata: PersistedTask,
569 paths: TaskPaths,
570 detached: bool,
571 ) -> Result<(), String> {
572 let task_id = metadata.task_id.clone();
573 let session_id = metadata.session_id.clone();
574 let task = Arc::new(BgTask {
575 task_id: task_id.clone(),
576 session_id,
577 paths: paths.clone(),
578 started: Instant::now(),
579 terminal_at: Mutex::new(metadata.status.is_terminal().then(Instant::now)),
580 state: Mutex::new(BgTaskState {
581 metadata,
582 child: None,
583 detached,
584 buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
585 }),
586 });
587 self.inner
588 .tasks
589 .lock()
590 .map_err(|_| "background task registry lock poisoned".to_string())?
591 .insert(task_id, task);
592 Ok(())
593 }
594
595 fn kill_with_status(
596 &self,
597 task_id: &str,
598 session_id: &str,
599 terminal_status: BgTaskStatus,
600 ) -> Result<BgTaskSnapshot, String> {
601 let task = self
602 .task_for_session(task_id, session_id)
603 .ok_or_else(|| format!("background task not found: {task_id}"))?;
604
605 {
606 let mut state = task
607 .state
608 .lock()
609 .map_err(|_| "background task lock poisoned".to_string())?;
610 if state.metadata.status.is_terminal() {
611 return Ok(task.snapshot_locked(&state, 5 * 1024));
612 }
613
614 state.metadata.status = BgTaskStatus::Killing;
615 write_task(&task.paths.json, &state.metadata)
616 .map_err(|e| format!("failed to persist killing state: {e}"))?;
617
618 #[cfg(unix)]
619 if let Some(pgid) = state.metadata.pgid {
620 terminate_pgid(pgid, state.child.as_mut());
621 }
622 #[cfg(windows)]
623 if let Some(child) = state.child.as_mut() {
624 super::process::terminate_process(child);
625 } else if let Some(pid) = state.metadata.child_pid {
626 terminate_pid(pid);
627 }
628 if let Some(child) = state.child.as_mut() {
629 let _ = child.wait();
630 }
631 state.child = None;
632 state.detached = true;
633
634 if !task.paths.exit.exists() {
635 write_kill_marker_if_absent(&task.paths.exit)
636 .map_err(|e| format!("failed to write kill marker: {e}"))?;
637 }
638
639 let exit_code = if terminal_status == BgTaskStatus::TimedOut {
640 Some(124)
641 } else {
642 None
643 };
644 state
645 .metadata
646 .mark_terminal(terminal_status, exit_code, None);
647 task.mark_terminal_now();
648 write_task(&task.paths.json, &state.metadata)
649 .map_err(|e| format!("failed to persist killed state: {e}"))?;
650 state.buffer.enforce_terminal_cap();
651 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
652 }
653
654 Ok(task.snapshot(5 * 1024))
655 }
656
657 fn finalize_from_marker(
658 &self,
659 task: &Arc<BgTask>,
660 marker: ExitMarker,
661 reason: Option<String>,
662 ) -> Result<(), String> {
663 let mut state = task
664 .state
665 .lock()
666 .map_err(|_| "background task lock poisoned".to_string())?;
667 if state.metadata.status.is_terminal() {
668 return Ok(());
669 }
670
671 let updated = update_task(&task.paths.json, |metadata| {
672 let new_metadata = terminal_metadata_from_marker(metadata.clone(), marker, reason);
673 *metadata = new_metadata;
674 })
675 .map_err(|e| format!("failed to persist terminal state: {e}"))?;
676 state.metadata = updated;
677 task.mark_terminal_now();
678 state.child = None;
679 state.detached = true;
680 state.buffer.enforce_terminal_cap();
681 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
682 Ok(())
683 }
684
685 fn enqueue_completion_if_needed(&self, metadata: &PersistedTask, emit_frame: bool) {
686 if metadata.status.is_terminal() && !metadata.completion_delivered {
687 self.enqueue_completion_locked(metadata, None, emit_frame);
688 }
689 }
690
691 fn enqueue_completion_locked(
692 &self,
693 metadata: &PersistedTask,
694 buffer: Option<&BgBuffer>,
695 emit_frame: bool,
696 ) {
697 if !metadata.status.is_terminal() || metadata.completion_delivered {
698 return;
699 }
700 let (output_preview, output_truncated) = match buffer {
705 Some(buf) => buf.read_tail(BG_COMPLETION_PREVIEW_BYTES),
706 None => (String::new(), false),
707 };
708 let completion = BgCompletion {
709 task_id: metadata.task_id.clone(),
710 session_id: metadata.session_id.clone(),
711 status: metadata.status.clone(),
712 exit_code: metadata.exit_code,
713 command: metadata.command.clone(),
714 output_preview,
715 output_truncated,
716 };
717 if let Ok(mut completions) = self.inner.completions.lock() {
718 if completions
719 .iter()
720 .any(|completion| completion.task_id == metadata.task_id)
721 {
722 return;
723 }
724 completions.push_back(completion.clone());
725 } else {
726 return;
727 }
728
729 if emit_frame {
730 self.emit_bash_completed(completion);
731 }
732 }
733
734 fn emit_bash_completed(&self, completion: BgCompletion) {
735 let Ok(progress_sender) = self
736 .inner
737 .progress_sender
738 .lock()
739 .map(|sender| sender.clone())
740 else {
741 return;
742 };
743 let Some(sender) = progress_sender.as_ref() else {
744 return;
745 };
746 sender(PushFrame::BashCompleted(BashCompletedFrame::new(
754 completion.task_id,
755 completion.session_id,
756 completion.status,
757 completion.exit_code,
758 completion.command,
759 completion.output_preview,
760 completion.output_truncated,
761 )));
762 }
763
764 fn task(&self, task_id: &str) -> Option<Arc<BgTask>> {
765 self.inner
766 .tasks
767 .lock()
768 .ok()
769 .and_then(|tasks| tasks.get(task_id).cloned())
770 }
771
772 fn task_for_session(&self, task_id: &str, session_id: &str) -> Option<Arc<BgTask>> {
773 self.task(task_id)
774 .filter(|task| task.session_id == session_id)
775 }
776
777 fn running_count(&self) -> usize {
778 self.inner
779 .tasks
780 .lock()
781 .map(|tasks| tasks.values().filter(|task| task.is_running()).count())
782 .unwrap_or(0)
783 }
784
785 fn start_watchdog(&self) {
786 if !self.inner.watchdog_started.swap(true, Ordering::SeqCst) {
787 super::watchdog::start(self.clone());
788 }
789 }
790
791 fn running_metadata_is_stale(&self, metadata: &PersistedTask) -> bool {
792 unix_millis().saturating_sub(metadata.started_at) > STALE_RUNNING_AFTER.as_millis() as u64
793 }
794
795 #[cfg(test)]
796 pub fn task_json_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
797 self.task_for_session(task_id, session_id)
798 .map(|task| task.paths.json.clone())
799 }
800
801 #[cfg(test)]
802 pub fn task_exit_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
803 self.task_for_session(task_id, session_id)
804 .map(|task| task.paths.exit.clone())
805 }
806
807 fn generate_unique_task_id(&self) -> Result<String, String> {
809 for _ in 0..32 {
810 let candidate = random_slug();
811 let tasks = self
812 .inner
813 .tasks
814 .lock()
815 .map_err(|_| "background task registry lock poisoned".to_string())?;
816 if tasks.contains_key(&candidate) {
817 continue;
818 }
819 let completions = self
820 .inner
821 .completions
822 .lock()
823 .map_err(|_| "background completions lock poisoned".to_string())?;
824 if completions
825 .iter()
826 .any(|completion| completion.task_id == candidate)
827 {
828 continue;
829 }
830 return Ok(candidate);
831 }
832 Err("failed to allocate unique background task id after 32 attempts".to_string())
833 }
834}
835
836impl Default for BgTaskRegistry {
837 fn default() -> Self {
838 Self::new(Arc::new(Mutex::new(None)))
839 }
840}
841
842impl BgTask {
843 fn snapshot(&self, preview_bytes: usize) -> BgTaskSnapshot {
844 let state = self
845 .state
846 .lock()
847 .unwrap_or_else(|poison| poison.into_inner());
848 self.snapshot_locked(&state, preview_bytes)
849 }
850
851 fn snapshot_locked(&self, state: &BgTaskState, preview_bytes: usize) -> BgTaskSnapshot {
852 let metadata = &state.metadata;
853 let duration_ms = metadata.duration_ms.or_else(|| {
854 metadata
855 .status
856 .is_terminal()
857 .then(|| self.started.elapsed().as_millis() as u64)
858 });
859 let (output_preview, output_truncated) = state.buffer.read_tail(preview_bytes);
860 BgTaskSnapshot {
861 info: BgTaskInfo {
862 task_id: self.task_id.clone(),
863 status: metadata.status.clone(),
864 command: metadata.command.clone(),
865 started_at: metadata.started_at,
866 duration_ms,
867 },
868 exit_code: metadata.exit_code,
869 child_pid: metadata.child_pid,
870 workdir: metadata.workdir.display().to_string(),
871 output_preview,
872 output_truncated,
873 output_path: state
874 .buffer
875 .output_path()
876 .map(|path| path.display().to_string()),
877 stderr_path: Some(state.buffer.stderr_path().display().to_string()),
878 }
879 }
880
881 pub(crate) fn is_running(&self) -> bool {
882 self.state
883 .lock()
884 .map(|state| state.metadata.status == BgTaskStatus::Running)
885 .unwrap_or(false)
886 }
887
888 fn mark_terminal_now(&self) {
889 if let Ok(mut terminal_at) = self.terminal_at.lock() {
890 if terminal_at.is_none() {
891 *terminal_at = Some(Instant::now());
892 }
893 }
894 }
895
896 fn set_completion_delivered(&self, delivered: bool) -> Result<(), String> {
897 let mut state = self
898 .state
899 .lock()
900 .map_err(|_| "background task lock poisoned".to_string())?;
901 let updated = update_task(&self.paths.json, |metadata| {
902 metadata.completion_delivered = delivered;
903 })
904 .map_err(|e| format!("failed to update completion delivery: {e}"))?;
905 state.metadata = updated;
906 Ok(())
907 }
908}
909
910fn terminal_metadata_from_marker(
911 mut metadata: PersistedTask,
912 marker: ExitMarker,
913 reason: Option<String>,
914) -> PersistedTask {
915 match marker {
916 ExitMarker::Code(code) => {
917 let status = if code == 0 {
918 BgTaskStatus::Completed
919 } else {
920 BgTaskStatus::Failed
921 };
922 metadata.mark_terminal(status, Some(code), reason);
923 }
924 ExitMarker::Killed => metadata.mark_terminal(BgTaskStatus::Killed, None, reason),
925 }
926 metadata
927}
928
929#[cfg(unix)]
930fn detached_shell_command(command: &str, exit_path: &Path) -> Command {
931 let mut cmd = Command::new("/bin/sh");
932 cmd.arg("-c")
933 .arg("\"$0\" -c \"$1\"; code=$?; printf \"%s\" \"$code\" > \"$2.tmp.$$\"; mv -f \"$2.tmp.$$\" \"$2\"")
934 .arg("/bin/sh")
935 .arg(command)
936 .arg(exit_path);
937 unsafe {
938 cmd.pre_exec(|| {
939 if libc::setsid() == -1 {
940 return Err(std::io::Error::last_os_error());
941 }
942 Ok(())
943 });
944 }
945 cmd
946}
947
/// Writes a wrapper script for `shell` next to the task metadata and returns a
/// `Command` that runs it with `creation_flags`.
///
/// PowerShell variants get a `.ps1` wrapper run via `-File` (no logo, no profile,
/// non-interactive, execution policy bypassed). `cmd` gets a `.bat` wrapper run
/// via `/V:ON /D /C`. The wrapper is named after the metadata file's stem
/// (falling back to `wrapper`).
///
/// # Errors
/// Returns an error if the wrapper script cannot be written.
#[cfg(windows)]
fn detached_shell_command_for(
    shell: crate::windows_shell::WindowsShell,
    command: &str,
    exit_path: &Path,
    paths: &TaskPaths,
    creation_flags: u32,
) -> Result<Command, String> {
    use crate::windows_shell::WindowsShell;

    const POWERSHELL_ARGS: &[&str] = &[
        "-NoLogo",
        "-NoProfile",
        "-NonInteractive",
        "-ExecutionPolicy",
        "Bypass",
        "-File",
    ];
    const CMD_ARGS: &[&str] = &["/V:ON", "/D", "/C"];

    let script = shell.wrapper_script(command, exit_path);
    let (extension, leading_args) = match shell {
        WindowsShell::Pwsh | WindowsShell::Powershell => ("ps1", POWERSHELL_ARGS),
        WindowsShell::Cmd => ("bat", CMD_ARGS),
    };
    let stem = paths
        .json
        .file_stem()
        .and_then(|s| s.to_str())
        .unwrap_or("wrapper");
    let wrapper_path = paths.dir.join(format!("{stem}.{extension}"));
    fs::write(&wrapper_path, script)
        .map_err(|e| format!("failed to write background bash wrapper script: {e}"))?;

    let mut cmd = Command::new(shell.binary());
    cmd.args(leading_args)
        .arg(&wrapper_path)
        .creation_flags(creation_flags);
    Ok(cmd)
}
1019
/// Spawns the wrapped background shell for `command` in `workdir` with `env`,
/// stdin closed and stdout/stderr redirected to the task's capture files.
///
/// Non-Windows: runs `detached_shell_command` (new session via `setsid`).
///
/// Windows: tries each PATH-probed shell candidate, with `cmd` moved to the front.
/// For each shell it first spawns with `DETACHED_PROCESS | CREATE_NEW_PROCESS_GROUP
/// | CREATE_BREAKAWAY_FROM_JOB`. If that fails with OS error 5 (Access Denied), it
/// retries without breakaway. A `NotFound` error moves on to the next candidate,
/// and any other error is returned immediately.
///
/// # Errors
/// Returns a message if a capture file cannot be opened, a wrapper script cannot
/// be written (Windows), or no shell could be spawned.
fn spawn_detached_child(
    command: &str,
    paths: &TaskPaths,
    workdir: &Path,
    env: &HashMap<String, String>,
) -> Result<std::process::Child, String> {
    #[cfg(not(windows))]
    {
        // Open handles to the capture files to hand to the child as stdout/stderr.
        let stdout = create_capture_file(&paths.stdout)
            .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
        let stderr = create_capture_file(&paths.stderr)
            .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
        detached_shell_command(command, &paths.exit)
            .current_dir(workdir)
            .envs(env)
            .stdin(Stdio::null())
            .stdout(Stdio::from(stdout))
            .stderr(Stdio::from(stderr))
            .spawn()
            .map_err(|e| format!("failed to spawn background bash command: {e}"))
    }
    #[cfg(windows)]
    {
        use crate::windows_shell::{shell_candidates, WindowsShell};
        let raw_candidates = shell_candidates();
        // Try cmd first; other candidates keep their probed order.
        let mut candidates: Vec<WindowsShell> = Vec::with_capacity(raw_candidates.len());
        for shell in &raw_candidates {
            if *shell == WindowsShell::Cmd {
                candidates.insert(0, *shell);
            } else {
                candidates.push(*shell);
            }
        }
        // Win32 process creation flags.
        const FLAG_DETACHED_PROCESS: u32 = 0x0000_0008;
        const FLAG_CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
        const FLAG_CREATE_BREAKAWAY_FROM_JOB: u32 = 0x0100_0000;
        let with_breakaway =
            FLAG_DETACHED_PROCESS | FLAG_CREATE_NEW_PROCESS_GROUP | FLAG_CREATE_BREAKAWAY_FROM_JOB;
        let without_breakaway = FLAG_DETACHED_PROCESS | FLAG_CREATE_NEW_PROCESS_GROUP;
        let mut last_error: Option<String> = None;
        for (idx, shell) in candidates.iter().enumerate() {
            for &flags in &[with_breakaway, without_breakaway] {
                // Fresh capture handles per attempt: a failed spawn consumes them.
                let stdout = create_capture_file(&paths.stdout)
                    .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
                let stderr = create_capture_file(&paths.stderr)
                    .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
                let mut cmd =
                    detached_shell_command_for(*shell, command, &paths.exit, paths, flags)?;
                cmd.current_dir(workdir)
                    .envs(env)
                    .stdin(Stdio::null())
                    .stdout(Stdio::from(stdout))
                    .stderr(Stdio::from(stderr));
                match cmd.spawn() {
                    Ok(child) => {
                        if idx > 0 {
                            log::warn!(
                                "[aft] background bash spawn fell back to {} after {} earlier candidate(s) failed; \
                                 the cached PATH probe disagreed with runtime spawn — likely PATH \
                                 inheritance, antivirus / AppLocker / Defender ASR, or sandbox policy.",
                                shell.binary(),
                                idx
                            );
                        }
                        if flags == without_breakaway {
                            log::warn!(
                                "[aft] background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected \
                                 (likely a restrictive Job Object — CI sandbox or MDM policy). \
                                 Spawned without breakaway; the bg task will be torn down if the \
                                 AFT process group is killed."
                            );
                        }
                        return Ok(child);
                    }
                    // Shell missing at runtime: skip remaining flag variants, try next shell.
                    Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                        log::warn!(
                            "[aft] background bash spawn: {} returned NotFound at runtime — trying next candidate",
                            shell.binary()
                        );
                        last_error = Some(format!("{}: {e}", shell.binary()));
                        break;
                    }
                    // OS error 5 (Access Denied) with breakaway: retry the same shell without it.
                    Err(e) if flags == with_breakaway && e.raw_os_error() == Some(5) => {
                        log::warn!(
                            "[aft] background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected with \
                             Access Denied — retrying {} without breakaway",
                            shell.binary()
                        );
                        last_error = Some(format!("{}: {e}", shell.binary()));
                        continue;
                    }
                    Err(e) => {
                        return Err(format!(
                            "failed to spawn background bash command via {}: {e}",
                            shell.binary()
                        ));
                    }
                }
            }
        }
        Err(format!(
            "failed to spawn background bash command: no Windows shell could be spawned. \
             Last error: {}. PATH-probed candidates: {:?}",
            last_error.unwrap_or_else(|| "no candidates were attempted".to_string()),
            candidates.iter().map(|s| s.binary()).collect::<Vec<_>>()
        ))
    }
}
1175
1176fn random_slug() -> String {
1177 let mut bytes = [0u8; 4];
1178 getrandom::fill(&mut bytes).unwrap_or_else(|_| {
1180 let t = SystemTime::now()
1182 .duration_since(UNIX_EPOCH)
1183 .map(|d| d.subsec_nanos())
1184 .unwrap_or(0);
1185 let p = std::process::id();
1186 bytes.copy_from_slice(&(t ^ p).to_le_bytes());
1187 });
1188 let hex: String = bytes.iter().map(|b| format!("{b:02x}")).collect();
1190 format!("bgb-{hex}")
1191}
1192
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    #[cfg(windows)]
    use std::fs;
    use std::sync::{Arc, Mutex};
    use std::time::Duration;
    #[cfg(windows)]
    use std::time::Instant;

    use super::*;

    /// A command that exits with status 0 almost immediately.
    #[cfg(unix)]
    const QUICK_SUCCESS_COMMAND: &str = "true";
    #[cfg(windows)]
    const QUICK_SUCCESS_COMMAND: &str = "cmd /c exit 0";

    /// A command that stays alive for about 5 seconds. That is long enough to
    /// still be running while a test inspects the registry.
    #[cfg(unix)]
    const LONG_RUNNING_COMMAND: &str = "sleep 5";
    #[cfg(windows)]
    const LONG_RUNNING_COMMAND: &str = "cmd /c timeout /t 5 /nobreak > nul";

    /// Spawns `command` in `registry` under session `"session"` and returns
    /// the new task id.
    ///
    /// Every test uses the same spawn arguments:
    /// - `dir` for both path arguments,
    /// - an empty environment map,
    /// - a 30 s timeout,
    /// - the trailing value `10`. NOTE(review): the meaning of `10` is
    ///   defined by `BgTaskRegistry::spawn`; confirm there.
    ///
    /// Panics if the spawn fails.
    fn spawn_session_task(registry: &BgTaskRegistry, dir: &Path, command: &str) -> String {
        registry
            .spawn(
                command,
                "session".to_string(),
                dir.to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.to_path_buf(),
                10,
            )
            .unwrap()
    }

    /// A killed (terminal) task must be dropped by `cleanup_finished` when the
    /// age threshold is zero.
    #[test]
    fn cleanup_finished_removes_terminal_tasks_older_than_threshold() {
        let registry = BgTaskRegistry::default();
        let dir = tempfile::tempdir().unwrap();
        let task_id = spawn_session_task(&registry, dir.path(), QUICK_SUCCESS_COMMAND);
        registry
            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
            .unwrap();

        registry.cleanup_finished(Duration::ZERO);

        assert!(registry.inner.tasks.lock().unwrap().is_empty());
    }

    /// When the child has exited and no exit marker exists, `reap_child` must:
    /// - mark the task `Failed` with the reason
    ///   "process exited without exit marker",
    /// - release the child handle,
    /// - mark the task detached.
    #[test]
    fn reap_child_marks_failed_when_child_exits_without_exit_marker() {
        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
        let dir = tempfile::tempdir().unwrap();
        let task_id = spawn_session_task(&registry, dir.path(), QUICK_SUCCESS_COMMAND);

        let task = registry.task_for_session(&task_id, "session").unwrap();

        // Poll until the OS reports the child as exited, or the handle is
        // already gone. Give up after 5 s.
        let started = Instant::now();
        loop {
            let exited = {
                let mut state = task.state.lock().unwrap();
                if let Some(child) = state.child.as_mut() {
                    matches!(child.try_wait(), Ok(Some(_)))
                } else {
                    true
                }
            };
            if exited {
                break;
            }
            assert!(
                started.elapsed() < Duration::from_secs(5),
                "child should exit quickly"
            );
            std::thread::sleep(Duration::from_millis(20));
        }

        // Simulate a child that died without leaving its exit marker behind.
        let _ = std::fs::remove_file(&task.paths.exit);

        assert!(task.is_running(), "precondition: metadata.status == Running");
        assert!(!task.paths.exit.exists(), "precondition: exit marker absent");

        registry.reap_child(&task);

        let state = task.state.lock().unwrap();
        assert!(
            state.metadata.status.is_terminal(),
            "reap_child must transition to terminal when PID dead and no marker. \
             Got status={:?}",
            state.metadata.status
        );
        assert_eq!(
            state.metadata.status,
            BgTaskStatus::Failed,
            "must specifically be Failed (not Killed): status={:?}",
            state.metadata.status
        );
        assert_eq!(
            state.metadata.status_reason.as_deref(),
            Some("process exited without exit marker"),
            "reason must match replay path's wording: {:?}",
            state.metadata.status_reason
        );
        assert!(
            state.child.is_none(),
            "child handle must be released after reap"
        );
        assert!(state.detached, "task must be marked detached after reap");
    }

    /// When an exit marker exists, `reap_child` must:
    /// - release the child handle,
    /// - mark the task detached,
    /// - leave the status at `Running`, so that `poll_task` owns the
    ///   terminal transition.
    #[test]
    fn reap_child_preserves_running_when_exit_marker_exists() {
        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
        let dir = tempfile::tempdir().unwrap();
        let task_id = spawn_session_task(&registry, dir.path(), QUICK_SUCCESS_COMMAND);

        let task = registry.task_for_session(&task_id, "session").unwrap();

        // Wait (up to 5 s) until the child has exited AND its exit marker is
        // on disk.
        let started = Instant::now();
        loop {
            let exited = {
                let mut state = task.state.lock().unwrap();
                if let Some(child) = state.child.as_mut() {
                    matches!(child.try_wait(), Ok(Some(_)))
                } else {
                    true
                }
            };
            if exited && task.paths.exit.exists() {
                break;
            }
            assert!(
                started.elapsed() < Duration::from_secs(5),
                "child should exit and write marker quickly"
            );
            std::thread::sleep(Duration::from_millis(20));
        }

        registry.reap_child(&task);

        let state = task.state.lock().unwrap();
        assert!(
            state.child.is_none(),
            "child handle still released even when marker exists"
        );
        assert!(
            state.detached,
            "task still marked detached even when marker exists"
        );
        assert_eq!(
            state.metadata.status,
            BgTaskStatus::Running,
            "reap_child must defer to poll_task when marker exists"
        );
    }

    /// A still-running task must survive `cleanup_finished`, even with a zero
    /// age threshold.
    #[test]
    fn cleanup_finished_keeps_running_tasks() {
        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
        let dir = tempfile::tempdir().unwrap();
        let task_id = spawn_session_task(&registry, dir.path(), LONG_RUNNING_COMMAND);

        registry.cleanup_finished(Duration::ZERO);

        // Record the outcome first, then kill. The long-running child is then
        // cleaned up even when the assertion fails. The lock guard is a
        // temporary dropped at the end of this statement, so `kill` cannot
        // deadlock on it.
        let still_tracked = registry.inner.tasks.lock().unwrap().contains_key(&task_id);
        let _ = registry.kill(&task_id, "session");
        assert!(still_tracked, "running task must survive cleanup_finished");
    }

    /// Polls every 100 ms until `path` exists, then returns its contents.
    /// Panics after 30 s.
    #[cfg(windows)]
    fn wait_for_file(path: &Path) -> String {
        let started = Instant::now();
        loop {
            if path.exists() {
                return fs::read_to_string(path).expect("read file");
            }
            assert!(
                started.elapsed() < Duration::from_secs(30),
                "timed out waiting for {}",
                path.display()
            );
            std::thread::sleep(Duration::from_millis(100));
        }
    }

    /// Builds a fresh registry and temp dir, then spawns `command` in them.
    /// The `TempDir` is returned so it outlives the spawned task.
    #[cfg(windows)]
    fn spawn_windows_registry_command(
        command: &str,
    ) -> (BgTaskRegistry, tempfile::TempDir, String) {
        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
        let dir = tempfile::tempdir().unwrap();
        let task_id = spawn_session_task(&registry, dir.path(), command);
        (registry, dir, task_id)
    }

    /// A zero exit status is recorded as "0" in the exit marker.
    #[cfg(windows)]
    #[test]
    fn windows_spawn_writes_exit_marker_for_zero_exit() {
        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 0");
        let exit_path = registry.task_exit_path(&task_id, "session").unwrap();

        let content = wait_for_file(&exit_path);

        assert_eq!(content.trim(), "0");
    }

    /// A non-zero exit status (42) is recorded verbatim in the exit marker.
    #[cfg(windows)]
    #[test]
    fn windows_spawn_writes_exit_marker_for_nonzero_exit() {
        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 42");
        let exit_path = registry.task_exit_path(&task_id, "session").unwrap();

        let content = wait_for_file(&exit_path);

        assert_eq!(content.trim(), "42");
    }

    /// Standard output of the spawned command ends up in the task's stdout
    /// capture file. The exit marker is awaited first, as the completion
    /// signal.
    #[cfg(windows)]
    #[test]
    fn windows_spawn_captures_stdout_to_disk() {
        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c echo hello");
        let task = registry.task_for_session(&task_id, "session").unwrap();
        let stdout_path = task.paths.stdout.clone();
        let exit_path = task.paths.exit.clone();

        let _ = wait_for_file(&exit_path);
        let stdout = fs::read_to_string(stdout_path).expect("read stdout");

        assert!(stdout.contains("hello"), "stdout was {stdout:?}");
    }

    /// Shell resolution picks `Pwsh` when both `pwsh.exe` and `powershell.exe`
    /// are reported as available. This exercises `resolve_windows_shell_with`
    /// only; nothing is spawned.
    #[cfg(windows)]
    #[test]
    fn windows_spawn_uses_pwsh_when_available() {
        let shell = crate::windows_shell::resolve_windows_shell_with(|binary| {
            matches!(binary, "pwsh.exe" | "powershell.exe")
        });

        assert_eq!(shell, crate::windows_shell::WindowsShell::Pwsh);
        assert_eq!(shell.binary(), "pwsh.exe");
    }

    /// The cmd wrapper script must:
    /// - write `!ERRORLEVEL!` (delayed expansion, not `%ERRORLEVEL%`) to a
    ///   `.tmp` file,
    /// - then `move /Y` that file onto the final exit path,
    /// - silence the move's output with `> nul`.
    #[cfg(windows)]
    #[test]
    fn windows_shell_cmd_wrapper_writes_exit_marker_with_move() {
        let exit_path = Path::new(r"C:\Temp\bgb-test.exit");
        let script =
            crate::windows_shell::WindowsShell::Cmd.wrapper_script("cmd /c exit 42", exit_path);

        assert!(
            script.contains("& echo !ERRORLEVEL! >"),
            "wrapper must use delayed expansion: {script}"
        );
        assert!(
            !script.contains("%ERRORLEVEL%"),
            "wrapper must NOT use parse-time %ERRORLEVEL% expansion: {script}"
        );
        assert!(script.contains("& move /Y"));
        assert!(
            script.contains("> nul"),
            "wrapper must redirect move output to nul: {script}"
        );
        assert!(script.contains(r#""C:\Temp\bgb-test.exit.tmp""#));
        assert!(script.contains(r#""C:\Temp\bgb-test.exit""#));
    }

    /// `Cmd::bg_command` passes exactly `/V:ON /D /S /C <body>`.
    /// `/V:ON` enables the delayed expansion the wrapper script relies on.
    #[cfg(windows)]
    #[test]
    fn windows_shell_cmd_bg_command_enables_delayed_expansion() {
        use crate::windows_shell::WindowsShell;
        let cmd = WindowsShell::Cmd.bg_command("echo wrapped");
        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
        let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
        assert_eq!(
            args_strs,
            vec!["/V:ON", "/D", "/S", "/C", "echo wrapped"],
            "Cmd::bg_command must prepend /V:ON /D /S /C"
        );
    }

    /// `Pwsh::bg_command` passes `-Command` and includes the user command body
    /// among its arguments.
    #[cfg(windows)]
    #[test]
    fn windows_shell_pwsh_bg_command_uses_standard_args() {
        use crate::windows_shell::WindowsShell;
        let cmd = WindowsShell::Pwsh.bg_command("Get-Date");
        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
        let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
        assert!(
            args_strs.contains(&"-Command"),
            "Pwsh::bg_command must use -Command: {args_strs:?}"
        );
        assert!(
            args_strs.contains(&"Get-Date"),
            "Pwsh::bg_command must include the user command body"
        );
    }

    // NOTE(review): `cfg(any())` never matches, so this function is always
    // compiled out. It appears to be a placeholder for a disabled test; confirm
    // whether it should be restored or deleted. `allow(dead_code)` has no
    // effect while the cfg is false.
    #[allow(dead_code)]
    #[cfg(any())]
    fn windows_cmd_wrapper_records_real_exit_code_disabled() {}
}