use anyhow::{Context, Result};
use directories::BaseDirs;
use std::path::PathBuf;
use crate::schema::{JobMeta, JobState, JobStatus};
/// Error signalling that no job matches the requested job ID.
///
/// The wrapped string is the ID (or ID prefix) that was looked up.
#[derive(Debug)]
pub struct JobNotFound(pub String);

impl std::fmt::Display for JobNotFound {
    /// Renders the error as `job not found: <id>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(requested) = self;
        write!(f, "job not found: {requested}")
    }
}

impl std::error::Error for JobNotFound {}
/// Error signalling that a job ID prefix matched more than one job.
#[derive(Debug)]
pub struct AmbiguousJobId {
    /// The prefix that was looked up.
    pub prefix: String,
    /// Every job ID that matched `prefix`.
    pub candidates: Vec<String>,
}

impl std::fmt::Display for AmbiguousJobId {
    /// Lists at most five candidates; any remainder is summarised as
    /// `... and N more`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "ambiguous job ID prefix '{}': matches ", self.prefix)?;

        let shown = self
            .candidates
            .iter()
            .take(5)
            .map(String::as_str)
            .collect::<Vec<_>>()
            .join(", ");

        match self.candidates.len().saturating_sub(5) {
            0 => write!(f, "{shown}"),
            hidden => write!(f, "{shown}, ... and {hidden} more"),
        }
    }
}

impl std::error::Error for AmbiguousJobId {}
/// Error carrying a free-form description of an invalid job state.
///
/// NOTE(review): this type is not constructed anywhere in this file; check the
/// callers to see which conditions raise it.
#[derive(Debug)]
pub struct InvalidJobState(pub String);

impl std::fmt::Display for InvalidJobState {
    /// Renders the error as `invalid job state: <description>`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(description) = self;
        write!(f, "invalid job state: {description}")
    }
}

