Skip to main content

kaizen/shell/
telemetry.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! `kaizen telemetry` subcommands: configure, print-effective, doctor, pull, push, schema.
3
4use crate::core::config::{self, ExporterConfig, try_team_salt};
5use crate::provider::{PullWindow, from_config as provider_from_config};
6use crate::shell::cli::workspace_path;
7use crate::shell::scope;
8use crate::store::Store;
9use crate::store::remote_cache::{RemoteCacheStore, RemotePullState};
10use crate::sync::canonical::KAIZEN_SCHEMA_VERSION;
11use crate::sync::chunk_events_into_ingest_batches;
12use crate::sync::outbound::outbound_event_from_row;
13use crate::sync::redact::redact_payload;
14use crate::sync::workspace_hash;
15use crate::telemetry::{self, DatadogResolved, OtlpResolved, PostHogResolved};
16use anyhow::{Context, Result};
17use std::io::{BufRead, Write};
18use std::path::Path;
19
20/// Interactive: append a PostHog stub row to `~/.kaizen/config.toml` (keys via env or paste).
21pub fn cmd_telemetry_configure(workspace: Option<&Path>) -> Result<()> {
22    let ws = workspace_path(workspace)?;
23    let home = std::env::var("HOME").context("HOME not set")?;
24    let p = std::path::PathBuf::from(home).join(".kaizen/config.toml");
25    std::fs::create_dir_all(p.parent().unwrap())?;
26
27    println!("Kaizen pluggable telemetry — optional sinks fan-out alongside Kaizen sync.");
28    println!(
29        "This command appends a `[[telemetry.exporters]]` table to {}.",
30        p.display()
31    );
32    print!("Type `file`, `posthog`, `datadog`, `otlp`, or `dev` (or empty to abort): ");
33    std::io::stdout().flush()?;
34    let mut line = String::new();
35    std::io::stdin().lock().read_line(&mut line)?;
36    let t = line.trim().to_lowercase();
37    if t.is_empty() {
38        println!("Aborted.");
39        return Ok(());
40    }
41    let block = match t.as_str() {
42        "file" => {
43            r#"
44[[telemetry.exporters]]
45type = "file"
46enabled = true
47# path = "telemetry.ndjson"   # optional; default .kaizen/telemetry.ndjson under each workspace
48"#
49        }
50        "posthog" => {
51            r#"
52[[telemetry.exporters]]
53type = "posthog"
54# project_api_key = "phc_..."  # or set POSTHOG_API_KEY
55# host = "https://us.i.posthog.com"
56"#
57        }
58        "datadog" => {
59            r#"
60[[telemetry.exporters]]
61type = "datadog"
62# api_key = "..."   # or set DD_API_KEY
63# site = "datadoghq.com"
64"#
65        }
66        "otlp" => {
67            r#"
68[[telemetry.exporters]]
69type = "otlp"
70# endpoint = "http://127.0.0.1:4318"  # or set OTEL_EXPORTER_OTLP_ENDPOINT
71"#
72        }
73        "dev" => {
74            r#"
75[[telemetry.exporters]]
76type = "dev"
77"#
78        }
79        _ => anyhow::bail!("unknown type (use file, posthog, datadog, otlp, dev)"),
80    };
81
82    let mut f = std::fs::OpenOptions::new()
83        .create(true)
84        .append(true)
85        .open(&p)?;
86    if f.metadata()?.len() > 0 {
87        f.write_all(b"\n")?;
88    }
89    f.write_all(block.as_bytes())?;
90    let _ = ws;
91    println!(
92        "Appended. `file` needs no extra feature; use `--features telemetry-posthog` for PostHog."
93    );
94    Ok(())
95}
96
97/// Redacted: show which env/Toml fields are visible for `telemetry` sinks.
98pub fn print_effective_config_text(workspace: Option<&Path>) -> Result<String> {
99    let ws = workspace_path(workspace)?;
100    let cfg = config::load(&ws)?;
101    use std::fmt::Write;
102    let mut s = String::new();
103    writeln!(&mut s, "telemetry.fail_open: {}", cfg.telemetry.fail_open).unwrap();
104    for (i, e) in cfg.telemetry.exporters.iter().enumerate() {
105        match e {
106            ExporterConfig::None => writeln!(&mut s, "[{i}] type=none (ignored)").unwrap(),
107            ExporterConfig::File { enabled, path } => {
108                let p = path
109                    .as_deref()
110                    .map(|p| p.to_string())
111                    .unwrap_or_else(|| "<workspace>/.kaizen/telemetry.ndjson".into());
112                writeln!(&mut s, "[{i}] type=file enabled={enabled} path={p}").unwrap();
113            }
114            ExporterConfig::Dev { enabled } => {
115                writeln!(&mut s, "[{i}] type=dev enabled={enabled}").unwrap();
116            }
117            ExporterConfig::PostHog { .. } => {
118                let line = if let Some(r) = PostHogResolved::from_config(e) {
119                    format!(
120                        "[{i}] type=posthog host={} key=<redacted len {}>",
121                        r.host,
122                        r.project_api_key.len()
123                    )
124                } else {
125                    format!(
126                        "[{i}] type=posthog (unresolved: set POSTHOG_API_KEY or project_api_key)"
127                    )
128                };
129                writeln!(&mut s, "{line}").unwrap();
130            }
131            ExporterConfig::Datadog { .. } => {
132                let line = if let Some(r) = DatadogResolved::from_config(e) {
133                    format!(
134                        "[{i}] type=datadog site={} key=<redacted len {}>",
135                        r.site,
136                        r.api_key.len()
137                    )
138                } else {
139                    format!("[{i}] type=datadog (unresolved: set DD_API_KEY or api_key in TOML)")
140                };
141                writeln!(&mut s, "{line}").unwrap();
142            }
143            ExporterConfig::Otlp { .. } => {
144                let line = if let Some(r) = OtlpResolved::from_config(e) {
145                    format!("[{i}] type=otlp endpoint={}", r.endpoint)
146                } else {
147                    format!("[{i}] type=otlp (unresolved: OTEL_EXPORTER_OTLP_ENDPOINT)")
148                };
149                writeln!(&mut s, "{line}").unwrap();
150            }
151        }
152    }
153    if cfg.telemetry.exporters.is_empty() {
154        writeln!(&mut s, "(no [[telemetry.exporters]] rows)").unwrap();
155    }
156    Ok(s)
157}
158
159pub fn cmd_telemetry_print_effective(workspace: Option<&Path>) -> Result<()> {
160    print!("{}", print_effective_config_text(workspace)?);
161    Ok(())
162}
163
164/// Alias of [`cmd_telemetry_configure`].
165pub fn cmd_telemetry_init(workspace: Option<&Path>) -> Result<()> {
166    cmd_telemetry_configure(workspace)
167}
168
169/// Resolve config, run provider `health` when available, show redacted exporter view.
170pub fn cmd_telemetry_doctor(workspace: Option<&Path>) -> Result<()> {
171    let ws = workspace_path(workspace)?;
172    let cfg = config::load(&ws)?;
173    println!("telemetry.fail_open: {}", cfg.telemetry.fail_open);
174    println!(
175        "telemetry.query.cache_ttl_seconds: {}",
176        cfg.telemetry.query.cache_ttl_seconds
177    );
178    match cfg.telemetry.query.provider {
179        crate::core::config::QueryAuthority::None => println!("telemetry.query.provider: none"),
180        crate::core::config::QueryAuthority::Posthog => {
181            println!("telemetry.query.provider: posthog");
182        }
183        crate::core::config::QueryAuthority::Datadog => {
184            println!("telemetry.query.provider: datadog");
185        }
186    }
187    if let Some(p) = provider_from_config(&cfg.telemetry.query) {
188        match p.health() {
189            Ok(()) => println!("provider health: ok (schema: {})", p.schema_version()),
190            Err(e) => eprintln!("provider health: {e}"),
191        }
192    } else {
193        println!("query provider: (not configured or features disabled; pull disabled)");
194    }
195    println!("\n{}", print_effective_config_text(Some(&ws))?);
196    println!("\nOTLP: export only — no query/pull in v1.");
197    Ok(())
198}
199
200/// Run one page of `pull` and refresh `remote_pull_state` (payload import when APIs are wired).
201pub fn cmd_telemetry_pull(workspace: Option<&Path>, days: u32) -> Result<()> {
202    let ws = workspace_path(workspace)?;
203    let cfg = config::load(&ws)?;
204    let p = provider_from_config(&cfg.telemetry.query)
205        .ok_or_else(|| anyhow::anyhow!("set [telemetry.query] provider and credentials"))?;
206    let store = Store::open(&ws.join(".kaizen/kaizen.db"))?;
207    let page = p.pull(PullWindow { days }, None)?;
208    if !cfg.sync.team_id.trim().is_empty()
209        && let Some(ctx) = crate::sync::ingest_ctx(&cfg, ws.to_path_buf())
210        && let Some(wh) = crate::sync::smart::workspace_hash_for(&ctx)
211    {
212        match crate::provider::import_pull_page_to_remote(&store, &cfg.sync.team_id, &wh, &page) {
213            Ok(n) if n > 0 => {
214                tracing::debug!(n, "remote_events: imported from provider pull (cmd)")
215            }
216            _ => {}
217        }
218    }
219    let now_ms = std::time::SystemTime::now()
220        .duration_since(std::time::UNIX_EPOCH)
221        .unwrap_or_default()
222        .as_millis() as i64;
223    let label = match cfg.telemetry.query.provider {
224        crate::core::config::QueryAuthority::None => "none",
225        crate::core::config::QueryAuthority::Posthog => "posthog",
226        crate::core::config::QueryAuthority::Datadog => "datadog",
227    };
228    store.set_pull_state(&RemotePullState {
229        query_provider: label.into(),
230        cursor_json: page.next_cursor.unwrap_or_default(),
231        last_success_ms: Some(now_ms),
232    })?;
233    println!("pull: received {} item(s) (page)", page.items.len());
234    Ok(())
235}
236
237/// Replay stored events in a trailing window through configured telemetry exporters (no Kaizen POST).
238pub fn cmd_telemetry_push(
239    workspace: Option<&Path>,
240    all_workspaces: bool,
241    days: u32,
242    dry_run: bool,
243) -> Result<()> {
244    let roots = scope::resolve(workspace, all_workspaces)?;
245    let primary = roots
246        .first()
247        .cloned()
248        .ok_or_else(|| anyhow::anyhow!("no workspace roots"))?;
249    let cfg = config::load(&primary)?;
250    let Some(salt) = try_team_salt(&cfg.sync) else {
251        anyhow::bail!(
252            "telemetry push requires [sync].team_salt_hex (64 hex chars). \
253             The salt hashes session/workspace identifiers and drives payload redaction — \
254             not only for cloud sync."
255        );
256    };
257    let registry = telemetry::load_exporters(&cfg.telemetry, primary.as_path());
258    if registry.is_empty() {
259        anyhow::bail!(
260            "no telemetry exporters to push to: add [[telemetry.exporters]] (e.g. type = \"file\" \
261             needs no extra feature; PostHog/Datadog/OTLP need build features); see \
262             `kaizen telemetry print-effective-config`."
263        );
264    }
265    let fail_open = cfg.telemetry.fail_open;
266    let team_id = cfg.sync.team_id.clone();
267    let end_ms = std::time::SystemTime::now()
268        .duration_since(std::time::UNIX_EPOCH)
269        .unwrap_or_default()
270        .as_millis() as u64;
271    let start_ms = end_ms.saturating_sub((days as u64).saturating_mul(86_400_000));
272
273    let mut total_events: u64 = 0;
274    let mut total_batches: u64 = 0;
275
276    for root in &roots {
277        let store = Store::open(&root.join(".kaizen/kaizen.db"))?;
278        let ws_key = root.to_string_lossy().to_string();
279        let rows = store.retro_events_in_window(&ws_key, start_ms, end_ms)?;
280        let wh = workspace_hash(&salt, root.as_path());
281        let outbound: Vec<_> = rows
282            .into_iter()
283            .map(|(session, ev)| {
284                let mut o = outbound_event_from_row(&ev, &session, &salt);
285                redact_payload(&mut o.payload, root.as_path(), &salt);
286                o
287            })
288            .collect();
289        let n = outbound.len() as u64;
290        total_events += n;
291        let batches = chunk_events_into_ingest_batches(team_id.clone(), wh, outbound, &cfg.sync)?;
292        let bcount = batches.len() as u64;
293        total_batches += bcount;
294        if dry_run {
295            eprintln!(
296                "telemetry push (dry-run): {} — {} event(s), {} batch(es)",
297                root.display(),
298                n,
299                bcount
300            );
301            continue;
302        }
303        for batch in batches {
304            registry
305                .fan_out(fail_open, &batch)
306                .with_context(|| format!("telemetry fan-out ({})", batch.kind_name()))?;
307        }
308        eprintln!(
309            "telemetry push: {} — sent {} event(s) in {} batch(es)",
310            root.display(),
311            n,
312            bcount
313        );
314    }
315
316    eprintln!(
317        "telemetry push: total {} event(s), {} batch(es) across {} workspace(s){}",
318        total_events,
319        total_batches,
320        roots.len(),
321        if dry_run { " (dry-run)" } else { "" }
322    );
323    Ok(())
324}
325
326/// Example JSON for canonical per-item export names (ingest + third-party mappers).
327pub fn cmd_telemetry_print_schema() -> Result<()> {
328    let v = serde_json::json!({
329        "kaizen_schema_version": KAIZEN_SCHEMA_VERSION,
330        "event_names": [
331            "kaizen.event",
332            "kaizen.tool_span",
333            "kaizen.repo_snapshot_chunk",
334            "kaizen.workspace_fact_snapshot"
335        ],
336        "note": "Full shapes: see sync::canonical::CanonicalItem and expand_ingest_batch (tests include golden JSON).",
337    });
338    println!("{}", serde_json::to_string_pretty(&v)?);
339    Ok(())
340}