kaizen-cli 0.1.37

Distributable agent observability: real-time-tailable sessions, agile-style retros, and repo-level improvement (Cursor, Claude Code, Codex). SQLite, redact before any sync you enable.
Documentation
// SPDX-License-Identifier: AGPL-3.0-or-later
//! SQLite: ensure proxy session, append one `Cost` or `Error` per completed forward.

use crate::core::config::Config;
use crate::core::cost::CostTable;
use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
use crate::core::trace_span::{TraceSpanKind, TraceSpanRecord, span_payload, trace_id_for_session};
use crate::store::Store;
use crate::sync::ingest_ctx;
use anyhow::Context;
use serde_json::json;
use std::path::Path;
use std::sync::OnceLock;
use std::time::{SystemTime, UNIX_EPOCH};

static COST_TABLE: OnceLock<CostTable> = OnceLock::new();

/// Shared, lazily-initialised price table loaded via `CostTable::load`.
///
/// The first caller pays the load cost; every later call returns the cached instance.
///
/// # Panics
/// Panics if `CostTable::load` fails; the table ships with the binary, so a failure is a
/// packaging bug rather than a runtime condition.
fn bundled_cost_table() -> &'static CostTable {
    let load_bundled = || CostTable::load().expect("bundled assets/cost.toml");
    COST_TABLE.get_or_init(load_bundled)
}

/// Per ADR 002: price one forward from the bundled cost table, in millionths of a USD.
///
/// "Usage" means at least one of `tokens_in`, `tokens_out`, `reasoning_tokens` is present.
///
/// - Failed forward, no usage → `None`. No fake spend is attributed to transport errors.
/// - Successful forward, no usage → `estimate(None, 0, 0)`. This is the bundled `cursor` row
///   heuristic, the same one used for unknown model ids. It avoids `cost_usd_e6 = 0` for priced
///   models that reported zero tokens.
/// - Usage present (success or failure) → estimate against `model`. Reasoning tokens are
///   billed as additional output tokens.
fn proxy_event_cost_usd_e6(a: &RecordArgs) -> Option<i64> {
    let has_usage = [a.tokens_in, a.tokens_out, a.reasoning_tokens]
        .iter()
        .any(Option::is_some);
    let failed = a.upstream_error.is_some();

    match (failed, has_usage) {
        (true, false) => None,
        (false, false) => Some(bundled_cost_table().estimate(None, 0, 0)),
        (_, true) => {
            let input = a.tokens_in.unwrap_or(0);
            let reasoning = a.reasoning_tokens.unwrap_or(0);
            let output = a.tokens_out.unwrap_or(0).saturating_add(reasoning);
            Some(bundled_cost_table().estimate(a.model.as_deref(), input, output))
        }
    }
}

/// Append telemetry for one upstream round-trip. Pure sync — call from `spawn_blocking`.
pub fn record_forward_outcome(
    store_path: &Path,
    cfg: &Config,
    workspace: &Path,
    a: &RecordArgs,
) -> Result<(), anyhow::Error> {
    let store = Store::open(store_path).context("open kaizen store")?;
    let sync_c = ingest_ctx(cfg, workspace.to_path_buf());
    let now = now_ms()?;
    if store.get_session(&a.session_id)?.is_none() {
        let rec = SessionRecord {
            id: a.session_id.clone(),
            agent: "claude".to_string(),
            model: a.model.clone(),
            workspace: workspace.to_string_lossy().into_owned(),
            started_at_ms: now,
            ended_at_ms: None,
            status: SessionStatus::Running,
            trace_path: String::new(),
            start_commit: None,
            end_commit: None,
            branch: None,
            dirty_start: None,
            dirty_end: None,
            repo_binding_source: None,
            prompt_fingerprint: None,
            parent_session_id: None,
            agent_version: None,
            os: None,
            arch: None,
            repo_file_count: None,
            repo_total_loc: None,
        };
        store.upsert_session(&rec)?;
    }
    let seq = store.next_event_seq(&a.session_id)?;
    let (kind, payload) = if let Some(ref err) = a.upstream_error {
        (
            EventKind::Error,
            json!({
                "path": a.path,
                "method": a.method,
                "status": a.status,
                "upstream_error": err,
            }),
        )
    } else {
        let mut p = json!({
            "path": a.path,
            "method": a.method,
            "status": a.status,
            "model": a.model,
        });
        if let Some(rid) = &a.request_id {
            p["request_id"] = json!(rid);
        }
        (EventKind::Cost, p)
    };
    let cost_usd_e6 = proxy_event_cost_usd_e6(a);
    let e = Event {
        session_id: a.session_id.clone(),
        seq,
        ts_ms: now,
        ts_exact: true,
        kind,
        source: EventSource::Proxy,
        tool: None,
        tool_call_id: None,
        tokens_in: a.tokens_in,
        tokens_out: a.tokens_out,
        reasoning_tokens: a.reasoning_tokens,
        cost_usd_e6,
        stop_reason: a.stop_reason.clone(),
        latency_ms: a.latency_ms,
        ttft_ms: a.ttft_ms,
        retry_count: a.retry_count,
        context_used_tokens: a.tokens_in,
        context_max_tokens: context_window_for_model(a.model.as_deref()),
        cache_creation_tokens: a.cache_creation_tokens,
        cache_read_tokens: a.cache_read_tokens,
        system_prompt_tokens: None,
        payload,
    };
    store.append_event_with_sync(&e, sync_c.as_ref())?;
    store.upsert_trace_span(&trace_span(a, seq, now, cost_usd_e6))?;
    Ok(())
}

