#[cfg(test)]
use meerkat_core::ops_lifecycle::OperationStatus;
use meerkat_core::ops_lifecycle::{
OperationId, OperationKind, OperationLifecycleSnapshot, OperationPublicResultClass,
OperationResult, OperationSpec, OperationTerminalOutcome, OpsLifecycleError,
OpsLifecycleRegistry,
};
use meerkat_core::types::SessionId;
use meerkat_runtime::RuntimeOpsLifecycleRegistry;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tokio::io::AsyncReadExt;
use tokio::process::{Child, Command};
use tokio::sync::{Mutex, Notify};
use tokio::task::JoinHandle;
use tracing::{debug, info, instrument, warn};
#[cfg(unix)]
async fn graceful_kill(child: &mut Child) -> std::io::Result<()> {
use nix::sys::signal::{Signal, killpg};
use nix::unistd::Pid;
if let Some(pid) = child.id() {
let pgid = Pid::from_raw(pid as i32);
let _ = killpg(pgid, Signal::SIGTERM);
tokio::select! {
() = tokio::time::sleep(Duration::from_secs(2)) => {
let _ = killpg(pgid, Signal::SIGKILL);
let _ = child.wait().await;
}
result = child.wait() => {
return result.map(|_| ());
}
}
}
Ok(())
}
/// Terminates `child` on platforms without Unix process groups.
///
/// There is no SIGTERM grace period here: this delegates to tokio's
/// `Child::kill`, which kills the process and waits for it to exit.
#[cfg(not(unix))]
async fn graceful_kill(child: &mut Child) -> std::io::Result<()> {
    Child::kill(child).await
}
/// Maximum number of trailing bytes retained per output stream (1 MiB).
const DEFAULT_MAX_OUTPUT_BYTES: usize = 1 << 20;
/// Appends `data` to `buffer`, trimming the front once the buffer exceeds
/// twice `max_bytes`.
///
/// A trim keeps at most the last `max_bytes` bytes. The cut is advanced past
/// any UTF-8 continuation bytes so the retained tail starts on a character
/// boundary. Deferring the trim until `2 * max_bytes` means `drain` runs at
/// most once per roughly `max_bytes` of appended data, not on every append.
fn append_with_truncation(buffer: &mut Vec<u8>, data: &[u8], max_bytes: usize) {
    buffer.extend_from_slice(data);
    let len = buffer.len();
    if len <= max_bytes * 2 {
        return;
    }
    // UTF-8 continuation bytes have the bit pattern 0b10xx_xxxx.
    let is_continuation = |byte: u8| byte & 0xC0 == 0x80;
    let cut_hint = len - max_bytes;
    let cut = buffer[cut_hint..]
        .iter()
        .position(|&byte| !is_continuation(byte))
        .map_or(len, |offset| cut_hint + offset);
    buffer.drain(..cut);
}
/// Decodes the last (at most) `max_bytes` bytes of `data` as UTF-8, replacing
/// invalid sequences with U+FFFD.
///
/// When trimming is needed, the cut is moved forward past UTF-8 continuation
/// bytes so decoding never starts in the middle of a character.
fn truncate_output_tail(data: &[u8], max_bytes: usize) -> String {
    let tail = match data.len().checked_sub(max_bytes) {
        // Already within the limit: decode everything.
        None | Some(0) => data,
        Some(cut_hint) => {
            let cut = data[cut_hint..]
                .iter()
                .position(|&byte| byte & 0xC0 != 0x80)
                .map_or(data.len(), |offset| cut_hint + offset);
            &data[cut..]
        }
    };
    String::from_utf8_lossy(tail).into_owned()
}
/// Reads `reader` to EOF in 8 KiB chunks, keeping a bounded tail of the
/// output via `append_with_truncation`.
///
/// The returned buffer may hold up to about `2 * max_bytes` bytes. Callers
/// trim it to `max_bytes` with `truncate_output_tail`.
///
/// # Errors
/// Propagates the first read error.
async fn read_stream_with_limit<R>(mut reader: R, max_bytes: usize) -> std::io::Result<Vec<u8>>
where
    R: tokio::io::AsyncRead + Unpin,
{
    let mut collected = Vec::new();
    let mut scratch = [0u8; 8192];
    loop {
        match reader.read(&mut scratch).await? {
            0 => return Ok(collected),
            filled => append_with_truncation(&mut collected, &scratch[..filled], max_bytes),
        }
    }
}
use super::config::{ShellConfig, ShellError};
use super::types::{BackgroundJob, JobId, JobStatus, JobSummary, JobSummaryStatus};
/// Internal bookkeeping for a single background shell job.
#[derive(Clone, Debug)]
struct BackgroundJobRecord {
    /// Job view handed out by `get_status`. Its `status` is overwritten with the
    /// lifecycle-authority projection when the job finishes or is cancelled.
    view: BackgroundJob,
    /// Operation registered for this job in the ops lifecycle registry.
    operation_id: OperationId,
    /// Set by `drain_completed` once this job's completion has been emitted.
    /// Cleanup only evicts jobs that have been notified.
    completion_notified: bool,
}
/// Spawns and tracks background shell jobs.
///
/// Each job is mirrored as an operation in an ops lifecycle registry. The
/// registry's view is used as the authority for job status.
pub struct JobManager {
    /// Job records keyed by job id.
    jobs: Arc<Mutex<HashMap<JobId, BackgroundJobRecord>>>,
    /// Job id -> operation id. Kept behind a std mutex so synchronous callers
    /// (`canonical_operation_for_job`, completion enrichment) can read it.
    canonical_job_ops: Arc<std::sync::Mutex<HashMap<JobId, OperationId>>>,
    /// Shell configuration: limits, TTLs, working-dir policy and env vars.
    config: ShellConfig,
    /// Lazily resolved shell executable, cached after the first lookup.
    resolved_shell_path: Arc<Mutex<Option<PathBuf>>>,
    /// Lifecycle registry that holds each job's operation.
    ops_registry: Arc<dyn OpsLifecycleRegistry>,
    /// Session recorded as owner of every operation this manager registers.
    owner_bridge_session_id: SessionId,
    /// Whether `owner_bridge_session_id` was explicitly bound.
    owner_session_bound: bool,
    /// Whether `ops_registry` was explicitly bound rather than left as the
    /// private default created by `new`.
    ops_registry_bound: bool,
    /// Join handles of the tasks supervising spawned processes.
    handles: Arc<Mutex<HashMap<JobId, JoinHandle<()>>>>,
    /// Per-job notifiers that `cancel_job` uses to interrupt the supervising task.
    cancel_notifiers: Arc<Mutex<HashMap<JobId, Arc<Notify>>>>,
    /// When each job was observed terminal. Drives TTL- and count-based cleanup.
    completed_at: Arc<Mutex<HashMap<JobId, Instant>>>,
}
impl std::fmt::Debug for JobManager {
    /// Prints identity and configuration only. Locks, handles and the
    /// registry are deliberately elided.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let exports = self.exports_canonical_async_ops();
        let mut builder = f.debug_struct("JobManager");
        builder.field("config", &self.config);
        builder.field("owner_bridge_session_id", &self.owner_bridge_session_id);
        builder.field("exports_canonical_async_ops", &exports);
        builder.finish_non_exhaustive()
    }
}
impl JobManager {
    /// Creates an unbound manager with a private in-process ops registry and a
    /// freshly generated owner session id.
    ///
    /// Background jobs are rejected until both the owner session and the
    /// registry are bound (see `bind_canonical_async_ops`).
    pub fn new(config: ShellConfig) -> Self {
        Self {
            jobs: Arc::new(Mutex::new(HashMap::new())),
            canonical_job_ops: Arc::new(std::sync::Mutex::new(HashMap::new())),
            config,
            resolved_shell_path: Arc::new(Mutex::new(None)),
            ops_registry: Arc::new(RuntimeOpsLifecycleRegistry::new()),
            owner_bridge_session_id: SessionId::new(),
            owner_session_bound: false,
            ops_registry_bound: false,
            handles: Arc::new(Mutex::new(HashMap::new())),
            cancel_notifiers: Arc::new(Mutex::new(HashMap::new())),
            completed_at: Arc::new(Mutex::new(HashMap::new())),
        }
    }

    /// Binds the session recorded as owner of every operation this manager registers.
    pub(crate) fn with_owner_bridge_session_id(mut self, bridge_session_id: SessionId) -> Self {
        self.owner_bridge_session_id = bridge_session_id;
        self.owner_session_bound = true;
        self
    }

    /// Replaces the private default registry with the given ops lifecycle registry.
    pub(crate) fn with_ops_registry(mut self, ops_registry: Arc<dyn OpsLifecycleRegistry>) -> Self {
        self.ops_registry = ops_registry;
        self.ops_registry_bound = true;
        self
    }

    /// Binds both the owner session and the ops registry.
    ///
    /// After this call `exports_canonical_async_ops` returns `true` and
    /// background jobs may be spawned.
    pub fn bind_canonical_async_ops(
        self,
        owner_bridge_session_id: SessionId,
        ops_registry: Arc<dyn OpsLifecycleRegistry>,
    ) -> Self {
        self.with_owner_bridge_session_id(owner_bridge_session_id)
            .with_ops_registry(ops_registry)
    }

    /// Whether both the owner session and the ops registry have been bound.
    pub fn exports_canonical_async_ops(&self) -> bool {
        self.owner_session_bound && self.ops_registry_bound
    }

    /// Whole seconds elapsed since `started_at_unix` (Unix seconds). Clamped to
    /// zero if the clock reads earlier than the start.
    fn lifecycle_duration_secs(started_at_unix: u64) -> f64 {
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        now.saturating_sub(started_at_unix) as f64
    }

    /// Reads the registry snapshot of the job's operation.
    ///
    /// # Errors
    /// Registry errors are mapped to `ShellError::Io`.
    fn snapshot_for_job(
        &self,
        job: &BackgroundJobRecord,
    ) -> Result<Option<OperationLifecycleSnapshot>, ShellError> {
        self.ops_registry
            .snapshot(&job.operation_id)
            .map_err(|error| ShellError::Io(std::io::Error::other(error.to_string())))
    }

    /// Admission limit passed to the registry. A configured value of `0`
    /// means "no limit".
    fn operation_admission_limit(&self) -> Option<usize> {
        match self.config.max_concurrent_processes {
            0 => None,
            limit => Some(limit),
        }
    }

    /// Converts a registry admission error into a `ShellError`, with a
    /// readable message for the concurrency-limit case.
    fn shell_error_for_ops_lifecycle_admission(error: OpsLifecycleError) -> ShellError {
        match error {
            OpsLifecycleError::MaxConcurrentExceeded { limit, active } => {
                ShellError::Io(std::io::Error::other(format!(
                    "Concurrency limit exceeded: {active} active operations, limit is {limit}"
                )))
            }
            other => ShellError::Io(std::io::Error::other(other.to_string())),
        }
    }

    /// Registers a `BackgroundToolOp` owned by the bound session, subject to
    /// the configured admission limit.
    fn register_background_operation(
        &self,
        operation_id: OperationId,
        display_name: String,
    ) -> Result<(), ShellError> {
        self.ops_registry
            .register_operation_with_admission_limit(
                OperationSpec {
                    id: operation_id,
                    kind: OperationKind::BackgroundToolOp,
                    owner_session_id: self.owner_bridge_session_id.clone(),
                    display_name,
                    source_label: "shell_job".to_string(),
                    operation_source: None,
                    child_session_id: None,
                    expect_peer_channel: false,
                },
                self.operation_admission_limit(),
            )
            .map_err(Self::shell_error_for_ops_lifecycle_admission)
    }

    /// Reports successful provisioning of the operation to the registry.
    fn start_background_operation(&self, operation_id: &OperationId) -> Result<(), ShellError> {
        self.ops_registry
            .provisioning_succeeded(operation_id)
            .map_err(|error| ShellError::Io(std::io::Error::other(error.to_string())))
    }

    /// Public result class of the job's operation.
    ///
    /// The class is taken from `snapshot` when one is given, after checking
    /// that the snapshot belongs to this job's operation. Otherwise the
    /// registry is asked directly.
    fn operation_public_result_class(
        &self,
        job: &BackgroundJobRecord,
        snapshot: Option<&OperationLifecycleSnapshot>,
    ) -> Result<OperationPublicResultClass, OpsLifecycleError> {
        if let Some(snapshot) = snapshot {
            if snapshot.id != job.operation_id {
                return Err(OpsLifecycleError::Internal(format!(
                    "background job {} snapshot operation {} does not match canonical operation {}",
                    job.view.id, snapshot.id, job.operation_id
                )));
            }
            return Ok(snapshot.public_result_class);
        }
        self.ops_registry
            .classify_operation_public_result(&job.operation_id)
    }

    /// Maps a public result class to the summary status reported by `list_jobs`.
    ///
    /// # Errors
    /// `OpsLifecycleError::Internal` for `MissingAuthority`.
    fn summary_status_from_public_result(
        result: OperationPublicResultClass,
    ) -> Result<JobSummaryStatus, OpsLifecycleError> {
        Ok(match result {
            OperationPublicResultClass::MissingAuthority => {
                return Err(OpsLifecycleError::Internal(
                    "background job lifecycle authority missing".into(),
                ));
            }
            OperationPublicResultClass::Running => JobSummaryStatus::Running,
            OperationPublicResultClass::Completed => JobSummaryStatus::Completed,
            OperationPublicResultClass::Failed => JobSummaryStatus::Failed,
            OperationPublicResultClass::Cancelled => JobSummaryStatus::Cancelled,
        })
    }

    /// Summary status of a job as classified by the lifecycle authority.
    fn lifecycle_summary_status(
        &self,
        job: &BackgroundJobRecord,
        snapshot: Option<&OperationLifecycleSnapshot>,
    ) -> Result<JobSummaryStatus, OpsLifecycleError> {
        let result = self.operation_public_result_class(job, snapshot)?;
        Self::summary_status_from_public_result(result)
    }

    /// Full `JobStatus` of a job as projected from the lifecycle authority.
    ///
    /// # Errors
    /// `OpsLifecycleError::Internal` when the authority is missing, or when a
    /// terminal class cannot be projected (e.g. no snapshot is available).
    fn reconcile_job_status(
        &self,
        job: &BackgroundJobRecord,
        snapshot: Option<&OperationLifecycleSnapshot>,
    ) -> Result<JobStatus, OpsLifecycleError> {
        let public_result = self.operation_public_result_class(job, snapshot)?;
        match public_result {
            OperationPublicResultClass::Running => Ok(JobStatus::Running {
                started_at_unix: job.view.started_at_unix,
            }),
            OperationPublicResultClass::MissingAuthority => Err(OpsLifecycleError::Internal(
                "background job lifecycle authority missing".into(),
            )),
            OperationPublicResultClass::Completed
            | OperationPublicResultClass::Failed
            | OperationPublicResultClass::Cancelled => {
                match Self::terminal_status_from_authority(job, snapshot, &job.view.status) {
                    Ok(Some(status)) => Ok(status),
                    Ok(None) => Err(OpsLifecycleError::Internal(
                        "background job lifecycle authority missing".into(),
                    )),
                    Err(error) => Err(error),
                }
            }
        }
    }

