1use crate::core::config::{self, ExporterConfig, try_team_salt};
5use crate::provider::{PullWindow, from_config as provider_from_config};
6use crate::shell::cli::workspace_path;
7use crate::shell::scope;
8use crate::store::Store;
9use crate::store::remote_cache::{RemoteCacheStore, RemotePullState};
10use crate::sync::canonical::KAIZEN_SCHEMA_VERSION;
11use crate::sync::chunk_events_into_ingest_batches;
12use crate::sync::outbound::outbound_event_from_row;
13use crate::sync::redact::redact_payload;
14use crate::sync::workspace_hash;
15use crate::telemetry::{self, DatadogResolved, OtlpResolved, PostHogResolved};
16use anyhow::{Context, Result};
17use std::io::{BufRead, Write};
18use std::path::{Path, PathBuf};
19
20#[derive(Debug, Clone, Default)]
21pub struct ConfigureOptions {
22 pub exporter_type: Option<String>,
23 pub path: Option<PathBuf>,
24}
25
26pub fn cmd_telemetry_configure(workspace: Option<&Path>, options: ConfigureOptions) -> Result<()> {
28 let ws = workspace_path(workspace)?;
29 let home = std::env::var("HOME").context("HOME not set")?;
30 let p = std::path::PathBuf::from(home).join(".kaizen/config.toml");
31 std::fs::create_dir_all(p.parent().unwrap())?;
32
33 println!("Kaizen pluggable telemetry — optional sinks fan-out alongside Kaizen sync.");
34 println!(
35 "This command appends a `[[telemetry.exporters]]` table to {}.",
36 p.display()
37 );
38 let t = match options.exporter_type {
39 Some(t) => t.trim().to_lowercase(),
40 None => {
41 print!("Type `file`, `posthog`, `datadog`, `otlp`, or `dev` (or empty to abort): ");
42 std::io::stdout().flush()?;
43 let mut line = String::new();
44 std::io::stdin().lock().read_line(&mut line)?;
45 line.trim().to_lowercase()
46 }
47 };
48 if t.is_empty() {
49 println!("Aborted.");
50 return Ok(());
51 }
52 let block = match t.as_str() {
53 "file" => file_exporter_block(options.path.as_deref()),
54 "posthog" => r#"
55[[telemetry.exporters]]
56type = "posthog"
57# project_api_key = "phc_..." # or set POSTHOG_API_KEY
58# host = "https://us.i.posthog.com"
59"#
60 .to_string(),
61 "datadog" => r#"
62[[telemetry.exporters]]
63type = "datadog"
64# api_key = "..." # or set DD_API_KEY
65# site = "datadoghq.com"
66"#
67 .to_string(),
68 "otlp" => r#"
69[[telemetry.exporters]]
70type = "otlp"
71# endpoint = "http://127.0.0.1:4318" # or set OTEL_EXPORTER_OTLP_ENDPOINT
72"#
73 .to_string(),
74 "dev" => r#"
75[[telemetry.exporters]]
76type = "dev"
77"#
78 .to_string(),
79 _ => anyhow::bail!("unknown type (use file, posthog, datadog, otlp, dev)"),
80 };
81
82 let mut f = std::fs::OpenOptions::new()
83 .create(true)
84 .append(true)
85 .open(&p)?;
86 if f.metadata()?.len() > 0 {
87 f.write_all(b"\n")?;
88 }
89 f.write_all(block.as_bytes())?;
90 let _ = ws;
91 println!(
92 "Appended. `file` needs no extra feature; use `--features telemetry-posthog` for PostHog."
93 );
94 Ok(())
95}
96
97fn file_exporter_block(path: Option<&Path>) -> String {
98 let mut block = String::from(
99 r#"
100[[telemetry.exporters]]
101type = "file"
102enabled = true
103"#,
104 );
105 if let Some(path) = path {
106 use std::fmt::Write as _;
107 let path = path
108 .to_string_lossy()
109 .replace('\\', "\\\\")
110 .replace('"', "\\\"");
111 writeln!(&mut block, "path = \"{path}\"").unwrap();
112 } else {
113 block.push_str(
114 "# path = \"telemetry.ndjson\" # optional; default .kaizen/telemetry.ndjson under each workspace\n",
115 );
116 }
117 block
118}
119
120pub fn print_effective_config_text(workspace: Option<&Path>) -> Result<String> {
122 let ws = workspace_path(workspace)?;
123 let cfg = config::load(&ws)?;
124 use std::fmt::Write;
125 let mut s = String::new();
126 writeln!(&mut s, "telemetry.fail_open: {}", cfg.telemetry.fail_open).unwrap();
127 for (i, e) in cfg.telemetry.exporters.iter().enumerate() {
128 match e {
129 ExporterConfig::None => writeln!(&mut s, "[{i}] type=none (ignored)").unwrap(),
130 ExporterConfig::File { enabled, path } => {
131 let p = path
132 .as_deref()
133 .map(|p| p.to_string())
134 .unwrap_or_else(|| "<workspace>/.kaizen/telemetry.ndjson".into());
135 writeln!(&mut s, "[{i}] type=file enabled={enabled} path={p}").unwrap();
136 }
137 ExporterConfig::Dev { enabled } => {
138 writeln!(&mut s, "[{i}] type=dev enabled={enabled}").unwrap();
139 }
140 ExporterConfig::PostHog { .. } => {
141 let line = if let Some(r) = PostHogResolved::from_config(e) {
142 format!(
143 "[{i}] type=posthog host={} key=<redacted len {}>",
144 r.host,
145 r.project_api_key.len()
146 )
147 } else {
148 format!(
149 "[{i}] type=posthog (unresolved: set POSTHOG_API_KEY or project_api_key)"
150 )
151 };
152 writeln!(&mut s, "{line}").unwrap();
153 }
154 ExporterConfig::Datadog { .. } => {
155 let line = if let Some(r) = DatadogResolved::from_config(e) {
156 format!(
157 "[{i}] type=datadog site={} key=<redacted len {}>",
158 r.site,
159 r.api_key.len()
160 )
161 } else {
162 format!("[{i}] type=datadog (unresolved: set DD_API_KEY or api_key in TOML)")
163 };
164 writeln!(&mut s, "{line}").unwrap();
165 }
166 ExporterConfig::Otlp { .. } => {
167 let line = if let Some(r) = OtlpResolved::from_config(e) {
168 format!("[{i}] type=otlp endpoint={}", r.endpoint)
169 } else {
170 format!("[{i}] type=otlp (unresolved: OTEL_EXPORTER_OTLP_ENDPOINT)")
171 };
172 writeln!(&mut s, "{line}").unwrap();
173 }
174 }
175 }
176 if cfg.telemetry.exporters.is_empty() {
177 writeln!(&mut s, "(no [[telemetry.exporters]] rows)").unwrap();
178 }
179 Ok(s)
180}
181
182pub fn cmd_telemetry_print_effective(workspace: Option<&Path>) -> Result<()> {
183 print!("{}", print_effective_config_text(workspace)?);
184 Ok(())
185}
186
187pub fn cmd_telemetry_init(workspace: Option<&Path>, options: ConfigureOptions) -> Result<()> {
189 cmd_telemetry_configure(workspace, options)
190}
191
192pub fn cmd_telemetry_doctor(workspace: Option<&Path>) -> Result<()> {
194 let ws = workspace_path(workspace)?;
195 let cfg = config::load(&ws)?;
196 println!("telemetry.fail_open: {}", cfg.telemetry.fail_open);
197 println!(
198 "telemetry.query.cache_ttl_seconds: {}",
199 cfg.telemetry.query.cache_ttl_seconds
200 );
201 match cfg.telemetry.query.provider {
202 crate::core::config::QueryAuthority::None => println!("telemetry.query.provider: none"),
203 crate::core::config::QueryAuthority::Posthog => {
204 println!("telemetry.query.provider: posthog");
205 }
206 crate::core::config::QueryAuthority::Datadog => {
207 println!("telemetry.query.provider: datadog");
208 }
209 }
210 if let Some(p) = provider_from_config(&cfg.telemetry.query) {
211 match p.health() {
212 Ok(()) => println!("provider health: ok (schema: {})", p.schema_version()),
213 Err(e) => eprintln!("provider health: {e}"),
214 }
215 } else {
216 println!("query provider: (not configured or features disabled; pull disabled)");
217 }
218 println!("\n{}", print_effective_config_text(Some(&ws))?);
219 println!("\nOTLP: export only — no query/pull in v1.");
220 Ok(())
221}
222
223pub fn cmd_telemetry_pull(workspace: Option<&Path>, days: u32) -> Result<()> {
225 let ws = workspace_path(workspace)?;
226 let cfg = config::load(&ws)?;
227 let p = provider_from_config(&cfg.telemetry.query)
228 .ok_or_else(|| anyhow::anyhow!("set [telemetry.query] provider and credentials"))?;
229 let store = Store::open(&crate::core::workspace::db_path(&ws)?)?;
230 let page = p.pull(PullWindow { days }, None)?;
231 if !cfg.sync.team_id.trim().is_empty()
232 && let Some(ctx) = crate::sync::ingest_ctx(&cfg, ws.to_path_buf())
233 && let Some(wh) = crate::sync::smart::workspace_hash_for(&ctx)
234 {
235 match crate::provider::import_pull_page_to_remote(&store, &cfg.sync.team_id, &wh, &page) {
236 Ok(n) if n > 0 => {
237 tracing::debug!(n, "remote_events: imported from provider pull (cmd)")
238 }
239 _ => {}
240 }
241 }
242 let now_ms = std::time::SystemTime::now()
243 .duration_since(std::time::UNIX_EPOCH)
244 .unwrap_or_default()
245 .as_millis() as i64;
246 let label = match cfg.telemetry.query.provider {
247 crate::core::config::QueryAuthority::None => "none",
248 crate::core::config::QueryAuthority::Posthog => "posthog",
249 crate::core::config::QueryAuthority::Datadog => "datadog",
250 };
251 store.set_pull_state(&RemotePullState {
252 query_provider: label.into(),
253 cursor_json: page.next_cursor.unwrap_or_default(),
254 last_success_ms: Some(now_ms),
255 })?;
256 println!("pull: received {} item(s) (page)", page.items.len());
257 Ok(())
258}
259
260pub fn cmd_telemetry_push(
262 workspace: Option<&Path>,
263 all_workspaces: bool,
264 days: u32,
265 dry_run: bool,
266) -> Result<()> {
267 let roots = scope::resolve(workspace, all_workspaces)?;
268 let primary = roots
269 .first()
270 .cloned()
271 .ok_or_else(|| anyhow::anyhow!("no workspace roots"))?;
272 let cfg = config::load(&primary)?;
273 let Some(salt) = try_team_salt(&cfg.sync) else {
274 anyhow::bail!(
275 "telemetry push requires [sync].team_salt_hex (64 hex chars). \
276 The salt hashes session/workspace identifiers and drives payload redaction — \
277 not only for cloud sync."
278 );
279 };
280 let registry = telemetry::load_exporters(&cfg.telemetry, primary.as_path());
281 if registry.is_empty() {
282 anyhow::bail!(
283 "no telemetry exporters to push to: add [[telemetry.exporters]] (e.g. type = \"file\" \
284 needs no extra feature; PostHog/Datadog/OTLP need build features); see \
285 `kaizen telemetry print-effective-config`."
286 );
287 }
288 let fail_open = cfg.telemetry.fail_open;
289 let team_id = cfg.sync.team_id.clone();
290 let end_ms = std::time::SystemTime::now()
291 .duration_since(std::time::UNIX_EPOCH)
292 .unwrap_or_default()
293 .as_millis() as u64;
294 let start_ms = end_ms.saturating_sub((days as u64).saturating_mul(86_400_000));
295
296 let mut total_events: u64 = 0;
297 let mut total_batches: u64 = 0;
298
299 for root in &roots {
300 let store = Store::open(&crate::core::workspace::db_path(root)?)?;
301 let ws_key = root.to_string_lossy().to_string();
302 let rows = store.retro_events_in_window(&ws_key, start_ms, end_ms)?;
303 let wh = workspace_hash(&salt, root.as_path());
304 let outbound: Vec<_> = rows
305 .into_iter()
306 .map(|(session, ev)| {
307 let mut o = outbound_event_from_row(&ev, &session, &salt);
308 redact_payload(&mut o.payload, root.as_path(), &salt);
309 o
310 })
311 .collect();
312 let n = outbound.len() as u64;
313 total_events += n;
314 let batches = chunk_events_into_ingest_batches(team_id.clone(), wh, outbound, &cfg.sync)?;
315 let bcount = batches.len() as u64;
316 total_batches += bcount;
317 if dry_run {
318 eprintln!(
319 "telemetry push (dry-run): {} — {} event(s), {} batch(es)",
320 root.display(),
321 n,
322 bcount
323 );
324 continue;
325 }
326 for batch in batches {
327 registry
328 .fan_out(fail_open, &batch)
329 .with_context(|| format!("telemetry fan-out ({})", batch.kind_name()))?;
330 }
331 eprintln!(
332 "telemetry push: {} — sent {} event(s) in {} batch(es)",
333 root.display(),
334 n,
335 bcount
336 );
337 }
338
339 eprintln!(
340 "telemetry push: total {} event(s), {} batch(es) across {} workspace(s){}",
341 total_events,
342 total_batches,
343 roots.len(),
344 if dry_run { " (dry-run)" } else { "" }
345 );
346 Ok(())
347}
348
349pub fn cmd_telemetry_print_schema() -> Result<()> {
351 let v = serde_json::json!({
352 "kaizen_schema_version": KAIZEN_SCHEMA_VERSION,
353 "event_names": [
354 "kaizen.event",
355 "kaizen.tool_span",
356 "kaizen.repo_snapshot_chunk",
357 "kaizen.workspace_fact_snapshot"
358 ],
359 "note": "Full shapes: see sync::canonical::CanonicalItem and expand_ingest_batch (tests include golden JSON).",
360 });
361 println!("{}", serde_json::to_string_pretty(&v)?);
362 Ok(())
363}