1use std::collections::{HashMap, VecDeque};
2use std::fs;
3use std::path::{Path, PathBuf};
4use std::process::{Child, Command, Stdio};
5use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
6use std::sync::{Arc, Mutex};
7use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
8
9use serde::Serialize;
10
11use crate::context::SharedProgressSender;
12use crate::protocol::{BashCompletedFrame, BashLongRunningFrame, PushFrame};
13
14#[cfg(unix)]
15use std::os::unix::process::CommandExt;
16#[cfg(windows)]
17use std::os::windows::process::CommandExt;
18
19use super::buffer::BgBuffer;
20use super::persistence::{
21 create_capture_file, read_exit_marker, read_task, session_tasks_dir, task_paths, unix_millis,
22 update_task, write_kill_marker_if_absent, write_task, ExitMarker, PersistedTask, TaskPaths,
23};
24use super::process::is_process_alive;
25#[cfg(unix)]
26use super::process::terminate_pgid;
27#[cfg(windows)]
28use super::process::terminate_pid;
29use super::{BgTaskInfo, BgTaskStatus};
/// Timeout recorded for a background task when the caller passes `None` (30 minutes).
const DEFAULT_BG_TIMEOUT: Duration = Duration::from_secs(30 * 60);
/// Age of persisted `Running` metadata (24 hours since `started_at`) beyond which
/// `running_metadata_is_stale` reports the task as stale.
const STALE_RUNNING_AFTER: Duration = Duration::from_secs(24 * 60 * 60);

