1use std::collections::{HashMap, VecDeque};
2use std::fs;
3use std::path::{Path, PathBuf};
4use std::process::{Child, Command, Stdio};
5use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
6#[cfg(unix)]
7use std::sync::OnceLock;
8use std::sync::{Arc, Mutex};
9use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
10
11use serde::Serialize;
12
13use crate::context::SharedProgressSender;
14use crate::protocol::{BashCompletedFrame, BashLongRunningFrame, PushFrame};
15
16#[cfg(unix)]
17use std::os::unix::process::CommandExt;
18#[cfg(windows)]
19use std::os::windows::process::CommandExt;
20
21use super::buffer::BgBuffer;
22use super::persistence::{
23 create_capture_file, delete_task_bundle, read_exit_marker, read_task, session_tasks_dir,
24 task_paths, unix_millis, update_task, write_kill_marker_if_absent, write_task, ExitMarker,
25 PersistedTask, TaskPaths,
26};
27use super::process::is_process_alive;
28#[cfg(unix)]
29use super::process::terminate_pgid;
30#[cfg(windows)]
31use super::process::terminate_pid;
32use super::{BgTaskInfo, BgTaskStatus};
/// Timeout (30 minutes) applied by `BgTaskRegistry::spawn` when the caller
/// passes no explicit timeout.
const DEFAULT_BG_TIMEOUT: Duration = Duration::from_secs(30 * 60);
/// 24 hours. NOTE(review): presumably the age after which a persisted running
/// task is considered orphaned during replay; its consumer is not visible in
/// this section, so confirm before relying on it.
const STALE_RUNNING_AFTER: Duration = Duration::from_secs(24 * 60 * 60);
/// Window (24 hours) passed to `modified_within` by
/// `BgTaskRegistry::maybe_gc_persisted`; metadata files inside it are skipped.
const PERSISTED_GC_GRACE: Duration = Duration::from_secs(24 * 60 * 60);
/// 30 days. NOTE(review): presumably the retention period for quarantined
/// corrupt metadata; its consumer is not visible in this section, so confirm.
const QUARANTINE_GC_GRACE: Duration = Duration::from_secs(30 * 24 * 60 * 60);