impl std::error::Error for InvalidJobState {}
pub fn resolve_root(cli_root: Option<&str>) -> PathBuf {
if let Some(root) = cli_root {
return PathBuf::from(root);
}
if let Ok(root) = std::env::var("AGENT_EXEC_ROOT")
&& !root.is_empty()
{
return PathBuf::from(root);
}
if let Ok(xdg) = std::env::var("XDG_DATA_HOME")
&& !xdg.is_empty()
{
return PathBuf::from(xdg).join("agent-exec").join("jobs");
}
if let Some(base_dirs) = BaseDirs::new() {
#[cfg(windows)]
let base = base_dirs.data_local_dir().to_path_buf();
#[cfg(not(windows))]
let base = base_dirs.home_dir().join(".local").join("share");
return base.join("agent-exec").join("jobs");
}
PathBuf::from("~/.local/share/agent-exec/jobs")
}
/// Tail of a log file together with size bookkeeping, as produced by
/// `JobDir::read_tail_metrics`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TailMetrics {
    /// The tail text that was extracted from the log.
    pub tail: String,
    /// `true` when the extraction dropped bytes or lines.
    pub truncated: bool,
    /// Size of the log file in bytes as reported by file metadata
    /// (`0` when the metadata could not be read).
    pub observed_bytes: u64,
    /// Length of `tail` in bytes.
    pub included_bytes: u64,
}
/// Handle to a single job's directory on disk.
#[derive(Debug)]
pub struct JobDir {
/// Filesystem location of the job directory.
pub path: PathBuf,
/// ID of the job; the job directory is the entry with this name under the root.
pub job_id: String,
}
impl JobDir {
pub fn open(root: &std::path::Path, job_id: &str) -> Result<Self> {
let path = root.join(job_id);
if path.is_dir() {
return Ok(JobDir {
path,
job_id: job_id.to_string(),
});
}
let mut candidates: Vec<String> = std::fs::read_dir(root)
.into_iter()
.flatten()
.flatten()
.filter_map(|entry| {
let name = entry.file_name().to_string_lossy().into_owned();
if name.starts_with(job_id) && entry.path().is_dir() {
Some(name)
} else {
None
}
})
.collect();
match candidates.len() {
0 => Err(anyhow::Error::new(JobNotFound(job_id.to_string()))),
1 => {
let resolved = candidates.remove(0);
let path = root.join(&resolved);
Ok(JobDir {
path,
job_id: resolved,
})
}
_ => {
candidates.sort();
Err(anyhow::Error::new(AmbiguousJobId {
prefix: job_id.to_string(),
candidates,
}))
}
}
}
/// Creates the directory `root/job_id` (including missing parents) and writes
/// `meta` into it as `meta.json`.
///
/// # Errors
///
/// Fails when the directory cannot be created or the metadata cannot be
/// written.
pub fn create(root: &std::path::Path, job_id: &str, meta: &JobMeta) -> Result<Self> {
    let job_dir = JobDir {
        path: root.join(job_id),
        job_id: job_id.to_owned(),
    };
    std::fs::create_dir_all(&job_dir.path)
        .with_context(|| format!("create job dir {}", job_dir.path.display()))?;
    job_dir.write_meta_atomic(meta)?;
    Ok(job_dir)
}
/// Path of `meta.json` inside this job directory.
pub fn meta_path(&self) -> PathBuf {
    self.path.as_path().join("meta.json")
}
/// Path of `state.json` inside this job directory.
pub fn state_path(&self) -> PathBuf {
    self.path.as_path().join("state.json")
}
/// Path of `stdout.log` inside this job directory.
pub fn stdout_path(&self) -> PathBuf {
    self.path.as_path().join("stdout.log")
}
/// Path of `stderr.log` inside this job directory.
pub fn stderr_path(&self) -> PathBuf {
    self.path.as_path().join("stderr.log")
}
/// Path of `full.log` inside this job directory.
pub fn full_log_path(&self) -> PathBuf {
    self.path.as_path().join("full.log")
}
/// Path of `completion_event.json` inside this job directory.
pub fn completion_event_path(&self) -> PathBuf {
    self.path.as_path().join("completion_event.json")
}
/// Path of `notification_events.ndjson` inside this job directory.
pub fn notification_events_path(&self) -> PathBuf {
    self.path.as_path().join("notification_events.ndjson")
}
/// Serializes `record` as pretty-printed JSON and writes it to
/// `completion_event.json` through `write_atomic`.
///
/// # Errors
///
/// Fails when serialization or the file write fails.
pub fn write_completion_event_atomic(
    &self,
    record: &crate::schema::CompletionEventRecord,
) -> Result<()> {
    let json = serde_json::to_string_pretty(record)?;
    let destination = self.completion_event_path();
    write_atomic(&self.path, &destination, json.as_bytes())
}
/// Reads `meta.json` and deserializes it into a [`JobMeta`].
///
/// # Errors
///
/// Propagates the I/O error from reading the file or the JSON error from
/// parsing it.
pub fn read_meta(&self) -> Result<JobMeta> {
    let bytes = std::fs::read(self.meta_path())?;
    let meta = serde_json::from_slice(&bytes)?;
    Ok(meta)
}
/// Reads `state.json` and deserializes it into a [`JobState`].
///
/// # Errors
///
/// Propagates the I/O error from reading the file or the JSON error from
/// parsing it.
pub fn read_state(&self) -> Result<JobState> {
    let bytes = std::fs::read(self.state_path())?;
    let state = serde_json::from_slice(&bytes)?;
    Ok(state)
}
/// Serializes `meta` as pretty-printed JSON and writes it to `meta.json`
/// through `write_atomic`.
///
/// # Errors
///
/// Fails when serialization or the file write fails.
pub fn write_meta_atomic(&self, meta: &JobMeta) -> Result<()> {
    let json = serde_json::to_string_pretty(meta)?;
    let destination = self.meta_path();
    write_atomic(&self.path, &destination, json.as_bytes())
}
/// Serializes `state` as pretty-printed JSON and writes it to `state.json`
/// through `write_atomic`.
///
/// # Errors
///
/// Fails when serialization or the file write fails.
pub fn write_state(&self, state: &JobState) -> Result<()> {
    let json = serde_json::to_string_pretty(state)?;
    let destination = self.state_path();
    write_atomic(&self.path, &destination, json.as_bytes())
}
/// Returns the tail text of `filename`, discarding the truncation flag that
/// `tail_log_with_truncated` reports alongside it.
pub fn tail_log(&self, filename: &str, tail_lines: u64, max_bytes: u64) -> String {
    let (text, _truncated) = self.tail_log_with_truncated(filename, tail_lines, max_bytes);
    text
}
pub fn tail_log_with_truncated(
&self,
filename: &str,
tail_lines: u64,
max_bytes: u64,
) -> (String, bool) {
let path = self.path.join(filename);
let Ok(data) = std::fs::read(&path) else {
return (String::new(), false);
};
let byte_truncated = data.len() as u64 > max_bytes;
let start = if byte_truncated {
(data.len() as u64 - max_bytes) as usize
} else {
0
};
let slice = &data[start..];
let text = String::from_utf8_lossy(slice);
if tail_lines == 0 {
return (text.into_owned(), byte_truncated);
}
let lines: Vec<&str> = text.lines().collect();
let skip = lines.len().saturating_sub(tail_lines as usize);
let line_truncated = skip > 0;
(lines[skip..].join("\n"), byte_truncated || line_truncated)
}
/// Extracts the tail of `filename` and reports it together with size
/// information.
///
/// `included_bytes` is the byte length of the returned tail;
/// `observed_bytes` is the file size from metadata, or `0` when the metadata
/// cannot be read.
pub fn read_tail_metrics(
    &self,
    filename: &str,
    tail_lines: u64,
    max_bytes: u64,
) -> TailMetrics {
    let (tail, truncated) = self.tail_log_with_truncated(filename, tail_lines, max_bytes);
    let observed_bytes = match std::fs::metadata(self.path.join(filename)) {
        Ok(stat) => stat.len(),
        Err(_) => 0,
    };
    TailMetrics {
        included_bytes: tail.len() as u64,
        tail,
        truncated,
        observed_bytes,
    }
}
/// Writes an initial `state.json` with status [`JobStatus::Created`] and
/// returns the state that was written.
///
/// No pid, start time, finish time or result is recorded; `updated_at` is
/// taken from `crate::run::now_rfc3339_pub()`.
///
/// # Errors
///
/// Fails when the state file cannot be written.
pub fn init_state_created(&self) -> Result<JobState> {
    let job = crate::schema::JobStateJob {
        id: self.job_id.clone(),
        status: JobStatus::Created,
        started_at: None,
    };
    let result = crate::schema::JobStateResult {
        exit_code: None,
        signal: None,
        duration_ms: None,
    };
    let state = JobState {
        job,
        result,
        pid: None,
        finished_at: None,
        updated_at: crate::run::now_rfc3339_pub(),
        windows_job_name: None,
    };
    self.write_state(&state)?;
    Ok(state)
}
/// Writes an initial `state.json` with status [`JobStatus::Running`] for the
/// process `pid` started at `started_at`, and returns the state that was
/// written.
///
/// On Windows `windows_job_name` is set to `AgentExec-<job_id>`; on other
/// platforms it is `None`. `updated_at` is taken from
/// `crate::run::now_rfc3339_pub()`.
///
/// # Errors
///
/// Fails when the state file cannot be written.
pub fn init_state(&self, pid: u32, started_at: &str) -> Result<JobState> {
    let windows_job_name = if cfg!(windows) {
        Some(format!("AgentExec-{}", self.job_id))
    } else {
        None
    };
    let job = crate::schema::JobStateJob {
        id: self.job_id.clone(),
        status: JobStatus::Running,
        started_at: Some(started_at.to_owned()),
    };
    let result = crate::schema::JobStateResult {
        exit_code: None,
        signal: None,
        duration_ms: None,
    };
    let state = JobState {
        job,
        result,
        pid: Some(pid),
        finished_at: None,
        updated_at: crate::run::now_rfc3339_pub(),
        windows_job_name,
    };
    self.write_state(&state)?;
    Ok(state)
}
}
/// Writes `contents` to `target` by way of a temporary file.
///
/// A temp file with the prefix `.tmp-` is created inside `dir`, filled with
/// `contents`, and then persisted over `target`, so a reader never observes a
/// half-written `target`.
///
/// NOTE(review): the data is not synced to disk before the rename — confirm
/// that durability across a crash is not required here.
///
/// # Errors
///
/// Fails when the temp file cannot be created, written, or renamed.
fn write_atomic(dir: &std::path::Path, target: &std::path::Path, contents: &[u8]) -> Result<()> {
    use std::io::Write;

    let mut staged = tempfile::Builder::new()
        .prefix(".tmp-")
        .tempfile_in(dir)
        .with_context(|| format!("create temp file in {}", dir.display()))?;

    staged
        .write_all(contents)
        .with_context(|| format!("write temp file for {}", target.display()))?;

    match staged.persist(target) {
        Ok(_persisted) => Ok(()),
        Err(failure) => Err(failure.error)
            .with_context(|| format!("rename temp file to {}", target.display())),
    }
}
#[cfg(test)]
mod tests {
use super::*;
// Held by every test in this module that mutates process-wide environment
// variables, so those tests do not interleave with each other.
static ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());
/// An explicit CLI root is returned verbatim.
#[test]
fn resolve_root_cli_flag_wins() {
    assert_eq!(
        resolve_root(Some("/tmp/my-root")),
        PathBuf::from("/tmp/my-root")
    );
}
/// `AGENT_EXEC_ROOT` is used verbatim when no CLI root is given.
#[test]
fn resolve_root_env_var() {
// A poisoned lock is still usable: the guard only serializes env access.
let _guard = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
// SAFETY: ENV_LOCK is held, so the env-mutating tests in this module do not
// race each other. NOTE(review): other threads of the test binary that read
// the environment are not covered by this lock — confirm.
unsafe {
std::env::set_var("AGENT_EXEC_ROOT", "/tmp/env-root");
std::env::remove_var("XDG_DATA_HOME");
}
let root = resolve_root(None);
// Unset before asserting so a failed assertion does not leak the variable
// into later tests.
// SAFETY: same reasoning as above; ENV_LOCK is still held.
unsafe {
std::env::remove_var("AGENT_EXEC_ROOT");
}
assert_eq!(root, PathBuf::from("/tmp/env-root"));
}
/// With `AGENT_EXEC_ROOT` unset, `XDG_DATA_HOME` gets `agent-exec/jobs` appended.
#[test]
fn resolve_root_xdg() {
// A poisoned lock is still usable: the guard only serializes env access.
let _guard = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
// SAFETY: ENV_LOCK is held, so the env-mutating tests in this module do not
// race each other. NOTE(review): other threads of the test binary that read
// the environment are not covered by this lock — confirm.
unsafe {
std::env::remove_var("AGENT_EXEC_ROOT");
std::env::set_var("XDG_DATA_HOME", "/tmp/xdg");
}
let root = resolve_root(None);
// Unset before asserting so a failed assertion does not leak the variable
// into later tests.
// SAFETY: same reasoning as above; ENV_LOCK is still held.
unsafe {
std::env::remove_var("XDG_DATA_HOME");
}
assert_eq!(root, PathBuf::from("/tmp/xdg/agent-exec/jobs"));
}
/// With neither variable set, the fallback path still mentions `agent-exec`.
#[test]
fn resolve_root_default_contains_agent_exec() {
// A poisoned lock is still usable: the guard only serializes env access.
let _guard = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
// SAFETY: ENV_LOCK is held, so the env-mutating tests in this module do not
// race each other. NOTE(review): other threads of the test binary that read
// the environment are not covered by this lock — confirm.
unsafe {
std::env::remove_var("AGENT_EXEC_ROOT");
std::env::remove_var("XDG_DATA_HOME");
}
let root = resolve_root(None);
let root_str = root.to_string_lossy();
assert!(
root_str.contains("agent-exec"),
"expected agent-exec in path, got {root_str}"
);
}
/// Builds a `JobMeta` fixture for `job_id` rooted at `root`: command
/// `echo hello`, a single env key `FOO`, and every optional setting empty,
/// zero or `None`.
fn make_meta(job_id: &str, root: &std::path::Path) -> crate::schema::JobMeta {
    let job = crate::schema::JobMetaJob {
        id: job_id.to_owned(),
    };
    crate::schema::JobMeta {
        job,
        schema_version: String::from("0.1"),
        command: vec![String::from("echo"), String::from("hello")],
        created_at: String::from("2024-01-01T00:00:00Z"),
        root: root.display().to_string(),
        env_keys: vec![String::from("FOO")],
        env_vars: Vec::new(),
        env_vars_runtime: Vec::new(),
        mask: Vec::new(),
        cwd: None,
        notification: None,
        tags: Vec::new(),
        inherit_env: true,
        env_files: Vec::new(),
        timeout_ms: 0,
        kill_after_ms: 0,
        progress_every_ms: 0,
        shell_wrapper: None,
    }
}
/// `JobDir::create` makes the directory and a `meta.json` that reads back
/// with the same ID, command and env keys.
#[test]
fn job_dir_create_writes_meta_json() {
    let scratch = tempfile::tempdir().unwrap();
    let fixture = make_meta("test-job-01", scratch.path());
    let job_dir = JobDir::create(scratch.path(), "test-job-01", &fixture).unwrap();

    assert!(job_dir.path.is_dir(), "job directory was not created");
    assert!(job_dir.meta_path().exists(), "meta.json not found");

    let reloaded = job_dir.read_meta().unwrap();
    assert_eq!(reloaded.job_id(), "test-job-01");
    assert_eq!(reloaded.command, vec!["echo", "hello"]);
    assert_eq!(reloaded.env_keys, vec!["FOO"]);
}
/// The env key names end up in `meta.json`.
///
/// NOTE(review): the fixture never contains the string `secret_value`, so the
/// negative assertion cannot fail as written — confirm whether a value should
/// be supplied to make it meaningful.
#[test]
fn meta_json_env_keys_only_no_values() {
    let scratch = tempfile::tempdir().unwrap();
    let mut fixture = make_meta("test-job-02", scratch.path());
    fixture.env_keys = ["SECRET_KEY", "API_TOKEN"]
        .iter()
        .map(|key| key.to_string())
        .collect();
    let job_dir = JobDir::create(scratch.path(), "test-job-02", &fixture).unwrap();

    let on_disk = std::fs::read_to_string(job_dir.meta_path()).unwrap();
    assert!(
        !on_disk.contains("secret_value"),
        "env value must not be stored in meta.json"
    );
    for key in ["SECRET_KEY", "API_TOKEN"] {
        assert!(on_disk.contains(key), "env key must be stored");
    }
}
/// A state written with `write_state` reads back unchanged and the file on
/// disk carries an `updated_at` field.
#[test]
fn state_json_contains_updated_at() {
    let scratch = tempfile::tempdir().unwrap();
    let fixture = make_meta("test-job-03", scratch.path());
    let job_dir = JobDir::create(scratch.path(), "test-job-03", &fixture).unwrap();

    let job = crate::schema::JobStateJob {
        id: "test-job-03".to_string(),
        status: crate::schema::JobStatus::Running,
        started_at: Some("2024-01-01T00:00:00Z".to_string()),
    };
    let result = crate::schema::JobStateResult {
        exit_code: None,
        signal: None,
        duration_ms: None,
    };
    let state = crate::schema::JobState {
        job,
        result,
        pid: Some(12345),
        finished_at: None,
        updated_at: "2024-01-01T00:00:01Z".to_string(),
        windows_job_name: None,
    };
    job_dir.write_state(&state).unwrap();
    assert!(job_dir.state_path().exists(), "state.json not found");

    let reloaded = job_dir.read_state().unwrap();
    assert_eq!(reloaded.updated_at, "2024-01-01T00:00:01Z");
    assert_eq!(reloaded.job_id(), "test-job-03");

    let on_disk = std::fs::read_to_string(job_dir.state_path()).unwrap();
    assert!(
        on_disk.contains("updated_at"),
        "updated_at field missing from state.json"
    );
}
/// Ten consecutive `write_state` calls each read back the value just written.
#[test]
fn state_json_atomic_write_no_corruption() {
    let scratch = tempfile::tempdir().unwrap();
    let fixture = make_meta("test-job-04", scratch.path());
    let job_dir = JobDir::create(scratch.path(), "test-job-04", &fixture).unwrap();

    for i in 0..10 {
        let pid = 100 + i;
        let state = crate::schema::JobState {
            job: crate::schema::JobStateJob {
                id: "test-job-04".to_string(),
                status: crate::schema::JobStatus::Running,
                started_at: Some("2024-01-01T00:00:00Z".to_string()),
            },
            result: crate::schema::JobStateResult {
                exit_code: None,
                signal: None,
                duration_ms: None,
            },
            pid: Some(pid),
            finished_at: None,
            updated_at: format!("2024-01-01T00:00:{i:02}Z"),
            windows_job_name: None,
        };
        job_dir.write_state(&state).unwrap();

        let reloaded = job_dir.read_state().unwrap();
        assert_eq!(reloaded.pid, Some(pid), "state corrupted at iteration {i}");
    }
}
/// Rewriting the metadata with `write_meta_atomic` replaces what
/// `JobDir::create` wrote.
#[test]
fn meta_json_atomic_write() {
    let scratch = tempfile::tempdir().unwrap();
    let root = scratch.path();
    let job_dir = JobDir::create(root, "test-job-05", &make_meta("test-job-05", root)).unwrap();

    // Same fixture as before except for the three overridden fields.
    let replacement = crate::schema::JobMeta {
        command: vec!["ls".to_string()],
        created_at: "2024-06-01T12:00:00Z".to_string(),
        env_keys: vec!["PATH".to_string()],
        ..make_meta("test-job-05", root)
    };
    job_dir.write_meta_atomic(&replacement).unwrap();

    let reloaded = job_dir.read_meta().unwrap();
    assert_eq!(reloaded.command, vec!["ls"]);
    assert_eq!(reloaded.created_at, "2024-06-01T12:00:00Z");
}
/// `init_state` sets `windows_job_name` to `AgentExec-<job_id>` on Windows and
/// leaves it `None` elsewhere, both in the returned state and in the state
/// read back from disk.
#[test]
fn init_state_writes_deterministic_job_name_on_windows() {
    let scratch = tempfile::tempdir().unwrap();
    let job_id = "01TESTJOBID0000000000000";
    let fixture = make_meta(job_id, scratch.path());
    let job_dir = JobDir::create(scratch.path(), job_id, &fixture).unwrap();

    let (expected, returned_msg, persisted_msg) = if cfg!(windows) {
        (
            Some("AgentExec-01TESTJOBID0000000000000"),
            "Windows: init_state must set deterministic job name immediately",
            "Windows: persisted state.json must contain windows_job_name",
        )
    } else {
        (
            None,
            "non-Windows: init_state must not set windows_job_name",
            "non-Windows: persisted state.json must not contain windows_job_name",
        )
    };

    let state = job_dir.init_state(1234, "2024-01-01T00:00:00Z").unwrap();
    assert_eq!(state.windows_job_name.as_deref(), expected, "{returned_msg}");

    let persisted = job_dir.read_state().unwrap();
    assert_eq!(
        persisted.windows_job_name.as_deref(),
        expected,
        "{persisted_msg}"
    );
}
/// Opening with the full job ID returns that job.
#[test]
fn job_dir_open_exact_match() {
    let scratch = tempfile::tempdir().unwrap();
    let full_id = "01JQXK3M8E5PQRSTVWYZ12ABCD";
    JobDir::create(scratch.path(), full_id, &make_meta(full_id, scratch.path())).unwrap();

    let opened = JobDir::open(scratch.path(), full_id).unwrap();
    assert_eq!(opened.job_id, full_id);
}
/// A prefix that matches exactly one job resolves to that job's full ID.
#[test]
fn job_dir_open_unique_prefix_resolves() {
    let scratch = tempfile::tempdir().unwrap();
    let full_id = "01JQXK3M8E5PQRSTVWYZ12ABCD";
    JobDir::create(scratch.path(), full_id, &make_meta(full_id, scratch.path())).unwrap();

    let opened = JobDir::open(scratch.path(), "01JQXK3M").unwrap();
    assert_eq!(opened.job_id, full_id);
}
/// Opening an ID that matches nothing fails with `JobNotFound`.
#[test]
fn job_dir_open_not_found_returns_job_not_found() {
    let scratch = tempfile::tempdir().unwrap();
    let err = JobDir::open(scratch.path(), "ZZZZZ").unwrap_err();
    let is_not_found = err.downcast_ref::<JobNotFound>().is_some();
    assert!(is_not_found, "expected JobNotFound, got: {err}");
}
/// A prefix shared by two jobs fails with `AmbiguousJobId` naming both.
#[test]
fn job_dir_open_ambiguous_prefix_returns_ambiguous() {
    let scratch = tempfile::tempdir().unwrap();
    let root = scratch.path();
    let ids = ["01JQXK3M8EAAA00000000000AA", "01JQXK3M8EBBB00000000000BB"];
    for id in ids {
        JobDir::create(root, id, &make_meta(id, root)).unwrap();
    }

    let err = JobDir::open(root, "01JQXK3M8E").unwrap_err();
    let ambiguous = err
        .downcast_ref::<AmbiguousJobId>()
        .expect("expected AmbiguousJobId");
    assert_eq!(ambiguous.prefix, "01JQXK3M8E");
    for id in ids {
        assert!(ambiguous.candidates.contains(&id.to_string()));
    }
}
/// With five or fewer candidates the message names the prefix and the
/// candidates.
#[test]
fn ambiguous_job_id_display_up_to_5_candidates() {
    let candidates = ["01JAAA", "01JBBB", "01JCCC"]
        .iter()
        .map(|id| id.to_string())
        .collect();
    let msg = AmbiguousJobId {
        prefix: "01J".to_string(),
        candidates,
    }
    .to_string();
    assert!(msg.contains("01J"), "must include prefix: {msg}");
    assert!(msg.contains("01JAAA"), "must list candidates: {msg}");
}
/// With eight candidates the message ends the list with `... and 3 more`.
#[test]
fn ambiguous_job_id_display_truncates_beyond_5() {
    let mut candidates = Vec::new();
    for i in 1..=8 {
        candidates.push(format!("01JCANDIDATE{i:02}0000000000"));
    }
    let msg = AmbiguousJobId {
        prefix: "01J".to_string(),
        candidates,
    }
    .to_string();
    assert!(msg.contains("... and 3 more"), "must truncate: {msg}");
}
}