    /// Duration to report for a job.
    ///
    /// Uses the process-measured duration when `process_status` is terminal.
    /// Otherwise uses the snapshot's elapsed time, falling back to wall-clock
    /// seconds since `started_at_unix`.
    fn duration_from_process_or_snapshot(
        job: &BackgroundJobRecord,
        process_status: &JobStatus,
        snapshot: Option<&OperationLifecycleSnapshot>,
    ) -> f64 {
        match process_status {
            JobStatus::Completed { duration_secs, .. }
            | JobStatus::Failed { duration_secs, .. }
            | JobStatus::Cancelled { duration_secs } => *duration_secs,
            JobStatus::Running { .. } => snapshot
                .and_then(|snapshot| snapshot.elapsed_ms)
                .map(|elapsed_ms| elapsed_ms as f64 / 1000.0)
                .unwrap_or_else(|| Self::lifecycle_duration_secs(job.view.started_at_unix)),
        }
    }

    /// Projects a `JobStatus` from a registry snapshot.
    ///
    /// Returns `Ok(None)` when there is no snapshot or its class is
    /// `MissingAuthority`. For `Completed`, the operation result's `content`
    /// becomes stdout, or stderr when `is_error` is set. `exit_code` is carried
    /// over only when `process_status` is itself `Completed`.
    ///
    /// # Errors
    /// `OpsLifecycleError::Internal` when a terminal class is reported for a
    /// non-terminal snapshot, or when the terminal outcome is missing or does
    /// not match the class.
    fn terminal_status_from_authority(
        job: &BackgroundJobRecord,
        snapshot: Option<&OperationLifecycleSnapshot>,
        process_status: &JobStatus,
    ) -> Result<Option<JobStatus>, OpsLifecycleError> {
        let Some(snapshot) = snapshot else {
            return Ok(None);
        };
        let public_result = snapshot.public_result_class;
        let terminal = snapshot.terminal;
        let duration_secs =
            Self::duration_from_process_or_snapshot(job, process_status, Some(snapshot));
        Ok(match public_result {
            OperationPublicResultClass::Running => Some(JobStatus::Running {
                started_at_unix: job.view.started_at_unix,
            }),
            OperationPublicResultClass::Completed => {
                if !terminal {
                    return Err(OpsLifecycleError::Internal(format!(
                        "generated op public result class completed for non-terminal status {:?} on {}",
                        snapshot.status, snapshot.id
                    )));
                }
                let result = match snapshot.terminal_outcome.as_ref() {
                    Some(OperationTerminalOutcome::Completed(result)) => result,
                    Some(other) => {
                        return Err(OpsLifecycleError::Internal(format!(
                            "generated op public result class completed with non-completed terminal outcome for {}: {other:?}",
                            snapshot.id
                        )));
                    }
                    None => {
                        return Err(OpsLifecycleError::Internal(format!(
                            "generated op public result class completed without terminal outcome for {}",
                            snapshot.id
                        )));
                    }
                };
                let (stdout, stderr) = if result.is_error {
                    (String::new(), result.content.clone())
                } else {
                    (result.content.clone(), String::new())
                };
                let exit_code = match process_status {
                    JobStatus::Completed { exit_code, .. } => *exit_code,
                    _ => None,
                };
                Some(JobStatus::Completed {
                    exit_code,
                    stdout,
                    stderr,
                    duration_secs,
                })
            }
            OperationPublicResultClass::Failed => {
                if !terminal {
                    return Err(OpsLifecycleError::Internal(format!(
                        "generated op public result class failed for non-terminal status {:?} on {}",
                        snapshot.status, snapshot.id
                    )));
                }
                let error = match snapshot.terminal_outcome.as_ref() {
                    Some(OperationTerminalOutcome::Failed { error }) => error.clone(),
                    Some(OperationTerminalOutcome::Terminated { reason }) => reason.clone(),
                    Some(other) => {
                        return Err(OpsLifecycleError::Internal(format!(
                            "generated op public result class failed with non-failed terminal outcome for {}: {other:?}",
                            snapshot.id
                        )));
                    }
                    None => {
                        return Err(OpsLifecycleError::Internal(format!(
                            "generated op public result class failed without terminal outcome for {}",
                            snapshot.id
                        )));
                    }
                };
                Some(JobStatus::Failed {
                    error,
                    duration_secs,
                })
            }
            OperationPublicResultClass::Cancelled => {
                if !terminal {
                    return Err(OpsLifecycleError::Internal(format!(
                        "generated op public result class cancelled for non-terminal status {:?} on {}",
                        snapshot.status, snapshot.id
                    )));
                }
                match snapshot.terminal_outcome.as_ref() {
                    Some(
                        OperationTerminalOutcome::Aborted { .. }
                        | OperationTerminalOutcome::Cancelled { .. }
                        | OperationTerminalOutcome::Retired,
                    ) => Some(JobStatus::Cancelled { duration_secs }),
                    Some(other) => {
                        return Err(OpsLifecycleError::Internal(format!(
                            "generated op public result class cancelled with non-cancelled terminal outcome for {}: {other:?}",
                            snapshot.id
                        )));
                    }
                    None => {
                        return Err(OpsLifecycleError::Internal(format!(
                            "generated op public result class cancelled without terminal outcome for {}",
                            snapshot.id
                        )));
                    }
                }
            }
            OperationPublicResultClass::MissingAuthority => None,
        })
    }

    /// Maps registry cancellation errors onto shell errors:
    /// `NotFound` -> `JobNotFound`, `InvalidTransition` -> `JobNotRunning`,
    /// anything else -> `Io`.
    fn shell_error_for_ops_lifecycle_cancel(
        job_id: &JobId,
        error: OpsLifecycleError,
    ) -> ShellError {
        match error {
            OpsLifecycleError::NotFound(_) => ShellError::JobNotFound(job_id.to_string()),
            OpsLifecycleError::InvalidTransition { .. } => ShellError::JobNotRunning,
            other => ShellError::Io(std::io::Error::other(other.to_string())),
        }
    }

    /// Projects the job's status after a cancellation request.
    ///
    /// # Errors
    /// `JobNotRunning` if the authority reports a terminal status other than
    /// cancelled. `Io` if the projection fails or the authority is missing.
    fn cancelled_view_from_authority(
        &self,
        job: &BackgroundJobRecord,
        snapshot: Option<&OperationLifecycleSnapshot>,
    ) -> Result<JobStatus, ShellError> {
        match Self::terminal_status_from_authority(job, snapshot, &job.view.status)
            .map_err(|error| ShellError::Io(std::io::Error::other(error.to_string())))?
        {
            Some(status @ JobStatus::Cancelled { .. }) => Ok(status),
            Some(_) => Err(ShellError::JobNotRunning),
            None => Err(ShellError::Io(std::io::Error::other(
                "background job lifecycle authority missing after cancellation",
            ))),
        }
    }

    /// Registry snapshot of a job's operation, or `Ok(None)` for unknown jobs.
    pub async fn ops_lifecycle_snapshot(
        &self,
        job_id: &JobId,
    ) -> Result<Option<OperationLifecycleSnapshot>, ShellError> {
        let jobs = self.jobs.lock().await;
        let Some(job) = jobs.get(job_id) else {
            return Ok(None);
        };
        self.snapshot_for_job(job)
    }

    /// Registers a job that reports as running but has no process or
    /// supervising task behind it.
    ///
    /// Performs the same working-directory validation, placement lookup and
    /// operation registration as `spawn_job`. It then records the job, its
    /// job -> operation mapping and a cancel notifier. No task is spawned, so
    /// the operation only leaves the running state through `cancel_job` or
    /// direct registry transitions.
    ///
    /// # Errors
    /// Fails if the manager is not bound, validation fails, or the registry
    /// rejects the operation (including admission-limit rejections).
    pub async fn register_synthetic_running_job(
        &self,
        command: &str,
        working_dir: Option<&Path>,
        timeout_secs: u64,
    ) -> Result<JobId, ShellError> {
        if !self.exports_canonical_async_ops() {
            return Err(ShellError::Io(std::io::Error::other(
                "background shell jobs require canonical session binding",
            )));
        }
        self.cleanup_old_jobs().await;
        let resolved_dir = if let Some(dir) = working_dir {
            Some(self.config.validate_working_dir_async(dir).await?)
        } else {
            None
        };
        let effective_dir = if let Some(dir) = resolved_dir.as_ref() {
            dir.clone()
        } else {
            self.config.default_working_dir_async().await?
        };
        let placement = self
            .config
            .execution_placement_for_working_dir_async(&effective_dir)
            .await?;
        let job_id = JobId::new();
        let started_at_unix = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        let operation_id = OperationId::new();
        self.register_background_operation(operation_id.clone(), format!("shell:{command}"))?;
        if let Err(error) = self.start_background_operation(&operation_id) {
            // Roll back the registration so the admission slot is released.
            let _ = self
                .ops_registry
                .abort_provisioning(&operation_id, Some(error.to_string()));
            return Err(error);
        }
        let job = BackgroundJob {
            id: job_id.clone(),
            command: command.to_string(),
            working_dir: resolved_dir.as_ref().map(|path| path.display().to_string()),
            placement: Some(placement),
            timeout_secs,
            started_at_unix,
            status: JobStatus::Running { started_at_unix },
        };
        self.jobs.lock().await.insert(
            job_id.clone(),
            BackgroundJobRecord {
                view: job,
                operation_id: operation_id.clone(),
                completion_notified: false,
            },
        );
        self.canonical_job_ops
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .insert(job_id.clone(), operation_id);
        self.cancel_notifiers
            .lock()
            .await
            .insert(job_id.clone(), Arc::new(Notify::new()));
        Ok(job_id)
    }

    /// Shell executable path, resolved once and then served from the cache.
    ///
    /// Concurrent first calls may each resolve the path; the last one wins.
    async fn resolved_shell_path(&self) -> Result<PathBuf, ShellError> {
        {
            let guard = self.resolved_shell_path.lock().await;
            if let Some(path) = guard.as_ref() {
                return Ok(path.clone());
            }
        }
        let path = self.config.resolve_shell_path_auto_async().await?;
        let mut guard = self.resolved_shell_path.lock().await;
        *guard = Some(path.clone());
        Ok(path)
    }

