Skip to main content

kaizen/shell/
telemetry_tail.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! `kaizen telemetry tail` — read local NDJSON written by the `file` exporter.
3
4use crate::shell::cli::workspace_path;
5use crate::telemetry::default_ndjson_path;
6use anyhow::Context;
7use anyhow::Result;
8use notify::RecursiveMode;
9use notify::Watcher;
10use std::io::BufRead;
11use std::io::Write;
12use std::path::Path;
13use std::path::PathBuf;
14use std::time::Duration;
15
16/// `--file` relative to `workspace` when not absolute. Default: `.kaizen/telemetry.ndjson`.
17pub fn cmd_telemetry_tail(
18    workspace: Option<&Path>,
19    file: Option<PathBuf>,
20    no_follow: bool,
21    pretty_json: bool,
22) -> Result<()> {
23    let ws = workspace_path(workspace)?;
24    let default_tail = file.is_none();
25    let path = resolve_tail_path(&ws, file);
26    if no_follow {
27        return dump_file(&path, pretty_json, default_tail);
28    }
29    follow_file(&path, pretty_json)
30}
31
32fn resolve_tail_path(ws: &Path, file: Option<PathBuf>) -> PathBuf {
33    match file {
34        None => default_ndjson_path(ws),
35        Some(p) if p.is_absolute() => p,
36        Some(p) => ws.join(p),
37    }
38}
39
40fn print_line(line: &str, pretty: bool) -> Result<()> {
41    if pretty {
42        let v: serde_json::Value = serde_json::from_str(line)
43            .with_context(|| format!("line is not valid JSON: {line:?}"))?;
44        println!("{}", serde_json::to_string_pretty(&v)?);
45    } else {
46        println!("{line}");
47    }
48    std::io::stdout().flush()?;
49    Ok(())
50}
51
52fn dump_file(path: &Path, pretty: bool, missing_is_empty: bool) -> Result<()> {
53    if missing_is_empty && !path.exists() {
54        return Ok(());
55    }
56    let f = std::fs::File::open(path).with_context(|| {
57        format!(
58            "open {} (set --file or use workspace default; create lines via [[telemetry.exporters]] type = \"file\")",
59            path.display()
60        )
61    })?;
62    for line in std::io::BufReader::new(f).lines() {
63        print_line(&line?, pretty)?;
64    }
65    Ok(())
66}
67
68fn follow_file(path: &Path, pretty: bool) -> Result<()> {
69    let (tx, rx) = std::sync::mpsc::channel();
70    let parent = path.parent().unwrap_or(Path::new(".")).to_path_buf();
71    let mut w = notify::recommended_watcher(move |e| {
72        let _ = tx.send(e);
73    })
74    .with_context(|| format!("watcher for {}", parent.display()))?;
75    w.watch(&parent, RecursiveMode::NonRecursive)
76        .with_context(|| format!("watch {}", parent.display()))?;
77    let _keep = w;
78    let mut off = 0u64;
79    loop {
80        read_appended(path, &mut off, pretty)?;
81        let _ = rx.recv_timeout(Duration::from_millis(400));
82    }
83}
84
85fn read_appended(path: &Path, off: &mut u64, pretty: bool) -> Result<()> {
86    if !path.exists() {
87        return Ok(());
88    }
89    let len = std::fs::metadata(path)?.len();
90    if *off > len {
91        *off = 0;
92    }
93    let mut f = std::fs::File::open(path)?;
94    std::io::Seek::seek(&mut f, std::io::SeekFrom::Start(*off))?;
95    let mut r = std::io::BufReader::new(f);
96    let mut line = String::new();
97    while r.read_line(&mut line)? > 0 {
98        print_line(line.trim_end_matches(['\n', '\r']), pretty)?;
99        line.clear();
100    }
101    *off = std::fs::metadata(path)?.len();
102    Ok(())
103}