//! cc_token_usage/data/parser.rs — session JSONL parsing pipeline.

1use anyhow::{Context, Result};
2use chrono::{DateTime, Utc};
3use std::collections::HashMap;
4use std::fs::File;
5use std::io::{BufRead, BufReader};
6use std::path::Path;
7
8use super::models::{
9    ApiMessage, AssistantMessage, ContentBlock, DataQuality, JournalEntry, UserMessage,
10    ValidatedTurn,
11};
12
13// ─── Pipeline Stage 1: JSON Parse ──────────────────────────────────────────
14
15fn parse_line(line: &str) -> Option<JournalEntry> {
16    serde_json::from_str(line).ok()
17}
18
19// ─── Pipeline Stage 2: Type Filter + User Text Extraction ──────────────────
20
21/// Extract user message text (truncated to 500 chars) for pairing with assistant turns.
22fn extract_user_text(user_msg: &UserMessage) -> Option<String> {
23    let content_val = user_msg.message.as_ref()?.get("content")?;
24
25    let text = if let Some(s) = content_val.as_str() {
26        s.to_string()
27    } else if let Some(arr) = content_val.as_array() {
28        arr.iter()
29            .filter_map(|b| {
30                if b.get("type").and_then(|t| t.as_str()) == Some("text") {
31                    b.get("text").and_then(|t| t.as_str()).map(|s| s.to_string())
32                } else {
33                    None
34                }
35            })
36            .collect::<Vec<_>>()
37            .join("\n")
38    } else {
39        return None;
40    };
41
42    if text.is_empty() {
43        return None;
44    }
45
46    Some(if text.len() > 500 {
47        format!("{}...", &text[..text.floor_char_boundary(500)])
48    } else {
49        text
50    })
51}
52
53// ─── Pipeline Stage 3: Validation ──────────────────────────────────────────
54
/// Why an assistant entry was rejected in validation (Stage 3).
///
/// `Sidechain` and `Synthetic` are tallied in their own `DataQuality`
/// counters by the orchestrator; every other variant is lumped into
/// `skipped_invalid`.
enum FilterReason {
    // Entry had no embedded API `message` object.
    NoApiMessage,
    // `isSidechain == true` on a non-agent file.
    Sidechain,
    // Model name was the "<synthetic>" placeholder.
    Synthetic,
    // API message carried no model name.
    NoModel,
    // API message carried no usage object.
    NoUsage,
    // All four token counters were zero or missing.
    ZeroUsage,
    // Timestamp missing, empty, unparseable, or in the future.
    InvalidTimestamp,
}
64
/// Fields extracted from a validated assistant entry, ready to be combined
/// with context (user text, agent flag) into a `ValidatedTurn`.
struct ValidatedFields {
    // Entry uuid; defaults to "" when absent.
    uuid: String,
    // Streaming request id; used later for deduplication.
    request_id: Option<String>,
    // Parsed, validated (non-future) entry timestamp.
    timestamp: DateTime<Utc>,
    model: String,
    usage: super::models::TokenUsage,
    stop_reason: Option<String>,
    // Raw content blocks, mined by `extract_content` in Stage 4.
    content: Option<Vec<ContentBlock>>,
    agent_id: Option<String>,
}
75
76fn validate_assistant(
77    msg: AssistantMessage,
78    is_agent: bool,
79    now: DateTime<Utc>,
80) -> Result<ValidatedFields, FilterReason> {
81    let api: ApiMessage = msg.message.ok_or(FilterReason::NoApiMessage)?;
82
83    // Sidechain filter (skip for agent files — they always have isSidechain=true)
84    if !is_agent && msg.is_sidechain == Some(true) {
85        return Err(FilterReason::Sidechain);
86    }
87
88    // Synthetic filter
89    if api.model.as_deref() == Some("<synthetic>") {
90        return Err(FilterReason::Synthetic);
91    }
92
93    let model = api.model.ok_or(FilterReason::NoModel)?;
94    let usage = api.usage.ok_or(FilterReason::NoUsage)?;
95
96    // Non-zero usage
97    let total_tokens = usage.input_tokens.unwrap_or(0)
98        + usage.output_tokens.unwrap_or(0)
99        + usage.cache_creation_input_tokens.unwrap_or(0)
100        + usage.cache_read_input_tokens.unwrap_or(0);
101    if total_tokens == 0 {
102        return Err(FilterReason::ZeroUsage);
103    }
104
105    // Timestamp validation
106    let timestamp_str = msg.timestamp.as_deref()
107        .filter(|s| !s.is_empty())
108        .ok_or(FilterReason::InvalidTimestamp)?;
109    let timestamp: DateTime<Utc> = timestamp_str.parse()
110        .map_err(|_| FilterReason::InvalidTimestamp)?;
111    if timestamp > now {
112        return Err(FilterReason::InvalidTimestamp);
113    }
114
115    Ok(ValidatedFields {
116        uuid: msg.uuid.unwrap_or_default(),
117        request_id: msg.request_id,
118        timestamp,
119        model,
120        usage,
121        stop_reason: api.stop_reason,
122        content: api.content,
123        agent_id: msg.agent_id,
124    })
125}
126
127// ─── Pipeline Stage 4: Content Extraction ──────────────────────────────────
128
129fn extract_content(content: &Option<Vec<ContentBlock>>) -> (Vec<String>, Option<String>, Vec<String>) {
130    let mut content_types = Vec::new();
131    let mut text_parts = Vec::new();
132    let mut tool_names = Vec::new();
133
134    if let Some(blocks) = content {
135        for b in blocks {
136            match b {
137                ContentBlock::Text { text } => {
138                    content_types.push("text".to_string());
139                    if let Some(t) = text {
140                        text_parts.push(t.clone());
141                    }
142                }
143                ContentBlock::ToolUse { name, .. } => {
144                    content_types.push("tool_use".to_string());
145                    if let Some(n) = name {
146                        tool_names.push(n.clone());
147                    }
148                }
149                ContentBlock::Thinking { .. } => {
150                    content_types.push("thinking".to_string());
151                }
152                ContentBlock::ToolResult { .. } => {
153                    content_types.push("tool_result".to_string());
154                }
155                ContentBlock::Other => {
156                    content_types.push("other".to_string());
157                }
158            }
159        }
160    }
161
162    let assistant_text = if text_parts.is_empty() {
163        None
164    } else {
165        let full = text_parts.join("\n");
166        Some(if full.len() > 500 {
167            format!("{}...", &full[..full.floor_char_boundary(500)])
168        } else {
169            full
170        })
171    };
172
173    (content_types, assistant_text, tool_names)
174}
175
176// ─── Pipeline Stage 5: Streaming Deduplication ─────────────────────────────
177
178fn dedup_by_request_id(turns: Vec<ValidatedTurn>) -> (Vec<ValidatedTurn>, usize) {
179    let mut result = Vec::with_capacity(turns.len());
180    let mut request_id_index: HashMap<String, usize> = HashMap::new();
181    let mut dup_count = 0;
182
183    for turn in turns {
184        let rid = turn.request_id.clone().unwrap_or_default();
185        if !rid.is_empty() {
186            if let Some(&idx) = request_id_index.get(&rid) {
187                result[idx] = turn;
188                dup_count += 1;
189                continue;
190            }
191            request_id_index.insert(rid, result.len());
192        }
193        result.push(turn);
194    }
195
196    (result, dup_count)
197}
198
199// ─── Pipeline Orchestrator ─────────────────────────────────────────────────
200
201/// Parse a session JSONL file into validated turns and quality metrics.
202///
203/// Pipeline: JSON parse → type filter → validation → content extraction → deduplication.
204pub fn parse_session_file(path: &Path, is_agent: bool) -> Result<(Vec<ValidatedTurn>, DataQuality)> {
205    let file =
206        File::open(path).with_context(|| format!("failed to open session file: {}", path.display()))?;
207    let reader = BufReader::new(file);
208
209    let mut quality = DataQuality::default();
210    let mut pre_dedup_turns = Vec::new();
211    let now = Utc::now();
212    let mut last_user_text: Option<String> = None;
213
214    for line_result in reader.lines() {
215        let line = line_result.with_context(|| format!("failed to read line from {}", path.display()))?;
216        quality.total_lines += 1;
217
218        // Stage 1: JSON parse
219        let entry = match parse_line(&line) {
220            Some(e) => e,
221            None => {
222                quality.skipped_parse_error += 1;
223                continue;
224            }
225        };
226
227        // Stage 2: Type filter
228        let msg = match entry {
229            JournalEntry::Assistant(msg) => msg,
230            JournalEntry::User(user_msg) => {
231                if let Some(text) = extract_user_text(&user_msg) {
232                    last_user_text = Some(text);
233                }
234                continue;
235            }
236            _ => continue,
237        };
238
239        // Stage 3: Validation
240        let fields = match validate_assistant(msg, is_agent, now) {
241            Ok(f) => f,
242            Err(FilterReason::Sidechain) => { quality.skipped_sidechain += 1; continue; }
243            Err(FilterReason::Synthetic) => { quality.skipped_synthetic += 1; continue; }
244            Err(_) => { quality.skipped_invalid += 1; continue; }
245        };
246
247        // Stage 4: Content extraction
248        let (content_types, assistant_text, tool_names) = extract_content(&fields.content);
249
250        pre_dedup_turns.push(ValidatedTurn {
251            uuid: fields.uuid,
252            request_id: fields.request_id,
253            timestamp: fields.timestamp,
254            model: fields.model,
255            usage: fields.usage,
256            stop_reason: fields.stop_reason,
257            content_types,
258            is_agent,
259            agent_id: fields.agent_id,
260            user_text: last_user_text.take(),
261            assistant_text,
262            tool_names,
263        });
264    }
265
266    // Stage 5: Streaming deduplication
267    let (turns, dup_count) = dedup_by_request_id(pre_dedup_turns);
268    quality.duplicate_turns = dup_count;
269    quality.valid_turns = turns.len();
270
271    Ok((turns, quality))
272}
273
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    // Baseline fixture: a fully-populated, valid assistant entry.
    // NOTE(review): the fixtures hardcode 2026-03-16 timestamps, and
    // `validate_assistant` rejects timestamps later than `Utc::now()` — these
    // tests therefore assume the wall clock is past that date; confirm.
    const VALID_ASSISTANT: &str = r#"{"type":"assistant","uuid":"u1","timestamp":"2026-03-16T10:00:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":3,"output_tokens":100,"cache_creation_input_tokens":500,"cache_read_input_tokens":10000},"content":[{"type":"text","text":"hi"}]},"sessionId":"s1","cwd":"/tmp","gitBranch":"","userType":"external","isSidechain":false,"parentUuid":null,"requestId":"r1"}"#;

    // Write the given lines to a fresh temp file as JSONL and return its handle
    // (the file is deleted when the handle drops).
    fn write_jsonl(lines: &[&str]) -> NamedTempFile {
        let mut f = NamedTempFile::new().unwrap();
        for line in lines {
            writeln!(f, "{}", line).unwrap();
        }
        f.flush().unwrap();
        f
    }

    // Happy path: one valid assistant line yields one turn with its fields intact.
    #[test]
    fn parse_valid_assistant_turn() {
        let f = write_jsonl(&[VALID_ASSISTANT]);
        let (turns, quality) = parse_session_file(f.path(), false).unwrap();

        assert_eq!(turns.len(), 1);
        assert_eq!(quality.valid_turns, 1);
        assert_eq!(turns[0].model, "claude-opus-4-6");
        assert_eq!(turns[0].uuid, "u1");
        assert!(!turns[0].is_agent);
        assert_eq!(turns[0].content_types, vec!["text"]);
    }

    // Stage 3: "<synthetic>" model entries are dropped and counted separately.
    #[test]
    fn filters_synthetic_messages() {
        let synthetic = r#"{"type":"assistant","uuid":"u1","timestamp":"2026-03-16T10:00:00Z","message":{"model":"<synthetic>","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":0,"output_tokens":0,"cache_creation_input_tokens":0,"cache_read_input_tokens":0},"content":[{"type":"text","text":"hi"}]},"sessionId":"s1","cwd":"/tmp","gitBranch":"","userType":"external","isSidechain":false,"parentUuid":null,"requestId":"r1"}"#;
        let f = write_jsonl(&[synthetic]);
        let (turns, quality) = parse_session_file(f.path(), false).unwrap();

        assert_eq!(turns.len(), 0);
        assert_eq!(quality.skipped_synthetic, 1);
    }

    // Stage 3: all-zero usage is rejected into the generic invalid bucket.
    #[test]
    fn filters_zero_usage() {
        let zero_usage = r#"{"type":"assistant","uuid":"u1","timestamp":"2026-03-16T10:00:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":0,"output_tokens":0,"cache_creation_input_tokens":0,"cache_read_input_tokens":0},"content":[{"type":"text","text":"hi"}]},"sessionId":"s1","cwd":"/tmp","gitBranch":"","userType":"external","isSidechain":false,"parentUuid":null,"requestId":"r1"}"#;
        let f = write_jsonl(&[zero_usage]);
        let (turns, quality) = parse_session_file(f.path(), false).unwrap();

        assert_eq!(turns.len(), 0);
        assert_eq!(quality.skipped_invalid, 1);
    }

    // Stage 5: two entries sharing requestId "r1" collapse into one turn.
    #[test]
    fn deduplicates_turns() {
        let f = write_jsonl(&[VALID_ASSISTANT, VALID_ASSISTANT]);
        let (turns, quality) = parse_session_file(f.path(), false).unwrap();

        assert_eq!(turns.len(), 1);
        assert_eq!(quality.duplicate_turns, 1);
    }

    // Stage 1: an unparseable line is counted but doesn't stop the parse.
    #[test]
    fn skips_malformed_lines() {
        let f = write_jsonl(&["not valid json at all", VALID_ASSISTANT]);
        let (turns, quality) = parse_session_file(f.path(), false).unwrap();

        assert_eq!(turns.len(), 1);
        assert_eq!(quality.skipped_parse_error, 1);
    }

    // Stage 2: known non-assistant entry types (progress/system/last-prompt)
    // must deserialize cleanly, i.e. not inflate skipped_parse_error.
    #[test]
    fn non_assistant_types_not_counted_as_parse_error() {
        let progress = r#"{"type":"progress","data":{"type":"hook_progress"},"uuid":"u1","timestamp":"2026-03-16T13:51:19.053Z","sessionId":"s1"}"#;
        let system = r#"{"type":"system","subtype":"turn_duration","durationMs":1234,"uuid":"u2","timestamp":"2026-03-16T13:51:19.053Z","sessionId":"s1"}"#;
        let last_prompt = r#"{"type":"last-prompt","lastPrompt":"hello","sessionId":"s1"}"#;
        let f = write_jsonl(&[progress, system, last_prompt, VALID_ASSISTANT]);
        let (turns, quality) = parse_session_file(f.path(), false).unwrap();

        assert_eq!(turns.len(), 1);
        assert_eq!(quality.skipped_parse_error, 0, "known entry types should not be parse errors");
        assert_eq!(quality.total_lines, 4);
    }

    // Stage 4: thinking blocks are tagged alongside text blocks.
    #[test]
    fn parses_thinking_content_blocks() {
        let with_thinking = r#"{"type":"assistant","uuid":"u1","timestamp":"2026-03-16T10:00:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":3,"output_tokens":100,"cache_creation_input_tokens":500,"cache_read_input_tokens":10000},"content":[{"type":"thinking","thinking":"hmm","signature":"sig"},{"type":"text","text":"answer"}]},"sessionId":"s1","cwd":"/tmp","gitBranch":"","userType":"external","isSidechain":false,"parentUuid":null,"requestId":"r1"}"#;
        let f = write_jsonl(&[with_thinking]);
        let (turns, quality) = parse_session_file(f.path(), false).unwrap();

        assert_eq!(turns.len(), 1);
        assert_eq!(quality.valid_turns, 1);
        assert!(turns[0].content_types.contains(&"thinking".to_string()));
        assert!(turns[0].content_types.contains(&"text".to_string()));
    }

    // Stage 3: isSidechain=true entries are filtered on non-agent files.
    #[test]
    fn filters_sidechain_turns() {
        let sidechain = r#"{"type":"assistant","uuid":"u2","timestamp":"2026-03-16T10:00:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":3,"output_tokens":100,"cache_creation_input_tokens":500,"cache_read_input_tokens":10000},"content":[{"type":"text","text":"abandoned"}]},"sessionId":"s1","cwd":"/tmp","gitBranch":"","userType":"external","isSidechain":true,"parentUuid":"p1","requestId":"r2"}"#;
        let f = write_jsonl(&[sidechain, VALID_ASSISTANT]);
        let (turns, quality) = parse_session_file(f.path(), false).unwrap();

        assert_eq!(turns.len(), 1, "sidechain turn should be filtered out");
        assert_eq!(quality.skipped_sidechain, 1);
        assert_eq!(turns[0].uuid, "u1", "only main-chain turn should remain");
    }

    // ─── Pipeline unit tests ───────────────────────────────────────────

    // Dedup keeps the LAST entry for a shared requestId (streaming rewrite wins).
    #[test]
    fn dedup_preserves_last_entry() {
        let t1 = ValidatedTurn {
            uuid: "u1".into(), request_id: Some("r1".into()),
            timestamp: "2026-03-16T10:00:00Z".parse().unwrap(),
            model: "m".into(), usage: Default::default(), stop_reason: None,
            content_types: vec![], is_agent: false, agent_id: None,
            user_text: None, assistant_text: Some("first".into()), tool_names: vec![],
        };
        let t2 = ValidatedTurn {
            uuid: "u2".into(), request_id: Some("r1".into()),
            timestamp: "2026-03-16T10:00:01Z".parse().unwrap(),
            model: "m".into(), usage: Default::default(), stop_reason: None,
            content_types: vec![], is_agent: false, agent_id: None,
            user_text: None, assistant_text: Some("second".into()), tool_names: vec![],
        };
        let (result, dup) = dedup_by_request_id(vec![t1, t2]);
        assert_eq!(result.len(), 1);
        assert_eq!(dup, 1);
        assert_eq!(result[0].assistant_text.as_deref(), Some("second"));
    }

    // extract_content tags every block variant and mines text + tool names.
    #[test]
    fn extract_content_handles_all_types() {
        use super::super::models::ContentBlock;
        let blocks = vec![
            ContentBlock::Text { text: Some("hello".into()) },
            ContentBlock::ToolUse { id: None, name: Some("Bash".into()), input: None },
            ContentBlock::Thinking { thinking: Some("hmm".into()), signature: None },
            ContentBlock::ToolResult { tool_use_id: None, content: None, is_error: None },
            ContentBlock::Other,
        ];
        let (types, text, tools) = extract_content(&Some(blocks));
        assert_eq!(types, vec!["text", "tool_use", "thinking", "tool_result", "other"]);
        assert_eq!(text.as_deref(), Some("hello"));
        assert_eq!(tools, vec!["Bash"]);
    }
}
417}