Skip to main content

obol_core/transcript/
pi.rs

//! Pi (`pi --mode json`) transcript -> Vec<MessageUsage>.
//! Reconciled with AgentsView internal/parser/pi.go (MIT, © 2026 Kenn Software LLC).
3
4use crate::error::ObolError;
5use crate::model::{MessageUsage, Provider};
6use serde_json::Value;
7
8pub fn parse(bytes: &[u8]) -> Result<Vec<MessageUsage>, ObolError> {
9    let text = std::str::from_utf8(bytes).map_err(|e| ObolError::MalformedTranscript {
10        line: 0,
11        msg: e.to_string(),
12    })?;
13    let mut out = Vec::new();
14    let mut current_model = String::new();
15
16    for line in text.lines() {
17        let line = line.trim();
18        if line.is_empty() {
19            continue;
20        }
21        let v: Value = match serde_json::from_str(line) {
22            Ok(v) => v,
23            Err(_) => continue,
24        };
25        let ty = v.get("type").and_then(Value::as_str);
26        // Track the running model so a turn_end without `message.model` can inherit it.
27        if ty == Some("model_change") {
28            if let Some(m) = v.get("modelId").and_then(Value::as_str) {
29                current_model = m.to_string();
30            }
31            continue;
32        }
33        // Usage lives on turn_end; the streaming message_update deltas are ignored.
34        if ty != Some("turn_end") {
35            continue;
36        }
37        let msg = match v.get("message") {
38            Some(m) => m,
39            None => continue,
40        };
41        let usage = match msg.get("usage") {
42            Some(u) if u.as_object().is_some_and(|o| !o.is_empty()) => u,
43            _ => continue, // empty/foreign usage -> no billable record
44        };
45
46        let g = |k: &str| usage.get(k).and_then(Value::as_u64);
47        let nested = |outer: &str, inner: &str| {
48            usage
49                .get(outer)
50                .and_then(|c| c.get(inner))
51                .and_then(Value::as_u64)
52        };
53        let input = g("input").unwrap_or(0);
54        let output = g("output").unwrap_or(0);
55        let cache_read = g("cacheRead")
56            .or_else(|| nested("cache", "read"))
57            .unwrap_or(0);
58        let cache_write = g("cacheWrite")
59            .or_else(|| nested("cache", "write"))
60            .unwrap_or(0);
61
62        let provider_str = msg.get("provider").and_then(Value::as_str).unwrap_or("");
63        let (namespace, provider) = route(provider_str);
64
65        let model = msg
66            .get("model")
67            .and_then(Value::as_str)
68            .filter(|s| !s.is_empty())
69            .map(str::to_string)
70            .unwrap_or_else(|| current_model.clone());
71
72        out.push(MessageUsage {
73            model,
74            provider,
75            namespace,
76            input_uncached: input,
77            cache_read,
78            cache_write_5m: cache_write,
79            cache_write_1h: 0,
80            output,
81            request_input_tokens: input + cache_read + cache_write,
82            service_tier: None,
83        });
84    }
85    Ok(out)
86}
87
88/// Map Pi's `provider` (a backend/route name) to (price namespace, Provider label).
89/// Only `openrouter` uses the OpenRouter table; everything else prices from LiteLLM.
90fn route(provider: &str) -> (String, Provider) {
91    match provider {
92        "openrouter" => ("openrouter".to_string(), Provider::OpenRouter),
93        "anthropic" => ("litellm".to_string(), Provider::Anthropic),
94        "openai" | "openai-codex" => ("litellm".to_string(), Provider::OpenAI),
95        other => ("litellm".to_string(), Provider::Other(other.to_string())),
96    }
97}
98
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses the `pi-mini.jsonl` fixture and checks record count, provider routing,
    /// token fields, and model inheritance from a preceding `model_change`.
    #[test]
    fn reads_turn_end_usage_and_routes_by_provider() {
        let records = parse(include_bytes!("../../tests/fixtures/pi-mini.jsonl")).unwrap();

        // `message_update` events are ignored and the empty-usage turn is skipped,
        // which leaves exactly three records.
        assert_eq!(records.len(), 3, "{records:?}");
        let (openai, openrouter, anthropic) = (&records[0], &records[1], &records[2]);

        // First record: OpenAI route, priced from LiteLLM.
        assert_eq!(openai.namespace, "litellm");
        assert_eq!(openai.provider, Provider::OpenAI);
        assert_eq!(openai.model, "gpt-5.4");
        assert_eq!(openai.input_uncached, 15024);
        assert_eq!(openai.cache_read, 12288);
        assert_eq!(openai.output, 251);

        // Second record: OpenRouter route; the vendor-qualified model key is kept verbatim.
        assert_eq!(openrouter.namespace, "openrouter");
        assert_eq!(openrouter.provider, Provider::OpenRouter);
        assert_eq!(openrouter.model, "tencent/hy3-preview");
        assert_eq!(openrouter.input_uncached, 6412);
        assert_eq!(openrouter.cache_read, 5760);

        // Third record: Anthropic turn with no `message.model`, so the model comes from
        // the prior `model_change` event.
        assert_eq!(anthropic.namespace, "litellm");
        assert_eq!(anthropic.provider, Provider::Anthropic);
        assert_eq!(anthropic.model, "claude-opus-4-5");
        assert_eq!(anthropic.input_uncached, 100);
    }
}