/// Size argument handed to the tail readers when a completion's output
/// preview is built.
const BG_COMPLETION_PREVIEW_BYTES: usize = 300;
52
/// Completion record queued by the registry once a task has reached a
/// terminal status and its completion has not been delivered yet.
#[derive(Debug, Clone, Serialize)]
pub struct BgCompletion {
    /// Identifier of the task this completion belongs to.
    pub task_id: String,
    /// Session that owns the task. Never serialized; the registry uses it to
    /// filter the completion queue per session.
    #[serde(skip_serializing)]
    pub session_id: String,
    /// Terminal status copied from the task metadata.
    pub status: BgTaskStatus,
    /// Exit code copied from the task metadata, when one is recorded.
    pub exit_code: Option<i32>,
    /// Command line copied from the task metadata.
    pub command: String,
    /// Tail of the task output; omitted from serialized output when empty.
    #[serde(default, skip_serializing_if = "String::is_empty")]
    pub output_preview: String,
    /// Truncation flag reported by the tail reader; omitted from serialized
    /// output when `false`.
    #[serde(default, skip_serializing_if = "is_false")]
    pub output_truncated: bool,
}
77
/// Predicate for serde's `skip_serializing_if`: reports whether the flag is
/// unset. Takes `&bool` because serde passes the field by reference.
fn is_false(v: &bool) -> bool {
    matches!(*v, false)
}
81
/// Serializable view of a background task as returned by the registry's
/// `status`, `list` and `kill` methods.
#[derive(Debug, Clone, Serialize)]
pub struct BgTaskSnapshot {
    /// Core task information; its fields are flattened into this object when
    /// serialized.
    #[serde(flatten)]
    pub info: BgTaskInfo,
    /// Exit code, when one is known.
    pub exit_code: Option<i32>,
    /// Process id of the child, when one is known.
    pub child_pid: Option<u32>,
    /// Working directory of the task, as a string.
    pub workdir: String,
    /// Preview of the task output; the registry may replace it with the
    /// compressor's result for terminal tasks.
    pub output_preview: String,
    /// Whether the preview is a truncated view of the output.
    pub output_truncated: bool,
    /// Path of the stdout capture, when available.
    pub output_path: Option<String>,
    /// Path of the stderr capture, when available.
    pub stderr_path: Option<String>,
}
94
/// Handle to the background task registry. Cloning copies the `Arc`, so all
/// clones operate on the same `RegistryInner`.
#[derive(Clone)]
pub struct BgTaskRegistry {
    /// Shared registry state.
    pub(crate) inner: Arc<RegistryInner>,
}
99
/// State shared by every clone of `BgTaskRegistry`.
pub(crate) struct RegistryInner {
    /// In-memory tasks keyed by task id.
    pub(crate) tasks: Mutex<HashMap<String, Arc<BgTask>>>,
    /// Queue of completions that have not been drained yet.
    pub(crate) completions: Mutex<VecDeque<BgCompletion>>,
    /// Sender used to push `PushFrame`s (completion and long-running frames).
    pub(crate) progress_sender: SharedProgressSender,
    /// Initialized to `false`. NOTE(review): presumably set by
    /// `start_watchdog`, which is not visible in this section.
    watchdog_started: AtomicBool,
    /// Set to `true` by `detach`.
    pub(crate) shutdown: AtomicBool,
    /// Whether long-running reminder frames are emitted.
    pub(crate) long_running_reminder_enabled: AtomicBool,
    /// Minimum spacing between reminder frames in milliseconds; `0` disables
    /// them.
    pub(crate) long_running_reminder_interval_ms: AtomicU64,
    /// Flipped on the first session replay so the persisted-bundle GC runs
    /// once per registry from that path.
    persisted_gc_started: AtomicBool,
    /// Test-only counter incremented on every `maybe_gc_persisted` call.
    #[cfg(test)]
    persisted_gc_runs: AtomicU64,
    /// Optional output compressor, invoked as `compressor(command, output)`.
    pub(crate) compressor: Mutex<Option<Box<dyn Fn(&str, String) -> String + Send + Sync>>>,
}
118
/// One background task tracked by the registry.
pub(crate) struct BgTask {
    /// Identifier of the task; also its key in the registry map.
    pub(crate) task_id: String,
    /// Session that owns the task.
    pub(crate) session_id: String,
    /// On-disk locations of the task's bundle (metadata, captures, exit marker).
    pub(crate) paths: TaskPaths,
    /// Start instant: `Instant::now()` at spawn, or derived from the persisted
    /// start time when the task is rehydrated.
    pub(crate) started: Instant,
    /// When the last long-running reminder was emitted, if any.
    pub(crate) last_reminder_at: Mutex<Option<Instant>>,
    /// When the task was observed terminal; read by `cleanup_finished`.
    pub(crate) terminal_at: Mutex<Option<Instant>>,
    /// Mutable per-task state.
    pub(crate) state: Mutex<BgTaskState>,
}
128
/// Mutable state of a `BgTask`, guarded by `BgTask::state`.
pub(crate) struct BgTaskState {
    /// In-memory copy of the persisted task metadata.
    pub(crate) metadata: PersistedTask,
    /// Handle of the spawned child; `None` for rehydrated tasks and after the
    /// task has been detached, killed or finalized.
    pub(crate) child: Option<Child>,
    /// `true` once the registry no longer holds the child handle.
    pub(crate) detached: bool,
    /// Reader over the task's stdout/stderr capture files.
    pub(crate) buffer: BgBuffer,
}
135
136impl BgTaskRegistry {
137 pub fn new(progress_sender: SharedProgressSender) -> Self {
138 Self {
139 inner: Arc::new(RegistryInner {
140 tasks: Mutex::new(HashMap::new()),
141 completions: Mutex::new(VecDeque::new()),
142 progress_sender,
143 watchdog_started: AtomicBool::new(false),
144 shutdown: AtomicBool::new(false),
145 long_running_reminder_enabled: AtomicBool::new(true),
146 long_running_reminder_interval_ms: AtomicU64::new(600_000),
147 persisted_gc_started: AtomicBool::new(false),
148 #[cfg(test)]
149 persisted_gc_runs: AtomicU64::new(0),
150 compressor: Mutex::new(None),
151 }),
152 }
153 }
154
155 pub fn set_compressor<F>(&self, compressor: F)
160 where
161 F: Fn(&str, String) -> String + Send + Sync + 'static,
162 {
163 if let Ok(mut slot) = self.inner.compressor.lock() {
164 *slot = Some(Box::new(compressor));
165 }
166 }
167
168 pub(crate) fn compress_output(&self, command: &str, output: String) -> String {
171 let Ok(slot) = self.inner.compressor.lock() else {
172 return output;
173 };
174 match slot.as_ref() {
175 Some(compressor) => compressor(command, output),
176 None => output,
177 }
178 }
179
180 pub fn configure_long_running_reminders(&self, enabled: bool, interval_ms: u64) {
181 self.inner
182 .long_running_reminder_enabled
183 .store(enabled, Ordering::SeqCst);
184 self.inner
185 .long_running_reminder_interval_ms
186 .store(interval_ms, Ordering::SeqCst);
187 }
188
189 #[cfg(unix)]
190 #[allow(clippy::too_many_arguments)]
191 pub fn spawn(
192 &self,
193 command: &str,
194 session_id: String,
195 workdir: PathBuf,
196 env: HashMap<String, String>,
197 timeout: Option<Duration>,
198 storage_dir: PathBuf,
199 max_running: usize,
200 notify_on_completion: bool,
201 compressed: bool,
202 project_root: Option<PathBuf>,
203 ) -> Result<String, String> {
204 self.start_watchdog();
205
206 let running = self.running_count();
207 if running >= max_running {
208 return Err(format!(
209 "background bash task limit exceeded: {running} running (max {max_running})"
210 ));
211 }
212
213 let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
214 let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
215 let task_id = self.generate_unique_task_id()?;
216 let paths = task_paths(&storage_dir, &session_id, &task_id);
217 fs::create_dir_all(&paths.dir)
218 .map_err(|e| format!("failed to create background task dir: {e}"))?;
219
220 let mut metadata = PersistedTask::starting(
221 task_id.clone(),
222 session_id.clone(),
223 command.to_string(),
224 workdir.clone(),
225 project_root,
226 timeout_ms,
227 notify_on_completion,
228 compressed,
229 );
230 write_task(&paths.json, &metadata)
231 .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
232
233 create_capture_file(&paths.stdout)
237 .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
238 create_capture_file(&paths.stderr)
239 .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
240
241 let child = match spawn_detached_child(command, &paths, &workdir, &env) {
242 Ok(child) => child,
243 Err(error) => {
244 crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
245 let _ = delete_task_bundle(&paths);
246 return Err(error);
247 }
248 };
249
250 let child_pid = child.id();
251 metadata.mark_running(child_pid, child_pid as i32);
252 write_task(&paths.json, &metadata)
253 .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
254
255 let task = Arc::new(BgTask {
256 task_id: task_id.clone(),
257 session_id,
258 paths: paths.clone(),
259 started: Instant::now(),
260 last_reminder_at: Mutex::new(None),
261 terminal_at: Mutex::new(None),
262 state: Mutex::new(BgTaskState {
263 metadata,
264 child: Some(child),
265 detached: false,
266 buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
267 }),
268 });
269
270 self.inner
271 .tasks
272 .lock()
273 .map_err(|_| "background task registry lock poisoned".to_string())?
274 .insert(task_id.clone(), task);
275
276 Ok(task_id)
277 }
278
/// Spawns `command` as a detached background task and registers it
/// (Windows variant; no process group id is recorded).
///
/// Steps, in order: start the watchdog, enforce `max_running`, allocate a
/// task id and its on-disk paths, persist the `Starting` metadata, create the
/// stdout/stderr capture files, launch the child, persist the `Running`
/// metadata, and insert the task into the in-memory registry.
///
/// A missing `timeout` falls back to `DEFAULT_BG_TIMEOUT`.
///
/// # Errors
/// Returns a message when the running-task limit is reached or a step fails.
/// If a capture file cannot be created or the child cannot be launched, the
/// partial bundle is deleted. If the `Running` metadata cannot be persisted,
/// the already-launched child is terminated and reaped and the bundle is
/// deleted, so no process is left running untracked.
#[cfg(windows)]
#[allow(clippy::too_many_arguments)]
pub fn spawn(
    &self,
    command: &str,
    session_id: String,
    workdir: PathBuf,
    env: HashMap<String, String>,
    timeout: Option<Duration>,
    storage_dir: PathBuf,
    max_running: usize,
    notify_on_completion: bool,
    compressed: bool,
    project_root: Option<PathBuf>,
) -> Result<String, String> {
    self.start_watchdog();

    let running = self.running_count();
    if running >= max_running {
        return Err(format!(
            "background bash task limit exceeded: {running} running (max {max_running})"
        ));
    }

    let timeout = timeout.unwrap_or(DEFAULT_BG_TIMEOUT);
    // `as_millis` yields a u128; saturate rather than silently truncate.
    let timeout_ms = Some(u64::try_from(timeout.as_millis()).unwrap_or(u64::MAX));
    let task_id = self.generate_unique_task_id()?;
    let paths = task_paths(&storage_dir, &session_id, &task_id);
    fs::create_dir_all(&paths.dir)
        .map_err(|e| format!("failed to create background task dir: {e}"))?;

    let mut metadata = PersistedTask::starting(
        task_id.clone(),
        session_id.clone(),
        command.to_string(),
        workdir.clone(),
        project_root,
        timeout_ms,
        notify_on_completion,
        compressed,
    );
    write_task(&paths.json, &metadata)
        .map_err(|e| format!("failed to persist background task metadata: {e}"))?;

    // Same cleanup as the spawn-failure path below: never leave a
    // half-created bundle behind.
    let captures = create_capture_file(&paths.stdout)
        .map_err(|e| format!("failed to create stdout capture file: {e}"))
        .and_then(|_| {
            create_capture_file(&paths.stderr)
                .map_err(|e| format!("failed to create stderr capture file: {e}"))
        });
    if let Err(error) = captures {
        let _ = delete_task_bundle(&paths);
        return Err(error);
    }

    let mut child = match spawn_detached_child(command, &paths, &workdir, &env) {
        Ok(child) => child,
        Err(error) => {
            crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
            let _ = delete_task_bundle(&paths);
            return Err(error);
        }
    };

    let child_pid = child.id();
    metadata.status = BgTaskStatus::Running;
    metadata.child_pid = Some(child_pid);
    metadata.pgid = None;
    if let Err(e) = write_task(&paths.json, &metadata) {
        // The child is already running. Returning here without stopping it
        // would orphan a process the caller has no task id for. Mirrors the
        // terminate-then-wait sequence used when a task is killed.
        super::process::terminate_process(&mut child);
        let _ = child.wait();
        let _ = delete_task_bundle(&paths);
        return Err(format!(
            "failed to persist running background task metadata: {e}"
        ));
    }

    let task = Arc::new(BgTask {
        task_id: task_id.clone(),
        session_id,
        paths: paths.clone(),
        started: Instant::now(),
        last_reminder_at: Mutex::new(None),
        terminal_at: Mutex::new(None),
        state: Mutex::new(BgTaskState {
            metadata,
            child: Some(child),
            detached: false,
            buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
        }),
    });

    self.inner
        .tasks
        .lock()
        .map_err(|_| "background task registry lock poisoned".to_string())?
        .insert(task_id.clone(), task);

    Ok(task_id)
}
372
373 pub fn replay_session(&self, storage_dir: &Path, session_id: &str) -> Result<(), String> {
374 self.replay_session_inner(storage_dir, session_id, None)
375 }
376
377 pub fn replay_session_for_project(
378 &self,
379 storage_dir: &Path,
380 session_id: &str,
381 project_root: &Path,
382 ) -> Result<(), String> {
383 self.replay_session_inner(storage_dir, session_id, Some(project_root))
384 }
385
/// Rebuilds registry state for `session_id` from the `*.json` metadata files
/// in that session's task directory under `storage_dir`.
///
/// Starts the watchdog and, on the first replay through this registry, runs
/// `maybe_gc_persisted` (a GC failure is only logged). Files that are not
/// `.json`, cannot be read, belong to another session, or (when
/// `project_root` is given) record a different canonical project root are
/// skipped.
///
/// # Errors
/// Fails when the session directory cannot be listed or when inserting a
/// rehydrated task fails. Metadata write failures are ignored.
fn replay_session_inner(
    &self,
    storage_dir: &Path,
    session_id: &str,
    project_root: Option<&Path>,
) -> Result<(), String> {
    self.start_watchdog();
    // `swap` returns the previous value, so the GC runs only for the first
    // replay performed through this registry.
    if !self.inner.persisted_gc_started.swap(true, Ordering::SeqCst) {
        if let Err(error) = self.maybe_gc_persisted(storage_dir) {
            crate::slog_warn!("failed to GC persisted background bash tasks: {error}");
        }
    }
    let dir = session_tasks_dir(storage_dir, session_id);
    if !dir.exists() {
        // Nothing was ever persisted for this session.
        return Ok(());
    }

    let canonical_project = project_root.map(canonicalized_path);
    let entries = fs::read_dir(&dir)
        .map_err(|e| format!("failed to read background task dir {}: {e}", dir.display()))?;
    for entry in entries.flatten() {
        let path = entry.path();
        if path.extension().and_then(|extension| extension.to_str()) != Some("json") {
            continue;
        }
        let Ok(mut metadata) = read_task(&path) else {
            continue;
        };
        if metadata.session_id != session_id {
            continue;
        }
        if let Some(canonical_project) = canonical_project.as_deref() {
            // A task without a recorded project root never matches a filter.
            let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
            if metadata_project.as_deref() != Some(canonical_project) {
                continue;
            }
        }

        let paths = task_paths(storage_dir, session_id, &metadata.task_id);
        match metadata.status {
            // Metadata was written but the running state never was.
            BgTaskStatus::Starting => {
                metadata.mark_terminal(
                    BgTaskStatus::Failed,
                    None,
                    Some("spawn aborted".to_string()),
                );
                let _ = write_task(&paths.json, &metadata);
                self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
            }
            BgTaskStatus::Running | BgTaskStatus::Killing => {
                // Checks run in priority order: stale, exit marker present,
                // interrupted kill, dead process, otherwise still running.
                if self.running_metadata_is_stale(&metadata) {
                    metadata.mark_terminal(
                        BgTaskStatus::Killed,
                        None,
                        Some("orphaned (>24h)".to_string()),
                    );
                    if !paths.exit.exists() {
                        let _ = write_kill_marker_if_absent(&paths.exit);
                    }
                    let _ = write_task(&paths.json, &metadata);
                    self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                } else if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
                    // The exit marker wins over a persisted `Killing` status.
                    let reason = (metadata.status == BgTaskStatus::Killing).then(|| {
                        "recovered from inconsistent killing state on replay".to_string()
                    });
                    if reason.is_some() {
                        crate::slog_warn!("background task {} had killing state with exit marker; preferring marker",
                            metadata.task_id);
                    }
                    metadata = terminal_metadata_from_marker(metadata, marker, reason);
                    let _ = write_task(&paths.json, &metadata);
                    self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                } else if metadata.status == BgTaskStatus::Killing {
                    // A kill was in progress and no marker was written:
                    // finish the kill bookkeeping.
                    if !paths.exit.exists() {
                        let _ = write_kill_marker_if_absent(&paths.exit);
                    }
                    metadata.mark_terminal(
                        BgTaskStatus::Killed,
                        None,
                        Some("recovered from inconsistent killing state on replay".to_string()),
                    );
                    let _ = write_task(&paths.json, &metadata);
                    self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                } else if metadata.child_pid.is_some_and(|pid| !is_process_alive(pid)) {
                    metadata.mark_terminal(
                        BgTaskStatus::Failed,
                        None,
                        Some("process exited without exit marker".to_string()),
                    );
                    let _ = write_task(&paths.json, &metadata);
                    self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                } else {
                    // Still running as far as can be told: track it again as a
                    // detached task (no child handle).
                    self.insert_rehydrated_task(metadata, paths, true)?;
                }
            }
            _ if metadata.status.is_terminal() => {
                // Already terminal on disk: queue its completion if it is
                // still undelivered, then keep the task queryable in memory.
                self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                self.insert_rehydrated_task(metadata, paths, true)?;
            }
            _ => {}
        }
    }

    Ok(())
}
496
497 pub fn status(
498 &self,
499 task_id: &str,
500 session_id: &str,
501 project_root: Option<&Path>,
502 storage_dir: Option<&Path>,
503 preview_bytes: usize,
504 ) -> Option<BgTaskSnapshot> {
505 let mut task = self.task_for_session(task_id, session_id);
506 if task.is_none() {
507 if let Some(storage_dir) = storage_dir {
508 let _ = self.replay_session(storage_dir, session_id);
509 task = self.task_for_session(task_id, session_id);
510 }
511 }
512 let Some(task) = task else {
513 return self.status_relaxed(
514 task_id,
515 session_id,
516 project_root?,
517 storage_dir?,
518 preview_bytes,
519 );
520 };
521 let _ = self.poll_task(&task);
522 let mut snapshot = task.snapshot(preview_bytes);
523 self.maybe_compress_snapshot(&task, &mut snapshot);
524 Some(snapshot)
525 }
526
527 fn status_relaxed_task(
528 &self,
529 task_id: &str,
530 project_root: &Path,
531 storage_dir: &Path,
532 ) -> Option<Arc<BgTask>> {
533 let canonical_project = canonicalized_path(project_root);
534 let root = storage_dir.join("bash-tasks");
535 let entries = fs::read_dir(&root).ok()?;
536 for entry in entries.flatten() {
537 let dir = entry.path();
538 if !dir.is_dir() {
539 continue;
540 }
541 let path = dir.join(format!("{task_id}.json"));
542 if !path.exists() {
543 continue;
544 }
545 let Ok(metadata) = read_task(&path) else {
546 continue;
547 };
548 let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
549 if metadata_project.as_deref() != Some(canonical_project.as_path()) {
550 continue;
551 }
552 if let Some(task) = self.task(task_id) {
553 let matches_project = task
554 .state
555 .lock()
556 .map(|state| {
557 state
558 .metadata
559 .project_root
560 .as_deref()
561 .map(canonicalized_path)
562 .as_deref()
563 == Some(canonical_project.as_path())
564 })
565 .unwrap_or(false);
566 return matches_project.then_some(task);
567 }
568 let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
569 if self.insert_rehydrated_task(metadata, paths, true).is_err() {
570 return None;
571 }
572 return self.task(task_id);
573 }
574 None
575 }
576
577 pub(super) fn status_relaxed(
578 &self,
579 task_id: &str,
580 _session_id: &str,
581 project_root: &Path,
582 storage_dir: &Path,
583 preview_bytes: usize,
584 ) -> Option<BgTaskSnapshot> {
585 let task = self.status_relaxed_task(task_id, project_root, storage_dir)?;
586 let _ = self.poll_task(&task);
587 let mut snapshot = task.snapshot(preview_bytes);
588 self.maybe_compress_snapshot(&task, &mut snapshot);
589 Some(snapshot)
590 }
591
592 pub fn maybe_gc_persisted(&self, storage_dir: &Path) -> Result<usize, String> {
593 #[cfg(test)]
594 self.inner.persisted_gc_runs.fetch_add(1, Ordering::SeqCst);
595
596 let mut deleted = 0usize;
597
598 let root = storage_dir.join("bash-tasks");
599 if root.exists() {
600 let session_dirs = fs::read_dir(&root).map_err(|e| {
601 format!(
602 "failed to read background task root {}: {e}",
603 root.display()
604 )
605 })?;
606 for session_entry in session_dirs.flatten() {
607 let session_dir = session_entry.path();
608 if !session_dir.is_dir() {
609 continue;
610 }
611 let task_entries = match fs::read_dir(&session_dir) {
612 Ok(entries) => entries,
613 Err(error) => {
614 crate::slog_warn!(
615 "failed to read background task session dir {}: {error}",
616 session_dir.display()
617 );
618 continue;
619 }
620 };
621 for task_entry in task_entries.flatten() {
622 let json_path = task_entry.path();
623 if json_path
624 .extension()
625 .and_then(|extension| extension.to_str())
626 != Some("json")
627 {
628 continue;
629 }
630 if modified_within(&json_path, PERSISTED_GC_GRACE) {
631 continue;
632 }
633 let metadata = match read_task(&json_path) {
634 Ok(metadata) => metadata,
635 Err(error) => {
636 crate::slog_warn!(
637 "quarantining corrupt background task metadata {}: {error}",
638 json_path.display()
639 );
640 quarantine_corrupt_task_json(storage_dir, &session_dir, &json_path)?;
641 continue;
642 }
643 };
644 if !(metadata.status.is_terminal() && metadata.completion_delivered) {
645 continue;
646 }
647 let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
648 match delete_task_bundle(&paths) {
649 Ok(()) => {
650 deleted += 1;
651 log::debug!(
652 "deleted persisted background task bundle {}",
653 metadata.task_id
654 );
655 }
656 Err(error) => {
657 crate::slog_warn!(
658 "failed to delete background task bundle {}: {error}",
659 metadata.task_id
660 );
661 continue;
662 }
663 }
664 }
665 }
666 }
667 gc_quarantine(storage_dir);
668 Ok(deleted)
669 }
670
671 pub fn list(&self, preview_bytes: usize) -> Vec<BgTaskSnapshot> {
672 let tasks = self
673 .inner
674 .tasks
675 .lock()
676 .map(|tasks| tasks.values().cloned().collect::<Vec<_>>())
677 .unwrap_or_default();
678 tasks
679 .into_iter()
680 .map(|task| {
681 let _ = self.poll_task(&task);
682 let mut snapshot = task.snapshot(preview_bytes);
683 self.maybe_compress_snapshot(&task, &mut snapshot);
684 snapshot
685 })
686 .collect()
687 }
688
689 fn maybe_compress_snapshot(&self, task: &Arc<BgTask>, snapshot: &mut BgTaskSnapshot) {
695 if !snapshot.info.status.is_terminal() {
696 return;
697 }
698 let compressed_flag = task
699 .state
700 .lock()
701 .map(|state| state.metadata.compressed)
702 .unwrap_or(true);
703 if !compressed_flag {
704 return;
705 }
706 let raw = std::mem::take(&mut snapshot.output_preview);
707 snapshot.output_preview = self.compress_output(&snapshot.info.command, raw);
708 }
709
710 pub fn kill(&self, task_id: &str, session_id: &str) -> Result<BgTaskSnapshot, String> {
711 self.kill_with_status(task_id, session_id, BgTaskStatus::Killed)
712 }
713
714 pub fn promote(&self, task_id: &str, session_id: &str) -> Result<bool, String> {
715 let task = self
716 .task_for_session(task_id, session_id)
717 .ok_or_else(|| format!("background task not found: {task_id}"))?;
718 let mut state = task
719 .state
720 .lock()
721 .map_err(|_| "background task lock poisoned".to_string())?;
722 let updated = update_task(&task.paths.json, |metadata| {
723 metadata.notify_on_completion = true;
724 metadata.completion_delivered = false;
725 })
726 .map_err(|e| format!("failed to promote background task: {e}"))?;
727 state.metadata = updated;
728 if state.metadata.status.is_terminal() {
729 state.buffer.enforce_terminal_cap();
730 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
731 }
732 Ok(true)
733 }
734
735 pub(crate) fn kill_for_timeout(&self, task_id: &str, session_id: &str) -> Result<(), String> {
736 self.kill_with_status(task_id, session_id, BgTaskStatus::TimedOut)
737 .map(|_| ())
738 }
739
740 pub fn cleanup_finished(&self, older_than: Duration) {
741 let cutoff = Instant::now().checked_sub(older_than);
742 let removable_paths: Vec<(String, TaskPaths)> =
743 if let Ok(mut tasks) = self.inner.tasks.lock() {
744 let removable = tasks
745 .iter()
746 .filter_map(|(task_id, task)| {
747 let delivered_terminal = task
748 .state
749 .lock()
750 .map(|state| {
751 state.metadata.status.is_terminal()
752 && state.metadata.completion_delivered
753 })
754 .unwrap_or(false);
755 if !delivered_terminal {
756 return None;
757 }
758
759 let terminal_at = task.terminal_at.lock().ok().and_then(|at| *at);
760 let expired = match (terminal_at, cutoff) {
761 (Some(terminal_at), Some(cutoff)) => terminal_at <= cutoff,
762 (Some(_), None) => true,
763 (None, _) => false,
764 };
765 expired.then(|| task_id.clone())
766 })
767 .collect::<Vec<_>>();
768
769 removable
770 .into_iter()
771 .filter_map(|task_id| {
772 tasks
773 .remove(&task_id)
774 .map(|task| (task_id, task.paths.clone()))
775 })
776 .collect()
777 } else {
778 Vec::new()
779 };
780
781 for (task_id, paths) in removable_paths {
782 match delete_task_bundle(&paths) {
783 Ok(()) => log::debug!("deleted persisted background task bundle {task_id}"),
784 Err(error) => crate::slog_warn!(
785 "failed to delete persisted background task bundle {task_id}: {error}"
786 ),
787 }
788 }
789 }
790
791 pub fn drain_completions(&self) -> Vec<BgCompletion> {
792 self.drain_completions_for_session(None)
793 }
794
795 pub fn drain_completions_for_session(&self, session_id: Option<&str>) -> Vec<BgCompletion> {
796 let mut completions = match self.inner.completions.lock() {
797 Ok(completions) => completions,
798 Err(_) => return Vec::new(),
799 };
800
801 let drained = if let Some(session_id) = session_id {
802 let mut matched = Vec::new();
803 let mut retained = VecDeque::new();
804 while let Some(completion) = completions.pop_front() {
805 if completion.session_id == session_id {
806 matched.push(completion);
807 } else {
808 retained.push_back(completion);
809 }
810 }
811 *completions = retained;
812 matched
813 } else {
814 completions.drain(..).collect()
815 };
816 drop(completions);
817
818 for completion in &drained {
819 if let Some(task) = self.task_for_session(&completion.task_id, &completion.session_id) {
820 let _ = task.set_completion_delivered(true);
821 }
822 }
823
824 drained
825 }
826
827 pub fn pending_completions_for_session(&self, session_id: &str) -> Vec<BgCompletion> {
828 self.inner
829 .completions
830 .lock()
831 .map(|completions| {
832 completions
833 .iter()
834 .filter(|completion| completion.session_id == session_id)
835 .cloned()
836 .collect()
837 })
838 .unwrap_or_default()
839 }
840
841 pub fn detach(&self) {
842 self.inner.shutdown.store(true, Ordering::SeqCst);
843 if let Ok(mut tasks) = self.inner.tasks.lock() {
844 for task in tasks.values() {
845 if let Ok(mut state) = task.state.lock() {
846 state.child = None;
847 state.detached = true;
848 }
849 }
850 tasks.clear();
851 }
852 }
853
854 pub fn shutdown(&self) {
855 let tasks = self
856 .inner
857 .tasks
858 .lock()
859 .map(|tasks| {
860 tasks
861 .values()
862 .map(|task| (task.task_id.clone(), task.session_id.clone()))
863 .collect::<Vec<_>>()
864 })
865 .unwrap_or_default();
866 for (task_id, session_id) in tasks {
867 let _ = self.kill(&task_id, &session_id);
868 }
869 }
870
871 pub(crate) fn poll_task(&self, task: &Arc<BgTask>) -> Result<(), String> {
872 let marker = match read_exit_marker(&task.paths.exit) {
873 Ok(Some(marker)) => marker,
874 Ok(None) => return Ok(()),
875 Err(error) => return Err(format!("failed to read exit marker: {error}")),
876 };
877 self.finalize_from_marker(task, marker, None)
878 }
879
880 pub(crate) fn reap_child(&self, task: &Arc<BgTask>) {
881 let Ok(mut state) = task.state.lock() else {
882 return;
883 };
884 if let Some(child) = state.child.as_mut() {
885 if matches!(child.try_wait(), Ok(Some(_))) {
886 state.child = None;
903 state.detached = true;
904 self.fail_without_exit_marker_if_needed(task, &mut state);
905 }
906 } else if state.detached
907 && state
908 .metadata
909 .child_pid
910 .is_some_and(|pid| !is_process_alive(pid))
911 {
912 self.fail_without_exit_marker_if_needed(task, &mut state);
913 }
914 }
915
916 fn fail_without_exit_marker_if_needed(&self, task: &Arc<BgTask>, state: &mut BgTaskState) {
917 if state.metadata.status.is_terminal() {
918 return;
919 }
920 if matches!(read_exit_marker(&task.paths.exit), Ok(Some(_))) {
921 return;
922 }
923 let updated = update_task(&task.paths.json, |metadata| {
924 metadata.mark_terminal(
925 BgTaskStatus::Failed,
926 None,
927 Some("process exited without exit marker".to_string()),
928 );
929 });
930 if let Ok(metadata) = updated {
931 state.metadata = metadata;
932 task.mark_terminal_now();
933 state.buffer.enforce_terminal_cap();
934 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
935 }
936 }
937
938 pub(crate) fn running_tasks(&self) -> Vec<Arc<BgTask>> {
939 self.inner
940 .tasks
941 .lock()
942 .map(|tasks| {
943 tasks
944 .values()
945 .filter(|task| task.is_running())
946 .cloned()
947 .collect()
948 })
949 .unwrap_or_default()
950 }
951
952 fn insert_rehydrated_task(
953 &self,
954 metadata: PersistedTask,
955 paths: TaskPaths,
956 detached: bool,
957 ) -> Result<(), String> {
958 let task_id = metadata.task_id.clone();
959 let session_id = metadata.session_id.clone();
960 let started = started_instant_from_unix_millis(metadata.started_at);
961 let task = Arc::new(BgTask {
962 task_id: task_id.clone(),
963 session_id,
964 paths: paths.clone(),
965 started,
966 last_reminder_at: Mutex::new(None),
967 terminal_at: Mutex::new(metadata.status.is_terminal().then(Instant::now)),
968 state: Mutex::new(BgTaskState {
969 metadata,
970 child: None,
971 detached,
972 buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
973 }),
974 });
975 self.inner
976 .tasks
977 .lock()
978 .map_err(|_| "background task registry lock poisoned".to_string())?
979 .insert(task_id, task);
980 Ok(())
981 }
982
/// Terminates the task and records `terminal_status` for it, returning a
/// snapshot with a `5 * 1024` preview size.
///
/// Outcomes, checked in this order while holding the task lock:
/// - already terminal: the current snapshot is returned unchanged;
/// - an exit marker exists: the task is finalized from the marker instead of
///   being killed;
/// - otherwise: `Killing` is persisted, the process is terminated and reaped,
///   a kill marker is written if no exit file exists, and the terminal state
///   is persisted and its completion queued.
///
/// `TimedOut` records exit code 124; other statuses record no exit code.
///
/// # Errors
/// Fails when the task is unknown for this session, its lock is poisoned, or
/// any metadata/marker write fails. A write failure after `Killing` was set
/// returns early and leaves the in-memory status at `Killing`.
fn kill_with_status(
    &self,
    task_id: &str,
    session_id: &str,
    terminal_status: BgTaskStatus,
) -> Result<BgTaskSnapshot, String> {
    let task = self
        .task_for_session(task_id, session_id)
        .ok_or_else(|| format!("background task not found: {task_id}"))?;

    {
        // The task lock is held for the whole kill sequence, including the
        // blocking `wait` below.
        let mut state = task
            .state
            .lock()
            .map_err(|_| "background task lock poisoned".to_string())?;
        if state.metadata.status.is_terminal() {
            return Ok(task.snapshot_locked(&state, 5 * 1024));
        }

        // The process finished on its own before the kill: report its real
        // outcome rather than a kill.
        if let Ok(Some(marker)) = read_exit_marker(&task.paths.exit) {
            state.metadata =
                terminal_metadata_from_marker(state.metadata.clone(), marker, None);
            task.mark_terminal_now();
            state.child = None;
            state.detached = true;
            state.buffer.enforce_terminal_cap();
            write_task(&task.paths.json, &state.metadata)
                .map_err(|e| format!("failed to persist terminal state: {e}"))?;
            self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
            return Ok(task.snapshot_locked(&state, 5 * 1024));
        }

        // Persist the intermediate state first so an interrupted kill can be
        // recognized from disk later.
        state.metadata.status = BgTaskStatus::Killing;
        write_task(&task.paths.json, &state.metadata)
            .map_err(|e| format!("failed to persist killing state: {e}"))?;

        // Unix: signal the recorded process group (nothing is signalled when
        // no pgid is recorded).
        #[cfg(unix)]
        if let Some(pgid) = state.metadata.pgid {
            terminate_pgid(pgid, state.child.as_mut());
        }
        // Windows: prefer the live child handle, fall back to the recorded pid.
        #[cfg(windows)]
        if let Some(child) = state.child.as_mut() {
            super::process::terminate_process(child);
        } else if let Some(pid) = state.metadata.child_pid {
            terminate_pid(pid);
        }
        // Reap the child if a handle is still held; the result is ignored.
        if let Some(child) = state.child.as_mut() {
            let _ = child.wait();
        }
        state.child = None;
        state.detached = true;

        if !task.paths.exit.exists() {
            write_kill_marker_if_absent(&task.paths.exit)
                .map_err(|e| format!("failed to write kill marker: {e}"))?;
        }

        let exit_code = if terminal_status == BgTaskStatus::TimedOut {
            Some(124)
        } else {
            None
        };
        state
            .metadata
            .mark_terminal(terminal_status, exit_code, None);
        task.mark_terminal_now();
        write_task(&task.paths.json, &state.metadata)
            .map_err(|e| format!("failed to persist killed state: {e}"))?;
        state.buffer.enforce_terminal_cap();
        self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
    }

    // The task lock is released here; `snapshot` takes no guard argument.
    Ok(task.snapshot(5 * 1024))
}
1057
1058 fn finalize_from_marker(
1059 &self,
1060 task: &Arc<BgTask>,
1061 marker: ExitMarker,
1062 reason: Option<String>,
1063 ) -> Result<(), String> {
1064 let mut state = task
1065 .state
1066 .lock()
1067 .map_err(|_| "background task lock poisoned".to_string())?;
1068 if state.metadata.status.is_terminal() {
1069 return Ok(());
1070 }
1071
1072 let updated = update_task(&task.paths.json, |metadata| {
1073 let new_metadata = terminal_metadata_from_marker(metadata.clone(), marker, reason);
1074 *metadata = new_metadata;
1075 })
1076 .map_err(|e| format!("failed to persist terminal state: {e}"))?;
1077 state.metadata = updated;
1078 task.mark_terminal_now();
1079 state.child = None;
1080 state.detached = true;
1081 state.buffer.enforce_terminal_cap();
1082 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1083 Ok(())
1084 }
1085
1086 fn enqueue_completion_if_needed(
1087 &self,
1088 metadata: &PersistedTask,
1089 paths: Option<&TaskPaths>,
1090 emit_frame: bool,
1091 ) {
1092 if metadata.status.is_terminal() && !metadata.completion_delivered {
1093 self.enqueue_completion_from_parts(metadata, None, paths, emit_frame);
1094 }
1095 }
1096
1097 fn enqueue_completion_locked(
1098 &self,
1099 metadata: &PersistedTask,
1100 buffer: Option<&BgBuffer>,
1101 emit_frame: bool,
1102 ) {
1103 self.enqueue_completion_from_parts(metadata, buffer, None, emit_frame);
1104 }
1105
1106 fn enqueue_completion_from_parts(
1107 &self,
1108 metadata: &PersistedTask,
1109 buffer: Option<&BgBuffer>,
1110 paths: Option<&TaskPaths>,
1111 emit_frame: bool,
1112 ) {
1113 if !metadata.status.is_terminal() || metadata.completion_delivered {
1114 return;
1115 }
1116 let (raw_preview, output_truncated) = match buffer {
1121 Some(buf) => buf.read_tail(BG_COMPLETION_PREVIEW_BYTES),
1122 None => paths
1123 .map(|paths| read_tail_from_disk(paths, BG_COMPLETION_PREVIEW_BYTES))
1124 .unwrap_or_else(|| (String::new(), false)),
1125 };
1126 let output_preview = if metadata.compressed {
1131 self.compress_output(&metadata.command, raw_preview)
1132 } else {
1133 raw_preview
1134 };
1135 let completion = BgCompletion {
1136 task_id: metadata.task_id.clone(),
1137 session_id: metadata.session_id.clone(),
1138 status: metadata.status.clone(),
1139 exit_code: metadata.exit_code,
1140 command: metadata.command.clone(),
1141 output_preview,
1142 output_truncated,
1143 };
1144 if let Ok(mut completions) = self.inner.completions.lock() {
1145 if completions
1146 .iter()
1147 .any(|completion| completion.task_id == metadata.task_id)
1148 {
1149 return;
1150 }
1151 completions.push_back(completion.clone());
1152 } else {
1153 return;
1154 }
1155
1156 if emit_frame {
1157 self.emit_bash_completed(completion);
1158 }
1159 }
1160
1161 fn emit_bash_completed(&self, completion: BgCompletion) {
1162 let Ok(progress_sender) = self
1163 .inner
1164 .progress_sender
1165 .lock()
1166 .map(|sender| sender.clone())
1167 else {
1168 return;
1169 };
1170 let Some(sender) = progress_sender.as_ref() else {
1171 return;
1172 };
1173 sender(PushFrame::BashCompleted(BashCompletedFrame::new(
1181 completion.task_id,
1182 completion.session_id,
1183 completion.status,
1184 completion.exit_code,
1185 completion.command,
1186 completion.output_preview,
1187 completion.output_truncated,
1188 )));
1189 }
1190
1191 pub(crate) fn maybe_emit_long_running_reminder(&self, task: &Arc<BgTask>) {
1192 if !self
1193 .inner
1194 .long_running_reminder_enabled
1195 .load(Ordering::SeqCst)
1196 {
1197 return;
1198 }
1199 let interval_ms = self
1200 .inner
1201 .long_running_reminder_interval_ms
1202 .load(Ordering::SeqCst);
1203 if interval_ms == 0 {
1204 return;
1205 }
1206 let interval = Duration::from_millis(interval_ms);
1207 let now = Instant::now();
1208 let Ok(mut last_reminder_at) = task.last_reminder_at.lock() else {
1209 return;
1210 };
1211 let since = last_reminder_at.unwrap_or(task.started);
1212 if now.duration_since(since) < interval {
1213 return;
1214 }
1215 let command = task
1216 .state
1217 .lock()
1218 .map(|state| state.metadata.command.clone())
1219 .unwrap_or_default();
1220 *last_reminder_at = Some(now);
1221 self.emit_bash_long_running(BashLongRunningFrame::new(
1222 task.task_id.clone(),
1223 task.session_id.clone(),
1224 command,
1225 task.started.elapsed().as_millis() as u64,
1226 ));
1227 }
1228
1229 fn emit_bash_long_running(&self, frame: BashLongRunningFrame) {
1230 let Ok(progress_sender) = self
1231 .inner
1232 .progress_sender
1233 .lock()
1234 .map(|sender| sender.clone())
1235 else {
1236 return;
1237 };
1238 if let Some(sender) = progress_sender.as_ref() {
1239 sender(PushFrame::BashLongRunning(frame));
1240 }
1241 }
1242
1243 fn task(&self, task_id: &str) -> Option<Arc<BgTask>> {
1244 self.inner
1245 .tasks
1246 .lock()
1247 .ok()
1248 .and_then(|tasks| tasks.get(task_id).cloned())
1249 }
1250
1251 fn task_for_session(&self, task_id: &str, session_id: &str) -> Option<Arc<BgTask>> {
1252 self.task(task_id)
1253 .filter(|task| task.session_id == session_id)
1254 }
1255
1256 fn running_count(&self) -> usize {
1257 self.inner
1258 .tasks
1259 .lock()
1260 .map(|tasks| tasks.values().filter(|task| task.is_running()).count())
1261 .unwrap_or(0)
1262 }
1263
1264 fn start_watchdog(&self) {
1265 if !self.inner.watchdog_started.swap(true, Ordering::SeqCst) {
1266 super::watchdog::start(self.clone());
1267 }
1268 }
1269
1270 fn running_metadata_is_stale(&self, metadata: &PersistedTask) -> bool {
1271 unix_millis().saturating_sub(metadata.started_at) > STALE_RUNNING_AFTER.as_millis() as u64
1272 }
1273
/// Test-only accessor for the path of a task's persisted JSON metadata.
///
/// Returns `None` when the task is unknown or belongs to another session.
#[cfg(test)]
pub fn task_json_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
    let task = self.task_for_session(task_id, session_id)?;
    Some(task.paths.json.clone())
}
1279
/// Test-only accessor for the path of a task's exit-marker file.
///
/// Returns `None` when the task is unknown or belongs to another session.
#[cfg(test)]
pub fn task_exit_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
    let task = self.task_for_session(task_id, session_id)?;
    Some(task.paths.exit.clone())
}
1285
1286 fn generate_unique_task_id(&self) -> Result<String, String> {
1288 for _ in 0..32 {
1289 let candidate = random_slug();
1290 let tasks = self
1291 .inner
1292 .tasks
1293 .lock()
1294 .map_err(|_| "background task registry lock poisoned".to_string())?;
1295 if tasks.contains_key(&candidate) {
1296 continue;
1297 }
1298 let completions = self
1299 .inner
1300 .completions
1301 .lock()
1302 .map_err(|_| "background completions lock poisoned".to_string())?;
1303 if completions
1304 .iter()
1305 .any(|completion| completion.task_id == candidate)
1306 {
1307 continue;
1308 }
1309 return Ok(candidate);
1310 }
1311 Err("failed to allocate unique background task id after 32 attempts".to_string())
1312 }
1313}
1314
1315impl Default for BgTaskRegistry {
1316 fn default() -> Self {
1317 Self::new(Arc::new(Mutex::new(None)))
1318 }
1319}
1320
/// Reports whether `path` was last modified less than `grace` ago.
///
/// Returns `false` when the metadata or modification time cannot be read
/// (for example when the path does not exist).
///
/// A modification time that lies ahead of the system clock (clock skew, a
/// restored backup, a clock that was stepped backwards) is treated as recent
/// for any non-zero `grace`. Such a file used to be reported as outside the
/// grace window, which made garbage collection delete it immediately. A zero
/// `grace` still never matches.
fn modified_within(path: &Path, grace: Duration) -> bool {
    let Ok(modified) = fs::metadata(path).and_then(|metadata| metadata.modified()) else {
        return false;
    };
    match SystemTime::now().duration_since(modified) {
        Ok(age) => age < grace,
        // `modified` is in the future: its age is effectively negative.
        Err(_) => !grace.is_zero(),
    }
}
1329
/// Returns the canonical form of `path`, or an owned copy of `path` itself
/// when canonicalization fails (for example because it does not exist).
fn canonicalized_path(path: &Path) -> PathBuf {
    match fs::canonicalize(path) {
        Ok(resolved) => resolved,
        Err(_) => path.to_path_buf(),
    }
}
1333
/// Converts a wall-clock start time (milliseconds since the Unix epoch) into
/// an [`Instant`] that lies equally far in the past.
///
/// A start time in the future, or a system clock before the epoch, yields an
/// elapsed time of zero. If the subtraction cannot be represented the current
/// instant is returned instead.
fn started_instant_from_unix_millis(started_at: u64) -> Instant {
    let now_ms = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => started_at,
    };
    let age = Duration::from_millis(now_ms.saturating_sub(started_at));
    match Instant::now().checked_sub(age) {
        Some(started) => started,
        None => Instant::now(),
    }
}
1345
1346fn gc_quarantine(storage_dir: &Path) {
1347 let quarantine_root = storage_dir.join("bash-tasks-quarantine");
1348 let Ok(session_dirs) = fs::read_dir(&quarantine_root) else {
1349 return;
1350 };
1351 for session_entry in session_dirs.flatten() {
1352 let session_quarantine_dir = session_entry.path();
1353 if !session_quarantine_dir.is_dir() {
1354 continue;
1355 }
1356 let entries = match fs::read_dir(&session_quarantine_dir) {
1357 Ok(entries) => entries,
1358 Err(error) => {
1359 crate::slog_warn!(
1360 "failed to read background task quarantine dir {}: {error}",
1361 session_quarantine_dir.display()
1362 );
1363 continue;
1364 }
1365 };
1366 for entry in entries.flatten() {
1367 let path = entry.path();
1368 if modified_within(&path, QUARANTINE_GC_GRACE) {
1369 continue;
1370 }
1371 let result = if path.is_dir() {
1372 fs::remove_dir_all(&path)
1373 } else {
1374 fs::remove_file(&path)
1375 };
1376 match result {
1377 Ok(()) => log::debug!(
1378 "deleted old background task quarantine entry {}",
1379 path.display()
1380 ),
1381 Err(error) => crate::slog_warn!(
1382 "failed to delete old background task quarantine entry {}: {error}",
1383 path.display()
1384 ),
1385 }
1386 }
1387 let _ = fs::remove_dir(&session_quarantine_dir);
1388 }
1389 let _ = fs::remove_dir(&quarantine_root);
1390}
1391
1392fn quarantine_corrupt_task_json(
1393 storage_dir: &Path,
1394 session_dir: &Path,
1395 json_path: &Path,
1396) -> Result<(), String> {
1397 let session_hash = session_dir
1398 .file_name()
1399 .and_then(|name| name.to_str())
1400 .ok_or_else(|| {
1401 format!(
1402 "invalid background task session dir: {}",
1403 session_dir.display()
1404 )
1405 })?;
1406 let task_name = json_path
1407 .file_name()
1408 .and_then(|name| name.to_str())
1409 .ok_or_else(|| format!("invalid background task json path: {}", json_path.display()))?;
1410 let unix_ts = SystemTime::now()
1411 .duration_since(UNIX_EPOCH)
1412 .map(|duration| duration.as_secs())
1413 .unwrap_or(0);
1414 let quarantine_dir = storage_dir.join("bash-tasks-quarantine").join(session_hash);
1415 fs::create_dir_all(&quarantine_dir).map_err(|e| {
1416 format!(
1417 "failed to create background task quarantine dir {}: {e}",
1418 quarantine_dir.display()
1419 )
1420 })?;
1421 let target = quarantine_dir.join(format!("{task_name}.corrupt-{unix_ts}"));
1422 fs::rename(json_path, &target).map_err(|e| {
1423 format!(
1424 "failed to quarantine corrupt background task metadata {} to {}: {e}",
1425 json_path.display(),
1426 target.display()
1427 )
1428 })?;
1429
1430 for sibling in task_sibling_paths(json_path) {
1431 if !sibling.exists() {
1432 continue;
1433 }
1434 let Some(sibling_name) = sibling.file_name().and_then(|name| name.to_str()) else {
1435 crate::slog_warn!(
1436 "skipping background task sibling with invalid name during quarantine: {}",
1437 sibling.display()
1438 );
1439 continue;
1440 };
1441 let sibling_target = quarantine_dir.join(format!("{sibling_name}.corrupt-{unix_ts}"));
1442 if let Err(error) = fs::rename(&sibling, &sibling_target) {
1443 crate::slog_warn!(
1444 "failed to quarantine background task sibling {} to {}: {error}",
1445 sibling.display(),
1446 sibling_target.display()
1447 );
1448 }
1449 }
1450
1451 let _ = fs::remove_dir(session_dir);
1452 Ok(())
1453}
1454
/// Lists the artifact paths that share the JSON file's directory and stem:
/// `<stem>.stdout`, `.stderr`, `.exit`, `.ps1`, `.bat` and `.sh`, in that
/// order.
///
/// Returns an empty vector when `json_path` has no parent or no UTF-8 stem.
/// The returned paths are not checked for existence.
fn task_sibling_paths(json_path: &Path) -> Vec<PathBuf> {
    const SIBLING_EXTENSIONS: [&str; 6] = ["stdout", "stderr", "exit", "ps1", "bat", "sh"];

    let stem = json_path.file_stem().and_then(|stem| stem.to_str());
    match (json_path.parent(), stem) {
        (Some(parent), Some(stem)) => SIBLING_EXTENSIONS
            .iter()
            .map(|extension| parent.join(format!("{stem}.{extension}")))
            .collect(),
        _ => Vec::new(),
    }
}
1467
1468fn read_tail_from_disk(paths: &TaskPaths, max_bytes: usize) -> (String, bool) {
1469 let stdout = fs::read(&paths.stdout).unwrap_or_default();
1470 let stderr = fs::read(&paths.stderr).unwrap_or_default();
1471 let mut bytes = Vec::with_capacity(stdout.len().saturating_add(stderr.len()));
1472 bytes.extend_from_slice(&stdout);
1473 bytes.extend_from_slice(&stderr);
1474 if bytes.len() <= max_bytes {
1475 return (String::from_utf8_lossy(&bytes).into_owned(), false);
1476 }
1477 let start = bytes.len().saturating_sub(max_bytes);
1478 (String::from_utf8_lossy(&bytes[start..]).into_owned(), true)
1479}
1480
1481impl BgTask {
1482 fn snapshot(&self, preview_bytes: usize) -> BgTaskSnapshot {
1483 let state = self
1484 .state
1485 .lock()
1486 .unwrap_or_else(|poison| poison.into_inner());
1487 self.snapshot_locked(&state, preview_bytes)
1488 }
1489
1490 fn snapshot_locked(&self, state: &BgTaskState, preview_bytes: usize) -> BgTaskSnapshot {
1491 let metadata = &state.metadata;
1492 let duration_ms = metadata.duration_ms.or_else(|| {
1493 metadata
1494 .status
1495 .is_terminal()
1496 .then(|| self.started.elapsed().as_millis() as u64)
1497 });
1498 let (output_preview, output_truncated) = state.buffer.read_tail(preview_bytes);
1499 BgTaskSnapshot {
1500 info: BgTaskInfo {
1501 task_id: self.task_id.clone(),
1502 status: metadata.status.clone(),
1503 command: metadata.command.clone(),
1504 started_at: metadata.started_at,
1505 duration_ms,
1506 },
1507 exit_code: metadata.exit_code,
1508 child_pid: metadata.child_pid,
1509 workdir: metadata.workdir.display().to_string(),
1510 output_preview,
1511 output_truncated,
1512 output_path: state
1513 .buffer
1514 .output_path()
1515 .map(|path| path.display().to_string()),
1516 stderr_path: Some(state.buffer.stderr_path().display().to_string()),
1517 }
1518 }
1519
1520 pub(crate) fn is_running(&self) -> bool {
1521 self.state
1522 .lock()
1523 .map(|state| state.metadata.status == BgTaskStatus::Running)
1524 .unwrap_or(false)
1525 }
1526
1527 fn mark_terminal_now(&self) {
1528 if let Ok(mut terminal_at) = self.terminal_at.lock() {
1529 if terminal_at.is_none() {
1530 *terminal_at = Some(Instant::now());
1531 }
1532 }
1533 }
1534
1535 fn set_completion_delivered(&self, delivered: bool) -> Result<(), String> {
1536 let mut state = self
1537 .state
1538 .lock()
1539 .map_err(|_| "background task lock poisoned".to_string())?;
1540 let updated = update_task(&self.paths.json, |metadata| {
1541 metadata.completion_delivered = delivered;
1542 })
1543 .map_err(|e| format!("failed to update completion delivery: {e}"))?;
1544 state.metadata = updated;
1545 Ok(())
1546 }
1547}
1548
1549fn terminal_metadata_from_marker(
1550 mut metadata: PersistedTask,
1551 marker: ExitMarker,
1552 reason: Option<String>,
1553) -> PersistedTask {
1554 match marker {
1555 ExitMarker::Code(code) => {
1556 let status = if code == 0 {
1557 BgTaskStatus::Completed
1558 } else {
1559 BgTaskStatus::Failed
1560 };
1561 metadata.mark_terminal(status, Some(code), reason);
1562 }
1563 ExitMarker::Killed => metadata.mark_terminal(BgTaskStatus::Killed, None, reason),
1564 }
1565 metadata
1566}
1567
1568#[cfg(unix)]
1569fn detached_shell_command(command: &str, exit_path: &Path) -> Command {
1570 let shell = resolve_posix_shell();
1571 let mut cmd = Command::new(&shell);
1572 cmd.arg("-c")
1573 .arg("\"$0\" -c \"$1\"; code=$?; printf \"%s\" \"$code\" > \"$2.tmp.$$\"; mv -f \"$2.tmp.$$\" \"$2\"")
1574 .arg(&shell)
1575 .arg(command)
1576 .arg(exit_path);
1577 unsafe {
1578 cmd.pre_exec(|| {
1579 if libc::setsid() == -1 {
1580 return Err(std::io::Error::last_os_error());
1581 }
1582 Ok(())
1583 });
1584 }
1585 cmd
1586}
1587
1588#[cfg(unix)]
1589fn resolve_posix_shell() -> PathBuf {
1590 static POSIX_SHELL: OnceLock<PathBuf> = OnceLock::new();
1591 POSIX_SHELL
1592 .get_or_init(|| {
1593 std::env::var_os("BASH")
1594 .filter(|value| !value.is_empty())
1595 .map(PathBuf::from)
1596 .filter(|path| path.exists())
1597 .or_else(|| which::which("bash").ok())
1598 .or_else(|| which::which("zsh").ok())
1599 .unwrap_or_else(|| PathBuf::from("/bin/sh"))
1600 })
1601 .clone()
1602}
1603
/// Writes the wrapper script for `shell` next to the task files and builds
/// the command that launches it with the given process `creation_flags`.
///
/// The wrapper is named `<json stem>.<ext>` inside `paths.dir`, where the
/// extension depends on the shell (`ps1`, `bat` or `sh`) and the stem falls
/// back to `wrapper` when the JSON path has no UTF-8 stem.
///
/// # Errors
///
/// Returns an error string when the wrapper script cannot be written.
#[cfg(windows)]
fn detached_shell_command_for(
    shell: crate::windows_shell::WindowsShell,
    command: &str,
    exit_path: &Path,
    paths: &TaskPaths,
    creation_flags: u32,
) -> Result<Command, String> {
    use crate::windows_shell::WindowsShell;

    let wrapper_body = shell.wrapper_script(command, exit_path);
    let wrapper_ext = match shell {
        WindowsShell::Pwsh | WindowsShell::Powershell => "ps1",
        WindowsShell::Cmd => "bat",
        WindowsShell::Posix(_) => "sh",
    };
    let wrapper_stem = paths
        .json
        .file_stem()
        .and_then(|s| s.to_str())
        .unwrap_or("wrapper");
    let wrapper_path = paths
        .dir
        .join(format!("{}.{}", wrapper_stem, wrapper_ext));
    if let Err(e) = fs::write(&wrapper_path, wrapper_body) {
        return Err(format!(
            "failed to write background bash wrapper script: {e}"
        ));
    }

    // Shell-specific switches that precede the wrapper path.
    let leading_args: &[&str] = match shell {
        WindowsShell::Pwsh | WindowsShell::Powershell => &[
            "-NoLogo",
            "-NoProfile",
            "-NonInteractive",
            "-ExecutionPolicy",
            "Bypass",
            "-File",
        ],
        WindowsShell::Cmd => &["/D", "/C"],
        WindowsShell::Posix(_) => &[],
    };

    let mut cmd = Command::new(shell.binary().as_ref());
    cmd.args(leading_args);
    cmd.arg(&wrapper_path);
    cmd.creation_flags(creation_flags);
    Ok(cmd)
}
1686
/// Spawns `command` as a detached child whose stdout and stderr are captured
/// to `paths.stdout` / `paths.stderr` and whose exit marker is `paths.exit`.
///
/// The child runs in `workdir` with `env` added to its environment and with
/// stdin connected to the null device.
///
/// On non-Windows targets the command is launched through
/// `detached_shell_command`. On Windows every shell from `shell_candidates()`
/// is tried in order, first with `CREATE_BREAKAWAY_FROM_JOB` and then without
/// it.
///
/// # Errors
///
/// Returns an error string when a capture file cannot be opened, when the
/// wrapper command cannot be built, or when spawning fails.
fn spawn_detached_child(
    command: &str,
    paths: &TaskPaths,
    workdir: &Path,
    env: &HashMap<String, String>,
) -> Result<std::process::Child, String> {
    #[cfg(not(windows))]
    {
        let stdout = create_capture_file(&paths.stdout)
            .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
        let stderr = create_capture_file(&paths.stderr)
            .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
        detached_shell_command(command, &paths.exit)
            .current_dir(workdir)
            .envs(env)
            .stdin(Stdio::null())
            .stdout(Stdio::from(stdout))
            .stderr(Stdio::from(stderr))
            .spawn()
            .map_err(|e| format!("failed to spawn background bash command: {e}"))
    }
    #[cfg(windows)]
    {
        use crate::windows_shell::shell_candidates;
        let candidates: Vec<crate::windows_shell::WindowsShell> = shell_candidates();
        // Process creation flag values passed to `creation_flags`.
        const FLAG_CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
        const FLAG_CREATE_BREAKAWAY_FROM_JOB: u32 = 0x0100_0000;
        const FLAG_CREATE_NO_WINDOW: u32 = 0x0800_0000;
        let with_breakaway =
            FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP | FLAG_CREATE_BREAKAWAY_FROM_JOB;
        let without_breakaway = FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP;
        let mut last_error: Option<String> = None;
        // Outer loop: shell candidates. Inner loop: flag sets, breakaway first.
        for (idx, shell) in candidates.iter().enumerate() {
            for &flags in &[with_breakaway, without_breakaway] {
                // The capture files are opened again for every attempt because
                // each handle is moved into the `Command` below.
                let stdout = create_capture_file(&paths.stdout)
                    .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
                let stderr = create_capture_file(&paths.stderr)
                    .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
                let mut cmd =
                    detached_shell_command_for(shell.clone(), command, &paths.exit, paths, flags)?;
                cmd.current_dir(workdir)
                    .envs(env)
                    .stdin(Stdio::null())
                    .stdout(Stdio::from(stdout))
                    .stderr(Stdio::from(stderr));
                match cmd.spawn() {
                    Ok(child) => {
                        // A non-zero index means earlier candidates failed.
                        if idx > 0 {
                            crate::slog_warn!("background bash spawn fell back to {} after {} earlier candidate(s) failed; \
                                the cached PATH probe disagreed with runtime spawn — likely PATH \
                                inheritance, antivirus / AppLocker / Defender ASR, or sandbox policy.",
                                shell.binary(),
                                idx);
                        }
                        if flags == without_breakaway {
                            crate::slog_warn!(
                                "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected \
                                (likely a restrictive Job Object — CI sandbox or MDM policy). \
                                Spawned without breakaway; the bg task will be torn down if the \
                                AFT process group is killed."
                            );
                        }
                        return Ok(child);
                    }
                    // Shell binary missing: skip the remaining flag set and
                    // move on to the next candidate.
                    Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                        crate::slog_warn!("background bash spawn: {} returned NotFound at runtime — trying next candidate",
                            shell.binary());
                        last_error = Some(format!("{}: {e}", shell.binary()));
                        break;
                    }
                    // OS error 5 on the breakaway attempt: retry the same
                    // shell with the next flag set.
                    Err(e) if flags == with_breakaway && e.raw_os_error() == Some(5) => {
                        crate::slog_warn!(
                            "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected with \
                            Access Denied — retrying {} without breakaway",
                            shell.binary()
                        );
                        last_error = Some(format!("{}: {e}", shell.binary()));
                        continue;
                    }
                    // Any other failure aborts without trying further candidates.
                    Err(e) => {
                        return Err(format!(
                            "failed to spawn background bash command via {}: {e}",
                            shell.binary()
                        ));
                    }
                }
            }
        }
        Err(format!(
            "failed to spawn background bash command: no Windows shell could be spawned. \
            Last error: {}. PATH-probed candidates: {:?}",
            last_error.unwrap_or_else(|| "no candidates were attempted".to_string()),
            candidates.iter().map(|s| s.binary()).collect::<Vec<_>>()
        ))
    }
}
1835
1836fn random_slug() -> String {
1837 let mut bytes = [0u8; 4];
1838 getrandom::fill(&mut bytes).unwrap_or_else(|_| {
1840 let t = SystemTime::now()
1842 .duration_since(UNIX_EPOCH)
1843 .map(|d| d.subsec_nanos())
1844 .unwrap_or(0);
1845 let p = std::process::id();
1846 bytes.copy_from_slice(&(t ^ p).to_le_bytes());
1847 });
1848 let hex: String = bytes.iter().map(|b| format!("{b:02x}")).collect();
1850 format!("bash-{hex}")
1851}
1852
/// Unit tests for the background task registry: cleanup of finished tasks,
/// child reaping with and without an exit marker, and Windows-only spawn and
/// shell-wrapper behaviour.
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    #[cfg(windows)]
    use std::fs;
    use std::sync::{Arc, Mutex};
    use std::time::Duration;
    #[cfg(windows)]
    use std::time::Instant;

    use super::*;

    // Command that exits successfully right away, per platform.
    #[cfg(unix)]
    const QUICK_SUCCESS_COMMAND: &str = "true";
    #[cfg(windows)]
    const QUICK_SUCCESS_COMMAND: &str = "cmd /c exit 0";

    // Command that keeps running for a few seconds, per platform.
    #[cfg(unix)]
    const LONG_RUNNING_COMMAND: &str = "sleep 5";
    #[cfg(windows)]
    const LONG_RUNNING_COMMAND: &str = "cmd /c timeout /t 5 /nobreak > nul";

    /// Spawns a trivial command with all stdio set to null and polls
    /// `try_wait` until it has exited, returning the already-exited child
    /// handle. Panics when the child does not exit within 5 seconds.
    fn spawn_dead_child() -> std::process::Child {
        #[cfg(unix)]
        let mut cmd = std::process::Command::new("true");
        #[cfg(windows)]
        let mut cmd = {
            let mut c = std::process::Command::new("cmd");
            c.args(["/c", "exit", "0"]);
            c
        };
        cmd.stdin(std::process::Stdio::null());
        cmd.stdout(std::process::Stdio::null());
        cmd.stderr(std::process::Stdio::null());
        let mut child = cmd.spawn().expect("spawn replacement child for reap test");
        let started = Instant::now();
        loop {
            match child.try_wait() {
                Ok(Some(_)) => break,
                Ok(None) => {
                    if started.elapsed() > Duration::from_secs(5) {
                        panic!("dead-child stand-in did not exit within 5s");
                    }
                    std::thread::sleep(Duration::from_millis(10));
                }
                Err(error) => panic!("dead-child try_wait failed: {error}"),
            }
        }
        child
    }

    /// A killed task whose completion has been drained is removed by
    /// `cleanup_finished` with a zero threshold.
    #[test]
    fn cleanup_finished_removes_terminal_tasks_older_than_threshold() {
        let registry = BgTaskRegistry::default();
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                QUICK_SUCCESS_COMMAND,
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
                true,
                false,
                Some(dir.path().to_path_buf()),
            )
            .unwrap();
        registry
            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
            .unwrap();
        // Draining delivers the completion, which the cleanup below relies on.
        let completions = registry.drain_completions_for_session(Some("session"));
        assert_eq!(completions.len(), 1);

        registry.cleanup_finished(Duration::ZERO);

        assert!(registry.inner.tasks.lock().unwrap().is_empty());
    }

    /// A killed task whose completion was never drained stays in the registry
    /// even when `cleanup_finished` runs with a zero threshold.
    #[test]
    fn cleanup_finished_retains_undelivered_terminals() {
        let registry = BgTaskRegistry::default();
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                QUICK_SUCCESS_COMMAND,
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
                true,
                false,
                Some(dir.path().to_path_buf()),
            )
            .unwrap();
        registry
            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
            .unwrap();

        registry.cleanup_finished(Duration::ZERO);

        assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
    }

    /// `reap_child` must mark a task `Failed` with the reason
    /// "process exited without exit marker" when its child has exited and no
    /// exit marker file exists.
    #[test]
    fn reap_child_marks_failed_when_child_exits_without_exit_marker() {
        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                QUICK_SUCCESS_COMMAND,
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
                true,
                false,
                Some(dir.path().to_path_buf()),
            )
            .unwrap();

        let task = registry.task_for_session(&task_id, "session").unwrap();

        // Wait (up to 5 s) until the child has exited or its handle is gone.
        let started = Instant::now();
        loop {
            let exited = {
                let mut state = task.state.lock().unwrap();
                if let Some(child) = state.child.as_mut() {
                    matches!(child.try_wait(), Ok(Some(_)))
                } else {
                    true
                }
            };
            if exited {
                break;
            }
            assert!(
                started.elapsed() < Duration::from_secs(5),
                "child should exit quickly"
            );
            std::thread::sleep(Duration::from_millis(20));
        }

        // NOTE(review): presumably the shutdown flag plus the sleep lets any
        // background watchdog stop before the state is rewritten by hand —
        // confirm against the watchdog's poll interval.
        registry
            .inner
            .shutdown
            .store(true, std::sync::atomic::Ordering::SeqCst);
        std::thread::sleep(Duration::from_millis(550));

        // Remove the marker so the "no exit marker" path is exercised.
        let _ = std::fs::remove_file(&task.paths.exit);

        {
            // Force the task back to a persisted `Running` state with a child
            // handle that has already exited.
            let mut state = task.state.lock().unwrap();
            state.metadata.status = BgTaskStatus::Running;
            state.metadata.status_reason = None;
            state.metadata.exit_code = None;
            state.metadata.finished_at = None;
            state.metadata.duration_ms = None;
            crate::bash_background::persistence::write_task(&task.paths.json, &state.metadata)
                .expect("persist reset Running metadata for reap_child test");
            if state.child.is_none() {
                state.child = Some(spawn_dead_child());
            }
        }
        *task.terminal_at.lock().unwrap() = None;

        assert!(
            task.is_running(),
            "precondition: metadata.status == Running"
        );
        assert!(
            !task.paths.exit.exists(),
            "precondition: exit marker absent"
        );

        registry.reap_child(&task);

        let state = task.state.lock().unwrap();
        assert!(
            state.metadata.status.is_terminal(),
            "reap_child must transition to terminal when PID dead and no marker. \
             Got status={:?}",
            state.metadata.status
        );
        assert_eq!(
            state.metadata.status,
            BgTaskStatus::Failed,
            "must specifically be Failed (not Killed): status={:?}",
            state.metadata.status
        );
        assert_eq!(
            state.metadata.status_reason.as_deref(),
            Some("process exited without exit marker"),
            "reason must match replay path's wording: {:?}",
            state.metadata.status_reason
        );
        assert!(
            state.child.is_none(),
            "child handle must be released after reap"
        );
        assert!(state.detached, "task must be marked detached after reap");
    }

    /// When an exit marker exists, `reap_child` releases the child handle and
    /// marks the task detached but leaves the status as `Running`.
    #[test]
    fn reap_child_preserves_running_when_exit_marker_exists() {
        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                QUICK_SUCCESS_COMMAND,
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
                true,
                false,
                Some(dir.path().to_path_buf()),
            )
            .unwrap();

        let task = registry.task_for_session(&task_id, "session").unwrap();

        // Wait (up to 5 s) until the child has exited and the marker exists.
        let started = Instant::now();
        loop {
            let exited = {
                let mut state = task.state.lock().unwrap();
                if let Some(child) = state.child.as_mut() {
                    matches!(child.try_wait(), Ok(Some(_)))
                } else {
                    true
                }
            };
            if exited && task.paths.exit.exists() {
                break;
            }
            assert!(
                started.elapsed() < Duration::from_secs(5),
                "child should exit and write marker quickly"
            );
            std::thread::sleep(Duration::from_millis(20));
        }

        // NOTE(review): presumably the shutdown flag plus the sleep lets any
        // background watchdog stop before the state is rewritten by hand —
        // confirm against the watchdog's poll interval.
        registry
            .inner
            .shutdown
            .store(true, std::sync::atomic::Ordering::SeqCst);
        std::thread::sleep(Duration::from_millis(550));

        {
            // Force the in-memory status back to `Running` and make sure an
            // already-exited child handle is attached.
            let mut state = task.state.lock().unwrap();
            state.metadata.status = BgTaskStatus::Running;
            state.metadata.status_reason = None;
            if state.child.is_none() {
                state.child = Some(spawn_dead_child());
            }
        }
        *task.terminal_at.lock().unwrap() = None;
        // Recreate the marker if it is no longer on disk.
        if !task.paths.exit.exists() {
            std::fs::write(&task.paths.exit, "0").expect("write replacement exit marker");
        }

        registry.reap_child(&task);

        let state = task.state.lock().unwrap();
        assert!(
            state.child.is_none(),
            "child handle still released even when marker exists"
        );
        assert!(
            state.detached,
            "task still marked detached even when marker exists"
        );
        assert_eq!(
            state.metadata.status,
            BgTaskStatus::Running,
            "reap_child must defer to poll_task when marker exists"
        );
    }

    /// A task that is still running survives `cleanup_finished`, even with a
    /// zero threshold. The task is killed afterwards as cleanup.
    #[test]
    fn cleanup_finished_keeps_running_tasks() {
        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                LONG_RUNNING_COMMAND,
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
                true,
                false,
                Some(dir.path().to_path_buf()),
            )
            .unwrap();

        registry.cleanup_finished(Duration::ZERO);

        assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
        let _ = registry.kill(&task_id, "session");
    }

    /// Polls every 100 ms until `path` exists and returns its contents.
    /// Panics after 30 seconds.
    #[cfg(windows)]
    fn wait_for_file(path: &Path) -> String {
        let started = Instant::now();
        loop {
            if path.exists() {
                return fs::read_to_string(path).expect("read file");
            }
            assert!(
                started.elapsed() < Duration::from_secs(30),
                "timed out waiting for {}",
                path.display()
            );
            std::thread::sleep(Duration::from_millis(100));
        }
    }

    /// Spawns `command` in a fresh registry and temp directory, returning the
    /// registry, the temp directory guard and the new task id.
    #[cfg(windows)]
    fn spawn_windows_registry_command(
        command: &str,
    ) -> (BgTaskRegistry, tempfile::TempDir, String) {
        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                command,
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
                false,
                false,
                Some(dir.path().to_path_buf()),
            )
            .unwrap();
        (registry, dir, task_id)
    }

    /// A command exiting with code 0 produces an exit marker containing `0`.
    #[cfg(windows)]
    #[test]
    fn windows_spawn_writes_exit_marker_for_zero_exit() {
        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 0");
        let exit_path = registry.task_exit_path(&task_id, "session").unwrap();

        let content = wait_for_file(&exit_path);

        assert_eq!(content.trim(), "0");
    }

    /// A command exiting with code 42 produces an exit marker containing `42`.
    #[cfg(windows)]
    #[test]
    fn windows_spawn_writes_exit_marker_for_nonzero_exit() {
        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 42");
        let exit_path = registry.task_exit_path(&task_id, "session").unwrap();

        let content = wait_for_file(&exit_path);

        assert_eq!(content.trim(), "42");
    }

    /// Output written by the command ends up in the stdout capture file.
    #[cfg(windows)]
    #[test]
    fn windows_spawn_captures_stdout_to_disk() {
        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c echo hello");
        let task = registry.task_for_session(&task_id, "session").unwrap();
        let stdout_path = task.paths.stdout.clone();
        let exit_path = task.paths.exit.clone();

        // The exit marker signals that the command has finished writing.
        let _ = wait_for_file(&exit_path);
        let stdout = fs::read_to_string(stdout_path).expect("read stdout");

        assert!(stdout.contains("hello"), "stdout was {stdout:?}");
    }

    /// With a probe that finds both `pwsh.exe` and `powershell.exe`, the
    /// first shell candidate is `Pwsh`.
    #[cfg(windows)]
    #[test]
    fn windows_spawn_uses_pwsh_when_available() {
        let candidates = crate::windows_shell::shell_candidates_with(
            |binary| match binary {
                "pwsh.exe" => Some(std::path::PathBuf::from(r"C:\pwsh\pwsh.exe")),
                "powershell.exe" => Some(std::path::PathBuf::from(r"C:\ps\powershell.exe")),
                _ => None,
            },
            || None,
        );
        let shell = candidates.first().expect("at least one candidate").clone();
        assert_eq!(shell, crate::windows_shell::WindowsShell::Pwsh);
        assert_eq!(shell.binary().as_ref(), "pwsh.exe");
    }

    /// The generated `cmd` wrapper script captures the exit code, writes it
    /// to a temp marker, moves it into place and propagates the code.
    #[cfg(windows)]
    #[test]
    fn windows_shell_cmd_wrapper_writes_exit_marker_with_move() {
        let exit_path = Path::new(r"C:\Temp\bash-test.exit");
        let script =
            crate::windows_shell::WindowsShell::Cmd.wrapper_script("cmd /c exit 42", exit_path);

        assert!(
            script.contains("set CODE=%ERRORLEVEL%"),
            "wrapper must capture exit code into CODE: {script}"
        );
        assert!(
            script.contains("echo %CODE% >"),
            "wrapper must echo CODE to a temp marker file: {script}"
        );
        assert!(
            script.contains("move /Y"),
            "wrapper must use atomic move to write the marker: {script}"
        );
        assert!(
            script.contains("> nul"),
            "wrapper must redirect move output to nul: {script}"
        );
        assert!(
            script.contains("exit /B %CODE%"),
            "wrapper must propagate the captured exit code: {script}"
        );
        assert!(script.contains(r#""C:\Temp\bash-test.exit.tmp""#));
        assert!(script.contains(r#""C:\Temp\bash-test.exit""#));
    }

    /// `Cmd::bg_command` passes exactly `/D /S /C <command>`.
    #[cfg(windows)]
    #[test]
    fn windows_shell_cmd_bg_command_uses_minimal_cmd_flags() {
        use crate::windows_shell::WindowsShell;
        let cmd = WindowsShell::Cmd.bg_command("echo wrapped");
        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
        let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
        assert_eq!(
            args_strs,
            vec!["/D", "/S", "/C", "echo wrapped"],
            "Cmd::bg_command must prepend /D /S /C"
        );
    }

    /// `Pwsh::bg_command` uses `-Command` and includes the command body.
    #[cfg(windows)]
    #[test]
    fn windows_shell_pwsh_bg_command_uses_standard_args() {
        use crate::windows_shell::WindowsShell;
        let cmd = WindowsShell::Pwsh.bg_command("Get-Date");
        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
        let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
        assert!(
            args_strs.contains(&"-Command"),
            "Pwsh::bg_command must use -Command: {args_strs:?}"
        );
        assert!(
            args_strs.contains(&"Get-Date"),
            "Pwsh::bg_command must include the user command body"
        );
    }

    // `cfg(any())` is never true, so this placeholder is never compiled.
    #[allow(dead_code)]
    #[cfg(any())] fn windows_cmd_wrapper_records_real_exit_code_disabled() {}
}