Skip to main content

kaizen/shell/
telemetry.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! `kaizen telemetry` subcommands: configure, print-effective, doctor, pull, push, schema.
3
4use crate::core::config::{self, ExporterConfig, effective_redaction_salt};
5use crate::core::paths::kaizen_dir;
6use crate::core::project_identity::project_name;
7use crate::provider::{PullWindow, TelemetryQueryProvider, from_config as provider_from_config};
8use crate::shell::cli::workspace_path;
9use crate::shell::scope;
10use crate::store::Store;
11use crate::store::remote_cache::{RemoteCacheStore, RemotePullState};
12use crate::sync::IngestExportBatch;
13use crate::sync::canonical::KAIZEN_SCHEMA_VERSION;
14use crate::sync::outbound::{EventsBatchBody, OutboundEvent, outbound_event_from_row};
15use crate::sync::redact::redact_payload;
16use crate::sync::smart::outbound_tool_span;
17use crate::sync::workspace_hash;
18use crate::sync::{chunk_events_into_ingest_batches, chunk_tool_spans_into_ingest_batches};
19use crate::telemetry::{self, DatadogResolved, OtlpResolved, PostHogResolved};
20use anyhow::{Context, Result};
21use std::io::{BufRead, Write};
22use std::path::{Path, PathBuf};
23
/// Caller-supplied inputs for the telemetry configure wizard.
///
/// Each `Option` left as `None` (or set to an empty string) is resolved later from an
/// environment variable, a built-in default, or an interactive prompt, depending on the
/// exporter type being configured.
#[derive(Debug, Clone, Default)]
pub struct ConfigureOptions {
    /// Exporter type (`file`, `posthog`, `datadog`, `otlp`, `dev`); prompted for when absent.
    pub exporter_type: Option<String>,
    /// Output path written into the `file` exporter row.
    pub path: Option<PathBuf>,
    /// API key for the `datadog` / `posthog` exporters.
    pub api_key: Option<String>,
    /// Datadog site.
    pub site: Option<String>,
    /// PostHog host.
    pub host: Option<String>,
    /// OTLP endpoint.
    pub endpoint: Option<String>,
    /// When `true`, never prompt on stdin: missing required values become errors and
    /// optional ones fall back to their defaults.
    pub non_interactive: bool,
}
34
35/// Validating wizard: prompt for missing creds (or read from env / flags), `health`-check the
36/// resolved provider before touching `~/.kaizen/config.toml`, then append the exporter,
37/// idempotently set `[telemetry.query].provider` so `pull` works without extra config, and
38/// ensure a redaction salt exists. Failure to validate aborts with a clear error and writes
39/// nothing. Re-running for the same exporter type + key field is a no-op (no duplicate row).
40pub fn cmd_telemetry_configure(workspace: Option<&Path>, options: ConfigureOptions) -> Result<()> {
41    let ws = workspace_path(workspace)?;
42    let home = kaizen_dir().ok_or_else(|| anyhow::anyhow!("KAIZEN_HOME / HOME unset"))?;
43    let cfg_path = home.join("config.toml");
44    std::fs::create_dir_all(&home)?;
45
46    println!("Kaizen telemetry — optional sinks fan-out alongside Kaizen sync.");
47    let t = resolve_exporter_type(&options)?;
48    if t.is_empty() {
49        println!("Aborted.");
50        return Ok(());
51    }
52
53    let block = match t.as_str() {
54        "file" => file_exporter_block(options.path.as_deref()),
55        "dev" => "\n[[telemetry.exporters]]\ntype = \"dev\"\n".to_string(),
56        "datadog" => configure_datadog(&options)?,
57        "posthog" => configure_posthog(&options)?,
58        "otlp" => configure_otlp(&options)?,
59        _ => anyhow::bail!("unknown type (use file, posthog, datadog, otlp, dev)"),
60    };
61
62    let existing = std::fs::read_to_string(&cfg_path).unwrap_or_default();
63    if exporter_already_present(&existing, &t) {
64        println!(
65            "Skipped: a `[[telemetry.exporters]]` row of type `{t}` already exists in {}. \
66             Edit the file directly to change credentials.",
67            cfg_path.display()
68        );
69    } else {
70        append_block(&cfg_path, &block)?;
71    }
72    ensure_query_authority(&cfg_path, &t)?;
73
74    let cfg = config::load(&ws)?;
75    let _ = effective_redaction_salt(&cfg.sync, &home).context(
76        "ensure redaction salt (configured `[sync].team_salt_hex` or auto-generated `local_salt.hex`)",
77    )?;
78    println!("Wrote {}.", cfg_path.display());
79    println!("Next: `kaizen telemetry test` to send one synthetic event to every configured sink.");
80    Ok(())
81}
82
/// True when `toml_text` already contains a `[[telemetry.exporters]]` row whose `type`
/// key equals `t`.
///
/// This is a line scan rather than a full TOML parse, so the wizard never has to
/// round-trip (and possibly mangle) a hand-edited file. The scan tolerates the spellings a
/// hand edit commonly produces: arbitrary whitespace around `=`, a trailing `# comment`,
/// and either basic (`"x"`) or literal (`'x'`) string quoting. Commented-out lines and
/// `type` keys belonging to any other table are ignored.
pub(crate) fn exporter_already_present(toml_text: &str, t: &str) -> bool {
    let mut in_exporter_block = false;
    for line in toml_text.lines() {
        let l = line.trim();
        if l.starts_with('[') {
            // Any table header ends the previous row; only an exporters header opens one.
            in_exporter_block = l.starts_with("[[telemetry.exporters]]");
            continue;
        }
        if !in_exporter_block {
            continue;
        }
        let Some((key, value)) = l.split_once('=') else {
            continue;
        };
        if key.trim() == "type" && toml_quoted_value(value) == Some(t) {
            return true;
        }
    }
    false
}

