kaizen-cli 0.1.45

Distributable agent observability: real-time-tailable sessions, agile-style retros, and repo-level improvement (Cursor, Claude Code, Codex). SQLite, redact before any sync you enable.
Documentation
use super::{SessionSummaryRead, TokenRead};
use crate::store::Store;
use crate::visualization::{TokenTotals, TraceSummary, derive_status};
use anyhow::Result;
use rusqlite::params;
use std::collections::HashMap;

const SUMMARIES_SQL: &str = "
WITH recent AS MATERIALIZED (
 SELECT id, agent, model, workspace, started_at_ms, ended_at_ms,
  status, trace_path, start_commit, end_commit, branch, dirty_start, dirty_end,
  repo_binding_source, prompt_fingerprint, parent_session_id, agent_version, os, arch,
  repo_file_count, repo_total_loc FROM sessions WHERE workspace = ?1
 ORDER BY started_at_ms DESC, id ASC LIMIT ?2
), rollup AS (
 SELECT e.session_id, MAX(e.ts_ms) last_event_ms, COUNT(*) event_count,
  SUM(e.kind = 'Error') error_count, SUM(e.kind = 'ToolCall') tool_call_count,
  COALESCE(SUM(e.cost_usd_e6), 0) cost_usd_e6,
  COALESCE(SUM(e.tokens_in), 0) tokens_in, COALESCE(SUM(e.tokens_out), 0) tokens_out,
  COALESCE(SUM(e.reasoning_tokens), 0) reasoning_tokens,
  COALESCE(SUM(e.cache_read_tokens), 0) cache_read_tokens,
  COALESCE(SUM(e.cache_creation_tokens), 0) cache_creation_tokens
 FROM events e JOIN recent r ON r.id = e.session_id GROUP BY e.session_id
)
SELECT r.*, a.last_event_ms, COALESCE(a.event_count, 0), COALESCE(a.error_count, 0),
 COALESCE(a.tool_call_count, 0), COALESCE(a.cost_usd_e6, 0), COALESCE(a.tokens_in, 0),
 COALESCE(a.tokens_out, 0), COALESCE(a.reasoning_tokens, 0),
 COALESCE(a.cache_read_tokens, 0), COALESCE(a.cache_creation_tokens, 0)
FROM recent r LEFT JOIN rollup a ON a.session_id = r.id
ORDER BY r.started_at_ms DESC, r.id ASC";

const TOP_TOOLS_SQL: &str = "
WITH recent AS MATERIALIZED (SELECT id FROM sessions WHERE workspace = ?1
 ORDER BY started_at_ms DESC, id ASC LIMIT ?2), counts AS (
 SELECT t.session_id, t.tool, COUNT(*) count FROM tool_spans t
 JOIN recent r ON r.id = t.session_id WHERE t.tool <> ''
 GROUP BY t.session_id, t.tool
), ranked AS (
 SELECT session_id, tool, count, ROW_NUMBER() OVER (
  PARTITION BY session_id ORDER BY count DESC, tool ASC) rank FROM counts
)
SELECT session_id, tool, count FROM ranked WHERE rank <= 5 ORDER BY session_id, rank";

impl Store {
    pub(crate) fn visualization_sessions(
        &self,
        workspace: &str,
        limit: usize,
        now_ms: u64,
    ) -> Result<Vec<TraceSummary>> {
        let mut rows = summary_rows(self, workspace, limit)?;
        let mut tools = top_tools(self, workspace, limit)?;
        rows.iter_mut()
            .for_each(|row| row.top_tools = tools.remove(&row.session.id).unwrap_or_default());
        Ok(rows.into_iter().map(|row| summary(row, now_ms)).collect())
    }
}

fn summary_rows(store: &Store, workspace: &str, limit: usize) -> Result<Vec<SessionSummaryRead>> {
    let mut statement = store.conn().prepare(SUMMARIES_SQL)?;
    let rows = statement.query_map(params![workspace, sql_limit(limit)], summary_row)?;
    rows.map(|row| row.map_err(Into::into)).collect()
}

fn top_tools(
    store: &Store,
    workspace: &str,
    limit: usize,
) -> Result<HashMap<String, Vec<(String, u64)>>> {
    let mut statement = store.conn().prepare(TOP_TOOLS_SQL)?;
    let rows = statement.query_map(params![workspace, sql_limit(limit)], tool_row)?;
    let rows = rows.collect::<rusqlite::Result<Vec<_>>>()?;
    Ok(rows.into_iter().fold(HashMap::new(), add_tool))
}

fn add_tool(
    mut tools: HashMap<String, Vec<(String, u64)>>,
    row: (String, String, u64),
) -> HashMap<String, Vec<(String, u64)>> {
    tools.entry(row.0).or_default().push((row.1, row.2));
    tools
}

fn tool_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<(String, String, u64)> {
    Ok((row.get(0)?, row.get(1)?, row.get::<_, i64>(2)? as u64))
}

fn summary_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<SessionSummaryRead> {
    Ok(SessionSummaryRead {
        session: super::super::rows::session_row(row)?,
        last_event_ms: optional(row, 21)?,
        event_count: value(row, 22)?,
        error_count: value(row, 23)?,
        tool_call_count: value(row, 24)?,
        cost_usd_e6: row.get(25)?,
        tokens: tokens(row)?,
        top_tools: Vec::new(),
    })
}

fn tokens(row: &rusqlite::Row<'_>) -> rusqlite::Result<TokenRead> {
    Ok(TokenRead {
        input: value(row, 26)?,
        output: value(row, 27)?,
        reasoning: value(row, 28)?,
        cache_read: value(row, 29)?,
        cache_create: value(row, 30)?,
    })
}

fn optional(row: &rusqlite::Row<'_>, index: usize) -> rusqlite::Result<Option<u64>> {
    row.get::<_, Option<i64>>(index)
        .map(|value| value.map(|value| value as u64))
}

fn value(row: &rusqlite::Row<'_>, index: usize) -> rusqlite::Result<u64> {
    row.get::<_, i64>(index).map(|value| value as u64)
}

fn sql_limit(limit: usize) -> i64 {
    limit.min(i64::MAX as usize) as i64
}

fn summary(row: SessionSummaryRead, now_ms: u64) -> TraceSummary {
    let (status, status_reason) =
        derive_status(&row.session, row.last_event_ms, row.error_count, now_ms);
    TraceSummary {
        id: row.session.id,
        agent: row.session.agent,
        model: row.session.model,
        status,
        status_reason,
        started_at_ms: row.session.started_at_ms,
        ended_at_ms: row.session.ended_at_ms,
        last_event_ms: row.last_event_ms,
        event_count: row.event_count,
        error_count: row.error_count,
        tool_call_count: row.tool_call_count,
        cost_usd_e6: row.cost_usd_e6,
        tokens: token_totals(row.tokens),
        top_tools: row.top_tools,
    }
}

fn token_totals(row: TokenRead) -> TokenTotals {
    let total = row.input + row.output + row.reasoning + row.cache_read + row.cache_create;
    TokenTotals {
        input: row.input,
        output: row.output,
        reasoning: row.reasoning,
        cache_read: row.cache_read,
        cache_create: row.cache_create,
        total,
    }
}