/// Byte budget passed to `BgBuffer::read_tail` when building a completion's output preview.
const BG_COMPLETION_PREVIEW_BYTES: usize = 300;
47
/// Completion record queued by the registry once a task reaches a terminal status.
#[derive(Debug, Clone, Serialize)]
pub struct BgCompletion {
    /// Identifier of the task this completion belongs to.
    pub task_id: String,
    /// Session that owns the task; used to filter completions per session and
    /// never written to the serialized form.
    #[serde(skip_serializing)]
    pub session_id: String,
    /// Terminal status copied from the task metadata.
    pub status: BgTaskStatus,
    /// Exit code copied from the task metadata, if one was recorded.
    pub exit_code: Option<i32>,
    /// Command line copied from the task metadata.
    pub command: String,
    /// Output preview (possibly passed through the registry's compressor);
    /// omitted from serialization when empty.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub output_preview: String,
    /// Truncation flag returned alongside the preview by the buffer read;
    /// omitted from serialization when `false`.
    #[serde(default, skip_serializing_if = "is_false")]
    pub output_truncated: bool,
}
72
/// Serde `skip_serializing_if` predicate: returns `true` exactly when the flag is `false`.
fn is_false(v: &bool) -> bool {
    matches!(*v, false)
}
76
/// Point-in-time view of a background task, produced by `BgTask::snapshot`.
#[derive(Debug, Clone, Serialize)]
pub struct BgTaskSnapshot {
    /// Core task info (id, status, command, start time, duration); its fields
    /// are flattened into this struct's serialized form.
    #[serde(flatten)]
    pub info: BgTaskInfo,
    /// Exit code from the task metadata, if recorded.
    pub exit_code: Option<i32>,
    /// Child process id from the task metadata, if recorded.
    pub child_pid: Option<u32>,
    /// Working directory rendered with `Path::display`.
    pub workdir: String,
    /// Preview text returned by the buffer's `read_tail` (may be compressed
    /// afterwards by the registry for terminal tasks).
    pub output_preview: String,
    /// Truncation flag returned by the buffer's `read_tail`.
    pub output_truncated: bool,
    /// Display form of the buffer's output path, when the buffer reports one.
    pub output_path: Option<String>,
    /// Display form of the buffer's stderr path.
    pub stderr_path: Option<String>,
}
89
/// Handle to the shared background-task registry.
///
/// Cloning the handle clones the inner `Arc`, so all clones share the same state.
#[derive(Clone)]
pub struct BgTaskRegistry {
    /// Shared registry state.
    pub(crate) inner: Arc<RegistryInner>,
}
94
/// State shared by every clone of a `BgTaskRegistry`.
pub(crate) struct RegistryInner {
    /// Registered tasks keyed by task id.
    pub(crate) tasks: Mutex<HashMap<String, Arc<BgTask>>>,
    /// Completions waiting to be drained, in enqueue order.
    pub(crate) completions: Mutex<VecDeque<BgCompletion>>,
    /// Shared, optional sink used to push completion and long-running frames.
    pub(crate) progress_sender: SharedProgressSender,
    /// Set by the first `start_watchdog` call so the watchdog is started only once.
    watchdog_started: AtomicBool,
    /// Set to `true` by `detach`.
    pub(crate) shutdown: AtomicBool,
    /// Whether long-running reminder frames are emitted.
    pub(crate) long_running_reminder_enabled: AtomicBool,
    /// Minimum spacing between reminders in milliseconds; `0` suppresses them.
    pub(crate) long_running_reminder_interval_ms: AtomicU64,
    /// Optional output compressor invoked as `compressor(command, output)`.
    pub(crate) compressor: Mutex<Option<Box<dyn Fn(&str, String) -> String + Send + Sync>>>,
}
110
/// In-memory handle for one background task.
pub(crate) struct BgTask {
    /// Task identifier (also the key in the registry map).
    pub(crate) task_id: String,
    /// Session that owns the task.
    pub(crate) session_id: String,
    /// On-disk locations (metadata JSON, capture files, exit marker) for the task.
    pub(crate) paths: TaskPaths,
    /// Instant at which this in-memory handle was created (spawn or rehydration).
    pub(crate) started: Instant,
    /// Instant of the most recent long-running reminder, if any was sent.
    pub(crate) last_reminder_at: Mutex<Option<Instant>>,
    /// Instant at which the task was first observed as terminal in this process.
    pub(crate) terminal_at: Mutex<Option<Instant>>,
    /// Mutable task state guarded by its own lock.
    pub(crate) state: Mutex<BgTaskState>,
}
120
/// Lock-protected mutable portion of a `BgTask`.
pub(crate) struct BgTaskState {
    /// Most recent copy of the persisted task metadata.
    pub(crate) metadata: PersistedTask,
    /// Child process handle while this process still owns it; `None` once
    /// reaped, killed, detached, or for rehydrated tasks.
    pub(crate) child: Option<Child>,
    /// `true` once the registry no longer holds a child handle for the task.
    pub(crate) detached: bool,
    /// Reader over the task's stdout/stderr capture files.
    pub(crate) buffer: BgBuffer,
}
127
128impl BgTaskRegistry {
129 pub fn new(progress_sender: SharedProgressSender) -> Self {
130 Self {
131 inner: Arc::new(RegistryInner {
132 tasks: Mutex::new(HashMap::new()),
133 completions: Mutex::new(VecDeque::new()),
134 progress_sender,
135 watchdog_started: AtomicBool::new(false),
136 shutdown: AtomicBool::new(false),
137 long_running_reminder_enabled: AtomicBool::new(true),
138 long_running_reminder_interval_ms: AtomicU64::new(600_000),
139 compressor: Mutex::new(None),
140 }),
141 }
142 }
143
144 pub fn set_compressor<F>(&self, compressor: F)
149 where
150 F: Fn(&str, String) -> String + Send + Sync + 'static,
151 {
152 if let Ok(mut slot) = self.inner.compressor.lock() {
153 *slot = Some(Box::new(compressor));
154 }
155 }
156
157 pub(crate) fn compress_output(&self, command: &str, output: String) -> String {
160 let Ok(slot) = self.inner.compressor.lock() else {
161 return output;
162 };
163 match slot.as_ref() {
164 Some(compressor) => compressor(command, output),
165 None => output,
166 }
167 }
168
169 pub fn configure_long_running_reminders(&self, enabled: bool, interval_ms: u64) {
170 self.inner
171 .long_running_reminder_enabled
172 .store(enabled, Ordering::SeqCst);
173 self.inner
174 .long_running_reminder_interval_ms
175 .store(interval_ms, Ordering::SeqCst);
176 }
177
    /// Launches `command` as a detached background task for `session_id` (Unix build).
    ///
    /// Steps, in order: start the watchdog if needed, enforce `max_running`,
    /// allocate a task id, create the task directory, persist `Starting`
    /// metadata, create the stdout/stderr capture files, spawn the child,
    /// persist the running metadata, and insert the task into the registry.
    ///
    /// When `timeout` is `None`, `DEFAULT_BG_TIMEOUT` is recorded instead.
    ///
    /// # Errors
    ///
    /// Returns a message when the running-task limit is reached, when no unique
    /// id can be allocated, when a directory/metadata/capture-file write fails,
    /// when the spawn fails, or when the registry lock is poisoned.
    #[cfg(unix)]
    #[allow(clippy::too_many_arguments)]
    pub fn spawn(
        &self,
        command: &str,
        session_id: String,
        workdir: PathBuf,
        env: HashMap<String, String>,
        timeout: Option<Duration>,
        storage_dir: PathBuf,
        max_running: usize,
        notify_on_completion: bool,
        compressed: bool,
    ) -> Result<String, String> {
        self.start_watchdog();

        // NOTE(review): the count is read under a lock that is released before
        // the insert below, so concurrent spawns may both pass this check.
        let running = self.running_count();
        if running >= max_running {
            return Err(format!(
                "background bash task limit exceeded: {running} running (max {max_running})"
            ));
        }

        // Fall back to the default timeout so every task records one.
        let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
        let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
        let task_id = self.generate_unique_task_id()?;
        let paths = task_paths(&storage_dir, &session_id, &task_id);
        fs::create_dir_all(&paths.dir)
            .map_err(|e| format!("failed to create background task dir: {e}"))?;

        // Persist the `Starting` record before spawning; `replay_session`
        // turns a leftover `Starting` record into a failed task.
        let mut metadata = PersistedTask::starting(
            task_id.clone(),
            session_id.clone(),
            command.to_string(),
            workdir.clone(),
            timeout_ms,
            notify_on_completion,
            compressed,
        );
        write_task(&paths.json, &metadata)
            .map_err(|e| format!("failed to persist background task metadata: {e}"))?;

        create_capture_file(&paths.stdout)
            .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
        create_capture_file(&paths.stderr)
            .map_err(|e| format!("failed to create stderr capture file: {e}"))?;

        let child = spawn_detached_child(command, &paths, &workdir, &env)?;

        // The child pid is passed twice; the second argument is presumably the
        // process-group id (the wrapper calls `setsid()` before exec) — confirm
        // against `PersistedTask::mark_running`.
        let child_pid = child.id();
        metadata.mark_running(child_pid, child_pid as i32);
        // NOTE(review): from here on an early `?` return drops `child` without
        // terminating the already-spawned process.
        write_task(&paths.json, &metadata)
            .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;

        let task = Arc::new(BgTask {
            task_id: task_id.clone(),
            session_id,
            paths: paths.clone(),
            started: Instant::now(),
            last_reminder_at: Mutex::new(None),
            terminal_at: Mutex::new(None),
            state: Mutex::new(BgTaskState {
                metadata,
                child: Some(child),
                detached: false,
                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
            }),
        });

        self.inner
            .tasks
            .lock()
            .map_err(|_| "background task registry lock poisoned".to_string())?
            .insert(task_id.clone(), task);

        Ok(task_id)
    }
258
    /// Launches `command` as a detached background task for `session_id` (Windows build).
    ///
    /// Steps, in order: start the watchdog if needed, enforce `max_running`,
    /// allocate a task id, create the task directory, persist `Starting`
    /// metadata, create the stdout/stderr capture files, spawn the child,
    /// persist the running metadata, and insert the task into the registry.
    ///
    /// When `timeout` is `None`, `DEFAULT_BG_TIMEOUT` is recorded instead.
    ///
    /// # Errors
    ///
    /// Returns a message when the running-task limit is reached, when no unique
    /// id can be allocated, when a directory/metadata/capture-file write fails,
    /// when the spawn fails, or when the registry lock is poisoned.
    #[cfg(windows)]
    #[allow(clippy::too_many_arguments)]
    pub fn spawn(
        &self,
        command: &str,
        session_id: String,
        workdir: PathBuf,
        env: HashMap<String, String>,
        timeout: Option<Duration>,
        storage_dir: PathBuf,
        max_running: usize,
        notify_on_completion: bool,
        compressed: bool,
    ) -> Result<String, String> {
        self.start_watchdog();

        // NOTE(review): the count is read under a lock that is released before
        // the insert below, so concurrent spawns may both pass this check.
        let running = self.running_count();
        if running >= max_running {
            return Err(format!(
                "background bash task limit exceeded: {running} running (max {max_running})"
            ));
        }

        // Fall back to the default timeout so every task records one.
        let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
        let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
        let task_id = self.generate_unique_task_id()?;
        let paths = task_paths(&storage_dir, &session_id, &task_id);
        fs::create_dir_all(&paths.dir)
            .map_err(|e| format!("failed to create background task dir: {e}"))?;

        // Persist the `Starting` record before spawning; `replay_session`
        // turns a leftover `Starting` record into a failed task.
        let mut metadata = PersistedTask::starting(
            task_id.clone(),
            session_id.clone(),
            command.to_string(),
            workdir.clone(),
            timeout_ms,
            notify_on_completion,
            compressed,
        );
        write_task(&paths.json, &metadata)
            .map_err(|e| format!("failed to persist background task metadata: {e}"))?;

        create_capture_file(&paths.stdout)
            .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
        create_capture_file(&paths.stderr)
            .map_err(|e| format!("failed to create stderr capture file: {e}"))?;

        let child = spawn_detached_child(command, &paths, &workdir, &env)?;

        // Unlike the Unix build, the running state is written field by field
        // and no process-group id is recorded.
        let child_pid = child.id();
        metadata.status = BgTaskStatus::Running;
        metadata.child_pid = Some(child_pid);
        metadata.pgid = None;
        // NOTE(review): from here on an early `?` return drops `child` without
        // terminating the already-spawned process.
        write_task(&paths.json, &metadata)
            .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;

        let task = Arc::new(BgTask {
            task_id: task_id.clone(),
            session_id,
            paths: paths.clone(),
            started: Instant::now(),
            last_reminder_at: Mutex::new(None),
            terminal_at: Mutex::new(None),
            state: Mutex::new(BgTaskState {
                metadata,
                child: Some(child),
                detached: false,
                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
            }),
        });

        self.inner
            .tasks
            .lock()
            .map_err(|_| "background task registry lock poisoned".to_string())?
            .insert(task_id.clone(), task);

        Ok(task_id)
    }
343
    /// Reloads the persisted tasks of `session_id` from `storage_dir` and
    /// reconciles each one with what is on disk.
    ///
    /// Every `*.json` file in the session's task directory that parses and
    /// belongs to `session_id` is handled according to its persisted status:
    ///
    /// * `Starting` — rewritten as `Failed` ("spawn aborted").
    /// * `Running` — checked in this order: stale metadata becomes `Killed`;
    ///   an existing exit marker yields the matching terminal state; a
    ///   recorded pid that is no longer alive becomes `Failed`; otherwise the
    ///   task is re-registered as a detached running task.
    /// * terminal — re-registered as detached.
    /// * anything else — ignored.
    ///
    /// Undelivered terminal results are queued as completions without emitting
    /// a push frame. Unreadable entries are skipped and metadata write
    /// failures are ignored.
    ///
    /// # Errors
    ///
    /// Returns a message when the task directory cannot be read or the
    /// registry lock is poisoned.
    pub fn replay_session(&self, storage_dir: &Path, session_id: &str) -> Result<(), String> {
        self.start_watchdog();
        let dir = session_tasks_dir(storage_dir, session_id);
        // A session without a task directory has nothing to replay.
        if !dir.exists() {
            return Ok(());
        }

        let entries = fs::read_dir(&dir)
            .map_err(|e| format!("failed to read background task dir {}: {e}", dir.display()))?;
        // `flatten` drops directory entries that failed to read.
        for entry in entries.flatten() {
            let path = entry.path();
            if path.extension().and_then(|extension| extension.to_str()) != Some("json") {
                continue;
            }
            let Ok(mut metadata) = read_task(&path) else {
                continue;
            };
            if metadata.session_id != session_id {
                continue;
            }

            let paths = task_paths(storage_dir, session_id, &metadata.task_id);
            match metadata.status {
                // The previous process stopped between persisting `Starting`
                // and persisting the running state.
                BgTaskStatus::Starting => {
                    metadata.mark_terminal(
                        BgTaskStatus::Failed,
                        None,
                        Some("spawn aborted".to_string()),
                    );
                    let _ = write_task(&paths.json, &metadata);
                    self.enqueue_completion_if_needed(&metadata, false);
                }
                BgTaskStatus::Running => {
                    if self.running_metadata_is_stale(&metadata) {
                        metadata.mark_terminal(
                            BgTaskStatus::Killed,
                            None,
                            Some("orphaned (>24h)".to_string()),
                        );
                        if !paths.exit.exists() {
                            let _ = write_kill_marker_if_absent(&paths.exit);
                        }
                        let _ = write_task(&paths.json, &metadata);
                        self.enqueue_completion_if_needed(&metadata, false);
                    } else if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
                        // The wrapper finished while no registry was watching.
                        metadata = terminal_metadata_from_marker(metadata, marker, None);
                        let _ = write_task(&paths.json, &metadata);
                        self.enqueue_completion_if_needed(&metadata, false);
                    } else if metadata.child_pid.is_some_and(|pid| !is_process_alive(pid)) {
                        metadata.mark_terminal(
                            BgTaskStatus::Failed,
                            None,
                            Some("process exited without exit marker".to_string()),
                        );
                        let _ = write_task(&paths.json, &metadata);
                        self.enqueue_completion_if_needed(&metadata, false);
                    } else {
                        // Still running (or no pid recorded): track it again
                        // without a child handle.
                        self.insert_rehydrated_task(metadata, paths, true)?;
                    }
                }
                _ if metadata.status.is_terminal() => {
                    self.insert_rehydrated_task(metadata.clone(), paths, true)?;
                    self.enqueue_completion_if_needed(&metadata, false);
                }
                // NOTE(review): any remaining non-terminal status is dropped
                // here without being reconciled — confirm this is intended.
                _ => {}
            }
        }

        Ok(())
    }
