1use crate::core::config::{self, ExporterConfig, try_team_salt};
5use crate::provider::{PullWindow, from_config as provider_from_config};
6use crate::shell::cli::workspace_path;
7use crate::shell::scope;
8use crate::store::Store;
9use crate::store::remote_cache::{RemoteCacheStore, RemotePullState};
10use crate::sync::canonical::KAIZEN_SCHEMA_VERSION;
11use crate::sync::chunk_events_into_ingest_batches;
12use crate::sync::outbound::outbound_event_from_row;
13use crate::sync::redact::redact_payload;
14use crate::sync::workspace_hash;
15use crate::telemetry::{self, DatadogResolved, OtlpResolved, PostHogResolved};
16use anyhow::{Context, Result};
17use std::io::{BufRead, Write};
18use std::path::Path;
19
20pub fn cmd_telemetry_configure(workspace: Option<&Path>) -> Result<()> {
22 let ws = workspace_path(workspace)?;
23 let home = std::env::var("HOME").context("HOME not set")?;
24 let p = std::path::PathBuf::from(home).join(".kaizen/config.toml");
25 std::fs::create_dir_all(p.parent().unwrap())?;
26
27 println!("Kaizen pluggable telemetry — optional sinks fan-out alongside Kaizen sync.");
28 println!(
29 "This command appends a `[[telemetry.exporters]]` table to {}.",
30 p.display()
31 );
32 print!("Type `posthog`, `datadog`, `otlp`, or `dev` (or empty to abort): ");
33 std::io::stdout().flush()?;
34 let mut line = String::new();
35 std::io::stdin().lock().read_line(&mut line)?;
36 let t = line.trim().to_lowercase();
37 if t.is_empty() {
38 println!("Aborted.");
39 return Ok(());
40 }
41 let block = match t.as_str() {
42 "posthog" => {
43 r#"
44[[telemetry.exporters]]
45type = "posthog"
46# project_api_key = "phc_..." # or set POSTHOG_API_KEY
47# host = "https://us.i.posthog.com"
48"#
49 }
50 "datadog" => {
51 r#"
52[[telemetry.exporters]]
53type = "datadog"
54# api_key = "..." # or set DD_API_KEY
55# site = "datadoghq.com"
56"#
57 }
58 "otlp" => {
59 r#"
60[[telemetry.exporters]]
61type = "otlp"
62# endpoint = "http://127.0.0.1:4318" # or set OTEL_EXPORTER_OTLP_ENDPOINT
63"#
64 }
65 "dev" => {
66 r#"
67[[telemetry.exporters]]
68type = "dev"
69"#
70 }
71 _ => anyhow::bail!("unknown type (use posthog, datadog, otlp, dev)"),
72 };
73
74 let mut f = std::fs::OpenOptions::new()
75 .create(true)
76 .append(true)
77 .open(&p)?;
78 if f.metadata()?.len() > 0 {
79 f.write_all(b"\n")?;
80 }
81 f.write_all(block.as_bytes())?;
82 let _ = ws;
83 println!("Appended. Rebuild with e.g. `--features telemetry-posthog` for PostHog.");
84 Ok(())
85}
86
87pub fn print_effective_config_text(workspace: Option<&Path>) -> Result<String> {
89 let ws = workspace_path(workspace)?;
90 let cfg = config::load(&ws)?;
91 use std::fmt::Write;
92 let mut s = String::new();
93 writeln!(&mut s, "telemetry.fail_open: {}", cfg.telemetry.fail_open).unwrap();
94 for (i, e) in cfg.telemetry.exporters.iter().enumerate() {
95 match e {
96 ExporterConfig::None => writeln!(&mut s, "[{i}] type=none (ignored)").unwrap(),
97 ExporterConfig::Dev { enabled } => {
98 writeln!(&mut s, "[{i}] type=dev enabled={enabled}").unwrap();
99 }
100 ExporterConfig::PostHog { .. } => {
101 let line = if let Some(r) = PostHogResolved::from_config(e) {
102 format!(
103 "[{i}] type=posthog host={} key=<redacted len {}>",
104 r.host,
105 r.project_api_key.len()
106 )
107 } else {
108 format!(
109 "[{i}] type=posthog (unresolved: set POSTHOG_API_KEY or project_api_key)"
110 )
111 };
112 writeln!(&mut s, "{line}").unwrap();
113 }
114 ExporterConfig::Datadog { .. } => {
115 let line = if let Some(r) = DatadogResolved::from_config(e) {
116 format!(
117 "[{i}] type=datadog site={} key=<redacted len {}>",
118 r.site,
119 r.api_key.len()
120 )
121 } else {
122 format!("[{i}] type=datadog (unresolved: set DD_API_KEY or api_key in TOML)")
123 };
124 writeln!(&mut s, "{line}").unwrap();
125 }
126 ExporterConfig::Otlp { .. } => {
127 let line = if let Some(r) = OtlpResolved::from_config(e) {
128 format!("[{i}] type=otlp endpoint={}", r.endpoint)
129 } else {
130 format!("[{i}] type=otlp (unresolved: OTEL_EXPORTER_OTLP_ENDPOINT)")
131 };
132 writeln!(&mut s, "{line}").unwrap();
133 }
134 }
135 }
136 if cfg.telemetry.exporters.is_empty() {
137 writeln!(&mut s, "(no [[telemetry.exporters]] rows)").unwrap();
138 }
139 Ok(s)
140}
141
142pub fn cmd_telemetry_print_effective(workspace: Option<&Path>) -> Result<()> {
143 print!("{}", print_effective_config_text(workspace)?);
144 Ok(())
145}
146
147pub fn cmd_telemetry_init(workspace: Option<&Path>) -> Result<()> {
149 cmd_telemetry_configure(workspace)
150}
151
152pub fn cmd_telemetry_doctor(workspace: Option<&Path>) -> Result<()> {
154 let ws = workspace_path(workspace)?;
155 let cfg = config::load(&ws)?;
156 println!("telemetry.fail_open: {}", cfg.telemetry.fail_open);
157 println!(
158 "telemetry.query.cache_ttl_seconds: {}",
159 cfg.telemetry.query.cache_ttl_seconds
160 );
161 match cfg.telemetry.query.provider {
162 crate::core::config::QueryAuthority::None => println!("telemetry.query.provider: none"),
163 crate::core::config::QueryAuthority::Posthog => {
164 println!("telemetry.query.provider: posthog");
165 }
166 crate::core::config::QueryAuthority::Datadog => {
167 println!("telemetry.query.provider: datadog");
168 }
169 }
170 if let Some(p) = provider_from_config(&cfg.telemetry.query) {
171 match p.health() {
172 Ok(()) => println!("provider health: ok (schema: {})", p.schema_version()),
173 Err(e) => eprintln!("provider health: {e}"),
174 }
175 } else {
176 println!("query provider: (not configured or features disabled; pull disabled)");
177 }
178 println!("\n{}", print_effective_config_text(Some(&ws))?);
179 println!("\nOTLP: export only — no query/pull in v1.");
180 Ok(())
181}
182
183pub fn cmd_telemetry_pull(workspace: Option<&Path>, days: u32) -> Result<()> {
185 let ws = workspace_path(workspace)?;
186 let cfg = config::load(&ws)?;
187 let p = provider_from_config(&cfg.telemetry.query)
188 .ok_or_else(|| anyhow::anyhow!("set [telemetry.query] provider and credentials"))?;
189 let store = Store::open(&ws.join(".kaizen/kaizen.db"))?;
190 let page = p.pull(PullWindow { days }, None)?;
191 if !cfg.sync.team_id.trim().is_empty()
192 && let Some(ctx) = crate::sync::ingest_ctx(&cfg, ws.to_path_buf())
193 && let Some(wh) = crate::sync::smart::workspace_hash_for(&ctx)
194 {
195 match crate::provider::import_pull_page_to_remote(&store, &cfg.sync.team_id, &wh, &page) {
196 Ok(n) if n > 0 => {
197 tracing::debug!(n, "remote_events: imported from provider pull (cmd)")
198 }
199 _ => {}
200 }
201 }
202 let now_ms = std::time::SystemTime::now()
203 .duration_since(std::time::UNIX_EPOCH)
204 .unwrap_or_default()
205 .as_millis() as i64;
206 let label = match cfg.telemetry.query.provider {
207 crate::core::config::QueryAuthority::None => "none",
208 crate::core::config::QueryAuthority::Posthog => "posthog",
209 crate::core::config::QueryAuthority::Datadog => "datadog",
210 };
211 store.set_pull_state(&RemotePullState {
212 query_provider: label.into(),
213 cursor_json: page.next_cursor.unwrap_or_default(),
214 last_success_ms: Some(now_ms),
215 })?;
216 println!("pull: received {} item(s) (page)", page.items.len());
217 Ok(())
218}
219
220pub fn cmd_telemetry_push(
222 workspace: Option<&Path>,
223 all_workspaces: bool,
224 days: u32,
225 dry_run: bool,
226) -> Result<()> {
227 let roots = scope::resolve(workspace, all_workspaces)?;
228 let primary = roots
229 .first()
230 .cloned()
231 .ok_or_else(|| anyhow::anyhow!("no workspace roots"))?;
232 let cfg = config::load(&primary)?;
233 let Some(salt) = try_team_salt(&cfg.sync) else {
234 anyhow::bail!(
235 "telemetry push requires [sync].team_salt_hex (64 hex chars). \
236 The salt hashes session/workspace identifiers and drives payload redaction — \
237 not only for cloud sync."
238 );
239 };
240 let registry = telemetry::load_exporters(&cfg.telemetry);
241 if registry.is_empty() {
242 anyhow::bail!(
243 "no telemetry exporters to push to: enable a feature (e.g. telemetry-dev) and add \
244 [[telemetry.exporters]] with credentials; see `kaizen telemetry print-effective-config`."
245 );
246 }
247 let fail_open = cfg.telemetry.fail_open;
248 let team_id = cfg.sync.team_id.clone();
249 let end_ms = std::time::SystemTime::now()
250 .duration_since(std::time::UNIX_EPOCH)
251 .unwrap_or_default()
252 .as_millis() as u64;
253 let start_ms = end_ms.saturating_sub((days as u64).saturating_mul(86_400_000));
254
255 let mut total_events: u64 = 0;
256 let mut total_batches: u64 = 0;
257
258 for root in &roots {
259 let store = Store::open(&root.join(".kaizen/kaizen.db"))?;
260 let ws_key = root.to_string_lossy().to_string();
261 let rows = store.retro_events_in_window(&ws_key, start_ms, end_ms)?;
262 let wh = workspace_hash(&salt, root.as_path());
263 let outbound: Vec<_> = rows
264 .into_iter()
265 .map(|(session, ev)| {
266 let mut o = outbound_event_from_row(&ev, &session, &salt);
267 redact_payload(&mut o.payload, root.as_path(), &salt);
268 o
269 })
270 .collect();
271 let n = outbound.len() as u64;
272 total_events += n;
273 let batches = chunk_events_into_ingest_batches(team_id.clone(), wh, outbound, &cfg.sync)?;
274 let bcount = batches.len() as u64;
275 total_batches += bcount;
276 if dry_run {
277 eprintln!(
278 "telemetry push (dry-run): {} — {} event(s), {} batch(es)",
279 root.display(),
280 n,
281 bcount
282 );
283 continue;
284 }
285 for batch in batches {
286 registry
287 .fan_out(fail_open, &batch)
288 .with_context(|| format!("telemetry fan-out ({})", batch.kind_name()))?;
289 }
290 eprintln!(
291 "telemetry push: {} — sent {} event(s) in {} batch(es)",
292 root.display(),
293 n,
294 bcount
295 );
296 }
297
298 eprintln!(
299 "telemetry push: total {} event(s), {} batch(es) across {} workspace(s){}",
300 total_events,
301 total_batches,
302 roots.len(),
303 if dry_run { " (dry-run)" } else { "" }
304 );
305 Ok(())
306}
307
308pub fn cmd_telemetry_print_schema() -> Result<()> {
310 let v = serde_json::json!({
311 "kaizen_schema_version": KAIZEN_SCHEMA_VERSION,
312 "event_names": [
313 "kaizen.event",
314 "kaizen.tool_span",
315 "kaizen.repo_snapshot_chunk",
316 "kaizen.workspace_fact_snapshot"
317 ],
318 "note": "Full shapes: see sync::canonical::CanonicalItem and expand_ingest_batch (tests include golden JSON).",
319 });
320 println!("{}", serde_json::to_string_pretty(&v)?);
321 Ok(())
322}