Skip to main content

kaizen/shell/
telemetry.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
//! `kaizen telemetry` subcommands: configure (alias `init`), print-effective, doctor, pull, push, test, schema.
3
4use crate::core::config::{self, ExporterConfig, effective_redaction_salt};
5use crate::core::paths::kaizen_dir;
6use crate::provider::{PullWindow, TelemetryQueryProvider, from_config as provider_from_config};
7use crate::shell::cli::workspace_path;
8use crate::shell::scope;
9use crate::store::Store;
10use crate::store::remote_cache::{RemoteCacheStore, RemotePullState};
11use crate::sync::IngestExportBatch;
12use crate::sync::canonical::KAIZEN_SCHEMA_VERSION;
13use crate::sync::outbound::{EventsBatchBody, OutboundEvent, outbound_event_from_row};
14use crate::sync::redact::redact_payload;
15use crate::sync::smart::outbound_tool_span;
16use crate::sync::workspace_hash;
17use crate::sync::{chunk_events_into_ingest_batches, chunk_tool_spans_into_ingest_batches};
18use crate::telemetry::{self, DatadogResolved, OtlpResolved, PostHogResolved};
19use anyhow::{Context, Result};
20use std::io::{BufRead, Write};
21use std::path::{Path, PathBuf};
22
/// Inputs for the `configure` / `init` wizard. Every field is optional; anything left `None`
/// is looked up from the environment or prompted for (unless `non_interactive` is set).
#[derive(Debug, Clone, Default)]
pub struct ConfigureOptions {
    /// Exporter kind (`file`, `posthog`, `datadog`, `otlp`, `dev`); trimmed and lowercased.
    pub exporter_type: Option<String>,
    /// Output path for the `file` exporter.
    pub path: Option<PathBuf>,
    /// API key for Datadog (`DD_API_KEY`) or PostHog (`POSTHOG_API_KEY`).
    pub api_key: Option<String>,
    /// Datadog site (`DD_SITE`).
    pub site: Option<String>,
    /// PostHog host (`POSTHOG_HOST`).
    pub host: Option<String>,
    /// OTLP endpoint (`OTEL_EXPORTER_OTLP_ENDPOINT`).
    pub endpoint: Option<String>,
    /// Never prompt on stdin; missing required inputs become errors instead.
    pub non_interactive: bool,
}
33
34/// Validating wizard: prompt for missing creds (or read from env / flags), `health`-check the
35/// resolved provider before touching `~/.kaizen/config.toml`, then append the exporter,
36/// idempotently set `[telemetry.query].provider` so `pull` works without extra config, and
37/// ensure a redaction salt exists. Failure to validate aborts with a clear error and writes
38/// nothing. Re-running for the same exporter type + key field is a no-op (no duplicate row).
39pub fn cmd_telemetry_configure(workspace: Option<&Path>, options: ConfigureOptions) -> Result<()> {
40    let ws = workspace_path(workspace)?;
41    let home = kaizen_dir().ok_or_else(|| anyhow::anyhow!("KAIZEN_HOME / HOME unset"))?;
42    let cfg_path = home.join("config.toml");
43    std::fs::create_dir_all(&home)?;
44
45    println!("Kaizen telemetry — optional sinks fan-out alongside Kaizen sync.");
46    let t = resolve_exporter_type(&options)?;
47    if t.is_empty() {
48        println!("Aborted.");
49        return Ok(());
50    }
51
52    let block = match t.as_str() {
53        "file" => file_exporter_block(options.path.as_deref()),
54        "dev" => "\n[[telemetry.exporters]]\ntype = \"dev\"\n".to_string(),
55        "datadog" => configure_datadog(&options)?,
56        "posthog" => configure_posthog(&options)?,
57        "otlp" => configure_otlp(&options)?,
58        _ => anyhow::bail!("unknown type (use file, posthog, datadog, otlp, dev)"),
59    };
60
61    let existing = std::fs::read_to_string(&cfg_path).unwrap_or_default();
62    if exporter_already_present(&existing, &t) {
63        println!(
64            "Skipped: a `[[telemetry.exporters]]` row of type `{t}` already exists in {}. \
65             Edit the file directly to change credentials.",
66            cfg_path.display()
67        );
68    } else {
69        append_block(&cfg_path, &block)?;
70    }
71    ensure_query_authority(&cfg_path, &t)?;
72
73    let cfg = config::load(&ws)?;
74    let _ = effective_redaction_salt(&cfg.sync, &home).context(
75        "ensure redaction salt (configured `[sync].team_salt_hex` or auto-generated `local_salt.hex`)",
76    )?;
77    println!("Wrote {}.", cfg_path.display());
78    println!("Next: `kaizen telemetry test` to send one synthetic event to every configured sink.");
79    Ok(())
80}
81
/// True when the file already contains a `[[telemetry.exporters]]` row whose `type` is `t`.
///
/// Cheap line scan rather than a full TOML parse: keeps the wizard side-effect-free if a user
/// hand-edited the file with comments/whitespace we cannot round-trip. The scan accepts the
/// spellings TOML allows for the same row, so re-running the wizard does not append a
/// duplicate:
/// - whitespace inside the header (`[[ telemetry.exporters ]]`);
/// - no spaces around `=` (`type="datadog"`);
/// - single-quoted literal strings (`type = 'datadog'`);
/// - trailing comments.
///
/// Commented-out lines (`# type = "x"`) never match, and any other table header ends the row.
pub(crate) fn exporter_already_present(toml_text: &str, t: &str) -> bool {
    let mut in_exporter_block = false;
    for line in toml_text.lines() {
        let l = line.trim();
        if l.starts_with('[') {
            // Every header closes the current row; only the array-of-tables header opens one.
            let header: String = l.chars().filter(|c| !c.is_whitespace()).collect();
            in_exporter_block = header.starts_with("[[telemetry.exporters]]");
            continue;
        }
        if in_exporter_block && exporter_type_value(l) == Some(t) {
            return true;
        }
    }
    false
}

