kaizen/shell/
telemetry_tail.rs1use crate::shell::cli::workspace_path;
5use crate::telemetry::default_ndjson_path;
6use anyhow::Context;
7use anyhow::Result;
8use notify::RecursiveMode;
9use notify::Watcher;
10use std::io::BufRead;
11use std::io::Write;
12use std::path::Path;
13use std::path::PathBuf;
14use std::time::Duration;
15
16pub fn cmd_telemetry_tail(
18 workspace: Option<&Path>,
19 file: Option<PathBuf>,
20 no_follow: bool,
21 pretty_json: bool,
22) -> Result<()> {
23 let ws = workspace_path(workspace)?;
24 let path = resolve_tail_path(&ws, file);
25 if no_follow {
26 return dump_file(&path, pretty_json);
27 }
28 follow_file(&path, pretty_json)
29}
30
31fn resolve_tail_path(ws: &Path, file: Option<PathBuf>) -> PathBuf {
32 match file {
33 None => default_ndjson_path(ws),
34 Some(p) if p.is_absolute() => p,
35 Some(p) => ws.join(p),
36 }
37}
38
39fn print_line(line: &str, pretty: bool) -> Result<()> {
40 if pretty {
41 let v: serde_json::Value = serde_json::from_str(line)
42 .with_context(|| format!("line is not valid JSON: {line:?}"))?;
43 println!("{}", serde_json::to_string_pretty(&v)?);
44 } else {
45 println!("{line}");
46 }
47 std::io::stdout().flush()?;
48 Ok(())
49}
50
51fn dump_file(path: &Path, pretty: bool) -> Result<()> {
52 let f = std::fs::File::open(path).with_context(|| {
53 format!(
54 "open {} (set --file or use workspace default; create lines via [[telemetry.exporters]] type = \"file\")",
55 path.display()
56 )
57 })?;
58 for line in std::io::BufReader::new(f).lines() {
59 print_line(&line?, pretty)?;
60 }
61 Ok(())
62}
63
64fn follow_file(path: &Path, pretty: bool) -> Result<()> {
65 let (tx, rx) = std::sync::mpsc::channel();
66 let parent = path.parent().unwrap_or(Path::new(".")).to_path_buf();
67 let mut w = notify::recommended_watcher(move |e| {
68 let _ = tx.send(e);
69 })
70 .with_context(|| format!("watcher for {}", parent.display()))?;
71 w.watch(&parent, RecursiveMode::NonRecursive)
72 .with_context(|| format!("watch {}", parent.display()))?;
73 let _keep = w;
74 let mut off = 0u64;
75 loop {
76 read_appended(path, &mut off, pretty)?;
77 let _ = rx.recv_timeout(Duration::from_millis(400));
78 }
79}
80
81fn read_appended(path: &Path, off: &mut u64, pretty: bool) -> Result<()> {
82 if !path.exists() {
83 return Ok(());
84 }
85 let len = std::fs::metadata(path)?.len();
86 if *off > len {
87 *off = 0;
88 }
89 let mut f = std::fs::File::open(path)?;
90 std::io::Seek::seek(&mut f, std::io::SeekFrom::Start(*off))?;
91 let mut r = std::io::BufReader::new(f);
92 let mut line = String::new();
93 while r.read_line(&mut line)? > 0 {
94 print_line(line.trim_end_matches(['\n', '\r']), pretty)?;
95 line.clear();
96 }
97 *off = std::fs::metadata(path)?.len();
98 Ok(())
99}