/// Everything the proxy knows about one completed upstream forward.
///
/// This is the input consumed by [`record_forward_outcome`].
#[derive(Clone, Debug)]
pub struct RecordArgs {
    /// Kaizen session id for the event and span; a session row is created if missing.
    pub session_id: String,
    /// Model id, if known. Drives pricing, provider detection and the context window.
    pub model: Option<String>,
    /// Forwarded request path (e.g. `/v1/messages`); also used to name the trace span.
    pub path: String,
    /// Request method (e.g. `POST`).
    pub method: String,
    /// Response status code recorded in the event payload.
    pub status: u16,
    /// Request id, if any; added to `Cost` event payloads and the span payload.
    pub request_id: Option<String>,
    /// Input tokens; also recorded as context used.
    pub tokens_in: Option<u32>,
    /// Output tokens.
    pub tokens_out: Option<u32>,
    /// Reasoning tokens; priced as additional output tokens.
    pub reasoning_tokens: Option<u32>,
    /// Cache-creation tokens, stored on the event as-is.
    pub cache_creation_tokens: Option<u32>,
    /// Cache-read tokens, stored on the event as-is.
    pub cache_read_tokens: Option<u32>,
    /// Stop reason, stored on the event as-is.
    pub stop_reason: Option<String>,
    /// Round-trip latency in milliseconds; used as the trace span duration.
    pub latency_ms: Option<u32>,
    /// Time to first token in milliseconds, if measured.
    pub ttft_ms: Option<u32>,
    /// Retry count, if tracked.
    pub retry_count: Option<u16>,
    /// Set when the forward failed. The event becomes `Error` and the span status `error`.
    pub upstream_error: Option<String>,
}

/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// # Errors
/// Returns an error if the system clock reports a time before the Unix epoch. It also errors,
/// instead of silently truncating, if the millisecond count does not fit in `u64`.
fn now_ms() -> Result<u64, anyhow::Error> {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_err(|e| anyhow::anyhow!("{e}"))?;
    // `as_millis` yields u128; a checked conversion avoids the truncating `as` cast.
    u64::try_from(since_epoch.as_millis())
        .map_err(|e| anyhow::anyhow!("epoch milliseconds overflow u64: {e}"))
}

/// Build the `Llm` trace span describing one forward.
///
/// The span ends at `now` and starts `latency_ms` earlier (saturating at zero; unknown latency
/// gives a zero-length span). Its name is `"<provider>.<last path segment>"`. Its status is
/// `"error"` when the forward carried an upstream error and `"ok"` otherwise.
fn trace_span(a: &RecordArgs, seq: u64, now: u64, cost: Option<i64>) -> TraceSpanRecord {
    let vendor = provider(a);
    let elapsed_ms = u64::from(a.latency_ms.unwrap_or(0));
    let outcome = match a.upstream_error {
        Some(_) => "error",
        None => "ok",
    };
    TraceSpanRecord {
        span_id: format!("llm-{}-{}", a.session_id, seq),
        trace_id: trace_id_for_session(&a.session_id),
        parent_span_id: None,
        session_id: a.session_id.clone(),
        kind: TraceSpanKind::Llm,
        name: format!("{}.{}", vendor, trim_path(&a.path)),
        status: outcome.into(),
        started_at_ms: Some(now.saturating_sub(elapsed_ms)),
        ended_at_ms: Some(now),
        duration_ms: a.latency_ms,
        model: a.model.clone(),
        tool: None,
        tokens_in: a.tokens_in,
        tokens_out: a.tokens_out,
        reasoning_tokens: a.reasoning_tokens,
        cost_usd_e6: cost,
        context_used_tokens: a.tokens_in,
        context_max_tokens: context_window_for_model(a.model.as_deref()),
        payload: span_payload(vendor, false, a.request_id.as_deref()),
    }
}

