Skip to main content

kaizen/shell/
cli.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2//! CLI command implementations.
3
4use crate::collect::tail::claude::scan_claude_session_dir;
5use crate::collect::tail::codex::scan_codex_session_dir;
6use crate::collect::tail::copilot_cli::scan_copilot_cli_workspace;
7use crate::collect::tail::copilot_vscode::scan_copilot_vscode_workspace;
8use crate::collect::tail::cursor::scan_session_dir_all;
9use crate::collect::tail::goose::scan_goose_workspace;
10use crate::collect::tail::openclaw::scan_openclaw_workspace;
11use crate::collect::tail::opencode::scan_opencode_workspace;
12use crate::core::config;
13use crate::core::event::{Event, SessionRecord};
14use crate::metrics::report;
15use crate::shell::fmt::fmt_ts;
16use crate::shell::scope;
17use crate::store::{SYNC_STATE_LAST_AGENT_SCAN_MS, SYNC_STATE_LAST_AUTO_PRUNE_MS, Store};
18use anyhow::Result;
19use serde::Serialize;
20use std::collections::HashMap;
21use std::io::IsTerminal;
22use std::path::{Path, PathBuf};
23
24pub use crate::shell::init::cmd_init;
25pub use crate::shell::insights::cmd_insights;
26
27#[derive(Serialize)]
28struct SessionsListJson {
29    workspace: String,
30    #[serde(skip_serializing_if = "Vec::is_empty")]
31    workspaces: Vec<String>,
32    count: usize,
33    sessions: Vec<SessionRecord>,
34}
35
36#[derive(Serialize)]
37struct SummaryJsonOut {
38    workspace: String,
39    #[serde(skip_serializing_if = "Vec::is_empty")]
40    workspaces: Vec<String>,
41    #[serde(flatten)]
42    stats: crate::store::SummaryStats,
43    cost_usd: f64,
44    #[serde(skip_serializing_if = "Option::is_none")]
45    hotspot: Option<crate::metrics::types::RankedFile>,
46    #[serde(skip_serializing_if = "Option::is_none")]
47    slowest_tool: Option<crate::metrics::types::RankedTool>,
48}
49
50struct ScanSpinner(Option<indicatif::ProgressBar>);
51
52impl ScanSpinner {
53    fn start(msg: &'static str) -> Self {
54        if !std::io::stdout().is_terminal() {
55            return Self(None);
56        }
57        let p = indicatif::ProgressBar::new_spinner();
58        p.set_message(msg.to_string());
59        p.enable_steady_tick(std::time::Duration::from_millis(120));
60        Self(Some(p))
61    }
62}
63
64impl Drop for ScanSpinner {
65    fn drop(&mut self) {
66        if let Some(p) = self.0.take() {
67            p.finish_and_clear();
68        }
69    }
70}
71
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch.
fn now_ms_u64() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
78
79/// Minimum interval between automatic local DB prunes after a successful rescan (24h).
80const AUTO_PRUNE_INTERVAL_MS: u64 = 86_400_000;
81
82pub(crate) fn maybe_auto_prune_after_scan(store: &Store, cfg: &config::Config) -> Result<()> {
83    if cfg.retention.hot_days == 0 {
84        return Ok(());
85    }
86    let now = now_ms_u64();
87    if let Some(last) = store.sync_state_get_u64(SYNC_STATE_LAST_AUTO_PRUNE_MS)?
88        && now.saturating_sub(last) < AUTO_PRUNE_INTERVAL_MS
89    {
90        return Ok(());
91    }
92    let cutoff = now.saturating_sub((cfg.retention.hot_days as u64).saturating_mul(86_400_000));
93    store.prune_sessions_started_before(cutoff as i64)?;
94    store.sync_state_set_u64(SYNC_STATE_LAST_AUTO_PRUNE_MS, now)?;
95    Ok(())
96}
97
98/// Full transcript rescan unless throttled by `[scan].min_rescan_seconds` or `refresh` is true.
99pub(crate) fn maybe_scan_all_agents(
100    ws: &Path,
101    cfg: &config::Config,
102    ws_str: &str,
103    store: &Store,
104    refresh: bool,
105) -> Result<()> {
106    let interval_ms = cfg.scan.min_rescan_seconds.saturating_mul(1000);
107    let now = now_ms_u64();
108    if !refresh
109        && interval_ms > 0
110        && let Some(last) = store.sync_state_get_u64(SYNC_STATE_LAST_AGENT_SCAN_MS)?
111        && now.saturating_sub(last) < interval_ms
112    {
113        return Ok(());
114    }
115    scan_all_agents(ws, cfg, ws_str, store)?;
116    store.sync_state_set_u64(SYNC_STATE_LAST_AGENT_SCAN_MS, now_ms_u64())?;
117    Ok(())
118}
119
120pub(crate) fn maybe_refresh_store(workspace: &Path, store: &Store, refresh: bool) -> Result<()> {
121    if !refresh {
122        return Ok(());
123    }
124    let cfg = config::load(workspace)?;
125    let ws_str = workspace.to_string_lossy().to_string();
126    maybe_scan_all_agents(workspace, &cfg, &ws_str, store, true)
127}
128
/// Merges several `(key, count)` lists into one, summing the counts of equal keys.
///
/// The result is ordered by descending count; equal counts are ordered by ascending key.
fn combine_counts(rows: Vec<Vec<(String, u64)>>) -> Vec<(String, u64)> {
    let mut totals: HashMap<String, u64> = HashMap::new();
    for (key, value) in rows.into_iter().flatten() {
        *totals.entry(key).or_default() += value;
    }

    let mut merged: Vec<(String, u64)> = totals.into_iter().collect();
    merged.sort_by(|(left_key, left_n), (right_key, right_n)| {
        right_n.cmp(left_n).then_with(|| left_key.cmp(right_key))
    });
    merged
}
140
/// Converts each root path to a string, replacing invalid UTF-8 lossily.
fn workspace_names(roots: &[PathBuf]) -> Vec<String> {
    let mut names = Vec::with_capacity(roots.len());
    for root in roots {
        names.push(root.to_string_lossy().into_owned());
    }
    names
}
147
148fn open_workspace_store(workspace: &Path) -> Result<Store> {
149    Store::open(&crate::core::workspace::db_path(workspace))
150}
151
152/// `kaizen sessions list` — same output as CLI stdout.
153pub fn sessions_list_text(
154    workspace: Option<&Path>,
155    json_out: bool,
156    refresh: bool,
157    all_workspaces: bool,
158) -> Result<String> {
159    let roots = scope::resolve(workspace, all_workspaces)?;
160    let mut sessions = Vec::new();
161    if crate::daemon::enabled() && !refresh {
162        for workspace in &roots {
163            let ws_str = workspace.to_string_lossy().to_string();
164            let response =
165                crate::daemon::request_blocking(crate::ipc::DaemonRequest::ListSessions {
166                    workspace: ws_str,
167                    offset: 0,
168                    limit: i64::MAX as usize,
169                    filter: crate::store::SessionFilter::default(),
170                })?;
171            match response {
172                crate::ipc::DaemonResponse::Sessions(page) => sessions.extend(page.rows),
173                crate::ipc::DaemonResponse::Error { message, .. } => anyhow::bail!(message),
174                _ => anyhow::bail!("unexpected daemon sessions response"),
175            }
176        }
177    } else {
178        for workspace in &roots {
179            let store = open_workspace_store(workspace)?;
180            maybe_refresh_store(workspace, &store, refresh)?;
181            let ws_str = workspace.to_string_lossy().to_string();
182            sessions.extend(store.list_sessions(&ws_str)?);
183        }
184    }
185    sessions.sort_by(|a, b| {
186        b.started_at_ms
187            .cmp(&a.started_at_ms)
188            .then_with(|| a.id.cmp(&b.id))
189    });
190    let scope_label = scope::label(&roots);
191    let workspaces = if roots.len() > 1 {
192        workspace_names(&roots)
193    } else {
194        Vec::new()
195    };
196    if json_out {
197        return Ok(format!(
198            "{}\n",
199            serde_json::to_string_pretty(&SessionsListJson {
200                workspace: scope_label,
201                workspaces,
202                count: sessions.len(),
203                sessions,
204            })?
205        ));
206    }
207    use std::fmt::Write;
208    let mut out = String::new();
209    if roots.len() > 1 {
210        writeln!(&mut out, "Scope: {scope_label}").unwrap();
211        writeln!(&mut out).unwrap();
212    }
213    writeln!(
214        &mut out,
215        "{:<40} {:<10} {:<10} STARTED",
216        "ID", "AGENT", "STATUS"
217    )
218    .unwrap();
219    writeln!(&mut out, "{}", "-".repeat(80)).unwrap();
220    for s in &sessions {
221        writeln!(
222            &mut out,
223            "{:<40} {:<10} {:<10} {}",
224            s.id,
225            s.agent,
226            format!("{:?}", s.status),
227            fmt_ts(s.started_at_ms),
228        )
229        .unwrap();
230    }
231    if sessions.is_empty() {
232        writeln!(&mut out, "(no sessions)").unwrap();
233        sessions_empty_state_hints(&mut out);
234    }
235    Ok(out)
236}
237
/// Appends a blank line followed by troubleshooting hints for an empty session list to `out`.
fn sessions_empty_state_hints(out: &mut String) {
    const HINT_LINES: [&str; 5] = [
        "",
        "No sessions found for this workspace. Try:",
        "  · `kaizen doctor` — verify config and hooks",
        "  · a short agent session in this repo, then re-run",
        "  · docs: https://github.com/marquesds/kaizen/blob/main/docs/config.md (sources)",
    ];
    for line in HINT_LINES.iter() {
        out.push_str(line);
        out.push('\n');
    }
}
249
250/// `kaizen sessions list` — scan all agent transcripts, upsert sessions, print table.
251pub fn cmd_sessions_list(
252    workspace: Option<&Path>,
253    json_out: bool,
254    refresh: bool,
255    all_workspaces: bool,
256) -> Result<()> {
257    print!(
258        "{}",
259        sessions_list_text(workspace, json_out, refresh, all_workspaces)?
260    );
261    Ok(())
262}
263
264/// `kaizen sessions show` — same output as CLI stdout.
265pub fn session_show_text(id: &str, workspace: Option<&Path>) -> Result<String> {
266    let ws = workspace_path(workspace)?;
267    let store = open_workspace_store(&ws)?;
268    use std::fmt::Write;
269    let mut out = String::new();
270    match store.get_session(id)? {
271        Some(s) => {
272            writeln!(&mut out, "id:           {}", s.id).unwrap();
273            writeln!(&mut out, "agent:        {}", s.agent).unwrap();
274            writeln!(
275                &mut out,
276                "model:        {}",
277                s.model.as_deref().unwrap_or("-")
278            )
279            .unwrap();
280            writeln!(&mut out, "workspace:    {}", s.workspace).unwrap();
281            writeln!(&mut out, "started_at:   {}", fmt_ts(s.started_at_ms)).unwrap();
282            writeln!(
283                &mut out,
284                "ended_at:     {}",
285                s.ended_at_ms.map(fmt_ts).unwrap_or_else(|| "-".to_string())
286            )
287            .unwrap();
288            writeln!(&mut out, "status:       {:?}", s.status).unwrap();
289            writeln!(&mut out, "trace_path:   {}", s.trace_path).unwrap();
290            if let Some(fp) = &s.prompt_fingerprint {
291                writeln!(&mut out, "prompt_fp:    {fp}").unwrap();
292                if let Ok(Some(snap)) = store.get_prompt_snapshot(fp) {
293                    for f in snap.files() {
294                        writeln!(&mut out, "  - {}", f.path).unwrap();
295                    }
296                }
297            }
298        }
299        None => anyhow::bail!("session not found: {id} — try `kaizen sessions list`"),
300    }
301    let evals = store.list_evals_for_session(id).unwrap_or_default();
302    if !evals.is_empty() {
303        writeln!(&mut out, "evals:").unwrap();
304        for e in &evals {
305            writeln!(
306                &mut out,
307                "  {} score={:.2} flagged={} {}",
308                e.rubric_id, e.score, e.flagged, e.rationale
309            )
310            .unwrap();
311        }
312    }
313    let fb = store
314        .feedback_for_sessions(&[id.to_string()])
315        .unwrap_or_default();
316    if let Some(r) = fb.get(id) {
317        let score = r
318            .score
319            .as_ref()
320            .map(|s| s.0.to_string())
321            .unwrap_or_else(|| "-".into());
322        let label = r
323            .label
324            .as_ref()
325            .map(|l| l.to_string())
326            .unwrap_or_else(|| "-".into());
327        writeln!(&mut out, "feedback:     score={score} label={label}").unwrap();
328        if let Some(n) = &r.note {
329            writeln!(&mut out, "  note: {n}").unwrap();
330        }
331    }
332    Ok(out)
333}
334
335/// `kaizen sessions show <id>` — print full session fields.
336pub fn cmd_session_show(id: &str, workspace: Option<&Path>) -> Result<()> {
337    print!("{}", session_show_text(id, workspace)?);
338    Ok(())
339}
340
341pub fn sessions_tree_text(id: &str, max_depth: u32, workspace: Option<&Path>) -> Result<String> {
342    let ws = workspace_path(workspace)?;
343    let store = open_workspace_store(&ws)?;
344    let nodes = store.session_span_tree(id)?;
345    let total_cost: i64 = nodes.iter().map(|n| n.subtree_cost_usd_e6).sum();
346    let mut out = String::new();
347    for node in &nodes {
348        render_node(&mut out, node, 0, max_depth, total_cost);
349    }
350    Ok(out)
351}
352
353fn render_node(
354    out: &mut String,
355    node: &crate::store::span_tree::SpanNode,
356    depth: u32,
357    max_depth: u32,
358    session_total: i64,
359) {
360    use std::fmt::Write;
361    if depth > max_depth {
362        return;
363    }
364    let indent = "│  ".repeat(depth as usize);
365    let prefix = if depth == 0 { "┌─ " } else { "├─ " };
366    let cost_str = match node.span.subtree_cost_usd_e6 {
367        Some(c) => {
368            let pct = if session_total > 0 {
369                c * 100 / session_total
370            } else {
371                0
372            };
373            let flag = if pct > 40 { " ⚡" } else { "" };
374            format!(" ${:.4}{}", c as f64 / 1_000_000.0, flag)
375        }
376        None => String::new(),
377    };
378    writeln!(
379        out,
380        "{}{}{} [{}]{}",
381        indent, prefix, node.span.tool, node.span.status, cost_str
382    )
383    .unwrap();
384    for child in &node.children {
385        render_node(out, child, depth + 1, max_depth, session_total);
386    }
387}
388
389/// `kaizen sessions tree <id>` — produce text output (ASCII or JSON).
390pub fn cmd_sessions_tree_text(
391    id: &str,
392    depth: u32,
393    json: bool,
394    workspace: Option<&Path>,
395) -> Result<String> {
396    if json {
397        let ws = workspace_path(workspace)?;
398        let store = open_workspace_store(&ws)?;
399        let nodes = store.session_span_tree(id)?;
400        Ok(serde_json::to_string_pretty(&nodes)?)
401    } else {
402        sessions_tree_text(id, depth, workspace)
403    }
404}
405
406/// `kaizen sessions tree <id>` — print ASCII span tree.
407pub fn cmd_sessions_tree(id: &str, depth: u32, json: bool, workspace: Option<&Path>) -> Result<()> {
408    print!("{}", cmd_sessions_tree_text(id, depth, json, workspace)?);
409    Ok(())
410}
411
412/// `kaizen summary` — same output as CLI stdout.
413pub fn summary_text(
414    workspace: Option<&Path>,
415    json_out: bool,
416    refresh: bool,
417    all_workspaces: bool,
418    source: crate::core::data_source::DataSource,
419) -> Result<String> {
420    let roots = scope::resolve(workspace, all_workspaces)?;
421    let mut total_cost_usd_e6 = 0_i64;
422    let mut session_count = 0_u64;
423    let mut by_agent = Vec::new();
424    let mut by_model = Vec::new();
425    let mut top_tools = Vec::new();
426    let mut hottest = Vec::new();
427    let mut slowest = Vec::new();
428
429    for workspace in &roots {
430        let cfg = config::load(workspace)?;
431        let store = open_workspace_store(workspace)?;
432        crate::shell::remote_pull::maybe_telemetry_pull(workspace, &store, &cfg, source, refresh)?;
433        maybe_refresh_store(workspace, &store, refresh)?;
434        let ws_str = workspace.to_string_lossy().to_string();
435        let read_store = Store::open_read_only(&crate::core::workspace::db_path(workspace))?;
436        let mut stats = read_store.summary_stats(&ws_str)?;
437        if source != crate::core::data_source::DataSource::Local
438            && let Ok(Some(agg)) =
439                crate::shell::remote_observe::try_remote_event_agg(&read_store, &cfg, workspace)
440        {
441            stats = crate::shell::remote_observe::merge_summary_stats(stats, &agg, source);
442        }
443        total_cost_usd_e6 += stats.total_cost_usd_e6;
444        session_count += stats.session_count;
445        by_agent.push(stats.by_agent);
446        by_model.push(stats.by_model);
447        top_tools.push(stats.top_tools);
448        if let Ok(metrics) = report::build_report(&read_store, &ws_str, 7) {
449            if let Some(file) = metrics.hottest_files.first().cloned() {
450                hottest.push(if roots.len() == 1 {
451                    file
452                } else {
453                    crate::metrics::types::RankedFile {
454                        path: scope::decorate_path(workspace, &file.path),
455                        ..file
456                    }
457                });
458            }
459            if let Some(tool) = metrics.slowest_tools.first().cloned() {
460                slowest.push(tool);
461            }
462        }
463    }
464
465    let stats = crate::store::SummaryStats {
466        session_count,
467        total_cost_usd_e6,
468        by_agent: combine_counts(by_agent),
469        by_model: combine_counts(by_model),
470        top_tools: combine_counts(top_tools),
471    };
472    let cost_dollars = stats.total_cost_usd_e6 as f64 / 1_000_000.0;
473    let hotspot = hottest
474        .into_iter()
475        .max_by(|a, b| a.value.cmp(&b.value).then_with(|| b.path.cmp(&a.path)));
476    let slowest_tool = slowest.into_iter().max_by(|a, b| {
477        a.p95_ms
478            .unwrap_or(0)
479            .cmp(&b.p95_ms.unwrap_or(0))
480            .then_with(|| b.tool.cmp(&a.tool))
481    });
482    let scope_label = scope::label(&roots);
483    let workspaces = if roots.len() > 1 {
484        workspace_names(&roots)
485    } else {
486        Vec::new()
487    };
488    if json_out {
489        return Ok(format!(
490            "{}\n",
491            serde_json::to_string_pretty(&SummaryJsonOut {
492                workspace: scope_label,
493                workspaces,
494                cost_usd: cost_dollars,
495                stats,
496                hotspot,
497                slowest_tool,
498            })?
499        ));
500    }
501    use std::fmt::Write;
502    let mut out = String::new();
503    if roots.len() > 1 {
504        writeln!(&mut out, "Scope: {}", scope::label(&roots)).unwrap();
505    }
506    writeln!(
507        &mut out,
508        "Sessions: {}   Cost: ${:.2}",
509        stats.session_count, cost_dollars
510    )
511    .unwrap();
512
513    if !stats.by_agent.is_empty() {
514        let parts: Vec<String> = stats
515            .by_agent
516            .iter()
517            .map(|(a, n)| format!("{a} {n}"))
518            .collect();
519        writeln!(&mut out, "By agent:  {}", parts.join(" · ")).unwrap();
520    }
521    if !stats.by_model.is_empty() {
522        let parts: Vec<String> = stats
523            .by_model
524            .iter()
525            .map(|(m, n)| format!("{m} {n}"))
526            .collect();
527        writeln!(&mut out, "By model:  {}", parts.join(" · ")).unwrap();
528    }
529    if !stats.top_tools.is_empty() {
530        let parts: Vec<String> = stats
531            .top_tools
532            .iter()
533            .take(5)
534            .map(|(t, n)| format!("{t} {n}"))
535            .collect();
536        writeln!(&mut out, "Top tools: {}", parts.join(" · ")).unwrap();
537    }
538    if let Some(file) = hotspot {
539        writeln!(&mut out, "Hotspot:   {} ({})", file.path, file.value).unwrap();
540    }
541    if let Some(tool) = slowest_tool {
542        let p95 = tool
543            .p95_ms
544            .map(|v| format!("{v}ms"))
545            .unwrap_or_else(|| "-".into());
546        writeln!(&mut out, "Slowest:   {} p95 {}", tool.tool, p95).unwrap();
547    }
548    Ok(out)
549}
550
551/// `kaizen summary` — aggregate session + cost stats across all agents.
552pub fn cmd_summary(
553    workspace: Option<&Path>,
554    json_out: bool,
555    refresh: bool,
556    all_workspaces: bool,
557    source: crate::core::data_source::DataSource,
558) -> Result<()> {
559    print!(
560        "{}",
561        summary_text(workspace, json_out, refresh, all_workspaces, source,)?
562    );
563    Ok(())
564}
565
566pub(crate) fn scan_all_agents(
567    ws: &Path,
568    cfg: &config::Config,
569    ws_str: &str,
570    store: &Store,
571) -> Result<()> {
572    let _spin = ScanSpinner::start("Scanning agent sessions…");
573    let slug = workspace_slug(ws_str);
574    let sync_ctx = crate::sync::ingest_ctx(cfg, ws.to_path_buf());
575
576    for root in &cfg.scan.roots {
577        let expanded = expand_home(root);
578        let cursor_dir = PathBuf::from(&expanded)
579            .join(&slug)
580            .join("agent-transcripts");
581        scan_agent_dirs(
582            &cursor_dir,
583            store,
584            |p| {
585                scan_session_dir_all(p).map(|sessions| {
586                    sessions
587                        .into_iter()
588                        .map(|(mut r, evs)| {
589                            r.workspace = ws_str.to_string();
590                            (r, evs)
591                        })
592                        .collect()
593                })
594            },
595            sync_ctx.as_ref(),
596        )?;
597    }
598
599    let home = std::env::var("HOME").unwrap_or_default();
600
601    let claude_dir = PathBuf::from(&home)
602        .join(".claude/projects")
603        .join(&slug)
604        .join("sessions");
605    scan_agent_dirs(
606        &claude_dir,
607        store,
608        |p| {
609            scan_claude_session_dir(p).map(|(mut r, evs)| {
610                r.workspace = ws_str.to_string();
611                vec![(r, evs)]
612            })
613        },
614        sync_ctx.as_ref(),
615    )?;
616
617    let codex_dir = PathBuf::from(&home).join(".codex/sessions").join(&slug);
618    scan_agent_dirs(
619        &codex_dir,
620        store,
621        |p| {
622            scan_codex_session_dir(p).map(|(mut r, evs)| {
623                r.workspace = ws_str.to_string();
624                vec![(r, evs)]
625            })
626        },
627        sync_ctx.as_ref(),
628    )?;
629
630    let tail = &cfg.sources.tail;
631    let home_pb = PathBuf::from(&home);
632    if tail.goose {
633        let sessions = scan_goose_workspace(&home_pb, ws)?;
634        persist_session_batch(store, sessions, sync_ctx.as_ref())?;
635    }
636    if tail.openclaw {
637        let sessions = scan_openclaw_workspace(ws)?;
638        persist_session_batch(store, sessions, sync_ctx.as_ref())?;
639    }
640    if tail.opencode {
641        let sessions = scan_opencode_workspace(ws)?;
642        persist_session_batch(store, sessions, sync_ctx.as_ref())?;
643    }
644    if tail.copilot_cli {
645        let sessions = scan_copilot_cli_workspace(ws)?;
646        persist_session_batch(store, sessions, sync_ctx.as_ref())?;
647    }
648    if tail.copilot_vscode {
649        let sessions = scan_copilot_vscode_workspace(ws)?;
650        persist_session_batch(store, sessions, sync_ctx.as_ref())?;
651    }
652
653    maybe_auto_prune_after_scan(store, cfg)?;
654    Ok(())
655}
656
657fn persist_session_batch(
658    store: &Store,
659    sessions: Vec<(SessionRecord, Vec<Event>)>,
660    sync_ctx: Option<&crate::sync::SyncIngestContext>,
661) -> Result<()> {
662    for (mut record, events) in sessions {
663        if record.start_commit.is_none() && !record.workspace.is_empty() {
664            let binding = crate::core::repo::binding_for_session(
665                Path::new(&record.workspace),
666                record.started_at_ms,
667                record.ended_at_ms,
668            );
669            record.start_commit = binding.start_commit;
670            record.end_commit = binding.end_commit;
671            record.branch = binding.branch;
672            record.dirty_start = binding.dirty_start;
673            record.dirty_end = binding.dirty_end;
674            record.repo_binding_source = binding.source;
675        }
676        store.upsert_session(&record)?;
677        let flush_ms = record.ended_at_ms.unwrap_or(record.started_at_ms);
678        for ev in events {
679            store.append_event_with_sync(&ev, sync_ctx)?;
680        }
681        if record.status == crate::core::event::SessionStatus::Done {
682            store.flush_projector_session(&record.id, flush_ms)?;
683        }
684    }
685    Ok(())
686}
687
688pub(crate) fn scan_agent_dirs<F>(
689    dir: &Path,
690    store: &Store,
691    scanner: F,
692    sync_ctx: Option<&crate::sync::SyncIngestContext>,
693) -> Result<()>
694where
695    F: Fn(&Path) -> Result<Vec<(SessionRecord, Vec<Event>)>>,
696{
697    if !dir.exists() {
698        return Ok(());
699    }
700    for entry in std::fs::read_dir(dir)?.filter_map(|e| e.ok()) {
701        if !entry.file_type().map(|t| t.is_dir()).unwrap_or(false) {
702            continue;
703        }
704        match scanner(&entry.path()) {
705            Ok(sessions) => {
706                for (mut record, events) in sessions {
707                    if record.start_commit.is_none() && !record.workspace.is_empty() {
708                        let binding = crate::core::repo::binding_for_session(
709                            Path::new(&record.workspace),
710                            record.started_at_ms,
711                            record.ended_at_ms,
712                        );
713                        record.start_commit = binding.start_commit;
714                        record.end_commit = binding.end_commit;
715                        record.branch = binding.branch;
716                        record.dirty_start = binding.dirty_start;
717                        record.dirty_end = binding.dirty_end;
718                        record.repo_binding_source = binding.source;
719                    }
720                    store.upsert_session(&record)?;
721                    let flush_ms = record.ended_at_ms.unwrap_or(record.started_at_ms);
722                    for ev in events {
723                        store.append_event_with_sync(&ev, sync_ctx)?;
724                    }
725                    if record.status == crate::core::event::SessionStatus::Done {
726                        store.flush_projector_session(&record.id, flush_ms)?;
727                    }
728                }
729            }
730            Err(e) => tracing::warn!("scan {:?}: {e}", entry.path()),
731        }
732    }
733    Ok(())
734}
735
736pub(crate) fn workspace_path(workspace: Option<&Path>) -> Result<PathBuf> {
737    crate::core::workspace::resolve(workspace)
738}
739
/// Builds the cursor project slug for a workspace path: leading slashes are dropped and every
/// remaining `/` becomes `-`.
/// `/Users/lucas/Projects/kaizen` → `Users-lucas-Projects-kaizen`
pub(crate) fn workspace_slug(ws: &str) -> String {
    ws.trim_start_matches('/')
        .chars()
        .map(|c| if c == '/' { '-' } else { c })
        .collect()
}
745
/// Expands a leading `~` in `path` to the value of the `HOME` environment variable.
///
/// Both a bare `~` and the `~/rest` form are expanded. Any other input — including `~user/...`
/// — is returned unchanged, as is every input when `HOME` is not set.
pub(crate) fn expand_home(path: &str) -> String {
    let Ok(home) = std::env::var("HOME") else {
        return path.to_string();
    };
    if path == "~" {
        // A bare tilde names the home directory itself.
        return home;
    }
    match path.strip_prefix("~/") {
        Some(rest) => format!("{home}/{rest}"),
        None => path.to_string(),
    }
}