/// Extract the quoted value of a `type = "..."` / `type = '...'` key/value line, if any.
/// Text after the closing quote (e.g. a `# comment`) is ignored.
fn exporter_type_value(line: &str) -> Option<&str> {
    let (key, value) = line.split_once('=')?;
    if key.trim() != "type" {
        return None;
    }
    let value = value.trim_start();
    let quote = value.chars().next().filter(|&c| c == '"' || c == '\'')?;
    let rest = &value[quote.len_utf8()..];
    rest.find(quote).map(|end| &rest[..end])
}
104
105/// Append `[telemetry.query] provider = "<authority>"` only if the file has no `[telemetry.query]`
106/// table yet. Never overrides an existing user choice; only sets one for `posthog` / `datadog`.
107fn ensure_query_authority(path: &Path, t: &str) -> Result<()> {
108    let authority = match t {
109        "datadog" => "datadog",
110        "posthog" => "posthog",
111        _ => return Ok(()),
112    };
113    let existing = std::fs::read_to_string(path).unwrap_or_default();
114    if existing.lines().any(|l| l.trim() == "[telemetry.query]") {
115        return Ok(());
116    }
117    let block = format!("\n[telemetry.query]\nprovider = \"{authority}\"\n");
118    append_block(path, &block)
119}
120
121fn resolve_exporter_type(opts: &ConfigureOptions) -> Result<String> {
122    if let Some(t) = &opts.exporter_type {
123        return Ok(t.trim().to_lowercase());
124    }
125    if opts.non_interactive {
126        anyhow::bail!("--non-interactive requires --type=<file|posthog|datadog|otlp|dev>");
127    }
128    print!("Type `file`, `posthog`, `datadog`, `otlp`, or `dev` (empty to abort): ");
129    std::io::stdout().flush()?;
130    let mut line = String::new();
131    std::io::stdin().lock().read_line(&mut line)?;
132    Ok(line.trim().to_lowercase())
133}
134
135fn configure_datadog(opts: &ConfigureOptions) -> Result<String> {
136    let api_key = read_secret(
137        "Datadog API key (DD_API_KEY, 32 hex chars — NOT the `ddapp_*` Application Key; \
138         create one at Org Settings > API Keys)",
139        opts.api_key.clone(),
140        "DD_API_KEY",
141        opts.non_interactive,
142    )?;
143    if let Some(rejected) = reject_obvious_app_key(&api_key) {
144        anyhow::bail!("{rejected}");
145    }
146    let site = read_value(
147        "Datadog site",
148        opts.site.clone(),
149        "DD_SITE",
150        Some("datadoghq.com".into()),
151        opts.non_interactive,
152    )?;
153    health_check_datadog(&api_key, &site).context(
154        "Datadog credentials rejected (DD-API-KEY /api/v1/validate failed); not writing TOML",
155    )?;
156    Ok(datadog_block(&api_key, &site))
157}
158
159fn configure_posthog(opts: &ConfigureOptions) -> Result<String> {
160    let key = read_secret(
161        "PostHog project API key (phc_...)",
162        opts.api_key.clone(),
163        "POSTHOG_API_KEY",
164        opts.non_interactive,
165    )?;
166    let host = read_value(
167        "PostHog host",
168        opts.host.clone(),
169        "POSTHOG_HOST",
170        Some("https://us.i.posthog.com".into()),
171        opts.non_interactive,
172    )?;
173    health_check_posthog(&host).context("PostHog host unreachable; not writing TOML")?;
174    Ok(format!(
175        "\n[[telemetry.exporters]]\ntype = \"posthog\"\nproject_api_key = \"{}\"\nhost = \"{}\"\n",
176        key.replace('\\', "\\\\").replace('"', "\\\""),
177        host.replace('\\', "\\\\").replace('"', "\\\""),
178    ))
179}
180
181fn configure_otlp(opts: &ConfigureOptions) -> Result<String> {
182    let endpoint = read_value(
183        "OTLP endpoint",
184        opts.endpoint.clone(),
185        "OTEL_EXPORTER_OTLP_ENDPOINT",
186        Some("http://127.0.0.1:4318".into()),
187        opts.non_interactive,
188    )?;
189    Ok(format!(
190        "\n[[telemetry.exporters]]\ntype = \"otlp\"\nendpoint = \"{}\"\n",
191        endpoint.replace('\\', "\\\\").replace('"', "\\\""),
192    ))
193}
194
/// Offline guard against pasting a Datadog Application Key where the API Key belongs.
///
/// Application Keys carry the `ddapp_` prefix and always 403 when sent as `DD-API-KEY`. Spotting
/// the prefix avoids a network round-trip. The returned hint names both key types so the user
/// can tell them apart. Returns `None` when the value does not look like an Application Key.
pub(crate) fn reject_obvious_app_key(value: &str) -> Option<&'static str> {
    const HINT: &str = "looks like a Datadog Application Key (`ddapp_*`); the wizard needs the API Key \
         (32 hex chars). Generate one at Org Settings > API Keys, then rerun.";
    value.starts_with("ddapp_").then_some(HINT)
}
208
/// Render a `[[telemetry.exporters]]` Datadog row. Backslashes and double quotes in the inputs
/// are escaped so they stay inside their TOML basic strings.
fn datadog_block(api_key: &str, site: &str) -> String {
    let escape = |raw: &str| raw.replace('\\', "\\\\").replace('"', "\\\"");
    let key = escape(api_key);
    let site = escape(site);
    format!("\n[[telemetry.exporters]]\ntype = \"datadog\"\napi_key = \"{key}\"\nsite = \"{site}\"\n")
}
216
217fn append_block(path: &Path, block: &str) -> Result<()> {
218    let mut f = std::fs::OpenOptions::new()
219        .create(true)
220        .append(true)
221        .open(path)?;
222    if f.metadata()?.len() > 0 {
223        f.write_all(b"\n")?;
224    }
225    f.write_all(block.as_bytes())?;
226    Ok(())
227}
228
229fn read_secret(
230    prompt: &str,
231    flag: Option<String>,
232    env_key: &str,
233    non_interactive: bool,
234) -> Result<String> {
235    if let Some(v) = flag.filter(|s| !s.is_empty()) {
236        return Ok(v);
237    }
238    if let Ok(v) = std::env::var(env_key)
239        && !v.is_empty()
240    {
241        return Ok(v);
242    }
243    if non_interactive {
244        anyhow::bail!("missing {env_key}: set the env var or pass --api-key");
245    }
246    print!("{prompt}: ");
247    std::io::stdout().flush()?;
248    let mut line = String::new();
249    std::io::stdin().lock().read_line(&mut line)?;
250    let v = line.trim().to_string();
251    if v.is_empty() {
252        anyhow::bail!("{env_key} is required");
253    }
254    Ok(v)
255}
256
257fn read_value(
258    prompt: &str,
259    flag: Option<String>,
260    env_key: &str,
261    default: Option<String>,
262    non_interactive: bool,
263) -> Result<String> {
264    if let Some(v) = flag.filter(|s| !s.is_empty()) {
265        return Ok(v);
266    }
267    if let Ok(v) = std::env::var(env_key)
268        && !v.is_empty()
269    {
270        return Ok(v);
271    }
272    if non_interactive {
273        return default.ok_or_else(|| anyhow::anyhow!("missing {env_key}; set env or pass flag"));
274    }
275    let hint = default
276        .as_deref()
277        .map(|d| format!(" [{d}]"))
278        .unwrap_or_default();
279    print!("{prompt}{hint}: ");
280    std::io::stdout().flush()?;
281    let mut line = String::new();
282    std::io::stdin().lock().read_line(&mut line)?;
283    let v = line.trim().to_string();
284    if v.is_empty() {
285        return default.ok_or_else(|| anyhow::anyhow!("{env_key} is required"));
286    }
287    Ok(v)
288}
289
290fn health_check_datadog(api_key: &str, site: &str) -> Result<()> {
291    let r = DatadogResolved {
292        site: site.to_string(),
293        api_key: api_key.to_string(),
294        app_key: None,
295    };
296    #[cfg(feature = "telemetry-datadog")]
297    {
298        let c = crate::provider::datadog::DatadogQueryClient::new(&r);
299        c.health()
300    }
301    #[cfg(not(feature = "telemetry-datadog"))]
302    {
303        let _ = &r;
304        anyhow::bail!("rebuild with `--features telemetry-datadog` to validate Datadog");
305    }
306}
307
308fn health_check_posthog(host: &str) -> Result<()> {
309    let r = PostHogResolved {
310        host: host.to_string(),
311        project_api_key: String::new(),
312    };
313    #[cfg(feature = "telemetry-posthog")]
314    {
315        let c = crate::provider::posthog::PostHogQueryClient::new(&r);
316        c.health()
317    }
318    #[cfg(not(feature = "telemetry-posthog"))]
319    {
320        let _ = &r;
321        anyhow::bail!("rebuild with `--features telemetry-posthog` to validate PostHog");
322    }
323}
324
/// Render an enabled `file` exporter row.
///
/// With `path`, emits an escaped `path = "..."` line. Without one, emits a commented-out
/// example so the default location stays documented in the user's config.
fn file_exporter_block(path: Option<&Path>) -> String {
    const HEADER: &str = "\n[[telemetry.exporters]]\ntype = \"file\"\nenabled = true\n";
    const PATH_HINT: &str = "# path = \"telemetry.ndjson\"   # optional; default .kaizen/telemetry.ndjson under each workspace\n";

    let tail = match path {
        Some(p) => {
            let escaped = p
                .to_string_lossy()
                .replace('\\', "\\\\")
                .replace('"', "\\\"");
            format!("path = \"{escaped}\"\n")
        }
        None => PATH_HINT.to_string(),
    };
    format!("{HEADER}{tail}")
}
347
348/// Redacted: show which env/Toml fields are visible for `telemetry` sinks.
349pub fn print_effective_config_text(workspace: Option<&Path>) -> Result<String> {
350    let ws = workspace_path(workspace)?;
351    let cfg = config::load(&ws)?;
352    use std::fmt::Write;
353    let mut s = String::new();
354    writeln!(&mut s, "telemetry.fail_open: {}", cfg.telemetry.fail_open).unwrap();
355    for (i, e) in cfg.telemetry.exporters.iter().enumerate() {
356        match e {
357            ExporterConfig::None => writeln!(&mut s, "[{i}] type=none (ignored)").unwrap(),
358            ExporterConfig::File { enabled, path } => {
359                let p = path
360                    .as_deref()
361                    .map(|p| p.to_string())
362                    .unwrap_or_else(|| "<workspace>/.kaizen/telemetry.ndjson".into());
363                writeln!(&mut s, "[{i}] type=file enabled={enabled} path={p}").unwrap();
364            }
365            ExporterConfig::Dev { enabled } => {
366                writeln!(&mut s, "[{i}] type=dev enabled={enabled}").unwrap();
367            }
368            ExporterConfig::PostHog { .. } => {
369                let line = if let Some(r) = PostHogResolved::from_config(e) {
370                    format!(
371                        "[{i}] type=posthog host={} key=<redacted len {}>",
372                        r.host,
373                        r.project_api_key.len()
374                    )
375                } else {
376                    format!(
377                        "[{i}] type=posthog (unresolved: set POSTHOG_API_KEY or project_api_key)"
378                    )
379                };
380                writeln!(&mut s, "{line}").unwrap();
381            }
382            ExporterConfig::Datadog { .. } => {
383                let line = if let Some(r) = DatadogResolved::from_config(e) {
384                    format!(
385                        "[{i}] type=datadog site={} key=<redacted len {}>",
386                        r.site,
387                        r.api_key.len()
388                    )
389                } else {
390                    format!("[{i}] type=datadog (unresolved: set DD_API_KEY or api_key in TOML)")
391                };
392                writeln!(&mut s, "{line}").unwrap();
393            }
394            ExporterConfig::Otlp { .. } => {
395                let line = if let Some(r) = OtlpResolved::from_config(e) {
396                    format!("[{i}] type=otlp endpoint={}", r.endpoint)
397                } else {
398                    format!("[{i}] type=otlp (unresolved: OTEL_EXPORTER_OTLP_ENDPOINT)")
399                };
400                writeln!(&mut s, "{line}").unwrap();
401            }
402        }
403    }
404    if cfg.telemetry.exporters.is_empty() {
405        writeln!(&mut s, "(no [[telemetry.exporters]] rows)").unwrap();
406    }
407    Ok(s)
408}
409
410pub fn cmd_telemetry_print_effective(workspace: Option<&Path>) -> Result<()> {
411    print!("{}", print_effective_config_text(workspace)?);
412    Ok(())
413}
414
415/// Alias of [`cmd_telemetry_configure`].
416pub fn cmd_telemetry_init(workspace: Option<&Path>, options: ConfigureOptions) -> Result<()> {
417    cmd_telemetry_configure(workspace, options)
418}
419
420/// Resolve config, run provider `health` when available, show redacted exporter view.
421pub fn cmd_telemetry_doctor(workspace: Option<&Path>) -> Result<()> {
422    let ws = workspace_path(workspace)?;
423    let cfg = config::load(&ws)?;
424    println!("telemetry.fail_open: {}", cfg.telemetry.fail_open);
425    println!(
426        "telemetry.query.cache_ttl_seconds: {}",
427        cfg.telemetry.query.cache_ttl_seconds
428    );
429    match cfg.telemetry.query.provider {
430        crate::core::config::QueryAuthority::None => println!("telemetry.query.provider: none"),
431        crate::core::config::QueryAuthority::Posthog => {
432            println!("telemetry.query.provider: posthog");
433        }
434        crate::core::config::QueryAuthority::Datadog => {
435            println!("telemetry.query.provider: datadog");
436        }
437    }
438    if let Some(p) = provider_from_config(&cfg.telemetry) {
439        match p.health() {
440            Ok(()) => println!("provider health: ok (schema: {})", p.schema_version()),
441            Err(e) => eprintln!("provider health: {e}"),
442        }
443    } else {
444        println!("query provider: (not configured or features disabled; pull disabled)");
445    }
446    println!("\n{}", print_effective_config_text(Some(&ws))?);
447    println!("\nOTLP: export only — no query/pull in v1.");
448    Ok(())
449}
450
451/// Run one page of `pull` and refresh `remote_pull_state` (payload import when APIs are wired).
452pub fn cmd_telemetry_pull(workspace: Option<&Path>, days: u32) -> Result<()> {
453    let ws = workspace_path(workspace)?;
454    let cfg = config::load(&ws)?;
455    let p = provider_from_config(&cfg.telemetry).ok_or_else(|| {
456        anyhow::anyhow!(
457            "no query provider resolved. Either:\n  \
458             1. Run `kaizen telemetry configure --type=datadog` (or `posthog`) so the wizard \
459             writes both `[[telemetry.exporters]]` and `[telemetry.query]`, OR\n  \
460             2. Set `[telemetry.query].provider = \"datadog\"` in `~/.kaizen/config.toml` and \
461             ensure DD_API_KEY is reachable (TOML row or env)."
462        )
463    })?;
464    let store = Store::open(&crate::core::workspace::db_path(&ws)?)?;
465    let page = p.pull(PullWindow { days }, None)?;
466    if !cfg.sync.team_id.trim().is_empty()
467        && let Some(ctx) = crate::sync::ingest_ctx(&cfg, ws.to_path_buf())
468        && let Some(wh) = crate::sync::smart::workspace_hash_for(&ctx)
469    {
470        match crate::provider::import_pull_page_to_remote(&store, &cfg.sync.team_id, &wh, &page) {
471            Ok(n) if n > 0 => {
472                tracing::debug!(n, "remote_events: imported from provider pull (cmd)")
473            }
474            _ => {}
475        }
476    }
477    let now_ms = std::time::SystemTime::now()
478        .duration_since(std::time::UNIX_EPOCH)
479        .unwrap_or_default()
480        .as_millis() as i64;
481    let label = match cfg.telemetry.query.provider {
482        crate::core::config::QueryAuthority::None => "none",
483        crate::core::config::QueryAuthority::Posthog => "posthog",
484        crate::core::config::QueryAuthority::Datadog => "datadog",
485    };
486    store.set_pull_state(&RemotePullState {
487        query_provider: label.into(),
488        cursor_json: page.next_cursor.unwrap_or_default(),
489        last_success_ms: Some(now_ms),
490    })?;
491    println!("pull: received {} item(s) (page)", page.items.len());
492    Ok(())
493}
494
495/// Replay stored events in a trailing window through configured telemetry exporters (no Kaizen POST).
496pub fn cmd_telemetry_push(
497    workspace: Option<&Path>,
498    all_workspaces: bool,
499    days: u32,
500    dry_run: bool,
501) -> Result<()> {
502    let roots = scope::resolve(workspace, all_workspaces)?;
503    let primary = roots
504        .first()
505        .cloned()
506        .ok_or_else(|| anyhow::anyhow!("no workspace roots"))?;
507    let cfg = config::load(&primary)?;
508    let home = kaizen_dir().ok_or_else(|| anyhow::anyhow!("KAIZEN_HOME / HOME unset"))?;
509    let salt = effective_redaction_salt(&cfg.sync, &home).context(
510        "resolve redaction salt (configured `[sync].team_salt_hex` or auto-generated `local_salt.hex`)",
511    )?;
512    let registry = telemetry::load_exporters(&cfg.telemetry, primary.as_path());
513    if registry.is_empty() {
514        anyhow::bail!(
515            "no telemetry exporters to push to: add [[telemetry.exporters]] (e.g. type = \"file\" \
516             needs no extra feature; PostHog/Datadog/OTLP need build features); see \
517             `kaizen telemetry print-effective-config`."
518        );
519    }
520    let fail_open = cfg.telemetry.fail_open;
521    let team_id = cfg.sync.team_id.clone();
522    let end_ms = std::time::SystemTime::now()
523        .duration_since(std::time::UNIX_EPOCH)
524        .unwrap_or_default()
525        .as_millis() as u64;
526    let start_ms = end_ms.saturating_sub((days as u64).saturating_mul(86_400_000));
527
528    let mut total_events: u64 = 0;
529    let mut total_spans: u64 = 0;
530    let mut total_batches: u64 = 0;
531    let intake_warning_threshold_ms = end_ms.saturating_sub(18 * 3_600_000);
532    let mut total_stale: u64 = 0;
533
534    for root in &roots {
535        let store = Store::open(&crate::core::workspace::db_path(root)?)?;
536        let ws_key = root.to_string_lossy().to_string();
537        let wh = workspace_hash(&salt, root.as_path());
538
539        let event_rows = store.retro_events_in_window(&ws_key, start_ms, end_ms)?;
540        let stale_events = event_rows
541            .iter()
542            .filter(|(_, ev)| ev.ts_ms < intake_warning_threshold_ms)
543            .count() as u64;
544        let outbound_events: Vec<_> = event_rows
545            .into_iter()
546            .map(|(session, ev)| {
547                let mut o = outbound_event_from_row(&ev, &session, &salt);
548                redact_payload(&mut o.payload, root.as_path(), &salt);
549                o
550            })
551            .collect();
552        let n_events = outbound_events.len() as u64;
553        let event_batches = chunk_events_into_ingest_batches(
554            team_id.clone(),
555            wh.clone(),
556            outbound_events,
557            &cfg.sync,
558        )?;
559
560        let span_rows = store.tool_spans_sync_rows_in_window(&ws_key, start_ms, end_ms)?;
561        let stale_spans = span_rows
562            .iter()
563            .filter(|r| {
564                r.started_at_ms
565                    .or(r.ended_at_ms)
566                    .map(|t| t < intake_warning_threshold_ms)
567                    .unwrap_or(false)
568            })
569            .count() as u64;
570        let outbound_spans: Vec<_> = span_rows
571            .iter()
572            .map(|r| outbound_tool_span(r, &salt))
573            .collect();
574        let n_spans = outbound_spans.len() as u64;
575        let span_batches =
576            chunk_tool_spans_into_ingest_batches(team_id.clone(), wh, outbound_spans, &cfg.sync)?;
577
578        let bcount = (event_batches.len() + span_batches.len()) as u64;
579        total_events += n_events;
580        total_spans += n_spans;
581        total_batches += bcount;
582        total_stale += stale_events + stale_spans;
583
584        if dry_run {
585            eprintln!(
586                "telemetry push (dry-run): {} — {} event(s), {} span(s), {} batch(es)",
587                root.display(),
588                n_events,
589                n_spans,
590                bcount
591            );
592            continue;
593        }
594        for batch in event_batches.into_iter().chain(span_batches) {
595            registry
596                .fan_out(fail_open, &batch)
597                .with_context(|| format!("telemetry fan-out ({})", batch.kind_name()))?;
598        }
599        eprintln!(
600            "telemetry push: {} — sent {} event(s), {} span(s) in {} batch(es)",
601            root.display(),
602            n_events,
603            n_spans,
604            bcount
605        );
606    }
607
608    eprintln!(
609        "telemetry push: total {} event(s), {} span(s), {} batch(es) across {} workspace(s){}",
610        total_events,
611        total_spans,
612        total_batches,
613        roots.len(),
614        if dry_run { " (dry-run)" } else { "" }
615    );
616    if total_stale > 0 {
617        eprintln!(
618            "note: {} item(s) have a `timestamp` older than 18h. Datadog Logs intake silently \
619             drops these (organization default). PostHog/OTLP/file sinks accept them without \
620             change. Use `--days N` with N <= 1 to skip stale items.",
621            total_stale
622        );
623    }
624    Ok(())
625}
626
627/// Send one synthetic redacted event through every configured exporter, report ok/fail per
628/// sink. Pure observability: no SQLite read, no outbox enqueue, no Kaizen POST.
629pub fn cmd_telemetry_test(workspace: Option<&Path>) -> Result<()> {
630    let ws = workspace_path(workspace)?;
631    let cfg = config::load(&ws)?;
632    let registry = telemetry::load_exporters(&cfg.telemetry, ws.as_path());
633    if registry.is_empty() {
634        anyhow::bail!(
635            "no `[[telemetry.exporters]]` rows resolved; run `kaizen telemetry configure --type=...` first"
636        );
637    }
638    let batch = synthetic_batch(&cfg.sync.team_id);
639    println!("telemetry test: sending one synthetic event to each configured sink ...");
640    let mut all_ok = true;
641    for name in registry.exporter_names() {
642        match registry.export_one(&name, &batch) {
643            Ok(()) => println!("  [{name}] ok"),
644            Err(e) => {
645                all_ok = false;
646                println!("  [{name}] FAIL: {e:#}");
647            }
648        }
649    }
650    if !all_ok {
651        anyhow::bail!("one or more exporters failed (see above)");
652    }
653    println!("telemetry test: all exporters accepted the synthetic event.");
654    Ok(())
655}
656
657fn synthetic_batch(team_id: &str) -> IngestExportBatch {
658    let now_ms = std::time::SystemTime::now()
659        .duration_since(std::time::UNIX_EPOCH)
660        .map(|d| d.as_millis() as u64)
661        .unwrap_or(0);
662    IngestExportBatch::Events(EventsBatchBody {
663        team_id: team_id.to_string(),
664        workspace_hash: "blake3:test-workspace".into(),
665        events: vec![OutboundEvent {
666            session_id_hash: "blake3:test-session".into(),
667            event_seq: 0,
668            ts_ms: now_ms,
669            agent: "kaizen".into(),
670            model: "synthetic".into(),
671            kind: "lifecycle".into(),
672            source: "tail".into(),
673            tool: None,
674            tool_call_id: None,
675            tokens_in: Some(0),
676            tokens_out: Some(0),
677            reasoning_tokens: None,
678            cost_usd_e6: None,
679            payload: serde_json::json!({"kaizen.telemetry_test": true}),
680        }],
681    })
682}
683
684/// Example JSON for canonical per-item export names (ingest + third-party mappers).
685pub fn cmd_telemetry_print_schema() -> Result<()> {
686    let v = serde_json::json!({
687        "kaizen_schema_version": KAIZEN_SCHEMA_VERSION,
688        "event_names": [
689            "kaizen.event",
690            "kaizen.tool_span",
691            "kaizen.repo_snapshot_chunk",
692            "kaizen.workspace_fact_snapshot"
693        ],
694        "note": "Full shapes: see sync::canonical::CanonicalItem and expand_ingest_batch (tests include golden JSON).",
695    });
696    println!("{}", serde_json::to_string_pretty(&v)?);
697    Ok(())
698}
699
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Write `contents` to `config.toml` in a fresh temp dir. Keep the returned guard alive for
    /// as long as the path is used.
    fn temp_config(contents: &str) -> (tempfile::TempDir, PathBuf) {
        let dir = tempfile::TempDir::new().unwrap();
        let path = dir.path().join("config.toml");
        std::fs::write(&path, contents).unwrap();
        (dir, path)
    }

    #[test]
    fn exporter_already_present_detects_existing_datadog_row() {
        let toml = concat!(
            "\n[[telemetry.exporters]]\n",
            "type = \"datadog\"\n",
            "api_key = \"abc\"\n",
            "site = \"us5.datadoghq.com\"\n",
        );
        assert!(exporter_already_present(toml, "datadog"));
        assert!(!exporter_already_present(toml, "posthog"));
    }

    #[test]
    fn exporter_already_present_handles_other_tables_between() {
        let toml = concat!(
            "\n[[telemetry.exporters]]\n",
            "type = \"file\"\n",
            "enabled = true\n",
            "\n",
            "[telemetry.query]\n",
            "provider = \"datadog\"\n",
            "\n",
            "[[telemetry.exporters]]\n",
            "type = \"datadog\"\n",
            "api_key = \"abc\"\n",
        );
        for (t, expected) in [("file", true), ("datadog", true), ("otlp", false)] {
            assert_eq!(exporter_already_present(toml, t), expected, "type = {t}");
        }
    }

    #[test]
    fn reject_obvious_app_key_catches_ddapp_prefix() {
        assert!(reject_obvious_app_key("ddapp_FjBvwn3GKN8C6jiqltnbK0UHdUEs3gmlP1").is_some());
        assert!(reject_obvious_app_key("5bed85d67b7b0359bebeb40693537d0b").is_none());
    }

    #[test]
    fn ensure_query_authority_appends_when_missing() {
        let (_dir, p) =
            temp_config("[[telemetry.exporters]]\ntype = \"datadog\"\napi_key = \"abc\"\n");
        ensure_query_authority(&p, "datadog").unwrap();
        let written = std::fs::read_to_string(&p).unwrap();
        assert!(written.contains("[telemetry.query]"));
        assert!(written.contains("provider = \"datadog\""));
    }

    #[test]
    fn ensure_query_authority_idempotent_when_present() {
        let original = "[[telemetry.exporters]]\ntype = \"datadog\"\n\n[telemetry.query]\nprovider = \"posthog\"\n";
        let (_dir, p) = temp_config(original);
        ensure_query_authority(&p, "datadog").unwrap();
        // User's existing posthog choice must NOT be overridden by the wizard.
        assert_eq!(std::fs::read_to_string(&p).unwrap(), original);
    }
}