1use crate::core::config::{self, ExporterConfig, effective_redaction_salt};
5use crate::core::paths::kaizen_dir;
6use crate::provider::{PullWindow, TelemetryQueryProvider, from_config as provider_from_config};
7use crate::shell::cli::workspace_path;
8use crate::shell::scope;
9use crate::store::Store;
10use crate::store::remote_cache::{RemoteCacheStore, RemotePullState};
11use crate::sync::IngestExportBatch;
12use crate::sync::canonical::KAIZEN_SCHEMA_VERSION;
13use crate::sync::outbound::{EventsBatchBody, OutboundEvent, outbound_event_from_row};
14use crate::sync::redact::redact_payload;
15use crate::sync::smart::outbound_tool_span;
16use crate::sync::workspace_hash;
17use crate::sync::{chunk_events_into_ingest_batches, chunk_tool_spans_into_ingest_batches};
18use crate::telemetry::{self, DatadogResolved, OtlpResolved, PostHogResolved};
19use anyhow::{Context, Result};
20use std::io::{BufRead, Write};
21use std::path::{Path, PathBuf};
22
/// Caller-supplied inputs for [`cmd_telemetry_configure`].
///
/// Every `Option` field may be `None`; the per-exporter helpers in this file then fall back
/// to an environment variable, a built-in default, or an interactive prompt.
#[derive(Debug, Clone, Default)]
pub struct ConfigureOptions {
    /// Exporter kind (`file`, `posthog`, `datadog`, `otlp`, `dev`); prompted for when `None`.
    pub exporter_type: Option<String>,
    /// Output path written into the `file` exporter row (left commented out when `None`).
    pub path: Option<PathBuf>,
    /// API key used by the Datadog and PostHog configuration paths.
    pub api_key: Option<String>,
    /// Datadog site; only read by the Datadog configuration path.
    pub site: Option<String>,
    /// PostHog host; only read by the PostHog configuration path.
    pub host: Option<String>,
    /// OTLP endpoint; only read by the OTLP configuration path.
    pub endpoint: Option<String>,
    /// When `true`, never read stdin: missing values become errors or fall back to defaults.
    pub non_interactive: bool,
}
33
34pub fn cmd_telemetry_configure(workspace: Option<&Path>, options: ConfigureOptions) -> Result<()> {
40 let ws = workspace_path(workspace)?;
41 let home = kaizen_dir().ok_or_else(|| anyhow::anyhow!("KAIZEN_HOME / HOME unset"))?;
42 let cfg_path = home.join("config.toml");
43 std::fs::create_dir_all(&home)?;
44
45 println!("Kaizen telemetry — optional sinks fan-out alongside Kaizen sync.");
46 let t = resolve_exporter_type(&options)?;
47 if t.is_empty() {
48 println!("Aborted.");
49 return Ok(());
50 }
51
52 let block = match t.as_str() {
53 "file" => file_exporter_block(options.path.as_deref()),
54 "dev" => "\n[[telemetry.exporters]]\ntype = \"dev\"\n".to_string(),
55 "datadog" => configure_datadog(&options)?,
56 "posthog" => configure_posthog(&options)?,
57 "otlp" => configure_otlp(&options)?,
58 _ => anyhow::bail!("unknown type (use file, posthog, datadog, otlp, dev)"),
59 };
60
61 let existing = std::fs::read_to_string(&cfg_path).unwrap_or_default();
62 if exporter_already_present(&existing, &t) {
63 println!(
64 "Skipped: a `[[telemetry.exporters]]` row of type `{t}` already exists in {}. \
65 Edit the file directly to change credentials.",
66 cfg_path.display()
67 );
68 } else {
69 append_block(&cfg_path, &block)?;
70 }
71 ensure_query_authority(&cfg_path, &t)?;
72
73 let cfg = config::load(&ws)?;
74 let _ = effective_redaction_salt(&cfg.sync, &home).context(
75 "ensure redaction salt (configured `[sync].team_salt_hex` or auto-generated `local_salt.hex`)",
76 )?;
77 println!("Wrote {}.", cfg_path.display());
78 println!("Next: `kaizen telemetry test` to send one synthetic event to every configured sink.");
79 Ok(())
80}
81
/// Returns `true` when `toml_text` already has a `[[telemetry.exporters]]` row whose
/// `type` key equals `t`.
///
/// This is a line-oriented scan, not a full TOML parse. A `type` line is recognised inside
/// an exporter table regardless of spacing around `=`, for both basic (`"x"`) and literal
/// (`'x'`) strings, and with an optional trailing `# comment`. Any other table header ends
/// the current exporter table. Commented-out lines never match.
pub(crate) fn exporter_already_present(toml_text: &str, t: &str) -> bool {
    /// Extracts the string value from a `type = "<value>"` line, or `None` when the line
    /// is not a well-formed `type` assignment.
    fn type_value(line: &str) -> Option<&str> {
        let (key, rest) = line.split_once('=')?;
        if key.trim() != "type" {
            return None;
        }
        let rest = rest.trim_start();
        let (quote, body) = if let Some(body) = rest.strip_prefix('"') {
            ('"', body)
        } else if let Some(body) = rest.strip_prefix('\'') {
            ('\'', body)
        } else {
            return None;
        };
        let (value, tail) = body.split_once(quote)?;
        // Only whitespace or a comment may follow the closing quote.
        let tail = tail.trim_start();
        (tail.is_empty() || tail.starts_with('#')).then_some(value)
    }

    let mut in_exporter_block = false;
    for line in toml_text.lines() {
        let l = line.trim();
        if l.starts_with("[[telemetry.exporters]]") {
            in_exporter_block = true;
            continue;
        }
        if l.starts_with('[') {
            // Any other table header closes the exporter row being scanned.
            in_exporter_block = false;
            continue;
        }
        if in_exporter_block && type_value(l) == Some(t) {
            return true;
        }
    }
    false
}
104
105fn ensure_query_authority(path: &Path, t: &str) -> Result<()> {
108 let authority = match t {
109 "datadog" => "datadog",
110 "posthog" => "posthog",
111 _ => return Ok(()),
112 };
113 let existing = std::fs::read_to_string(path).unwrap_or_default();
114 if existing.lines().any(|l| l.trim() == "[telemetry.query]") {
115 return Ok(());
116 }
117 let block = format!("\n[telemetry.query]\nprovider = \"{authority}\"\n");
118 append_block(path, &block)
119}
120
121fn resolve_exporter_type(opts: &ConfigureOptions) -> Result<String> {
122 if let Some(t) = &opts.exporter_type {
123 return Ok(t.trim().to_lowercase());
124 }
125 if opts.non_interactive {
126 anyhow::bail!("--non-interactive requires --type=<file|posthog|datadog|otlp|dev>");
127 }
128 print!("Type `file`, `posthog`, `datadog`, `otlp`, or `dev` (empty to abort): ");
129 std::io::stdout().flush()?;
130 let mut line = String::new();
131 std::io::stdin().lock().read_line(&mut line)?;
132 Ok(line.trim().to_lowercase())
133}
134
135fn configure_datadog(opts: &ConfigureOptions) -> Result<String> {
136 let api_key = read_secret(
137 "Datadog API key (DD_API_KEY, 32 hex chars — NOT the `ddapp_*` Application Key; \
138 create one at Org Settings > API Keys)",
139 opts.api_key.clone(),
140 "DD_API_KEY",
141 opts.non_interactive,
142 )?;
143 if let Some(rejected) = reject_obvious_app_key(&api_key) {
144 anyhow::bail!("{rejected}");
145 }
146 let site = read_value(
147 "Datadog site",
148 opts.site.clone(),
149 "DD_SITE",
150 Some("datadoghq.com".into()),
151 opts.non_interactive,
152 )?;
153 health_check_datadog(&api_key, &site).context(
154 "Datadog credentials rejected (DD-API-KEY /api/v1/validate failed); not writing TOML",
155 )?;
156 Ok(datadog_block(&api_key, &site))
157}
158
159fn configure_posthog(opts: &ConfigureOptions) -> Result<String> {
160 let key = read_secret(
161 "PostHog project API key (phc_...)",
162 opts.api_key.clone(),
163 "POSTHOG_API_KEY",
164 opts.non_interactive,
165 )?;
166 let host = read_value(
167 "PostHog host",
168 opts.host.clone(),
169 "POSTHOG_HOST",
170 Some("https://us.i.posthog.com".into()),
171 opts.non_interactive,
172 )?;
173 health_check_posthog(&host).context("PostHog host unreachable; not writing TOML")?;
174 Ok(format!(
175 "\n[[telemetry.exporters]]\ntype = \"posthog\"\nproject_api_key = \"{}\"\nhost = \"{}\"\n",
176 key.replace('\\', "\\\\").replace('"', "\\\""),
177 host.replace('\\', "\\\\").replace('"', "\\\""),
178 ))
179}
180
181fn configure_otlp(opts: &ConfigureOptions) -> Result<String> {
182 let endpoint = read_value(
183 "OTLP endpoint",
184 opts.endpoint.clone(),
185 "OTEL_EXPORTER_OTLP_ENDPOINT",
186 Some("http://127.0.0.1:4318".into()),
187 opts.non_interactive,
188 )?;
189 Ok(format!(
190 "\n[[telemetry.exporters]]\ntype = \"otlp\"\nendpoint = \"{}\"\n",
191 endpoint.replace('\\', "\\\\").replace('"', "\\\""),
192 ))
193}
194
/// Returns a rejection message when `value` starts with `ddapp_` (the prefix this wizard
/// treats as a Datadog Application Key), and `None` for anything else.
pub(crate) fn reject_obvious_app_key(value: &str) -> Option<&'static str> {
    const MESSAGE: &str =
        "looks like a Datadog Application Key (`ddapp_*`); the wizard needs the API Key \
         (32 hex chars). Generate one at Org Settings > API Keys, then rerun.";
    value.starts_with("ddapp_").then_some(MESSAGE)
}
208
/// Renders the `[[telemetry.exporters]]` TOML row for Datadog.
///
/// Backslashes and double quotes in `api_key` and `site` are escaped so the values stay
/// inside their TOML basic strings.
fn datadog_block(api_key: &str, site: &str) -> String {
    // Order matters: escape backslashes before introducing new ones for quotes.
    let escape = |raw: &str| raw.replace('\\', "\\\\").replace('"', "\\\"");
    let key_toml = escape(api_key);
    let site_toml = escape(site);
    format!(
        "\n[[telemetry.exporters]]\ntype = \"datadog\"\napi_key = \"{key_toml}\"\nsite = \"{site_toml}\"\n"
    )
}
216
217fn append_block(path: &Path, block: &str) -> Result<()> {
218 let mut f = std::fs::OpenOptions::new()
219 .create(true)
220 .append(true)
221 .open(path)?;
222 if f.metadata()?.len() > 0 {
223 f.write_all(b"\n")?;
224 }
225 f.write_all(block.as_bytes())?;
226 Ok(())
227}
228
229fn read_secret(
230 prompt: &str,
231 flag: Option<String>,
232 env_key: &str,
233 non_interactive: bool,
234) -> Result<String> {
235 if let Some(v) = flag.filter(|s| !s.is_empty()) {
236 return Ok(v);
237 }
238 if let Ok(v) = std::env::var(env_key)
239 && !v.is_empty()
240 {
241 return Ok(v);
242 }
243 if non_interactive {
244 anyhow::bail!("missing {env_key}: set the env var or pass --api-key");
245 }
246 print!("{prompt}: ");
247 std::io::stdout().flush()?;
248 let mut line = String::new();
249 std::io::stdin().lock().read_line(&mut line)?;
250 let v = line.trim().to_string();
251 if v.is_empty() {
252 anyhow::bail!("{env_key} is required");
253 }
254 Ok(v)
255}
256
257fn read_value(
258 prompt: &str,
259 flag: Option<String>,
260 env_key: &str,
261 default: Option<String>,
262 non_interactive: bool,
263) -> Result<String> {
264 if let Some(v) = flag.filter(|s| !s.is_empty()) {
265 return Ok(v);
266 }
267 if let Ok(v) = std::env::var(env_key)
268 && !v.is_empty()
269 {
270 return Ok(v);
271 }
272 if non_interactive {
273 return default.ok_or_else(|| anyhow::anyhow!("missing {env_key}; set env or pass flag"));
274 }
275 let hint = default
276 .as_deref()
277 .map(|d| format!(" [{d}]"))
278 .unwrap_or_default();
279 print!("{prompt}{hint}: ");
280 std::io::stdout().flush()?;
281 let mut line = String::new();
282 std::io::stdin().lock().read_line(&mut line)?;
283 let v = line.trim().to_string();
284 if v.is_empty() {
285 return default.ok_or_else(|| anyhow::anyhow!("{env_key} is required"));
286 }
287 Ok(v)
288}
289
290fn health_check_datadog(api_key: &str, site: &str) -> Result<()> {
291 let r = DatadogResolved {
292 site: site.to_string(),
293 api_key: api_key.to_string(),
294 app_key: None,
295 };
296 #[cfg(feature = "telemetry-datadog")]
297 {
298 let c = crate::provider::datadog::DatadogQueryClient::new(&r);
299 c.health()
300 }
301 #[cfg(not(feature = "telemetry-datadog"))]
302 {
303 let _ = &r;
304 anyhow::bail!("rebuild with `--features telemetry-datadog` to validate Datadog");
305 }
306}
307
308fn health_check_posthog(host: &str) -> Result<()> {
309 let r = PostHogResolved {
310 host: host.to_string(),
311 project_api_key: String::new(),
312 };
313 #[cfg(feature = "telemetry-posthog")]
314 {
315 let c = crate::provider::posthog::PostHogQueryClient::new(&r);
316 c.health()
317 }
318 #[cfg(not(feature = "telemetry-posthog"))]
319 {
320 let _ = &r;
321 anyhow::bail!("rebuild with `--features telemetry-posthog` to validate PostHog");
322 }
323}
324
/// Renders the `[[telemetry.exporters]]` TOML row for the `file` exporter.
///
/// With `Some(path)`, a `path = "..."` line is emitted (lossy UTF-8, with backslashes and
/// double quotes escaped). With `None`, a commented-out example line is emitted instead.
fn file_exporter_block(path: Option<&Path>) -> String {
    let header = "\n[[telemetry.exporters]]\ntype = \"file\"\nenabled = true\n";
    let path_line = match path {
        Some(p) => {
            let escaped = p
                .to_string_lossy()
                .replace('\\', "\\\\")
                .replace('"', "\\\"");
            format!("path = \"{escaped}\"\n")
        }
        None => String::from(
            "# path = \"telemetry.ndjson\" # optional; default .kaizen/telemetry.ndjson under each workspace\n",
        ),
    };
    format!("{header}{path_line}")
}
347
348pub fn print_effective_config_text(workspace: Option<&Path>) -> Result<String> {
350 let ws = workspace_path(workspace)?;
351 let cfg = config::load(&ws)?;
352 use std::fmt::Write;
353 let mut s = String::new();
354 writeln!(&mut s, "telemetry.fail_open: {}", cfg.telemetry.fail_open).unwrap();
355 for (i, e) in cfg.telemetry.exporters.iter().enumerate() {
356 match e {
357 ExporterConfig::None => writeln!(&mut s, "[{i}] type=none (ignored)").unwrap(),
358 ExporterConfig::File { enabled, path } => {
359 let p = path
360 .as_deref()
361 .map(|p| p.to_string())
362 .unwrap_or_else(|| "<workspace>/.kaizen/telemetry.ndjson".into());
363 writeln!(&mut s, "[{i}] type=file enabled={enabled} path={p}").unwrap();
364 }
365 ExporterConfig::Dev { enabled } => {
366 writeln!(&mut s, "[{i}] type=dev enabled={enabled}").unwrap();
367 }
368 ExporterConfig::PostHog { .. } => {
369 let line = if let Some(r) = PostHogResolved::from_config(e) {
370 format!(
371 "[{i}] type=posthog host={} key=<redacted len {}>",
372 r.host,
373 r.project_api_key.len()
374 )
375 } else {
376 format!(
377 "[{i}] type=posthog (unresolved: set POSTHOG_API_KEY or project_api_key)"
378 )
379 };
380 writeln!(&mut s, "{line}").unwrap();
381 }
382 ExporterConfig::Datadog { .. } => {
383 let line = if let Some(r) = DatadogResolved::from_config(e) {
384 format!(
385 "[{i}] type=datadog site={} key=<redacted len {}>",
386 r.site,
387 r.api_key.len()
388 )
389 } else {
390 format!("[{i}] type=datadog (unresolved: set DD_API_KEY or api_key in TOML)")
391 };
392 writeln!(&mut s, "{line}").unwrap();
393 }
394 ExporterConfig::Otlp { .. } => {
395 let line = if let Some(r) = OtlpResolved::from_config(e) {
396 format!("[{i}] type=otlp endpoint={}", r.endpoint)
397 } else {
398 format!("[{i}] type=otlp (unresolved: OTEL_EXPORTER_OTLP_ENDPOINT)")
399 };
400 writeln!(&mut s, "{line}").unwrap();
401 }
402 }
403 }
404 if cfg.telemetry.exporters.is_empty() {
405 writeln!(&mut s, "(no [[telemetry.exporters]] rows)").unwrap();
406 }
407 Ok(s)
408}
409
410pub fn cmd_telemetry_print_effective(workspace: Option<&Path>) -> Result<()> {
411 print!("{}", print_effective_config_text(workspace)?);
412 Ok(())
413}
414
/// Alias for [`cmd_telemetry_configure`]; forwards both arguments unchanged.
pub fn cmd_telemetry_init(workspace: Option<&Path>, options: ConfigureOptions) -> Result<()> {
    cmd_telemetry_configure(workspace, options)
}
419
420pub fn cmd_telemetry_doctor(workspace: Option<&Path>) -> Result<()> {
422 let ws = workspace_path(workspace)?;
423 let cfg = config::load(&ws)?;
424 println!("telemetry.fail_open: {}", cfg.telemetry.fail_open);
425 println!(
426 "telemetry.query.cache_ttl_seconds: {}",
427 cfg.telemetry.query.cache_ttl_seconds
428 );
429 match cfg.telemetry.query.provider {
430 crate::core::config::QueryAuthority::None => println!("telemetry.query.provider: none"),
431 crate::core::config::QueryAuthority::Posthog => {
432 println!("telemetry.query.provider: posthog");
433 }
434 crate::core::config::QueryAuthority::Datadog => {
435 println!("telemetry.query.provider: datadog");
436 }
437 }
438 if let Some(p) = provider_from_config(&cfg.telemetry) {
439 match p.health() {
440 Ok(()) => println!("provider health: ok (schema: {})", p.schema_version()),
441 Err(e) => eprintln!("provider health: {e}"),
442 }
443 } else {
444 println!("query provider: (not configured or features disabled; pull disabled)");
445 }
446 println!("\n{}", print_effective_config_text(Some(&ws))?);
447 println!("\nOTLP: export only — no query/pull in v1.");
448 Ok(())
449}
450
451pub fn cmd_telemetry_pull(workspace: Option<&Path>, days: u32) -> Result<()> {
453 let ws = workspace_path(workspace)?;
454 let cfg = config::load(&ws)?;
455 let p = provider_from_config(&cfg.telemetry).ok_or_else(|| {
456 anyhow::anyhow!(
457 "no query provider resolved. Either:\n \
458 1. Run `kaizen telemetry configure --type=datadog` (or `posthog`) so the wizard \
459 writes both `[[telemetry.exporters]]` and `[telemetry.query]`, OR\n \
460 2. Set `[telemetry.query].provider = \"datadog\"` in `~/.kaizen/config.toml` and \
461 ensure DD_API_KEY is reachable (TOML row or env)."
462 )
463 })?;
464 let store = Store::open(&crate::core::workspace::db_path(&ws)?)?;
465 let page = p.pull(PullWindow { days }, None)?;
466 if !cfg.sync.team_id.trim().is_empty()
467 && let Some(ctx) = crate::sync::ingest_ctx(&cfg, ws.to_path_buf())
468 && let Some(wh) = crate::sync::smart::workspace_hash_for(&ctx)
469 {
470 match crate::provider::import_pull_page_to_remote(&store, &cfg.sync.team_id, &wh, &page) {
471 Ok(n) if n > 0 => {
472 tracing::debug!(n, "remote_events: imported from provider pull (cmd)")
473 }
474 _ => {}
475 }
476 }
477 let now_ms = std::time::SystemTime::now()
478 .duration_since(std::time::UNIX_EPOCH)
479 .unwrap_or_default()
480 .as_millis() as i64;
481 let label = match cfg.telemetry.query.provider {
482 crate::core::config::QueryAuthority::None => "none",
483 crate::core::config::QueryAuthority::Posthog => "posthog",
484 crate::core::config::QueryAuthority::Datadog => "datadog",
485 };
486 store.set_pull_state(&RemotePullState {
487 query_provider: label.into(),
488 cursor_json: page.next_cursor.unwrap_or_default(),
489 last_success_ms: Some(now_ms),
490 })?;
491 println!("pull: received {} item(s) (page)", page.items.len());
492 Ok(())
493}
494
/// Re-sends stored events and tool spans from the last `days` days to every configured
/// telemetry exporter.
///
/// Workspace roots come from `scope::resolve(workspace, all_workspaces)`. Config, the
/// redaction salt, and the exporter registry are all loaded from the FIRST root only and
/// reused for every root. For each root, events are converted and redacted, spans are
/// converted, both are chunked into batches, and (unless `dry_run`) each batch is fanned
/// out to the registry. Per-root and total counts are written to stderr.
///
/// # Errors
/// Fails when no root resolves, config or salt cannot be loaded, no exporter is configured,
/// a store cannot be opened or queried, batching fails, or a fan-out returns an error.
pub fn cmd_telemetry_push(
    workspace: Option<&Path>,
    all_workspaces: bool,
    days: u32,
    dry_run: bool,
) -> Result<()> {
    let roots = scope::resolve(workspace, all_workspaces)?;
    let primary = roots
        .first()
        .cloned()
        .ok_or_else(|| anyhow::anyhow!("no workspace roots"))?;
    let cfg = config::load(&primary)?;
    let home = kaizen_dir().ok_or_else(|| anyhow::anyhow!("KAIZEN_HOME / HOME unset"))?;
    let salt = effective_redaction_salt(&cfg.sync, &home).context(
        "resolve redaction salt (configured `[sync].team_salt_hex` or auto-generated `local_salt.hex`)",
    )?;
    let registry = telemetry::load_exporters(&cfg.telemetry, primary.as_path());
    if registry.is_empty() {
        anyhow::bail!(
            "no telemetry exporters to push to: add [[telemetry.exporters]] (e.g. type = \"file\" \
             needs no extra feature; PostHog/Datadog/OTLP need build features); see \
             `kaizen telemetry print-effective-config`."
        );
    }
    let fail_open = cfg.telemetry.fail_open;
    let team_id = cfg.sync.team_id.clone();
    // Window is [now - days, now] in epoch milliseconds; saturating math avoids underflow.
    let end_ms = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis() as u64;
    let start_ms = end_ms.saturating_sub((days as u64).saturating_mul(86_400_000));

    let mut total_events: u64 = 0;
    let mut total_spans: u64 = 0;
    let mut total_batches: u64 = 0;
    // Items timestamped more than 18h before now are counted as "stale" for the final note.
    let intake_warning_threshold_ms = end_ms.saturating_sub(18 * 3_600_000);
    let mut total_stale: u64 = 0;

    for root in &roots {
        let store = Store::open(&crate::core::workspace::db_path(root)?)?;
        let ws_key = root.to_string_lossy().to_string();
        let wh = workspace_hash(&salt, root.as_path());

        let event_rows = store.retro_events_in_window(&ws_key, start_ms, end_ms)?;
        let stale_events = event_rows
            .iter()
            .filter(|(_, ev)| ev.ts_ms < intake_warning_threshold_ms)
            .count() as u64;
        // Convert each stored row to its outbound form, then redact its payload.
        let outbound_events: Vec<_> = event_rows
            .into_iter()
            .map(|(session, ev)| {
                let mut o = outbound_event_from_row(&ev, &session, &salt);
                redact_payload(&mut o.payload, root.as_path(), &salt);
                o
            })
            .collect();
        let n_events = outbound_events.len() as u64;
        let event_batches = chunk_events_into_ingest_batches(
            team_id.clone(),
            wh.clone(),
            outbound_events,
            &cfg.sync,
        )?;

        let span_rows = store.tool_spans_sync_rows_in_window(&ws_key, start_ms, end_ms)?;
        // A span's age uses its start time, else its end time; spans with neither are not stale.
        let stale_spans = span_rows
            .iter()
            .filter(|r| {
                r.started_at_ms
                    .or(r.ended_at_ms)
                    .map(|t| t < intake_warning_threshold_ms)
                    .unwrap_or(false)
            })
            .count() as u64;
        let outbound_spans: Vec<_> = span_rows
            .iter()
            .map(|r| outbound_tool_span(r, &salt))
            .collect();
        let n_spans = outbound_spans.len() as u64;
        let span_batches =
            chunk_tool_spans_into_ingest_batches(team_id.clone(), wh, outbound_spans, &cfg.sync)?;

        let bcount = (event_batches.len() + span_batches.len()) as u64;
        total_events += n_events;
        total_spans += n_spans;
        total_batches += bcount;
        total_stale += stale_events + stale_spans;

        // Dry-run still counts everything above but sends nothing.
        if dry_run {
            eprintln!(
                "telemetry push (dry-run): {} — {} event(s), {} span(s), {} batch(es)",
                root.display(),
                n_events,
                n_spans,
                bcount
            );
            continue;
        }
        // Event batches are sent before span batches; the first fan-out error aborts the push.
        for batch in event_batches.into_iter().chain(span_batches) {
            registry
                .fan_out(fail_open, &batch)
                .with_context(|| format!("telemetry fan-out ({})", batch.kind_name()))?;
        }
        eprintln!(
            "telemetry push: {} — sent {} event(s), {} span(s) in {} batch(es)",
            root.display(),
            n_events,
            n_spans,
            bcount
        );
    }

    eprintln!(
        "telemetry push: total {} event(s), {} span(s), {} batch(es) across {} workspace(s){}",
        total_events,
        total_spans,
        total_batches,
        roots.len(),
        if dry_run { " (dry-run)" } else { "" }
    );
    if total_stale > 0 {
        eprintln!(
            "note: {} item(s) have a `timestamp` older than 18h. Datadog Logs intake silently \
             drops these (organization default). PostHog/OTLP/file sinks accept them without \
             change. Use `--days N` with N <= 1 to skip stale items.",
            total_stale
        );
    }
    Ok(())
}
626
627pub fn cmd_telemetry_test(workspace: Option<&Path>) -> Result<()> {
630 let ws = workspace_path(workspace)?;
631 let cfg = config::load(&ws)?;
632 let registry = telemetry::load_exporters(&cfg.telemetry, ws.as_path());
633 if registry.is_empty() {
634 anyhow::bail!(
635 "no `[[telemetry.exporters]]` rows resolved; run `kaizen telemetry configure --type=...` first"
636 );
637 }
638 let batch = synthetic_batch(&cfg.sync.team_id);
639 println!("telemetry test: sending one synthetic event to each configured sink ...");
640 let mut all_ok = true;
641 for name in registry.exporter_names() {
642 match registry.export_one(&name, &batch) {
643 Ok(()) => println!(" [{name}] ok"),
644 Err(e) => {
645 all_ok = false;
646 println!(" [{name}] FAIL: {e:#}");
647 }
648 }
649 }
650 if !all_ok {
651 anyhow::bail!("one or more exporters failed (see above)");
652 }
653 println!("telemetry test: all exporters accepted the synthetic event.");
654 Ok(())
655}
656
657fn synthetic_batch(team_id: &str) -> IngestExportBatch {
658 let now_ms = std::time::SystemTime::now()
659 .duration_since(std::time::UNIX_EPOCH)
660 .map(|d| d.as_millis() as u64)
661 .unwrap_or(0);
662 IngestExportBatch::Events(EventsBatchBody {
663 team_id: team_id.to_string(),
664 workspace_hash: "blake3:test-workspace".into(),
665 events: vec![OutboundEvent {
666 session_id_hash: "blake3:test-session".into(),
667 event_seq: 0,
668 ts_ms: now_ms,
669 agent: "kaizen".into(),
670 model: "synthetic".into(),
671 kind: "lifecycle".into(),
672 source: "tail".into(),
673 tool: None,
674 tool_call_id: None,
675 tokens_in: Some(0),
676 tokens_out: Some(0),
677 reasoning_tokens: None,
678 cost_usd_e6: None,
679 payload: serde_json::json!({"kaizen.telemetry_test": true}),
680 }],
681 })
682}
683
684pub fn cmd_telemetry_print_schema() -> Result<()> {
686 let v = serde_json::json!({
687 "kaizen_schema_version": KAIZEN_SCHEMA_VERSION,
688 "event_names": [
689 "kaizen.event",
690 "kaizen.tool_span",
691 "kaizen.repo_snapshot_chunk",
692 "kaizen.workspace_fact_snapshot"
693 ],
694 "note": "Full shapes: see sync::canonical::CanonicalItem and expand_ingest_batch (tests include golden JSON).",
695 });
696 println!("{}", serde_json::to_string_pretty(&v)?);
697 Ok(())
698}
699
#[cfg(test)]
mod tests {
    use super::*;

