use std::path::{Path, PathBuf};
use std::time::Duration;
use anyhow::{Context, Result};
use kanade_shared::ExecResult;
use kanade_shared::subject;
use tracing::{debug, info, warn};
/// Pause between two consecutive drain passes of the background task started by
/// `spawn_drain` (it sleeps this long after every `drain_once` call).
const DRAIN_INTERVAL: Duration = Duration::from_secs(1);
/// Stage `result` in the outbox directory for later delivery.
///
/// The result is serialised to JSON, written to `<request_id>.json.tmp`, flushed
/// to disk, and only then renamed to `<request_id>.json`. The drain pass only
/// picks up files whose extension is `json`, so a half-written temporary file is
/// never published. Enqueueing a second result with the same `request_id`
/// replaces the earlier file.
///
/// # Errors
///
/// Returns an error if the directory cannot be created, the result cannot be
/// serialised, or the temporary file cannot be written, synced, or renamed.
/// When writing or renaming fails, the temporary file is removed on a
/// best-effort basis so failed attempts do not accumulate in the outbox.
///
/// Returns the path of the final `<request_id>.json` file on success.
pub fn enqueue(outbox_dir: &Path, result: &ExecResult) -> Result<PathBuf> {
    std::fs::create_dir_all(outbox_dir)
        .with_context(|| format!("create outbox dir {outbox_dir:?}"))?;
    let final_path = outbox_dir.join(format!("{}.json", result.request_id));
    let tmp_path = outbox_dir.join(format!("{}.json.tmp", result.request_id));
    let bytes = serde_json::to_vec(result).context("serialise ExecResult")?;

    // Flush the payload before the rename: without `sync_all` the rename can
    // become visible while the data is still only in the page cache, and a
    // crash would then leave a truncated `<request_id>.json` behind.
    let write_tmp = || -> std::io::Result<()> {
        let mut file = std::fs::File::create(&tmp_path)?;
        std::io::Write::write_all(&mut file, &bytes)?;
        file.sync_all()
    };

    let staged = write_tmp()
        .with_context(|| format!("write tmp outbox file {tmp_path:?}"))
        .and_then(|()| {
            std::fs::rename(&tmp_path, &final_path)
                .with_context(|| format!("rename tmp → {final_path:?}"))
        });

    if let Err(e) = staged {
        // Best-effort cleanup; the original error is the one worth reporting.
        let _ = std::fs::remove_file(&tmp_path);
        return Err(e);
    }
    Ok(final_path)
}
/// Start the background task that periodically drains the outbox.
///
/// The task first makes sure `outbox_dir` exists; if that fails it logs a
/// warning and exits. Otherwise it builds a JetStream context from `client`
/// and loops forever: one `drain_once` pass, then a `DRAIN_INTERVAL` sleep.
///
/// Returns the `JoinHandle` of the spawned tokio task.
pub fn spawn_drain(client: async_nats::Client, outbox_dir: PathBuf) -> tokio::task::JoinHandle<()> {
    let drain_task = async move {
        match std::fs::create_dir_all(&outbox_dir) {
            Ok(()) => {}
            Err(e) => {
                warn!(error = %e, dir = %outbox_dir.display(), "outbox: create dir failed; drain task exiting");
                return;
            }
        }

        let jetstream = async_nats::jetstream::new(client);
        loop {
            drain_once(&jetstream, &outbox_dir).await;
            tokio::time::sleep(DRAIN_INTERVAL).await;
        }
    };
    tokio::spawn(drain_task)
}
/// Run a single drain pass over `outbox_dir`.
///
/// Collects every directory entry whose extension is `json`, orders them by
/// modification time (oldest first; entries whose metadata cannot be read sort
/// before all others), and publishes them one at a time via `publish_one`.
/// The pass stops at the first failure so remaining files are retried, in the
/// same order, on the next pass. Errors are logged, never returned.
async fn drain_once(js: &async_nats::jetstream::Context, outbox_dir: &Path) {
    let entries = match std::fs::read_dir(outbox_dir) {
        Ok(e) => e,
        Err(e) => {
            warn!(error = %e, dir = %outbox_dir.display(), "outbox: read_dir failed");
            return;
        }
    };
    let mut files: Vec<PathBuf> = entries
        .filter_map(|r| r.ok().map(|e| e.path()))
        .filter(|p| p.extension().is_some_and(|e| e == "json"))
        .collect();
    if files.is_empty() {
        return;
    }
    // Evaluate each mtime exactly once. `sort_by_key` would stat the file on
    // every comparison (O(n log n) syscalls) and could see a key change
    // mid-sort when a file is replaced concurrently, which makes the
    // ordering inconsistent.
    files.sort_by_cached_key(|p| std::fs::metadata(p).and_then(|m| m.modified()).ok());
    for path in files {
        if let Err(e) = publish_one(js, &path).await {
            debug!(
                error = %e,
                path = %path.display(),
                "outbox: publish failed; will retry next tick",
            );
            return;
        }
    }
}
async fn publish_one(js: &async_nats::jetstream::Context, path: &Path) -> Result<()> {
let bytes = std::fs::read(path).with_context(|| format!("read {path:?}"))?;
let result: ExecResult =
serde_json::from_slice(&bytes).with_context(|| format!("parse {path:?}"))?;
let subj = subject::results(&result.request_id);
let ack_future = js
.publish(subj.clone(), bytes.clone().into())
.await
.with_context(|| format!("publish {subj}"))?;
let _ack = ack_future.await.with_context(|| format!("ack {subj}"))?;
std::fs::remove_file(path).with_context(|| format!("remove {path:?}"))?;
info!(
request_id = %result.request_id,
subject = %subj,
"outbox: result delivered + file removed",
);
Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{TimeZone, Utc};

    /// Build a fixed, successful `ExecResult` carrying the given request id.
    fn sample(rid: &str) -> ExecResult {
        ExecResult {
            request_id: rid.into(),
            pc_id: "minipc".into(),
            exit_code: 0,
            stdout: "ok".into(),
            stderr: String::new(),
            started_at: Utc.with_ymd_and_hms(2026, 5, 19, 0, 0, 0).unwrap(),
            finished_at: Utc.with_ymd_and_hms(2026, 5, 19, 0, 0, 1).unwrap(),
            manifest_id: Some("inventory-hw".into()),
        }
    }

    /// The outbox file is named after the request id and round-trips through JSON.
    #[test]
    fn enqueue_creates_file_with_request_id_name() {
        let dir = tempfile::tempdir().unwrap();
        let r = sample("req-abc");
        let path = enqueue(dir.path(), &r).unwrap();
        assert_eq!(path.file_name().unwrap(), "req-abc.json");
        let bytes = std::fs::read(&path).unwrap();
        let back: ExecResult = serde_json::from_slice(&bytes).unwrap();
        assert_eq!(back.request_id, r.request_id);
        assert_eq!(back.exit_code, 0);
    }

    /// A second enqueue with the same request id replaces the first file and
    /// leaves no temporary file behind. (No failure is induced here, so this
    /// does not exercise the error path.)
    #[test]
    fn enqueue_overwrites_existing_entry_and_leaves_no_tmp_file() {
        let dir = tempfile::tempdir().unwrap();
        let r1 = sample("req-x");
        let r2 = ExecResult {
            exit_code: 7,
            ..sample("req-x")
        };
        enqueue(dir.path(), &r1).unwrap();
        let path = enqueue(dir.path(), &r2).unwrap();
        let back: ExecResult = serde_json::from_slice(&std::fs::read(&path).unwrap()).unwrap();
        assert_eq!(back.exit_code, 7);

        // Only the final file may remain: the `.json.tmp` staging file must
        // have been renamed away.
        let names: Vec<_> = std::fs::read_dir(dir.path())
            .unwrap()
            .map(|entry| entry.unwrap().file_name())
            .collect();
        assert_eq!(names, ["req-x.json"]);
    }
}