use std::collections::{HashMap, VecDeque};
use std::fs;
use std::path::{Path, PathBuf};
use std::process::{Child, Command, Stdio};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
#[cfg(unix)]
use std::sync::OnceLock;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use serde::Serialize;
use crate::context::SharedProgressSender;
use crate::protocol::{BashCompletedFrame, BashLongRunningFrame, PushFrame};
#[cfg(unix)]
use std::os::unix::process::CommandExt;
#[cfg(windows)]
use std::os::windows::process::CommandExt;
use super::buffer::BgBuffer;
use super::persistence::{
create_capture_file, delete_task_bundle, read_exit_marker, read_task, session_tasks_dir,
task_paths, unix_millis, update_task, write_kill_marker_if_absent, write_task, ExitMarker,
PersistedTask, TaskPaths,
};
use super::process::is_process_alive;
#[cfg(unix)]
use super::process::terminate_pgid;
#[cfg(windows)]
use super::process::terminate_pid;
use super::{BgTaskInfo, BgTaskStatus};
/// Timeout applied to a background task when the caller supplies none (30 minutes).
const DEFAULT_BG_TIMEOUT: Duration = Duration::from_secs(30 * 60);
/// On replay, a `Running`/`Killing` task whose persisted `started_at` is older than
/// this (24 hours) is considered orphaned and marked `Killed`.
const STALE_RUNNING_AFTER: Duration = Duration::from_secs(24 * 60 * 60);
/// `maybe_gc_persisted` skips task JSON files modified within this window (24 hours).
const PERSISTED_GC_GRACE: Duration = Duration::from_secs(24 * 60 * 60);
/// `gc_quarantine` keeps quarantine entries modified within this window (30 days).
const QUARANTINE_GC_GRACE: Duration = Duration::from_secs(30 * 24 * 60 * 60);
/// Byte budget passed to the tail readers when building a completion's output preview.
const BG_COMPLETION_PREVIEW_BYTES: usize = 300;
/// Notification that a background task reached a terminal state.
///
/// Queued in `RegistryInner::completions` (at most one per task id) and also
/// used to build the `BashCompleted` push frame. `session_id` is used for
/// per-session draining but is never serialized.
#[derive(Debug, Clone, Serialize)]
pub struct BgCompletion {
/// Registry-unique task identifier.
pub task_id: String,
/// Owning session; excluded from serialized output.
#[serde(skip_serializing)]
pub session_id: String,
/// Terminal status of the task.
pub status: BgTaskStatus,
/// Process exit code, when known.
pub exit_code: Option<i32>,
/// The shell command that was run.
pub command: String,
/// Tail of the task output (compressed when the task is `compressed`);
/// omitted from serialized output when empty.
#[serde(default, skip_serializing_if = "String::is_empty")]
pub output_preview: String,
/// Whether the preview was cut short; omitted from serialized output when `false`.
#[serde(default, skip_serializing_if = "is_false")]
pub output_truncated: bool,
}
/// Serde `skip_serializing_if` predicate: `true` when the flag is unset.
fn is_false(v: &bool) -> bool {
    matches!(*v, false)
}
/// Point-in-time view of a background task, returned by `status`, `list` and the kill APIs.
#[derive(Debug, Clone, Serialize)]
pub struct BgTaskSnapshot {
/// Common task info (status, command, ...), flattened into the same serialized object.
#[serde(flatten)]
pub info: BgTaskInfo,
/// Process exit code, when known.
pub exit_code: Option<i32>,
/// PID of the spawned child, when recorded.
pub child_pid: Option<u32>,
/// Working directory the command runs in (as a display string — TODO confirm against `BgTask::snapshot`).
pub workdir: String,
/// Tail of the captured output; compressed for terminal `compressed` tasks.
pub output_preview: String,
/// Whether `output_preview` was cut short.
pub output_truncated: bool,
/// Presumably the stdout capture file path — filled in by `BgTask::snapshot` (not visible here).
pub output_path: Option<String>,
/// Presumably the stderr capture file path — filled in by `BgTask::snapshot` (not visible here).
pub stderr_path: Option<String>,
}
/// Handle to the background bash task registry.
///
/// Cloning is cheap: all clones share the same `RegistryInner` through an `Arc`.
#[derive(Clone)]
pub struct BgTaskRegistry {
/// Shared registry state.
pub(crate) inner: Arc<RegistryInner>,
}
/// State shared by every clone of `BgTaskRegistry`.
pub(crate) struct RegistryInner {
/// Tracked tasks keyed by task id.
pub(crate) tasks: Mutex<HashMap<String, Arc<BgTask>>>,
/// Completions awaiting delivery, in enqueue order (deduplicated by task id on enqueue).
pub(crate) completions: Mutex<VecDeque<BgCompletion>>,
/// Optional sink for `BashCompleted` / `BashLongRunning` push frames; frames are
/// dropped when no sender is installed.
pub(crate) progress_sender: SharedProgressSender,
/// Guards `watchdog::start` so it is invoked at most once.
watchdog_started: AtomicBool,
/// Set by `detach`; presumably observed by the watchdog to stop — TODO confirm.
pub(crate) shutdown: AtomicBool,
/// Whether long-running reminders are emitted at all.
pub(crate) long_running_reminder_enabled: AtomicBool,
/// Minimum gap between reminders for one task, in milliseconds; 0 disables reminders.
pub(crate) long_running_reminder_interval_ms: AtomicU64,
/// Guards the one-shot persisted-bundle GC performed by the first `replay_session`.
persisted_gc_started: AtomicBool,
/// Test-only count of `maybe_gc_persisted` invocations.
#[cfg(test)]
persisted_gc_runs: AtomicU64,
/// Optional `(command, output) -> output` transform applied to previews of `compressed` tasks.
pub(crate) compressor: Mutex<Option<Box<dyn Fn(&str, String) -> String + Send + Sync>>>,
}
/// One tracked background task.
pub(crate) struct BgTask {
/// Registry-unique task identifier.
pub(crate) task_id: String,
/// Session that owns the task.
pub(crate) session_id: String,
/// On-disk bundle locations (metadata JSON, capture files, exit marker, ...).
pub(crate) paths: TaskPaths,
/// Monotonic start time; for rehydrated tasks it is back-computed from the
/// persisted `started_at` wall-clock timestamp.
pub(crate) started: Instant,
/// When the last long-running reminder was emitted (`None` until the first one).
pub(crate) last_reminder_at: Mutex<Option<Instant>>,
/// When the task was observed terminal; `cleanup_finished` ages tasks from this.
pub(crate) terminal_at: Mutex<Option<Instant>>,
/// Mutable task state.
pub(crate) state: Mutex<BgTaskState>,
}
/// Mutable part of a `BgTask`, guarded by `BgTask::state`.
pub(crate) struct BgTaskState {
/// In-memory mirror of the persisted metadata; written back to `paths.json` on transitions.
pub(crate) metadata: PersistedTask,
/// Owned handle to the spawned process; `None` once reaped, killed, detached,
/// or for tasks rehydrated from disk.
pub(crate) child: Option<Child>,
/// `true` once the registry no longer holds the child handle.
pub(crate) detached: bool,
/// Output buffer built over the stdout/stderr capture files.
pub(crate) buffer: BgBuffer,
}
impl BgTaskRegistry {
/// Creates an empty registry that reports push frames through `progress_sender`.
///
/// Long-running reminders start enabled with a 600 000 ms (10 minute) interval;
/// no output compressor is installed.
pub fn new(progress_sender: SharedProgressSender) -> Self {
    let inner = RegistryInner {
        tasks: Mutex::default(),
        completions: Mutex::default(),
        progress_sender,
        watchdog_started: AtomicBool::new(false),
        shutdown: AtomicBool::new(false),
        long_running_reminder_enabled: AtomicBool::new(true),
        long_running_reminder_interval_ms: AtomicU64::new(600_000),
        persisted_gc_started: AtomicBool::new(false),
        #[cfg(test)]
        persisted_gc_runs: AtomicU64::new(0),
        compressor: Mutex::default(),
    };
    Self {
        inner: Arc::new(inner),
    }
}
/// Installs (or replaces) the output compressor applied to `compressed` tasks.
///
/// If the compressor lock is poisoned the call is a no-op.
pub fn set_compressor<F>(&self, compressor: F)
where
    F: Fn(&str, String) -> String + Send + Sync + 'static,
{
    if let Ok(mut guard) = self.inner.compressor.lock() {
        let _ = guard.replace(Box::new(compressor));
    }
}
/// Runs `output` through the installed compressor for `command`.
///
/// Returns `output` unchanged when no compressor is installed or its lock is poisoned.
pub(crate) fn compress_output(&self, command: &str, output: String) -> String {
    match self.inner.compressor.lock() {
        Ok(guard) => match guard.as_ref() {
            Some(compress) => compress(command, output),
            None => output,
        },
        Err(_) => output,
    }
}
/// Enables or disables long-running reminders and sets their interval in milliseconds.
///
/// An `interval_ms` of 0 suppresses reminders even when `enabled` is `true`.
pub fn configure_long_running_reminders(&self, enabled: bool, interval_ms: u64) {
    let inner = &self.inner;
    inner
        .long_running_reminder_enabled
        .store(enabled, Ordering::SeqCst);
    inner
        .long_running_reminder_interval_ms
        .store(interval_ms, Ordering::SeqCst);
}
#[cfg(unix)]
#[allow(clippy::too_many_arguments)]
pub fn spawn(
&self,
command: &str,
session_id: String,
workdir: PathBuf,
env: HashMap<String, String>,
timeout: Option<Duration>,
storage_dir: PathBuf,
max_running: usize,
notify_on_completion: bool,
compressed: bool,
project_root: Option<PathBuf>,
) -> Result<String, String> {
self.start_watchdog();
let running = self.running_count();
if running >= max_running {
return Err(format!(
"background bash task limit exceeded: {running} running (max {max_running})"
));
}
let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
let task_id = self.generate_unique_task_id()?;
let paths = task_paths(&storage_dir, &session_id, &task_id);
fs::create_dir_all(&paths.dir)
.map_err(|e| format!("failed to create background task dir: {e}"))?;
let mut metadata = PersistedTask::starting(
task_id.clone(),
session_id.clone(),
command.to_string(),
workdir.clone(),
project_root,
timeout_ms,
notify_on_completion,
compressed,
);
write_task(&paths.json, &metadata)
.map_err(|e| format!("failed to persist background task metadata: {e}"))?;
create_capture_file(&paths.stdout)
.map_err(|e| format!("failed to create stdout capture file: {e}"))?;
create_capture_file(&paths.stderr)
.map_err(|e| format!("failed to create stderr capture file: {e}"))?;
let child = match spawn_detached_child(command, &paths, &workdir, &env) {
Ok(child) => child,
Err(error) => {
crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
let _ = delete_task_bundle(&paths);
return Err(error);
}
};
let child_pid = child.id();
metadata.mark_running(child_pid, child_pid as i32);
write_task(&paths.json, &metadata)
.map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
let task = Arc::new(BgTask {
task_id: task_id.clone(),
session_id,
paths: paths.clone(),
started: Instant::now(),
last_reminder_at: Mutex::new(None),
terminal_at: Mutex::new(None),
state: Mutex::new(BgTaskState {
metadata,
child: Some(child),
detached: false,
buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
}),
});
self.inner
.tasks
.lock()
.map_err(|_| "background task registry lock poisoned".to_string())?
.insert(task_id.clone(), task);
Ok(task_id)
}
/// Spawns `command` as a detached background task and registers it (Windows).
///
/// The task bundle (metadata JSON plus stdout/stderr capture files) is persisted
/// under `storage_dir` in the `Starting` state before the child is launched, so an
/// interrupted spawn is later reported as failed by `replay_session`. `timeout`
/// defaults to `DEFAULT_BG_TIMEOUT`. No process group id is recorded on Windows.
/// Returns the new task id.
///
/// # Errors
/// Fails when `max_running` tasks are already running, when no unique id can be
/// allocated, or when any persistence/spawn step fails. If the child has already
/// been launched but its running state cannot be persisted, the child is
/// terminated and waited on and the bundle removed, so a failed call never leaves
/// an untracked process behind.
#[cfg(windows)]
// Every argument is an independent per-task setting supplied by the caller.
#[allow(clippy::too_many_arguments)]
pub fn spawn(
    &self,
    command: &str,
    session_id: String,
    workdir: PathBuf,
    env: HashMap<String, String>,
    timeout: Option<Duration>,
    storage_dir: PathBuf,
    max_running: usize,
    notify_on_completion: bool,
    compressed: bool,
    project_root: Option<PathBuf>,
) -> Result<String, String> {
    self.start_watchdog();
    let running = self.running_count();
    if running >= max_running {
        return Err(format!(
            "background bash task limit exceeded: {running} running (max {max_running})"
        ));
    }
    let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
    let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
    let task_id = self.generate_unique_task_id()?;
    let paths = task_paths(&storage_dir, &session_id, &task_id);
    fs::create_dir_all(&paths.dir)
        .map_err(|e| format!("failed to create background task dir: {e}"))?;
    let mut metadata = PersistedTask::starting(
        task_id.clone(),
        session_id.clone(),
        command.to_string(),
        workdir.clone(),
        project_root,
        timeout_ms,
        notify_on_completion,
        compressed,
    );
    write_task(&paths.json, &metadata)
        .map_err(|e| format!("failed to persist background task metadata: {e}"))?;
    create_capture_file(&paths.stdout)
        .map_err(|e| format!("failed to create stdout capture file: {e}"))?;
    create_capture_file(&paths.stderr)
        .map_err(|e| format!("failed to create stderr capture file: {e}"))?;
    let mut child = match spawn_detached_child(command, &paths, &workdir, &env) {
        Ok(child) => child,
        Err(error) => {
            crate::slog_warn!(
                "failed to spawn background bash task {task_id}; deleting partial bundle: {error}"
            );
            let _ = delete_task_bundle(&paths);
            return Err(error);
        }
    };
    let child_pid = child.id();
    metadata.status = BgTaskStatus::Running;
    metadata.child_pid = Some(child_pid);
    metadata.pgid = None;
    if let Err(e) = write_task(&paths.json, &metadata) {
        // The child is already running, but the caller is about to get an error
        // and the task will never be registered, so nothing would track it.
        // Tear it down instead of leaking an orphan.
        crate::slog_warn!(
            "failed to persist running state for background bash task {task_id}; terminating child: {e}"
        );
        super::process::terminate_process(&mut child);
        let _ = child.wait();
        let _ = delete_task_bundle(&paths);
        return Err(format!(
            "failed to persist running background task metadata: {e}"
        ));
    }
    let task = Arc::new(BgTask {
        task_id: task_id.clone(),
        session_id,
        paths: paths.clone(),
        started: Instant::now(),
        last_reminder_at: Mutex::new(None),
        terminal_at: Mutex::new(None),
        state: Mutex::new(BgTaskState {
            metadata,
            child: Some(child),
            detached: false,
            buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
        }),
    });
    self.inner
        .tasks
        .lock()
        .map_err(|_| "background task registry lock poisoned".to_string())?
        .insert(task_id.clone(), task);
    Ok(task_id)
}
/// Rehydrates the persisted tasks of `session_id` from `storage_dir` into the registry.
///
/// Also starts the watchdog and, on the first call for this registry, runs a
/// one-shot GC of persisted bundles (its failure is only logged). Each
/// `<task>.json` in the session directory is reconciled with on-disk evidence:
/// - `Starting`: the spawn never completed, so the task is marked `Failed`.
/// - `Running`/`Killing` older than `STALE_RUNNING_AFTER`: marked `Killed` as orphaned.
/// - `Running`/`Killing` with an exit marker: finalized from the marker.
/// - `Killing` without a marker: a kill marker is written and the task marked `Killed`.
/// - `Running` whose recorded pid is no longer alive: marked `Failed`.
/// - Otherwise still running: re-registered as a detached task.
/// - Terminal: a completion is queued if not yet delivered, and the task re-registered.
///
/// Tasks this registry already tracks are left untouched. The in-memory entry
/// owns the live `Child` handle and the reminder/terminal timestamps, and
/// replacing it with a rehydrated copy would silently drop them. This matters
/// because `status` replays the whole session whenever a lookup misses.
///
/// # Errors
/// Fails if the session directory exists but cannot be read, or if the registry
/// lock is poisoned while re-registering a task. Unreadable or foreign task
/// files are skipped.
pub fn replay_session(&self, storage_dir: &Path, session_id: &str) -> Result<(), String> {
    self.start_watchdog();
    if !self.inner.persisted_gc_started.swap(true, Ordering::SeqCst) {
        if let Err(error) = self.maybe_gc_persisted(storage_dir) {
            crate::slog_warn!("failed to GC persisted background bash tasks: {error}");
        }
    }
    let dir = session_tasks_dir(storage_dir, session_id);
    if !dir.exists() {
        return Ok(());
    }
    let entries = fs::read_dir(&dir)
        .map_err(|e| format!("failed to read background task dir {}: {e}", dir.display()))?;
    for entry in entries.flatten() {
        let path = entry.path();
        if path.extension().and_then(|extension| extension.to_str()) != Some("json") {
            continue;
        }
        let Ok(mut metadata) = read_task(&path) else {
            continue;
        };
        if metadata.session_id != session_id {
            continue;
        }
        // Never clobber a live entry (see the doc comment above); the in-memory
        // copy is authoritative and is kept current by poll/reap/kill.
        if self.task(&metadata.task_id).is_some() {
            continue;
        }
        let paths = task_paths(storage_dir, session_id, &metadata.task_id);
        match metadata.status {
            BgTaskStatus::Starting => {
                metadata.mark_terminal(
                    BgTaskStatus::Failed,
                    None,
                    Some("spawn aborted".to_string()),
                );
                let _ = write_task(&paths.json, &metadata);
                self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
            }
            BgTaskStatus::Running | BgTaskStatus::Killing => {
                if self.running_metadata_is_stale(&metadata) {
                    metadata.mark_terminal(
                        BgTaskStatus::Killed,
                        None,
                        Some("orphaned (>24h)".to_string()),
                    );
                    if !paths.exit.exists() {
                        let _ = write_kill_marker_if_absent(&paths.exit);
                    }
                    let _ = write_task(&paths.json, &metadata);
                    self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                } else if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
                    let reason = (metadata.status == BgTaskStatus::Killing).then(|| {
                        "recovered from inconsistent killing state on replay".to_string()
                    });
                    if reason.is_some() {
                        crate::slog_warn!(
                            "background task {} had killing state with exit marker; preferring marker",
                            metadata.task_id
                        );
                    }
                    metadata = terminal_metadata_from_marker(metadata, marker, reason);
                    let _ = write_task(&paths.json, &metadata);
                    self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                } else if metadata.status == BgTaskStatus::Killing {
                    if !paths.exit.exists() {
                        let _ = write_kill_marker_if_absent(&paths.exit);
                    }
                    metadata.mark_terminal(
                        BgTaskStatus::Killed,
                        None,
                        Some("recovered from inconsistent killing state on replay".to_string()),
                    );
                    let _ = write_task(&paths.json, &metadata);
                    self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                } else if metadata.child_pid.is_some_and(|pid| !is_process_alive(pid)) {
                    metadata.mark_terminal(
                        BgTaskStatus::Failed,
                        None,
                        Some("process exited without exit marker".to_string()),
                    );
                    let _ = write_task(&paths.json, &metadata);
                    self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                } else {
                    self.insert_rehydrated_task(metadata, paths, true)?;
                }
            }
            _ if metadata.status.is_terminal() => {
                self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
                self.insert_rehydrated_task(metadata, paths, true)?;
            }
            _ => {}
        }
    }
    Ok(())
}
/// Returns a snapshot of `task_id` for `session_id`, first polling it for an exit marker.
///
/// On a miss, the session is replayed from `storage_dir` (when given) and the
/// lookup is retried. If the task is still unknown and both `project_root` and
/// `storage_dir` are given, this falls back to a project-scoped lookup across
/// sessions (`status_relaxed`). Terminal previews of `compressed` tasks are
/// compressed.
pub fn status(
    &self,
    task_id: &str,
    session_id: &str,
    project_root: Option<&Path>,
    storage_dir: Option<&Path>,
    preview_bytes: usize,
) -> Option<BgTaskSnapshot> {
    let found = self.task_for_session(task_id, session_id).or_else(|| {
        let storage_dir = storage_dir?;
        let _ = self.replay_session(storage_dir, session_id);
        self.task_for_session(task_id, session_id)
    });
    match found {
        Some(task) => {
            let _ = self.poll_task(&task);
            let mut snapshot = task.snapshot(preview_bytes);
            self.maybe_compress_snapshot(&task, &mut snapshot);
            Some(snapshot)
        }
        None => self.status_relaxed(
            task_id,
            session_id,
            project_root?,
            storage_dir?,
            preview_bytes,
        ),
    }
}
fn status_relaxed_task(
&self,
task_id: &str,
project_root: &Path,
storage_dir: &Path,
) -> Option<Arc<BgTask>> {
let canonical_project = canonicalized_path(project_root);
let root = storage_dir.join("bash-tasks");
let entries = fs::read_dir(&root).ok()?;
for entry in entries.flatten() {
let dir = entry.path();
if !dir.is_dir() {
continue;
}
let path = dir.join(format!("{task_id}.json"));
if !path.exists() {
continue;
}
let Ok(metadata) = read_task(&path) else {
continue;
};
let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
if metadata_project.as_deref() != Some(canonical_project.as_path()) {
continue;
}
if let Some(task) = self.task(task_id) {
let matches_project = task
.state
.lock()
.map(|state| {
state
.metadata
.project_root
.as_deref()
.map(canonicalized_path)
.as_deref()
== Some(canonical_project.as_path())
})
.unwrap_or(false);
return matches_project.then_some(task);
}
let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
if self.insert_rehydrated_task(metadata, paths, true).is_err() {
return None;
}
return self.task(task_id);
}
None
}
/// Project-scoped status lookup that ignores the session id.
///
/// Polls the task found by `status_relaxed_task` and returns its snapshot
/// (terminal previews of `compressed` tasks are compressed).
pub(super) fn status_relaxed(
    &self,
    task_id: &str,
    _session_id: &str,
    project_root: &Path,
    storage_dir: &Path,
    preview_bytes: usize,
) -> Option<BgTaskSnapshot> {
    self.status_relaxed_task(task_id, project_root, storage_dir)
        .map(|task| {
            let _ = self.poll_task(&task);
            let mut snapshot = task.snapshot(preview_bytes);
            self.maybe_compress_snapshot(&task, &mut snapshot);
            snapshot
        })
}
/// Garbage-collects persisted task bundles, then prunes old quarantine entries.
///
/// A bundle is deleted when its metadata is outside `PERSISTED_GC_GRACE` and
/// the task is terminal with its completion already delivered. Corrupt
/// metadata outside the grace window is moved to the quarantine directory.
/// Every per-entry failure (unreadable session dir, failed quarantine, failed
/// delete) is logged and skipped, so one bad file cannot block collection of
/// the rest. Returns the number of bundles deleted.
///
/// # Errors
/// Fails only when the `bash-tasks` root exists but cannot be read.
pub fn maybe_gc_persisted(&self, storage_dir: &Path) -> Result<usize, String> {
    #[cfg(test)]
    self.inner.persisted_gc_runs.fetch_add(1, Ordering::SeqCst);
    let mut deleted = 0usize;
    let root = storage_dir.join("bash-tasks");
    if root.exists() {
        let session_dirs = fs::read_dir(&root).map_err(|e| {
            format!(
                "failed to read background task root {}: {e}",
                root.display()
            )
        })?;
        for session_entry in session_dirs.flatten() {
            let session_dir = session_entry.path();
            if !session_dir.is_dir() {
                continue;
            }
            let task_entries = match fs::read_dir(&session_dir) {
                Ok(entries) => entries,
                Err(error) => {
                    crate::slog_warn!(
                        "failed to read background task session dir {}: {error}",
                        session_dir.display()
                    );
                    continue;
                }
            };
            for task_entry in task_entries.flatten() {
                let json_path = task_entry.path();
                if json_path
                    .extension()
                    .and_then(|extension| extension.to_str())
                    != Some("json")
                {
                    continue;
                }
                if modified_within(&json_path, PERSISTED_GC_GRACE) {
                    continue;
                }
                let metadata = match read_task(&json_path) {
                    Ok(metadata) => metadata,
                    Err(error) => {
                        crate::slog_warn!(
                            "quarantining corrupt background task metadata {}: {error}",
                            json_path.display()
                        );
                        // Best effort, like every other per-entry failure in this
                        // loop: an unmovable file must not abort GC for all other
                        // sessions or skip quarantine pruning below.
                        if let Err(quarantine_error) =
                            quarantine_corrupt_task_json(storage_dir, &session_dir, &json_path)
                        {
                            crate::slog_warn!("{quarantine_error}");
                        }
                        continue;
                    }
                };
                // Only delivered terminal tasks are collectable.
                if !(metadata.status.is_terminal() && metadata.completion_delivered) {
                    continue;
                }
                let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
                match delete_task_bundle(&paths) {
                    Ok(()) => {
                        deleted += 1;
                        log::debug!(
                            "deleted persisted background task bundle {}",
                            metadata.task_id
                        );
                    }
                    Err(error) => {
                        crate::slog_warn!(
                            "failed to delete background task bundle {}: {error}",
                            metadata.task_id
                        );
                    }
                }
            }
        }
    }
    gc_quarantine(storage_dir);
    Ok(deleted)
}
pub fn list(&self, preview_bytes: usize) -> Vec<BgTaskSnapshot> {
let tasks = self
.inner
.tasks
.lock()
.map(|tasks| tasks.values().cloned().collect::<Vec<_>>())
.unwrap_or_default();
tasks
.into_iter()
.map(|task| {
let _ = self.poll_task(&task);
let mut snapshot = task.snapshot(preview_bytes);
self.maybe_compress_snapshot(&task, &mut snapshot);
snapshot
})
.collect()
}
/// Compresses a terminal snapshot's preview when the task opted into compression.
///
/// A poisoned task lock errs toward compressing.
fn maybe_compress_snapshot(&self, task: &Arc<BgTask>, snapshot: &mut BgTaskSnapshot) {
    let wants_compression = snapshot.info.status.is_terminal()
        && match task.state.lock() {
            Ok(state) => state.metadata.compressed,
            Err(_) => true,
        };
    if wants_compression {
        let raw = std::mem::take(&mut snapshot.output_preview);
        snapshot.output_preview = self.compress_output(&snapshot.info.command, raw);
    }
}
/// Kills a task on user request, recording it as `Killed`.
pub fn kill(&self, task_id: &str, session_id: &str) -> Result<BgTaskSnapshot, String> {
    let terminal_status = BgTaskStatus::Killed;
    self.kill_with_status(task_id, session_id, terminal_status)
}
/// Opts a task into completion notification after the fact.
///
/// Persists `notify_on_completion = true` and `completion_delivered = false`;
/// if the task is already terminal, its completion is queued (and a frame
/// emitted) right away. Returns `Ok(true)` on success.
///
/// # Errors
/// Fails if the task is unknown for `session_id`, its lock is poisoned, or the
/// metadata update cannot be persisted.
pub fn promote(&self, task_id: &str, session_id: &str) -> Result<bool, String> {
    let Some(task) = self.task_for_session(task_id, session_id) else {
        return Err(format!("background task not found: {task_id}"));
    };
    let Ok(mut state) = task.state.lock() else {
        return Err("background task lock poisoned".to_string());
    };
    let promoted = match update_task(&task.paths.json, |metadata| {
        metadata.notify_on_completion = true;
        metadata.completion_delivered = false;
    }) {
        Ok(metadata) => metadata,
        Err(e) => return Err(format!("failed to promote background task: {e}")),
    };
    state.metadata = promoted;
    if state.metadata.status.is_terminal() {
        state.buffer.enforce_terminal_cap();
        self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
    }
    Ok(true)
}
/// Kills a task whose timeout elapsed, recording it as `TimedOut`; the snapshot is discarded.
pub(crate) fn kill_for_timeout(&self, task_id: &str, session_id: &str) -> Result<(), String> {
    self.kill_with_status(task_id, session_id, BgTaskStatus::TimedOut)?;
    Ok(())
}
/// Removes tasks that are terminal, delivered, and terminal for at least
/// `older_than`, and deletes their persisted bundles.
///
/// Tasks with an undelivered completion or an unknown terminal time are kept.
/// When `older_than` reaches further back than `Instant` can represent (for
/// example a huge duration used to mean "never", or a span longer than the
/// monotonic clock's origin on some platforms), no task can have been terminal
/// that long, so nothing is removed. Bundles are deleted after the registry
/// lock is released; failures are logged.
pub fn cleanup_finished(&self, older_than: Duration) {
    let cutoff = Instant::now().checked_sub(older_than);
    let removed: Vec<(String, TaskPaths)> = match self.inner.tasks.lock() {
        Ok(mut tasks) => {
            let expired_ids: Vec<String> = tasks
                .iter()
                .filter(|(_, task)| {
                    let delivered_terminal = task
                        .state
                        .lock()
                        .map(|state| {
                            state.metadata.status.is_terminal()
                                && state.metadata.completion_delivered
                        })
                        .unwrap_or(false);
                    if !delivered_terminal {
                        return false;
                    }
                    let terminal_at = task.terminal_at.lock().ok().and_then(|at| *at);
                    // A missing cutoff means nothing can be old enough: keep the task.
                    terminal_at
                        .zip(cutoff)
                        .is_some_and(|(terminal_at, cutoff)| terminal_at <= cutoff)
                })
                .map(|(task_id, _)| task_id.clone())
                .collect();
            expired_ids
                .into_iter()
                .filter_map(|task_id| {
                    tasks
                        .remove(&task_id)
                        .map(|task| (task_id, task.paths.clone()))
                })
                .collect()
        }
        Err(_) => Vec::new(),
    };
    for (task_id, paths) in removed {
        match delete_task_bundle(&paths) {
            Ok(()) => log::debug!("deleted persisted background task bundle {task_id}"),
            Err(error) => crate::slog_warn!(
                "failed to delete persisted background task bundle {task_id}: {error}"
            ),
        }
    }
}
/// Drains every queued completion across all sessions and marks each one delivered.
pub fn drain_completions(&self) -> Vec<BgCompletion> {
    let every_session: Option<&str> = None;
    self.drain_completions_for_session(every_session)
}
/// Removes and returns queued completions, either all of them (`None`) or only
/// those of `session_id`, preserving queue order.
///
/// Each drained completion whose task is still tracked is marked delivered,
/// after the queue lock is released. A poisoned queue yields an empty list.
pub fn drain_completions_for_session(&self, session_id: Option<&str>) -> Vec<BgCompletion> {
    let drained: Vec<BgCompletion> = {
        let Ok(mut queue) = self.inner.completions.lock() else {
            return Vec::new();
        };
        match session_id {
            None => queue.drain(..).collect(),
            Some(session_id) => {
                let (matched, kept): (Vec<_>, VecDeque<_>) = std::mem::take(&mut *queue)
                    .into_iter()
                    .partition(|completion| completion.session_id == session_id);
                *queue = kept;
                matched
            }
        }
    };
    for completion in &drained {
        if let Some(task) = self.task_for_session(&completion.task_id, &completion.session_id) {
            let _ = task.set_completion_delivered(true);
        }
    }
    drained
}
/// Returns copies of the queued completions for `session_id` without draining them.
///
/// A poisoned queue yields an empty list.
pub fn pending_completions_for_session(&self, session_id: &str) -> Vec<BgCompletion> {
    match self.inner.completions.lock() {
        Ok(queue) => queue
            .iter()
            .filter(|completion| completion.session_id == session_id)
            .cloned()
            .collect(),
        Err(_) => Vec::new(),
    }
}
/// Stops tracking all tasks without killing them.
///
/// Sets the shutdown flag, drops each task's `Child` handle (dropping a std
/// `Child` does not terminate the process), marks tasks detached, and empties
/// the registry.
pub fn detach(&self) {
    self.inner.shutdown.store(true, Ordering::SeqCst);
    let Ok(mut tasks) = self.inner.tasks.lock() else {
        return;
    };
    tasks.values().for_each(|task| {
        if let Ok(mut state) = task.state.lock() {
            state.child = None;
            state.detached = true;
        }
    });
    tasks.clear();
}
/// Kills every tracked task, ignoring individual failures.
///
/// Ids are collected first so the registry lock is released before `kill`
/// looks each task up again.
pub fn shutdown(&self) {
    let targets: Vec<(String, String)> = match self.inner.tasks.lock() {
        Ok(tasks) => tasks
            .values()
            .map(|task| (task.task_id.clone(), task.session_id.clone()))
            .collect(),
        Err(_) => Vec::new(),
    };
    for (task_id, session_id) in &targets {
        let _ = self.kill(task_id, session_id);
    }
}
/// Finalizes `task` if its exit marker has appeared; otherwise does nothing.
///
/// # Errors
/// Fails if the marker cannot be read or the terminal state cannot be persisted.
pub(crate) fn poll_task(&self, task: &Arc<BgTask>) -> Result<(), String> {
    match read_exit_marker(&task.paths.exit) {
        Ok(Some(marker)) => self.finalize_from_marker(task, marker, None),
        Ok(None) => Ok(()),
        Err(error) => Err(format!("failed to read exit marker: {error}")),
    }
}
/// Reaps the task's child if it has exited, without blocking.
///
/// Once reaped, the handle is dropped and the task marked detached. If the task
/// is not yet terminal and no exit marker exists, the process died without
/// reporting, so the task is persisted as `Failed` and its completion queued.
/// When a marker does exist, finalization is left to `poll_task`.
pub(crate) fn reap_child(&self, task: &Arc<BgTask>) {
    let Ok(mut state) = task.state.lock() else {
        return;
    };
    let exited = match state.child.as_mut() {
        Some(child) => matches!(child.try_wait(), Ok(Some(_))),
        None => false,
    };
    if !exited {
        return;
    }
    state.child = None;
    state.detached = true;
    if state.metadata.status.is_terminal()
        || matches!(read_exit_marker(&task.paths.exit), Ok(Some(_)))
    {
        return;
    }
    let Ok(failed) = update_task(&task.paths.json, |metadata| {
        metadata.mark_terminal(
            BgTaskStatus::Failed,
            None,
            Some("process exited without exit marker".to_string()),
        );
    }) else {
        return;
    };
    state.metadata = failed;
    task.mark_terminal_now();
    state.buffer.enforce_terminal_cap();
    self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
}
/// Returns handles to every tracked task that reports itself as running.
///
/// A poisoned registry yields an empty list.
pub(crate) fn running_tasks(&self) -> Vec<Arc<BgTask>> {
    match self.inner.tasks.lock() {
        Ok(tasks) => tasks
            .values()
            .filter(|task| task.is_running())
            .map(Arc::clone)
            .collect(),
        Err(_) => Vec::new(),
    }
}
/// Registers a task reconstructed from persisted `metadata`, with no child handle.
///
/// The start instant is back-computed from `metadata.started_at`. Terminal tasks
/// get `terminal_at = now`, so their cleanup age restarts at rehydration. An
/// existing entry with the same id is replaced.
///
/// # Errors
/// Fails only if the registry lock is poisoned.
fn insert_rehydrated_task(
    &self,
    metadata: PersistedTask,
    paths: TaskPaths,
    detached: bool,
) -> Result<(), String> {
    let started = started_instant_from_unix_millis(metadata.started_at);
    let terminal_at = if metadata.status.is_terminal() {
        Some(Instant::now())
    } else {
        None
    };
    let task_id = metadata.task_id.clone();
    let buffer = BgBuffer::new(paths.stdout.clone(), paths.stderr.clone());
    let task = BgTask {
        task_id: task_id.clone(),
        session_id: metadata.session_id.clone(),
        paths,
        started,
        last_reminder_at: Mutex::new(None),
        terminal_at: Mutex::new(terminal_at),
        state: Mutex::new(BgTaskState {
            metadata,
            child: None,
            detached,
            buffer,
        }),
    };
    let mut tasks = self
        .inner
        .tasks
        .lock()
        .map_err(|_| "background task registry lock poisoned".to_string())?;
    tasks.insert(task_id, Arc::new(task));
    Ok(())
}
/// Kills a non-terminal task and records `terminal_status` for it (`Killed` or
/// `TimedOut`), returning a snapshot with a 5 KiB preview budget.
///
/// An already-terminal task is returned as-is. If the process has already written
/// its exit marker, the marker wins over the kill. Otherwise the task is first
/// persisted as `Killing` (which `replay_session` knows how to recover), then
/// the process is terminated, a kill marker is written if none exists, and the
/// terminal state is persisted and queued as a completion. `TimedOut` is
/// recorded with exit code 124; `Killed` with no exit code.
///
/// # Errors
/// Fails if the task is unknown for `session_id`, its lock is poisoned, or any
/// state/marker write fails.
fn kill_with_status(
&self,
task_id: &str,
session_id: &str,
terminal_status: BgTaskStatus,
) -> Result<BgTaskSnapshot, String> {
let task = self
.task_for_session(task_id, session_id)
.ok_or_else(|| format!("background task not found: {task_id}"))?;
// Scope the state lock so the final `task.snapshot` can lock it again.
{
let mut state = task
.state
.lock()
.map_err(|_| "background task lock poisoned".to_string())?;
if state.metadata.status.is_terminal() {
return Ok(task.snapshot_locked(&state, 5 * 1024));
}
// The process already finished on its own: finalize from its marker instead of killing.
if let Ok(Some(marker)) = read_exit_marker(&task.paths.exit) {
state.metadata =
terminal_metadata_from_marker(state.metadata.clone(), marker, None);
task.mark_terminal_now();
state.child = None;
state.detached = true;
state.buffer.enforce_terminal_cap();
write_task(&task.paths.json, &state.metadata)
.map_err(|e| format!("failed to persist terminal state: {e}"))?;
self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
return Ok(task.snapshot_locked(&state, 5 * 1024));
}
// Persist intent before signalling, so an interrupted kill is visible on replay.
state.metadata.status = BgTaskStatus::Killing;
write_task(&task.paths.json, &state.metadata)
.map_err(|e| format!("failed to persist killing state: {e}"))?;
// Unix: terminate the process group recorded when the task was spawned.
#[cfg(unix)]
if let Some(pgid) = state.metadata.pgid {
terminate_pgid(pgid, state.child.as_mut());
}
// Windows: use the owned handle when present, otherwise fall back to the recorded pid.
#[cfg(windows)]
if let Some(child) = state.child.as_mut() {
super::process::terminate_process(child);
} else if let Some(pid) = state.metadata.child_pid {
terminate_pid(pid);
}
// Wait on the direct child (if we still own its handle) so it is reaped.
if let Some(child) = state.child.as_mut() {
let _ = child.wait();
}
state.child = None;
state.detached = true;
if !task.paths.exit.exists() {
write_kill_marker_if_absent(&task.paths.exit)
.map_err(|e| format!("failed to write kill marker: {e}"))?;
}
// 124 mirrors the exit status conventionally used for timeouts (e.g. coreutils `timeout`).
let exit_code = if terminal_status == BgTaskStatus::TimedOut {
Some(124)
} else {
None
};
state
.metadata
.mark_terminal(terminal_status, exit_code, None);
task.mark_terminal_now();
write_task(&task.paths.json, &state.metadata)
.map_err(|e| format!("failed to persist killed state: {e}"))?;
state.buffer.enforce_terminal_cap();
self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
}
Ok(task.snapshot(5 * 1024))
}
/// Moves a non-terminal task to the terminal state described by `marker`.
///
/// Persists the new metadata, drops the child handle, caps the buffer, and
/// queues a completion (emitting a frame). Already-terminal tasks are left
/// untouched.
///
/// # Errors
/// Fails if the task lock is poisoned or the terminal state cannot be persisted.
fn finalize_from_marker(
    &self,
    task: &Arc<BgTask>,
    marker: ExitMarker,
    reason: Option<String>,
) -> Result<(), String> {
    let Ok(mut state) = task.state.lock() else {
        return Err("background task lock poisoned".to_string());
    };
    if !state.metadata.status.is_terminal() {
        let persisted = update_task(&task.paths.json, |metadata| {
            *metadata = terminal_metadata_from_marker(metadata.clone(), marker, reason);
        })
        .map_err(|e| format!("failed to persist terminal state: {e}"))?;
        state.metadata = persisted;
        task.mark_terminal_now();
        state.child = None;
        state.detached = true;
        state.buffer.enforce_terminal_cap();
        self.enqueue_completion_locked(&state.metadata, Some(&state.buffer), true);
    }
    Ok(())
}
/// Queues a completion built from on-disk output when the task is terminal and undelivered.
fn enqueue_completion_if_needed(
    &self,
    metadata: &PersistedTask,
    paths: Option<&TaskPaths>,
    emit_frame: bool,
) {
    let pending = metadata.status.is_terminal() && !metadata.completion_delivered;
    if !pending {
        return;
    }
    self.enqueue_completion_from_parts(metadata, None, paths, emit_frame);
}
fn enqueue_completion_locked(
&self,
metadata: &PersistedTask,
buffer: Option<&BgBuffer>,
emit_frame: bool,
) {
self.enqueue_completion_from_parts(metadata, buffer, None, emit_frame);
}
/// Builds and queues a completion for a terminal, undelivered task.
///
/// The preview comes from `buffer` when given, otherwise from the capture files
/// at `paths`, otherwise it is empty; it is compressed for `compressed` tasks.
/// At most one completion per task id is queued. A frame is emitted only when
/// `emit_frame` is set and the completion was actually queued.
fn enqueue_completion_from_parts(
    &self,
    metadata: &PersistedTask,
    buffer: Option<&BgBuffer>,
    paths: Option<&TaskPaths>,
    emit_frame: bool,
) {
    let deliverable = metadata.status.is_terminal() && !metadata.completion_delivered;
    if !deliverable {
        return;
    }
    let (raw_preview, output_truncated) = if let Some(buf) = buffer {
        buf.read_tail(BG_COMPLETION_PREVIEW_BYTES)
    } else if let Some(paths) = paths {
        read_tail_from_disk(paths, BG_COMPLETION_PREVIEW_BYTES)
    } else {
        (String::new(), false)
    };
    let output_preview = if metadata.compressed {
        self.compress_output(&metadata.command, raw_preview)
    } else {
        raw_preview
    };
    let completion = BgCompletion {
        task_id: metadata.task_id.clone(),
        session_id: metadata.session_id.clone(),
        status: metadata.status.clone(),
        exit_code: metadata.exit_code,
        command: metadata.command.clone(),
        output_preview,
        output_truncated,
    };
    {
        let Ok(mut queue) = self.inner.completions.lock() else {
            return;
        };
        let already_queued = queue
            .iter()
            .any(|queued| queued.task_id == metadata.task_id);
        if already_queued {
            return;
        }
        queue.push_back(completion.clone());
    }
    if emit_frame {
        self.emit_bash_completed(completion);
    }
}
fn emit_bash_completed(&self, completion: BgCompletion) {
let Ok(progress_sender) = self
.inner
.progress_sender
.lock()
.map(|sender| sender.clone())
else {
return;
};
let Some(sender) = progress_sender.as_ref() else {
return;
};
sender(PushFrame::BashCompleted(BashCompletedFrame::new(
completion.task_id,
completion.session_id,
completion.status,
completion.exit_code,
completion.command,
completion.output_preview,
completion.output_truncated,
)));
}
/// Emits a `BashLongRunning` reminder for `task` when one is due.
///
/// Reminders must be enabled with a non-zero interval. One is due once the
/// interval has elapsed since the previous reminder, or since the task started
/// if none was sent yet. The reminder timestamp lock is held through emission,
/// so concurrent callers cannot double-fire for the same task.
pub(crate) fn maybe_emit_long_running_reminder(&self, task: &Arc<BgTask>) {
    let inner = &self.inner;
    if !inner.long_running_reminder_enabled.load(Ordering::SeqCst) {
        return;
    }
    let interval = match inner.long_running_reminder_interval_ms.load(Ordering::SeqCst) {
        0 => return,
        ms => Duration::from_millis(ms),
    };
    let now = Instant::now();
    let Ok(mut last_reminder_at) = task.last_reminder_at.lock() else {
        return;
    };
    let reference = last_reminder_at.unwrap_or(task.started);
    if now.duration_since(reference) < interval {
        return;
    }
    let command = match task.state.lock() {
        Ok(state) => state.metadata.command.clone(),
        Err(_) => String::new(),
    };
    *last_reminder_at = Some(now);
    let frame = BashLongRunningFrame::new(
        task.task_id.clone(),
        task.session_id.clone(),
        command,
        task.started.elapsed().as_millis() as u64,
    );
    self.emit_bash_long_running(frame);
}
/// Pushes a `BashLongRunning` frame if a progress sender is installed.
fn emit_bash_long_running(&self, frame: BashLongRunningFrame) {
    let sender = match self.inner.progress_sender.lock() {
        Ok(guard) => guard.clone(),
        Err(_) => return,
    };
    let Some(send) = sender.as_ref() else {
        return;
    };
    send(PushFrame::BashLongRunning(frame));
}
/// Looks up a tracked task by id in any session; `None` if absent or the registry is poisoned.
fn task(&self, task_id: &str) -> Option<Arc<BgTask>> {
    let tasks = self.inner.tasks.lock().ok()?;
    tasks.get(task_id).map(Arc::clone)
}
/// Looks up a tracked task by id, returning it only if it belongs to `session_id`.
fn task_for_session(&self, task_id: &str, session_id: &str) -> Option<Arc<BgTask>> {
    let task = self.task(task_id)?;
    (task.session_id == session_id).then_some(task)
}
/// Counts tracked tasks that report themselves as running (0 if the registry is poisoned).
fn running_count(&self) -> usize {
    match self.inner.tasks.lock() {
        Ok(tasks) => tasks.values().filter(|task| task.is_running()).count(),
        Err(_) => 0,
    }
}
/// Starts the watchdog for this registry the first time it is called; later calls do nothing.
fn start_watchdog(&self) {
    let already_started = self.inner.watchdog_started.swap(true, Ordering::SeqCst);
    if already_started {
        return;
    }
    super::watchdog::start(self.clone());
}
/// True when the task started more than `STALE_RUNNING_AFTER` ago (wall-clock milliseconds).
fn running_metadata_is_stale(&self, metadata: &PersistedTask) -> bool {
    let age_ms = unix_millis().saturating_sub(metadata.started_at);
    let limit_ms = STALE_RUNNING_AFTER.as_millis() as u64;
    age_ms > limit_ms
}
/// Test helper: the metadata JSON path of a tracked task in `session_id`.
#[cfg(test)]
pub fn task_json_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
    let task = self.task_for_session(task_id, session_id)?;
    Some(task.paths.json.clone())
}
/// Test helper: the exit-marker path of a tracked task in `session_id`.
#[cfg(test)]
pub fn task_exit_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
    let task = self.task_for_session(task_id, session_id)?;
    Some(task.paths.exit.clone())
}
/// Draws random slugs until one collides with neither a tracked task nor a queued completion.
///
/// The registry lock is held while the completion queue is checked, so a
/// candidate is judged against one consistent view of both.
///
/// # Errors
/// Fails if either lock is poisoned or 32 candidates in a row collide.
fn generate_unique_task_id(&self) -> Result<String, String> {
    for _attempt in 0..32 {
        let candidate = random_slug();
        let tasks = self
            .inner
            .tasks
            .lock()
            .map_err(|_| "background task registry lock poisoned".to_string())?;
        let taken = tasks.contains_key(&candidate) || {
            let completions = self
                .inner
                .completions
                .lock()
                .map_err(|_| "background completions lock poisoned".to_string())?;
            completions
                .iter()
                .any(|completion| completion.task_id == candidate)
        };
        if !taken {
            return Ok(candidate);
        }
    }
    Err("failed to allocate unique background task id after 32 attempts".to_string())
}
}
impl Default for BgTaskRegistry {
    /// A registry with no progress sender installed, so push frames are dropped.
    fn default() -> Self {
        Self::new(Arc::new(Mutex::default()))
    }
}
/// Returns `true` when `path` was modified less than `grace` ago.
///
/// The GC passes use this to leave recently touched files alone. A modification
/// time in the future (clock skew or a clock step backwards) is treated as
/// "just modified" rather than "infinitely old", so such files are never
/// collected early. Returns `false` when the metadata or mtime cannot be read.
fn modified_within(path: &Path, grace: Duration) -> bool {
    let Ok(modified) = fs::metadata(path).and_then(|metadata| metadata.modified()) else {
        return false;
    };
    match SystemTime::now().duration_since(modified) {
        Ok(age) => age < grace,
        // `duration_since` fails when `modified` is later than now: the file is
        // as fresh as it gets, so it is within any grace window.
        Err(_) => true,
    }
}
/// Canonicalizes `path`, falling back to the path unchanged when resolution fails.
fn canonicalized_path(path: &Path) -> PathBuf {
    match fs::canonicalize(path) {
        Ok(resolved) => resolved,
        Err(_) => path.to_path_buf(),
    }
}
/// Converts a wall-clock start time (Unix epoch milliseconds) into an `Instant`.
///
/// The result is "now minus the elapsed wall-clock time". Future timestamps and
/// clocks before the epoch give an elapsed time of zero, and an elapsed span that
/// `Instant` cannot represent falls back to now.
fn started_instant_from_unix_millis(started_at: u64) -> Instant {
    let now_ms = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => started_at,
    };
    let elapsed = Duration::from_millis(now_ms.saturating_sub(started_at));
    let now = Instant::now();
    now.checked_sub(elapsed).unwrap_or_else(Instant::now)
}
/// Deletes quarantine entries not modified within `QUARANTINE_GC_GRACE`.
///
/// Entries live under `<storage_dir>/bash-tasks-quarantine/<session>/`. Afterwards,
/// session directories and the quarantine root are removed if they are empty
/// (`remove_dir` fails harmlessly otherwise). All failures are best-effort and
/// only logged.
fn gc_quarantine(storage_dir: &Path) {
    let quarantine_root = storage_dir.join("bash-tasks-quarantine");
    let Ok(session_dirs) = fs::read_dir(&quarantine_root) else {
        return;
    };
    for session_quarantine_dir in session_dirs
        .flatten()
        .map(|entry| entry.path())
        .filter(|path| path.is_dir())
    {
        let entries = match fs::read_dir(&session_quarantine_dir) {
            Ok(entries) => entries,
            Err(error) => {
                crate::slog_warn!(
                    "failed to read background task quarantine dir {}: {error}",
                    session_quarantine_dir.display()
                );
                continue;
            }
        };
        for stale in entries
            .flatten()
            .map(|entry| entry.path())
            .filter(|path| !modified_within(path, QUARANTINE_GC_GRACE))
        {
            let removal = if stale.is_dir() {
                fs::remove_dir_all(&stale)
            } else {
                fs::remove_file(&stale)
            };
            if let Err(error) = removal {
                crate::slog_warn!(
                    "failed to delete old background task quarantine entry {}: {error}",
                    stale.display()
                );
            } else {
                log::debug!(
                    "deleted old background task quarantine entry {}",
                    stale.display()
                );
            }
        }
        let _ = fs::remove_dir(&session_quarantine_dir);
    }
    let _ = fs::remove_dir(&quarantine_root);
}
/// Moves an unreadable task JSON and its existing sibling files into quarantine.
///
/// Files go to `<storage_dir>/bash-tasks-quarantine/<session dir name>/` with a
/// `.corrupt-<unix seconds>` suffix. Sibling moves are best-effort (logged on
/// failure). Finally, the session directory is removed if it is now empty.
///
/// # Errors
/// Fails if either path has no UTF-8 file name, the quarantine directory cannot
/// be created, or the JSON itself cannot be moved.
fn quarantine_corrupt_task_json(
    storage_dir: &Path,
    session_dir: &Path,
    json_path: &Path,
) -> Result<(), String> {
    let Some(session_hash) = session_dir.file_name().and_then(|name| name.to_str()) else {
        return Err(format!(
            "invalid background task session dir: {}",
            session_dir.display()
        ));
    };
    let Some(task_name) = json_path.file_name().and_then(|name| name.to_str()) else {
        return Err(format!("invalid background task json path: {}", json_path.display()));
    };
    let unix_ts = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    };
    let quarantine_dir = storage_dir.join("bash-tasks-quarantine").join(session_hash);
    if let Err(e) = fs::create_dir_all(&quarantine_dir) {
        return Err(format!(
            "failed to create background task quarantine dir {}: {e}",
            quarantine_dir.display()
        ));
    }
    let quarantined_as = |name: &str| quarantine_dir.join(format!("{name}.corrupt-{unix_ts}"));
    let target = quarantined_as(task_name);
    if let Err(e) = fs::rename(json_path, &target) {
        return Err(format!(
            "failed to quarantine corrupt background task metadata {} to {}: {e}",
            json_path.display(),
            target.display()
        ));
    }
    for sibling in task_sibling_paths(json_path)
        .into_iter()
        .filter(|path| path.exists())
    {
        match sibling.file_name().and_then(|name| name.to_str()) {
            None => {
                crate::slog_warn!(
                    "skipping background task sibling with invalid name during quarantine: {}",
                    sibling.display()
                );
            }
            Some(sibling_name) => {
                let sibling_target = quarantined_as(sibling_name);
                if let Err(error) = fs::rename(&sibling, &sibling_target) {
                    crate::slog_warn!(
                        "failed to quarantine background task sibling {} to {}: {error}",
                        sibling.display(),
                        sibling_target.display()
                    );
                }
            }
        }
    }
    let _ = fs::remove_dir(session_dir);
    Ok(())
}
/// Builds the paths of the per-task files that live next to `json_path`.
///
/// The files share the JSON file's stem and directory: the `.stdout` and
/// `.stderr` captures, the `.exit` marker, and the `.ps1` / `.bat` / `.sh`
/// wrapper scripts. Paths are returned whether or not the files exist.
///
/// Returns an empty vector if `json_path` has no parent, or if its stem is
/// missing or not valid UTF-8.
fn task_sibling_paths(json_path: &Path) -> Vec<PathBuf> {
    const SIBLING_EXTENSIONS: [&str; 6] = ["stdout", "stderr", "exit", "ps1", "bat", "sh"];
    let stem = json_path.file_stem().and_then(|stem| stem.to_str());
    match (json_path.parent(), stem) {
        (Some(parent), Some(stem)) => SIBLING_EXTENSIONS
            .iter()
            .map(|extension| parent.join(format!("{stem}.{extension}")))
            .collect(),
        _ => Vec::new(),
    }
}
fn read_tail_from_disk(paths: &TaskPaths, max_bytes: usize) -> (String, bool) {
let stdout = fs::read(&paths.stdout).unwrap_or_default();
let stderr = fs::read(&paths.stderr).unwrap_or_default();
let mut bytes = Vec::with_capacity(stdout.len().saturating_add(stderr.len()));
bytes.extend_from_slice(&stdout);
bytes.extend_from_slice(&stderr);
if bytes.len() <= max_bytes {
return (String::from_utf8_lossy(&bytes).into_owned(), false);
}
let start = bytes.len().saturating_sub(max_bytes);
(String::from_utf8_lossy(&bytes[start..]).into_owned(), true)
}
impl BgTask {
    /// Takes the state lock and builds a point-in-time snapshot.
    ///
    /// A poisoned lock is recovered (`into_inner`) rather than reported.
    fn snapshot(&self, preview_bytes: usize) -> BgTaskSnapshot {
        let guard = match self.state.lock() {
            Ok(guard) => guard,
            Err(poisoned) => poisoned.into_inner(),
        };
        self.snapshot_locked(&guard, preview_bytes)
    }

