use std::collections::{BTreeMap, HashMap, HashSet, VecDeque};
use std::fs;
use std::io::{Read, Write};
use std::path::{Path, PathBuf};
use std::process::{Child, Command, Stdio};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
#[cfg(unix)]
use std::sync::OnceLock;
use std::sync::{Arc, Mutex, RwLock};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use rusqlite::Connection;
use serde::Serialize;
use crate::compress::caps::DropClass;
use crate::compress::CompressionResult;
use crate::context::SharedProgressSender;
use crate::harness::Harness;
use crate::protocol::{BashCompletedFrame, BashLongRunningFrame, BashPatternMatchFrame, PushFrame};
#[cfg(unix)]
use std::os::unix::process::CommandExt;
#[cfg(windows)]
use std::os::windows::process::CommandExt;
use super::buffer::{combine_streams, BgBuffer, StreamKind, TokenCountInput};
use super::output::{
cap_completion_output, cap_completion_output_with_marker, cap_final_output,
cap_final_output_with_marker, json_output_pointer, quote_path, COMPLETION_OUTPUT_PREVIEW_BYTES,
COMPRESS_INPUT_CAP_BYTES, COMPRESS_INPUT_HEAD_BYTES, COMPRESS_INPUT_TAIL_BYTES,
FINAL_OUTPUT_CAP_BYTES, RAW_PASSTHROUGH_CAP_BYTES, RAW_PASSTHROUGH_HEAD_BYTES,
RAW_PASSTHROUGH_TAIL_BYTES, RUNNING_OUTPUT_PREVIEW_BYTES, STRUCTURED_OUTPUT_CAP_BYTES,
};
use super::persistence::{
create_capture_file, delete_task_bundle, read_exit_marker, read_task, session_tasks_dir,
task_paths, unix_millis, update_task, write_kill_marker_if_absent, write_task, BgMode,
ExitMarker, PersistedTask, TaskPaths,
};
use super::process::is_process_alive;
#[cfg(unix)]
use super::process::terminate_pgid;
#[cfg(windows)]
use super::process::terminate_pid;
use super::pty_process::spawn_pty_for_command;
use super::pty_runtime::PtyRuntime;
use super::watches::{PatternMatch, WatchPattern, WatchRegistry};
use super::{BgTaskInfo, BgTaskStatus};
const DEFAULT_BG_TIMEOUT: Duration = Duration::from_secs(30 * 60);
const STALE_RUNNING_AFTER: Duration = Duration::from_secs(24 * 60 * 60);
const PERSISTED_GC_GRACE: Duration = Duration::from_secs(24 * 60 * 60);
const QUARANTINE_GC_GRACE: Duration = Duration::from_secs(30 * 24 * 60 * 60);
const BG_COMPLETION_PREVIEW_BYTES: usize = COMPLETION_OUTPUT_PREVIEW_BYTES;
const TOKENIZE_CAP_BYTES_PER_STREAM: usize = 128 * 1024;
#[derive(Debug, Clone, Serialize)]
pub struct BgCompletion {
pub task_id: String,
#[serde(skip_serializing)]
pub session_id: String,
pub status: BgTaskStatus,
pub exit_code: Option<i32>,
pub command: String,
#[serde(default, skip_serializing_if = "String::is_empty")]
pub output_preview: String,
#[serde(default, skip_serializing_if = "is_false")]
pub output_truncated: bool,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub original_tokens: Option<u32>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub compressed_tokens: Option<u32>,
#[serde(default, skip_serializing_if = "is_false")]
pub tokens_skipped: bool,
}
fn is_false(v: &bool) -> bool {
!*v
}
#[derive(Debug, Clone, Serialize)]
pub struct BgTaskSnapshot {
#[serde(flatten)]
pub info: BgTaskInfo,
pub exit_code: Option<i32>,
pub child_pid: Option<u32>,
pub workdir: String,
pub output_preview: String,
pub output_truncated: bool,
pub output_path: Option<String>,
pub stderr_path: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub pty_rows: Option<u16>,
#[serde(skip_serializing_if = "Option::is_none")]
pub pty_cols: Option<u16>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TerminalOutputKind {
Compressed,
Raw,
Structured,
}
#[derive(Debug, Clone, PartialEq, Eq)]
struct TerminalOutputCache {
output_preview: String,
output_truncated: bool,
kind: TerminalOutputKind,
output_path: Option<String>,
stderr_path: Option<String>,
recovery: Option<RecoveryContext>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
struct RecoveryContext {
dropped_by_class: BTreeMap<DropClass, usize>,
had_inner_drop: bool,
offset_hint_eligible: bool,
offset_start_line: Option<usize>,
byte_truncated: bool,
output_path: Option<String>,
stderr_path: Option<String>,
include_stderr_path: bool,
}
impl RecoveryContext {
fn has_visible_drop(&self) -> bool {
self.byte_truncated || self.had_inner_drop || !self.dropped_by_class.is_empty()
}
}
#[derive(Clone)]
pub struct BgTaskRegistry {
pub(crate) inner: Arc<RegistryInner>,
}
pub(crate) struct RegistryInner {
pub(crate) tasks: Mutex<HashMap<String, Arc<BgTask>>>,
pub(crate) completions: Mutex<VecDeque<BgCompletion>>,
pub(crate) progress_sender: SharedProgressSender,
watchdog_started: AtomicBool,
pub(crate) shutdown: AtomicBool,
pub(crate) long_running_reminder_enabled: AtomicBool,
pub(crate) long_running_reminder_interval_ms: AtomicU64,
persisted_gc_started: AtomicBool,
#[cfg(test)]
persisted_gc_runs: AtomicU64,
pub(crate) compressor:
Mutex<Option<Box<dyn Fn(&str, String) -> CompressionResult + Send + Sync>>>,
pub(crate) db_pool: RwLock<Option<Arc<Mutex<Connection>>>>,
pub(crate) db_harness: RwLock<Option<String>>,
pub(crate) wake_tx: crossbeam_channel::Sender<()>,
pub(crate) wake_rx: crossbeam_channel::Receiver<()>,
pub(crate) watch_registry: Mutex<WatchRegistry>,
}
pub(crate) struct BgTask {
pub(crate) task_id: String,
pub(crate) session_id: String,
pub(crate) paths: TaskPaths,
pub(crate) started: Instant,
pub(crate) last_reminder_at: Mutex<Option<Instant>>,
pub(crate) terminal_at: Mutex<Option<Instant>>,
pub(crate) state: Mutex<BgTaskState>,
}
pub(crate) enum TaskRuntime {
Piped(Option<Child>),
Pty(Option<PtyRuntime>),
}
pub(crate) struct BgTaskState {
pub(crate) metadata: PersistedTask,
pub(crate) runtime: TaskRuntime,
pub(crate) detached: bool,
pub(crate) child_exit_observed: bool,
pub(crate) buffer: BgBuffer,
terminal_output_cache: Option<TerminalOutputCache>,
pub(crate) pending_terminal_override: Option<BgTaskStatus>,
}
impl BgTaskRegistry {
pub fn new(progress_sender: SharedProgressSender) -> Self {
let (wake_tx, wake_rx) = crossbeam_channel::bounded(1);
Self {
inner: Arc::new(RegistryInner {
tasks: Mutex::new(HashMap::new()),
completions: Mutex::new(VecDeque::new()),
progress_sender,
watchdog_started: AtomicBool::new(false),
shutdown: AtomicBool::new(false),
long_running_reminder_enabled: AtomicBool::new(true),
long_running_reminder_interval_ms: AtomicU64::new(600_000),
persisted_gc_started: AtomicBool::new(false),
#[cfg(test)]
persisted_gc_runs: AtomicU64::new(0),
compressor: Mutex::new(None),
db_pool: RwLock::new(None),
db_harness: RwLock::new(None),
wake_tx,
wake_rx,
watch_registry: Mutex::new(WatchRegistry::default()),
}),
}
}
pub fn set_harness(&self, harness: Harness) {
if let Ok(mut slot) = self.inner.db_harness.write() {
*slot = Some(harness.as_str().to_string());
}
}
pub fn set_db_pool(&self, conn: Arc<Mutex<Connection>>) {
if let Ok(mut slot) = self.inner.db_pool.write() {
*slot = Some(conn);
}
}
pub fn clear_db_pool(&self) {
if let Ok(mut slot) = self.inner.db_pool.write() {
*slot = None;
}
}
pub fn set_compressor<F>(&self, compressor: F)
where
F: Fn(&str, String) -> CompressionResult + Send + Sync + 'static,
{
if let Ok(mut slot) = self.inner.compressor.lock() {
*slot = Some(Box::new(compressor));
}
}
pub(crate) fn compress_output(&self, command: &str, output: String) -> CompressionResult {
let Ok(slot) = self.inner.compressor.lock() else {
return CompressionResult::new(output);
};
match slot.as_ref() {
Some(compressor) => compressor(command, output),
None => CompressionResult::new(output),
}
}
fn ensure_terminal_output_cache(&self, task: &Arc<BgTask>) -> Option<TerminalOutputCache> {
let (metadata, buffer) = {
let state = task.state.lock().ok()?;
if !state.metadata.status.is_terminal() || state.metadata.mode == BgMode::Pty {
return None;
}
if let Some(cache) = state.terminal_output_cache.clone() {
return Some(cache);
}
(state.metadata.clone(), state.buffer.clone())
};
let mut cap_buffer = buffer.clone();
cap_buffer.enforce_terminal_cap();
let cache = self.render_terminal_output(&metadata, &buffer);
let mut state = task.state.lock().ok()?;
if !state.metadata.status.is_terminal() || state.metadata.mode == BgMode::Pty {
return None;
}
if let Some(existing) = state.terminal_output_cache.clone() {
return Some(existing);
}
state.terminal_output_cache = Some(cache.clone());
Some(cache)
}
fn render_terminal_output(
&self,
metadata: &PersistedTask,
buffer: &BgBuffer,
) -> TerminalOutputCache {
if metadata.mode == BgMode::Pty {
return TerminalOutputCache {
output_preview: String::new(),
output_truncated: false,
kind: TerminalOutputKind::Raw,
output_path: buffer.output_path().map(|path| path.display().to_string()),
stderr_path: buffer.stderr_path().map(|path| path.display().to_string()),
recovery: None,
};
}
if let Some(structured) = render_structured_output(&metadata.command, buffer) {
return structured;
}
if !metadata.compressed {
return render_raw_passthrough(buffer);
}
let raw = buffer.read_combined_head_tail(
COMPRESS_INPUT_CAP_BYTES,
COMPRESS_INPUT_HEAD_BYTES,
COMPRESS_INPUT_TAIL_BYTES,
);
let compressed = self.compress_output(&metadata.command, raw.text);
render_compressed_with_recovery(buffer, compressed, raw.truncated)
}
fn snapshot_with_terminal_cache(
&self,
task: &Arc<BgTask>,
preview_bytes: usize,
) -> BgTaskSnapshot {
let mut snapshot = task.snapshot(preview_bytes);
self.maybe_compress_snapshot(task, &mut snapshot);
snapshot
}
fn post_terminal_transition(&self, task: &Arc<BgTask>, emit_frame: bool) -> Result<(), String> {
let (metadata, buffer) = {
let state = task
.state
.lock()
.map_err(|_| "background task lock poisoned".to_string())?;
if !state.metadata.status.is_terminal() {
return Ok(());
}
(state.metadata.clone(), state.buffer.clone())
};
let mut cap_buffer = buffer.clone();
cap_buffer.enforce_terminal_cap();
let cache = self.ensure_terminal_output_cache(task);
self.enqueue_completion_from_parts(
&metadata,
Some(&buffer),
None,
emit_frame,
cache.as_ref(),
);
Ok(())
}
fn persist_task(&self, paths: &TaskPaths, metadata: &PersistedTask) -> std::io::Result<()> {
write_task(&paths.json, metadata)?;
self.dual_write_task(paths, metadata);
Ok(())
}
fn update_task_metadata<F>(
&self,
paths: &TaskPaths,
update: F,
) -> std::io::Result<PersistedTask>
where
F: FnOnce(&mut PersistedTask),
{
let metadata = update_task(&paths.json, update)?;
self.dual_write_task(paths, &metadata);
Ok(metadata)
}
fn dual_write_task(&self, paths: &TaskPaths, metadata: &PersistedTask) {
let pool = self.inner.db_pool.read().ok().and_then(|slot| slot.clone());
let Some(pool) = pool else {
return;
};
let harness = self
.inner
.db_harness
.read()
.ok()
.and_then(|slot| slot.clone());
let Some(harness) = harness else {
crate::slog_warn!(
"dual-write bash_task to DB skipped for {}: harness not configured",
metadata.task_id
);
return;
};
let row = match metadata.to_bash_task_row(&harness, paths) {
Ok(row) => row,
Err(error) => {
crate::slog_warn!(
"dual-write bash_task to DB failed for {}: {}",
metadata.task_id,
error
);
return;
}
};
let conn = match pool.lock() {
Ok(conn) => conn,
Err(_) => {
crate::slog_warn!(
"dual-write bash_task to DB failed for {}: db mutex poisoned",
metadata.task_id
);
return;
}
};
if let Err(error) = crate::db::bash_tasks::upsert_bash_task(&conn, &row) {
crate::slog_warn!(
"dual-write bash_task to DB failed for {}: {}",
metadata.task_id,
error
);
}
}
pub fn configure_long_running_reminders(&self, enabled: bool, interval_ms: u64) {
self.inner
.long_running_reminder_enabled
.store(enabled, Ordering::SeqCst);
self.inner
.long_running_reminder_interval_ms
.store(interval_ms, Ordering::SeqCst);
}
#[cfg(unix)]
#[allow(clippy::too_many_arguments)]
pub fn spawn(
&self,
command: &str,
session_id: String,
workdir: PathBuf,
env: HashMap<String, String>,
timeout: Option<Duration>,
storage_dir: PathBuf,
max_running: usize,
notify_on_completion: bool,
compressed: bool,
project_root: Option<PathBuf>,
) -> Result<String, String> {
self.start_watchdog();
let running = self.running_count();
if running >= max_running {
return Err(format!(
"background bash task limit exceeded: {running} running (max {max_running})"
));
}
let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
let task_id = self.generate_unique_task_id()?;
let paths = task_paths(&storage_dir, &session_id, &task_id);
fs::create_dir_all(&paths.dir)
.map_err(|e| format!("failed to create background task dir: {e}"))?;
let mut metadata = PersistedTask::starting(
task_id.clone(),
session_id.clone(),
command.to_string(),
workdir.clone(),
project_root,
timeout_ms,
notify_on_completion,
compressed,
);
self.persist_task(&paths, &metadata)
.map_err(|e| format!("failed to persist background task metadata: {e}"))?;
create_capture_file(&paths.stdout)
.map_err(|e| format!("failed to create stdout capture file: {e}"))?;
create_capture_file(&paths.stderr)
.map_err(|e| format!("failed to create stderr capture file: {e}"))?;
let child = match spawn_detached_child(command, &paths, &workdir, &env) {
Ok(child) => child,
Err(error) => {
crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
let _ = delete_task_bundle(&paths);
return Err(error);
}
};
let child_pid = child.id();
metadata.mark_running(child_pid, child_pid as i32);
self.persist_task(&paths, &metadata)
.map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
let task = Arc::new(BgTask {
task_id: task_id.clone(),
session_id,
paths: paths.clone(),
started: Instant::now(),
last_reminder_at: Mutex::new(None),
terminal_at: Mutex::new(None),
state: Mutex::new(BgTaskState {
metadata,
runtime: TaskRuntime::Piped(Some(child)),
detached: false,
child_exit_observed: false,
buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
terminal_output_cache: None,
pending_terminal_override: None,
}),
});
self.inner
.tasks
.lock()
.map_err(|_| "background task registry lock poisoned".to_string())?
.insert(task_id.clone(), task);
Ok(task_id)
}
#[allow(clippy::too_many_arguments)]
pub fn spawn_pty(
&self,
command: &str,
session_id: String,
workdir: PathBuf,
env: HashMap<String, String>,
timeout: Option<Duration>,
storage_dir: PathBuf,
max_running: usize,
notify_on_completion: bool,
compressed: bool,
project_root: Option<PathBuf>,
rows: u16,
cols: u16,
) -> Result<String, String> {
self.start_watchdog();
let running = self.running_count();
if running >= max_running {
return Err(format!(
"background bash task limit exceeded: {running} running (max {max_running})"
));
}
let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
let task_id = self.generate_unique_task_id()?;
let paths = task_paths(&storage_dir, &session_id, &task_id);
fs::create_dir_all(&paths.dir)
.map_err(|e| format!("failed to create background task dir: {e}"))?;
let mut metadata = PersistedTask::starting(
task_id.clone(),
session_id.clone(),
command.to_string(),
workdir.clone(),
project_root,
timeout_ms,
notify_on_completion,
compressed,
);
metadata.mode = BgMode::Pty;
metadata.pty_rows = Some(rows);
metadata.pty_cols = Some(cols);
self.persist_task(&paths, &metadata)
.map_err(|e| format!("failed to persist background task metadata: {e}"))?;
create_capture_file(&paths.pty)
.map_err(|e| format!("failed to create PTY capture file: {e}"))?;
let runtime = match spawn_pty_for_command(
&task_id,
&session_id,
command,
&paths,
&workdir,
&env,
rows,
cols,
self.inner.wake_tx.clone(),
) {
Ok(runtime) => runtime,
Err(error) => {
crate::slog_warn!(
"failed to spawn PTY background bash task {task_id}; deleting partial bundle: {error}"
);
let _ = delete_task_bundle(&paths);
return Err(error);
}
};
if let Some(child_pid) = runtime.child_pid {
metadata.mark_running(child_pid, child_pid as i32);
} else {
metadata.status = BgTaskStatus::Running;
metadata.pgid = None;
}
self.persist_task(&paths, &metadata)
.map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
let task = Arc::new(BgTask {
task_id: task_id.clone(),
session_id,
paths: paths.clone(),
started: Instant::now(),
last_reminder_at: Mutex::new(None),
terminal_at: Mutex::new(None),
state: Mutex::new(BgTaskState {
metadata,
runtime: TaskRuntime::Pty(Some(runtime)),
detached: false,
child_exit_observed: false,
buffer: BgBuffer::pty(paths.pty.clone()),
terminal_output_cache: None,
pending_terminal_override: None,
}),
});
self.inner
.tasks
.lock()
.map_err(|_| "background task registry lock poisoned".to_string())?
.insert(task_id.clone(), task);
Ok(task_id)
}
#[cfg(windows)]
#[allow(clippy::too_many_arguments)]
pub fn spawn(
&self,
command: &str,
session_id: String,
workdir: PathBuf,
env: HashMap<String, String>,
timeout: Option<Duration>,
storage_dir: PathBuf,
max_running: usize,
notify_on_completion: bool,
compressed: bool,
project_root: Option<PathBuf>,
) -> Result<String, String> {
self.start_watchdog();
let running = self.running_count();
if running >= max_running {
return Err(format!(
"background bash task limit exceeded: {running} running (max {max_running})"
));
}
let timeout = timeout.or(Some(DEFAULT_BG_TIMEOUT));
let timeout_ms = timeout.map(|timeout| timeout.as_millis() as u64);
let task_id = self.generate_unique_task_id()?;
let paths = task_paths(&storage_dir, &session_id, &task_id);
fs::create_dir_all(&paths.dir)
.map_err(|e| format!("failed to create background task dir: {e}"))?;
let mut metadata = PersistedTask::starting(
task_id.clone(),
session_id.clone(),
command.to_string(),
workdir.clone(),
project_root,
timeout_ms,
notify_on_completion,
compressed,
);
self.persist_task(&paths, &metadata)
.map_err(|e| format!("failed to persist background task metadata: {e}"))?;
create_capture_file(&paths.stdout)
.map_err(|e| format!("failed to create stdout capture file: {e}"))?;
create_capture_file(&paths.stderr)
.map_err(|e| format!("failed to create stderr capture file: {e}"))?;
let child = match spawn_detached_child(command, &paths, &workdir, &env) {
Ok(child) => child,
Err(error) => {
crate::slog_warn!("failed to spawn background bash task {task_id}; deleting partial bundle: {error}");
let _ = delete_task_bundle(&paths);
return Err(error);
}
};
let child_pid = child.id();
metadata.status = BgTaskStatus::Running;
metadata.child_pid = Some(child_pid);
metadata.pgid = None;
self.persist_task(&paths, &metadata)
.map_err(|e| format!("failed to persist running background task metadata: {e}"))?;
let task = Arc::new(BgTask {
task_id: task_id.clone(),
session_id,
paths: paths.clone(),
started: Instant::now(),
last_reminder_at: Mutex::new(None),
terminal_at: Mutex::new(None),
state: Mutex::new(BgTaskState {
metadata,
runtime: TaskRuntime::Piped(Some(child)),
detached: false,
child_exit_observed: false,
buffer: BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()),
terminal_output_cache: None,
pending_terminal_override: None,
}),
});
self.inner
.tasks
.lock()
.map_err(|_| "background task registry lock poisoned".to_string())?
.insert(task_id.clone(), task);
Ok(task_id)
}
pub fn write_pty(
&self,
task_id: &str,
session_id: &str,
input: &[u8],
) -> Result<usize, String> {
let task = self
.task_for_session(task_id, session_id)
.ok_or_else(|| "task_not_found".to_string())?;
let writer = {
let state = task
.state
.lock()
.map_err(|_| "background task lock poisoned".to_string())?;
if state.metadata.mode != BgMode::Pty {
return Err("task_not_pty".to_string());
}
if state.metadata.status.is_terminal() {
return Err("task_exited".to_string());
}
match &state.runtime {
TaskRuntime::Pty(Some(runtime)) => Arc::clone(&runtime.writer),
TaskRuntime::Pty(None) => return Err("task_exited".to_string()),
TaskRuntime::Piped(_) => return Err("task_not_pty".to_string()),
}
};
let mut writer = writer
.lock()
.map_err(|_| "PTY writer lock poisoned".to_string())?;
writer
.write_all(input)
.map_err(|error| format!("failed to write to PTY: {error}"))?;
writer
.flush()
.map_err(|error| format!("failed to flush PTY writer: {error}"))?;
Ok(input.len())
}
pub fn replay_session(&self, storage_dir: &Path, session_id: &str) -> Result<(), String> {
self.replay_session_inner(storage_dir, session_id, None)
}
pub fn replay_session_for_project(
&self,
storage_dir: &Path,
session_id: &str,
project_root: &Path,
) -> Result<(), String> {
self.replay_session_inner(storage_dir, session_id, Some(project_root))
}
fn replay_session_inner(
&self,
storage_dir: &Path,
session_id: &str,
project_root: Option<&Path>,
) -> Result<(), String> {
self.start_watchdog();
if !self.inner.persisted_gc_started.swap(true, Ordering::SeqCst) {
if let Err(error) = self.maybe_gc_persisted(storage_dir) {
crate::slog_warn!("failed to GC persisted background bash tasks: {error}");
}
}
let canonical_project = project_root.map(canonicalized_path);
let tasks = match self.replay_session_from_db(session_id) {
Some(Ok(tasks)) if !tasks.is_empty() => tasks,
Some(Ok(_)) => {
let disk_tasks = self.replay_session_from_disk(storage_dir, session_id)?;
if !disk_tasks.is_empty() {
crate::slog_info!(
"bash task replay: 0 in DB for session {}, {} from disk fallback",
session_id,
disk_tasks.len()
);
}
disk_tasks
}
Some(Err(error)) => {
crate::slog_warn!(
"bash task replay DB lookup failed for session {}; falling back to disk: {}",
session_id,
error
);
self.replay_session_from_disk(storage_dir, session_id)?
}
None => {
self.replay_session_from_disk(storage_dir, session_id)?
}
};
for mut metadata in tasks {
if metadata.session_id != session_id {
continue;
}
if let Some(canonical_project) = canonical_project.as_deref() {
let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
if metadata_project.as_deref() != Some(canonical_project) {
continue;
}
}
let paths = task_paths(storage_dir, session_id, &metadata.task_id);
match metadata.status {
BgTaskStatus::Starting => {
let completion_was_delivered = metadata.completion_delivered;
metadata.mark_terminal(
BgTaskStatus::Failed,
None,
Some("spawn aborted".to_string()),
);
metadata.completion_delivered |= completion_was_delivered;
let _ = self.persist_task(&paths, &metadata);
self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
self.insert_rehydrated_task(metadata, paths, true)?;
}
BgTaskStatus::Running | BgTaskStatus::Killing => {
if metadata.mode == BgMode::Pty {
if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
let completion_was_delivered = metadata.completion_delivered;
metadata = terminal_metadata_from_marker(metadata, marker, None);
metadata.completion_delivered |= completion_was_delivered;
let _ = self.persist_task(&paths, &metadata);
self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
self.insert_rehydrated_task(metadata, paths, true)?;
} else if metadata.status.is_terminal() {
self.insert_rehydrated_task(metadata, paths, true)?;
} else {
let completion_was_delivered = metadata.completion_delivered;
metadata.mark_terminal(
BgTaskStatus::Killed,
None,
Some("pty_lost_on_bridge_restart".to_string()),
);
metadata.completion_delivered |= completion_was_delivered;
let _ = self.persist_task(&paths, &metadata);
self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
self.insert_rehydrated_task(metadata, paths, true)?;
}
} else if self.running_metadata_is_stale(&metadata) {
let completion_was_delivered = metadata.completion_delivered;
metadata.mark_terminal(
BgTaskStatus::Killed,
None,
Some("orphaned (>24h)".to_string()),
);
metadata.completion_delivered |= completion_was_delivered;
if !paths.exit.exists() {
let _ = write_kill_marker_if_absent(&paths.exit);
}
let _ = self.persist_task(&paths, &metadata);
self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
self.insert_rehydrated_task(metadata, paths, true)?;
} else if let Ok(Some(marker)) = read_exit_marker(&paths.exit) {
let reason = (metadata.status == BgTaskStatus::Killing).then(|| {
"recovered from inconsistent killing state on replay".to_string()
});
if reason.is_some() {
crate::slog_warn!("background task {} had killing state with exit marker; preferring marker",
metadata.task_id);
}
let completion_was_delivered = metadata.completion_delivered;
metadata = terminal_metadata_from_marker(metadata, marker, reason);
metadata.completion_delivered |= completion_was_delivered;
let _ = self.persist_task(&paths, &metadata);
self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
self.insert_rehydrated_task(metadata, paths, true)?;
} else if metadata.status == BgTaskStatus::Killing {
if !paths.exit.exists() {
let _ = write_kill_marker_if_absent(&paths.exit);
}
let completion_was_delivered = metadata.completion_delivered;
metadata.mark_terminal(
BgTaskStatus::Killed,
None,
Some("recovered from inconsistent killing state on replay".to_string()),
);
metadata.completion_delivered |= completion_was_delivered;
let _ = self.persist_task(&paths, &metadata);
self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
self.insert_rehydrated_task(metadata, paths, true)?;
} else if metadata.child_pid.is_some_and(|pid| !is_process_alive(pid)) {
let completion_was_delivered = metadata.completion_delivered;
metadata.mark_terminal(
BgTaskStatus::Failed,
None,
Some("process exited without exit marker".to_string()),
);
metadata.completion_delivered |= completion_was_delivered;
let _ = self.persist_task(&paths, &metadata);
self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
self.insert_rehydrated_task(metadata, paths, true)?;
} else {
self.insert_rehydrated_task(metadata, paths, true)?;
}
}
_ if metadata.status.is_terminal() => {
self.enqueue_completion_if_needed(&metadata, Some(&paths), false);
self.insert_rehydrated_task(metadata, paths, true)?;
}
_ => {}
}
}
Ok(())
}
fn replay_session_from_db(
&self,
session_id: &str,
) -> Option<Result<Vec<PersistedTask>, String>> {
let pool = self
.inner
.db_pool
.read()
.ok()
.and_then(|slot| slot.clone())?;
let harness = self
.inner
.db_harness
.read()
.ok()
.and_then(|slot| slot.clone())?;
let conn = match pool.lock() {
Ok(conn) => conn,
Err(_) => return Some(Err("db mutex poisoned".to_string())),
};
Some(
crate::db::bash_tasks::list_bash_tasks_for_session(&conn, &harness, session_id)
.map(|rows| rows.into_iter().map(PersistedTask::from).collect())
.map_err(|error| error.to_string()),
)
}
fn replay_session_from_disk(
&self,
storage_dir: &Path,
session_id: &str,
) -> Result<Vec<PersistedTask>, String> {
let dir = session_tasks_dir(storage_dir, session_id);
if !dir.exists() {
return Ok(Vec::new());
}
let entries = fs::read_dir(&dir)
.map_err(|e| format!("failed to read background task dir {}: {e}", dir.display()))?;
let mut tasks = Vec::new();
for entry in entries.flatten() {
let path = entry.path();
if path.extension().and_then(|extension| extension.to_str()) != Some("json") {
continue;
}
match read_task(&path) {
Ok(metadata) => tasks.push(metadata),
Err(error) => {
crate::slog_warn!(
"quarantining invalid background task metadata {} during replay: {error}",
path.display()
);
if let Err(quarantine_error) =
quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
{
crate::slog_warn!(
"failed to quarantine invalid background task metadata {}: {quarantine_error}",
path.display()
);
}
}
}
}
Ok(tasks)
}
pub fn register_watch(
&self,
task_id: String,
pattern: WatchPattern,
once: bool,
) -> Result<String, &'static str> {
let task = self.task(&task_id).ok_or("task_not_found")?;
let (mode, terminal_at_registration, stdout, stderr, pty) = task
.state
.lock()
.map(|state| {
(
state.metadata.mode.clone(),
state.metadata.status.is_terminal(),
task.paths.stdout.clone(),
task.paths.stderr.clone(),
task.paths.pty.clone(),
)
})
.map_err(|_| "background_task_lock_poisoned")?;
let mut terminal_matches = Vec::new();
let scanned_terminal = terminal_at_registration;
let watch_id = {
let mut registry = self
.inner
.watch_registry
.lock()
.map_err(|_| "watch_registry_poisoned")?;
let watch_id = registry.register(task_id.clone(), pattern, once)?;
match &mode {
BgMode::Pipes => {
let stdout_key = format!("{task_id}:stdout");
let stderr_key = format!("{task_id}:stderr");
if terminal_at_registration {
registry.set_file_cursor(&stdout_key, 0);
registry.set_file_cursor(&stderr_key, 0);
terminal_matches.extend(registry.scan_file_new_bytes(
&stdout_key,
&task_id,
&stdout,
));
terminal_matches.extend(registry.scan_file_new_bytes(
&stderr_key,
&task_id,
&stderr,
));
} else {
registry.prime_file_cursor(&stdout_key, &stdout);
registry.prime_file_cursor(&stderr_key, &stderr);
}
}
BgMode::Pty => {
let pty_key = format!("{task_id}:pty");
if terminal_at_registration {
registry.set_file_cursor(&pty_key, 0);
terminal_matches
.extend(registry.scan_file_new_bytes(&pty_key, &task_id, &pty));
} else {
registry.prime_file_cursor(&pty_key, &pty);
}
}
}
watch_id
};
if task.is_terminal() {
if !scanned_terminal {
terminal_matches = {
let mut registry = self
.inner
.watch_registry
.lock()
.map_err(|_| "watch_registry_poisoned")?;
match &mode {
BgMode::Pipes => {
let stdout_key = format!("{task_id}:stdout");
let stderr_key = format!("{task_id}:stderr");
registry.set_file_cursor(&stdout_key, 0);
registry.set_file_cursor(&stderr_key, 0);
let mut matches =
registry.scan_file_new_bytes(&stdout_key, &task_id, &stdout);
matches.extend(registry.scan_file_new_bytes(
&stderr_key,
&task_id,
&stderr,
));
matches
}
BgMode::Pty => {
let pty_key = format!("{task_id}:pty");
registry.set_file_cursor(&pty_key, 0);
registry.scan_file_new_bytes(&pty_key, &task_id, &pty)
}
}
};
}
let (watch_controlled, watch_matched) = self.task_watch_state(&task_id);
if terminal_matches.is_empty() && (!watch_controlled || watch_matched) {
if watch_matched {
let _ = task.set_completion_delivered(true, self);
self.clear_task_watch_state(&task_id);
}
return Ok(watch_id);
}
let completion = self
.remove_pending_completion(&task_id)
.or_else(|| self.completion_snapshot_for_task(&task, BG_COMPLETION_PREVIEW_BYTES));
if terminal_matches.is_empty() {
if let Some(completion) = completion.as_ref() {
self.emit_bash_watch_exit(completion);
}
} else {
for pattern_match in terminal_matches {
self.emit_bash_pattern_match(&task.session_id, pattern_match);
}
}
let _ = task.set_completion_delivered(true, self);
self.clear_task_watch_state(&task_id);
}
Ok(watch_id)
}
pub fn unregister_watch(&self, task_id: &str, watch_id: &str) {
if let Ok(mut registry) = self.inner.watch_registry.lock() {
registry.unregister(task_id, watch_id);
}
}
pub fn active_watch_count(&self, task_id: &str) -> usize {
self.inner
.watch_registry
.lock()
.map(|registry| registry.active_count(task_id))
.unwrap_or(0)
}
fn task_watch_state(&self, task_id: &str) -> (bool, bool) {
self.inner
.watch_registry
.lock()
.map(|registry| {
(
registry.has_controlled_task(task_id),
registry.has_matched_task(task_id),
)
})
.unwrap_or((false, false))
}
fn task_has_watch_control(&self, task_id: &str) -> bool {
self.inner
.watch_registry
.lock()
.map(|registry| registry.has_controlled_task(task_id))
.unwrap_or(false)
}
fn clear_task_watch_state(&self, task_id: &str) {
if let Ok(mut registry) = self.inner.watch_registry.lock() {
registry.clear_task(task_id);
}
}
pub(crate) fn scan_task_watch_output(&self, task: &Arc<BgTask>) {
let (mode, stdout, stderr, pty) = match task.state.lock() {
Ok(state) => (
state.metadata.mode.clone(),
task.paths.stdout.clone(),
task.paths.stderr.clone(),
task.paths.pty.clone(),
),
Err(_) => return,
};
let mut matches = Vec::new();
if let Ok(mut registry) = self.inner.watch_registry.lock() {
match mode {
BgMode::Pipes => {
let stdout_key = format!("{}:stdout", task.task_id);
let stderr_key = format!("{}:stderr", task.task_id);
matches.extend(registry.scan_file_new_bytes(
&stdout_key,
&task.task_id,
&stdout,
));
matches.extend(registry.scan_file_new_bytes(
&stderr_key,
&task.task_id,
&stderr,
));
}
BgMode::Pty => {
let pty_key = format!("{}:pty", task.task_id);
matches.extend(registry.scan_file_new_bytes(&pty_key, &task.task_id, &pty));
}
}
}
for pattern_match in matches {
self.emit_bash_pattern_match(&task.session_id, pattern_match);
}
}
pub fn status(
&self,
task_id: &str,
session_id: &str,
project_root: Option<&Path>,
storage_dir: Option<&Path>,
preview_bytes: usize,
) -> Option<BgTaskSnapshot> {
let mut task = self.task_for_session(task_id, session_id);
if task.is_none() {
if let Some(storage_dir) = storage_dir {
let _ = self.replay_session(storage_dir, session_id);
task = self.task_for_session(task_id, session_id);
}
}
let Some(task) = task else {
return self.status_relaxed(
task_id,
session_id,
project_root?,
storage_dir?,
preview_bytes,
);
};
let _ = self.poll_task(&task);
Some(self.snapshot_with_terminal_cache(&task, preview_bytes))
}
fn status_relaxed_task(
&self,
task_id: &str,
project_root: &Path,
storage_dir: &Path,
) -> Option<Arc<BgTask>> {
let canonical_project = canonicalized_path(project_root);
match self.lookup_relaxed_task_from_db(task_id, project_root) {
Some(Ok(Some(metadata))) => {
if let Some(task) = self.task(task_id) {
let matches_project = task
.state
.lock()
.map(|state| {
state
.metadata
.project_root
.as_deref()
.map(canonicalized_path)
.as_deref()
== Some(canonical_project.as_path())
})
.unwrap_or(false);
return matches_project.then_some(task);
}
let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
if self.insert_rehydrated_task(metadata, paths, true).is_err() {
return None;
}
return self.task(task_id);
}
Some(Ok(None)) => {
crate::slog_info!(
"bash task relaxed DB miss for {}; falling back to disk",
task_id
);
}
Some(Err(error)) => {
crate::slog_warn!(
"bash task relaxed DB lookup failed for {}; falling back to disk: {}",
task_id,
error
);
}
None => {
crate::slog_info!(
"bash task relaxed DB unavailable for {}; falling back to disk",
task_id
);
}
}
let root = storage_dir.join("bash-tasks");
let entries = fs::read_dir(&root).ok()?;
for entry in entries.flatten() {
let dir = entry.path();
if !dir.is_dir() {
continue;
}
let path = dir.join(format!("{task_id}.json"));
if !path.exists() {
continue;
}
let metadata = match read_task(&path) {
Ok(metadata) => metadata,
Err(error) => {
crate::slog_warn!(
"quarantining invalid background task metadata {} during relaxed lookup: {error}",
path.display()
);
if let Err(quarantine_error) =
quarantine_task_json(storage_dir, &dir, &path, QuarantineKind::Invalid)
{
crate::slog_warn!(
"failed to quarantine invalid background task metadata {}: {quarantine_error}",
path.display()
);
}
continue;
}
};
let metadata_project = metadata.project_root.as_deref().map(canonicalized_path);
if metadata_project.as_deref() != Some(canonical_project.as_path()) {
continue;
}
if let Some(task) = self.task(task_id) {
let matches_project = task
.state
.lock()
.map(|state| {
state
.metadata
.project_root
.as_deref()
.map(canonicalized_path)
.as_deref()
== Some(canonical_project.as_path())
})
.unwrap_or(false);
return matches_project.then_some(task);
}
let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
if self.insert_rehydrated_task(metadata, paths, true).is_err() {
return None;
}
return self.task(task_id);
}
None
}
fn lookup_relaxed_task_from_db(
&self,
task_id: &str,
project_root: &Path,
) -> Option<Result<Option<PersistedTask>, String>> {
let pool = self
.inner
.db_pool
.read()
.ok()
.and_then(|slot| slot.clone())?;
let harness = self
.inner
.db_harness
.read()
.ok()
.and_then(|slot| slot.clone())?;
let conn = match pool.lock() {
Ok(conn) => conn,
Err(_) => return Some(Err("db mutex poisoned".to_string())),
};
let project_key = crate::search_index::project_cache_key(project_root);
Some(
crate::db::bash_tasks::find_bash_task_for_project(
&conn,
&harness,
&project_key,
task_id,
)
.map(|row| row.map(PersistedTask::from))
.map_err(|error| error.to_string()),
)
}
pub(super) fn status_relaxed(
&self,
task_id: &str,
_session_id: &str,
project_root: &Path,
storage_dir: &Path,
preview_bytes: usize,
) -> Option<BgTaskSnapshot> {
let task = self.status_relaxed_task(task_id, project_root, storage_dir)?;
let _ = self.poll_task(&task);
Some(self.snapshot_with_terminal_cache(&task, preview_bytes))
}
pub fn kill_relaxed(
&self,
task_id: &str,
project_root: &Path,
storage_dir: &Path,
) -> Result<BgTaskSnapshot, String> {
let task = self
.status_relaxed_task(task_id, project_root, storage_dir)
.ok_or_else(|| format!("background task not found: {task_id}"))?;
self.kill_with_status(task_id, &task.session_id, BgTaskStatus::Killed)
}
pub fn maybe_gc_persisted(&self, storage_dir: &Path) -> Result<usize, String> {
#[cfg(test)]
self.inner.persisted_gc_runs.fetch_add(1, Ordering::SeqCst);
let mut deleted = 0usize;
let root = storage_dir.join("bash-tasks");
if root.exists() {
let session_dirs = fs::read_dir(&root).map_err(|e| {
format!(
"failed to read background task root {}: {e}",
root.display()
)
})?;
for session_entry in session_dirs.flatten() {
let session_dir = session_entry.path();
if !session_dir.is_dir() {
continue;
}
let task_entries = match fs::read_dir(&session_dir) {
Ok(entries) => entries,
Err(error) => {
crate::slog_warn!(
"failed to read background task session dir {}: {error}",
session_dir.display()
);
continue;
}
};
for task_entry in task_entries.flatten() {
let json_path = task_entry.path();
if json_path
.extension()
.and_then(|extension| extension.to_str())
!= Some("json")
{
continue;
}
if modified_within(&json_path, PERSISTED_GC_GRACE) {
continue;
}
let metadata = match read_task(&json_path) {
Ok(metadata) => metadata,
Err(error) => {
crate::slog_warn!(
"quarantining corrupt background task metadata {}: {error}",
json_path.display()
);
quarantine_task_json(
storage_dir,
&session_dir,
&json_path,
QuarantineKind::Corrupt,
)?;
continue;
}
};
if !(metadata.status.is_terminal() && metadata.completion_delivered) {
continue;
}
let paths = task_paths(storage_dir, &metadata.session_id, &metadata.task_id);
match delete_task_bundle(&paths) {
Ok(()) => {
deleted += 1;
log::debug!(
"deleted persisted background task bundle {}",
metadata.task_id
);
}
Err(error) => {
crate::slog_warn!(
"failed to delete background task bundle {}: {error}",
metadata.task_id
);
continue;
}
}
}
}
}
gc_quarantine(storage_dir);
Ok(deleted)
}
pub fn list(&self, preview_bytes: usize) -> Vec<BgTaskSnapshot> {
let tasks = self
.inner
.tasks
.lock()
.map(|tasks| tasks.values().cloned().collect::<Vec<_>>())
.unwrap_or_default();
tasks
.into_iter()
.map(|task| {
let _ = self.poll_task(&task);
self.snapshot_with_terminal_cache(&task, preview_bytes)
})
.collect()
}
fn maybe_compress_snapshot(&self, task: &Arc<BgTask>, snapshot: &mut BgTaskSnapshot) {
if !snapshot.info.status.is_terminal() || snapshot.info.mode == BgMode::Pty {
return;
}
if let Some(cache) = self.ensure_terminal_output_cache(task) {
snapshot.output_preview = cache.output_preview;
snapshot.output_truncated = cache.output_truncated;
}
}
pub fn kill(&self, task_id: &str, session_id: &str) -> Result<BgTaskSnapshot, String> {
self.kill_with_status(task_id, session_id, BgTaskStatus::Killed)
}
pub fn promote(&self, task_id: &str, session_id: &str) -> Result<bool, String> {
let task = self
.task_for_session(task_id, session_id)
.ok_or_else(|| format!("background task not found: {task_id}"))?;
let terminal_after_promote = {
let mut state = task
.state
.lock()
.map_err(|_| "background task lock poisoned".to_string())?;
let updated = self
.update_task_metadata(&task.paths, |metadata| {
metadata.notify_on_completion = true;
metadata.completion_delivered = false;
})
.map_err(|e| format!("failed to promote background task: {e}"))?;
state.metadata = updated;
state.metadata.status.is_terminal()
};
if terminal_after_promote {
self.post_terminal_transition(&task, true)?;
}
Ok(true)
}
pub(crate) fn kill_for_timeout(&self, task_id: &str, session_id: &str) -> Result<(), String> {
self.kill_with_status(task_id, session_id, BgTaskStatus::TimedOut)
.map(|_| ())
}
pub fn cleanup_finished(&self, older_than: Duration) {
let cutoff = Instant::now().checked_sub(older_than);
let removable_paths: Vec<(String, TaskPaths)> =
if let Ok(mut tasks) = self.inner.tasks.lock() {
let removable = tasks
.iter()
.filter_map(|(task_id, task)| {
let delivered_terminal = task
.state
.lock()
.map(|state| {
state.metadata.status.is_terminal()
&& state.metadata.completion_delivered
})
.unwrap_or(false);
if !delivered_terminal {
return None;
}
let terminal_at = task.terminal_at.lock().ok().and_then(|at| *at);
let expired = match (terminal_at, cutoff) {
(Some(terminal_at), Some(cutoff)) => terminal_at <= cutoff,
(Some(_), None) => true,
(None, _) => false,
};
expired.then(|| task_id.clone())
})
.collect::<Vec<_>>();
removable
.into_iter()
.filter_map(|task_id| {
tasks
.remove(&task_id)
.map(|task| (task_id, task.paths.clone()))
})
.collect()
} else {
Vec::new()
};
for (task_id, paths) in removable_paths {
match delete_task_bundle(&paths) {
Ok(()) => log::debug!("deleted persisted background task bundle {task_id}"),
Err(error) => crate::slog_warn!(
"failed to delete persisted background task bundle {task_id}: {error}"
),
}
}
}
pub fn drain_completions(&self) -> Vec<BgCompletion> {
self.drain_completions_for_session(None)
}
pub fn drain_completions_for_session(&self, session_id: Option<&str>) -> Vec<BgCompletion> {
let completions = match self.inner.completions.lock() {
Ok(completions) => completions,
Err(_) => return Vec::new(),
};
completions
.iter()
.filter(|completion| {
session_id
.map(|session_id| completion.session_id == session_id)
.unwrap_or(true)
})
.cloned()
.collect()
}
pub fn ack_completions_for_session(
&self,
session_id: Option<&str>,
task_ids: &[String],
) -> Vec<String> {
if task_ids.is_empty() {
return Vec::new();
}
let requested_task_ids = task_ids.iter().map(String::as_str).collect::<HashSet<_>>();
let mut completion_sessions = HashMap::new();
if let Ok(mut completions) = self.inner.completions.lock() {
completions.retain(|completion| {
let session_matches = session_id
.map(|session_id| completion.session_id == session_id)
.unwrap_or(true);
if session_matches && requested_task_ids.contains(completion.task_id.as_str()) {
completion_sessions
.insert(completion.task_id.clone(), completion.session_id.clone());
false
} else {
true
}
});
}
let mut delivered = Vec::new();
for task_id in task_ids {
let task = if let Some(session_id) = session_id {
self.task_for_session(task_id, session_id)
} else if let Some(completion_session_id) = completion_sessions.get(task_id) {
self.task_for_session(task_id, completion_session_id)
} else {
self.task(task_id)
};
if let Some(task) = task {
if task.set_completion_delivered(true, self).is_ok() {
delivered.push(task_id.clone());
}
}
}
delivered
}
pub fn pending_completions_for_session(&self, session_id: &str) -> Vec<BgCompletion> {
self.inner
.completions
.lock()
.map(|completions| {
completions
.iter()
.filter(|completion| completion.session_id == session_id)
.cloned()
.collect()
})
.unwrap_or_default()
}
fn remove_pending_completion(&self, task_id: &str) -> Option<BgCompletion> {
let mut completions = self.inner.completions.lock().ok()?;
let idx = completions
.iter()
.position(|completion| completion.task_id == task_id)?;
completions.remove(idx)
}
fn completion_snapshot_for_task(
&self,
task: &Arc<BgTask>,
_preview_bytes: usize,
) -> Option<BgCompletion> {
let snapshot = self.snapshot_with_terminal_cache(task, RUNNING_OUTPUT_PREVIEW_BYTES);
if !snapshot.info.status.is_terminal() {
return None;
}
let (output_preview, output_truncated) = if snapshot.info.mode == BgMode::Pty {
(String::new(), false)
} else {
self.ensure_terminal_output_cache(task)
.map(|cache| completion_preview_for_cache(&cache))
.unwrap_or_else(|| (String::new(), false))
};
Some(BgCompletion {
task_id: snapshot.info.task_id,
session_id: task.session_id.clone(),
status: snapshot.info.status,
exit_code: snapshot.exit_code,
command: snapshot.info.command,
output_preview,
output_truncated,
original_tokens: None,
compressed_tokens: None,
tokens_skipped: false,
})
}
pub fn detach(&self) {
self.inner.shutdown.store(true, Ordering::SeqCst);
if let Ok(mut tasks) = self.inner.tasks.lock() {
for task in tasks.values() {
if let Ok(mut state) = task.state.lock() {
match &mut state.runtime {
TaskRuntime::Piped(child) => *child = None,
TaskRuntime::Pty(runtime) => *runtime = None,
}
state.detached = true;
}
}
tasks.clear();
}
}
pub fn shutdown(&self) {
let tasks = self
.inner
.tasks
.lock()
.map(|tasks| {
tasks
.values()
.map(|task| (task.task_id.clone(), task.session_id.clone()))
.collect::<Vec<_>>()
})
.unwrap_or_default();
for (task_id, session_id) in tasks {
let _ = self.kill(&task_id, &session_id);
}
}
pub(crate) fn poll_task(&self, task: &Arc<BgTask>) -> Result<(), String> {
if let Ok(state) = task.state.lock() {
if let TaskRuntime::Pty(Some(pty)) = &state.runtime {
if !pty.exit_observed.load(Ordering::SeqCst) {
return Ok(());
}
}
}
let marker = match read_exit_marker(&task.paths.exit) {
Ok(Some(marker)) => marker,
Ok(None) => return Ok(()),
Err(error) => return Err(format!("failed to read exit marker: {error}")),
};
self.finalize_from_marker(task, marker, None)
}
pub(crate) fn reap_child(&self, task: &Arc<BgTask>) {
let mut needs_completion = false;
{
let Ok(mut state) = task.state.lock() else {
return;
};
match &mut state.runtime {
TaskRuntime::Piped(child_slot) => {
if let Some(child) = child_slot.as_mut() {
if matches!(child.try_wait(), Ok(Some(_))) {
*child_slot = None;
state.detached = true;
state.child_exit_observed = true;
}
} else if state.detached {
let child_known_dead = state.child_exit_observed
|| state
.metadata
.child_pid
.is_some_and(|pid| !is_process_alive(pid));
if child_known_dead {
needs_completion =
self.fail_without_exit_marker_if_needed(task, &mut state);
}
}
}
TaskRuntime::Pty(Some(pty)) => {
if pty.exit_observed.load(Ordering::SeqCst) {
drop(state);
let _ = self.poll_task(task);
return;
}
}
TaskRuntime::Pty(None) => {}
}
}
if needs_completion {
let _ = self.post_terminal_transition(task, true);
}
}
fn fail_without_exit_marker_if_needed(
&self,
task: &Arc<BgTask>,
state: &mut BgTaskState,
) -> bool {
if state.metadata.status.is_terminal() {
return false;
}
if matches!(read_exit_marker(&task.paths.exit), Ok(Some(_))) {
return false;
}
let watch_controlled = self.task_has_watch_control(&task.task_id);
let updated = self.update_task_metadata(&task.paths, |metadata| {
metadata.mark_terminal(
BgTaskStatus::Failed,
None,
Some("process exited without exit marker".to_string()),
);
if watch_controlled {
metadata.completion_delivered = true;
}
});
if let Ok(metadata) = updated {
state.pending_terminal_override = None;
state.metadata = metadata;
task.mark_terminal_now();
return true;
}
false
}
pub(crate) fn running_tasks(&self) -> Vec<Arc<BgTask>> {
self.inner
.tasks
.lock()
.map(|tasks| {
tasks
.values()
.filter(|task| task.is_running())
.cloned()
.collect()
})
.unwrap_or_default()
}
fn insert_rehydrated_task(
&self,
metadata: PersistedTask,
paths: TaskPaths,
detached: bool,
) -> Result<(), String> {
let task_id = metadata.task_id.clone();
let session_id = metadata.session_id.clone();
let started = started_instant_from_unix_millis(metadata.started_at);
let suppress_replayed_running_reminder = metadata.status == BgTaskStatus::Running;
let mode = metadata.mode.clone();
let task = Arc::new(BgTask {
task_id: task_id.clone(),
session_id,
paths: paths.clone(),
started,
last_reminder_at: Mutex::new(suppress_replayed_running_reminder.then(Instant::now)),
terminal_at: Mutex::new(metadata.status.is_terminal().then(Instant::now)),
state: Mutex::new(BgTaskState {
metadata,
runtime: if mode == BgMode::Pty {
TaskRuntime::Pty(None)
} else {
TaskRuntime::Piped(None)
},
detached,
child_exit_observed: false,
buffer: if mode == BgMode::Pty {
BgBuffer::pty(paths.pty.clone())
} else {
BgBuffer::new(paths.stdout.clone(), paths.stderr.clone())
},
terminal_output_cache: None,
pending_terminal_override: None,
}),
});
self.inner
.tasks
.lock()
.map_err(|_| "background task registry lock poisoned".to_string())?
.insert(task_id, task);
Ok(())
}
fn kill_with_status(
&self,
task_id: &str,
session_id: &str,
terminal_status: BgTaskStatus,
) -> Result<BgTaskSnapshot, String> {
let task = self
.task_for_session(task_id, session_id)
.ok_or_else(|| format!("background task not found: {task_id}"))?;
let mut terminalized = false;
{
let mut state = task
.state
.lock()
.map_err(|_| "background task lock poisoned".to_string())?;
if state.metadata.status.is_terminal() {
state.pending_terminal_override = None;
} else if let Ok(Some(marker)) = read_exit_marker(&task.paths.exit) {
state.metadata =
terminal_metadata_from_marker(state.metadata.clone(), marker, None);
if self.task_has_watch_control(&task.task_id) {
state.metadata.completion_delivered = true;
}
state.pending_terminal_override = None;
task.mark_terminal_now();
match &mut state.runtime {
TaskRuntime::Piped(child_slot) => reap_piped_child(child_slot),
TaskRuntime::Pty(runtime) => *runtime = None,
}
state.detached = true;
self.persist_task(&task.paths, &state.metadata)
.map_err(|e| format!("failed to persist terminal state: {e}"))?;
terminalized = true;
} else {
let was_already_killing = state.metadata.status == BgTaskStatus::Killing;
if !was_already_killing {
state.metadata.status = BgTaskStatus::Killing;
self.persist_task(&task.paths, &state.metadata)
.map_err(|e| format!("failed to persist killing state: {e}"))?;
}
#[cfg(unix)]
let pgid = state.metadata.pgid;
#[cfg(windows)]
let child_pid = state.metadata.child_pid;
if !was_already_killing
&& state.metadata.mode == BgMode::Pty
&& terminal_status == BgTaskStatus::TimedOut
{
state.pending_terminal_override = Some(BgTaskStatus::TimedOut);
}
#[cfg(windows)]
let mut pty_forced_terminal_status: Option<BgTaskStatus> = None;
match &mut state.runtime {
TaskRuntime::Piped(child_slot) => {
#[cfg(unix)]
if let Some(pgid) = pgid {
terminate_pgid(pgid, child_slot.as_mut());
}
#[cfg(windows)]
if let Some(child) = child_slot.as_mut() {
super::process::terminate_process(child);
} else if let Some(pid) = child_pid {
terminate_pid(pid);
}
if let Some(child) = child_slot.as_mut() {
let _ = child.wait();
}
*child_slot = None;
state.detached = true;
if !task.paths.exit.exists() {
write_kill_marker_if_absent(&task.paths.exit)
.map_err(|e| format!("failed to write kill marker: {e}"))?;
}
let exit_code = if terminal_status == BgTaskStatus::TimedOut {
Some(124)
} else {
None
};
state
.metadata
.mark_terminal(terminal_status, exit_code, None);
if self.task_has_watch_control(&task.task_id) {
state.metadata.completion_delivered = true;
}
state.pending_terminal_override = None;
task.mark_terminal_now();
self.persist_task(&task.paths, &state.metadata)
.map_err(|e| format!("failed to persist killed state: {e}"))?;
terminalized = true;
}
TaskRuntime::Pty(Some(pty)) => {
pty.was_killed.store(true, Ordering::SeqCst);
if let Err(error) = pty.killer.kill() {
crate::slog_warn!(
"[pty-kill] {task_id} ChildKiller::kill failed: {error}"
);
}
if let Some(pid) = pty.child_pid {
#[cfg(unix)]
terminate_pgid(pid as i32, None);
#[cfg(windows)]
terminate_pid(pid);
}
drop(pty.master.take());
#[cfg(windows)]
{
let default_status = if terminal_status == BgTaskStatus::TimedOut {
BgTaskStatus::TimedOut
} else {
BgTaskStatus::Killed
};
pty_forced_terminal_status = Some(
state
.pending_terminal_override
.take()
.unwrap_or(default_status),
);
}
}
TaskRuntime::Pty(None) => {}
}
#[cfg(windows)]
if let Some(target_status) = pty_forced_terminal_status {
if !task.paths.exit.exists() {
write_kill_marker_if_absent(&task.paths.exit)
.map_err(|e| format!("failed to write kill marker: {e}"))?;
}
let exit_code = if target_status == BgTaskStatus::TimedOut {
Some(124)
} else {
None
};
state.metadata.mark_terminal(target_status, exit_code, None);
if self.task_has_watch_control(&task.task_id) {
state.metadata.completion_delivered = true;
}
state.pending_terminal_override = None;
task.mark_terminal_now();
if let TaskRuntime::Pty(runtime) = &mut state.runtime {
*runtime = None;
}
state.detached = true;
self.persist_task(&task.paths, &state.metadata)
.map_err(|e| format!("failed to persist killed PTY state: {e}"))?;
terminalized = true;
}
}
}
if terminalized {
self.post_terminal_transition(&task, true)?;
}
Ok(self.snapshot_with_terminal_cache(&task, RUNNING_OUTPUT_PREVIEW_BYTES))
}
fn finalize_from_marker(
&self,
task: &Arc<BgTask>,
marker: ExitMarker,
reason: Option<String>,
) -> Result<(), String> {
let watch_controlled = self.task_has_watch_control(&task.task_id);
let mut pty_reader_done = None;
{
let mut state = task
.state
.lock()
.map_err(|_| "background task lock poisoned".to_string())?;
if state.metadata.status.is_terminal() {
state.pending_terminal_override = None;
return Ok(());
}
let pending_override = state.pending_terminal_override.take();
let is_pty = state.metadata.mode == BgMode::Pty;
let updated = self
.update_task_metadata(&task.paths, |metadata| {
let mut new_metadata = if is_pty && marker == ExitMarker::Killed {
let mut metadata = metadata.clone();
let target_status = pending_override.unwrap_or(BgTaskStatus::Killed);
let exit_code = if target_status == BgTaskStatus::TimedOut {
Some(124)
} else {
None
};
metadata.mark_terminal(target_status, exit_code, reason);
metadata
} else {
terminal_metadata_from_marker(metadata.clone(), marker, reason)
};
if watch_controlled {
new_metadata.completion_delivered = true;
}
*metadata = new_metadata;
})
.map_err(|e| format!("failed to persist terminal state: {e}"))?;
state.metadata = updated;
task.mark_terminal_now();
match &mut state.runtime {
TaskRuntime::Piped(child_slot) => reap_piped_child(child_slot),
TaskRuntime::Pty(runtime) => {
pty_reader_done = runtime
.as_ref()
.map(|runtime| Arc::clone(&runtime.reader_done));
*runtime = None;
}
}
state.detached = true;
}
if let Some(reader_done) = pty_reader_done {
let deadline = Instant::now() + Duration::from_millis(200);
while !reader_done.load(Ordering::SeqCst) && Instant::now() < deadline {
std::thread::sleep(Duration::from_millis(10));
}
}
self.scan_task_watch_output(task);
self.post_terminal_transition(task, true)
}
fn enqueue_completion_if_needed(
&self,
metadata: &PersistedTask,
paths: Option<&TaskPaths>,
emit_frame: bool,
) {
if metadata.status.is_terminal() && !metadata.completion_delivered {
let cache =
paths.and_then(|paths| self.render_terminal_output_from_paths(metadata, paths));
self.enqueue_completion_from_parts(metadata, None, paths, emit_frame, cache.as_ref());
}
}
fn render_terminal_output_from_paths(
&self,
metadata: &PersistedTask,
paths: &TaskPaths,
) -> Option<TerminalOutputCache> {
if metadata.mode == BgMode::Pty {
return None;
}
let buffer = BgBuffer::new(paths.stdout.clone(), paths.stderr.clone());
Some(self.render_terminal_output(metadata, &buffer))
}
fn enqueue_completion_from_parts(
&self,
metadata: &PersistedTask,
buffer: Option<&BgBuffer>,
paths: Option<&TaskPaths>,
emit_frame: bool,
terminal_render: Option<&TerminalOutputCache>,
) {
if !metadata.status.is_terminal() {
return;
}
let owned_buffer = if buffer.is_none() && metadata.mode != BgMode::Pty {
paths.map(|paths| BgBuffer::new(paths.stdout.clone(), paths.stderr.clone()))
} else {
None
};
let render_buffer = buffer.or(owned_buffer.as_ref());
let owned_render = if terminal_render.is_none() {
render_buffer.map(|buffer| self.render_terminal_output(metadata, buffer))
} else {
None
};
let render = terminal_render.or(owned_render.as_ref());
let (output_preview, output_truncated) = render
.map(completion_preview_for_cache)
.unwrap_or_else(|| (String::new(), false));
let token_counts = self.completion_token_counts(
metadata,
buffer,
paths,
render.map(|render| render.output_preview.as_str()),
);
let completion = BgCompletion {
task_id: metadata.task_id.clone(),
session_id: metadata.session_id.clone(),
status: metadata.status.clone(),
exit_code: metadata.exit_code,
command: metadata.command.clone(),
output_preview,
output_truncated,
original_tokens: token_counts.original_tokens,
compressed_tokens: token_counts.compressed_tokens,
tokens_skipped: token_counts.tokens_skipped,
};
self.record_compression_event_if_applicable(metadata, &token_counts);
let (watch_controlled, watch_matched) = self.task_watch_state(&metadata.task_id);
if watch_controlled {
if emit_frame && !watch_matched {
self.emit_bash_watch_exit(&completion);
}
self.clear_task_watch_state(&metadata.task_id);
return;
}
if metadata.completion_delivered {
return;
}
let pushed = if let Ok(mut completions) = self.inner.completions.lock() {
if completions
.iter()
.any(|existing| existing.task_id == metadata.task_id)
{
false
} else {
completions.push_back(completion.clone());
true
}
} else {
false
};
if pushed && emit_frame {
self.emit_bash_completed(completion);
}
}
fn record_compression_event_if_applicable(
&self,
metadata: &PersistedTask,
token_counts: &CompletionTokenCounts,
) {
if metadata.mode == BgMode::Pty {
return;
}
let (original_tokens, compressed_tokens, original_bytes, compressed_bytes) = match (
token_counts.original_tokens,
token_counts.compressed_tokens,
token_counts.original_bytes,
token_counts.compressed_bytes,
) {
(
Some(original_tokens),
Some(compressed_tokens),
Some(original_bytes),
Some(compressed_bytes),
) => (
original_tokens,
compressed_tokens,
original_bytes,
compressed_bytes,
),
_ => {
crate::slog_warn!(
"compression event skipped for {}: token counts unavailable (likely spill file missing or unreadable)",
metadata.task_id
);
return;
}
};
let pool = self.inner.db_pool.read().ok().and_then(|slot| slot.clone());
let Some(pool) = pool else {
crate::slog_warn!(
"compression event skipped for {}: db_pool not initialized — was configure run?",
metadata.task_id
);
return;
};
let harness = self
.inner
.db_harness
.read()
.ok()
.and_then(|slot| slot.clone());
let Some(harness) = harness else {
crate::slog_warn!(
"compression event insert skipped for {}: harness not configured",
metadata.task_id
);
return;
};
let project_root = metadata
.project_root
.as_deref()
.unwrap_or(&metadata.workdir);
let project_key = crate::search_index::project_cache_key(project_root);
let row = crate::db::compression_events::CompressionEventRow {
harness: &harness,
session_id: Some(&metadata.session_id),
project_key: &project_key,
tool: "bash",
task_id: Some(&metadata.task_id),
command: Some(&metadata.command),
compressor: if metadata.compressed {
"registry"
} else {
"none"
},
original_bytes,
compressed_bytes,
original_tokens,
compressed_tokens,
created_at: unix_millis() as i64,
};
let conn = match pool.lock() {
Ok(conn) => conn,
Err(_) => {
crate::slog_warn!(
"compression event insert failed for {}: db mutex poisoned",
metadata.task_id
);
return;
}
};
match crate::db::compression_events::insert_compression_event(&conn, &row) {
Ok(_) => {
crate::slog_debug!(
"compression event recorded for {} (project={}, session={}, {} → {} tokens)",
metadata.task_id,
project_key,
metadata.session_id,
original_tokens,
compressed_tokens
);
}
Err(error) => {
crate::slog_warn!(
"compression event insert failed for {}: {}",
metadata.task_id,
error
);
}
}
}
fn emit_bash_pattern_match(&self, session_id: &str, pattern_match: PatternMatch) {
let Ok(progress_sender) = self
.inner
.progress_sender
.lock()
.map(|sender| sender.clone())
else {
return;
};
if let Some(sender) = progress_sender.as_ref() {
sender(PushFrame::BashPatternMatch(BashPatternMatchFrame::new(
pattern_match.task_id,
session_id.to_string(),
pattern_match.watch_id,
pattern_match.match_text,
pattern_match.match_offset,
pattern_match.context,
pattern_match.once,
)));
}
}
fn emit_bash_watch_exit(&self, completion: &BgCompletion) {
let Ok(progress_sender) = self
.inner
.progress_sender
.lock()
.map(|sender| sender.clone())
else {
return;
};
let Some(sender) = progress_sender.as_ref() else {
return;
};
let status = completion_status_text(&completion.status, completion.exit_code);
let preview = completion.output_preview.trim_end();
let context = if preview.is_empty() {
format!("task {} exited ({status})", completion.task_id)
} else {
format!(
"task {} exited ({status})
{preview}",
completion.task_id
)
};
sender(PushFrame::BashPatternMatch(
BashPatternMatchFrame::task_exit(
completion.task_id.clone(),
completion.session_id.clone(),
format!("exited ({status})"),
context,
),
));
}
fn emit_bash_completed(&self, completion: BgCompletion) {
let Ok(progress_sender) = self
.inner
.progress_sender
.lock()
.map(|sender| sender.clone())
else {
return;
};
let Some(sender) = progress_sender.as_ref() else {
return;
};
sender(PushFrame::BashCompleted(BashCompletedFrame::new(
completion.task_id,
completion.session_id,
completion.status,
completion.exit_code,
completion.command,
completion.output_preview,
completion.output_truncated,
completion.original_tokens,
completion.compressed_tokens,
completion.tokens_skipped,
)));
}
fn completion_token_counts(
&self,
metadata: &PersistedTask,
buffer: Option<&BgBuffer>,
paths: Option<&TaskPaths>,
rendered_output: Option<&str>,
) -> CompletionTokenCounts {
if metadata.mode == BgMode::Pty {
return CompletionTokenCounts::skipped();
}
let raw = match buffer {
Some(buffer) => buffer.read_for_token_count(TOKENIZE_CAP_BYTES_PER_STREAM),
None => paths
.map(|paths| {
read_for_token_count_from_disk(metadata, paths, TOKENIZE_CAP_BYTES_PER_STREAM)
})
.unwrap_or(TokenCountInput::Skipped),
};
let TokenCountInput::Text(raw_output) = raw else {
return CompletionTokenCounts::skipped();
};
let original_tokens = token_count_u32(&raw_output);
let original_bytes = raw_output.len() as i64;
let compressed_output = rendered_output.unwrap_or(&raw_output);
let compressed_tokens = token_count_u32(compressed_output);
let compressed_bytes = compressed_output.len() as i64;
CompletionTokenCounts {
original_tokens: Some(original_tokens),
compressed_tokens: Some(compressed_tokens),
original_bytes: Some(original_bytes),
compressed_bytes: Some(compressed_bytes),
tokens_skipped: false,
}
}
pub(crate) fn maybe_emit_long_running_reminder(&self, task: &Arc<BgTask>) {
if !self
.inner
.long_running_reminder_enabled
.load(Ordering::SeqCst)
{
return;
}
let interval_ms = self
.inner
.long_running_reminder_interval_ms
.load(Ordering::SeqCst);
if interval_ms == 0 {
return;
}
let interval = Duration::from_millis(interval_ms);
let now = Instant::now();
let Ok(mut last_reminder_at) = task.last_reminder_at.lock() else {
return;
};
let since = last_reminder_at.unwrap_or(task.started);
if now.duration_since(since) < interval {
return;
}
let command = task
.state
.lock()
.map(|state| state.metadata.command.clone())
.unwrap_or_default();
*last_reminder_at = Some(now);
self.emit_bash_long_running(BashLongRunningFrame::new(
task.task_id.clone(),
task.session_id.clone(),
command,
task.started.elapsed().as_millis() as u64,
));
}
fn emit_bash_long_running(&self, frame: BashLongRunningFrame) {
let Ok(progress_sender) = self
.inner
.progress_sender
.lock()
.map(|sender| sender.clone())
else {
return;
};
if let Some(sender) = progress_sender.as_ref() {
sender(PushFrame::BashLongRunning(frame));
}
}
fn task(&self, task_id: &str) -> Option<Arc<BgTask>> {
self.inner
.tasks
.lock()
.ok()
.and_then(|tasks| tasks.get(task_id).cloned())
}
fn task_for_session(&self, task_id: &str, session_id: &str) -> Option<Arc<BgTask>> {
self.task(task_id)
.filter(|task| task.session_id == session_id)
}
fn running_count(&self) -> usize {
self.inner
.tasks
.lock()
.map(|tasks| tasks.values().filter(|task| task.is_running()).count())
.unwrap_or(0)
}
fn start_watchdog(&self) {
if !self.inner.watchdog_started.swap(true, Ordering::SeqCst) {
super::watchdog::start(self.clone());
}
}
fn running_metadata_is_stale(&self, metadata: &PersistedTask) -> bool {
unix_millis().saturating_sub(metadata.started_at) > STALE_RUNNING_AFTER.as_millis() as u64
}
#[cfg(test)]
pub fn task_json_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
self.task_for_session(task_id, session_id)
.map(|task| task.paths.json.clone())
}
#[cfg(test)]
pub fn task_exit_path(&self, task_id: &str, session_id: &str) -> Option<PathBuf> {
self.task_for_session(task_id, session_id)
.map(|task| task.paths.exit.clone())
}
fn generate_unique_task_id(&self) -> Result<String, String> {
for _ in 0..32 {
let candidate = random_slug();
let tasks = self
.inner
.tasks
.lock()
.map_err(|_| "background task registry lock poisoned".to_string())?;
if tasks.contains_key(&candidate) {
continue;
}
let completions = self
.inner
.completions
.lock()
.map_err(|_| "background completions lock poisoned".to_string())?;
if completions
.iter()
.any(|completion| completion.task_id == candidate)
{
continue;
}
return Ok(candidate);
}
Err("failed to allocate unique background task id after 32 attempts".to_string())
}
}
fn render_compressed_with_recovery(
buffer: &BgBuffer,
mut compressed: CompressionResult,
input_truncated: bool,
) -> TerminalOutputCache {
let had_trailing_newline = compressed.text.ends_with('\n');
let mut text = strip_plain_truncation_marker_lines(&compressed.text)
.trim_end()
.to_string();
if had_trailing_newline && !text.is_empty() {
text.push('\n');
}
compressed.text = text;
let output_path = buffer.output_path().map(|path| path.display().to_string());
let stderr_path = buffer.stderr_path().map(|path| path.display().to_string());
let include_stderr_path = buffer.stream_len(StreamKind::Stderr) > 0;
let mut recovery = RecoveryContext {
dropped_by_class: compressed.dropped_by_class,
had_inner_drop: compressed.had_inner_drop,
offset_hint_eligible: compressed.offset_hint_eligible,
offset_start_line: compressed.offset_start_line,
byte_truncated: input_truncated,
output_path: output_path.clone(),
stderr_path: stderr_path.clone(),
include_stderr_path,
};
let (output_preview, output_truncated) =
render_body_with_recovery_marker(&compressed.text, &mut recovery);
TerminalOutputCache {
output_preview,
output_truncated,
kind: TerminalOutputKind::Compressed,
output_path,
stderr_path,
recovery: Some(recovery),
}
}
fn render_body_with_recovery_marker(body: &str, recovery: &mut RecoveryContext) -> (String, bool) {
render_body_with_recovery_marker_at_cap(
body,
recovery,
FINAL_OUTPUT_CAP_BYTES,
cap_final_output,
cap_final_output_with_marker,
)
}
fn render_raw_body_with_recovery_marker(
body: &str,
recovery: &mut RecoveryContext,
) -> (String, bool) {
render_body_with_recovery_marker_at_cap(
body,
recovery,
RAW_PASSTHROUGH_CAP_BYTES,
|input| {
super::output::cap_head_tail(
input,
RAW_PASSTHROUGH_CAP_BYTES,
RAW_PASSTHROUGH_HEAD_BYTES,
RAW_PASSTHROUGH_TAIL_BYTES,
)
},
|input, marker| {
super::output::cap_head_tail_with_marker(
input,
RAW_PASSTHROUGH_CAP_BYTES,
RAW_PASSTHROUGH_HEAD_BYTES,
RAW_PASSTHROUGH_TAIL_BYTES,
marker,
)
},
)
}
fn render_body_with_recovery_marker_at_cap<F, G>(
body: &str,
recovery: &mut RecoveryContext,
cap_bytes: usize,
cap_plain: F,
cap_with_marker: G,
) -> (String, bool)
where
F: Fn(&str) -> super::output::CappedText,
G: Fn(&str, &str) -> super::output::CappedText,
{
let needs_marker = recovery.has_visible_drop();
if body.len() > cap_bytes {
recovery.byte_truncated = true;
if let Some(marker) = recovery_marker(recovery) {
let capped = cap_with_marker(body, &marker);
return (capped.text, true);
}
let capped = cap_plain(body);
return (capped.text, capped.truncated || needs_marker);
}
if !needs_marker {
return (body.to_string(), false);
}
let Some(marker) = recovery_marker(recovery) else {
return (body.to_string(), true);
};
let with_marker = append_recovery_marker(body, &marker);
if with_marker.len() <= cap_bytes {
return (with_marker, true);
}
recovery.byte_truncated = true;
let marker = recovery_marker(recovery).unwrap_or(marker);
let capped = cap_with_marker(body, &marker);
(capped.text, true)
}
fn append_recovery_marker(body: &str, marker: &str) -> String {
if body.is_empty() {
return marker.to_string();
}
let mut output = body.trim_end().to_string();
output.push('\n');
output.push_str(marker);
output
}
fn recovery_marker(recovery: &RecoveryContext) -> Option<String> {
let mut parts = Vec::new();
for (class, count) in &recovery.dropped_by_class {
let label = if *count == 1 {
class.singular()
} else {
class.plural()
};
parts.push(format!("+{count} more {label}"));
}
if recovery.byte_truncated {
parts.push("truncated output".to_string());
} else if recovery.had_inner_drop && parts.is_empty() {
parts.push("omitted output".to_string());
}
if parts.is_empty() {
return None;
}
let hint = recovery_hint(recovery);
Some(format!("[{}; {hint}]", parts.join(", ")))
}
fn recovery_hint(recovery: &RecoveryContext) -> String {
if recovery.offset_hint_eligible
&& !recovery.byte_truncated
&& recovery.dropped_by_class.is_empty()
&& !recovery.include_stderr_path
{
if let (Some(path), Some(line)) =
(recovery.output_path.as_deref(), recovery.offset_start_line)
{
return format!("see remaining: tail -n +{line} {}", quote_path(path));
}
}
let mut paths = Vec::new();
if let Some(path) = recovery.output_path.as_deref() {
paths.push(path);
}
if recovery.include_stderr_path {
if let Some(path) = recovery.stderr_path.as_deref() {
if !paths.contains(&path) {
paths.push(path);
}
}
}
if paths.is_empty() {
return "full output unavailable".to_string();
}
let reads = paths
.into_iter()
.map(|path| format!("read {}", quote_path(path)))
.collect::<Vec<_>>()
.join(" and ");
format!("full output: {reads}")
}
fn strip_plain_truncation_marker_lines(input: &str) -> String {
input
.lines()
.filter(|line| !is_plain_truncation_marker(line.trim()))
.collect::<Vec<_>>()
.join("\n")
}
fn strip_recovery_marker_lines(input: &str) -> String {
input
.lines()
.filter(|line| !is_recovery_marker(line.trim()))
.collect::<Vec<_>>()
.join("\n")
}
fn is_plain_truncation_marker(line: &str) -> bool {
let Some(rest) = line.strip_prefix("...<truncated ") else {
return false;
};
let Some(bytes) = rest.strip_suffix(" bytes>...") else {
return false;
};
!bytes.is_empty() && bytes.chars().all(|ch| ch.is_ascii_digit())
}
fn is_recovery_marker(line: &str) -> bool {
line.starts_with('[')
&& line.ends_with(']')
&& (line.contains("full output: read ")
|| line.contains("see remaining: tail -n +")
|| line.contains("full output unavailable"))
}
fn render_structured_output(command: &str, buffer: &BgBuffer) -> Option<TerminalOutputCache> {
if !is_gh_structured_command(command) {
return None;
}
let output_path = buffer
.output_path()
.map(|path| path.display().to_string())?;
let stdout_bytes = buffer.stream_len(StreamKind::Stdout);
if stdout_bytes == 0 {
return None;
}
if stdout_bytes > STRUCTURED_OUTPUT_CAP_BYTES as u64 {
if !stream_starts_like_json(buffer, StreamKind::Stdout) {
return None;
}
return Some(TerminalOutputCache {
output_preview: json_output_pointer(stdout_bytes, &output_path),
output_truncated: true,
kind: TerminalOutputKind::Structured,
output_path: Some(output_path),
stderr_path: buffer.stderr_path().map(|path| path.display().to_string()),
recovery: None,
});
}
let stdout = buffer.read_stream_bounded(StreamKind::Stdout, STRUCTURED_OUTPUT_CAP_BYTES);
if stdout.truncated || !is_structured_body(&stdout.text) {
return None;
}
Some(TerminalOutputCache {
output_preview: stdout.text,
output_truncated: false,
kind: TerminalOutputKind::Structured,
output_path: Some(output_path),
stderr_path: buffer.stderr_path().map(|path| path.display().to_string()),
recovery: None,
})
}
fn render_raw_passthrough(buffer: &BgBuffer) -> TerminalOutputCache {
let raw = buffer.read_combined_head_tail(
RAW_PASSTHROUGH_CAP_BYTES,
RAW_PASSTHROUGH_HEAD_BYTES,
RAW_PASSTHROUGH_TAIL_BYTES,
);
let output_path = buffer.output_path().map(|path| path.display().to_string());
let stderr_path = buffer.stderr_path().map(|path| path.display().to_string());
if !raw.truncated {
return TerminalOutputCache {
output_preview: raw.text,
output_truncated: false,
kind: TerminalOutputKind::Raw,
output_path,
stderr_path,
recovery: None,
};
}
let include_stderr_path = buffer.stream_len(StreamKind::Stderr) > 0;
let mut recovery = RecoveryContext {
dropped_by_class: BTreeMap::new(),
had_inner_drop: false,
offset_hint_eligible: false,
offset_start_line: None,
byte_truncated: true,
output_path: output_path.clone(),
stderr_path: stderr_path.clone(),
include_stderr_path,
};
let (output_preview, output_truncated) =
render_raw_body_with_recovery_marker(&raw.text, &mut recovery);
TerminalOutputCache {
output_preview,
output_truncated,
kind: TerminalOutputKind::Raw,
output_path,
stderr_path,
recovery: Some(recovery),
}
}
fn completion_preview_for_cache(cache: &TerminalOutputCache) -> (String, bool) {
if cache.kind == TerminalOutputKind::Structured
&& cache.output_preview.len() > BG_COMPLETION_PREVIEW_BYTES
{
if let Some(path) = cache.output_path.as_deref() {
return (
json_output_pointer(cache.output_preview.len() as u64, path),
true,
);
}
return (cache.output_preview.clone(), cache.output_truncated);
}
if let Some(recovery) = cache.recovery.as_ref() {
if cache.output_preview.len() <= BG_COMPLETION_PREVIEW_BYTES {
return (cache.output_preview.clone(), cache.output_truncated);
}
let body = strip_recovery_marker_lines(&cache.output_preview);
let mut completion_recovery = recovery.clone();
completion_recovery.byte_truncated = true;
if let Some(marker) = recovery_marker(&completion_recovery) {
let capped = cap_completion_output_with_marker(&body, &marker);
return (capped.text, true);
}
}
let capped = cap_completion_output(&cache.output_preview);
(capped.text, cache.output_truncated || capped.truncated)
}
fn is_gh_structured_command(command: &str) -> bool {
let normalized = crate::compress::normalize_command_for_dispatch(command)
.unwrap_or_else(|| command.trim_start().to_string());
let tokens = shell_words_for_flags(&normalized);
let Some(head) = tokens.first() else {
return false;
};
let head_name = Path::new(head)
.file_name()
.and_then(|name| name.to_str())
.unwrap_or(head);
if !(head_name == "gh" || head_name.eq_ignore_ascii_case("gh.exe")) {
return false;
}
tokens.iter().any(|token| {
matches!(token.as_str(), "--json" | "--jq" | "--template")
|| token.starts_with("--json=")
|| token.starts_with("--jq=")
|| token.starts_with("--template=")
})
}
fn shell_words_for_flags(command: &str) -> Vec<String> {
let mut words = Vec::new();
let mut current = String::new();
let mut in_single = false;
let mut in_double = false;
let mut escaped = false;
for ch in command.chars() {
if escaped {
current.push(ch);
escaped = false;
continue;
}
if ch == '\\' && !in_single {
escaped = true;
continue;
}
if ch == '\'' && !in_double {
in_single = !in_single;
continue;
}
if ch == '"' && !in_single {
in_double = !in_double;
continue;
}
if ch.is_whitespace() && !in_single && !in_double {
if !current.is_empty() {
words.push(std::mem::take(&mut current));
}
continue;
}
if matches!(ch, ';' | '&' | '|') && !in_single && !in_double {
if !current.is_empty() {
words.push(std::mem::take(&mut current));
}
continue;
}
current.push(ch);
}
if !current.is_empty() {
words.push(current);
}
words
}
fn is_structured_body(body: &str) -> bool {
let trimmed = body.trim();
if trimmed.is_empty() {
return false;
}
if serde_json::from_str::<serde_json::Value>(trimmed).is_ok() {
return true;
}
let mut saw_line = false;
for line in trimmed
.lines()
.map(str::trim)
.filter(|line| !line.is_empty())
{
saw_line = true;
if serde_json::from_str::<serde_json::Value>(line).is_err() {
return false;
}
}
saw_line
}
fn stream_starts_like_json(buffer: &BgBuffer, stream: StreamKind) -> bool {
let path = match (buffer, stream) {
(BgBuffer::Pipes { stdout_path, .. }, StreamKind::Stdout) => Some(stdout_path),
(BgBuffer::Pipes { stderr_path, .. }, StreamKind::Stderr) => Some(stderr_path),
(BgBuffer::Pty { combined_path }, _) => Some(combined_path),
};
let Some(path) = path else {
return false;
};
let Ok(file) = std::fs::File::open(path) else {
return false;
};
let mut limited = file.take(512);
let mut bytes = Vec::new();
if limited.read_to_end(&mut bytes).is_err() {
return false;
}
String::from_utf8_lossy(&bytes)
.chars()
.find(|ch| !ch.is_whitespace())
.is_some_and(|ch| matches!(ch, '{' | '[' | '"' | '-' | '0'..='9' | 't' | 'f' | 'n'))
}
struct CompletionTokenCounts {
original_tokens: Option<u32>,
compressed_tokens: Option<u32>,
original_bytes: Option<i64>,
compressed_bytes: Option<i64>,
tokens_skipped: bool,
}
impl CompletionTokenCounts {
fn skipped() -> Self {
Self {
original_tokens: None,
compressed_tokens: None,
original_bytes: None,
compressed_bytes: None,
tokens_skipped: true,
}
}
}
fn completion_status_text(status: &BgTaskStatus, exit_code: Option<i32>) -> String {
match status {
BgTaskStatus::TimedOut => "timed out".to_string(),
BgTaskStatus::Killed => "killed".to_string(),
_ => exit_code
.map(|code| format!("exit {code}"))
.unwrap_or_else(|| format!("{status:?}").to_lowercase()),
}
}
fn token_count_u32(text: &str) -> u32 {
aft_tokenizer::count_tokens(text)
.try_into()
.unwrap_or(u32::MAX)
}
impl Default for BgTaskRegistry {
fn default() -> Self {
Self::new(Arc::new(Mutex::new(None)))
}
}
fn modified_within(path: &Path, grace: Duration) -> bool {
fs::metadata(path)
.and_then(|metadata| metadata.modified())
.ok()
.and_then(|modified| SystemTime::now().duration_since(modified).ok())
.map(|age| age < grace)
.unwrap_or(false)
}
fn canonicalized_path(path: &Path) -> PathBuf {
fs::canonicalize(path).unwrap_or_else(|_| path.to_path_buf())
}
fn started_instant_from_unix_millis(started_at: u64) -> Instant {
let now_ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.ok()
.map(|duration| duration.as_millis() as u64)
.unwrap_or(started_at);
let elapsed_ms = now_ms.saturating_sub(started_at);
Instant::now()
.checked_sub(Duration::from_millis(elapsed_ms))
.unwrap_or_else(Instant::now)
}
fn gc_quarantine(storage_dir: &Path) {
let quarantine_root = storage_dir.join("bash-tasks-quarantine");
let Ok(session_dirs) = fs::read_dir(&quarantine_root) else {
return;
};
for session_entry in session_dirs.flatten() {
let session_quarantine_dir = session_entry.path();
if !session_quarantine_dir.is_dir() {
continue;
}
let entries = match fs::read_dir(&session_quarantine_dir) {
Ok(entries) => entries,
Err(error) => {
crate::slog_warn!(
"failed to read background task quarantine dir {}: {error}",
session_quarantine_dir.display()
);
continue;
}
};
for entry in entries.flatten() {
let path = entry.path();
if modified_within(&path, QUARANTINE_GC_GRACE) {
continue;
}
let result = if path.is_dir() {
fs::remove_dir_all(&path)
} else {
fs::remove_file(&path)
};
match result {
Ok(()) => log::debug!(
"deleted old background task quarantine entry {}",
path.display()
),
Err(error) => crate::slog_warn!(
"failed to delete old background task quarantine entry {}: {error}",
path.display()
),
}
}
let _ = fs::remove_dir(&session_quarantine_dir);
}
let _ = fs::remove_dir(&quarantine_root);
}
enum QuarantineKind {
Corrupt,
Invalid,
}
fn quarantine_task_json(
storage_dir: &Path,
session_dir: &Path,
json_path: &Path,
kind: QuarantineKind,
) -> Result<(), String> {
let session_hash = session_dir
.file_name()
.and_then(|name| name.to_str())
.ok_or_else(|| {
format!(
"invalid background task session dir: {}",
session_dir.display()
)
})?;
let task_name = json_path
.file_name()
.and_then(|name| name.to_str())
.ok_or_else(|| format!("invalid background task json path: {}", json_path.display()))?;
let unix_ts = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|duration| duration.as_secs())
.unwrap_or(0);
let quarantine_dir = storage_dir.join("bash-tasks-quarantine").join(session_hash);
fs::create_dir_all(&quarantine_dir).map_err(|e| {
format!(
"failed to create background task quarantine dir {}: {e}",
quarantine_dir.display()
)
})?;
let target_name = quarantine_name(task_name, unix_ts, &kind);
let target = quarantine_dir.join(target_name);
fs::rename(json_path, &target).map_err(|e| {
format!(
"failed to quarantine background task metadata {} to {}: {e}",
json_path.display(),
target.display()
)
})?;
for sibling in task_sibling_paths(json_path) {
if !sibling.exists() {
continue;
}
let Some(sibling_name) = sibling.file_name().and_then(|name| name.to_str()) else {
crate::slog_warn!(
"skipping background task sibling with invalid name during quarantine: {}",
sibling.display()
);
continue;
};
let sibling_target = quarantine_dir.join(quarantine_name(sibling_name, unix_ts, &kind));
if let Err(error) = fs::rename(&sibling, &sibling_target) {
crate::slog_warn!(
"failed to quarantine background task sibling {} to {}: {error}",
sibling.display(),
sibling_target.display()
);
}
}
let _ = fs::remove_dir(session_dir);
Ok(())
}
fn quarantine_name(file_name: &str, unix_ts: u64, kind: &QuarantineKind) -> String {
match kind {
QuarantineKind::Corrupt => format!("{file_name}.corrupt-{unix_ts}"),
QuarantineKind::Invalid => {
let path = Path::new(file_name);
let stem = path.file_stem().and_then(|stem| stem.to_str());
let extension = path.extension().and_then(|extension| extension.to_str());
match (stem, extension) {
(Some(stem), Some(extension)) => format!("{stem}.invalid.{unix_ts}.{extension}"),
_ => format!("{file_name}.invalid.{unix_ts}"),
}
}
}
}
fn task_sibling_paths(json_path: &Path) -> Vec<PathBuf> {
let Some(parent) = json_path.parent() else {
return Vec::new();
};
let Some(stem) = json_path.file_stem().and_then(|stem| stem.to_str()) else {
return Vec::new();
};
["stdout", "stderr", "exit", "pty", "ps1", "bat", "sh"]
.into_iter()
.map(|extension| parent.join(format!("{stem}.{extension}")))
.collect()
}
fn read_for_token_count_from_disk(
metadata: &PersistedTask,
paths: &TaskPaths,
max_bytes_per_stream: usize,
) -> TokenCountInput {
if metadata.mode == BgMode::Pty {
return TokenCountInput::Skipped;
}
let stdout = read_file_tail_capped(&paths.stdout, max_bytes_per_stream);
let stderr = read_file_tail_capped(&paths.stderr, max_bytes_per_stream);
match (stdout, stderr) {
(Ok(stdout), Ok(stderr)) => TokenCountInput::Text(combine_streams(
String::from_utf8_lossy(&stdout).as_ref(),
String::from_utf8_lossy(&stderr).as_ref(),
)),
(Ok(stdout), Err(_)) => TokenCountInput::Text(combine_streams(
String::from_utf8_lossy(&stdout).as_ref(),
"",
)),
(Err(_), Ok(stderr)) => TokenCountInput::Text(combine_streams(
"",
String::from_utf8_lossy(&stderr).as_ref(),
)),
(Err(_), Err(_)) => TokenCountInput::Skipped,
}
}
fn read_file_tail_capped(path: &Path, max_bytes: usize) -> std::io::Result<Vec<u8>> {
use std::io::{Read, Seek, SeekFrom};
let mut file = std::fs::File::open(path)?;
let len = file.metadata()?.len();
let read_len = len.min(max_bytes as u64);
if read_len > 0 && len > max_bytes as u64 {
file.seek(SeekFrom::End(-(read_len as i64)))?;
}
let mut bytes = Vec::with_capacity(read_len as usize);
file.read_to_end(&mut bytes)?;
Ok(bytes)
}
impl BgTask {
fn snapshot(&self, preview_bytes: usize) -> BgTaskSnapshot {
let state = self
.state
.lock()
.unwrap_or_else(|poison| poison.into_inner());
self.snapshot_locked(&state, preview_bytes)
}
fn snapshot_locked(&self, state: &BgTaskState, preview_bytes: usize) -> BgTaskSnapshot {
let metadata = &state.metadata;
let duration_ms = metadata.duration_ms.or_else(|| {
metadata
.status
.is_terminal()
.then(|| self.started.elapsed().as_millis() as u64)
});
let (output_preview, output_truncated) = if metadata.mode == BgMode::Pty {
(String::new(), false)
} else if metadata.status.is_terminal() {
state
.terminal_output_cache
.as_ref()
.map(|cache| (cache.output_preview.clone(), cache.output_truncated))
.unwrap_or_else(|| (String::new(), false))
} else {
state.buffer.read_tail(preview_bytes)
};
BgTaskSnapshot {
info: BgTaskInfo {
task_id: self.task_id.clone(),
status: metadata.status.clone(),
command: metadata.command.clone(),
mode: metadata.mode.clone(),
started_at: metadata.started_at,
duration_ms,
},
exit_code: metadata.exit_code,
child_pid: metadata.child_pid,
workdir: metadata.workdir.display().to_string(),
output_preview,
output_truncated,
output_path: state
.buffer
.output_path()
.map(|path| path.display().to_string()),
stderr_path: state
.buffer
.stderr_path()
.map(|path| path.display().to_string()),
pty_rows: (metadata.mode == BgMode::Pty).then_some(metadata.pty_rows.unwrap_or(24)),
pty_cols: (metadata.mode == BgMode::Pty).then_some(metadata.pty_cols.unwrap_or(80)),
}
}
pub(crate) fn is_running(&self) -> bool {
self.state
.lock()
.map(|state| {
state.metadata.status == BgTaskStatus::Running
|| (state.metadata.mode == BgMode::Pty
&& state.metadata.status == BgTaskStatus::Killing)
})
.unwrap_or(false)
}
fn is_terminal(&self) -> bool {
self.state
.lock()
.map(|state| state.metadata.status.is_terminal())
.unwrap_or(false)
}
fn mark_terminal_now(&self) {
if let Ok(mut terminal_at) = self.terminal_at.lock() {
if terminal_at.is_none() {
*terminal_at = Some(Instant::now());
}
}
}
fn set_completion_delivered(
&self,
delivered: bool,
registry: &BgTaskRegistry,
) -> Result<(), String> {
let mut state = self
.state
.lock()
.map_err(|_| "background task lock poisoned".to_string())?;
let updated = registry
.update_task_metadata(&self.paths, |metadata| {
metadata.completion_delivered = delivered;
})
.map_err(|e| format!("failed to update completion delivery: {e}"))?;
state.metadata = updated;
Ok(())
}
}
#[cfg(unix)]
fn reap_piped_child(child_slot: &mut Option<Child>) {
if let Some(mut child) = child_slot.take() {
if matches!(child.try_wait(), Ok(None)) {
let _ = child.wait();
}
}
}
#[cfg(windows)]
fn reap_piped_child(child_slot: &mut Option<Child>) {
*child_slot = None;
}
fn terminal_metadata_from_marker(
mut metadata: PersistedTask,
marker: ExitMarker,
reason: Option<String>,
) -> PersistedTask {
match marker {
ExitMarker::Code(code) => {
let status = if code == 0 {
BgTaskStatus::Completed
} else {
BgTaskStatus::Failed
};
metadata.mark_terminal(status, Some(code), reason);
}
ExitMarker::Killed => metadata.mark_terminal(BgTaskStatus::Killed, None, reason),
}
metadata
}
#[cfg(unix)]
fn detached_shell_command(command: &str, exit_path: &Path) -> Command {
let shell = resolve_posix_shell();
let mut cmd = Command::new(&shell);
cmd.arg("-c")
.arg("\"$0\" -c \"$1\"; code=$?; printf \"%s\" \"$code\" > \"$2.tmp.$$\"; mv -f \"$2.tmp.$$\" \"$2\"")
.arg(&shell)
.arg(command)
.arg(exit_path);
unsafe {
cmd.pre_exec(|| {
if libc::setsid() == -1 {
return Err(std::io::Error::last_os_error());
}
Ok(())
});
}
cmd
}
#[cfg(unix)]
fn resolve_posix_shell() -> PathBuf {
static POSIX_SHELL: OnceLock<PathBuf> = OnceLock::new();
POSIX_SHELL
.get_or_init(|| {
std::env::var_os("BASH")
.filter(|value| !value.is_empty())
.map(PathBuf::from)
.filter(|path| path.exists())
.or_else(|| which::which("bash").ok())
.or_else(|| which::which("zsh").ok())
.unwrap_or_else(|| PathBuf::from("/bin/sh"))
})
.clone()
}
#[cfg(windows)]
fn detached_shell_command_for(
shell: crate::windows_shell::WindowsShell,
command: &str,
exit_path: &Path,
paths: &TaskPaths,
creation_flags: u32,
) -> Result<Command, String> {
use crate::windows_shell::WindowsShell;
let wrapper_body = shell.wrapper_script_bytes(command, exit_path);
let wrapper_ext = match shell {
WindowsShell::Pwsh | WindowsShell::Powershell => "ps1",
WindowsShell::Cmd => "bat",
WindowsShell::Posix(_) => "sh",
};
let wrapper_path = paths.dir.join(format!(
"{}.{}",
paths
.json
.file_stem()
.and_then(|s| s.to_str())
.unwrap_or("wrapper"),
wrapper_ext
));
fs::write(&wrapper_path, wrapper_body)
.map_err(|e| format!("failed to write background bash wrapper script: {e}"))?;
let mut cmd = Command::new(shell.binary().as_ref());
match shell {
WindowsShell::Pwsh | WindowsShell::Powershell => {
cmd.args([
"-NoLogo",
"-NoProfile",
"-NonInteractive",
"-ExecutionPolicy",
"Bypass",
"-File",
]);
cmd.arg(&wrapper_path);
}
WindowsShell::Cmd => {
cmd.args(["/D", "/C"]);
cmd.arg(&wrapper_path);
}
WindowsShell::Posix(_) => {
cmd.arg(&wrapper_path);
}
}
cmd.creation_flags(creation_flags);
Ok(cmd)
}
fn spawn_detached_child(
command: &str,
paths: &TaskPaths,
workdir: &Path,
env: &HashMap<String, String>,
) -> Result<std::process::Child, String> {
#[cfg(not(windows))]
{
let stdout = create_capture_file(&paths.stdout)
.map_err(|e| format!("failed to open stdout capture file: {e}"))?;
let stderr = create_capture_file(&paths.stderr)
.map_err(|e| format!("failed to open stderr capture file: {e}"))?;
detached_shell_command(command, &paths.exit)
.current_dir(workdir)
.envs(env)
.stdin(Stdio::null())
.stdout(Stdio::from(stdout))
.stderr(Stdio::from(stderr))
.spawn()
.map_err(|e| format!("failed to spawn background bash command: {e}"))
}
#[cfg(windows)]
{
use crate::windows_shell::shell_candidates;
let candidates: Vec<crate::windows_shell::WindowsShell> = shell_candidates();
const FLAG_CREATE_NEW_PROCESS_GROUP: u32 = 0x0000_0200;
const FLAG_CREATE_BREAKAWAY_FROM_JOB: u32 = 0x0100_0000;
const FLAG_CREATE_NO_WINDOW: u32 = 0x0800_0000;
let with_breakaway =
FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP | FLAG_CREATE_BREAKAWAY_FROM_JOB;
let without_breakaway = FLAG_CREATE_NO_WINDOW | FLAG_CREATE_NEW_PROCESS_GROUP;
let mut last_error: Option<String> = None;
for (idx, shell) in candidates.iter().enumerate() {
for &flags in &[with_breakaway, without_breakaway] {
let stdout = create_capture_file(&paths.stdout)
.map_err(|e| format!("failed to open stdout capture file: {e}"))?;
let stderr = create_capture_file(&paths.stderr)
.map_err(|e| format!("failed to open stderr capture file: {e}"))?;
let mut cmd =
detached_shell_command_for(shell.clone(), command, &paths.exit, paths, flags)?;
cmd.current_dir(workdir)
.envs(env)
.stdin(Stdio::null())
.stdout(Stdio::from(stdout))
.stderr(Stdio::from(stderr));
match cmd.spawn() {
Ok(child) => {
if idx > 0 {
crate::slog_warn!("background bash spawn fell back to {} after {} earlier candidate(s) failed; \
the cached PATH probe disagreed with runtime spawn — likely PATH \
inheritance, antivirus / AppLocker / Defender ASR, or sandbox policy.",
shell.binary(),
idx);
}
if flags == without_breakaway {
crate::slog_warn!(
"background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected \
(likely a restrictive Job Object — CI sandbox or MDM policy). \
Spawned without breakaway; the bg task will be torn down if the \
AFT process group is killed."
);
}
return Ok(child);
}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
crate::slog_warn!("background bash spawn: {} returned NotFound at runtime — trying next candidate",
shell.binary());
last_error = Some(format!("{}: {e}", shell.binary()));
break;
}
Err(e) if flags == with_breakaway && e.raw_os_error() == Some(5) => {
crate::slog_warn!(
"background bash spawn: CREATE_BREAKAWAY_FROM_JOB rejected with \
Access Denied — retrying {} without breakaway",
shell.binary()
);
last_error = Some(format!("{}: {e}", shell.binary()));
continue;
}
Err(e) => {
return Err(format!(
"failed to spawn background bash command via {}: {e}",
shell.binary()
));
}
}
}
}
Err(format!(
"failed to spawn background bash command: no Windows shell could be spawned. \
Last error: {}. PATH-probed candidates: {:?}",
last_error.unwrap_or_else(|| "no candidates were attempted".to_string()),
candidates.iter().map(|s| s.binary()).collect::<Vec<_>>()
))
}
}
fn random_slug() -> String {
let mut bytes = [0u8; 4];
getrandom::fill(&mut bytes).unwrap_or_else(|_| {
let t = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.subsec_nanos())
.unwrap_or(0);
let p = std::process::id();
bytes.copy_from_slice(&(t ^ p).to_le_bytes());
});
let hex: String = bytes.iter().map(|b| format!("{b:02x}")).collect();
format!("bash-{hex}")
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::fs;
use std::sync::atomic::{AtomicBool, AtomicUsize};
use std::sync::{Arc, Mutex};
use std::time::Duration;
#[cfg(windows)]
use std::time::Instant;
use super::*;
#[cfg(unix)]
const QUICK_SUCCESS_COMMAND: &str = "true";
#[cfg(windows)]
const QUICK_SUCCESS_COMMAND: &str = "cmd /c exit 0";
#[cfg(unix)]
const LONG_RUNNING_COMMAND: &str = "sleep 5";
#[cfg(windows)]
const LONG_RUNNING_COMMAND: &str = "cmd /c timeout /t 5 /nobreak > nul";
fn insert_terminal_piped_task(
registry: &BgTaskRegistry,
dir: &tempfile::TempDir,
command: &str,
stdout: &str,
stderr: &str,
compressed: bool,
) -> (String, Arc<BgTask>) {
let task_id = format!("bash-test-{}", random_slug());
let paths = task_paths(dir.path(), "session", &task_id);
fs::create_dir_all(&paths.dir).unwrap();
fs::write(&paths.stdout, stdout).unwrap();
fs::write(&paths.stderr, stderr).unwrap();
let mut metadata = PersistedTask::starting(
task_id.clone(),
"session".to_string(),
command.to_string(),
dir.path().to_path_buf(),
Some(dir.path().to_path_buf()),
Some(30_000),
true,
compressed,
);
metadata.mark_terminal(BgTaskStatus::Completed, Some(0), None);
write_task(&paths.json, &metadata).unwrap();
registry
.insert_rehydrated_task(metadata, paths, true)
.expect("insert terminal task");
let task = registry.task_for_session(&task_id, "session").unwrap();
(task_id, task)
}
fn insert_terminal_pty_task(
registry: &BgTaskRegistry,
dir: &tempfile::TempDir,
pty_output: &str,
) -> (String, Arc<BgTask>) {
let task_id = format!("bash-test-{}", random_slug());
let paths = task_paths(dir.path(), "session", &task_id);
fs::create_dir_all(&paths.dir).unwrap();
fs::write(&paths.pty, pty_output).unwrap();
let mut metadata = PersistedTask::starting(
task_id.clone(),
"session".to_string(),
"python".to_string(),
dir.path().to_path_buf(),
Some(dir.path().to_path_buf()),
Some(30_000),
true,
true,
);
metadata.mode = BgMode::Pty;
metadata.mark_terminal(BgTaskStatus::Completed, Some(0), None);
write_task(&paths.json, &metadata).unwrap();
registry
.insert_rehydrated_task(metadata, paths, true)
.expect("insert terminal pty task");
let task = registry.task_for_session(&task_id, "session").unwrap();
(task_id, task)
}
#[test]
fn recognizes_all_recovery_marker_forms() {
assert!(is_recovery_marker(
"[truncated output; full output: read \"/tmp/out\"]"
));
assert!(is_recovery_marker(
"[omitted output; see remaining: tail -n +42 \"/tmp/out\"]"
));
assert!(is_recovery_marker(
"[truncated output; full output unavailable]"
));
}
#[test]
fn terminal_status_polls_use_cached_render_once_and_off_lock() {
let registry = BgTaskRegistry::default();
let dir = tempfile::tempdir().unwrap();
let (_task_id, task) = insert_terminal_piped_task(
®istry,
&dir,
"custom-tool --verbose",
&"stdout line\n".repeat(200_000),
"",
true,
);
let calls = Arc::new(AtomicUsize::new(0));
let saw_unlocked_state = Arc::new(AtomicBool::new(false));
let task_holder = Arc::new(Mutex::new(Some(Arc::clone(&task))));
let calls_for_closure = Arc::clone(&calls);
let unlocked_for_closure = Arc::clone(&saw_unlocked_state);
let task_for_closure = Arc::clone(&task_holder);
registry.set_compressor(move |_command, output| {
calls_for_closure.fetch_add(1, Ordering::SeqCst);
if let Some(task) = task_for_closure.lock().unwrap().as_ref() {
if task.state.try_lock().is_ok() {
unlocked_for_closure.store(true, Ordering::SeqCst);
}
}
CompressionResult::new(format!("compressed {} bytes", output.len()))
});
let first = registry
.status(
&task.task_id,
"session",
None,
Some(dir.path()),
RUNNING_OUTPUT_PREVIEW_BYTES,
)
.unwrap();
let second = registry
.status(
&task.task_id,
"session",
None,
Some(dir.path()),
RUNNING_OUTPUT_PREVIEW_BYTES,
)
.unwrap();
let listed = registry.list(RUNNING_OUTPUT_PREVIEW_BYTES);
assert_eq!(
calls.load(Ordering::SeqCst),
1,
"terminal render must be cached"
);
assert!(
saw_unlocked_state.load(Ordering::SeqCst),
"compressor must run after releasing the task state lock"
);
assert!(first.output_preview.starts_with("compressed "));
assert_eq!(second.output_preview, first.output_preview);
assert_eq!(listed[0].output_preview, first.output_preview);
}
#[test]
fn completion_preview_uses_head_and_tail_not_blind_tail() {
let registry = BgTaskRegistry::default();
let dir = tempfile::tempdir().unwrap();
let output = format!("HEAD-SIGNAL\n{}TAIL-SIGNAL\n", "middle\n".repeat(2_000));
let (_task_id, task) =
insert_terminal_piped_task(®istry, &dir, "cat big.log", &output, "", false);
registry.post_terminal_transition(&task, true).unwrap();
let completions = registry.drain_completions_for_session(Some("session"));
assert_eq!(completions.len(), 1);
let preview = &completions[0].output_preview;
assert!(preview.contains("HEAD-SIGNAL"), "preview was {preview:?}");
assert!(preview.contains("TAIL-SIGNAL"), "preview was {preview:?}");
assert!(
preview.contains("...<truncated "),
"preview was {preview:?}"
);
}
#[test]
fn structured_gh_json_survives_intact_and_ignores_stderr() {
let registry = BgTaskRegistry::default();
let dir = tempfile::tempdir().unwrap();
let calls = Arc::new(AtomicUsize::new(0));
let calls_for_closure = Arc::clone(&calls);
registry.set_compressor(move |_command, output| {
calls_for_closure.fetch_add(1, Ordering::SeqCst);
CompressionResult::new(output)
});
let (task_id, _task) = insert_terminal_piped_task(
®istry,
&dir,
"gh pr view 123 --json body",
"{\"body\":\"hello\"}",
"warning: stderr must not join json",
true,
);
let snapshot = registry
.status(
&task_id,
"session",
None,
Some(dir.path()),
RUNNING_OUTPUT_PREVIEW_BYTES,
)
.unwrap();
assert_eq!(snapshot.output_preview, "{\"body\":\"hello\"}");
assert!(!snapshot.output_preview.contains("warning"));
assert!(!snapshot.output_truncated);
assert_eq!(
calls.load(Ordering::SeqCst),
0,
"structured JSON bypasses compression"
);
}
#[test]
fn registry_emits_single_recovery_marker_for_class_drops() {
let registry = BgTaskRegistry::default();
let dir = tempfile::tempdir().unwrap();
registry.set_compressor(move |_command, _output| {
let mut dropped = BTreeMap::new();
dropped.insert(DropClass::Error, 18);
dropped.insert(DropClass::Warning, 6);
CompressionResult::with_class_drops("kept diagnostic", dropped)
});
let (task_id, task) =
insert_terminal_piped_task(®istry, &dir, "custom-tool", "raw", "", true);
let snapshot = registry
.status(
&task_id,
"session",
None,
Some(dir.path()),
RUNNING_OUTPUT_PREVIEW_BYTES,
)
.unwrap();
assert_eq!(snapshot.output_preview.matches("full output:").count(), 1);
assert!(snapshot.output_preview.contains("+18 more errors"));
assert!(snapshot.output_preview.contains("+6 more warnings"));
assert!(snapshot
.output_preview
.contains(&format!("read \"{}\"", task.paths.stdout.display())));
assert!(!snapshot.output_preview.contains("tail -n +"));
assert!(snapshot.output_truncated);
}
#[test]
fn registry_marker_reports_semantic_and_byte_drops_once() {
let registry = BgTaskRegistry::default();
let dir = tempfile::tempdir().unwrap();
registry.set_compressor(move |_command, _output| {
let mut dropped = BTreeMap::new();
dropped.insert(DropClass::Error, 1);
CompressionResult::with_class_drops(
format!("HEAD-SIGNAL\n{}TAIL-SIGNAL", "middle\n".repeat(8_000)),
dropped,
)
});
let (task_id, _task) =
insert_terminal_piped_task(®istry, &dir, "custom-tool", "raw", "", true);
let snapshot = registry
.status(
&task_id,
"session",
None,
Some(dir.path()),
RUNNING_OUTPUT_PREVIEW_BYTES,
)
.unwrap();
assert_eq!(snapshot.output_preview.matches("full output:").count(), 1);
assert!(snapshot.output_preview.contains("+1 more error"));
assert!(snapshot.output_preview.contains("truncated output"));
assert!(snapshot.output_preview.contains("HEAD-SIGNAL"));
assert!(snapshot.output_preview.contains("TAIL-SIGNAL"));
assert!(!snapshot.output_preview.contains("...<truncated"));
assert!(snapshot.output_truncated);
}
#[test]
fn cargo_stderr_class_drops_name_both_capture_paths() {
let registry = BgTaskRegistry::default();
let dir = tempfile::tempdir().unwrap();
let filter_registry = crate::compress::toml_filter::FilterRegistry::default();
registry.set_compressor(move |command, output| {
crate::compress::compress_with_registry(command, &output, &filter_registry)
});
let stderr = (0..22)
.map(|index| {
format!(
"error: cargo failure {index}\n --> src/lib.rs:{}:1\n |\n{} | boom\n",
index + 1,
index + 1
)
})
.collect::<Vec<_>>()
.join("\n");
let (task_id, task) = insert_terminal_piped_task(
®istry,
&dir,
"cargo check",
"Finished dev [unoptimized] target(s) in 0.01s\n",
&stderr,
true,
);
let snapshot = registry
.status(
&task_id,
"session",
None,
Some(dir.path()),
RUNNING_OUTPUT_PREVIEW_BYTES,
)
.unwrap();
assert!(snapshot.output_preview.contains("+2 more errors"));
assert!(snapshot
.output_preview
.contains(&format!("read \"{}\"", task.paths.stdout.display())));
assert!(snapshot
.output_preview
.contains(&format!("read \"{}\"", task.paths.stderr.display())));
assert!(!snapshot.output_preview.contains("tail -n +"));
}
#[test]
fn over_ceiling_structured_json_uses_pointer_not_partial_json() {
let registry = BgTaskRegistry::default();
let dir = tempfile::tempdir().unwrap();
let body = format!("{{\"body\":\"{}\"}}", "x".repeat(60 * 1024));
let (task_id, task) = insert_terminal_piped_task(
®istry,
&dir,
"cd /repo && gh pr view 123 --json body",
&body,
"",
true,
);
let snapshot = registry
.status(
&task_id,
"session",
None,
Some(dir.path()),
RUNNING_OUTPUT_PREVIEW_BYTES,
)
.unwrap();
assert!(snapshot.output_preview.starts_with("[JSON output "));
assert!(snapshot
.output_preview
.contains(&task.paths.stdout.display().to_string()));
assert!(!snapshot.output_preview.contains(&"x".repeat(1024)));
assert!(snapshot.output_truncated);
}
#[test]
fn toml_strip_tail_cap_uses_full_output_hint_not_offset_hint() {
let registry = BgTaskRegistry::default();
let dir = tempfile::tempdir().unwrap();
let filter_registry = crate::compress::toml_filter::build_registry(
crate::compress::builtin_filters::ALL,
None,
None,
);
registry.set_compressor(move |command, output| {
crate::compress::compress_with_registry(command, &output, &filter_registry)
});
let stdout = format!(
"make[1]: Entering directory `/tmp`\n{}",
(0..100)
.map(|index| format!("compile line {index}"))
.collect::<Vec<_>>()
.join("\n")
);
let (task_id, task) =
insert_terminal_piped_task(®istry, &dir, "make all", &stdout, "", true);
let snapshot = registry
.status(
&task_id,
"session",
None,
Some(dir.path()),
RUNNING_OUTPUT_PREVIEW_BYTES,
)
.unwrap();
assert!(snapshot.output_preview.contains("compile line 99"));
assert!(snapshot.output_preview.contains(&format!(
"full output: read \"{}\"",
task.paths.stdout.display()
)));
assert!(!snapshot
.output_preview
.contains(&format!("read \"{}\"", task.paths.stderr.display())));
assert!(!snapshot.output_preview.contains("tail -n +"));
}
#[test]
fn compressed_false_raw_passthrough_uses_wider_head_tail_cap() {
let registry = BgTaskRegistry::default();
let dir = tempfile::tempdir().unwrap();
let output = format!("RAW-HEAD\n{}RAW-TAIL\n", "raw-middle\n".repeat(8_000));
let (task_id, task) =
insert_terminal_piped_task(®istry, &dir, "cat raw.log", &output, "RAW-ERR\n", false);
let snapshot = registry
.status(
&task_id,
"session",
None,
Some(dir.path()),
RUNNING_OUTPUT_PREVIEW_BYTES,
)
.unwrap();
assert!(snapshot.output_preview.contains("RAW-HEAD"));
assert!(snapshot.output_preview.contains("RAW-TAIL"));
assert!(snapshot.output_preview.contains("truncated output"));
assert!(snapshot
.output_preview
.contains(&format!("read \"{}\"", task.paths.stdout.display())));
assert!(snapshot
.output_preview
.contains(&format!("read \"{}\"", task.paths.stderr.display())));
assert!(!snapshot.output_preview.contains("tail -n +"));
assert!(snapshot.output_preview.len() > 16 * 1024);
assert!(snapshot.output_truncated);
}
#[test]
fn pty_terminal_snapshot_bypasses_line_compression() {
let registry = BgTaskRegistry::default();
let dir = tempfile::tempdir().unwrap();
let calls = Arc::new(AtomicUsize::new(0));
let calls_for_closure = Arc::clone(&calls);
registry.set_compressor(move |_command, output| {
calls_for_closure.fetch_add(1, Ordering::SeqCst);
CompressionResult::new(output)
});
let (task_id, _task) = insert_terminal_pty_task(®istry, &dir, "raw\u{1b}[31m pty bytes");
let snapshot = registry
.status(
&task_id,
"session",
None,
Some(dir.path()),
RUNNING_OUTPUT_PREVIEW_BYTES,
)
.unwrap();
assert_eq!(snapshot.info.mode, BgMode::Pty);
assert_eq!(snapshot.output_preview, "");
assert_eq!(calls.load(Ordering::SeqCst), 0);
}
#[test]
fn pty_dimensions_are_persisted_and_returned_in_snapshot() {
let registry = BgTaskRegistry::default();
let dir = tempfile::tempdir().unwrap();
let task_id = registry
.spawn_pty(
QUICK_SUCCESS_COMMAND,
"session".to_string(),
dir.path().to_path_buf(),
HashMap::new(),
Some(Duration::from_secs(30)),
dir.path().to_path_buf(),
10,
true,
false,
Some(dir.path().to_path_buf()),
50,
120,
)
.unwrap();
let paths = task_paths(dir.path(), "session", &task_id);
let metadata = read_task(&paths.json).unwrap();
assert_eq!(
metadata.schema_version,
crate::bash_background::persistence::SCHEMA_VERSION
);
assert_eq!(metadata.mode, BgMode::Pty);
assert_eq!(metadata.pty_rows, Some(50));
assert_eq!(metadata.pty_cols, Some(120));
let snapshot = registry
.status(&task_id, "session", None, Some(dir.path()), 1024)
.unwrap();
assert_eq!(snapshot.pty_rows, Some(50));
assert_eq!(snapshot.pty_cols, Some(120));
}
fn spawn_dead_child() -> std::process::Child {
#[cfg(unix)]
let mut cmd = std::process::Command::new("true");
#[cfg(windows)]
let mut cmd = {
let mut c = std::process::Command::new("cmd");
c.args(["/c", "exit", "0"]);
c
};
cmd.stdin(std::process::Stdio::null());
cmd.stdout(std::process::Stdio::null());
cmd.stderr(std::process::Stdio::null());
let mut child = cmd.spawn().expect("spawn replacement child for reap test");
let started = Instant::now();
loop {
match child.try_wait() {
Ok(Some(_)) => break,
Ok(None) => {
if started.elapsed() > Duration::from_secs(5) {
panic!("dead-child stand-in did not exit within 5s");
}
std::thread::sleep(Duration::from_millis(10));
}
Err(error) => panic!("dead-child try_wait failed: {error}"),
}
}
child
}
#[test]
fn ack_marks_delivered_even_when_completion_was_already_consumed_locally() {
let registry = BgTaskRegistry::default();
let dir = tempfile::tempdir().unwrap();
let task_id = registry
.spawn(
LONG_RUNNING_COMMAND,
"session".to_string(),
dir.path().to_path_buf(),
HashMap::new(),
Some(Duration::from_secs(30)),
dir.path().to_path_buf(),
10,
true,
false,
Some(dir.path().to_path_buf()),
)
.unwrap();
registry
.kill_with_status(&task_id, "session", BgTaskStatus::Killed)
.unwrap();
assert_eq!(
registry
.drain_completions_for_session(Some("session"))
.len(),
1
);
registry.inner.completions.lock().unwrap().clear();
assert_eq!(
registry.ack_completions_for_session(Some("session"), std::slice::from_ref(&task_id)),
vec![task_id.clone()]
);
assert!(registry
.drain_completions_for_session(Some("session"))
.is_empty());
let paths = task_paths(dir.path(), "session", &task_id);
let metadata = read_task(&paths.json).unwrap();
assert!(metadata.completion_delivered);
let replayed = BgTaskRegistry::default();
replayed
.replay_session_inner(dir.path(), "session", None)
.unwrap();
assert!(replayed
.drain_completions_for_session(Some("session"))
.is_empty());
}
#[test]
fn register_watch_rejects_unknown_task() {
let registry = BgTaskRegistry::default();
let result = registry.register_watch(
"missing-task".to_string(),
WatchPattern::Substring("READY".into()),
true,
);
assert_eq!(result, Err("task_not_found"));
}
#[test]
fn register_watch_on_terminal_task_scans_existing_output() {
let frames = Arc::new(Mutex::new(Vec::new()));
let captured = Arc::clone(&frames);
let sender: crate::context::ProgressSender = Arc::new(Box::new(move |frame| {
captured.lock().unwrap().push(frame);
})
as Box<dyn Fn(PushFrame) + Send + Sync>);
let registry = BgTaskRegistry::new(Arc::new(Mutex::new(Some(sender))));
let dir = tempfile::tempdir().unwrap();
let task_id = registry
.spawn(
LONG_RUNNING_COMMAND,
"session".to_string(),
dir.path().to_path_buf(),
HashMap::new(),
Some(Duration::from_secs(30)),
dir.path().to_path_buf(),
10,
true,
false,
Some(dir.path().to_path_buf()),
)
.unwrap();
registry
.inner
.shutdown
.store(true, std::sync::atomic::Ordering::SeqCst);
let task = registry.task_for_session(&task_id, "session").unwrap();
std::fs::write(&task.paths.stdout, "READY\n").unwrap();
registry
.kill_with_status(&task_id, "session", BgTaskStatus::Killed)
.unwrap();
frames.lock().unwrap().clear();
registry.inner.completions.lock().unwrap().clear();
registry
.register_watch(
task_id.clone(),
WatchPattern::Substring("READY".into()),
true,
)
.unwrap();
let frames = frames.lock().unwrap();
let frame = frames
.iter()
.find_map(|frame| match frame {
PushFrame::BashPatternMatch(frame) => Some(frame),
_ => None,
})
.expect("terminal watch registration should emit pattern frame");
assert_eq!(frame.reason, "pattern_match");
assert_eq!(frame.task_id, task_id);
assert_eq!(frame.session_id, "session");
assert_eq!(frame.match_text, "READY");
assert_eq!(frame.match_offset, 0);
assert_eq!(registry.active_watch_count(&frame.task_id), 0);
let metadata = read_task(&task.paths.json).unwrap();
assert!(metadata.completion_delivered);
}
#[test]
fn cleanup_finished_removes_terminal_tasks_older_than_threshold() {
let registry = BgTaskRegistry::default();
let dir = tempfile::tempdir().unwrap();
let task_id = registry
.spawn(
QUICK_SUCCESS_COMMAND,
"session".to_string(),
dir.path().to_path_buf(),
HashMap::new(),
Some(Duration::from_secs(30)),
dir.path().to_path_buf(),
10,
true,
false,
Some(dir.path().to_path_buf()),
)
.unwrap();
registry
.kill_with_status(&task_id, "session", BgTaskStatus::Killed)
.unwrap();
let completions = registry.drain_completions_for_session(Some("session"));
assert_eq!(completions.len(), 1);
assert_eq!(
registry.ack_completions_for_session(Some("session"), std::slice::from_ref(&task_id)),
vec![task_id.clone()]
);
registry.cleanup_finished(Duration::ZERO);
assert!(registry.inner.tasks.lock().unwrap().is_empty());
}
#[test]
fn cleanup_finished_retains_undelivered_terminals() {
let registry = BgTaskRegistry::default();
let dir = tempfile::tempdir().unwrap();
let task_id = registry
.spawn(
QUICK_SUCCESS_COMMAND,
"session".to_string(),
dir.path().to_path_buf(),
HashMap::new(),
Some(Duration::from_secs(30)),
dir.path().to_path_buf(),
10,
true,
false,
Some(dir.path().to_path_buf()),
)
.unwrap();
registry
.kill_with_status(&task_id, "session", BgTaskStatus::Killed)
.unwrap();
registry.cleanup_finished(Duration::ZERO);
assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
}
#[test]
fn reap_child_marks_failed_when_child_exits_without_exit_marker() {
let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
let dir = tempfile::tempdir().unwrap();
let task_id = registry
.spawn(
QUICK_SUCCESS_COMMAND,
"session".to_string(),
dir.path().to_path_buf(),
HashMap::new(),
Some(Duration::from_secs(30)),
dir.path().to_path_buf(),
10,
true,
false,
Some(dir.path().to_path_buf()),
)
.unwrap();
let task = registry.task_for_session(&task_id, "session").unwrap();
let started = Instant::now();
loop {
let exited = {
let mut state = task.state.lock().unwrap();
match &mut state.runtime {
TaskRuntime::Piped(Some(child)) => matches!(child.try_wait(), Ok(Some(_))),
_ => true,
}
};
if exited {
break;
}
assert!(
started.elapsed() < Duration::from_secs(5),
"child should exit quickly"
);
std::thread::sleep(Duration::from_millis(20));
}
registry
.inner
.shutdown
.store(true, std::sync::atomic::Ordering::SeqCst);
std::thread::sleep(Duration::from_millis(550));
let _ = std::fs::remove_file(&task.paths.exit);
{
let mut state = task.state.lock().unwrap();
state.metadata.status = BgTaskStatus::Running;
state.metadata.status_reason = None;
state.metadata.exit_code = None;
state.metadata.finished_at = None;
state.metadata.duration_ms = None;
crate::bash_background::persistence::write_task(&task.paths.json, &state.metadata)
.expect("persist reset Running metadata for reap_child test");
if matches!(state.runtime, TaskRuntime::Piped(None)) {
state.runtime = TaskRuntime::Piped(Some(spawn_dead_child()));
}
}
*task.terminal_at.lock().unwrap() = None;
assert!(
task.is_running(),
"precondition: metadata.status == Running"
);
assert!(
!task.paths.exit.exists(),
"precondition: exit marker absent"
);
registry.reap_child(&task);
{
let state = task.state.lock().unwrap();
assert_eq!(
state.metadata.status,
BgTaskStatus::Running,
"first reap must leave status Running while waiting one pass for marker"
);
assert_eq!(
state.metadata.status_reason, None,
"first reap must not record a failure reason"
);
assert!(
matches!(state.runtime, TaskRuntime::Piped(None)),
"child handle must be released after first reap"
);
assert!(
state.detached,
"task must be marked detached after first reap"
);
}
registry.reap_child(&task);
let state = task.state.lock().unwrap();
assert!(
state.metadata.status.is_terminal(),
"second reap must transition to terminal when PID dead and no marker. Got status={:?}",
state.metadata.status
);
assert_eq!(
state.metadata.status,
BgTaskStatus::Failed,
"must specifically be Failed (not Killed): status={:?}",
state.metadata.status
);
assert_eq!(
state.metadata.status_reason.as_deref(),
Some("process exited without exit marker"),
"reason must match replay path's wording: {:?}",
state.metadata.status_reason
);
assert!(
matches!(state.runtime, TaskRuntime::Piped(None)),
"child handle must stay released after second reap"
);
assert!(
state.detached,
"task must remain detached after second reap"
);
}
#[test]
fn reap_child_preserves_running_when_exit_marker_exists() {
let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
let dir = tempfile::tempdir().unwrap();
let task_id = registry
.spawn(
QUICK_SUCCESS_COMMAND,
"session".to_string(),
dir.path().to_path_buf(),
HashMap::new(),
Some(Duration::from_secs(30)),
dir.path().to_path_buf(),
10,
true,
false,
Some(dir.path().to_path_buf()),
)
.unwrap();
let task = registry.task_for_session(&task_id, "session").unwrap();
let started = Instant::now();
loop {
let exited = {
let mut state = task.state.lock().unwrap();
match &mut state.runtime {
TaskRuntime::Piped(Some(child)) => matches!(child.try_wait(), Ok(Some(_))),
_ => true,
}
};
if exited && task.paths.exit.exists() {
break;
}
assert!(
started.elapsed() < Duration::from_secs(5),
"child should exit and write marker quickly"
);
std::thread::sleep(Duration::from_millis(20));
}
registry
.inner
.shutdown
.store(true, std::sync::atomic::Ordering::SeqCst);
std::thread::sleep(Duration::from_millis(550));
{
let mut state = task.state.lock().unwrap();
state.metadata.status = BgTaskStatus::Running;
state.metadata.status_reason = None;
if matches!(state.runtime, TaskRuntime::Piped(None)) {
state.runtime = TaskRuntime::Piped(Some(spawn_dead_child()));
}
}
*task.terminal_at.lock().unwrap() = None;
if !task.paths.exit.exists() {
std::fs::write(&task.paths.exit, "0").expect("write replacement exit marker");
}
registry.reap_child(&task);
let state = task.state.lock().unwrap();
assert!(
matches!(state.runtime, TaskRuntime::Piped(None)),
"child handle still released even when marker exists"
);
assert!(
state.detached,
"task still marked detached even when marker exists"
);
assert_eq!(
state.metadata.status,
BgTaskStatus::Running,
"reap_child must defer to poll_task when marker exists"
);
}
#[cfg(unix)]
fn pid_stat(pid: u32) -> Option<String> {
let output = std::process::Command::new("ps")
.args(["-o", "stat=", "-p", &pid.to_string()])
.output()
.ok()?;
if !output.status.success() {
return None;
}
let stat = String::from_utf8_lossy(&output.stdout).trim().to_string();
if stat.is_empty() {
None
} else {
Some(stat)
}
}
#[cfg(unix)]
fn is_zombie(pid: u32) -> bool {
pid_stat(pid).is_some_and(|stat| stat.starts_with('Z'))
}
#[cfg(unix)]
fn spawn_unreaped_zombie() -> std::process::Child {
let child = std::process::Command::new("true")
.stdin(std::process::Stdio::null())
.stdout(std::process::Stdio::null())
.stderr(std::process::Stdio::null())
.spawn()
.expect("spawn zombie stand-in");
let pid = child.id();
let started = Instant::now();
while !is_zombie(pid) {
assert!(
started.elapsed() < Duration::from_secs(5),
"stand-in child should become a zombie within 5s"
);
std::thread::sleep(Duration::from_millis(10));
}
child
}
#[cfg(unix)]
#[test]
fn finalize_from_marker_reaps_child_no_zombie() {
use std::sync::atomic::Ordering;
let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
let dir = tempfile::tempdir().unwrap();
let task_id = registry
.spawn(
QUICK_SUCCESS_COMMAND,
"session".to_string(),
dir.path().to_path_buf(),
HashMap::new(),
Some(Duration::from_secs(30)),
dir.path().to_path_buf(),
10,
true,
false,
Some(dir.path().to_path_buf()),
)
.unwrap();
registry.inner.shutdown.store(true, Ordering::SeqCst);
std::thread::sleep(Duration::from_millis(550));
let task = registry.task_for_session(&task_id, "session").unwrap();
let started = Instant::now();
while !task.paths.exit.exists() {
assert!(
started.elapsed() < Duration::from_secs(5),
"exit marker should land quickly for `true`"
);
std::thread::sleep(Duration::from_millis(20));
}
let zombie_pid;
{
let mut state = task.state.lock().unwrap();
state.metadata.status = BgTaskStatus::Running;
state.metadata.status_reason = None;
state.metadata.exit_code = None;
state.metadata.finished_at = None;
state.metadata.duration_ms = None;
crate::bash_background::persistence::write_task(&task.paths.json, &state.metadata)
.expect("persist reset Running metadata");
let zombie = spawn_unreaped_zombie();
zombie_pid = zombie.id();
state.runtime = TaskRuntime::Piped(Some(zombie));
}
*task.terminal_at.lock().unwrap() = None;
assert!(
is_zombie(zombie_pid),
"precondition: stand-in child {zombie_pid} must be a zombie before finalize"
);
registry.poll_task(&task).unwrap();
{
let state = task.state.lock().unwrap();
assert!(
matches!(state.runtime, TaskRuntime::Piped(None)),
"child handle must be released after marker finalize"
);
assert!(
state.metadata.status.is_terminal(),
"task must be terminal after marker finalize: {:?}",
state.metadata.status
);
}
assert!(
!is_zombie(zombie_pid),
"issue #91 regression: child {zombie_pid} left as <defunct> zombie \
after the exit-marker terminal transition"
);
}
#[cfg(unix)]
#[test]
fn kill_with_existing_marker_reaps_child_no_zombie() {
use std::sync::atomic::Ordering;
let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
let dir = tempfile::tempdir().unwrap();
let task_id = registry
.spawn(
QUICK_SUCCESS_COMMAND,
"session".to_string(),
dir.path().to_path_buf(),
HashMap::new(),
Some(Duration::from_secs(30)),
dir.path().to_path_buf(),
10,
true,
false,
Some(dir.path().to_path_buf()),
)
.unwrap();
registry.inner.shutdown.store(true, Ordering::SeqCst);
std::thread::sleep(Duration::from_millis(550));
let task = registry.task_for_session(&task_id, "session").unwrap();
let started = Instant::now();
while !task.paths.exit.exists() {
assert!(
started.elapsed() < Duration::from_secs(5),
"exit marker should land quickly for `true`"
);
std::thread::sleep(Duration::from_millis(20));
}
let zombie_pid;
{
let mut state = task.state.lock().unwrap();
state.metadata.status = BgTaskStatus::Running;
state.metadata.status_reason = None;
state.metadata.exit_code = None;
state.metadata.finished_at = None;
state.metadata.duration_ms = None;
crate::bash_background::persistence::write_task(&task.paths.json, &state.metadata)
.expect("persist reset Running metadata");
let zombie = spawn_unreaped_zombie();
zombie_pid = zombie.id();
state.runtime = TaskRuntime::Piped(Some(zombie));
}
*task.terminal_at.lock().unwrap() = None;
assert!(
is_zombie(zombie_pid),
"precondition: stand-in child {zombie_pid} must be a zombie before kill"
);
registry
.kill_with_status(&task_id, "session", BgTaskStatus::Killed)
.expect("kill should succeed");
{
let state = task.state.lock().unwrap();
assert!(
matches!(state.runtime, TaskRuntime::Piped(None)),
"child handle must be released after marker-aware kill"
);
assert!(state.metadata.status.is_terminal());
}
assert!(
!is_zombie(zombie_pid),
"issue #91 regression: child {zombie_pid} left as <defunct> zombie \
after a marker-aware kill"
);
}
#[test]
fn cleanup_finished_keeps_running_tasks() {
let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
let dir = tempfile::tempdir().unwrap();
let task_id = registry
.spawn(
LONG_RUNNING_COMMAND,
"session".to_string(),
dir.path().to_path_buf(),
HashMap::new(),
Some(Duration::from_secs(30)),
dir.path().to_path_buf(),
10,
true,
false,
Some(dir.path().to_path_buf()),
)
.unwrap();
registry.cleanup_finished(Duration::ZERO);
assert!(registry.inner.tasks.lock().unwrap().contains_key(&task_id));
let _ = registry.kill(&task_id, "session");
}
#[cfg(windows)]
fn wait_for_file(path: &Path) -> String {
let started = Instant::now();
loop {
if path.exists() {
return fs::read_to_string(path).expect("read file");
}
assert!(
started.elapsed() < Duration::from_secs(30),
"timed out waiting for {}",
path.display()
);
std::thread::sleep(Duration::from_millis(100));
}
}
#[cfg(windows)]
fn spawn_windows_registry_command(
command: &str,
) -> (BgTaskRegistry, tempfile::TempDir, String) {
let registry = BgTaskRegistry::new(Arc::new(Mutex::new(None)));
let dir = tempfile::tempdir().unwrap();
let task_id = registry
.spawn(
command,
"session".to_string(),
dir.path().to_path_buf(),
HashMap::new(),
Some(Duration::from_secs(30)),
dir.path().to_path_buf(),
10,
false,
false,
Some(dir.path().to_path_buf()),
)
.unwrap();
(registry, dir, task_id)
}
#[cfg(windows)]
#[test]
fn windows_spawn_writes_exit_marker_for_zero_exit() {
let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 0");
let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
let content = wait_for_file(&exit_path);
assert_eq!(content.trim(), "0");
}
#[cfg(windows)]
#[test]
fn windows_spawn_writes_exit_marker_for_nonzero_exit() {
let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c exit 42");
let exit_path = registry.task_exit_path(&task_id, "session").unwrap();
let content = wait_for_file(&exit_path);
assert_eq!(content.trim(), "42");
}
#[cfg(windows)]
#[test]
fn windows_spawn_captures_stdout_to_disk() {
let (registry, _dir, task_id) = spawn_windows_registry_command("cmd /c echo hello");
let task = registry.task_for_session(&task_id, "session").unwrap();
let stdout_path = task.paths.stdout.clone();
let exit_path = task.paths.exit.clone();
let _ = wait_for_file(&exit_path);
let stdout = fs::read_to_string(stdout_path).expect("read stdout");
assert!(stdout.contains("hello"), "stdout was {stdout:?}");
}
#[cfg(windows)]
#[test]
fn windows_spawn_uses_pwsh_when_available() {
let candidates = crate::windows_shell::shell_candidates_with(
|binary| match binary {
"pwsh.exe" => Some(std::path::PathBuf::from(r"C:\pwsh\pwsh.exe")),
"powershell.exe" => Some(std::path::PathBuf::from(r"C:\ps\powershell.exe")),
_ => None,
},
|| None,
);
let shell = candidates.first().expect("at least one candidate").clone();
assert_eq!(shell, crate::windows_shell::WindowsShell::Pwsh);
assert_eq!(shell.binary().as_ref(), "pwsh.exe");
}
#[cfg(windows)]
#[test]
fn windows_shell_cmd_wrapper_writes_exit_marker_with_move() {
let exit_path = Path::new(r"C:\Temp\bash-test.exit");
let script =
crate::windows_shell::WindowsShell::Cmd.wrapper_script("cmd /c exit 42", exit_path);
assert!(
script.contains("set CODE=%ERRORLEVEL%"),
"wrapper must capture exit code into CODE: {script}"
);
assert!(
script.contains("echo %CODE% >"),
"wrapper must echo CODE to a temp marker file: {script}"
);
assert!(
script.contains("move /Y"),
"wrapper must use atomic move to write the marker: {script}"
);
assert!(
script.contains("> nul"),
"wrapper must redirect move output to nul: {script}"
);
assert!(
script.contains("exit /B %CODE%"),
"wrapper must propagate the captured exit code: {script}"
);
assert!(script.contains(r#""C:\Temp\bash-test.exit.tmp""#));
assert!(script.contains(r#""C:\Temp\bash-test.exit""#));
}
#[cfg(windows)]
#[test]
fn windows_shell_cmd_bg_command_uses_minimal_cmd_flags() {
use crate::windows_shell::WindowsShell;
let cmd = WindowsShell::Cmd.bg_command("echo wrapped");
let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
assert_eq!(
args_strs,
vec!["/D", "/S", "/C", "echo wrapped"],
"Cmd::bg_command must prepend /D /S /C"
);
}
#[cfg(windows)]
#[test]
fn windows_shell_pwsh_bg_command_uses_standard_args() {
use crate::windows_shell::WindowsShell;
let cmd = WindowsShell::Pwsh.bg_command("Get-Date");
let args: Vec<&std::ffi::OsStr> = cmd.get_args().collect();
let args_strs: Vec<&str> = args.iter().filter_map(|a| a.to_str()).collect();
assert!(
args_strs.contains(&"-Command"),
"Pwsh::bg_command must use -Command: {args_strs:?}"
);
assert!(
args_strs.contains(&"Get-Date"),
"Pwsh::bg_command must include the user command body"
);
}
#[allow(dead_code)]
#[cfg(any())] fn windows_cmd_wrapper_records_real_exit_code_disabled() {}
}