414
415 pub fn status(
416 &self,
417 task_id: &str,
418 session_id: &str,
419 preview_bytes: usize,
420 ) -> Option<BgTaskSnapshot> {
421 let task = self.task_for_session(task_id, session_id)?;
422 let _ = self.poll_task(&task);
423 let mut snapshot = task.snapshot(preview_bytes);
424 self.maybe_compress_snapshot(&task, &mut snapshot);
425 Some(snapshot)
426 }
427
428 pub fn list(&self, preview_bytes: usize) -> Vec<BgTaskSnapshot> {
429 let tasks = self
430 .inner
431 .tasks
432 .lock()
433 .map(|tasks| tasks.values().cloned().collect::<Vec<_>>())
434 .unwrap_or_default();
435 tasks
436 .into_iter()
437 .map(|task| {
438 let _ = self.poll_task(&task);
439 let mut snapshot = task.snapshot(preview_bytes);
440 self.maybe_compress_snapshot(&task, &mut snapshot);
441 snapshot
442 })
443 .collect()
444 }
445
446 fn maybe_compress_snapshot(&self, task: &Arc<BgTask>, snapshot: &mut BgTaskSnapshot) {
452 if !snapshot.info.status.is_terminal() {
453 return;
454 }
455 let compressed_flag = task
456 .state
457 .lock()
458 .map(|state| state.metadata.compressed)
459 .unwrap_or(true);
460 if !compressed_flag {
461 return;
462 }
463 let raw = std::mem::take(&mut snapshot.output_preview);
464 snapshot.output_preview = self.compress_output(&snapshot.info.command, raw);
465 }
466
467 pub fn kill(&self, task_id: &str, session_id: &str) -> Result<BgTaskSnapshot, String> {
468 self.kill_with_status(task_id, session_id, BgTaskStatus::Killed)
469 }
470
471 pub fn promote(&self, task_id: &str, session_id: &str) -> Result<bool, String> {
472 let task = self
473 .task_for_session(task_id, session_id)
474 .ok_or_else(|| format!("background task not found: {task_id}"))?;
475 let mut state = task
476 .state
477 .lock()
478 .map_err(|_| "background task lock poisoned".to_string())?;
479 let updated = update_task(&task.paths.json, |metadata| {
480 metadata.notify_on_completion = true;
481 metadata.completion_delivered = false;
482 })
483 .map_err(|e| format!("failed to promote background task: {e}"))?;
484 state.metadata = updated;
485 if state.metadata.status.is_terminal() {
486 state.buffer.enforce_terminal_cap();
487 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
488 }
489 Ok(true)
490 }
491
492 pub(crate) fn kill_for_timeout(&self, task_id: &str, session_id: &str) -> Result<(), String> {
493 self.kill_with_status(task_id, session_id, BgTaskStatus::TimedOut)
494 .map(|_| ())
495 }
496
497 pub fn cleanup_finished(&self, older_than: Duration) {
498 let cutoff = Instant::now().checked_sub(older_than);
499 if let Ok(mut tasks) = self.inner.tasks.lock() {
500 tasks.retain(|_, task| {
501 let is_terminal = task
502 .state
503 .lock()
504 .map(|state| state.metadata.status.is_terminal())
505 .unwrap_or(false);
506 if !is_terminal {
507 return true;
508 }
509
510 let terminal_at = task.terminal_at.lock().ok().and_then(|at| *at);
511 match (terminal_at, cutoff) {
512 (Some(terminal_at), Some(cutoff)) => terminal_at > cutoff,
513 (Some(_), None) => false,
514 (None, _) => true,
515 }
516 });
517 }
518 }
519
520 pub fn drain_completions(&self) -> Vec<BgCompletion> {
521 self.drain_completions_for_session(None)
522 }
523
524 pub fn drain_completions_for_session(&self, session_id: Option<&str>) -> Vec<BgCompletion> {
525 let mut completions = match self.inner.completions.lock() {
526 Ok(completions) => completions,
527 Err(_) => return Vec::new(),
528 };
529
530 let drained = if let Some(session_id) = session_id {
531 let mut matched = Vec::new();
532 let mut retained = VecDeque::new();
533 while let Some(completion) = completions.pop_front() {
534 if completion.session_id == session_id {
535 matched.push(completion);
536 } else {
537 retained.push_back(completion);
538 }
539 }
540 *completions = retained;
541 matched
542 } else {
543 completions.drain(..).collect()
544 };
545 drop(completions);
546
547 for completion in &drained {
548 if let Some(task) = self.task_for_session(&completion.task_id, &completion.session_id) {
549 let _ = task.set_completion_delivered(true);
550 }
551 }
552
553 drained
554 }
555
556 pub fn pending_completions_for_session(&self, session_id: &str) -> Vec<BgCompletion> {
557 self.inner
558 .completions
559 .lock()
560 .map(|completions| {
561 completions
562 .iter()
563 .filter(|completion| completion.session_id == session_id)
564 .cloned()
565 .collect()
566 })
567 .unwrap_or_default()
568 }
569
570 pub fn detach(&self) {
571 self.inner.shutdown.store(true, Ordering::SeqCst);
572 if let Ok(mut tasks) = self.inner.tasks.lock() {
573 for task in tasks.values() {
574 if let Ok(mut state) = task.state.lock() {
575 state.child = None;
576 state.detached = true;
577 }
578 }
579 tasks.clear();
580 }
581 }
582
583 pub fn shutdown(&self) {
584 let tasks = self
585 .inner
586 .tasks
587 .lock()
588 .map(|tasks| {
589 tasks
590 .values()
591 .map(|task| (task.task_id.clone(), task.session_id.clone()))
592 .collect::<Vec<_>>()
593 })
594 .unwrap_or_default();
595 for (task_id, session_id) in tasks {
596 let _ = self.kill(&task_id, &session_id);
597 }
598 }
599
600 pub(crate) fn poll_task(&self, task: &Arc<BgTask>) -> Result<(), String> {
601 let marker = match read_exit_marker(&task.paths.exit) {
602 Ok(Some(marker)) => marker,
603 Ok(None) => return Ok(()),
604 Err(error) => return Err(format!("failed to read exit marker: {error}")),
605 };
606 self.finalize_from_marker(task, marker, None)
607 }
608
    /// Reaps the task's child process if it has exited and, when no exit
    /// marker exists, records the task as failed.
    ///
    /// Does nothing when the task lock is poisoned, when no child handle is
    /// held, or when `try_wait` does not report an exit.
    pub(crate) fn reap_child(&self, task: &Arc<BgTask>) {
        let Ok(mut state) = task.state.lock() else {
            return;
        };
        if let Some(child) = state.child.as_mut() {
            if matches!(child.try_wait(), Ok(Some(_))) {
                // The child has exited: release the handle.
                state.child = None;
                state.detached = true;
                if state.metadata.status.is_terminal() {
                    return;
                }
                // An exit marker is present; this function leaves the
                // terminal transition to the marker-based path (`poll_task`).
                if matches!(read_exit_marker(&task.paths.exit), Ok(Some(_))) {
                    return;
                }
                let updated = update_task(&task.paths.json, |metadata| {
                    metadata.mark_terminal(
                        BgTaskStatus::Failed,
                        None,
                        Some("process exited without exit marker".to_string()),
                    );
                });
                // If persisting fails, the in-memory state stays non-terminal.
                if let Ok(metadata) = updated {
                    state.metadata = metadata;
                    task.mark_terminal_now();
                    state.buffer.enforce_terminal_cap();
                    self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
                }
            }
        }
    }
