1use crate::collect::tail::claude::scan_claude_session_dir;
5use crate::collect::tail::codex::scan_codex_session_dir;
6use crate::collect::tail::copilot_cli::scan_copilot_cli_workspace;
7use crate::collect::tail::copilot_vscode::scan_copilot_vscode_workspace;
8use crate::collect::tail::cursor::scan_session_dir_all;
9use crate::collect::tail::goose::scan_goose_workspace;
10use crate::collect::tail::openclaw::scan_openclaw_workspace;
11use crate::collect::tail::opencode::scan_opencode_workspace;
12use crate::core::config;
13use crate::core::event::{Event, SessionRecord};
14use crate::metrics::report;
15use crate::shell::fmt::fmt_ts;
16use crate::shell::scope;
17use crate::store::{SYNC_STATE_LAST_AGENT_SCAN_MS, SYNC_STATE_LAST_AUTO_PRUNE_MS, Store};
18use anyhow::Result;
19use serde::Serialize;
20use std::collections::HashMap;
21use std::io::IsTerminal;
22use std::path::{Path, PathBuf};
23
24pub use crate::shell::init::cmd_init;
25pub use crate::shell::insights::cmd_insights;
26
27#[derive(Serialize)]
28struct SessionsListJson {
29 workspace: String,
30 #[serde(skip_serializing_if = "Vec::is_empty")]
31 workspaces: Vec<String>,
32 count: usize,
33 sessions: Vec<SessionRecord>,
34}
35
36#[derive(Serialize)]
37struct SummaryJsonOut {
38 workspace: String,
39 #[serde(skip_serializing_if = "Vec::is_empty")]
40 workspaces: Vec<String>,
41 #[serde(flatten)]
42 stats: crate::store::SummaryStats,
43 cost_usd: f64,
44 #[serde(skip_serializing_if = "Option::is_none")]
45 hotspot: Option<crate::metrics::types::RankedFile>,
46 #[serde(skip_serializing_if = "Option::is_none")]
47 slowest_tool: Option<crate::metrics::types::RankedTool>,
48}
49
50struct ScanSpinner(Option<indicatif::ProgressBar>);
51
52impl ScanSpinner {
53 fn start(msg: &'static str) -> Self {
54 if !std::io::stdout().is_terminal() {
55 return Self(None);
56 }
57 let p = indicatif::ProgressBar::new_spinner();
58 p.set_message(msg.to_string());
59 p.enable_steady_tick(std::time::Duration::from_millis(120));
60 Self(Some(p))
61 }
62}
63
64impl Drop for ScanSpinner {
65 fn drop(&mut self) {
66 if let Some(p) = self.0.take() {
67 p.finish_and_clear();
68 }
69 }
70}
71
72fn now_ms_u64() -> u64 {
73 std::time::SystemTime::now()
74 .duration_since(std::time::UNIX_EPOCH)
75 .unwrap_or_default()
76 .as_millis() as u64
77}
78
79const AUTO_PRUNE_INTERVAL_MS: u64 = 86_400_000;
81
82pub(crate) fn maybe_auto_prune_after_scan(store: &Store, cfg: &config::Config) -> Result<()> {
83 if cfg.retention.hot_days == 0 {
84 return Ok(());
85 }
86 let now = now_ms_u64();
87 if let Some(last) = store.sync_state_get_u64(SYNC_STATE_LAST_AUTO_PRUNE_MS)?
88 && now.saturating_sub(last) < AUTO_PRUNE_INTERVAL_MS
89 {
90 return Ok(());
91 }
92 let cutoff = now.saturating_sub((cfg.retention.hot_days as u64).saturating_mul(86_400_000));
93 store.prune_sessions_started_before(cutoff as i64)?;
94 store.sync_state_set_u64(SYNC_STATE_LAST_AUTO_PRUNE_MS, now)?;
95 Ok(())
96}
97
98pub(crate) fn maybe_scan_all_agents(
100 ws: &Path,
101 cfg: &config::Config,
102 ws_str: &str,
103 store: &Store,
104 refresh: bool,
105) -> Result<()> {
106 let interval_ms = cfg.scan.min_rescan_seconds.saturating_mul(1000);
107 let now = now_ms_u64();
108 if !refresh
109 && interval_ms > 0
110 && let Some(last) = store.sync_state_get_u64(SYNC_STATE_LAST_AGENT_SCAN_MS)?
111 && now.saturating_sub(last) < interval_ms
112 {
113 return Ok(());
114 }
115 scan_all_agents(ws, cfg, ws_str, store)?;
116 store.sync_state_set_u64(SYNC_STATE_LAST_AGENT_SCAN_MS, now_ms_u64())?;
117 Ok(())
118}
119
120pub(crate) fn maybe_refresh_store(workspace: &Path, store: &Store, refresh: bool) -> Result<()> {
121 if !refresh {
122 return Ok(());
123 }
124 let cfg = config::load(workspace)?;
125 let ws_str = workspace.to_string_lossy().to_string();
126 maybe_scan_all_agents(workspace, &cfg, &ws_str, store, true)
127}
128
129fn combine_counts(rows: Vec<Vec<(String, u64)>>) -> Vec<(String, u64)> {
130 let mut counts = HashMap::new();
131 for set in rows {
132 for (key, value) in set {
133 *counts.entry(key).or_insert(0_u64) += value;
134 }
135 }
136 let mut out = counts.into_iter().collect::<Vec<_>>();
137 out.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
138 out
139}
140
141fn workspace_names(roots: &[PathBuf]) -> Vec<String> {
142 roots
143 .iter()
144 .map(|path| path.to_string_lossy().to_string())
145 .collect()
146}
147
148fn open_workspace_store(workspace: &Path) -> Result<Store> {
149 Store::open(&crate::core::workspace::db_path(workspace))
150}
151
152pub fn sessions_list_text(
154 workspace: Option<&Path>,
155 json_out: bool,
156 refresh: bool,
157 all_workspaces: bool,
158) -> Result<String> {
159 let roots = scope::resolve(workspace, all_workspaces)?;
160 let mut sessions = Vec::new();
161 for workspace in &roots {
162 let store = open_workspace_store(workspace)?;
163 maybe_refresh_store(workspace, &store, refresh)?;
164 let ws_str = workspace.to_string_lossy().to_string();
165 sessions.extend(store.list_sessions(&ws_str)?);
166 }
167 sessions.sort_by(|a, b| {
168 b.started_at_ms
169 .cmp(&a.started_at_ms)
170 .then_with(|| a.id.cmp(&b.id))
171 });
172 let scope_label = scope::label(&roots);
173 let workspaces = if roots.len() > 1 {
174 workspace_names(&roots)
175 } else {
176 Vec::new()
177 };
178 if json_out {
179 return Ok(format!(
180 "{}\n",
181 serde_json::to_string_pretty(&SessionsListJson {
182 workspace: scope_label,
183 workspaces,
184 count: sessions.len(),
185 sessions,
186 })?
187 ));
188 }
189 use std::fmt::Write;
190 let mut out = String::new();
191 if roots.len() > 1 {
192 writeln!(&mut out, "Scope: {scope_label}").unwrap();
193 writeln!(&mut out).unwrap();
194 }
195 writeln!(
196 &mut out,
197 "{:<40} {:<10} {:<10} STARTED",
198 "ID", "AGENT", "STATUS"
199 )
200 .unwrap();
201 writeln!(&mut out, "{}", "-".repeat(80)).unwrap();
202 for s in &sessions {
203 writeln!(
204 &mut out,
205 "{:<40} {:<10} {:<10} {}",
206 s.id,
207 s.agent,
208 format!("{:?}", s.status),
209 fmt_ts(s.started_at_ms),
210 )
211 .unwrap();
212 }
213 if sessions.is_empty() {
214 writeln!(&mut out, "(no sessions)").unwrap();
215 sessions_empty_state_hints(&mut out);
216 }
217 Ok(out)
218}
219
220fn sessions_empty_state_hints(out: &mut String) {
221 use std::fmt::Write;
222 let _ = writeln!(out);
223 let _ = writeln!(out, "No sessions found for this workspace. Try:");
224 let _ = writeln!(out, " · `kaizen doctor` — verify config and hooks");
225 let _ = writeln!(out, " · a short agent session in this repo, then re-run");
226 let _ = writeln!(
227 out,
228 " · docs: https://github.com/marquesds/kaizen/blob/main/docs/config.md (sources)"
229 );
230}
231
232pub fn cmd_sessions_list(
234 workspace: Option<&Path>,
235 json_out: bool,
236 refresh: bool,
237 all_workspaces: bool,
238) -> Result<()> {
239 print!(
240 "{}",
241 sessions_list_text(workspace, json_out, refresh, all_workspaces)?
242 );
243 Ok(())
244}
245
246pub fn session_show_text(id: &str, workspace: Option<&Path>) -> Result<String> {
248 let ws = workspace_path(workspace)?;
249 let store = open_workspace_store(&ws)?;
250 use std::fmt::Write;
251 let mut out = String::new();
252 match store.get_session(id)? {
253 Some(s) => {
254 writeln!(&mut out, "id: {}", s.id).unwrap();
255 writeln!(&mut out, "agent: {}", s.agent).unwrap();
256 writeln!(
257 &mut out,
258 "model: {}",
259 s.model.as_deref().unwrap_or("-")
260 )
261 .unwrap();
262 writeln!(&mut out, "workspace: {}", s.workspace).unwrap();
263 writeln!(&mut out, "started_at: {}", fmt_ts(s.started_at_ms)).unwrap();
264 writeln!(
265 &mut out,
266 "ended_at: {}",
267 s.ended_at_ms.map(fmt_ts).unwrap_or_else(|| "-".to_string())
268 )
269 .unwrap();
270 writeln!(&mut out, "status: {:?}", s.status).unwrap();
271 writeln!(&mut out, "trace_path: {}", s.trace_path).unwrap();
272 if let Some(fp) = &s.prompt_fingerprint {
273 writeln!(&mut out, "prompt_fp: {fp}").unwrap();
274 if let Ok(Some(snap)) = store.get_prompt_snapshot(fp) {
275 for f in snap.files() {
276 writeln!(&mut out, " - {}", f.path).unwrap();
277 }
278 }
279 }
280 }
281 None => anyhow::bail!("session not found: {id} — try `kaizen sessions list`"),
282 }
283 let evals = store.list_evals_for_session(id).unwrap_or_default();
284 if !evals.is_empty() {
285 writeln!(&mut out, "evals:").unwrap();
286 for e in &evals {
287 writeln!(
288 &mut out,
289 " {} score={:.2} flagged={} {}",
290 e.rubric_id, e.score, e.flagged, e.rationale
291 )
292 .unwrap();
293 }
294 }
295 let fb = store
296 .feedback_for_sessions(&[id.to_string()])
297 .unwrap_or_default();
298 if let Some(r) = fb.get(id) {
299 let score = r
300 .score
301 .as_ref()
302 .map(|s| s.0.to_string())
303 .unwrap_or_else(|| "-".into());
304 let label = r
305 .label
306 .as_ref()
307 .map(|l| l.to_string())
308 .unwrap_or_else(|| "-".into());
309 writeln!(&mut out, "feedback: score={score} label={label}").unwrap();
310 if let Some(n) = &r.note {
311 writeln!(&mut out, " note: {n}").unwrap();
312 }
313 }
314 Ok(out)
315}
316
317pub fn cmd_session_show(id: &str, workspace: Option<&Path>) -> Result<()> {
319 print!("{}", session_show_text(id, workspace)?);
320 Ok(())
321}
322
323pub fn summary_text(
325 workspace: Option<&Path>,
326 json_out: bool,
327 refresh: bool,
328 all_workspaces: bool,
329 source: crate::core::data_source::DataSource,
330) -> Result<String> {
331 let roots = scope::resolve(workspace, all_workspaces)?;
332 let mut total_cost_usd_e6 = 0_i64;
333 let mut session_count = 0_u64;
334 let mut by_agent = Vec::new();
335 let mut by_model = Vec::new();
336 let mut top_tools = Vec::new();
337 let mut hottest = Vec::new();
338 let mut slowest = Vec::new();
339
340 for workspace in &roots {
341 let cfg = config::load(workspace)?;
342 let store = open_workspace_store(workspace)?;
343 crate::shell::remote_pull::maybe_telemetry_pull(workspace, &store, &cfg, source, refresh)?;
344 maybe_refresh_store(workspace, &store, refresh)?;
345 let ws_str = workspace.to_string_lossy().to_string();
346 let mut stats = store.summary_stats(&ws_str)?;
347 if source != crate::core::data_source::DataSource::Local
348 && let Ok(Some(agg)) =
349 crate::shell::remote_observe::try_remote_event_agg(&store, &cfg, workspace)
350 {
351 stats = crate::shell::remote_observe::merge_summary_stats(stats, &agg, source);
352 }
353 total_cost_usd_e6 += stats.total_cost_usd_e6;
354 session_count += stats.session_count;
355 by_agent.push(stats.by_agent);
356 by_model.push(stats.by_model);
357 top_tools.push(stats.top_tools);
358 if let Ok(metrics) = report::build_report(&store, &ws_str, 7) {
359 if let Some(file) = metrics.hottest_files.first().cloned() {
360 hottest.push(if roots.len() == 1 {
361 file
362 } else {
363 crate::metrics::types::RankedFile {
364 path: scope::decorate_path(workspace, &file.path),
365 ..file
366 }
367 });
368 }
369 if let Some(tool) = metrics.slowest_tools.first().cloned() {
370 slowest.push(tool);
371 }
372 }
373 }
374
375 let stats = crate::store::SummaryStats {
376 session_count,
377 total_cost_usd_e6,
378 by_agent: combine_counts(by_agent),
379 by_model: combine_counts(by_model),
380 top_tools: combine_counts(top_tools),
381 };
382 let cost_dollars = stats.total_cost_usd_e6 as f64 / 1_000_000.0;
383 let hotspot = hottest
384 .into_iter()
385 .max_by(|a, b| a.value.cmp(&b.value).then_with(|| b.path.cmp(&a.path)));
386 let slowest_tool = slowest.into_iter().max_by(|a, b| {
387 a.p95_ms
388 .unwrap_or(0)
389 .cmp(&b.p95_ms.unwrap_or(0))
390 .then_with(|| b.tool.cmp(&a.tool))
391 });
392 let scope_label = scope::label(&roots);
393 let workspaces = if roots.len() > 1 {
394 workspace_names(&roots)
395 } else {
396 Vec::new()
397 };
398 if json_out {
399 return Ok(format!(
400 "{}\n",
401 serde_json::to_string_pretty(&SummaryJsonOut {
402 workspace: scope_label,
403 workspaces,
404 cost_usd: cost_dollars,
405 stats,
406 hotspot,
407 slowest_tool,
408 })?
409 ));
410 }
411 use std::fmt::Write;
412 let mut out = String::new();
413 if roots.len() > 1 {
414 writeln!(&mut out, "Scope: {}", scope::label(&roots)).unwrap();
415 }
416 writeln!(
417 &mut out,
418 "Sessions: {} Cost: ${:.2}",
419 stats.session_count, cost_dollars
420 )
421 .unwrap();
422
423 if !stats.by_agent.is_empty() {
424 let parts: Vec<String> = stats
425 .by_agent
426 .iter()
427 .map(|(a, n)| format!("{a} {n}"))
428 .collect();
429 writeln!(&mut out, "By agent: {}", parts.join(" · ")).unwrap();
430 }
431 if !stats.by_model.is_empty() {
432 let parts: Vec<String> = stats
433 .by_model
434 .iter()
435 .map(|(m, n)| format!("{m} {n}"))
436 .collect();
437 writeln!(&mut out, "By model: {}", parts.join(" · ")).unwrap();
438 }
439 if !stats.top_tools.is_empty() {
440 let parts: Vec<String> = stats
441 .top_tools
442 .iter()
443 .take(5)
444 .map(|(t, n)| format!("{t} {n}"))
445 .collect();
446 writeln!(&mut out, "Top tools: {}", parts.join(" · ")).unwrap();
447 }
448 if let Some(file) = hotspot {
449 writeln!(&mut out, "Hotspot: {} ({})", file.path, file.value).unwrap();
450 }
451 if let Some(tool) = slowest_tool {
452 let p95 = tool
453 .p95_ms
454 .map(|v| format!("{v}ms"))
455 .unwrap_or_else(|| "-".into());
456 writeln!(&mut out, "Slowest: {} p95 {}", tool.tool, p95).unwrap();
457 }
458 Ok(out)
459}
460
461pub fn cmd_summary(
463 workspace: Option<&Path>,
464 json_out: bool,
465 refresh: bool,
466 all_workspaces: bool,
467 source: crate::core::data_source::DataSource,
468) -> Result<()> {
469 print!(
470 "{}",
471 summary_text(workspace, json_out, refresh, all_workspaces, source,)?
472 );
473 Ok(())
474}
475
476pub(crate) fn scan_all_agents(
477 ws: &Path,
478 cfg: &config::Config,
479 ws_str: &str,
480 store: &Store,
481) -> Result<()> {
482 let _spin = ScanSpinner::start("Scanning agent sessions…");
483 let slug = workspace_slug(ws_str);
484 let sync_ctx = crate::sync::ingest_ctx(cfg, ws.to_path_buf());
485
486 for root in &cfg.scan.roots {
487 let expanded = expand_home(root);
488 let cursor_dir = PathBuf::from(&expanded)
489 .join(&slug)
490 .join("agent-transcripts");
491 scan_agent_dirs(
492 &cursor_dir,
493 store,
494 |p| {
495 scan_session_dir_all(p).map(|sessions| {
496 sessions
497 .into_iter()
498 .map(|(mut r, evs)| {
499 r.workspace = ws_str.to_string();
500 (r, evs)
501 })
502 .collect()
503 })
504 },
505 sync_ctx.as_ref(),
506 )?;
507 }
508
509 let home = std::env::var("HOME").unwrap_or_default();
510
511 let claude_dir = PathBuf::from(&home)
512 .join(".claude/projects")
513 .join(&slug)
514 .join("sessions");
515 scan_agent_dirs(
516 &claude_dir,
517 store,
518 |p| {
519 scan_claude_session_dir(p).map(|(mut r, evs)| {
520 r.workspace = ws_str.to_string();
521 vec![(r, evs)]
522 })
523 },
524 sync_ctx.as_ref(),
525 )?;
526
527 let codex_dir = PathBuf::from(&home).join(".codex/sessions").join(&slug);
528 scan_agent_dirs(
529 &codex_dir,
530 store,
531 |p| {
532 scan_codex_session_dir(p).map(|(mut r, evs)| {
533 r.workspace = ws_str.to_string();
534 vec![(r, evs)]
535 })
536 },
537 sync_ctx.as_ref(),
538 )?;
539
540 let tail = &cfg.sources.tail;
541 let home_pb = PathBuf::from(&home);
542 if tail.goose {
543 let sessions = scan_goose_workspace(&home_pb, ws)?;
544 persist_session_batch(store, sessions, sync_ctx.as_ref())?;
545 }
546 if tail.openclaw {
547 let sessions = scan_openclaw_workspace(ws)?;
548 persist_session_batch(store, sessions, sync_ctx.as_ref())?;
549 }
550 if tail.opencode {
551 let sessions = scan_opencode_workspace(ws)?;
552 persist_session_batch(store, sessions, sync_ctx.as_ref())?;
553 }
554 if tail.copilot_cli {
555 let sessions = scan_copilot_cli_workspace(ws)?;
556 persist_session_batch(store, sessions, sync_ctx.as_ref())?;
557 }
558 if tail.copilot_vscode {
559 let sessions = scan_copilot_vscode_workspace(ws)?;
560 persist_session_batch(store, sessions, sync_ctx.as_ref())?;
561 }
562
563 maybe_auto_prune_after_scan(store, cfg)?;
564 Ok(())
565}
566
567fn persist_session_batch(
568 store: &Store,
569 sessions: Vec<(SessionRecord, Vec<Event>)>,
570 sync_ctx: Option<&crate::sync::SyncIngestContext>,
571) -> Result<()> {
572 for (mut record, events) in sessions {
573 if record.start_commit.is_none() && !record.workspace.is_empty() {
574 let binding = crate::core::repo::binding_for_session(
575 Path::new(&record.workspace),
576 record.started_at_ms,
577 record.ended_at_ms,
578 );
579 record.start_commit = binding.start_commit;
580 record.end_commit = binding.end_commit;
581 record.branch = binding.branch;
582 record.dirty_start = binding.dirty_start;
583 record.dirty_end = binding.dirty_end;
584 record.repo_binding_source = binding.source;
585 }
586 store.upsert_session(&record)?;
587 for ev in events {
588 store.append_event_with_sync(&ev, sync_ctx)?;
589 }
590 }
591 Ok(())
592}
593
594pub(crate) fn scan_agent_dirs<F>(
595 dir: &Path,
596 store: &Store,
597 scanner: F,
598 sync_ctx: Option<&crate::sync::SyncIngestContext>,
599) -> Result<()>
600where
601 F: Fn(&Path) -> Result<Vec<(SessionRecord, Vec<Event>)>>,
602{
603 if !dir.exists() {
604 return Ok(());
605 }
606 for entry in std::fs::read_dir(dir)?.filter_map(|e| e.ok()) {
607 if !entry.file_type().map(|t| t.is_dir()).unwrap_or(false) {
608 continue;
609 }
610 match scanner(&entry.path()) {
611 Ok(sessions) => {
612 for (mut record, events) in sessions {
613 if record.start_commit.is_none() && !record.workspace.is_empty() {
614 let binding = crate::core::repo::binding_for_session(
615 Path::new(&record.workspace),
616 record.started_at_ms,
617 record.ended_at_ms,
618 );
619 record.start_commit = binding.start_commit;
620 record.end_commit = binding.end_commit;
621 record.branch = binding.branch;
622 record.dirty_start = binding.dirty_start;
623 record.dirty_end = binding.dirty_end;
624 record.repo_binding_source = binding.source;
625 }
626 store.upsert_session(&record)?;
627 for ev in events {
628 store.append_event_with_sync(&ev, sync_ctx)?;
629 }
630 }
631 }
632 Err(e) => tracing::warn!("scan {:?}: {e}", entry.path()),
633 }
634 }
635 Ok(())
636}
637
638pub(crate) fn workspace_path(workspace: Option<&Path>) -> Result<PathBuf> {
639 crate::core::workspace::resolve(workspace)
640}
641
642pub(crate) fn workspace_slug(ws: &str) -> String {
645 ws.trim_start_matches('/').replace('/', "-")
646}
647
648pub(crate) fn expand_home(path: &str) -> String {
649 if let (Some(rest), Ok(home)) = (path.strip_prefix("~/"), std::env::var("HOME")) {
650 return format!("{home}/{rest}");
651 }
652 path.to_string()
653}