    /// Spawns `command` as a background job via `<shell> -c`.
    ///
    /// The job is registered as a `BackgroundToolOp` and supervised by a tokio
    /// task. The task collects a bounded tail of stdout/stderr and enforces
    /// `timeout_secs`, killing the process on timeout or cancellation. It then
    /// reports the outcome to the registry and stores the authority-projected
    /// status on the job record.
    ///
    /// # Errors
    /// Fails if the manager is not bound, the working directory or shell
    /// cannot be resolved, admission is rejected, or the process cannot be
    /// spawned. A spawn failure is reported to the registry.
    #[instrument(skip(self), fields(command = %command, timeout_secs = %timeout_secs))]
    pub async fn spawn_job(
        &self,
        command: &str,
        working_dir: Option<&Path>,
        timeout_secs: u64,
    ) -> Result<JobId, ShellError> {
        if !self.exports_canonical_async_ops() {
            return Err(ShellError::Io(std::io::Error::other(
                "background shell jobs require canonical session binding",
            )));
        }
        info!("Spawning background job");
        self.cleanup_old_jobs().await;
        let resolved_dir = if let Some(dir) = working_dir {
            debug!(working_dir = %dir.display(), "Validating working directory");
            Some(self.config.validate_working_dir_async(dir).await?)
        } else {
            None
        };
        let effective_dir = if let Some(dir) = resolved_dir.as_ref() {
            dir.clone()
        } else {
            self.config.default_working_dir_async().await?
        };
        let placement = self
            .config
            .execution_placement_for_working_dir_async(&effective_dir)
            .await?;
        let shell_path = self.resolved_shell_path().await?;
        let job_id = JobId::new();
        debug!(job_id = %job_id, "Generated job ID");
        let started_at_unix = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        let operation_id = OperationId::new();
        self.register_background_operation(operation_id.clone(), format!("shell:{command}"))?;
        let job = BackgroundJob {
            id: job_id.clone(),
            command: command.to_string(),
            working_dir: resolved_dir.as_ref().map(|p| p.display().to_string()),
            placement: Some(placement),
            timeout_secs,
            started_at_unix,
            status: JobStatus::Running { started_at_unix },
        };
        let mut cmd = Command::new(&shell_path);
        cmd.arg("-c").arg(command);
        let work_dir = effective_dir;
        cmd.current_dir(&work_dir);
        cmd.env("PWD", &work_dir);
        cmd.envs(&self.config.env_vars);
        cmd.stdout(Stdio::piped());
        cmd.stderr(Stdio::piped());
        // Put the child in its own process group so graceful_kill can signal
        // the whole group via killpg.
        #[cfg(unix)]
        cmd.process_group(0);
        // Command preview for diagnostics. The cut is walked back to a UTF-8
        // char boundary: slicing at a fixed byte offset panics when it lands
        // inside a multi-byte character. This goes through tracing rather than
        // raw stderr, and avoids a blocking filesystem stat in async code.
        let mut preview_end = command.len().min(80);
        while !command.is_char_boundary(preview_end) {
            preview_end -= 1;
        }
        let command_preview = &command[..preview_end];
        debug!(
            shell = %shell_path.display(),
            work_dir = %work_dir.display(),
            command = %command_preview,
            "Spawning shell process"
        );
        let child = match cmd.spawn() {
            Ok(child) => child,
            Err(error) => {
                let _ = self
                    .ops_registry
                    .provisioning_failed(&operation_id, error.to_string());
                return Err(ShellError::Io(error));
            }
        };
        if let Err(error) = self.start_background_operation(&operation_id) {
            // The registry refused to start the operation: don't leave an
            // untracked process running.
            let mut child = child;
            let _ = graceful_kill(&mut child).await;
            let _ = self
                .ops_registry
                .abort_provisioning(&operation_id, Some(error.to_string()));
            return Err(error);
        }
        debug!("Spawned child process");
        let jobs = Arc::clone(&self.jobs);
        let handles = Arc::clone(&self.handles);
        let cancel_notifiers = Arc::clone(&self.cancel_notifiers);
        let completed_at = Arc::clone(&self.completed_at);
        let ops_registry = Arc::clone(&self.ops_registry);
        let operation_id_for_task = operation_id.clone();
        let command_clone = command.to_string();
        let job_id_clone = job_id.clone();
        let job_id_for_completion = job_id.clone();
        let job_id_for_cancel = job_id.clone();
        let cancel_notify = Arc::new(Notify::new());
        // Register the job and its notifier before spawning the task, so a
        // cancel issued right after this function returns finds both.
        jobs.lock().await.insert(
            job_id.clone(),
            BackgroundJobRecord {
                view: job,
                operation_id: operation_id.clone(),
                completion_notified: false,
            },
        );
        self.canonical_job_ops
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .insert(job_id.clone(), operation_id.clone());
        cancel_notifiers
            .lock()
            .await
            .insert(job_id.clone(), Arc::clone(&cancel_notify));
        let handle = tokio::spawn(async move {
            let start = Instant::now();
            let timeout_duration = Duration::from_secs(timeout_secs);
            let mut child = child;
            let stdout = child.stdout.take();
            let stderr = child.stderr.take();
            // Drain both pipes concurrently so a chatty child cannot block on
            // a full pipe buffer while we wait for it.
            let stdout_handle = tokio::spawn(async move {
                if let Some(out) = stdout {
                    read_stream_with_limit(out, DEFAULT_MAX_OUTPUT_BYTES).await
                } else {
                    Ok(Vec::new())
                }
            });
            let stderr_handle = tokio::spawn(async move {
                if let Some(err) = stderr {
                    read_stream_with_limit(err, DEFAULT_MAX_OUTPUT_BYTES).await
                } else {
                    Ok(Vec::new())
                }
            });
            enum WaitOutcome {
                Completed(Option<i32>),
                Failed(std::io::Error),
                TimedOut,
                Cancelled,
            }
            let wait_outcome = tokio::select! {
                () = cancel_notify.notified() => WaitOutcome::Cancelled,
                result = tokio::time::timeout(timeout_duration, child.wait()) => {
                    match result {
                        Ok(Ok(status)) => WaitOutcome::Completed(status.code()),
                        Ok(Err(err)) => WaitOutcome::Failed(err),
                        Err(_) => WaitOutcome::TimedOut,
                    }
                }
            };
            if matches!(wait_outcome, WaitOutcome::TimedOut | WaitOutcome::Cancelled) {
                let _ = graceful_kill(&mut child).await;
            }
            let duration_secs = start.elapsed().as_secs_f64();
            let stdout_bytes = match stdout_handle.await {
                Ok(Ok(buf)) => buf,
                Ok(Err(err)) => {
                    warn!("Failed to read job stdout: {}", err);
                    Vec::new()
                }
                Err(err) => {
                    warn!("Job stdout reader task failed: {}", err);
                    Vec::new()
                }
            };
            let stderr_bytes = match stderr_handle.await {
                Ok(Ok(buf)) => buf,
                Ok(Err(err)) => {
                    warn!("Failed to read job stderr: {}", err);
                    Vec::new()
                }
                Err(err) => {
                    warn!("Job stderr reader task failed: {}", err);
                    Vec::new()
                }
            };
            let stdout = truncate_output_tail(&stdout_bytes, DEFAULT_MAX_OUTPUT_BYTES);
            let stderr = truncate_output_tail(&stderr_bytes, DEFAULT_MAX_OUTPUT_BYTES);
            let final_status = match wait_outcome {
                WaitOutcome::Failed(err) => JobStatus::Failed {
                    error: err.to_string(),
                    duration_secs,
                },
                WaitOutcome::TimedOut => JobStatus::Failed {
                    error: "background job timed out".to_string(),
                    duration_secs,
                },
                WaitOutcome::Cancelled => JobStatus::Cancelled { duration_secs },
                WaitOutcome::Completed(exit_code) => JobStatus::Completed {
                    exit_code,
                    stdout,
                    stderr,
                    duration_secs,
                },
            };
            // Report the outcome to the registry. For cancellation this may be
            // rejected because cancel_job already transitioned the operation;
            // the snapshot below is what decides the stored status.
            let lifecycle_result = match &final_status {
                JobStatus::Completed { stdout, stderr, .. } => {
                    let content = if !stdout.is_empty() {
                        stdout.clone()
                    } else if !stderr.is_empty() {
                        stderr.clone()
                    } else {
                        command_clone.clone()
                    };
                    ops_registry.complete_operation(
                        &operation_id_for_task,
                        OperationResult {
                            id: operation_id_for_task.clone(),
                            content,
                            is_error: false,
                            duration_ms: (duration_secs * 1000.0) as u64,
                            tokens_used: 0,
                        },
                    )
                }
                JobStatus::Failed { error, .. } => {
                    ops_registry.fail_operation(&operation_id_for_task, error.clone())
                }
                JobStatus::Cancelled { .. } => ops_registry
                    .cancel_operation(&operation_id_for_task, Some("cancelled by caller".into())),
                JobStatus::Running { .. } => Ok(()),
            };
            let authority_snapshot = ops_registry.snapshot(&operation_id_for_task);
            let mut record_completed_at = false;
            {
                let mut jobs_guard = jobs.lock().await;
                if let Some(job) = jobs_guard.get_mut(&job_id_clone) {
                    let generated_terminal_authority = authority_snapshot
                        .as_ref()
                        .ok()
                        .and_then(Option::as_ref)
                        .is_some_and(|snapshot| snapshot.terminal);
                    let authority_status = match authority_snapshot.as_ref() {
                        Ok(snapshot) => Self::terminal_status_from_authority(
                            job,
                            snapshot.as_ref(),
                            &final_status,
                        ),
                        Err(error) => Err(OpsLifecycleError::Internal(format!(
                            "background job lifecycle authority projection failed: {error}"
                        ))),
                    };
                    match authority_status {
                        Ok(Some(status)) => {
                            record_completed_at = generated_terminal_authority;
                            job.view.status = status;
                        }
                        Ok(None) => {
                            let error = match lifecycle_result {
                                Ok(()) => {
                                    "background job lifecycle authority missing after terminal transition"
                                        .to_string()
                                }
                                Err(error) => format!(
                                    "background job lifecycle authority rejected terminal transition: {error}"
                                ),
                            };
                            warn!(
                                job_id = %job.view.id,
                                operation_id = %operation_id_for_task,
                                error = %error,
                                "background job terminal status not updated because generated authority did not project a public result"
                            );
                        }
                        Err(error) => {
                            warn!(
                                job_id = %job.view.id,
                                operation_id = %operation_id_for_task,
                                error = %error,
                                "background job terminal status not updated because generated authority projection failed"
                            );
                        }
                    }
                }
            }
            if record_completed_at {
                completed_at
                    .lock()
                    .await
                    .insert(job_id_for_completion, Instant::now());
            }
            {
                cancel_notifiers.lock().await.remove(&job_id_for_cancel);
            }
        });
        handles.lock().await.insert(job_id.clone(), handle);
        Ok(job_id)
    }

    /// Current view of a job, with its status re-projected from the lifecycle
    /// authority. Returns `Ok(None)` for unknown jobs.
    ///
    /// # Errors
    /// `Io` if the registry snapshot or the status projection fails.
    #[instrument(skip(self), fields(job_id = %job_id))]
    pub async fn get_status(&self, job_id: &JobId) -> Result<Option<BackgroundJob>, ShellError> {
        let Some(mut job) = self.jobs.lock().await.get(job_id).cloned() else {
            debug!("Job not found");
            return Ok(None);
        };
        let snapshot = self.snapshot_for_job(&job)?;
        job.view.status = self
            .reconcile_job_status(&job, snapshot.as_ref())
            .map_err(|error| ShellError::Io(std::io::Error::other(error.to_string())))?;
        Ok(Some(job.view))
    }

    /// Summaries of all known jobs, with statuses classified by the lifecycle
    /// authority. Fails on the first job whose status cannot be classified.
    #[instrument(skip(self))]
    pub async fn list_jobs(&self) -> Result<Vec<JobSummary>, ShellError> {
        let jobs: Vec<BackgroundJobRecord> = self.jobs.lock().await.values().cloned().collect();
        let mut summaries = Vec::with_capacity(jobs.len());
        for job in jobs {
            let snapshot = self.snapshot_for_job(&job)?;
            let status = self
                .lifecycle_summary_status(&job, snapshot.as_ref())
                .map_err(|error| ShellError::Io(std::io::Error::other(error.to_string())))?;
            summaries.push(JobSummary {
                id: job.view.id.clone(),
                command: job.view.command.clone(),
                status,
                started_at_unix: job.view.started_at_unix,
            });
        }
        debug!(count = summaries.len(), "Listed jobs");
        Ok(summaries)
    }

    /// Cancels a job.
    ///
    /// The cancellation goes through the lifecycle registry, and the job view
    /// is set to the projected cancelled status. The supervising task is then
    /// woken so it kills the process, and the task's join handle is dropped.
    ///
    /// # Errors
    /// `JobNotFound` for unknown jobs or operations. `JobNotRunning` when the
    /// registry rejects the transition or reports a non-cancelled terminal
    /// status. `Io` for other registry failures.
    #[instrument(skip(self), fields(job_id = %job_id))]
    pub async fn cancel_job(&self, job_id: &JobId) -> Result<(), ShellError> {
        info!("Cancelling job");
        let operation_id = {
            let jobs_guard = self.jobs.lock().await;
            jobs_guard
                .get(job_id)
                .ok_or_else(|| {
                    warn!("Job not found");
                    ShellError::JobNotFound(job_id.to_string())
                })?
                .operation_id
                .clone()
        };
        self.ops_registry
            .cancel_operation(&operation_id, Some("cancelled by caller".into()))
            .map_err(|error| Self::shell_error_for_ops_lifecycle_cancel(job_id, error))?;
        let snapshot = self
            .ops_registry
            .snapshot(&operation_id)
            .map_err(|error| ShellError::Io(std::io::Error::other(error.to_string())))?;
        {
            let mut jobs_guard = self.jobs.lock().await;
            let job = jobs_guard.get_mut(job_id).ok_or_else(|| {
                warn!("Job not found");
                ShellError::JobNotFound(job_id.to_string())
            })?;
            job.view.status = self.cancelled_view_from_authority(job, snapshot.as_ref())?;
        }
        // notify_one stores a permit when nobody is waiting yet, so the task
        // still observes the cancellation if it has not reached select! yet.
        let notify = { self.cancel_notifiers.lock().await.get(job_id).cloned() };
        if let Some(notify) = notify {
            notify.notify_one();
        } else {
            warn!("Cancel notifier missing for running job");
        }
        self.handles.lock().await.remove(job_id);
        Ok(())
    }

    /// Forgets a job and all of its bookkeeping. Returns whether it existed.
    ///
    /// The registry operation is not touched. Any supervising task keeps
    /// running, because dropping a tokio `JoinHandle` does not abort the task.
    pub async fn remove_job(&self, job_id: &JobId) -> bool {
        let removed = self.jobs.lock().await.remove(job_id).is_some();
        if removed {
            self.canonical_job_ops
                .lock()
                .unwrap_or_else(std::sync::PoisonError::into_inner)
                .remove(job_id);
            self.handles.lock().await.remove(job_id);
            self.cancel_notifiers.lock().await.remove(job_id);
            self.completed_at.lock().await.remove(job_id);
        }
        removed
    }

    /// Operation registered for `job_id`, if the job is still tracked.
    pub fn canonical_operation_for_job(&self, job_id: &JobId) -> Option<OperationId> {
        self.canonical_job_ops
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .get(job_id)
            .cloned()
    }

    /// Emits one completion per job whose operation is terminal and that has
    /// not been reported yet.
    ///
    /// Each reported job is marked notified and its completion instant is
    /// recorded for cleanup. The instant is back-dated from the snapshot's
    /// `completed_at_ms` when available. Jobs whose snapshot is missing or
    /// unreadable are skipped and retried on the next drain.
    pub async fn drain_completed(&self) -> Vec<meerkat_core::agent::DetachedOpCompletion> {
        let mut completions = Vec::new();
        let mut jobs = self.jobs.lock().await;
        let mut completed_at_guard = self.completed_at.lock().await;
        for (_job_id, record) in jobs.iter_mut() {
            if record.completion_notified {
                continue;
            }
            let snapshot = match self.ops_registry.snapshot(&record.operation_id) {
                Ok(Some(snapshot)) => snapshot,
                Ok(None) => continue,
                Err(error) => {
                    warn!(
                        job_id = %record.view.id,
                        operation_id = %record.operation_id,
                        error = %error,
                        "background job completion drain skipped operation without generated projection authority"
                    );
                    continue;
                }
            };
            if !snapshot.terminal {
                continue;
            }
            // The spawn task completes the registry operation *before* it takes
            // the `jobs` lock to write the terminal view. A drain running in
            // between therefore sees a terminal snapshot while the view still
            // says Running. The view also stays Running for synthetic jobs and
            // when the task could not project the authority. In those cases,
            // build the detail from the authority projection instead of
            // reporting "still running (unexpected)".
            let detail = if matches!(record.view.status, JobStatus::Running { .. }) {
                match Self::terminal_status_from_authority(
                    &*record,
                    Some(&snapshot),
                    &record.view.status,
                ) {
                    Ok(Some(status)) => {
                        let mut projected = record.view.clone();
                        projected.status = status;
                        Self::build_completion_detail(&projected)
                    }
                    Ok(None) | Err(_) => Self::build_completion_detail(&record.view),
                }
            } else {
                Self::build_completion_detail(&record.view)
            };
            let completed_at_ms = snapshot.completed_at_ms;
            completions.push(meerkat_core::agent::DetachedOpCompletion {
                job_id: record.view.id.to_string(),
                kind: snapshot.kind,
                status: snapshot.status,
                terminal_outcome: snapshot.terminal_outcome,
                display_name: snapshot.display_name,
                detail,
                elapsed_ms: snapshot.elapsed_ms,
            });
            record.completion_notified = true;
            // Convert the wall-clock completion time into an Instant so the
            // TTL in cleanup_old_jobs counts from the actual completion.
            let completion_instant = completed_at_ms
                .and_then(|completed_ms| {
                    let now_ms = SystemTime::now()
                        .duration_since(UNIX_EPOCH)
                        .unwrap_or_default()
                        .as_millis() as u64;
                    let ago_ms = now_ms.saturating_sub(completed_ms);
                    Instant::now().checked_sub(Duration::from_millis(ago_ms))
                })
                .unwrap_or_else(Instant::now);
            completed_at_guard
                .entry(record.view.id.clone())
                .or_insert(completion_instant);
        }
        completions
    }

    /// Longest suffix of `s` that is at most `max_bytes` bytes long and starts
    /// on a char boundary.
    ///
    /// The cut is rounded forward, the same way `truncate_output_tail` does it,
    /// so the tail never exceeds `max_bytes`.
    fn str_tail(s: &str, max_bytes: usize) -> &str {
        if s.len() <= max_bytes {
            return s;
        }
        // `is_char_boundary(s.len())` is always true, so this loop terminates.
        let mut start = s.len() - max_bytes;
        while !s.is_char_boundary(start) {
            start += 1;
        }
        &s[start..]
    }