/// Extract the contents of a single-line quoted TOML string from the right-hand side of a
/// `key = value` line. Returns `None` unless the value is a `"…"` or `'…'` string followed
/// only by whitespace and/or a `#` comment. Escape sequences are not interpreted.
fn toml_quoted_value(raw: &str) -> Option<&str> {
    let raw = raw.trim();
    let quote = raw.chars().next().filter(|c| matches!(c, '"' | '\''))?;
    // Both quote characters are one byte wide, so byte offsets below stay on char boundaries.
    let rest = &raw[1..];
    let end = rest.find(quote)?;
    let tail = rest[end + 1..].trim_start();
    (tail.is_empty() || tail.starts_with('#')).then(|| &rest[..end])
}
105
106/// Append `[telemetry.query] provider = "<authority>"` only if the file has no `[telemetry.query]`
107/// table yet. Never overrides an existing user choice; only sets one for `posthog` / `datadog`.
108fn ensure_query_authority(path: &Path, t: &str) -> Result<()> {
109    let authority = match t {
110        "datadog" => "datadog",
111        "posthog" => "posthog",
112        _ => return Ok(()),
113    };
114    let existing = std::fs::read_to_string(path).unwrap_or_default();
115    if existing.lines().any(|l| l.trim() == "[telemetry.query]") {
116        return Ok(());
117    }
118    let block = format!("\n[telemetry.query]\nprovider = \"{authority}\"\n");
119    append_block(path, &block)
120}
121
122fn resolve_exporter_type(opts: &ConfigureOptions) -> Result<String> {
123    if let Some(t) = &opts.exporter_type {
124        return Ok(t.trim().to_lowercase());
125    }
126    if opts.non_interactive {
127        anyhow::bail!("--non-interactive requires --type=<file|posthog|datadog|otlp|dev>");
128    }
129    print!("Type `file`, `posthog`, `datadog`, `otlp`, or `dev` (empty to abort): ");
130    std::io::stdout().flush()?;
131    let mut line = String::new();
132    std::io::stdin().lock().read_line(&mut line)?;
133    Ok(line.trim().to_lowercase())
134}
135
136fn configure_datadog(opts: &ConfigureOptions) -> Result<String> {
137    let api_key = read_secret(
138        "Datadog API key (DD_API_KEY, 32 hex chars — NOT the `ddapp_*` Application Key; \
139         create one at Org Settings > API Keys)",
140        opts.api_key.clone(),
141        "DD_API_KEY",
142        opts.non_interactive,
143    )?;
144    if let Some(rejected) = reject_obvious_app_key(&api_key) {
145        anyhow::bail!("{rejected}");
146    }
147    let site = read_value(
148        "Datadog site",
149        opts.site.clone(),
150        "DD_SITE",
151        Some("datadoghq.com".into()),
152        opts.non_interactive,
153    )?;
154    health_check_datadog(&api_key, &site).context(
155        "Datadog credentials rejected (DD-API-KEY /api/v1/validate failed); not writing TOML",
156    )?;
157    Ok(datadog_block(&api_key, &site))
158}
159
160fn configure_posthog(opts: &ConfigureOptions) -> Result<String> {
161    let key = read_secret(
162        "PostHog project API key (phc_...)",
163        opts.api_key.clone(),
164        "POSTHOG_API_KEY",
165        opts.non_interactive,
166    )?;
167    let host = read_value(
168        "PostHog host",
169        opts.host.clone(),
170        "POSTHOG_HOST",
171        Some("https://us.i.posthog.com".into()),
172        opts.non_interactive,
173    )?;
174    health_check_posthog(&host).context("PostHog host unreachable; not writing TOML")?;
175    Ok(format!(
176        "\n[[telemetry.exporters]]\ntype = \"posthog\"\nproject_api_key = \"{}\"\nhost = \"{}\"\n",
177        key.replace('\\', "\\\\").replace('"', "\\\""),
178        host.replace('\\', "\\\\").replace('"', "\\\""),
179    ))
180}
181
182fn configure_otlp(opts: &ConfigureOptions) -> Result<String> {
183    let endpoint = read_value(
184        "OTLP endpoint",
185        opts.endpoint.clone(),
186        "OTEL_EXPORTER_OTLP_ENDPOINT",
187        Some("http://127.0.0.1:4318".into()),
188        opts.non_interactive,
189    )?;
190    Ok(format!(
191        "\n[[telemetry.exporters]]\ntype = \"otlp\"\nendpoint = \"{}\"\n",
192        endpoint.replace('\\', "\\\\").replace('"', "\\\""),
193    ))
194}
195
/// Local sanity check for a value about to be used as a Datadog API key.
///
/// Datadog Application Keys start with `ddapp_`; sending one as `DD-API-KEY` always 403s.
/// Returns a message naming both key kinds when `value` has that prefix, so the mistake is
/// caught before any network round-trip; returns `None` for everything else.
pub(crate) fn reject_obvious_app_key(value: &str) -> Option<&'static str> {
    const APP_KEY_PREFIX: &str = "ddapp_";
    value.starts_with(APP_KEY_PREFIX).then_some(
        "looks like a Datadog Application Key (`ddapp_*`); the wizard needs the API Key \
         (32 hex chars). Generate one at Org Settings > API Keys, then rerun.",
    )
}
209
/// Render the `[[telemetry.exporters]]` TOML row for a Datadog exporter.
///
/// Backslashes and double quotes in `api_key` and `site` are escaped so each value stays
/// inside its TOML basic string. The block starts with a blank line and ends with a newline.
fn datadog_block(api_key: &str, site: &str) -> String {
    let escape = |raw: &str| raw.replace('\\', "\\\\").replace('"', "\\\"");
    let key = escape(api_key);
    let dd_site = escape(site);
    format!(
        "\n[[telemetry.exporters]]\ntype = \"datadog\"\napi_key = \"{}\"\nsite = \"{}\"\n",
        key, dd_site,
    )
}
217
218fn append_block(path: &Path, block: &str) -> Result<()> {
219    let mut f = std::fs::OpenOptions::new()
220        .create(true)
221        .append(true)
222        .open(path)?;
223    if f.metadata()?.len() > 0 {
224        f.write_all(b"\n")?;
225    }
226    f.write_all(block.as_bytes())?;
227    Ok(())
228}
229
230fn read_secret(
231    prompt: &str,
232    flag: Option<String>,
233    env_key: &str,
234    non_interactive: bool,
235) -> Result<String> {
236    if let Some(v) = flag.filter(|s| !s.is_empty()) {
237        return Ok(v);
238    }
239    if let Ok(v) = std::env::var(env_key)
240        && !v.is_empty()
241    {
242        return Ok(v);
243    }
244    if non_interactive {
245        anyhow::bail!("missing {env_key}: set the env var or pass --api-key");
246    }
247    print!("{prompt}: ");
248    std::io::stdout().flush()?;
249    let mut line = String::new();
250    std::io::stdin().lock().read_line(&mut line)?;
251    let v = line.trim().to_string();
252    if v.is_empty() {
253        anyhow::bail!("{env_key} is required");
254    }
255    Ok(v)
256}
257
258fn read_value(
259    prompt: &str,
260    flag: Option<String>,
261    env_key: &str,
262    default: Option<String>,
263    non_interactive: bool,
264) -> Result<String> {
265    if let Some(v) = flag.filter(|s| !s.is_empty()) {
266        return Ok(v);
267    }
268    if let Ok(v) = std::env::var(env_key)
269        && !v.is_empty()
270    {
271        return Ok(v);
272    }
273    if non_interactive {
274        return default.ok_or_else(|| anyhow::anyhow!("missing {env_key}; set env or pass flag"));
275    }
276    let hint = default
277        .as_deref()
278        .map(|d| format!(" [{d}]"))
279        .unwrap_or_default();
280    print!("{prompt}{hint}: ");
281    std::io::stdout().flush()?;
282    let mut line = String::new();
283    std::io::stdin().lock().read_line(&mut line)?;
284    let v = line.trim().to_string();
285    if v.is_empty() {
286        return default.ok_or_else(|| anyhow::anyhow!("{env_key} is required"));
287    }
288    Ok(v)
289}
290
291fn health_check_datadog(api_key: &str, site: &str) -> Result<()> {
292    let r = DatadogResolved {
293        site: site.to_string(),
294        api_key: api_key.to_string(),
295        app_key: None,
296    };
297    #[cfg(feature = "telemetry-datadog")]
298    {
299        let c = crate::provider::datadog::DatadogQueryClient::new(&r);
300        c.health()
301    }
302    #[cfg(not(feature = "telemetry-datadog"))]
303    {
304        let _ = &r;
305        anyhow::bail!("rebuild with `--features telemetry-datadog` to validate Datadog");
306    }
307}
308
309fn health_check_posthog(host: &str) -> Result<()> {
310    let r = PostHogResolved {
311        host: host.to_string(),
312        project_api_key: String::new(),
313    };
314    #[cfg(feature = "telemetry-posthog")]
315    {
316        let c = crate::provider::posthog::PostHogQueryClient::new(&r);
317        c.health()
318    }
319    #[cfg(not(feature = "telemetry-posthog"))]
320    {
321        let _ = &r;
322        anyhow::bail!("rebuild with `--features telemetry-posthog` to validate PostHog");
323    }
324}
325
/// Render the `[[telemetry.exporters]]` TOML row for the `file` exporter.
///
/// With `Some(path)`, a `path = "…"` line is emitted (lossy UTF-8, with backslashes and
/// double quotes escaped). With `None`, a commented-out `path` example documenting the
/// default location is emitted instead.
fn file_exporter_block(path: Option<&Path>) -> String {
    const ROW_HEADER: &str = "\n[[telemetry.exporters]]\ntype = \"file\"\nenabled = true\n";

    let path_line = match path {
        Some(target) => {
            let escaped = target
                .to_string_lossy()
                .replace('\\', "\\\\")
                .replace('"', "\\\"");
            format!("path = \"{escaped}\"\n")
        }
        None => String::from(
            "# path = \"telemetry.ndjson\"   # optional; default .kaizen/telemetry.ndjson under each workspace\n",
        ),
    };

    let mut block = String::with_capacity(ROW_HEADER.len() + path_line.len());
    block.push_str(ROW_HEADER);
    block.push_str(&path_line);
    block
}
348
349/// Redacted: show which env/Toml fields are visible for `telemetry` sinks.
350pub fn print_effective_config_text(workspace: Option<&Path>) -> Result<String> {
351    let ws = workspace_path(workspace)?;
352    let cfg = config::load(&ws)?;
353    use std::fmt::Write;
354    let mut s = String::new();
355    writeln!(&mut s, "telemetry.fail_open: {}", cfg.telemetry.fail_open).unwrap();
356    for (i, e) in cfg.telemetry.exporters.iter().enumerate() {
357        match e {
358            ExporterConfig::None => writeln!(&mut s, "[{i}] type=none (ignored)").unwrap(),
359            ExporterConfig::File { enabled, path } => {
360                let p = path
361                    .as_deref()
362                    .map(|p| p.to_string())
363                    .unwrap_or_else(|| "<workspace>/.kaizen/telemetry.ndjson".into());
364                writeln!(&mut s, "[{i}] type=file enabled={enabled} path={p}").unwrap();
365            }
366            ExporterConfig::Dev { enabled } => {
367                writeln!(&mut s, "[{i}] type=dev enabled={enabled}").unwrap();
368            }
369            ExporterConfig::PostHog { .. } => {
370                let line = if let Some(r) = PostHogResolved::from_config(e) {
371                    format!(
372                        "[{i}] type=posthog host={} key=<redacted len {}>",
373                        r.host,
374                        r.project_api_key.len()
375                    )
376                } else {
377                    format!(
378                        "[{i}] type=posthog (unresolved: set POSTHOG_API_KEY or project_api_key)"
379                    )
380                };
381                writeln!(&mut s, "{line}").unwrap();
382            }
383            ExporterConfig::Datadog { .. } => {
384                let line = if let Some(r) = DatadogResolved::from_config(e) {
385                    format!(
386                        "[{i}] type=datadog site={} key=<redacted len {}>",
387                        r.site,
388                        r.api_key.len()
389                    )
390                } else {
391                    format!("[{i}] type=datadog (unresolved: set DD_API_KEY or api_key in TOML)")
392                };
393                writeln!(&mut s, "{line}").unwrap();
394            }
395            ExporterConfig::Otlp { .. } => {
396                let line = if let Some(r) = OtlpResolved::from_config(e) {
397                    format!("[{i}] type=otlp endpoint={}", r.endpoint)
398                } else {
399                    format!("[{i}] type=otlp (unresolved: OTEL_EXPORTER_OTLP_ENDPOINT)")
400                };
401                writeln!(&mut s, "{line}").unwrap();
402            }
403        }
404    }
405    if cfg.telemetry.exporters.is_empty() {
406        writeln!(&mut s, "(no [[telemetry.exporters]] rows)").unwrap();
407    }
408    Ok(s)
409}
410
411pub fn cmd_telemetry_print_effective(workspace: Option<&Path>) -> Result<()> {
412    print!("{}", print_effective_config_text(workspace)?);
413    Ok(())
414}
415
416/// Alias of [`cmd_telemetry_configure`].
417pub fn cmd_telemetry_init(workspace: Option<&Path>, options: ConfigureOptions) -> Result<()> {
418    cmd_telemetry_configure(workspace, options)
419}
420
421/// Resolve config, run provider `health` when available, show redacted exporter view.
422pub fn cmd_telemetry_doctor(workspace: Option<&Path>) -> Result<()> {
423    let ws = workspace_path(workspace)?;
424    let cfg = config::load(&ws)?;
425    println!("telemetry.fail_open: {}", cfg.telemetry.fail_open);
426    println!(
427        "telemetry.query.cache_ttl_seconds: {}",
428        cfg.telemetry.query.cache_ttl_seconds
429    );
430    match cfg.telemetry.query.provider {
431        crate::core::config::QueryAuthority::None => println!("telemetry.query.provider: none"),
432        crate::core::config::QueryAuthority::Posthog => {
433            println!("telemetry.query.provider: posthog");
434        }
435        crate::core::config::QueryAuthority::Datadog => {
436            println!("telemetry.query.provider: datadog");
437        }
438    }
439    if let Some(p) = provider_from_config(&cfg.telemetry) {
440        match p.health() {
441            Ok(()) => println!("provider health: ok (schema: {})", p.schema_version()),
442            Err(e) => eprintln!("provider health: {e}"),
443        }
444    } else {
445        println!("query provider: (not configured or features disabled; pull disabled)");
446    }
447    println!("\n{}", print_effective_config_text(Some(&ws))?);
448    println!("\nOTLP: export only — no query/pull in v1.");
449    Ok(())
450}
451
452/// Run one page of `pull` and refresh `remote_pull_state` (payload import when APIs are wired).
453pub fn cmd_telemetry_pull(workspace: Option<&Path>, days: u32) -> Result<()> {
454    let ws = workspace_path(workspace)?;
455    let cfg = config::load(&ws)?;
456    let p = provider_from_config(&cfg.telemetry).ok_or_else(|| {
457        anyhow::anyhow!(
458            "no query provider resolved. Either:\n  \
459             1. Run `kaizen telemetry configure --type=datadog` (or `posthog`) so the wizard \
460             writes both `[[telemetry.exporters]]` and `[telemetry.query]`, OR\n  \
461             2. Set `[telemetry.query].provider = \"datadog\"` in `~/.kaizen/config.toml` and \
462             ensure DD_API_KEY is reachable (TOML row or env)."
463        )
464    })?;
465    let store = Store::open(&crate::core::workspace::db_path(&ws)?)?;
466    let page = p.pull(PullWindow { days }, None)?;
467    if !cfg.sync.team_id.trim().is_empty()
468        && let Some(ctx) = crate::sync::ingest_ctx(&cfg, ws.to_path_buf())
469        && let Some(wh) = crate::sync::smart::workspace_hash_for(&ctx)
470    {
471        match crate::provider::import_pull_page_to_remote(&store, &cfg.sync.team_id, &wh, &page) {
472            Ok(n) if n > 0 => {
473                tracing::debug!(n, "remote_events: imported from provider pull (cmd)")
474            }
475            _ => {}
476        }
477    }
478    let now_ms = std::time::SystemTime::now()
479        .duration_since(std::time::UNIX_EPOCH)
480        .unwrap_or_default()
481        .as_millis() as i64;
482    let label = match cfg.telemetry.query.provider {
483        crate::core::config::QueryAuthority::None => "none",
484        crate::core::config::QueryAuthority::Posthog => "posthog",
485        crate::core::config::QueryAuthority::Datadog => "datadog",
486    };
487    store.set_pull_state(&RemotePullState {
488        query_provider: label.into(),
489        cursor_json: page.next_cursor.unwrap_or_default(),
490        last_success_ms: Some(now_ms),
491    })?;
492    println!("pull: received {} item(s) (page)", page.items.len());
493    Ok(())
494}
495
496/// Replay stored events in a trailing window through configured telemetry exporters (no Kaizen POST).
497pub fn cmd_telemetry_push(
498    workspace: Option<&Path>,
499    all_workspaces: bool,
500    days: u32,
501    dry_run: bool,
502) -> Result<()> {
503    let roots = scope::resolve(workspace, all_workspaces)?;
504    let primary = roots
505        .first()
506        .cloned()
507        .ok_or_else(|| anyhow::anyhow!("no workspace roots"))?;
508    let cfg = config::load(&primary)?;
509    let home = kaizen_dir().ok_or_else(|| anyhow::anyhow!("KAIZEN_HOME / HOME unset"))?;
510    let salt = effective_redaction_salt(&cfg.sync, &home).context(
511        "resolve redaction salt (configured `[sync].team_salt_hex` or auto-generated `local_salt.hex`)",
512    )?;
513    let registry = telemetry::load_exporters(&cfg.telemetry, primary.as_path());
514    if registry.is_empty() {
515        anyhow::bail!(
516            "no telemetry exporters to push to: add [[telemetry.exporters]] (e.g. type = \"file\" \
517             needs no extra feature; PostHog/Datadog/OTLP need build features); see \
518             `kaizen telemetry print-effective-config`."
519        );
520    }
521    let fail_open = cfg.telemetry.fail_open;
522    let team_id = cfg.sync.team_id.clone();
523    let end_ms = std::time::SystemTime::now()
524        .duration_since(std::time::UNIX_EPOCH)
525        .unwrap_or_default()
526        .as_millis() as u64;
527    let start_ms = end_ms.saturating_sub((days as u64).saturating_mul(86_400_000));
528
529    let mut total_events: u64 = 0;
530    let mut total_spans: u64 = 0;
531    let mut total_batches: u64 = 0;
532    let intake_warning_threshold_ms = end_ms.saturating_sub(18 * 3_600_000);
533    let mut total_stale: u64 = 0;
534
535    for root in &roots {
536        let store = Store::open(&crate::core::workspace::db_path(root)?)?;
537        let ws_key = root.to_string_lossy().to_string();
538        let wh = workspace_hash(&salt, root.as_path());
539        let project = project_name(root.as_path());
540
541        let event_rows = store.retro_events_in_window(&ws_key, start_ms, end_ms)?;
542        let stale_events = event_rows
543            .iter()
544            .filter(|(_, ev)| ev.ts_ms < intake_warning_threshold_ms)
545            .count() as u64;
546        let outbound_events: Vec<_> = event_rows
547            .into_iter()
548            .map(|(session, ev)| {
549                let mut o = outbound_event_from_row(&ev, &session, &salt);
550                redact_payload(&mut o.payload, root.as_path(), &salt);
551                o
552            })
553            .collect();
554        let n_events = outbound_events.len() as u64;
555        let event_batches = chunk_events_into_ingest_batches(
556            team_id.clone(),
557            wh.clone(),
558            project.clone(),
559            outbound_events,
560            &cfg.sync,
561        )?;
562
563        let span_rows = store.tool_spans_sync_rows_in_window(&ws_key, start_ms, end_ms)?;
564        let stale_spans = span_rows
565            .iter()
566            .filter(|r| {
567                r.started_at_ms
568                    .or(r.ended_at_ms)
569                    .map(|t| t < intake_warning_threshold_ms)
570                    .unwrap_or(false)
571            })
572            .count() as u64;
573        let outbound_spans: Vec<_> = span_rows
574            .iter()
575            .map(|r| outbound_tool_span(r, &salt))
576            .collect();
577        let n_spans = outbound_spans.len() as u64;
578        let span_batches = chunk_tool_spans_into_ingest_batches(
579            team_id.clone(),
580            wh,
581            project,
582            outbound_spans,
583            &cfg.sync,
584        )?;
585
586        let bcount = (event_batches.len() + span_batches.len()) as u64;
587        total_events += n_events;
588        total_spans += n_spans;
589        total_batches += bcount;
590        total_stale += stale_events + stale_spans;
591
592        if dry_run {
593            eprintln!(
594                "telemetry push (dry-run): {} — {} event(s), {} span(s), {} batch(es)",
595                root.display(),
596                n_events,
597                n_spans,
598                bcount
599            );
600            continue;
601        }
602        for batch in event_batches.into_iter().chain(span_batches) {
603            registry
604                .fan_out(fail_open, &batch)
605                .with_context(|| format!("telemetry fan-out ({})", batch.kind_name()))?;
606        }
607        eprintln!(
608            "telemetry push: {} — sent {} event(s), {} span(s) in {} batch(es)",
609            root.display(),
610            n_events,
611            n_spans,
612            bcount
613        );
614    }
615
616    eprintln!(
617        "telemetry push: total {} event(s), {} span(s), {} batch(es) across {} workspace(s){}",
618        total_events,
619        total_spans,
620        total_batches,
621        roots.len(),
622        if dry_run { " (dry-run)" } else { "" }
623    );
624    if total_stale > 0 {
625        eprintln!(
626            "note: {} item(s) have a `timestamp` older than 18h. Datadog Logs intake silently \
627             drops these (organization default). PostHog/OTLP/file sinks accept them without \
628             change. Use `--days N` with N <= 1 to skip stale items.",
629            total_stale
630        );
631    }
632    Ok(())
633}
634
635/// Send one synthetic redacted event through every configured exporter, report ok/fail per
636/// sink. Pure observability: no SQLite read, no outbox enqueue, no Kaizen POST.
637pub fn cmd_telemetry_test(workspace: Option<&Path>) -> Result<()> {
638    let ws = workspace_path(workspace)?;
639    let cfg = config::load(&ws)?;
640    let registry = telemetry::load_exporters(&cfg.telemetry, ws.as_path());
641    if registry.is_empty() {
642        anyhow::bail!(
643            "no `[[telemetry.exporters]]` rows resolved; run `kaizen telemetry configure --type=...` first"
644        );
645    }
646    let batch = synthetic_batch(&cfg.sync.team_id);
647    println!("telemetry test: sending one synthetic event to each configured sink ...");
648    let mut all_ok = true;
649    for name in registry.exporter_names() {
650        match registry.export_one(&name, &batch) {
651            Ok(()) => println!("  [{name}] ok"),
652            Err(e) => {
653                all_ok = false;
654                println!("  [{name}] FAIL: {e:#}");
655            }
656        }
657    }
658    if !all_ok {
659        anyhow::bail!("one or more exporters failed (see above)");
660    }
661    println!("telemetry test: all exporters accepted the synthetic event.");
662    Ok(())
663}
664
665fn synthetic_batch(team_id: &str) -> IngestExportBatch {
666    let now_ms = std::time::SystemTime::now()
667        .duration_since(std::time::UNIX_EPOCH)
668        .map(|d| d.as_millis() as u64)
669        .unwrap_or(0);
670    IngestExportBatch::Events(EventsBatchBody {
671        team_id: team_id.to_string(),
672        workspace_hash: "blake3:test-workspace".into(),
673        project_name: Some("telemetry-test".into()),
674        events: vec![OutboundEvent {
675            session_id_hash: "blake3:test-session".into(),
676            event_seq: 0,
677            ts_ms: now_ms,
678            agent: "kaizen".into(),
679            model: "synthetic".into(),
680            kind: "lifecycle".into(),
681            source: "tail".into(),
682            tool: None,
683            tool_call_id: None,
684            tokens_in: Some(0),
685            tokens_out: Some(0),
686            reasoning_tokens: None,
687            cost_usd_e6: None,
688            payload: serde_json::json!({"kaizen.telemetry_test": true}),
689        }],
690    })
691}
692
693/// Example JSON for canonical per-item export names (ingest + third-party mappers).
694pub fn cmd_telemetry_print_schema() -> Result<()> {
695    let v = serde_json::json!({
696        "kaizen_schema_version": KAIZEN_SCHEMA_VERSION,
697        "event_names": [
698            "kaizen.event",
699            "kaizen.tool_span",
700            "kaizen.repo_snapshot_chunk",
701            "kaizen.workspace_fact_snapshot"
702        ],
703        "note": "Full shapes: see sync::canonical::CanonicalItem and expand_ingest_batch (tests include golden JSON).",
704    });
705    println!("{}", serde_json::to_string_pretty(&v)?);
706    Ok(())
707}
708
#[cfg(test)]
mod tests {
    use super::*;

