Skip to main content

kaizen/telemetry/
file.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! Append-only NDJSON file sink (local telemetry).
3
4use super::TelemetryExporter;
5use super::batch_metadata;
6use crate::sync::IngestExportBatch;
7use anyhow::{Context, Result};
8use std::fs::OpenOptions;
9use std::io::Write;
10use std::path::{Path, PathBuf};
11
/// Returns the default telemetry sink location for `workspace`.
///
/// The result is `<workspace>/.kaizen/telemetry.ndjson`. This only builds the path;
/// nothing is created on disk.
pub fn default_ndjson_path(workspace: &Path) -> PathBuf {
    let mut target = workspace.to_path_buf();
    target.push(".kaizen/telemetry.ndjson");
    target
}
16
17pub fn resolve_file_exporter_path(path_opt: Option<&str>, workspace: &Path) -> PathBuf {
18    let p: PathBuf = path_opt
19        .map(PathBuf::from)
20        .unwrap_or_else(|| default_ndjson_path(workspace));
21    if p.is_absolute() {
22        p
23    } else {
24        workspace.join(p)
25    }
26}
27
/// Telemetry exporter that appends each exported batch as one JSON line to a file.
#[derive(Debug, Clone)]
pub struct FileExporter {
    /// Destination NDJSON file that records are appended to.
    path: PathBuf,
}
31
32impl FileExporter {
33    pub fn new(path: PathBuf) -> Self {
34        Self { path }
35    }
36
37    pub fn path(&self) -> &Path {
38        &self.path
39    }
40}
41
42impl TelemetryExporter for FileExporter {
43    fn name(&self) -> &str {
44        "file"
45    }
46
47    fn export(&self, batch: &IngestExportBatch) -> Result<()> {
48        let t = now_ms();
49        let v = batch_metadata::telemetry_file_line(batch, t);
50        append_json_line(&self.path, &v)
51    }
52}
53
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns `0` when the system clock reports a time earlier than the epoch.
fn now_ms() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        // `as_millis` yields a `u128`; the cast narrows it to `i64`.
        Ok(since_epoch) => since_epoch.as_millis() as i64,
        Err(_) => 0,
    }
}
60
61fn append_json_line(path: &Path, v: &serde_json::Value) -> Result<()> {
62    if let Some(d) = path.parent() {
63        std::fs::create_dir_all(d).with_context(|| format!("create {}", d.display()))?;
64    }
65    let mut f = OpenOptions::new()
66        .create(true)
67        .append(true)
68        .open(path)
69        .with_context(|| format!("open {}", path.display()))?;
70    serde_json::to_writer(&mut f, v)?;
71    f.write_all(b"\n")?;
72    f.flush()?;
73    Ok(())
74}
75
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sync::IngestExportBatch;
    use crate::sync::export_batch::SessionEvalsBatchBody;

    /// Exporting an empty session-evals batch writes a JSON line tagged with its kind.
    #[test]
    fn file_export_writes_line() {
        let tmp = tempfile::tempdir().unwrap();
        let target = tmp.path().join("t.ndjson");
        let exporter = FileExporter::new(target.clone());
        let batch = IngestExportBatch::SessionEvals(SessionEvalsBatchBody { evals: vec![] });

        exporter.export(&batch).unwrap();

        let contents = std::fs::read_to_string(&target).unwrap();
        let first_line = contents.lines().next().unwrap();
        let parsed: serde_json::Value = serde_json::from_str(first_line).unwrap();
        assert_eq!(parsed["batch_kind"].as_str(), Some("session_evals"));
    }
}