use crate::core::config::Config;
use crate::core::cost::CostTable;
use crate::core::event::{Event, EventKind, EventSource, SessionRecord, SessionStatus};
use crate::core::trace_span::{TraceSpanKind, TraceSpanRecord, span_payload, trace_id_for_session};
use crate::store::Store;
use crate::sync::ingest_ctx;
use anyhow::Context;
use serde_json::json;
use std::path::Path;
use std::sync::OnceLock;
use std::time::{SystemTime, UNIX_EPOCH};
/// Process-wide slot holding the cost table once it has been loaded.
static COST_TABLE: OnceLock<CostTable> = OnceLock::new();

/// Returns the shared cost table, loading it via `CostTable::load` on first use.
///
/// # Panics
/// Panics with the message "bundled assets/cost.toml" if the load fails.
fn bundled_cost_table() -> &'static CostTable {
    fn load_bundled() -> CostTable {
        CostTable::load().expect("bundled assets/cost.toml")
    }
    COST_TABLE.get_or_init(load_bundled)
}
/// Computes the cost to attach to a proxy event.
///
/// * Upstream error and no token usage reported: `None`.
/// * Success and no token usage reported: the table's estimate for no model
///   and zero tokens.
/// * Any token usage reported (error or not): the table's estimate for the
///   model, with reasoning tokens added to the output-token count.
fn proxy_event_cost_usd_e6(a: &RecordArgs) -> Option<i64> {
    let has_usage = [a.tokens_in, a.tokens_out, a.reasoning_tokens]
        .iter()
        .any(Option::is_some);
    let failed = a.upstream_error.is_some();

    match (failed, has_usage) {
        (true, false) => None,
        (false, false) => Some(bundled_cost_table().estimate(None, 0, 0)),
        (_, true) => {
            let input = a.tokens_in.unwrap_or(0);
            // Reasoning tokens are counted together with output tokens.
            let output = a
                .tokens_out
                .unwrap_or(0)
                .saturating_add(a.reasoning_tokens.unwrap_or(0));
            Some(bundled_cost_table().estimate(a.model.as_deref(), input, output))
        }
    }
}
pub fn record_forward_outcome(
store_path: &Path,
cfg: &Config,
workspace: &Path,
a: &RecordArgs,
) -> Result<(), anyhow::Error> {
let store = Store::open(store_path).context("open kaizen store")?;
let sync_c = ingest_ctx(cfg, workspace.to_path_buf());
let now = now_ms()?;
if store.get_session(&a.session_id)?.is_none() {
let rec = SessionRecord {
id: a.session_id.clone(),
agent: "claude".to_string(),
model: a.model.clone(),
workspace: workspace.to_string_lossy().into_owned(),
started_at_ms: now,
ended_at_ms: None,
status: SessionStatus::Running,
trace_path: String::new(),
start_commit: None,
end_commit: None,
branch: None,
dirty_start: None,
dirty_end: None,
repo_binding_source: None,
prompt_fingerprint: None,
parent_session_id: None,
agent_version: None,
os: None,
arch: None,
repo_file_count: None,
repo_total_loc: None,
};
store.upsert_session(&rec)?;
}
let seq = store.next_event_seq(&a.session_id)?;
let (kind, payload) = if let Some(ref err) = a.upstream_error {
(
EventKind::Error,
json!({
"path": a.path,
"method": a.method,
"status": a.status,
"upstream_error": err,
}),
)
} else {
let mut p = json!({
"path": a.path,
"method": a.method,
"status": a.status,
"model": a.model,
});
if let Some(rid) = &a.request_id {
p["request_id"] = json!(rid);
}
(EventKind::Cost, p)
};
let cost_usd_e6 = proxy_event_cost_usd_e6(a);
let e = Event {
session_id: a.session_id.clone(),
seq,
ts_ms: now,
ts_exact: true,
kind,
source: EventSource::Proxy,
tool: None,
tool_call_id: None,
tokens_in: a.tokens_in,
tokens_out: a.tokens_out,
reasoning_tokens: a.reasoning_tokens,
cost_usd_e6,
stop_reason: a.stop_reason.clone(),
latency_ms: a.latency_ms,
ttft_ms: a.ttft_ms,
retry_count: a.retry_count,
context_used_tokens: a.tokens_in,
context_max_tokens: context_window_for_model(a.model.as_deref()),
cache_creation_tokens: a.cache_creation_tokens,
cache_read_tokens: a.cache_read_tokens,
system_prompt_tokens: None,
payload,
};
store.append_event_with_sync(&e, sync_c.as_ref())?;
store.upsert_trace_span(&trace_span(a, seq, now, cost_usd_e6))?;
Ok(())
}
/// Everything known about one forwarded request, as handed to
/// `record_forward_outcome`.
#[derive(Clone, Debug)]
pub struct RecordArgs {
    /// Session the request belongs to; keys the session row, events and spans.
    pub session_id: String,
    /// Model name, if known; used for cost estimation and context-window lookup.
    pub model: Option<String>,
    /// Request path; stored in the event payload and used to name the span.
    pub path: String,
    /// Request method; stored in the event payload.
    pub method: String,
    /// Status code; stored in the event payload.
    pub status: u16,
    /// Request id, if any; added to the success payload and the span payload.
    pub request_id: Option<String>,
    /// Input token count, if reported; also recorded as the context-used figure.
    pub tokens_in: Option<u32>,
    /// Output token count, if reported.
    pub tokens_out: Option<u32>,
    /// Reasoning token count, if reported; priced together with output tokens.
    pub reasoning_tokens: Option<u32>,
    /// Cache-creation token count, copied onto the event.
    pub cache_creation_tokens: Option<u32>,
    /// Cache-read token count, copied onto the event.
    pub cache_read_tokens: Option<u32>,
    /// Stop reason, copied onto the event.
    pub stop_reason: Option<String>,
    /// Request latency in milliseconds; also used as the span duration.
    pub latency_ms: Option<u32>,
    /// Time-to-first-token in milliseconds, copied onto the event.
    pub ttft_ms: Option<u32>,
    /// Retry count, copied onto the event.
    pub retry_count: Option<u16>,
    /// Upstream error text; when set the event is recorded as an error.
    pub upstream_error: Option<String>,
}
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// # Errors
/// Fails if the system clock reads earlier than the Unix epoch, or if the
/// millisecond count does not fit in a `u64`.
fn now_ms() -> Result<u64, anyhow::Error> {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_err(|e| anyhow::anyhow!("{e}"))?;
    // `as_millis` yields a u128; convert with a check instead of a silently
    // truncating `as` cast.
    u64::try_from(since_epoch.as_millis())
        .map_err(|e| anyhow::anyhow!("timestamp does not fit in u64 milliseconds: {e}"))
}
/// Builds the LLM trace span that mirrors one recorded proxy event.
///
/// The span ends at `now` and starts `latency_ms` earlier (zero when the
/// latency is unknown). Its id is derived from the session id and `seq`.
fn trace_span(a: &RecordArgs, seq: u64, now: u64, cost: Option<i64>) -> TraceSpanRecord {
    let provider_name = provider(a);
    let elapsed_ms = u64::from(a.latency_ms.unwrap_or(0));
    let status = match &a.upstream_error {
        Some(_) => "error",
        None => "ok",
    };

    TraceSpanRecord {
        span_id: format!("llm-{}-{seq}", a.session_id),
        trace_id: trace_id_for_session(&a.session_id),
        parent_span_id: None,
        session_id: a.session_id.clone(),
        kind: TraceSpanKind::Llm,
        name: format!("{}.{}", provider_name, trim_path(&a.path)),
        status: status.into(),
        started_at_ms: Some(now.saturating_sub(elapsed_ms)),
        ended_at_ms: Some(now),
        duration_ms: a.latency_ms,
        model: a.model.clone(),
        tool: None,
        tokens_in: a.tokens_in,
        tokens_out: a.tokens_out,
        reasoning_tokens: a.reasoning_tokens,
        cost_usd_e6: cost,
        // Input tokens double as the "context used" figure.
        context_used_tokens: a.tokens_in,
        context_max_tokens: context_window_for_model(a.model.as_deref()),
        payload: span_payload(provider_name, false, a.request_id.as_deref()),
    }
}
/// Guesses the provider label for a request: "openai" when the path contains
/// "responses" or "chat/completions", or when the model name starts with
/// "gpt-" or 'o'; "anthropic" otherwise (including when no model is known).
fn provider(a: &RecordArgs) -> &'static str {
    let openai_path = ["responses", "chat/completions"]
        .iter()
        .any(|&marker| a.path.contains(marker));
    // NOTE(review): the bare 'o' prefix presumably targets o-series model
    // names but matches every model starting with 'o' — confirm intended.
    let openai_model = a
        .model
        .as_deref()
        .is_some_and(|name| name.starts_with("gpt-") || name.starts_with('o'));

    if openai_path || openai_model {
        "openai"
    } else {
        "anthropic"
    }
}
/// Returns the last non-empty segment of a request path, e.g. `"messages"`
/// for `"/v1/messages"`.
///
/// Falls back to `"request"` when the path has no segment at all (empty
/// string or only slashes).
fn trim_path(path: &str) -> &str {
    path.trim_matches('/')
        .rsplit('/')
        .next()
        // `rsplit` always yields at least one item (an empty string for empty
        // input), so the fallback must be triggered by emptiness, not by `None`.
        .filter(|segment| !segment.is_empty())
        .unwrap_or("request")
}
/// Looks up a context-window size (in tokens) from a model name.
///
/// Returns `None` when no model is given or the name matches no known family.
/// Names containing "gpt-4.1" or "gpt-5" are checked first and map to
/// 1,000,000; names containing "claude" or "gpt-4o", or starting with 'o',
/// map to 200,000.
fn context_window_for_model(model: Option<&str>) -> Option<u32> {
    let name = model?;

    let is_large = ["gpt-4.1", "gpt-5"]
        .iter()
        .any(|&family| name.contains(family));
    if is_large {
        return Some(1_000_000);
    }

    // NOTE(review): the bare 'o' prefix presumably targets o-series model
    // names but matches every model starting with 'o' — confirm intended.
    let is_standard = name.starts_with('o')
        || ["claude", "gpt-4o"]
            .iter()
            .any(|&family| name.contains(family));
    is_standard.then_some(200_000)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Baseline arguments: a successful POST to `/v1/messages` for a Claude
    /// model, with no usage, timing or error information.
    fn sample_args() -> RecordArgs {
        RecordArgs {
            session_id: String::from("s"),
            model: Some(String::from("claude-sonnet-4")),
            path: String::from("/v1/messages"),
            method: String::from("POST"),
            status: 200,
            request_id: None,
            tokens_in: None,
            tokens_out: None,
            reasoning_tokens: None,
            cache_creation_tokens: None,
            cache_read_tokens: None,
            stop_reason: None,
            latency_ms: None,
            ttft_ms: None,
            retry_count: None,
            upstream_error: None,
        }
    }

    /// The clock helper returns a plausible (large) millisecond value.
    #[test]
    fn time_ok() {
        let now = now_ms().unwrap();
        assert!(now > 1_000_000_000);
    }

    /// With token counts present the cost comes from the table.
    #[test]
    fn proxy_cost_matches_table_when_tokens_present() {
        let args = RecordArgs {
            tokens_in: Some(1000),
            tokens_out: Some(500),
            ..sample_args()
        };
        assert_eq!(proxy_event_cost_usd_e6(&args), Some(10_500));
    }

    /// A successful request without usage still gets a positive cost.
    #[test]
    fn proxy_cost_heuristic_when_success_but_no_usage() {
        let cost = proxy_event_cost_usd_e6(&sample_args());
        assert!(cost.is_some_and(|c| c > 0));
    }

    /// A failed request without usage has no cost.
    #[test]
    fn proxy_cost_none_on_error_without_usage() {
        let args = RecordArgs {
            upstream_error: Some(String::from("timeout")),
            ..sample_args()
        };
        assert_eq!(proxy_event_cost_usd_e6(&args), None);
    }

    /// A failed request that did report usage is still priced.
    #[test]
    fn proxy_cost_on_error_when_usage_present() {
        let args = RecordArgs {
            upstream_error: Some(String::from("upstream 429")),
            tokens_in: Some(100),
            tokens_out: Some(50),
            ..sample_args()
        };
        let cost = proxy_event_cost_usd_e6(&args);
        assert!(cost.is_some_and(|c| c > 0));
    }

    /// Recording a successful request stores exactly one LLM span carrying
    /// the token count and latency.
    #[test]
    fn recording_success_creates_llm_trace_span() {
        let tmp = tempfile::tempdir().unwrap();
        let db = tmp.path().join("kaizen.db");
        let args = RecordArgs {
            tokens_in: Some(10),
            tokens_out: Some(20),
            latency_ms: Some(30),
            ..sample_args()
        };

        record_forward_outcome(&db, &Config::default(), tmp.path(), &args).unwrap();

        let store = Store::open(&db).unwrap();
        let spans = store.trace_spans_for_session("s").unwrap();
        assert_eq!(spans.len(), 1);
        let span = &spans[0];
        assert_eq!(span.kind, crate::core::trace_span::TraceSpanKind::Llm);
        assert_eq!(span.tokens_in, Some(10));
        assert_eq!(span.duration_ms, Some(30));
    }
}