1use crate::core::config::{self, ExporterConfig, effective_redaction_salt};
5use crate::core::paths::kaizen_dir;
6use crate::core::project_identity::project_name;
7#[cfg(any(feature = "telemetry-datadog", feature = "telemetry-posthog"))]
8use crate::provider::TelemetryQueryProvider;
9use crate::provider::{PullWindow, from_config as provider_from_config};
10use crate::shell::cli::workspace_path;
11use crate::shell::scope;
12use crate::store::Store;
13use crate::store::remote_cache::{RemoteCacheStore, RemotePullState};
14use crate::sync::IngestExportBatch;
15use crate::sync::canonical::KAIZEN_SCHEMA_VERSION;
16use crate::sync::outbound::{EventsBatchBody, OutboundEvent, outbound_event_from_row};
17use crate::sync::redact::redact_payload;
18use crate::sync::smart::outbound_tool_span;
19use crate::sync::workspace_hash;
20use crate::sync::{chunk_events_into_ingest_batches, chunk_tool_spans_into_ingest_batches};
21use crate::telemetry::{self, DatadogResolved, OtlpResolved, PostHogResolved};
22use anyhow::{Context, Result};
23use std::io::{BufRead, Write};
24use std::path::{Path, PathBuf};
25
/// Caller-supplied inputs for the telemetry configure wizard.
///
/// Every field is optional. Values left unset are looked up in environment
/// variables or prompted for, depending on `non_interactive`.
#[derive(Clone, Debug, Default)]
pub struct ConfigureOptions {
    /// Exporter kind (`file`, `posthog`, `datadog`, `otlp`, `dev`); prompted for when absent.
    pub exporter_type: Option<String>,
    /// Output path written into a `file` exporter row.
    pub path: Option<PathBuf>,
    /// API key used for the Datadog or PostHog exporter.
    pub api_key: Option<String>,
    /// Datadog site.
    pub site: Option<String>,
    /// PostHog host.
    pub host: Option<String>,
    /// OTLP endpoint.
    pub endpoint: Option<String>,
    /// When true, never prompt on stdin: missing required values become errors.
    pub non_interactive: bool,
}
36
37pub fn cmd_telemetry_configure(workspace: Option<&Path>, options: ConfigureOptions) -> Result<()> {
43 let ws = workspace_path(workspace)?;
44 let home = kaizen_dir().ok_or_else(|| anyhow::anyhow!("KAIZEN_HOME / HOME unset"))?;
45 let cfg_path = home.join("config.toml");
46 std::fs::create_dir_all(&home)?;
47
48 println!("Kaizen telemetry — optional sinks fan-out alongside Kaizen sync.");
49 let t = resolve_exporter_type(&options)?;
50 if t.is_empty() {
51 println!("Aborted.");
52 return Ok(());
53 }
54
55 let block = match t.as_str() {
56 "file" => file_exporter_block(options.path.as_deref()),
57 "dev" => "\n[[telemetry.exporters]]\ntype = \"dev\"\n".to_string(),
58 "datadog" => configure_datadog(&options)?,
59 "posthog" => configure_posthog(&options)?,
60 "otlp" => configure_otlp(&options)?,
61 _ => anyhow::bail!("unknown type (use file, posthog, datadog, otlp, dev)"),
62 };
63
64 let existing = std::fs::read_to_string(&cfg_path).unwrap_or_default();
65 if exporter_already_present(&existing, &t) {
66 println!(
67 "Skipped: a `[[telemetry.exporters]]` row of type `{t}` already exists in {}. \
68 Edit the file directly to change credentials.",
69 cfg_path.display()
70 );
71 } else {
72 append_block(&cfg_path, &block)?;
73 }
74 ensure_query_authority(&cfg_path, &t)?;
75
76 let cfg = config::load(&ws)?;
77 let _ = effective_redaction_salt(&cfg.sync, &home).context(
78 "ensure redaction salt (configured `[sync].team_salt_hex` or auto-generated `local_salt.hex`)",
79 )?;
80 println!("Wrote {}.", cfg_path.display());
81 println!("Next: `kaizen telemetry test` to send one synthetic event to every configured sink.");
82 Ok(())
83}
84
/// Reports whether `toml_text` already contains a `[[telemetry.exporters]]`
/// row whose `type` equals `t`.
///
/// This is a line-oriented scan, not a full TOML parse. It tracks whether the
/// current line sits under a `[[telemetry.exporters]]` header (any other
/// `[`-prefixed line ends the row) and inspects `type = ...` assignments
/// there. The assignment is matched leniently so hand-edited files are
/// recognised too: whitespace around `=` is optional, both `"basic"` and
/// `'literal'` quoting are accepted, and a trailing `# comment` is ignored.
pub(crate) fn exporter_already_present(toml_text: &str, t: &str) -> bool {
    /// Extracts the quoted value of a `type = "<value>"` line, if `line` is one.
    fn exporter_type_value(line: &str) -> Option<&str> {
        let (key, rest) = line.split_once('=')?;
        if key.trim() != "type" {
            return None;
        }
        let rest = rest.trim_start();
        let quote = rest.chars().next().filter(|c| matches!(c, '"' | '\''))?;
        // Both quote characters are one byte long, so slicing at 1 is safe.
        let body = &rest[1..];
        let end = body.find(quote)?;
        // Only whitespace or a comment may follow the closing quote.
        let tail = body[end + 1..].trim_start();
        (tail.is_empty() || tail.starts_with('#')).then_some(&body[..end])
    }

    let mut in_exporter_block = false;
    for line in toml_text.lines() {
        let l = line.trim();
        if l.starts_with("[[telemetry.exporters]]") {
            in_exporter_block = true;
            continue;
        }
        if l.starts_with('[') {
            // Any other table or array-of-tables header closes the exporter row.
            in_exporter_block = false;
            continue;
        }
        if in_exporter_block && exporter_type_value(l) == Some(t) {
            return true;
        }
    }
    false
}
107
108fn ensure_query_authority(path: &Path, t: &str) -> Result<()> {
111 let authority = match t {
112 "datadog" => "datadog",
113 "posthog" => "posthog",
114 _ => return Ok(()),
115 };
116 let existing = std::fs::read_to_string(path).unwrap_or_default();
117 if existing.lines().any(|l| l.trim() == "[telemetry.query]") {
118 return Ok(());
119 }
120 let block = format!("\n[telemetry.query]\nprovider = \"{authority}\"\n");
121 append_block(path, &block)
122}
123
124fn resolve_exporter_type(opts: &ConfigureOptions) -> Result<String> {
125 if let Some(t) = &opts.exporter_type {
126 return Ok(t.trim().to_lowercase());
127 }
128 if opts.non_interactive {
129 anyhow::bail!("--non-interactive requires --type=<file|posthog|datadog|otlp|dev>");
130 }
131 print!("Type `file`, `posthog`, `datadog`, `otlp`, or `dev` (empty to abort): ");
132 std::io::stdout().flush()?;
133 let mut line = String::new();
134 std::io::stdin().lock().read_line(&mut line)?;
135 Ok(line.trim().to_lowercase())
136}
137
138fn configure_datadog(opts: &ConfigureOptions) -> Result<String> {
139 let api_key = read_secret(
140 "Datadog API key (DD_API_KEY, 32 hex chars — NOT the `ddapp_*` Application Key; \
141 create one at Org Settings > API Keys)",
142 opts.api_key.clone(),
143 "DD_API_KEY",
144 opts.non_interactive,
145 )?;
146 if let Some(rejected) = reject_obvious_app_key(&api_key) {
147 anyhow::bail!("{rejected}");
148 }
149 let site = read_value(
150 "Datadog site",
151 opts.site.clone(),
152 "DD_SITE",
153 Some("datadoghq.com".into()),
154 opts.non_interactive,
155 )?;
156 health_check_datadog(&api_key, &site).context(
157 "Datadog credentials rejected (DD-API-KEY /api/v1/validate failed); not writing TOML",
158 )?;
159 Ok(datadog_block(&api_key, &site))
160}
161
162fn configure_posthog(opts: &ConfigureOptions) -> Result<String> {
163 let key = read_secret(
164 "PostHog project API key (phc_...)",
165 opts.api_key.clone(),
166 "POSTHOG_API_KEY",
167 opts.non_interactive,
168 )?;
169 let host = read_value(
170 "PostHog host",
171 opts.host.clone(),
172 "POSTHOG_HOST",
173 Some("https://us.i.posthog.com".into()),
174 opts.non_interactive,
175 )?;
176 health_check_posthog(&host).context("PostHog host unreachable; not writing TOML")?;
177 Ok(format!(
178 "\n[[telemetry.exporters]]\ntype = \"posthog\"\nproject_api_key = \"{}\"\nhost = \"{}\"\n",
179 key.replace('\\', "\\\\").replace('"', "\\\""),
180 host.replace('\\', "\\\\").replace('"', "\\\""),
181 ))
182}
183
184fn configure_otlp(opts: &ConfigureOptions) -> Result<String> {
185 let endpoint = read_value(
186 "OTLP endpoint",
187 opts.endpoint.clone(),
188 "OTEL_EXPORTER_OTLP_ENDPOINT",
189 Some("http://127.0.0.1:4318".into()),
190 opts.non_interactive,
191 )?;
192 Ok(format!(
193 "\n[[telemetry.exporters]]\ntype = \"otlp\"\nendpoint = \"{}\"\n",
194 endpoint.replace('\\', "\\\\").replace('"', "\\\""),
195 ))
196}
197
/// Returns an explanatory message when `value` carries the `ddapp_` prefix of
/// a Datadog Application Key, and `None` for anything else.
///
/// This is only a prefix check; it does not validate that `value` is a
/// well-formed API key.
pub(crate) fn reject_obvious_app_key(value: &str) -> Option<&'static str> {
    value.starts_with("ddapp_").then_some(
        "looks like a Datadog Application Key (`ddapp_*`); the wizard needs the API Key \
         (32 hex chars). Generate one at Org Settings > API Keys, then rerun.",
    )
}
211
/// Renders the `[[telemetry.exporters]]` TOML row for a Datadog exporter.
///
/// Backslashes and double quotes in `api_key` and `site` are escaped so the
/// values can sit inside TOML basic strings.
fn datadog_block(api_key: &str, site: &str) -> String {
    // Backslashes must be escaped before quotes so the added `\` are not doubled.
    let toml_escape = |raw: &str| raw.replace('\\', "\\\\").replace('"', "\\\"");

    let mut row = String::from("\n[[telemetry.exporters]]\ntype = \"datadog\"\n");
    row.push_str("api_key = \"");
    row.push_str(&toml_escape(api_key));
    row.push_str("\"\nsite = \"");
    row.push_str(&toml_escape(site));
    row.push_str("\"\n");
    row
}
219
220fn append_block(path: &Path, block: &str) -> Result<()> {
221 let mut f = std::fs::OpenOptions::new()
222 .create(true)
223 .append(true)
224 .open(path)?;
225 if f.metadata()?.len() > 0 {
226 f.write_all(b"\n")?;
227 }
228 f.write_all(block.as_bytes())?;
229 Ok(())
230}
231
232fn read_secret(
233 prompt: &str,
234 flag: Option<String>,
235 env_key: &str,
236 non_interactive: bool,
237) -> Result<String> {
238 if let Some(v) = flag.filter(|s| !s.is_empty()) {
239 return Ok(v);
240 }
241 if let Ok(v) = std::env::var(env_key)
242 && !v.is_empty()
243 {
244 return Ok(v);
245 }
246 if non_interactive {
247 anyhow::bail!("missing {env_key}: set the env var or pass --api-key");
248 }
249 print!("{prompt}: ");
250 std::io::stdout().flush()?;
251 let mut line = String::new();
252 std::io::stdin().lock().read_line(&mut line)?;
253 let v = line.trim().to_string();
254 if v.is_empty() {
255 anyhow::bail!("{env_key} is required");
256 }
257 Ok(v)
258}
259
260fn read_value(
261 prompt: &str,
262 flag: Option<String>,
263 env_key: &str,
264 default: Option<String>,
265 non_interactive: bool,
266) -> Result<String> {
267 if let Some(v) = flag.filter(|s| !s.is_empty()) {
268 return Ok(v);
269 }
270 if let Ok(v) = std::env::var(env_key)
271 && !v.is_empty()
272 {
273 return Ok(v);
274 }
275 if non_interactive {
276 return default.ok_or_else(|| anyhow::anyhow!("missing {env_key}; set env or pass flag"));
277 }
278 let hint = default
279 .as_deref()
280 .map(|d| format!(" [{d}]"))
281 .unwrap_or_default();
282 print!("{prompt}{hint}: ");
283 std::io::stdout().flush()?;
284 let mut line = String::new();
285 std::io::stdin().lock().read_line(&mut line)?;
286 let v = line.trim().to_string();
287 if v.is_empty() {
288 return default.ok_or_else(|| anyhow::anyhow!("{env_key} is required"));
289 }
290 Ok(v)
291}
292
293fn health_check_datadog(api_key: &str, site: &str) -> Result<()> {
294 let r = DatadogResolved {
295 site: site.to_string(),
296 api_key: api_key.to_string(),
297 app_key: None,
298 };
299 #[cfg(feature = "telemetry-datadog")]
300 {
301 let c = crate::provider::datadog::DatadogQueryClient::new(&r);
302 c.health()
303 }
304 #[cfg(not(feature = "telemetry-datadog"))]
305 {
306 let _ = &r;
307 anyhow::bail!("rebuild with `--features telemetry-datadog` to validate Datadog");
308 }
309}
310
311fn health_check_posthog(host: &str) -> Result<()> {
312 let r = PostHogResolved {
313 host: host.to_string(),
314 project_api_key: String::new(),
315 };
316 #[cfg(feature = "telemetry-posthog")]
317 {
318 let c = crate::provider::posthog::PostHogQueryClient::new(&r);
319 c.health()
320 }
321 #[cfg(not(feature = "telemetry-posthog"))]
322 {
323 let _ = &r;
324 anyhow::bail!("rebuild with `--features telemetry-posthog` to validate PostHog");
325 }
326}
327
/// Renders the `[[telemetry.exporters]]` TOML row for the file exporter.
///
/// With a `path`, the row gets a `path = "..."` line (lossy UTF-8, with
/// backslashes and double quotes escaped). Without one, a commented-out
/// example line is emitted instead.
fn file_exporter_block(path: Option<&Path>) -> String {
    const HEADER: &str = "\n[[telemetry.exporters]]\ntype = \"file\"\nenabled = true\n";

    let path_line = match path {
        Some(p) => {
            // Backslashes must be escaped before quotes so the added `\` are not doubled.
            let escaped = p
                .to_string_lossy()
                .replace('\\', "\\\\")
                .replace('"', "\\\"");
            format!("path = \"{escaped}\"\n")
        }
        None => String::from(
            "# path = \"telemetry.ndjson\" # optional; default .kaizen/telemetry.ndjson under each workspace\n",
        ),
    };

    let mut block = String::with_capacity(HEADER.len() + path_line.len());
    block.push_str(HEADER);
    block.push_str(&path_line);
    block
}
350
351pub fn print_effective_config_text(workspace: Option<&Path>) -> Result<String> {
353 let ws = workspace_path(workspace)?;
354 let cfg = config::load(&ws)?;
355 use std::fmt::Write;
356 let mut s = String::new();
357 writeln!(&mut s, "telemetry.fail_open: {}", cfg.telemetry.fail_open).unwrap();
358 for (i, e) in cfg.telemetry.exporters.iter().enumerate() {
359 match e {
360 ExporterConfig::None => writeln!(&mut s, "[{i}] type=none (ignored)").unwrap(),
361 ExporterConfig::File { enabled, path } => {
362 let p = path
363 .as_deref()
364 .map(|p| p.to_string())
365 .unwrap_or_else(|| "<workspace>/.kaizen/telemetry.ndjson".into());
366 writeln!(&mut s, "[{i}] type=file enabled={enabled} path={p}").unwrap();
367 }
368 ExporterConfig::Dev { enabled } => {
369 writeln!(&mut s, "[{i}] type=dev enabled={enabled}").unwrap();
370 }
371 ExporterConfig::PostHog { .. } => {
372 let line = if let Some(r) = PostHogResolved::from_config(e) {
373 format!(
374 "[{i}] type=posthog host={} key=<redacted len {}>",
375 r.host,
376 r.project_api_key.len()
377 )
378 } else {
379 format!(
380 "[{i}] type=posthog (unresolved: set POSTHOG_API_KEY or project_api_key)"
381 )
382 };
383 writeln!(&mut s, "{line}").unwrap();
384 }
385 ExporterConfig::Datadog { .. } => {
386 let line = if let Some(r) = DatadogResolved::from_config(e) {
387 format!(
388 "[{i}] type=datadog site={} key=<redacted len {}>",
389 r.site,
390 r.api_key.len()
391 )
392 } else {
393 format!("[{i}] type=datadog (unresolved: set DD_API_KEY or api_key in TOML)")
394 };
395 writeln!(&mut s, "{line}").unwrap();
396 }
397 ExporterConfig::Otlp { .. } => {
398 let line = if let Some(r) = OtlpResolved::from_config(e) {
399 format!("[{i}] type=otlp endpoint={}", r.endpoint)
400 } else {
401 format!("[{i}] type=otlp (unresolved: OTEL_EXPORTER_OTLP_ENDPOINT)")
402 };
403 writeln!(&mut s, "{line}").unwrap();
404 }
405 }
406 }
407 if cfg.telemetry.exporters.is_empty() {
408 writeln!(&mut s, "(no [[telemetry.exporters]] rows)").unwrap();
409 }
410 Ok(s)
411}
412
413pub fn cmd_telemetry_print_effective(workspace: Option<&Path>) -> Result<()> {
414 print!("{}", print_effective_config_text(workspace)?);
415 Ok(())
416}
417
418pub fn cmd_telemetry_init(workspace: Option<&Path>, options: ConfigureOptions) -> Result<()> {
420 cmd_telemetry_configure(workspace, options)
421}
422
423pub fn cmd_telemetry_doctor(workspace: Option<&Path>) -> Result<()> {
425 let ws = workspace_path(workspace)?;
426 let cfg = config::load(&ws)?;
427 println!("telemetry.fail_open: {}", cfg.telemetry.fail_open);
428 println!(
429 "telemetry.query.cache_ttl_seconds: {}",
430 cfg.telemetry.query.cache_ttl_seconds
431 );
432 match cfg.telemetry.query.provider {
433 crate::core::config::QueryAuthority::None => println!("telemetry.query.provider: none"),
434 crate::core::config::QueryAuthority::Posthog => {
435 println!("telemetry.query.provider: posthog");
436 }
437 crate::core::config::QueryAuthority::Datadog => {
438 println!("telemetry.query.provider: datadog");
439 }
440 }
441 if let Some(p) = provider_from_config(&cfg.telemetry) {
442 match p.health() {
443 Ok(()) => println!("provider health: ok (schema: {})", p.schema_version()),
444 Err(e) => eprintln!("provider health: {e}"),
445 }
446 } else {
447 println!("query provider: (not configured or features disabled; pull disabled)");
448 }
449 println!("\n{}", print_effective_config_text(Some(&ws))?);
450 println!("\nOTLP: export only — no query/pull in v1.");
451 Ok(())
452}
453
454pub fn cmd_telemetry_pull(workspace: Option<&Path>, days: u32) -> Result<()> {
456 let ws = workspace_path(workspace)?;
457 let cfg = config::load(&ws)?;
458 let p = provider_from_config(&cfg.telemetry).ok_or_else(|| {
459 anyhow::anyhow!(
460 "no query provider resolved. Either:\n \
461 1. Run `kaizen telemetry configure --type=datadog` (or `posthog`) so the wizard \
462 writes both `[[telemetry.exporters]]` and `[telemetry.query]`, OR\n \
463 2. Set `[telemetry.query].provider = \"datadog\"` in `~/.kaizen/config.toml` and \
464 ensure DD_API_KEY is reachable (TOML row or env)."
465 )
466 })?;
467 let store = Store::open(&crate::core::workspace::db_path(&ws)?)?;
468 let page = p.pull(PullWindow { days }, None)?;
469 if !cfg.sync.team_id.trim().is_empty()
470 && let Some(ctx) = crate::sync::ingest_ctx(&cfg, ws.to_path_buf())
471 && let Some(wh) = crate::sync::smart::workspace_hash_for(&ctx)
472 {
473 match crate::provider::import_pull_page_to_remote(&store, &cfg.sync.team_id, &wh, &page) {
474 Ok(n) if n > 0 => {
475 tracing::debug!(n, "remote_events: imported from provider pull (cmd)")
476 }
477 _ => {}
478 }
479 }
480 let now_ms = std::time::SystemTime::now()
481 .duration_since(std::time::UNIX_EPOCH)
482 .unwrap_or_default()
483 .as_millis() as i64;
484 let label = match cfg.telemetry.query.provider {
485 crate::core::config::QueryAuthority::None => "none",
486 crate::core::config::QueryAuthority::Posthog => "posthog",
487 crate::core::config::QueryAuthority::Datadog => "datadog",
488 };
489 store.set_pull_state(&RemotePullState {
490 query_provider: label.into(),
491 cursor_json: page.next_cursor.unwrap_or_default(),
492 last_success_ms: Some(now_ms),
493 })?;
494 println!("pull: received {} item(s) (page)", page.items.len());
495 Ok(())
496}
497
498pub fn cmd_telemetry_push(
500 workspace: Option<&Path>,
501 all_workspaces: bool,
502 days: u32,
503 dry_run: bool,
504) -> Result<()> {
505 let roots = scope::resolve(workspace, all_workspaces)?;
506 let primary = roots
507 .first()
508 .cloned()
509 .ok_or_else(|| anyhow::anyhow!("no workspace roots"))?;
510 let cfg = config::load(&primary)?;
511 let home = kaizen_dir().ok_or_else(|| anyhow::anyhow!("KAIZEN_HOME / HOME unset"))?;
512 let salt = effective_redaction_salt(&cfg.sync, &home).context(
513 "resolve redaction salt (configured `[sync].team_salt_hex` or auto-generated `local_salt.hex`)",
514 )?;
515 let registry = telemetry::load_exporters(&cfg.telemetry, primary.as_path());
516 if registry.is_empty() {
517 anyhow::bail!(
518 "no telemetry exporters to push to: add [[telemetry.exporters]] (e.g. type = \"file\" \
519 needs no extra feature; PostHog/Datadog/OTLP need build features); see \
520 `kaizen telemetry print-effective-config`."
521 );
522 }
523 let fail_open = cfg.telemetry.fail_open;
524 let team_id = cfg.sync.team_id.clone();
525 let end_ms = std::time::SystemTime::now()
526 .duration_since(std::time::UNIX_EPOCH)
527 .unwrap_or_default()
528 .as_millis() as u64;
529 let start_ms = end_ms.saturating_sub((days as u64).saturating_mul(86_400_000));
530
531 let mut total_events: u64 = 0;
532 let mut total_spans: u64 = 0;
533 let mut total_batches: u64 = 0;
534 let intake_warning_threshold_ms = end_ms.saturating_sub(18 * 3_600_000);
535 let mut total_stale: u64 = 0;
536
537 for root in &roots {
538 let store = Store::open(&crate::core::workspace::db_path(root)?)?;
539 let ws_key = root.to_string_lossy().to_string();
540 let wh = workspace_hash(&salt, root.as_path());
541 let project = project_name(root.as_path());
542
543 let event_rows = store.retro_events_in_window(&ws_key, start_ms, end_ms)?;
544 let stale_events = event_rows
545 .iter()
546 .filter(|(_, ev)| ev.ts_ms < intake_warning_threshold_ms)
547 .count() as u64;
548 let outbound_events: Vec<_> = event_rows
549 .into_iter()
550 .map(|(session, ev)| {
551 let mut o = outbound_event_from_row(&ev, &session, &salt);
552 redact_payload(&mut o.payload, root.as_path(), &salt);
553 o
554 })
555 .collect();
556 let n_events = outbound_events.len() as u64;
557 let event_batches = chunk_events_into_ingest_batches(
558 team_id.clone(),
559 wh.clone(),
560 project.clone(),
561 outbound_events,
562 &cfg.sync,
563 )?;
564
565 let span_rows = store.tool_spans_sync_rows_in_window(&ws_key, start_ms, end_ms)?;
566 let stale_spans = span_rows
567 .iter()
568 .filter(|r| {
569 r.started_at_ms
570 .or(r.ended_at_ms)
571 .map(|t| t < intake_warning_threshold_ms)
572 .unwrap_or(false)
573 })
574 .count() as u64;
575 let outbound_spans: Vec<_> = span_rows
576 .iter()
577 .map(|r| outbound_tool_span(r, &salt))
578 .collect();
579 let n_spans = outbound_spans.len() as u64;
580 let span_batches = chunk_tool_spans_into_ingest_batches(
581 team_id.clone(),
582 wh,
583 project,
584 outbound_spans,
585 &cfg.sync,
586 )?;
587
588 let bcount = (event_batches.len() + span_batches.len()) as u64;
589 total_events += n_events;
590 total_spans += n_spans;
591 total_batches += bcount;
592 total_stale += stale_events + stale_spans;
593
594 if dry_run {
595 eprintln!(
596 "telemetry push (dry-run): {} — {} event(s), {} span(s), {} batch(es)",
597 root.display(),
598 n_events,
599 n_spans,
600 bcount
601 );
602 continue;
603 }
604 for batch in event_batches.into_iter().chain(span_batches) {
605 registry
606 .fan_out(fail_open, &batch)
607 .with_context(|| format!("telemetry fan-out ({})", batch.kind_name()))?;
608 }
609 eprintln!(
610 "telemetry push: {} — sent {} event(s), {} span(s) in {} batch(es)",
611 root.display(),
612 n_events,
613 n_spans,
614 bcount
615 );
616 }
617
618 eprintln!(
619 "telemetry push: total {} event(s), {} span(s), {} batch(es) across {} workspace(s){}",
620 total_events,
621 total_spans,
622 total_batches,
623 roots.len(),
624 if dry_run { " (dry-run)" } else { "" }
625 );
626 if total_stale > 0 {
627 eprintln!(
628 "note: {} item(s) have a `timestamp` older than 18h. Datadog Logs intake silently \
629 drops these (organization default). PostHog/OTLP/file sinks accept them without \
630 change. Use `--days N` with N <= 1 to skip stale items.",
631 total_stale
632 );
633 }
634 Ok(())
635}
636
637pub fn cmd_telemetry_test(workspace: Option<&Path>) -> Result<()> {
640 let ws = workspace_path(workspace)?;
641 let cfg = config::load(&ws)?;
642 let registry = telemetry::load_exporters(&cfg.telemetry, ws.as_path());
643 if registry.is_empty() {
644 anyhow::bail!(
645 "no `[[telemetry.exporters]]` rows resolved; run `kaizen telemetry configure --type=...` first"
646 );
647 }
648 let batch = synthetic_batch(&cfg.sync.team_id);
649 println!("telemetry test: sending one synthetic event to each configured sink ...");
650 let mut all_ok = true;
651 for name in registry.exporter_names() {
652 match registry.export_one(&name, &batch) {
653 Ok(()) => println!(" [{name}] ok"),
654 Err(e) => {
655 all_ok = false;
656 println!(" [{name}] FAIL: {e:#}");
657 }
658 }
659 }
660 if !all_ok {
661 anyhow::bail!("one or more exporters failed (see above)");
662 }
663 println!("telemetry test: all exporters accepted the synthetic event.");
664 Ok(())
665}
666
667fn synthetic_batch(team_id: &str) -> IngestExportBatch {
668 let now_ms = std::time::SystemTime::now()
669 .duration_since(std::time::UNIX_EPOCH)
670 .map(|d| d.as_millis() as u64)
671 .unwrap_or(0);
672 IngestExportBatch::Events(EventsBatchBody {
673 team_id: team_id.to_string(),
674 workspace_hash: "blake3:test-workspace".into(),
675 project_name: Some("telemetry-test".into()),
676 events: vec![OutboundEvent {
677 session_id_hash: "blake3:test-session".into(),
678 event_seq: 0,
679 ts_ms: now_ms,
680 agent: "kaizen".into(),
681 model: "synthetic".into(),
682 kind: "lifecycle".into(),
683 source: "tail".into(),
684 tool: None,
685 tool_call_id: None,
686 tokens_in: Some(0),
687 tokens_out: Some(0),
688 reasoning_tokens: None,
689 cost_usd_e6: None,
690 payload: serde_json::json!({"kaizen.telemetry_test": true}),
691 }],
692 })
693}
694
695pub fn cmd_telemetry_print_schema() -> Result<()> {
697 let v = serde_json::json!({
698 "kaizen_schema_version": KAIZEN_SCHEMA_VERSION,
699 "event_names": [
700 "kaizen.event",
701 "kaizen.tool_span",
702 "kaizen.repo_snapshot_chunk",
703 "kaizen.workspace_fact_snapshot"
704 ],
705 "note": "Full shapes: see sync::canonical::CanonicalItem and expand_ingest_batch (tests include golden JSON).",
706 });
707 println!("{}", serde_json::to_string_pretty(&v)?);
708 Ok(())
709}
710
#[cfg(test)]
mod tests {
    use super::*;

