Skip to main content

kaizen/shell/
telemetry_tail.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! `kaizen telemetry tail` — read local NDJSON written by the `file` exporter.
3
4use crate::shell::cli::workspace_path;
5use crate::telemetry::default_ndjson_path;
6use anyhow::Context;
7use anyhow::Result;
8use notify::RecursiveMode;
9use notify::Watcher;
10use std::io::BufRead;
11use std::io::Write;
12use std::path::Path;
13use std::path::PathBuf;
14use std::time::Duration;
15
16/// `--file` relative to `workspace` when not absolute. Default: `.kaizen/telemetry.ndjson`.
17pub fn cmd_telemetry_tail(
18    workspace: Option<&Path>,
19    file: Option<PathBuf>,
20    no_follow: bool,
21    pretty_json: bool,
22) -> Result<()> {
23    let ws = workspace_path(workspace)?;
24    let path = resolve_tail_path(&ws, file);
25    if no_follow {
26        return dump_file(&path, pretty_json);
27    }
28    follow_file(&path, pretty_json)
29}
30
31fn resolve_tail_path(ws: &Path, file: Option<PathBuf>) -> PathBuf {
32    match file {
33        None => default_ndjson_path(ws),
34        Some(p) if p.is_absolute() => p,
35        Some(p) => ws.join(p),
36    }
37}
38
39fn print_line(line: &str, pretty: bool) -> Result<()> {
40    if pretty {
41        let v: serde_json::Value = serde_json::from_str(line)
42            .with_context(|| format!("line is not valid JSON: {line:?}"))?;
43        println!("{}", serde_json::to_string_pretty(&v)?);
44    } else {
45        println!("{line}");
46    }
47    std::io::stdout().flush()?;
48    Ok(())
49}
50
51fn dump_file(path: &Path, pretty: bool) -> Result<()> {
52    let f = std::fs::File::open(path).with_context(|| {
53        format!(
54            "open {} (set --file or use workspace default; create lines via [[telemetry.exporters]] type = \"file\")",
55            path.display()
56        )
57    })?;
58    for line in std::io::BufReader::new(f).lines() {
59        print_line(&line?, pretty)?;
60    }
61    Ok(())
62}
63
64fn follow_file(path: &Path, pretty: bool) -> Result<()> {
65    let (tx, rx) = std::sync::mpsc::channel();
66    let parent = path.parent().unwrap_or(Path::new(".")).to_path_buf();
67    let mut w = notify::recommended_watcher(move |e| {
68        let _ = tx.send(e);
69    })
70    .with_context(|| format!("watcher for {}", parent.display()))?;
71    w.watch(&parent, RecursiveMode::NonRecursive)
72        .with_context(|| format!("watch {}", parent.display()))?;
73    let _keep = w;
74    let mut off = 0u64;
75    loop {
76        read_appended(path, &mut off, pretty)?;
77        let _ = rx.recv_timeout(Duration::from_millis(400));
78    }
79}
80
81fn read_appended(path: &Path, off: &mut u64, pretty: bool) -> Result<()> {
82    if !path.exists() {
83        return Ok(());
84    }
85    let len = std::fs::metadata(path)?.len();
86    if *off > len {
87        *off = 0;
88    }
89    let mut f = std::fs::File::open(path)?;
90    std::io::Seek::seek(&mut f, std::io::SeekFrom::Start(*off))?;
91    let mut r = std::io::BufReader::new(f);
92    let mut line = String::new();
93    while r.read_line(&mut line)? > 0 {
94        print_line(line.trim_end_matches(['\n', '\r']), pretty)?;
95        line.clear();
96    }
97    *off = std::fs::metadata(path)?.len();
98    Ok(())
99}