655
656 pub(crate) fn running_tasks(&self) -> Vec<Arc<BgTask>> {
657 self.inner
658 .tasks
659 .lock()
660 .map(|tasks| {
661 tasks
662 .values()
663 .filter(|task| task.is_running())
664 .cloned()
665 .collect()
666 })
667 .unwrap_or_default()
668 }
669
670 fn insert_rehydrated_task(
671 &self,
672 metadata: PersistedTask,
673 paths: TaskPaths,
674 detached: bool,
675 ) -> Result<(), String> {
676 let task_id = metadata.task_id.clone();
677 let session_id = metadata.session_id.clone();
678 let task = Arc::new(BgTask {
679 task_id: task_id.clone(),
680 session_id,
681 paths: paths.clone(),
682 started: Instant::now(),
683 last_reminder_at: Mutex::new(None),
684 terminal_at: Mutex::new(metadata.status.is_terminal().then(Instant::now)),
685 state: Mutex::new(BgTaskState {
686 metadata,
687 child: None,
688 detached,
689 buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
690 }),
691 });
692 self.inner
693 .tasks
694 .lock()
695 .map_err(|_| "background task registry lock poisoned".to_string())?
696 .insert(task_id, task);
697 Ok(())
698 }
699
    /// Terminates a task and records `terminal_status` for it.
    ///
    /// A task that is already terminal is left untouched and its current
    /// snapshot is returned. Otherwise the task is persisted as `Killing`,
    /// the process is terminated and waited for, a kill marker is written if
    /// no exit marker exists, the terminal state is persisted, and a
    /// completion is queued (with a push frame).
    ///
    /// # Errors
    ///
    /// Returns a message when the task is unknown for this session, the task
    /// lock is poisoned, or any of the metadata/marker writes fails.
    fn kill_with_status(
        &self,
        task_id: &str,
        session_id: &str,
        terminal_status: BgTaskStatus,
    ) -> Result<BgTaskSnapshot, String> {
        let task = self
            .task_for_session(task_id, session_id)
            .ok_or_else(|| format!("background task not found: {task_id}"))?;

        // Scope the state lock so it is released before the final snapshot,
        // which takes the lock again.
        {
            let mut state = task
                .state
                .lock()
                .map_err(|_| "background task lock poisoned".to_string())?;
            if state.metadata.status.is_terminal() {
                return Ok(task.snapshot_locked(&state, 5 * 1024));
            }

            state.metadata.status = BgTaskStatus::Killing;
            write_task(&task.paths.json, &state.metadata)
                .map_err(|e| format!("failed to persist killing state: {e}"))?;

            // Unix: terminate via the recorded process-group id, if any.
            #[cfg(unix)]
            if let Some(pgid) = state.metadata.pgid {
                terminate_pgid(pgid, state.child.as_mut());
            }
            // Windows: prefer the owned child handle, else fall back to the pid.
            #[cfg(windows)]
            if let Some(child) = state.child.as_mut() {
                super::process::terminate_process(child);
            } else if let Some(pid) = state.metadata.child_pid {
                terminate_pid(pid);
            }
            // NOTE(review): this blocking wait runs while the task lock is held.
            if let Some(child) = state.child.as_mut() {
                let _ = child.wait();
            }
            state.child = None;
            state.detached = true;

            if !task.paths.exit.exists() {
                write_kill_marker_if_absent(&task.paths.exit)
                    .map_err(|e| format!("failed to write kill marker: {e}"))?;
            }

            // Timeouts are reported with exit code 124; plain kills carry none.
            let exit_code = if terminal_status == BgTaskStatus::TimedOut {
                Some(124)
            } else {
                None
            };
            state
                .metadata
                .mark_terminal(terminal_status, exit_code, None);
            task.mark_terminal_now();
            write_task(&task.paths.json, &state.metadata)
                .map_err(|e| format!("failed to persist killed state: {e}"))?;
            state.buffer.enforce_terminal_cap();
            self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
        }

        Ok(task.snapshot(5 * 1024))
    }
