1use crate::collect::tail::claude::scan_claude_session_dir;
5use crate::collect::tail::codex::scan_codex_session_dir;
6use crate::collect::tail::copilot_cli::scan_copilot_cli_workspace;
7use crate::collect::tail::copilot_vscode::scan_copilot_vscode_workspace;
8use crate::collect::tail::cursor::scan_session_dir_all;
9use crate::collect::tail::goose::scan_goose_workspace;
10use crate::collect::tail::openclaw::scan_openclaw_workspace;
11use crate::collect::tail::opencode::scan_opencode_workspace;
12use crate::core::config;
13use crate::core::event::{Event, SessionRecord};
14use crate::metrics::report;
15use crate::shell::fmt::fmt_ts;
16use crate::shell::scope;
17use crate::store::{SYNC_STATE_LAST_AGENT_SCAN_MS, SYNC_STATE_LAST_AUTO_PRUNE_MS, Store};
18use anyhow::Result;
19use serde::Serialize;
20use std::collections::HashMap;
21use std::io::IsTerminal;
22use std::path::{Path, PathBuf};
23
24pub use crate::shell::init::cmd_init;
25pub use crate::shell::insights::cmd_insights;
26
/// JSON payload produced by `sessions_list_text` when JSON output is requested.
#[derive(Serialize)]
struct SessionsListJson {
    /// Scope label for the listing (filled from `scope::label`).
    workspace: String,
    /// Individual workspace roots; omitted from the JSON when empty.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    workspaces: Vec<String>,
    /// Number of entries in `sessions`.
    count: usize,
    /// The session records being listed.
    sessions: Vec<SessionRecord>,
}
35
/// JSON payload produced by `summary_text` when JSON output is requested.
#[derive(Serialize)]
struct SummaryJsonOut {
    /// Scope label for the summary (filled from `scope::label`).
    workspace: String,
    /// Individual workspace roots; omitted from the JSON when empty.
    #[serde(skip_serializing_if = "Vec::is_empty")]
    workspaces: Vec<String>,
    /// Aggregated statistics; its fields are flattened into the top-level object.
    #[serde(flatten)]
    stats: crate::store::SummaryStats,
    /// Total cost in dollars (`total_cost_usd_e6 / 1_000_000`).
    cost_usd: f64,
    /// Explanatory note, present only when a zero-cost note is warranted.
    #[serde(skip_serializing_if = "Option::is_none")]
    cost_note: Option<String>,
    /// Top-ranked file across the summarized workspaces, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    hotspot: Option<crate::metrics::types::RankedFile>,
    /// Tool with the highest p95 across the summarized workspaces, if any.
    #[serde(skip_serializing_if = "Option::is_none")]
    slowest_tool: Option<crate::metrics::types::RankedTool>,
}
51
/// Reports whether the zero-cost explanation should accompany a summary.
///
/// True only when at least one session exists and the rolled-up cost is exactly zero.
pub(crate) fn summary_needs_cost_rollup_note(session_count: u64, total_cost_usd_e6: i64) -> bool {
    if session_count == 0 {
        return false;
    }
    total_cost_usd_e6 == 0
}
56
/// Long-form note explaining why a summary can show a $0.00 cost rollup.
///
/// The text is a single line (no embedded newlines) assembled from three sentences.
pub(crate) fn cost_rollup_zero_note_paragraph() -> &'static str {
    concat!(
        "Cost rollup shows $0.00 because stored events have no cost_usd_e6 — common when Cursor agent-transcript lines omit usage/tokens. ",
        "If you expect non-zero spend, ingest Claude/Codex transcripts with usage, hooks with total_cost_usd, or Kaizen proxy Cost events; run `kaizen summary --refresh` after ingest changes. ",
        "See docs/usage.md#cost-shows-zero.",
    )
}
62
/// One-line variant of the zero-cost explanation.
pub(crate) fn cost_rollup_zero_doctor_hint() -> &'static str {
    const HINT: &str = "Cost rollup $0.00 with sessions but no cost_usd_e6 — often Cursor transcripts without usage; see docs/usage.md#cost-shows-zero";
    HINT
}
66
67struct ScanSpinner(Option<indicatif::ProgressBar>);
68
69impl ScanSpinner {
70 fn start(msg: &'static str) -> Self {
71 if !std::io::stdout().is_terminal() {
72 return Self(None);
73 }
74 let p = indicatif::ProgressBar::new_spinner();
75 p.set_message(msg.to_string());
76 p.enable_steady_tick(std::time::Duration::from_millis(120));
77 Self(Some(p))
78 }
79}
80
81impl Drop for ScanSpinner {
82 fn drop(&mut self) {
83 if let Some(p) = self.0.take() {
84 p.finish_and_clear();
85 }
86 }
87}
88
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields 0 if the system clock reports a time before the epoch.
fn now_ms_u64() -> u64 {
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::default(),
    };
    since_epoch.as_millis() as u64
}
95
/// Minimum spacing between automatic prunes: 24 hours, in milliseconds.
const AUTO_PRUNE_INTERVAL_MS: u64 = 24 * 60 * 60 * 1_000;
98
99pub(crate) fn maybe_auto_prune_after_scan(store: &Store, cfg: &config::Config) -> Result<()> {
100 if cfg.retention.hot_days == 0 {
101 return Ok(());
102 }
103 let now = now_ms_u64();
104 if let Some(last) = store.sync_state_get_u64(SYNC_STATE_LAST_AUTO_PRUNE_MS)?
105 && now.saturating_sub(last) < AUTO_PRUNE_INTERVAL_MS
106 {
107 return Ok(());
108 }
109 let cutoff = now.saturating_sub((cfg.retention.hot_days as u64).saturating_mul(86_400_000));
110 store.prune_sessions_started_before(cutoff as i64)?;
111 store.sync_state_set_u64(SYNC_STATE_LAST_AUTO_PRUNE_MS, now)?;
112 Ok(())
113}
114
115pub(crate) fn maybe_scan_all_agents(
117 ws: &Path,
118 cfg: &config::Config,
119 ws_str: &str,
120 store: &Store,
121 refresh: bool,
122) -> Result<()> {
123 let interval_ms = cfg.scan.min_rescan_seconds.saturating_mul(1000);
124 let now = now_ms_u64();
125 if !refresh
126 && interval_ms > 0
127 && let Some(last) = store.sync_state_get_u64(SYNC_STATE_LAST_AGENT_SCAN_MS)?
128 && now.saturating_sub(last) < interval_ms
129 {
130 return Ok(());
131 }
132 scan_all_agents(ws, cfg, ws_str, store)?;
133 store.sync_state_set_u64(SYNC_STATE_LAST_AGENT_SCAN_MS, now_ms_u64())?;
134 Ok(())
135}
136
137pub(crate) fn maybe_refresh_store(workspace: &Path, store: &Store, refresh: bool) -> Result<()> {
138 if !refresh {
139 return Ok(());
140 }
141 let cfg = config::load(workspace)?;
142 let ws_str = workspace.to_string_lossy().to_string();
143 maybe_scan_all_agents(workspace, &cfg, &ws_str, store, true)
144}
145
/// Merges several `(key, count)` lists into one, summing counts that share a key.
///
/// The result is ordered by count descending, with ties broken by key ascending.
fn combine_counts(rows: Vec<Vec<(String, u64)>>) -> Vec<(String, u64)> {
    let mut totals: HashMap<String, u64> = HashMap::new();
    for (name, n) in rows.into_iter().flatten() {
        *totals.entry(name).or_default() += n;
    }

    let mut ranked: Vec<(String, u64)> = totals.into_iter().collect();
    ranked.sort_by(|(left_name, left_n), (right_name, right_n)| {
        right_n
            .cmp(left_n)
            .then_with(|| left_name.cmp(right_name))
    });
    ranked
}
157
/// Renders each root path as a (lossily converted) string, preserving order.
fn workspace_names(roots: &[PathBuf]) -> Vec<String> {
    let mut names = Vec::with_capacity(roots.len());
    for root in roots {
        names.push(root.to_string_lossy().into_owned());
    }
    names
}
164
165fn open_workspace_store(workspace: &Path) -> Result<Store> {
166 Store::open(&crate::core::workspace::db_path(workspace))
167}
168
169pub fn sessions_list_text(
171 workspace: Option<&Path>,
172 json_out: bool,
173 refresh: bool,
174 all_workspaces: bool,
175 limit: Option<usize>,
176) -> Result<String> {
177 let roots = scope::resolve(workspace, all_workspaces)?;
178 let mut sessions = Vec::new();
179 if crate::daemon::enabled() && !refresh {
180 for workspace in &roots {
181 let ws_str = workspace.to_string_lossy().to_string();
182 let response =
183 crate::daemon::request_blocking(crate::ipc::DaemonRequest::ListSessions {
184 workspace: ws_str,
185 offset: 0,
186 limit: i64::MAX as usize,
187 filter: crate::store::SessionFilter::default(),
188 })?;
189 match response {
190 crate::ipc::DaemonResponse::Sessions(page) => sessions.extend(page.rows),
191 crate::ipc::DaemonResponse::Error { message, .. } => anyhow::bail!(message),
192 _ => anyhow::bail!("unexpected daemon sessions response"),
193 }
194 }
195 } else {
196 for workspace in &roots {
197 let store = open_workspace_store(workspace)?;
198 maybe_refresh_store(workspace, &store, refresh)?;
199 let ws_str = workspace.to_string_lossy().to_string();
200 sessions.extend(store.list_sessions(&ws_str)?);
201 }
202 }
203 sessions.sort_by(|a, b| {
204 b.started_at_ms
205 .cmp(&a.started_at_ms)
206 .then_with(|| a.id.cmp(&b.id))
207 });
208 if let Some(n) = limit.filter(|&n| n > 0) {
209 sessions.truncate(n);
210 }
211 let scope_label = scope::label(&roots);
212 let workspaces = if roots.len() > 1 {
213 workspace_names(&roots)
214 } else {
215 Vec::new()
216 };
217 if json_out {
218 return Ok(format!(
219 "{}\n",
220 serde_json::to_string_pretty(&SessionsListJson {
221 workspace: scope_label,
222 workspaces,
223 count: sessions.len(),
224 sessions,
225 })?
226 ));
227 }
228 use std::fmt::Write;
229 let mut out = String::new();
230 if roots.len() > 1 {
231 writeln!(&mut out, "Scope: {scope_label}").unwrap();
232 writeln!(&mut out).unwrap();
233 }
234 writeln!(
235 &mut out,
236 "{:<40} {:<10} {:<10} STARTED",
237 "ID", "AGENT", "STATUS"
238 )
239 .unwrap();
240 writeln!(&mut out, "{}", "-".repeat(80)).unwrap();
241 for s in &sessions {
242 writeln!(
243 &mut out,
244 "{:<40} {:<10} {:<10} {}",
245 s.id,
246 s.agent,
247 format!("{:?}", s.status),
248 fmt_ts(s.started_at_ms),
249 )
250 .unwrap();
251 }
252 if sessions.is_empty() {
253 writeln!(&mut out, "(no sessions)").unwrap();
254 sessions_empty_state_hints(&mut out);
255 }
256 Ok(out)
257}
258
/// Appends a blank line followed by next-step suggestions for an empty session list.
fn sessions_empty_state_hints(out: &mut String) {
    const HINT_LINES: [&str; 5] = [
        "",
        "No sessions found for this workspace. Try:",
        " · `kaizen doctor` — verify config and hooks",
        " · a short agent session in this repo, then re-run",
        " · docs: https://github.com/marquesds/kaizen/blob/main/docs/config.md (sources)",
    ];
    for line in HINT_LINES.iter() {
        out.push_str(line);
        out.push('\n');
    }
}
270
271pub fn cmd_sessions_list(
273 workspace: Option<&Path>,
274 json_out: bool,
275 refresh: bool,
276 all_workspaces: bool,
277 limit: Option<usize>,
278) -> Result<()> {
279 print!(
280 "{}",
281 sessions_list_text(workspace, json_out, refresh, all_workspaces, limit)?
282 );
283 Ok(())
284}
285
286pub fn session_show_text(id: &str, workspace: Option<&Path>) -> Result<String> {
288 let ws = workspace_path(workspace)?;
289 let store = open_workspace_store(&ws)?;
290 use std::fmt::Write;
291 let mut out = String::new();
292 match store.get_session(id)? {
293 Some(s) => {
294 writeln!(&mut out, "id: {}", s.id).unwrap();
295 writeln!(&mut out, "agent: {}", s.agent).unwrap();
296 writeln!(
297 &mut out,
298 "model: {}",
299 s.model.as_deref().unwrap_or("-")
300 )
301 .unwrap();
302 writeln!(&mut out, "workspace: {}", s.workspace).unwrap();
303 writeln!(&mut out, "started_at: {}", fmt_ts(s.started_at_ms)).unwrap();
304 writeln!(
305 &mut out,
306 "ended_at: {}",
307 s.ended_at_ms.map(fmt_ts).unwrap_or_else(|| "-".to_string())
308 )
309 .unwrap();
310 writeln!(&mut out, "status: {:?}", s.status).unwrap();
311 writeln!(&mut out, "trace_path: {}", s.trace_path).unwrap();
312 if let Some(fp) = &s.prompt_fingerprint {
313 writeln!(&mut out, "prompt_fp: {fp}").unwrap();
314 if let Ok(Some(snap)) = store.get_prompt_snapshot(fp) {
315 for f in snap.files() {
316 writeln!(&mut out, " - {}", f.path).unwrap();
317 }
318 }
319 }
320 }
321 None => anyhow::bail!("session not found: {id} — try `kaizen sessions list`"),
322 }
323 let evals = store.list_evals_for_session(id).unwrap_or_default();
324 if !evals.is_empty() {
325 writeln!(&mut out, "evals:").unwrap();
326 for e in &evals {
327 writeln!(
328 &mut out,
329 " {} score={:.2} flagged={} {}",
330 e.rubric_id, e.score, e.flagged, e.rationale
331 )
332 .unwrap();
333 }
334 }
335 let fb = store
336 .feedback_for_sessions(&[id.to_string()])
337 .unwrap_or_default();
338 if let Some(r) = fb.get(id) {
339 let score = r
340 .score
341 .as_ref()
342 .map(|s| s.0.to_string())
343 .unwrap_or_else(|| "-".into());
344 let label = r
345 .label
346 .as_ref()
347 .map(|l| l.to_string())
348 .unwrap_or_else(|| "-".into());
349 writeln!(&mut out, "feedback: score={score} label={label}").unwrap();
350 if let Some(n) = &r.note {
351 writeln!(&mut out, " note: {n}").unwrap();
352 }
353 }
354 Ok(out)
355}
356
357pub fn cmd_session_show(id: &str, workspace: Option<&Path>) -> Result<()> {
359 print!("{}", session_show_text(id, workspace)?);
360 Ok(())
361}
362
363pub fn sessions_tree_text(id: &str, max_depth: u32, workspace: Option<&Path>) -> Result<String> {
364 let ws = workspace_path(workspace)?;
365 let store = open_workspace_store(&ws)?;
366 let nodes = store.session_span_tree(id)?;
367 if nodes.is_empty() {
368 if store.get_session(id)?.is_none() {
369 anyhow::bail!("session not found: {id}");
370 }
371 return Ok(format!("(no tool spans for session {id})\n"));
372 }
373 let total_cost: i64 = nodes.iter().map(|n| n.subtree_cost_usd_e6).sum();
374 let mut out = String::new();
375 for node in &nodes {
376 render_node(&mut out, node, 0, max_depth, total_cost);
377 }
378 Ok(out)
379}
380
381fn render_node(
382 out: &mut String,
383 node: &crate::store::span_tree::SpanNode,
384 depth: u32,
385 max_depth: u32,
386 session_total: i64,
387) {
388 use std::fmt::Write;
389 if depth > max_depth {
390 return;
391 }
392 let indent = "│ ".repeat(depth as usize);
393 let prefix = if depth == 0 { "┌─ " } else { "├─ " };
394 let cost_str = match node.span.subtree_cost_usd_e6 {
395 Some(c) => {
396 let pct = if session_total > 0 {
397 c * 100 / session_total
398 } else {
399 0
400 };
401 let flag = if pct > 40 { " ⚡" } else { "" };
402 format!(" ${:.4}{}", c as f64 / 1_000_000.0, flag)
403 }
404 None => String::new(),
405 };
406 writeln!(
407 out,
408 "{}{}{} [{}]{}",
409 indent, prefix, node.span.tool, node.span.status, cost_str
410 )
411 .unwrap();
412 for child in &node.children {
413 render_node(out, child, depth + 1, max_depth, session_total);
414 }
415}
416
417pub fn cmd_sessions_tree_text(
419 id: &str,
420 depth: u32,
421 json: bool,
422 workspace: Option<&Path>,
423) -> Result<String> {
424 if json {
425 let ws = workspace_path(workspace)?;
426 let store = open_workspace_store(&ws)?;
427 let nodes = store.session_span_tree(id)?;
428 Ok(serde_json::to_string_pretty(&nodes)?)
429 } else {
430 sessions_tree_text(id, depth, workspace)
431 }
432}
433
434pub fn cmd_sessions_tree(id: &str, depth: u32, json: bool, workspace: Option<&Path>) -> Result<()> {
436 print!("{}", cmd_sessions_tree_text(id, depth, json, workspace)?);
437 Ok(())
438}
439
440pub fn summary_text(
442 workspace: Option<&Path>,
443 json_out: bool,
444 refresh: bool,
445 all_workspaces: bool,
446 source: crate::core::data_source::DataSource,
447) -> Result<String> {
448 let roots = scope::resolve(workspace, all_workspaces)?;
449 let mut total_cost_usd_e6 = 0_i64;
450 let mut session_count = 0_u64;
451 let mut by_agent = Vec::new();
452 let mut by_model = Vec::new();
453 let mut top_tools = Vec::new();
454 let mut hottest = Vec::new();
455 let mut slowest = Vec::new();
456
457 for workspace in &roots {
458 let cfg = config::load(workspace)?;
459 let store = open_workspace_store(workspace)?;
460 crate::shell::remote_pull::maybe_telemetry_pull(workspace, &store, &cfg, source, refresh)?;
461 maybe_refresh_store(workspace, &store, refresh)?;
462 let ws_str = workspace.to_string_lossy().to_string();
463 let read_store = Store::open_read_only(&crate::core::workspace::db_path(workspace))?;
464 let query = crate::store::query::QueryStore::open(&workspace.join(".kaizen"))?;
465 let mut stats = query.summary_stats(&read_store, &ws_str)?;
466 if source != crate::core::data_source::DataSource::Local
467 && let Ok(Some(agg)) =
468 crate::shell::remote_observe::try_remote_event_agg(&read_store, &cfg, workspace)
469 {
470 stats = crate::shell::remote_observe::merge_summary_stats(stats, &agg, source);
471 }
472 total_cost_usd_e6 += stats.total_cost_usd_e6;
473 session_count += stats.session_count;
474 by_agent.push(stats.by_agent);
475 by_model.push(stats.by_model);
476 top_tools.push(stats.top_tools);
477 if let Ok(metrics) = report::build_report(&read_store, &ws_str, 7) {
478 if let Some(file) = metrics.hottest_files.first().cloned() {
479 hottest.push(if roots.len() == 1 {
480 file
481 } else {
482 crate::metrics::types::RankedFile {
483 path: scope::decorate_path(workspace, &file.path),
484 ..file
485 }
486 });
487 }
488 if let Some(tool) = metrics.slowest_tools.first().cloned() {
489 slowest.push(tool);
490 }
491 }
492 }
493
494 let stats = crate::store::SummaryStats {
495 session_count,
496 total_cost_usd_e6,
497 by_agent: combine_counts(by_agent),
498 by_model: combine_counts(by_model),
499 top_tools: combine_counts(top_tools),
500 };
501 let cost_dollars = stats.total_cost_usd_e6 as f64 / 1_000_000.0;
502 let hotspot = hottest
503 .into_iter()
504 .max_by(|a, b| a.value.cmp(&b.value).then_with(|| b.path.cmp(&a.path)));
505 let slowest_tool = slowest.into_iter().max_by(|a, b| {
506 a.p95_ms
507 .unwrap_or(0)
508 .cmp(&b.p95_ms.unwrap_or(0))
509 .then_with(|| b.tool.cmp(&a.tool))
510 });
511 let scope_label = scope::label(&roots);
512 let workspaces = if roots.len() > 1 {
513 workspace_names(&roots)
514 } else {
515 Vec::new()
516 };
517 let cost_note = summary_needs_cost_rollup_note(stats.session_count, stats.total_cost_usd_e6)
518 .then_some(cost_rollup_zero_note_paragraph().to_string());
519 if json_out {
520 return Ok(format!(
521 "{}\n",
522 serde_json::to_string_pretty(&SummaryJsonOut {
523 workspace: scope_label,
524 workspaces,
525 cost_usd: cost_dollars,
526 stats,
527 cost_note,
528 hotspot,
529 slowest_tool,
530 })?
531 ));
532 }
533 use std::fmt::Write;
534 let mut out = String::new();
535 if roots.len() > 1 {
536 writeln!(&mut out, "Scope: {}", scope::label(&roots)).unwrap();
537 }
538 writeln!(
539 &mut out,
540 "Sessions: {} Cost: ${:.2}",
541 stats.session_count, cost_dollars
542 )
543 .unwrap();
544
545 if !stats.by_agent.is_empty() {
546 let parts: Vec<String> = stats
547 .by_agent
548 .iter()
549 .map(|(a, n)| format!("{a} {n}"))
550 .collect();
551 writeln!(&mut out, "By agent: {}", parts.join(" · ")).unwrap();
552 }
553 if !stats.by_model.is_empty() {
554 let parts: Vec<String> = stats
555 .by_model
556 .iter()
557 .map(|(m, n)| format!("{m} {n}"))
558 .collect();
559 writeln!(&mut out, "By model: {}", parts.join(" · ")).unwrap();
560 }
561 if !stats.top_tools.is_empty() {
562 let parts: Vec<String> = stats
563 .top_tools
564 .iter()
565 .take(5)
566 .map(|(t, n)| format!("{t} {n}"))
567 .collect();
568 writeln!(&mut out, "Top tools: {}", parts.join(" · ")).unwrap();
569 }
570 if let Some(file) = hotspot {
571 writeln!(&mut out, "Hotspot: {} ({})", file.path, file.value).unwrap();
572 }
573 if let Some(tool) = slowest_tool {
574 let p95 = tool
575 .p95_ms
576 .map(|v| format!("{v}ms"))
577 .unwrap_or_else(|| "-".into());
578 writeln!(&mut out, "Slowest: {} p95 {}", tool.tool, p95).unwrap();
579 }
580 if cost_note.is_some() {
581 writeln!(&mut out).unwrap();
582 writeln!(&mut out, "Note: {}", cost_rollup_zero_note_paragraph()).unwrap();
583 }
584 Ok(out)
585}
586
587pub fn cmd_summary(
589 workspace: Option<&Path>,
590 json_out: bool,
591 refresh: bool,
592 all_workspaces: bool,
593 source: crate::core::data_source::DataSource,
594) -> Result<()> {
595 print!(
596 "{}",
597 summary_text(workspace, json_out, refresh, all_workspaces, source,)?
598 );
599 Ok(())
600}
601
602pub(crate) fn scan_all_agents(
603 ws: &Path,
604 cfg: &config::Config,
605 ws_str: &str,
606 store: &Store,
607) -> Result<()> {
608 let _spin = ScanSpinner::start("Scanning agent sessions…");
609 let slug = workspace_slug(ws_str);
610 let sync_ctx = crate::sync::ingest_ctx(cfg, ws.to_path_buf());
611
612 for root in &cfg.scan.roots {
613 let expanded = expand_home(root);
614 let cursor_dir = PathBuf::from(&expanded)
615 .join(&slug)
616 .join("agent-transcripts");
617 scan_agent_dirs(
618 &cursor_dir,
619 store,
620 |p| {
621 scan_session_dir_all(p).map(|sessions| {
622 sessions
623 .into_iter()
624 .map(|(mut r, evs)| {
625 r.workspace = ws_str.to_string();
626 (r, evs)
627 })
628 .collect()
629 })
630 },
631 sync_ctx.as_ref(),
632 )?;
633 }
634
635 let home = std::env::var("HOME").unwrap_or_default();
636
637 let claude_dir = PathBuf::from(&home)
638 .join(".claude/projects")
639 .join(&slug)
640 .join("sessions");
641 scan_agent_dirs(
642 &claude_dir,
643 store,
644 |p| {
645 scan_claude_session_dir(p).map(|(mut r, evs)| {
646 r.workspace = ws_str.to_string();
647 vec![(r, evs)]
648 })
649 },
650 sync_ctx.as_ref(),
651 )?;
652
653 let codex_dir = PathBuf::from(&home).join(".codex/sessions").join(&slug);
654 scan_agent_dirs(
655 &codex_dir,
656 store,
657 |p| {
658 scan_codex_session_dir(p).map(|(mut r, evs)| {
659 r.workspace = ws_str.to_string();
660 vec![(r, evs)]
661 })
662 },
663 sync_ctx.as_ref(),
664 )?;
665
666 let tail = &cfg.sources.tail;
667 let home_pb = PathBuf::from(&home);
668 if tail.goose {
669 let sessions = scan_goose_workspace(&home_pb, ws)?;
670 persist_session_batch(store, sessions, sync_ctx.as_ref())?;
671 }
672 if tail.openclaw {
673 let sessions = scan_openclaw_workspace(ws)?;
674 persist_session_batch(store, sessions, sync_ctx.as_ref())?;
675 }
676 if tail.opencode {
677 let sessions = scan_opencode_workspace(ws)?;
678 persist_session_batch(store, sessions, sync_ctx.as_ref())?;
679 }
680 if tail.copilot_cli {
681 let sessions = scan_copilot_cli_workspace(ws)?;
682 persist_session_batch(store, sessions, sync_ctx.as_ref())?;
683 }
684 if tail.copilot_vscode {
685 let sessions = scan_copilot_vscode_workspace(ws)?;
686 persist_session_batch(store, sessions, sync_ctx.as_ref())?;
687 }
688
689 maybe_auto_prune_after_scan(store, cfg)?;
690 Ok(())
691}
692
693fn persist_session_batch(
694 store: &Store,
695 sessions: Vec<(SessionRecord, Vec<Event>)>,
696 sync_ctx: Option<&crate::sync::SyncIngestContext>,
697) -> Result<()> {
698 for (mut record, events) in sessions {
699 if record.start_commit.is_none() && !record.workspace.is_empty() {
700 let binding = crate::core::repo::binding_for_session(
701 Path::new(&record.workspace),
702 record.started_at_ms,
703 record.ended_at_ms,
704 );
705 record.start_commit = binding.start_commit;
706 record.end_commit = binding.end_commit;
707 record.branch = binding.branch;
708 record.dirty_start = binding.dirty_start;
709 record.dirty_end = binding.dirty_end;
710 record.repo_binding_source = binding.source;
711 }
712 store.upsert_session(&record)?;
713 let flush_ms = record.ended_at_ms.unwrap_or(record.started_at_ms);
714 for ev in events {
715 store.append_event_with_sync(&ev, sync_ctx)?;
716 }
717 if record.status == crate::core::event::SessionStatus::Done {
718 store.flush_projector_session(&record.id, flush_ms)?;
719 }
720 }
721 Ok(())
722}
723
724pub(crate) fn scan_agent_dirs<F>(
725 dir: &Path,
726 store: &Store,
727 scanner: F,
728 sync_ctx: Option<&crate::sync::SyncIngestContext>,
729) -> Result<()>
730where
731 F: Fn(&Path) -> Result<Vec<(SessionRecord, Vec<Event>)>>,
732{
733 if !dir.exists() {
734 return Ok(());
735 }
736 for entry in std::fs::read_dir(dir)?.filter_map(|e| e.ok()) {
737 if !entry.file_type().map(|t| t.is_dir()).unwrap_or(false) {
738 continue;
739 }
740 match scanner(&entry.path()) {
741 Ok(sessions) => {
742 for (mut record, events) in sessions {
743 if record.start_commit.is_none() && !record.workspace.is_empty() {
744 let binding = crate::core::repo::binding_for_session(
745 Path::new(&record.workspace),
746 record.started_at_ms,
747 record.ended_at_ms,
748 );
749 record.start_commit = binding.start_commit;
750 record.end_commit = binding.end_commit;
751 record.branch = binding.branch;
752 record.dirty_start = binding.dirty_start;
753 record.dirty_end = binding.dirty_end;
754 record.repo_binding_source = binding.source;
755 }
756 store.upsert_session(&record)?;
757 let flush_ms = record.ended_at_ms.unwrap_or(record.started_at_ms);
758 for ev in events {
759 store.append_event_with_sync(&ev, sync_ctx)?;
760 }
761 if record.status == crate::core::event::SessionStatus::Done {
762 store.flush_projector_session(&record.id, flush_ms)?;
763 }
764 }
765 }
766 Err(e) => tracing::warn!("scan {:?}: {e}", entry.path()),
767 }
768 }
769 Ok(())
770}
771
/// Resolves the workspace path for a command by delegating to
/// `crate::core::workspace::resolve`.
pub(crate) fn workspace_path(workspace: Option<&Path>) -> Result<PathBuf> {
    crate::core::workspace::resolve(workspace)
}
775
/// Turns a workspace path into a directory-name slug.
///
/// All leading `/` characters are dropped and every remaining `/` becomes `-`.
pub(crate) fn workspace_slug(ws: &str) -> String {
    let without_root = ws.trim_start_matches('/');
    without_root
        .chars()
        .map(|ch| if ch == '/' { '-' } else { ch })
        .collect()
}
781
/// Expands a leading `~` in `path` using the `HOME` environment variable.
///
/// `~/rest` becomes `$HOME/rest` and a bare `~` becomes `$HOME`. Any other input,
/// including `~user/...` forms, is returned unchanged, as is every input when `HOME`
/// is unset or not valid Unicode.
pub(crate) fn expand_home(path: &str) -> String {
    let Ok(home) = std::env::var("HOME") else {
        return path.to_string();
    };
    if path == "~" {
        // A lone tilde names the home directory itself.
        return home;
    }
    match path.strip_prefix("~/") {
        Some(rest) => format!("{home}/{rest}"),
        None => path.to_string(),
    }
}
788
#[cfg(test)]
mod cost_rollup_note_tests {
    use super::*;

    /// The note is wanted only for a non-empty session set with exactly zero cost.
    #[test]
    fn needs_note_only_when_sessions_and_zero_cost() {
        let cases = [((1, 0), true), ((0, 0), false), ((1, 1), false)];
        for ((sessions, cost_e6), expected) in cases {
            assert_eq!(summary_needs_cost_rollup_note(sessions, cost_e6), expected);
        }
    }

    /// The paragraph must mention the missing field, usage data, and the docs anchor.
    #[test]
    fn paragraph_names_gap_and_doc_anchor() {
        let paragraph = cost_rollup_zero_note_paragraph();
        for needle in ["cost_usd_e6", "usage", "docs/usage.md#cost-shows-zero"] {
            assert!(paragraph.contains(needle));
        }
    }
}
807}