    /// Writes `contents` to `config.toml` in a fresh temp dir.
    /// The returned guard must stay alive for as long as the path is used.
    fn scratch_config(contents: &str) -> (tempfile::TempDir, std::path::PathBuf) {
        let dir = tempfile::TempDir::new().unwrap();
        let cfg_file = dir.path().join("config.toml");
        std::fs::write(&cfg_file, contents).unwrap();
        (dir, cfg_file)
    }

    #[test]
    fn exporter_already_present_detects_existing_datadog_row() {
        let config_text = concat!(
            "\n",
            "[[telemetry.exporters]]\n",
            "type = \"datadog\"\n",
            "api_key = \"abc\"\n",
            "site = \"us5.datadoghq.com\"\n",
        );
        assert!(exporter_already_present(config_text, "datadog"));
        assert!(!exporter_already_present(config_text, "posthog"));
    }

    #[test]
    fn exporter_already_present_handles_other_tables_between() {
        let config_text = concat!(
            "\n",
            "[[telemetry.exporters]]\n",
            "type = \"file\"\n",
            "enabled = true\n",
            "\n",
            "[telemetry.query]\n",
            "provider = \"datadog\"\n",
            "\n",
            "[[telemetry.exporters]]\n",
            "type = \"datadog\"\n",
            "api_key = \"abc\"\n",
        );
        for present in ["file", "datadog"] {
            assert!(exporter_already_present(config_text, present));
        }
        assert!(!exporter_already_present(config_text, "otlp"));
    }

