1use std::collections::{HashMap, VecDeque};
2use std::fs;
3use std::path::{Path, PathBuf};
4use std::process::{Child, Command, Stdio};
5use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
6use std::sync::{Arc, Mutex};
7use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
8
9use serde::Serialize;
10
11use crate::context::SharedProgressSender;
12use crate::protocol::{BashCompletedFrame, BashLongRunningFrame, PushFrame};
13
14#[cfg(unix)]
15use std::os::unix::process::CommandExt;
16#[cfg(windows)]
17use std::os::windows::process::CommandExt;
18
19use super::buffer::BgBuffer;
20use super::persistence::{
21 create_capture_file, read_exit_marker, read_task, session_tasks_dir, task_paths, unix_millis,
22 update_task, write_kill_marker_if_absent, write_task, ExitMarker, PersistedTask, TaskPaths,
23};
24use super::process::is_process_alive;
25#[cfg(unix)]
26use super::process::terminate_pgid;
27#[cfg(windows)]
28use super::process::terminate_pid;
29use super::{BgTaskInfo, BgTaskStatus};
/// Timeout applied to a background task when the caller does not supply one (30 minutes).
const DEFAULT_BG_TIMEOUT: Duration = Duration::from_secs(30 * 60);
/// Age beyond which a persisted `Running` task is treated as orphaned during replay (24 hours).
const STALE_RUNNING_AFTER: Duration = Duration::from_secs(24 * 60 * 60);

