kaizen/shell/
telemetry_tail.rs1use crate::shell::cli::workspace_path;
5use crate::telemetry::default_ndjson_path;
6use anyhow::Context;
7use anyhow::Result;
8use notify::RecursiveMode;
9use notify::Watcher;
10use std::io::BufRead;
11use std::io::Write;
12use std::path::Path;
13use std::path::PathBuf;
14use std::time::Duration;
15
16pub fn cmd_telemetry_tail(
18 workspace: Option<&Path>,
19 file: Option<PathBuf>,
20 no_follow: bool,
21 pretty_json: bool,
22) -> Result<()> {
23 let ws = workspace_path(workspace)?;
24 let default_tail = file.is_none();
25 let path = resolve_tail_path(&ws, file);
26 if no_follow {
27 return dump_file(&path, pretty_json, default_tail);
28 }
29 follow_file(&path, pretty_json)
30}
31
32fn resolve_tail_path(ws: &Path, file: Option<PathBuf>) -> PathBuf {
33 match file {
34 None => default_ndjson_path(ws),
35 Some(p) if p.is_absolute() => p,
36 Some(p) => ws.join(p),
37 }
38}
39
40fn print_line(line: &str, pretty: bool) -> Result<()> {
41 if pretty {
42 let v: serde_json::Value = serde_json::from_str(line)
43 .with_context(|| format!("line is not valid JSON: {line:?}"))?;
44 println!("{}", serde_json::to_string_pretty(&v)?);
45 } else {
46 println!("{line}");
47 }
48 std::io::stdout().flush()?;
49 Ok(())
50}
51
52fn dump_file(path: &Path, pretty: bool, missing_is_empty: bool) -> Result<()> {
53 if missing_is_empty && !path.exists() {
54 return Ok(());
55 }
56 let f = std::fs::File::open(path).with_context(|| {
57 format!(
58 "open {} (set --file or use workspace default; create lines via [[telemetry.exporters]] type = \"file\")",
59 path.display()
60 )
61 })?;
62 for line in std::io::BufReader::new(f).lines() {
63 print_line(&line?, pretty)?;
64 }
65 Ok(())
66}
67
68fn follow_file(path: &Path, pretty: bool) -> Result<()> {
69 let (tx, rx) = std::sync::mpsc::channel();
70 let parent = path.parent().unwrap_or(Path::new(".")).to_path_buf();
71 let mut w = notify::recommended_watcher(move |e| {
72 let _ = tx.send(e);
73 })
74 .with_context(|| format!("watcher for {}", parent.display()))?;
75 w.watch(&parent, RecursiveMode::NonRecursive)
76 .with_context(|| format!("watch {}", parent.display()))?;
77 let _keep = w;
78 let mut off = 0u64;
79 loop {
80 read_appended(path, &mut off, pretty)?;
81 let _ = rx.recv_timeout(Duration::from_millis(400));
82 }
83}
84
85fn read_appended(path: &Path, off: &mut u64, pretty: bool) -> Result<()> {
86 if !path.exists() {
87 return Ok(());
88 }
89 let len = std::fs::metadata(path)?.len();
90 if *off > len {
91 *off = 0;
92 }
93 let mut f = std::fs::File::open(path)?;
94 std::io::Seek::seek(&mut f, std::io::SeekFrom::Start(*off))?;
95 let mut r = std::io::BufReader::new(f);
96 let mut line = String::new();
97 while r.read_line(&mut line)? > 0 {
98 print_line(line.trim_end_matches(['\n', '\r']), pretty)?;
99 line.clear();
100 }
101 *off = std::fs::metadata(path)?.len();
102 Ok(())
103}