Skip to main content

kaizen/shell/
telemetry.rs

// SPDX-License-Identifier: AGPL-3.0-or-later
//! `kaizen telemetry` subcommands: configure (alias `init`), print-effective, doctor, pull, push, schema.
3
4use crate::core::config::{self, ExporterConfig, try_team_salt};
5use crate::provider::{PullWindow, from_config as provider_from_config};
6use crate::shell::cli::workspace_path;
7use crate::shell::scope;
8use crate::store::Store;
9use crate::store::remote_cache::{RemoteCacheStore, RemotePullState};
10use crate::sync::canonical::KAIZEN_SCHEMA_VERSION;
11use crate::sync::chunk_events_into_ingest_batches;
12use crate::sync::outbound::outbound_event_from_row;
13use crate::sync::redact::redact_payload;
14use crate::sync::workspace_hash;
15use crate::telemetry::{self, DatadogResolved, OtlpResolved, PostHogResolved};
16use anyhow::{Context, Result};
17use std::io::{BufRead, Write};
18use std::path::{Path, PathBuf};
19
/// Inputs for `kaizen telemetry configure` (and its `init` alias).
///
/// `Default` leaves both fields unset: the exporter kind is then asked for on stdin and a
/// `file` exporter row gets a commented-out `path` example instead of a concrete path.
#[derive(Clone, Debug, Default)]
pub struct ConfigureOptions {
    /// Exporter kind (`file`, `posthog`, `datadog`, `otlp`, `dev`); `None` means "prompt".
    pub exporter_type: Option<String>,
    /// Output path written into a `file` exporter row; not used by the other kinds.
    pub path: Option<PathBuf>,
}
25
26/// Append an exporter stub row to `~/.kaizen/config.toml`.
27pub fn cmd_telemetry_configure(workspace: Option<&Path>, options: ConfigureOptions) -> Result<()> {
28    let ws = workspace_path(workspace)?;
29    let home = std::env::var("HOME").context("HOME not set")?;
30    let p = std::path::PathBuf::from(home).join(".kaizen/config.toml");
31    std::fs::create_dir_all(p.parent().unwrap())?;
32
33    println!("Kaizen pluggable telemetry — optional sinks fan-out alongside Kaizen sync.");
34    println!(
35        "This command appends a `[[telemetry.exporters]]` table to {}.",
36        p.display()
37    );
38    let t = match options.exporter_type {
39        Some(t) => t.trim().to_lowercase(),
40        None => {
41            print!("Type `file`, `posthog`, `datadog`, `otlp`, or `dev` (or empty to abort): ");
42            std::io::stdout().flush()?;
43            let mut line = String::new();
44            std::io::stdin().lock().read_line(&mut line)?;
45            line.trim().to_lowercase()
46        }
47    };
48    if t.is_empty() {
49        println!("Aborted.");
50        return Ok(());
51    }
52    let block = match t.as_str() {
53        "file" => file_exporter_block(options.path.as_deref()),
54        "posthog" => r#"
55[[telemetry.exporters]]
56type = "posthog"
57# project_api_key = "phc_..."  # or set POSTHOG_API_KEY
58# host = "https://us.i.posthog.com"
59"#
60        .to_string(),
61        "datadog" => r#"
62[[telemetry.exporters]]
63type = "datadog"
64# api_key = "..."   # or set DD_API_KEY
65# site = "datadoghq.com"
66"#
67        .to_string(),
68        "otlp" => r#"
69[[telemetry.exporters]]
70type = "otlp"
71# endpoint = "http://127.0.0.1:4318"  # or set OTEL_EXPORTER_OTLP_ENDPOINT
72"#
73        .to_string(),
74        "dev" => r#"
75[[telemetry.exporters]]
76type = "dev"
77"#
78        .to_string(),
79        _ => anyhow::bail!("unknown type (use file, posthog, datadog, otlp, dev)"),
80    };
81
82    let mut f = std::fs::OpenOptions::new()
83        .create(true)
84        .append(true)
85        .open(&p)?;
86    if f.metadata()?.len() > 0 {
87        f.write_all(b"\n")?;
88    }
89    f.write_all(block.as_bytes())?;
90    let _ = ws;
91    println!(
92        "Appended. `file` needs no extra feature; use `--features telemetry-posthog` for PostHog."
93    );
94    Ok(())
95}
96
/// Build the `[[telemetry.exporters]]` row for a `file` exporter.
///
/// With `Some(path)` an explicit `path = "..."` line is emitted as a correctly escaped TOML basic
/// string. With `None` a commented-out example is emitted instead, so the exporter's default
/// location applies.
fn file_exporter_block(path: Option<&Path>) -> String {
    let mut block = String::from(
        r#"
[[telemetry.exporters]]
type = "file"
enabled = true
"#,
    );
    match path {
        Some(path) => {
            // Non-UTF-8 bytes become U+FFFD here; TOML documents must be valid UTF-8 anyway.
            let escaped = escape_toml_basic_string(&path.to_string_lossy());
            block.push_str("path = \"");
            block.push_str(&escaped);
            block.push_str("\"\n");
        }
        None => block.push_str(
            "# path = \"telemetry.ndjson\"   # optional; default .kaizen/telemetry.ndjson under each workspace\n",
        ),
    }
    block
}