    /// Write `contents` to a `config.toml` inside a fresh temp dir. The dir guard is
    /// returned alongside the path so the file lives as long as the caller holds it.
    fn temp_config(contents: &str) -> (tempfile::TempDir, PathBuf) {
        let dir = tempfile::TempDir::new().unwrap();
        let file = dir.path().join("config.toml");
        std::fs::write(&file, contents).unwrap();
        (dir, file)
    }

    #[test]
    fn exporter_already_present_detects_existing_datadog_row() {
        let text = "\n[[telemetry.exporters]]\ntype = \"datadog\"\napi_key = \"abc\"\nsite = \"us5.datadoghq.com\"\n";
        assert!(exporter_already_present(text, "datadog"));
        assert!(!exporter_already_present(text, "posthog"));
    }

    #[test]
    fn exporter_already_present_handles_other_tables_between() {
        // Two exporter rows separated by an unrelated table: both must be found.
        let text = [
            "",
            "[[telemetry.exporters]]",
            "type = \"file\"",
            "enabled = true",
            "",
            "[telemetry.query]",
            "provider = \"datadog\"",
            "",
            "[[telemetry.exporters]]",
            "type = \"datadog\"",
            "api_key = \"abc\"",
            "",
        ]
        .join("\n");
        for present in ["file", "datadog"] {
            assert!(exporter_already_present(&text, present));
        }
        assert!(!exporter_already_present(&text, "otlp"));
    }

