kaizen-cli 0.1.38

Distributable agent observability: real-time-tailable sessions, agile-style retros, and repo-level improvement (Cursor, Claude Code, Codex). SQLite, redact before any sync you enable.
Documentation
use super::events::*;
use super::*;

impl Store {
    pub fn flush_projector_session(&self, session_id: &str, now_ms: u64) -> Result<()> {
        if projector_legacy_mode() {
            rebuild_tool_spans_for_session(&self.conn, session_id)?;
            self.invalidate_span_tree_cache();
            return Ok(());
        }
        let deltas = self
            .projector
            .borrow_mut()
            .flush_session(session_id, now_ms);
        if self.apply_projector_events(&deltas)? {
            self.invalidate_span_tree_cache();
        }
        Ok(())
    }

    pub(super) fn replay_projector_session(&self, session_id: &str) -> Result<()> {
        clear_session_spans(&self.conn, session_id)?;
        self.projector.borrow_mut().reset_session(session_id);
        let events = self.list_events_for_session(session_id)?;
        let mut changed = false;
        for event in &events {
            let deltas = self.projector.borrow_mut().apply(event);
            changed |= self.apply_projector_events(&deltas)?;
        }
        if self
            .get_session(session_id)?
            .is_some_and(|session| session.status == SessionStatus::Done)
        {
            let now_ms = events.last().map(|event| event.ts_ms).unwrap_or(0);
            let deltas = self
                .projector
                .borrow_mut()
                .flush_session(session_id, now_ms);
            changed |= self.apply_projector_events(&deltas)?;
        }
        if changed {
            self.invalidate_span_tree_cache();
        }
        Ok(())
    }

    pub(super) fn apply_projector_events(&self, deltas: &[ProjectorEvent]) -> Result<bool> {
        let mut changed = false;
        for delta in deltas {
            match delta {
                ProjectorEvent::SpanClosed(span, sample) => {
                    upsert_tool_span_record(&self.conn, span)?;
                    tracing::debug!(
                        session_id = %sample.session_id,
                        span_id = %sample.span_id,
                        tool = ?sample.tool,
                        lead_time_ms = ?sample.lead_time_ms,
                        tokens_in = ?sample.tokens_in,
                        tokens_out = ?sample.tokens_out,
                        reasoning_tokens = ?sample.reasoning_tokens,
                        cost_usd_e6 = ?sample.cost_usd_e6,
                        paths = ?sample.paths,
                        "tool span closed"
                    );
                    changed = true;
                }
                ProjectorEvent::SpanPatched(span) => {
                    upsert_tool_span_record(&self.conn, span)?;
                    changed = true;
                }
                ProjectorEvent::FileTouched { session, path } => {
                    self.conn.execute(
                        "INSERT OR IGNORE INTO files_touched (session_id, path) VALUES (?1, ?2)",
                        params![session, path],
                    )?;
                    changed = true;
                }
                ProjectorEvent::SkillUsed { session, skill } => {
                    self.conn.execute(
                        "INSERT OR IGNORE INTO skills_used (session_id, skill) VALUES (?1, ?2)",
                        params![session, skill],
                    )?;
                    changed = true;
                }
                ProjectorEvent::RuleUsed { session, rule } => {
                    self.conn.execute(
                        "INSERT OR IGNORE INTO rules_used (session_id, rule) VALUES (?1, ?2)",
                        params![session, rule],
                    )?;
                    changed = true;
                }
            }
        }
        Ok(changed)
    }
}