use std::cell::RefCell;
use std::fs::File;
use std::io::{BufWriter, Write};
use std::path::{Path, PathBuf};
use std::time::{SystemTime, UNIX_EPOCH};
use serde_json::Value;
pub(crate) fn open_sink(root: &str) -> Result<impl Fn(Value), String> {
let now = SystemTime::now();
let run = run_id(now);
let dir = spawningpool::store::logs_dir();
std::fs::create_dir_all(&dir)
.map_err(|e| format!("can't create logs directory {}: {e}", dir.display()))?;
let path = log_path(&dir, now, root, &run);
let file =
File::create(&path).map_err(|e| format!("can't open log file {}: {e}", path.display()))?;
let writer = RefCell::new(BufWriter::new(file));
Ok(move |event: Value| {
let line = enrich(&run, SystemTime::now(), event);
let mut w = writer.borrow_mut();
if serde_json::to_writer(&mut *w, &line).is_ok() {
let _ = w.write_all(b"\n");
let _ = w.flush();
}
})
}
fn enrich(run: &str, now: SystemTime, mut event: Value) -> Value {
if let Value::Object(map) = &mut event {
map.insert("ts".to_string(), Value::String(rfc3339_millis(now)));
map.insert("run".to_string(), Value::String(run.to_string()));
}
event
}
fn log_path(dir: &Path, now: SystemTime, root: &str, run: &str) -> PathBuf {
dir.join(format!(
"{}-{}-{}.ndjson",
datestamp(now),
sanitize(root),
run
))
}
fn sanitize(root: &str) -> String {
let cleaned: String = root
.chars()
.map(|c| {
if c.is_ascii_alphanumeric() || matches!(c, '.' | '_' | '-') {
c
} else {
'_'
}
})
.collect();
if cleaned.is_empty() {
"run".to_string()
} else {
cleaned
}
}
fn run_id(now: SystemTime) -> String {
let nanos = now.duration_since(UNIX_EPOCH).map_or(0, |d| d.as_nanos());
let mixed = (nanos as u64) ^ ((nanos >> 64) as u64) ^ (u64::from(std::process::id()) << 32);
format!("{:08x}", (mixed ^ (mixed >> 32)) as u32)
}
fn rfc3339_millis(t: SystemTime) -> String {
let (y, mo, d, h, mi, s, ms) = civil(t);
format!("{y:04}-{mo:02}-{d:02}T{h:02}:{mi:02}:{s:02}.{ms:03}Z")
}
fn datestamp(t: SystemTime) -> String {
let (y, mo, d, h, mi, s, _) = civil(t);
format!("{y:04}{mo:02}{d:02}T{h:02}{mi:02}{s:02}Z")
}
fn civil(t: SystemTime) -> (i64, u32, u32, u32, u32, u32, u32) {
let dur = t.duration_since(UNIX_EPOCH).unwrap_or_default();
let secs = dur.as_secs() as i64;
let ms = dur.subsec_millis();
let days = secs.div_euclid(86_400);
let sod = secs.rem_euclid(86_400);
let (y, mo, d) = civil_from_days(days);
let h = (sod / 3600) as u32;
let mi = ((sod % 3600) / 60) as u32;
let s = (sod % 60) as u32;
(y, mo, d, h, mi, s, ms)
}
fn civil_from_days(z: i64) -> (i64, u32, u32) {
let z = z + 719_468;
let era = if z >= 0 { z } else { z - 146_096 } / 146_097;
let doe = z - era * 146_097; let yoe = (doe - doe / 1460 + doe / 36_524 - doe / 146_096) / 365; let y = yoe + era * 400;
let doy = doe - (365 * yoe + yoe / 4 - yoe / 100); let mp = (5 * doy + 2) / 153; let d = (doy - (153 * mp + 2) / 5 + 1) as u32; let m = if mp < 10 { mp + 3 } else { mp - 9 } as u32; (if m <= 2 { y + 1 } else { y }, m, d)
}
#[cfg(test)]
mod tests {
use super::*;
fn at(secs: u64, millis: u32) -> SystemTime {
UNIX_EPOCH + std::time::Duration::new(secs, millis * 1_000_000)
}
#[test]
fn rfc3339_formats_the_epoch() {
assert_eq!(rfc3339_millis(UNIX_EPOCH), "1970-01-01T00:00:00.000Z");
}
#[test]
fn rfc3339_carries_millis_and_time_of_day() {
assert_eq!(rfc3339_millis(at(1, 500)), "1970-01-01T00:00:01.500Z");
}
#[test]
fn rfc3339_handles_a_leap_day() {
assert_eq!(
rfc3339_millis(at(1_582_934_400, 0)),
"2020-02-29T00:00:00.000Z"
);
}
#[test]
fn datestamp_is_filesystem_safe_and_compact() {
assert_eq!(datestamp(at(1_582_934_400, 0)), "20200229T000000Z");
}
#[test]
fn run_id_is_eight_hex_chars() {
let id = run_id(SystemTime::now());
assert_eq!(id.len(), 8);
assert!(id.chars().all(|c| c.is_ascii_hexdigit()));
}
#[test]
fn enrich_injects_ts_and_run_and_keeps_the_event() {
let event = serde_json::json!({ "event": "workflow.start", "wf": "demo" });
let out = enrich("f3a9b21c", at(1_582_934_400, 42), event);
assert_eq!(out["run"], "f3a9b21c");
assert_eq!(out["ts"], "2020-02-29T00:00:00.042Z");
assert_eq!(out["event"], "workflow.start");
assert_eq!(out["wf"], "demo");
}
#[test]
fn log_path_combines_datestamp_sanitized_root_and_run() {
assert_eq!(
log_path(
Path::new("/home/u/.spawningpool/logs"),
at(1_582_934_400, 0),
"my/weird name",
"82e53793"
),
Path::new("/home/u/.spawningpool/logs/20200229T000000Z-my_weird_name-82e53793.ndjson")
);
}
}