Skip to main content

obol_core/transcript/
pi.rs

1//! Pi (`pi --mode json`) transcript -> Vec<MessageUsage>.
2//! Reconciled with AgentsView internal/parser/pi.go (MIT, © 2026 Kenn Software LLC).
3
4use crate::error::ObolError;
5use crate::model::{MessageUsage, Provider};
6use serde_json::Value;
7
8pub fn parse(bytes: &[u8]) -> Result<Vec<MessageUsage>, ObolError> {
9    let text = std::str::from_utf8(bytes).map_err(|e| ObolError::MalformedTranscript {
10        line: 0,
11        msg: e.to_string(),
12    })?;
13    // Pi serializes the same usage on two disjoint channels: the `pi --mode json`
14    // stdout stream carries it on `turn_end` rows, while persisted session files
15    // carry it on `type:"message"` (role=assistant) rows and contain no `turn_end`.
16    // Bucket each shape separately and return one — never the sum — so a transcript
17    // that ever carried both (it shouldn't) can't be double-counted.
18    let mut from_turn_end = Vec::new();
19    let mut from_message = Vec::new();
20    let mut current_model = String::new();
21
22    for line in text.lines() {
23        let line = line.trim();
24        if line.is_empty() {
25            continue;
26        }
27        let v: Value = match serde_json::from_str(line) {
28            Ok(v) => v,
29            Err(_) => continue,
30        };
31        let ty = v.get("type").and_then(Value::as_str);
32        // Track the running model so a row without `message.model` can inherit it.
33        if ty == Some("model_change") {
34            if let Some(m) = v.get("modelId").and_then(Value::as_str) {
35                current_model = m.to_string();
36            }
37            continue;
38        }
39        match ty {
40            // Stdout-stream shape; message_update/start/end deltas are ignored.
41            Some("turn_end") => {
42                if let Some(rec) = extract(v.get("message"), &current_model) {
43                    from_turn_end.push(rec);
44                }
45            }
46            // Session-file shape: usage on the persisted assistant message.
47            Some("message") => {
48                let msg = v.get("message");
49                let is_assistant =
50                    msg.and_then(|m| m.get("role")).and_then(Value::as_str) == Some("assistant");
51                if is_assistant {
52                    if let Some(rec) = extract(msg, &current_model) {
53                        from_message.push(rec);
54                    }
55                }
56            }
57            _ => {}
58        }
59    }
60    // Prefer the stream aggregate when present; otherwise the persisted messages.
61    Ok(if from_turn_end.is_empty() {
62        from_message
63    } else {
64        from_turn_end
65    })
66}
67
68/// Build a usage record from a Pi `message` object (shared by both channels).
69/// Returns `None` when usage is absent or an empty object — no billable record.
70fn extract(msg: Option<&Value>, current_model: &str) -> Option<MessageUsage> {
71    let msg = msg?;
72    let usage = match msg.get("usage") {
73        Some(u) if u.as_object().is_some_and(|o| !o.is_empty()) => u,
74        _ => return None, // empty/foreign usage -> no billable record
75    };
76
77    let g = |k: &str| usage.get(k).and_then(Value::as_u64);
78    let nested = |outer: &str, inner: &str| {
79        usage
80            .get(outer)
81            .and_then(|c| c.get(inner))
82            .and_then(Value::as_u64)
83    };
84    let input = g("input").unwrap_or(0);
85    let output = g("output").unwrap_or(0);
86    let cache_read = g("cacheRead")
87        .or_else(|| nested("cache", "read"))
88        .unwrap_or(0);
89    let cache_write = g("cacheWrite")
90        .or_else(|| nested("cache", "write"))
91        .unwrap_or(0);
92
93    // Pi reports its own all-in cost per call (`usage.cost.total`). When present
94    // and sane, it's ground truth — preferred over list-price math downstream.
95    let native_cost_usd = usage
96        .get("cost")
97        .and_then(|c| c.get("total"))
98        .and_then(Value::as_f64)
99        .filter(|c| c.is_finite() && *c >= 0.0);
100
101    let provider_str = msg.get("provider").and_then(Value::as_str).unwrap_or("");
102    let (namespace, provider) = route(provider_str);
103
104    let model = msg
105        .get("model")
106        .and_then(Value::as_str)
107        .filter(|s| !s.is_empty())
108        .map(str::to_string)
109        .unwrap_or_else(|| current_model.to_string());
110
111    Some(MessageUsage {
112        model,
113        provider,
114        namespace,
115        input_uncached: input,
116        cache_read,
117        cache_write_5m: cache_write,
118        cache_write_1h: 0,
119        output,
120        request_input_tokens: input + cache_read + cache_write,
121        service_tier: None,
122        native_cost_usd,
123    })
124}
125
126/// Map Pi's `provider` (a backend/route name) to (price namespace, Provider label).
127/// Only `openrouter` uses the OpenRouter table; everything else prices from LiteLLM.
128fn route(provider: &str) -> (String, Provider) {
129    match provider {
130        "openrouter" => ("openrouter".to_string(), Provider::OpenRouter),
131        "anthropic" => ("litellm".to_string(), Provider::Anthropic),
132        "openai" | "openai-codex" => ("litellm".to_string(), Provider::OpenAI),
133        other => ("litellm".to_string(), Provider::Other(other.to_string())),
134    }
135}
136
#[cfg(test)]
mod tests {
    use super::*;