    #[test]
    fn reject_obvious_app_key_catches_ddapp_prefix() {
        let app_key = "ddapp_FjBvwn3GKN8C6jiqltnbK0UHdUEs3gmlP1";
        let api_key = "5bed85d67b7b0359bebeb40693537d0b";
        assert!(reject_obvious_app_key(app_key).is_some());
        assert!(reject_obvious_app_key(api_key).is_none());
    }

    #[test]
    fn ensure_query_authority_appends_when_missing() {
        let (_guard, file) =
            temp_config("[[telemetry.exporters]]\ntype = \"datadog\"\napi_key = \"abc\"\n");
        ensure_query_authority(&file, "datadog").unwrap();
        let after = std::fs::read_to_string(&file).unwrap();
        assert!(after.contains("[telemetry.query]"));
        assert!(after.contains("provider = \"datadog\""));
    }

    #[test]
    fn ensure_query_authority_idempotent_when_present() {
        let original = "[[telemetry.exporters]]\ntype = \"datadog\"\n\n[telemetry.query]\nprovider = \"posthog\"\n";
        let (_guard, file) = temp_config(original);
        ensure_query_authority(&file, "datadog").unwrap();
        let after = std::fs::read_to_string(&file).unwrap();
        // The wizard must leave the user's existing posthog choice untouched.
        assert_eq!(after, original);
    }
}
776}