1use anyhow::{Context, Result};
2use chrono::{DateTime, Utc};
3use std::collections::HashMap;
4use std::fs::File;
5use std::io::{BufRead, BufReader};
6use std::path::Path;
7
8use super::models::{
9 ApiMessage, AssistantMessage, ContentBlock, DataQuality, JournalEntry, UserMessage,
10 ValidatedTurn,
11};
12
13fn parse_line(line: &str) -> Option<JournalEntry> {
16 serde_json::from_str(line).ok()
17}
18
19fn extract_user_text(user_msg: &UserMessage) -> Option<String> {
23 let content_val = user_msg.message.as_ref()?.get("content")?;
24
25 let text = if let Some(s) = content_val.as_str() {
26 s.to_string()
27 } else if let Some(arr) = content_val.as_array() {
28 arr.iter()
29 .filter_map(|b| {
30 if b.get("type").and_then(|t| t.as_str()) == Some("text") {
31 b.get("text").and_then(|t| t.as_str()).map(|s| s.to_string())
32 } else {
33 None
34 }
35 })
36 .collect::<Vec<_>>()
37 .join("\n")
38 } else {
39 return None;
40 };
41
42 if text.is_empty() {
43 return None;
44 }
45
46 Some(if text.len() > 500 {
47 format!("{}...", &text[..text.floor_char_boundary(500)])
48 } else {
49 text
50 })
51}
52
/// Why `validate_assistant` rejected an assistant journal entry.
///
/// `parse_session_file` counts `Sidechain` and `Synthetic` separately.
/// Every other variant goes into a single "invalid" counter.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FilterReason {
    /// The entry had no API `message` payload.
    NoApiMessage,
    /// The entry is a sidechain turn and the file is not an agent transcript.
    Sidechain,
    /// The model name is the `<synthetic>` placeholder.
    Synthetic,
    /// The API message has no model name.
    NoModel,
    /// The API message has no usage block.
    NoUsage,
    /// All four token counters are zero or absent.
    ZeroUsage,
    /// The timestamp is missing, empty, unparseable, or later than "now".
    InvalidTimestamp,
}
64
65struct ValidatedFields {
66 uuid: String,
67 request_id: Option<String>,
68 timestamp: DateTime<Utc>,
69 model: String,
70 usage: super::models::TokenUsage,
71 stop_reason: Option<String>,
72 content: Option<Vec<ContentBlock>>,
73 agent_id: Option<String>,
74}
75
76fn validate_assistant(
77 msg: AssistantMessage,
78 is_agent: bool,
79 now: DateTime<Utc>,
80) -> Result<ValidatedFields, FilterReason> {
81 let api: ApiMessage = msg.message.ok_or(FilterReason::NoApiMessage)?;
82
83 if !is_agent && msg.is_sidechain == Some(true) {
85 return Err(FilterReason::Sidechain);
86 }
87
88 if api.model.as_deref() == Some("<synthetic>") {
90 return Err(FilterReason::Synthetic);
91 }
92
93 let model = api.model.ok_or(FilterReason::NoModel)?;
94 let usage = api.usage.ok_or(FilterReason::NoUsage)?;
95
96 let total_tokens = usage.input_tokens.unwrap_or(0)
98 + usage.output_tokens.unwrap_or(0)
99 + usage.cache_creation_input_tokens.unwrap_or(0)
100 + usage.cache_read_input_tokens.unwrap_or(0);
101 if total_tokens == 0 {
102 return Err(FilterReason::ZeroUsage);
103 }
104
105 let timestamp_str = msg.timestamp.as_deref()
107 .filter(|s| !s.is_empty())
108 .ok_or(FilterReason::InvalidTimestamp)?;
109 let timestamp: DateTime<Utc> = timestamp_str.parse()
110 .map_err(|_| FilterReason::InvalidTimestamp)?;
111 if timestamp > now {
112 return Err(FilterReason::InvalidTimestamp);
113 }
114
115 Ok(ValidatedFields {
116 uuid: msg.uuid.unwrap_or_default(),
117 request_id: msg.request_id,
118 timestamp,
119 model,
120 usage,
121 stop_reason: api.stop_reason,
122 content: api.content,
123 agent_id: msg.agent_id,
124 })
125}
126
127fn extract_content(content: &Option<Vec<ContentBlock>>) -> (Vec<String>, Option<String>, Vec<String>) {
130 let mut content_types = Vec::new();
131 let mut text_parts = Vec::new();
132 let mut tool_names = Vec::new();
133
134 if let Some(blocks) = content {
135 for b in blocks {
136 match b {
137 ContentBlock::Text { text } => {
138 content_types.push("text".to_string());
139 if let Some(t) = text {
140 text_parts.push(t.clone());
141 }
142 }
143 ContentBlock::ToolUse { name, .. } => {
144 content_types.push("tool_use".to_string());
145 if let Some(n) = name {
146 tool_names.push(n.clone());
147 }
148 }
149 ContentBlock::Thinking { .. } => {
150 content_types.push("thinking".to_string());
151 }
152 ContentBlock::ToolResult { .. } => {
153 content_types.push("tool_result".to_string());
154 }
155 ContentBlock::Other => {
156 content_types.push("other".to_string());
157 }
158 }
159 }
160 }
161
162 let assistant_text = if text_parts.is_empty() {
163 None
164 } else {
165 let full = text_parts.join("\n");
166 Some(if full.len() > 500 {
167 format!("{}...", &full[..full.floor_char_boundary(500)])
168 } else {
169 full
170 })
171 };
172
173 (content_types, assistant_text, tool_names)
174}
175
176fn dedup_by_request_id(turns: Vec<ValidatedTurn>) -> (Vec<ValidatedTurn>, usize) {
179 let mut result = Vec::with_capacity(turns.len());
180 let mut request_id_index: HashMap<String, usize> = HashMap::new();
181 let mut dup_count = 0;
182
183 for turn in turns {
184 let rid = turn.request_id.clone().unwrap_or_default();
185 if !rid.is_empty() {
186 if let Some(&idx) = request_id_index.get(&rid) {
187 result[idx] = turn;
188 dup_count += 1;
189 continue;
190 }
191 request_id_index.insert(rid, result.len());
192 }
193 result.push(turn);
194 }
195
196 (result, dup_count)
197}
198
199pub fn parse_session_file(path: &Path, is_agent: bool) -> Result<(Vec<ValidatedTurn>, DataQuality)> {
205 let file =
206 File::open(path).with_context(|| format!("failed to open session file: {}", path.display()))?;
207 let reader = BufReader::new(file);
208
209 let mut quality = DataQuality::default();
210 let mut pre_dedup_turns = Vec::new();
211 let now = Utc::now();
212 let mut last_user_text: Option<String> = None;
213
214 for line_result in reader.lines() {
215 let line = line_result.with_context(|| format!("failed to read line from {}", path.display()))?;
216 quality.total_lines += 1;
217
218 let entry = match parse_line(&line) {
220 Some(e) => e,
221 None => {
222 quality.skipped_parse_error += 1;
223 continue;
224 }
225 };
226
227 let msg = match entry {
229 JournalEntry::Assistant(msg) => msg,
230 JournalEntry::User(user_msg) => {
231 if let Some(text) = extract_user_text(&user_msg) {
232 last_user_text = Some(text);
233 }
234 continue;
235 }
236 _ => continue,
237 };
238
239 let fields = match validate_assistant(msg, is_agent, now) {
241 Ok(f) => f,
242 Err(FilterReason::Sidechain) => { quality.skipped_sidechain += 1; continue; }
243 Err(FilterReason::Synthetic) => { quality.skipped_synthetic += 1; continue; }
244 Err(_) => { quality.skipped_invalid += 1; continue; }
245 };
246
247 let (content_types, assistant_text, tool_names) = extract_content(&fields.content);
249
250 pre_dedup_turns.push(ValidatedTurn {
251 uuid: fields.uuid,
252 request_id: fields.request_id,
253 timestamp: fields.timestamp,
254 model: fields.model,
255 usage: fields.usage,
256 stop_reason: fields.stop_reason,
257 content_types,
258 is_agent,
259 agent_id: fields.agent_id,
260 user_text: last_user_text.take(),
261 assistant_text,
262 tool_names,
263 });
264 }
265
266 let (turns, dup_count) = dedup_by_request_id(pre_dedup_turns);
268 quality.duplicate_turns = dup_count;
269 quality.valid_turns = turns.len();
270
271 Ok((turns, quality))
272}
273
// Unit tests for the session-file parser.
//
// NOTE(review): every fixture timestamp is fixed at 2026-03-16, and
// `validate_assistant` rejects timestamps later than `Utc::now()`. Tests that
// expect an accepted turn therefore assume the wall clock is past that date.
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    // A well-formed main-chain assistant entry: uuid "u1", request id "r1",
    // model "claude-opus-4-6", non-zero usage, and one text block "hi".
    const VALID_ASSISTANT: &str = r#"{"type":"assistant","uuid":"u1","timestamp":"2026-03-16T10:00:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":3,"output_tokens":100,"cache_creation_input_tokens":500,"cache_read_input_tokens":10000},"content":[{"type":"text","text":"hi"}]},"sessionId":"s1","cwd":"/tmp","gitBranch":"","userType":"external","isSidechain":false,"parentUuid":null,"requestId":"r1"}"#;

    // Writes each line plus a trailing newline to a fresh temp file. The file
    // is deleted when the returned handle is dropped.
    fn write_jsonl(lines: &[&str]) -> NamedTempFile {
        let mut f = NamedTempFile::new().unwrap();
        for line in lines {
            writeln!(f, "{}", line).unwrap();
        }
        f.flush().unwrap();
        f
    }

    // A single valid entry becomes exactly one non-agent turn with its fields intact.
    #[test]
    fn parse_valid_assistant_turn() {
        let f = write_jsonl(&[VALID_ASSISTANT]);
        let (turns, quality) = parse_session_file(f.path(), false).unwrap();

        assert_eq!(turns.len(), 1);
        assert_eq!(quality.valid_turns, 1);
        assert_eq!(turns[0].model, "claude-opus-4-6");
        assert_eq!(turns[0].uuid, "u1");
        assert!(!turns[0].is_agent);
        assert_eq!(turns[0].content_types, vec!["text"]);
    }

    // The "<synthetic>" model is counted as synthetic, not as invalid. The
    // synthetic check runs before the zero-usage check, so the zero usage
    // here does not change the outcome.
    #[test]
    fn filters_synthetic_messages() {
        let synthetic = r#"{"type":"assistant","uuid":"u1","timestamp":"2026-03-16T10:00:00Z","message":{"model":"<synthetic>","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":0,"output_tokens":0,"cache_creation_input_tokens":0,"cache_read_input_tokens":0},"content":[{"type":"text","text":"hi"}]},"sessionId":"s1","cwd":"/tmp","gitBranch":"","userType":"external","isSidechain":false,"parentUuid":null,"requestId":"r1"}"#;
        let f = write_jsonl(&[synthetic]);
        let (turns, quality) = parse_session_file(f.path(), false).unwrap();

        assert_eq!(turns.len(), 0);
        assert_eq!(quality.skipped_synthetic, 1);
    }

    // All-zero usage on a real model is rejected and counted as invalid.
    #[test]
    fn filters_zero_usage() {
        let zero_usage = r#"{"type":"assistant","uuid":"u1","timestamp":"2026-03-16T10:00:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":0,"output_tokens":0,"cache_creation_input_tokens":0,"cache_read_input_tokens":0},"content":[{"type":"text","text":"hi"}]},"sessionId":"s1","cwd":"/tmp","gitBranch":"","userType":"external","isSidechain":false,"parentUuid":null,"requestId":"r1"}"#;
        let f = write_jsonl(&[zero_usage]);
        let (turns, quality) = parse_session_file(f.path(), false).unwrap();

        assert_eq!(turns.len(), 0);
        assert_eq!(quality.skipped_invalid, 1);
    }

    // Two entries with the same request id collapse into one turn and one duplicate.
    #[test]
    fn deduplicates_turns() {
        let f = write_jsonl(&[VALID_ASSISTANT, VALID_ASSISTANT]);
        let (turns, quality) = parse_session_file(f.path(), false).unwrap();

        assert_eq!(turns.len(), 1);
        assert_eq!(quality.duplicate_turns, 1);
    }

    // A non-JSON line is counted as a parse error without stopping the parse.
    #[test]
    fn skips_malformed_lines() {
        let f = write_jsonl(&["not valid json at all", VALID_ASSISTANT]);
        let (turns, quality) = parse_session_file(f.path(), false).unwrap();

        assert_eq!(turns.len(), 1);
        assert_eq!(quality.skipped_parse_error, 1);
    }

    // Progress, system, and last-prompt entries deserialize (into non-assistant
    // variants) and are ignored. They still count toward `total_lines`.
    #[test]
    fn non_assistant_types_not_counted_as_parse_error() {
        let progress = r#"{"type":"progress","data":{"type":"hook_progress"},"uuid":"u1","timestamp":"2026-03-16T13:51:19.053Z","sessionId":"s1"}"#;
        let system = r#"{"type":"system","subtype":"turn_duration","durationMs":1234,"uuid":"u2","timestamp":"2026-03-16T13:51:19.053Z","sessionId":"s1"}"#;
        let last_prompt = r#"{"type":"last-prompt","lastPrompt":"hello","sessionId":"s1"}"#;
        let f = write_jsonl(&[progress, system, last_prompt, VALID_ASSISTANT]);
        let (turns, quality) = parse_session_file(f.path(), false).unwrap();

        assert_eq!(turns.len(), 1);
        assert_eq!(quality.skipped_parse_error, 0, "known entry types should not be parse errors");
        assert_eq!(quality.total_lines, 4);
    }

    // Thinking blocks are recorded in `content_types` next to text blocks.
    #[test]
    fn parses_thinking_content_blocks() {
        let with_thinking = r#"{"type":"assistant","uuid":"u1","timestamp":"2026-03-16T10:00:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":3,"output_tokens":100,"cache_creation_input_tokens":500,"cache_read_input_tokens":10000},"content":[{"type":"thinking","thinking":"hmm","signature":"sig"},{"type":"text","text":"answer"}]},"sessionId":"s1","cwd":"/tmp","gitBranch":"","userType":"external","isSidechain":false,"parentUuid":null,"requestId":"r1"}"#;
        let f = write_jsonl(&[with_thinking]);
        let (turns, quality) = parse_session_file(f.path(), false).unwrap();

        assert_eq!(turns.len(), 1);
        assert_eq!(quality.valid_turns, 1);
        assert!(turns[0].content_types.contains(&"thinking".to_string()));
        assert!(turns[0].content_types.contains(&"text".to_string()));
    }

    // In a non-agent file, a sidechain entry is dropped and counted, while
    // the main-chain entry survives.
    #[test]
    fn filters_sidechain_turns() {
        let sidechain = r#"{"type":"assistant","uuid":"u2","timestamp":"2026-03-16T10:00:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":3,"output_tokens":100,"cache_creation_input_tokens":500,"cache_read_input_tokens":10000},"content":[{"type":"text","text":"abandoned"}]},"sessionId":"s1","cwd":"/tmp","gitBranch":"","userType":"external","isSidechain":true,"parentUuid":"p1","requestId":"r2"}"#;
        let f = write_jsonl(&[sidechain, VALID_ASSISTANT]);
        let (turns, quality) = parse_session_file(f.path(), false).unwrap();

        assert_eq!(turns.len(), 1, "sidechain turn should be filtered out");
        assert_eq!(quality.skipped_sidechain, 1);
        assert_eq!(turns[0].uuid, "u1", "only main-chain turn should remain");
    }

    // When two turns share a request id, the later turn's contents replace
    // the earlier one.
    #[test]
    fn dedup_preserves_last_entry() {
        let t1 = ValidatedTurn {
            uuid: "u1".into(), request_id: Some("r1".into()),
            timestamp: "2026-03-16T10:00:00Z".parse().unwrap(),
            model: "m".into(), usage: Default::default(), stop_reason: None,
            content_types: vec![], is_agent: false, agent_id: None,
            user_text: None, assistant_text: Some("first".into()), tool_names: vec![],
        };
        let t2 = ValidatedTurn {
            uuid: "u2".into(), request_id: Some("r1".into()),
            timestamp: "2026-03-16T10:00:01Z".parse().unwrap(),
            model: "m".into(), usage: Default::default(), stop_reason: None,
            content_types: vec![], is_agent: false, agent_id: None,
            user_text: None, assistant_text: Some("second".into()), tool_names: vec![],
        };
        let (result, dup) = dedup_by_request_id(vec![t1, t2]);
        assert_eq!(result.len(), 1);
        assert_eq!(dup, 1);
        assert_eq!(result[0].assistant_text.as_deref(), Some("second"));
    }

    // Every `ContentBlock` variant maps to its type tag, in order. Only the
    // Text block contributes text, and only the ToolUse block contributes a
    // tool name.
    #[test]
    fn extract_content_handles_all_types() {
        use super::super::models::ContentBlock;
        let blocks = vec![
            ContentBlock::Text { text: Some("hello".into()) },
            ContentBlock::ToolUse { id: None, name: Some("Bash".into()), input: None },
            ContentBlock::Thinking { thinking: Some("hmm".into()), signature: None },
            ContentBlock::ToolResult { tool_use_id: None, content: None, is_error: None },
            ContentBlock::Other,
        ];
        let (types, text, tools) = extract_content(&Some(blocks));
        assert_eq!(types, vec!["text", "tool_use", "thinking", "tool_result", "other"]);
        assert_eq!(text.as_deref(), Some("hello"));
        assert_eq!(tools, vec!["Bash"]);
    }
}