    /// Join JSONL rows (newline-separated, no trailing newline) into the byte
    /// buffer `parse` expects.
    fn jsonl(rows: &[&str]) -> Vec<u8> {
        rows.join("\n").into_bytes()
    }

    #[test]
    fn reads_turn_end_usage_and_routes_by_provider() {
        let u = parse(include_bytes!("../../tests/fixtures/pi-mini.jsonl")).unwrap();
        // message_update ignored; empty-usage turn skipped -> 3 records
        assert_eq!(u.len(), 3, "{u:?}");

        assert_eq!(u[0].namespace, "litellm");
        assert_eq!(u[0].provider, Provider::OpenAI);
        assert_eq!(u[0].model, "gpt-5.4");
        assert_eq!(u[0].input_uncached, 15024);
        assert_eq!(u[0].cache_read, 12288);
        assert_eq!(u[0].output, 251);

        assert_eq!(u[1].namespace, "openrouter");
        assert_eq!(u[1].provider, Provider::OpenRouter);
        assert_eq!(u[1].model, "tencent/hy3-preview"); // vendor-qualified key, verbatim
        assert_eq!(u[1].input_uncached, 6412);
        assert_eq!(u[1].cache_read, 5760);

        // anthropic turn with no `message.model` inherits the prior model_change
        assert_eq!(u[2].namespace, "litellm");
        assert_eq!(u[2].provider, Provider::Anthropic);
        assert_eq!(u[2].model, "claude-opus-4-5");
        assert_eq!(u[2].input_uncached, 100);
    }

    #[test]
    fn reads_session_file_message_rows() {
        // Persisted session files carry usage on `type:"message"` assistant rows
        // (role=user rows and empty-usage rows skipped) and contain no `turn_end`.
        let u = parse(include_bytes!("../../tests/fixtures/pi-session-mini.jsonl")).unwrap();
        assert_eq!(u.len(), 3, "{u:?}");

        assert_eq!(u[0].namespace, "litellm");
        assert_eq!(u[0].provider, Provider::OpenAI);
        assert_eq!(u[0].model, "gpt-5.5");
        assert_eq!(u[0].input_uncached, 6197);
        assert_eq!(u[0].cache_read, 64512);
        assert_eq!(u[0].output, 972);

        assert_eq!(u[1].namespace, "openrouter");
        assert_eq!(u[1].provider, Provider::OpenRouter);
        assert_eq!(u[1].model, "tencent/hy3-preview");
        assert_eq!(u[1].input_uncached, 6412);
        assert_eq!(u[1].cache_read, 5760);

        // anthropic message with no `message.model` inherits the prior model_change
        assert_eq!(u[2].namespace, "litellm");
        assert_eq!(u[2].provider, Provider::Anthropic);
        assert_eq!(u[2].model, "claude-opus-4-5");
        assert_eq!(u[2].input_uncached, 100);
    }

    #[test]
    fn captures_native_cost_from_cost_total() {
        let u = parse(include_bytes!("../../tests/fixtures/pi-session-mini.jsonl")).unwrap();
        // openai-codex message carries usage.cost.total
        assert_eq!(u[0].native_cost_usd, Some(0.0537));
        // openrouter message: cost.total present
        assert_eq!(u[1].native_cost_usd, Some(0.0007));
        // anthropic message has no `cost` block -> fall back to list-price (None)
        assert_eq!(u[2].native_cost_usd, None);
    }

