Skip to main content

kaizen/shell/
telemetry.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! `kaizen telemetry` subcommands: configure, print-effective, doctor, pull, push, schema.
3
4use crate::core::config::{self, ExporterConfig, effective_redaction_salt};
5use crate::core::paths::kaizen_dir;
6use crate::core::project_identity::project_name;
7#[cfg(any(feature = "telemetry-datadog", feature = "telemetry-posthog"))]
8use crate::provider::TelemetryQueryProvider;
9use crate::provider::{PullWindow, from_config as provider_from_config};
10use crate::shell::cli::workspace_path;
11use crate::shell::scope;
12use crate::store::Store;
13use crate::store::remote_cache::{RemoteCacheStore, RemotePullState};
14use crate::sync::IngestExportBatch;
15use crate::sync::canonical::KAIZEN_SCHEMA_VERSION;
16use crate::sync::outbound::{EventsBatchBody, OutboundEvent, outbound_event_from_row};
17use crate::sync::redact::redact_payload;
18use crate::sync::smart::outbound_tool_span;
19use crate::sync::workspace_hash;
20use crate::sync::{chunk_events_into_ingest_batches, chunk_tool_spans_into_ingest_batches};
21use crate::telemetry::{self, DatadogResolved, OtlpResolved, PostHogResolved};
22use anyhow::{Context, Result};
23use std::io::{BufRead, Write};
24use std::path::{Path, PathBuf};
25
/// Inputs for [`cmd_telemetry_configure`] / [`cmd_telemetry_init`], usually filled from CLI flags.
///
/// Credential/location fields left as `None` are resolved from an environment variable, then an
/// interactive prompt (unless `non_interactive`), then a built-in default where one exists.
#[derive(Debug, Clone, Default)]
pub struct ConfigureOptions {
    /// Exporter type: `file`, `posthog`, `datadog`, `otlp` or `dev` (trimmed, lower-cased).
    /// `None` prompts on stdin, or is an error when `non_interactive` is set.
    pub exporter_type: Option<String>,
    /// Output path for the `file` exporter. `None` writes a commented-out `path` hint instead.
    pub path: Option<PathBuf>,
    /// API key for `datadog` / `posthog`; falls back to `DD_API_KEY` / `POSTHOG_API_KEY`.
    pub api_key: Option<String>,
    /// Datadog site; falls back to `DD_SITE`, default `datadoghq.com`.
    pub site: Option<String>,
    /// PostHog host; falls back to `POSTHOG_HOST`, default `https://us.i.posthog.com`.
    pub host: Option<String>,
    /// OTLP endpoint; falls back to `OTEL_EXPORTER_OTLP_ENDPOINT`, default `http://127.0.0.1:4318`.
    pub endpoint: Option<String>,
    /// Never prompt: missing values with a default take it, missing required values are errors.
    pub non_interactive: bool,
}
36
37/// Validating wizard: prompt for missing creds (or read from env / flags), `health`-check the
38/// resolved provider before touching `~/.kaizen/config.toml`, then append the exporter,
39/// idempotently set `[telemetry.query].provider` so `pull` works without extra config, and
40/// ensure a redaction salt exists. Failure to validate aborts with a clear error and writes
41/// nothing. Re-running for the same exporter type + key field is a no-op (no duplicate row).
42pub fn cmd_telemetry_configure(workspace: Option<&Path>, options: ConfigureOptions) -> Result<()> {
43    let ws = workspace_path(workspace)?;
44    let home = kaizen_dir().ok_or_else(|| anyhow::anyhow!("KAIZEN_HOME / HOME unset"))?;
45    let cfg_path = home.join("config.toml");
46    std::fs::create_dir_all(&home)?;
47
48    println!("Kaizen telemetry — optional sinks fan-out alongside Kaizen sync.");
49    let t = resolve_exporter_type(&options)?;
50    if t.is_empty() {
51        println!("Aborted.");
52        return Ok(());
53    }
54
55    let block = match t.as_str() {
56        "file" => file_exporter_block(options.path.as_deref()),
57        "dev" => "\n[[telemetry.exporters]]\ntype = \"dev\"\n".to_string(),
58        "datadog" => configure_datadog(&options)?,
59        "posthog" => configure_posthog(&options)?,
60        "otlp" => configure_otlp(&options)?,
61        _ => anyhow::bail!("unknown type (use file, posthog, datadog, otlp, dev)"),
62    };
63
64    let existing = std::fs::read_to_string(&cfg_path).unwrap_or_default();
65    if exporter_already_present(&existing, &t) {
66        println!(
67            "Skipped: a `[[telemetry.exporters]]` row of type `{t}` already exists in {}. \
68             Edit the file directly to change credentials.",
69            cfg_path.display()
70        );
71    } else {
72        append_block(&cfg_path, &block)?;
73    }
74    ensure_query_authority(&cfg_path, &t)?;
75
76    let cfg = config::load(&ws)?;
77    let _ = effective_redaction_salt(&cfg.sync, &home).context(
78        "ensure redaction salt (configured `[sync].team_salt_hex` or auto-generated `local_salt.hex`)",
79    )?;
80    println!("Wrote {}.", cfg_path.display());
81    println!("Next: `kaizen telemetry test` to send one synthetic event to every configured sink.");
82    Ok(())
83}
84
/// True when the file already contains a `[[telemetry.exporters]]` row whose `type` is `t`.
///
/// Uses a cheap line scan rather than a full TOML parse. This keeps the wizard side-effect-free
/// when a user hand-edited the file with comments or whitespace we cannot round-trip. Because
/// such files are hand-edited, the scan accepts the equivalent TOML spellings of the header and
/// key: `[[ telemetry.exporters ]]`, `type="x"`, `type = 'x'`, and a trailing `# comment`.
/// Commented-out lines (`# type = "x"`) never match.
pub(crate) fn exporter_already_present(toml_text: &str, t: &str) -> bool {
    /// Returns the value of a `type = "<v>"` / `type = '<v>'` assignment, if `line` is one.
    fn type_value(line: &str) -> Option<&str> {
        let (key, value) = line.split_once('=')?;
        if key.trim().trim_matches('"') != "type" {
            return None;
        }
        let value = value.trim_start();
        let quote = value.chars().next().filter(|c| matches!(*c, '"' | '\''))?;
        let inner = &value[quote.len_utf8()..];
        inner.find(quote).map(|end| &inner[..end])
    }

    let mut in_exporter_block = false;
    for line in toml_text.lines() {
        let l = line.trim();
        if let Some(rest) = l.strip_prefix("[[") {
            // Array-of-tables header: only `telemetry.exporters` opens a block we scan.
            in_exporter_block = rest
                .split_once("]]")
                .is_some_and(|(name, _)| name.trim() == "telemetry.exporters");
            continue;
        }
        if l.starts_with('[') {
            // Any other table header ends the current exporter row.
            in_exporter_block = false;
            continue;
        }
        if in_exporter_block && type_value(l) == Some(t) {
            return true;
        }
    }
    false
}
107
108/// Append `[telemetry.query] provider = "<authority>"` only if the file has no `[telemetry.query]`
109/// table yet. Never overrides an existing user choice; only sets one for `posthog` / `datadog`.
110fn ensure_query_authority(path: &Path, t: &str) -> Result<()> {
111    let authority = match t {
112        "datadog" => "datadog",
113        "posthog" => "posthog",
114        _ => return Ok(()),
115    };
116    let existing = std::fs::read_to_string(path).unwrap_or_default();
117    if existing.lines().any(|l| l.trim() == "[telemetry.query]") {
118        return Ok(());
119    }
120    let block = format!("\n[telemetry.query]\nprovider = \"{authority}\"\n");
121    append_block(path, &block)
122}
123
124fn resolve_exporter_type(opts: &ConfigureOptions) -> Result<String> {
125    if let Some(t) = &opts.exporter_type {
126        return Ok(t.trim().to_lowercase());
127    }
128    if opts.non_interactive {
129        anyhow::bail!("--non-interactive requires --type=<file|posthog|datadog|otlp|dev>");
130    }
131    print!("Type `file`, `posthog`, `datadog`, `otlp`, or `dev` (empty to abort): ");
132    std::io::stdout().flush()?;
133    let mut line = String::new();
134    std::io::stdin().lock().read_line(&mut line)?;
135    Ok(line.trim().to_lowercase())
136}
137
138fn configure_datadog(opts: &ConfigureOptions) -> Result<String> {
139    let api_key = read_secret(
140        "Datadog API key (DD_API_KEY, 32 hex chars — NOT the `ddapp_*` Application Key; \
141         create one at Org Settings > API Keys)",
142        opts.api_key.clone(),
143        "DD_API_KEY",
144        opts.non_interactive,
145    )?;
146    if let Some(rejected) = reject_obvious_app_key(&api_key) {
147        anyhow::bail!("{rejected}");
148    }
149    let site = read_value(
150        "Datadog site",
151        opts.site.clone(),
152        "DD_SITE",
153        Some("datadoghq.com".into()),
154        opts.non_interactive,
155    )?;
156    health_check_datadog(&api_key, &site).context(
157        "Datadog credentials rejected (DD-API-KEY /api/v1/validate failed); not writing TOML",
158    )?;
159    Ok(datadog_block(&api_key, &site))
160}
161
162fn configure_posthog(opts: &ConfigureOptions) -> Result<String> {
163    let key = read_secret(
164        "PostHog project API key (phc_...)",
165        opts.api_key.clone(),
166        "POSTHOG_API_KEY",
167        opts.non_interactive,
168    )?;
169    let host = read_value(
170        "PostHog host",
171        opts.host.clone(),
172        "POSTHOG_HOST",
173        Some("https://us.i.posthog.com".into()),
174        opts.non_interactive,
175    )?;
176    health_check_posthog(&host).context("PostHog host unreachable; not writing TOML")?;
177    Ok(format!(
178        "\n[[telemetry.exporters]]\ntype = \"posthog\"\nproject_api_key = \"{}\"\nhost = \"{}\"\n",
179        key.replace('\\', "\\\\").replace('"', "\\\""),
180        host.replace('\\', "\\\\").replace('"', "\\\""),
181    ))
182}
183
184fn configure_otlp(opts: &ConfigureOptions) -> Result<String> {
185    let endpoint = read_value(
186        "OTLP endpoint",
187        opts.endpoint.clone(),
188        "OTEL_EXPORTER_OTLP_ENDPOINT",
189        Some("http://127.0.0.1:4318".into()),
190        opts.non_interactive,
191    )?;
192    Ok(format!(
193        "\n[[telemetry.exporters]]\ntype = \"otlp\"\nendpoint = \"{}\"\n",
194        endpoint.replace('\\', "\\\\").replace('"', "\\\""),
195    ))
196}
197
/// Local sanity check that catches a Datadog Application Key pasted where an API Key belongs.
///
/// Application Keys start with `ddapp_`, and sending one as `DD-API-KEY` always 403s. Returns a
/// user-facing hint naming both key types (so the user can tell them apart) when `value` has
/// that prefix, and `None` otherwise. This runs before any network round-trip.
pub(crate) fn reject_obvious_app_key(value: &str) -> Option<&'static str> {
    const APP_KEY_HINT: &str = "looks like a Datadog Application Key (`ddapp_*`); the wizard needs the API Key \
             (32 hex chars). Generate one at Org Settings > API Keys, then rerun.";
    value.starts_with("ddapp_").then_some(APP_KEY_HINT)
}
211
/// Render a `[[telemetry.exporters]]` row for Datadog.
///
/// Backslashes and double quotes in `api_key` / `site` are escaped so the values stay valid
/// TOML basic strings.
fn datadog_block(api_key: &str, site: &str) -> String {
    let quote = |raw: &str| raw.replace('\\', "\\\\").replace('"', "\\\"");
    let mut row = String::from("\n[[telemetry.exporters]]\ntype = \"datadog\"\n");
    row.push_str(&format!("api_key = \"{}\"\n", quote(api_key)));
    row.push_str(&format!("site = \"{}\"\n", quote(site)));
    row
}
219
220fn append_block(path: &Path, block: &str) -> Result<()> {
221    let mut f = std::fs::OpenOptions::new()
222        .create(true)
223        .append(true)
224        .open(path)?;
225    if f.metadata()?.len() > 0 {
226        f.write_all(b"\n")?;
227    }
228    f.write_all(block.as_bytes())?;
229    Ok(())
230}
231
232fn read_secret(
233    prompt: &str,
234    flag: Option<String>,
235    env_key: &str,
236    non_interactive: bool,
237) -> Result<String> {
238    if let Some(v) = flag.filter(|s| !s.is_empty()) {
239        return Ok(v);
240    }
241    if let Ok(v) = std::env::var(env_key)
242        && !v.is_empty()
243    {
244        return Ok(v);
245    }
246    if non_interactive {
247        anyhow::bail!("missing {env_key}: set the env var or pass --api-key");
248    }
249    print!("{prompt}: ");
250    std::io::stdout().flush()?;
251    let mut line = String::new();
252    std::io::stdin().lock().read_line(&mut line)?;
253    let v = line.trim().to_string();
254    if v.is_empty() {
255        anyhow::bail!("{env_key} is required");
256    }
257    Ok(v)
258}
259
260fn read_value(
261    prompt: &str,
262    flag: Option<String>,
263    env_key: &str,
264    default: Option<String>,
265    non_interactive: bool,
266) -> Result<String> {
267    if let Some(v) = flag.filter(|s| !s.is_empty()) {
268        return Ok(v);
269    }
270    if let Ok(v) = std::env::var(env_key)
271        && !v.is_empty()
272    {
273        return Ok(v);
274    }
275    if non_interactive {
276        return default.ok_or_else(|| anyhow::anyhow!("missing {env_key}; set env or pass flag"));
277    }
278    let hint = default
279        .as_deref()
280        .map(|d| format!(" [{d}]"))
281        .unwrap_or_default();
282    print!("{prompt}{hint}: ");
283    std::io::stdout().flush()?;
284    let mut line = String::new();
285    std::io::stdin().lock().read_line(&mut line)?;
286    let v = line.trim().to_string();
287    if v.is_empty() {
288        return default.ok_or_else(|| anyhow::anyhow!("{env_key} is required"));
289    }
290    Ok(v)
291}
292
293fn health_check_datadog(api_key: &str, site: &str) -> Result<()> {
294    let r = DatadogResolved {
295        site: site.to_string(),
296        api_key: api_key.to_string(),
297        app_key: None,
298    };
299    #[cfg(feature = "telemetry-datadog")]
300    {
301        let c = crate::provider::datadog::DatadogQueryClient::new(&r);
302        c.health()
303    }
304    #[cfg(not(feature = "telemetry-datadog"))]
305    {
306        let _ = &r;
307        anyhow::bail!("rebuild with `--features telemetry-datadog` to validate Datadog");
308    }
309}
310
311fn health_check_posthog(host: &str) -> Result<()> {
312    let r = PostHogResolved {
313        host: host.to_string(),
314        project_api_key: String::new(),
315    };
316    #[cfg(feature = "telemetry-posthog")]
317    {
318        let c = crate::provider::posthog::PostHogQueryClient::new(&r);
319        c.health()
320    }
321    #[cfg(not(feature = "telemetry-posthog"))]
322    {
323        let _ = &r;
324        anyhow::bail!("rebuild with `--features telemetry-posthog` to validate PostHog");
325    }
326}
327
/// Render an enabled `file` exporter row.
///
/// With `path`, it is written (escaped for a TOML basic string). Without one, a commented-out
/// `path` hint documents the per-workspace default.
fn file_exporter_block(path: Option<&Path>) -> String {
    const HEADER: &str = "\n[[telemetry.exporters]]\ntype = \"file\"\nenabled = true\n";
    let path_line = match path {
        Some(p) => {
            let escaped = p
                .to_string_lossy()
                .replace('\\', "\\\\")
                .replace('"', "\\\"");
            format!("path = \"{escaped}\"\n")
        }
        None => String::from(
            "# path = \"telemetry.ndjson\"   # optional; default .kaizen/telemetry.ndjson under each workspace\n",
        ),
    };
    format!("{HEADER}{path_line}")
}
350
351/// Redacted: show which env/Toml fields are visible for `telemetry` sinks.
352pub fn print_effective_config_text(workspace: Option<&Path>) -> Result<String> {
353    let ws = workspace_path(workspace)?;
354    let cfg = config::load(&ws)?;
355    use std::fmt::Write;
356    let mut s = String::new();
357    writeln!(&mut s, "telemetry.fail_open: {}", cfg.telemetry.fail_open).unwrap();
358    for (i, e) in cfg.telemetry.exporters.iter().enumerate() {
359        match e {
360            ExporterConfig::None => writeln!(&mut s, "[{i}] type=none (ignored)").unwrap(),
361            ExporterConfig::File { enabled, path } => {
362                let p = path
363                    .as_deref()
364                    .map(|p| p.to_string())
365                    .unwrap_or_else(|| "<workspace>/.kaizen/telemetry.ndjson".into());
366                writeln!(&mut s, "[{i}] type=file enabled={enabled} path={p}").unwrap();
367            }
368            ExporterConfig::Dev { enabled } => {
369                writeln!(&mut s, "[{i}] type=dev enabled={enabled}").unwrap();
370            }
371            ExporterConfig::PostHog { .. } => {
372                let line = if let Some(r) = PostHogResolved::from_config(e) {
373                    format!(
374                        "[{i}] type=posthog host={} key=<redacted len {}>",
375                        r.host,
376                        r.project_api_key.len()
377                    )
378                } else {
379                    format!(
380                        "[{i}] type=posthog (unresolved: set POSTHOG_API_KEY or project_api_key)"
381                    )
382                };
383                writeln!(&mut s, "{line}").unwrap();
384            }
385            ExporterConfig::Datadog { .. } => {
386                let line = if let Some(r) = DatadogResolved::from_config(e) {
387                    format!(
388                        "[{i}] type=datadog site={} key=<redacted len {}>",
389                        r.site,
390                        r.api_key.len()
391                    )
392                } else {
393                    format!("[{i}] type=datadog (unresolved: set DD_API_KEY or api_key in TOML)")
394                };
395                writeln!(&mut s, "{line}").unwrap();
396            }
397            ExporterConfig::Otlp { .. } => {
398                let line = if let Some(r) = OtlpResolved::from_config(e) {
399                    format!("[{i}] type=otlp endpoint={}", r.endpoint)
400                } else {
401                    format!("[{i}] type=otlp (unresolved: OTEL_EXPORTER_OTLP_ENDPOINT)")
402                };
403                writeln!(&mut s, "{line}").unwrap();
404            }
405        }
406    }
407    if cfg.telemetry.exporters.is_empty() {
408        writeln!(&mut s, "(no [[telemetry.exporters]] rows)").unwrap();
409    }
410    Ok(s)
411}
412
413pub fn cmd_telemetry_print_effective(workspace: Option<&Path>) -> Result<()> {
414    print!("{}", print_effective_config_text(workspace)?);
415    Ok(())
416}
417
/// Alias of [`cmd_telemetry_configure`]: forwards `workspace` and `options` unchanged and
/// returns its result, errors included.
pub fn cmd_telemetry_init(workspace: Option<&Path>, options: ConfigureOptions) -> Result<()> {
    cmd_telemetry_configure(workspace, options)
}
422
423/// Resolve config, run provider `health` when available, show redacted exporter view.
424pub fn cmd_telemetry_doctor(workspace: Option<&Path>) -> Result<()> {
425    let ws = workspace_path(workspace)?;
426    let cfg = config::load(&ws)?;
427    println!("telemetry.fail_open: {}", cfg.telemetry.fail_open);
428    println!(
429        "telemetry.query.cache_ttl_seconds: {}",
430        cfg.telemetry.query.cache_ttl_seconds
431    );
432    match cfg.telemetry.query.provider {
433        crate::core::config::QueryAuthority::None => println!("telemetry.query.provider: none"),
434        crate::core::config::QueryAuthority::Posthog => {
435            println!("telemetry.query.provider: posthog");
436        }
437        crate::core::config::QueryAuthority::Datadog => {
438            println!("telemetry.query.provider: datadog");
439        }
440    }
441    if let Some(p) = provider_from_config(&cfg.telemetry) {
442        match p.health() {
443            Ok(()) => println!("provider health: ok (schema: {})", p.schema_version()),
444            Err(e) => eprintln!("provider health: {e}"),
445        }
446    } else {
447        println!("query provider: (not configured or features disabled; pull disabled)");
448    }
449    println!("\n{}", print_effective_config_text(Some(&ws))?);
450    println!("\nOTLP: export only — no query/pull in v1.");
451    Ok(())
452}
453
454/// Run one page of `pull` and refresh `remote_pull_state` (payload import when APIs are wired).
455pub fn cmd_telemetry_pull(workspace: Option<&Path>, days: u32) -> Result<()> {
456    let ws = workspace_path(workspace)?;
457    let cfg = config::load(&ws)?;
458    let p = provider_from_config(&cfg.telemetry).ok_or_else(|| {
459        anyhow::anyhow!(
460            "no query provider resolved. Either:\n  \
461             1. Run `kaizen telemetry configure --type=datadog` (or `posthog`) so the wizard \
462             writes both `[[telemetry.exporters]]` and `[telemetry.query]`, OR\n  \
463             2. Set `[telemetry.query].provider = \"datadog\"` in `~/.kaizen/config.toml` and \
464             ensure DD_API_KEY is reachable (TOML row or env)."
465        )
466    })?;
467    let store = Store::open(&crate::core::workspace::db_path(&ws)?)?;
468    let page = p.pull(PullWindow { days }, None)?;
469    if !cfg.sync.team_id.trim().is_empty()
470        && let Some(ctx) = crate::sync::ingest_ctx(&cfg, ws.to_path_buf())
471        && let Some(wh) = crate::sync::smart::workspace_hash_for(&ctx)
472    {
473        match crate::provider::import_pull_page_to_remote(&store, &cfg.sync.team_id, &wh, &page) {
474            Ok(n) if n > 0 => {
475                tracing::debug!(n, "remote_events: imported from provider pull (cmd)")
476            }
477            _ => {}
478        }
479    }
480    let now_ms = std::time::SystemTime::now()
481        .duration_since(std::time::UNIX_EPOCH)
482        .unwrap_or_default()
483        .as_millis() as i64;
484    let label = match cfg.telemetry.query.provider {
485        crate::core::config::QueryAuthority::None => "none",
486        crate::core::config::QueryAuthority::Posthog => "posthog",
487        crate::core::config::QueryAuthority::Datadog => "datadog",
488    };
489    store.set_pull_state(&RemotePullState {
490        query_provider: label.into(),
491        cursor_json: page.next_cursor.unwrap_or_default(),
492        last_success_ms: Some(now_ms),
493    })?;
494    println!("pull: received {} item(s) (page)", page.items.len());
495    Ok(())
496}
497
498/// Replay stored events in a trailing window through configured telemetry exporters (no Kaizen POST).
499pub fn cmd_telemetry_push(
500    workspace: Option<&Path>,
501    all_workspaces: bool,
502    days: u32,
503    dry_run: bool,
504) -> Result<()> {
505    let roots = scope::resolve(workspace, all_workspaces)?;
506    let primary = roots
507        .first()
508        .cloned()
509        .ok_or_else(|| anyhow::anyhow!("no workspace roots"))?;
510    let cfg = config::load(&primary)?;
511    let home = kaizen_dir().ok_or_else(|| anyhow::anyhow!("KAIZEN_HOME / HOME unset"))?;
512    let salt = effective_redaction_salt(&cfg.sync, &home).context(
513        "resolve redaction salt (configured `[sync].team_salt_hex` or auto-generated `local_salt.hex`)",
514    )?;
515    let registry = telemetry::load_exporters(&cfg.telemetry, primary.as_path());
516    if registry.is_empty() {
517        anyhow::bail!(
518            "no telemetry exporters to push to: add [[telemetry.exporters]] (e.g. type = \"file\" \
519             needs no extra feature; PostHog/Datadog/OTLP need build features); see \
520             `kaizen telemetry print-effective-config`."
521        );
522    }
523    let fail_open = cfg.telemetry.fail_open;
524    let team_id = cfg.sync.team_id.clone();
525    let end_ms = std::time::SystemTime::now()
526        .duration_since(std::time::UNIX_EPOCH)
527        .unwrap_or_default()
528        .as_millis() as u64;
529    let start_ms = end_ms.saturating_sub((days as u64).saturating_mul(86_400_000));
530
531    let mut total_events: u64 = 0;
532    let mut total_spans: u64 = 0;
533    let mut total_batches: u64 = 0;
534    let intake_warning_threshold_ms = end_ms.saturating_sub(18 * 3_600_000);
535    let mut total_stale: u64 = 0;
536
537    for root in &roots {
538        let store = Store::open(&crate::core::workspace::db_path(root)?)?;
539        let ws_key = root.to_string_lossy().to_string();
540        let wh = workspace_hash(&salt, root.as_path());
541        let project = project_name(root.as_path());
542
543        let event_rows = store.retro_events_in_window(&ws_key, start_ms, end_ms)?;
544        let stale_events = event_rows
545            .iter()
546            .filter(|(_, ev)| ev.ts_ms < intake_warning_threshold_ms)
547            .count() as u64;
548        let outbound_events: Vec<_> = event_rows
549            .into_iter()
550            .map(|(session, ev)| {
551                let mut o = outbound_event_from_row(&ev, &session, &salt);
552                redact_payload(&mut o.payload, root.as_path(), &salt);
553                o
554            })
555            .collect();
556        let n_events = outbound_events.len() as u64;
557        let event_batches = chunk_events_into_ingest_batches(
558            team_id.clone(),
559            wh.clone(),
560            project.clone(),
561            outbound_events,
562            &cfg.sync,
563        )?;
564
565        let span_rows = store.tool_spans_sync_rows_in_window(&ws_key, start_ms, end_ms)?;
566        let stale_spans = span_rows
567            .iter()
568            .filter(|r| {
569                r.started_at_ms
570                    .or(r.ended_at_ms)
571                    .map(|t| t < intake_warning_threshold_ms)
572                    .unwrap_or(false)
573            })
574            .count() as u64;
575        let outbound_spans: Vec<_> = span_rows
576            .iter()
577            .map(|r| outbound_tool_span(r, &salt))
578            .collect();
579        let n_spans = outbound_spans.len() as u64;
580        let span_batches = chunk_tool_spans_into_ingest_batches(
581            team_id.clone(),
582            wh,
583            project,
584            outbound_spans,
585            &cfg.sync,
586        )?;
587
588        let bcount = (event_batches.len() + span_batches.len()) as u64;
589        total_events += n_events;
590        total_spans += n_spans;
591        total_batches += bcount;
592        total_stale += stale_events + stale_spans;
593
594        if dry_run {
595            eprintln!(
596                "telemetry push (dry-run): {} — {} event(s), {} span(s), {} batch(es)",
597                root.display(),
598                n_events,
599                n_spans,
600                bcount
601            );
602            continue;
603        }
604        for batch in event_batches.into_iter().chain(span_batches) {
605            registry
606                .fan_out(fail_open, &batch)
607                .with_context(|| format!("telemetry fan-out ({})", batch.kind_name()))?;
608        }
609        eprintln!(
610            "telemetry push: {} — sent {} event(s), {} span(s) in {} batch(es)",
611            root.display(),
612            n_events,
613            n_spans,
614            bcount
615        );
616    }
617
618    eprintln!(
619        "telemetry push: total {} event(s), {} span(s), {} batch(es) across {} workspace(s){}",
620        total_events,
621        total_spans,
622        total_batches,
623        roots.len(),
624        if dry_run { " (dry-run)" } else { "" }
625    );
626    if total_stale > 0 {
627        eprintln!(
628            "note: {} item(s) have a `timestamp` older than 18h. Datadog Logs intake silently \
629             drops these (organization default). PostHog/OTLP/file sinks accept them without \
630             change. Use `--days N` with N <= 1 to skip stale items.",
631            total_stale
632        );
633    }
634    Ok(())
635}
636
637/// Send one synthetic redacted event through every configured exporter, report ok/fail per
638/// sink. Pure observability: no SQLite read, no outbox enqueue, no Kaizen POST.
639pub fn cmd_telemetry_test(workspace: Option<&Path>) -> Result<()> {
640    let ws = workspace_path(workspace)?;
641    let cfg = config::load(&ws)?;
642    let registry = telemetry::load_exporters(&cfg.telemetry, ws.as_path());
643    if registry.is_empty() {
644        anyhow::bail!(
645            "no `[[telemetry.exporters]]` rows resolved; run `kaizen telemetry configure --type=...` first"
646        );
647    }
648    let batch = synthetic_batch(&cfg.sync.team_id);
649    println!("telemetry test: sending one synthetic event to each configured sink ...");
650    let mut all_ok = true;
651    for name in registry.exporter_names() {
652        match registry.export_one(&name, &batch) {
653            Ok(()) => println!("  [{name}] ok"),
654            Err(e) => {
655                all_ok = false;
656                println!("  [{name}] FAIL: {e:#}");
657            }
658        }
659    }
660    if !all_ok {
661        anyhow::bail!("one or more exporters failed (see above)");
662    }
663    println!("telemetry test: all exporters accepted the synthetic event.");
664    Ok(())
665}
666
667fn synthetic_batch(team_id: &str) -> IngestExportBatch {
668    let now_ms = std::time::SystemTime::now()
669        .duration_since(std::time::UNIX_EPOCH)
670        .map(|d| d.as_millis() as u64)
671        .unwrap_or(0);
672    IngestExportBatch::Events(EventsBatchBody {
673        team_id: team_id.to_string(),
674        workspace_hash: "blake3:test-workspace".into(),
675        project_name: Some("telemetry-test".into()),
676        events: vec![OutboundEvent {
677            session_id_hash: "blake3:test-session".into(),
678            event_seq: 0,
679            ts_ms: now_ms,
680            agent: "kaizen".into(),
681            model: "synthetic".into(),
682            kind: "lifecycle".into(),
683            source: "tail".into(),
684            tool: None,
685            tool_call_id: None,
686            tokens_in: Some(0),
687            tokens_out: Some(0),
688            reasoning_tokens: None,
689            cost_usd_e6: None,
690            payload: serde_json::json!({"kaizen.telemetry_test": true}),
691        }],
692    })
693}
694
695/// Example JSON for canonical per-item export names (ingest + third-party mappers).
696pub fn cmd_telemetry_print_schema() -> Result<()> {
697    let v = serde_json::json!({
698        "kaizen_schema_version": KAIZEN_SCHEMA_VERSION,
699        "event_names": [
700            "kaizen.event",
701            "kaizen.tool_span",
702            "kaizen.repo_snapshot_chunk",
703            "kaizen.workspace_fact_snapshot"
704        ],
705        "note": "Full shapes: see sync::canonical::CanonicalItem and expand_ingest_batch (tests include golden JSON).",
706    });
707    println!("{}", serde_json::to_string_pretty(&v)?);
708    Ok(())
709}
710
#[cfg(test)]
mod tests {
    use super::*;

