1use std::collections::{HashMap, VecDeque};
2use std::fs;
3use std::path::{Path, PathBuf};
4use std::process::{Child, Command, Stdio};
5use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
6use std::sync::{Arc, Mutex};
7use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
8
9use serde::Serialize;
10
11use crate::context::SharedProgressSender;
12use crate::protocol::{BashCompletedFrame, PushFrame};
13
14#[cfg(unix)]
15use std::os::unix::process::CommandExt;
16
17use super::buffer::BgBuffer;
18use super::persistence::{
19 create_capture_file, read_exit_marker, read_task, session_tasks_dir, task_paths, unix_millis,
20 update_task, write_kill_marker_if_absent, write_task, ExitMarker, PersistedTask, TaskPaths,
21};
22#[cfg(unix)]
23use super::process::terminate_pgid;
24use super::{BgTaskInfo, BgTaskStatus};
25
26const DEFAULT_BG_TIMEOUT: Duration = Duration::from_secs(30 * 60);
29const STALE_RUNNING_AFTER: Duration = Duration::from_secs(24 * 60 * 60);
30
31const BG_COMPLETION_PREVIEW_BYTES: usize = 300;
38
39#[derive(Debug, Clone, Serialize)]
40pub struct BgCompletion {
41 pub task_id: String,
42 #[serde(skip_serializing)]
45 pub session_id: String,
46 pub status: BgTaskStatus,
47 pub exit_code: Option<i32>,
48 pub command: String,
49 #[serde(default, skip_serializing_if = "String::is_empty")]
55 pub output_preview: String,
56 #[serde(default, skip_serializing_if = "is_false")]
61 pub output_truncated: bool,
62}
63
64fn is_false(v: &bool) -> bool {
65 !*v
66}
67
68#[derive(Debug, Clone, Serialize)]
69pub struct BgTaskSnapshot {
70 #[serde(flatten)]
71 pub info: BgTaskInfo,
72 pub exit_code: Option<i32>,
73 pub child_pid: Option<u32>,
74 pub workdir: String,
75 pub output_preview: String,
76 pub output_truncated: bool,
77 pub output_path: Option<String>,
78}
79
80#[derive(Clone)]
81pub struct BgTaskRegistry {
82 pub(crate) inner: Arc<RegistryInner>,
83}
84
85pub(crate) struct RegistryInner {
86 pub(crate) tasks: Mutex<HashMap<String, Arc<BgTask>>>,
87 pub(crate) completions: Mutex<VecDeque<BgCompletion>>,
88 pub(crate) progress_sender: SharedProgressSender,
89 watchdog_started: AtomicBool,
90 pub(crate) shutdown: AtomicBool,
91}
92
93pub(crate) struct BgTask {
94 pub(crate) task_id: String,
95 pub(crate) session_id: String,
96 pub(crate) paths: TaskPaths,
97 pub(crate) started: Instant,
98 pub(crate) terminal_at: Mutex<Option<Instant>>,
99 pub(crate) state: Mutex<BgTaskState>,
100}
101
102pub(crate) struct BgTaskState {
103 pub(crate) metadata: PersistedTask,
104 pub(crate) child: Option<Child>,
105 pub(crate) detached: bool,
106 pub(crate) buffer: BgBuffer,
107}
108
109impl BgTaskRegistry {
110 pub fn new(progress_sender: SharedProgressSender) -> Self {
111 Self {
112 inner: Arc::new(RegistryInner {
113 tasks: Mutex::new(HashMap::new()),
114 completions: Mutex::new(VecDeque::new()),
115 progress_sender,
116 watchdog_started: AtomicBool::new(false),
117 shutdown: AtomicBool::new(false),
118 }),
119 }
120 }
121
122 #[cfg(unix)]
123 pub fn spawn(
124 &self,
125 command: &str,
126 session_id: String,
127 workdir: PathBuf,
128 env: HashMap<String, String>,
129 timeout: Option<Duration>,
130 storage_dir: PathBuf,
131 max_running: usize,
132 ) -> Result<String, String> {
133 self.start_watchdog();
134
135 let running = self.running_count();
136 if running >= max_running {
137 return Err(format!(
138 "background bash task limit exceeded: {running} running (max {max_running})"
139 ));
140 }
141
142 let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
143 let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
144 let task_id = self.generate_unique_task_id()?;
145 let paths = task_paths(&storage_dir, &session_id, &task_id);
146 fs::create_dir_all(&paths.dir)
147 .map_err(|e| format!("failed to create background task dir: {e}"))?;
148
149 let mut metadata = PersistedTask::starting(
150 task_id.clone(),
151 session_id.clone(),
152 command.to_string(),
153 workdir.clone(),
154 timeout_ms,
155 );
156 write_task(&paths.json, &metadata)
157 .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
158
159 let stdout = create_capture_file(&paths.stdout)
160 .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
161 let stderr = create_capture_file(&paths.stderr)
162 .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
163
164 let child = detached_shell_command(command, &paths.exit)
165 .current_dir(&workdir)
166 .envs(&env)
167 .stdin(Stdio::null())
168 .stdout(Stdio::from(stdout))
169 .stderr(Stdio::from(stderr))
170 .spawn()
171 .map_err(|e| format!("failed to spawn background bash command: {e}"))?;
172
173 let child_pid = child.id();
174 metadata.mark_running(child_pid, child_pid as i32);
175 write_task(&paths.json, &metadata)
176 .map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
177
178 let task = Arc::new(BgTask {
179 task_id: task_id.clone(),
180 session_id,
181 paths: paths.clone(),
182 started: Instant::now(),
183 terminal_at: Mutex::new(None),
184 state: Mutex::new(BgTaskState {
185 metadata,
186 child: Some(child),
187 detached: false,
188 buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
189 }),
190 });
191
192 self.inner
193 .tasks
194 .lock()
195 .map_err(|_| "background task registry lock poisoned".to_string())?
196 .insert(task_id.clone(), task);
197
198 Ok(task_id)
199 }
200
201 #[cfg(windows)]
202 pub fn spawn(
203 &self,
204 _command: &str,
205 _session_id: String,
206 _workdir: PathBuf,
207 _env: HashMap<String, String>,
208 _timeout: Option<Duration>,
209 _storage_dir: PathBuf,
210 _max_running: usize,
211 ) -> Result<String, String> {
212 Err("background bash is not yet supported on Windows".to_string())
213 }
214
215 pub fn replay_session(&self, storage_dir: &Path, session_id: &str) -> Result<(), String> {
216 self.start_watchdog();
217 let dir = session_tasks_dir(storage_dir, session_id);
218 if !dir.exists() {
219 return Ok(());
220 }
221
222 let entries = fs::read_dir(&dir)
223 .map_err(|e| format!("failed to read background task dir {}: {e}", dir.display()))?;
224 for entry in entries.flatten() {
225 let path = entry.path();
226 if path.extension().and_then(|extension| extension.to_str()) != Some("json") {
227 continue;
228 }
229 let Ok(mut metadata) = read_task(&path) else {
230 continue;
231 };
232 if metadata.session_id != session_id {
233 continue;
234 }
235
236 let paths = task_paths(storage_dir, session_id, &metadata.task_id);
237 match metadata.status {
238 BgTaskStatus::Starting => {
239 metadata.mark_terminal(
240 BgTaskStatus::Failed,
241 None,
242 Some("spawn aborted".to_string()),
243 );
244 let _ = write_task(&paths.json, &metadata);
245 self.enqueue_completion_if_needed(&metadata, false);
246 }
247 BgTaskStatus::Running => {
248 if self.running_metadata_is_stale(&metadata) {
249 metadata.mark_terminal(
250 BgTaskStatus::Killed,
251 None,
252 Some("orphaned (>24h)".to_string()),
253 );
254 if !paths.exit.exists() {
255 let _ = write_kill_marker_if_absent(&paths.exit);
256 }
257 let _ = write_task(&paths.json, &metadata);
258 self.enqueue_completion_if_needed(&metadata, false);
259 } else if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
260 metadata = terminal_metadata_from_marker(metadata, marker, None);
261 let _ = write_task(&paths.json, &metadata);
262 self.enqueue_completion_if_needed(&metadata, false);
263 } else {
264 self.insert_rehydrated_task(metadata, paths, true)?;
265 }
266 }
267 _ if metadata.status.is_terminal() => {
268 self.insert_rehydrated_task(metadata.clone(), paths, true)?;
269 self.enqueue_completion_if_needed(&metadata, false);
270 }
271 _ => {}
272 }
273 }
274
275 Ok(())
276 }
277
278 pub fn status(
279 &self,
280 task_id: &str,
281 session_id: &str,
282 preview_bytes: usize,
283 ) -> Option<BgTaskSnapshot> {
284 let task = self.task_for_session(task_id, session_id)?;
285 let _ = self.poll_task(&task);
286 Some(task.snapshot(preview_bytes))
287 }
288
289 pub fn list(&self, preview_bytes: usize) -> Vec<BgTaskSnapshot> {
290 let tasks = self
291 .inner
292 .tasks
293 .lock()
294 .map(|tasks| tasks.values().cloned().collect::<Vec<_>>())
295 .unwrap_or_default();
296 tasks
297 .into_iter()
298 .map(|task| {
299 let _ = self.poll_task(&task);
300 task.snapshot(preview_bytes)
301 })
302 .collect()
303 }
304
305 pub fn kill(&self, task_id: &str, session_id: &str) -> Result<BgTaskSnapshot, String> {
306 self.kill_with_status(task_id, session_id, BgTaskStatus::Killed)
307 }
308
309 pub(crate) fn kill_for_timeout(&self, task_id: &str, session_id: &str) -> Result<(), String> {
310 self.kill_with_status(task_id, session_id, BgTaskStatus::TimedOut)
311 .map(|_| ())
312 }
313
314 pub fn cleanup_finished(&self, older_than: Duration) {
315 let cutoff = Instant::now().checked_sub(older_than);
316 if let Ok(mut tasks) = self.inner.tasks.lock() {
317 tasks.retain(|_, task| {
318 let is_terminal = task
319 .state
320 .lock()
321 .map(|state| state.metadata.status.is_terminal())
322 .unwrap_or(false);
323 if !is_terminal {
324 return true;
325 }
326
327 let terminal_at = task.terminal_at.lock().ok().and_then(|at| *at);
328 match (terminal_at, cutoff) {
329 (Some(terminal_at), Some(cutoff)) => terminal_at > cutoff,
330 (Some(_), None) => false,
331 (None, _) => true,
332 }
333 });
334 }
335 }
336
337 pub fn drain_completions(&self) -> Vec<BgCompletion> {
338 self.drain_completions_for_session(None)
339 }
340
341 pub fn drain_completions_for_session(&self, session_id: Option<&str>) -> Vec<BgCompletion> {
342 let mut completions = match self.inner.completions.lock() {
343 Ok(completions) => completions,
344 Err(_) => return Vec::new(),
345 };
346
347 let drained = if let Some(session_id) = session_id {
348 let mut matched = Vec::new();
349 let mut retained = VecDeque::new();
350 while let Some(completion) = completions.pop_front() {
351 if completion.session_id == session_id {
352 matched.push(completion);
353 } else {
354 retained.push_back(completion);
355 }
356 }
357 *completions = retained;
358 matched
359 } else {
360 completions.drain(..).collect()
361 };
362 drop(completions);
363
364 for completion in &drained {
365 if let Some(task) = self.task_for_session(&completion.task_id, &completion.session_id) {
366 let _ = task.set_completion_delivered(true);
367 }
368 }
369
370 drained
371 }
372
373 pub fn pending_completions_for_session(&self, session_id: &str) -> Vec<BgCompletion> {
374 self.inner
375 .completions
376 .lock()
377 .map(|completions| {
378 completions
379 .iter()
380 .filter(|completion| completion.session_id == session_id)
381 .cloned()
382 .collect()
383 })
384 .unwrap_or_default()
385 }
386
387 pub fn detach(&self) {
388 self.inner.shutdown.store(true, Ordering::SeqCst);
389 if let Ok(mut tasks) = self.inner.tasks.lock() {
390 for task in tasks.values() {
391 if let Ok(mut state) = task.state.lock() {
392 state.child = None;
393 state.detached = true;
394 }
395 }
396 tasks.clear();
397 }
398 }
399
400 pub fn shutdown(&self) {
401 let tasks = self
402 .inner
403 .tasks
404 .lock()
405 .map(|tasks| {
406 tasks
407 .values()
408 .map(|task| (task.task_id.clone(), task.session_id.clone()))
409 .collect::<Vec<_>>()
410 })
411 .unwrap_or_default();
412 for (task_id, session_id) in tasks {
413 let _ = self.kill(&task_id, &session_id);
414 }
415 }
416
417 pub(crate) fn poll_task(&self, task: &Arc<BgTask>) -> Result<(), String> {
418 let marker = match read_exit_marker(&task.paths.exit) {
419 Ok(Some(marker)) => marker,
420 Ok(None) => return Ok(()),
421 Err(error) => return Err(format!("failed to read exit marker: {error}")),
422 };
423 self.finalize_from_marker(task, marker, None)
424 }
425
426 pub(crate) fn reap_child(&self, task: &Arc<BgTask>) {
427 let Ok(mut state) = task.state.lock() else {
428 return;
429 };
430 if let Some(child) = state.child.as_mut() {
431 if matches!(child.try_wait(), Ok(Some(_))) {
432 state.child = None;
433 state.detached = true;
434 }
435 }
436 }
437
438 pub(crate) fn running_tasks(&self) -> Vec<Arc<BgTask>> {
439 self.inner
440 .tasks
441 .lock()
442 .map(|tasks| {
443 tasks
444 .values()
445 .filter(|task| task.is_running())
446 .cloned()
447 .collect()
448 })
449 .unwrap_or_default()
450 }
451
452 fn insert_rehydrated_task(
453 &self,
454 metadata: PersistedTask,
455 paths: TaskPaths,
456 detached: bool,
457 ) -> Result<(), String> {
458 let task_id = metadata.task_id.clone();
459 let session_id = metadata.session_id.clone();
460 let task = Arc::new(BgTask {
461 task_id: task_id.clone(),
462 session_id,
463 paths: paths.clone(),
464 started: Instant::now(),
465 terminal_at: Mutex::new(metadata.status.is_terminal().then(Instant::now)),
466 state: Mutex::new(BgTaskState {
467 metadata,
468 child: None,
469 detached,
470 buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
471 }),
472 });
473 self.inner
474 .tasks
475 .lock()
476 .map_err(|_| "background task registry lock poisoned".to_string())?
477 .insert(task_id, task);
478 Ok(())
479 }
480
481 fn kill_with_status(
482 &self,
483 task_id: &str,
484 session_id: &str,
485 terminal_status: BgTaskStatus,
486 ) -> Result<BgTaskSnapshot, String> {
487 let task = self
488 .task_for_session(task_id, session_id)
489 .ok_or_else(|| format!("background task not found: {task_id}"))?;
490
491 {
492 let mut state = task
493 .state
494 .lock()
495 .map_err(|_| "background task lock poisoned".to_string())?;
496 if state.metadata.status.is_terminal() {
497 return Ok(task.snapshot_locked(&state, 5 * 1024));
498 }
499
500 state.metadata.status = BgTaskStatus::Killing;
501 write_task(&task.paths.json, &state.metadata)
502 .map_err(|e| format!("failed to persist killing state: {e}"))?;
503
504 #[cfg(unix)]
505 if let Some(pgid) = state.metadata.pgid {
506 terminate_pgid(pgid, state.child.as_mut());
507 }
508 if let Some(child) = state.child.as_mut() {
509 let _ = child.wait();
510 }
511 state.child = None;
512 state.detached = true;
513
514 if !task.paths.exit.exists() {
515 write_kill_marker_if_absent(&task.paths.exit)
516 .map_err(|e| format!("failed to write kill marker: {e}"))?;
517 }
518
519 let exit_code = if terminal_status == BgTaskStatus::TimedOut {
520 Some(124)
521 } else {
522 None
523 };
524 state
525 .metadata
526 .mark_terminal(terminal_status, exit_code, None);
527 task.mark_terminal_now();
528 write_task(&task.paths.json, &state.metadata)
529 .map_err(|e| format!("failed to persist killed state: {e}"))?;
530 state.buffer.enforce_terminal_cap();
531 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
532 }
533
534 Ok(task.snapshot(5 * 1024))
535 }
536
537 fn finalize_from_marker(
538 &self,
539 task: &Arc<BgTask>,
540 marker: ExitMarker,
541 reason: Option<String>,
542 ) -> Result<(), String> {
543 let mut state = task
544 .state
545 .lock()
546 .map_err(|_| "background task lock poisoned".to_string())?;
547 if state.metadata.status.is_terminal() {
548 return Ok(());
549 }
550
551 let updated = update_task(&task.paths.json, |metadata| {
552 let new_metadata = terminal_metadata_from_marker(metadata.clone(), marker, reason);
553 *metadata = new_metadata;
554 })
555 .map_err(|e| format!("failed to persist terminal state: {e}"))?;
556 state.metadata = updated;
557 task.mark_terminal_now();
558 state.child = None;
559 state.detached = true;
560 state.buffer.enforce_terminal_cap();
561 self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
562 Ok(())
563 }
564
565 fn enqueue_completion_if_needed(&self, metadata: &PersistedTask, emit_frame: bool) {
566 if metadata.status.is_terminal() && !metadata.completion_delivered {
567 self.enqueue_completion_locked(metadata, None, emit_frame);
568 }
569 }
570
571 fn enqueue_completion_locked(
572 &self,
573 metadata: &PersistedTask,
574 buffer: Option<&BgBuffer>,
575 emit_frame: bool,
576 ) {
577 if !metadata.status.is_terminal() || metadata.completion_delivered {
578 return;
579 }
580 let (output_preview, output_truncated) = match buffer {
585 Some(buf) => buf.read_tail(BG_COMPLETION_PREVIEW_BYTES),
586 None => (String::new(), false),
587 };
588 let completion = BgCompletion {
589 task_id: metadata.task_id.clone(),
590 session_id: metadata.session_id.clone(),
591 status: metadata.status.clone(),
592 exit_code: metadata.exit_code,
593 command: metadata.command.clone(),
594 output_preview,
595 output_truncated,
596 };
597 if let Ok(mut completions) = self.inner.completions.lock() {
598 if completions
599 .iter()
600 .any(|completion| completion.task_id == metadata.task_id)
601 {
602 return;
603 }
604 completions.push_back(completion.clone());
605 } else {
606 return;
607 }
608
609 if emit_frame {
610 self.emit_bash_completed(completion);
611 }
612 }
613
614 fn emit_bash_completed(&self, completion: BgCompletion) {
615 let Ok(progress_sender) = self
616 .inner
617 .progress_sender
618 .lock()
619 .map(|sender| sender.clone())
620 else {
621 return;
622 };
623 let Some(sender) = progress_sender.as_ref() else {
624 return;
625 };
626 sender(PushFrame::BashCompleted(BashCompletedFrame::new(
634 completion.task_id,
635 completion.session_id,
636 completion.status,
637 completion.exit_code,
638 completion.command,
639 completion.output_preview,
640 completion.output_truncated,
641 )));
642 }
643
644 fn task(&self, task_id: &str) -> Option<Arc<BgTask>> {
645 self.inner
646 .tasks
647 .lock()
648 .ok()
649 .and_then(|tasks| tasks.get(task_id).cloned())
650 }
651
652 fn task_for_session(&self, task_id: &str, session_id: &str) -> Option<Arc<BgTask>> {
653 self.task(task_id)
654 .filter(|task| task.session_id == session_id)
655 }
656
657 fn running_count(&self) -> usize {
658 self.inner
659 .tasks
660 .lock()
661 .map(|tasks| tasks.values().filter(|task| task.is_running()).count())
662 .unwrap_or(0)
663 }
664
665 fn start_watchdog(&self) {
666 if !self.inner.watchdog_started.swap(true, Ordering::SeqCst) {
667 super::watchdog::start(self.clone());
668 }
669 }
670
671 fn running_metadata_is_stale(&self, metadata: &PersistedTask) -> bool {
672 unix_millis().saturating_sub(metadata.started_at) > STALE_RUNNING_AFTER.as_millis() as u64
673 }
674
675 #[cfg(test)]
676 pub fn task_json_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
677 self.task_for_session(task_id, session_id)
678 .map(|task| task.paths.json.clone())
679 }
680
681 #[cfg(test)]
682 pub fn task_exit_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
683 self.task_for_session(task_id, session_id)
684 .map(|task| task.paths.exit.clone())
685 }
686
687 fn generate_unique_task_id(&self) -> Result<String, String> {
689 for _ in 0..32 {
690 let candidate = random_slug();
691 let tasks = self
692 .inner
693 .tasks
694 .lock()
695 .map_err(|_| "background task registry lock poisoned".to_string())?;
696 if tasks.contains_key(&candidate) {
697 continue;
698 }
699 let completions = self
700 .inner
701 .completions
702 .lock()
703 .map_err(|_| "background completions lock poisoned".to_string())?;
704 if completions
705 .iter()
706 .any(|completion| completion.task_id == candidate)
707 {
708 continue;
709 }
710 return Ok(candidate);
711 }
712 Err("failed to allocate unique background task id after 32 attempts".to_string())
713 }
714}
715
716impl Default for BgTaskRegistry {
717 fn default() -> Self {
718 Self::new(Arc::new(Mutex::new(None)))
719 }
720}
721
722impl BgTask {
723 fn snapshot(&self, preview_bytes: usize) -> BgTaskSnapshot {
724 let state = self
725 .state
726 .lock()
727 .unwrap_or_else(|poison| poison.into_inner());
728 self.snapshot_locked(&state, preview_bytes)
729 }
730
731 fn snapshot_locked(&self, state: &BgTaskState, preview_bytes: usize) -> BgTaskSnapshot {
732 let metadata = &state.metadata;
733 let duration_ms = metadata.duration_ms.or_else(|| {
734 metadata
735 .status
736 .is_terminal()
737 .then(|| self.started.elapsed().as_millis() as u64)
738 });
739 let (output_preview, output_truncated) = state.buffer.read_tail(preview_bytes);
740 BgTaskSnapshot {
741 info: BgTaskInfo {
742 task_id: self.task_id.clone(),
743 status: metadata.status.clone(),
744 command: metadata.command.clone(),
745 started_at: metadata.started_at,
746 duration_ms,
747 },
748 exit_code: metadata.exit_code,
749 child_pid: metadata.child_pid,
750 workdir: metadata.workdir.display().to_string(),
751 output_preview,
752 output_truncated,
753 output_path: state
754 .buffer
755 .output_path()
756 .map(|path| path.display().to_string()),
757 }
758 }
759
760 pub(crate) fn is_running(&self) -> bool {
761 self.state
762 .lock()
763 .map(|state| state.metadata.status == BgTaskStatus::Running)
764 .unwrap_or(false)
765 }
766
767 fn mark_terminal_now(&self) {
768 if let Ok(mut terminal_at) = self.terminal_at.lock() {
769 if terminal_at.is_none() {
770 *terminal_at = Some(Instant::now());
771 }
772 }
773 }
774
775 fn set_completion_delivered(&self, delivered: bool) -> Result<(), String> {
776 let mut state = self
777 .state
778 .lock()
779 .map_err(|_| "background task lock poisoned".to_string())?;
780 let updated = update_task(&self.paths.json, |metadata| {
781 metadata.completion_delivered = delivered;
782 })
783 .map_err(|e| format!("failed to update completion delivery: {e}"))?;
784 state.metadata = updated;
785 Ok(())
786 }
787}
788
789fn terminal_metadata_from_marker(
790 mut metadata: PersistedTask,
791 marker: ExitMarker,
792 reason: Option<String>,
793) -> PersistedTask {
794 match marker {
795 ExitMarker::Code(code) => {
796 let status = if code == 0 {
797 BgTaskStatus::Completed
798 } else {
799 BgTaskStatus::Failed
800 };
801 metadata.mark_terminal(status, Some(code), reason);
802 }
803 ExitMarker::Killed => metadata.mark_terminal(BgTaskStatus::Killed, None, reason),
804 }
805 metadata
806}
807
808#[cfg(unix)]
809fn detached_shell_command(command: &str, exit_path: &Path) -> Command {
810 let mut cmd = Command::new("/bin/sh");
811 cmd.arg("-c")
812 .arg("\"$0\" -c \"$1\"; code=$?; printf \"%s\" \"$code\" > \"$2.tmp.$$\"; mv -f \"$2.tmp.$$\" \"$2\"")
813 .arg("/bin/sh")
814 .arg(command)
815 .arg(exit_path);
816 unsafe {
817 cmd.pre_exec(|| {
818 if libc::setsid() == -1 {
819 return Err(std::io::Error::last_os_error());
820 }
821 Ok(())
822 });
823 }
824 cmd
825}
826
827fn random_slug() -> String {
828 static COUNTER: AtomicU64 = AtomicU64::new(0);
829 let counter = COUNTER.fetch_add(1, Ordering::Relaxed);
830 let mixed = unix_millis_nanos()
831 ^ (std::process::id() as u128).wrapping_mul(0x9E3779B97F4A7C15)
832 ^ (counter as u128).wrapping_mul(0xBF58476D1CE4E5B9);
833 format!("bgb-{:016x}", mixed as u64)
836}
837
838fn unix_millis_nanos() -> u128 {
839 SystemTime::now()
840 .duration_since(UNIX_EPOCH)
841 .map(|duration| duration.as_nanos())
842 .unwrap_or(0)
843}
844
845#[cfg(test)]
846mod tests {
847 use std::collections::HashMap;
848 use std::sync::{Arc, Mutex};
849 use std::time::Duration;
850
851 use super::*;
852
853 #[test]
854 fn cleanup_finished_removes_terminal_tasks_older_than_threshold() {
855 let registry = BgTaskRegistry::default();
856 let dir = tempfile::tempdir().unwrap();
857 let task_id = registry
858 .spawn(
859 "true",
860 "session".to_string(),
861 dir.path().to_path_buf(),
862 HashMap::new(),
863 Some(Duration::from_secs(30)),
864 dir.path().to_path_buf(),
865 10,
866 )
867 .unwrap();
868 registry
869 .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
870 .unwrap();
871
872 registry.cleanup_finished(Duration::ZERO);
873
874 assert!(registry.inner.tasks.lock().unwrap().is_empty());
875 }
876
877 #[test]
878 fn cleanup_finished_keeps_running_tasks() {
879 let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
880 let dir = tempfile::tempdir().unwrap();
881 let task_id = registry
882 .spawn(
883 "sleep 5",
884 "session".to_string(),
885 dir.path().to_path_buf(),
886 HashMap::new(),
887 Some(Duration::from_secs(30)),
888 dir.path().to_path_buf(),
889 10,
890 )
891 .unwrap();
892
893 registry.cleanup_finished(Duration::ZERO);
894
895 assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
896 let _ = registry.kill(&task_id, "session");
897 }
898}