use std::path::{Path, PathBuf};
use std::time::Duration;
use anyhow::{Context, Result};
use kanade_shared::subject;
use kanade_shared::wire::ObsEvent;
use tracing::{debug, info, warn};
use uuid::Uuid;
const DRAIN_INTERVAL: Duration = Duration::from_secs(1);
const ACK_TIMEOUT: Duration = Duration::from_secs(30);
pub fn ensure_outbox_dir(obs_outbox_dir: &Path) -> Result<()> {
std::fs::create_dir_all(obs_outbox_dir)
.with_context(|| format!("create obs outbox dir {obs_outbox_dir:?}"))
}
pub fn enqueue(obs_outbox_dir: &Path, event: &ObsEvent) -> Result<PathBuf> {
let file_id = Uuid::new_v4().simple().to_string();
let final_path = obs_outbox_dir.join(format!("{file_id}.json"));
let tmp_path = obs_outbox_dir.join(format!("{file_id}.json.tmp"));
let bytes = serde_json::to_vec(event).context("serialise ObsEvent")?;
std::fs::write(&tmp_path, &bytes)
.with_context(|| format!("write tmp obs outbox file {tmp_path:?}"))?;
std::fs::rename(&tmp_path, &final_path)
.with_context(|| format!("rename tmp → {final_path:?}"))?;
Ok(final_path)
}
pub fn spawn_drain(
client: async_nats::Client,
obs_outbox_dir: PathBuf,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let js = async_nats::jetstream::new(client);
loop {
if let Err(e) = std::fs::create_dir_all(&obs_outbox_dir) {
warn!(
error = %e,
dir = %obs_outbox_dir.display(),
"obs_outbox: create dir failed; will retry next tick",
);
tokio::time::sleep(DRAIN_INTERVAL).await;
continue;
}
drain_once(&js, &obs_outbox_dir).await;
tokio::time::sleep(DRAIN_INTERVAL).await;
}
})
}
async fn drain_once(js: &async_nats::jetstream::Context, obs_outbox_dir: &Path) {
let entries = match std::fs::read_dir(obs_outbox_dir) {
Ok(e) => e,
Err(e) => {
warn!(
error = %e,
dir = %obs_outbox_dir.display(),
"obs_outbox: read_dir failed",
);
return;
}
};
let mut files: Vec<PathBuf> = entries
.filter_map(|r| r.ok().map(|e| e.path()))
.filter(|p| p.extension().is_some_and(|e| e == "json"))
.collect();
if files.is_empty() {
return;
}
files.sort_by_cached_key(|p| std::fs::metadata(p).and_then(|m| m.modified()).ok());
for path in files {
if let Err(e) = publish_one(js, &path).await {
debug!(
error = %e,
path = %path.display(),
"obs_outbox: publish failed for this file; will retry next tick, continuing with others",
);
}
}
}
async fn publish_one(js: &async_nats::jetstream::Context, path: &Path) -> Result<()> {
let bytes = std::fs::read(path).with_context(|| format!("read {path:?}"))?;
let event: ObsEvent = match serde_json::from_slice(&bytes) {
Ok(e) => e,
Err(e) => {
warn!(
error = %e,
path = %path.display(),
"obs_outbox: corrupted file — removing so drain can proceed",
);
let _ = std::fs::remove_file(path);
return Ok(());
}
};
let subj = subject::obs(&event.pc_id);
let ack_future = js
.publish(subj.clone(), bytes.into())
.await
.with_context(|| format!("publish {subj}"))?;
let _ack = tokio::time::timeout(ACK_TIMEOUT, ack_future)
.await
.with_context(|| format!("ack timeout {subj} after {}s", ACK_TIMEOUT.as_secs()))?
.with_context(|| format!("ack {subj}"))?;
std::fs::remove_file(path).with_context(|| format!("remove {path:?}"))?;
info!(
pc_id = %event.pc_id,
kind = %event.kind,
source = %event.source,
subject = %subj,
"obs_outbox: event delivered + file removed",
);
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::{TimeZone, Utc};
use serde_json::json;
fn sample(pc_id: &str, event_record_id: &str) -> ObsEvent {
ObsEvent {
pc_id: pc_id.into(),
at: Utc.with_ymd_and_hms(2026, 5, 28, 10, 41, 0).unwrap(),
kind: "logon".into(),
source: "winlog:Security".into(),
event_record_id: Some(event_record_id.into()),
payload: json!({ "user": "yukimemi" }),
}
}
#[test]
fn enqueue_creates_uuid_named_file() {
let dir = tempfile::tempdir().unwrap();
let e = sample("minipc", "1");
let path = enqueue(dir.path(), &e).unwrap();
let name = path.file_name().unwrap().to_string_lossy();
assert!(name.ends_with(".json"), "expected .json suffix on {name}");
assert_eq!(
name.len(),
32 + ".json".len(),
"UUID simple form is 32 chars; got {name}"
);
let back: ObsEvent = serde_json::from_slice(&std::fs::read(&path).unwrap()).unwrap();
assert_eq!(back.pc_id, "minipc");
assert_eq!(back.event_record_id.as_deref(), Some("1"));
}
#[test]
fn enqueue_multiple_events_creates_distinct_files() {
let dir = tempfile::tempdir().unwrap();
let e = sample("minipc", "1");
let p1 = enqueue(dir.path(), &e).unwrap();
let p2 = enqueue(dir.path(), &e).unwrap();
assert_ne!(p1, p2, "two enqueues should land on distinct paths");
}
}