kaizen-cli 0.1.43

Distributable agent observability: real-time-tailable sessions, agile-style retros, and repo-level improvement (Cursor, Claude Code, Codex). SQLite, redact before any sync you enable.
Documentation
use super::rows::*;
use super::*;
mod view_row;
use view_row::tool_span_view_row;

const SESSION_SPAN_TREE_SQL: &str =
    "SELECT span_id, tool, status, lead_time_ms, tokens_in, tokens_out,
            reasoning_tokens, cost_usd_e6, paths_json,
            parent_span_id, depth, subtree_cost_usd_e6, subtree_token_count
     FROM tool_spans
     WHERE session_id = ?1
     ORDER BY depth ASC, started_at_ms ASC, span_id ASC
     LIMIT ?2";

const SESSION_TOOL_SPANS_SQL: &str =
    "SELECT span_id, session_id, tool, tool_call_id, status, started_at_ms, ended_at_ms, lead_time_ms,
            tokens_in, tokens_out, reasoning_tokens, cost_usd_e6, paths_json
     FROM tool_spans WHERE session_id = ?1 ORDER BY started_at_ms ASC, span_id ASC
     LIMIT ?2";

impl Store {
    pub fn tool_rank_rows_in_window(
        &self,
        workspace: &str,
        start_ms: u64,
        end_ms: u64,
    ) -> Result<Vec<RankedTool>> {
        let mut stmt = self.conn.prepare(TOOL_RANK_ROWS_SQL)?;
        let rows = stmt.query_map(
            params![workspace, start_ms as i64, end_ms as i64],
            ranked_tool_row,
        )?;
        rows.map(|r| r.map_err(anyhow::Error::from)).collect()
    }

    pub fn tool_spans_in_window(
        &self,
        workspace: &str,
        start_ms: u64,
        end_ms: u64,
    ) -> Result<Vec<ToolSpanView>> {
        let mut stmt = self.conn.prepare(
            "SELECT span_id, tool, status, lead_time_ms, tokens_in, tokens_out,
                    reasoning_tokens, cost_usd_e6, paths_json,
                    parent_span_id, depth, subtree_cost_usd_e6, subtree_token_count
             FROM (
                 SELECT ts.span_id, ts.tool, ts.status, ts.lead_time_ms,
                        ts.tokens_in, ts.tokens_out, ts.reasoning_tokens,
                        ts.cost_usd_e6, ts.paths_json, ts.parent_span_id,
                        ts.depth, ts.subtree_cost_usd_e6, ts.subtree_token_count,
                        ts.started_at_ms AS sort_ms
                 FROM tool_spans ts
                 JOIN sessions s ON s.id = ts.session_id
                 WHERE s.workspace = ?1
                   AND ts.started_at_ms >= ?2
                   AND ts.started_at_ms <= ?3
                 UNION ALL
                 SELECT ts.span_id, ts.tool, ts.status, ts.lead_time_ms,
                        ts.tokens_in, ts.tokens_out, ts.reasoning_tokens,
                        ts.cost_usd_e6, ts.paths_json, ts.parent_span_id,
                        ts.depth, ts.subtree_cost_usd_e6, ts.subtree_token_count,
                        ts.ended_at_ms AS sort_ms
                 FROM tool_spans ts
                 JOIN sessions s ON s.id = ts.session_id
                 WHERE s.workspace = ?1
                   AND ts.started_at_ms IS NULL
                   AND ts.ended_at_ms >= ?2
                   AND ts.ended_at_ms <= ?3
             )
             ORDER BY sort_ms DESC",
        )?;
        let rows = stmt.query_map(
            params![workspace, start_ms as i64, end_ms as i64],
            tool_span_view_row,
        )?;
        Ok(rows.filter_map(|row| row.ok()).collect())
    }

    pub fn session_span_tree(
        &self,
        session_id: &str,
    ) -> Result<Vec<crate::store::span_tree::SpanNode>> {
        let last_event_seq = self.last_event_seq_for_session(session_id)?;
        if let Some(nodes) = cached_span_tree(self, session_id, last_event_seq) {
            return Ok(nodes);
        }
        let nodes = query_session_span_tree(self, session_id, usize::MAX)?;
        cache_span_tree(self, session_id, last_event_seq, &nodes);
        Ok(nodes)
    }