    /// Config text with a single Datadog exporter row.
    const DATADOG_ONLY: &str = "\n[[telemetry.exporters]]\ntype = \"datadog\"\napi_key = \"abc\"\nsite = \"us5.datadoghq.com\"\n";

    /// Config text with two exporter rows separated by an unrelated table.
    const FILE_QUERY_DATADOG: &str = "\n[[telemetry.exporters]]\ntype = \"file\"\nenabled = true\n\n[telemetry.query]\nprovider = \"datadog\"\n\n[[telemetry.exporters]]\ntype = \"datadog\"\napi_key = \"abc\"\n";

    /// Writes `contents` to a fresh temporary `config.toml`. The directory
    /// guard is returned so the file outlives the call.
    fn temp_config(contents: &str) -> (tempfile::TempDir, PathBuf) {
        let dir = tempfile::TempDir::new().unwrap();
        let file = dir.path().join("config.toml");
        std::fs::write(&file, contents).unwrap();
        (dir, file)
    }

    #[test]
    fn exporter_already_present_detects_existing_datadog_row() {
        assert!(exporter_already_present(DATADOG_ONLY, "datadog"));
        assert!(!exporter_already_present(DATADOG_ONLY, "posthog"));
    }

    #[test]
    fn exporter_already_present_handles_other_tables_between() {
        for present in ["file", "datadog"] {
            assert!(exporter_already_present(FILE_QUERY_DATADOG, present));
        }
        assert!(!exporter_already_present(FILE_QUERY_DATADOG, "otlp"));
    }

