1use crate::core::config::{self, ExporterConfig, try_team_salt};
5use crate::provider::{PullWindow, from_config as provider_from_config};
6use crate::shell::cli::workspace_path;
7use crate::shell::scope;
8use crate::store::Store;
9use crate::store::remote_cache::{RemoteCacheStore, RemotePullState};
10use crate::sync::canonical::KAIZEN_SCHEMA_VERSION;
11use crate::sync::chunk_events_into_ingest_batches;
12use crate::sync::outbound::outbound_event_from_row;
13use crate::sync::redact::redact_payload;
14use crate::sync::workspace_hash;
15use crate::telemetry::{self, DatadogResolved, OtlpResolved, PostHogResolved};
16use anyhow::{Context, Result};
17use std::io::{BufRead, Write};
18use std::path::Path;
19
20pub fn cmd_telemetry_configure(workspace: Option<&Path>) -> Result<()> {
22 let ws = workspace_path(workspace)?;
23 let home = std::env::var("HOME").context("HOME not set")?;
24 let p = std::path::PathBuf::from(home).join(".kaizen/config.toml");
25 std::fs::create_dir_all(p.parent().unwrap())?;
26
27 println!("Kaizen pluggable telemetry — optional sinks fan-out alongside Kaizen sync.");
28 println!(
29 "This command appends a `[[telemetry.exporters]]` table to {}.",
30 p.display()
31 );
32 print!("Type `file`, `posthog`, `datadog`, `otlp`, or `dev` (or empty to abort): ");
33 std::io::stdout().flush()?;
34 let mut line = String::new();
35 std::io::stdin().lock().read_line(&mut line)?;
36 let t = line.trim().to_lowercase();
37 if t.is_empty() {
38 println!("Aborted.");
39 return Ok(());
40 }
41 let block = match t.as_str() {
42 "file" => {
43 r#"
44[[telemetry.exporters]]
45type = "file"
46enabled = true
47# path = "telemetry.ndjson" # optional; default .kaizen/telemetry.ndjson under each workspace
48"#
49 }
50 "posthog" => {
51 r#"
52[[telemetry.exporters]]
53type = "posthog"
54# project_api_key = "phc_..." # or set POSTHOG_API_KEY
55# host = "https://us.i.posthog.com"
56"#
57 }
58 "datadog" => {
59 r#"
60[[telemetry.exporters]]
61type = "datadog"
62# api_key = "..." # or set DD_API_KEY
63# site = "datadoghq.com"
64"#
65 }
66 "otlp" => {
67 r#"
68[[telemetry.exporters]]
69type = "otlp"
70# endpoint = "http://127.0.0.1:4318" # or set OTEL_EXPORTER_OTLP_ENDPOINT
71"#
72 }
73 "dev" => {
74 r#"
75[[telemetry.exporters]]
76type = "dev"
77"#
78 }
79 _ => anyhow::bail!("unknown type (use file, posthog, datadog, otlp, dev)"),
80 };
81
82 let mut f = std::fs::OpenOptions::new()
83 .create(true)
84 .append(true)
85 .open(&p)?;
86 if f.metadata()?.len() > 0 {
87 f.write_all(b"\n")?;
88 }
89 f.write_all(block.as_bytes())?;
90 let _ = ws;
91 println!(
92 "Appended. `file` needs no extra feature; use `--features telemetry-posthog` for PostHog."
93 );
94 Ok(())
95}
96
97pub fn print_effective_config_text(workspace: Option<&Path>) -> Result<String> {
99 let ws = workspace_path(workspace)?;
100 let cfg = config::load(&ws)?;
101 use std::fmt::Write;
102 let mut s = String::new();
103 writeln!(&mut s, "telemetry.fail_open: {}", cfg.telemetry.fail_open).unwrap();
104 for (i, e) in cfg.telemetry.exporters.iter().enumerate() {
105 match e {
106 ExporterConfig::None => writeln!(&mut s, "[{i}] type=none (ignored)").unwrap(),
107 ExporterConfig::File { enabled, path } => {
108 let p = path
109 .as_deref()
110 .map(|p| p.to_string())
111 .unwrap_or_else(|| "<workspace>/.kaizen/telemetry.ndjson".into());
112 writeln!(&mut s, "[{i}] type=file enabled={enabled} path={p}").unwrap();
113 }
114 ExporterConfig::Dev { enabled } => {
115 writeln!(&mut s, "[{i}] type=dev enabled={enabled}").unwrap();
116 }
117 ExporterConfig::PostHog { .. } => {
118 let line = if let Some(r) = PostHogResolved::from_config(e) {
119 format!(
120 "[{i}] type=posthog host={} key=<redacted len {}>",
121 r.host,
122 r.project_api_key.len()
123 )
124 } else {
125 format!(
126 "[{i}] type=posthog (unresolved: set POSTHOG_API_KEY or project_api_key)"
127 )
128 };
129 writeln!(&mut s, "{line}").unwrap();
130 }
131 ExporterConfig::Datadog { .. } => {
132 let line = if let Some(r) = DatadogResolved::from_config(e) {
133 format!(
134 "[{i}] type=datadog site={} key=<redacted len {}>",
135 r.site,
136 r.api_key.len()
137 )
138 } else {
139 format!("[{i}] type=datadog (unresolved: set DD_API_KEY or api_key in TOML)")
140 };
141 writeln!(&mut s, "{line}").unwrap();
142 }
143 ExporterConfig::Otlp { .. } => {
144 let line = if let Some(r) = OtlpResolved::from_config(e) {
145 format!("[{i}] type=otlp endpoint={}", r.endpoint)
146 } else {
147 format!("[{i}] type=otlp (unresolved: OTEL_EXPORTER_OTLP_ENDPOINT)")
148 };
149 writeln!(&mut s, "{line}").unwrap();
150 }
151 }
152 }
153 if cfg.telemetry.exporters.is_empty() {
154 writeln!(&mut s, "(no [[telemetry.exporters]] rows)").unwrap();
155 }
156 Ok(s)
157}
158
159pub fn cmd_telemetry_print_effective(workspace: Option<&Path>) -> Result<()> {
160 print!("{}", print_effective_config_text(workspace)?);
161 Ok(())
162}
163
164pub fn cmd_telemetry_init(workspace: Option<&Path>) -> Result<()> {
166 cmd_telemetry_configure(workspace)
167}
168
169pub fn cmd_telemetry_doctor(workspace: Option<&Path>) -> Result<()> {
171 let ws = workspace_path(workspace)?;
172 let cfg = config::load(&ws)?;
173 println!("telemetry.fail_open: {}", cfg.telemetry.fail_open);
174 println!(
175 "telemetry.query.cache_ttl_seconds: {}",
176 cfg.telemetry.query.cache_ttl_seconds
177 );
178 match cfg.telemetry.query.provider {
179 crate::core::config::QueryAuthority::None => println!("telemetry.query.provider: none"),
180 crate::core::config::QueryAuthority::Posthog => {
181 println!("telemetry.query.provider: posthog");
182 }
183 crate::core::config::QueryAuthority::Datadog => {
184 println!("telemetry.query.provider: datadog");
185 }
186 }
187 if let Some(p) = provider_from_config(&cfg.telemetry.query) {
188 match p.health() {
189 Ok(()) => println!("provider health: ok (schema: {})", p.schema_version()),
190 Err(e) => eprintln!("provider health: {e}"),
191 }
192 } else {
193 println!("query provider: (not configured or features disabled; pull disabled)");
194 }
195 println!("\n{}", print_effective_config_text(Some(&ws))?);
196 println!("\nOTLP: export only — no query/pull in v1.");
197 Ok(())
198}
199
200pub fn cmd_telemetry_pull(workspace: Option<&Path>, days: u32) -> Result<()> {
202 let ws = workspace_path(workspace)?;
203 let cfg = config::load(&ws)?;
204 let p = provider_from_config(&cfg.telemetry.query)
205 .ok_or_else(|| anyhow::anyhow!("set [telemetry.query] provider and credentials"))?;
206 let store = Store::open(&ws.join(".kaizen/kaizen.db"))?;
207 let page = p.pull(PullWindow { days }, None)?;
208 if !cfg.sync.team_id.trim().is_empty()
209 && let Some(ctx) = crate::sync::ingest_ctx(&cfg, ws.to_path_buf())
210 && let Some(wh) = crate::sync::smart::workspace_hash_for(&ctx)
211 {
212 match crate::provider::import_pull_page_to_remote(&store, &cfg.sync.team_id, &wh, &page) {
213 Ok(n) if n > 0 => {
214 tracing::debug!(n, "remote_events: imported from provider pull (cmd)")
215 }
216 _ => {}
217 }
218 }
219 let now_ms = std::time::SystemTime::now()
220 .duration_since(std::time::UNIX_EPOCH)
221 .unwrap_or_default()
222 .as_millis() as i64;
223 let label = match cfg.telemetry.query.provider {
224 crate::core::config::QueryAuthority::None => "none",
225 crate::core::config::QueryAuthority::Posthog => "posthog",
226 crate::core::config::QueryAuthority::Datadog => "datadog",
227 };
228 store.set_pull_state(&RemotePullState {
229 query_provider: label.into(),
230 cursor_json: page.next_cursor.unwrap_or_default(),
231 last_success_ms: Some(now_ms),
232 })?;
233 println!("pull: received {} item(s) (page)", page.items.len());
234 Ok(())
235}
236
237pub fn cmd_telemetry_push(
239 workspace: Option<&Path>,
240 all_workspaces: bool,
241 days: u32,
242 dry_run: bool,
243) -> Result<()> {
244 let roots = scope::resolve(workspace, all_workspaces)?;
245 let primary = roots
246 .first()
247 .cloned()
248 .ok_or_else(|| anyhow::anyhow!("no workspace roots"))?;
249 let cfg = config::load(&primary)?;
250 let Some(salt) = try_team_salt(&cfg.sync) else {
251 anyhow::bail!(
252 "telemetry push requires [sync].team_salt_hex (64 hex chars). \
253 The salt hashes session/workspace identifiers and drives payload redaction — \
254 not only for cloud sync."
255 );
256 };
257 let registry = telemetry::load_exporters(&cfg.telemetry, primary.as_path());
258 if registry.is_empty() {
259 anyhow::bail!(
260 "no telemetry exporters to push to: add [[telemetry.exporters]] (e.g. type = \"file\" \
261 needs no extra feature; PostHog/Datadog/OTLP need build features); see \
262 `kaizen telemetry print-effective-config`."
263 );
264 }
265 let fail_open = cfg.telemetry.fail_open;
266 let team_id = cfg.sync.team_id.clone();
267 let end_ms = std::time::SystemTime::now()
268 .duration_since(std::time::UNIX_EPOCH)
269 .unwrap_or_default()
270 .as_millis() as u64;
271 let start_ms = end_ms.saturating_sub((days as u64).saturating_mul(86_400_000));
272
273 let mut total_events: u64 = 0;
274 let mut total_batches: u64 = 0;
275
276 for root in &roots {
277 let store = Store::open(&root.join(".kaizen/kaizen.db"))?;
278 let ws_key = root.to_string_lossy().to_string();
279 let rows = store.retro_events_in_window(&ws_key, start_ms, end_ms)?;
280 let wh = workspace_hash(&salt, root.as_path());
281 let outbound: Vec<_> = rows
282 .into_iter()
283 .map(|(session, ev)| {
284 let mut o = outbound_event_from_row(&ev, &session, &salt);
285 redact_payload(&mut o.payload, root.as_path(), &salt);
286 o
287 })
288 .collect();
289 let n = outbound.len() as u64;
290 total_events += n;
291 let batches = chunk_events_into_ingest_batches(team_id.clone(), wh, outbound, &cfg.sync)?;
292 let bcount = batches.len() as u64;
293 total_batches += bcount;
294 if dry_run {
295 eprintln!(
296 "telemetry push (dry-run): {} — {} event(s), {} batch(es)",
297 root.display(),
298 n,
299 bcount
300 );
301 continue;
302 }
303 for batch in batches {
304 registry
305 .fan_out(fail_open, &batch)
306 .with_context(|| format!("telemetry fan-out ({})", batch.kind_name()))?;
307 }
308 eprintln!(
309 "telemetry push: {} — sent {} event(s) in {} batch(es)",
310 root.display(),
311 n,
312 bcount
313 );
314 }
315
316 eprintln!(
317 "telemetry push: total {} event(s), {} batch(es) across {} workspace(s){}",
318 total_events,
319 total_batches,
320 roots.len(),
321 if dry_run { " (dry-run)" } else { "" }
322 );
323 Ok(())
324}
325
326pub fn cmd_telemetry_print_schema() -> Result<()> {
328 let v = serde_json::json!({
329 "kaizen_schema_version": KAIZEN_SCHEMA_VERSION,
330 "event_names": [
331 "kaizen.event",
332 "kaizen.tool_span",
333 "kaizen.repo_snapshot_chunk",
334 "kaizen.workspace_fact_snapshot"
335 ],
336 "note": "Full shapes: see sync::canonical::CanonicalItem and expand_ingest_batch (tests include golden JSON).",
337 });
338 println!("{}", serde_json::to_string_pretty(&v)?);
339 Ok(())
340}