/// Byte budget passed to `BgBuffer::read_tail` when building a completion's output preview.
const BG_COMPLETION_PREVIEW_BYTES: usize = 300;
47
/// Notification record queued when a background task reaches a terminal status.
#[derive(Debug, Clone, Serialize)]
pub struct BgCompletion {
    /// Identifier of the finished task.
    pub task_id: String,
    /// Session that owns the task; never written to the serialized form.
    #[serde(skip_serializing)]
    pub session_id: String,
    /// Terminal status copied from the task metadata.
    pub status: BgTaskStatus,
    /// Exit code copied from the task metadata, when one was recorded.
    pub exit_code: Option<i32>,
    /// Command line the task was started with.
    pub command: String,
    /// Tail of the captured output; left empty when no buffer was available
    /// at enqueue time, and omitted from serialization when empty.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub output_preview: String,
    /// Truncation flag returned alongside the preview by the buffer;
    /// omitted from serialization when `false`.
    #[serde(default, skip_serializing_if = "is_false")]
    pub output_truncated: bool,
}
72
/// Serde `skip_serializing_if` predicate: returns `true` exactly when the flag is unset.
fn is_false(v: &bool) -> bool {
    let set = *v;
    !set
}
76
/// Point-in-time view of a background task, built from its in-memory metadata
/// and a tail read of its output buffer.
#[derive(Debug, Clone, Serialize)]
pub struct BgTaskSnapshot {
    /// Core task information; its fields are flattened into this struct when serialized.
    #[serde(flatten)]
    pub info: BgTaskInfo,
    /// Exit code recorded in the task metadata, if any.
    pub exit_code: Option<i32>,
    /// Process id recorded in the task metadata, if any.
    pub child_pid: Option<u32>,
    /// Working directory of the task, rendered with `Path::display`.
    pub workdir: String,
    /// Tail of the captured output, limited by the caller's preview byte budget.
    pub output_preview: String,
    /// Truncation flag returned by the buffer alongside the preview.
    pub output_truncated: bool,
    /// Path reported by the buffer's `output_path()`, when it reports one.
    pub output_path: Option<String>,
    /// Path of the stderr capture file (always populated by `snapshot_locked`).
    pub stderr_path: Option<String>,
}
89
/// Cheaply cloneable handle to the shared background-task registry.
///
/// Clones share the same [`RegistryInner`] through an `Arc`.
#[derive(Clone)]
pub struct BgTaskRegistry {
    /// Shared registry state.
    pub(crate) inner: Arc<RegistryInner>,
}
94
/// State shared by every clone of [`BgTaskRegistry`].
pub(crate) struct RegistryInner {
    /// Registered tasks, keyed by task id.
    pub(crate) tasks: Mutex<HashMap<String, Arc<BgTask>>>,
    /// Completions waiting to be drained by a consumer.
    pub(crate) completions: Mutex<VecDeque<BgCompletion>>,
    /// Optional callback slot used to push completion and long-running frames.
    pub(crate) progress_sender: SharedProgressSender,
    /// Set once the watchdog has been started so it is started at most once.
    watchdog_started: AtomicBool,
    /// Set by `detach()`.
    // NOTE(review): presumably read by the watchdog to stop — confirm in the watchdog module.
    pub(crate) shutdown: AtomicBool,
    /// Whether long-running reminder frames are emitted.
    pub(crate) long_running_reminder_enabled: AtomicBool,
    /// Minimum interval between reminders in milliseconds; `0` disables them.
    pub(crate) long_running_reminder_interval_ms: AtomicU64,
}
104
/// In-memory handle for one background task.
pub(crate) struct BgTask {
    /// Unique task identifier (also the registry key).
    pub(crate) task_id: String,
    /// Session that owns the task.
    pub(crate) session_id: String,
    /// On-disk locations (metadata, capture files, exit marker) for the task.
    pub(crate) paths: TaskPaths,
    /// Instant at which this in-memory handle was created (at spawn or at rehydration).
    pub(crate) started: Instant,
    /// Instant of the most recent long-running reminder, if one was sent.
    pub(crate) last_reminder_at: Mutex<Option<Instant>>,
    /// Instant at which the task was first observed terminal; used by `cleanup_finished`.
    pub(crate) terminal_at: Mutex<Option<Instant>>,
    /// Mutable task state guarded by a lock.
    pub(crate) state: Mutex<BgTaskState>,
}
114
/// Lock-protected mutable portion of a [`BgTask`].
pub(crate) struct BgTaskState {
    /// In-memory copy of the persisted task metadata.
    pub(crate) metadata: PersistedTask,
    /// Child process handle while this registry still owns one.
    pub(crate) child: Option<Child>,
    /// `true` once the child handle has been released (or never existed, for rehydrated tasks).
    pub(crate) detached: bool,
    /// Reader over the task's stdout/stderr capture files.
    pub(crate) buffer: BgBuffer,
}
121
122impl BgTaskRegistry {
123 pub fn new(progress_sender: SharedProgressSender) -> Self {
124 Self {
125 inner: Arc::new(RegistryInner {
126 tasks: Mutex::new(HashMap::new()),
127 completions: Mutex::new(VecDeque::new()),
128 progress_sender,
129 watchdog_started: AtomicBool::new(false),
130 shutdown: AtomicBool::new(false),
131 long_running_reminder_enabled: AtomicBool::new(true),
132 long_running_reminder_interval_ms: AtomicU64::new(600_000),
133 }),
134 }
135 }
136
137 pub fn configure_long_running_reminders(&self, enabled: bool, interval_ms: u64) {
138 self.inner
139 .long_running_reminder_enabled
140 .store(enabled, Ordering::SeqCst);
141 self.inner
142 .long_running_reminder_interval_ms
143 .store(interval_ms, Ordering::SeqCst);
144 }
145
    /// Launches `command` as a detached background task and registers it (Unix).
    ///
    /// Persists the task metadata under `storage_dir`, creates the stdout and
    /// stderr capture files, spawns the wrapped shell and finally inserts the
    /// task into the in-memory registry. When `timeout` is `None`,
    /// `DEFAULT_BG_TIMEOUT` is recorded instead.
    ///
    /// Returns the generated task id.
    ///
    /// # Errors
    /// Returns a message when `max_running` tasks are already running, when no
    /// unique task id can be allocated, when the task directory, metadata or
    /// capture files cannot be written, when the child cannot be spawned, or
    /// when the registry lock is poisoned.
    #[cfg(unix)]
    pub fn spawn(
        &self,
        command: &str,
        session_id: String,
        workdir: PathBuf,
        env: HashMap<String, String>,
        timeout: Option<Duration>,
        storage_dir: PathBuf,
        max_running: usize,
        notify_on_completion: bool,
    ) -> Result<String, String> {
        self.start_watchdog();

        // NOTE(review): the limit check and the later insert are not atomic, so
        // concurrent spawns could each pass this check — confirm this is acceptable.
        let running = self.running_count();
        if running >= max_running {
            return Err(format!(
                "background bash task limit exceeded: {running} running (max {max_running})"
            ));
        }

        let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
        let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
        let task_id = self.generate_unique_task_id()?;
        let paths = task_paths(&storage_dir, &session_id, &task_id);
        fs::create_dir_all(&paths.dir)
            .map_err(|e| format!("failed to create background task dir: {e}"))?;

        // Persist a `starting` record before spawning; `replay_session` turns a
        // leftover `Starting` record into a failed task ("spawn aborted").
        let mut metadata = PersistedTask::starting(
            task_id.clone(),
            session_id.clone(),
            command.to_string(),
            workdir.clone(),
            timeout_ms,
            notify_on_completion,
        );
        write_task(&paths.json, &metadata)
            .map_err(|e| format!("failed to persist background task metadata: {e}"))?;

        create_capture_file(&paths.stdout)
            .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
        create_capture_file(&paths.stderr)
            .map_err(|e| format!("failed to create stderr capture file: {e}"))?;

        let child = spawn_detached_child(command, &paths, &workdir, &env)?;

        // The child pid is recorded as both pid and process-group id.
        let child_pid = child.id();
        metadata.mark_running(child_pid, child_pid as i32);
        // NOTE(review): if this write fails the child is already running but is
        // never registered — confirm whether it should be terminated here.
        write_task(&paths.json, &metadata)
            .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;

        let task = Arc::new(BgTask {
            task_id: task_id.clone(),
            session_id,
            paths: paths.clone(),
            started: Instant::now(),
            last_reminder_at: Mutex::new(None),
            terminal_at: Mutex::new(None),
            state: Mutex::new(BgTaskState {
                metadata,
                child: Some(child),
                detached: false,
                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
            }),
        });

        self.inner
            .tasks
            .lock()
            .map_err(|_| "background task registry lock poisoned".to_string())?
            .insert(task_id.clone(), task);

        Ok(task_id)
    }
223
    /// Launches `command` as a detached background task and registers it (Windows).
    ///
    /// Follows the same sequence as the Unix variant — persist metadata, create
    /// the capture files, spawn, persist again, register — except that no
    /// process-group id is recorded. When `timeout` is `None`,
    /// `DEFAULT_BG_TIMEOUT` is recorded instead.
    ///
    /// Returns the generated task id.
    ///
    /// # Errors
    /// Returns a message when `max_running` tasks are already running, when no
    /// unique task id can be allocated, when the task directory, metadata or
    /// capture files cannot be written, when no shell can be spawned, or when
    /// the registry lock is poisoned.
    #[cfg(windows)]
    pub fn spawn(
        &self,
        command: &str,
        session_id: String,
        workdir: PathBuf,
        env: HashMap<String, String>,
        timeout: Option<Duration>,
        storage_dir: PathBuf,
        max_running: usize,
        notify_on_completion: bool,
    ) -> Result<String, String> {
        self.start_watchdog();

        // NOTE(review): the limit check and the later insert are not atomic, so
        // concurrent spawns could each pass this check — confirm this is acceptable.
        let running = self.running_count();
        if running >= max_running {
            return Err(format!(
                "background bash task limit exceeded: {running} running (max {max_running})"
            ));
        }

        let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
        let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
        let task_id = self.generate_unique_task_id()?;
        let paths = task_paths(&storage_dir, &session_id, &task_id);
        fs::create_dir_all(&paths.dir)
            .map_err(|e| format!("failed to create background task dir: {e}"))?;

        // Persist a `starting` record before spawning; `replay_session` turns a
        // leftover `Starting` record into a failed task ("spawn aborted").
        let mut metadata = PersistedTask::starting(
            task_id.clone(),
            session_id.clone(),
            command.to_string(),
            workdir.clone(),
            timeout_ms,
            notify_on_completion,
        );
        write_task(&paths.json, &metadata)
            .map_err(|e| format!("failed to persist background task metadata: {e}"))?;

        create_capture_file(&paths.stdout)
            .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
        create_capture_file(&paths.stderr)
            .map_err(|e| format!("failed to create stderr capture file: {e}"))?;

        let child = spawn_detached_child(command, &paths, &workdir, &env)?;

        // No process group is tracked on Windows; only the pid is stored.
        let child_pid = child.id();
        metadata.status = BgTaskStatus::Running;
        metadata.child_pid = Some(child_pid);
        metadata.pgid = None;
        // NOTE(review): if this write fails the child is already running but is
        // never registered — confirm whether it should be terminated here.
        write_task(&paths.json, &metadata)
            .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;

        let task = Arc::new(BgTask {
            task_id: task_id.clone(),
            session_id,
            paths: paths.clone(),
            started: Instant::now(),
            last_reminder_at: Mutex::new(None),
            terminal_at: Mutex::new(None),
            state: Mutex::new(BgTaskState {
                metadata,
                child: Some(child),
                detached: false,
                buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
            }),
        });

        self.inner
            .tasks
            .lock()
            .map_err(|_| "background task registry lock poisoned".to_string())?
            .insert(task_id.clone(), task);

        Ok(task_id)
    }
305
    /// Rebuilds registry state for `session_id` from the task files persisted
    /// under `storage_dir`.
    ///
    /// Only `*.json` entries that parse and belong to `session_id` are
    /// considered; unreadable directory entries and unparsable files are
    /// skipped silently. Completions queued here never emit a push frame.
    ///
    /// # Errors
    /// Returns a message when the session directory cannot be listed or when
    /// the registry lock is poisoned while inserting a rehydrated task.
    pub fn replay_session(&self, storage_dir: &Path, session_id: &str) -> Result<(), String> {
        self.start_watchdog();
        let dir = session_tasks_dir(storage_dir, session_id);
        if !dir.exists() {
            return Ok(());
        }

        let entries = fs::read_dir(&dir)
            .map_err(|e| format!("failed to read background task dir {}: {e}", dir.display()))?;
        for entry in entries.flatten() {
            let path = entry.path();
            if path.extension().and_then(|extension| extension.to_str()) != Some("json") {
                continue;
            }
            let Ok(mut metadata) = read_task(&path) else {
                continue;
            };
            if metadata.session_id != session_id {
                continue;
            }

            let paths = task_paths(storage_dir, session_id, &metadata.task_id);
            match metadata.status {
                // A record still in `Starting` never reached the running state.
                BgTaskStatus::Starting => {
                    metadata.mark_terminal(
                        BgTaskStatus::Failed,
                        None,
                        Some("spawn aborted".to_string()),
                    );
                    let _ = write_task(&paths.json, &metadata);
                    self.enqueue_completion_if_needed(&metadata, false);
                }
                BgTaskStatus::Running => {
                    if self.running_metadata_is_stale(&metadata) {
                        // Older than `STALE_RUNNING_AFTER`: mark killed without probing the process.
                        metadata.mark_terminal(
                            BgTaskStatus::Killed,
                            None,
                            Some("orphaned (>24h)".to_string()),
                        );
                        if !paths.exit.exists() {
                            let _ = write_kill_marker_if_absent(&paths.exit);
                        }
                        let _ = write_task(&paths.json, &metadata);
                        self.enqueue_completion_if_needed(&metadata, false);
                    } else if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
                        // The wrapper already wrote an exit marker; adopt its result.
                        metadata = terminal_metadata_from_marker(metadata, marker, None);
                        let _ = write_task(&paths.json, &metadata);
                        self.enqueue_completion_if_needed(&metadata, false);
                    } else if metadata.child_pid.is_some_and(|pid| !is_process_alive(pid)) {
                        // Process is gone and left no marker behind.
                        metadata.mark_terminal(
                            BgTaskStatus::Failed,
                            None,
                            Some("process exited without exit marker".to_string()),
                        );
                        let _ = write_task(&paths.json, &metadata);
                        self.enqueue_completion_if_needed(&metadata, false);
                    } else {
                        // Still running (or no pid recorded): track it without a child handle.
                        self.insert_rehydrated_task(metadata, paths, true)?;
                    }
                }
                // Already-terminal records are re-registered; their completion is
                // re-queued only when it has not been delivered yet.
                _ if metadata.status.is_terminal() => {
                    self.insert_rehydrated_task(metadata.clone(), paths, true)?;
                    self.enqueue_completion_if_needed(&metadata, false);
                }
                // Any other non-terminal status is left untouched.
                _ => {}
            }
        }

        Ok(())
    }
