Skip to main content

kaizen/telemetry/
file.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! Append-only NDJSON file sink (local telemetry).
3
4use super::TelemetryExporter;
5use super::batch_metadata;
6use crate::sync::IngestExportBatch;
7use anyhow::{Context, Result};
8use std::io::Write;
9use std::path::{Path, PathBuf};
10
11/// Default: `$KAIZEN_HOME/projects/<slug>/telemetry.ndjson`.
12pub fn default_ndjson_path(workspace: &Path) -> Result<PathBuf> {
13    crate::core::paths::project_data_child(workspace, Path::new("telemetry.ndjson"))
14}
15
16pub fn resolve_file_exporter_path(path_opt: Option<&str>, workspace: &Path) -> Result<PathBuf> {
17    let Some(p) = path_opt.map(PathBuf::from) else {
18        return default_ndjson_path(workspace);
19    };
20    if p.is_absolute() {
21        ensure_outside_workspace(&p, workspace)?;
22        ensure_not_symlink(&p)?;
23        return Ok(p);
24    }
25    ensure_relative(&p)?;
26    crate::core::paths::project_data_child(workspace, &p)
27}
28
29fn ensure_not_symlink(path: &Path) -> Result<()> {
30    match std::fs::symlink_metadata(path) {
31        Ok(metadata) => anyhow::ensure!(
32            !metadata.file_type().is_symlink(),
33            "telemetry output rejects symlink"
34        ),
35        Err(error) if error.kind() == std::io::ErrorKind::NotFound => {}
36        Err(error) => return Err(error.into()),
37    }
38    Ok(())
39}
40
41fn ensure_relative(path: &Path) -> Result<()> {
42    let escapes = path
43        .components()
44        .any(|part| matches!(part, std::path::Component::ParentDir));
45    anyhow::ensure!(!escapes, "telemetry file path cannot contain `..`");
46    Ok(())
47}
48
49fn ensure_outside_workspace(path: &Path, workspace: &Path) -> Result<()> {
50    let workspace = crate::core::paths::canonical(workspace);
51    let linked_inside = path
52        .ancestors()
53        .find_map(|parent| parent.canonicalize().ok())
54        .is_some_and(|parent| parent.starts_with(&workspace));
55    anyhow::ensure!(
56        !path.starts_with(&workspace) && !linked_inside,
57        "telemetry output cannot be inside target repository"
58    );
59    Ok(())
60}
61
/// Telemetry exporter that targets a single file on disk.
///
/// The value only stores the destination path; constructing it performs no I/O.
#[derive(Debug, Clone)]
pub struct FileExporter {
    /// Destination file handed to the exporter at construction time.
    path: PathBuf,
}

impl FileExporter {
    /// Creates an exporter for `path`. The path is stored as given, without validation.
    pub fn new(path: PathBuf) -> Self {
        Self { path }
    }

    /// Returns the destination path this exporter was created with.
    pub fn path(&self) -> &Path {
        &self.path
    }
}
75
76impl TelemetryExporter for FileExporter {
77    fn name(&self) -> &str {
78        "file"
79    }
80
81    fn export(&self, batch: &IngestExportBatch) -> Result<()> {
82        let t = now_ms();
83        let v = batch_metadata::telemetry_file_line(batch, t);
84        append_json_line(&self.path, &v)
85    }
86}
87
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch.
fn now_ms() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as i64,
        Err(_) => 0,
    }
}
94
95fn append_json_line(path: &Path, v: &serde_json::Value) -> Result<()> {
96    if let Some(d) = path.parent() {
97        std::fs::create_dir_all(d).with_context(|| format!("create {}", d.display()))?;
98    }
99    ensure_not_symlink(path)?;
100    let mut f =
101        crate::core::safe_fs::append(path).with_context(|| format!("open {}", path.display()))?;
102    serde_json::to_writer(&mut f, v)?;
103    f.write_all(b"\n")?;
104    f.flush()?;
105    Ok(())
106}
107
#[cfg(test)]
mod tests {
    use super::*;
    use crate::core::paths::test_lock;
    use crate::sync::IngestExportBatch;
    use crate::sync::export_batch::SessionEvalsBatchBody;

    /// Exporting an empty `SessionEvals` batch writes a line whose `batch_kind` is
    /// `"session_evals"`.
    #[test]
    fn file_export_writes_line() {
        let dir = tempfile::tempdir().unwrap();
        let p = dir.path().join("t.ndjson");
        let e = FileExporter::new(p.clone());
        let b = IngestExportBatch::SessionEvals(SessionEvalsBatchBody { evals: vec![] });
        e.export(&b).unwrap();
        let s = std::fs::read_to_string(&p).unwrap();
        // Only the first line is parsed; the file is fresh, so that is the line just written.
        let v: serde_json::Value = serde_json::from_str(s.lines().next().unwrap()).unwrap();
        assert_eq!(
            v.get("batch_kind").and_then(|x| x.as_str()),
            Some("session_evals")
        );
    }

    /// A relative exporter path resolves to the same relative path below
    /// `project_data_path(workspace)`.
    #[test]
    fn relative_path_stays_in_project_data() {
        // NOTE(review): presumably this lock serializes tests that touch process-wide
        // state such as `KAIZEN_HOME` — confirm against `test_lock`.
        let _guard = test_lock::global().lock().unwrap();
        let home = tempfile::tempdir().unwrap();
        let workspace = tempfile::tempdir().unwrap();
        // SAFETY: mutating the environment is only sound while no other thread reads or
        // writes it; this relies on the guard above covering every such test — TODO confirm.
        unsafe { std::env::set_var("KAIZEN_HOME", home.path()) };
        let path =
            resolve_file_exporter_path(Some("custom/events.ndjson"), workspace.path()).unwrap();
        let expected = crate::core::paths::project_data_path(workspace.path())
            .unwrap()
            .join("custom/events.ndjson");
        // SAFETY: same reasoning as for `set_var` above. The variable is cleared before the
        // assertion so a failed comparison does not leave it set; a panic in one of the
        // `unwrap` calls above would still skip this cleanup.
        unsafe { std::env::remove_var("KAIZEN_HOME") };
        assert_eq!(path, expected);
    }

    /// An absolute path located directly inside the workspace directory is refused with
    /// the "target repository" error.
    #[test]
    fn absolute_path_inside_workspace_is_rejected() {
        let workspace = tempfile::tempdir().unwrap();
        let path = workspace.path().join("telemetry.ndjson");
        let error = resolve_file_exporter_path(path.to_str(), workspace.path()).unwrap_err();
        assert!(error.to_string().contains("target repository"));
    }
}