    /// Write `contents` to `config.toml` in a fresh temp dir. Keep the returned `TempDir` guard
    /// alive for as long as the path is used.
    fn config_with(contents: &str) -> (tempfile::TempDir, PathBuf) {
        let dir = tempfile::TempDir::new().unwrap();
        let path = dir.path().join("config.toml");
        std::fs::write(&path, contents).unwrap();
        (dir, path)
    }

    #[test]
    fn exporter_already_present_detects_existing_datadog_row() {
        let toml = concat!(
            "\n",
            "[[telemetry.exporters]]\n",
            "type = \"datadog\"\n",
            "api_key = \"abc\"\n",
            "site = \"us5.datadoghq.com\"\n",
        );
        assert!(exporter_already_present(toml, "datadog"));
        assert!(!exporter_already_present(toml, "posthog"));
    }

    #[test]
    fn exporter_already_present_handles_other_tables_between() {
        let toml = concat!(
            "\n",
            "[[telemetry.exporters]]\n",
            "type = \"file\"\n",
            "enabled = true\n",
            "\n",
            "[telemetry.query]\n",
            "provider = \"datadog\"\n",
            "\n",
            "[[telemetry.exporters]]\n",
            "type = \"datadog\"\n",
            "api_key = \"abc\"\n",
        );
        for present in ["file", "datadog"] {
            assert!(exporter_already_present(toml, present), "{present} should be detected");
        }
        assert!(!exporter_already_present(toml, "otlp"));
    }

    #[test]
    fn reject_obvious_app_key_catches_ddapp_prefix() {
        assert!(reject_obvious_app_key("ddapp_FjBvwn3GKN8C6jiqltnbK0UHdUEs3gmlP1").is_some());
        assert!(reject_obvious_app_key("not_ddapp_plain_api_key_value").is_none());
    }

    #[test]
    fn ensure_query_authority_appends_when_missing() {
        let (_dir, path) =
            config_with("[[telemetry.exporters]]\ntype = \"datadog\"\napi_key = \"abc\"\n");
        ensure_query_authority(&path, "datadog").unwrap();
        let written = std::fs::read_to_string(&path).unwrap();
        assert!(written.contains("[telemetry.query]"));
        assert!(written.contains("provider = \"datadog\""));
    }

    #[test]
    fn ensure_query_authority_idempotent_when_present() {
        let original = "[[telemetry.exporters]]\ntype = \"datadog\"\n\n[telemetry.query]\nprovider = \"posthog\"\n";
        let (_dir, path) = config_with(original);
        ensure_query_authority(&path, "datadog").unwrap();
        // The user's existing posthog choice must NOT be overridden by the wizard.
        assert_eq!(std::fs::read_to_string(&path).unwrap(), original);
    }
}