1use std::collections::{HashMap, HashSet, VecDeque};
2use std::fs;
3use std::path::{Path, PathBuf};
4use std::process::{Child, Command, Stdio};
5use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
6#[cfg(unix)]
7use std::sync::OnceLock;
8use std::sync::{Arc, Mutex};
9use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
10
11use serde::Serialize;
12
13use crate::context::SharedProgressSender;
14use crate::protocol::{BashCompletedFrame, BashLongRunningFrame, PushFrame};
15
16#[cfg(unix)]
17use std::os::unix::process::CommandExt;
18#[cfg(windows)]
19use std::os::windows::process::CommandExt;
20
21use super::buffer::BgBuffer;
22use super::persistence::{
23 create_capture_file, delete_task_bundle, read_exit_marker, read_task, session_tasks_dir,
24 task_paths, unix_millis, update_task, write_kill_marker_if_absent, write_task, ExitMarker,
25 PersistedTask, TaskPaths,
26};
27use super::process::is_process_alive;
28#[cfg(unix)]
29use super::process::terminate_pgid;
30#[cfg(windows)]
31use super::process::terminate_pid;
32use super::{BgTaskInfo, BgTaskStatus};
33const DEFAULT_BG_TIMEOUT: Duration = Duration::from_secs(30 * 60);
41const STALE_RUNNING_AFTER: Duration = Duration::from_secs(24 * 60 * 60);
42const PERSISTED_GC_GRACE: Duration = Duration::from_secs(24 * 60 * 60);
43const QUARANTINE_GC_GRACE: Duration = Duration::from_secs(30 * 24 * 60 * 60);
44
45const BG_COMPLETION_PREVIEW_BYTES: usize = 300;
52
53#[derive(Debug, Clone, Serialize)]
54pub struct BgCompletion {
55 pub task_id: String,
56 #[serde(skip_serializing)]
59 pub session_id: String,
60 pub status: BgTaskStatus,
61 pub exit_code: Option<i32>,
62 pub command: String,
63 #[serde(default, skip_serializing_if = "String::is_empty")]
69 pub output_preview: String,
70 #[serde(default, skip_serializing_if = "is_false")]
75 pub output_truncated: bool,
76}
77
78fn is_false(v: &bool) -> bool {
79 !*v
80}
81
82#[derive(Debug, Clone, Serialize)]
83pub struct BgTaskSnapshot {
84 #[serde(flatten)]
85 pub info: BgTaskInfo,
86 pub exit_code: Option<i32>,
87 pub child_pid: Option<u32>,
88 pub workdir: String,
89 pub output_preview: String,
90 pub output_truncated: bool,
91 pub output_path: Option<String>,
92 pub stderr_path: Option<String>,
93}
94
95#[derive(Clone)]
96pub struct BgTaskRegistry {
97 pub(crate) inner: Arc<RegistryInner>,
98}
99
100pub(crate) struct RegistryInner {
101 pub(crate) tasks: Mutex<HashMap<String, Arc<BgTask>>>,
102 pub(crate) completions: Mutex<VecDeque<BgCompletion>>,
103 pub(crate) progress_sender: SharedProgressSender,
104 watchdog_started: AtomicBool,
105 pub(crate) shutdown: AtomicBool,
106 pub(crate) long_running_reminder_enabled: AtomicBool,
107 pub(crate) long_running_reminder_interval_ms: AtomicU64,
108 persisted_gc_started: AtomicBool,
109 #[cfg(test)]
110 persisted_gc_runs: AtomicU64,
111 pub(crate) compressor: Mutex<Option<Box<dyn Fn(&str, String) -> String + Send + Sync>>>,
117}
118
119pub(crate) struct BgTask {
120 pub(crate) task_id: String,
121 pub(crate) session_id: String,
122 pub(crate) paths: TaskPaths,
123 pub(crate) started: Instant,
124 pub(crate) last_reminder_at: Mutex<Option<Instant>>,
125 pub(crate) terminal_at: Mutex<Option<Instant>>,
126 pub(crate) state: Mutex<BgTaskState>,
127}
128
129pub(crate) struct BgTaskState {
130 pub(crate) metadata: PersistedTask,
131 pub(crate) child: Option<Child>,
132 pub(crate) detached: bool,
133 pub(crate) buffer: BgBuffer,
134}
135
136impl BgTaskRegistry {
137 pub fn new(progress_sender: SharedProgressSender) -> Self {
138 Self {
139 inner: Arc::new(RegistryInner {
140 tasks: Mutex::new(HashMap::new()),
141 completions: Mutex::new(VecDeque::new()),
142 progress_sender,
143 watchdog_started: AtomicBool::new(false),
144 shutdown: AtomicBool::new(false),
145 long_running_reminder_enabled: AtomicBool::new(true),
146 long_running_reminder_interval_ms: AtomicU64::new(600_000),
147 persisted_gc_started: AtomicBool::new(false),
148 #[cfg(test)]
149 persisted_gc_runs: AtomicU64::new(0),
150 compressor: Mutex::new(None),
151 }),
152 }
153 }
154
155 pub fn set_compressor<F>(&self, compressor: F)
160 where
161 F: Fn(&str, String) -> String + Send + Sync + 'static,
162 {
163 if let Ok(mut slot) = self.inner.compressor.lock() {
164 *slot = Some(Box::new(compressor));
165 }
166 }
167
168 pub(crate) fn compress_output(&self, command: &str, output: String) -> String {
171 let Ok(slot) = self.inner.compressor.lock() else {
172 return output;
173 };
174 match slot.as_ref() {
175 Some(compressor) => compressor(command, output),
176 None => output,
177 }
178 }
179
180 pub fn configure_long_running_reminders(&self, enabled: bool, interval_ms: u64) {
181 self.inner
182 .long_running_reminder_enabled
183 .store(enabled, Ordering::SeqCst);
184 self.inner
185 .long_running_reminder_interval_ms
186 .store(interval_ms, Ordering::SeqCst);
187 }
188
189 #[cfg(unix)]
190 #[allow(clippy::too_many_arguments)]
191 pub fn spawn(
192 &self,
193 command: &str,
194 session_id: String,
195 workdir: PathBuf,
196 env: HashMap<String, String>,
197 timeout: Option<Duration>,
198 storage_dir: PathBuf,
199 max_running: usize,
200 notify_on_completion: bool,
201 compressed: bool,
202 project_root: Option<PathBuf>,
203 ) -> Result<String, String> {
204 self.start_watchdog();
205
206 let running = self.running_count();
207 if running >= max_running {
208 return Err(format!(
209 "background bash task limit exceeded: {running} running (max {max_running})"
210 ));
211 }
212
213 let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
214 let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
215 let task_id = self.generate_unique_task_id()?;
216 let paths = task_paths(&storage_dir, &session_id, &task_id);
217 fs::create_dir_all(&paths.dir)
218 .map_err(|e| format!("failed to create background task dir: {e}"))?;
219
220 let mut metadata = PersistedTask::starting(
221 task_id.clone(),
222 session_id.clone(),
223 command.to_string(),
224 workdir.clone(),
225 project_root,
226 timeout_ms,
227 notify_on_completion,
228 compressed,
229 );
230 write_task(&paths.json, &metadata)
231 .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
232
233 create_capture_file(&paths.stdout)
237 .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
238 create_capture_file(&paths.stderr)
239 .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
240
241 let child = match spawn_detached_child(command, &paths, &workdir, &env) {
242 Ok(child) => child,
243 Err(error) => {
244 crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
245 let _ = delete_task_bundle(&paths);
246 return Err(error);
247 }
248 };
249
250 let child_pid = child.id();
251 metadata.mark_running(child_pid, child_pid as i32);
252 write_task(&paths.json, &metadata)
253 .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
254
255 let task = Arc::new(BgTask {
256 task_id: task_id.clone(),
257 session_id,
258 paths: paths.clone(),
259 started: Instant::now(),
260 last_reminder_at: Mutex::new(None),
261 terminal_at: Mutex::new(None),
262 state: Mutex::new(BgTaskState {
263 metadata,
264 child: Some(child),
265 detached: false,
266 buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
267 }),
268 });
269
270 self.inner
271 .tasks
272 .lock()
273 .map_err(|_| "background task registry lock poisoned".to_string())?
274 .insert(task_id.clone(), task);
275
276 Ok(task_id)
277 }
278
279 #[cfg(windows)]
280 #[allow(clippy::too_many_arguments)]
281 pub fn spawn(
282 &self,
283 command: &str,
284 session_id: String,
285 workdir: PathBuf,
286 env: HashMap<String, String>,
287 timeout: Option<Duration>,
288 storage_dir: PathBuf,
289 max_running: usize,
290 notify_on_completion: bool,
291 compressed: bool,
292 project_root: Option<PathBuf>,
293 ) -> Result<String, String> {
294 self.start_watchdog();
295
296 let running = self.running_count();
297 if running >= max_running {
298 return Err(format!(
299 "background bash task limit exceeded: {running} running (max {max_running})"
300 ));
301 }
302
303 let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
304 let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
305 let task_id = self.generate_unique_task_id()?;
306 let paths = task_paths(&storage_dir, &session_id, &task_id);
307 fs::create_dir_all(&paths.dir)
308 .map_err(|e| format!("failed to create background task dir: {e}"))?;
309
310 let mut metadata = PersistedTask::starting(
311 task_id.clone(),
312 session_id.clone(),
313 command.to_string(),
314 workdir.clone(),
315 project_root,
316 timeout_ms,
317 notify_on_completion,
318 compressed,
319 );
320 write_task(&paths.json, &metadata)
321 .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
322
323 create_capture_file(&paths.stdout)
329 .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
330 create_capture_file(&paths.stderr)
331 .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
332
333 let child = match spawn_detached_child(command, &paths, &workdir, &env) {
334 Ok(child) => child,
335 Err(error) => {
336 crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
337 let _ = delete_task_bundle(&paths);
338 return Err(error);
339 }
340 };
341
342 let child_pid = child.id();
343 metadata.status = BgTaskStatus::Running;
344 metadata.child_pid = Some(child_pid);
345 metadata.pgid = None;
346 write_task(&paths.json, &metadata)
347 .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
348
349 let task = Arc::new(BgTask {
350 task_id: task_id.clone(),
351 session_id,
352 paths: paths.clone(),
353 started: Instant::now(),
354 last_reminder_at: Mutex::new(None),
355 terminal_at: Mutex::new(None),
356 state: Mutex::new(BgTaskState {
357 metadata,
358 child: Some(child),
359 detached: false,
360 buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
361 }),
362 });
363
364 self.inner
365 .tasks
366 .lock()
367 .map_err(|_| "background task registry lock poisoned".to_string())?
368 .insert(task_id.clone(), task);
369
370 Ok(task_id)
371 }
372
373 pub fn replay_session(&self, storage_dir: &Path, session_id: &str) -> Result<(), String> {
374 self.replay_session_inner(storage_dir, session_id, None)
375 }
376
377 pub fn replay_session_for_project(
378 &self,
379 storage_dir: &Path,
380 session_id: &str,
381 project_root: &Path,
382 ) -> Result<(), String> {
383 self.replay_session_inner(storage_dir, session_id, Some(project_root))
384 }
385
386 fn replay_session_inner(
387 &self,
388 storage_dir: &Path,
389 session_id: &str,
390 project_root: Option<&Path>,
391 ) -> Result<(), String> {
392 self.start_watchdog();
393 if !self.inner.persisted_gc_started.swap(true, Ordering::SeqCst) {
394 if let Err(error) = self.maybe_gc_persisted(storage_dir) {
395 crate::slog_warn!("failed to GC persisted background bash tasks: {error}");
396 }
397 }
398 let dir = session_tasks_dir(storage_dir, session_id);
399 if !dir.exists() {
400 return Ok(());
401 }
402
403 let canonical_project = project_root.map(canonicalized_path);
404 let entries = fs::read_dir(&dir)
405 .map_err(|e| format!("failed to read background task dir {}: {e}", dir.display()))?;
406 for entry in entries.flatten() {
407 let path = entry.path();
408 if path.extension().and_then(|extension| extension.to_str()) != Some("json") {
409 continue;
410 }
411 let mut metadata = match read_task(&path) {
412 Ok(metadata) => metadata,
413 Err(error) => {
414 crate::slog_warn!(
415 "quarantining invalid background task metadata {} during replay: {error}",
416 path.display()
417 );
418 if let Err(quarantine_error) =
419 quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
420 {
421 crate::slog_warn!(
422 "failed to quarantine invalid background task metadata {}: {quarantine_error}",
423 path.display()
424 );
425 }
426 continue;
427 }
428 };
429 if metadata.session_id != session_id {
430 continue;
431 }
432 if let Some(canonical_project) = canonical_project.as_deref() {
433 let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
434 if metadata_project.as_deref() != Some(canonical_project) {
435 continue;
436 }
437 }
438
439 let paths = task_paths(storage_dir, session_id, &metadata.task_id);
440 match metadata.status {
441 BgTaskStatus::Starting => {
442 metadata.mark_terminal(
443 BgTaskStatus::Failed,
444 None,
445 Some("spawn aborted".to_string()),
446 );
447 let _ = write_task(&paths.json, &metadata);
448 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
449 self.insert_rehydrated_task(metadata, paths, true)?;
450 }
451 BgTaskStatus::Running | BgTaskStatus::Killing => {
452 if self.running_metadata_is_stale(&metadata) {
453 metadata.mark_terminal(
454 BgTaskStatus::Killed,
455 None,
456 Some("orphaned (>24h)".to_string()),
457 );
458 if !paths.exit.exists() {
459 let _ = write_kill_marker_if_absent(&paths.exit);
460 }
461 let _ = write_task(&paths.json, &metadata);
462 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
463 self.insert_rehydrated_task(metadata, paths, true)?;
464 } else if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
465 let reason = (metadata.status == BgTaskStatus::Killing).then(|| {
466 "recovered from inconsistent killing state on replay".to_string()
467 });
468 if reason.is_some() {
469 crate::slog_warn!("background task {} had killing state with exit marker; preferring marker",
470 metadata.task_id);
471 }
472 metadata = terminal_metadata_from_marker(metadata, marker, reason);
473 let _ = write_task(&paths.json, &metadata);
474 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
475 self.insert_rehydrated_task(metadata, paths, true)?;
476 } else if metadata.status == BgTaskStatus::Killing {
477 if !paths.exit.exists() {
478 let _ = write_kill_marker_if_absent(&paths.exit);
479 }
480 metadata.mark_terminal(
481 BgTaskStatus::Killed,
482 None,
483 Some("recovered from inconsistent killing state on replay".to_string()),
484 );
485 let _ = write_task(&paths.json, &metadata);
486 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
487 self.insert_rehydrated_task(metadata, paths, true)?;
488 } else if metadata.child_pid.is_some_and(|pid| !is_process_alive(pid)) {
489 metadata.mark_terminal(
490 BgTaskStatus::Failed,
491 None,
492 Some("process exited without exit marker".to_string()),
493 );
494 let _ = write_task(&paths.json, &metadata);
495 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
496 self.insert_rehydrated_task(metadata, paths, true)?;
497 } else {
498 self.insert_rehydrated_task(metadata, paths, true)?;
499 }
500 }
501 _ if metadata.status.is_terminal() => {
502 self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
508 self.insert_rehydrated_task(metadata, paths, true)?;
509 }
510 _ => {}
511 }
512 }
513
514 Ok(())
515 }
516
517 pub fn status(
518 &self,
519 task_id: &str,
520 session_id: &str,
521 project_root: Option<&Path>,
522 storage_dir: Option<&Path>,
523 preview_bytes: usize,
524 ) -> Option<BgTaskSnapshot> {
525 let mut task = self.task_for_session(task_id, session_id);
526 if task.is_none() {
527 if let Some(storage_dir) = storage_dir {
528 let _ = self.replay_session(storage_dir, session_id);
529 task = self.task_for_session(task_id, session_id);
530 }
531 }
532 let Some(task) = task else {
533 return self.status_relaxed(
534 task_id,
535 session_id,
536 project_root?,
537 storage_dir?,
538 preview_bytes,
539 );
540 };
541 let _ = self.poll_task(&task);
542 let mut snapshot = task.snapshot(preview_bytes);
543 self.maybe_compress_snapshot(&task, &mut snapshot);
544 Some(snapshot)
545 }
546
547 fn status_relaxed_task(
548 &self,
549 task_id: &str,
550 project_root: &Path,
551 storage_dir: &Path,
552 ) -> Option<Arc<BgTask>> {
553 let canonical_project = canonicalized_path(project_root);
554 let root = storage_dir.join("bash-tasks");
555 let entries = fs::read_dir(&root).ok()?;
556 for entry in entries.flatten() {
557 let dir = entry.path();
558 if !dir.is_dir() {
559 continue;
560 }
561 let path = dir.join(format!("{task_id}.json"));
562 if !path.exists() {
563 continue;
564 }
565 let metadata = match read_task(&path) {
566 Ok(metadata) => metadata,
567 Err(error) => {
568 crate::slog_warn!(
569 "quarantining invalid background task metadata {} during relaxed lookup: {error}",
570 path.display()
571 );
572 if let Err(quarantine_error) =
573 quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
574 {
575 crate::slog_warn!(
576 "failed to quarantine invalid background task metadata {}: {quarantine_error}",
577 path.display()
578 );
579 }
580 continue;
581 }
582 };
583 let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
584 if metadata_project.as_deref() != Some(canonical_project.as_path()) {
585 continue;
586 }
587 if let Some(task) = self.task(task_id) {
588 let matches_project = task
589 .state
590 .lock()
591 .map(|state| {
592 state
593 .metadata
594 .project_root
595 .as_deref()
596 .map(canonicalized_path)
597 .as_deref()
598 == Some(canonical_project.as_path())
599 })
600 .unwrap_or(false);
601 return matches_project.then_some(task);
602 }
603 let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
604 if self.insert_rehydrated_task(metadata, paths, true).is_err() {
605 return None;
606 }
607 return self.task(task_id);
608 }
609 None
610 }
611
612 pub(super) fn status_relaxed(
613 &self,
614 task_id: &str,
615 _session_id: &str,
616 project_root: &Path,
617 storage_dir: &Path,
618 preview_bytes: usize,
619 ) -> Option<BgTaskSnapshot> {
620 let task = self.status_relaxed_task(task_id, project_root, storage_dir)?;
621 let _ = self.poll_task(&task);
622 let mut snapshot = task.snapshot(preview_bytes);
623 self.maybe_compress_snapshot(&task, &mut snapshot);
624 Some(snapshot)
625 }
626
627 pub fn kill_relaxed(
628 &self,
629 task_id: &str,
630 project_root: &Path,
631 storage_dir: &Path,
632 ) -> Result<BgTaskSnapshot, String> {
633 let task = self
634 .status_relaxed_task(task_id, project_root, storage_dir)
635 .ok_or_else(|| format!("background task not found: {task_id}"))?;
636 self.kill_with_status(task_id, &task.session_id, BgTaskStatus::Killed)
637 }
638
639 pub fn maybe_gc_persisted(&self, storage_dir: &Path) -> Result<usize, String> {
640 #[cfg(test)]
641 self.inner.persisted_gc_runs.fetch_add(1, Ordering::SeqCst);
642
643 let mut deleted = 0usize;
644
645 let root = storage_dir.join("bash-tasks");
646 if root.exists() {
647 let session_dirs = fs::read_dir(&root).map_err(|e| {
648 format!(
649 "failed to read background task root {}: {e}",
650 root.display()
651 )
652 })?;
653 for session_entry in session_dirs.flatten() {
654 let session_dir = session_entry.path();
655 if !session_dir.is_dir() {
656 continue;
657 }
658 let task_entries = match fs::read_dir(&session_dir) {
659 Ok(entries) => entries,
660 Err(error) => {
661 crate::slog_warn!(
662 "failed to read background task session dir {}: {error}",
663 session_dir.display()
664 );
665 continue;
666 }
667 };
668 for task_entry in task_entries.flatten() {
669 let json_path = task_entry.path();
670 if json_path
671 .extension()
672 .and_then(|extension| extension.to_str())
673 != Some("json")
674 {
675 continue;
676 }
677 if modified_within(&json_path, PERSISTED_GC_GRACE) {
678 continue;
679 }
680 let metadata = match read_task(&json_path) {
681 Ok(metadata) => metadata,
682 Err(error) => {
683 crate::slog_warn!(
684 "quarantining corrupt background task metadata {}: {error}",
685 json_path.display()
686 );
687 quarantine_task_json(
688 storage_dir,
689 &session_dir,
690 &json_path,
691 QuarantineKind::Corrupt,
692 )?;
693 continue;
694 }
695 };
696 if !(metadata.status.is_terminal() && metadata.completion_delivered) {
697 continue;
698 }
699 let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
700 match delete_task_bundle(&paths) {
701 Ok(()) => {
702 deleted += 1;
703 log::debug!(
704 "deleted persisted background task bundle {}",
705 metadata.task_id
706 );
707 }
708 Err(error) => {
709 crate::slog_warn!(
710 "failed to delete background task bundle {}: {error}",
711 metadata.task_id
712 );
713 continue;
714 }
715 }
716 }
717 }
718 }
719 gc_quarantine(storage_dir);
720 Ok(deleted)
721 }
722
723 pub fn list(&self, preview_bytes: usize) -> Vec<BgTaskSnapshot> {
724 let tasks = self
725 .inner
726 .tasks
727 .lock()
728 .map(|tasks| tasks.values().cloned().collect::<Vec<_>>())
729 .unwrap_or_default();
730 tasks
731 .into_iter()
732 .map(|task| {
733 let _ = self.poll_task(&task);
734 let mut snapshot = task.snapshot(preview_bytes);
735 self.maybe_compress_snapshot(&task, &mut snapshot);
736 snapshot
737 })
738 .collect()
739 }
740
741 fn maybe_compress_snapshot(&self, task: &Arc<BgTask>, snapshot: &mut BgTaskSnapshot) {
747 if !snapshot.info.status.is_terminal() {
748 return;
749 }
750 let compressed_flag = task
751 .state
752 .lock()
753 .map(|state| state.metadata.compressed)
754 .unwrap_or(true);
755 if !compressed_flag {
756 return;
757 }
758 let raw = std::mem::take(&mut snapshot.output_preview);
759 snapshot.output_preview = self.compress_output(&snapshot.info.command, raw);
760 }
761
762 pub fn kill(&self, task_id: &str, session_id: &str) -> Result<BgTaskSnapshot, String> {
763 self.kill_with_status(task_id, session_id, BgTaskStatus::Killed)
764 }
765
766 pub fn promote(&self, task_id: &str, session_id: &str) -> Result<bool, String> {
767 let task = self
768 .task_for_session(task_id, session_id)
769 .ok_or_else(|| format!("background task not found: {task_id}"))?;
770 let mut state = task
771 .state
772 .lock()
773 .map_err(|_| "background task lock poisoned".to_string())?;
774 let updated = update_task(&task.paths.json, |metadata| {
775 metadata.notify_on_completion = true;
776 metadata.completion_delivered = false;
777 })
778 .map_err(|e| format!("failed to promote background task: {e}"))?;
779 state.metadata = updated;
780 if state.metadata.status.is_terminal() {
781 state.buffer.enforce_terminal_cap();
782 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
783 }
784 Ok(true)
785 }
786
787 pub(crate) fn kill_for_timeout(&self, task_id: &str, session_id: &str) -> Result<(), String> {
788 self.kill_with_status(task_id, session_id, BgTaskStatus::TimedOut)
789 .map(|_| ())
790 }
791
792 pub fn cleanup_finished(&self, older_than: Duration) {
793 let cutoff = Instant::now().checked_sub(older_than);
794 let removable_paths: Vec<(String, TaskPaths)> =
795 if let Ok(mut tasks) = self.inner.tasks.lock() {
796 let removable = tasks
797 .iter()
798 .filter_map(|(task_id, task)| {
799 let delivered_terminal = task
800 .state
801 .lock()
802 .map(|state| {
803 state.metadata.status.is_terminal()
804 && state.metadata.completion_delivered
805 })
806 .unwrap_or(false);
807 if !delivered_terminal {
808 return None;
809 }
810
811 let terminal_at = task.terminal_at.lock().ok().and_then(|at| *at);
812 let expired = match (terminal_at, cutoff) {
813 (Some(terminal_at), Some(cutoff)) => terminal_at <= cutoff,
814 (Some(_), None) => true,
815 (None, _) => false,
816 };
817 expired.then(|| task_id.clone())
818 })
819 .collect::<Vec<_>>();
820
821 removable
822 .into_iter()
823 .filter_map(|task_id| {
824 tasks
825 .remove(&task_id)
826 .map(|task| (task_id, task.paths.clone()))
827 })
828 .collect()
829 } else {
830 Vec::new()
831 };
832
833 for (task_id, paths) in removable_paths {
834 match delete_task_bundle(&paths) {
835 Ok(()) => log::debug!("deleted persisted background task bundle {task_id}"),
836 Err(error) => crate::slog_warn!(
837 "failed to delete persisted background task bundle {task_id}: {error}"
838 ),
839 }
840 }
841 }
842
843 pub fn drain_completions(&self) -> Vec<BgCompletion> {
844 self.drain_completions_for_session(None)
845 }
846
847 pub fn drain_completions_for_session(&self, session_id: Option<&str>) -> Vec<BgCompletion> {
848 let completions = match self.inner.completions.lock() {
849 Ok(completions) => completions,
850 Err(_) => return Vec::new(),
851 };
852
853 completions
854 .iter()
855 .filter(|completion| {
856 session_id
857 .map(|session_id| completion.session_id == session_id)
858 .unwrap_or(true)
859 })
860 .cloned()
861 .collect()
862 }
863
864 pub fn ack_completions_for_session(
865 &self,
866 session_id: Option<&str>,
867 task_ids: &[String],
868 ) -> Vec<String> {
869 if task_ids.is_empty() {
870 return Vec::new();
871 }
872 let task_ids = task_ids.iter().map(String::as_str).collect::<HashSet<_>>();
873 let mut completions = match self.inner.completions.lock() {
874 Ok(completions) => completions,
875 Err(_) => return Vec::new(),
876 };
877 let mut acked = Vec::new();
878 completions.retain(|completion| {
879 let session_matches = session_id
880 .map(|session_id| completion.session_id == session_id)
881 .unwrap_or(true);
882 if session_matches && task_ids.contains(completion.task_id.as_str()) {
883 acked.push((completion.task_id.clone(), completion.session_id.clone()));
884 false
885 } else {
886 true
887 }
888 });
889 drop(completions);
890
891 let mut delivered = Vec::new();
892 for (task_id, completion_session_id) in acked {
893 if let Some(task) = self.task_for_session(&task_id, &completion_session_id) {
894 if task.set_completion_delivered(true).is_ok() {
895 delivered.push(task_id);
896 }
897 }
898 }
899
900 delivered
901 }
902
903 pub fn pending_completions_for_session(&self, session_id: &str) -> Vec<BgCompletion> {
904 self.inner
905 .completions
906 .lock()
907 .map(|completions| {
908 completions
909 .iter()
910 .filter(|completion| completion.session_id == session_id)
911 .cloned()
912 .collect()
913 })
914 .unwrap_or_default()
915 }
916
917 pub fn detach(&self) {
918 self.inner.shutdown.store(true, Ordering::SeqCst);
919 if let Ok(mut tasks) = self.inner.tasks.lock() {
920 for task in tasks.values() {
921 if let Ok(mut state) = task.state.lock() {
922 state.child = None;
923 state.detached = true;
924 }
925 }
926 tasks.clear();
927 }
928 }
929
930 pub fn shutdown(&self) {
931 let tasks = self
932 .inner
933 .tasks
934 .lock()
935 .map(|tasks| {
936 tasks
937 .values()
938 .map(|task| (task.task_id.clone(), task.session_id.clone()))
939 .collect::<Vec<_>>()
940 })
941 .unwrap_or_default();
942 for (task_id, session_id) in tasks {
943 let _ = self.kill(&task_id, &session_id);
944 }
945 }
946
947 pub(crate) fn poll_task(&self, task: &Arc<BgTask>) -> Result<(), String> {
948 let marker = match read_exit_marker(&task.paths.exit) {
949 Ok(Some(marker)) => marker,
950 Ok(None) => return Ok(()),
951 Err(error) => return Err(format!("failed to read exit marker: {error}")),
952 };
953 self.finalize_from_marker(task, marker, None)
954 }
955
956 pub(crate) fn reap_child(&self, task: &Arc<BgTask>) {
957 let Ok(mut state) = task.state.lock() else {
958 return;
959 };
960 if let Some(child) = state.child.as_mut() {
961 if matches!(child.try_wait(), Ok(Some(_))) {
962 state.child = None;
979 state.detached = true;
980 self.fail_without_exit_marker_if_needed(task, &mut state);
981 }
982 } else if state.detached
983 && state
984 .metadata
985 .child_pid
986 .is_some_and(|pid| !is_process_alive(pid))
987 {
988 self.fail_without_exit_marker_if_needed(task, &mut state);
989 }
990 }
991
992 fn fail_without_exit_marker_if_needed(&self, task: &Arc<BgTask>, state: &mut BgTaskState) {
993 if state.metadata.status.is_terminal() {
994 return;
995 }
996 if matches!(read_exit_marker(&task.paths.exit), Ok(Some(_))) {
997 return;
998 }
999 let updated = update_task(&task.paths.json, |metadata| {
1000 metadata.mark_terminal(
1001 BgTaskStatus::Failed,
1002 None,
1003 Some("process exited without exit marker".to_string()),
1004 );
1005 });
1006 if let Ok(metadata) = updated {
1007 state.metadata = metadata;
1008 task.mark_terminal_now();
1009 state.buffer.enforce_terminal_cap();
1010 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1011 }
1012 }
1013
1014 pub(crate) fn running_tasks(&self) -> Vec<Arc<BgTask>> {
1015 self.inner
1016 .tasks
1017 .lock()
1018 .map(|tasks| {
1019 tasks
1020 .values()
1021 .filter(|task| task.is_running())
1022 .cloned()
1023 .collect()
1024 })
1025 .unwrap_or_default()
1026 }
1027
1028 fn insert_rehydrated_task(
1029 &self,
1030 metadata: PersistedTask,
1031 paths: TaskPaths,
1032 detached: bool,
1033 ) -> Result<(), String> {
1034 let task_id = metadata.task_id.clone();
1035 let session_id = metadata.session_id.clone();
1036 let started = started_instant_from_unix_millis(metadata.started_at);
1037 let suppress_replayed_running_reminder = metadata.status == BgTaskStatus::Running;
1038 let task = Arc::new(BgTask {
1039 task_id: task_id.clone(),
1040 session_id,
1041 paths: paths.clone(),
1042 started,
1043 last_reminder_at: Mutex::new(suppress_replayed_running_reminder.then(Instant::now)),
1044 terminal_at: Mutex::new(metadata.status.is_terminal().then(Instant::now)),
1045 state: Mutex::new(BgTaskState {
1046 metadata,
1047 child: None,
1048 detached,
1049 buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
1050 }),
1051 });
1052 self.inner
1053 .tasks
1054 .lock()
1055 .map_err(|_| "background task registry lock poisoned".to_string())?
1056 .insert(task_id, task);
1057 Ok(())
1058 }
1059
1060 fn kill_with_status(
1061 &self,
1062 task_id: &str,
1063 session_id: &str,
1064 terminal_status: BgTaskStatus,
1065 ) -> Result<BgTaskSnapshot, String> {
1066 let task = self
1067 .task_for_session(task_id, session_id)
1068 .ok_or_else(|| format!("background task not found: {task_id}"))?;
1069
1070 {
1071 let mut state = task
1072 .state
1073 .lock()
1074 .map_err(|_| "background task lock poisoned".to_string())?;
1075 if state.metadata.status.is_terminal() {
1076 return Ok(task.snapshot_locked(&state, 5 * 1024));
1077 }
1078
1079 if let Ok(Some(marker)) = read_exit_marker(&task.paths.exit) {
1080 state.metadata =
1081 terminal_metadata_from_marker(state.metadata.clone(), marker, None);
1082 task.mark_terminal_now();
1083 state.child = None;
1084 state.detached = true;
1085 state.buffer.enforce_terminal_cap();
1086 write_task(&task.paths.json, &state.metadata)
1087 .map_err(|e| format!("failed to persist terminal state: {e}"))?;
1088 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1089 return Ok(task.snapshot_locked(&state, 5 * 1024));
1090 }
1091
1092 state.metadata.status = BgTaskStatus::Killing;
1093 write_task(&task.paths.json, &state.metadata)
1094 .map_err(|e| format!("failed to persist killing state: {e}"))?;
1095
1096 #[cfg(unix)]
1097 if let Some(pgid) = state.metadata.pgid {
1098 terminate_pgid(pgid, state.child.as_mut());
1099 }
1100 #[cfg(windows)]
1101 if let Some(child) = state.child.as_mut() {
1102 super::process::terminate_process(child);
1103 } else if let Some(pid) = state.metadata.child_pid {
1104 terminate_pid(pid);
1105 }
1106 if let Some(child) = state.child.as_mut() {
1107 let _ = child.wait();
1108 }
1109 state.child = None;
1110 state.detached = true;
1111
1112 if !task.paths.exit.exists() {
1113 write_kill_marker_if_absent(&task.paths.exit)
1114 .map_err(|e| format!("failed to write kill marker: {e}"))?;
1115 }
1116
1117 let exit_code = if terminal_status == BgTaskStatus::TimedOut {
1118 Some(124)
1119 } else {
1120 None
1121 };
1122 state
1123 .metadata
1124 .mark_terminal(terminal_status, exit_code, None);
1125 task.mark_terminal_now();
1126 write_task(&task.paths.json, &state.metadata)
1127 .map_err(|e| format!("failed to persist killed state: {e}"))?;
1128 state.buffer.enforce_terminal_cap();
1129 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1130 }
1131
1132 Ok(task.snapshot(5 * 1024))
1133 }
1134
1135 fn finalize_from_marker(
1136 &self,
1137 task: &Arc<BgTask>,
1138 marker: ExitMarker,
1139 reason: Option<String>,
1140 ) -> Result<(), String> {
1141 let mut state = task
1142 .state
1143 .lock()
1144 .map_err(|_| "background task lock poisoned".to_string())?;
1145 if state.metadata.status.is_terminal() {
1146 return Ok(());
1147 }
1148
1149 let updated = update_task(&task.paths.json, |metadata| {
1150 let new_metadata = terminal_metadata_from_marker(metadata.clone(), marker, reason);
1151 *metadata = new_metadata;
1152 })
1153 .map_err(|e| format!("failed to persist terminal state: {e}"))?;
1154 state.metadata = updated;
1155 task.mark_terminal_now();
1156 state.child = None;
1157 state.detached = true;
1158 state.buffer.enforce_terminal_cap();
1159 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
1160 Ok(())
1161 }
1162
1163 fn enqueue_completion_if_needed(
1164 &self,
1165 metadata: &PersistedTask,
1166 paths: Option<&TaskPaths>,
1167 emit_frame: bool,
1168 ) {
1169 if metadata.status.is_terminal() && !metadata.completion_delivered {
1170 self.enqueue_completion_from_parts(metadata, None, paths, emit_frame);
1171 }
1172 }
1173
1174 fn enqueue_completion_locked(
1175 &self,
1176 metadata: &PersistedTask,
1177 buffer: Option<&BgBuffer>,
1178 emit_frame: bool,
1179 ) {
1180 self.enqueue_completion_from_parts(metadata, buffer, None, emit_frame);
1181 }
1182
1183 fn enqueue_completion_from_parts(
1184 &self,
1185 metadata: &PersistedTask,
1186 buffer: Option<&BgBuffer>,
1187 paths: Option<&TaskPaths>,
1188 emit_frame: bool,
1189 ) {
1190 if !metadata.status.is_terminal() || metadata.completion_delivered {
1191 return;
1192 }
1193 let (raw_preview, output_truncated) = match buffer {
1198 Some(buf) => buf.read_tail(BG_COMPLETION_PREVIEW_BYTES),
1199 None => paths
1200 .map(|paths| read_tail_from_disk(paths, BG_COMPLETION_PREVIEW_BYTES))
1201 .unwrap_or_else(|| (String::new(), false)),
1202 };
1203 let output_preview = if metadata.compressed {
1208 self.compress_output(&metadata.command, raw_preview)
1209 } else {
1210 raw_preview
1211 };
1212 let completion = BgCompletion {
1213 task_id: metadata.task_id.clone(),
1214 session_id: metadata.session_id.clone(),
1215 status: metadata.status.clone(),
1216 exit_code: metadata.exit_code,
1217 command: metadata.command.clone(),
1218 output_preview,
1219 output_truncated,
1220 };
1221 if let Ok(mut completions) = self.inner.completions.lock() {
1222 if completions
1223 .iter()
1224 .any(|completion| completion.task_id == metadata.task_id)
1225 {
1226 return;
1227 }
1228 completions.push_back(completion.clone());
1229 } else {
1230 return;
1231 }
1232
1233 if emit_frame {
1234 self.emit_bash_completed(completion);
1235 }
1236 }
1237
1238 fn emit_bash_completed(&self, completion: BgCompletion) {
1239 let Ok(progress_sender) = self
1240 .inner
1241 .progress_sender
1242 .lock()
1243 .map(|sender| sender.clone())
1244 else {
1245 return;
1246 };
1247 let Some(sender) = progress_sender.as_ref() else {
1248 return;
1249 };
1250 sender(PushFrame::BashCompleted(BashCompletedFrame::new(
1258 completion.task_id,
1259 completion.session_id,
1260 completion.status,
1261 completion.exit_code,
1262 completion.command,
1263 completion.output_preview,
1264 completion.output_truncated,
1265 )));
1266 }
1267
1268 pub(crate) fn maybe_emit_long_running_reminder(&self, task: &Arc<BgTask>) {
1269 if !self
1270 .inner
1271 .long_running_reminder_enabled
1272 .load(Ordering::SeqCst)
1273 {
1274 return;
1275 }
1276 let interval_ms = self
1277 .inner
1278 .long_running_reminder_interval_ms
1279 .load(Ordering::SeqCst);
1280 if interval_ms == 0 {
1281 return;
1282 }
1283 let interval = Duration::from_millis(interval_ms);
1284 let now = Instant::now();
1285 let Ok(mut last_reminder_at) = task.last_reminder_at.lock() else {
1286 return;
1287 };
1288 let since = last_reminder_at.unwrap_or(task.started);
1289 if now.duration_since(since) < interval {
1290 return;
1291 }
1292 let command = task
1293 .state
1294 .lock()
1295 .map(|state| state.metadata.command.clone())
1296 .unwrap_or_default();
1297 *last_reminder_at = Some(now);
1298 self.emit_bash_long_running(BashLongRunningFrame::new(
1299 task.task_id.clone(),
1300 task.session_id.clone(),
1301 command,
1302 task.started.elapsed().as_millis() as u64,
1303 ));
1304 }
1305
1306 fn emit_bash_long_running(&self, frame: BashLongRunningFrame) {
1307 let Ok(progress_sender) = self
1308 .inner
1309 .progress_sender
1310 .lock()
1311 .map(|sender| sender.clone())
1312 else {
1313 return;
1314 };
1315 if let Some(sender) = progress_sender.as_ref() {
1316 sender(PushFrame::BashLongRunning(frame));
1317 }
1318 }
1319
1320 fn task(&self, task_id: &str) -> Option<Arc<BgTask>> {
1321 self.inner
1322 .tasks
1323 .lock()
1324 .ok()
1325 .and_then(|tasks| tasks.get(task_id).cloned())
1326 }
1327
1328 fn task_for_session(&self, task_id: &str, session_id: &str) -> Option<Arc<BgTask>> {
1329 self.task(task_id)
1330 .filter(|task| task.session_id == session_id)
1331 }
1332
1333 fn running_count(&self) -> usize {
1334 self.inner
1335 .tasks
1336 .lock()
1337 .map(|tasks| tasks.values().filter(|task| task.is_running()).count())
1338 .unwrap_or(0)
1339 }
1340
1341 fn start_watchdog(&self) {
1342 if !self.inner.watchdog_started.swap(true, Ordering::SeqCst) {
1343 super::watchdog::start(self.clone());
1344 }
1345 }
1346
1347 fn running_metadata_is_stale(&self, metadata: &PersistedTask) -> bool {
1348 unix_millis().saturating_sub(metadata.started_at) > STALE_RUNNING_AFTER.as_millis() as u64
1349 }
1350
1351 #[cfg(test)]
1352 pub fn task_json_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
1353 self.task_for_session(task_id, session_id)
1354 .map(|task| task.paths.json.clone())
1355 }
1356
1357 #[cfg(test)]
1358 pub fn task_exit_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
1359 self.task_for_session(task_id, session_id)
1360 .map(|task| task.paths.exit.clone())
1361 }
1362
1363 fn generate_unique_task_id(&self) -> Result<String, String> {
1365 for _ in 0..32 {
1366 let candidate = random_slug();
1367 let tasks = self
1368 .inner
1369 .tasks
1370 .lock()
1371 .map_err(|_| "background task registry lock poisoned".to_string())?;
1372 if tasks.contains_key(&candidate) {
1373 continue;
1374 }
1375 let completions = self
1376 .inner
1377 .completions
1378 .lock()
1379 .map_err(|_| "background completions lock poisoned".to_string())?;
1380 if completions
1381 .iter()
1382 .any(|completion| completion.task_id == candidate)
1383 {
1384 continue;
1385 }
1386 return Ok(candidate);
1387 }
1388 Err("failed to allocate unique background task id after 32 attempts".to_string())
1389 }
1390}
1391
1392impl Default for BgTaskRegistry {
1393 fn default() -> Self {
1394 Self::new(Arc::new(Mutex::new(None)))
1395 }
1396}
1397
1398fn modified_within(path: &Path, grace: Duration) -> bool {
1399 fs::metadata(path)
1400 .and_then(|metadata| metadata.modified())
1401 .ok()
1402 .and_then(|modified| SystemTime::now().duration_since(modified).ok())
1403 .map(|age| age < grace)
1404 .unwrap_or(false)
1405}
1406
1407fn canonicalized_path(path: &Path) -> PathBuf {
1408 fs::canonicalize(path).unwrap_or_else(|_| path.to_path_buf())
1409}
1410
1411fn started_instant_from_unix_millis(started_at: u64) -> Instant {
1412 let now_ms = SystemTime::now()
1413 .duration_since(UNIX_EPOCH)
1414 .ok()
1415 .map(|duration| duration.as_millis() as u64)
1416 .unwrap_or(started_at);
1417 let elapsed_ms = now_ms.saturating_sub(started_at);
1418 Instant::now()
1419 .checked_sub(Duration::from_millis(elapsed_ms))
1420 .unwrap_or_else(Instant::now)
1421}
1422
1423fn gc_quarantine(storage_dir: &Path) {
1424 let quarantine_root = storage_dir.join("bash-tasks-quarantine");
1425 let Ok(session_dirs) = fs::read_dir(&quarantine_root) else {
1426 return;
1427 };
1428 for session_entry in session_dirs.flatten() {
1429 let session_quarantine_dir = session_entry.path();
1430 if !session_quarantine_dir.is_dir() {
1431 continue;
1432 }
1433 let entries = match fs::read_dir(&session_quarantine_dir) {
1434 Ok(entries) => entries,
1435 Err(error) => {
1436 crate::slog_warn!(
1437 "failed to read background task quarantine dir {}: {error}",
1438 session_quarantine_dir.display()
1439 );
1440 continue;
1441 }
1442 };
1443 for entry in entries.flatten() {
1444 let path = entry.path();
1445 if modified_within(&path, QUARANTINE_GC_GRACE) {
1446 continue;
1447 }
1448 let result = if path.is_dir() {
1449 fs::remove_dir_all(&path)
1450 } else {
1451 fs::remove_file(&path)
1452 };
1453 match result {
1454 Ok(()) => log::debug!(
1455 "deleted old background task quarantine entry {}",
1456 path.display()
1457 ),
1458 Err(error) => crate::slog_warn!(
1459 "failed to delete old background task quarantine entry {}: {error}",
1460 path.display()
1461 ),
1462 }
1463 }
1464 let _ = fs::remove_dir(&session_quarantine_dir);
1465 }
1466 let _ = fs::remove_dir(&quarantine_root);
1467}
1468
1469enum QuarantineKind {
1470 Corrupt,
1471 Invalid,
1472}
1473
1474fn quarantine_task_json(
1475 storage_dir: &Path,
1476 session_dir: &Path,
1477 json_path: &Path,
1478 kind: QuarantineKind,
1479) -> Result<(), String> {
1480 let session_hash = session_dir
1481 .file_name()
1482 .and_then(|name| name.to_str())
1483 .ok_or_else(|| {
1484 format!(
1485 "invalid background task session dir: {}",
1486 session_dir.display()
1487 )
1488 })?;
1489 let task_name = json_path
1490 .file_name()
1491 .and_then(|name| name.to_str())
1492 .ok_or_else(|| format!("invalid background task json path: {}", json_path.display()))?;
1493 let unix_ts = SystemTime::now()
1494 .duration_since(UNIX_EPOCH)
1495 .map(|duration| duration.as_secs())
1496 .unwrap_or(0);
1497 let quarantine_dir = storage_dir.join("bash-tasks-quarantine").join(session_hash);
1498 fs::create_dir_all(&quarantine_dir).map_err(|e| {
1499 format!(
1500 "failed to create background task quarantine dir {}: {e}",
1501 quarantine_dir.display()
1502 )
1503 })?;
1504 let target_name = quarantine_name(task_name, unix_ts, &kind);
1505 let target = quarantine_dir.join(target_name);
1506 fs::rename(json_path, &target).map_err(|e| {
1507 format!(
1508 "failed to quarantine background task metadata {} to {}: {e}",
1509 json_path.display(),
1510 target.display()
1511 )
1512 })?;
1513
1514 for sibling in task_sibling_paths(json_path) {
1515 if !sibling.exists() {
1516 continue;
1517 }
1518 let Some(sibling_name) = sibling.file_name().and_then(|name| name.to_str()) else {
1519 crate::slog_warn!(
1520 "skipping background task sibling with invalid name during quarantine: {}",
1521 sibling.display()
1522 );
1523 continue;
1524 };
1525 let sibling_target = quarantine_dir.join(quarantine_name(sibling_name, unix_ts, &kind));
1526 if let Err(error) = fs::rename(&sibling, &sibling_target) {
1527 crate::slog_warn!(
1528 "failed to quarantine background task sibling {} to {}: {error}",
1529 sibling.display(),
1530 sibling_target.display()
1531 );
1532 }
1533 }
1534
1535 let _ = fs::remove_dir(session_dir);
1536 Ok(())
1537}
1538
1539fn quarantine_name(file_name: &str, unix_ts: u64, kind: &QuarantineKind) -> String {
1540 match kind {
1541 QuarantineKind::Corrupt => format!("{file_name}.corrupt-{unix_ts}"),
1542 QuarantineKind::Invalid => {
1543 let path = Path::new(file_name);
1544 let stem = path.file_stem().and_then(|stem| stem.to_str());
1545 let extension = path.extension().and_then(|extension| extension.to_str());
1546 match (stem, extension) {
1547 (Some(stem), Some(extension)) => format!("{stem}.invalid.{unix_ts}.{extension}"),
1548 _ => format!("{file_name}.invalid.{unix_ts}"),
1549 }
1550 }
1551 }
1552}
1553
1554fn task_sibling_paths(json_path: &Path) -> Vec<PathBuf> {
1555 let Some(parent) = json_path.parent() else {
1556 return Vec::new();
1557 };
1558 let Some(stem) = json_path.file_stem().and_then(|stem| stem.to_str()) else {
1559 return Vec::new();
1560 };
1561 ["stdout", "stderr", "exit", "ps1", "bat", "sh"]
1562 .into_iter()
1563 .map(|extension| parent.join(format!("{stem}.{extension}")))
1564 .collect()
1565}
1566
1567fn read_tail_from_disk(paths: &TaskPaths, max_bytes: usize) -> (String, bool) {
1568 let stdout = fs::read(&paths.stdout).unwrap_or_default();
1569 let stderr = fs::read(&paths.stderr).unwrap_or_default();
1570 let mut bytes = Vec::with_capacity(stdout.len().saturating_add(stderr.len()));
1571 bytes.extend_from_slice(&stdout);
1572 bytes.extend_from_slice(&stderr);
1573 if bytes.len() <= max_bytes {
1574 return (String::from_utf8_lossy(&bytes).into_owned(), false);
1575 }
1576 let start = bytes.len().saturating_sub(max_bytes);
1577 (String::from_utf8_lossy(&bytes[start..]).into_owned(), true)
1578}
1579
1580impl BgTask {
1581 fn snapshot(&self, preview_bytes: usize) -> BgTaskSnapshot {
1582 let state = self
1583 .state
1584 .lock()
1585 .unwrap_or_else(|poison| poison.into_inner());
1586 self.snapshot_locked(&state, preview_bytes)
1587 }
1588
1589 fn snapshot_locked(&self, state: &BgTaskState, preview_bytes: usize) -> BgTaskSnapshot {
1590 let metadata = &state.metadata;
1591 let duration_ms = metadata.duration_ms.or_else(|| {
1592 metadata
1593 .status
1594 .is_terminal()
1595 .then(|| self.started.elapsed().as_millis() as u64)
1596 });
1597 let (output_preview, output_truncated) = state.buffer.read_tail(preview_bytes);
1598 BgTaskSnapshot {
1599 info: BgTaskInfo {
1600 task_id: self.task_id.clone(),
1601 status: metadata.status.clone(),
1602 command: metadata.command.clone(),
1603 started_at: metadata.started_at,
1604 duration_ms,
1605 },
1606 exit_code: metadata.exit_code,
1607 child_pid: metadata.child_pid,
1608 workdir: metadata.workdir.display().to_string(),
1609 output_preview,
1610 output_truncated,
1611 output_path: state
1612 .buffer
1613 .output_path()
1614 .map(|path| path.display().to_string()),
1615 stderr_path: Some(state.buffer.stderr_path().display().to_string()),
1616 }
1617 }
1618
1619 pub(crate) fn is_running(&self) -> bool {
1620 self.state
1621 .lock()
1622 .map(|state| state.metadata.status == BgTaskStatus::Running)
1623 .unwrap_or(false)
1624 }
1625
1626 fn mark_terminal_now(&self) {
1627 if let Ok(mut terminal_at) = self.terminal_at.lock() {
1628 if terminal_at.is_none() {
1629 *terminal_at = Some(Instant::now());
1630 }
1631 }
1632 }
1633
1634 fn set_completion_delivered(&self, delivered: bool) -> Result<(), String> {
1635 let mut state = self
1636 .state
1637 .lock()
1638 .map_err(|_| "background task lock poisoned".to_string())?;
1639 let updated = update_task(&self.paths.json, |metadata| {
1640 metadata.completion_delivered = delivered;
1641 })
1642 .map_err(|e| format!("failed to update completion delivery: {e}"))?;
1643 state.metadata = updated;
1644 Ok(())
1645 }
1646}
1647
1648fn terminal_metadata_from_marker(
1649 mut metadata: PersistedTask,
1650 marker: ExitMarker,
1651 reason: Option<String>,
1652) -> PersistedTask {
1653 match marker {
1654 ExitMarker::Code(code) => {
1655 let status = if code == 0 {
1656 BgTaskStatus::Completed
1657 } else {
1658 BgTaskStatus::Failed
1659 };
1660 metadata.mark_terminal(status, Some(code), reason);
1661 }
1662 ExitMarker::Killed => metadata.mark_terminal(BgTaskStatus::Killed, None, reason),
1663 }
1664 metadata
1665}
1666
1667#[cfg(unix)]
1668fn detached_shell_command(command: &str, exit_path: &Path) -> Command {
1669 let shell = resolve_posix_shell();
1670 let mut cmd = Command::new(&shell);
1671 cmd.arg("-c")
1672 .arg("\"$0\" -c \"$1\"; code=$?; printf \"%s\" \"$code\" > \"$2.tmp.$$\"; mv -f \"$2.tmp.$$\" \"$2\"")
1673 .arg(&shell)
1674 .arg(command)
1675 .arg(exit_path);
1676 unsafe {
1677 cmd.pre_exec(|| {
1678 if libc::setsid() == -1 {
1679 return Err(std::io::Error::last_os_error());
1680 }
1681 Ok(())
1682 });
1683 }
1684 cmd
1685}
1686
1687#[cfg(unix)]
1688fn resolve_posix_shell() -> PathBuf {
1689 static POSIX_SHELL: OnceLock<PathBuf> = OnceLock::new();
1690 POSIX_SHELL
1691 .get_or_init(|| {
1692 std::env::var_os("BASH")
1693 .filter(|value| !value.is_empty())
1694 .map(PathBuf::from)
1695 .filter(|path| path.exists())
1696 .or_else(|| which::which("bash").ok())
1697 .or_else(|| which::which("zsh").ok())
1698 .unwrap_or_else(|| PathBuf::from("/bin/sh"))
1699 })
1700 .clone()
1701}
1702
1703#[cfg(windows)]
1704fn detached_shell_command_for(
1705 shell: crate::windows_shell::WindowsShell,
1706 command: &str,
1707 exit_path: &Path,
1708 paths: &TaskPaths,
1709 creation_flags: u32,
1710) -> Result<Command, String> {
1711 use crate::windows_shell::WindowsShell;
1712 let wrapper_body = shell.wrapper_script(command, exit_path);
1725 let wrapper_ext = match shell {
1726 WindowsShell::Pwsh | WindowsShell::Powershell => "ps1",
1727 WindowsShell::Cmd => "bat",
1728 WindowsShell::Posix(_) => "sh",
1732 };
1733 let wrapper_path = paths.dir.join(format!(
1734 "{}.{}",
1735 paths
1736 .json
1737 .file_stem()
1738 .and_then(|s| s.to_str())
1739 .unwrap_or("wrapper"),
1740 wrapper_ext
1741 ));
1742 fs::write(&wrapper_path, wrapper_body)
1743 .map_err(|e| format!("failed to write background bash wrapper script: {e}"))?;
1744
1745 let mut cmd = Command::new(shell.binary().as_ref());
1746 match shell {
1747 WindowsShell::Pwsh | WindowsShell::Powershell => {
1748 cmd.args([
1751 "-NoLogo",
1752 "-NoProfile",
1753 "-NonInteractive",
1754 "-ExecutionPolicy",
1755 "Bypass",
1756 "-File",
1757 ]);
1758 cmd.arg(&wrapper_path);
1759 }
1760 WindowsShell::Cmd => {
1761 cmd.args(["/D", "/C"]);
1768 cmd.arg(&wrapper_path);
1769 }
1770 WindowsShell::Posix(_) => {
1771 cmd.arg(&wrapper_path);
1776 }
1777 }
1778
1779 cmd.creation_flags(creation_flags);
1783 Ok(cmd)
1784}
1785
1786fn spawn_detached_child(
1802 command: &str,
1803 paths: &TaskPaths,
1804 workdir: &Path,
1805 env: &HashMap<String, String>,
1806) -> Result<std::process::Child, String> {
1807 #[cfg(not(windows))]
1808 {
1809 let stdout = create_capture_file(&paths.stdout)
1810 .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
1811 let stderr = create_capture_file(&paths.stderr)
1812 .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
1813 detached_shell_command(command, &paths.exit)
1814 .current_dir(workdir)
1815 .envs(env)
1816 .stdin(Stdio::null())
1817 .stdout(Stdio::from(stdout))
1818 .stderr(Stdio::from(stderr))
1819 .spawn()
1820 .map_err(|e| format!("failed to spawn background bash command: {e}"))
1821 }
1822 #[cfg(windows)]
1823 {
1824 use crate::windows_shell::shell_candidates;
1825 let candidates: Vec<crate::windows_shell::WindowsShell> = shell_candidates();
1836 const FLAG_CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
1857 const FLAG_CREATE_BREAKAWAY_FROM_JOB: u32 = 0x0100_0000;
1858 const FLAG_CREATE_NO_WINDOW: u32 = 0x0800_0000;
1859 let with_breakaway =
1860 FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP | FLAG_CREATE_BREAKAWAY_FROM_JOB;
1861 let without_breakaway = FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP;
1862 let mut last_error: Option<String> = None;
1863 for (idx, shell) in candidates.iter().enumerate() {
1864 for &flags in &[with_breakaway, without_breakaway] {
1868 let stdout = create_capture_file(&paths.stdout)
1870 .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
1871 let stderr = create_capture_file(&paths.stderr)
1872 .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
1873 let mut cmd =
1874 detached_shell_command_for(shell.clone(), command, &paths.exit, paths, flags)?;
1875 cmd.current_dir(workdir)
1876 .envs(env)
1877 .stdin(Stdio::null())
1878 .stdout(Stdio::from(stdout))
1879 .stderr(Stdio::from(stderr));
1880 match cmd.spawn() {
1881 Ok(child) => {
1882 if idx > 0 {
1883 crate::slog_warn!("background bash spawn fell back to {} after {} earlier candidate(s) failed; \
1884 the cached PATH probe disagreed with runtime spawn — likely PATH \
1885 inheritance, antivirus / AppLocker / Defender ASR, or sandbox policy.",
1886 shell.binary(),
1887 idx);
1888 }
1889 if flags == without_breakaway {
1890 crate::slog_warn!(
1891 "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected \
1892 (likely a restrictive Job Object — CI sandbox or MDM policy). \
1893 Spawned without breakaway; the bg task will be torn down if the \
1894 AFT process group is killed."
1895 );
1896 }
1897 return Ok(child);
1898 }
1899 Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
1900 crate::slog_warn!("background bash spawn: {} returned NotFound at runtime — trying next candidate",
1901 shell.binary());
1902 last_error = Some(format!("{}: {e}", shell.binary()));
1903 break;
1906 }
1907 Err(e) if flags == with_breakaway && e.raw_os_error() == Some(5) => {
1908 crate::slog_warn!(
1910 "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected with \
1911 Access Denied — retrying {} without breakaway",
1912 shell.binary()
1913 );
1914 last_error = Some(format!("{}: {e}", shell.binary()));
1915 continue;
1916 }
1917 Err(e) => {
1918 return Err(format!(
1919 "failed to spawn background bash command via {}: {e}",
1920 shell.binary()
1921 ));
1922 }
1923 }
1924 }
1925 }
1926 Err(format!(
1927 "failed to spawn background bash command: no Windows shell could be spawned. \
1928 Last error: {}. PATH-probed candidates: {:?}",
1929 last_error.unwrap_or_else(|| "no candidates were attempted".to_string()),
1930 candidates.iter().map(|s| s.binary()).collect::<Vec<_>>()
1931 ))
1932 }
1933}
1934
1935fn random_slug() -> String {
1936 let mut bytes = [0u8; 4];
1937 getrandom::fill(&mut bytes).unwrap_or_else(|_| {
1939 let t = SystemTime::now()
1941 .duration_since(UNIX_EPOCH)
1942 .map(|d| d.subsec_nanos())
1943 .unwrap_or(0);
1944 let p = std::process::id();
1945 bytes.copy_from_slice(&(t ^ p).to_le_bytes());
1946 });
1947 let hex: String = bytes.iter().map(|b| format!("{b:02x}")).collect();
1949 format!("bash-{hex}")
1950}
1951
1952#[cfg(test)]
1953mod tests {
1954 use std::collections::HashMap;
1955 #[cfg(windows)]
1956 use std::fs;
1957 use std::sync::{Arc, Mutex};
1958 use std::time::Duration;
1959 #[cfg(windows)]
1960 use std::time::Instant;
1961
1962 use super::*;
1963
1964 #[cfg(unix)]
1965 const QUICK_SUCCESS_COMMAND: &str = "true";
1966 #[cfg(windows)]
1967 const QUICK_SUCCESS_COMMAND: &str = "cmd /c exit 0";
1968
1969 #[cfg(unix)]
1970 const LONG_RUNNING_COMMAND: &str = "sleep 5";
1971 #[cfg(windows)]
1972 const LONG_RUNNING_COMMAND: &str = "cmd /c timeout /t 5 /nobreak > nul";
1973
1974 fn spawn_dead_child() -> std::process::Child {
1979 #[cfg(unix)]
1980 let mut cmd = std::process::Command::new("true");
1981 #[cfg(windows)]
1982 let mut cmd = {
1983 let mut c = std::process::Command::new("cmd");
1984 c.args(["/c", "exit", "0"]);
1985 c
1986 };
1987 cmd.stdin(std::process::Stdio::null());
1988 cmd.stdout(std::process::Stdio::null());
1989 cmd.stderr(std::process::Stdio::null());
1990 let mut child = cmd.spawn().expect("spawn replacement child for reap test");
1991 let started = Instant::now();
2000 loop {
2001 match child.try_wait() {
2002 Ok(Some(_)) => break,
2003 Ok(None) => {
2004 if started.elapsed() > Duration::from_secs(5) {
2005 panic!("dead-child stand-in did not exit within 5s");
2006 }
2007 std::thread::sleep(Duration::from_millis(10));
2008 }
2009 Err(error) => panic!("dead-child try_wait failed: {error}"),
2010 }
2011 }
2012 child
2013 }
2014
2015 #[test]
2016 fn cleanup_finished_removes_terminal_tasks_older_than_threshold() {
2017 let registry = BgTaskRegistry::default();
2018 let dir = tempfile::tempdir().unwrap();
2019 let task_id = registry
2020 .spawn(
2021 QUICK_SUCCESS_COMMAND,
2022 "session".to_string(),
2023 dir.path().to_path_buf(),
2024 HashMap::new(),
2025 Some(Duration::from_secs(30)),
2026 dir.path().to_path_buf(),
2027 10,
2028 true,
2029 false,
2030 Some(dir.path().to_path_buf()),
2031 )
2032 .unwrap();
2033 registry
2034 .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
2035 .unwrap();
2036 let completions = registry.drain_completions_for_session(Some("session"));
2037 assert_eq!(completions.len(), 1);
2038 assert_eq!(
2039 registry.ack_completions_for_session(Some("session"), std::slice::from_ref(&task_id)),
2040 vec![task_id.clone()]
2041 );
2042
2043 registry.cleanup_finished(Duration::ZERO);
2044
2045 assert!(registry.inner.tasks.lock().unwrap().is_empty());
2046 }
2047
2048 #[test]
2049 fn cleanup_finished_retains_undelivered_terminals() {
2050 let registry = BgTaskRegistry::default();
2051 let dir = tempfile::tempdir().unwrap();
2052 let task_id = registry
2053 .spawn(
2054 QUICK_SUCCESS_COMMAND,
2055 "session".to_string(),
2056 dir.path().to_path_buf(),
2057 HashMap::new(),
2058 Some(Duration::from_secs(30)),
2059 dir.path().to_path_buf(),
2060 10,
2061 true,
2062 false,
2063 Some(dir.path().to_path_buf()),
2064 )
2065 .unwrap();
2066 registry
2067 .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
2068 .unwrap();
2069
2070 registry.cleanup_finished(Duration::ZERO);
2071
2072 assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
2073 }
2074
2075 #[test]
2085 fn reap_child_marks_failed_when_child_exits_without_exit_marker() {
2086 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
2087 let dir = tempfile::tempdir().unwrap();
2088 let task_id = registry
2089 .spawn(
2090 QUICK_SUCCESS_COMMAND,
2091 "session".to_string(),
2092 dir.path().to_path_buf(),
2093 HashMap::new(),
2094 Some(Duration::from_secs(30)),
2095 dir.path().to_path_buf(),
2096 10,
2097 true,
2098 false,
2099 Some(dir.path().to_path_buf()),
2100 )
2101 .unwrap();
2102
2103 let task = registry.task_for_session(&task_id, "session").unwrap();
2104
2105 let started = Instant::now();
2110 loop {
2111 let exited = {
2112 let mut state = task.state.lock().unwrap();
2113 if let Some(child) = state.child.as_mut() {
2114 matches!(child.try_wait(), Ok(Some(_)))
2115 } else {
2116 true
2117 }
2118 };
2119 if exited {
2120 break;
2121 }
2122 assert!(
2123 started.elapsed() < Duration::from_secs(5),
2124 "child should exit quickly"
2125 );
2126 std::thread::sleep(Duration::from_millis(20));
2127 }
2128
2129 registry
2137 .inner
2138 .shutdown
2139 .store(true, std::sync::atomic::Ordering::SeqCst);
2140 std::thread::sleep(Duration::from_millis(550));
2144
2145 let _ = std::fs::remove_file(&task.paths.exit);
2148
2149 {
2166 let mut state = task.state.lock().unwrap();
2167 state.metadata.status = BgTaskStatus::Running;
2168 state.metadata.status_reason = None;
2169 state.metadata.exit_code = None;
2170 state.metadata.finished_at = None;
2171 state.metadata.duration_ms = None;
2172 crate::bash_background::persistence::write_task(&task.paths.json, &state.metadata)
2175 .expect("persist reset Running metadata for reap_child test");
2176 if state.child.is_none() {
2180 state.child = Some(spawn_dead_child());
2181 }
2182 }
2183 *task.terminal_at.lock().unwrap() = None;
2186
2187 assert!(
2190 task.is_running(),
2191 "precondition: metadata.status == Running"
2192 );
2193 assert!(
2194 !task.paths.exit.exists(),
2195 "precondition: exit marker absent"
2196 );
2197
2198 registry.reap_child(&task);
2202
2203 let state = task.state.lock().unwrap();
2204 assert!(
2205 state.metadata.status.is_terminal(),
2206 "reap_child must transition to terminal when PID dead and no marker. \
2207 Got status={:?}",
2208 state.metadata.status
2209 );
2210 assert_eq!(
2211 state.metadata.status,
2212 BgTaskStatus::Failed,
2213 "must specifically be Failed (not Killed): status={:?}",
2214 state.metadata.status
2215 );
2216 assert_eq!(
2217 state.metadata.status_reason.as_deref(),
2218 Some("process exited without exit marker"),
2219 "reason must match replay path's wording: {:?}",
2220 state.metadata.status_reason
2221 );
2222 assert!(
2223 state.child.is_none(),
2224 "child handle must be released after reap"
2225 );
2226 assert!(state.detached, "task must be marked detached after reap");
2227 }
2228
2229 #[test]
2235 fn reap_child_preserves_running_when_exit_marker_exists() {
2236 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
2237 let dir = tempfile::tempdir().unwrap();
2238 let task_id = registry
2239 .spawn(
2240 QUICK_SUCCESS_COMMAND,
2241 "session".to_string(),
2242 dir.path().to_path_buf(),
2243 HashMap::new(),
2244 Some(Duration::from_secs(30)),
2245 dir.path().to_path_buf(),
2246 10,
2247 true,
2248 false,
2249 Some(dir.path().to_path_buf()),
2250 )
2251 .unwrap();
2252
2253 let task = registry.task_for_session(&task_id, "session").unwrap();
2254
2255 let started = Instant::now();
2258 loop {
2259 let exited = {
2260 let mut state = task.state.lock().unwrap();
2261 if let Some(child) = state.child.as_mut() {
2262 matches!(child.try_wait(), Ok(Some(_)))
2263 } else {
2264 true
2265 }
2266 };
2267 if exited && task.paths.exit.exists() {
2268 break;
2269 }
2270 assert!(
2271 started.elapsed() < Duration::from_secs(5),
2272 "child should exit and write marker quickly"
2273 );
2274 std::thread::sleep(Duration::from_millis(20));
2275 }
2276
2277 registry
2283 .inner
2284 .shutdown
2285 .store(true, std::sync::atomic::Ordering::SeqCst);
2286 std::thread::sleep(Duration::from_millis(550));
2287
2288 {
2294 let mut state = task.state.lock().unwrap();
2295 state.metadata.status = BgTaskStatus::Running;
2296 state.metadata.status_reason = None;
2297 if state.child.is_none() {
2298 state.child = Some(spawn_dead_child());
2299 }
2300 }
2301 *task.terminal_at.lock().unwrap() = None;
2302 if !task.paths.exit.exists() {
2305 std::fs::write(&task.paths.exit, "0").expect("write replacement exit marker");
2306 }
2307
2308 registry.reap_child(&task);
2312
2313 let state = task.state.lock().unwrap();
2314 assert!(
2315 state.child.is_none(),
2316 "child handle still released even when marker exists"
2317 );
2318 assert!(
2319 state.detached,
2320 "task still marked detached even when marker exists"
2321 );
2322 assert_eq!(
2327 state.metadata.status,
2328 BgTaskStatus::Running,
2329 "reap_child must defer to poll_task when marker exists"
2330 );
2331 }
2332
2333 #[test]
2334 fn cleanup_finished_keeps_running_tasks() {
2335 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
2336 let dir = tempfile::tempdir().unwrap();
2337 let task_id = registry
2338 .spawn(
2339 LONG_RUNNING_COMMAND,
2340 "session".to_string(),
2341 dir.path().to_path_buf(),
2342 HashMap::new(),
2343 Some(Duration::from_secs(30)),
2344 dir.path().to_path_buf(),
2345 10,
2346 true,
2347 false,
2348 Some(dir.path().to_path_buf()),
2349 )
2350 .unwrap();
2351
2352 registry.cleanup_finished(Duration::ZERO);
2353
2354 assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
2355 let _ = registry.kill(&task_id, "session");
2356 }
2357
2358 #[cfg(windows)]
2359 fn wait_for_file(path: &Path) -> String {
2360 let started = Instant::now();
2361 loop {
2362 if path.exists() {
2363 return fs::read_to_string(path).expect("read file");
2364 }
2365 assert!(
2366 started.elapsed() < Duration::from_secs(30),
2367 "timed out waiting for {}",
2368 path.display()
2369 );
2370 std::thread::sleep(Duration::from_millis(100));
2371 }
2372 }
2373
2374 #[cfg(windows)]
2375 fn spawn_windows_registry_command(
2376 command: &str,
2377 ) -> (BgTaskRegistry, tempfile::TempDir, String) {
2378 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
2379 let dir = tempfile::tempdir().unwrap();
2380 let task_id = registry
2381 .spawn(
2382 command,
2383 "session".to_string(),
2384 dir.path().to_path_buf(),
2385 HashMap::new(),
2386 Some(Duration::from_secs(30)),
2387 dir.path().to_path_buf(),
2388 10,
2389 false,
2390 false,
2391 Some(dir.path().to_path_buf()),
2392 )
2393 .unwrap();
2394 (registry, dir, task_id)
2395 }
2396
2397 #[cfg(windows)]
2398 #[test]
2399 fn windows_spawn_writes_exit_marker_for_zero_exit() {
2400 let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 0");
2401 let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
2402
2403 let content = wait_for_file(&exit_path);
2404
2405 assert_eq!(content.trim(), "0");
2406 }
2407
2408 #[cfg(windows)]
2409 #[test]
2410 fn windows_spawn_writes_exit_marker_for_nonzero_exit() {
2411 let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 42");
2412 let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
2413
2414 let content = wait_for_file(&exit_path);
2415
2416 assert_eq!(content.trim(), "42");
2417 }
2418
2419 #[cfg(windows)]
2420 #[test]
2421 fn windows_spawn_captures_stdout_to_disk() {
2422 let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c echo hello");
2423 let task = registry.task_for_session(&task_id, "session").unwrap();
2424 let stdout_path = task.paths.stdout.clone();
2425 let exit_path = task.paths.exit.clone();
2426
2427 let _ = wait_for_file(&exit_path);
2428 let stdout = fs::read_to_string(stdout_path).expect("read stdout");
2429
2430 assert!(stdout.contains("hello"), "stdout was {stdout:?}");
2431 }
2432
2433 #[cfg(windows)]
2434 #[test]
2435 fn windows_spawn_uses_pwsh_when_available() {
2436 let candidates = crate::windows_shell::shell_candidates_with(
2440 |binary| match binary {
2441 "pwsh.exe" => Some(std::path::PathBuf::from(r"C:\pwsh\pwsh.exe")),
2442 "powershell.exe" => Some(std::path::PathBuf::from(r"C:\ps\powershell.exe")),
2443 _ => None,
2444 },
2445 || None,
2446 );
2447 let shell = candidates.first().expect("at least one candidate").clone();
2448 assert_eq!(shell, crate::windows_shell::WindowsShell::Pwsh);
2449 assert_eq!(shell.binary().as_ref(), "pwsh.exe");
2450 }
2451
2452 #[cfg(windows)]
2459 #[test]
2460 fn windows_shell_cmd_wrapper_writes_exit_marker_with_move() {
2461 let exit_path = Path::new(r"C:\Temp\bash-test.exit");
2462 let script =
2463 crate::windows_shell::WindowsShell::Cmd.wrapper_script("cmd /c exit 42", exit_path);
2464
2465 assert!(
2469 script.contains("set CODE=%ERRORLEVEL%"),
2470 "wrapper must capture exit code into CODE: {script}"
2471 );
2472 assert!(
2473 script.contains("echo %CODE% >"),
2474 "wrapper must echo CODE to a temp marker file: {script}"
2475 );
2476 assert!(
2477 script.contains("move /Y"),
2478 "wrapper must use atomic move to write the marker: {script}"
2479 );
2480 assert!(
2483 script.contains("> nul"),
2484 "wrapper must redirect move output to nul: {script}"
2485 );
2486 assert!(
2488 script.contains("exit /B %CODE%"),
2489 "wrapper must propagate the captured exit code: {script}"
2490 );
2491 assert!(script.contains(r#""C:\Temp\bash-test.exit.tmp""#));
2492 assert!(script.contains(r#""C:\Temp\bash-test.exit""#));
2493 }
2494
2495 #[cfg(windows)]
2501 #[test]
2502 fn windows_shell_cmd_bg_command_uses_minimal_cmd_flags() {
2503 use crate::windows_shell::WindowsShell;
2504 let cmd = WindowsShell::Cmd.bg_command("echo wrapped");
2505 let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
2506 let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
2507 assert_eq!(
2508 args_strs,
2509 vec!["/D", "/S", "/C", "echo wrapped"],
2510 "Cmd::bg_command must prepend /D /S /C"
2511 );
2512 }
2513
2514 #[cfg(windows)]
2518 #[test]
2519 fn windows_shell_pwsh_bg_command_uses_standard_args() {
2520 use crate::windows_shell::WindowsShell;
2521 let cmd = WindowsShell::Pwsh.bg_command("Get-Date");
2522 let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
2523 let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
2524 assert!(
2525 args_strs.contains(&"-Command"),
2526 "Pwsh::bg_command must use -Command: {args_strs:?}"
2527 );
2528 assert!(
2529 args_strs.contains(&"Get-Date"),
2530 "Pwsh::bg_command must include the user command body"
2531 );
2532 }
2533
2534 #[allow(dead_code)]
2565 #[cfg(any())] fn windows_cmd_wrapper_records_real_exit_code_disabled() {}
2567}