    /// Builds a snapshot from already-locked `state`.
    ///
    /// `duration_ms` is the recorded duration if present. Otherwise, for a
    /// terminal task only, it is the time elapsed since `self.started`. The
    /// output preview comes from the buffer's `read_tail(preview_bytes)`.
    fn snapshot_locked(&self, state: &BgTaskState, preview_bytes: usize) -> BgTaskSnapshot {
        let meta = &state.metadata;
        let buffer = &state.buffer;
        let duration_ms = match meta.duration_ms {
            Some(recorded) => Some(recorded),
            None if meta.status.is_terminal() => Some(self.started.elapsed().as_millis() as u64),
            None => None,
        };
        let (output_preview, output_truncated) = buffer.read_tail(preview_bytes);
        let info = BgTaskInfo {
            task_id: self.task_id.clone(),
            status: meta.status.clone(),
            command: meta.command.clone(),
            started_at: meta.started_at,
            duration_ms,
        };
        BgTaskSnapshot {
            info,
            exit_code: meta.exit_code,
            child_pid: meta.child_pid,
            workdir: meta.workdir.display().to_string(),
            output_preview,
            output_truncated,
            output_path: buffer.output_path().map(|path| path.display().to_string()),
            stderr_path: Some(buffer.stderr_path().display().to_string()),
        }
    }