376
377 pub fn status(
378 &self,
379 task_id: &str,
380 session_id: &str,
381 preview_bytes: usize,
382 ) -> Option<BgTaskSnapshot> {
383 let task = self.task_for_session(task_id, session_id)?;
384 let _ = self.poll_task(&task);
385 Some(task.snapshot(preview_bytes))
386 }
387
388 pub fn list(&self, preview_bytes: usize) -> Vec<BgTaskSnapshot> {
389 let tasks = self
390 .inner
391 .tasks
392 .lock()
393 .map(|tasks| tasks.values().cloned().collect::<Vec<_>>())
394 .unwrap_or_default();
395 tasks
396 .into_iter()
397 .map(|task| {
398 let _ = self.poll_task(&task);
399 task.snapshot(preview_bytes)
400 })
401 .collect()
402 }
403
404 pub fn kill(&self, task_id: &str, session_id: &str) -> Result<BgTaskSnapshot, String> {
405 self.kill_with_status(task_id, session_id, BgTaskStatus::Killed)
406 }
407
408 pub fn promote(&self, task_id: &str, session_id: &str) -> Result<bool, String> {
409 let task = self
410 .task_for_session(task_id, session_id)
411 .ok_or_else(|| format!("background task not found: {task_id}"))?;
412 let mut state = task
413 .state
414 .lock()
415 .map_err(|_| "background task lock poisoned".to_string())?;
416 let updated = update_task(&task.paths.json, |metadata| {
417 metadata.notify_on_completion = true;
418 metadata.completion_delivered = false;
419 })
420 .map_err(|e| format!("failed to promote background task: {e}"))?;
421 state.metadata = updated;
422 if state.metadata.status.is_terminal() {
423 state.buffer.enforce_terminal_cap();
424 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
425 }
426 Ok(true)
427 }
428
429 pub(crate) fn kill_for_timeout(&self, task_id: &str, session_id: &str) -> Result<(), String> {
430 self.kill_with_status(task_id, session_id, BgTaskStatus::TimedOut)
431 .map(|_| ())
432 }
433
434 pub fn cleanup_finished(&self, older_than: Duration) {
435 let cutoff = Instant::now().checked_sub(older_than);
436 if let Ok(mut tasks) = self.inner.tasks.lock() {
437 tasks.retain(|_, task| {
438 let is_terminal = task
439 .state
440 .lock()
441 .map(|state| state.metadata.status.is_terminal())
442 .unwrap_or(false);
443 if !is_terminal {
444 return true;
445 }
446
447 let terminal_at = task.terminal_at.lock().ok().and_then(|at| *at);
448 match (terminal_at, cutoff) {
449 (Some(terminal_at), Some(cutoff)) => terminal_at > cutoff,
450 (Some(_), None) => false,
451 (None, _) => true,
452 }
453 });
454 }
455 }
456
457 pub fn drain_completions(&self) -> Vec<BgCompletion> {
458 self.drain_completions_for_session(None)
459 }
460
461 pub fn drain_completions_for_session(&self, session_id: Option<&str>) -> Vec<BgCompletion> {
462 let mut completions = match self.inner.completions.lock() {
463 Ok(completions) => completions,
464 Err(_) => return Vec::new(),
465 };
466
467 let drained = if let Some(session_id) = session_id {
468 let mut matched = Vec::new();
469 let mut retained = VecDeque::new();
470 while let Some(completion) = completions.pop_front() {
471 if completion.session_id == session_id {
472 matched.push(completion);
473 } else {
474 retained.push_back(completion);
475 }
476 }
477 *completions = retained;
478 matched
479 } else {
480 completions.drain(..).collect()
481 };
482 drop(completions);
483
484 for completion in &drained {
485 if let Some(task) = self.task_for_session(&completion.task_id, &completion.session_id) {
486 let _ = task.set_completion_delivered(true);
487 }
488 }
489
490 drained
491 }
492
493 pub fn pending_completions_for_session(&self, session_id: &str) -> Vec<BgCompletion> {
494 self.inner
495 .completions
496 .lock()
497 .map(|completions| {
498 completions
499 .iter()
500 .filter(|completion| completion.session_id == session_id)
501 .cloned()
502 .collect()
503 })
504 .unwrap_or_default()
505 }
506
507 pub fn detach(&self) {
508 self.inner.shutdown.store(true, Ordering::SeqCst);
509 if let Ok(mut tasks) = self.inner.tasks.lock() {
510 for task in tasks.values() {
511 if let Ok(mut state) = task.state.lock() {
512 state.child = None;
513 state.detached = true;
514 }
515 }
516 tasks.clear();
517 }
518 }
519
520 pub fn shutdown(&self) {
521 let tasks = self
522 .inner
523 .tasks
524 .lock()
525 .map(|tasks| {
526 tasks
527 .values()
528 .map(|task| (task.task_id.clone(), task.session_id.clone()))
529 .collect::<Vec<_>>()
530 })
531 .unwrap_or_default();
532 for (task_id, session_id) in tasks {
533 let _ = self.kill(&task_id, &session_id);
534 }
535 }
536
537 pub(crate) fn poll_task(&self, task: &Arc<BgTask>) -> Result<(), String> {
538 let marker = match read_exit_marker(&task.paths.exit) {
539 Ok(Some(marker)) => marker,
540 Ok(None) => return Ok(()),
541 Err(error) => return Err(format!("failed to read exit marker: {error}")),
542 };
543 self.finalize_from_marker(task, marker, None)
544 }
545
546 pub(crate) fn reap_child(&self, task: &Arc<BgTask>) {
547 let Ok(mut state) = task.state.lock() else {
548 return;
549 };
550 if let Some(child) = state.child.as_mut() {
551 if matches!(child.try_wait(), Ok(Some(_))) {
552 state.child = None;
569 state.detached = true;
570 if state.metadata.status.is_terminal() {
571 return;
572 }
573 if matches!(read_exit_marker(&task.paths.exit), Ok(Some(_))) {
574 return;
575 }
576 let updated = update_task(&task.paths.json, |metadata| {
577 metadata.mark_terminal(
578 BgTaskStatus::Failed,
579 None,
580 Some("process exited without exit marker".to_string()),
581 );
582 });
583 if let Ok(metadata) = updated {
584 state.metadata = metadata;
585 task.mark_terminal_now();
586 state.buffer.enforce_terminal_cap();
587 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
588 }
589 }
590 }
591 }
592
593 pub(crate) fn running_tasks(&self) -> Vec<Arc<BgTask>> {
594 self.inner
595 .tasks
596 .lock()
597 .map(|tasks| {
598 tasks
599 .values()
600 .filter(|task| task.is_running())
601 .cloned()
602 .collect()
603 })
604 .unwrap_or_default()
605 }
606
607 fn insert_rehydrated_task(
608 &self,
609 metadata: PersistedTask,
610 paths: TaskPaths,
611 detached: bool,
612 ) -> Result<(), String> {
613 let task_id = metadata.task_id.clone();
614 let session_id = metadata.session_id.clone();
615 let task = Arc::new(BgTask {
616 task_id: task_id.clone(),
617 session_id,
618 paths: paths.clone(),
619 started: Instant::now(),
620 last_reminder_at: Mutex::new(None),
621 terminal_at: Mutex::new(metadata.status.is_terminal().then(Instant::now)),
622 state: Mutex::new(BgTaskState {
623 metadata,
624 child: None,
625 detached,
626 buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
627 }),
628 });
629 self.inner
630 .tasks
631 .lock()
632 .map_err(|_| "background task registry lock poisoned".to_string())?
633 .insert(task_id, task);
634 Ok(())
635 }
636
    /// Terminates a task and records `terminal_status` as its final status.
    ///
    /// A task that is already terminal is returned unchanged. Otherwise the
    /// task is persisted as `Killing`, its process (group) is terminated, the
    /// child handle is waited on and released, a kill marker is written if no
    /// exit marker exists, and the terminal metadata is persisted. A
    /// `TimedOut` status records exit code 124; other statuses record none.
    /// The returned snapshot carries up to 5 KiB of output preview.
    ///
    /// # Errors
    /// Fails when the task is unknown for this session, its lock is poisoned,
    /// or any of the metadata / marker writes fail.
    // NOTE(review): an early `?` return after the `Killing` write leaves the
    // in-memory status at `Killing` — confirm that callers can recover from that.
    fn kill_with_status(
        &self,
        task_id: &str,
        session_id: &str,
        terminal_status: BgTaskStatus,
    ) -> Result<BgTaskSnapshot, String> {
        let task = self
            .task_for_session(task_id, session_id)
            .ok_or_else(|| format!("background task not found: {task_id}"))?;

        {
            // The state lock is held for the whole termination sequence.
            let mut state = task
                .state
                .lock()
                .map_err(|_| "background task lock poisoned".to_string())?;
            if state.metadata.status.is_terminal() {
                return Ok(task.snapshot_locked(&state, 5 * 1024));
            }

            // Persist the intermediate state before signalling the process.
            state.metadata.status = BgTaskStatus::Killing;
            write_task(&task.paths.json, &state.metadata)
                .map_err(|e| format!("failed to persist killing state: {e}"))?;

            // Unix: terminate by process group when one was recorded.
            #[cfg(unix)]
            if let Some(pgid) = state.metadata.pgid {
                terminate_pgid(pgid, state.child.as_mut());
            }
            // Windows: prefer the owned child handle, fall back to the recorded pid.
            #[cfg(windows)]
            if let Some(child) = state.child.as_mut() {
                super::process::terminate_process(child);
            } else if let Some(pid) = state.metadata.child_pid {
                terminate_pid(pid);
            }
            // Blocking wait on the child handle; the result is ignored.
            if let Some(child) = state.child.as_mut() {
                let _ = child.wait();
            }
            state.child = None;
            state.detached = true;

            if !task.paths.exit.exists() {
                write_kill_marker_if_absent(&task.paths.exit)
                    .map_err(|e| format!("failed to write kill marker: {e}"))?;
            }

            let exit_code = if terminal_status == BgTaskStatus::TimedOut {
                Some(124)
            } else {
                None
            };
            state
                .metadata
                .mark_terminal(terminal_status, exit_code, None);
            task.mark_terminal_now();
            write_task(&task.paths.json, &state.metadata)
                .map_err(|e| format!("failed to persist killed state: {e}"))?;
            state.buffer.enforce_terminal_cap();
            self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
        }

        // The state lock has been released; `snapshot` re-acquires it.
        Ok(task.snapshot(5 * 1024))
    }
