ccstat_provider_opencode/
data_loader.rs1use async_trait::async_trait;
7use ccstat_core::error::{CcstatError, Result};
8use ccstat_core::provider::ProviderDataLoader;
9use ccstat_core::types::{ISOTimestamp, ModelName, SessionId, TokenCounts, UsageEntry};
10use chrono::{TimeZone, Utc};
11use futures::stream::Stream;
12use serde::Deserialize;
13use std::collections::HashSet;
14use std::path::PathBuf;
15use std::pin::Pin;
16use tracing::{debug, warn};
17
/// Loads usage entries from OpenCode's on-disk message store.
pub struct DataLoader {
    // Directory containing the message JSON files, typically
    // `<platform data dir>/opencode/storage/message` (or derived from the
    // `OPENCODE_DATA_DIR` override).
    message_dir: PathBuf,
}
22
23#[async_trait]
24impl ProviderDataLoader for DataLoader {
25 async fn new() -> Result<Self> {
26 let base = if let Ok(data_dir) = std::env::var("OPENCODE_DATA_DIR") {
27 PathBuf::from(data_dir)
28 } else {
29 dirs::data_dir()
30 .ok_or_else(|| CcstatError::Config("Cannot determine data directory".into()))?
31 .join("opencode")
32 };
33
34 let message_dir = base.join("storage").join("message");
35 if !message_dir.exists() {
36 debug!(
37 "OpenCode message directory not found: {}",
38 message_dir.display()
39 );
40 }
41
42 Ok(DataLoader { message_dir })
43 }
44
45 fn load_entries(&self) -> Pin<Box<dyn Stream<Item = Result<UsageEntry>> + Send + '_>> {
46 Box::pin(async_stream::try_stream! {
47 if !self.message_dir.exists() {
48 return;
49 }
50
51 let mut json_files = Vec::new();
52 for entry in walkdir::WalkDir::new(&self.message_dir)
53 .min_depth(1)
54 .max_depth(1)
55 .into_iter()
56 .filter_map(|e| e.ok())
57 {
58 let path = entry.path().to_path_buf();
59 if path.extension().is_some_and(|ext| ext == "json") {
60 json_files.push(path);
61 }
62 }
63
64 debug!("Found {} OpenCode message files", json_files.len());
65
66 let mut seen_ids = HashSet::new();
67
68 for path in json_files {
69 let content = match tokio::fs::read_to_string(&path).await {
70 Ok(c) => c,
71 Err(e) => {
72 warn!("Failed to read OpenCode message file {}: {}", path.display(), e);
73 continue;
74 }
75 };
76
77 let msg: OpenCodeMessage = match serde_json::from_str(&content) {
78 Ok(m) => m,
79 Err(e) => {
80 warn!("Failed to parse OpenCode message {}: {}", path.display(), e);
81 continue;
82 }
83 };
84
85 if !seen_ids.insert(msg.id.clone()) {
87 continue;
88 }
89
90 if let Some(entry) = convert_message(msg) {
91 yield entry;
92 }
93 }
94 })
95 }
96}
97
/// On-disk shape of a single OpenCode message record.
///
/// Only the fields needed for usage accounting are deserialized; any other
/// fields in the JSON are ignored by serde's default behavior.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct OpenCodeMessage {
    // Unique message id, used for de-duplication across files.
    id: String,
    // OpenCode uses "ID" (not "Id") capitalization, which `camelCase`
    // renaming would not produce — hence the explicit renames below.
    #[serde(rename = "sessionID")]
    session_id: String,
    #[serde(rename = "modelID")]
    model_id: String,
    time: Option<MessageTime>,
    tokens: Option<MessageTokens>,
    // Pre-computed cost recorded by OpenCode — presumably USD; TODO confirm
    // units against the upstream format.
    cost: Option<f64>,
}
114
/// Timestamps attached to a message.
#[derive(Deserialize)]
struct MessageTime {
    // Creation time as a fractional Unix epoch value. `convert_message`
    // interprets this as seconds — NOTE(review): confirm units (seconds vs
    // milliseconds) against real OpenCode data.
    created: Option<f64>,
}
119
/// Token counts attached to a message; absent fields deserialize to zero.
#[derive(Deserialize)]
struct MessageTokens {
    #[serde(default)]
    input: u64,
    #[serde(default)]
    output: u64,
    // Cache token counts, when the message recorded any.
    #[serde(default)]
    cache: Option<CacheTokens>,
}
129
/// Cache read/write token counts; absent fields deserialize to zero.
#[derive(Deserialize)]
struct CacheTokens {
    #[serde(default)]
    read: u64,
    #[serde(default)]
    write: u64,
}
137
138fn convert_message(msg: OpenCodeMessage) -> Option<UsageEntry> {
143 let tokens = msg.tokens.as_ref()?;
144
145 if tokens.input == 0 && tokens.output == 0 {
147 return None;
148 }
149
150 let timestamp = msg
151 .time
152 .as_ref()
153 .and_then(|t| t.created)
154 .and_then(|ts| {
155 let secs = ts as i64;
156 let nanos = ((ts - secs as f64) * 1_000_000_000.0) as u32;
157 Utc.timestamp_opt(secs, nanos).single()
158 })
159 .unwrap_or_else(Utc::now);
160
161 let (cache_read, cache_write) = match &tokens.cache {
162 Some(c) => (c.read, c.write),
163 None => (0, 0),
164 };
165
166 let model_name = normalize_model(&msg.model_id);
167
168 Some(UsageEntry {
169 session_id: SessionId::new(msg.session_id),
170 timestamp: ISOTimestamp::new(timestamp),
171 model: ModelName::new(model_name),
172 tokens: TokenCounts::new(tokens.input, tokens.output, cache_write, cache_read),
173 total_cost: msg.cost,
174 project: None,
175 instance_id: None,
176 })
177}
178
/// Maps provider-internal model aliases to their canonical names.
///
/// Currently only `gemini-3-pro-high` is aliased; every other model id is
/// returned unchanged.
fn normalize_model(model: &str) -> String {
    match model {
        "gemini-3-pro-high" => "gemini-3-pro-preview",
        other => other,
    }
    .to_string()
}
187
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write as IoWrite;
    use tempfile::TempDir;

    /// Renders one OpenCode message record as raw JSON.
    // Many positional args are acceptable for a local test fixture helper.
    #[allow(clippy::too_many_arguments)]
    fn make_message_json(
        id: &str,
        session_id: &str,
        model: &str,
        created: f64,
        input: u64,
        output: u64,
        cache_read: u64,
        cache_write: u64,
        cost: Option<f64>,
    ) -> String {
        let cost_json = match cost {
            Some(c) => format!("{}", c),
            None => "null".to_string(),
        };
        format!(
            r#"{{"id":"{}","sessionID":"{}","modelID":"{}","time":{{"created":{}}},"tokens":{{"input":{},"output":{},"cache":{{"read":{},"write":{}}}}},"cost":{}}}"#,
            id, session_id, model, created, input, output, cache_read, cache_write, cost_json
        )
    }

    /// A well-formed message file yields exactly one entry with all token
    /// fields and the cost mapped through.
    #[tokio::test]
    async fn test_parse_message() {
        let dir = TempDir::new().unwrap();
        let msg_dir = dir.path().join("storage").join("message");
        std::fs::create_dir_all(&msg_dir).unwrap();

        let msg_file = msg_dir.join("msg1.json");
        let mut f = std::fs::File::create(&msg_file).unwrap();
        write!(
            f,
            "{}",
            make_message_json(
                "msg1",
                "sess1",
                "claude-sonnet-4",
                1735689600.0,
                100,
                50,
                10,
                5,
                Some(0.01)
            )
        )
        .unwrap();

        let loader = DataLoader {
            message_dir: msg_dir,
        };
        let entries: Vec<_> = futures::StreamExt::collect::<Vec<_>>(loader.load_entries()).await;
        assert_eq!(entries.len(), 1);
        let entry = entries[0].as_ref().unwrap();
        assert_eq!(entry.tokens.input_tokens, 100);
        assert_eq!(entry.tokens.output_tokens, 50);
        assert_eq!(entry.tokens.cache_read_tokens, 10);
        assert_eq!(entry.tokens.cache_creation_tokens, 5);
        assert_eq!(entry.total_cost, Some(0.01));
    }

    /// Two files carrying the same message id produce a single entry.
    #[tokio::test]
    async fn test_dedup_by_id() {
        let dir = TempDir::new().unwrap();
        let msg_dir = dir.path().join("storage").join("message");
        std::fs::create_dir_all(&msg_dir).unwrap();

        for i in 0..2 {
            let msg_file = msg_dir.join(format!("msg_dup_{}.json", i));
            let mut f = std::fs::File::create(&msg_file).unwrap();
            write!(
                f,
                "{}",
                make_message_json(
                    "same-id",
                    "sess1",
                    "gpt-5",
                    1735689600.0,
                    100,
                    50,
                    0,
                    0,
                    None
                )
            )
            .unwrap();
        }

        let loader = DataLoader {
            message_dir: msg_dir,
        };
        let entries: Vec<_> = futures::StreamExt::collect::<Vec<_>>(loader.load_entries()).await;
        assert_eq!(entries.len(), 1);
    }

    // Purely synchronous assertion — no async runtime needed, so a plain
    // #[test] avoids spinning up tokio for nothing.
    #[test]
    fn test_model_alias() {
        assert_eq!(normalize_model("gemini-3-pro-high"), "gemini-3-pro-preview");
        assert_eq!(normalize_model("claude-sonnet-4"), "claude-sonnet-4");
    }

    // Purely synchronous assertion — no async runtime needed.
    #[test]
    fn test_skip_zero_tokens() {
        let msg = OpenCodeMessage {
            id: "zero".to_string(),
            session_id: "s1".to_string(),
            model_id: "gpt-5".to_string(),
            time: Some(MessageTime {
                created: Some(1735689600.0),
            }),
            tokens: Some(MessageTokens {
                input: 0,
                output: 0,
                cache: None,
            }),
            cost: None,
        };
        assert!(convert_message(msg).is_none());
    }

    /// A missing message directory yields an empty stream rather than an
    /// error.
    #[tokio::test]
    async fn test_no_dir() {
        let loader = DataLoader {
            message_dir: PathBuf::from("/tmp/nonexistent-opencode-dir"),
        };
        let entries: Vec<_> = futures::StreamExt::collect::<Vec<_>>(loader.load_entries()).await;
        assert!(entries.is_empty());
    }
}
322}