    #[test]
    fn reject_obvious_app_key_catches_ddapp_prefix() {
        let app_key = "ddapp_FjBvwn3GKN8C6jiqltnbK0UHdUEs3gmlP1";
        let api_key = "5bed85d67b7b0359bebeb40693537d0b";
        assert!(reject_obvious_app_key(app_key).is_some());
        assert!(reject_obvious_app_key(api_key).is_none());
    }

    #[test]
    fn ensure_query_authority_appends_when_missing() {
        let (_guard, cfg_file) =
            scratch_config("[[telemetry.exporters]]\ntype = \"datadog\"\napi_key = \"abc\"\n");
        ensure_query_authority(&cfg_file, "datadog").unwrap();
        let after = std::fs::read_to_string(&cfg_file).unwrap();
        assert!(after.contains("[telemetry.query]"));
        assert!(after.contains("provider = \"datadog\""));
    }

    #[test]
    fn ensure_query_authority_idempotent_when_present() {
        let original = "[[telemetry.exporters]]\ntype = \"datadog\"\n\n[telemetry.query]\nprovider = \"posthog\"\n";
        let (_guard, cfg_file) = scratch_config(original);
        ensure_query_authority(&cfg_file, "datadog").unwrap();
        // An existing query table is left untouched, even when it names another provider.
        let after = std::fs::read_to_string(&cfg_file).unwrap();
        assert_eq!(after, original);
    }
}