use std::path::{Path, PathBuf};
use std::time::Duration;
use anyhow::{Context, Result};
use kanade_shared::subject;
use kanade_shared::wire::EventStarted;
use tracing::{debug, info, warn};
const DRAIN_INTERVAL: Duration = Duration::from_secs(1);
pub fn enqueue(events_outbox_dir: &Path, event: &EventStarted) -> Result<PathBuf> {
std::fs::create_dir_all(events_outbox_dir)
.with_context(|| format!("create events outbox dir {events_outbox_dir:?}"))?;
let final_path = events_outbox_dir.join(format!("{}.json", event.result_id));
let tmp_path = events_outbox_dir.join(format!("{}.json.tmp", event.result_id));
let bytes = serde_json::to_vec(event).context("serialise EventStarted")?;
std::fs::write(&tmp_path, &bytes)
.with_context(|| format!("write tmp events outbox file {tmp_path:?}"))?;
std::fs::rename(&tmp_path, &final_path)
.with_context(|| format!("rename tmp → {final_path:?}"))?;
Ok(final_path)
}
pub fn spawn_drain(
client: async_nats::Client,
events_outbox_dir: PathBuf,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let js = async_nats::jetstream::new(client);
loop {
if let Err(e) = std::fs::create_dir_all(&events_outbox_dir) {
warn!(
error = %e,
dir = %events_outbox_dir.display(),
"events_outbox: create dir failed; will retry next tick",
);
tokio::time::sleep(DRAIN_INTERVAL).await;
continue;
}
drain_once(&js, &events_outbox_dir).await;
tokio::time::sleep(DRAIN_INTERVAL).await;
}
})
}
async fn drain_once(js: &async_nats::jetstream::Context, events_outbox_dir: &Path) {
let entries = match std::fs::read_dir(events_outbox_dir) {
Ok(e) => e,
Err(e) => {
warn!(
error = %e,
dir = %events_outbox_dir.display(),
"events_outbox: read_dir failed",
);
return;
}
};
let mut files: Vec<PathBuf> = entries
.filter_map(|r| r.ok().map(|e| e.path()))
.filter(|p| p.extension().is_some_and(|e| e == "json"))
.collect();
if files.is_empty() {
return;
}
files.sort_by_cached_key(|p| std::fs::metadata(p).and_then(|m| m.modified()).ok());
for path in files {
if let Err(e) = publish_one(js, &path).await {
debug!(
error = %e,
path = %path.display(),
"events_outbox: publish failed for this file; will retry next tick, continuing with others",
);
}
}
}
async fn publish_one(js: &async_nats::jetstream::Context, path: &Path) -> Result<()> {
let bytes = std::fs::read(path).with_context(|| format!("read {path:?}"))?;
let event: EventStarted = match serde_json::from_slice(&bytes) {
Ok(e) => e,
Err(e) => {
warn!(
error = %e,
path = %path.display(),
"events_outbox: corrupted file — removing so drain can proceed",
);
let _ = std::fs::remove_file(path);
return Ok(());
}
};
let subj = subject::events_started(&event.exec_id, &event.pc_id);
let ack_future = js
.publish(subj.clone(), bytes.clone().into())
.await
.with_context(|| format!("publish {subj}"))?;
let _ack = ack_future.await.with_context(|| format!("ack {subj}"))?;
std::fs::remove_file(path).with_context(|| format!("remove {path:?}"))?;
info!(
result_id = %event.result_id,
exec_id = %event.exec_id,
pc_id = %event.pc_id,
subject = %subj,
"events_outbox: started event delivered + file removed",
);
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::{TimeZone, Utc};
fn sample(result_id: &str, exec_id: &str, pc_id: &str) -> EventStarted {
EventStarted {
result_id: result_id.into(),
request_id: "req-1".into(),
exec_id: exec_id.into(),
pc_id: pc_id.into(),
started_at: Utc.with_ymd_and_hms(2026, 5, 20, 12, 0, 0).unwrap(),
manifest_id: "inv-hw".into(),
version: "1.0.0".into(),
}
}
#[test]
fn enqueue_creates_file_named_by_result_id() {
let dir = tempfile::tempdir().unwrap();
let e = sample("res-1", "exec-1", "minipc");
let path = enqueue(dir.path(), &e).unwrap();
assert_eq!(path.file_name().unwrap(), "res-1.json");
let back: EventStarted = serde_json::from_slice(&std::fs::read(&path).unwrap()).unwrap();
assert_eq!(back.result_id, "res-1");
assert_eq!(back.exec_id, "exec-1");
}
#[test]
fn enqueue_atomic_overwrite() {
let dir = tempfile::tempdir().unwrap();
let e1 = sample("res-x", "exec-x", "pc-1");
let e2 = EventStarted {
manifest_id: "different".into(),
..sample("res-x", "exec-x", "pc-1")
};
enqueue(dir.path(), &e1).unwrap();
let path = enqueue(dir.path(), &e2).unwrap();
let back: EventStarted = serde_json::from_slice(&std::fs::read(&path).unwrap()).unwrap();
assert_eq!(back.manifest_id, "different");
}
}