1use crate::collect::tail::claude::scan_claude_session_dir;
5use crate::collect::tail::codex::scan_codex_session_dir;
6use crate::collect::tail::copilot_cli::scan_copilot_cli_workspace;
7use crate::collect::tail::copilot_vscode::scan_copilot_vscode_workspace;
8use crate::collect::tail::cursor::scan_session_dir_all;
9use crate::collect::tail::goose::scan_goose_workspace;
10use crate::collect::tail::opencode::scan_opencode_workspace;
11use crate::core::config;
12use crate::core::event::{Event, SessionRecord};
13use crate::metrics::report;
14use crate::shell::fmt::fmt_ts;
15use crate::shell::scope;
16use crate::store::{SYNC_STATE_LAST_AGENT_SCAN_MS, SYNC_STATE_LAST_AUTO_PRUNE_MS, Store};
17use anyhow::Result;
18use serde::Serialize;
19use std::collections::HashMap;
20use std::io::IsTerminal;
21use std::path::{Path, PathBuf};
22
23pub use crate::shell::init::cmd_init;
24pub use crate::shell::insights::cmd_insights;
25
/// JSON payload emitted by `sessions_list_text` when JSON output is requested.
#[derive(Serialize)]
struct SessionsListJson {
    /// Scope label computed by `scope::label` for the resolved roots.
    workspace: String,
    /// Names of all resolved roots; left empty (and therefore omitted from the
    /// JSON) when only one root is in scope.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    workspaces: Vec<String>,
    /// Number of entries in `sessions`.
    count: usize,
    /// Sessions gathered from every root, ordered by descending `started_at_ms`
    /// with `id` as the tie-breaker.
    sessions: Vec<SessionRecord>,
}
34
/// JSON payload emitted by `summary_text` when JSON output is requested.
#[derive(Serialize)]
struct SummaryJsonOut {
    /// Scope label computed by `scope::label` for the resolved roots.
    workspace: String,
    /// Names of all resolved roots; left empty (and therefore omitted from the
    /// JSON) when only one root is in scope.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    workspaces: Vec<String>,
    /// Aggregated statistics; the fields are flattened into the top-level object.
    #[serde(flatten)]
    stats: crate::store::SummaryStats,
    /// `stats.total_cost_usd_e6` divided by 1,000,000.
    cost_usd: f64,
    /// Highest-valued file among each root's first `hottest_files` entry, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    hotspot: Option<crate::metrics::types::RankedFile>,
    /// Tool with the largest `p95_ms` among each root's first `slowest_tools`
    /// entry, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    slowest_tool: Option<crate::metrics::types::RankedTool>,
}
48
49struct ScanSpinner(Option<indicatif::ProgressBar>);
50
51impl ScanSpinner {
52 fn start(msg: &'static str) -> Self {
53 if !std::io::stdout().is_terminal() {
54 return Self(None);
55 }
56 let p = indicatif::ProgressBar::new_spinner();
57 p.set_message(msg.to_string());
58 p.enable_steady_tick(std::time::Duration::from_millis(120));
59 Self(Some(p))
60 }
61}
62
63impl Drop for ScanSpinner {
64 fn drop(&mut self) {
65 if let Some(p) = self.0.take() {
66 p.finish_and_clear();
67 }
68 }
69}
70
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields 0 if the system clock reports a time before the epoch.
fn now_ms_u64() -> u64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
77
/// Minimum time between two automatic prunes: one day, in milliseconds.
const AUTO_PRUNE_INTERVAL_MS: u64 = 24 * 60 * 60 * 1_000;
80
81pub(crate) fn maybe_auto_prune_after_scan(store: &Store, cfg: &config::Config) -> Result<()> {
82 if cfg.retention.hot_days == 0 {
83 return Ok(());
84 }
85 let now = now_ms_u64();
86 if let Some(last) = store.sync_state_get_u64(SYNC_STATE_LAST_AUTO_PRUNE_MS)?
87 && now.saturating_sub(last) < AUTO_PRUNE_INTERVAL_MS
88 {
89 return Ok(());
90 }
91 let cutoff = now.saturating_sub((cfg.retention.hot_days as u64).saturating_mul(86_400_000));
92 store.prune_sessions_started_before(cutoff as i64)?;
93 store.sync_state_set_u64(SYNC_STATE_LAST_AUTO_PRUNE_MS, now)?;
94 Ok(())
95}
96
97pub(crate) fn maybe_scan_all_agents(
99 ws: &Path,
100 cfg: &config::Config,
101 ws_str: &str,
102 store: &Store,
103 refresh: bool,
104) -> Result<()> {
105 let interval_ms = cfg.scan.min_rescan_seconds.saturating_mul(1000);
106 let now = now_ms_u64();
107 if !refresh
108 && interval_ms > 0
109 && let Some(last) = store.sync_state_get_u64(SYNC_STATE_LAST_AGENT_SCAN_MS)?
110 && now.saturating_sub(last) < interval_ms
111 {
112 return Ok(());
113 }
114 scan_all_agents(ws, cfg, ws_str, store)?;
115 store.sync_state_set_u64(SYNC_STATE_LAST_AGENT_SCAN_MS, now_ms_u64())?;
116 Ok(())
117}
118
119pub(crate) fn maybe_refresh_store(workspace: &Path, store: &Store, refresh: bool) -> Result<()> {
120 if !refresh {
121 return Ok(());
122 }
123 let cfg = config::load(workspace)?;
124 let ws_str = workspace.to_string_lossy().to_string();
125 maybe_scan_all_agents(workspace, &cfg, &ws_str, store, true)
126}
127
/// Merges several `(key, count)` lists into one, summing counts per key.
///
/// The result is ordered by descending count, then ascending key. Sums
/// saturate at `u64::MAX` instead of overflowing.
fn combine_counts(rows: Vec<Vec<(String, u64)>>) -> Vec<(String, u64)> {
    let mut totals: HashMap<String, u64> = HashMap::new();
    for (key, value) in rows.into_iter().flatten() {
        let total = totals.entry(key).or_insert(0);
        *total = total.saturating_add(value);
    }

    let mut merged: Vec<(String, u64)> = totals.into_iter().collect();
    // Keys are unique after merging, so the comparator is a total order and an
    // unstable sort yields the same result as a stable one.
    merged.sort_unstable_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
    merged
}
139
/// Converts each root path to a (lossily decoded) string, preserving order.
fn workspace_names(roots: &[PathBuf]) -> Vec<String> {
    let mut names = Vec::with_capacity(roots.len());
    for root in roots {
        names.push(root.to_string_lossy().into_owned());
    }
    names
}
146
147fn open_workspace_store(workspace: &Path) -> Result<Store> {
148 Store::open(&crate::core::workspace::db_path(workspace))
149}
150
151pub fn sessions_list_text(
153 workspace: Option<&Path>,
154 json_out: bool,
155 refresh: bool,
156 all_workspaces: bool,
157) -> Result<String> {
158 let roots = scope::resolve(workspace, all_workspaces)?;
159 let mut sessions = Vec::new();
160 for workspace in &roots {
161 let store = open_workspace_store(workspace)?;
162 maybe_refresh_store(workspace, &store, refresh)?;
163 let ws_str = workspace.to_string_lossy().to_string();
164 sessions.extend(store.list_sessions(&ws_str)?);
165 }
166 sessions.sort_by(|a, b| {
167 b.started_at_ms
168 .cmp(&a.started_at_ms)
169 .then_with(|| a.id.cmp(&b.id))
170 });
171 let scope_label = scope::label(&roots);
172 let workspaces = if roots.len() > 1 {
173 workspace_names(&roots)
174 } else {
175 Vec::new()
176 };
177 if json_out {
178 return Ok(format!(
179 "{}\n",
180 serde_json::to_string_pretty(&SessionsListJson {
181 workspace: scope_label,
182 workspaces,
183 count: sessions.len(),
184 sessions,
185 })?
186 ));
187 }
188 use std::fmt::Write;
189 let mut out = String::new();
190 if roots.len() > 1 {
191 writeln!(&mut out, "Scope: {scope_label}").unwrap();
192 writeln!(&mut out).unwrap();
193 }
194 writeln!(
195 &mut out,
196 "{:<40} {:<10} {:<10} STARTED",
197 "ID", "AGENT", "STATUS"
198 )
199 .unwrap();
200 writeln!(&mut out, "{}", "-".repeat(80)).unwrap();
201 for s in &sessions {
202 writeln!(
203 &mut out,
204 "{:<40} {:<10} {:<10} {}",
205 s.id,
206 s.agent,
207 format!("{:?}", s.status),
208 fmt_ts(s.started_at_ms),
209 )
210 .unwrap();
211 }
212 if sessions.is_empty() {
213 writeln!(&mut out, "(no sessions)").unwrap();
214 sessions_empty_state_hints(&mut out);
215 }
216 Ok(out)
217}
218
/// Appends a blank line followed by troubleshooting hints to `out`.
fn sessions_empty_state_hints(out: &mut String) {
    const HINT_LINES: [&str; 5] = [
        "",
        "No sessions found for this workspace. Try:",
        " · `kaizen doctor` — verify config and hooks",
        " · a short agent session in this repo, then re-run",
        " · docs: https://github.com/marquesds/kaizen/blob/main/docs/config.md (sources)",
    ];
    for line in &HINT_LINES {
        out.push_str(line);
        out.push('\n');
    }
}
230
231pub fn cmd_sessions_list(
233 workspace: Option<&Path>,
234 json_out: bool,
235 refresh: bool,
236 all_workspaces: bool,
237) -> Result<()> {
238 print!(
239 "{}",
240 sessions_list_text(workspace, json_out, refresh, all_workspaces)?
241 );
242 Ok(())
243}
244
245pub fn session_show_text(id: &str, workspace: Option<&Path>) -> Result<String> {
247 let ws = workspace_path(workspace)?;
248 let store = open_workspace_store(&ws)?;
249 use std::fmt::Write;
250 let mut out = String::new();
251 match store.get_session(id)? {
252 Some(s) => {
253 writeln!(&mut out, "id: {}", s.id).unwrap();
254 writeln!(&mut out, "agent: {}", s.agent).unwrap();
255 writeln!(
256 &mut out,
257 "model: {}",
258 s.model.as_deref().unwrap_or("-")
259 )
260 .unwrap();
261 writeln!(&mut out, "workspace: {}", s.workspace).unwrap();
262 writeln!(&mut out, "started_at: {}", fmt_ts(s.started_at_ms)).unwrap();
263 writeln!(
264 &mut out,
265 "ended_at: {}",
266 s.ended_at_ms.map(fmt_ts).unwrap_or_else(|| "-".to_string())
267 )
268 .unwrap();
269 writeln!(&mut out, "status: {:?}", s.status).unwrap();
270 writeln!(&mut out, "trace_path: {}", s.trace_path).unwrap();
271 }
272 None => anyhow::bail!("session not found: {id} — try `kaizen sessions list`"),
273 }
274 let evals = store.list_evals_for_session(id).unwrap_or_default();
275 if !evals.is_empty() {
276 writeln!(&mut out, "evals:").unwrap();
277 for e in &evals {
278 writeln!(
279 &mut out,
280 " {} score={:.2} flagged={} {}",
281 e.rubric_id, e.score, e.flagged, e.rationale
282 )
283 .unwrap();
284 }
285 }
286 Ok(out)
287}
288
289pub fn cmd_session_show(id: &str, workspace: Option<&Path>) -> Result<()> {
291 print!("{}", session_show_text(id, workspace)?);
292 Ok(())
293}
294
295pub fn summary_text(
297 workspace: Option<&Path>,
298 json_out: bool,
299 refresh: bool,
300 all_workspaces: bool,
301 source: crate::core::data_source::DataSource,
302) -> Result<String> {
303 let roots = scope::resolve(workspace, all_workspaces)?;
304 let mut total_cost_usd_e6 = 0_i64;
305 let mut session_count = 0_u64;
306 let mut by_agent = Vec::new();
307 let mut by_model = Vec::new();
308 let mut top_tools = Vec::new();
309 let mut hottest = Vec::new();
310 let mut slowest = Vec::new();
311
312 for workspace in &roots {
313 let cfg = config::load(workspace)?;
314 let store = open_workspace_store(workspace)?;
315 crate::shell::remote_pull::maybe_telemetry_pull(workspace, &store, &cfg, source, refresh)?;
316 maybe_refresh_store(workspace, &store, refresh)?;
317 let ws_str = workspace.to_string_lossy().to_string();
318 let mut stats = store.summary_stats(&ws_str)?;
319 if source != crate::core::data_source::DataSource::Local
320 && let Ok(Some(agg)) =
321 crate::shell::remote_observe::try_remote_event_agg(&store, &cfg, workspace)
322 {
323 stats = crate::shell::remote_observe::merge_summary_stats(stats, &agg, source);
324 }
325 total_cost_usd_e6 += stats.total_cost_usd_e6;
326 session_count += stats.session_count;
327 by_agent.push(stats.by_agent);
328 by_model.push(stats.by_model);
329 top_tools.push(stats.top_tools);
330 if let Ok(metrics) = report::build_report(&store, &ws_str, 7) {
331 if let Some(file) = metrics.hottest_files.first().cloned() {
332 hottest.push(if roots.len() == 1 {
333 file
334 } else {
335 crate::metrics::types::RankedFile {
336 path: scope::decorate_path(workspace, &file.path),
337 ..file
338 }
339 });
340 }
341 if let Some(tool) = metrics.slowest_tools.first().cloned() {
342 slowest.push(tool);
343 }
344 }
345 }
346
347 let stats = crate::store::SummaryStats {
348 session_count,
349 total_cost_usd_e6,
350 by_agent: combine_counts(by_agent),
351 by_model: combine_counts(by_model),
352 top_tools: combine_counts(top_tools),
353 };
354 let cost_dollars = stats.total_cost_usd_e6 as f64 / 1_000_000.0;
355 let hotspot = hottest
356 .into_iter()
357 .max_by(|a, b| a.value.cmp(&b.value).then_with(|| b.path.cmp(&a.path)));
358 let slowest_tool = slowest.into_iter().max_by(|a, b| {
359 a.p95_ms
360 .unwrap_or(0)
361 .cmp(&b.p95_ms.unwrap_or(0))
362 .then_with(|| b.tool.cmp(&a.tool))
363 });
364 let scope_label = scope::label(&roots);
365 let workspaces = if roots.len() > 1 {
366 workspace_names(&roots)
367 } else {
368 Vec::new()
369 };
370 if json_out {
371 return Ok(format!(
372 "{}\n",
373 serde_json::to_string_pretty(&SummaryJsonOut {
374 workspace: scope_label,
375 workspaces,
376 cost_usd: cost_dollars,
377 stats,
378 hotspot,
379 slowest_tool,
380 })?
381 ));
382 }
383 use std::fmt::Write;
384 let mut out = String::new();
385 if roots.len() > 1 {
386 writeln!(&mut out, "Scope: {}", scope::label(&roots)).unwrap();
387 }
388 writeln!(
389 &mut out,
390 "Sessions: {} Cost: ${:.2}",
391 stats.session_count, cost_dollars
392 )
393 .unwrap();
394
395 if !stats.by_agent.is_empty() {
396 let parts: Vec<String> = stats
397 .by_agent
398 .iter()
399 .map(|(a, n)| format!("{a} {n}"))
400 .collect();
401 writeln!(&mut out, "By agent: {}", parts.join(" · ")).unwrap();
402 }
403 if !stats.by_model.is_empty() {
404 let parts: Vec<String> = stats
405 .by_model
406 .iter()
407 .map(|(m, n)| format!("{m} {n}"))
408 .collect();
409 writeln!(&mut out, "By model: {}", parts.join(" · ")).unwrap();
410 }
411 if !stats.top_tools.is_empty() {
412 let parts: Vec<String> = stats
413 .top_tools
414 .iter()
415 .take(5)
416 .map(|(t, n)| format!("{t} {n}"))
417 .collect();
418 writeln!(&mut out, "Top tools: {}", parts.join(" · ")).unwrap();
419 }
420 if let Some(file) = hotspot {
421 writeln!(&mut out, "Hotspot: {} ({})", file.path, file.value).unwrap();
422 }
423 if let Some(tool) = slowest_tool {
424 let p95 = tool
425 .p95_ms
426 .map(|v| format!("{v}ms"))
427 .unwrap_or_else(|| "-".into());
428 writeln!(&mut out, "Slowest: {} p95 {}", tool.tool, p95).unwrap();
429 }
430 Ok(out)
431}
432
433pub fn cmd_summary(
435 workspace: Option<&Path>,
436 json_out: bool,
437 refresh: bool,
438 all_workspaces: bool,
439 source: crate::core::data_source::DataSource,
440) -> Result<()> {
441 print!(
442 "{}",
443 summary_text(workspace, json_out, refresh, all_workspaces, source,)?
444 );
445 Ok(())
446}
447
448pub(crate) fn scan_all_agents(
449 ws: &Path,
450 cfg: &config::Config,
451 ws_str: &str,
452 store: &Store,
453) -> Result<()> {
454 let _spin = ScanSpinner::start("Scanning agent sessions…");
455 let slug = workspace_slug(ws_str);
456 let sync_ctx = crate::sync::ingest_ctx(cfg, ws.to_path_buf());
457
458 for root in &cfg.scan.roots {
459 let expanded = expand_home(root);
460 let cursor_dir = PathBuf::from(&expanded)
461 .join(&slug)
462 .join("agent-transcripts");
463 scan_agent_dirs(
464 &cursor_dir,
465 store,
466 |p| {
467 scan_session_dir_all(p).map(|sessions| {
468 sessions
469 .into_iter()
470 .map(|(mut r, evs)| {
471 r.workspace = ws_str.to_string();
472 (r, evs)
473 })
474 .collect()
475 })
476 },
477 sync_ctx.as_ref(),
478 )?;
479 }
480
481 let home = std::env::var("HOME").unwrap_or_default();
482
483 let claude_dir = PathBuf::from(&home)
484 .join(".claude/projects")
485 .join(&slug)
486 .join("sessions");
487 scan_agent_dirs(
488 &claude_dir,
489 store,
490 |p| {
491 scan_claude_session_dir(p).map(|(mut r, evs)| {
492 r.workspace = ws_str.to_string();
493 vec![(r, evs)]
494 })
495 },
496 sync_ctx.as_ref(),
497 )?;
498
499 let codex_dir = PathBuf::from(&home).join(".codex/sessions").join(&slug);
500 scan_agent_dirs(
501 &codex_dir,
502 store,
503 |p| {
504 scan_codex_session_dir(p).map(|(mut r, evs)| {
505 r.workspace = ws_str.to_string();
506 vec![(r, evs)]
507 })
508 },
509 sync_ctx.as_ref(),
510 )?;
511
512 let tail = &cfg.sources.tail;
513 let home_pb = PathBuf::from(&home);
514 if tail.goose {
515 let sessions = scan_goose_workspace(&home_pb, ws)?;
516 persist_session_batch(store, sessions, sync_ctx.as_ref())?;
517 }
518 if tail.opencode {
519 let sessions = scan_opencode_workspace(ws)?;
520 persist_session_batch(store, sessions, sync_ctx.as_ref())?;
521 }
522 if tail.copilot_cli {
523 let sessions = scan_copilot_cli_workspace(ws)?;
524 persist_session_batch(store, sessions, sync_ctx.as_ref())?;
525 }
526 if tail.copilot_vscode {
527 let sessions = scan_copilot_vscode_workspace(ws)?;
528 persist_session_batch(store, sessions, sync_ctx.as_ref())?;
529 }
530
531 maybe_auto_prune_after_scan(store, cfg)?;
532 Ok(())
533}
534
535fn persist_session_batch(
536 store: &Store,
537 sessions: Vec<(SessionRecord, Vec<Event>)>,
538 sync_ctx: Option<&crate::sync::SyncIngestContext>,
539) -> Result<()> {
540 for (mut record, events) in sessions {
541 if record.start_commit.is_none() && !record.workspace.is_empty() {
542 let binding = crate::core::repo::binding_for_session(
543 Path::new(&record.workspace),
544 record.started_at_ms,
545 record.ended_at_ms,
546 );
547 record.start_commit = binding.start_commit;
548 record.end_commit = binding.end_commit;
549 record.branch = binding.branch;
550 record.dirty_start = binding.dirty_start;
551 record.dirty_end = binding.dirty_end;
552 record.repo_binding_source = binding.source;
553 }
554 store.upsert_session(&record)?;
555 for ev in events {
556 store.append_event_with_sync(&ev, sync_ctx)?;
557 }
558 }
559 Ok(())
560}
561
562pub(crate) fn scan_agent_dirs<F>(
563 dir: &Path,
564 store: &Store,
565 scanner: F,
566 sync_ctx: Option<&crate::sync::SyncIngestContext>,
567) -> Result<()>
568where
569 F: Fn(&Path) -> Result<Vec<(SessionRecord, Vec<Event>)>>,
570{
571 if !dir.exists() {
572 return Ok(());
573 }
574 for entry in std::fs::read_dir(dir)?.filter_map(|e| e.ok()) {
575 if !entry.file_type().map(|t| t.is_dir()).unwrap_or(false) {
576 continue;
577 }
578 match scanner(&entry.path()) {
579 Ok(sessions) => {
580 for (mut record, events) in sessions {
581 if record.start_commit.is_none() && !record.workspace.is_empty() {
582 let binding = crate::core::repo::binding_for_session(
583 Path::new(&record.workspace),
584 record.started_at_ms,
585 record.ended_at_ms,
586 );
587 record.start_commit = binding.start_commit;
588 record.end_commit = binding.end_commit;
589 record.branch = binding.branch;
590 record.dirty_start = binding.dirty_start;
591 record.dirty_end = binding.dirty_end;
592 record.repo_binding_source = binding.source;
593 }
594 store.upsert_session(&record)?;
595 for ev in events {
596 store.append_event_with_sync(&ev, sync_ctx)?;
597 }
598 }
599 }
600 Err(e) => tracing::warn!("scan {:?}: {e}", entry.path()),
601 }
602 }
603 Ok(())
604}
605
/// Resolves the optional `workspace` argument to a concrete path by delegating
/// to `crate::core::workspace::resolve`.
///
/// # Errors
/// Returns whatever error the resolver reports.
pub(crate) fn workspace_path(workspace: Option<&Path>) -> Result<PathBuf> {
    crate::core::workspace::resolve(workspace)
}
609
/// Turns a workspace path string into a directory slug: leading `/` characters
/// are dropped and every remaining `/` becomes `-`.
pub(crate) fn workspace_slug(ws: &str) -> String {
    ws.trim_start_matches('/')
        .chars()
        .map(|c| if c == '/' { '-' } else { c })
        .collect()
}
615
/// Expands a leading `~` in `path` to the value of the `HOME` environment
/// variable.
///
/// Both `~` on its own and the `~/rest` form are expanded. Any other input,
/// including `~user/...`, is returned unchanged, as is every input when `HOME`
/// is not set or is not valid Unicode.
pub(crate) fn expand_home(path: &str) -> String {
    // `rest` is either empty (bare `~`) or starts with `/`.
    let rest = match path.strip_prefix('~') {
        Some(rest) if rest.is_empty() || rest.starts_with('/') => rest,
        _ => return path.to_string(),
    };
    match std::env::var("HOME") {
        Ok(home) => format!("{home}{rest}"),
        Err(_) => path.to_string(),
    }
}