    /// One-line human-readable summary of a job's status, joined with "; ".
    /// Output streams are clipped to their last 200 bytes.
    fn build_completion_detail(view: &BackgroundJob) -> String {
        let mut parts = Vec::new();
        match &view.status {
            JobStatus::Completed {
                exit_code,
                stdout,
                stderr,
                duration_secs,
            } => {
                if let Some(code) = exit_code {
                    parts.push(format!("exit_code: {code}"));
                }
                parts.push(format!("duration: {duration_secs:.1}s"));
                if !stdout.is_empty() {
                    parts.push(format!("stdout: {}", Self::str_tail(stdout, 200)));
                }
                if !stderr.is_empty() {
                    parts.push(format!("stderr: {}", Self::str_tail(stderr, 200)));
                }
            }
            JobStatus::Failed {
                error,
                duration_secs,
            } => {
                parts.push(format!("error: {error}"));
                parts.push(format!("duration: {duration_secs:.1}s"));
            }
            JobStatus::Cancelled { duration_secs } => {
                parts.push(format!("cancelled after {duration_secs:.1}s"));
            }
            JobStatus::Running { .. } => {
                parts.push("still running (unexpected)".to_string());
            }
        }
        parts.join("; ")
    }

    /// Evicts finished jobs from every bookkeeping map.
    ///
    /// Only jobs whose completion was already drained (or whose record is
    /// gone) are eligible. First, eligible jobs that completed more than
    /// `completed_job_ttl_secs` ago are dropped. Then, if more than a non-zero
    /// `max_completed_jobs` completions remain, the oldest eligible ones are
    /// evicted until the count fits or nothing eligible is left. Registry
    /// operations are not touched.
    async fn cleanup_old_jobs(&self) {
        let ttl = Duration::from_secs(self.config.completed_job_ttl_secs);
        let max_completed = self.config.max_completed_jobs;
        let now = Instant::now();
        // Locks are taken in a fixed order and held together so the maps are
        // pruned consistently.
        let mut jobs_guard = self.jobs.lock().await;
        let mut handles_guard = self.handles.lock().await;
        let mut cancel_notifiers_guard = self.cancel_notifiers.lock().await;
        let mut completed_at_guard = self.completed_at.lock().await;
        let mut canonical_job_ops_guard = self
            .canonical_job_ops
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        let expired_jobs: Vec<JobId> = completed_at_guard
            .iter()
            .filter(|(job_id, completed_time)| {
                now.duration_since(**completed_time) > ttl
                    && jobs_guard.get(job_id).is_none_or(|j| j.completion_notified)
            })
            .map(|(job_id, _)| job_id.clone())
            .collect();
        for job_id in &expired_jobs {
            jobs_guard.remove(job_id);
            canonical_job_ops_guard.remove(job_id);
            handles_guard.remove(job_id);
            cancel_notifiers_guard.remove(job_id);
            completed_at_guard.remove(job_id);
        }
        if max_completed > 0 && completed_at_guard.len() > max_completed {
            let mut evictable_jobs: Vec<(JobId, Instant)> = completed_at_guard
                .iter()
                .filter(|(job_id, _)| jobs_guard.get(job_id).is_none_or(|j| j.completion_notified))
                .map(|(k, v)| (k.clone(), *v))
                .collect();
            evictable_jobs.sort_by_key(|(_, time)| *time);
            let total_completed = completed_at_guard.len();
            let to_remove = total_completed
                .saturating_sub(max_completed)
                .min(evictable_jobs.len());
            for (job_id, _) in evictable_jobs.into_iter().take(to_remove) {
                jobs_guard.remove(&job_id);
                canonical_job_ops_guard.remove(&job_id);
                handles_guard.remove(&job_id);
                cancel_notifiers_guard.remove(&job_id);
                completed_at_guard.remove(&job_id);
            }
        }
    }

    /// Number of tracked jobs, in any state.
    pub async fn job_count(&self) -> usize {
        self.jobs.lock().await.len()
    }

    /// Number of jobs with a recorded completion instant.
    pub async fn completed_job_count(&self) -> usize {
        self.completed_at.lock().await.len()
    }

    /// Counts non-terminal `BackgroundToolOp` and `BackgroundToolCapacitySlot`
    /// operations in the registry.
    ///
    /// NOTE(review): this counts every such operation in the registry, not only
    /// those owned by this manager's session. Confirm that is intended when the
    /// registry is shared.
    pub async fn running_job_count(&self) -> Result<usize, ShellError> {
        let mut count = 0;
        for snapshot in self
            .ops_registry
            .list_operations()
            .map_err(|error| ShellError::Io(std::io::Error::other(error.to_string())))?
        {
            if snapshot.kind != OperationKind::BackgroundToolOp
                && snapshot.kind != OperationKind::BackgroundToolCapacitySlot
            {
                continue;
            }
            if !snapshot.terminal {
                count += 1;
            }
        }
        Ok(count)
    }