/// Escape `raw` for use inside a TOML basic (double-quoted) string.
///
/// Besides `"` and `\`, control characters must be escaped, otherwise the written config no
/// longer parses. TOML's short escapes (`\b`, `\t`, `\n`, `\f`, `\r`) are used where defined,
/// and `\uXXXX` is used for any other control character.
fn escape_toml_basic_string(raw: &str) -> String {
    let mut out = String::with_capacity(raw.len());
    for ch in raw.chars() {
        match ch {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\u{08}' => out.push_str("\\b"),
            '\t' => out.push_str("\\t"),
            '\n' => out.push_str("\\n"),
            '\u{0C}' => out.push_str("\\f"),
            '\r' => out.push_str("\\r"),
            c if c.is_control() => out.push_str(&format!("\\u{:04X}", u32::from(c))),
            c => out.push(c),
        }
    }
    out
}
119
120/// Redacted: show which env/Toml fields are visible for `telemetry` sinks.
121pub fn print_effective_config_text(workspace: Option<&Path>) -> Result<String> {
122    let ws = workspace_path(workspace)?;
123    let cfg = config::load(&ws)?;
124    use std::fmt::Write;
125    let mut s = String::new();
126    writeln!(&mut s, "telemetry.fail_open: {}", cfg.telemetry.fail_open).unwrap();
127    for (i, e) in cfg.telemetry.exporters.iter().enumerate() {
128        match e {
129            ExporterConfig::None => writeln!(&mut s, "[{i}] type=none (ignored)").unwrap(),
130            ExporterConfig::File { enabled, path } => {
131                let p = path
132                    .as_deref()
133                    .map(|p| p.to_string())
134                    .unwrap_or_else(|| "<workspace>/.kaizen/telemetry.ndjson".into());
135                writeln!(&mut s, "[{i}] type=file enabled={enabled} path={p}").unwrap();
136            }
137            ExporterConfig::Dev { enabled } => {
138                writeln!(&mut s, "[{i}] type=dev enabled={enabled}").unwrap();
139            }
140            ExporterConfig::PostHog { .. } => {
141                let line = if let Some(r) = PostHogResolved::from_config(e) {
142                    format!(
143                        "[{i}] type=posthog host={} key=<redacted len {}>",
144                        r.host,
145                        r.project_api_key.len()
146                    )
147                } else {
148                    format!(
149                        "[{i}] type=posthog (unresolved: set POSTHOG_API_KEY or project_api_key)"
150                    )
151                };
152                writeln!(&mut s, "{line}").unwrap();
153            }
154            ExporterConfig::Datadog { .. } => {
155                let line = if let Some(r) = DatadogResolved::from_config(e) {
156                    format!(
157                        "[{i}] type=datadog site={} key=<redacted len {}>",
158                        r.site,
159                        r.api_key.len()
160                    )
161                } else {
162                    format!("[{i}] type=datadog (unresolved: set DD_API_KEY or api_key in TOML)")
163                };
164                writeln!(&mut s, "{line}").unwrap();
165            }
166            ExporterConfig::Otlp { .. } => {
167                let line = if let Some(r) = OtlpResolved::from_config(e) {
168                    format!("[{i}] type=otlp endpoint={}", r.endpoint)
169                } else {
170                    format!("[{i}] type=otlp (unresolved: OTEL_EXPORTER_OTLP_ENDPOINT)")
171                };
172                writeln!(&mut s, "{line}").unwrap();
173            }
174        }
175    }
176    if cfg.telemetry.exporters.is_empty() {
177        writeln!(&mut s, "(no [[telemetry.exporters]] rows)").unwrap();
178    }
179    Ok(s)
180}
181
182pub fn cmd_telemetry_print_effective(workspace: Option<&Path>) -> Result<()> {
183    print!("{}", print_effective_config_text(workspace)?);
184    Ok(())
185}
186
187/// Alias of [`cmd_telemetry_configure`].
188pub fn cmd_telemetry_init(workspace: Option<&Path>, options: ConfigureOptions) -> Result<()> {
189    cmd_telemetry_configure(workspace, options)
190}
191
192/// Resolve config, run provider `health` when available, show redacted exporter view.
193pub fn cmd_telemetry_doctor(workspace: Option<&Path>) -> Result<()> {
194    let ws = workspace_path(workspace)?;
195    let cfg = config::load(&ws)?;
196    println!("telemetry.fail_open: {}", cfg.telemetry.fail_open);
197    println!(
198        "telemetry.query.cache_ttl_seconds: {}",
199        cfg.telemetry.query.cache_ttl_seconds
200    );
201    match cfg.telemetry.query.provider {
202        crate::core::config::QueryAuthority::None => println!("telemetry.query.provider: none"),
203        crate::core::config::QueryAuthority::Posthog => {
204            println!("telemetry.query.provider: posthog");
205        }
206        crate::core::config::QueryAuthority::Datadog => {
207            println!("telemetry.query.provider: datadog");
208        }
209    }
210    if let Some(p) = provider_from_config(&cfg.telemetry.query) {
211        match p.health() {
212            Ok(()) => println!("provider health: ok (schema: {})", p.schema_version()),
213            Err(e) => eprintln!("provider health: {e}"),
214        }
215    } else {
216        println!("query provider: (not configured or features disabled; pull disabled)");
217    }
218    println!("\n{}", print_effective_config_text(Some(&ws))?);
219    println!("\nOTLP: export only — no query/pull in v1.");
220    Ok(())
221}
222
223/// Run one page of `pull` and refresh `remote_pull_state` (payload import when APIs are wired).
224pub fn cmd_telemetry_pull(workspace: Option<&Path>, days: u32) -> Result<()> {
225    let ws = workspace_path(workspace)?;
226    let cfg = config::load(&ws)?;
227    let p = provider_from_config(&cfg.telemetry.query)
228        .ok_or_else(|| anyhow::anyhow!("set [telemetry.query] provider and credentials"))?;
229    let store = Store::open(&ws.join(".kaizen/kaizen.db"))?;
230    let page = p.pull(PullWindow { days }, None)?;
231    if !cfg.sync.team_id.trim().is_empty()
232        && let Some(ctx) = crate::sync::ingest_ctx(&cfg, ws.to_path_buf())
233        && let Some(wh) = crate::sync::smart::workspace_hash_for(&ctx)
234    {
235        match crate::provider::import_pull_page_to_remote(&store, &cfg.sync.team_id, &wh, &page) {
236            Ok(n) if n > 0 => {
237                tracing::debug!(n, "remote_events: imported from provider pull (cmd)")
238            }
239            _ => {}
240        }
241    }
242    let now_ms = std::time::SystemTime::now()
243        .duration_since(std::time::UNIX_EPOCH)
244        .unwrap_or_default()
245        .as_millis() as i64;
246    let label = match cfg.telemetry.query.provider {
247        crate::core::config::QueryAuthority::None => "none",
248        crate::core::config::QueryAuthority::Posthog => "posthog",
249        crate::core::config::QueryAuthority::Datadog => "datadog",
250    };
251    store.set_pull_state(&RemotePullState {
252        query_provider: label.into(),
253        cursor_json: page.next_cursor.unwrap_or_default(),
254        last_success_ms: Some(now_ms),
255    })?;
256    println!("pull: received {} item(s) (page)", page.items.len());
257    Ok(())
258}
259
260/// Replay stored events in a trailing window through configured telemetry exporters (no Kaizen POST).
261pub fn cmd_telemetry_push(
262    workspace: Option<&Path>,
263    all_workspaces: bool,
264    days: u32,
265    dry_run: bool,
266) -> Result<()> {
267    let roots = scope::resolve(workspace, all_workspaces)?;
268    let primary = roots
269        .first()
270        .cloned()
271        .ok_or_else(|| anyhow::anyhow!("no workspace roots"))?;
272    let cfg = config::load(&primary)?;
273    let Some(salt) = try_team_salt(&cfg.sync) else {
274        anyhow::bail!(
275            "telemetry push requires [sync].team_salt_hex (64 hex chars). \
276             The salt hashes session/workspace identifiers and drives payload redaction — \
277             not only for cloud sync."
278        );
279    };
280    let registry = telemetry::load_exporters(&cfg.telemetry, primary.as_path());
281    if registry.is_empty() {
282        anyhow::bail!(
283            "no telemetry exporters to push to: add [[telemetry.exporters]] (e.g. type = \"file\" \
284             needs no extra feature; PostHog/Datadog/OTLP need build features); see \
285             `kaizen telemetry print-effective-config`."
286        );
287    }
288    let fail_open = cfg.telemetry.fail_open;
289    let team_id = cfg.sync.team_id.clone();
290    let end_ms = std::time::SystemTime::now()
291        .duration_since(std::time::UNIX_EPOCH)
292        .unwrap_or_default()
293        .as_millis() as u64;
294    let start_ms = end_ms.saturating_sub((days as u64).saturating_mul(86_400_000));
295
296    let mut total_events: u64 = 0;
297    let mut total_batches: u64 = 0;
298
299    for root in &roots {
300        let store = Store::open(&root.join(".kaizen/kaizen.db"))?;
301        let ws_key = root.to_string_lossy().to_string();
302        let rows = store.retro_events_in_window(&ws_key, start_ms, end_ms)?;
303        let wh = workspace_hash(&salt, root.as_path());
304        let outbound: Vec<_> = rows
305            .into_iter()
306            .map(|(session, ev)| {
307                let mut o = outbound_event_from_row(&ev, &session, &salt);
308                redact_payload(&mut o.payload, root.as_path(), &salt);
309                o
310            })
311            .collect();
312        let n = outbound.len() as u64;
313        total_events += n;
314        let batches = chunk_events_into_ingest_batches(team_id.clone(), wh, outbound, &cfg.sync)?;
315        let bcount = batches.len() as u64;
316        total_batches += bcount;
317        if dry_run {
318            eprintln!(
319                "telemetry push (dry-run): {} — {} event(s), {} batch(es)",
320                root.display(),
321                n,
322                bcount
323            );
324            continue;
325        }
326        for batch in batches {
327            registry
328                .fan_out(fail_open, &batch)
329                .with_context(|| format!("telemetry fan-out ({})", batch.kind_name()))?;
330        }
331        eprintln!(
332            "telemetry push: {} — sent {} event(s) in {} batch(es)",
333            root.display(),
334            n,
335            bcount
336        );
337    }
338
339    eprintln!(
340        "telemetry push: total {} event(s), {} batch(es) across {} workspace(s){}",
341        total_events,
342        total_batches,
343        roots.len(),
344        if dry_run { " (dry-run)" } else { "" }
345    );
346    Ok(())
347}
348
349/// Example JSON for canonical per-item export names (ingest + third-party mappers).
350pub fn cmd_telemetry_print_schema() -> Result<()> {
351    let v = serde_json::json!({
352        "kaizen_schema_version": KAIZEN_SCHEMA_VERSION,
353        "event_names": [
354            "kaizen.event",
355            "kaizen.tool_span",
356            "kaizen.repo_snapshot_chunk",
357            "kaizen.workspace_fact_snapshot"
358        ],
359        "note": "Full shapes: see sync::canonical::CanonicalItem and expand_ingest_batch (tests include golden JSON).",
360    });
361    println!("{}", serde_json::to_string_pretty(&v)?);
362    Ok(())
363}