Skip to main content

kaizen/shell/
telemetry.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! `kaizen telemetry` subcommands: configure, print-effective, doctor, pull, push, schema.
3
4use crate::core::config::{self, ExporterConfig, try_team_salt};
5use crate::provider::{PullWindow, from_config as provider_from_config};
6use crate::shell::cli::workspace_path;
7use crate::shell::scope;
8use crate::store::Store;
9use crate::store::remote_cache::{RemoteCacheStore, RemotePullState};
10use crate::sync::canonical::KAIZEN_SCHEMA_VERSION;
11use crate::sync::chunk_events_into_ingest_batches;
12use crate::sync::outbound::outbound_event_from_row;
13use crate::sync::redact::redact_payload;
14use crate::sync::workspace_hash;
15use crate::telemetry::{self, DatadogResolved, OtlpResolved, PostHogResolved};
16use anyhow::{Context, Result};
17use std::io::{BufRead, Write};
18use std::path::Path;
19
20/// Interactive: append a PostHog stub row to `~/.kaizen/config.toml` (keys via env or paste).
21pub fn cmd_telemetry_configure(workspace: Option<&Path>) -> Result<()> {
22    let ws = workspace_path(workspace)?;
23    let home = std::env::var("HOME").context("HOME not set")?;
24    let p = std::path::PathBuf::from(home).join(".kaizen/config.toml");
25    std::fs::create_dir_all(p.parent().unwrap())?;
26
27    println!("Kaizen pluggable telemetry — optional sinks fan-out alongside Kaizen sync.");
28    println!(
29        "This command appends a `[[telemetry.exporters]]` table to {}.",
30        p.display()
31    );
32    print!("Type `posthog`, `datadog`, `otlp`, or `dev` (or empty to abort): ");
33    std::io::stdout().flush()?;
34    let mut line = String::new();
35    std::io::stdin().lock().read_line(&mut line)?;
36    let t = line.trim().to_lowercase();
37    if t.is_empty() {
38        println!("Aborted.");
39        return Ok(());
40    }
41    let block = match t.as_str() {
42        "posthog" => {
43            r#"
44[[telemetry.exporters]]
45type = "posthog"
46# project_api_key = "phc_..."  # or set POSTHOG_API_KEY
47# host = "https://us.i.posthog.com"
48"#
49        }
50        "datadog" => {
51            r#"
52[[telemetry.exporters]]
53type = "datadog"
54# api_key = "..."   # or set DD_API_KEY
55# site = "datadoghq.com"
56"#
57        }
58        "otlp" => {
59            r#"
60[[telemetry.exporters]]
61type = "otlp"
62# endpoint = "http://127.0.0.1:4318"  # or set OTEL_EXPORTER_OTLP_ENDPOINT
63"#
64        }
65        "dev" => {
66            r#"
67[[telemetry.exporters]]
68type = "dev"
69"#
70        }
71        _ => anyhow::bail!("unknown type (use posthog, datadog, otlp, dev)"),
72    };
73
74    let mut f = std::fs::OpenOptions::new()
75        .create(true)
76        .append(true)
77        .open(&p)?;
78    if f.metadata()?.len() > 0 {
79        f.write_all(b"\n")?;
80    }
81    f.write_all(block.as_bytes())?;
82    let _ = ws;
83    println!("Appended. Rebuild with e.g. `--features telemetry-posthog` for PostHog.");
84    Ok(())
85}
86
87/// Redacted: show which env/Toml fields are visible for `telemetry` sinks.
88pub fn print_effective_config_text(workspace: Option<&Path>) -> Result<String> {
89    let ws = workspace_path(workspace)?;
90    let cfg = config::load(&ws)?;
91    use std::fmt::Write;
92    let mut s = String::new();
93    writeln!(&mut s, "telemetry.fail_open: {}", cfg.telemetry.fail_open).unwrap();
94    for (i, e) in cfg.telemetry.exporters.iter().enumerate() {
95        match e {
96            ExporterConfig::None => writeln!(&mut s, "[{i}] type=none (ignored)").unwrap(),
97            ExporterConfig::Dev { enabled } => {
98                writeln!(&mut s, "[{i}] type=dev enabled={enabled}").unwrap();
99            }
100            ExporterConfig::PostHog { .. } => {
101                let line = if let Some(r) = PostHogResolved::from_config(e) {
102                    format!(
103                        "[{i}] type=posthog host={} key=<redacted len {}>",
104                        r.host,
105                        r.project_api_key.len()
106                    )
107                } else {
108                    format!(
109                        "[{i}] type=posthog (unresolved: set POSTHOG_API_KEY or project_api_key)"
110                    )
111                };
112                writeln!(&mut s, "{line}").unwrap();
113            }
114            ExporterConfig::Datadog { .. } => {
115                let line = if let Some(r) = DatadogResolved::from_config(e) {
116                    format!(
117                        "[{i}] type=datadog site={} key=<redacted len {}>",
118                        r.site,
119                        r.api_key.len()
120                    )
121                } else {
122                    format!("[{i}] type=datadog (unresolved: set DD_API_KEY or api_key in TOML)")
123                };
124                writeln!(&mut s, "{line}").unwrap();
125            }
126            ExporterConfig::Otlp { .. } => {
127                let line = if let Some(r) = OtlpResolved::from_config(e) {
128                    format!("[{i}] type=otlp endpoint={}", r.endpoint)
129                } else {
130                    format!("[{i}] type=otlp (unresolved: OTEL_EXPORTER_OTLP_ENDPOINT)")
131                };
132                writeln!(&mut s, "{line}").unwrap();
133            }
134        }
135    }
136    if cfg.telemetry.exporters.is_empty() {
137        writeln!(&mut s, "(no [[telemetry.exporters]] rows)").unwrap();
138    }
139    Ok(s)
140}
141
142pub fn cmd_telemetry_print_effective(workspace: Option<&Path>) -> Result<()> {
143    print!("{}", print_effective_config_text(workspace)?);
144    Ok(())
145}
146
147/// Alias of [`cmd_telemetry_configure`].
148pub fn cmd_telemetry_init(workspace: Option<&Path>) -> Result<()> {
149    cmd_telemetry_configure(workspace)
150}
151
152/// Resolve config, run provider `health` when available, show redacted exporter view.
153pub fn cmd_telemetry_doctor(workspace: Option<&Path>) -> Result<()> {
154    let ws = workspace_path(workspace)?;
155    let cfg = config::load(&ws)?;
156    println!("telemetry.fail_open: {}", cfg.telemetry.fail_open);
157    println!(
158        "telemetry.query.cache_ttl_seconds: {}",
159        cfg.telemetry.query.cache_ttl_seconds
160    );
161    match cfg.telemetry.query.provider {
162        crate::core::config::QueryAuthority::None => println!("telemetry.query.provider: none"),
163        crate::core::config::QueryAuthority::Posthog => {
164            println!("telemetry.query.provider: posthog");
165        }
166        crate::core::config::QueryAuthority::Datadog => {
167            println!("telemetry.query.provider: datadog");
168        }
169    }
170    if let Some(p) = provider_from_config(&cfg.telemetry.query) {
171        match p.health() {
172            Ok(()) => println!("provider health: ok (schema: {})", p.schema_version()),
173            Err(e) => eprintln!("provider health: {e}"),
174        }
175    } else {
176        println!("query provider: (not configured or features disabled; pull disabled)");
177    }
178    println!("\n{}", print_effective_config_text(Some(&ws))?);
179    println!("\nOTLP: export only — no query/pull in v1.");
180    Ok(())
181}
182
183/// Run one page of `pull` and refresh `remote_pull_state` (payload import when APIs are wired).
184pub fn cmd_telemetry_pull(workspace: Option<&Path>, days: u32) -> Result<()> {
185    let ws = workspace_path(workspace)?;
186    let cfg = config::load(&ws)?;
187    let p = provider_from_config(&cfg.telemetry.query)
188        .ok_or_else(|| anyhow::anyhow!("set [telemetry.query] provider and credentials"))?;
189    let store = Store::open(&ws.join(".kaizen/kaizen.db"))?;
190    let page = p.pull(PullWindow { days }, None)?;
191    if !cfg.sync.team_id.trim().is_empty()
192        && let Some(ctx) = crate::sync::ingest_ctx(&cfg, ws.to_path_buf())
193        && let Some(wh) = crate::sync::smart::workspace_hash_for(&ctx)
194    {
195        match crate::provider::import_pull_page_to_remote(&store, &cfg.sync.team_id, &wh, &page) {
196            Ok(n) if n > 0 => {
197                tracing::debug!(n, "remote_events: imported from provider pull (cmd)")
198            }
199            _ => {}
200        }
201    }
202    let now_ms = std::time::SystemTime::now()
203        .duration_since(std::time::UNIX_EPOCH)
204        .unwrap_or_default()
205        .as_millis() as i64;
206    let label = match cfg.telemetry.query.provider {
207        crate::core::config::QueryAuthority::None => "none",
208        crate::core::config::QueryAuthority::Posthog => "posthog",
209        crate::core::config::QueryAuthority::Datadog => "datadog",
210    };
211    store.set_pull_state(&RemotePullState {
212        query_provider: label.into(),
213        cursor_json: page.next_cursor.unwrap_or_default(),
214        last_success_ms: Some(now_ms),
215    })?;
216    println!("pull: received {} item(s) (page)", page.items.len());
217    Ok(())
218}
219
220/// Replay stored events in a trailing window through configured telemetry exporters (no Kaizen POST).
221pub fn cmd_telemetry_push(
222    workspace: Option<&Path>,
223    all_workspaces: bool,
224    days: u32,
225    dry_run: bool,
226) -> Result<()> {
227    let roots = scope::resolve(workspace, all_workspaces)?;
228    let primary = roots
229        .first()
230        .cloned()
231        .ok_or_else(|| anyhow::anyhow!("no workspace roots"))?;
232    let cfg = config::load(&primary)?;
233    let Some(salt) = try_team_salt(&cfg.sync) else {
234        anyhow::bail!(
235            "telemetry push requires [sync].team_salt_hex (64 hex chars). \
236             The salt hashes session/workspace identifiers and drives payload redaction — \
237             not only for cloud sync."
238        );
239    };
240    let registry = telemetry::load_exporters(&cfg.telemetry);
241    if registry.is_empty() {
242        anyhow::bail!(
243            "no telemetry exporters to push to: enable a feature (e.g. telemetry-dev) and add \
244             [[telemetry.exporters]] with credentials; see `kaizen telemetry print-effective-config`."
245        );
246    }
247    let fail_open = cfg.telemetry.fail_open;
248    let team_id = cfg.sync.team_id.clone();
249    let end_ms = std::time::SystemTime::now()
250        .duration_since(std::time::UNIX_EPOCH)
251        .unwrap_or_default()
252        .as_millis() as u64;
253    let start_ms = end_ms.saturating_sub((days as u64).saturating_mul(86_400_000));
254
255    let mut total_events: u64 = 0;
256    let mut total_batches: u64 = 0;
257
258    for root in &roots {
259        let store = Store::open(&root.join(".kaizen/kaizen.db"))?;
260        let ws_key = root.to_string_lossy().to_string();
261        let rows = store.retro_events_in_window(&ws_key, start_ms, end_ms)?;
262        let wh = workspace_hash(&salt, root.as_path());
263        let outbound: Vec<_> = rows
264            .into_iter()
265            .map(|(session, ev)| {
266                let mut o = outbound_event_from_row(&ev, &session, &salt);
267                redact_payload(&mut o.payload, root.as_path(), &salt);
268                o
269            })
270            .collect();
271        let n = outbound.len() as u64;
272        total_events += n;
273        let batches = chunk_events_into_ingest_batches(team_id.clone(), wh, outbound, &cfg.sync)?;
274        let bcount = batches.len() as u64;
275        total_batches += bcount;
276        if dry_run {
277            eprintln!(
278                "telemetry push (dry-run): {} — {} event(s), {} batch(es)",
279                root.display(),
280                n,
281                bcount
282            );
283            continue;
284        }
285        for batch in batches {
286            registry
287                .fan_out(fail_open, &batch)
288                .with_context(|| format!("telemetry fan-out ({})", batch.kind_name()))?;
289        }
290        eprintln!(
291            "telemetry push: {} — sent {} event(s) in {} batch(es)",
292            root.display(),
293            n,
294            bcount
295        );
296    }
297
298    eprintln!(
299        "telemetry push: total {} event(s), {} batch(es) across {} workspace(s){}",
300        total_events,
301        total_batches,
302        roots.len(),
303        if dry_run { " (dry-run)" } else { "" }
304    );
305    Ok(())
306}
307
308/// Example JSON for canonical per-item export names (ingest + third-party mappers).
309pub fn cmd_telemetry_print_schema() -> Result<()> {
310    let v = serde_json::json!({
311        "kaizen_schema_version": KAIZEN_SCHEMA_VERSION,
312        "event_names": [
313            "kaizen.event",
314            "kaizen.tool_span",
315            "kaizen.repo_snapshot_chunk",
316            "kaizen.workspace_fact_snapshot"
317        ],
318        "note": "Full shapes: see sync::canonical::CanonicalItem and expand_ingest_batch (tests include golden JSON).",
319    });
320    println!("{}", serde_json::to_string_pretty(&v)?);
321    Ok(())
322}