Skip to main content

cc_token_usage/data/
parser.rs

1use anyhow::{Context, Result};
2use chrono::{DateTime, Utc};
3use std::collections::HashSet;
4use std::fs::File;
5use std::io::{BufRead, BufReader};
6use std::path::Path;
7
8use super::models::{ContentBlock, DataQuality, JournalEntry, ValidatedTurn};
9
10/// Parse a session JSONL file into validated turns and quality metrics.
11///
12/// Each line is validated through a pipeline: JSON parse → type filter →
13/// synthetic filter → model/usage/timestamp checks → deduplication.
14pub fn parse_session_file(path: &Path, is_agent: bool) -> Result<(Vec<ValidatedTurn>, DataQuality)> {
15    let file =
16        File::open(path).with_context(|| format!("failed to open session file: {}", path.display()))?;
17    let reader = BufReader::new(file);
18
19    let mut quality = DataQuality::default();
20    let mut turns = Vec::new();
21    let mut seen_keys = HashSet::new();
22    let now = Utc::now();
23    let mut last_user_text: Option<String> = None;
24
25    for line_result in reader.lines() {
26        let line = line_result.with_context(|| format!("failed to read line from {}", path.display()))?;
27        quality.total_lines += 1;
28
29        // 1. Parse JSON
30        let entry: JournalEntry = match serde_json::from_str(&line) {
31            Ok(e) => e,
32            Err(_) => {
33                quality.skipped_parse_error += 1;
34                continue;
35            }
36        };
37
38        // 2. Type filter — capture user text, only process Assistant entries
39        let msg = match entry {
40            JournalEntry::Assistant(msg) => msg,
41            JournalEntry::User(user_msg) => {
42                // Extract user message text for pairing with next assistant turn
43                // user_msg.message is {"role":"user","content":...} — need to get "content" first
44                let content_val = user_msg.message.as_ref()
45                    .and_then(|m| m.get("content"));
46                if let Some(content) = content_val {
47                    let text = if let Some(s) = content.as_str() {
48                        // content is a plain string
49                        s.to_string()
50                    } else if let Some(arr) = content.as_array() {
51                        // content is an array of blocks — extract text blocks, skip tool_result
52                        arr.iter()
53                            .filter_map(|b| {
54                                if b.get("type").and_then(|t| t.as_str()) == Some("text") {
55                                    b.get("text").and_then(|t| t.as_str()).map(|s| s.to_string())
56                                } else {
57                                    None
58                                }
59                            })
60                            .collect::<Vec<_>>()
61                            .join("\n")
62                    } else {
63                        String::new()
64                    };
65
66                    if !text.is_empty() {
67                        let truncated = if text.len() > 500 {
68                            format!("{}...", &text[..text.floor_char_boundary(500)])
69                        } else {
70                            text
71                        };
72                        last_user_text = Some(truncated);
73                    }
74                }
75                continue;
76            }
77            JournalEntry::QueueOperation(_) => continue,
78        };
79
80        let api = match msg.message {
81            Some(api) => api,
82            None => {
83                quality.skipped_invalid += 1;
84                continue;
85            }
86        };
87
88        // 3. Sidechain filter — skip abandoned conversation branches (e.g. retried turns)
89        //    Agent files always have isSidechain=true for all entries, so skip this
90        //    filter when parsing agent files.
91        if !is_agent && msg.is_sidechain == Some(true) {
92            quality.skipped_sidechain += 1;
93            continue;
94        }
95
96        // 4. Synthetic filter
97        if api.model.as_deref() == Some("<synthetic>") {
98            quality.skipped_synthetic += 1;
99            continue;
100        }
101
102        // 5. Model existence
103        let model = match api.model {
104            Some(m) => m,
105            None => {
106                quality.skipped_invalid += 1;
107                continue;
108            }
109        };
110
111        // 6. Usage existence
112        let usage = match api.usage {
113            Some(u) => u,
114            None => {
115                quality.skipped_invalid += 1;
116                continue;
117            }
118        };
119
120        // 7. Non-zero usage validation
121        let total_tokens = usage.input_tokens.unwrap_or(0)
122            + usage.output_tokens.unwrap_or(0)
123            + usage.cache_creation_input_tokens.unwrap_or(0)
124            + usage.cache_read_input_tokens.unwrap_or(0);
125        if total_tokens == 0 {
126            quality.skipped_invalid += 1;
127            continue;
128        }
129
130        // 8. Timestamp parsing
131        let timestamp_str = match &msg.timestamp {
132            Some(ts) if !ts.is_empty() => ts.as_str(),
133            _ => {
134                quality.skipped_invalid += 1;
135                continue;
136            }
137        };
138        let timestamp: DateTime<Utc> = match timestamp_str.parse() {
139            Ok(ts) if ts <= now => ts,
140            _ => {
141                quality.skipped_invalid += 1;
142                continue;
143            }
144        };
145
146        // 9. Deduplication by uuid:requestId composite key
147        let uuid = msg.uuid.unwrap_or_default();
148        let dedup_key = format!("{}:{}", uuid, msg.request_id.as_deref().unwrap_or(""));
149        if !seen_keys.insert(dedup_key) {
150            quality.duplicate_turns += 1;
151            continue;
152        }
153
154        // 10. Extract content types, assistant text, and tool names
155        let mut content_types = Vec::new();
156        let mut assistant_text_parts = Vec::new();
157        let mut tool_names = Vec::new();
158
159        if let Some(ref blocks) = api.content {
160            for b in blocks {
161                match b {
162                    ContentBlock::Text { text } => {
163                        content_types.push("text".to_string());
164                        if let Some(t) = text {
165                            assistant_text_parts.push(t.clone());
166                        }
167                    }
168                    ContentBlock::ToolUse { name, .. } => {
169                        content_types.push("tool_use".to_string());
170                        if let Some(n) = name {
171                            tool_names.push(n.clone());
172                        }
173                    }
174                    ContentBlock::Other => {
175                        content_types.push("other".to_string());
176                    }
177                }
178            }
179        }
180
181        // Truncate assistant text to 500 chars
182        let assistant_text = if assistant_text_parts.is_empty() {
183            None
184        } else {
185            let full = assistant_text_parts.join("\n");
186            Some(if full.len() > 500 {
187                format!("{}...", &full[..full.floor_char_boundary(500)])
188            } else {
189                full
190            })
191        };
192
193        // 11. Construct ValidatedTurn — attach user text from previous message
194        turns.push(ValidatedTurn {
195            uuid,
196            request_id: msg.request_id,
197            timestamp,
198            model,
199            usage,
200            stop_reason: api.stop_reason,
201            content_types,
202            is_agent,
203            agent_id: msg.agent_id,
204            user_text: last_user_text.take(),
205            assistant_text,
206            tool_names,
207        });
208    }
209
210    quality.valid_turns = turns.len();
211
212    Ok((turns, quality))
213}
214
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// A well-formed main-chain assistant entry (uuid "u1", requestId "r1") with
    /// non-zero usage and a single text block.
    const VALID_ASSISTANT: &str = r#"{"type":"assistant","uuid":"u1","timestamp":"2026-03-16T10:00:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":3,"output_tokens":100,"cache_creation_input_tokens":500,"cache_read_input_tokens":10000},"content":[{"type":"text","text":"hi"}]},"sessionId":"s1","cwd":"/tmp","gitBranch":"","userType":"external","isSidechain":false,"parentUuid":null,"requestId":"r1"}"#;

    /// Write each entry of `lines` as its own line in a fresh temporary file.
    fn write_jsonl(lines: &[&str]) -> NamedTempFile {
        let mut fixture = NamedTempFile::new().unwrap();
        lines
            .iter()
            .for_each(|entry| writeln!(fixture, "{}", entry).unwrap());
        fixture.flush().unwrap();
        fixture
    }

    /// Parse `lines` as a main-session (non-agent) file, keeping the temp file alive
    /// for the duration of the parse.
    fn parse_main(lines: &[&str]) -> (Vec<ValidatedTurn>, DataQuality) {
        let fixture = write_jsonl(lines);
        parse_session_file(fixture.path(), false).unwrap()
    }

    #[test]
    fn parse_valid_assistant_turn() {
        let (turns, quality) = parse_main(&[VALID_ASSISTANT]);

        assert_eq!(turns.len(), 1);
        assert_eq!(quality.valid_turns, 1);

        let turn = &turns[0];
        assert_eq!(turn.model, "claude-opus-4-6");
        assert_eq!(turn.uuid, "u1");
        assert!(!turn.is_agent);
        assert_eq!(turn.content_types, vec!["text"]);
    }

    #[test]
    fn filters_synthetic_messages() {
        let synthetic = r#"{"type":"assistant","uuid":"u1","timestamp":"2026-03-16T10:00:00Z","message":{"model":"<synthetic>","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":0,"output_tokens":0,"cache_creation_input_tokens":0,"cache_read_input_tokens":0},"content":[{"type":"text","text":"hi"}]},"sessionId":"s1","cwd":"/tmp","gitBranch":"","userType":"external","isSidechain":false,"parentUuid":null,"requestId":"r1"}"#;

        let (turns, quality) = parse_main(&[synthetic]);

        assert!(turns.is_empty());
        assert_eq!(quality.skipped_synthetic, 1);
    }

    #[test]
    fn filters_zero_usage() {
        let zero_usage = r#"{"type":"assistant","uuid":"u1","timestamp":"2026-03-16T10:00:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":0,"output_tokens":0,"cache_creation_input_tokens":0,"cache_read_input_tokens":0},"content":[{"type":"text","text":"hi"}]},"sessionId":"s1","cwd":"/tmp","gitBranch":"","userType":"external","isSidechain":false,"parentUuid":null,"requestId":"r1"}"#;

        let (turns, quality) = parse_main(&[zero_usage]);

        assert!(turns.is_empty());
        assert_eq!(quality.skipped_invalid, 1);
    }

    #[test]
    fn deduplicates_turns() {
        let (turns, quality) = parse_main(&[VALID_ASSISTANT, VALID_ASSISTANT]);

        assert_eq!(turns.len(), 1);
        assert_eq!(quality.duplicate_turns, 1);
    }

    #[test]
    fn skips_malformed_lines() {
        let (turns, quality) = parse_main(&["not valid json at all", VALID_ASSISTANT]);

        assert_eq!(turns.len(), 1);
        assert_eq!(quality.skipped_parse_error, 1);
    }

    #[test]
    fn filters_sidechain_turns() {
        let sidechain = r#"{"type":"assistant","uuid":"u2","timestamp":"2026-03-16T10:00:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":3,"output_tokens":100,"cache_creation_input_tokens":500,"cache_read_input_tokens":10000},"content":[{"type":"text","text":"abandoned"}]},"sessionId":"s1","cwd":"/tmp","gitBranch":"","userType":"external","isSidechain":true,"parentUuid":"p1","requestId":"r2"}"#;

        let (turns, quality) = parse_main(&[sidechain, VALID_ASSISTANT]);

        assert_eq!(turns.len(), 1, "sidechain turn should be filtered out");
        assert_eq!(quality.skipped_sidechain, 1);
        assert_eq!(turns[0].uuid, "u1", "only main-chain turn should remain");
    }
}