761
762 fn finalize_from_marker(
763 &self,
764 task: &Arc<BgTask>,
765 marker: ExitMarker,
766 reason: Option<String>,
767 ) -> Result<(), String> {
768 let mut state = task
769 .state
770 .lock()
771 .map_err(|_| "background task lock poisoned".to_string())?;
772 if state.metadata.status.is_terminal() {
773 return Ok(());
774 }
775
776 let updated = update_task(&task.paths.json, |metadata| {
777 let new_metadata = terminal_metadata_from_marker(metadata.clone(), marker, reason);
778 *metadata = new_metadata;
779 })
780 .map_err(|e| format!("failed to persist terminal state: {e}"))?;
781 state.metadata = updated;
782 task.mark_terminal_now();
783 state.child = None;
784 state.detached = true;
785 state.buffer.enforce_terminal_cap();
786 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
787 Ok(())
788 }
789
790 fn enqueue_completion_if_needed(&self, metadata: &PersistedTask, emit_frame: bool) {
791 if metadata.status.is_terminal() && !metadata.completion_delivered {
792 self.enqueue_completion_locked(metadata, None, emit_frame);
793 }
794 }
795
796 fn enqueue_completion_locked(
797 &self,
798 metadata: &PersistedTask,
799 buffer: Option<&BgBuffer>,
800 emit_frame: bool,
801 ) {
802 if !metadata.status.is_terminal() || metadata.completion_delivered {
803 return;
804 }
805 let (raw_preview, output_truncated) = match buffer {
810 Some(buf) => buf.read_tail(BG_COMPLETION_PREVIEW_BYTES),
811 None => (String::new(), false),
812 };
813 let output_preview = if metadata.compressed {
818 self.compress_output(&metadata.command, raw_preview)
819 } else {
820 raw_preview
821 };
822 let completion = BgCompletion {
823 task_id: metadata.task_id.clone(),
824 session_id: metadata.session_id.clone(),
825 status: metadata.status.clone(),
826 exit_code: metadata.exit_code,
827 command: metadata.command.clone(),
828 output_preview,
829 output_truncated,
830 };
831 if let Ok(mut completions) = self.inner.completions.lock() {
832 if completions
833 .iter()
834 .any(|completion| completion.task_id == metadata.task_id)
835 {
836 return;
837 }
838 completions.push_back(completion.clone());
839 } else {
840 return;
841 }
842
843 if emit_frame {
844 self.emit_bash_completed(completion);
845 }
846 }
847
848 fn emit_bash_completed(&self, completion: BgCompletion) {
849 let Ok(progress_sender) = self
850 .inner
851 .progress_sender
852 .lock()
853 .map(|sender| sender.clone())
854 else {
855 return;
856 };
857 let Some(sender) = progress_sender.as_ref() else {
858 return;
859 };
860 sender(PushFrame::BashCompleted(BashCompletedFrame::new(
868 completion.task_id,
869 completion.session_id,
870 completion.status,
871 completion.exit_code,
872 completion.command,
873 completion.output_preview,
874 completion.output_truncated,
875 )));
876 }
877
878 pub(crate) fn maybe_emit_long_running_reminder(&self, task: &Arc<BgTask>) {
879 if !self
880 .inner
881 .long_running_reminder_enabled
882 .load(Ordering::SeqCst)
883 {
884 return;
885 }
886 let interval_ms = self
887 .inner
888 .long_running_reminder_interval_ms
889 .load(Ordering::SeqCst);
890 if interval_ms == 0 {
891 return;
892 }
893 let interval = Duration::from_millis(interval_ms);
894 let now = Instant::now();
895 let Ok(mut last_reminder_at) = task.last_reminder_at.lock() else {
896 return;
897 };
898 let since = last_reminder_at.unwrap_or(task.started);
899 if now.duration_since(since) < interval {
900 return;
901 }
902 let command = task
903 .state
904 .lock()
905 .map(|state| state.metadata.command.clone())
906 .unwrap_or_default();
907 *last_reminder_at = Some(now);
908 self.emit_bash_long_running(BashLongRunningFrame::new(
909 task.task_id.clone(),
910 task.session_id.clone(),
911 command,
912 task.started.elapsed().as_millis() as u64,
913 ));
914 }
915
916 fn emit_bash_long_running(&self, frame: BashLongRunningFrame) {
917 let Ok(progress_sender) = self
918 .inner
919 .progress_sender
920 .lock()
921 .map(|sender| sender.clone())
922 else {
923 return;
924 };
925 if let Some(sender) = progress_sender.as_ref() {
926 sender(PushFrame::BashLongRunning(frame));
927 }
928 }
929
930 fn task(&self, task_id: &str) -> Option<Arc<BgTask>> {
931 self.inner
932 .tasks
933 .lock()
934 .ok()
935 .and_then(|tasks| tasks.get(task_id).cloned())
936 }
937
938 fn task_for_session(&self, task_id: &str, session_id: &str) -> Option<Arc<BgTask>> {
939 self.task(task_id)
940 .filter(|task| task.session_id == session_id)
941 }
942
943 fn running_count(&self) -> usize {
944 self.inner
945 .tasks
946 .lock()
947 .map(|tasks| tasks.values().filter(|task| task.is_running()).count())
948 .unwrap_or(0)
949 }
950
951 fn start_watchdog(&self) {
952 if !self.inner.watchdog_started.swap(true, Ordering::SeqCst) {
953 super::watchdog::start(self.clone());
954 }
955 }
956
957 fn running_metadata_is_stale(&self, metadata: &PersistedTask) -> bool {
958 unix_millis().saturating_sub(metadata.started_at) > STALE_RUNNING_AFTER.as_millis() as u64
959 }
960
961 #[cfg(test)]
962 pub fn task_json_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
963 self.task_for_session(task_id, session_id)
964 .map(|task| task.paths.json.clone())
965 }
966
967 #[cfg(test)]
968 pub fn task_exit_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
969 self.task_for_session(task_id, session_id)
970 .map(|task| task.paths.exit.clone())
971 }
972
973 fn generate_unique_task_id(&self) -> Result<String, String> {
975 for _ in 0..32 {
976 let candidate = random_slug();
977 let tasks = self
978 .inner
979 .tasks
980 .lock()
981 .map_err(|_| "background task registry lock poisoned".to_string())?;
982 if tasks.contains_key(&candidate) {
983 continue;
984 }
985 let completions = self
986 .inner
987 .completions
988 .lock()
989 .map_err(|_| "background completions lock poisoned".to_string())?;
990 if completions
991 .iter()
992 .any(|completion| completion.task_id == candidate)
993 {
994 continue;
995 }
996 return Ok(candidate);
997 }
998 Err("failed to allocate unique background task id after 32 attempts".to_string())
999 }
1000}
1001
1002impl Default for BgTaskRegistry {
1003 fn default() -> Self {
1004 Self::new(Arc::new(Mutex::new(None)))
1005 }
1006}
1007
1008impl BgTask {
1009 fn snapshot(&self, preview_bytes: usize) -> BgTaskSnapshot {
1010 let state = self
1011 .state
1012 .lock()
1013 .unwrap_or_else(|poison| poison.into_inner());
1014 self.snapshot_locked(&state, preview_bytes)
1015 }
1016
1017 fn snapshot_locked(&self, state: &BgTaskState, preview_bytes: usize) -> BgTaskSnapshot {
1018 let metadata = &state.metadata;
1019 let duration_ms = metadata.duration_ms.or_else(|| {
1020 metadata
1021 .status
1022 .is_terminal()
1023 .then(|| self.started.elapsed().as_millis() as u64)
1024 });
1025 let (output_preview, output_truncated) = state.buffer.read_tail(preview_bytes);
1026 BgTaskSnapshot {
1027 info: BgTaskInfo {
1028 task_id: self.task_id.clone(),
1029 status: metadata.status.clone(),
1030 command: metadata.command.clone(),
1031 started_at: metadata.started_at,
1032 duration_ms,
1033 },
1034 exit_code: metadata.exit_code,
1035 child_pid: metadata.child_pid,
1036 workdir: metadata.workdir.display().to_string(),
1037 output_preview,
1038 output_truncated,
1039 output_path: state
1040 .buffer
1041 .output_path()
1042 .map(|path| path.display().to_string()),
1043 stderr_path: Some(state.buffer.stderr_path().display().to_string()),
1044 }
1045 }
1046
1047 pub(crate) fn is_running(&self) -> bool {
1048 self.state
1049 .lock()
1050 .map(|state| state.metadata.status == BgTaskStatus::Running)
1051 .unwrap_or(false)
1052 }
1053
1054 fn mark_terminal_now(&self) {
1055 if let Ok(mut terminal_at) = self.terminal_at.lock() {
1056 if terminal_at.is_none() {
1057 *terminal_at = Some(Instant::now());
1058 }
1059 }
1060 }
1061
1062 fn set_completion_delivered(&self, delivered: bool) -> Result<(), String> {
1063 let mut state = self
1064 .state
1065 .lock()
1066 .map_err(|_| "background task lock poisoned".to_string())?;
1067 let updated = update_task(&self.paths.json, |metadata| {
1068 metadata.completion_delivered = delivered;
1069 })
1070 .map_err(|e| format!("failed to update completion delivery: {e}"))?;
1071 state.metadata = updated;
1072 Ok(())
1073 }
1074}
1075
1076fn terminal_metadata_from_marker(
1077 mut metadata: PersistedTask,
1078 marker: ExitMarker,
1079 reason: Option<String>,
1080) -> PersistedTask {
1081 match marker {
1082 ExitMarker::Code(code) => {
1083 let status = if code == 0 {
1084 BgTaskStatus::Completed
1085 } else {
1086 BgTaskStatus::Failed
1087 };
1088 metadata.mark_terminal(status, Some(code), reason);
1089 }
1090 ExitMarker::Killed => metadata.mark_terminal(BgTaskStatus::Killed, None, reason),
1091 }
1092 metadata
1093}
1094
/// Builds the `/bin/sh` wrapper command used to run `command` on Unix.
///
/// The wrapper script receives `/bin/sh` as `$0`, `command` as `$1`, and
/// `exit_path` as `$2`. It runs `command` in a nested shell, writes the exit
/// code to `"$2.tmp.$$"`, and then renames that file onto `exit_path` with
/// `mv -f`. The returned command is not yet spawned; the caller configures
/// working directory, environment, and stdio.
#[cfg(unix)]
fn detached_shell_command(command: &str, exit_path: &Path) -> Command {
    let mut cmd = Command::new("/bin/sh");
    cmd.arg("-c")
        .arg("\"$0\" -c \"$1\"; code=$?; printf \"%s\" \"$code\" > \"$2.tmp.$$\"; mv -f \"$2.tmp.$$\" \"$2\"")
        .arg("/bin/sh")
        .arg(command)
        .arg(exit_path);
    // SAFETY: the closure runs in the child between fork and exec. It only
    // calls `setsid` and, on failure, reads the OS error via `last_os_error`.
    // NOTE(review): confirm both calls are acceptable in that context.
    unsafe {
        cmd.pre_exec(|| {
            // Put the child into a new session so it is detached from the
            // parent's session.
            if libc::setsid() == -1 {
                return Err(std::io::Error::last_os_error());
            }
            Ok(())
        });
    }
    cmd
}
1113
/// Builds the wrapper command used to run `command` under `shell` on Windows.
///
/// The wrapper script produced by `shell.wrapper_script(command, exit_path)`
/// is written into the task directory, named after the metadata file's stem
/// (falling back to `"wrapper"`) with an extension chosen per shell. The
/// returned command invokes the shell binary on that script with
/// `creation_flags` applied; it is not yet spawned.
///
/// # Errors
///
/// Returns a message when the wrapper script cannot be written.
#[cfg(windows)]
fn detached_shell_command_for(
    shell: crate::windows_shell::WindowsShell,
    command: &str,
    exit_path: &Path,
    paths: &TaskPaths,
    creation_flags: u32,
) -> Result<Command, String> {
    use crate::windows_shell::WindowsShell;
    let wrapper_body = shell.wrapper_script(command, exit_path);
    // Script extension matching the interpreter that will run it.
    let wrapper_ext = match shell {
        WindowsShell::Pwsh | WindowsShell::Powershell => "ps1",
        WindowsShell::Cmd => "bat",
        WindowsShell::Posix(_) => "sh",
    };
    let wrapper_path = paths.dir.join(format!(
        "{}.{}",
        paths
            .json
            .file_stem()
            .and_then(|s| s.to_str())
            .unwrap_or("wrapper"),
        wrapper_ext
    ));
    fs::write(&wrapper_path, wrapper_body)
        .map_err(|e| format!("failed to write background bash wrapper script: {e}"))?;

    let mut cmd = Command::new(shell.binary().as_ref());
    match shell {
        WindowsShell::Pwsh | WindowsShell::Powershell => {
            // Run the script file non-interactively, without logo or profile,
            // under the `Bypass` execution policy.
            cmd.args([
                "-NoLogo",
                "-NoProfile",
                "-NonInteractive",
                "-ExecutionPolicy",
                "Bypass",
                "-File",
            ]);
            cmd.arg(&wrapper_path);
        }
        WindowsShell::Cmd => {
            cmd.args(["/V:ON", "/D", "/C"]);
            cmd.arg(&wrapper_path);
        }
        WindowsShell::Posix(_) => {
            // The POSIX shell takes the script path as its only argument.
            cmd.arg(&wrapper_path);
        }
    }

    cmd.creation_flags(creation_flags);
    Ok(cmd)
}
1196
/// Spawns `command` as a detached background shell process.
///
/// The child runs in `workdir`, receives the entries of `env` on top of the
/// inherited environment, has its stdin connected to the null device, and has
/// stdout / stderr redirected to the capture files at `paths.stdout` and
/// `paths.stderr`. `paths.exit` is forwarded to the shell-command builder as
/// the exit-marker path.
///
/// # Errors
///
/// Returns a human-readable `String` when a capture file cannot be opened,
/// when the command cannot be built, or when the process cannot be spawned.
///
/// # Platform behaviour
///
/// * Non-Windows: a single spawn attempt via `detached_shell_command`.
/// * Windows: every shell returned by `shell_candidates()` is tried in order,
///   first with `CREATE_BREAKAWAY_FROM_JOB` set and then, if that attempt is
///   rejected with OS error 5, again without it. A `NotFound` error moves on
///   to the next candidate; any other error aborts immediately.
fn spawn_detached_child(
    command: &str,
    paths: &TaskPaths,
    workdir: &Path,
    env: &HashMap<String, String>,
) -> Result<std::process::Child, String> {
    #[cfg(not(windows))]
    {
        // Open both capture files up front so a failure is reported before
        // anything is spawned.
        let stdout = create_capture_file(&paths.stdout)
            .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
        let stderr = create_capture_file(&paths.stderr)
            .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
        detached_shell_command(command, &paths.exit)
            .current_dir(workdir)
            .envs(env)
            .stdin(Stdio::null())
            .stdout(Stdio::from(stdout))
            .stderr(Stdio::from(stderr))
            .spawn()
            .map_err(|e| format!("failed to spawn background bash command: {e}"))
    }
    #[cfg(windows)]
    {
        use crate::windows_shell::shell_candidates;
        let candidates: Vec<crate::windows_shell::WindowsShell> = shell_candidates();
        // Win32 process-creation flag values passed to `creation_flags`.
        const FLAG_CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
        const FLAG_CREATE_BREAKAWAY_FROM_JOB: u32 = 0x0100_0000;
        const FLAG_CREATE_NO_WINDOW: u32 = 0x0800_0000;
        // Preferred flag set (with job breakaway) and the fallback used when
        // breakaway is rejected.
        let with_breakaway =
            FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP | FLAG_CREATE_BREAKAWAY_FROM_JOB;
        let without_breakaway = FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP;
        // Most recent recoverable failure, reported if every candidate fails.
        let mut last_error: Option<String> = None;
        for (idx, shell) in candidates.iter().enumerate() {
            for &flags in &[with_breakaway, without_breakaway] {
                // The capture handles are moved into the command below, so
                // each attempt has to open its own pair.
                let stdout = create_capture_file(&paths.stdout)
                    .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
                let stderr = create_capture_file(&paths.stderr)
                    .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
                let mut cmd =
                    detached_shell_command_for(shell.clone(), command, &paths.exit, paths, flags)?;
                cmd.current_dir(workdir)
                    .envs(env)
                    .stdin(Stdio::null())
                    .stdout(Stdio::from(stdout))
                    .stderr(Stdio::from(stderr));
                match cmd.spawn() {
                    Ok(child) => {
                        // idx > 0 means at least one earlier candidate
                        // failed at spawn time.
                        if idx > 0 {
                            log::warn!(
                                "[aft] background bash spawn fell back to {} after {} earlier candidate(s) failed; \
                                 the cached PATH probe disagreed with runtime spawn — likely PATH \
                                 inheritance, antivirus / AppLocker / Defender ASR, or sandbox policy.",
                                shell.binary(),
                                idx
                            );
                        }
                        // The fallback flag set is only reached after the
                        // breakaway attempt was rejected for this shell.
                        if flags == without_breakaway {
                            log::warn!(
                                "[aft] background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected \
                                 (likely a restrictive Job Object — CI sandbox or MDM policy). \
                                 Spawned without breakaway; the bg task will be torn down if the \
                                 AFT process group is killed."
                            );
                        }
                        return Ok(child);
                    }
                    Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                        log::warn!(
                            "[aft] background bash spawn: {} returned NotFound at runtime — trying next candidate",
                            shell.binary()
                        );
                        last_error = Some(format!("{}: {e}", shell.binary()));
                        // Changing flags cannot help a missing binary: leave
                        // the flag loop and move on to the next shell.
                        break;
                    }
                    // OS error 5 on the breakaway attempt: retry the same
                    // shell with the fallback flag set.
                    Err(e) if flags == with_breakaway && e.raw_os_error() == Some(5) => {
                        log::warn!(
                            "[aft] background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected with \
                             Access Denied — retrying {} without breakaway",
                            shell.binary()
                        );
                        last_error = Some(format!("{}: {e}", shell.binary()));
                        continue;
                    }
                    // Anything else is treated as fatal; no further
                    // candidates are tried.
                    Err(e) => {
                        return Err(format!(
                            "failed to spawn background bash command via {}: {e}",
                            shell.binary()
                        ));
                    }
                }
            }
        }
        // Every candidate was exhausted (or the candidate list was empty).
        Err(format!(
            "failed to spawn background bash command: no Windows shell could be spawned. \
             Last error: {}. PATH-probed candidates: {:?}",
            last_error.unwrap_or_else(|| "no candidates were attempted".to_string()),
            candidates.iter().map(|s| s.binary()).collect::<Vec<_>>()
        ))
    }
}
1349
1350fn random_slug() -> String {
1351 let mut bytes = [0u8; 4];
1352 getrandom::fill(&mut bytes).unwrap_or_else(|_| {
1354 let t = SystemTime::now()
1356 .duration_since(UNIX_EPOCH)
1357 .map(|d| d.subsec_nanos())
1358 .unwrap_or(0);
1359 let p = std::process::id();
1360 bytes.copy_from_slice(&(t ^ p).to_le_bytes());
1361 });
1362 let hex: String = bytes.iter().map(|b| format!("{b:02x}")).collect();
1364 format!("bgb-{hex}")
1365}
1366
1367#[cfg(test)]
1368mod tests {
1369 use std::collections::HashMap;
1370 #[cfg(windows)]
1371 use std::fs;
1372 use std::sync::{Arc, Mutex};
1373 use std::time::Duration;
1374 #[cfg(windows)]
1375 use std::time::Instant;
1376
1377 use super::*;
1378
    // Per-platform shell command that finishes immediately with exit status 0.
    #[cfg(unix)]
    const QUICK_SUCCESS_COMMAND: &str = "true";
    #[cfg(windows)]
    const QUICK_SUCCESS_COMMAND: &str = "cmd /c exit 0";

    // Per-platform shell command that stays alive for about five seconds, for
    // tests that need a task which is still running when they inspect it.
    #[cfg(unix)]
    const LONG_RUNNING_COMMAND: &str = "sleep 5";
    #[cfg(windows)]
    const LONG_RUNNING_COMMAND: &str = "cmd /c timeout /t 5 /nobreak > nul";
1388
1389 #[test]
1390 fn cleanup_finished_removes_terminal_tasks_older_than_threshold() {
1391 let registry = BgTaskRegistry::default();
1392 let dir = tempfile::tempdir().unwrap();
1393 let task_id = registry
1394 .spawn(
1395 QUICK_SUCCESS_COMMAND,
1396 "session".to_string(),
1397 dir.path().to_path_buf(),
1398 HashMap::new(),
1399 Some(Duration::from_secs(30)),
1400 dir.path().to_path_buf(),
1401 10,
1402 true,
1403 false,
1404 )
1405 .unwrap();
1406 registry
1407 .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
1408 .unwrap();
1409
1410 registry.cleanup_finished(Duration::ZERO);
1411
1412 assert!(registry.inner.tasks.lock().unwrap().is_empty());
1413 }
1414
1415 #[test]
1425 fn reap_child_marks_failed_when_child_exits_without_exit_marker() {
1426 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
1427 let dir = tempfile::tempdir().unwrap();
1428 let task_id = registry
1429 .spawn(
1430 QUICK_SUCCESS_COMMAND,
1431 "session".to_string(),
1432 dir.path().to_path_buf(),
1433 HashMap::new(),
1434 Some(Duration::from_secs(30)),
1435 dir.path().to_path_buf(),
1436 10,
1437 true,
1438 false,
1439 )
1440 .unwrap();
1441
1442 let task = registry.task_for_session(&task_id, "session").unwrap();
1443
1444 let started = Instant::now();
1449 loop {
1450 let exited = {
1451 let mut state = task.state.lock().unwrap();
1452 if let Some(child) = state.child.as_mut() {
1453 matches!(child.try_wait(), Ok(Some(_)))
1454 } else {
1455 true
1456 }
1457 };
1458 if exited {
1459 break;
1460 }
1461 assert!(
1462 started.elapsed() < Duration::from_secs(5),
1463 "child should exit quickly"
1464 );
1465 std::thread::sleep(Duration::from_millis(20));
1466 }
1467
1468 let _ = std::fs::remove_file(&task.paths.exit);
1471
1472 assert!(
1475 task.is_running(),
1476 "precondition: metadata.status == Running"
1477 );
1478 assert!(
1479 !task.paths.exit.exists(),
1480 "precondition: exit marker absent"
1481 );
1482
1483 registry.reap_child(&task);
1487
1488 let state = task.state.lock().unwrap();
1489 assert!(
1490 state.metadata.status.is_terminal(),
1491 "reap_child must transition to terminal when PID dead and no marker. \
1492 Got status={:?}",
1493 state.metadata.status
1494 );
1495 assert_eq!(
1496 state.metadata.status,
1497 BgTaskStatus::Failed,
1498 "must specifically be Failed (not Killed): status={:?}",
1499 state.metadata.status
1500 );
1501 assert_eq!(
1502 state.metadata.status_reason.as_deref(),
1503 Some("process exited without exit marker"),
1504 "reason must match replay path's wording: {:?}",
1505 state.metadata.status_reason
1506 );
1507 assert!(
1508 state.child.is_none(),
1509 "child handle must be released after reap"
1510 );
1511 assert!(state.detached, "task must be marked detached after reap");
1512 }
1513
1514 #[test]
1520 fn reap_child_preserves_running_when_exit_marker_exists() {
1521 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
1522 let dir = tempfile::tempdir().unwrap();
1523 let task_id = registry
1524 .spawn(
1525 QUICK_SUCCESS_COMMAND,
1526 "session".to_string(),
1527 dir.path().to_path_buf(),
1528 HashMap::new(),
1529 Some(Duration::from_secs(30)),
1530 dir.path().to_path_buf(),
1531 10,
1532 true,
1533 false,
1534 )
1535 .unwrap();
1536
1537 let task = registry.task_for_session(&task_id, "session").unwrap();
1538
1539 let started = Instant::now();
1542 loop {
1543 let exited = {
1544 let mut state = task.state.lock().unwrap();
1545 if let Some(child) = state.child.as_mut() {
1546 matches!(child.try_wait(), Ok(Some(_)))
1547 } else {
1548 true
1549 }
1550 };
1551 if exited && task.paths.exit.exists() {
1552 break;
1553 }
1554 assert!(
1555 started.elapsed() < Duration::from_secs(5),
1556 "child should exit and write marker quickly"
1557 );
1558 std::thread::sleep(Duration::from_millis(20));
1559 }
1560
1561 registry.reap_child(&task);
1565
1566 let state = task.state.lock().unwrap();
1567 assert!(
1568 state.child.is_none(),
1569 "child handle still released even when marker exists"
1570 );
1571 assert!(
1572 state.detached,
1573 "task still marked detached even when marker exists"
1574 );
1575 assert_eq!(
1580 state.metadata.status,
1581 BgTaskStatus::Running,
1582 "reap_child must defer to poll_task when marker exists"
1583 );
1584 }
1585
1586 #[test]
1587 fn cleanup_finished_keeps_running_tasks() {
1588 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
1589 let dir = tempfile::tempdir().unwrap();
1590 let task_id = registry
1591 .spawn(
1592 LONG_RUNNING_COMMAND,
1593 "session".to_string(),
1594 dir.path().to_path_buf(),
1595 HashMap::new(),
1596 Some(Duration::from_secs(30)),
1597 dir.path().to_path_buf(),
1598 10,
1599 true,
1600 false,
1601 )
1602 .unwrap();
1603
1604 registry.cleanup_finished(Duration::ZERO);
1605
1606 assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
1607 let _ = registry.kill(&task_id, "session");
1608 }
1609
1610 #[cfg(windows)]
1611 fn wait_for_file(path: &Path) -> String {
1612 let started = Instant::now();
1613 loop {
1614 if path.exists() {
1615 return fs::read_to_string(path).expect("read file");
1616 }
1617 assert!(
1618 started.elapsed() < Duration::from_secs(30),
1619 "timed out waiting for {}",
1620 path.display()
1621 );
1622 std::thread::sleep(Duration::from_millis(100));
1623 }
1624 }
1625
1626 #[cfg(windows)]
1627 fn spawn_windows_registry_command(
1628 command: &str,
1629 ) -> (BgTaskRegistry, tempfile::TempDir, String) {
1630 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
1631 let dir = tempfile::tempdir().unwrap();
1632 let task_id = registry
1633 .spawn(
1634 command,
1635 "session".to_string(),
1636 dir.path().to_path_buf(),
1637 HashMap::new(),
1638 Some(Duration::from_secs(30)),
1639 dir.path().to_path_buf(),
1640 10,
1641 false,
1642 false,
1643 )
1644 .unwrap();
1645 (registry, dir, task_id)
1646 }
1647
1648 #[cfg(windows)]
1649 #[test]
1650 fn windows_spawn_writes_exit_marker_for_zero_exit() {
1651 let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 0");
1652 let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
1653
1654 let content = wait_for_file(&exit_path);
1655
1656 assert_eq!(content.trim(), "0");
1657 }
1658
1659 #[cfg(windows)]
1660 #[test]
1661 fn windows_spawn_writes_exit_marker_for_nonzero_exit() {
1662 let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 42");
1663 let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
1664
1665 let content = wait_for_file(&exit_path);
1666
1667 assert_eq!(content.trim(), "42");
1668 }
1669
1670 #[cfg(windows)]
1671 #[test]
1672 fn windows_spawn_captures_stdout_to_disk() {
1673 let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c echo hello");
1674 let task = registry.task_for_session(&task_id, "session").unwrap();
1675 let stdout_path = task.paths.stdout.clone();
1676 let exit_path = task.paths.exit.clone();
1677
1678 let _ = wait_for_file(&exit_path);
1679 let stdout = fs::read_to_string(stdout_path).expect("read stdout");
1680
1681 assert!(stdout.contains("hello"), "stdout was {stdout:?}");
1682 }
1683
1684 #[cfg(windows)]
1685 #[test]
1686 fn windows_spawn_uses_pwsh_when_available() {
1687 let candidates = crate::windows_shell::shell_candidates_with(
1691 |binary| match binary {
1692 "pwsh.exe" => Some(std::path::PathBuf::from(r"C:\pwsh\pwsh.exe")),
1693 "powershell.exe" => Some(std::path::PathBuf::from(r"C:\ps\powershell.exe")),
1694 _ => None,
1695 },
1696 || None,
1697 );
1698 let shell = candidates.first().expect("at least one candidate").clone();
1699 assert_eq!(shell, crate::windows_shell::WindowsShell::Pwsh);
1700 assert_eq!(shell.binary().as_ref(), "pwsh.exe");
1701 }
1702
1703 #[cfg(windows)]
1711 #[test]
1712 fn windows_shell_cmd_wrapper_writes_exit_marker_with_move() {
1713 let exit_path = Path::new(r"C:\Temp\bgb-test.exit");
1714 let script =
1715 crate::windows_shell::WindowsShell::Cmd.wrapper_script("cmd /c exit 42", exit_path);
1716
1717 assert!(
1721 script.contains("& echo !ERRORLEVEL! >"),
1722 "wrapper must use delayed expansion: {script}"
1723 );
1724 assert!(
1725 !script.contains("%ERRORLEVEL%"),
1726 "wrapper must NOT use parse-time %ERRORLEVEL% expansion: {script}"
1727 );
1728 assert!(script.contains("& move /Y"));
1729 assert!(
1732 script.contains("> nul"),
1733 "wrapper must redirect move output to nul: {script}"
1734 );
1735 assert!(script.contains(r#""C:\Temp\bgb-test.exit.tmp""#));
1736 assert!(script.contains(r#""C:\Temp\bgb-test.exit""#));
1737 }
1738
1739 #[cfg(windows)]
1744 #[test]
1745 fn windows_shell_cmd_bg_command_enables_delayed_expansion() {
1746 use crate::windows_shell::WindowsShell;
1747 let cmd = WindowsShell::Cmd.bg_command("echo wrapped");
1748 let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
1749 let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
1750 assert_eq!(
1751 args_strs,
1752 vec!["/V:ON", "/D", "/S", "/C", "echo wrapped"],
1753 "Cmd::bg_command must prepend /V:ON /D /S /C"
1754 );
1755 }
1756
1757 #[cfg(windows)]
1761 #[test]
1762 fn windows_shell_pwsh_bg_command_uses_standard_args() {
1763 use crate::windows_shell::WindowsShell;
1764 let cmd = WindowsShell::Pwsh.bg_command("Get-Date");
1765 let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
1766 let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
1767 assert!(
1768 args_strs.contains(&"-Command"),
1769 "Pwsh::bg_command must use -Command: {args_strs:?}"
1770 );
1771 assert!(
1772 args_strs.contains(&"Get-Date"),
1773 "Pwsh::bg_command must include the user command body"
1774 );
1775 }
1776
    // Disabled placeholder: `cfg(any())` with an empty predicate list is
    // never true, so this empty function is excluded from every build.
    #[allow(dead_code)]
    #[cfg(any())]
    fn windows_cmd_wrapper_records_real_exit_code_disabled() {}
1810}