698
699 fn finalize_from_marker(
700 &self,
701 task: &Arc<BgTask>,
702 marker: ExitMarker,
703 reason: Option<String>,
704 ) -> Result<(), String> {
705 let mut state = task
706 .state
707 .lock()
708 .map_err(|_| "background task lock poisoned".to_string())?;
709 if state.metadata.status.is_terminal() {
710 return Ok(());
711 }
712
713 let updated = update_task(&task.paths.json, |metadata| {
714 let new_metadata = terminal_metadata_from_marker(metadata.clone(), marker, reason);
715 *metadata = new_metadata;
716 })
717 .map_err(|e| format!("failed to persist terminal state: {e}"))?;
718 state.metadata = updated;
719 task.mark_terminal_now();
720 state.child = None;
721 state.detached = true;
722 state.buffer.enforce_terminal_cap();
723 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
724 Ok(())
725 }
726
727 fn enqueue_completion_if_needed(&self, metadata: &PersistedTask, emit_frame: bool) {
728 if metadata.status.is_terminal() && !metadata.completion_delivered {
729 self.enqueue_completion_locked(metadata, None, emit_frame);
730 }
731 }
732
733 fn enqueue_completion_locked(
734 &self,
735 metadata: &PersistedTask,
736 buffer: Option<&BgBuffer>,
737 emit_frame: bool,
738 ) {
739 if !metadata.status.is_terminal() || metadata.completion_delivered {
740 return;
741 }
742 let (output_preview, output_truncated) = match buffer {
747 Some(buf) => buf.read_tail(BG_COMPLETION_PREVIEW_BYTES),
748 None => (String::new(), false),
749 };
750 let completion = BgCompletion {
751 task_id: metadata.task_id.clone(),
752 session_id: metadata.session_id.clone(),
753 status: metadata.status.clone(),
754 exit_code: metadata.exit_code,
755 command: metadata.command.clone(),
756 output_preview,
757 output_truncated,
758 };
759 if let Ok(mut completions) = self.inner.completions.lock() {
760 if completions
761 .iter()
762 .any(|completion| completion.task_id == metadata.task_id)
763 {
764 return;
765 }
766 completions.push_back(completion.clone());
767 } else {
768 return;
769 }
770
771 if emit_frame {
772 self.emit_bash_completed(completion);
773 }
774 }
775
776 fn emit_bash_completed(&self, completion: BgCompletion) {
777 let Ok(progress_sender) = self
778 .inner
779 .progress_sender
780 .lock()
781 .map(|sender| sender.clone())
782 else {
783 return;
784 };
785 let Some(sender) = progress_sender.as_ref() else {
786 return;
787 };
788 sender(PushFrame::BashCompleted(BashCompletedFrame::new(
796 completion.task_id,
797 completion.session_id,
798 completion.status,
799 completion.exit_code,
800 completion.command,
801 completion.output_preview,
802 completion.output_truncated,
803 )));
804 }
805
806 pub(crate) fn maybe_emit_long_running_reminder(&self, task: &Arc<BgTask>) {
807 if !self
808 .inner
809 .long_running_reminder_enabled
810 .load(Ordering::SeqCst)
811 {
812 return;
813 }
814 let interval_ms = self
815 .inner
816 .long_running_reminder_interval_ms
817 .load(Ordering::SeqCst);
818 if interval_ms == 0 {
819 return;
820 }
821 let interval = Duration::from_millis(interval_ms);
822 let now = Instant::now();
823 let Ok(mut last_reminder_at) = task.last_reminder_at.lock() else {
824 return;
825 };
826 let since = last_reminder_at.unwrap_or(task.started);
827 if now.duration_since(since) < interval {
828 return;
829 }
830 let command = task
831 .state
832 .lock()
833 .map(|state| state.metadata.command.clone())
834 .unwrap_or_default();
835 *last_reminder_at = Some(now);
836 self.emit_bash_long_running(BashLongRunningFrame::new(
837 task.task_id.clone(),
838 task.session_id.clone(),
839 command,
840 task.started.elapsed().as_millis() as u64,
841 ));
842 }
843
844 fn emit_bash_long_running(&self, frame: BashLongRunningFrame) {
845 let Ok(progress_sender) = self
846 .inner
847 .progress_sender
848 .lock()
849 .map(|sender| sender.clone())
850 else {
851 return;
852 };
853 if let Some(sender) = progress_sender.as_ref() {
854 sender(PushFrame::BashLongRunning(frame));
855 }
856 }
857
858 fn task(&self, task_id: &str) -> Option<Arc<BgTask>> {
859 self.inner
860 .tasks
861 .lock()
862 .ok()
863 .and_then(|tasks| tasks.get(task_id).cloned())
864 }
865
866 fn task_for_session(&self, task_id: &str, session_id: &str) -> Option<Arc<BgTask>> {
867 self.task(task_id)
868 .filter(|task| task.session_id == session_id)
869 }
870
871 fn running_count(&self) -> usize {
872 self.inner
873 .tasks
874 .lock()
875 .map(|tasks| tasks.values().filter(|task| task.is_running()).count())
876 .unwrap_or(0)
877 }
878
879 fn start_watchdog(&self) {
880 if !self.inner.watchdog_started.swap(true, Ordering::SeqCst) {
881 super::watchdog::start(self.clone());
882 }
883 }
884
885 fn running_metadata_is_stale(&self, metadata: &PersistedTask) -> bool {
886 unix_millis().saturating_sub(metadata.started_at) > STALE_RUNNING_AFTER.as_millis() as u64
887 }
888
889 #[cfg(test)]
890 pub fn task_json_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
891 self.task_for_session(task_id, session_id)
892 .map(|task| task.paths.json.clone())
893 }
894
895 #[cfg(test)]
896 pub fn task_exit_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
897 self.task_for_session(task_id, session_id)
898 .map(|task| task.paths.exit.clone())
899 }
900
901 fn generate_unique_task_id(&self) -> Result<String, String> {
903 for _ in 0..32 {
904 let candidate = random_slug();
905 let tasks = self
906 .inner
907 .tasks
908 .lock()
909 .map_err(|_| "background task registry lock poisoned".to_string())?;
910 if tasks.contains_key(&candidate) {
911 continue;
912 }
913 let completions = self
914 .inner
915 .completions
916 .lock()
917 .map_err(|_| "background completions lock poisoned".to_string())?;
918 if completions
919 .iter()
920 .any(|completion| completion.task_id == candidate)
921 {
922 continue;
923 }
924 return Ok(candidate);
925 }
926 Err("failed to allocate unique background task id after 32 attempts".to_string())
927 }
928}
929
930impl Default for BgTaskRegistry {
931 fn default() -> Self {
932 Self::new(Arc::new(Mutex::new(None)))
933 }
934}
935
936impl BgTask {
937 fn snapshot(&self, preview_bytes: usize) -> BgTaskSnapshot {
938 let state = self
939 .state
940 .lock()
941 .unwrap_or_else(|poison| poison.into_inner());
942 self.snapshot_locked(&state, preview_bytes)
943 }
944
945 fn snapshot_locked(&self, state: &BgTaskState, preview_bytes: usize) -> BgTaskSnapshot {
946 let metadata = &state.metadata;
947 let duration_ms = metadata.duration_ms.or_else(|| {
948 metadata
949 .status
950 .is_terminal()
951 .then(|| self.started.elapsed().as_millis() as u64)
952 });
953 let (output_preview, output_truncated) = state.buffer.read_tail(preview_bytes);
954 BgTaskSnapshot {
955 info: BgTaskInfo {
956 task_id: self.task_id.clone(),
957 status: metadata.status.clone(),
958 command: metadata.command.clone(),
959 started_at: metadata.started_at,
960 duration_ms,
961 },
962 exit_code: metadata.exit_code,
963 child_pid: metadata.child_pid,
964 workdir: metadata.workdir.display().to_string(),
965 output_preview,
966 output_truncated,
967 output_path: state
968 .buffer
969 .output_path()
970 .map(|path| path.display().to_string()),
971 stderr_path: Some(state.buffer.stderr_path().display().to_string()),
972 }
973 }
974
975 pub(crate) fn is_running(&self) -> bool {
976 self.state
977 .lock()
978 .map(|state| state.metadata.status == BgTaskStatus::Running)
979 .unwrap_or(false)
980 }
981
982 fn mark_terminal_now(&self) {
983 if let Ok(mut terminal_at) = self.terminal_at.lock() {
984 if terminal_at.is_none() {
985 *terminal_at = Some(Instant::now());
986 }
987 }
988 }
989
990 fn set_completion_delivered(&self, delivered: bool) -> Result<(), String> {
991 let mut state = self
992 .state
993 .lock()
994 .map_err(|_| "background task lock poisoned".to_string())?;
995 let updated = update_task(&self.paths.json, |metadata| {
996 metadata.completion_delivered = delivered;
997 })
998 .map_err(|e| format!("failed to update completion delivery: {e}"))?;
999 state.metadata = updated;
1000 Ok(())
1001 }
1002}
1003
1004fn terminal_metadata_from_marker(
1005 mut metadata: PersistedTask,
1006 marker: ExitMarker,
1007 reason: Option<String>,
1008) -> PersistedTask {
1009 match marker {
1010 ExitMarker::Code(code) => {
1011 let status = if code == 0 {
1012 BgTaskStatus::Completed
1013 } else {
1014 BgTaskStatus::Failed
1015 };
1016 metadata.mark_terminal(status, Some(code), reason);
1017 }
1018 ExitMarker::Killed => metadata.mark_terminal(BgTaskStatus::Killed, None, reason),
1019 }
1020 metadata
1021}
1022
1023#[cfg(unix)]
1024fn detached_shell_command(command: &str, exit_path: &Path) -> Command {
1025 let mut cmd = Command::new("/bin/sh");
1026 cmd.arg("-c")
1027 .arg("\"$0\" -c \"$1\"; code=$?; printf \"%s\" \"$code\" > \"$2.tmp.$$\"; mv -f \"$2.tmp.$$\" \"$2\"")
1028 .arg("/bin/sh")
1029 .arg(command)
1030 .arg(exit_path);
1031 unsafe {
1032 cmd.pre_exec(|| {
1033 if libc::setsid() == -1 {
1034 return Err(std::io::Error::last_os_error());
1035 }
1036 Ok(())
1037 });
1038 }
1039 cmd
1040}
1041
/// Writes a wrapper script for `shell` into the task directory and builds the
/// command that executes it with the given process creation flags.
///
/// The script is named after the metadata file's stem (falling back to
/// `wrapper`) with a shell-specific extension.
///
/// # Errors
/// Fails when the wrapper script cannot be written.
#[cfg(windows)]
fn detached_shell_command_for(
    shell: crate::windows_shell::WindowsShell,
    command: &str,
    exit_path: &Path,
    paths: &TaskPaths,
    creation_flags: u32,
) -> Result<Command, String> {
    use crate::windows_shell::WindowsShell;

    let script = shell.wrapper_script(command, exit_path);
    let extension = match &shell {
        WindowsShell::Pwsh | WindowsShell::Powershell => "ps1",
        WindowsShell::Cmd => "bat",
        WindowsShell::Posix(_) => "sh",
    };
    let stem = paths
        .json
        .file_stem()
        .and_then(|s| s.to_str())
        .unwrap_or("wrapper");
    let wrapper_path = paths.dir.join(format!("{stem}.{extension}"));
    fs::write(&wrapper_path, script)
        .map_err(|e| format!("failed to write background bash wrapper script: {e}"))?;

    // Arguments that precede the script path for each shell family.
    let leading_args: &[&str] = match &shell {
        WindowsShell::Pwsh | WindowsShell::Powershell => &[
            "-NoLogo",
            "-NoProfile",
            "-NonInteractive",
            "-ExecutionPolicy",
            "Bypass",
            "-File",
        ],
        WindowsShell::Cmd => &["/V:ON", "/D", "/C"],
        WindowsShell::Posix(_) => &[],
    };

    let mut cmd = Command::new(shell.binary().as_ref());
    cmd.args(leading_args);
    cmd.arg(&wrapper_path);
    cmd.creation_flags(creation_flags);
    Ok(cmd)
}
1124
/// Spawns the wrapped shell for a background task with stdin closed and
/// stdout/stderr redirected to the task's capture files.
///
/// On non-Windows targets a single `/bin/sh` wrapper is spawned. On Windows
/// each shell candidate is tried in turn (with `cmd` moved to the front),
/// first with and then without `CREATE_BREAKAWAY_FROM_JOB`.
///
/// # Errors
/// Fails when a capture file cannot be opened, when the wrapper script cannot
/// be written (Windows), or when no spawn attempt succeeds.
fn spawn_detached_child(
    command: &str,
    paths: &TaskPaths,
    workdir: &Path,
    env: &HashMap<String, String>,
) -> Result<std::process::Child, String> {
    #[cfg(not(windows))]
    {
        let stdout = create_capture_file(&paths.stdout)
            .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
        let stderr = create_capture_file(&paths.stderr)
            .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
        detached_shell_command(command, &paths.exit)
            .current_dir(workdir)
            .envs(env)
            .stdin(Stdio::null())
            .stdout(Stdio::from(stdout))
            .stderr(Stdio::from(stderr))
            .spawn()
            .map_err(|e| format!("failed to spawn background bash command: {e}"))
    }
    #[cfg(windows)]
    {
        use crate::windows_shell::{shell_candidates, WindowsShell};
        let raw_candidates = shell_candidates();
        // Reorder the probed candidates so that `cmd` is attempted first.
        let mut candidates: Vec<WindowsShell> = Vec::with_capacity(raw_candidates.len());
        for shell in &raw_candidates {
            if matches!(shell, WindowsShell::Cmd) {
                candidates.insert(0, shell.clone());
            } else {
                candidates.push(shell.clone());
            }
        }
        // Win32 process creation flag values.
        const FLAG_DETACHED_PROCESS: u32 = 0x0000_0008;
        const FLAG_CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
        const FLAG_CREATE_BREAKAWAY_FROM_JOB: u32 = 0x0100_0000;
        let with_breakaway =
            FLAG_DETACHED_PROCESS | FLAG_CREATE_NEW_PROCESS_GROUP | FLAG_CREATE_BREAKAWAY_FROM_JOB;
        let without_breakaway = FLAG_DETACHED_PROCESS | FLAG_CREATE_NEW_PROCESS_GROUP;
        let mut last_error: Option<String> = None;
        for (idx, shell) in candidates.iter().enumerate() {
            for &flags in &[with_breakaway, without_breakaway] {
                // Capture files are reopened for every attempt because each
                // `Stdio::from` consumes its file handle.
                let stdout = create_capture_file(&paths.stdout)
                    .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
                let stderr = create_capture_file(&paths.stderr)
                    .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
                let mut cmd =
                    detached_shell_command_for(shell.clone(), command, &paths.exit, paths, flags)?;
                cmd.current_dir(workdir)
                    .envs(env)
                    .stdin(Stdio::null())
                    .stdout(Stdio::from(stdout))
                    .stderr(Stdio::from(stderr));
                match cmd.spawn() {
                    Ok(child) => {
                        if idx > 0 {
                            log::warn!(
                                "[aft] background bash spawn fell back to {} after {} earlier candidate(s) failed; \
                                 the cached PATH probe disagreed with runtime spawn — likely PATH \
                                 inheritance, antivirus / AppLocker / Defender ASR, or sandbox policy.",
                                shell.binary(),
                                idx
                            );
                        }
                        if flags == without_breakaway {
                            log::warn!(
                                "[aft] background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected \
                                 (likely a restrictive Job Object — CI sandbox or MDM policy). \
                                 Spawned without breakaway; the bg task will be torn down if the \
                                 AFT process group is killed."
                            );
                        }
                        return Ok(child);
                    }
                    // Shell binary missing at runtime: skip the remaining flag
                    // set and move on to the next candidate.
                    Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                        log::warn!(
                            "[aft] background bash spawn: {} returned NotFound at runtime — trying next candidate",
                            shell.binary()
                        );
                        last_error = Some(format!("{}: {e}", shell.binary()));
                        break;
                    }
                    // OS error 5 (Access Denied) on the breakaway attempt:
                    // retry the same shell without the breakaway flag.
                    Err(e) if flags == with_breakaway && e.raw_os_error() == Some(5) => {
                        log::warn!(
                            "[aft] background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected with \
                             Access Denied — retrying {} without breakaway",
                            shell.binary()
                        );
                        last_error = Some(format!("{}: {e}", shell.binary()));
                        continue;
                    }
                    // Any other failure aborts the whole spawn immediately.
                    Err(e) => {
                        return Err(format!(
                            "failed to spawn background bash command via {}: {e}",
                            shell.binary()
                        ));
                    }
                }
            }
        }
        Err(format!(
            "failed to spawn background bash command: no Windows shell could be spawned. \
             Last error: {}. PATH-probed candidates: {:?}",
            last_error.unwrap_or_else(|| "no candidates were attempted".to_string()),
            candidates.iter().map(|s| s.binary()).collect::<Vec<_>>()
        ))
    }
}
1280
1281fn random_slug() -> String {
1282 let mut bytes = [0u8; 4];
1283 getrandom::fill(&mut bytes).unwrap_or_else(|_| {
1285 let t = SystemTime::now()
1287 .duration_since(UNIX_EPOCH)
1288 .map(|d| d.subsec_nanos())
1289 .unwrap_or(0);
1290 let p = std::process::id();
1291 bytes.copy_from_slice(&(t ^ p).to_le_bytes());
1292 });
1293 let hex: String = bytes.iter().map(|b| format!("{b:02x}")).collect();
1295 format!("bgb-{hex}")
1296}
1297
1298#[cfg(test)]
1299mod tests {
1300 use std::collections::HashMap;
1301 #[cfg(windows)]
1302 use std::fs;
1303 use std::sync::{Arc, Mutex};
1304 use std::time::Duration;
1305 #[cfg(windows)]
1306 use std::time::Instant;
1307
1308 use super::*;
1309
// Platform-specific shell snippets used as the `command` argument of
// `spawn` in the tests of this module, grouped by target platform.

/// Unix: `true` terminates immediately with exit status 0.
#[cfg(unix)]
const QUICK_SUCCESS_COMMAND: &str = "true";
/// Unix: `sleep 5` stays alive for five seconds.
#[cfg(unix)]
const LONG_RUNNING_COMMAND: &str = "sleep 5";

/// Windows: a nested `cmd` that exits immediately with status 0.
#[cfg(windows)]
const QUICK_SUCCESS_COMMAND: &str = "cmd /c exit 0";
/// Windows: `timeout` waits five seconds (`/nobreak` ignores key presses);
/// its countdown output is discarded via `> nul`.
#[cfg(windows)]
const LONG_RUNNING_COMMAND: &str = "cmd /c timeout /t 5 /nobreak > nul";
1319
/// Spawns a quick command, forces it into `Killed` via `kill_with_status`,
/// then runs `cleanup_finished` with a zero threshold and expects the
/// registry's task map to be empty afterwards.
#[test]
fn cleanup_finished_removes_terminal_tasks_older_than_threshold() {
    let reg = BgTaskRegistry::default();
    let scratch = tempfile::tempdir().unwrap();
    let root = scratch.path();

    let id = reg
        .spawn(
            QUICK_SUCCESS_COMMAND,
            "session".to_string(),
            root.to_path_buf(),
            HashMap::new(),
            Some(Duration::from_secs(30)),
            root.to_path_buf(),
            10,
            true,
        )
        .unwrap();

    // Put the task into a terminal state before asking for cleanup.
    reg.kill_with_status(&id, "session", BgTaskStatus::Killed)
        .unwrap();
    reg.cleanup_finished(Duration::ZERO);

    assert!(reg.inner.tasks.lock().unwrap().is_empty());
}
1344
/// Verifies that `reap_child` moves a task to `Failed` (with the reason
/// "process exited without exit marker") when the child process has exited
/// and no exit-marker file is present, and that it releases the child handle
/// and marks the task detached.
#[test]
fn reap_child_marks_failed_when_child_exits_without_exit_marker() {
    let reg = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
    let scratch = tempfile::tempdir().unwrap();
    let root = scratch.path();
    let id = reg
        .spawn(
            QUICK_SUCCESS_COMMAND,
            "session".to_string(),
            root.to_path_buf(),
            HashMap::new(),
            Some(Duration::from_secs(30)),
            root.to_path_buf(),
            10,
            true,
        )
        .unwrap();
    let task = reg.task_for_session(&id, "session").unwrap();

    // True once the child handle reports an exit status, or when there is no
    // child handle at all. The state lock is held only for the duration of
    // one probe.
    let child_exited = || {
        let mut guard = task.state.lock().unwrap();
        guard
            .child
            .as_mut()
            .map_or(true, |child| matches!(child.try_wait(), Ok(Some(_))))
    };

    let began = Instant::now();
    while !child_exited() {
        assert!(
            began.elapsed() < Duration::from_secs(5),
            "child should exit quickly"
        );
        std::thread::sleep(Duration::from_millis(20));
    }

    // Best effort: the marker may or may not have been written; either way it
    // must be gone before `reap_child` runs.
    let _ = std::fs::remove_file(&task.paths.exit);

    assert!(
        task.is_running(),
        "precondition: metadata.status == Running"
    );
    assert!(
        !task.paths.exit.exists(),
        "precondition: exit marker absent"
    );

    reg.reap_child(&task);

    let after = task.state.lock().unwrap();
    let status = &after.metadata.status;
    assert!(
        status.is_terminal(),
        "reap_child must transition to terminal when PID dead and no marker. \
         Got status={:?}",
        status
    );
    assert_eq!(
        *status,
        BgTaskStatus::Failed,
        "must specifically be Failed (not Killed): status={:?}",
        status
    );
    assert_eq!(
        after.metadata.status_reason.as_deref(),
        Some("process exited without exit marker"),
        "reason must match replay path's wording: {:?}",
        after.metadata.status_reason
    );
    assert!(
        after.child.is_none(),
        "child handle must be released after reap"
    );
    assert!(after.detached, "task must be marked detached after reap");
}
1442
/// Verifies that `reap_child` leaves the status at `Running` when the exit
/// marker file exists, while still releasing the child handle and marking
/// the task detached.
#[test]
fn reap_child_preserves_running_when_exit_marker_exists() {
    let reg = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
    let scratch = tempfile::tempdir().unwrap();
    let root = scratch.path();
    let id = reg
        .spawn(
            QUICK_SUCCESS_COMMAND,
            "session".to_string(),
            root.to_path_buf(),
            HashMap::new(),
            Some(Duration::from_secs(30)),
            root.to_path_buf(),
            10,
            true,
        )
        .unwrap();
    let task = reg.task_for_session(&id, "session").unwrap();

    // True once the child has exited (or has no handle) AND the exit marker is
    // on disk. The state lock is dropped before the filesystem is consulted,
    // and the marker is only checked after the child is known to have exited.
    let exited_with_marker = || {
        let child_gone = {
            let mut guard = task.state.lock().unwrap();
            guard
                .child
                .as_mut()
                .map_or(true, |child| matches!(child.try_wait(), Ok(Some(_))))
        };
        child_gone && task.paths.exit.exists()
    };

    let began = Instant::now();
    while !exited_with_marker() {
        assert!(
            began.elapsed() < Duration::from_secs(5),
            "child should exit and write marker quickly"
        );
        std::thread::sleep(Duration::from_millis(20));
    }

    reg.reap_child(&task);

    let after = task.state.lock().unwrap();
    assert!(
        after.child.is_none(),
        "child handle still released even when marker exists"
    );
    assert!(
        after.detached,
        "task still marked detached even when marker exists"
    );
    assert_eq!(
        after.metadata.status,
        BgTaskStatus::Running,
        "reap_child must defer to poll_task when marker exists"
    );
}
1513
/// Spawns a long-running command and checks that `cleanup_finished` with a
/// zero threshold leaves its entry in the registry's task map. The task is
/// killed at the end (result ignored) so it does not outlive the test.
#[test]
fn cleanup_finished_keeps_running_tasks() {
    let reg = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
    let scratch = tempfile::tempdir().unwrap();
    let root = scratch.path();

    let id = reg
        .spawn(
            LONG_RUNNING_COMMAND,
            "session".to_string(),
            root.to_path_buf(),
            HashMap::new(),
            Some(Duration::from_secs(30)),
            root.to_path_buf(),
            10,
            true,
        )
        .unwrap();

    reg.cleanup_finished(Duration::ZERO);

    assert!(reg.inner.tasks.lock().unwrap().contains_key(&id));
    let _ = reg.kill(&id, "session");
}
1536
/// Windows-only test helper: polls every 100 ms until `path` exists, then
/// returns its contents as a `String`.
///
/// Panics with "timed out waiting for ..." once 30 seconds have elapsed
/// without the file appearing, and with "read file" if the file exists but
/// cannot be read as UTF-8 text.
#[cfg(windows)]
fn wait_for_file(path: &Path) -> String {
    let began = Instant::now();
    while !path.exists() {
        assert!(
            began.elapsed() < Duration::from_secs(30),
            "timed out waiting for {}",
            path.display()
        );
        std::thread::sleep(Duration::from_millis(100));
    }
    fs::read_to_string(path).expect("read file")
}
1552
/// Windows-only test helper: creates a fresh registry and temporary
/// directory, spawns `command` in session `"session"`, and returns the
/// registry, the temp-dir guard, and the new task id.
///
/// The `TempDir` is returned so callers can keep it bound for the duration of
/// the test; the spawn uses that directory for both path arguments.
///
/// Panics if the temporary directory cannot be created or the spawn fails.
#[cfg(windows)]
fn spawn_windows_registry_command(
    command: &str,
) -> (BgTaskRegistry, tempfile::TempDir, String) {
    let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
    let dir = tempfile::tempdir().unwrap();
    let task_id = registry
        .spawn(
            command,
            "session".to_string(),
            dir.path().to_path_buf(),
            HashMap::new(),
            Some(Duration::from_secs(30)),
            dir.path().to_path_buf(),
            10,
            // Eighth argument, previously missing here: every other `spawn`
            // call in this module passes it, so the 7-argument form could not
            // compile on Windows.
            // NOTE(review): `true` mirrors those call sites — confirm against
            // the `spawn` signature.
            true,
        )
        .unwrap();
    (registry, dir, task_id)
}
1572
/// A command exiting with status 0 must leave an exit marker whose trimmed
/// content is `0`.
#[cfg(windows)]
#[test]
fn windows_spawn_writes_exit_marker_for_zero_exit() {
    // `_scratch` keeps the temp-dir guard bound until the test finishes.
    let (reg, _scratch, id) = spawn_windows_registry_command("cmd /c exit 0");

    let marker = wait_for_file(&reg.task_exit_path(&id, "session").unwrap());

    assert_eq!(marker.trim(), "0");
}
1583
/// A command exiting with status 42 must leave an exit marker whose trimmed
/// content is `42`.
#[cfg(windows)]
#[test]
fn windows_spawn_writes_exit_marker_for_nonzero_exit() {
    // `_scratch` keeps the temp-dir guard bound until the test finishes.
    let (reg, _scratch, id) = spawn_windows_registry_command("cmd /c exit 42");

    let marker = wait_for_file(&reg.task_exit_path(&id, "session").unwrap());

    assert_eq!(marker.trim(), "42");
}
1594
/// Output echoed by the spawned command must end up in the task's stdout
/// capture file.
#[cfg(windows)]
#[test]
fn windows_spawn_captures_stdout_to_disk() {
    let (reg, _scratch, id) = spawn_windows_registry_command("cmd /c echo hello");
    let task = reg.task_for_session(&id, "session").unwrap();

    // Wait for the exit marker before reading the capture; only its
    // appearance matters, so the marker's content is discarded.
    let _ = wait_for_file(&task.paths.exit);
    let stdout = fs::read_to_string(&task.paths.stdout).expect("read stdout");

    assert!(stdout.contains("hello"), "stdout was {stdout:?}");
}
1608
/// With a lookup that resolves both `pwsh.exe` and `powershell.exe` (and
/// nothing else), the first shell candidate must be `Pwsh` with binary name
/// `pwsh.exe`.
#[cfg(windows)]
#[test]
fn windows_spawn_uses_pwsh_when_available() {
    use crate::windows_shell::{shell_candidates_with, WindowsShell};

    let candidates = shell_candidates_with(
        |binary| {
            // Fake install locations for the two PowerShell flavours.
            let location = match binary {
                "pwsh.exe" => r"C:\pwsh\pwsh.exe",
                "powershell.exe" => r"C:\ps\powershell.exe",
                _ => return None,
            };
            Some(PathBuf::from(location))
        },
        || None,
    );

    let shell = candidates
        .first()
        .cloned()
        .expect("at least one candidate");
    assert_eq!(shell, WindowsShell::Pwsh);
    assert_eq!(shell.binary().as_ref(), "pwsh.exe");
}
1627
/// Inspects the text of the `cmd` wrapper script: it must echo
/// `!ERRORLEVEL!` (delayed expansion) rather than `%ERRORLEVEL%`, use
/// `move /Y` with output sent to `nul`, and mention both the quoted `.tmp`
/// path and the quoted final exit-marker path.
#[cfg(windows)]
#[test]
fn windows_shell_cmd_wrapper_writes_exit_marker_with_move() {
    use crate::windows_shell::WindowsShell;

    let marker = Path::new(r"C:\Temp\bgb-test.exit");
    let script = WindowsShell::Cmd.wrapper_script("cmd /c exit 42", marker);

    let echoes_delayed = script.contains("& echo !ERRORLEVEL! >");
    assert!(
        echoes_delayed,
        "wrapper must use delayed expansion: {script}"
    );

    let uses_parse_time = script.contains("%ERRORLEVEL%");
    assert!(
        !uses_parse_time,
        "wrapper must NOT use parse-time %ERRORLEVEL% expansion: {script}"
    );

    assert!(script.contains("& move /Y"));

    let silences_move = script.contains("> nul");
    assert!(
        silences_move,
        "wrapper must redirect move output to nul: {script}"
    );

    assert!(script.contains(r#""C:\Temp\bgb-test.exit.tmp""#));
    assert!(script.contains(r#""C:\Temp\bgb-test.exit""#));
}
1663
/// The argument list built by `WindowsShell::Cmd.bg_command` must be exactly
/// `/V:ON /D /S /C` followed by the command body.
#[cfg(windows)]
#[test]
fn windows_shell_cmd_bg_command_enables_delayed_expansion() {
    use crate::windows_shell::WindowsShell;

    let cmd = WindowsShell::Cmd.bg_command("echo wrapped");
    // Arguments that are not valid UTF-8 are dropped, as before.
    let rendered: Vec<&str> = cmd.get_args().filter_map(|arg| arg.to_str()).collect();

    assert_eq!(
        rendered,
        ["/V:ON", "/D", "/S", "/C", "echo wrapped"],
        "Cmd::bg_command must prepend /V:ON /D /S /C"
    );
}
1681
/// The argument list built by `WindowsShell::Pwsh.bg_command` must contain
/// `-Command` and the user command body.
#[cfg(windows)]
#[test]
fn windows_shell_pwsh_bg_command_uses_standard_args() {
    use crate::windows_shell::WindowsShell;

    let cmd = WindowsShell::Pwsh.bg_command("Get-Date");
    // Arguments that are not valid UTF-8 are dropped, as before.
    let args_strs: Vec<&str> = cmd.get_args().filter_map(|arg| arg.to_str()).collect();

    let has_command_flag = args_strs.iter().any(|arg| *arg == "-Command");
    assert!(
        has_command_flag,
        "Pwsh::bg_command must use -Command: {args_strs:?}"
    );

    let has_body = args_strs.iter().any(|arg| *arg == "Get-Date");
    assert!(
        has_body,
        "Pwsh::bg_command must include the user command body"
    );
}
1701
// Never-compiled placeholder: `cfg(any())` has an empty predicate list and
// therefore evaluates to false, so this item is stripped from every build.
// NOTE(review): the name suggests it stands in for a disabled Windows test of
// the cmd wrapper's exit code — confirm whether it can be deleted or should be
// re-enabled.
// `allow(dead_code)` is here because the empty function has no callers in view.
#[allow(dead_code)]
#[cfg(any())] fn windows_cmd_wrapper_records_real_exit_code_disabled() {}
1735}