use meerkat_core::ops_lifecycle::{
OperationId, OperationKind, OperationLifecycleSnapshot, OperationResult, OperationSpec,
OperationStatus, OpsLifecycleRegistry,
};
use meerkat_core::types::SessionId;
use meerkat_runtime::RuntimeOpsLifecycleRegistry;
use serde_json::{Value, json};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tokio::io::AsyncReadExt;
use tokio::process::{Child, Command};
use tokio::sync::{Mutex, Notify, mpsc};
use tokio::task::JoinHandle;
use tracing::{debug, info, instrument, warn};
/// Stops `child` by signalling its process group, escalating from SIGTERM to
/// SIGKILL.
///
/// The child's pid is used as the process-group id. SIGTERM is sent first; if
/// the child has not exited within two seconds the group receives SIGKILL and
/// the child is waited on. Signal delivery errors are ignored.
///
/// Returns the error from `child.wait()` when the child exits during the grace
/// period and waiting fails; otherwise `Ok(())`, including when the child has
/// no pid available.
#[cfg(unix)]
async fn graceful_kill(child: &mut Child) -> std::io::Result<()> {
    use nix::sys::signal::{Signal, killpg};
    use nix::unistd::Pid;

    // Without a pid there is nothing to signal.
    let Some(raw_pid) = child.id() else {
        return Ok(());
    };
    let group = Pid::from_raw(raw_pid as i32);

    // Polite request first; failures to signal are deliberately ignored.
    let _ = killpg(group, Signal::SIGTERM);

    tokio::select! {
        () = tokio::time::sleep(Duration::from_secs(2)) => {
            // Grace period elapsed: force-kill the group and reap the child.
            let _ = killpg(group, Signal::SIGKILL);
            let _ = child.wait().await;
            Ok(())
        }
        exit = child.wait() => exit.map(|_| ()),
    }
}
/// Non-Unix variant of `graceful_kill`: delegates directly to `Child::kill`
/// and returns its result; there is no SIGTERM-style grace period here.
#[cfg(not(unix))]
async fn graceful_kill(child: &mut Child) -> std::io::Result<()> {
child.kill().await
}
/// Byte limit (1 MiB) applied to each captured output stream of a background
/// job, both while reading and when producing the final output string.
const DEFAULT_MAX_OUTPUT_BYTES: usize = 1024 * 1024;
/// Appends `data` to `buffer`, discarding the oldest bytes once the buffer
/// grows past twice `max_bytes`.
///
/// When trimming happens, roughly the last `max_bytes` bytes are kept. The cut
/// point is moved forward to the first byte that is not a UTF-8 continuation
/// byte, so the retained tail never starts in the middle of a multi-byte
/// sequence. If every byte in the candidate window is a continuation byte the
/// buffer is emptied.
///
/// Trimming only at the `2 * max_bytes` mark (rather than on every append)
/// keeps the number of front-drains low.
fn append_with_truncation(buffer: &mut Vec<u8>, data: &[u8], max_bytes: usize) {
    buffer.extend_from_slice(data);

    // Saturate instead of overflowing: a huge limit simply means "never trim".
    if buffer.len() <= max_bytes.saturating_mul(2) {
        return;
    }

    // `buffer.len() > 2 * max_bytes >= max_bytes`, so this cannot underflow.
    let keep_from = buffer.len() - max_bytes;
    // A byte is a UTF-8 continuation byte iff it is < -64 when viewed as i8.
    let cut = buffer[keep_from..]
        .iter()
        .position(|&byte| (byte as i8) >= -64)
        .map_or(buffer.len(), |offset| keep_from + offset);
    buffer.drain(..cut);
}
/// Converts captured output to a `String`, keeping at most the last
/// `max_bytes` bytes.
///
/// Input that already fits is converted whole. Otherwise the tail window is
/// taken and its start is advanced past any leading UTF-8 continuation bytes
/// so the result does not begin mid-character. Invalid UTF-8 is replaced
/// lossily.
fn truncate_output_tail(data: &[u8], max_bytes: usize) -> String {
    let tail = match data.len().checked_sub(max_bytes) {
        // Shorter than or exactly at the limit: nothing to cut.
        None | Some(0) => data,
        Some(keep_from) => {
            let window = &data[keep_from..];
            // Continuation bytes are exactly those < -64 when read as i8.
            let skip = window
                .iter()
                .position(|&byte| (byte as i8) >= -64)
                .unwrap_or(window.len());
            &window[skip..]
        }
    };
    String::from_utf8_lossy(tail).into_owned()
}
/// Reads `reader` to end-of-stream, retaining only a bounded tail of the data.
///
/// Data is read in 8 KiB chunks and fed through `append_with_truncation`, so
/// the returned buffer can temporarily hold up to about `2 * max_bytes` bytes.
///
/// # Errors
/// Returns the first I/O error reported by the reader.
async fn read_stream_with_limit<R>(mut reader: R, max_bytes: usize) -> std::io::Result<Vec<u8>>
where
    R: tokio::io::AsyncRead + Unpin,
{
    let mut collected = Vec::new();
    let mut scratch = [0u8; 8192];
    loop {
        match reader.read(&mut scratch).await? {
            // A zero-length read signals end of stream.
            0 => return Ok(collected),
            filled => append_with_truncation(&mut collected, &scratch[..filled], max_bytes),
        }
    }
}
use super::config::{ShellConfig, ShellError};
use super::types::{BackgroundJob, JobId, JobStatus, JobSummary};
/// Internal bookkeeping entry for one background job: the caller-visible job
/// view paired with the id of the operation registered for it in the ops
/// lifecycle registry.
#[derive(Clone, Debug)]
struct BackgroundJobRecord {
// Job data handed back to callers (e.g. by `get_status`).
view: BackgroundJob,
// Operation id used to look up this job's lifecycle snapshot.
operation_id: OperationId,
}
/// Tracks background shell jobs and mirrors their state into an
/// `OpsLifecycleRegistry`.
pub struct JobManager {
// Job records keyed by job id.
jobs: Arc<Mutex<HashMap<JobId, BackgroundJobRecord>>>,
// Job id -> operation id, kept behind a std mutex so the synchronous
// `canonical_operation_for_job` can read it without awaiting.
canonical_job_ops: Arc<std::sync::Mutex<HashMap<JobId, OperationId>>>,
// Shell configuration (limits, project root, environment, ...).
config: ShellConfig,
// Cache for the resolved shell executable path; filled on first use.
resolved_shell_path: Arc<Mutex<Option<PathBuf>>>,
// Optional channel on which a `shell_job_completed` event is sent when a
// spawned job finishes.
event_tx: Option<mpsc::Sender<Value>>,
// Registry in which each job's operation is registered and updated.
ops_registry: Arc<dyn OpsLifecycleRegistry>,
// Session recorded as the owner of every registered operation.
owner_session_id: SessionId,
// Set by `with_owner_session_id`; required before jobs can be created.
owner_session_bound: bool,
// Set by `with_ops_registry`; required before jobs can be created.
ops_registry_bound: bool,
// Join handles of the per-job monitor tasks.
handles: Arc<Mutex<HashMap<JobId, JoinHandle<()>>>>,
// Per-job notifiers used by `cancel_job` to wake the monitor task.
cancel_notifiers: Arc<Mutex<HashMap<JobId, Arc<Notify>>>>,
// When each job's monitor task finished; drives `cleanup_old_jobs`.
completed_at: Arc<Mutex<HashMap<JobId, Instant>>>,
// Number of currently held `SyncSlotGuard`s.
sync_slots: Arc<std::sync::atomic::AtomicUsize>,
}
/// Debug output shows the configuration, whether an event sender is attached,
/// the owner session id and whether canonical async ops are exported; the
/// remaining fields are elided.
impl std::fmt::Debug for JobManager {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let has_event_sender = self.event_tx.is_some();
        let exports_ops = self.exports_canonical_async_ops();