    #[test]
    fn reject_obvious_app_key_catches_ddapp_prefix() {
        let app_key = "ddapp_FjBvwn3GKN8C6jiqltnbK0UHdUEs3gmlP1";
        let api_key = "not_ddapp_plain_api_key_value";
        assert!(reject_obvious_app_key(app_key).is_some());
        assert!(reject_obvious_app_key(api_key).is_none());
    }

    #[test]
    fn ensure_query_authority_appends_when_missing() {
        let (_guard, file) =
            temp_config("[[telemetry.exporters]]\ntype = \"datadog\"\napi_key = \"abc\"\n");
        ensure_query_authority(&file, "datadog").unwrap();
        let written = std::fs::read_to_string(&file).unwrap();
        assert!(written.contains("[telemetry.query]"));
        assert!(written.contains("provider = \"datadog\""));
    }

    #[test]
    fn ensure_query_authority_idempotent_when_present() {
        let original = "[[telemetry.exporters]]\ntype = \"datadog\"\n\n[telemetry.query]\nprovider = \"posthog\"\n";
        let (_guard, file) = temp_config(original);
        ensure_query_authority(&file, "datadog").unwrap();
        // An existing query table must be left untouched, even when it names another provider.
        let after = std::fs::read_to_string(&file).unwrap();
        assert_eq!(after, original);
    }
}