Skip to main content

cc_token_usage/data/
parser.rs

1use anyhow::{Context, Result};
2use chrono::{DateTime, Utc};
3use std::collections::HashSet;
4use std::fs::File;
5use std::io::{BufRead, BufReader};
6use std::path::Path;
7
8use super::models::{ContentBlock, DataQuality, JournalEntry, ValidatedTurn};
9
10/// Parse a session JSONL file into validated turns and quality metrics.
11///
12/// Each line is validated through a pipeline: JSON parse → type filter →
13/// synthetic filter → model/usage/timestamp checks → deduplication.
14pub fn parse_session_file(path: &Path, is_agent: bool) -> Result<(Vec<ValidatedTurn>, DataQuality)> {
15    let file =
16        File::open(path).with_context(|| format!("failed to open session file: {}", path.display()))?;
17    let reader = BufReader::new(file);
18
19    let mut quality = DataQuality::default();
20    let mut turns = Vec::new();
21    let mut seen_keys = HashSet::new();
22    let now = Utc::now();
23    let mut last_user_text: Option<String> = None;
24
25    for line_result in reader.lines() {
26        let line = line_result.with_context(|| format!("failed to read line from {}", path.display()))?;
27        quality.total_lines += 1;
28
29        // 1. Parse JSON
30        let entry: JournalEntry = match serde_json::from_str(&line) {
31            Ok(e) => e,
32            Err(_) => {
33                quality.skipped_parse_error += 1;
34                continue;
35            }
36        };
37
38        // 2. Type filter — capture user text, only process Assistant entries
39        let msg = match entry {
40            JournalEntry::Assistant(msg) => msg,
41            JournalEntry::User(user_msg) => {
42                // Extract user message text for pairing with next assistant turn
43                // user_msg.message is {"role":"user","content":...} — need to get "content" first
44                let content_val = user_msg.message.as_ref()
45                    .and_then(|m| m.get("content"));
46                if let Some(content) = content_val {
47                    let text = if let Some(s) = content.as_str() {
48                        // content is a plain string
49                        s.to_string()
50                    } else if let Some(arr) = content.as_array() {
51                        // content is an array of blocks — extract text blocks, skip tool_result
52                        arr.iter()
53                            .filter_map(|b| {
54                                if b.get("type").and_then(|t| t.as_str()) == Some("text") {
55                                    b.get("text").and_then(|t| t.as_str()).map(|s| s.to_string())
56                                } else {
57                                    None
58                                }
59                            })
60                            .collect::<Vec<_>>()
61                            .join("\n")
62                    } else {
63                        String::new()
64                    };
65
66                    if !text.is_empty() {
67                        let truncated = if text.len() > 500 {
68                            format!("{}...", &text[..text.floor_char_boundary(500)])
69                        } else {
70                            text
71                        };
72                        last_user_text = Some(truncated);
73                    }
74                }
75                continue;
76            }
77            JournalEntry::QueueOperation(_) => continue,
78        };
79
80        let api = match msg.message {
81            Some(api) => api,
82            None => {
83                quality.skipped_invalid += 1;
84                continue;
85            }
86        };
87
88        // 3. Synthetic filter
89        if api.model.as_deref() == Some("<synthetic>") {
90            quality.skipped_synthetic += 1;
91            continue;
92        }
93
94        // 4. Model existence
95        let model = match api.model {
96            Some(m) => m,
97            None => {
98                quality.skipped_invalid += 1;
99                continue;
100            }
101        };
102
103        // 5. Usage existence
104        let usage = match api.usage {
105            Some(u) => u,
106            None => {
107                quality.skipped_invalid += 1;
108                continue;
109            }
110        };
111
112        // 6. Non-zero usage validation
113        let total_tokens = usage.input_tokens.unwrap_or(0)
114            + usage.output_tokens.unwrap_or(0)
115            + usage.cache_creation_input_tokens.unwrap_or(0)
116            + usage.cache_read_input_tokens.unwrap_or(0);
117        if total_tokens == 0 {
118            quality.skipped_invalid += 1;
119            continue;
120        }
121
122        // 7. Timestamp parsing
123        let timestamp_str = match &msg.timestamp {
124            Some(ts) if !ts.is_empty() => ts.as_str(),
125            _ => {
126                quality.skipped_invalid += 1;
127                continue;
128            }
129        };
130        let timestamp: DateTime<Utc> = match timestamp_str.parse() {
131            Ok(ts) if ts <= now => ts,
132            _ => {
133                quality.skipped_invalid += 1;
134                continue;
135            }
136        };
137
138        // 8. Deduplication by uuid:requestId composite key
139        let uuid = msg.uuid.unwrap_or_default();
140        let dedup_key = format!("{}:{}", uuid, msg.request_id.as_deref().unwrap_or(""));
141        if !seen_keys.insert(dedup_key) {
142            quality.duplicate_turns += 1;
143            continue;
144        }
145
146        // 9. Extract content types, assistant text, and tool names
147        let mut content_types = Vec::new();
148        let mut assistant_text_parts = Vec::new();
149        let mut tool_names = Vec::new();
150
151        if let Some(ref blocks) = api.content {
152            for b in blocks {
153                match b {
154                    ContentBlock::Text { text } => {
155                        content_types.push("text".to_string());
156                        if let Some(t) = text {
157                            assistant_text_parts.push(t.clone());
158                        }
159                    }
160                    ContentBlock::ToolUse { name, .. } => {
161                        content_types.push("tool_use".to_string());
162                        if let Some(n) = name {
163                            tool_names.push(n.clone());
164                        }
165                    }
166                    ContentBlock::Other => {
167                        content_types.push("other".to_string());
168                    }
169                }
170            }
171        }
172
173        // Truncate assistant text to 500 chars
174        let assistant_text = if assistant_text_parts.is_empty() {
175            None
176        } else {
177            let full = assistant_text_parts.join("\n");
178            Some(if full.len() > 500 {
179                format!("{}...", &full[..full.floor_char_boundary(500)])
180            } else {
181                full
182            })
183        };
184
185        // 10. Construct ValidatedTurn — attach user text from previous message
186        turns.push(ValidatedTurn {
187            uuid,
188            request_id: msg.request_id,
189            timestamp,
190            model,
191            usage,
192            stop_reason: api.stop_reason,
193            content_types,
194            is_agent,
195            agent_id: msg.agent_id,
196            user_text: last_user_text.take(),
197            assistant_text,
198            tool_names,
199        });
200    }
201
202    quality.valid_turns = turns.len();
203
204    Ok((turns, quality))
205}
206
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// A well-formed assistant entry with a real model name, non-zero token
    /// usage, a timestamp and a single text content block.
    const VALID_ASSISTANT: &str = r#"{"type":"assistant","uuid":"u1","timestamp":"2026-03-16T10:00:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":3,"output_tokens":100,"cache_creation_input_tokens":500,"cache_read_input_tokens":10000},"content":[{"type":"text","text":"hi"}]},"sessionId":"s1","cwd":"/tmp","gitBranch":"","userType":"external","isSidechain":false,"parentUuid":null,"requestId":"r1"}"#;

    /// Write every entry on its own line of a fresh temporary file and return
    /// the handle (the file lives as long as the handle does).
    fn write_jsonl(lines: &[&str]) -> NamedTempFile {
        let mut tmp = NamedTempFile::new().unwrap();
        lines
            .iter()
            .for_each(|entry| writeln!(tmp, "{}", entry).unwrap());
        tmp.flush().unwrap();
        tmp
    }

    /// A single valid assistant line produces exactly one turn carrying the
    /// fields from the JSON.
    #[test]
    fn parse_valid_assistant_turn() {
        let session = write_jsonl(&[VALID_ASSISTANT]);
        let (turns, report) = parse_session_file(session.path(), false).unwrap();

        assert_eq!(turns.len(), 1);
        assert_eq!(report.valid_turns, 1);

        let turn = &turns[0];
        assert_eq!(turn.model, "claude-opus-4-6");
        assert_eq!(turn.uuid, "u1");
        assert!(!turn.is_agent);
        assert_eq!(turn.content_types, vec!["text"]);
    }

    /// Entries whose model is `<synthetic>` are dropped and counted separately.
    #[test]
    fn filters_synthetic_messages() {
        let synthetic = r#"{"type":"assistant","uuid":"u1","timestamp":"2026-03-16T10:00:00Z","message":{"model":"<synthetic>","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":0,"output_tokens":0,"cache_creation_input_tokens":0,"cache_read_input_tokens":0},"content":[{"type":"text","text":"hi"}]},"sessionId":"s1","cwd":"/tmp","gitBranch":"","userType":"external","isSidechain":false,"parentUuid":null,"requestId":"r1"}"#;
        let session = write_jsonl(&[synthetic]);
        let (turns, report) = parse_session_file(session.path(), false).unwrap();

        assert!(turns.is_empty());
        assert_eq!(report.skipped_synthetic, 1);
    }

    /// A real model with all-zero token counts is treated as invalid.
    #[test]
    fn filters_zero_usage() {
        let zero_usage = r#"{"type":"assistant","uuid":"u1","timestamp":"2026-03-16T10:00:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":0,"output_tokens":0,"cache_creation_input_tokens":0,"cache_read_input_tokens":0},"content":[{"type":"text","text":"hi"}]},"sessionId":"s1","cwd":"/tmp","gitBranch":"","userType":"external","isSidechain":false,"parentUuid":null,"requestId":"r1"}"#;
        let session = write_jsonl(&[zero_usage]);
        let (turns, report) = parse_session_file(session.path(), false).unwrap();

        assert!(turns.is_empty());
        assert_eq!(report.skipped_invalid, 1);
    }

    /// The same entry written twice yields one turn and one recorded duplicate.
    #[test]
    fn deduplicates_turns() {
        let session = write_jsonl(&[VALID_ASSISTANT, VALID_ASSISTANT]);
        let (turns, report) = parse_session_file(session.path(), false).unwrap();

        assert_eq!(turns.len(), 1);
        assert_eq!(report.duplicate_turns, 1);
    }

    /// A line that is not JSON is counted as a parse error without stopping
    /// the valid line after it from being parsed.
    #[test]
    fn skips_malformed_lines() {
        let session = write_jsonl(&["not valid json at all", VALID_ASSISTANT]);
        let (turns, report) = parse_session_file(session.path(), false).unwrap();

        assert_eq!(turns.len(), 1);
        assert_eq!(report.skipped_parse_error, 1);
    }
}