Skip to main content

cc_token_usage/data/
parser.rs

1use anyhow::{Context, Result};
2use chrono::{DateTime, Utc};
3use std::collections::HashMap;
4use std::fs::File;
5use std::io::{BufRead, BufReader};
6use std::path::Path;
7
8use super::models::{ContentBlock, DataQuality, JournalEntry, ValidatedTurn};
9
10/// Parse a session JSONL file into validated turns and quality metrics.
11///
12/// Each line is validated through a pipeline: JSON parse → type filter →
13/// synthetic filter → model/usage/timestamp checks → deduplication.
14pub fn parse_session_file(path: &Path, is_agent: bool) -> Result<(Vec<ValidatedTurn>, DataQuality)> {
15    let file =
16        File::open(path).with_context(|| format!("failed to open session file: {}", path.display()))?;
17    let reader = BufReader::new(file);
18
19    let mut quality = DataQuality::default();
20    let mut turns = Vec::new();
21    let mut request_id_index: HashMap<String, usize> = HashMap::new();
22    let now = Utc::now();
23    let mut last_user_text: Option<String> = None;
24
25    for line_result in reader.lines() {
26        let line = line_result.with_context(|| format!("failed to read line from {}", path.display()))?;
27        quality.total_lines += 1;
28
29        // 1. Parse JSON
30        let entry: JournalEntry = match serde_json::from_str(&line) {
31            Ok(e) => e,
32            Err(_) => {
33                quality.skipped_parse_error += 1;
34                continue;
35            }
36        };
37
38        // 2. Type filter — capture user text, only process Assistant entries
39        let msg = match entry {
40            JournalEntry::Assistant(msg) => msg,
41            JournalEntry::User(user_msg) => {
42                // Extract user message text for pairing with next assistant turn
43                // user_msg.message is {"role":"user","content":...} — need to get "content" first
44                let content_val = user_msg.message.as_ref()
45                    .and_then(|m| m.get("content"));
46                if let Some(content) = content_val {
47                    let text = if let Some(s) = content.as_str() {
48                        // content is a plain string
49                        s.to_string()
50                    } else if let Some(arr) = content.as_array() {
51                        // content is an array of blocks — extract text blocks, skip tool_result
52                        arr.iter()
53                            .filter_map(|b| {
54                                if b.get("type").and_then(|t| t.as_str()) == Some("text") {
55                                    b.get("text").and_then(|t| t.as_str()).map(|s| s.to_string())
56                                } else {
57                                    None
58                                }
59                            })
60                            .collect::<Vec<_>>()
61                            .join("\n")
62                    } else {
63                        String::new()
64                    };
65
66                    if !text.is_empty() {
67                        let truncated = if text.len() > 500 {
68                            format!("{}...", &text[..text.floor_char_boundary(500)])
69                        } else {
70                            text
71                        };
72                        last_user_text = Some(truncated);
73                    }
74                }
75                continue;
76            }
77            JournalEntry::QueueOperation(_) => continue,
78        };
79
80        let api = match msg.message {
81            Some(api) => api,
82            None => {
83                quality.skipped_invalid += 1;
84                continue;
85            }
86        };
87
88        // 3. Sidechain filter — skip abandoned conversation branches (e.g. retried turns)
89        //    Agent files always have isSidechain=true for all entries, so skip this
90        //    filter when parsing agent files.
91        if !is_agent && msg.is_sidechain == Some(true) {
92            quality.skipped_sidechain += 1;
93            continue;
94        }
95
96        // 4. Synthetic filter
97        if api.model.as_deref() == Some("<synthetic>") {
98            quality.skipped_synthetic += 1;
99            continue;
100        }
101
102        // 5. Model existence
103        let model = match api.model {
104            Some(m) => m,
105            None => {
106                quality.skipped_invalid += 1;
107                continue;
108            }
109        };
110
111        // 6. Usage existence
112        let usage = match api.usage {
113            Some(u) => u,
114            None => {
115                quality.skipped_invalid += 1;
116                continue;
117            }
118        };
119
120        // 7. Non-zero usage validation
121        let total_tokens = usage.input_tokens.unwrap_or(0)
122            + usage.output_tokens.unwrap_or(0)
123            + usage.cache_creation_input_tokens.unwrap_or(0)
124            + usage.cache_read_input_tokens.unwrap_or(0);
125        if total_tokens == 0 {
126            quality.skipped_invalid += 1;
127            continue;
128        }
129
130        // 8. Timestamp parsing
131        let timestamp_str = match &msg.timestamp {
132            Some(ts) if !ts.is_empty() => ts.as_str(),
133            _ => {
134                quality.skipped_invalid += 1;
135                continue;
136            }
137        };
138        let timestamp: DateTime<Utc> = match timestamp_str.parse() {
139            Ok(ts) if ts <= now => ts,
140            _ => {
141                quality.skipped_invalid += 1;
142                continue;
143            }
144        };
145
146        // 9. Deduplication by requestId — streaming responses produce multiple
147        //    entries per API call with the same requestId but different uuids.
148        //    The last entry for each requestId contains the complete response,
149        //    so we overwrite earlier entries.
150        let uuid = msg.uuid.unwrap_or_default();
151        let request_id_key = msg.request_id.clone().unwrap_or_default();
152
153        // 10. Extract content types, assistant text, and tool names
154        let mut content_types = Vec::new();
155        let mut assistant_text_parts = Vec::new();
156        let mut tool_names = Vec::new();
157
158        if let Some(ref blocks) = api.content {
159            for b in blocks {
160                match b {
161                    ContentBlock::Text { text } => {
162                        content_types.push("text".to_string());
163                        if let Some(t) = text {
164                            assistant_text_parts.push(t.clone());
165                        }
166                    }
167                    ContentBlock::ToolUse { name, .. } => {
168                        content_types.push("tool_use".to_string());
169                        if let Some(n) = name {
170                            tool_names.push(n.clone());
171                        }
172                    }
173                    ContentBlock::Other => {
174                        content_types.push("other".to_string());
175                    }
176                }
177            }
178        }
179
180        // Truncate assistant text to 500 chars
181        let assistant_text = if assistant_text_parts.is_empty() {
182            None
183        } else {
184            let full = assistant_text_parts.join("\n");
185            Some(if full.len() > 500 {
186                format!("{}...", &full[..full.floor_char_boundary(500)])
187            } else {
188                full
189            })
190        };
191
192        // 11. Construct ValidatedTurn — attach user text from previous message
193        let turn = ValidatedTurn {
194            uuid,
195            request_id: msg.request_id,
196            timestamp,
197            model,
198            usage,
199            stop_reason: api.stop_reason,
200            content_types,
201            is_agent,
202            agent_id: msg.agent_id,
203            user_text: last_user_text.take(),
204            assistant_text,
205            tool_names,
206        };
207
208        // Overwrite earlier entry for the same requestId (streaming duplicates)
209        if !request_id_key.is_empty() {
210            if let Some(&idx) = request_id_index.get(&request_id_key) {
211                turns[idx] = turn;
212                quality.duplicate_turns += 1;
213                continue;
214            }
215            request_id_index.insert(request_id_key, turns.len());
216        }
217        turns.push(turn);
218    }
219
220    quality.valid_turns = turns.len();
221
222    Ok((turns, quality))
223}
224
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// A well-formed main-chain assistant entry (uuid `u1`, requestId `r1`) with non-zero usage.
    const VALID_ASSISTANT: &str = r#"{"type":"assistant","uuid":"u1","timestamp":"2026-03-16T10:00:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":3,"output_tokens":100,"cache_creation_input_tokens":500,"cache_read_input_tokens":10000},"content":[{"type":"text","text":"hi"}]},"sessionId":"s1","cwd":"/tmp","gitBranch":"","userType":"external","isSidechain":false,"parentUuid":null,"requestId":"r1"}"#;

    /// Write each entry of `lines` as one line of a fresh temporary file and return the file.
    fn write_jsonl(lines: &[&str]) -> NamedTempFile {
        let mut file = NamedTempFile::new().unwrap();
        lines
            .iter()
            .for_each(|entry| writeln!(file, "{entry}").unwrap());
        file.flush().unwrap();
        file
    }

    /// A single valid assistant entry yields exactly one turn with its fields copied over.
    #[test]
    fn parse_valid_assistant_turn() {
        let session = write_jsonl(&[VALID_ASSISTANT]);
        let (turns, quality) = parse_session_file(session.path(), false).unwrap();

        assert_eq!(turns.len(), 1);
        assert_eq!(quality.valid_turns, 1);

        let only = &turns[0];
        assert_eq!(only.model, "claude-opus-4-6");
        assert_eq!(only.uuid, "u1");
        assert!(!only.is_agent);
        assert_eq!(only.content_types, vec!["text"]);
    }

    /// Entries whose model is `<synthetic>` are dropped and counted as synthetic.
    #[test]
    fn filters_synthetic_messages() {
        let synthetic = r#"{"type":"assistant","uuid":"u1","timestamp":"2026-03-16T10:00:00Z","message":{"model":"<synthetic>","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":0,"output_tokens":0,"cache_creation_input_tokens":0,"cache_read_input_tokens":0},"content":[{"type":"text","text":"hi"}]},"sessionId":"s1","cwd":"/tmp","gitBranch":"","userType":"external","isSidechain":false,"parentUuid":null,"requestId":"r1"}"#;
        let session = write_jsonl(&[synthetic]);
        let (turns, quality) = parse_session_file(session.path(), false).unwrap();

        assert!(turns.is_empty());
        assert_eq!(quality.skipped_synthetic, 1);
    }

    /// Entries whose token counts sum to zero are dropped and counted as invalid.
    #[test]
    fn filters_zero_usage() {
        let zero_usage = r#"{"type":"assistant","uuid":"u1","timestamp":"2026-03-16T10:00:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":0,"output_tokens":0,"cache_creation_input_tokens":0,"cache_read_input_tokens":0},"content":[{"type":"text","text":"hi"}]},"sessionId":"s1","cwd":"/tmp","gitBranch":"","userType":"external","isSidechain":false,"parentUuid":null,"requestId":"r1"}"#;
        let session = write_jsonl(&[zero_usage]);
        let (turns, quality) = parse_session_file(session.path(), false).unwrap();

        assert!(turns.is_empty());
        assert_eq!(quality.skipped_invalid, 1);
    }

    /// Two entries sharing a requestId collapse into one turn and one duplicate count.
    #[test]
    fn deduplicates_turns() {
        let session = write_jsonl(&[VALID_ASSISTANT, VALID_ASSISTANT]);
        let (turns, quality) = parse_session_file(session.path(), false).unwrap();

        assert_eq!(turns.len(), 1);
        assert_eq!(quality.duplicate_turns, 1);
    }

    /// A non-JSON line is counted as a parse error without aborting the file.
    #[test]
    fn skips_malformed_lines() {
        let session = write_jsonl(&["not valid json at all", VALID_ASSISTANT]);
        let (turns, quality) = parse_session_file(session.path(), false).unwrap();

        assert_eq!(turns.len(), 1);
        assert_eq!(quality.skipped_parse_error, 1);
    }

    /// With `is_agent == false`, entries flagged `isSidechain: true` are skipped.
    #[test]
    fn filters_sidechain_turns() {
        let sidechain = r#"{"type":"assistant","uuid":"u2","timestamp":"2026-03-16T10:00:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":3,"output_tokens":100,"cache_creation_input_tokens":500,"cache_read_input_tokens":10000},"content":[{"type":"text","text":"abandoned"}]},"sessionId":"s1","cwd":"/tmp","gitBranch":"","userType":"external","isSidechain":true,"parentUuid":"p1","requestId":"r2"}"#;
        let session = write_jsonl(&[sidechain, VALID_ASSISTANT]);
        let (turns, quality) = parse_session_file(session.path(), false).unwrap();

        assert_eq!(turns.len(), 1, "sidechain turn should be filtered out");
        assert_eq!(quality.skipped_sidechain, 1);
        assert_eq!(turns[0].uuid, "u1", "only main-chain turn should remain");
    }
}
303}