    /// Returns `true` when the task's status is `Running`.
    ///
    /// Unlike `snapshot`, a poisoned state lock is reported as "not running".
    pub(crate) fn is_running(&self) -> bool {
        match self.state.lock() {
            Ok(state) => state.metadata.status == BgTaskStatus::Running,
            Err(_) => false,
        }
    }

    /// Records the first moment the task was observed terminal.
    ///
    /// Later calls, and calls while the lock is poisoned, leave it unchanged.
    fn mark_terminal_now(&self) {
        let Ok(mut terminal_at) = self.terminal_at.lock() else {
            return;
        };
        if terminal_at.is_none() {
            *terminal_at = Some(Instant::now());
        }
    }

    /// Persists the `completion_delivered` flag and adopts the resulting metadata.
    ///
    /// The flag is written through `update_task` on the task's JSON path while
    /// the state lock is held. The in-memory metadata is replaced with the value
    /// `update_task` returns.
    ///
    /// # Errors
    /// Fails if the state lock is poisoned or the persisted update fails.
    fn set_completion_delivered(&self, delivered: bool) -> Result<(), String> {
        let Ok(mut state) = self.state.lock() else {
            return Err("background task lock poisoned".to_string());
        };
        state.metadata = update_task(&self.paths.json, |metadata| {
            metadata.completion_delivered = delivered;
        })
        .map_err(|e| format!("failed to update completion delivery: {e}"))?;
        Ok(())
    }
}
/// Applies an exit marker to `metadata`, moving it to a terminal status.
///
/// - `Code(0)` becomes `Completed` with exit code 0.
/// - Any other `Code(n)` becomes `Failed` with exit code `n`.
/// - `Killed` becomes `Killed` with no exit code.
///
/// `reason` is forwarded to `mark_terminal` unchanged.
fn terminal_metadata_from_marker(
    mut metadata: PersistedTask,
    marker: ExitMarker,
    reason: Option<String>,
) -> PersistedTask {
    let (status, exit_code) = match marker {
        ExitMarker::Killed => (BgTaskStatus::Killed, None),
        ExitMarker::Code(0) => (BgTaskStatus::Completed, Some(0)),
        ExitMarker::Code(code) => (BgTaskStatus::Failed, Some(code)),
    };
    metadata.mark_terminal(status, exit_code, reason);
    metadata
}
/// Builds (but does not spawn) the Unix command that runs `command` in the
/// background and records its exit status in `exit_path`.
///
/// The shell from `resolve_posix_shell` runs an inline wrapper script with
/// these positional parameters:
/// - `$0`: the shell path,
/// - `$1`: the user command,
/// - `$2`: `exit_path`.
///
/// The wrapper works in three steps:
/// 1. It runs the user command in a nested `"$0" -c "$1"` shell, so an `exit`
///    inside the user command only ends that nested shell.
/// 2. It writes the numeric status to `"$2.tmp.$$"`.
/// 3. It renames that file onto `exit_path` with `mv -f`. The temp file sits in
///    the same directory, so readers should never see a half-written marker.
///
/// If the wrapper shell itself is killed before step 3, no marker is written.
///
/// Before `exec`, the child calls `setsid()`. Per POSIX this makes it the leader
/// of a new session and a new process group whose ID equals its PID.
#[cfg(unix)]
fn detached_shell_command(command: &str, exit_path: &Path) -> Command {
    let shell = resolve_posix_shell();
    let mut cmd = Command::new(&shell);
    cmd.arg("-c")
        .arg("\"$0\" -c \"$1\"; code=$?; printf \"%s\" \"$code\" > \"$2.tmp.$$\"; mv -f \"$2.tmp.$$\" \"$2\"")
        .arg(&shell) // `$0` inside the wrapper script
        .arg(command) // `$1`
        .arg(exit_path); // `$2`
    // SAFETY: `pre_exec` runs this closure in the forked child, between `fork`
    // and `exec`, where only async-signal-safe work is allowed. The closure only
    // calls `setsid(2)` (async-signal-safe per POSIX) and builds an `io::Error`
    // from `errno` on failure. It takes no locks shared with the parent.
    unsafe {
        cmd.pre_exec(|| {
            if libc::setsid() == -1 {
                return Err(std::io::Error::last_os_error());
            }
            Ok(())
        });
    }
    cmd
}
#[cfg(unix)]
fn resolve_posix_shell() -> PathBuf {
static POSIX_SHELL: OnceLock<PathBuf> = OnceLock::new();
POSIX_SHELL
.get_or_init(|| {
std::env::var_os("BASH")
.filter(|value| !value.is_empty())
.map(PathBuf::from)
.filter(|path| path.exists())
.or_else(|| which::which("bash").ok())
.or_else(|| which::which("zsh").ok())
.unwrap_or_else(|| PathBuf::from("/bin/sh"))
})
.clone()
}
/// Writes a per-task wrapper script for `shell` and returns the Windows
/// command that runs it.
///
/// The script body comes from `shell.wrapper_script(command, exit_path)`. It is
/// written to `paths.dir` as `<json stem>.ps1`, `.bat` or `.sh`, depending on
/// the shell. The stem falls back to `wrapper` if the JSON path has no UTF-8
/// stem.
///
/// The returned command has `creation_flags` applied but is not spawned.
///
/// # Errors
/// Returns an error string if the wrapper script cannot be written.
#[cfg(windows)]
fn detached_shell_command_for(
    shell: crate::windows_shell::WindowsShell,
    command: &str,
    exit_path: &Path,
    paths: &TaskPaths,
    creation_flags: u32,
) -> Result<Command, String> {
    use crate::windows_shell::WindowsShell;

    let script = shell.wrapper_script(command, exit_path);
    let (extension, launcher_args): (&str, &[&str]) = match shell {
        WindowsShell::Pwsh | WindowsShell::Powershell => (
            "ps1",
            &[
                "-NoLogo",
                "-NoProfile",
                "-NonInteractive",
                "-ExecutionPolicy",
                "Bypass",
                "-File",
            ],
        ),
        WindowsShell::Cmd => ("bat", &["/D", "/C"]),
        WindowsShell::Posix(_) => ("sh", &[]),
    };
    let stem = paths
        .json
        .file_stem()
        .and_then(|s| s.to_str())
        .unwrap_or("wrapper");
    let wrapper_path = paths.dir.join(format!("{}.{}", stem, extension));
    if let Err(e) = fs::write(&wrapper_path, script) {
        return Err(format!("failed to write background bash wrapper script: {e}"));
    }

    let mut cmd = Command::new(shell.binary().as_ref());
    cmd.args(launcher_args)
        .arg(&wrapper_path)
        .creation_flags(creation_flags);
    Ok(cmd)
}
/// Spawns `command` as a detached background process for one task.
///
/// Process setup:
/// - stdout and stderr go to the capture files opened with
///   `create_capture_file` on `paths.stdout` / `paths.stderr`.
/// - stdin is `Stdio::null()`.
/// - The process runs in `workdir`, with `env` added on top of the inherited
///   environment (`Command::envs`).
///
/// Platform behavior:
/// - Non-Windows: a single spawn of the command from `detached_shell_command`.
/// - Windows: tries each shell from `shell_candidates()` in order.
///   - For each candidate it first requests `CREATE_BREAKAWAY_FROM_JOB`.
///   - If that spawn fails with raw OS error 5 (Access Denied), it retries the
///     same shell without breakaway.
///   - A `NotFound` error moves on to the next candidate.
///   - Any other spawn error is returned immediately.
///   - Capture files are reopened and the wrapper script is rewritten for every
///     attempt.
///
/// # Errors
/// Returns a human-readable message when:
/// - a capture file cannot be opened,
/// - the Windows wrapper script cannot be written, or
/// - no spawn attempt succeeds.
fn spawn_detached_child(
    command: &str,
    paths: &TaskPaths,
    workdir: &Path,
    env: &HashMap<String, String>,
) -> Result<std::process::Child, String> {
    #[cfg(not(windows))]
    {
        // Single attempt: open both capture files, then spawn the wrapper command.
        let stdout = create_capture_file(&paths.stdout)
            .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
        let stderr = create_capture_file(&paths.stderr)
            .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
        detached_shell_command(command, &paths.exit)
            .current_dir(workdir)
            .envs(env)
            .stdin(Stdio::null())
            .stdout(Stdio::from(stdout))
            .stderr(Stdio::from(stderr))
            .spawn()
            .map_err(|e| format!("failed to spawn background bash command: {e}"))
    }
    #[cfg(windows)]
    {
        use crate::windows_shell::shell_candidates;
        let candidates: Vec<crate::windows_shell::WindowsShell> = shell_candidates();
        // Win32 process creation flags (values as documented for CreateProcess).
        const FLAG_CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
        const FLAG_CREATE_BREAKAWAY_FROM_JOB: u32 = 0x0100_0000;
        const FLAG_CREATE_NO_WINDOW: u32 = 0x0800_0000;
        let with_breakaway =
            FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP | FLAG_CREATE_BREAKAWAY_FROM_JOB;
        let without_breakaway = FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP;
        // Most recent recoverable failure, reported if every candidate fails.
        let mut last_error: Option<String> = None;
        for (idx, shell) in candidates.iter().enumerate() {
            // Breakaway first; the second pass is only reached through the
            // Access Denied `continue` arm below.
            for &flags in &[with_breakaway, without_breakaway] {
                // Reopened on every attempt: the previous handles were moved
                // into the previous attempt's `Command`.
                let stdout = create_capture_file(&paths.stdout)
                    .map_err(|e| format!("failed to open stdout capture file: {e}"))?;
                let stderr = create_capture_file(&paths.stderr)
                    .map_err(|e| format!("failed to open stderr capture file: {e}"))?;
                let mut cmd =
                    detached_shell_command_for(shell.clone(), command, &paths.exit, paths, flags)?;
                cmd.current_dir(workdir)
                    .envs(env)
                    .stdin(Stdio::null())
                    .stdout(Stdio::from(stdout))
                    .stderr(Stdio::from(stderr));
                match cmd.spawn() {
                    Ok(child) => {
                        // Success, but warn about any degraded path that got us here.
                        if idx > 0 {
                            crate::slog_warn!("background bash spawn fell back to {} after {} earlier candidate(s) failed; \
                                the cached PATH probe disagreed with runtime spawn — likely PATH \
                                inheritance, antivirus / AppLocker / Defender ASR, or sandbox policy.",
                                shell.binary(),
                                idx);
                        }
                        if flags == without_breakaway {
                            crate::slog_warn!(
                                "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected \
                                 (likely a restrictive Job Object — CI sandbox or MDM policy). \
                                 Spawned without breakaway; the bg task will be torn down if the \
                                 AFT process group is killed."
                            );
                        }
                        return Ok(child);
                    }
                    // Shell binary missing at runtime: skip its remaining flag
                    // variants and move on to the next candidate.
                    Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                        crate::slog_warn!("background bash spawn: {} returned NotFound at runtime — trying next candidate",
                            shell.binary());
                        last_error = Some(format!("{}: {e}", shell.binary()));
                        break;
                    }
                    // Raw OS error 5 (Access Denied) on the breakaway attempt:
                    // retry this shell without CREATE_BREAKAWAY_FROM_JOB.
                    Err(e) if flags == with_breakaway && e.raw_os_error() == Some(5) => {
                        crate::slog_warn!(
                            "background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected with \
                             Access Denied — retrying {} without breakaway",
                            shell.binary()
                        );
                        last_error = Some(format!("{}: {e}", shell.binary()));
                        continue;
                    }
                    // Anything else (including Access Denied without breakaway) is fatal.
                    Err(e) => {
                        return Err(format!(
                            "failed to spawn background bash command via {}: {e}",
                            shell.binary()
                        ));
                    }
                }
            }
        }
        Err(format!(
            "failed to spawn background bash command: no Windows shell could be spawned. \
             Last error: {}. PATH-probed candidates: {:?}",
            last_error.unwrap_or_else(|| "no candidates were attempted".to_string()),
            candidates.iter().map(|s| s.binary()).collect::<Vec<_>>()
        ))
    }
}
/// Generates a short task id of the form `bash-xxxxxxxx` (8 lowercase hex digits).
///
/// The 4 bytes normally come from the OS RNG via `getrandom`. If that fails,
/// they are the little-endian bytes of the clock's sub-second nanoseconds
/// XOR-ed with this process's PID (nanoseconds are 0 if the clock reads before
/// the epoch).
///
/// NOTE(review): only 32 bits of entropy; collisions are possible across many tasks.
fn random_slug() -> String {
    let mut entropy = [0u8; 4];
    if getrandom::fill(&mut entropy).is_err() {
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .map_or(0, |since_epoch| since_epoch.subsec_nanos());
        entropy = (nanos ^ std::process::id()).to_le_bytes();
    }
    let hex = entropy
        .iter()
        .map(|b| format!("{b:02x}"))
        .collect::<String>();
    format!("bash-{hex}")
}
/// Unit tests for the background task registry: cleanup retention rules,
/// `reap_child` transitions, and (on Windows) wrapper/exit-marker behavior.
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    #[cfg(windows)]
    use std::fs;
    use std::sync::{Arc, Mutex};
    use std::time::Duration;
    // On non-Windows targets `Instant` (used by the reap tests) comes in through
    // `use super::*`, which sees the parent module's `std::time` import.
    #[cfg(windows)]
    use std::time::Instant;

    use super::*;

    // A command that exits 0 almost immediately.
    #[cfg(unix)]
    const QUICK_SUCCESS_COMMAND: &str = "true";
    #[cfg(windows)]
    const QUICK_SUCCESS_COMMAND: &str = "cmd /c exit 0";
    // A command that keeps running for about five seconds.
    #[cfg(unix)]
    const LONG_RUNNING_COMMAND: &str = "sleep 5";
    #[cfg(windows)]
    const LONG_RUNNING_COMMAND: &str = "cmd /c timeout /t 5 /nobreak > nul";

    /// Spawns a trivial process and waits for it, returning a `Child` whose
    /// process has already exited. Used as a stand-in child handle in the reap tests.
    fn spawn_dead_child() -> std::process::Child {
        #[cfg(unix)]
        let mut cmd = std::process::Command::new("true");
        #[cfg(windows)]
        let mut cmd = {
            let mut c = std::process::Command::new("cmd");
            c.args(["/c", "exit", "0"]);
            c
        };
        cmd.stdin(std::process::Stdio::null());
        cmd.stdout(std::process::Stdio::null());
        cmd.stderr(std::process::Stdio::null());
        let mut child = cmd.spawn().expect("spawn replacement child for reap test");
        let _ = child.wait();
        child
    }

    /// A killed task whose completion has been drained is removed by
    /// `cleanup_finished` with a zero age threshold.
    #[test]
    fn cleanup_finished_removes_terminal_tasks_older_than_threshold() {
        let registry = BgTaskRegistry::default();
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                QUICK_SUCCESS_COMMAND,
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
                true,
                false,
                Some(dir.path().to_path_buf()),
            )
            .unwrap();
        registry
            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
            .unwrap();
        // Draining the single completion is what makes the task eligible for cleanup.
        let completions = registry.drain_completions_for_session(Some("session"));
        assert_eq!(completions.len(), 1);
        registry.cleanup_finished(Duration::ZERO);
        assert!(registry.inner.tasks.lock().unwrap().is_empty());
    }

    /// Same as above, but without draining completions: the terminal task is kept.
    #[test]
    fn cleanup_finished_retains_undelivered_terminals() {
        let registry = BgTaskRegistry::default();
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                QUICK_SUCCESS_COMMAND,
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
                true,
                false,
                Some(dir.path().to_path_buf()),
            )
            .unwrap();
        registry
            .kill_with_status(&task_id, "session", BgTaskStatus::Killed)
            .unwrap();
        registry.cleanup_finished(Duration::ZERO);
        assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
    }

    /// `reap_child` on a `Running` task whose process is dead and which has no
    /// exit marker must mark it `Failed` with the "no exit marker" reason,
    /// release the child handle, and mark the task detached.
    #[test]
    fn reap_child_marks_failed_when_child_exits_without_exit_marker() {
        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                QUICK_SUCCESS_COMMAND,
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
                true,
                false,
                Some(dir.path().to_path_buf()),
            )
            .unwrap();
        let task = registry.task_for_session(&task_id, "session").unwrap();
        // Wait (up to 5 s) until the child has exited or its handle is already gone.
        let started = Instant::now();
        loop {
            let exited = {
                let mut state = task.state.lock().unwrap();
                if let Some(child) = state.child.as_mut() {
                    matches!(child.try_wait(), Ok(Some(_)))
                } else {
                    true
                }
            };
            if exited {
                break;
            }
            assert!(
                started.elapsed() < Duration::from_secs(5),
                "child should exit quickly"
            );
            std::thread::sleep(Duration::from_millis(20));
        }
        // NOTE(review): presumably setting `shutdown` and sleeping lets any
        // background watcher stop before the test rewrites task state; confirm
        // 550 ms exceeds the watcher's poll interval.
        registry
            .inner
            .shutdown
            .store(true, std::sync::atomic::Ordering::SeqCst);
        std::thread::sleep(Duration::from_millis(550));
        // Remove the marker the wrapper wrote so `reap_child` sees "no marker".
        let _ = std::fs::remove_file(&task.paths.exit);
        // Rewind the task to Running with a child handle whose process is dead.
        {
            let mut state = task.state.lock().unwrap();
            state.metadata.status = BgTaskStatus::Running;
            state.metadata.status_reason = None;
            if state.child.is_none() {
                state.child = Some(spawn_dead_child());
            }
        }
        *task.terminal_at.lock().unwrap() = None;
        assert!(
            task.is_running(),
            "precondition: metadata.status == Running"
        );
        assert!(
            !task.paths.exit.exists(),
            "precondition: exit marker absent"
        );
        registry.reap_child(&task);
        let state = task.state.lock().unwrap();
        assert!(
            state.metadata.status.is_terminal(),
            "reap_child must transition to terminal when PID dead and no marker. \
             Got status={:?}",
            state.metadata.status
        );
        assert_eq!(
            state.metadata.status,
            BgTaskStatus::Failed,
            "must specifically be Failed (not Killed): status={:?}",
            state.metadata.status
        );
        assert_eq!(
            state.metadata.status_reason.as_deref(),
            Some("process exited without exit marker"),
            "reason must match replay path's wording: {:?}",
            state.metadata.status_reason
        );
        assert!(
            state.child.is_none(),
            "child handle must be released after reap"
        );
        assert!(state.detached, "task must be marked detached after reap");
    }

    /// When an exit marker exists, `reap_child` releases the child handle and
    /// marks the task detached, but leaves the status `Running`.
    #[test]
    fn reap_child_preserves_running_when_exit_marker_exists() {
        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                QUICK_SUCCESS_COMMAND,
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
                true,
                false,
                Some(dir.path().to_path_buf()),
            )
            .unwrap();
        let task = registry.task_for_session(&task_id, "session").unwrap();
        // Wait (up to 5 s) for the child to exit and for its exit marker to appear.
        let started = Instant::now();
        loop {
            let exited = {
                let mut state = task.state.lock().unwrap();
                if let Some(child) = state.child.as_mut() {
                    matches!(child.try_wait(), Ok(Some(_)))
                } else {
                    true
                }
            };
            if exited && task.paths.exit.exists() {
                break;
            }
            assert!(
                started.elapsed() < Duration::from_secs(5),
                "child should exit and write marker quickly"
            );
            std::thread::sleep(Duration::from_millis(20));
        }
        // NOTE(review): same shutdown + sleep pattern as the previous test; see
        // the note there.
        registry
            .inner
            .shutdown
            .store(true, std::sync::atomic::Ordering::SeqCst);
        std::thread::sleep(Duration::from_millis(550));
        // Rewind to Running with a dead child, keeping (or recreating) the marker.
        {
            let mut state = task.state.lock().unwrap();
            state.metadata.status = BgTaskStatus::Running;
            state.metadata.status_reason = None;
            if state.child.is_none() {
                state.child = Some(spawn_dead_child());
            }
        }
        *task.terminal_at.lock().unwrap() = None;
        if !task.paths.exit.exists() {
            std::fs::write(&task.paths.exit, "0").expect("write replacement exit marker");
        }
        registry.reap_child(&task);
        let state = task.state.lock().unwrap();
        assert!(
            state.child.is_none(),
            "child handle still released even when marker exists"
        );
        assert!(
            state.detached,
            "task still marked detached even when marker exists"
        );
        assert_eq!(
            state.metadata.status,
            BgTaskStatus::Running,
            "reap_child must defer to poll_task when marker exists"
        );
    }

    /// A still-running task is not removed by `cleanup_finished`; the task is
    /// killed afterwards so it does not outlive the test.
    #[test]
    fn cleanup_finished_keeps_running_tasks() {
        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                LONG_RUNNING_COMMAND,
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
                true,
                false,
                Some(dir.path().to_path_buf()),
            )
            .unwrap();
        registry.cleanup_finished(Duration::ZERO);
        assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
        let _ = registry.kill(&task_id, "session");
    }

    /// Polls every 100 ms (up to 30 s) until `path` exists, then returns its contents.
    #[cfg(windows)]
    fn wait_for_file(path: &Path) -> String {
        let started = Instant::now();
        loop {
            if path.exists() {
                return fs::read_to_string(path).expect("read file");
            }
            assert!(
                started.elapsed() < Duration::from_secs(30),
                "timed out waiting for {}",
                path.display()
            );
            std::thread::sleep(Duration::from_millis(100));
        }
    }

    /// Spawns `command` in a fresh registry rooted at a new temp dir. The
    /// `TempDir` is returned so it stays alive for the caller's whole test.
    #[cfg(windows)]
    fn spawn_windows_registry_command(
        command: &str,
    ) -> (BgTaskRegistry, tempfile::TempDir, String) {
        let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
        let dir = tempfile::tempdir().unwrap();
        let task_id = registry
            .spawn(
                command,
                "session".to_string(),
                dir.path().to_path_buf(),
                HashMap::new(),
                Some(Duration::from_secs(30)),
                dir.path().to_path_buf(),
                10,
                false,
                false,
                Some(dir.path().to_path_buf()),
            )
            .unwrap();
        (registry, dir, task_id)
    }

    /// The exit marker contains `0` for a command that exits successfully.
    #[cfg(windows)]
    #[test]
    fn windows_spawn_writes_exit_marker_for_zero_exit() {
        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 0");
        let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
        let content = wait_for_file(&exit_path);
        assert_eq!(content.trim(), "0");
    }

    /// The exit marker carries a non-zero exit code through unchanged.
    #[cfg(windows)]
    #[test]
    fn windows_spawn_writes_exit_marker_for_nonzero_exit() {
        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 42");
        let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
        let content = wait_for_file(&exit_path);
        assert_eq!(content.trim(), "42");
    }

    /// Command stdout ends up in the task's stdout capture file.
    #[cfg(windows)]
    #[test]
    fn windows_spawn_captures_stdout_to_disk() {
        let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c echo hello");
        let task = registry.task_for_session(&task_id, "session").unwrap();
        let stdout_path = task.paths.stdout.clone();
        let exit_path = task.paths.exit.clone();
        let _ = wait_for_file(&exit_path);
        let stdout = fs::read_to_string(stdout_path).expect("read stdout");
        assert!(stdout.contains("hello"), "stdout was {stdout:?}");
    }

    /// With both pwsh and Windows PowerShell resolvable, pwsh is the first candidate.
    #[cfg(windows)]
    #[test]
    fn windows_spawn_uses_pwsh_when_available() {
        let candidates = crate::windows_shell::shell_candidates_with(
            |binary| match binary {
                "pwsh.exe" => Some(std::path::PathBuf::from(r"C:\pwsh\pwsh.exe")),
                "powershell.exe" => Some(std::path::PathBuf::from(r"C:\ps\powershell.exe")),
                _ => None,
            },
            || None,
        );
        let shell = candidates.first().expect("at least one candidate").clone();
        assert_eq!(shell, crate::windows_shell::WindowsShell::Pwsh);
        assert_eq!(shell.binary().as_ref(), "pwsh.exe");
    }

    /// The cmd wrapper captures `%ERRORLEVEL%` and writes it to a `.tmp` file
    /// that is then `move /Y`'d onto the marker path. It also exits with the
    /// captured code.
    #[cfg(windows)]
    #[test]
    fn windows_shell_cmd_wrapper_writes_exit_marker_with_move() {
        let exit_path = Path::new(r"C:\Temp\bash-test.exit");
        let script =
            crate::windows_shell::WindowsShell::Cmd.wrapper_script("cmd /c exit 42", exit_path);
        assert!(
            script.contains("set CODE=%ERRORLEVEL%"),
            "wrapper must capture exit code into CODE: {script}"
        );
        assert!(
            script.contains("echo %CODE% >"),
            "wrapper must echo CODE to a temp marker file: {script}"
        );
        assert!(
            script.contains("move /Y"),
            "wrapper must use atomic move to write the marker: {script}"
        );
        assert!(
            script.contains("> nul"),
            "wrapper must redirect move output to nul: {script}"
        );
        assert!(
            script.contains("exit /B %CODE%"),
            "wrapper must propagate the captured exit code: {script}"
        );
        assert!(script.contains(r#""C:\Temp\bash-test.exit.tmp""#));
        assert!(script.contains(r#""C:\Temp\bash-test.exit""#));
    }

    /// `Cmd::bg_command` passes exactly `/D /S /C <command>`.
    #[cfg(windows)]
    #[test]
    fn windows_shell_cmd_bg_command_uses_minimal_cmd_flags() {
        use crate::windows_shell::WindowsShell;
        let cmd = WindowsShell::Cmd.bg_command("echo wrapped");
        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
        let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
        assert_eq!(
            args_strs,
            vec!["/D", "/S", "/C", "echo wrapped"],
            "Cmd::bg_command must prepend /D /S /C"
        );
    }

    /// `Pwsh::bg_command` passes the user command via `-Command`.
    #[cfg(windows)]
    #[test]
    fn windows_shell_pwsh_bg_command_uses_standard_args() {
        use crate::windows_shell::WindowsShell;
        let cmd = WindowsShell::Pwsh.bg_command("Get-Date");
        let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
        let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
        assert!(
            args_strs.contains(&"-Command"),
            "Pwsh::bg_command must use -Command: {args_strs:?}"
        );
        assert!(
            args_strs.contains(&"Get-Date"),
            "Pwsh::bg_command must include the user command body"
        );
    }

    // `cfg(any())` never matches, so this item is never compiled.
    // NOTE(review): appears to be a placeholder for a disabled test; consider
    // removing it or restoring the real test.
    #[allow(dead_code)]
    #[cfg(any())] fn windows_cmd_wrapper_records_real_exit_code_disabled() {}
}