    #[test]
    fn prefers_turn_end_over_message_never_summing_both() {
        // The two channels are disjoint in practice; if a transcript ever carried
        // both, we return the turn_end bucket alone — not the sum — so the same
        // usage can't be counted twice.
        let mixed = br#"{"type":"turn_end","message":{"model":"gpt-5.5","provider":"openai","usage":{"input":1275,"output":5}}}
{"type":"message","message":{"role":"assistant","model":"gpt-5.5","provider":"openai","usage":{"input":1275,"output":5}}}"#;
        let u = parse(mixed).unwrap();
        assert_eq!(u.len(), 1, "{u:?}");
        assert_eq!(u[0].input_uncached, 1275);
    }

    #[test]
    fn rejects_non_utf8_input() {
        // A single invalid byte fails the whole transcript, reported at line 0.
        let err = parse(&[b'{', 0xff, b'}']).unwrap_err();
        assert!(matches!(err, ObolError::MalformedTranscript { line: 0, .. }));
    }

    #[test]
    fn skips_blank_junk_non_assistant_and_usage_less_rows() {
        let input = jsonl(&[
            "",
            "not json at all",
            r#"{"type":"message","message":{"role":"user","usage":{"input":5,"output":1}}}"#,
            r#"{"type":"message","message":{"role":"assistant","provider":"openai","usage":{}}}"#,
            r#"{"type":"message","message":{"role":"assistant","provider":"openai"}}"#,
            r#"{"type":"message_update","message":{"role":"assistant","usage":{"input":9}}}"#,
        ]);
        let u = parse(&input).unwrap();
        assert!(u.is_empty(), "{u:?}");
    }

    #[test]
    fn reads_nested_cache_block_and_totals_request_input() {
        let input = jsonl(&[
            r#"{"type":"message","message":{"role":"assistant","model":"m","provider":"anthropic","usage":{"input":10,"output":2,"cache":{"read":7,"write":3}}}}"#,
            r#"{"type":"message","message":{"role":"assistant","model":"m","provider":"anthropic","usage":{"input":1,"cacheRead":4,"cache":{"read":99}}}}"#,
        ]);
        let u = parse(&input).unwrap();
        assert_eq!(u.len(), 2, "{u:?}");

        assert_eq!(u[0].cache_read, 7);
        assert_eq!(u[0].cache_write_5m, 3);
        assert_eq!(u[0].cache_write_1h, 0);
        assert_eq!(u[0].output, 2);
        assert_eq!(u[0].request_input_tokens, 20);

        // flat `cacheRead` wins over the nested `cache.read`; missing counts are 0
        assert_eq!(u[1].cache_read, 4);
        assert_eq!(u[1].cache_write_5m, 0);
        assert_eq!(u[1].output, 0);
        assert_eq!(u[1].request_input_tokens, 5);
    }

    #[test]
    fn drops_negative_or_non_numeric_native_cost() {
        let input = jsonl(&[
            r#"{"type":"turn_end","message":{"provider":"openai","usage":{"input":1,"cost":{"total":-0.5}}}}"#,
            r#"{"type":"turn_end","message":{"provider":"openai","usage":{"input":1,"cost":{"total":"0.1"}}}}"#,
            r#"{"type":"turn_end","message":{"provider":"openai","usage":{"input":1,"cost":{"total":0.25}}}}"#,
        ]);
        let u = parse(&input).unwrap();
        assert_eq!(u.len(), 3, "{u:?}");
        assert_eq!(u[0].native_cost_usd, None);
        assert_eq!(u[1].native_cost_usd, None);
        assert_eq!(u[2].native_cost_usd, Some(0.25));
    }

    #[test]
    fn unknown_or_missing_provider_prices_from_litellm() {
        let input = jsonl(&[
            r#"{"type":"turn_end","message":{"provider":"bedrock","usage":{"input":1}}}"#,
            r#"{"type":"model_change","modelId":"gpt-x"}"#,
            r#"{"type":"turn_end","message":{"model":"","usage":{"input":2}}}"#,
        ]);
        let u = parse(&input).unwrap();
        assert_eq!(u.len(), 2, "{u:?}");

        assert_eq!(u[0].namespace, "litellm");
        assert_eq!(u[0].provider, Provider::Other("bedrock".to_string()));
        // no `message.model` and no prior model_change -> empty model
        assert_eq!(u[0].model, "");

        assert_eq!(u[1].namespace, "litellm");
        assert_eq!(u[1].provider, Provider::Other(String::new()));
        // an empty `message.model` falls back to the running model_change
        assert_eq!(u[1].model, "gpt-x");
    }
}