1use crate::core::config::{self, ExporterConfig, effective_redaction_salt};
5use crate::core::paths::kaizen_dir;
6use crate::core::project_identity::project_name;
7use crate::provider::{PullWindow, TelemetryQueryProvider, from_config as provider_from_config};
8use crate::shell::cli::workspace_path;
9use crate::shell::scope;
10use crate::store::Store;
11use crate::store::remote_cache::{RemoteCacheStore, RemotePullState};
12use crate::sync::IngestExportBatch;
13use crate::sync::canonical::KAIZEN_SCHEMA_VERSION;
14use crate::sync::outbound::{EventsBatchBody, OutboundEvent, outbound_event_from_row};
15use crate::sync::redact::redact_payload;
16use crate::sync::smart::outbound_tool_span;
17use crate::sync::workspace_hash;
18use crate::sync::{chunk_events_into_ingest_batches, chunk_tool_spans_into_ingest_batches};
19use crate::telemetry::{self, DatadogResolved, OtlpResolved, PostHogResolved};
20use anyhow::{Context, Result};
21use std::io::{BufRead, Write};
22use std::path::{Path, PathBuf};
23
24#[derive(Debug, Clone, Default)]
25pub struct ConfigureOptions {
26 pub exporter_type: Option<String>,
27 pub path: Option<PathBuf>,
28 pub api_key: Option<String>,
29 pub site: Option<String>,
30 pub host: Option<String>,
31 pub endpoint: Option<String>,
32 pub non_interactive: bool,
33}
34
35pub fn cmd_telemetry_configure(workspace: Option<&Path>, options: ConfigureOptions) -> Result<()> {
41 let ws = workspace_path(workspace)?;
42 let home = kaizen_dir().ok_or_else(|| anyhow::anyhow!("KAIZEN_HOME / HOME unset"))?;
43 let cfg_path = home.join("config.toml");
44 std::fs::create_dir_all(&home)?;
45
46 println!("Kaizen telemetry — optional sinks fan-out alongside Kaizen sync.");
47 let t = resolve_exporter_type(&options)?;
48 if t.is_empty() {
49 println!("Aborted.");
50 return Ok(());
51 }
52
53 let block = match t.as_str() {
54 "file" => file_exporter_block(options.path.as_deref()),
55 "dev" => "\n[[telemetry.exporters]]\ntype = \"dev\"\n".to_string(),
56 "datadog" => configure_datadog(&options)?,
57 "posthog" => configure_posthog(&options)?,
58 "otlp" => configure_otlp(&options)?,
59 _ => anyhow::bail!("unknown type (use file, posthog, datadog, otlp, dev)"),
60 };
61
62 let existing = std::fs::read_to_string(&cfg_path).unwrap_or_default();
63 if exporter_already_present(&existing, &t) {
64 println!(
65 "Skipped: a `[[telemetry.exporters]]` row of type `{t}` already exists in {}. \
66 Edit the file directly to change credentials.",
67 cfg_path.display()
68 );
69 } else {
70 append_block(&cfg_path, &block)?;
71 }
72 ensure_query_authority(&cfg_path, &t)?;
73
74 let cfg = config::load(&ws)?;
75 let _ = effective_redaction_salt(&cfg.sync, &home).context(
76 "ensure redaction salt (configured `[sync].team_salt_hex` or auto-generated `local_salt.hex`)",
77 )?;
78 println!("Wrote {}.", cfg_path.display());
79 println!("Next: `kaizen telemetry test` to send one synthetic event to every configured sink.");
80 Ok(())
81}
82
83pub(crate) fn exporter_already_present(toml_text: &str, t: &str) -> bool {
87 let mut in_exporter_block = false;
88 let needle = format!("type = \"{t}\"");
89 for line in toml_text.lines() {
90 let l = line.trim();
91 if l.starts_with("[[telemetry.exporters]]") {
92 in_exporter_block = true;
93 continue;
94 }
95 if l.starts_with('[') {
96 in_exporter_block = false;
97 continue;
98 }
99 if in_exporter_block && l == needle {
100 return true;
101 }
102 }
103 false
104}
105
106fn ensure_query_authority(path: &Path, t: &str) -> Result<()> {
109 let authority = match t {
110 "datadog" => "datadog",
111 "posthog" => "posthog",
112 _ => return Ok(()),
113 };
114 let existing = std::fs::read_to_string(path).unwrap_or_default();
115 if existing.lines().any(|l| l.trim() == "[telemetry.query]") {
116 return Ok(());
117 }
118 let block = format!("\n[telemetry.query]\nprovider = \"{authority}\"\n");
119 append_block(path, &block)
120}
121
122fn resolve_exporter_type(opts: &ConfigureOptions) -> Result<String> {
123 if let Some(t) = &opts.exporter_type {
124 return Ok(t.trim().to_lowercase());
125 }
126 if opts.non_interactive {
127 anyhow::bail!("--non-interactive requires --type=<file|posthog|datadog|otlp|dev>");
128 }
129 print!("Type `file`, `posthog`, `datadog`, `otlp`, or `dev` (empty to abort): ");
130 std::io::stdout().flush()?;
131 let mut line = String::new();
132 std::io::stdin().lock().read_line(&mut line)?;
133 Ok(line.trim().to_lowercase())
134}
135
136fn configure_datadog(opts: &ConfigureOptions) -> Result<String> {
137 let api_key = read_secret(
138 "Datadog API key (DD_API_KEY, 32 hex chars — NOT the `ddapp_*` Application Key; \
139 create one at Org Settings > API Keys)",
140 opts.api_key.clone(),
141 "DD_API_KEY",
142 opts.non_interactive,
143 )?;
144 if let Some(rejected) = reject_obvious_app_key(&api_key) {
145 anyhow::bail!("{rejected}");
146 }
147 let site = read_value(
148 "Datadog site",
149 opts.site.clone(),
150 "DD_SITE",
151 Some("datadoghq.com".into()),
152 opts.non_interactive,
153 )?;
154 health_check_datadog(&api_key, &site).context(
155 "Datadog credentials rejected (DD-API-KEY /api/v1/validate failed); not writing TOML",
156 )?;
157 Ok(datadog_block(&api_key, &site))
158}
159
160fn configure_posthog(opts: &ConfigureOptions) -> Result<String> {
161 let key = read_secret(
162 "PostHog project API key (phc_...)",
163 opts.api_key.clone(),
164 "POSTHOG_API_KEY",
165 opts.non_interactive,
166 )?;
167 let host = read_value(
168 "PostHog host",
169 opts.host.clone(),
170 "POSTHOG_HOST",
171 Some("https://us.i.posthog.com".into()),
172 opts.non_interactive,
173 )?;
174 health_check_posthog(&host).context("PostHog host unreachable; not writing TOML")?;
175 Ok(format!(
176 "\n[[telemetry.exporters]]\ntype = \"posthog\"\nproject_api_key = \"{}\"\nhost = \"{}\"\n",
177 key.replace('\\', "\\\\").replace('"', "\\\""),
178 host.replace('\\', "\\\\").replace('"', "\\\""),
179 ))
180}
181
182fn configure_otlp(opts: &ConfigureOptions) -> Result<String> {
183 let endpoint = read_value(
184 "OTLP endpoint",
185 opts.endpoint.clone(),
186 "OTEL_EXPORTER_OTLP_ENDPOINT",
187 Some("http://127.0.0.1:4318".into()),
188 opts.non_interactive,
189 )?;
190 Ok(format!(
191 "\n[[telemetry.exporters]]\ntype = \"otlp\"\nendpoint = \"{}\"\n",
192 endpoint.replace('\\', "\\\\").replace('"', "\\\""),
193 ))
194}
195
196pub(crate) fn reject_obvious_app_key(value: &str) -> Option<&'static str> {
200 if value.starts_with("ddapp_") {
201 Some(
202 "looks like a Datadog Application Key (`ddapp_*`); the wizard needs the API Key \
203 (32 hex chars). Generate one at Org Settings > API Keys, then rerun.",
204 )
205 } else {
206 None
207 }
208}
209
210fn datadog_block(api_key: &str, site: &str) -> String {
211 format!(
212 "\n[[telemetry.exporters]]\ntype = \"datadog\"\napi_key = \"{}\"\nsite = \"{}\"\n",
213 api_key.replace('\\', "\\\\").replace('"', "\\\""),
214 site.replace('\\', "\\\\").replace('"', "\\\""),
215 )
216}
217
218fn append_block(path: &Path, block: &str) -> Result<()> {
219 let mut f = std::fs::OpenOptions::new()
220 .create(true)
221 .append(true)
222 .open(path)?;
223 if f.metadata()?.len() > 0 {
224 f.write_all(b"\n")?;
225 }
226 f.write_all(block.as_bytes())?;
227 Ok(())
228}
229
230fn read_secret(
231 prompt: &str,
232 flag: Option<String>,
233 env_key: &str,
234 non_interactive: bool,
235) -> Result<String> {
236 if let Some(v) = flag.filter(|s| !s.is_empty()) {
237 return Ok(v);
238 }
239 if let Ok(v) = std::env::var(env_key)
240 && !v.is_empty()
241 {
242 return Ok(v);
243 }
244 if non_interactive {
245 anyhow::bail!("missing {env_key}: set the env var or pass --api-key");
246 }
247 print!("{prompt}: ");
248 std::io::stdout().flush()?;
249 let mut line = String::new();
250 std::io::stdin().lock().read_line(&mut line)?;
251 let v = line.trim().to_string();
252 if v.is_empty() {
253 anyhow::bail!("{env_key} is required");
254 }
255 Ok(v)
256}
257
258fn read_value(
259 prompt: &str,
260 flag: Option<String>,
261 env_key: &str,
262 default: Option<String>,
263 non_interactive: bool,
264) -> Result<String> {
265 if let Some(v) = flag.filter(|s| !s.is_empty()) {
266 return Ok(v);
267 }
268 if let Ok(v) = std::env::var(env_key)
269 && !v.is_empty()
270 {
271 return Ok(v);
272 }
273 if non_interactive {
274 return default.ok_or_else(|| anyhow::anyhow!("missing {env_key}; set env or pass flag"));
275 }
276 let hint = default
277 .as_deref()
278 .map(|d| format!(" [{d}]"))
279 .unwrap_or_default();
280 print!("{prompt}{hint}: ");
281 std::io::stdout().flush()?;
282 let mut line = String::new();
283 std::io::stdin().lock().read_line(&mut line)?;
284 let v = line.trim().to_string();
285 if v.is_empty() {
286 return default.ok_or_else(|| anyhow::anyhow!("{env_key} is required"));
287 }
288 Ok(v)
289}
290
291fn health_check_datadog(api_key: &str, site: &str) -> Result<()> {
292 let r = DatadogResolved {
293 site: site.to_string(),
294 api_key: api_key.to_string(),
295 app_key: None,
296 };
297 #[cfg(feature = "telemetry-datadog")]
298 {
299 let c = crate::provider::datadog::DatadogQueryClient::new(&r);
300 c.health()
301 }
302 #[cfg(not(feature = "telemetry-datadog"))]
303 {
304 let _ = &r;
305 anyhow::bail!("rebuild with `--features telemetry-datadog` to validate Datadog");
306 }
307}
308
309fn health_check_posthog(host: &str) -> Result<()> {
310 let r = PostHogResolved {
311 host: host.to_string(),
312 project_api_key: String::new(),
313 };
314 #[cfg(feature = "telemetry-posthog")]
315 {
316 let c = crate::provider::posthog::PostHogQueryClient::new(&r);
317 c.health()
318 }
319 #[cfg(not(feature = "telemetry-posthog"))]
320 {
321 let _ = &r;
322 anyhow::bail!("rebuild with `--features telemetry-posthog` to validate PostHog");
323 }
324}
325
326fn file_exporter_block(path: Option<&Path>) -> String {
327 let mut block = String::from(
328 r#"
329[[telemetry.exporters]]
330type = "file"
331enabled = true
332"#,
333 );
334 if let Some(path) = path {
335 use std::fmt::Write as _;
336 let path = path
337 .to_string_lossy()
338 .replace('\\', "\\\\")
339 .replace('"', "\\\"");
340 writeln!(&mut block, "path = \"{path}\"").unwrap();
341 } else {
342 block.push_str(
343 "# path = \"telemetry.ndjson\" # optional; default .kaizen/telemetry.ndjson under each workspace\n",
344 );
345 }
346 block
347}
348
349pub fn print_effective_config_text(workspace: Option<&Path>) -> Result<String> {
351 let ws = workspace_path(workspace)?;
352 let cfg = config::load(&ws)?;
353 use std::fmt::Write;
354 let mut s = String::new();
355 writeln!(&mut s, "telemetry.fail_open: {}", cfg.telemetry.fail_open).unwrap();
356 for (i, e) in cfg.telemetry.exporters.iter().enumerate() {
357 match e {
358 ExporterConfig::None => writeln!(&mut s, "[{i}] type=none (ignored)").unwrap(),
359 ExporterConfig::File { enabled, path } => {
360 let p = path
361 .as_deref()
362 .map(|p| p.to_string())
363 .unwrap_or_else(|| "<workspace>/.kaizen/telemetry.ndjson".into());
364 writeln!(&mut s, "[{i}] type=file enabled={enabled} path={p}").unwrap();
365 }
366 ExporterConfig::Dev { enabled } => {
367 writeln!(&mut s, "[{i}] type=dev enabled={enabled}").unwrap();
368 }
369 ExporterConfig::PostHog { .. } => {
370 let line = if let Some(r) = PostHogResolved::from_config(e) {
371 format!(
372 "[{i}] type=posthog host={} key=<redacted len {}>",
373 r.host,
374 r.project_api_key.len()
375 )
376 } else {
377 format!(
378 "[{i}] type=posthog (unresolved: set POSTHOG_API_KEY or project_api_key)"
379 )
380 };
381 writeln!(&mut s, "{line}").unwrap();
382 }
383 ExporterConfig::Datadog { .. } => {
384 let line = if let Some(r) = DatadogResolved::from_config(e) {
385 format!(
386 "[{i}] type=datadog site={} key=<redacted len {}>",
387 r.site,
388 r.api_key.len()
389 )
390 } else {
391 format!("[{i}] type=datadog (unresolved: set DD_API_KEY or api_key in TOML)")
392 };
393 writeln!(&mut s, "{line}").unwrap();
394 }
395 ExporterConfig::Otlp { .. } => {
396 let line = if let Some(r) = OtlpResolved::from_config(e) {
397 format!("[{i}] type=otlp endpoint={}", r.endpoint)
398 } else {
399 format!("[{i}] type=otlp (unresolved: OTEL_EXPORTER_OTLP_ENDPOINT)")
400 };
401 writeln!(&mut s, "{line}").unwrap();
402 }
403 }
404 }
405 if cfg.telemetry.exporters.is_empty() {
406 writeln!(&mut s, "(no [[telemetry.exporters]] rows)").unwrap();
407 }
408 Ok(s)
409}
410
411pub fn cmd_telemetry_print_effective(workspace: Option<&Path>) -> Result<()> {
412 print!("{}", print_effective_config_text(workspace)?);
413 Ok(())
414}
415
416pub fn cmd_telemetry_init(workspace: Option<&Path>, options: ConfigureOptions) -> Result<()> {
418 cmd_telemetry_configure(workspace, options)
419}
420
421pub fn cmd_telemetry_doctor(workspace: Option<&Path>) -> Result<()> {
423 let ws = workspace_path(workspace)?;
424 let cfg = config::load(&ws)?;
425 println!("telemetry.fail_open: {}", cfg.telemetry.fail_open);
426 println!(
427 "telemetry.query.cache_ttl_seconds: {}",
428 cfg.telemetry.query.cache_ttl_seconds
429 );
430 match cfg.telemetry.query.provider {
431 crate::core::config::QueryAuthority::None => println!("telemetry.query.provider: none"),
432 crate::core::config::QueryAuthority::Posthog => {
433 println!("telemetry.query.provider: posthog");
434 }
435 crate::core::config::QueryAuthority::Datadog => {
436 println!("telemetry.query.provider: datadog");
437 }
438 }
439 if let Some(p) = provider_from_config(&cfg.telemetry) {
440 match p.health() {
441 Ok(()) => println!("provider health: ok (schema: {})", p.schema_version()),
442 Err(e) => eprintln!("provider health: {e}"),
443 }
444 } else {
445 println!("query provider: (not configured or features disabled; pull disabled)");
446 }
447 println!("\n{}", print_effective_config_text(Some(&ws))?);
448 println!("\nOTLP: export only — no query/pull in v1.");
449 Ok(())
450}
451
452pub fn cmd_telemetry_pull(workspace: Option<&Path>, days: u32) -> Result<()> {
454 let ws = workspace_path(workspace)?;
455 let cfg = config::load(&ws)?;
456 let p = provider_from_config(&cfg.telemetry).ok_or_else(|| {
457 anyhow::anyhow!(
458 "no query provider resolved. Either:\n \
459 1. Run `kaizen telemetry configure --type=datadog` (or `posthog`) so the wizard \
460 writes both `[[telemetry.exporters]]` and `[telemetry.query]`, OR\n \
461 2. Set `[telemetry.query].provider = \"datadog\"` in `~/.kaizen/config.toml` and \
462 ensure DD_API_KEY is reachable (TOML row or env)."
463 )
464 })?;
465 let store = Store::open(&crate::core::workspace::db_path(&ws)?)?;
466 let page = p.pull(PullWindow { days }, None)?;
467 if !cfg.sync.team_id.trim().is_empty()
468 && let Some(ctx) = crate::sync::ingest_ctx(&cfg, ws.to_path_buf())
469 && let Some(wh) = crate::sync::smart::workspace_hash_for(&ctx)
470 {
471 match crate::provider::import_pull_page_to_remote(&store, &cfg.sync.team_id, &wh, &page) {
472 Ok(n) if n > 0 => {
473 tracing::debug!(n, "remote_events: imported from provider pull (cmd)")
474 }
475 _ => {}
476 }
477 }
478 let now_ms = std::time::SystemTime::now()
479 .duration_since(std::time::UNIX_EPOCH)
480 .unwrap_or_default()
481 .as_millis() as i64;
482 let label = match cfg.telemetry.query.provider {
483 crate::core::config::QueryAuthority::None => "none",
484 crate::core::config::QueryAuthority::Posthog => "posthog",
485 crate::core::config::QueryAuthority::Datadog => "datadog",
486 };
487 store.set_pull_state(&RemotePullState {
488 query_provider: label.into(),
489 cursor_json: page.next_cursor.unwrap_or_default(),
490 last_success_ms: Some(now_ms),
491 })?;
492 println!("pull: received {} item(s) (page)", page.items.len());
493 Ok(())
494}
495
496pub fn cmd_telemetry_push(
498 workspace: Option<&Path>,
499 all_workspaces: bool,
500 days: u32,
501 dry_run: bool,
502) -> Result<()> {
503 let roots = scope::resolve(workspace, all_workspaces)?;
504 let primary = roots
505 .first()
506 .cloned()
507 .ok_or_else(|| anyhow::anyhow!("no workspace roots"))?;
508 let cfg = config::load(&primary)?;
509 let home = kaizen_dir().ok_or_else(|| anyhow::anyhow!("KAIZEN_HOME / HOME unset"))?;
510 let salt = effective_redaction_salt(&cfg.sync, &home).context(
511 "resolve redaction salt (configured `[sync].team_salt_hex` or auto-generated `local_salt.hex`)",
512 )?;
513 let registry = telemetry::load_exporters(&cfg.telemetry, primary.as_path());
514 if registry.is_empty() {
515 anyhow::bail!(
516 "no telemetry exporters to push to: add [[telemetry.exporters]] (e.g. type = \"file\" \
517 needs no extra feature; PostHog/Datadog/OTLP need build features); see \
518 `kaizen telemetry print-effective-config`."
519 );
520 }
521 let fail_open = cfg.telemetry.fail_open;
522 let team_id = cfg.sync.team_id.clone();
523 let end_ms = std::time::SystemTime::now()
524 .duration_since(std::time::UNIX_EPOCH)
525 .unwrap_or_default()
526 .as_millis() as u64;
527 let start_ms = end_ms.saturating_sub((days as u64).saturating_mul(86_400_000));
528
529 let mut total_events: u64 = 0;
530 let mut total_spans: u64 = 0;
531 let mut total_batches: u64 = 0;
532 let intake_warning_threshold_ms = end_ms.saturating_sub(18 * 3_600_000);
533 let mut total_stale: u64 = 0;
534
535 for root in &roots {
536 let store = Store::open(&crate::core::workspace::db_path(root)?)?;
537 let ws_key = root.to_string_lossy().to_string();
538 let wh = workspace_hash(&salt, root.as_path());
539 let project = project_name(root.as_path());
540
541 let event_rows = store.retro_events_in_window(&ws_key, start_ms, end_ms)?;
542 let stale_events = event_rows
543 .iter()
544 .filter(|(_, ev)| ev.ts_ms < intake_warning_threshold_ms)
545 .count() as u64;
546 let outbound_events: Vec<_> = event_rows
547 .into_iter()
548 .map(|(session, ev)| {
549 let mut o = outbound_event_from_row(&ev, &session, &salt);
550 redact_payload(&mut o.payload, root.as_path(), &salt);
551 o
552 })
553 .collect();
554 let n_events = outbound_events.len() as u64;
555 let event_batches = chunk_events_into_ingest_batches(
556 team_id.clone(),
557 wh.clone(),
558 project.clone(),
559 outbound_events,
560 &cfg.sync,
561 )?;
562
563 let span_rows = store.tool_spans_sync_rows_in_window(&ws_key, start_ms, end_ms)?;
564 let stale_spans = span_rows
565 .iter()
566 .filter(|r| {
567 r.started_at_ms
568 .or(r.ended_at_ms)
569 .map(|t| t < intake_warning_threshold_ms)
570 .unwrap_or(false)
571 })
572 .count() as u64;
573 let outbound_spans: Vec<_> = span_rows
574 .iter()
575 .map(|r| outbound_tool_span(r, &salt))
576 .collect();
577 let n_spans = outbound_spans.len() as u64;
578 let span_batches = chunk_tool_spans_into_ingest_batches(
579 team_id.clone(),
580 wh,
581 project,
582 outbound_spans,
583 &cfg.sync,
584 )?;
585
586 let bcount = (event_batches.len() + span_batches.len()) as u64;
587 total_events += n_events;
588 total_spans += n_spans;
589 total_batches += bcount;
590 total_stale += stale_events + stale_spans;
591
592 if dry_run {
593 eprintln!(
594 "telemetry push (dry-run): {} — {} event(s), {} span(s), {} batch(es)",
595 root.display(),
596 n_events,
597 n_spans,
598 bcount
599 );
600 continue;
601 }
602 for batch in event_batches.into_iter().chain(span_batches) {
603 registry
604 .fan_out(fail_open, &batch)
605 .with_context(|| format!("telemetry fan-out ({})", batch.kind_name()))?;
606 }
607 eprintln!(
608 "telemetry push: {} — sent {} event(s), {} span(s) in {} batch(es)",
609 root.display(),
610 n_events,
611 n_spans,
612 bcount
613 );
614 }
615
616 eprintln!(
617 "telemetry push: total {} event(s), {} span(s), {} batch(es) across {} workspace(s){}",
618 total_events,
619 total_spans,
620 total_batches,
621 roots.len(),
622 if dry_run { " (dry-run)" } else { "" }
623 );
624 if total_stale > 0 {
625 eprintln!(
626 "note: {} item(s) have a `timestamp` older than 18h. Datadog Logs intake silently \
627 drops these (organization default). PostHog/OTLP/file sinks accept them without \
628 change. Use `--days N` with N <= 1 to skip stale items.",
629 total_stale
630 );
631 }
632 Ok(())
633}
634
635pub fn cmd_telemetry_test(workspace: Option<&Path>) -> Result<()> {
638 let ws = workspace_path(workspace)?;
639 let cfg = config::load(&ws)?;
640 let registry = telemetry::load_exporters(&cfg.telemetry, ws.as_path());
641 if registry.is_empty() {
642 anyhow::bail!(
643 "no `[[telemetry.exporters]]` rows resolved; run `kaizen telemetry configure --type=...` first"
644 );
645 }
646 let batch = synthetic_batch(&cfg.sync.team_id);
647 println!("telemetry test: sending one synthetic event to each configured sink ...");
648 let mut all_ok = true;
649 for name in registry.exporter_names() {
650 match registry.export_one(&name, &batch) {
651 Ok(()) => println!(" [{name}] ok"),
652 Err(e) => {
653 all_ok = false;
654 println!(" [{name}] FAIL: {e:#}");
655 }
656 }
657 }
658 if !all_ok {
659 anyhow::bail!("one or more exporters failed (see above)");
660 }
661 println!("telemetry test: all exporters accepted the synthetic event.");
662 Ok(())
663}
664
665fn synthetic_batch(team_id: &str) -> IngestExportBatch {
666 let now_ms = std::time::SystemTime::now()
667 .duration_since(std::time::UNIX_EPOCH)
668 .map(|d| d.as_millis() as u64)
669 .unwrap_or(0);
670 IngestExportBatch::Events(EventsBatchBody {
671 team_id: team_id.to_string(),
672 workspace_hash: "blake3:test-workspace".into(),
673 project_name: Some("telemetry-test".into()),
674 events: vec![OutboundEvent {
675 session_id_hash: "blake3:test-session".into(),
676 event_seq: 0,
677 ts_ms: now_ms,
678 agent: "kaizen".into(),
679 model: "synthetic".into(),
680 kind: "lifecycle".into(),
681 source: "tail".into(),
682 tool: None,
683 tool_call_id: None,
684 tokens_in: Some(0),
685 tokens_out: Some(0),
686 reasoning_tokens: None,
687 cost_usd_e6: None,
688 payload: serde_json::json!({"kaizen.telemetry_test": true}),
689 }],
690 })
691}
692
693pub fn cmd_telemetry_print_schema() -> Result<()> {
695 let v = serde_json::json!({
696 "kaizen_schema_version": KAIZEN_SCHEMA_VERSION,
697 "event_names": [
698 "kaizen.event",
699 "kaizen.tool_span",
700 "kaizen.repo_snapshot_chunk",
701 "kaizen.workspace_fact_snapshot"
702 ],
703 "note": "Full shapes: see sync::canonical::CanonicalItem and expand_ingest_batch (tests include golden JSON).",
704 });
705 println!("{}", serde_json::to_string_pretty(&v)?);
706 Ok(())
707}
708
709#[cfg(test)]
710mod tests {
711 use super::*;
712
713 #[test]
714 fn exporter_already_present_detects_existing_datadog_row() {
715 let toml = r#"
716[[telemetry.exporters]]
717type = "datadog"
718api_key = "abc"
719site = "us5.datadoghq.com"
720"#;
721 assert!(exporter_already_present(toml, "datadog"));
722 assert!(!exporter_already_present(toml, "posthog"));
723 }
724
725 #[test]
726 fn exporter_already_present_handles_other_tables_between() {
727 let toml = r#"
728[[telemetry.exporters]]
729type = "file"
730enabled = true
731
732[telemetry.query]
733provider = "datadog"
734
735[[telemetry.exporters]]
736type = "datadog"
737api_key = "abc"
738"#;
739 assert!(exporter_already_present(toml, "file"));
740 assert!(exporter_already_present(toml, "datadog"));
741 assert!(!exporter_already_present(toml, "otlp"));
742 }
743
744 #[test]
745 fn reject_obvious_app_key_catches_ddapp_prefix() {
746 assert!(reject_obvious_app_key("ddapp_FjBvwn3GKN8C6jiqltnbK0UHdUEs3gmlP1").is_some());
747 assert!(reject_obvious_app_key("not_ddapp_plain_api_key_value").is_none());
748 }
749
750 #[test]
751 fn ensure_query_authority_appends_when_missing() {
752 let dir = tempfile::TempDir::new().unwrap();
753 let p = dir.path().join("config.toml");
754 std::fs::write(
755 &p,
756 "[[telemetry.exporters]]\ntype = \"datadog\"\napi_key = \"abc\"\n",
757 )
758 .unwrap();
759 ensure_query_authority(&p, "datadog").unwrap();
760 let s = std::fs::read_to_string(&p).unwrap();
761 assert!(s.contains("[telemetry.query]"));
762 assert!(s.contains("provider = \"datadog\""));
763 }
764
765 #[test]
766 fn ensure_query_authority_idempotent_when_present() {
767 let dir = tempfile::TempDir::new().unwrap();
768 let p = dir.path().join("config.toml");
769 let original = "[[telemetry.exporters]]\ntype = \"datadog\"\n\n[telemetry.query]\nprovider = \"posthog\"\n";
770 std::fs::write(&p, original).unwrap();
771 ensure_query_authority(&p, "datadog").unwrap();
772 let s = std::fs::read_to_string(&p).unwrap();
773 assert_eq!(s, original);
775 }
776}