    /// Reserves a concurrency slot for a synchronous shell execution.
    ///
    /// The slot is a `BackgroundToolCapacitySlot` operation and counts against
    /// the same admission limit as background jobs. It is retired when the
    /// returned guard is dropped.
    ///
    /// # Errors
    /// Fails when admission is rejected or the registry cannot start the slot.
    pub async fn acquire_sync_slot(&self) -> Result<SyncSlotGuard, ShellError> {
        let operation_id = OperationId::new();
        self.ops_registry
            .register_operation_with_admission_limit(
                OperationSpec {
                    id: operation_id.clone(),
                    kind: OperationKind::BackgroundToolCapacitySlot,
                    owner_session_id: self.owner_bridge_session_id.clone(),
                    display_name: "shell:sync-slot".to_string(),
                    source_label: "shell_sync_slot".to_string(),
                    operation_source: None,
                    child_session_id: None,
                    expect_peer_channel: false,
                },
                self.operation_admission_limit(),
            )
            .map_err(Self::shell_error_for_ops_lifecycle_admission)?;
        if let Err(error) = self.ops_registry.provisioning_succeeded(&operation_id) {
            let _ = self
                .ops_registry
                .abort_provisioning(&operation_id, Some(error.to_string()));
            return Err(ShellError::Io(std::io::Error::other(error.to_string())));
        }
        Ok(SyncSlotGuard {
            ops_registry: Arc::clone(&self.ops_registry),
            operation_id,
        })
    }
}
/// Guard for a synchronous-execution concurrency slot returned by
/// `JobManager::acquire_sync_slot`.
///
/// The slot is a `BackgroundToolCapacitySlot` operation in the registry.
/// Dropping the guard retires it.
pub struct SyncSlotGuard {
    /// Registry the slot operation lives in.
    ops_registry: Arc<dyn OpsLifecycleRegistry>,
    /// The slot's operation id.
    operation_id: OperationId,
}
impl std::fmt::Debug for SyncSlotGuard {
    /// Shows only the slot's operation id; the registry handle is omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("SyncSlotGuard");
        out.field("operation_id", &self.operation_id);
        out.finish_non_exhaustive()
    }
}
impl Drop for SyncSlotGuard {
    /// Releases the slot by retiring its operation in the lifecycle registry.
    /// `drop` cannot return errors, so a rejected release is only logged.
    fn drop(&mut self) {
        let Err(error) = self.ops_registry.mark_retired(&self.operation_id) else {
            return;
        };
        tracing::error!(
            operation_id = %self.operation_id,
            error = %error,
            "generated shell sync-slot lifecycle authority rejected release"
        );
    }
}
impl meerkat_core::completion_feed::CompletionEnrichmentProvider for JobManager {
    /// Maps `operation_id` back to its shell job and returns the job id plus a
    /// completion detail built from the job's current view.
    ///
    /// Returns `None` when the operation is not a tracked shell job, when the
    /// job record is missing, or when the jobs table is locked right now. This
    /// method is synchronous, so it only uses a non-blocking `try_lock`.
    fn enrich(
        &self,
        operation_id: &OperationId,
    ) -> Option<meerkat_core::completion_feed::CompletionEnrichmentData> {
        // Reverse lookup operation -> job under the canonical map's lock, then
        // release that lock before touching the jobs table.
        let canonical_guard = self
            .canonical_job_ops
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        let job_id = canonical_guard
            .iter()
            .find_map(|(jid, oid)| (oid == operation_id).then(|| jid.clone()))?;
        drop(canonical_guard);

        let jobs_guard = self.jobs.try_lock().ok()?;
        let record = jobs_guard.get(&job_id)?;
        let detail = Self::build_completion_detail(&record.view);
        let job_id = job_id.to_string();
        Some(meerkat_core::completion_feed::CompletionEnrichmentData { job_id, detail })
    }
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
use super::*;
use crate::builtin::shell::security::SecurityMode;
use std::path::{Path, PathBuf};
use tempfile::TempDir;
/// Builds a `JobManager` bound to a fresh owner session and its own
/// `RuntimeOpsLifecycleRegistry`.
fn bound_job_manager(config: ShellConfig) -> JobManager {
    let ops: Arc<dyn OpsLifecycleRegistry> = Arc::new(RuntimeOpsLifecycleRegistry::new());
    let owner = SessionId::new();
    JobManager::new(config)
        .with_owner_bridge_session_id(owner)
        .with_ops_registry(ops)
}
/// Smoke test: a bound manager can be built from the default config.
#[test]
fn test_job_manager_struct() {
    let _manager = bound_job_manager(ShellConfig::default());
}
/// Config values handed to the constructor are preserved on `manager.config`.
#[test]
fn test_job_manager_new() {
    let project_root = PathBuf::from("/tmp/test");
    let manager = bound_job_manager(ShellConfig {
        enabled: true,
        default_timeout_secs: 60,
        restrict_to_project: true,
        shell: "nu".to_string(),
        shell_path: None,
        project_root: project_root.clone(),
        max_completed_jobs: 100,
        completed_job_ttl_secs: 300,
        max_concurrent_processes: 10,
        security_mode: SecurityMode::Unrestricted,
        security_patterns: vec![],
        env_vars: std::collections::HashMap::new(),
    });
    let cfg = &manager.config;
    assert!(cfg.enabled);
    assert_eq!(cfg.default_timeout_secs, 60);
    assert_eq!(cfg.shell, "nu");
    assert_eq!(cfg.project_root, project_root);
    assert_eq!(cfg.max_completed_jobs, 100);
    assert_eq!(cfg.completed_job_ttl_secs, 300);
}
/// When the registry has failed the operation, the job summary is `Failed`.
#[tokio::test]
async fn timed_out_job_summary_uses_generated_failed_snapshot() {
    let manager = bound_job_manager(ShellConfig::default());
    let registry = &manager.ops_registry;
    let op_id = OperationId::new();
    let spec = OperationSpec {
        id: op_id.clone(),
        kind: OperationKind::BackgroundToolOp,
        owner_session_id: manager.owner_bridge_session_id.clone(),
        display_name: "shell:sleep".to_string(),
        source_label: "shell_job".to_string(),
        operation_source: None,
        child_session_id: None,
        expect_peer_channel: false,
    };
    registry.register_operation(spec).unwrap();
    registry.provisioning_succeeded(&op_id).unwrap();
    registry
        .fail_operation(&op_id, "background job timed out".into())
        .unwrap();

    let local_status = JobStatus::Failed {
        error: "background job timed out".to_string(),
        duration_secs: 30.0,
    };
    let record = BackgroundJobRecord {
        view: BackgroundJob {
            id: JobId::new(),
            command: "sleep 30".to_string(),
            working_dir: None,
            placement: None,
            timeout_secs: 30,
            started_at_unix: 123,
            status: local_status,
        },
        operation_id: op_id,
        completion_notified: false,
    };

    let snapshot = manager.snapshot_for_job(&record).unwrap();
    let summary = manager
        .lifecycle_summary_status(&record, snapshot.as_ref())
        .expect("generated summary status");
    assert_eq!(summary, JobSummaryStatus::Failed);
}
/// The registry outcome wins over the local view. A locally successful exit
/// becomes `Failed` with the registry's reason and the locally measured duration.
#[test]
fn terminal_job_status_uses_generated_lifecycle_class() {
    let manager = bound_job_manager(ShellConfig::default());
    let registry = &manager.ops_registry;
    let op_id = OperationId::new();
    let spec = OperationSpec {
        id: op_id.clone(),
        kind: OperationKind::BackgroundToolOp,
        owner_session_id: manager.owner_bridge_session_id.clone(),
        display_name: "shell:false".to_string(),
        source_label: "shell_job".to_string(),
        operation_source: None,
        child_session_id: None,
        expect_peer_channel: false,
    };
    registry.register_operation(spec).unwrap();
    registry.provisioning_succeeded(&op_id).unwrap();
    registry
        .fail_operation(&op_id, "generated lifecycle rejected success".into())
        .unwrap();

    let local_status = JobStatus::Completed {
        exit_code: Some(0),
        stdout: "local success".to_string(),
        stderr: String::new(),
        duration_secs: 1.25,
    };
    let record = BackgroundJobRecord {
        view: BackgroundJob {
            id: JobId::new(),
            command: "false".to_string(),
            working_dir: None,
            placement: None,
            timeout_secs: 30,
            started_at_unix: 123,
            status: local_status,
        },
        operation_id: op_id,
        completion_notified: false,
    };

    let snapshot = manager.snapshot_for_job(&record).unwrap();
    let projected =
        JobManager::terminal_status_from_authority(&record, snapshot.as_ref(), &record.view.status)
            .expect("generated public result class");
    let expected = JobStatus::Failed {
        error: "generated lifecycle rejected success".to_string(),
        duration_secs: 1.25,
    };
    assert_eq!(projected, Some(expected));
}
/// A local process exit alone does not finish the job. While the registry
/// still reports `Running`, the projected status stays `Running`.
#[test]
fn local_process_completion_remains_running_until_generated_terminal_authority() {
    let manager = bound_job_manager(ShellConfig::default());
    let registry = &manager.ops_registry;
    let op_id = OperationId::new();
    let spec = OperationSpec {
        id: op_id.clone(),
        kind: OperationKind::BackgroundToolOp,
        owner_session_id: manager.owner_bridge_session_id.clone(),
        display_name: "shell:local-complete".to_string(),
        source_label: "shell_job".to_string(),
        operation_source: None,
        child_session_id: None,
        expect_peer_channel: false,
    };
    registry.register_operation(spec).unwrap();
    registry.provisioning_succeeded(&op_id).unwrap();

    let local_status = JobStatus::Completed {
        exit_code: Some(0),
        stdout: "local success".to_string(),
        stderr: String::new(),
        duration_secs: 1.0,
    };
    let record = BackgroundJobRecord {
        view: BackgroundJob {
            id: JobId::new(),
            command: "true".to_string(),
            working_dir: None,
            placement: None,
            timeout_secs: 30,
            started_at_unix: 123,
            status: local_status,
        },
        operation_id: op_id,
        completion_notified: false,
    };

    let snapshot = manager
        .snapshot_for_job(&record)
        .unwrap()
        .expect("generated lifecycle snapshot");
    assert_eq!(snapshot.status, OperationStatus::Running);
    assert!(!snapshot.terminal);

    let projected =
        JobManager::terminal_status_from_authority(&record, Some(&snapshot), &record.view.status)
            .expect("generated public result class");
    assert_eq!(
        projected,
        Some(JobStatus::Running {
            started_at_unix: 123,
        })
    );
}
/// A failed job reports the registry's failure reason, not the local error text.
#[test]
fn failed_terminal_status_uses_generated_terminal_outcome() {
    let manager = bound_job_manager(ShellConfig::default());
    let registry = &manager.ops_registry;
    let op_id = OperationId::new();
    let spec = OperationSpec {
        id: op_id.clone(),
        kind: OperationKind::BackgroundToolOp,
        owner_session_id: manager.owner_bridge_session_id.clone(),
        display_name: "shell:timeout".to_string(),
        source_label: "shell_job".to_string(),
        operation_source: None,
        child_session_id: None,
        expect_peer_channel: false,
    };
    registry.register_operation(spec).unwrap();
    registry.provisioning_succeeded(&op_id).unwrap();
    registry
        .fail_operation(&op_id, "generated failure reason".into())
        .unwrap();

    let local_status = JobStatus::Failed {
        error: "local shell timeout".to_string(),
        duration_secs: 1.0,
    };
    let record = BackgroundJobRecord {
        view: BackgroundJob {
            id: JobId::new(),
            command: "sleep 30".to_string(),
            working_dir: None,
            placement: None,
            timeout_secs: 1,
            started_at_unix: 123,
            status: local_status,
        },
        operation_id: op_id,
        completion_notified: false,
    };

    let snapshot = manager.snapshot_for_job(&record).unwrap();
    let projected =
        JobManager::terminal_status_from_authority(&record, snapshot.as_ref(), &record.view.status)
            .expect("generated public result class");
    let expected = JobStatus::Failed {
        error: "generated failure reason".to_string(),
        duration_secs: 1.0,
    };
    assert_eq!(projected, Some(expected));
}
/// A `Retiring` operation with no terminal outcome yet still summarises and
/// reconciles as `Running`.
#[test]
fn retiring_job_status_remains_nonterminal_until_generated_terminal_outcome() {
    let manager = bound_job_manager(ShellConfig::default());
    let registry = &manager.ops_registry;
    let op_id = OperationId::new();
    let spec = OperationSpec {
        id: op_id.clone(),
        kind: OperationKind::BackgroundToolOp,
        owner_session_id: manager.owner_bridge_session_id.clone(),
        display_name: "shell:retiring".to_string(),
        source_label: "shell_job".to_string(),
        operation_source: None,
        child_session_id: None,
        expect_peer_channel: false,
    };
    registry.register_operation(spec).unwrap();
    registry.provisioning_succeeded(&op_id).unwrap();
    registry.request_retire(&op_id).unwrap();

    let record = BackgroundJobRecord {
        view: BackgroundJob {
            id: JobId::new(),
            command: "sleep 30".to_string(),
            working_dir: None,
            placement: None,
            timeout_secs: 30,
            started_at_unix: 123,
            status: JobStatus::Running {
                started_at_unix: 123,
            },
        },
        operation_id: op_id,
        completion_notified: false,
    };

    let owned_snapshot = manager.snapshot_for_job(&record).unwrap();
    let snapshot = owned_snapshot.as_ref().expect("snapshot");
    assert_eq!(snapshot.status, OperationStatus::Retiring);
    assert!(snapshot.terminal_outcome.is_none());

    let summary = manager
        .lifecycle_summary_status(&record, Some(snapshot))
        .expect("generated summary status");
    assert_eq!(summary, JobSummaryStatus::Running);

    let reconciled = manager
        .reconcile_job_status(&record, Some(snapshot))
        .expect("generated status");
    assert_eq!(
        reconciled,
        JobStatus::Running {
            started_at_unix: 123,
        }
    );
}
/// `MissingAuthority` must fail projection with an internal error; it must not
/// be mapped to a shell `Failed` summary.
#[test]
fn missing_authority_public_class_is_not_shell_failed() {
    let outcome =
        JobManager::summary_status_from_public_result(OperationPublicResultClass::MissingAuthority);
    let error = outcome.expect_err("missing authority must fail projection");
    let is_expected = matches!(
        &error,
        OpsLifecycleError::Internal(message) if message.contains("authority missing")
    );
    assert!(is_expected, "unexpected missing authority error: {error:?}");
}
/// A synthetic running job is registered in the *injected* registry. Both the
/// manager's projection and a direct registry lookup see it as `Running`.
#[tokio::test]
async fn synthetic_running_job_registers_into_injected_ops_registry() {
    let owner_bridge_session_id = SessionId::new();
    let registry: Arc<dyn OpsLifecycleRegistry> = Arc::new(RuntimeOpsLifecycleRegistry::new());
    // Keep our own handle to the registry so we can inspect it directly below.
    let manager = JobManager::new(ShellConfig::default())
        .with_owner_bridge_session_id(owner_bridge_session_id)
        .with_ops_registry(Arc::clone(&registry));
    let job_id = manager
        .register_synthetic_running_job("shell:synthetic", None, 30)
        .await
        .expect("synthetic running job should register");
    let snapshot = manager
        .ops_lifecycle_snapshot(&job_id)
        .await
        .expect("job manager should project snapshot from injected registry")
        .expect("job manager should resolve snapshot from injected registry");
    assert_eq!(snapshot.kind, OperationKind::BackgroundToolOp);
    assert_eq!(snapshot.status, OperationStatus::Running);
    assert_eq!(
        registry
            .snapshot(&snapshot.id)
            .expect("injected registry snapshot lookup should succeed")
            .expect("injected registry should own the operation")
            .status,
        OperationStatus::Running
    );
}
/// With `max_concurrent_processes = 1`, a second synthetic running job is
/// rejected by operation admission with a concurrency-limit error.
#[tokio::test]
async fn synthetic_running_job_concurrency_uses_generated_admission() {
    let manager = bound_job_manager(ShellConfig {
        max_concurrent_processes: 1,
        ..Default::default()
    });
    let _first = manager
        .register_synthetic_running_job("shell:synthetic-one", None, 30)
        .await
        .expect("first synthetic running job should register");
    let err = manager
        .register_synthetic_running_job("shell:synthetic-two", None, 30)
        .await
        .expect_err("generated operation admission should reject over limit");
    let rendered = err.to_string();
    assert!(
        rendered.contains("Concurrency limit exceeded"),
        "unexpected error: {err}"
    );
}
/// A synthetic job started in a subdirectory records canonical placement roots
/// but no host/worktree identity.
#[tokio::test]
async fn synthetic_running_job_records_execution_placement_without_identity_path() {
    let workspace = TempDir::new().unwrap();
    let project_root = workspace.path().to_path_buf();
    let worker_dir = project_root.join("worker");
    tokio::fs::create_dir(&worker_dir).await.unwrap();

    let manager = bound_job_manager(ShellConfig::with_project_root(project_root.clone()));
    let job_id = manager
        .register_synthetic_running_job(
            "shell:synthetic-placement",
            Some(Path::new("worker")),
            30,
        )
        .await
        .expect("synthetic running job should register");

    let job = manager
        .get_status(&job_id)
        .await
        .expect("job status")
        .expect("job exists");
    let placement = job.placement.expect("placement metadata");

    let expected_working_root = worker_dir.canonicalize().unwrap();
    let expected_allowed_roots = vec![project_root.canonicalize().unwrap()];
    assert_eq!(
        placement.working_root.as_deref(),
        Some(expected_working_root.as_path())
    );
    assert_eq!(placement.allowed_roots, expected_allowed_roots);

    let identity = placement.identity();
    assert_eq!(identity.host_id, None);
    assert_eq!(identity.worktree_id, None);
}
/// Spawning a trivial command yields a job id with the `job_` prefix.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_job_manager_spawn() {
    let workspace = TempDir::new().unwrap();
    let manager = bound_job_manager(ShellConfig {
        shell: "sh".to_string(),
        ..ShellConfig::with_project_root(workspace.path().to_path_buf())
    });
    let spawned = manager.spawn_job("echo test", None, 30).await;
    assert!(spawned.is_ok());
    let job_id = spawned.unwrap();
    assert!(job_id.0.starts_with("job_"));
}
/// `spawn_job` does not wait for the command: spawning `sleep 5` returns in under 1 s.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_job_manager_spawn_immediate() {
    let workspace = TempDir::new().unwrap();
    let manager = bound_job_manager(ShellConfig {
        shell: "sh".to_string(),
        ..ShellConfig::with_project_root(workspace.path().to_path_buf())
    });
    let clock = Instant::now();
    let job_id = manager.spawn_job("sleep 5", None, 30).await.unwrap();
    let elapsed = clock.elapsed();
    assert!(
        elapsed.as_millis() < 1000,
        "spawn_job should return immediately, took {elapsed:?}"
    );
    assert!(manager.get_status(&job_id).await.unwrap().is_some());
}
/// A freshly spawned long job is `Running` with a start time in the last minute.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_job_manager_spawn_running() {
    let workspace = TempDir::new().unwrap();
    let manager = bound_job_manager(ShellConfig {
        shell: "sh".to_string(),
        ..ShellConfig::with_project_root(workspace.path().to_path_buf())
    });
    let job_id = manager.spawn_job("sleep 10", None, 30).await.unwrap();
    let job = manager.get_status(&job_id).await.unwrap().unwrap();
    assert!(
        matches!(job.status, JobStatus::Running { .. }),
        "Job should be Running, got {:?}",
        job.status
    );
    let JobStatus::Running { started_at_unix } = job.status else {
        return;
    };
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_secs();
    let is_recent = started_at_unix <= now && started_at_unix > now - 60;
    assert!(is_recent, "started_at_unix should be recent");
}
/// Unknown ids give `None`; a spawned job reports its own id and command.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_job_manager_get_status() {
    let workspace = TempDir::new().unwrap();
    let manager = bound_job_manager(ShellConfig {
        shell: "sh".to_string(),
        ..ShellConfig::with_project_root(workspace.path().to_path_buf())
    });
    let unknown = JobId::from_string("job_nonexistent");
    assert!(manager.get_status(&unknown).await.unwrap().is_none());

    let job_id = manager.spawn_job("echo hello", None, 30).await.unwrap();
    let lookup = manager.get_status(&job_id).await.unwrap();
    assert!(lookup.is_some());
    let job = lookup.unwrap();
    assert_eq!(job.id, job_id);
    assert_eq!(job.command, "echo hello");
}
/// `list_jobs` starts empty and then contains every spawned job.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_job_manager_list_jobs() {
    let workspace = TempDir::new().unwrap();
    let manager = bound_job_manager(ShellConfig {
        shell: "sh".to_string(),
        ..ShellConfig::with_project_root(workspace.path().to_path_buf())
    });
    assert!(manager.list_jobs().await.unwrap().is_empty());

    let first = manager.spawn_job("echo one", None, 30).await.unwrap();
    let second = manager.spawn_job("echo two", None, 30).await.unwrap();
    let listed = manager.list_jobs().await.unwrap();
    assert_eq!(listed.len(), 2);
    for wanted in [&first, &second] {
        assert!(listed.iter().any(|summary| &summary.id == wanted));
    }
}
/// A job inserted directly into the manager's tables is listed with the exact
/// `started_at_unix` it was recorded with.
#[tokio::test]
async fn test_job_summary_preserves_started_at_unit() {
    let workspace = TempDir::new().unwrap();
    let manager = bound_job_manager(ShellConfig::with_project_root(
        workspace.path().to_path_buf(),
    ));
    let job_id = JobId::new();
    let started_at_unix = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_secs();
    let op_id = OperationId::new();

    let registry = &manager.ops_registry;
    let spec = OperationSpec {
        id: op_id.clone(),
        kind: OperationKind::BackgroundToolOp,
        owner_session_id: manager.owner_bridge_session_id.clone(),
        display_name: "shell:echo done".to_string(),
        source_label: "shell_job".to_string(),
        operation_source: None,
        child_session_id: None,
        expect_peer_channel: false,
    };
    registry.register_operation(spec).unwrap();
    registry.provisioning_succeeded(&op_id).unwrap();
    let outcome = OperationResult {
        id: op_id.clone(),
        content: "done".to_string(),
        is_error: false,
        duration_ms: 10,
        tokens_used: 0,
    };
    registry.complete_operation(&op_id, outcome).unwrap();

    let record = BackgroundJobRecord {
        view: BackgroundJob {
            id: job_id.clone(),
            command: "echo done".to_string(),
            working_dir: None,
            placement: None,
            timeout_secs: 10,
            started_at_unix,
            status: JobStatus::Completed {
                exit_code: Some(0),
                stdout: "done".to_string(),
                stderr: String::new(),
                duration_secs: 0.01,
            },
        },
        operation_id: op_id.clone(),
        completion_notified: false,
    };
    manager.jobs.lock().await.insert(job_id.clone(), record);
    manager
        .canonical_job_ops
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
        .insert(job_id.clone(), op_id);

    let summaries = manager.list_jobs().await.expect("job summaries");
    let summary = summaries
        .iter()
        .find(|candidate| candidate.id == job_id)
        .expect("Job should be in list");
    assert_eq!(summary.started_at_unix, started_at_unix);
}
/// Cancelling a running job succeeds and leaves it `Cancelled`.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_job_manager_cancel() {
    let workspace = TempDir::new().unwrap();
    let manager = bound_job_manager(ShellConfig {
        shell: "sh".to_string(),
        ..ShellConfig::with_project_root(workspace.path().to_path_buf())
    });
    let job_id = manager.spawn_job("sleep 60", None, 120).await.unwrap();
    tokio::time::sleep(Duration::from_millis(100)).await;
    assert!(manager.cancel_job(&job_id).await.is_ok());
    let job = manager.get_status(&job_id).await.unwrap().unwrap();
    assert!(
        matches!(job.status, JobStatus::Cancelled { .. }),
        "Job should be Cancelled, got {:?}",
        job.status
    );
}
/// Cancelling a real job succeeds; cancelling an unknown id is `JobNotFound`.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_job_manager_cancel_signal() {
    let workspace = TempDir::new().unwrap();
    let manager = bound_job_manager(ShellConfig {
        shell: "sh".to_string(),
        ..ShellConfig::with_project_root(workspace.path().to_path_buf())
    });
    let job_id = manager.spawn_job("sleep 60", None, 120).await.unwrap();
    tokio::time::sleep(Duration::from_millis(100)).await;
    assert!(manager.cancel_job(&job_id).await.is_ok());

    let bogus = JobId::from_string("job_fake");
    let rejected = manager.cancel_job(&bogus).await;
    assert!(matches!(rejected, Err(ShellError::JobNotFound(_))));
}
/// A short command finishes in the background and reaches `Completed`.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_job_manager_tokio_spawn() {
    let workspace = TempDir::new().unwrap();
    let manager = bound_job_manager(ShellConfig {
        shell: "sh".to_string(),
        ..ShellConfig::with_project_root(workspace.path().to_path_buf())
    });
    let job_id = manager.spawn_job("echo hello", None, 30).await.unwrap();
    tokio::time::sleep(Duration::from_secs(2)).await;
    let finished = manager.get_status(&job_id).await.unwrap().unwrap();
    assert!(
        matches!(finished.status, JobStatus::Completed { .. }),
        "Job should be Completed, got {:?}",
        finished.status
    );
}
/// A completed echo job reports exit code 0, its stdout, and a non-negative duration.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_job_manager_completed_status() {
    let workspace = TempDir::new().unwrap();
    let manager = bound_job_manager(ShellConfig {
        shell: "sh".to_string(),
        ..ShellConfig::with_project_root(workspace.path().to_path_buf())
    });
    let job_id = manager
        .spawn_job("echo 'test output'", None, 30)
        .await
        .unwrap();
    tokio::time::sleep(Duration::from_secs(2)).await;
    let job = manager.get_status(&job_id).await.unwrap().unwrap();
    match &job.status {
        JobStatus::Completed {
            exit_code,
            stdout,
            duration_secs,
            ..
        } => {
            assert_eq!(*exit_code, Some(0));
            assert!(stdout.contains("test output"));
            assert!(*duration_secs >= 0.0);
        }
        _ => unreachable!("Expected Completed status, got {:?}", job.status),
    }
}
/// A command exiting with status 1 is reported as `Completed` with
/// `exit_code == Some(1)`, not as `Failed`.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_job_manager_failed_status() {
    let temp_dir = TempDir::new().unwrap();
    let mut config = ShellConfig::with_project_root(temp_dir.path().to_path_buf());
    config.shell = "sh".to_string();
    // Use a bound manager like the other tests. A bare `JobManager::new` is not
    // wired to an owner session or an injected ops registry.
    let manager = bound_job_manager(config);
    let job_id = manager.spawn_job("exit 1", None, 30).await.unwrap();
    tokio::time::sleep(Duration::from_secs(2)).await;
    let job = manager.get_status(&job_id).await.unwrap().unwrap();
    if let JobStatus::Completed { exit_code, .. } = &job.status {
        assert_eq!(*exit_code, Some(1));
    } else {
        unreachable!(
            "Expected Completed status with exit code 1, got {:?}",
            job.status
        );
    }
}
/// A job that exceeds its timeout ends as `Failed` with a "timed out" error
/// and a duration of at least the timeout.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_job_manager_timeout_status() {
    let temp_dir = TempDir::new().unwrap();
    let mut config = ShellConfig::with_project_root(temp_dir.path().to_path_buf());
    config.shell = "sh".to_string();
    // Use a bound manager like the other tests. A bare `JobManager::new` is not
    // wired to an owner session or an injected ops registry.
    let manager = bound_job_manager(config);
    let job_id = manager.spawn_job("sleep 30", None, 1).await.unwrap();
    tokio::time::sleep(Duration::from_secs(3)).await;
    let job = manager.get_status(&job_id).await.unwrap().unwrap();
    if let JobStatus::Failed {
        error,
        duration_secs,
    } = &job.status
    {
        assert!(error.contains("timed out"));
        assert!(*duration_secs >= 1.0);
    } else {
        unreachable!("Expected Failed timeout status, got {:?}", job.status);
    }
}
/// Cancelling a running job yields `Cancelled` with a non-negative duration.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_job_manager_cancelled_status() {
    let temp_dir = TempDir::new().unwrap();
    let mut config = ShellConfig::with_project_root(temp_dir.path().to_path_buf());
    config.shell = "sh".to_string();
    // Use a bound manager like the other tests. A bare `JobManager::new` is not
    // wired to an owner session or an injected ops registry.
    let manager = bound_job_manager(config);
    let job_id = manager.spawn_job("sleep 60", None, 120).await.unwrap();
    tokio::time::sleep(Duration::from_millis(500)).await;
    manager.cancel_job(&job_id).await.unwrap();
    let job = manager.get_status(&job_id).await.unwrap().unwrap();
    assert!(
        matches!(job.status, JobStatus::Cancelled { .. }),
        "Job should be Cancelled, got {:?}",
        job.status
    );
    if let JobStatus::Cancelled { duration_secs } = &job.status {
        assert!(*duration_secs >= 0.0);
    }
}
/// Three long jobs spawn back-to-back in under 1 s and are all `Running` afterwards.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_async_execution_nonblocking() {
    let workspace = TempDir::new().unwrap();
    let manager = bound_job_manager(ShellConfig {
        shell: "sh".to_string(),
        ..ShellConfig::with_project_root(workspace.path().to_path_buf())
    });
    let start = Instant::now();
    let mut ids = Vec::with_capacity(3);
    for _ in 0..3 {
        ids.push(manager.spawn_job("sleep 5", None, 60).await.unwrap());
    }
    let elapsed = start.elapsed();
    assert!(
        elapsed.as_millis() < 1000,
        "Spawning 3 jobs should be nearly instant, took {elapsed:?}"
    );
    for id in &ids {
        let job = manager.get_status(id).await.unwrap().unwrap();
        assert!(
            matches!(job.status, JobStatus::Running { .. }),
            "Job {id} should be running"
        );
    }
    // Clean-up is best effort; the assertions above are what matter.
    for id in &ids {
        let _ = manager.cancel_job(id).await;
    }
}
/// The timeout actually kills the job. It fails with "timed out" after roughly
/// the timeout, and well before the command's own 10 s sleep would end.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_timeout_enforced() {
    let workspace = TempDir::new().unwrap();
    let manager = bound_job_manager(ShellConfig {
        shell: "sh".to_string(),
        ..ShellConfig::with_project_root(workspace.path().to_path_buf())
    });
    let job_id = manager.spawn_job("sleep 10", None, 1).await.unwrap();
    tokio::time::sleep(Duration::from_secs(3)).await;
    let job = manager.get_status(&job_id).await.unwrap().unwrap();
    match &job.status {
        JobStatus::Failed {
            error,
            duration_secs,
        } => {
            assert!(error.contains("timed out"));
            assert!(
                *duration_secs >= 1.0 && *duration_secs < 3.0,
                "Duration should be close to timeout: {duration_secs}"
            );
        }
        _ => unreachable!("Expected Failed timeout status, got {:?}", job.status),
    }
}
/// Cancelling turns a `Running` job into `Cancelled`, and it stays that way.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_kill_terminates_process() {
    let workspace = TempDir::new().unwrap();
    let manager = bound_job_manager(ShellConfig {
        shell: "sh".to_string(),
        ..ShellConfig::with_project_root(workspace.path().to_path_buf())
    });
    let job_id = manager.spawn_job("sleep 60", None, 120).await.unwrap();
    tokio::time::sleep(Duration::from_millis(200)).await;

    let before = manager.get_status(&job_id).await.unwrap().unwrap();
    assert!(
        matches!(before.status, JobStatus::Running { .. }),
        "Job should be running before cancel"
    );

    manager
        .cancel_job(&job_id)
        .await
        .expect("Cancel should succeed");
    let after = manager.get_status(&job_id).await.unwrap().unwrap();
    assert!(
        matches!(after.status, JobStatus::Cancelled { .. }),
        "Job should be cancelled, got {:?}",
        after.status
    );

    tokio::time::sleep(Duration::from_millis(500)).await;
    let settled = manager.get_status(&job_id).await.unwrap().unwrap();
    assert!(
        matches!(settled.status, JobStatus::Cancelled { .. }),
        "Job should still be cancelled"
    );
}
/// Ten concurrent spawns all succeed, get distinct ids, and complete.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_concurrent_job_spawning() {
    let workspace = TempDir::new().unwrap();
    let manager = Arc::new(bound_job_manager(ShellConfig {
        shell: "sh".to_string(),
        ..ShellConfig::with_project_root(workspace.path().to_path_buf())
    }));

    let handles: Vec<_> = (0..10)
        .map(|i| {
            let mgr = Arc::clone(&manager);
            let cmd = format!("echo job{i}");
            tokio::spawn(async move { mgr.spawn_job(&cmd, None, 30).await })
        })
        .collect();

    let mut job_ids = Vec::with_capacity(handles.len());
    for handle in handles {
        let spawned = handle.await.unwrap();
        assert!(spawned.is_ok(), "All spawns should succeed");
        job_ids.push(spawned.unwrap());
    }

    let unique_count = job_ids
        .iter()
        .map(|id| &id.0)
        .collect::<std::collections::HashSet<_>>()
        .len();
    assert_eq!(
        unique_count, 10,
        "All 10 jobs should have unique IDs, got {unique_count}"
    );

    tokio::time::sleep(Duration::from_secs(2)).await;
    for job_id in &job_ids {
        let job = manager.get_status(job_id).await.unwrap().unwrap();
        assert!(
            matches!(job.status, JobStatus::Completed { .. }),
            "Job {} should be completed, got {:?}",
            job_id,
            job.status
        );
    }
}
/// `remove_job` deletes a job once, returning `true`, then returns `false` on a repeat.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_remove_job() {
    let workspace = TempDir::new().unwrap();
    let manager = bound_job_manager(ShellConfig {
        shell: "sh".to_string(),
        ..ShellConfig::with_project_root(workspace.path().to_path_buf())
    });
    let job_id = manager.spawn_job("echo hello", None, 30).await.unwrap();
    tokio::time::sleep(Duration::from_millis(500)).await;
    assert!(manager.get_status(&job_id).await.unwrap().is_some());
    assert_eq!(manager.job_count().await, 1);

    assert!(manager.remove_job(&job_id).await, "Job should be removed");
    assert!(manager.get_status(&job_id).await.unwrap().is_none());
    assert_eq!(manager.job_count().await, 0);

    assert!(
        !manager.remove_job(&job_id).await,
        "Already removed job should return false"
    );
}
/// `job_count` goes up with each spawn and down with each removal.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_job_count() {
    let workspace = TempDir::new().unwrap();
    let manager = bound_job_manager(ShellConfig {
        shell: "sh".to_string(),
        ..ShellConfig::with_project_root(workspace.path().to_path_buf())
    });
    assert_eq!(manager.job_count().await, 0);

    let mut spawned = Vec::new();
    for (expected, cmd) in [(1, "echo one"), (2, "echo two")] {
        spawned.push(manager.spawn_job(cmd, None, 30).await.unwrap());
        assert_eq!(manager.job_count().await, expected);
    }
    for (id, expected) in spawned.iter().zip([1, 0]) {
        manager.remove_job(id).await;
        assert_eq!(manager.job_count().await, expected);
    }
}
/// A job is counted as completed only after it finishes, not at spawn time.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_completed_job_count() {
    let workspace = TempDir::new().unwrap();
    let manager = bound_job_manager(ShellConfig {
        shell: "sh".to_string(),
        ..ShellConfig::with_project_root(workspace.path().to_path_buf())
    });
    assert_eq!(manager.completed_job_count().await, 0);
    let _job_id = manager.spawn_job("echo hello", None, 30).await.unwrap();
    assert_eq!(manager.completed_job_count().await, 0);
    tokio::time::sleep(Duration::from_millis(500)).await;
    assert_eq!(manager.completed_job_count().await, 1);
}
/// Spawning a new job triggers eviction down to `max_completed_jobs` completed
/// entries, and the new job is unaffected.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_cleanup_respects_max_completed_jobs() {
    let workspace = TempDir::new().unwrap();
    let manager = bound_job_manager(ShellConfig {
        shell: "sh".to_string(),
        max_completed_jobs: 3,
        completed_job_ttl_secs: 3600,
        ..ShellConfig::with_project_root(workspace.path().to_path_buf())
    });
    for i in 0..5 {
        let _ = manager
            .spawn_job(&format!("echo job{i}"), None, 30)
            .await
            .unwrap();
    }
    tokio::time::sleep(Duration::from_secs(2)).await;
    assert_eq!(manager.completed_job_count().await, 5);

    let trigger_id = manager.spawn_job("sleep 10", None, 30).await.unwrap();
    let completed_after = manager.completed_job_count().await;
    assert!(
        completed_after <= 3,
        "Should have at most 3 completed jobs after cleanup, got {completed_after}"
    );

    let trigger = manager.get_status(&trigger_id).await.unwrap().unwrap();
    assert!(
        matches!(trigger.status, JobStatus::Running { .. }),
        "Trigger job should be running"
    );
    let _ = manager.cancel_job(&trigger_id).await;
}
/// Removing an id the manager never issued returns `false`.
#[tokio::test]
async fn test_remove_nonexistent_job() {
    let manager = bound_job_manager(ShellConfig::default());
    let removed = manager
        .remove_job(&JobId::from_string("job_nonexistent"))
        .await;
    assert!(!removed, "Removing nonexistent job should return false");
}
/// Multibyte UTF-8 (CJK, emoji, accented Latin) printed by the child process is
/// captured byte-exact. The full payload must appear, with no U+FFFD replacement
/// characters from lossy decoding.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_multibyte_utf8_output() {
    let temp_dir = TempDir::new().unwrap();
    let mut config = ShellConfig::with_project_root(temp_dir.path().to_path_buf());
    config.shell = "sh".to_string();
    let manager = bound_job_manager(config);
    let job_id = manager
        .spawn_job("printf 'Hello 世界! 🎉🚀 Test émojis: 👍'", None, 30)
        .await
        .unwrap();
    tokio::time::sleep(Duration::from_secs(2)).await;
    let job = manager.get_status(&job_id).await.unwrap().unwrap();
    assert!(
        matches!(job.status, JobStatus::Completed { .. }),
        "Job with UTF-8 output should complete successfully, got {:?}",
        job.status
    );
    if let JobStatus::Completed { stdout, .. } = &job.status {
        // The old check (`contains("Hello") || … || is_empty()`) passed even when
        // the multibyte part was mangled or missing, so it could not detect corruption.
        assert!(
            stdout.contains("Hello 世界! 🎉🚀 Test émojis: 👍"),
            "Output should be captured without corruption, got: {stdout:?}"
        );
        assert!(
            !stdout.contains('\u{FFFD}'),
            "Output should not contain UTF-8 replacement characters, got: {stdout:?}"
        );
    }
}
/// Cancelling a running job leaves it `Cancelled`, and it is still `Cancelled`
/// shortly afterwards (the killed process is not reported as something else).
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_kill_reaps_process() {
    let temp_dir = TempDir::new().unwrap();
    let mut config = ShellConfig::with_project_root(temp_dir.path().to_path_buf());
    config.shell = "sh".to_string();
    // Use a bound manager like the other tests. A bare `JobManager::new` is not
    // wired to an owner session or an injected ops registry.
    let manager = bound_job_manager(config);
    let job_id = manager.spawn_job("sleep 60", None, 120).await.unwrap();
    tokio::time::sleep(Duration::from_millis(100)).await;
    let job = manager.get_status(&job_id).await.unwrap().unwrap();
    assert!(
        matches!(job.status, JobStatus::Running { .. }),
        "Job should be Running before kill"
    );
    manager.cancel_job(&job_id).await.unwrap();
    let job = manager.get_status(&job_id).await.unwrap().unwrap();
    assert!(
        matches!(job.status, JobStatus::Cancelled { .. }),
        "Job should be Cancelled after kill, got {:?}",
        job.status
    );
    tokio::time::sleep(Duration::from_millis(100)).await;
    let job = manager.get_status(&job_id).await.unwrap().unwrap();
    assert!(
        matches!(job.status, JobStatus::Cancelled { .. }),
        "Job should still be Cancelled (process reaped), got {:?}",
        job.status
    );
}
/// A short `echo` job is `Running` right after spawn. Without the caller polling, it is
/// later promoted to `Completed` with exit code 0 and the echoed text in stdout.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_background_job_auto_completes() {
    let temp_dir = TempDir::new().unwrap();
    let mut config = ShellConfig::with_project_root(temp_dir.path().to_path_buf());
    config.shell = "sh".to_string();
    let manager = JobManager::new(config);
    let id = manager.spawn_job("echo hello", None, 30).await.unwrap();

    let initial = manager.get_status(&id).await.unwrap().unwrap();
    assert!(
        matches!(initial.status, JobStatus::Running { .. }),
        "Job should start as Running"
    );

    tokio::time::sleep(Duration::from_millis(500)).await;
    let settled = manager.get_status(&id).await.unwrap().unwrap();
    assert!(
        matches!(settled.status, JobStatus::Completed { .. }),
        "Job should auto-complete to Completed, got {:?}",
        settled.status
    );

    // Unreachable for non-Completed states: the assertion above already panicked.
    let JobStatus::Completed {
        stdout, exit_code, ..
    } = &settled.status
    else {
        return;
    };
    assert!(
        stdout.contains("hello"),
        "stdout should contain 'hello', got: {stdout}"
    );
    assert_eq!(*exit_code, Some(0), "exit code should be 0");
}
/// Input shorter than the limit comes back unchanged.
#[test]
fn test_truncate_output_tail_small_input() {
    let out = super::truncate_output_tail(b"Hello, World!", 100);
    assert_eq!(out, "Hello, World!");
}
/// Input whose length equals the limit is not truncated; the size check is inclusive.
#[test]
fn test_truncate_output_tail_exact_limit() {
    const LIMIT: usize = 5;
    let out = super::truncate_output_tail(b"12345", LIMIT);
    assert_eq!(out, "12345");
}
/// ASCII input over the limit keeps only the trailing `max_bytes` bytes.
#[test]
fn test_truncate_output_tail_exceeds_limit() {
    let digits: &[u8] = b"0123456789";
    assert_eq!(super::truncate_output_tail(digits, 5), "56789");
}
/// "Hello世界Test" is 15 bytes (5 + 3 + 3 + 4). A 10-byte tail starts at offset 5, which is
/// the lead byte of `世`. The cut is therefore already on a char boundary, and the tail is
/// returned intact with no replacement characters.
#[test]
fn test_truncate_output_tail_utf8_boundary() {
    let data = "Hello世界Test".as_bytes();
    let result = super::truncate_output_tail(data, 10);
    assert_eq!(result, "世界Test");
}
/// "Start🎉🚀End" is 16 bytes (5 + 4 + 4 + 3). An 8-byte tail would start at offset 8,
/// which is the last continuation byte of `🎉`. Truncation must skip forward to the next
/// lead byte (the start of `🚀`) rather than emit a broken sequence.
#[test]
fn test_truncate_output_tail_emoji() {
    let data = "Start🎉🚀End".as_bytes();
    let result = super::truncate_output_tail(data, 8);
    assert_eq!(result, "🚀End");
}
/// Appends that stay under the `2 * max_bytes` threshold simply concatenate.
#[test]
fn test_append_with_truncation_small_append() {
    let mut buf: Vec<u8> = vec![1, 2, 3];
    super::append_with_truncation(&mut buf, &[4, 5], 10);
    assert_eq!(buf, [1, 2, 3, 4, 5]);
}
/// 15 existing + 10 appended = 25 bytes, which exceeds `2 * max_bytes` (20). The buffer is
/// therefore cut back to its last `max_bytes` (10) bytes. The cut lands on an ASCII byte, so
/// exactly the appended bytes remain.
#[test]
fn test_append_with_truncation_triggers_truncation() {
    let mut buffer = vec![0; 15];
    super::append_with_truncation(&mut buffer, &[1; 10], 10);
    assert_eq!(buffer, vec![1; 10]);
}
/// Two concurrent `cancel_job` calls on one running job must not both succeed. Exactly one
/// wins, and the other observes `JobNotRunning`.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_cancel_job_atomic_status_check() {
    let temp_dir = TempDir::new().unwrap();
    let mut config = ShellConfig::with_project_root(temp_dir.path().to_path_buf());
    config.shell = "sh".to_string();
    let manager = Arc::new(bound_job_manager(config));
    let job_id = manager.spawn_job("sleep 30", None, 60).await.unwrap();
    tokio::time::sleep(Duration::from_millis(100)).await;

    let first = tokio::spawn({
        let mgr = Arc::clone(&manager);
        let id = job_id.clone();
        async move { mgr.cancel_job(&id).await }
    });
    let second = tokio::spawn({
        let mgr = Arc::clone(&manager);
        let id = job_id.clone();
        async move { mgr.cancel_job(&id).await }
    });
    let (r1, r2) = tokio::join!(first, second);

    let outcomes = [
        r1.expect("Task 1 should not panic"),
        r2.expect("Task 2 should not panic"),
    ];
    let (successes, failures): (Vec<_>, Vec<_>) = outcomes.into_iter().partition(Result::is_ok);
    assert_eq!(successes.len(), 1, "Exactly one cancel should succeed");
    assert_eq!(
        failures.len(),
        1,
        "Exactly one cancel should fail with JobNotRunning"
    );
    if let Some(Err(err)) = failures.first() {
        assert!(
            matches!(err, ShellError::JobNotRunning),
            "Expected JobNotRunning error, got: {err:?}"
        );
    }
}
/// Exercises the real signal path of `graceful_kill`.
///
/// `graceful_kill` sends `killpg` to the group whose id equals the child pid, which only
/// reaches the child if the child leads its own process group. The child is therefore
/// spawned with `process_group(0)` and a sleep long enough that it cannot exit on its own
/// during the test. SIGTERM must stop it before the 2-second SIGKILL fallback, and the child
/// must be reaped afterwards.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
#[cfg(unix)]
async fn integration_real_graceful_kill_function_exists() {
    use tokio::process::Command;
    let mut child = Command::new("sleep")
        .arg("30")
        // Without this the child shares the test's process group, `killpg(child_pid)` finds
        // no such group, and the test only passes if the child happens to exit by itself.
        .process_group(0)
        .spawn()
        .expect("Failed to spawn test process");
    let started = Instant::now();
    let result = super::graceful_kill(&mut child).await;
    let elapsed = started.elapsed();
    assert!(result.is_ok(), "graceful_kill should succeed: {result:?}");
    assert!(
        elapsed < Duration::from_secs(2),
        "SIGTERM should stop the child before the SIGKILL fallback, took {elapsed:?}"
    );
    let status = child.try_wait().expect("try_wait should succeed after graceful_kill");
    assert!(
        status.is_some(),
        "child must have exited and been reaped by graceful_kill"
    );
}
/// Lets several short jobs finish (TTL 0, cap 2), then fires concurrent spawns that each
/// run cleanup. Every spawn must succeed, and the job table must stay within a sane bound.
///
/// NOTE(review): only 10 jobs are ever spawned here, so `count <= 12` cannot fail. The
/// bound is a sanity check, not a check that eviction actually happened.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_cleanup_atomicity() {
    let temp_dir = TempDir::new().unwrap();
    let mut config = ShellConfig::with_project_root(temp_dir.path().to_path_buf());
    config.shell = "sh".to_string();
    config.max_completed_jobs = 2;
    config.completed_job_ttl_secs = 0;
    let manager = Arc::new(bound_job_manager(config));

    for n in 0..5 {
        let _ = manager
            .spawn_job(&format!("echo job{n}"), None, 30)
            .await
            .unwrap();
    }
    tokio::time::sleep(Duration::from_secs(2)).await;

    let spawners: Vec<_> = (0..5)
        .map(|n| {
            let mgr = Arc::clone(&manager);
            tokio::spawn(async move { mgr.spawn_job(&format!("echo trigger{n}"), None, 30).await })
        })
        .collect();
    for spawner in spawners {
        let result = spawner.await.expect("Task should not panic");
        assert!(result.is_ok(), "Spawn should succeed: {result:?}");
    }

    let count = manager.job_count().await;
    assert!(
        count <= 12,
        "Job count should be reasonable after cleanup: {count}"
    );
}
/// Compile-time guard that `CommandExt::process_group` is available on this target.
///
/// `graceful_kill` signals the process group whose id equals the child pid. That only
/// reaches the child if the child was started as its own group leader.
#[test]
#[cfg(unix)]
fn test_process_group_configured() {
    use std::os::unix::process::CommandExt;
    let _ = std::process::Command::new("echo").process_group(0);
}
/// `cancel_job` on an unknown id fails with `JobNotFound` carrying the requested id.
#[tokio::test]
async fn test_error_context_job_not_found() {
    let manager = bound_job_manager(ShellConfig::default());
    let outcome = manager
        .cancel_job(&JobId::from_string("job_nonexistent_12345"))
        .await;
    match outcome {
        Err(ShellError::JobNotFound(missing)) => {
            assert_eq!(missing, "job_nonexistent_12345");
        }
        other => unreachable!("Expected JobNotFound error, got: {:?}", other),
    }
}
/// Cancelling a job that has already finished is rejected with `JobNotRunning`.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_error_context_job_already_completed() {
    let temp_dir = TempDir::new().unwrap();
    let mut config = ShellConfig::with_project_root(temp_dir.path().to_path_buf());
    config.shell = "sh".to_string();
    let manager = bound_job_manager(config);
    let finished = manager.spawn_job("echo done", None, 30).await.unwrap();
    tokio::time::sleep(Duration::from_secs(1)).await;
    let outcome = manager.cancel_job(&finished).await;
    if !matches!(outcome, Err(ShellError::JobNotRunning)) {
        unreachable!("Expected JobNotRunning error, got: {:?}", outcome);
    }
}
/// Streaming truncation must never split a multi-byte character.
///
/// "Hello世界" (11 bytes) + "更多数据Test" (16 bytes) = 27 bytes, which exceeds
/// `2 * 10`. The naive cut point is offset 17, the lead byte of `数`. The retained tail
/// is therefore exactly "数据Test".
#[test]
fn test_streaming_truncation_utf8_safety() {
    let mut buffer = "Hello世界".as_bytes().to_vec();
    let more_data = "更多数据Test".as_bytes();
    super::append_with_truncation(&mut buffer, more_data, 10);
    // Borrowing validation avoids cloning the buffer just to check it.
    let result = std::str::from_utf8(&buffer);
    assert!(
        result.is_ok(),
        "Buffer should be valid UTF-8 after truncation: {buffer:?}"
    );
    let result_str = result.unwrap();
    assert_eq!(
        result_str, "数据Test",
        "Should contain tail data: {result_str}"
    );
}
/// `running_job_count` starts at zero and increases by one per spawned long-lived job.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_running_job_count() {
    let temp_dir = TempDir::new().unwrap();
    let mut config = ShellConfig::with_project_root(temp_dir.path().to_path_buf());
    config.shell = "sh".to_string();
    let manager = bound_job_manager(config);
    assert_eq!(manager.running_job_count().await.unwrap(), 0);

    let mut spawned = Vec::with_capacity(2);
    for expected in 1..=2 {
        spawned.push(manager.spawn_job("sleep 60", None, 120).await.unwrap());
        assert_eq!(manager.running_job_count().await.unwrap(), expected);
    }

    // Best-effort teardown; failures here are not what this test checks.
    for id in &spawned {
        let _ = manager.cancel_job(id).await;
    }
}
/// With `max_concurrent_processes = 2`, a third concurrent spawn is refused with a
/// concurrency-limit error.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_concurrency_limit_enforced() {
    let temp_dir = TempDir::new().unwrap();
    let mut config = ShellConfig::with_project_root(temp_dir.path().to_path_buf());
    config.shell = "sh".to_string();
    config.max_concurrent_processes = 2;
    let manager = bound_job_manager(config);

    let first = manager.spawn_job("sleep 60", None, 120).await.unwrap();
    let second = manager.spawn_job("sleep 60", None, 120).await.unwrap();
    let overflow = manager.spawn_job("sleep 60", None, 120).await;
    assert!(
        overflow.is_err(),
        "Should reject job when at concurrency limit"
    );
    let err = overflow.unwrap_err();
    let message = err.to_string();
    assert!(
        message.contains("Concurrency limit exceeded"),
        "Expected concurrency limit error, got: {err:?}"
    );

    for id in [&first, &second] {
        let _ = manager.cancel_job(id).await;
    }
}
/// A `max_concurrent_processes` of 0 disables the limit: five concurrent jobs all spawn.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_concurrency_limit_zero_means_unlimited() {
    let temp_dir = TempDir::new().unwrap();
    let mut config = ShellConfig::with_project_root(temp_dir.path().to_path_buf());
    config.shell = "sh".to_string();
    config.max_concurrent_processes = 0;
    let manager = bound_job_manager(config);

    let mut spawned = Vec::new();
    for _ in 0..5 {
        let attempt = manager.spawn_job("sleep 60", None, 120).await;
        assert!(
            attempt.is_ok(),
            "Should allow unlimited jobs when limit is 0"
        );
        spawned.push(attempt.unwrap());
    }

    for id in &spawned {
        let _ = manager.cancel_job(id).await;
    }
}
/// After a job completes, its `list_jobs` summary still reports the original start time.
/// The time must be non-zero and within the last minute.
#[tokio::test]
#[cfg(feature = "integration-real-tests")]
#[ignore = "integration-real: spawns shell processes"]
async fn integration_real_job_summary_preserves_started_at_for_completed_jobs() {
    let temp_dir = TempDir::new().unwrap();
    let mut config = ShellConfig::with_project_root(temp_dir.path().to_path_buf());
    config.shell = "sh".to_string();
    let manager = JobManager::new(config);
    let id = manager.spawn_job("echo hello", None, 30).await.unwrap();
    tokio::time::sleep(Duration::from_secs(2)).await;

    let job = manager.get_status(&id).await.unwrap().unwrap();
    assert!(
        matches!(job.status, JobStatus::Completed { .. }),
        "Job should be completed, got {:?}",
        job.status
    );

    let listing = manager.list_jobs().await.unwrap();
    let summary = listing
        .iter()
        .find(|entry| entry.id == id)
        .expect("Job should be in list");
    let started = summary.started_at_unix;
    assert!(
        started > 0,
        "Completed job's started_at_unix should be preserved (> 0), got {}",
        started
    );

    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_secs();
    let is_recent = started <= now && started > now - 60;
    assert!(
        is_recent,
        "started_at_unix should be recent, got {} vs now {}",
        started,
        now
    );
}
/// Stress test for `acquire_sync_slot`: 20 tasks race for 2 slots.
///
/// Each winner records how many slots are held at once. A TOCTOU gap between the limit
/// check and the reservation would show up as more than 2 concurrent holders.
#[tokio::test]
async fn test_acquire_sync_slot_atomic_stress() {
    use std::sync::atomic::{AtomicUsize, Ordering};
    let temp_dir = TempDir::new().unwrap();
    let mut config = ShellConfig::with_project_root(temp_dir.path().to_path_buf());
    config.shell = "sh".to_string();
    config.max_concurrent_processes = 2;
    let manager = Arc::new(bound_job_manager(config));
    let active_slots = Arc::new(AtomicUsize::new(0));
    let max_observed = Arc::new(AtomicUsize::new(0));
    let success_count = Arc::new(AtomicUsize::new(0));
    let failure_count = Arc::new(AtomicUsize::new(0));
    let num_tasks = 20;
    let mut handles = Vec::with_capacity(num_tasks);
    for _ in 0..num_tasks {
        let mgr = Arc::clone(&manager);
        let active = Arc::clone(&active_slots);
        let max_obs = Arc::clone(&max_observed);
        let successes = Arc::clone(&success_count);
        let failures = Arc::clone(&failure_count);
        handles.push(tokio::spawn(async move {
            match mgr.acquire_sync_slot().await {
                Ok(guard) => {
                    let current = active.fetch_add(1, Ordering::SeqCst) + 1;
                    // Atomically raise the high-water mark (replaces a manual CAS loop).
                    max_obs.fetch_max(current, Ordering::SeqCst);
                    successes.fetch_add(1, Ordering::SeqCst);
                    // Hold the slot briefly so other tasks genuinely contend for it.
                    tokio::time::sleep(Duration::from_millis(10)).await;
                    // Decrement before releasing the slot. A task that acquires the freed
                    // slot must not see a stale, inflated counter.
                    active.fetch_sub(1, Ordering::SeqCst);
                    drop(guard);
                }
                Err(_) => {
                    failures.fetch_add(1, Ordering::SeqCst);
                }
            }
        }));
    }
    for handle in handles {
        handle.await.expect("Task should not panic");
    }
    let final_max = max_observed.load(Ordering::SeqCst);
    let total_successes = success_count.load(Ordering::SeqCst);
    let total_failures = failure_count.load(Ordering::SeqCst);
    assert!(
        final_max <= 2,
        "TOCTOU race detected! Max concurrent slots was {final_max}, limit is 2. \
Successes: {total_successes}, Failures: {total_failures}"
    );
    assert_eq!(
        total_successes + total_failures,
        num_tasks,
        "All tasks should complete"
    );
    assert!(
        total_failures > 0,
        "Should have some failures when 20 tasks compete for 2 slots"
    );
}
/// With `max_concurrent_processes = 1`, a second `acquire_sync_slot` is rejected with a
/// concurrency-limit error while the first guard is still held.
#[tokio::test]
async fn test_acquire_sync_slot_enforces_limit() {
    let temp_dir = TempDir::new().unwrap();
    let mut config = ShellConfig::with_project_root(temp_dir.path().to_path_buf());
    config.max_concurrent_processes = 1;
    let manager = bound_job_manager(config);

    // Bound to a named variable so the slot stays occupied until the end of the test.
    let _held = manager
        .acquire_sync_slot()
        .await
        .expect("first slot should be available");
    let rejection = manager
        .acquire_sync_slot()
        .await
        .expect_err("second slot should be rejected");
    let text = rejection.to_string();
    assert!(
        text.contains("Concurrency limit exceeded"),
        "unexpected error: {rejection}"
    );
}
/// Releasing a synchronous capacity slot must be invisible to the background completion
/// feed: no entries are published and the watermark stays 0. The release must also drop
/// the slot's operation from the ops registry.
#[tokio::test]
async fn sync_slot_release_does_not_publish_background_completion() {
    let temp_dir = TempDir::new().unwrap();
    let mut config = ShellConfig::with_project_root(temp_dir.path().to_path_buf());
    config.max_concurrent_processes = 1;
    let manager = bound_job_manager(config);
    let feed = manager
        .ops_registry
        .completion_feed()
        .expect("runtime ops registry should expose completion feed");

    // The slot guard lives only inside this block; leaving the block releases the slot.
    let operation_id = {
        let slot = manager
            .acquire_sync_slot()
            .await
            .expect("sync slot should be available");
        let registered = manager
            .ops_registry
            .snapshot(&slot.operation_id)
            .expect("sync slot projection should succeed")
            .expect("sync slot should be registered");
        assert_eq!(registered.kind, OperationKind::BackgroundToolCapacitySlot);
        slot.operation_id.clone()
    };

    let batch = feed.list_since(0);
    assert!(
        batch.entries.is_empty(),
        "sync-slot release must not publish background completion entries: {:?}",
        batch.entries
    );
    assert_eq!(batch.watermark, 0);

    let leftover = manager.ops_registry.snapshot(&operation_id).unwrap();
    assert!(
        leftover.is_none(),
        "sync-slot release must discard volatile capacity-slot state"
    );
}
/// Repeats a smaller slot race (10 tasks, limit 2) across 10 fresh managers to shake out
/// intermittent TOCTOU bugs in `acquire_sync_slot`.
#[tokio::test]
async fn test_acquire_sync_slot_atomic_repeated() {
    use std::sync::atomic::{AtomicUsize, Ordering};
    for iteration in 0..10 {
        let temp_dir = TempDir::new().unwrap();
        let mut config = ShellConfig::with_project_root(temp_dir.path().to_path_buf());
        config.shell = "sh".to_string();
        config.max_concurrent_processes = 2;
        let manager = Arc::new(bound_job_manager(config));
        let max_observed = Arc::new(AtomicUsize::new(0));
        let active_slots = Arc::new(AtomicUsize::new(0));
        let mut handles = Vec::with_capacity(10);
        for _ in 0..10 {
            let mgr = Arc::clone(&manager);
            let max_obs = Arc::clone(&max_observed);
            let active = Arc::clone(&active_slots);
            handles.push(tokio::spawn(async move {
                // Tasks that lose the race do nothing; only winners update the counters.
                if let Ok(guard) = mgr.acquire_sync_slot().await {
                    let current = active.fetch_add(1, Ordering::SeqCst) + 1;
                    // Atomically raise the high-water mark (replaces a manual CAS loop).
                    max_obs.fetch_max(current, Ordering::SeqCst);
                    tokio::time::sleep(Duration::from_millis(5)).await;
                    // Decrement before releasing so the next holder never sees a stale count.
                    active.fetch_sub(1, Ordering::SeqCst);
                    drop(guard);
                }
            }));
        }
        for handle in handles {
            handle.await.expect("Task should not panic");
        }
        let final_max = max_observed.load(Ordering::SeqCst);
        assert!(
            final_max <= 2,
            "TOCTOU race on iteration {iteration}! Max concurrent was {final_max}, limit is 2"
        );
    }
}
/// Completing a shell job's canonical operation in the ops-lifecycle registry makes
/// `drain_completed` return exactly one typed completion: the shell job id, a `Completed`
/// status, and a non-empty display name. A second drain returns nothing.
#[tokio::test]
async fn choke_001_drain_completed_returns_typed_projection() {
    let registry: Arc<dyn OpsLifecycleRegistry> = Arc::new(RuntimeOpsLifecycleRegistry::new());
    // `&registry` was previously garbled to `®istry` (HTML-entity mangling) and did not compile.
    let manager = JobManager::new(ShellConfig::default())
        .with_owner_bridge_session_id(SessionId::new())
        .with_ops_registry(Arc::clone(&registry));
    let job_id = manager
        .register_synthetic_running_job("shell:choke-001", None, 30)
        .await
        .expect("synthetic job should register");
    let op_id = manager
        .canonical_operation_for_job(&job_id)
        .expect("synthetic job must have a canonical operation");
    registry
        .complete_operation(
            &op_id,
            OperationResult {
                id: op_id.clone(),
                content: "done".to_string(),
                is_error: false,
                duration_ms: 42,
                tokens_used: 0,
            },
        )
        .expect("complete_operation should succeed");

    let completions = manager.drain_completed().await;
    assert_eq!(
        completions.len(),
        1,
        "drain_completed should return exactly one completion"
    );
    let completion = &completions[0];
    assert_eq!(
        completion.job_id,
        job_id.to_string(),
        "completion job_id must match the shell job id"
    );
    assert_eq!(
        completion.status,
        OperationStatus::Completed,
        "terminal status must come from canonical ops-lifecycle"
    );
    assert!(
        !completion.display_name.is_empty(),
        "display_name must be populated from the ops-lifecycle snapshot"
    );

    let second = manager.drain_completed().await;
    assert!(
        second.is_empty(),
        "second drain_completed must return empty — jobs already notified"
    );
}
/// A completed job whose completion has not been drained must survive cleanup even after
/// its TTL has expired. Once drained, the same job becomes evictable on the next cleanup.
///
/// Registering an extra synthetic job is used here to trigger a cleanup pass.
#[tokio::test]
async fn choke_002_unnotified_completions_survive_cleanup() {
    let registry: Arc<dyn OpsLifecycleRegistry> = Arc::new(RuntimeOpsLifecycleRegistry::new());
    let config = ShellConfig {
        completed_job_ttl_secs: 1,
        max_completed_jobs: 1,
        ..ShellConfig::default()
    };
    // `&registry` was previously garbled to `®istry` (HTML-entity mangling) and did not compile.
    let manager = JobManager::new(config)
        .with_owner_bridge_session_id(SessionId::new())
        .with_ops_registry(Arc::clone(&registry));
    let job_id = manager
        .register_synthetic_running_job("shell:choke-002", None, 30)
        .await
        .expect("synthetic job should register");
    let op_id = manager
        .canonical_operation_for_job(&job_id)
        .expect("must have canonical op");
    registry
        .complete_operation(
            &op_id,
            OperationResult {
                id: op_id.clone(),
                content: "done".to_string(),
                is_error: false,
                duration_ms: 10,
                tokens_used: 0,
            },
        )
        .expect("complete should succeed");

    // Completion timestamp 2s in the past, i.e. beyond the 1s TTL configured above.
    let expired_timestamp = || {
        Instant::now()
            .checked_sub(Duration::from_secs(2))
            .expect("2 seconds before now should be representable")
    };

    manager
        .completed_at
        .lock()
        .await
        .insert(job_id.clone(), expired_timestamp());
    let _trigger = manager
        .register_synthetic_running_job("shell:trigger-cleanup", None, 30)
        .await
        .expect("trigger job should register");
    assert!(
        manager.jobs.lock().await.contains_key(&job_id),
        "unnotified completed job must survive cleanup even after TTL"
    );

    let drained = manager.drain_completed().await;
    assert!(
        !drained.is_empty(),
        "drain_completed must return the unnotified completion"
    );

    manager
        .completed_at
        .lock()
        .await
        .insert(job_id.clone(), expired_timestamp());
    let _trigger2 = manager
        .register_synthetic_running_job("shell:trigger-cleanup-2", None, 30)
        .await
        .expect("second trigger job should register");
    let still_exists = manager.jobs.lock().await.contains_key(&job_id);
    assert!(
        !still_exists,
        "notified job should be evictable after TTL expiry + cleanup"
    );
}
/// Overflow cleanup (more completed jobs than `max_completed_jobs`) must only consider jobs
/// whose completions have already been drained.
///
/// All four undrained completions survive the first cleanup. After they are drained, a
/// second cleanup brings the completed-job count back within the cap.
#[tokio::test]
async fn choke_002_overflow_cleanup_uses_filtered_evictable_subset() {
    let registry: Arc<dyn OpsLifecycleRegistry> = Arc::new(RuntimeOpsLifecycleRegistry::new());
    let config = ShellConfig {
        max_completed_jobs: 2,
        // Long TTL so that only the overflow path, not expiry, can evict jobs.
        completed_job_ttl_secs: 3600,
        ..ShellConfig::default()
    };
    // `&registry` was previously garbled to `®istry` (HTML-entity mangling) and did not compile.
    let manager = JobManager::new(config)
        .with_owner_bridge_session_id(SessionId::new())
        .with_ops_registry(Arc::clone(&registry));

    let mut job_ids = Vec::with_capacity(4);
    for i in 0..4 {
        let jid = manager
            .register_synthetic_running_job(&format!("shell:overflow-{i}"), None, 30)
            .await
            .expect("synthetic job should register");
        let op_id = manager
            .canonical_operation_for_job(&jid)
            .expect("must have canonical op");
        registry
            .complete_operation(
                &op_id,
                OperationResult {
                    id: op_id.clone(),
                    content: format!("done-{i}"),
                    is_error: false,
                    duration_ms: 10,
                    tokens_used: 0,
                },
            )
            .expect("complete should succeed");
        job_ids.push(jid);
    }

    let _trigger = manager
        .register_synthetic_running_job("shell:trigger-overflow", None, 30)
        .await
        .expect("trigger job should register");
    {
        let jobs_guard = manager.jobs.lock().await;
        for jid in &job_ids {
            assert!(
                jobs_guard.contains_key(jid),
                "unnotified job {jid} must survive overflow cleanup"
            );
        }
    }

    let drained = manager.drain_completed().await;
    assert_eq!(drained.len(), 4, "all 4 completed jobs should be drained");

    let _trigger2 = manager
        .register_synthetic_running_job("shell:trigger-overflow-2", None, 30)
        .await
        .expect("second trigger should register");
    let remaining_completed = manager.completed_job_count().await;
    assert!(
        remaining_completed <= manager.config.max_completed_jobs,
        "after cleanup, completed job count ({remaining_completed}) should be \
at or below max_completed_jobs ({})",
        manager.config.max_completed_jobs,
    );
}
}