/// Best-effort guess of the upstream vendor: `"openai"` or `"anthropic"`.
///
/// The result is `"openai"` in either of these cases:
/// - the path is an OpenAI endpoint (`responses`, `chat/completions`);
/// - the model id is `gpt-*` or an o-series reasoning model (`o1`, `o3-mini`, `o4-mini`, …).
///
/// Anything else defaults to `"anthropic"`.
fn provider(a: &RecordArgs) -> &'static str {
    let openai_path = a.path.contains("responses") || a.path.contains("chat/completions");
    let model = a.model.as_deref().unwrap_or_default();
    // Only `o` followed by a digit is the o-series. A bare 'o' prefix would also match
    // unrelated model ids that merely begin with the letter o.
    let o_series = model
        .strip_prefix('o')
        .and_then(|rest| rest.chars().next())
        .is_some_and(|c| c.is_ascii_digit());
    if openai_path || model.starts_with("gpt-") || o_series {
        "openai"
    } else {
        "anthropic"
    }
}

/// Last non-empty segment of a request path, used in span names (`/v1/messages` → `messages`).
///
/// Any query string is ignored. `"request"` is returned when the path has no segments, such as
/// `""` or `"/"`. Previously that fallback was unreachable, because `rsplit` always yields at
/// least one (possibly empty) item.
fn trim_path(path: &str) -> &str {
    let without_query = path.split_once('?').map_or(path, |(p, _)| p);
    without_query
        .trim_matches('/')
        .rsplit('/')
        .next()
        .filter(|segment| !segment.is_empty())
        .unwrap_or("request")
}

/// Context window, in tokens, for known model families; `None` if there is no model or it is
/// unrecognised.
///
/// Values follow the vendors' published limits:
///
/// | Model family                                  | Context window |
/// |-----------------------------------------------|----------------|
/// | GPT-4.1                                       | ~1M            |
/// | GPT-5                                         | 400k           |
/// | GPT-4o                                        | 128k           |
/// | Claude                                        | 200k           |
/// | OpenAI o-series (`o1`, `o3`, `o4-mini`, …)    | 200k           |
fn context_window_for_model(model: Option<&str>) -> Option<u32> {
    let m = model?;
    // Only `o` followed by a digit is the o-series; a bare 'o' prefix over-matches.
    let o_series = m
        .strip_prefix('o')
        .and_then(|rest| rest.chars().next())
        .is_some_and(|c| c.is_ascii_digit());
    if m.contains("gpt-4.1") {
        Some(1_000_000)
    } else if m.contains("gpt-5") {
        Some(400_000)
    } else if m.contains("gpt-4o") {
        Some(128_000)
    } else if m.contains("claude") || o_series {
        Some(200_000)
    } else {
        None
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Baseline: a successful Anthropic `/v1/messages` forward with no usage or timing data.
    fn sample_args() -> RecordArgs {
        RecordArgs {
            session_id: "s".into(),
            model: Some("claude-sonnet-4".into()),
            path: "/v1/messages".into(),
            method: "POST".into(),
            status: 200,
            request_id: None,
            tokens_in: None,
            tokens_out: None,
            reasoning_tokens: None,
            cache_creation_tokens: None,
            cache_read_tokens: None,
            stop_reason: None,
            latency_ms: None,
            ttft_ms: None,
            retry_count: None,
            upstream_error: None,
        }
    }

    #[test]
    fn time_ok() {
        let ms = now_ms().unwrap();
        assert!(ms > 1_000_000_000);
    }

    #[test]
    fn proxy_cost_matches_table_when_tokens_present() {
        let args = RecordArgs {
            tokens_in: Some(1000),
            tokens_out: Some(500),
            ..sample_args()
        };
        assert_eq!(proxy_event_cost_usd_e6(&args), Some(10_500));
    }

    #[test]
    fn proxy_cost_heuristic_when_success_but_no_usage() {
        let cost = proxy_event_cost_usd_e6(&sample_args());
        assert!(cost.is_some_and(|c| c > 0));
    }

    #[test]
    fn proxy_cost_none_on_error_without_usage() {
        let args = RecordArgs {
            upstream_error: Some("timeout".into()),
            ..sample_args()
        };
        assert_eq!(proxy_event_cost_usd_e6(&args), None);
    }

    #[test]
    fn proxy_cost_on_error_when_usage_present() {
        let args = RecordArgs {
            upstream_error: Some("upstream 429".into()),
            tokens_in: Some(100),
            tokens_out: Some(50),
            ..sample_args()
        };
        assert!(proxy_event_cost_usd_e6(&args).is_some_and(|c| c > 0));
    }

    #[test]
    fn recording_success_creates_llm_trace_span() {
        let dir = tempfile::tempdir().unwrap();
        let db_path = dir.path().join("kaizen.db");
        let args = RecordArgs {
            tokens_in: Some(10),
            tokens_out: Some(20),
            latency_ms: Some(30),
            ..sample_args()
        };
        record_forward_outcome(&db_path, &Config::default(), dir.path(), &args).unwrap();

        let store = Store::open(&db_path).unwrap();
        let spans = store.trace_spans_for_session("s").unwrap();
        assert_eq!(spans.len(), 1);
        let span = &spans[0];
        assert_eq!(span.kind, crate::core::trace_span::TraceSpanKind::Llm);
        assert_eq!(span.tokens_in, Some(10));
        assert_eq!(span.duration_ms, Some(30));
    }
}