1use crate::collect::tail::claude::scan_claude_session_dir;
5use crate::collect::tail::codex::scan_codex_session_dir;
6use crate::collect::tail::copilot_cli::scan_copilot_cli_workspace;
7use crate::collect::tail::copilot_vscode::scan_copilot_vscode_workspace;
8use crate::collect::tail::cursor::scan_session_dir_all;
9use crate::collect::tail::goose::scan_goose_workspace;
10use crate::collect::tail::opencode::scan_opencode_workspace;
11use crate::core::config;
12use crate::core::event::{Event, SessionRecord};
13use crate::metrics::report;
14use crate::shell::fmt::fmt_ts;
15use crate::shell::scope;
16use crate::store::{SYNC_STATE_LAST_AGENT_SCAN_MS, SYNC_STATE_LAST_AUTO_PRUNE_MS, Store};
17use anyhow::Result;
18use serde::Serialize;
19use std::collections::HashMap;
20use std::io::IsTerminal;
21use std::path::{Path, PathBuf};
22
23pub use crate::shell::init::cmd_init;
24pub use crate::shell::insights::cmd_insights;
25
/// JSON payload emitted by `sessions_list_text` when JSON output is requested.
#[derive(Serialize)]
struct SessionsListJson {
    /// Scope label for the resolved workspace root(s).
    workspace: String,
    /// Individual workspace roots; left out of the JSON when empty.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    workspaces: Vec<String>,
    /// Number of entries in `sessions`.
    count: usize,
    /// The listed session records.
    sessions: Vec<SessionRecord>,
}
34
/// JSON payload emitted by `summary_text` when JSON output is requested.
#[derive(Serialize)]
struct SummaryJsonOut {
    /// Scope label for the resolved workspace root(s).
    workspace: String,
    /// Individual workspace roots; left out of the JSON when empty.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    workspaces: Vec<String>,
    /// Aggregated statistics; flattened so its fields appear at the top level.
    #[serde(flatten)]
    stats: crate::store::SummaryStats,
    /// Total cost in dollars (`total_cost_usd_e6 / 1_000_000`).
    cost_usd: f64,
    /// Top-ranked file across the scope; left out when there is none.
    #[serde(skip_serializing_if = "Option::is_none")]
    hotspot: Option<crate::metrics::types::RankedFile>,
    /// Tool with the highest p95 across the scope; left out when there is none.
    #[serde(skip_serializing_if = "Option::is_none")]
    slowest_tool: Option<crate::metrics::types::RankedTool>,
}
48
49struct ScanSpinner(Option<indicatif::ProgressBar>);
50
51impl ScanSpinner {
52 fn start(msg: &'static str) -> Self {
53 if !std::io::stdout().is_terminal() {
54 return Self(None);
55 }
56 let p = indicatif::ProgressBar::new_spinner();
57 p.set_message(msg.to_string());
58 p.enable_steady_tick(std::time::Duration::from_millis(120));
59 Self(Some(p))
60 }
61}
62
63impl Drop for ScanSpinner {
64 fn drop(&mut self) {
65 if let Some(p) = self.0.take() {
66 p.finish_and_clear();
67 }
68 }
69}
70
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch.
fn now_ms_u64() -> u64 {
    let since_epoch = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => std::time::Duration::default(),
    };
    since_epoch.as_millis() as u64
}
77
/// Minimum time between two automatic prunes: 24 hours, in milliseconds.
const AUTO_PRUNE_INTERVAL_MS: u64 = 24 * 60 * 60 * 1_000;
80
81pub(crate) fn maybe_auto_prune_after_scan(store: &Store, cfg: &config::Config) -> Result<()> {
82 if cfg.retention.hot_days == 0 {
83 return Ok(());
84 }
85 let now = now_ms_u64();
86 if let Some(last) = store.sync_state_get_u64(SYNC_STATE_LAST_AUTO_PRUNE_MS)?
87 && now.saturating_sub(last) < AUTO_PRUNE_INTERVAL_MS
88 {
89 return Ok(());
90 }
91 let cutoff = now.saturating_sub((cfg.retention.hot_days as u64).saturating_mul(86_400_000));
92 store.prune_sessions_started_before(cutoff as i64)?;
93 store.sync_state_set_u64(SYNC_STATE_LAST_AUTO_PRUNE_MS, now)?;
94 Ok(())
95}
96
97pub(crate) fn maybe_scan_all_agents(
99 ws: &Path,
100 cfg: &config::Config,
101 ws_str: &str,
102 store: &Store,
103 refresh: bool,
104) -> Result<()> {
105 let interval_ms = cfg.scan.min_rescan_seconds.saturating_mul(1000);
106 let now = now_ms_u64();
107 if !refresh
108 && interval_ms > 0
109 && let Some(last) = store.sync_state_get_u64(SYNC_STATE_LAST_AGENT_SCAN_MS)?
110 && now.saturating_sub(last) < interval_ms
111 {
112 return Ok(());
113 }
114 scan_all_agents(ws, cfg, ws_str, store)?;
115 store.sync_state_set_u64(SYNC_STATE_LAST_AGENT_SCAN_MS, now_ms_u64())?;
116 Ok(())
117}
118
119pub(crate) fn maybe_refresh_store(workspace: &Path, store: &Store, refresh: bool) -> Result<()> {
120 if !refresh {
121 return Ok(());
122 }
123 let cfg = config::load(workspace)?;
124 let ws_str = workspace.to_string_lossy().to_string();
125 maybe_scan_all_agents(workspace, &cfg, &ws_str, store, true)
126}
127
/// Merges several `(name, count)` lists into one, summing counts per name.
///
/// The result is ordered by count descending, then by name ascending.
fn combine_counts(rows: Vec<Vec<(String, u64)>>) -> Vec<(String, u64)> {
    let mut totals: HashMap<String, u64> = HashMap::new();
    for (name, n) in rows.into_iter().flatten() {
        *totals.entry(name).or_default() += n;
    }

    let mut merged: Vec<(String, u64)> = totals.into_iter().collect();
    // Names are unique after merging, so no two entries compare equal and an
    // unstable sort yields the same order as a stable one.
    merged.sort_unstable_by(|(lname, lcount), (rname, rcount)| {
        rcount.cmp(lcount).then_with(|| lname.cmp(rname))
    });
    merged
}
139
/// Converts each root path to a `String` (lossily for non-UTF-8 paths),
/// keeping the input order.
fn workspace_names(roots: &[PathBuf]) -> Vec<String> {
    let mut names = Vec::with_capacity(roots.len());
    for root in roots {
        names.push(root.to_string_lossy().into_owned());
    }
    names
}
146
147fn open_workspace_store(workspace: &Path) -> Result<Store> {
148 Store::open(&crate::core::workspace::db_path(workspace))
149}
150
151pub fn sessions_list_text(
153 workspace: Option<&Path>,
154 json_out: bool,
155 refresh: bool,
156 all_workspaces: bool,
157) -> Result<String> {
158 let roots = scope::resolve(workspace, all_workspaces)?;
159 let mut sessions = Vec::new();
160 for workspace in &roots {
161 let store = open_workspace_store(workspace)?;
162 maybe_refresh_store(workspace, &store, refresh)?;
163 let ws_str = workspace.to_string_lossy().to_string();
164 sessions.extend(store.list_sessions(&ws_str)?);
165 }
166 sessions.sort_by(|a, b| {
167 b.started_at_ms
168 .cmp(&a.started_at_ms)
169 .then_with(|| a.id.cmp(&b.id))
170 });
171 let scope_label = scope::label(&roots);
172 let workspaces = if roots.len() > 1 {
173 workspace_names(&roots)
174 } else {
175 Vec::new()
176 };
177 if json_out {
178 return Ok(format!(
179 "{}\n",
180 serde_json::to_string_pretty(&SessionsListJson {
181 workspace: scope_label,
182 workspaces,
183 count: sessions.len(),
184 sessions,
185 })?
186 ));
187 }
188 use std::fmt::Write;
189 let mut out = String::new();
190 if roots.len() > 1 {
191 writeln!(&mut out, "Scope: {scope_label}").unwrap();
192 writeln!(&mut out).unwrap();
193 }
194 writeln!(
195 &mut out,
196 "{:<40} {:<10} {:<10} STARTED",
197 "ID", "AGENT", "STATUS"
198 )
199 .unwrap();
200 writeln!(&mut out, "{}", "-".repeat(80)).unwrap();
201 for s in &sessions {
202 writeln!(
203 &mut out,
204 "{:<40} {:<10} {:<10} {}",
205 s.id,
206 s.agent,
207 format!("{:?}", s.status),
208 fmt_ts(s.started_at_ms),
209 )
210 .unwrap();
211 }
212 if sessions.is_empty() {
213 writeln!(&mut out, "(no sessions)").unwrap();
214 sessions_empty_state_hints(&mut out);
215 }
216 Ok(out)
217}
218
/// Appends a blank line followed by troubleshooting hints to `out`, for use
/// when a session listing turned out empty.
fn sessions_empty_state_hints(out: &mut String) {
    const HINT_LINES: [&str; 4] = [
        "No sessions found for this workspace. Try:",
        " · `kaizen doctor` — verify config and hooks",
        " · a short agent session in this repo, then re-run",
        " · docs: https://github.com/marquesds/kaizen/blob/main/docs/config.md (sources)",
    ];

    out.push('\n');
    for line in &HINT_LINES {
        out.push_str(line);
        out.push('\n');
    }
}
230
231pub fn cmd_sessions_list(
233 workspace: Option<&Path>,
234 json_out: bool,
235 refresh: bool,
236 all_workspaces: bool,
237) -> Result<()> {
238 print!(
239 "{}",
240 sessions_list_text(workspace, json_out, refresh, all_workspaces)?
241 );
242 Ok(())
243}
244
245pub fn session_show_text(id: &str, workspace: Option<&Path>) -> Result<String> {
247 let ws = workspace_path(workspace)?;
248 let store = open_workspace_store(&ws)?;
249 use std::fmt::Write;
250 let mut out = String::new();
251 match store.get_session(id)? {
252 Some(s) => {
253 writeln!(&mut out, "id: {}", s.id).unwrap();
254 writeln!(&mut out, "agent: {}", s.agent).unwrap();
255 writeln!(
256 &mut out,
257 "model: {}",
258 s.model.as_deref().unwrap_or("-")
259 )
260 .unwrap();
261 writeln!(&mut out, "workspace: {}", s.workspace).unwrap();
262 writeln!(&mut out, "started_at: {}", fmt_ts(s.started_at_ms)).unwrap();
263 writeln!(
264 &mut out,
265 "ended_at: {}",
266 s.ended_at_ms.map(fmt_ts).unwrap_or_else(|| "-".to_string())
267 )
268 .unwrap();
269 writeln!(&mut out, "status: {:?}", s.status).unwrap();
270 writeln!(&mut out, "trace_path: {}", s.trace_path).unwrap();
271 }
272 None => anyhow::bail!("session not found: {id} — try `kaizen sessions list`"),
273 }
274 Ok(out)
275}
276
277pub fn cmd_session_show(id: &str, workspace: Option<&Path>) -> Result<()> {
279 print!("{}", session_show_text(id, workspace)?);
280 Ok(())
281}
282
283pub fn summary_text(
285 workspace: Option<&Path>,
286 json_out: bool,
287 refresh: bool,
288 all_workspaces: bool,
289 source: crate::core::data_source::DataSource,
290) -> Result<String> {
291 let roots = scope::resolve(workspace, all_workspaces)?;
292 let mut total_cost_usd_e6 = 0_i64;
293 let mut session_count = 0_u64;
294 let mut by_agent = Vec::new();
295 let mut by_model = Vec::new();
296 let mut top_tools = Vec::new();
297 let mut hottest = Vec::new();
298 let mut slowest = Vec::new();
299
300 for workspace in &roots {
301 let cfg = config::load(workspace)?;
302 let store = open_workspace_store(workspace)?;
303 crate::shell::remote_pull::maybe_telemetry_pull(workspace, &store, &cfg, source, refresh)?;
304 maybe_refresh_store(workspace, &store, refresh)?;
305 let ws_str = workspace.to_string_lossy().to_string();
306 let mut stats = store.summary_stats(&ws_str)?;
307 if source != crate::core::data_source::DataSource::Local
308 && let Ok(Some(agg)) =
309 crate::shell::remote_observe::try_remote_event_agg(&store, &cfg, workspace)
310 {
311 stats = crate::shell::remote_observe::merge_summary_stats(stats, &agg, source);
312 }
313 total_cost_usd_e6 += stats.total_cost_usd_e6;
314 session_count += stats.session_count;
315 by_agent.push(stats.by_agent);
316 by_model.push(stats.by_model);
317 top_tools.push(stats.top_tools);
318 if let Ok(metrics) = report::build_report(&store, &ws_str, 7) {
319 if let Some(file) = metrics.hottest_files.first().cloned() {
320 hottest.push(if roots.len() == 1 {
321 file
322 } else {
323 crate::metrics::types::RankedFile {
324 path: scope::decorate_path(workspace, &file.path),
325 ..file
326 }
327 });
328 }
329 if let Some(tool) = metrics.slowest_tools.first().cloned() {
330 slowest.push(tool);
331 }
332 }
333 }
334
335 let stats = crate::store::SummaryStats {
336 session_count,
337 total_cost_usd_e6,
338 by_agent: combine_counts(by_agent),
339 by_model: combine_counts(by_model),
340 top_tools: combine_counts(top_tools),
341 };
342 let cost_dollars = stats.total_cost_usd_e6 as f64 / 1_000_000.0;
343 let hotspot = hottest
344 .into_iter()
345 .max_by(|a, b| a.value.cmp(&b.value).then_with(|| b.path.cmp(&a.path)));
346 let slowest_tool = slowest.into_iter().max_by(|a, b| {
347 a.p95_ms
348 .unwrap_or(0)
349 .cmp(&b.p95_ms.unwrap_or(0))
350 .then_with(|| b.tool.cmp(&a.tool))
351 });
352 let scope_label = scope::label(&roots);
353 let workspaces = if roots.len() > 1 {
354 workspace_names(&roots)
355 } else {
356 Vec::new()
357 };
358 if json_out {
359 return Ok(format!(
360 "{}\n",
361 serde_json::to_string_pretty(&SummaryJsonOut {
362 workspace: scope_label,
363 workspaces,
364 cost_usd: cost_dollars,
365 stats,
366 hotspot,
367 slowest_tool,
368 })?
369 ));
370 }
371 use std::fmt::Write;
372 let mut out = String::new();
373 if roots.len() > 1 {
374 writeln!(&mut out, "Scope: {}", scope::label(&roots)).unwrap();
375 }
376 writeln!(
377 &mut out,
378 "Sessions: {} Cost: ${:.2}",
379 stats.session_count, cost_dollars
380 )
381 .unwrap();
382
383 if !stats.by_agent.is_empty() {
384 let parts: Vec<String> = stats
385 .by_agent
386 .iter()
387 .map(|(a, n)| format!("{a} {n}"))
388 .collect();
389 writeln!(&mut out, "By agent: {}", parts.join(" · ")).unwrap();
390 }
391 if !stats.by_model.is_empty() {
392 let parts: Vec<String> = stats
393 .by_model
394 .iter()
395 .map(|(m, n)| format!("{m} {n}"))
396 .collect();
397 writeln!(&mut out, "By model: {}", parts.join(" · ")).unwrap();
398 }
399 if !stats.top_tools.is_empty() {
400 let parts: Vec<String> = stats
401 .top_tools
402 .iter()
403 .take(5)
404 .map(|(t, n)| format!("{t} {n}"))
405 .collect();
406 writeln!(&mut out, "Top tools: {}", parts.join(" · ")).unwrap();
407 }
408 if let Some(file) = hotspot {
409 writeln!(&mut out, "Hotspot: {} ({})", file.path, file.value).unwrap();
410 }
411 if let Some(tool) = slowest_tool {
412 let p95 = tool
413 .p95_ms
414 .map(|v| format!("{v}ms"))
415 .unwrap_or_else(|| "-".into());
416 writeln!(&mut out, "Slowest: {} p95 {}", tool.tool, p95).unwrap();
417 }
418 Ok(out)
419}
420
421pub fn cmd_summary(
423 workspace: Option<&Path>,
424 json_out: bool,
425 refresh: bool,
426 all_workspaces: bool,
427 source: crate::core::data_source::DataSource,
428) -> Result<()> {
429 print!(
430 "{}",
431 summary_text(workspace, json_out, refresh, all_workspaces, source,)?
432 );
433 Ok(())
434}
435
436pub(crate) fn scan_all_agents(
437 ws: &Path,
438 cfg: &config::Config,
439 ws_str: &str,
440 store: &Store,
441) -> Result<()> {
442 let _spin = ScanSpinner::start("Scanning agent sessions…");
443 let slug = workspace_slug(ws_str);
444 let sync_ctx = crate::sync::ingest_ctx(cfg, ws.to_path_buf());
445
446 for root in &cfg.scan.roots {
447 let expanded = expand_home(root);
448 let cursor_dir = PathBuf::from(&expanded)
449 .join(&slug)
450 .join("agent-transcripts");
451 scan_agent_dirs(
452 &cursor_dir,
453 store,
454 |p| {
455 scan_session_dir_all(p).map(|sessions| {
456 sessions
457 .into_iter()
458 .map(|(mut r, evs)| {
459 r.workspace = ws_str.to_string();
460 (r, evs)
461 })
462 .collect()
463 })
464 },
465 sync_ctx.as_ref(),
466 )?;
467 }
468
469 let home = std::env::var("HOME").unwrap_or_default();
470
471 let claude_dir = PathBuf::from(&home)
472 .join(".claude/projects")
473 .join(&slug)
474 .join("sessions");
475 scan_agent_dirs(
476 &claude_dir,
477 store,
478 |p| {
479 scan_claude_session_dir(p).map(|(mut r, evs)| {
480 r.workspace = ws_str.to_string();
481 vec![(r, evs)]
482 })
483 },
484 sync_ctx.as_ref(),
485 )?;
486
487 let codex_dir = PathBuf::from(&home).join(".codex/sessions").join(&slug);
488 scan_agent_dirs(
489 &codex_dir,
490 store,
491 |p| {
492 scan_codex_session_dir(p).map(|(mut r, evs)| {
493 r.workspace = ws_str.to_string();
494 vec![(r, evs)]
495 })
496 },
497 sync_ctx.as_ref(),
498 )?;
499
500 let tail = &cfg.sources.tail;
501 let home_pb = PathBuf::from(&home);
502 if tail.goose {
503 let sessions = scan_goose_workspace(&home_pb, ws)?;
504 persist_session_batch(store, sessions, sync_ctx.as_ref())?;
505 }
506 if tail.opencode {
507 let sessions = scan_opencode_workspace(ws)?;
508 persist_session_batch(store, sessions, sync_ctx.as_ref())?;
509 }
510 if tail.copilot_cli {
511 let sessions = scan_copilot_cli_workspace(ws)?;
512 persist_session_batch(store, sessions, sync_ctx.as_ref())?;
513 }
514 if tail.copilot_vscode {
515 let sessions = scan_copilot_vscode_workspace(ws)?;
516 persist_session_batch(store, sessions, sync_ctx.as_ref())?;
517 }
518
519 maybe_auto_prune_after_scan(store, cfg)?;
520 Ok(())
521}
522
523fn persist_session_batch(
524 store: &Store,
525 sessions: Vec<(SessionRecord, Vec<Event>)>,
526 sync_ctx: Option<&crate::sync::SyncIngestContext>,
527) -> Result<()> {
528 for (mut record, events) in sessions {
529 if record.start_commit.is_none() && !record.workspace.is_empty() {
530 let binding = crate::core::repo::binding_for_session(
531 Path::new(&record.workspace),
532 record.started_at_ms,
533 record.ended_at_ms,
534 );
535 record.start_commit = binding.start_commit;
536 record.end_commit = binding.end_commit;
537 record.branch = binding.branch;
538 record.dirty_start = binding.dirty_start;
539 record.dirty_end = binding.dirty_end;
540 record.repo_binding_source = binding.source;
541 }
542 store.upsert_session(&record)?;
543 for ev in events {
544 store.append_event_with_sync(&ev, sync_ctx)?;
545 }
546 }
547 Ok(())
548}
549
550pub(crate) fn scan_agent_dirs<F>(
551 dir: &Path,
552 store: &Store,
553 scanner: F,
554 sync_ctx: Option<&crate::sync::SyncIngestContext>,
555) -> Result<()>
556where
557 F: Fn(&Path) -> Result<Vec<(SessionRecord, Vec<Event>)>>,
558{
559 if !dir.exists() {
560 return Ok(());
561 }
562 for entry in std::fs::read_dir(dir)?.filter_map(|e| e.ok()) {
563 if !entry.file_type().map(|t| t.is_dir()).unwrap_or(false) {
564 continue;
565 }
566 match scanner(&entry.path()) {
567 Ok(sessions) => {
568 for (mut record, events) in sessions {
569 if record.start_commit.is_none() && !record.workspace.is_empty() {
570 let binding = crate::core::repo::binding_for_session(
571 Path::new(&record.workspace),
572 record.started_at_ms,
573 record.ended_at_ms,
574 );
575 record.start_commit = binding.start_commit;
576 record.end_commit = binding.end_commit;
577 record.branch = binding.branch;
578 record.dirty_start = binding.dirty_start;
579 record.dirty_end = binding.dirty_end;
580 record.repo_binding_source = binding.source;
581 }
582 store.upsert_session(&record)?;
583 for ev in events {
584 store.append_event_with_sync(&ev, sync_ctx)?;
585 }
586 }
587 }
588 Err(e) => tracing::warn!("scan {:?}: {e}", entry.path()),
589 }
590 }
591 Ok(())
592}
593
594pub(crate) fn workspace_path(workspace: Option<&Path>) -> Result<PathBuf> {
595 crate::core::workspace::resolve(workspace)
596}
597
/// Turns a workspace path into a directory-name slug: leading `/` characters
/// are dropped and every remaining `/` becomes `-`.
pub(crate) fn workspace_slug(ws: &str) -> String {
    ws.trim_start_matches('/')
        .chars()
        .map(|ch| if ch == '/' { '-' } else { ch })
        .collect()
}
603
/// Expands a leading `~` in `path` to the value of the `HOME` environment
/// variable.
///
/// Both `~` on its own and the `~/…` prefix are expanded. Anything else —
/// including `~user/…` forms — is returned unchanged, as is every path when
/// `HOME` is unset or not valid Unicode.
pub(crate) fn expand_home(path: &str) -> String {
    // `suffix` is what follows the tilde: empty for a bare `~`, or `/…`.
    let suffix = match path.strip_prefix('~') {
        Some(rest) if rest.is_empty() || rest.starts_with('/') => rest,
        _ => return path.to_string(),
    };
    match std::env::var("HOME") {
        Ok(home) => format!("{home}{suffix}"),
        Err(_) => path.to_string(),
    }
}
609}