Skip to main content

ccstat_provider_opencode/
data_loader.rs

//! OpenCode data loader
//!
//! Discovers and parses OpenCode per-message JSON files from
//! `~/.local/share/opencode/storage/message/` (or from
//! `$OPENCODE_DATA_DIR/storage/message/` when that variable is set).
5
6use async_trait::async_trait;
7use ccstat_core::error::{CcstatError, Result};
8use ccstat_core::provider::ProviderDataLoader;
9use ccstat_core::types::{ISOTimestamp, ModelName, SessionId, TokenCounts, UsageEntry};
10use chrono::{TimeZone, Utc};
11use futures::stream::Stream;
12use serde::Deserialize;
13use std::collections::HashSet;
14use std::path::PathBuf;
15use std::pin::Pin;
16use tracing::{debug, warn};
17
/// Data loader for OpenCode usage data.
///
/// Holds the directory from which OpenCode message JSON files are read.
#[derive(Debug)]
pub struct DataLoader {
    /// Message storage root, i.e. `<base data dir>/storage/message`.
    message_dir: PathBuf,
}
22
23#[async_trait]
24impl ProviderDataLoader for DataLoader {
25    async fn new() -> Result<Self> {
26        let base = if let Ok(data_dir) = std::env::var("OPENCODE_DATA_DIR") {
27            PathBuf::from(data_dir)
28        } else {
29            dirs::data_dir()
30                .ok_or_else(|| CcstatError::Config("Cannot determine data directory".into()))?
31                .join("opencode")
32        };
33
34        let message_dir = base.join("storage").join("message");
35        if !message_dir.exists() {
36            debug!(
37                "OpenCode message directory not found: {}",
38                message_dir.display()
39            );
40        }
41
42        Ok(DataLoader { message_dir })
43    }
44
45    fn load_entries(&self) -> Pin<Box<dyn Stream<Item = Result<UsageEntry>> + Send + '_>> {
46        Box::pin(async_stream::try_stream! {
47            if !self.message_dir.exists() {
48                return;
49            }
50
51            let mut json_files = Vec::new();
52            for entry in walkdir::WalkDir::new(&self.message_dir)
53                .min_depth(1)
54                .max_depth(1)
55                .into_iter()
56                .filter_map(|e| e.ok())
57            {
58                let path = entry.path().to_path_buf();
59                if path.extension().is_some_and(|ext| ext == "json") {
60                    json_files.push(path);
61                }
62            }
63
64            debug!("Found {} OpenCode message files", json_files.len());
65
66            let mut seen_ids = HashSet::new();
67
68            for path in json_files {
69                let content = match tokio::fs::read_to_string(&path).await {
70                    Ok(c) => c,
71                    Err(e) => {
72                        warn!("Failed to read OpenCode message file {}: {}", path.display(), e);
73                        continue;
74                    }
75                };
76
77                let msg: OpenCodeMessage = match serde_json::from_str(&content) {
78                    Ok(m) => m,
79                    Err(e) => {
80                        warn!("Failed to parse OpenCode message {}: {}", path.display(), e);
81                        continue;
82                    }
83                };
84
85                // Dedup by message id
86                if !seen_ids.insert(msg.id.clone()) {
87                    continue;
88                }
89
90                if let Some(entry) = convert_message(msg) {
91                    yield entry;
92                }
93            }
94        })
95    }
96}
97
// ---------------------------------------------------------------------------
// Message schema
// ---------------------------------------------------------------------------
101
102#[derive(Deserialize)]
103#[serde(rename_all = "camelCase")]
104struct OpenCodeMessage {
105    id: String,
106    #[serde(rename = "sessionID")]
107    session_id: String,
108    #[serde(rename = "modelID")]
109    model_id: String,
110    time: Option<MessageTime>,
111    tokens: Option<MessageTokens>,
112    cost: Option<f64>,
113}
114
115#[derive(Deserialize)]
116struct MessageTime {
117    created: Option<f64>,
118}
119
120#[derive(Deserialize)]
121struct MessageTokens {
122    #[serde(default)]
123    input: u64,
124    #[serde(default)]
125    output: u64,
126    #[serde(default)]
127    cache: Option<CacheTokens>,
128}
129
130#[derive(Deserialize)]
131struct CacheTokens {
132    #[serde(default)]
133    read: u64,
134    #[serde(default)]
135    write: u64,
136}
137
// ---------------------------------------------------------------------------
// Conversion
// ---------------------------------------------------------------------------
141
142fn convert_message(msg: OpenCodeMessage) -> Option<UsageEntry> {
143    let tokens = msg.tokens.as_ref()?;
144
145    // Skip zero-token messages
146    if tokens.input == 0 && tokens.output == 0 {
147        return None;
148    }
149
150    let timestamp = msg
151        .time
152        .as_ref()
153        .and_then(|t| t.created)
154        .and_then(|ts| {
155            let secs = ts as i64;
156            let nanos = ((ts - secs as f64) * 1_000_000_000.0) as u32;
157            Utc.timestamp_opt(secs, nanos).single()
158        })
159        .unwrap_or_else(Utc::now);
160
161    let (cache_read, cache_write) = match &tokens.cache {
162        Some(c) => (c.read, c.write),
163        None => (0, 0),
164    };
165
166    let model_name = normalize_model(&msg.model_id);
167
168    Some(UsageEntry {
169        session_id: SessionId::new(msg.session_id),
170        timestamp: ISOTimestamp::new(timestamp),
171        model: ModelName::new(model_name),
172        tokens: TokenCounts::new(tokens.input, tokens.output, cache_write, cache_read),
173        total_cost: msg.cost,
174        project: None,
175        instance_id: None,
176    })
177}
178
/// Maps OpenCode model identifiers to their canonical names.
///
/// `gemini-3-pro-high` is reported as `gemini-3-pro-preview`; every other
/// identifier is returned unchanged.
fn normalize_model(model: &str) -> String {
    let canonical = match model {
        "gemini-3-pro-high" => "gemini-3-pro-preview",
        other => other,
    };
    canonical.to_owned()
}
187
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::Path;
    use tempfile::TempDir;

    /// Renders the JSON body of a single OpenCode message file.
    // One parameter per JSON field keeps call sites explicit, hence the allow.
    #[allow(clippy::too_many_arguments)]
    fn make_message_json(
        id: &str,
        session_id: &str,
        model: &str,
        created: f64,
        input: u64,
        output: u64,
        cache_read: u64,
        cache_write: u64,
        cost: Option<f64>,
    ) -> String {
        let cost_json = cost.map_or_else(|| "null".to_string(), |c| c.to_string());
        format!(
            r#"{{"id":"{}","sessionID":"{}","modelID":"{}","time":{{"created":{}}},"tokens":{{"input":{},"output":{},"cache":{{"read":{},"write":{}}}}},"cost":{}}}"#,
            id, session_id, model, created, input, output, cache_read, cache_write, cost_json
        )
    }

    /// Creates an empty `storage/message` tree inside a fresh temp directory.
    ///
    /// The returned [`TempDir`] must stay alive while the path is in use.
    fn new_message_dir() -> (TempDir, PathBuf) {
        let dir = TempDir::new().unwrap();
        let msg_dir = dir.path().join("storage").join("message");
        std::fs::create_dir_all(&msg_dir).unwrap();
        (dir, msg_dir)
    }

    /// Creates `dir/name` containing `contents`.
    fn write_file(dir: &Path, name: &str, contents: &str) {
        std::fs::write(dir.join(name), contents).unwrap();
    }

    /// Drives the loader's stream to completion and collects every item.
    async fn collect_entries(loader: &DataLoader) -> Vec<Result<UsageEntry>> {
        futures::StreamExt::collect::<Vec<_>>(loader.load_entries()).await
    }

    #[tokio::test]
    async fn test_parse_message() {
        let (_dir, msg_dir) = new_message_dir();
        write_file(
            &msg_dir,
            "msg1.json",
            &make_message_json(
                "msg1",
                "sess1",
                "claude-sonnet-4",
                1735689600.0,
                100,
                50,
                10,
                5,
                Some(0.01),
            ),
        );

        let loader = DataLoader { message_dir: msg_dir };
        let entries = collect_entries(&loader).await;
        assert_eq!(entries.len(), 1);
        let entry = entries[0].as_ref().unwrap();
        assert_eq!(entry.tokens.input_tokens, 100);
        assert_eq!(entry.tokens.output_tokens, 50);
        assert_eq!(entry.tokens.cache_read_tokens, 10);
        assert_eq!(entry.tokens.cache_creation_tokens, 5);
        assert_eq!(entry.total_cost, Some(0.01));
    }

    #[tokio::test]
    async fn test_dedup_by_id() {
        let (_dir, msg_dir) = new_message_dir();

        // Two files carrying the same message id.
        for i in 0..2 {
            write_file(
                &msg_dir,
                &format!("msg_dup_{}.json", i),
                &make_message_json("same-id", "sess1", "gpt-5", 1735689600.0, 100, 50, 0, 0, None),
            );
        }

        let loader = DataLoader { message_dir: msg_dir };
        let entries = collect_entries(&loader).await;
        // Should be deduped to 1.
        assert_eq!(entries.len(), 1);
    }

    #[tokio::test]
    async fn test_skips_invalid_and_non_json_files() {
        let (_dir, msg_dir) = new_message_dir();
        // Valid message content, but the extension means it must be ignored.
        write_file(
            &msg_dir,
            "notes.txt",
            &make_message_json("txt", "sess1", "gpt-5", 1735689600.0, 1, 1, 0, 0, None),
        );
        // Right extension, unparseable body: logged and skipped, not an error item.
        write_file(&msg_dir, "broken.json", "{not json");
        write_file(
            &msg_dir,
            "ok.json",
            &make_message_json("ok", "sess1", "gpt-5", 1735689600.0, 1, 1, 0, 0, None),
        );

        let loader = DataLoader { message_dir: msg_dir };
        let entries = collect_entries(&loader).await;
        assert_eq!(entries.len(), 1);
        assert!(entries[0].is_ok());
    }

    #[test]
    fn test_model_alias() {
        assert_eq!(normalize_model("gemini-3-pro-high"), "gemini-3-pro-preview");
        assert_eq!(normalize_model("claude-sonnet-4"), "claude-sonnet-4");
    }

    #[test]
    fn test_skip_zero_tokens() {
        let msg = OpenCodeMessage {
            id: "zero".to_string(),
            session_id: "s1".to_string(),
            model_id: "gpt-5".to_string(),
            time: Some(MessageTime {
                created: Some(1735689600.0),
            }),
            tokens: Some(MessageTokens {
                input: 0,
                output: 0,
                cache: None,
            }),
            cost: None,
        };
        assert!(convert_message(msg).is_none());
    }

    #[test]
    fn test_skip_missing_tokens() {
        let msg = OpenCodeMessage {
            id: "no-tokens".to_string(),
            session_id: "s1".to_string(),
            model_id: "gpt-5".to_string(),
            time: None,
            tokens: None,
            cost: None,
        };
        assert!(convert_message(msg).is_none());
    }

    #[tokio::test]
    async fn test_no_dir() {
        // A path inside a fresh temp dir is guaranteed not to exist, on any OS.
        let dir = TempDir::new().unwrap();
        let loader = DataLoader {
            message_dir: dir.path().join("does-not-exist"),
        };
        assert!(collect_entries(&loader).await.is_empty());
    }
}