1use std::collections::{HashMap, VecDeque};
2use std::fs;
3use std::path::{Path, PathBuf};
4use std::process::{Child, Command, Stdio};
5use std::sync::atomic::{AtomicBool, Ordering};
6use std::sync::{Arc, Mutex};
7use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
8
9use serde::Serialize;
10
11use crate::context::SharedProgressSender;
12use crate::protocol::{BashCompletedFrame, PushFrame};
13
14#[cfg(unix)]
15use std::os::unix::process::CommandExt;
16
17use super::buffer::BgBuffer;
18use super::persistence::{
19 create_capture_file, read_exit_marker, read_task, session_tasks_dir, task_paths, unix_millis,
20 update_task, write_kill_marker_if_absent, write_task, ExitMarker, PersistedTask, TaskPaths,
21};
22#[cfg(unix)]
23use super::process::terminate_pgid;
24use super::{BgTaskInfo, BgTaskStatus};
25
26const DEFAULT_BG_TIMEOUT: Duration = Duration::from_secs(30 * 60);
29const STALE_RUNNING_AFTER: Duration = Duration::from_secs(24 * 60 * 60);
30
31const BG_COMPLETION_PREVIEW_BYTES: usize = 300;
38
39#[derive(Debug, Clone, Serialize)]
40pub struct BgCompletion {
41 pub task_id: String,
42 #[serde(skip_serializing)]
45 pub session_id: String,
46 pub status: BgTaskStatus,
47 pub exit_code: Option<i32>,
48 pub command: String,
49 #[serde(default, skip_serializing_if = "String::is_empty")]
55 pub output_preview: String,
56 #[serde(default, skip_serializing_if = "is_false")]
61 pub output_truncated: bool,
62}
63
64fn is_false(v: &bool) -> bool {
65 !*v
66}
67
68#[derive(Debug, Clone, Serialize)]
69pub struct BgTaskSnapshot {
70 #[serde(flatten)]
71 pub info: BgTaskInfo,
72 pub exit_code: Option<i32>,
73 pub child_pid: Option<u32>,
74 pub workdir: String,
75 pub output_preview: String,
76 pub output_truncated: bool,
77 pub output_path: Option<String>,
78 pub stderr_path: Option<String>,
79}
80
81#[derive(Clone)]
82pub struct BgTaskRegistry {
83 pub(crate) inner: Arc<RegistryInner>,
84}
85
86pub(crate) struct RegistryInner {
87 pub(crate) tasks: Mutex<HashMap<String, Arc<BgTask>>>,
88 pub(crate) completions: Mutex<VecDeque<BgCompletion>>,
89 pub(crate) progress_sender: SharedProgressSender,
90 watchdog_started: AtomicBool,
91 pub(crate) shutdown: AtomicBool,
92}
93
94pub(crate) struct BgTask {
95 pub(crate) task_id: String,
96 pub(crate) session_id: String,
97 pub(crate) paths: TaskPaths,
98 pub(crate) started: Instant,
99 pub(crate) terminal_at: Mutex<Option<Instant>>,
100 pub(crate) state: Mutex<BgTaskState>,
101}
102
103pub(crate) struct BgTaskState {
104 pub(crate) metadata: PersistedTask,
105 pub(crate) child: Option<Child>,
106 pub(crate) detached: bool,
107 pub(crate) buffer: BgBuffer,
108}
109
110impl BgTaskRegistry {
111 pub fn new(progress_sender: SharedProgressSender) -> Self {
112 Self {
113 inner: Arc::new(RegistryInner {
114 tasks: Mutex::new(HashMap::new()),
115 completions: Mutex::new(VecDeque::new()),
116 progress_sender,
117 watchdog_started: AtomicBool::new(false),
118 shutdown: AtomicBool::new(false),
119 }),
120 }
121 }
122
123 #[cfg(unix)]
124 pub fn spawn(
125 &self,
126 command: &str,
127 session_id: String,
128 workdir: PathBuf,
129 env: HashMap<String, String>,
130 timeout: Option<Duration>,
131 storage_dir: PathBuf,
132 max_running: usize,
133 ) -> Result<String, String> {
134 self.start_watchdog();
135
136 let running = self.running_count();
137 if running >= max_running {
138 return Err(format!(
139 "background bash task limit exceeded: {running} running (max {max_running})"
140 ));
141 }
142
143 let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
144 let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
145 let task_id = self.generate_unique_task_id()?;
146 let paths = task_paths(&storage_dir, &session_id, &task_id);
147 fs::create_dir_all(&paths.dir)
148 .map_err(|e| format!("failed to create background task dir: {e}"))?;
149
150 let mut metadata = PersistedTask::starting(
151 task_id.clone(),
152 session_id.clone(),
153 command.to_string(),
154 workdir.clone(),
155 timeout_ms,
156 );
157 write_task(&paths.json, &metadata)
158 .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
159
160 let stdout = create_capture_file(&paths.stdout)
161 .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
162 let stderr = create_capture_file(&paths.stderr)
163 .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
164
165 let child = detached_shell_command(command, &paths.exit)
166 .current_dir(&workdir)
167 .envs(&env)
168 .stdin(Stdio::null())
169 .stdout(Stdio::from(stdout))
170 .stderr(Stdio::from(stderr))
171 .spawn()
172 .map_err(|e| format!("failed to spawn background bash command: {e}"))?;
173
174 let child_pid = child.id();
175 metadata.mark_running(child_pid, child_pid as i32);
176 write_task(&paths.json, &metadata)
177 .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
178
179 let task = Arc::new(BgTask {
180 task_id: task_id.clone(),
181 session_id,
182 paths: paths.clone(),
183 started: Instant::now(),
184 terminal_at: Mutex::new(None),
185 state: Mutex::new(BgTaskState {
186 metadata,
187 child: Some(child),
188 detached: false,
189 buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
190 }),
191 });
192
193 self.inner
194 .tasks
195 .lock()
196 .map_err(|_| "background task registry lock poisoned".to_string())?
197 .insert(task_id.clone(), task);
198
199 Ok(task_id)
200 }
201
202 #[cfg(windows)]
203 pub fn spawn(
204 &self,
205 _command: &str,
206 _session_id: String,
207 _workdir: PathBuf,
208 _env: HashMap<String, String>,
209 _timeout: Option<Duration>,
210 _storage_dir: PathBuf,
211 _max_running: usize,
212 ) -> Result<String, String> {
213 Err("background bash is not yet supported on Windows".to_string())
214 }
215
216 pub fn replay_session(&self, storage_dir: &Path, session_id: &str) -> Result<(), String> {
217 self.start_watchdog();
218 let dir = session_tasks_dir(storage_dir, session_id);
219 if !dir.exists() {
220 return Ok(());
221 }
222
223 let entries = fs::read_dir(&dir)
224 .map_err(|e| format!("failed to read background task dir {}: {e}", dir.display()))?;
225 for entry in entries.flatten() {
226 let path = entry.path();
227 if path.extension().and_then(|extension| extension.to_str()) != Some("json") {
228 continue;
229 }
230 let Ok(mut metadata) = read_task(&path) else {
231 continue;
232 };
233 if metadata.session_id != session_id {
234 continue;
235 }
236
237 let paths = task_paths(storage_dir, session_id, &metadata.task_id);
238 match metadata.status {
239 BgTaskStatus::Starting => {
240 metadata.mark_terminal(
241 BgTaskStatus::Failed,
242 None,
243 Some("spawn aborted".to_string()),
244 );
245 let _ = write_task(&paths.json, &metadata);
246 self.enqueue_completion_if_needed(&metadata, false);
247 }
248 BgTaskStatus::Running => {
249 if self.running_metadata_is_stale(&metadata) {
250 metadata.mark_terminal(
251 BgTaskStatus::Killed,
252 None,
253 Some("orphaned (>24h)".to_string()),
254 );
255 if !paths.exit.exists() {
256 let _ = write_kill_marker_if_absent(&paths.exit);
257 }
258 let _ = write_task(&paths.json, &metadata);
259 self.enqueue_completion_if_needed(&metadata, false);
260 } else if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
261 metadata = terminal_metadata_from_marker(metadata, marker, None);
262 let _ = write_task(&paths.json, &metadata);
263 self.enqueue_completion_if_needed(&metadata, false);
264 } else {
265 self.insert_rehydrated_task(metadata, paths, true)?;
266 }
267 }
268 _ if metadata.status.is_terminal() => {
269 self.insert_rehydrated_task(metadata.clone(), paths, true)?;
270 self.enqueue_completion_if_needed(&metadata, false);
271 }
272 _ => {}
273 }
274 }
275
276 Ok(())
277 }
278
279 pub fn status(
280 &self,
281 task_id: &str,
282 session_id: &str,
283 preview_bytes: usize,
284 ) -> Option<BgTaskSnapshot> {
285 let task = self.task_for_session(task_id, session_id)?;
286 let _ = self.poll_task(&task);
287 Some(task.snapshot(preview_bytes))
288 }
289
290 pub fn list(&self, preview_bytes: usize) -> Vec<BgTaskSnapshot> {
291 let tasks = self
292 .inner
293 .tasks
294 .lock()
295 .map(|tasks| tasks.values().cloned().collect::<Vec<_>>())
296 .unwrap_or_default();
297 tasks
298 .into_iter()
299 .map(|task| {
300 let _ = self.poll_task(&task);
301 task.snapshot(preview_bytes)
302 })
303 .collect()
304 }
305
306 pub fn kill(&self, task_id: &str, session_id: &str) -> Result<BgTaskSnapshot, String> {
307 self.kill_with_status(task_id, session_id, BgTaskStatus::Killed)
308 }
309
310 pub(crate) fn kill_for_timeout(&self, task_id: &str, session_id: &str) -> Result<(), String> {
311 self.kill_with_status(task_id, session_id, BgTaskStatus::TimedOut)
312 .map(|_| ())
313 }
314
315 pub fn cleanup_finished(&self, older_than: Duration) {
316 let cutoff = Instant::now().checked_sub(older_than);
317 if let Ok(mut tasks) = self.inner.tasks.lock() {
318 tasks.retain(|_, task| {
319 let is_terminal = task
320 .state
321 .lock()
322 .map(|state| state.metadata.status.is_terminal())
323 .unwrap_or(false);
324 if !is_terminal {
325 return true;
326 }
327
328 let terminal_at = task.terminal_at.lock().ok().and_then(|at| *at);
329 match (terminal_at, cutoff) {
330 (Some(terminal_at), Some(cutoff)) => terminal_at > cutoff,
331 (Some(_), None) => false,
332 (None, _) => true,
333 }
334 });
335 }
336 }
337
338 pub fn drain_completions(&self) -> Vec<BgCompletion> {
339 self.drain_completions_for_session(None)
340 }
341
342 pub fn drain_completions_for_session(&self, session_id: Option<&str>) -> Vec<BgCompletion> {
343 let mut completions = match self.inner.completions.lock() {
344 Ok(completions) => completions,
345 Err(_) => return Vec::new(),
346 };
347
348 let drained = if let Some(session_id) = session_id {
349 let mut matched = Vec::new();
350 let mut retained = VecDeque::new();
351 while let Some(completion) = completions.pop_front() {
352 if completion.session_id == session_id {
353 matched.push(completion);
354 } else {
355 retained.push_back(completion);
356 }
357 }
358 *completions = retained;
359 matched
360 } else {
361 completions.drain(..).collect()
362 };
363 drop(completions);
364
365 for completion in &drained {
366 if let Some(task) = self.task_for_session(&completion.task_id, &completion.session_id) {
367 let _ = task.set_completion_delivered(true);
368 }
369 }
370
371 drained
372 }
373
374 pub fn pending_completions_for_session(&self, session_id: &str) -> Vec<BgCompletion> {
375 self.inner
376 .completions
377 .lock()
378 .map(|completions| {
379 completions
380 .iter()
381 .filter(|completion| completion.session_id == session_id)
382 .cloned()
383 .collect()
384 })
385 .unwrap_or_default()
386 }
387
388 pub fn detach(&self) {
389 self.inner.shutdown.store(true, Ordering::SeqCst);
390 if let Ok(mut tasks) = self.inner.tasks.lock() {
391 for task in tasks.values() {
392 if let Ok(mut state) = task.state.lock() {
393 state.child = None;
394 state.detached = true;
395 }
396 }
397 tasks.clear();
398 }
399 }
400
401 pub fn shutdown(&self) {
402 let tasks = self
403 .inner
404 .tasks
405 .lock()
406 .map(|tasks| {
407 tasks
408 .values()
409 .map(|task| (task.task_id.clone(), task.session_id.clone()))
410 .collect::<Vec<_>>()
411 })
412 .unwrap_or_default();
413 for (task_id, session_id) in tasks {
414 let _ = self.kill(&task_id, &session_id);
415 }
416 }
417
418 pub(crate) fn poll_task(&self, task: &Arc<BgTask>) -> Result<(), String> {
419 let marker = match read_exit_marker(&task.paths.exit) {
420 Ok(Some(marker)) => marker,
421 Ok(None) => return Ok(()),
422 Err(error) => return Err(format!("failed to read exit marker: {error}")),
423 };
424 self.finalize_from_marker(task, marker, None)
425 }
426
427 pub(crate) fn reap_child(&self, task: &Arc<BgTask>) {
428 let Ok(mut state) = task.state.lock() else {
429 return;
430 };
431 if let Some(child) = state.child.as_mut() {
432 if matches!(child.try_wait(), Ok(Some(_))) {
433 state.child = None;
434 state.detached = true;
435 }
436 }
437 }
438
439 pub(crate) fn running_tasks(&self) -> Vec<Arc<BgTask>> {
440 self.inner
441 .tasks
442 .lock()
443 .map(|tasks| {
444 tasks
445 .values()
446 .filter(|task| task.is_running())
447 .cloned()
448 .collect()
449 })
450 .unwrap_or_default()
451 }
452
453 fn insert_rehydrated_task(
454 &self,
455 metadata: PersistedTask,
456 paths: TaskPaths,
457 detached: bool,
458 ) -> Result<(), String> {
459 let task_id = metadata.task_id.clone();
460 let session_id = metadata.session_id.clone();
461 let task = Arc::new(BgTask {
462 task_id: task_id.clone(),
463 session_id,
464 paths: paths.clone(),
465 started: Instant::now(),
466 terminal_at: Mutex::new(metadata.status.is_terminal().then(Instant::now)),
467 state: Mutex::new(BgTaskState {
468 metadata,
469 child: None,
470 detached,
471 buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
472 }),
473 });
474 self.inner
475 .tasks
476 .lock()
477 .map_err(|_| "background task registry lock poisoned".to_string())?
478 .insert(task_id, task);
479 Ok(())
480 }
481
482 fn kill_with_status(
483 &self,
484 task_id: &str,
485 session_id: &str,
486 terminal_status: BgTaskStatus,
487 ) -> Result<BgTaskSnapshot, String> {
488 let task = self
489 .task_for_session(task_id, session_id)
490 .ok_or_else(|| format!("background task not found: {task_id}"))?;
491
492 {
493 let mut state = task
494 .state
495 .lock()
496 .map_err(|_| "background task lock poisoned".to_string())?;
497 if state.metadata.status.is_terminal() {
498 return Ok(task.snapshot_locked(&state, 5 * 1024));
499 }
500
501 state.metadata.status = BgTaskStatus::Killing;
502 write_task(&task.paths.json, &state.metadata)
503 .map_err(|e| format!("failed to persist killing state: {e}"))?;
504
505 #[cfg(unix)]
506 if let Some(pgid) = state.metadata.pgid {
507 terminate_pgid(pgid, state.child.as_mut());
508 }
509 if let Some(child) = state.child.as_mut() {
510 let _ = child.wait();
511 }
512 state.child = None;
513 state.detached = true;
514
515 if !task.paths.exit.exists() {
516 write_kill_marker_if_absent(&task.paths.exit)
517 .map_err(|e| format!("failed to write kill marker: {e}"))?;
518 }
519
520 let exit_code = if terminal_status == BgTaskStatus::TimedOut {
521 Some(124)
522 } else {
523 None
524 };
525 state
526 .metadata
527 .mark_terminal(terminal_status, exit_code, None);
528 task.mark_terminal_now();
529 write_task(&task.paths.json, &state.metadata)
530 .map_err(|e| format!("failed to persist killed state: {e}"))?;
531 state.buffer.enforce_terminal_cap();
532 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
533 }
534
535 Ok(task.snapshot(5 * 1024))
536 }
537
538 fn finalize_from_marker(
539 &self,
540 task: &Arc<BgTask>,
541 marker: ExitMarker,
542 reason: Option<String>,
543 ) -> Result<(), String> {
544 let mut state = task
545 .state
546 .lock()
547 .map_err(|_| "background task lock poisoned".to_string())?;
548 if state.metadata.status.is_terminal() {
549 return Ok(());
550 }
551
552 let updated = update_task(&task.paths.json, |metadata| {
553 let new_metadata = terminal_metadata_from_marker(metadata.clone(), marker, reason);
554 *metadata = new_metadata;
555 })
556 .map_err(|e| format!("failed to persist terminal state: {e}"))?;
557 state.metadata = updated;
558 task.mark_terminal_now();
559 state.child = None;
560 state.detached = true;
561 state.buffer.enforce_terminal_cap();
562 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
563 Ok(())
564 }
565
566 fn enqueue_completion_if_needed(&self, metadata: &PersistedTask, emit_frame: bool) {
567 if metadata.status.is_terminal() && !metadata.completion_delivered {
568 self.enqueue_completion_locked(metadata, None, emit_frame);
569 }
570 }
571
572 fn enqueue_completion_locked(
573 &self,
574 metadata: &PersistedTask,
575 buffer: Option<&BgBuffer>,
576 emit_frame: bool,
577 ) {
578 if !metadata.status.is_terminal() || metadata.completion_delivered {
579 return;
580 }
581 let (output_preview, output_truncated) = match buffer {
586 Some(buf) => buf.read_tail(BG_COMPLETION_PREVIEW_BYTES),
587 None => (String::new(), false),
588 };
589 let completion = BgCompletion {
590 task_id: metadata.task_id.clone(),
591 session_id: metadata.session_id.clone(),
592 status: metadata.status.clone(),
593 exit_code: metadata.exit_code,
594 command: metadata.command.clone(),
595 output_preview,
596 output_truncated,
597 };
598 if let Ok(mut completions) = self.inner.completions.lock() {
599 if completions
600 .iter()
601 .any(|completion| completion.task_id == metadata.task_id)
602 {
603 return;
604 }
605 completions.push_back(completion.clone());
606 } else {
607 return;
608 }
609
610 if emit_frame {
611 self.emit_bash_completed(completion);
612 }
613 }
614
615 fn emit_bash_completed(&self, completion: BgCompletion) {
616 let Ok(progress_sender) = self
617 .inner
618 .progress_sender
619 .lock()
620 .map(|sender| sender.clone())
621 else {
622 return;
623 };
624 let Some(sender) = progress_sender.as_ref() else {
625 return;
626 };
627 sender(PushFrame::BashCompleted(BashCompletedFrame::new(
635 completion.task_id,
636 completion.session_id,
637 completion.status,
638 completion.exit_code,
639 completion.command,
640 completion.output_preview,
641 completion.output_truncated,
642 )));
643 }
644
645 fn task(&self, task_id: &str) -> Option<Arc<BgTask>> {
646 self.inner
647 .tasks
648 .lock()
649 .ok()
650 .and_then(|tasks| tasks.get(task_id).cloned())
651 }
652
653 fn task_for_session(&self, task_id: &str, session_id: &str) -> Option<Arc<BgTask>> {
654 self.task(task_id)
655 .filter(|task| task.session_id == session_id)
656 }
657
658 fn running_count(&self) -> usize {
659 self.inner
660 .tasks
661 .lock()
662 .map(|tasks| tasks.values().filter(|task| task.is_running()).count())
663 .unwrap_or(0)
664 }
665
666 fn start_watchdog(&self) {
667 if !self.inner.watchdog_started.swap(true, Ordering::SeqCst) {
668 super::watchdog::start(self.clone());
669 }
670 }
671
672 fn running_metadata_is_stale(&self, metadata: &PersistedTask) -> bool {
673 unix_millis().saturating_sub(metadata.started_at) > STALE_RUNNING_AFTER.as_millis() as u64
674 }
675
676 #[cfg(test)]
677 pub fn task_json_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
678 self.task_for_session(task_id, session_id)
679 .map(|task| task.paths.json.clone())
680 }
681
682 #[cfg(test)]
683 pub fn task_exit_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
684 self.task_for_session(task_id, session_id)
685 .map(|task| task.paths.exit.clone())
686 }
687
688 fn generate_unique_task_id(&self) -> Result<String, String> {
690 for _ in 0..32 {
691 let candidate = random_slug();
692 let tasks = self
693 .inner
694 .tasks
695 .lock()
696 .map_err(|_| "background task registry lock poisoned".to_string())?;
697 if tasks.contains_key(&candidate) {
698 continue;
699 }
700 let completions = self
701 .inner
702 .completions
703 .lock()
704 .map_err(|_| "background completions lock poisoned".to_string())?;
705 if completions
706 .iter()
707 .any(|completion| completion.task_id == candidate)
708 {
709 continue;
710 }
711 return Ok(candidate);
712 }
713 Err("failed to allocate unique background task id after 32 attempts".to_string())
714 }
715}
716
717impl Default for BgTaskRegistry {
718 fn default() -> Self {
719 Self::new(Arc::new(Mutex::new(None)))
720 }
721}
722
723impl BgTask {
724 fn snapshot(&self, preview_bytes: usize) -> BgTaskSnapshot {
725 let state = self
726 .state
727 .lock()
728 .unwrap_or_else(|poison| poison.into_inner());
729 self.snapshot_locked(&state, preview_bytes)
730 }
731
732 fn snapshot_locked(&self, state: &BgTaskState, preview_bytes: usize) -> BgTaskSnapshot {
733 let metadata = &state.metadata;
734 let duration_ms = metadata.duration_ms.or_else(|| {
735 metadata
736 .status
737 .is_terminal()
738 .then(|| self.started.elapsed().as_millis() as u64)
739 });
740 let (output_preview, output_truncated) = state.buffer.read_tail(preview_bytes);
741 BgTaskSnapshot {
742 info: BgTaskInfo {
743 task_id: self.task_id.clone(),
744 status: metadata.status.clone(),
745 command: metadata.command.clone(),
746 started_at: metadata.started_at,
747 duration_ms,
748 },
749 exit_code: metadata.exit_code,
750 child_pid: metadata.child_pid,
751 workdir: metadata.workdir.display().to_string(),
752 output_preview,
753 output_truncated,
754 output_path: state
755 .buffer
756 .output_path()
757 .map(|path| path.display().to_string()),
758 stderr_path: Some(state.buffer.stderr_path().display().to_string()),
759 }
760 }
761
762 pub(crate) fn is_running(&self) -> bool {
763 self.state
764 .lock()
765 .map(|state| state.metadata.status == BgTaskStatus::Running)
766 .unwrap_or(false)
767 }
768
769 fn mark_terminal_now(&self) {
770 if let Ok(mut terminal_at) = self.terminal_at.lock() {
771 if terminal_at.is_none() {
772 *terminal_at = Some(Instant::now());
773 }
774 }
775 }
776
777 fn set_completion_delivered(&self, delivered: bool) -> Result<(), String> {
778 let mut state = self
779 .state
780 .lock()
781 .map_err(|_| "background task lock poisoned".to_string())?;
782 let updated = update_task(&self.paths.json, |metadata| {
783 metadata.completion_delivered = delivered;
784 })
785 .map_err(|e| format!("failed to update completion delivery: {e}"))?;
786 state.metadata = updated;
787 Ok(())
788 }
789}
790
791fn terminal_metadata_from_marker(
792 mut metadata: PersistedTask,
793 marker: ExitMarker,
794 reason: Option<String>,
795) -> PersistedTask {
796 match marker {
797 ExitMarker::Code(code) => {
798 let status = if code == 0 {
799 BgTaskStatus::Completed
800 } else {
801 BgTaskStatus::Failed
802 };
803 metadata.mark_terminal(status, Some(code), reason);
804 }
805 ExitMarker::Killed => metadata.mark_terminal(BgTaskStatus::Killed, None, reason),
806 }
807 metadata
808}
809
810#[cfg(unix)]
811fn detached_shell_command(command: &str, exit_path: &Path) -> Command {
812 let mut cmd = Command::new("/bin/sh");
813 cmd.arg("-c")
814 .arg("\"$0\" -c \"$1\"; code=$?; printf \"%s\" \"$code\" > \"$2.tmp.$$\"; mv -f \"$2.tmp.$$\" \"$2\"")
815 .arg("/bin/sh")
816 .arg(command)
817 .arg(exit_path);
818 unsafe {
819 cmd.pre_exec(|| {
820 if libc::setsid() == -1 {
821 return Err(std::io::Error::last_os_error());
822 }
823 Ok(())
824 });
825 }
826 cmd
827}
828
829fn random_slug() -> String {
830 let mut bytes = [0u8; 4];
831 getrandom::fill(&mut bytes).unwrap_or_else(|_| {
833 let t = SystemTime::now()
835 .duration_since(UNIX_EPOCH)
836 .map(|d| d.subsec_nanos())
837 .unwrap_or(0);
838 let p = std::process::id();
839 bytes.copy_from_slice(&(t ^ p).to_le_bytes());
840 });
841 let hex: String = bytes.iter().map(|b| format!("{b:02x}")).collect();
843 format!("bgb-{hex}")
844}
845
846#[cfg(test)]
847mod tests {
848 use std::collections::HashMap;
849 use std::sync::{Arc, Mutex};
850 use std::time::Duration;
851
852 use super::*;
853
854 #[test]
855 fn cleanup_finished_removes_terminal_tasks_older_than_threshold() {
856 let registry = BgTaskRegistry::default();
857 let dir = tempfile::tempdir().unwrap();
858 let task_id = registry
859 .spawn(
860 "true",
861 "session".to_string(),
862 dir.path().to_path_buf(),
863 HashMap::new(),
864 Some(Duration::from_secs(30)),
865 dir.path().to_path_buf(),
866 10,
867 )
868 .unwrap();
869 registry
870 .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
871 .unwrap();
872
873 registry.cleanup_finished(Duration::ZERO);
874
875 assert!(registry.inner.tasks.lock().unwrap().is_empty());
876 }
877
878 #[test]
879 fn cleanup_finished_keeps_running_tasks() {
880 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
881 let dir = tempfile::tempdir().unwrap();
882 let task_id = registry
883 .spawn(
884 "sleep 5",
885 "session".to_string(),
886 dir.path().to_path_buf(),
887 HashMap::new(),
888 Some(Duration::from_secs(30)),
889 dir.path().to_path_buf(),
890 10,
891 )
892 .unwrap();
893
894 registry.cleanup_finished(Duration::ZERO);
895
896 assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
897 let _ = registry.kill(&task_id, "session");
898 }
899}