// cc_token_usage/data/parser.rs

1use anyhow::{Context, Result};
2use chrono::{DateTime, Utc};
3use std::collections::HashSet;
4use std::fs::File;
5use std::io::{BufRead, BufReader};
6use std::path::Path;
7
8use super::models::{ContentBlock, DataQuality, JournalEntry, ValidatedTurn};
9
10/// Parse a session JSONL file into validated turns and quality metrics.
11///
12/// Each line is validated through a pipeline: JSON parse → type filter →
13/// synthetic filter → model/usage/timestamp checks → deduplication.
14pub fn parse_session_file(path: &Path, is_agent: bool) -> Result<(Vec<ValidatedTurn>, DataQuality)> {
15    let file =
16        File::open(path).with_context(|| format!("failed to open session file: {}", path.display()))?;
17    let reader = BufReader::new(file);
18
19    let mut quality = DataQuality::default();
20    let mut turns = Vec::new();
21    let mut seen_keys = HashSet::new();
22    let now = Utc::now();
23    let mut last_user_text: Option<String> = None;
24
25    for line_result in reader.lines() {
26        let line = line_result.with_context(|| format!("failed to read line from {}", path.display()))?;
27        quality.total_lines += 1;
28
29        // 1. Parse JSON
30        let entry: JournalEntry = match serde_json::from_str(&line) {
31            Ok(e) => e,
32            Err(_) => {
33                quality.skipped_parse_error += 1;
34                continue;
35            }
36        };
37
38        // 2. Type filter — capture user text, only process Assistant entries
39        let msg = match entry {
40            JournalEntry::Assistant(msg) => msg,
41            JournalEntry::User(user_msg) => {
42                // Extract user message text for pairing with next assistant turn
43                // user_msg.message is {"role":"user","content":...} — need to get "content" first
44                let content_val = user_msg.message.as_ref()
45                    .and_then(|m| m.get("content"));
46                if let Some(content) = content_val {
47                    let text = if let Some(s) = content.as_str() {
48                        // content is a plain string
49                        s.to_string()
50                    } else if let Some(arr) = content.as_array() {
51                        // content is an array of blocks — extract text blocks, skip tool_result
52                        arr.iter()
53                            .filter_map(|b| {
54                                if b.get("type").and_then(|t| t.as_str()) == Some("text") {
55                                    b.get("text").and_then(|t| t.as_str()).map(|s| s.to_string())
56                                } else {
57                                    None
58                                }
59                            })
60                            .collect::<Vec<_>>()
61                            .join("\n")
62                    } else {
63                        String::new()
64                    };
65
66                    if !text.is_empty() {
67                        let truncated = if text.len() > 500 {
68                            format!("{}...", &text[..text.floor_char_boundary(500)])
69                        } else {
70                            text
71                        };
72                        last_user_text = Some(truncated);
73                    }
74                }
75                continue;
76            }
77            JournalEntry::QueueOperation(_) => continue,
78        };
79
80        let api = match msg.message {
81            Some(api) => api,
82            None => {
83                quality.skipped_invalid += 1;
84                continue;
85            }
86        };
87
88        // 3. Sidechain filter — skip abandoned conversation branches (e.g. retried turns)
89        if msg.is_sidechain == Some(true) {
90            quality.skipped_sidechain += 1;
91            continue;
92        }
93
94        // 4. Synthetic filter
95        if api.model.as_deref() == Some("<synthetic>") {
96            quality.skipped_synthetic += 1;
97            continue;
98        }
99
100        // 5. Model existence
101        let model = match api.model {
102            Some(m) => m,
103            None => {
104                quality.skipped_invalid += 1;
105                continue;
106            }
107        };
108
109        // 6. Usage existence
110        let usage = match api.usage {
111            Some(u) => u,
112            None => {
113                quality.skipped_invalid += 1;
114                continue;
115            }
116        };
117
118        // 7. Non-zero usage validation
119        let total_tokens = usage.input_tokens.unwrap_or(0)
120            + usage.output_tokens.unwrap_or(0)
121            + usage.cache_creation_input_tokens.unwrap_or(0)
122            + usage.cache_read_input_tokens.unwrap_or(0);
123        if total_tokens == 0 {
124            quality.skipped_invalid += 1;
125            continue;
126        }
127
128        // 8. Timestamp parsing
129        let timestamp_str = match &msg.timestamp {
130            Some(ts) if !ts.is_empty() => ts.as_str(),
131            _ => {
132                quality.skipped_invalid += 1;
133                continue;
134            }
135        };
136        let timestamp: DateTime<Utc> = match timestamp_str.parse() {
137            Ok(ts) if ts <= now => ts,
138            _ => {
139                quality.skipped_invalid += 1;
140                continue;
141            }
142        };
143
144        // 9. Deduplication by uuid:requestId composite key
145        let uuid = msg.uuid.unwrap_or_default();
146        let dedup_key = format!("{}:{}", uuid, msg.request_id.as_deref().unwrap_or(""));
147        if !seen_keys.insert(dedup_key) {
148            quality.duplicate_turns += 1;
149            continue;
150        }
151
152        // 10. Extract content types, assistant text, and tool names
153        let mut content_types = Vec::new();
154        let mut assistant_text_parts = Vec::new();
155        let mut tool_names = Vec::new();
156
157        if let Some(ref blocks) = api.content {
158            for b in blocks {
159                match b {
160                    ContentBlock::Text { text } => {
161                        content_types.push("text".to_string());
162                        if let Some(t) = text {
163                            assistant_text_parts.push(t.clone());
164                        }
165                    }
166                    ContentBlock::ToolUse { name, .. } => {
167                        content_types.push("tool_use".to_string());
168                        if let Some(n) = name {
169                            tool_names.push(n.clone());
170                        }
171                    }
172                    ContentBlock::Other => {
173                        content_types.push("other".to_string());
174                    }
175                }
176            }
177        }
178
179        // Truncate assistant text to 500 chars
180        let assistant_text = if assistant_text_parts.is_empty() {
181            None
182        } else {
183            let full = assistant_text_parts.join("\n");
184            Some(if full.len() > 500 {
185                format!("{}...", &full[..full.floor_char_boundary(500)])
186            } else {
187                full
188            })
189        };
190
191        // 11. Construct ValidatedTurn — attach user text from previous message
192        turns.push(ValidatedTurn {
193            uuid,
194            request_id: msg.request_id,
195            timestamp,
196            model,
197            usage,
198            stop_reason: api.stop_reason,
199            content_types,
200            is_agent,
201            agent_id: msg.agent_id,
202            user_text: last_user_text.take(),
203            assistant_text,
204            tool_names,
205        });
206    }
207
208    quality.valid_turns = turns.len();
209
210    Ok((turns, quality))
211}
212
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    // Fixture timestamps are fixed dates safely in the past: the parser rejects timestamps later
    // than the wall clock, so a date close to "today" would make these tests clock-dependent.
    const VALID_ASSISTANT: &str = r#"{"type":"assistant","uuid":"u1","timestamp":"2025-01-01T10:00:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":3,"output_tokens":100,"cache_creation_input_tokens":500,"cache_read_input_tokens":10000},"content":[{"type":"text","text":"hi"}]},"sessionId":"s1","cwd":"/tmp","gitBranch":"","userType":"external","isSidechain":false,"parentUuid":null,"requestId":"r1"}"#;

    fn write_jsonl(lines: &[&str]) -> NamedTempFile {
        let mut f = NamedTempFile::new().unwrap();
        for line in lines {
            writeln!(f, "{}", line).unwrap();
        }
        f.flush().unwrap();
        f
    }

    /// A user entry whose `message.content` is the given raw JSON value.
    fn user_line(uuid: &str, content_json: &str) -> String {
        format!(
            r#"{{"type":"user","uuid":"{uuid}","timestamp":"2025-01-01T09:59:00Z","message":{{"role":"user","content":{content_json}}},"sessionId":"s1","cwd":"/tmp","gitBranch":"","userType":"external","isSidechain":false,"parentUuid":null}}"#
        )
    }

    /// `VALID_ASSISTANT` with a different uuid/requestId so it is not deduplicated.
    fn assistant_with_ids(uuid: &str, request_id: &str) -> String {
        VALID_ASSISTANT
            .replace(r#""uuid":"u1""#, &format!(r#""uuid":"{uuid}""#))
            .replace(r#""requestId":"r1""#, &format!(r#""requestId":"{request_id}""#))
    }

    #[test]
    fn parse_valid_assistant_turn() {
        let f = write_jsonl(&[VALID_ASSISTANT]);
        let (turns, quality) = parse_session_file(f.path(), false).unwrap();

        assert_eq!(turns.len(), 1);
        assert_eq!(quality.valid_turns, 1);
        assert_eq!(turns[0].model, "claude-opus-4-6");
        assert_eq!(turns[0].uuid, "u1");
        assert!(!turns[0].is_agent);
        assert_eq!(turns[0].content_types, vec!["text"]);
    }

    #[test]
    fn filters_synthetic_messages() {
        let synthetic = r#"{"type":"assistant","uuid":"u1","timestamp":"2025-01-01T10:00:00Z","message":{"model":"<synthetic>","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":0,"output_tokens":0,"cache_creation_input_tokens":0,"cache_read_input_tokens":0},"content":[{"type":"text","text":"hi"}]},"sessionId":"s1","cwd":"/tmp","gitBranch":"","userType":"external","isSidechain":false,"parentUuid":null,"requestId":"r1"}"#;
        let f = write_jsonl(&[synthetic]);
        let (turns, quality) = parse_session_file(f.path(), false).unwrap();

        assert_eq!(turns.len(), 0);
        assert_eq!(quality.skipped_synthetic, 1);
    }

    #[test]
    fn filters_zero_usage() {
        let zero_usage = r#"{"type":"assistant","uuid":"u1","timestamp":"2025-01-01T10:00:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":0,"output_tokens":0,"cache_creation_input_tokens":0,"cache_read_input_tokens":0},"content":[{"type":"text","text":"hi"}]},"sessionId":"s1","cwd":"/tmp","gitBranch":"","userType":"external","isSidechain":false,"parentUuid":null,"requestId":"r1"}"#;
        let f = write_jsonl(&[zero_usage]);
        let (turns, quality) = parse_session_file(f.path(), false).unwrap();

        assert_eq!(turns.len(), 0);
        assert_eq!(quality.skipped_invalid, 1);
    }

    #[test]
    fn deduplicates_turns() {
        let f = write_jsonl(&[VALID_ASSISTANT, VALID_ASSISTANT]);
        let (turns, quality) = parse_session_file(f.path(), false).unwrap();

        assert_eq!(turns.len(), 1);
        assert_eq!(quality.duplicate_turns, 1);
    }

    #[test]
    fn skips_malformed_lines() {
        let f = write_jsonl(&["not valid json at all", VALID_ASSISTANT]);
        let (turns, quality) = parse_session_file(f.path(), false).unwrap();

        assert_eq!(turns.len(), 1);
        assert_eq!(quality.skipped_parse_error, 1);
    }

    #[test]
    fn filters_sidechain_turns() {
        let sidechain = r#"{"type":"assistant","uuid":"u2","timestamp":"2025-01-01T10:00:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":3,"output_tokens":100,"cache_creation_input_tokens":500,"cache_read_input_tokens":10000},"content":[{"type":"text","text":"abandoned"}]},"sessionId":"s1","cwd":"/tmp","gitBranch":"","userType":"external","isSidechain":true,"parentUuid":"p1","requestId":"r2"}"#;
        let f = write_jsonl(&[sidechain, VALID_ASSISTANT]);
        let (turns, quality) = parse_session_file(f.path(), false).unwrap();

        assert_eq!(turns.len(), 1, "sidechain turn should be filtered out");
        assert_eq!(quality.skipped_sidechain, 1);
        assert_eq!(turns[0].uuid, "u1", "only main-chain turn should remain");
    }

    #[test]
    fn rejects_future_timestamps() {
        let future = VALID_ASSISTANT.replace("2025-01-01T10:00:00Z", "2999-01-01T00:00:00Z");
        let f = write_jsonl(&[future.as_str()]);
        let (turns, quality) = parse_session_file(f.path(), false).unwrap();

        assert_eq!(turns.len(), 0);
        assert_eq!(quality.skipped_invalid, 1);
    }

    #[test]
    fn pairs_user_text_with_next_assistant_turn_only() {
        let user = user_line("q1", r#""hello""#);
        let second = assistant_with_ids("u2", "r2");
        let f = write_jsonl(&[user.as_str(), VALID_ASSISTANT, second.as_str()]);
        let (turns, quality) = parse_session_file(f.path(), false).unwrap();

        assert_eq!(quality.valid_turns, 2);
        assert_eq!(turns[0].user_text.as_deref(), Some("hello"));
        assert_eq!(turns[0].assistant_text.as_deref(), Some("hi"));
        assert_eq!(turns[1].user_text, None, "user text is consumed by the first turn");
    }

    #[test]
    fn user_text_keeps_only_text_blocks() {
        let user = user_line(
            "q1",
            r#"[{"type":"tool_result","tool_use_id":"t1","content":"ignored"},{"type":"text","text":"first"},{"type":"text","text":"second"}]"#,
        );
        let f = write_jsonl(&[user.as_str(), VALID_ASSISTANT]);
        let (turns, _) = parse_session_file(f.path(), false).unwrap();

        assert_eq!(turns[0].user_text.as_deref(), Some("first\nsecond"));
    }

    #[test]
    fn tool_result_only_message_keeps_previous_prompt() {
        let prompt = user_line("q1", r#""run the tests""#);
        let tool_result = user_line(
            "q2",
            r#"[{"type":"tool_result","tool_use_id":"t1","content":"ok"}]"#,
        );
        let f = write_jsonl(&[prompt.as_str(), tool_result.as_str(), VALID_ASSISTANT]);
        let (turns, _) = parse_session_file(f.path(), false).unwrap();

        assert_eq!(turns[0].user_text.as_deref(), Some("run the tests"));
    }

    #[test]
    fn truncates_long_user_text_on_char_boundary() {
        // 1 + 300 * 2 = 601 bytes; byte offset 500 falls inside a two-byte 'é', so the cut
        // must move back to 499.
        let long = format!("a{}", "é".repeat(300));
        let user = user_line("q1", &format!("\"{long}\""));
        let f = write_jsonl(&[user.as_str(), VALID_ASSISTANT]);
        let (turns, _) = parse_session_file(f.path(), false).unwrap();

        let expected = format!("a{}...", "é".repeat(249));
        assert_eq!(turns[0].user_text.as_deref(), Some(expected.as_str()));
    }
}