    pub(crate) fn limited_session_span_tree(
        &self,
        session_id: &str,
        limit: usize,
    ) -> Result<Vec<crate::store::span_tree::SpanNode>> {
        query_session_span_tree(self, session_id, limit)
    }

    pub fn tool_spans_for_session(&self, session_id: &str) -> Result<Vec<ToolSpanSyncRow>> {
        query_tool_spans_for_session(self, session_id, usize::MAX)
    }

    pub(crate) fn tool_spans_for_session_limited(
        &self,
        session_id: &str,
        limit: usize,
    ) -> Result<Vec<ToolSpanSyncRow>> {
        query_tool_spans_for_session(self, session_id, limit)
    }
}

fn query_tool_spans_for_session(
    store: &Store,
    session_id: &str,
    limit: usize,
) -> Result<Vec<ToolSpanSyncRow>> {
    let mut stmt = store.conn.prepare(SESSION_TOOL_SPANS_SQL)?;
    let rows = stmt.query_map(params![session_id, sql_limit(limit)], tool_span_sync_row)?;
    rows.map(|row| row.map_err(Into::into)).collect()
}

fn tool_span_sync_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<ToolSpanSyncRow> {
    Ok(ToolSpanSyncRow {
        span_id: row.get(0)?,
        session_id: row.get(1)?,
        tool: row.get(2)?,
        tool_call_id: row.get(3)?,
        status: row.get(4)?,
        paths: sync_paths(row)?,
        started_at_ms: sync_u64(row, 5)?,
        ended_at_ms: sync_u64(row, 6)?,
        lead_time_ms: sync_u64(row, 7)?,
        tokens_in: sync_u32(row, 8)?,
        tokens_out: sync_u32(row, 9)?,
        reasoning_tokens: sync_u32(row, 10)?,
        cost_usd_e6: row.get(11)?,
    })
}

fn sync_paths(row: &rusqlite::Row<'_>) -> rusqlite::Result<Vec<String>> {
    let paths_json: String = row.get(12)?;
    Ok(serde_json::from_str(&paths_json).unwrap_or_default())
}

fn sync_u64(row: &rusqlite::Row<'_>, index: usize) -> rusqlite::Result<Option<u64>> {
    row.get::<_, Option<i64>>(index)
        .map(|value| value.map(|value| value as u64))
}

fn sync_u32(row: &rusqlite::Row<'_>, index: usize) -> rusqlite::Result<Option<u32>> {
    row.get::<_, Option<i64>>(index)
        .map(|value| value.map(|value| value as u32))
}

fn query_session_span_tree(
    store: &Store,
    session_id: &str,
    limit: usize,
) -> Result<Vec<crate::store::span_tree::SpanNode>> {
    let mut stmt = store.conn.prepare(SESSION_SPAN_TREE_SQL)?;
    let rows = stmt.query_map(params![session_id, sql_limit(limit)], tool_span_view_row)?;
    let spans = rows
        .map(|row| row.map_err(anyhow::Error::from))
        .collect::<Result<Vec<_>>>()?;
    Ok(crate::store::span_tree::build_tree(spans))
}

fn cached_span_tree(
    store: &Store,
    session_id: &str,
    last_event_seq: Option<u64>,
) -> Option<Vec<crate::store::span_tree::SpanNode>> {
    store
        .span_tree_cache
        .borrow()
        .as_ref()
        .filter(|entry| entry.session_id == session_id && entry.last_event_seq == last_event_seq)
        .map(|entry| entry.nodes.clone())
}

fn cache_span_tree(
    store: &Store,
    session_id: &str,
    last_event_seq: Option<u64>,
    nodes: &[crate::store::span_tree::SpanNode],
) {
    *store.span_tree_cache.borrow_mut() = Some(SpanTreeCacheEntry {
        session_id: session_id.to_string(),
        last_event_seq,
        nodes: nodes.to_vec(),
    });
}

fn sql_limit(limit: usize) -> i64 {
    limit.min(i64::MAX as usize) as i64
}