        let mut out = f.debug_struct("JobManager");
        out.field("config", &self.config);
        out.field("event_tx", &has_event_sender);
        out.field("owner_session_id", &self.owner_session_id);
        out.field("exports_canonical_async_ops", &exports_ops);
        out.finish_non_exhaustive()
    }
}
impl JobManager {
/// Creates an unbound manager for `config`.
///
/// The manager starts with empty job tables, no event sender, a private
/// `RuntimeOpsLifecycleRegistry` and a freshly generated session id. Both
/// "bound" flags are false, so `spawn_job` and
/// `register_synthetic_running_job` fail until `with_owner_session_id` and
/// `with_ops_registry` have been applied.
pub fn new(config: ShellConfig) -> Self {
    Self {
        // Every `Arc::default()` below yields an empty map / `None` / zero.
        jobs: Arc::default(),
        canonical_job_ops: Arc::default(),
        config,
        resolved_shell_path: Arc::default(),
        event_tx: None,
        ops_registry: Arc::new(RuntimeOpsLifecycleRegistry::new()),
        owner_session_id: SessionId::new(),
        owner_session_bound: false,
        ops_registry_bound: false,
        handles: Arc::default(),
        cancel_notifiers: Arc::default(),
        completed_at: Arc::default(),
        sync_slots: Arc::default(),
    }
}
/// Builder: attaches the channel on which `shell_job_completed` events are
/// sent when a spawned job finishes.
pub fn with_event_sender(mut self, tx: mpsc::Sender<Value>) -> Self {
self.event_tx = Some(tx);
self
}
/// Builder: sets the session that owns operations registered by this manager
/// and marks the owner session as bound.
pub(crate) fn with_owner_session_id(mut self, session_id: SessionId) -> Self {
self.owner_session_id = session_id;
self.owner_session_bound = true;
self
}
/// Builder: replaces the default registry with `ops_registry` and marks the
/// registry as bound.
pub(crate) fn with_ops_registry(mut self, ops_registry: Arc<dyn OpsLifecycleRegistry>) -> Self {
self.ops_registry = ops_registry;
self.ops_registry_bound = true;
self
}
/// Returns true once both an owner session id and an ops registry have been
/// supplied through the builders. `spawn_job` and
/// `register_synthetic_running_job` refuse to run while this is false.
pub fn exports_canonical_async_ops(&self) -> bool {
self.owner_session_bound && self.ops_registry_bound
}
/// Whole seconds elapsed between `started_at_unix` and the current wall-clock
/// time, as an `f64`.
///
/// Yields `0.0` when the start lies in the future. A system clock set before
/// the Unix epoch is treated as time zero.
fn lifecycle_duration_secs(started_at_unix: u64) -> f64 {
    let now_unix = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |since_epoch| since_epoch.as_secs());
    now_unix.saturating_sub(started_at_unix) as f64
}
/// Looks up the registry snapshot for the operation attached to `job`.
/// Returns whatever the registry's `snapshot` call yields for that id.
fn snapshot_for_job(&self, job: &BackgroundJobRecord) -> Option<OperationLifecycleSnapshot> {
self.ops_registry.snapshot(&job.operation_id)
}
/// Maps a job to the short status label used in job summaries.
///
/// A locally recorded timeout always reports `"timed_out"`, even though the
/// registry records a timeout as a failure. Otherwise the registry snapshot
/// decides; when there is no snapshot (or it is `Absent`) the locally stored
/// status is used.
fn lifecycle_status_string(
    &self,
    job: &BackgroundJobRecord,
    snapshot: Option<&OperationLifecycleSnapshot>,
) -> &'static str {
    // Label derived purely from the local view; a timeout short-circuits.
    let local_label = match job.view.status {
        JobStatus::TimedOut { .. } => return "timed_out",
        JobStatus::Running { .. } => "running",
        JobStatus::Completed { .. } => "completed",
        JobStatus::Failed { .. } => "failed",
        JobStatus::Cancelled { .. } => "cancelled",
    };

    match snapshot.map(|snap| snap.status) {
        Some(OperationStatus::Provisioning | OperationStatus::Running) => "running",
        Some(OperationStatus::Completed) => "completed",
        Some(OperationStatus::Failed | OperationStatus::Terminated) => "failed",
        Some(
            OperationStatus::Aborted
            | OperationStatus::Cancelled
            | OperationStatus::Retiring
            | OperationStatus::Retired,
        ) => "cancelled",
        Some(OperationStatus::Absent) | None => local_label,
    }
}
/// Combines the locally stored job status with the registry snapshot into the
/// status reported to callers.
///
/// * Registry `Provisioning`/`Running`: reported as running.
/// * Registry `Completed`, `Absent`, or no snapshot: the local status is
///   returned unchanged.
/// * Registry `Failed`/`Terminated`: a local `Failed`/`TimedOut` status is kept
///   (it carries the real error and duration); otherwise a generic failure is
///   synthesized.
/// * Registry `Aborted`/`Cancelled`/`Retiring`/`Retired`: a local `Cancelled`
///   status is kept; otherwise a cancellation is synthesized.
///
/// Synthesized statuses compute their duration from the current wall clock.
/// Keeping an already recorded `Cancelled` status avoids reporting a duration
/// that grows every time a cancelled job is queried.
fn reconcile_job_status(
    &self,
    job: &BackgroundJobRecord,
    snapshot: Option<&OperationLifecycleSnapshot>,
) -> JobStatus {
    let local = &job.view.status;
    match snapshot.map(|value| value.status) {
        Some(OperationStatus::Provisioning | OperationStatus::Running) => JobStatus::Running {
            started_at_unix: job.view.started_at_unix,
        },
        Some(OperationStatus::Failed | OperationStatus::Terminated) => match local {
            JobStatus::Failed { .. } | JobStatus::TimedOut { .. } => local.clone(),
            _ => JobStatus::Failed {
                error: "background job failed".to_string(),
                duration_secs: Self::lifecycle_duration_secs(job.view.started_at_unix),
            },
        },
        Some(
            OperationStatus::Aborted
            | OperationStatus::Cancelled
            | OperationStatus::Retiring
            | OperationStatus::Retired,
        ) => match local {
            // The duration captured at cancellation time is the stable one.
            JobStatus::Cancelled { .. } => local.clone(),
            _ => JobStatus::Cancelled {
                duration_secs: Self::lifecycle_duration_secs(job.view.started_at_unix),
            },
        },
        Some(OperationStatus::Completed | OperationStatus::Absent) | None => local.clone(),
    }
}
/// Returns the registry snapshot for the operation backing `job_id`.
///
/// Yields `None` when the job is unknown to this manager or the registry has
/// no snapshot for its operation.
pub async fn ops_lifecycle_snapshot(
    &self,
    job_id: &JobId,
) -> Option<OperationLifecycleSnapshot> {
    let guard = self.jobs.lock().await;
    guard
        .get(job_id)
        .and_then(|record| self.snapshot_for_job(record))
}
/// Registers a job record in the `Running` state without spawning a process.
///
/// The job gets a fresh operation in the ops registry (marked as provisioned),
/// an entry in the job tables and a cancel notifier. No monitor task is
/// created by this function.
///
/// # Errors
/// Fails when the manager is not bound to a session and registry, when
/// `working_dir` does not pass validation, or when the registry rejects the
/// operation.
pub async fn register_synthetic_running_job(
&self,
command: &str,
working_dir: Option<&Path>,
timeout_secs: u64,
) -> Result<JobId, ShellError> {
if !self.exports_canonical_async_ops() {
return Err(ShellError::Io(std::io::Error::other(
"background shell jobs require canonical session binding",
)));
}
// Prune finished jobs before adding a new record.
self.cleanup_old_jobs().await;
let resolved_dir = if let Some(dir) = working_dir {
Some(self.config.validate_working_dir_async(dir).await?)
} else {
None
};
let job_id = JobId::new();
let started_at_unix = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let operation_id = OperationId::new();
self.ops_registry
.register_operation(OperationSpec {
id: operation_id.clone(),
kind: OperationKind::BackgroundToolOp,
owner_session_id: self.owner_session_id.clone(),
display_name: format!("shell:{command}"),
source_label: "shell_job".to_string(),
child_session_id: None,
expect_peer_channel: false,
})
.map_err(|error| ShellError::Io(std::io::Error::other(error.to_string())))?;
// Best-effort: the result of the provisioning transition is ignored.
let _ = self.ops_registry.provisioning_succeeded(&operation_id);
let job = BackgroundJob {
id: job_id.clone(),
command: command.to_string(),
working_dir: resolved_dir.as_ref().map(|path| path.display().to_string()),
timeout_secs,
started_at_unix,
status: JobStatus::Running { started_at_unix },
};
self.jobs.lock().await.insert(
job_id.clone(),
BackgroundJobRecord {
view: job,
operation_id: operation_id.clone(),
},
);
self.canonical_job_ops
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.insert(job_id.clone(), operation_id);
// A notifier is stored so `cancel_job` finds one for this job.
self.cancel_notifiers
.lock()
.await
.insert(job_id.clone(), Arc::new(Notify::new()));
Ok(job_id)
}
/// Returns the shell executable path, resolving it on first use and caching
/// the result.
///
/// The cache lock is not held while resolving, so concurrent first calls may
/// each resolve the path; the last one to finish wins the cache slot.
///
/// # Errors
/// Propagates the error from the configuration's shell path resolution.
async fn resolved_shell_path(&self) -> Result<PathBuf, ShellError> {
    // The guard is a temporary here and is released at the end of the statement.
    let cached = self.resolved_shell_path.lock().await.clone();
    if let Some(path) = cached {
        return Ok(path);
    }

    let path = self.config.resolve_shell_path_auto_async().await?;
    *self.resolved_shell_path.lock().await = Some(path.clone());
    Ok(path)
}
/// Starts `command` as a background shell job and returns its id without
/// waiting for the command to finish.
///
/// The command runs as `<shell> -c <command>` in `working_dir` (or the
/// configured project root), with `PWD` and the configured environment
/// variables set and stdout/stderr captured. On Unix the child is placed in
/// its own process group. A monitor task waits for exit, timeout
/// (`timeout_secs`) or cancellation, reports the outcome to the ops registry,
/// updates the job record and, if an event sender is attached, emits a
/// `shell_job_completed` event.
///
/// # Errors
/// Fails when the manager is not bound to a session and registry, when the
/// concurrency limit is reached, when the working directory or shell path
/// cannot be resolved, when the registry rejects the operation, or when the
/// process cannot be spawned.
#[instrument(skip(self), fields(command = %command, timeout_secs = %timeout_secs))]
pub async fn spawn_job(
&self,
command: &str,
working_dir: Option<&Path>,
timeout_secs: u64,
) -> Result<JobId, ShellError> {
if !self.exports_canonical_async_ops() {
return Err(ShellError::Io(std::io::Error::other(
"background shell jobs require canonical session binding",
)));
}
info!("Spawning background job");
// A limit of 0 disables the concurrency check.
let limit = self.config.max_concurrent_processes;
if limit > 0 {
let current = self.running_job_count().await;
if current >= limit {
warn!(current = %current, limit = %limit, "Concurrency limit exceeded");
return Err(ShellError::Io(std::io::Error::other(format!(
"Concurrency limit exceeded: {current} running jobs, limit is {limit}"
))));
}
}
// Prune finished jobs before adding a new record.
self.cleanup_old_jobs().await;
let resolved_dir = if let Some(dir) = working_dir {
debug!(working_dir = %dir.display(), "Validating working directory");
Some(self.config.validate_working_dir_async(dir).await?)
} else {
None
};
let shell_path = self.resolved_shell_path().await?;
let job_id = JobId::new();
debug!(job_id = %job_id, "Generated job ID");
let started_at_unix = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
// Register the operation before spawning so a spawn failure can be
// recorded against it.
let operation_id = OperationId::new();
self.ops_registry
.register_operation(OperationSpec {
id: operation_id.clone(),
kind: OperationKind::BackgroundToolOp,
owner_session_id: self.owner_session_id.clone(),
display_name: format!("shell:{command}"),
source_label: "shell_job".to_string(),
child_session_id: None,
expect_peer_channel: false,
})
.map_err(|error| ShellError::Io(std::io::Error::other(error.to_string())))?;
let job = BackgroundJob {
id: job_id.clone(),
command: command.to_string(),
working_dir: resolved_dir.as_ref().map(|p| p.display().to_string()),
timeout_secs,
started_at_unix,
status: JobStatus::Running { started_at_unix },
};
let mut cmd = Command::new(&shell_path);
cmd.arg("-c").arg(command);
let work_dir = resolved_dir.as_ref().unwrap_or(&self.config.project_root);
cmd.current_dir(work_dir);
cmd.env("PWD", work_dir);
cmd.envs(&self.config.env_vars);
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
// Own process group, so `graceful_kill` can signal the whole group.
#[cfg(unix)]
cmd.process_group(0);
let child = match cmd.spawn() {
Ok(child) => child,
Err(error) => {
let _ = self
.ops_registry
.provisioning_failed(&operation_id, error.to_string());
return Err(ShellError::Io(error));
}
};
let _ = self.ops_registry.provisioning_succeeded(&operation_id);
debug!("Spawned child process");
// Clones moved into the monitor task below.
let jobs = Arc::clone(&self.jobs);
let handles = Arc::clone(&self.handles);
let cancel_notifiers = Arc::clone(&self.cancel_notifiers);
let completed_at = Arc::clone(&self.completed_at);
let event_tx = self.event_tx.clone();
let ops_registry = Arc::clone(&self.ops_registry);
let operation_id_for_task = operation_id.clone();
let command_clone = command.to_string();
let job_id_clone = job_id.clone();
let job_id_for_completion = job_id.clone();
let job_id_for_cancel = job_id.clone();
let cancel_notify = Arc::new(Notify::new());
// Record the job and its notifier before the monitor task starts.
jobs.lock().await.insert(
job_id.clone(),
BackgroundJobRecord {
view: job,
operation_id: operation_id.clone(),
},
);
self.canonical_job_ops
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.insert(job_id.clone(), operation_id.clone());
cancel_notifiers
.lock()
.await
.insert(job_id.clone(), Arc::clone(&cancel_notify));
let handle = tokio::spawn(async move {
let start = Instant::now();
let timeout_duration = Duration::from_secs(timeout_secs);
let mut child = child;
let stdout = child.stdout.take();
let stderr = child.stderr.take();
// Read both pipes in separate tasks while the child runs.
let stdout_handle = tokio::spawn(async move {
if let Some(out) = stdout {
read_stream_with_limit(out, DEFAULT_MAX_OUTPUT_BYTES).await
} else {
Ok(Vec::new())
}
});
let stderr_handle = tokio::spawn(async move {
if let Some(err) = stderr {
read_stream_with_limit(err, DEFAULT_MAX_OUTPUT_BYTES).await
} else {
Ok(Vec::new())
}
});
enum WaitOutcome {
Completed(Option<i32>),
Failed(std::io::Error),
TimedOut,
Cancelled,
}
// Whichever happens first: cancellation, process exit, or timeout.
let wait_outcome = tokio::select! {
() = cancel_notify.notified() => WaitOutcome::Cancelled,
result = tokio::time::timeout(timeout_duration, child.wait()) => {
match result {
Ok(Ok(status)) => WaitOutcome::Completed(status.code()),
Ok(Err(err)) => WaitOutcome::Failed(err),
Err(_) => WaitOutcome::TimedOut,
}
}
};
// The process is still running in these two cases, so stop it.
if matches!(wait_outcome, WaitOutcome::TimedOut | WaitOutcome::Cancelled) {
let _ = graceful_kill(&mut child).await;
}
let duration_secs = start.elapsed().as_secs_f64();
// Reader failures are logged and treated as empty output.
let stdout_bytes = match stdout_handle.await {
Ok(Ok(buf)) => buf,
Ok(Err(err)) => {
warn!("Failed to read job stdout: {}", err);
Vec::new()
}
Err(err) => {
warn!("Job stdout reader task failed: {}", err);
Vec::new()
}
};
let stderr_bytes = match stderr_handle.await {
Ok(Ok(buf)) => buf,
Ok(Err(err)) => {
warn!("Failed to read job stderr: {}", err);
Vec::new()
}
Err(err) => {
warn!("Job stderr reader task failed: {}", err);
Vec::new()
}
};
// The read buffers may exceed the limit; cut them to the final tail here.
let stdout = truncate_output_tail(&stdout_bytes, DEFAULT_MAX_OUTPUT_BYTES);
let stderr = truncate_output_tail(&stderr_bytes, DEFAULT_MAX_OUTPUT_BYTES);
// Any process exit, including a non-zero exit code, maps to `Completed`.
let final_status = match wait_outcome {
WaitOutcome::Failed(err) => JobStatus::Failed {
error: err.to_string(),
duration_secs,
},
WaitOutcome::TimedOut => JobStatus::TimedOut {
stdout,
stderr,
duration_secs,
},
WaitOutcome::Cancelled => JobStatus::Cancelled { duration_secs },
WaitOutcome::Completed(exit_code) => JobStatus::Completed {
exit_code,
stdout,
stderr,
duration_secs,
},
};
// Report the outcome to the ops registry; registry errors are ignored.
match &final_status {
JobStatus::Completed { stdout, stderr, .. } => {
// Result content: stdout if any, else stderr, else the command itself.
let content = if !stdout.is_empty() {
stdout.clone()
} else if !stderr.is_empty() {
stderr.clone()
} else {
command_clone.clone()
};
let _ = ops_registry.complete_operation(
&operation_id_for_task,
OperationResult {
id: operation_id_for_task.clone(),
content,
is_error: false,
duration_ms: (duration_secs * 1000.0) as u64,
tokens_used: 0,
},
);
}
JobStatus::Failed { error, .. } => {
let _ = ops_registry.fail_operation(&operation_id_for_task, error.clone());
}
JobStatus::TimedOut { .. } => {
let _ = ops_registry
.fail_operation(&operation_id_for_task, "background job timed out".into());
}
JobStatus::Cancelled { .. } => {
let _ = ops_registry.cancel_operation(
&operation_id_for_task,
Some("cancelled by caller".into()),
);
}
JobStatus::Running { .. } => {}
}
// Store the final status unless `cancel_job` already marked the job
// as cancelled; in that case the stored cancellation wins.
let status_for_event = {
let mut jobs_guard = jobs.lock().await;
if let Some(job) = jobs_guard.get_mut(&job_id_clone) {
if matches!(job.view.status, JobStatus::Cancelled { .. }) {
job.view.status.clone()
} else {
job.view.status = final_status.clone();
final_status.clone()
}
} else {
final_status.clone()
}
};
// Record the completion time used by `cleanup_old_jobs`.
{
completed_at
.lock()
.await
.insert(job_id_for_completion, Instant::now());
}
{
cancel_notifiers.lock().await.remove(&job_id_for_cancel);
}
// Event delivery is best-effort; a send error is ignored.
if let Some(tx) = event_tx {
let event = json!({
"type": "shell_job_completed",
"job_id": job_id_clone.0,
"command": command_clone,
"result": serde_json::to_value(&status_for_event).unwrap_or(Value::Null)
});
let _ = tx.send(event).await;
}
});
handles.lock().await.insert(job_id.clone(), handle);
Ok(job_id)
}
/// Returns the current view of `job_id`, with its status reconciled against
/// the ops registry snapshot, or `None` when the job is unknown.
#[instrument(skip(self), fields(job_id = %job_id))]
pub async fn get_status(&self, job_id: &JobId) -> Option<BackgroundJob> {
    let found = {
        let jobs = self.jobs.lock().await;
        jobs.get(job_id).map(|record| {
            let snapshot = self.snapshot_for_job(record);
            let mut view = record.view.clone();
            view.status = self.reconcile_job_status(record, snapshot.as_ref());
            view
        })
    };

    if found.is_none() {
        debug!("Job not found");
    }
    found
}
/// Returns a summary (id, command, status label, start time) for every job
/// currently tracked by the manager.
#[instrument(skip(self))]
pub async fn list_jobs(&self) -> Vec<JobSummary> {
    let jobs = self.jobs.lock().await;
    let mut summaries = Vec::with_capacity(jobs.len());
    for record in jobs.values() {
        let snapshot = self.snapshot_for_job(record);
        let status = self
            .lifecycle_status_string(record, snapshot.as_ref())
            .to_string();
        summaries.push(JobSummary {
            id: record.view.id.clone(),
            command: record.view.command.clone(),
            status,
            started_at_unix: record.view.started_at_unix,
        });
    }
    // Release the job table before logging.
    drop(jobs);

    debug!(count = summaries.len(), "Listed jobs");
    summaries
}
/// Cancels a running job.
///
/// The job record is marked `Cancelled` immediately, the registry operation is
/// cancelled (best-effort), the job's monitor task is notified so it can stop
/// the process, and the stored join handle is dropped.
///
/// # Errors
/// `ShellError::JobNotFound` when the id is unknown, `ShellError::JobNotRunning`
/// when the stored status is anything other than `Running`.
#[instrument(skip(self), fields(job_id = %job_id))]
pub async fn cancel_job(&self, job_id: &JobId) -> Result<(), ShellError> {
    info!("Cancelling job");
    {
        let mut jobs = self.jobs.lock().await;
        let Some(record) = jobs.get_mut(job_id) else {
            warn!("Job not found");
            return Err(ShellError::JobNotFound(job_id.to_string()));
        };
        let JobStatus::Running { started_at_unix } = &record.view.status else {
            warn!("Job is not running");
            return Err(ShellError::JobNotRunning);
        };
        let duration_secs = Self::lifecycle_duration_secs(*started_at_unix);
        record.view.status = JobStatus::Cancelled { duration_secs };
        let _ = self
            .ops_registry
            .cancel_operation(&record.operation_id, Some("cancelled by caller".into()));
    }

    // Wake the monitor task, if one is registered for this job.
    let notifier = self.cancel_notifiers.lock().await.get(job_id).cloned();
    match notifier {
        Some(notifier) => notifier.notify_one(),
        None => {
            warn!("Cancel notifier missing for running job");
        }
    }

    self.handles.lock().await.remove(job_id);
    Ok(())
}
/// Forgets `job_id` entirely, dropping its record and all per-job bookkeeping
/// (operation mapping, join handle, cancel notifier, completion time).
///
/// Returns `true` when the job existed. The ops registry is not touched.
pub async fn remove_job(&self, job_id: &JobId) -> bool {
    if self.jobs.lock().await.remove(job_id).is_none() {
        return false;
    }

    self.canonical_job_ops
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
        .remove(job_id);
    self.handles.lock().await.remove(job_id);
    self.cancel_notifiers.lock().await.remove(job_id);
    self.completed_at.lock().await.remove(job_id);
    true
}
/// Synchronously looks up the operation id registered for `job_id`.
///
/// Returns `None` when the job is not tracked. A poisoned lock is recovered
/// rather than propagated.
pub fn canonical_operation_for_job(&self, job_id: &JobId) -> Option<OperationId> {
self.canonical_job_ops
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.get(job_id)
.cloned()
}
/// Prunes finished jobs from all bookkeeping tables.
///
/// Two passes are applied to jobs that have a recorded completion time:
/// first, every job finished longer ago than `completed_job_ttl_secs` is
/// removed; then, if `max_completed_jobs` is non-zero and more finished jobs
/// than that remain, the oldest ones are removed until the cap is met.
/// Jobs without a completion time are never touched, and the ops registry is
/// not modified.
async fn cleanup_old_jobs(&self) {
let ttl = Duration::from_secs(self.config.completed_job_ttl_secs);
let max_completed = self.config.max_completed_jobs;
let now = Instant::now();
// All tables are locked together so a job is removed from every table
// in one step.
let mut jobs_guard = self.jobs.lock().await;
let mut handles_guard = self.handles.lock().await;
let mut cancel_notifiers_guard = self.cancel_notifiers.lock().await;
let mut completed_at_guard = self.completed_at.lock().await;
// The std mutex is taken last; there is no await after this point.
let mut canonical_job_ops_guard = self
.canonical_job_ops
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
// Pass 1: jobs whose completion is older than the TTL.
let expired_jobs: Vec<JobId> = completed_at_guard
.iter()
.filter(|(_, completed_time)| now.duration_since(**completed_time) > ttl)
.map(|(job_id, _)| job_id.clone())
.collect();
for job_id in &expired_jobs {
jobs_guard.remove(job_id);
canonical_job_ops_guard.remove(job_id);
handles_guard.remove(job_id);
cancel_notifiers_guard.remove(job_id);
completed_at_guard.remove(job_id);
}
// Pass 2: enforce the cap on retained finished jobs (0 means no cap).
if max_completed > 0 && completed_at_guard.len() > max_completed {
let mut completed_jobs: Vec<(JobId, Instant)> = completed_at_guard
.iter()
.map(|(k, v)| (k.clone(), *v))
.collect();
// Oldest completion first, so the excess is taken from the front.
completed_jobs.sort_by_key(|(_, time)| *time);
let to_remove = completed_jobs.len() - max_completed;
for (job_id, _) in completed_jobs.into_iter().take(to_remove) {
jobs_guard.remove(&job_id);
canonical_job_ops_guard.remove(&job_id);
handles_guard.remove(&job_id);
cancel_notifiers_guard.remove(&job_id);
completed_at_guard.remove(&job_id);
}
}
}
/// Number of jobs currently tracked, regardless of their status.
pub async fn job_count(&self) -> usize {
self.jobs.lock().await.len()
}
/// Number of jobs with a recorded completion time, i.e. jobs whose monitor
/// task has finished and that have not been pruned or removed yet.
pub async fn completed_job_count(&self) -> usize {
self.completed_at.lock().await.len()
}
/// Number of processes counted against the concurrency limit: background
/// jobs whose stored status is `Running` plus currently held sync slots.
pub async fn running_job_count(&self) -> usize {
    let jobs = self.jobs.lock().await;
    let mut running = 0usize;
    for record in jobs.values() {
        if matches!(record.view.status, JobStatus::Running { .. }) {
            running += 1;
        }
    }
    running + self.sync_slots.load(std::sync::atomic::Ordering::Relaxed)
}
pub async fn acquire_sync_slot(&self) -> Result<SyncSlotGuard, ShellError> {
use std::sync::atomic::Ordering;
let limit = self.config.max_concurrent_processes;
if limit == 0 {
self.sync_slots.fetch_add(1, Ordering::SeqCst);
return Ok(SyncSlotGuard {
sync_slots: Arc::clone(&self.sync_slots),
});
}
let jobs = self.jobs.lock().await;
let background = jobs
.values()
.filter(|job| matches!(job.view.status, JobStatus::Running { .. }))
.count();
loop {
let current_sync = self.sync_slots.load(Ordering::Acquire);
let total = background + current_sync;
if total >= limit {
return Err(ShellError::Io(std::io::Error::other(format!(
"Concurrency limit exceeded: {total} processes, limit is {limit}"
))));
}
match self.sync_slots.compare_exchange_weak(
current_sync,
current_sync + 1,
Ordering::AcqRel,
Ordering::Relaxed,
) {
Ok(_) => {
return Ok(SyncSlotGuard {
sync_slots: Arc::clone(&self.sync_slots),
});
}
Err(_) => {
continue;
}
}
}
}
}
/// Guard for one sync slot obtained from `JobManager::acquire_sync_slot`.
/// Dropping it decrements the shared slot counter.
#[derive(Debug)]
pub struct SyncSlotGuard {
// Counter shared with the `JobManager` that issued this guard.
sync_slots: Arc<std::sync::atomic::AtomicUsize>,
}
/// Releases the slot by decrementing the shared counter.
impl Drop for SyncSlotGuard {
fn drop(&mut self) {
self.sync_slots
.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
use super::*;
use crate::builtin::shell::security::SecurityMode;
use std::path::PathBuf;
use tempfile::TempDir;
/// Test helper: builds a manager bound to a fresh session id and a fresh
/// runtime registry, so `spawn_job` and friends are permitted.
fn bound_job_manager(config: ShellConfig) -> JobManager {
let registry: Arc<dyn OpsLifecycleRegistry> = Arc::new(RuntimeOpsLifecycleRegistry::new());
JobManager::new(config)
.with_owner_session_id(SessionId::new())
.with_ops_registry(registry)
}
/// A manager built from the default config has no event sender.
#[test]
fn test_job_manager_struct() {
let config = ShellConfig::default();
let manager = bound_job_manager(config);
assert!(manager.event_tx.is_none());
}
/// The manager stores the supplied configuration unchanged and starts
/// without an event sender.
#[test]
fn test_job_manager_new() {
let config = ShellConfig {
enabled: true,
default_timeout_secs: 60,
restrict_to_project: true,
shell: "nu".to_string(),
shell_path: None,
project_root: PathBuf::from("/tmp/test"),
max_completed_jobs: 100,
completed_job_ttl_secs: 300,
max_concurrent_processes: 10,
security_mode: SecurityMode::Unrestricted,
security_patterns: vec![],
env_vars: std::collections::HashMap::new(),
};
let manager = bound_job_manager(config);
assert!(manager.config.enabled);
assert_eq!(manager.config.default_timeout_secs, 60);
assert_eq!(manager.config.shell, "nu");
assert_eq!(manager.config.project_root, PathBuf::from("/tmp/test"));
assert_eq!(manager.config.max_completed_jobs, 100);
assert_eq!(manager.config.completed_job_ttl_secs, 300);
assert!(manager.event_tx.is_none());
}
/// Without `with_event_sender`, the event sender field is `None`.
// NOTE(review): this makes the same assertion as `test_job_manager_struct`.
#[test]
fn test_job_manager_has_event_sender() {
let config = ShellConfig::default();
let manager = bound_job_manager(config);
assert!(manager.event_tx.is_none());
}
/// `with_event_sender` stores the supplied channel sender.
#[test]
fn test_job_manager_with_event_sender() {
let config = ShellConfig::default();
let (tx, _rx) = mpsc::channel::<Value>(10);
let manager = bound_job_manager(config).with_event_sender(tx);
assert!(manager.event_tx.is_some());
}
/// A job whose local status is `TimedOut` must be labelled "timed_out" even
/// though its registry operation was recorded as failed.
#[tokio::test]
async fn timed_out_job_summary_preserves_timed_out_status_over_failed_snapshot() {
let manager = bound_job_manager(ShellConfig::default());
let operation_id = OperationId::new();
// Drive the operation through register -> provisioned -> failed.
manager
.ops_registry
.register_operation(OperationSpec {
id: operation_id.clone(),
kind: OperationKind::BackgroundToolOp,
owner_session_id: manager.owner_session_id.clone(),
display_name: "shell:sleep".to_string(),
source_label: "shell_job".to_string(),
child_session_id: None,
expect_peer_channel: false,
})
.unwrap();
manager
.ops_registry
.provisioning_succeeded(&operation_id)
.unwrap();
manager
.ops_registry
.fail_operation(&operation_id, "background job timed out".into())
.unwrap();
// The record is built by hand; it is not inserted into the manager.
let job = BackgroundJobRecord {
view: BackgroundJob {
id: JobId::new(),
command: "sleep 30".to_string(),
working_dir: None,
timeout_secs: 30,
started_at_unix: 123,
status: JobStatus::TimedOut {
stdout: String::new(),
stderr: String::new(),
duration_secs: 30.0,
},
},
operation_id,
};
let snapshot = manager.snapshot_for_job(&job);
assert_eq!(
manager.lifecycle_status_string(&job, snapshot.as_ref()),
"timed_out"
);
}
/// A synthetic running job must be registered in the registry injected via
/// `with_ops_registry`, and show up there as a running background tool op.
#[tokio::test]
async fn synthetic_running_job_registers_into_injected_ops_registry() {
    let owner_session_id = SessionId::new();
    let registry: Arc<dyn OpsLifecycleRegistry> = Arc::new(RuntimeOpsLifecycleRegistry::new());
    // Keep our own handle on the registry so it can be inspected directly.
    let manager = JobManager::new(ShellConfig::default())
        .with_owner_session_id(owner_session_id)
        .with_ops_registry(Arc::clone(&registry));

    let job_id = manager
        .register_synthetic_running_job("shell:synthetic", None, 30)
        .await
        .expect("synthetic running job should register");

    let snapshot = manager
        .ops_lifecycle_snapshot(&job_id)
        .await
        .expect("job manager should resolve snapshot from injected registry");
    assert_eq!(snapshot.kind, OperationKind::BackgroundToolOp);
    assert_eq!(snapshot.status, OperationStatus::Running);

    // The same operation must be visible through the injected registry itself.
    assert_eq!(
        registry
            .snapshot(&snapshot.id)
            .expect("injected registry should own the operation")
            .status,
        OperationStatus::Running
    );
}
/// Spawning a trivial command succeeds and yields an id with the `job_`
/// prefix.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_job_manager_spawn() {
    let temp_dir = TempDir::new().unwrap();
    let mut config = ShellConfig::with_project_root(temp_dir.path().to_path_buf());
    config.shell = "sh".to_string();
    let manager = bound_job_manager(config);

    let result = manager.spawn_job("echo test", None, 30).await;
    assert!(result.is_ok());

    let job_id = result.unwrap();
    assert!(job_id.0.starts_with("job_"));
}
/// `spawn_job` must return well before the spawned command finishes, and the
/// job must be queryable right away.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_job_manager_spawn_immediate() {
let temp_dir = TempDir::new().unwrap();
let mut config = ShellConfig::with_project_root(temp_dir.path().to_path_buf());
config.shell = "sh".to_string();
let manager = bound_job_manager(config);
let start = Instant::now();
let job_id = manager.spawn_job("sleep 5", None, 30).await.unwrap();
let elapsed = start.elapsed();
assert!(
elapsed.as_millis() < 1000,
"spawn_job should return immediately, took {elapsed:?}"
);
let job = manager.get_status(&job_id).await;
assert!(job.is_some());
}
/// A freshly spawned long-running job reports `Running` with a start
/// timestamp within the last minute.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_job_manager_spawn_running() {
let temp_dir = TempDir::new().unwrap();
let mut config = ShellConfig::with_project_root(temp_dir.path().to_path_buf());
config.shell = "sh".to_string();
let manager = bound_job_manager(config);
let job_id = manager.spawn_job("sleep 10", None, 30).await.unwrap();
let job = manager.get_status(&job_id).await.unwrap();
assert!(
matches!(job.status, JobStatus::Running { .. }),
"Job should be Running, got {:?}",
job.status
);
if let JobStatus::Running { started_at_unix } = job.status {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
assert!(
started_at_unix <= now && started_at_unix > now - 60,
"started_at_unix should be recent"
);
}
}
/// `get_status` returns `None` for an unknown id and the stored id and
/// command for a spawned job.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_job_manager_get_status() {
let temp_dir = TempDir::new().unwrap();
let mut config = ShellConfig::with_project_root(temp_dir.path().to_path_buf());
config.shell = "sh".to_string();
let manager = bound_job_manager(config);
let fake_id = JobId::from_string("job_nonexistent");
let status = manager.get_status(&fake_id).await;
assert!(status.is_none());
let job_id = manager.spawn_job("echo hello", None, 30).await.unwrap();
let status = manager.get_status(&job_id).await;
assert!(status.is_some());
let job = status.unwrap();
assert_eq!(job.id, job_id);
assert_eq!(job.command, "echo hello");
}
/// `list_jobs` is empty for a new manager and contains every spawned job
/// afterwards.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_job_manager_list_jobs() {
let temp_dir = TempDir::new().unwrap();
let mut config = ShellConfig::with_project_root(temp_dir.path().to_path_buf());
config.shell = "sh".to_string();
let manager = bound_job_manager(config);
let jobs = manager.list_jobs().await;
assert!(jobs.is_empty());
let id1 = manager.spawn_job("echo one", None, 30).await.unwrap();
let id2 = manager.spawn_job("echo two", None, 30).await.unwrap();
let jobs = manager.list_jobs().await;
assert_eq!(jobs.len(), 2);
let ids: Vec<_> = jobs.iter().map(|j| j.id.clone()).collect();
assert!(ids.contains(&id1));
assert!(ids.contains(&id2));
}
/// The summary of a completed job still carries the original start
/// timestamp. The record is inserted by hand, so no process is spawned and
/// no operation is registered for it.
#[tokio::test]
async fn test_job_summary_preserves_started_at_unit() {
let temp_dir = TempDir::new().unwrap();
let config = ShellConfig::with_project_root(temp_dir.path().to_path_buf());
let manager = bound_job_manager(config);
let job_id = JobId::new();
let started_at_unix = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_secs();
let operation_id = OperationId::new();
let job = BackgroundJobRecord {
view: BackgroundJob {
id: job_id.clone(),
command: "echo done".to_string(),
working_dir: None,
timeout_secs: 10,
started_at_unix,
status: JobStatus::Completed {
exit_code: Some(0),
stdout: "done".to_string(),
stderr: String::new(),
duration_secs: 0.01,
},
},
operation_id: operation_id.clone(),
};
manager.jobs.lock().await.insert(job_id.clone(), job);
manager
.canonical_job_ops
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.insert(job_id.clone(), operation_id);
let summaries = manager.list_jobs().await;
let summary = summaries
.iter()
.find(|s| s.id == job_id)
.expect("Job should be in list");
assert_eq!(summary.started_at_unix, started_at_unix);
}
/// Cancelling a running job succeeds and the job then reports `Cancelled`.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_job_manager_cancel() {
let temp_dir = TempDir::new().unwrap();
let mut config = ShellConfig::with_project_root(temp_dir.path().to_path_buf());
config.shell = "sh".to_string();
let manager = bound_job_manager(config);
let job_id = manager.spawn_job("sleep 60", None, 120).await.unwrap();
// Give the monitor task a moment to start before cancelling.
tokio::time::sleep(Duration::from_millis(100)).await;
let result = manager.cancel_job(&job_id).await;
assert!(result.is_ok());
let job = manager.get_status(&job_id).await.unwrap();
assert!(
matches!(job.status, JobStatus::Cancelled { .. }),
"Job should be Cancelled, got {:?}",
job.status
);
}
/// Cancelling a running job succeeds, while cancelling an unknown id yields
/// `ShellError::JobNotFound`.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_job_manager_cancel_signal() {
let temp_dir = TempDir::new().unwrap();
let mut config = ShellConfig::with_project_root(temp_dir.path().to_path_buf());
config.shell = "sh".to_string();
let manager = bound_job_manager(config);
let job_id = manager.spawn_job("sleep 60", None, 120).await.unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;
let result = manager.cancel_job(&job_id).await;
assert!(result.is_ok());
let fake_id = JobId::from_string("job_fake");
let result = manager.cancel_job(&fake_id).await;
assert!(matches!(result, Err(ShellError::JobNotFound(_))));
}
/// A short command is picked up by the monitor task and reported as
/// `Completed` within two seconds.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_job_manager_tokio_spawn() {
let temp_dir = TempDir::new().unwrap();
let mut config = ShellConfig::with_project_root(temp_dir.path().to_path_buf());
config.shell = "sh".to_string();
let manager = bound_job_manager(config);
let job_id = manager.spawn_job("echo hello", None, 30).await.unwrap();
// Wait for the command and its monitor task to finish.
tokio::time::sleep(Duration::from_secs(2)).await;
let job = manager.get_status(&job_id).await.unwrap();
assert!(
matches!(job.status, JobStatus::Completed { .. }),
"Job should be Completed, got {:?}",
job.status
);
}
/// A completed job exposes exit code 0, the captured stdout and a
/// non-negative duration.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_job_manager_completed_status() {
let temp_dir = TempDir::new().unwrap();
let mut config = ShellConfig::with_project_root(temp_dir.path().to_path_buf());
config.shell = "sh".to_string();
let manager = bound_job_manager(config);
let job_id = manager
.spawn_job("echo 'test output'", None, 30)
.await
.unwrap();
tokio::time::sleep(Duration::from_secs(2)).await;
let job = manager.get_status(&job_id).await.unwrap();
if let JobStatus::Completed {
exit_code,
stdout,
stderr: _,
duration_secs,
} = &job.status
{
assert_eq!(*exit_code, Some(0));
assert!(stdout.contains("test output"));
assert!(*duration_secs >= 0.0);
} else {
unreachable!("Expected Completed status, got {:?}", job.status);
}
}
/// A command exiting with status 1 is still reported as `Completed`, carrying
/// `exit_code == Some(1)`.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_job_manager_failed_status() {
    let workspace = TempDir::new().unwrap();
    let config = {
        let mut cfg = ShellConfig::with_project_root(workspace.path().to_path_buf());
        cfg.shell = String::from("sh");
        cfg
    };
    // NOTE(review): built with `JobManager::new` rather than the `bound_job_manager`
    // helper most sibling tests use — confirm this is intentional.
    let manager = JobManager::new(config);
    let job_id = manager.spawn_job("exit 1", None, 30).await.unwrap();
    tokio::time::sleep(Duration::from_secs(2)).await;
    let job = manager.get_status(&job_id).await.unwrap();

    let JobStatus::Completed { exit_code, .. } = &job.status else {
        unreachable!(
            "Expected Completed status with exit code 1, got {:?}",
            job.status
        );
    };
    assert_eq!(*exit_code, Some(1));
}
/// A `sleep 30` job spawned with a timeout argument of 1 is expected to be
/// `TimedOut` three seconds later, with a recorded duration of at least 1.0.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_job_manager_timeout_status() {
    let workspace = TempDir::new().unwrap();
    let config = {
        let mut cfg = ShellConfig::with_project_root(workspace.path().to_path_buf());
        cfg.shell = String::from("sh");
        cfg
    };
    // NOTE(review): built with `JobManager::new` rather than the `bound_job_manager`
    // helper most sibling tests use — confirm this is intentional.
    let manager = JobManager::new(config);
    let job_id = manager.spawn_job("sleep 30", None, 1).await.unwrap();
    tokio::time::sleep(Duration::from_secs(3)).await;

    let job = manager.get_status(&job_id).await.unwrap();
    let timed_out = matches!(job.status, JobStatus::TimedOut { .. });
    assert!(timed_out, "Job should be TimedOut, got {:?}", job.status);
    if let JobStatus::TimedOut { duration_secs, .. } = &job.status {
        assert!(*duration_secs >= 1.0);
    }
}
/// After a successful `cancel_job`, the status is `Cancelled` and carries a
/// non-negative duration.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_job_manager_cancelled_status() {
    let workspace = TempDir::new().unwrap();
    let config = {
        let mut cfg = ShellConfig::with_project_root(workspace.path().to_path_buf());
        cfg.shell = String::from("sh");
        cfg
    };
    // NOTE(review): built with `JobManager::new` rather than the `bound_job_manager`
    // helper most sibling tests use — confirm this is intentional.
    let manager = JobManager::new(config);
    let job_id = manager.spawn_job("sleep 60", None, 120).await.unwrap();
    tokio::time::sleep(Duration::from_millis(500)).await;
    manager.cancel_job(&job_id).await.unwrap();

    let job = manager.get_status(&job_id).await.unwrap();
    let cancelled = matches!(job.status, JobStatus::Cancelled { .. });
    assert!(cancelled, "Job should be Cancelled, got {:?}", job.status);
    if let JobStatus::Cancelled { duration_secs } = &job.status {
        assert!(*duration_secs >= 0.0);
    }
}
/// With an event sender attached, finishing a job delivers a
/// `shell_job_completed` event on the channel within five seconds.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_job_manager_sends_completion_event() {
    let workspace = TempDir::new().unwrap();
    let config = {
        let mut cfg = ShellConfig::with_project_root(workspace.path().to_path_buf());
        cfg.shell = String::from("sh");
        cfg
    };
    let (tx, mut rx) = mpsc::channel::<Value>(10);
    let manager = bound_job_manager(config).with_event_sender(tx);
    let _job_id = manager.spawn_job("echo done", None, 30).await.unwrap();

    // Bound the wait so a missing event fails the test instead of hanging it.
    let received = tokio::time::timeout(Duration::from_secs(5), rx.recv()).await;
    let event = received
        .expect("Should receive event within timeout")
        .expect("Channel should not be closed");

    assert_eq!(event["type"], "shell_job_completed");
    assert!(event["job_id"].is_string());
    assert_eq!(event["command"], "echo done");
    assert!(event["result"].is_object());
}
/// The completion event echoes the spawned job's id and command and carries a
/// `result` whose `status` field is a string.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_job_manager_event_payload() {
    let workspace = TempDir::new().unwrap();
    let config = {
        let mut cfg = ShellConfig::with_project_root(workspace.path().to_path_buf());
        cfg.shell = String::from("sh");
        cfg
    };
    let (tx, mut rx) = mpsc::channel::<Value>(10);
    let manager = bound_job_manager(config).with_event_sender(tx);
    let job_id = manager
        .spawn_job("echo 'hello world'", None, 30)
        .await
        .unwrap();

    let received = tokio::time::timeout(Duration::from_secs(5), rx.recv()).await;
    let event = received
        .expect("Should receive event within timeout")
        .expect("Channel should not be closed");

    assert_eq!(event["type"], "shell_job_completed");
    assert_eq!(event["job_id"], job_id.0);
    assert_eq!(event["command"], "echo 'hello world'");
    let result = &event["result"];
    assert!(result["status"].is_string());
}
/// `spawn_job` must return without waiting for the command: three `sleep 5`
/// jobs are spawned in under a second and all report `Running` afterwards.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_async_execution_nonblocking() {
    let workspace = TempDir::new().unwrap();
    let config = {
        let mut cfg = ShellConfig::with_project_root(workspace.path().to_path_buf());
        cfg.shell = String::from("sh");
        cfg
    };
    let manager = bound_job_manager(config);

    let start = Instant::now();
    let id1 = manager.spawn_job("sleep 5", None, 60).await.unwrap();
    let id2 = manager.spawn_job("sleep 5", None, 60).await.unwrap();
    let id3 = manager.spawn_job("sleep 5", None, 60).await.unwrap();
    let elapsed = start.elapsed();
    assert!(
        elapsed < Duration::from_secs(1),
        "Spawning 3 jobs should be nearly instant, took {elapsed:?}"
    );

    for id in [&id1, &id2, &id3] {
        let job = manager.get_status(id).await.unwrap();
        let running = matches!(job.status, JobStatus::Running { .. });
        assert!(running, "Job {id} should be running");
    }

    // Best-effort cleanup; cancel errors are deliberately ignored.
    for id in [&id1, &id2, &id3] {
        let _ = manager.cancel_job(id).await;
    }
}
/// A `sleep 10` job spawned with a timeout argument of 1 must be `TimedOut`
/// three seconds later, with a recorded duration in `[1.0, 3.0)`.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_timeout_enforced() {
    let workspace = TempDir::new().unwrap();
    let config = {
        let mut cfg = ShellConfig::with_project_root(workspace.path().to_path_buf());
        cfg.shell = String::from("sh");
        cfg
    };
    let manager = bound_job_manager(config);
    let job_id = manager.spawn_job("sleep 10", None, 1).await.unwrap();
    tokio::time::sleep(Duration::from_secs(3)).await;

    let job = manager.get_status(&job_id).await.unwrap();
    let timed_out = matches!(job.status, JobStatus::TimedOut { .. });
    assert!(
        timed_out,
        "Job should have timed out, got {:?}",
        job.status
    );
    if let JobStatus::TimedOut { duration_secs, .. } = &job.status {
        let near_timeout = *duration_secs >= 1.0 && *duration_secs < 3.0;
        assert!(
            near_timeout,
            "Duration should be close to timeout: {duration_secs}"
        );
    }
}
/// Walks a job through Running -> Cancelled and verifies the `Cancelled`
/// status is still reported half a second after the cancel.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_kill_terminates_process() {
    let workspace = TempDir::new().unwrap();
    let config = {
        let mut cfg = ShellConfig::with_project_root(workspace.path().to_path_buf());
        cfg.shell = String::from("sh");
        cfg
    };
    let manager = bound_job_manager(config);
    let job_id = manager.spawn_job("sleep 60", None, 120).await.unwrap();
    tokio::time::sleep(Duration::from_millis(200)).await;

    // Precondition: the job is alive before we cancel it.
    let before = manager.get_status(&job_id).await.unwrap();
    assert!(
        matches!(before.status, JobStatus::Running { .. }),
        "Job should be running before cancel"
    );

    manager
        .cancel_job(&job_id)
        .await
        .expect("Cancel should succeed");

    let job = manager.get_status(&job_id).await.unwrap();
    let cancelled = matches!(job.status, JobStatus::Cancelled { .. });
    assert!(cancelled, "Job should be cancelled, got {:?}", job.status);

    // Re-check after a pause: nothing may overwrite the Cancelled status.
    tokio::time::sleep(Duration::from_millis(500)).await;
    let later = manager.get_status(&job_id).await.unwrap();
    assert!(
        matches!(later.status, JobStatus::Cancelled { .. }),
        "Job should still be cancelled"
    );
}
/// Ten tasks spawn jobs through a shared manager at once; every spawn must
/// succeed, every id must be distinct, and all jobs must end up `Completed`.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_concurrent_job_spawning() {
    let workspace = TempDir::new().unwrap();
    let config = {
        let mut cfg = ShellConfig::with_project_root(workspace.path().to_path_buf());
        cfg.shell = String::from("sh");
        cfg
    };
    let manager = Arc::new(bound_job_manager(config));

    // Launch all spawner tasks first, then await them in order.
    let handles: Vec<_> = (0..10)
        .map(|i| {
            let mgr = Arc::clone(&manager);
            let cmd = format!("echo job{i}");
            tokio::spawn(async move { mgr.spawn_job(&cmd, None, 30).await })
        })
        .collect();

    let mut job_ids = Vec::new();
    for handle in handles {
        let result = handle.await.unwrap();
        assert!(result.is_ok(), "All spawns should succeed");
        job_ids.push(result.unwrap());
    }

    // Count distinct ids by sorting and de-duplicating references to them.
    let mut sorted_ids: Vec<_> = job_ids.iter().map(|id| &id.0).collect();
    sorted_ids.sort();
    sorted_ids.dedup();
    let unique_count = sorted_ids.len();
    assert_eq!(
        unique_count, 10,
        "All 10 jobs should have unique IDs, got {unique_count}"
    );

    tokio::time::sleep(Duration::from_secs(2)).await;
    for job_id in &job_ids {
        let job = manager.get_status(job_id).await.unwrap();
        let completed = matches!(job.status, JobStatus::Completed { .. });
        assert!(
            completed,
            "Job {} should be completed, got {:?}",
            job_id,
            job.status
        );
    }
}
/// `remove_job` returns true once for a tracked job, makes it disappear from
/// `get_status` and `job_count`, and returns false when repeated.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_remove_job() {
    let workspace = TempDir::new().unwrap();
    let config = {
        let mut cfg = ShellConfig::with_project_root(workspace.path().to_path_buf());
        cfg.shell = String::from("sh");
        cfg
    };
    let manager = bound_job_manager(config);
    let job_id = manager.spawn_job("echo hello", None, 30).await.unwrap();
    tokio::time::sleep(Duration::from_millis(500)).await;

    // Before removal: the job is visible and counted.
    assert!(manager.get_status(&job_id).await.is_some());
    assert_eq!(manager.job_count().await, 1);

    let removed = manager.remove_job(&job_id).await;
    assert!(removed, "Job should be removed");

    // After removal: the job is gone from both views.
    assert!(manager.get_status(&job_id).await.is_none());
    assert_eq!(manager.job_count().await, 0);

    let removed_again = manager.remove_job(&job_id).await;
    assert!(!removed_again, "Already removed job should return false");
}
/// `job_count` goes up by one per spawn and down by one per `remove_job`.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_job_count() {
    let workspace = TempDir::new().unwrap();
    let config = {
        let mut cfg = ShellConfig::with_project_root(workspace.path().to_path_buf());
        cfg.shell = String::from("sh");
        cfg
    };
    let manager = bound_job_manager(config);
    assert_eq!(manager.job_count().await, 0);

    // Count up while spawning.
    let id1 = manager.spawn_job("echo one", None, 30).await.unwrap();
    assert_eq!(manager.job_count().await, 1);
    let id2 = manager.spawn_job("echo two", None, 30).await.unwrap();
    assert_eq!(manager.job_count().await, 2);

    // Count down while removing.
    manager.remove_job(&id1).await;
    assert_eq!(manager.job_count().await, 1);
    manager.remove_job(&id2).await;
    assert_eq!(manager.job_count().await, 0);
}
/// `completed_job_count` is 0 before and immediately after a spawn, and 1
/// once the `echo` job has had half a second to finish.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_completed_job_count() {
    let workspace = TempDir::new().unwrap();
    let config = {
        let mut cfg = ShellConfig::with_project_root(workspace.path().to_path_buf());
        cfg.shell = String::from("sh");
        cfg
    };
    let manager = bound_job_manager(config);
    assert_eq!(manager.completed_job_count().await, 0);

    let _job_id = manager.spawn_job("echo hello", None, 30).await.unwrap();
    // Checked right after the spawn returns, before the fixed wait below.
    assert_eq!(manager.completed_job_count().await, 0);

    tokio::time::sleep(Duration::from_millis(500)).await;
    assert_eq!(manager.completed_job_count().await, 1);
}
/// With `max_completed_jobs = 3`, five finished jobs stay recorded until the
/// next spawn, after which at most three completed jobs may remain.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_cleanup_respects_max_completed_jobs() {
    let workspace = TempDir::new().unwrap();
    let config = {
        let mut cfg = ShellConfig::with_project_root(workspace.path().to_path_buf());
        cfg.shell = String::from("sh");
        cfg.max_completed_jobs = 3;
        // Large TTL value so only the count limit can evict jobs in this test.
        cfg.completed_job_ttl_secs = 3600;
        cfg
    };
    let manager = bound_job_manager(config);

    for i in 0..5 {
        let command = format!("echo job{i}");
        let _ = manager.spawn_job(&command, None, 30).await.unwrap();
    }
    tokio::time::sleep(Duration::from_secs(2)).await;
    assert_eq!(manager.completed_job_count().await, 5);

    // The test expects this extra spawn to trigger the cleanup pass.
    let trigger_id = manager.spawn_job("sleep 10", None, 30).await.unwrap();
    let completed_after = manager.completed_job_count().await;
    assert!(
        completed_after <= 3,
        "Should have at most 3 completed jobs after cleanup, got {completed_after}"
    );

    let trigger = manager.get_status(&trigger_id).await.unwrap();
    assert!(
        matches!(trigger.status, JobStatus::Running { .. }),
        "Trigger job should be running"
    );
    let _ = manager.cancel_job(&trigger_id).await;
}
/// `remove_job` reports `false` for an id the manager has never tracked.
#[tokio::test]
async fn test_remove_nonexistent_job() {
    let manager = bound_job_manager(ShellConfig::default());
    let fake_id = JobId::from_string("job_nonexistent");

    let removed = manager.remove_job(&fake_id).await;

    assert!(!removed, "Removing nonexistent job should return false");
}
/// A command printing CJK text, accented letters, and emoji must still reach
/// `Completed`; the stdout check itself is lenient and also accepts empty output.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_multibyte_utf8_output() {
    let workspace = TempDir::new().unwrap();
    let config = {
        let mut cfg = ShellConfig::with_project_root(workspace.path().to_path_buf());
        cfg.shell = String::from("sh");
        cfg
    };
    let manager = bound_job_manager(config);
    let job_id = manager
        .spawn_job("printf 'Hello 世界! 🎉🚀 Test émojis: 👍'", None, 30)
        .await
        .unwrap();
    tokio::time::sleep(Duration::from_secs(2)).await;

    let job = manager.get_status(&job_id).await.unwrap();
    let completed = matches!(job.status, JobStatus::Completed { .. });
    assert!(
        completed,
        "Job with UTF-8 output should complete successfully, got {:?}",
        job.status
    );
    if let JobStatus::Completed { stdout, .. } = &job.status {
        let plausible =
            stdout.contains("Hello") || stdout.contains("世界") || stdout.is_empty();
        assert!(plausible, "Output should be captured without corruption");
    }
}
/// A job observed as `Running` becomes `Cancelled` after `cancel_job`, and is
/// still `Cancelled` when re-read 100 ms later.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_kill_reaps_process() {
    let workspace = TempDir::new().unwrap();
    let config = {
        let mut cfg = ShellConfig::with_project_root(workspace.path().to_path_buf());
        cfg.shell = String::from("sh");
        cfg
    };
    // NOTE(review): built with `JobManager::new` rather than the `bound_job_manager`
    // helper most sibling tests use — confirm this is intentional.
    let manager = JobManager::new(config);
    let job_id = manager.spawn_job("sleep 60", None, 120).await.unwrap();
    tokio::time::sleep(Duration::from_millis(100)).await;

    let before = manager.get_status(&job_id).await.unwrap();
    assert!(
        matches!(before.status, JobStatus::Running { .. }),
        "Job should be Running before kill"
    );

    manager.cancel_job(&job_id).await.unwrap();

    let job = manager.get_status(&job_id).await.unwrap();
    let cancelled = matches!(job.status, JobStatus::Cancelled { .. });
    assert!(
        cancelled,
        "Job should be Cancelled after kill, got {:?}",
        job.status
    );

    // Second look after a short pause: the status must not have changed.
    tokio::time::sleep(Duration::from_millis(100)).await;
    let job = manager.get_status(&job_id).await.unwrap();
    let still_cancelled = matches!(job.status, JobStatus::Cancelled { .. });
    assert!(
        still_cancelled,
        "Job should still be Cancelled (process reaped), got {:?}",
        job.status
    );
}
/// A job is `Running` right after spawn and moves to `Completed` without any
/// further calls from the test, with the echoed text in stdout and exit code 0.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_background_job_auto_completes() {
    let workspace = TempDir::new().unwrap();
    let config = {
        let mut cfg = ShellConfig::with_project_root(workspace.path().to_path_buf());
        cfg.shell = String::from("sh");
        cfg
    };
    // NOTE(review): built with `JobManager::new` rather than the `bound_job_manager`
    // helper most sibling tests use — confirm this is intentional.
    let manager = JobManager::new(config);
    let job_id = manager.spawn_job("echo hello", None, 30).await.unwrap();

    // Read the status immediately, before the command is given time to finish.
    let initial = manager.get_status(&job_id).await.unwrap();
    assert!(
        matches!(initial.status, JobStatus::Running { .. }),
        "Job should start as Running"
    );

    tokio::time::sleep(Duration::from_millis(500)).await;

    let job = manager.get_status(&job_id).await.unwrap();
    let completed = matches!(job.status, JobStatus::Completed { .. });
    assert!(
        completed,
        "Job should auto-complete to Completed, got {:?}",
        job.status
    );
    if let JobStatus::Completed {
        stdout, exit_code, ..
    } = &job.status
    {
        let has_output = stdout.contains("hello");
        assert!(has_output, "stdout should contain 'hello', got: {stdout}");
        assert_eq!(*exit_code, Some(0), "exit code should be 0");
    }
}
/// Input shorter than the limit is returned unchanged.
#[test]
fn test_truncate_output_tail_small_input() {
    let tail = super::truncate_output_tail(b"Hello, World!", 100);
    assert_eq!(tail, "Hello, World!");
}
/// Input whose length equals the limit is returned unchanged.
#[test]
fn test_truncate_output_tail_exact_limit() {
    let tail = super::truncate_output_tail(b"12345", 5);
    assert_eq!(tail, "12345");
}
/// Input longer than the limit keeps only its last `max_bytes` bytes.
#[test]
fn test_truncate_output_tail_exceeds_limit() {
    let tail = super::truncate_output_tail(b"0123456789", 5);
    assert_eq!(tail, "56789");
}
/// Tail truncation must never start the kept region in the middle of a
/// multi-byte UTF-8 sequence: when the raw cut falls on a continuation byte,
/// the kept region starts at the next character instead.
#[test]
fn test_truncate_output_tail_utf8_boundary() {
    // Byte layout (15 bytes): "Hello" = 0..5, '世' = 5..8, '界' = 8..11, "Test" = 11..15.
    let data = "Hello世界Test".as_bytes();

    // Raw cut at offset 5: exactly the lead byte of '世', nothing to skip.
    assert_eq!(super::truncate_output_tail(data, 10), "世界Test");

    // Raw cuts at offsets 6 and 7: continuation bytes of '世', so the kept
    // region must advance to the lead byte of '界'.
    assert_eq!(super::truncate_output_tail(data, 9), "界Test");
    assert_eq!(super::truncate_output_tail(data, 8), "界Test");

    // Raw cut at offset 9: inside '界', so only the ASCII tail survives.
    assert_eq!(super::truncate_output_tail(data, 6), "Test");
}
/// A cut that lands inside a 4-byte emoji must drop that emoji entirely rather
/// than emit a partial sequence.
#[test]
fn test_truncate_output_tail_emoji() {
    // Byte layout (16 bytes): "Start" = 0..5, '🎉' = 5..9, '🚀' = 9..13, "End" = 13..16.
    let data = "Start🎉🚀End".as_bytes();

    // Raw cut at offset 8 is the last byte of '🎉'; the kept region must begin
    // at '🚀', so no replacement character can appear in the result.
    let result = super::truncate_output_tail(data, 8);
    assert_eq!(result, "🚀End");
}
/// Appending while the buffer stays under the truncation threshold simply
/// concatenates the bytes.
#[test]
fn test_append_with_truncation_small_append() {
    let mut buf = vec![1, 2, 3];
    let extra = [4, 5];
    super::append_with_truncation(&mut buf, &extra, 10);
    assert_eq!(buf, vec![1, 2, 3, 4, 5]);
}
/// Once the buffer grows past twice `max_bytes`, only the newest `max_bytes`
/// bytes are kept; at exactly twice `max_bytes` nothing is dropped.
#[test]
fn test_append_with_truncation_triggers_truncation() {
    // 15 + 10 = 25 bytes > 2 * 10, so the oldest 15 bytes are drained and
    // exactly the ten appended bytes remain.
    let mut buffer = vec![0; 15];
    super::append_with_truncation(&mut buffer, &[1; 10], 10);
    assert_eq!(buffer, vec![1; 10]);

    // 10 + 10 = 20 bytes is not strictly greater than 2 * 10: no truncation.
    let mut at_threshold = vec![0; 10];
    super::append_with_truncation(&mut at_threshold, &[1; 10], 10);
    assert_eq!(at_threshold.len(), 20);
}
/// Two concurrent `cancel_job` calls on the same running job: exactly one must
/// succeed and the other must fail with `ShellError::JobNotRunning`.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_cancel_job_atomic_status_check() {
    let workspace = TempDir::new().unwrap();
    let config = {
        let mut cfg = ShellConfig::with_project_root(workspace.path().to_path_buf());
        cfg.shell = String::from("sh");
        cfg
    };
    let manager = Arc::new(bound_job_manager(config));
    let job_id = manager.spawn_job("sleep 30", None, 60).await.unwrap();
    tokio::time::sleep(Duration::from_millis(100)).await;

    // Each racing task owns its own manager handle and id copy.
    let (mgr1, job_id1) = (Arc::clone(&manager), job_id.clone());
    let (mgr2, job_id2) = (Arc::clone(&manager), job_id.clone());
    let first = tokio::spawn(async move { mgr1.cancel_job(&job_id1).await });
    let second = tokio::spawn(async move { mgr2.cancel_job(&job_id2).await });
    let (r1, r2) = tokio::join!(first, second);
    let result1 = r1.expect("Task 1 should not panic");
    let result2 = r2.expect("Task 2 should not panic");

    let (successes, failures): (Vec<_>, Vec<_>) =
        [result1, result2].into_iter().partition(Result::is_ok);
    assert_eq!(successes.len(), 1, "Exactly one cancel should succeed");
    assert_eq!(
        failures.len(),
        1,
        "Exactly one cancel should fail with JobNotRunning"
    );
    if let Err(err) = &failures[0] {
        let not_running = matches!(err, ShellError::JobNotRunning);
        assert!(not_running, "Expected JobNotRunning error, got: {err:?}");
    }
}
/// `graceful_kill` must actually terminate and reap a live child process.
///
/// `graceful_kill` signals the process *group* whose id equals the child's pid,
/// so the child is spawned as the leader of its own group (`process_group(0)`).
/// Without that the signal reaches nothing, and a short-lived child would make
/// the test pass merely by exiting on its own.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
#[cfg(unix)]
async fn integration_real_graceful_kill_function_exists() {
    use std::os::unix::process::CommandExt;
    use tokio::process::Command;

    // Long enough that the child cannot finish by itself during the test.
    let mut std_cmd = std::process::Command::new("sleep");
    std_cmd.arg("30").process_group(0);
    let mut child = Command::from(std_cmd)
        // Safety net: do not leak the child if an assertion below fails.
        .kill_on_drop(true)
        .spawn()
        .expect("Failed to spawn test process");

    // Bound the call so a kill that has no effect fails fast instead of
    // waiting out the full sleep.
    let result = tokio::time::timeout(
        Duration::from_secs(10),
        super::graceful_kill(&mut child),
    )
    .await
    .expect("graceful_kill should finish long before the child exits on its own");
    assert!(result.is_ok(), "graceful_kill should succeed: {result:?}");

    // The child must have been reaped, i.e. an exit status is available.
    assert!(
        matches!(child.try_wait(), Ok(Some(_))),
        "child should have exited after graceful_kill"
    );
}
/// Five concurrent spawns issued after five earlier jobs have finished must all
/// succeed, and the total job count must stay within a loose bound of 12.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_cleanup_atomicity() {
    let workspace = TempDir::new().unwrap();
    let config = {
        let mut cfg = ShellConfig::with_project_root(workspace.path().to_path_buf());
        cfg.shell = String::from("sh");
        cfg.max_completed_jobs = 2;
        cfg.completed_job_ttl_secs = 0;
        cfg
    };
    let manager = Arc::new(bound_job_manager(config));

    // First wave: sequential spawns that are then given two seconds to finish.
    for i in 0..5 {
        let command = format!("echo job{i}");
        let _id = manager.spawn_job(&command, None, 30).await.unwrap();
    }
    tokio::time::sleep(Duration::from_secs(2)).await;

    // Second wave: concurrent spawns issued from separate tasks.
    let handles: Vec<_> = (0..5)
        .map(|i| {
            let mgr = Arc::clone(&manager);
            tokio::spawn(async move {
                mgr.spawn_job(&format!("echo trigger{i}"), None, 30).await
            })
        })
        .collect();
    for handle in handles {
        let result = handle.await.expect("Task should not panic");
        assert!(result.is_ok(), "Spawn should succeed: {result:?}");
    }

    let count = manager.job_count().await;
    assert!(
        count <= 12,
        "Job count should be reasonable after cleanup: {count}"
    );
}
/// Compile-level smoke test: `process_group` can be called on a
/// `std::process::Command` on Unix. The command is configured but never spawned.
#[test]
#[cfg(unix)]
fn test_process_group_configured() {
    use std::os::unix::process::CommandExt as _;

    let _ = std::process::Command::new("echo").process_group(0);
}
/// The `JobNotFound` error returned by `cancel_job` carries the id that was
/// looked up.
#[tokio::test]
async fn test_error_context_job_not_found() {
    let manager = bound_job_manager(ShellConfig::default());
    let fake_id = JobId::from_string("job_nonexistent_12345");

    match manager.cancel_job(&fake_id).await {
        Err(ShellError::JobNotFound(job_id)) => {
            assert_eq!(job_id, "job_nonexistent_12345");
        }
        other => unreachable!("Expected JobNotFound error, got: {:?}", other),
    }
}
/// Cancelling a job one second after spawning a quick `echo` must fail with
/// `ShellError::JobNotRunning`.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_error_context_job_already_completed() {
    let workspace = TempDir::new().unwrap();
    let config = {
        let mut cfg = ShellConfig::with_project_root(workspace.path().to_path_buf());
        cfg.shell = String::from("sh");
        cfg
    };
    let manager = bound_job_manager(config);
    let job_id = manager.spawn_job("echo done", None, 30).await.unwrap();
    // Fixed wait; the test assumes the echo has finished by then.
    tokio::time::sleep(Duration::from_secs(1)).await;

    let result = manager.cancel_job(&job_id).await;
    if !matches!(result, Err(ShellError::JobNotRunning)) {
        unreachable!("Expected JobNotRunning error, got: {:?}", result);
    }
}
/// Streaming truncation must leave the buffer as valid UTF-8 holding the newest
/// data, both when the raw cut falls on a character boundary and when it falls
/// inside a multi-byte character.
#[test]
fn test_streaming_truncation_utf8_safety() {
    // Case 1 — 11 + 16 = 27 bytes, raw cut at offset 17, the lead byte of '数'.
    let mut buffer = "Hello世界".as_bytes().to_vec();
    let more_data = "更多数据Test".as_bytes();
    super::append_with_truncation(&mut buffer, more_data, 10);
    let result = String::from_utf8(buffer.clone());
    assert!(
        result.is_ok(),
        "Buffer should be valid UTF-8 after truncation: {buffer:?}"
    );
    let result_str = result.unwrap();
    assert_eq!(
        result_str, "数据Test",
        "Should contain tail data: {result_str}"
    );

    // Case 2 — one extra ASCII byte moves the raw cut to offset 18, a
    // continuation byte of '数'; the kept region must start at '据' instead.
    let mut buffer = "Hello世界".as_bytes().to_vec();
    super::append_with_truncation(&mut buffer, "更多数据Tests".as_bytes(), 10);
    let result = String::from_utf8(buffer.clone());
    assert!(
        result.is_ok(),
        "Buffer should be valid UTF-8 after truncation: {buffer:?}"
    );
    assert_eq!(result.unwrap(), "据Tests");
}
/// `running_job_count` starts at 0 and grows by one for each long-running job
/// that is spawned.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_running_job_count() {
    let workspace = TempDir::new().unwrap();
    let config = {
        let mut cfg = ShellConfig::with_project_root(workspace.path().to_path_buf());
        cfg.shell = String::from("sh");
        cfg
    };
    let manager = bound_job_manager(config);
    assert_eq!(manager.running_job_count().await, 0);

    let job1 = manager.spawn_job("sleep 60", None, 120).await.unwrap();
    assert_eq!(manager.running_job_count().await, 1);
    let job2 = manager.spawn_job("sleep 60", None, 120).await.unwrap();
    assert_eq!(manager.running_job_count().await, 2);

    // Best-effort cleanup; cancel errors are deliberately ignored.
    for job in [&job1, &job2] {
        let _ = manager.cancel_job(job).await;
    }
}
/// With `max_concurrent_processes = 2`, a third simultaneous spawn is rejected
/// with an error mentioning "Concurrency limit exceeded".
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_concurrency_limit_enforced() {
    let workspace = TempDir::new().unwrap();
    let config = {
        let mut cfg = ShellConfig::with_project_root(workspace.path().to_path_buf());
        cfg.shell = String::from("sh");
        cfg.max_concurrent_processes = 2;
        cfg
    };
    let manager = bound_job_manager(config);

    // Fill both slots with long-running jobs.
    let job1 = manager.spawn_job("sleep 60", None, 120).await.unwrap();
    let job2 = manager.spawn_job("sleep 60", None, 120).await.unwrap();

    let result = manager.spawn_job("sleep 60", None, 120).await;
    assert!(
        result.is_err(),
        "Should reject job when at concurrency limit"
    );
    let err = result.unwrap_err();
    let mentions_limit = err.to_string().contains("Concurrency limit exceeded");
    assert!(
        mentions_limit,
        "Expected concurrency limit error, got: {err:?}"
    );

    // Best-effort cleanup; cancel errors are deliberately ignored.
    for job in [&job1, &job2] {
        let _ = manager.cancel_job(job).await;
    }
}
/// With `max_concurrent_processes = 0`, five simultaneous long-running jobs are
/// all accepted.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_concurrency_limit_zero_means_unlimited() {
    let workspace = TempDir::new().unwrap();
    let config = {
        let mut cfg = ShellConfig::with_project_root(workspace.path().to_path_buf());
        cfg.shell = String::from("sh");
        cfg.max_concurrent_processes = 0;
        cfg
    };
    let manager = bound_job_manager(config);

    let mut spawned = Vec::new();
    for _ in 0..5 {
        let result = manager.spawn_job("sleep 60", None, 120).await;
        assert!(
            result.is_ok(),
            "Should allow unlimited jobs when limit is 0"
        );
        spawned.push(result.unwrap());
    }

    // Best-effort cleanup; cancel errors are deliberately ignored.
    for job in spawned {
        let _ = manager.cancel_job(&job).await;
    }
}
/// The `list_jobs` summary of a completed job must keep a non-zero
/// `started_at_unix` that lies within the last 60 seconds.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_job_summary_preserves_started_at_for_completed_jobs() {
    let workspace = TempDir::new().unwrap();
    let config = {
        let mut cfg = ShellConfig::with_project_root(workspace.path().to_path_buf());
        cfg.shell = String::from("sh");
        cfg
    };
    // NOTE(review): built with `JobManager::new` rather than the `bound_job_manager`
    // helper most sibling tests use — confirm this is intentional.
    let manager = JobManager::new(config);
    let job_id = manager.spawn_job("echo hello", None, 30).await.unwrap();
    tokio::time::sleep(Duration::from_secs(2)).await;

    let job = manager.get_status(&job_id).await.unwrap();
    let completed = matches!(job.status, JobStatus::Completed { .. });
    assert!(completed, "Job should be completed, got {:?}", job.status);

    let summaries = manager.list_jobs().await;
    let summary = summaries
        .iter()
        .find(|s| s.id == job_id)
        .expect("Job should be in list");
    assert!(
        summary.started_at_unix > 0,
        "Completed job's started_at_unix should be preserved (> 0), got {}",
        summary.started_at_unix
    );

    // Compare against the current wall-clock time in whole seconds.
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_secs();
    let is_recent = summary.started_at_unix <= now && summary.started_at_unix > now - 60;
    assert!(
        is_recent,
        "started_at_unix should be recent, got {} vs now {}",
        summary.started_at_unix,
        now
    );
}
/// Stress test for `acquire_sync_slot`: 20 tasks compete for 2 slots.
///
/// Each task that obtains a guard bumps a shared "active" counter, records the
/// highest value it has seen, holds the guard for 10 ms, and then releases it.
/// The test asserts that the recorded high-water mark never exceeds the
/// configured limit, that every task finished, and that at least one
/// acquisition was refused.
#[tokio::test]
async fn test_acquire_sync_slot_atomic_stress() {
    use std::sync::atomic::{AtomicUsize, Ordering};
    let temp_dir = TempDir::new().unwrap();
    let mut config = ShellConfig::with_project_root(temp_dir.path().to_path_buf());
    config.shell = "sh".to_string();
    config.max_concurrent_processes = 2;
    let manager = Arc::new(bound_job_manager(config));
    let active_slots = Arc::new(AtomicUsize::new(0));
    let max_observed = Arc::new(AtomicUsize::new(0));
    let success_count = Arc::new(AtomicUsize::new(0));
    let failure_count = Arc::new(AtomicUsize::new(0));
    let num_tasks = 20;
    let mut handles = Vec::new();
    for _ in 0..num_tasks {
        let mgr = Arc::clone(&manager);
        let active = Arc::clone(&active_slots);
        let max_obs = Arc::clone(&max_observed);
        let successes = Arc::clone(&success_count);
        let failures = Arc::clone(&failure_count);
        handles.push(tokio::spawn(async move {
            match mgr.acquire_sync_slot().await {
                Ok(guard) => {
                    let current = active.fetch_add(1, Ordering::SeqCst) + 1;
                    // Atomically raise the high-water mark to `current` if larger.
                    max_obs.fetch_max(current, Ordering::SeqCst);
                    successes.fetch_add(1, Ordering::SeqCst);
                    // Hold the slot briefly so other tasks run while it is taken.
                    tokio::time::sleep(Duration::from_millis(10)).await;
                    // Decrement before releasing the guard so the counter never
                    // over-reports the number of held slots.
                    active.fetch_sub(1, Ordering::SeqCst);
                    drop(guard);
                }
                Err(_) => {
                    failures.fetch_add(1, Ordering::SeqCst);
                }
            }
        }));
    }
    for handle in handles {
        handle.await.expect("Task should not panic");
    }
    let final_max = max_observed.load(Ordering::SeqCst);
    let total_successes = success_count.load(Ordering::SeqCst);
    let total_failures = failure_count.load(Ordering::SeqCst);
    assert!(
        final_max <= 2,
        "TOCTOU race detected! Max concurrent slots was {final_max}, limit is 2. \
         Successes: {total_successes}, Failures: {total_failures}"
    );
    assert_eq!(
        total_successes + total_failures,
        num_tasks,
        "All tasks should complete"
    );
    assert!(
        total_failures > 0,
        "Should have some failures when 20 tasks compete for 2 slots"
    );
}
/// With a limit of one, a second `acquire_sync_slot` is refused while the first
/// guard is still held, with an error mentioning "Concurrency limit exceeded".
#[tokio::test]
async fn test_acquire_sync_slot_enforces_limit() {
    let workspace = TempDir::new().unwrap();
    let config = {
        let mut cfg = ShellConfig::with_project_root(workspace.path().to_path_buf());
        cfg.max_concurrent_processes = 1;
        cfg
    };
    let manager = bound_job_manager(config);

    // Keep the first guard alive for the rest of the test.
    let first_attempt = manager.acquire_sync_slot().await;
    let _guard = first_attempt.expect("first slot should be available");

    let second_attempt = manager.acquire_sync_slot().await;
    let err = second_attempt.expect_err("second slot should be rejected");
    let mentions_limit = err.to_string().contains("Concurrency limit exceeded");
    assert!(mentions_limit, "unexpected error: {err}");
}
/// Repeats a smaller slot-acquisition race ten times, each with a fresh manager
/// limited to 2 slots and ten competing tasks, and checks on every iteration
/// that no more than 2 slots were ever observed as held at once.
#[tokio::test]
async fn test_acquire_sync_slot_atomic_repeated() {
    use std::sync::atomic::{AtomicUsize, Ordering};
    for iteration in 0..10 {
        let temp_dir = TempDir::new().unwrap();
        let mut config = ShellConfig::with_project_root(temp_dir.path().to_path_buf());
        config.shell = "sh".to_string();
        config.max_concurrent_processes = 2;
        let manager = Arc::new(bound_job_manager(config));
        let max_observed = Arc::new(AtomicUsize::new(0));
        let active_slots = Arc::new(AtomicUsize::new(0));
        let mut handles = Vec::new();
        for _ in 0..10 {
            let mgr = Arc::clone(&manager);
            let max_obs = Arc::clone(&max_observed);
            let active = Arc::clone(&active_slots);
            handles.push(tokio::spawn(async move {
                // Tasks that are refused a slot simply finish without recording.
                if let Ok(guard) = mgr.acquire_sync_slot().await {
                    let current = active.fetch_add(1, Ordering::SeqCst) + 1;
                    // Atomically raise the high-water mark to `current` if larger.
                    max_obs.fetch_max(current, Ordering::SeqCst);
                    tokio::time::sleep(Duration::from_millis(5)).await;
                    // Decrement before releasing the guard so the counter never
                    // over-reports the number of held slots.
                    active.fetch_sub(1, Ordering::SeqCst);
                    drop(guard);
                }
            }));
        }
        for handle in handles {
            handle.await.expect("Task should not panic");
        }
        let final_max = max_observed.load(Ordering::SeqCst);
        assert!(
            final_max <= 2,
            "TOCTOU race on iteration {iteration}! Max concurrent was {final_max}, limit is 2"
        );
    }
}
}