1use anyhow::{Context, Result};
2use chrono::{DateTime, Utc};
3use std::collections::HashSet;
4use std::fs::File;
5use std::io::{BufRead, BufReader};
6use std::path::Path;
7
8use super::models::{ContentBlock, DataQuality, JournalEntry, ValidatedTurn};
9
10pub fn parse_session_file(path: &Path, is_agent: bool) -> Result<(Vec<ValidatedTurn>, DataQuality)> {
15 let file =
16 File::open(path).with_context(|| format!("failed to open session file: {}", path.display()))?;
17 let reader = BufReader::new(file);
18
19 let mut quality = DataQuality::default();
20 let mut turns = Vec::new();
21 let mut seen_keys = HashSet::new();
22 let now = Utc::now();
23 let mut last_user_text: Option<String> = None;
24
25 for line_result in reader.lines() {
26 let line = line_result.with_context(|| format!("failed to read line from {}", path.display()))?;
27 quality.total_lines += 1;
28
29 let entry: JournalEntry = match serde_json::from_str(&line) {
31 Ok(e) => e,
32 Err(_) => {
33 quality.skipped_parse_error += 1;
34 continue;
35 }
36 };
37
38 let msg = match entry {
40 JournalEntry::Assistant(msg) => msg,
41 JournalEntry::User(user_msg) => {
42 let content_val = user_msg.message.as_ref()
45 .and_then(|m| m.get("content"));
46 if let Some(content) = content_val {
47 let text = if let Some(s) = content.as_str() {
48 s.to_string()
50 } else if let Some(arr) = content.as_array() {
51 arr.iter()
53 .filter_map(|b| {
54 if b.get("type").and_then(|t| t.as_str()) == Some("text") {
55 b.get("text").and_then(|t| t.as_str()).map(|s| s.to_string())
56 } else {
57 None
58 }
59 })
60 .collect::<Vec<_>>()
61 .join("\n")
62 } else {
63 String::new()
64 };
65
66 if !text.is_empty() {
67 let truncated = if text.len() > 500 {
68 format!("{}...", &text[..text.floor_char_boundary(500)])
69 } else {
70 text
71 };
72 last_user_text = Some(truncated);
73 }
74 }
75 continue;
76 }
77 JournalEntry::QueueOperation(_) => continue,
78 };
79
80 let api = match msg.message {
81 Some(api) => api,
82 None => {
83 quality.skipped_invalid += 1;
84 continue;
85 }
86 };
87
88 if api.model.as_deref() == Some("<synthetic>") {
90 quality.skipped_synthetic += 1;
91 continue;
92 }
93
94 let model = match api.model {
96 Some(m) => m,
97 None => {
98 quality.skipped_invalid += 1;
99 continue;
100 }
101 };
102
103 let usage = match api.usage {
105 Some(u) => u,
106 None => {
107 quality.skipped_invalid += 1;
108 continue;
109 }
110 };
111
112 let total_tokens = usage.input_tokens.unwrap_or(0)
114 + usage.output_tokens.unwrap_or(0)
115 + usage.cache_creation_input_tokens.unwrap_or(0)
116 + usage.cache_read_input_tokens.unwrap_or(0);
117 if total_tokens == 0 {
118 quality.skipped_invalid += 1;
119 continue;
120 }
121
122 let timestamp_str = match &msg.timestamp {
124 Some(ts) if !ts.is_empty() => ts.as_str(),
125 _ => {
126 quality.skipped_invalid += 1;
127 continue;
128 }
129 };
130 let timestamp: DateTime<Utc> = match timestamp_str.parse() {
131 Ok(ts) if ts <= now => ts,
132 _ => {
133 quality.skipped_invalid += 1;
134 continue;
135 }
136 };
137
138 let uuid = msg.uuid.unwrap_or_default();
140 let dedup_key = format!("{}:{}", uuid, msg.request_id.as_deref().unwrap_or(""));
141 if !seen_keys.insert(dedup_key) {
142 quality.duplicate_turns += 1;
143 continue;
144 }
145
146 let mut content_types = Vec::new();
148 let mut assistant_text_parts = Vec::new();
149 let mut tool_names = Vec::new();
150
151 if let Some(ref blocks) = api.content {
152 for b in blocks {
153 match b {
154 ContentBlock::Text { text } => {
155 content_types.push("text".to_string());
156 if let Some(t) = text {
157 assistant_text_parts.push(t.clone());
158 }
159 }
160 ContentBlock::ToolUse { name, .. } => {
161 content_types.push("tool_use".to_string());
162 if let Some(n) = name {
163 tool_names.push(n.clone());
164 }
165 }
166 ContentBlock::Other => {
167 content_types.push("other".to_string());
168 }
169 }
170 }
171 }
172
173 let assistant_text = if assistant_text_parts.is_empty() {
175 None
176 } else {
177 let full = assistant_text_parts.join("\n");
178 Some(if full.len() > 500 {
179 format!("{}...", &full[..full.floor_char_boundary(500)])
180 } else {
181 full
182 })
183 };
184
185 turns.push(ValidatedTurn {
187 uuid,
188 request_id: msg.request_id,
189 timestamp,
190 model,
191 usage,
192 stop_reason: api.stop_reason,
193 content_types,
194 is_agent,
195 agent_id: msg.agent_id,
196 user_text: last_user_text.take(),
197 assistant_text,
198 tool_names,
199 });
200 }
201
202 quality.valid_turns = turns.len();
203
204 Ok((turns, quality))
205}
206
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    const VALID_ASSISTANT: &str = r#"{"type":"assistant","uuid":"u1","timestamp":"2026-03-16T10:00:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":3,"output_tokens":100,"cache_creation_input_tokens":500,"cache_read_input_tokens":10000},"content":[{"type":"text","text":"hi"}]},"sessionId":"s1","cwd":"/tmp","gitBranch":"","userType":"external","isSidechain":false,"parentUuid":null,"requestId":"r1"}"#;

    /// Writes `lines` to a temporary file, one per line.
    fn write_jsonl(lines: &[&str]) -> NamedTempFile {
        let mut f = NamedTempFile::new().unwrap();
        for line in lines {
            writeln!(f, "{}", line).unwrap();
        }
        f.flush().unwrap();
        f
    }

    /// Writes `lines` to a temporary file and parses it as a non-agent session.
    fn parse(lines: &[&str]) -> (Vec<ValidatedTurn>, DataQuality) {
        let f = write_jsonl(lines);
        parse_session_file(f.path(), false).unwrap()
    }

    #[test]
    fn parse_valid_assistant_turn() {
        let (turns, quality) = parse(&[VALID_ASSISTANT]);

        assert_eq!(turns.len(), 1);
        assert_eq!(quality.valid_turns, 1);
        assert_eq!(quality.total_lines, 1);
        assert_eq!(turns[0].model, "claude-opus-4-6");
        assert_eq!(turns[0].uuid, "u1");
        assert_eq!(turns[0].request_id.as_deref(), Some("r1"));
        assert!(!turns[0].is_agent);
        assert_eq!(turns[0].content_types, vec!["text"]);
        assert_eq!(turns[0].assistant_text.as_deref(), Some("hi"));
        assert!(turns[0].user_text.is_none());
        assert!(turns[0].tool_names.is_empty());
    }

    #[test]
    fn filters_synthetic_messages() {
        let synthetic = r#"{"type":"assistant","uuid":"u1","timestamp":"2026-03-16T10:00:00Z","message":{"model":"<synthetic>","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":0,"output_tokens":0,"cache_creation_input_tokens":0,"cache_read_input_tokens":0},"content":[{"type":"text","text":"hi"}]},"sessionId":"s1","cwd":"/tmp","gitBranch":"","userType":"external","isSidechain":false,"parentUuid":null,"requestId":"r1"}"#;
        let (turns, quality) = parse(&[synthetic]);

        assert_eq!(turns.len(), 0);
        assert_eq!(quality.skipped_synthetic, 1);
    }

    #[test]
    fn filters_zero_usage() {
        let zero_usage = r#"{"type":"assistant","uuid":"u1","timestamp":"2026-03-16T10:00:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":0,"output_tokens":0,"cache_creation_input_tokens":0,"cache_read_input_tokens":0},"content":[{"type":"text","text":"hi"}]},"sessionId":"s1","cwd":"/tmp","gitBranch":"","userType":"external","isSidechain":false,"parentUuid":null,"requestId":"r1"}"#;
        let (turns, quality) = parse(&[zero_usage]);

        assert_eq!(turns.len(), 0);
        assert_eq!(quality.skipped_invalid, 1);
    }

    #[test]
    fn rejects_future_timestamp() {
        let future = VALID_ASSISTANT.replace("2026-03-16T10:00:00Z", "2999-01-01T00:00:00Z");
        let (turns, quality) = parse(&[future.as_str()]);

        assert_eq!(turns.len(), 0);
        assert_eq!(quality.skipped_invalid, 1);
    }

    #[test]
    fn rejects_missing_timestamp() {
        let no_ts = VALID_ASSISTANT.replace(r#""timestamp":"2026-03-16T10:00:00Z","#, "");
        let (turns, quality) = parse(&[no_ts.as_str()]);

        assert_eq!(turns.len(), 0);
        assert_eq!(quality.skipped_invalid, 1);
    }

    #[test]
    fn deduplicates_turns() {
        let (turns, quality) = parse(&[VALID_ASSISTANT, VALID_ASSISTANT]);

        assert_eq!(turns.len(), 1);
        assert_eq!(quality.duplicate_turns, 1);
    }

    #[test]
    fn same_uuid_with_different_request_id_is_not_duplicate() {
        let other_request =
            VALID_ASSISTANT.replace(r#""requestId":"r1""#, r#""requestId":"r2""#);
        let (turns, quality) = parse(&[VALID_ASSISTANT, other_request.as_str()]);

        assert_eq!(turns.len(), 2);
        assert_eq!(quality.duplicate_turns, 0);
    }

    #[test]
    fn truncates_long_assistant_text_on_char_boundary() {
        // "€" is 3 bytes, so 500 bytes falls mid-character; the cut must back
        // off to 498 bytes (166 characters).
        let long = VALID_ASSISTANT.replace(
            r#""text":"hi""#,
            &format!(r#""text":"{}""#, "€".repeat(200)),
        );
        let (turns, _) = parse(&[long.as_str()]);

        let expected = format!("{}...", "€".repeat(166));
        assert_eq!(turns[0].assistant_text.as_deref(), Some(expected.as_str()));
    }

    #[test]
    fn skips_malformed_lines() {
        let (turns, quality) = parse(&["not valid json at all", VALID_ASSISTANT]);

        assert_eq!(turns.len(), 1);
        assert_eq!(quality.skipped_parse_error, 1);
        assert_eq!(quality.total_lines, 2);
    }

    #[test]
    fn missing_file_is_error() {
        let f = NamedTempFile::new().unwrap();
        let path = f.path().to_path_buf();
        // Dropping the handle deletes the file, leaving a path that cannot be opened.
        drop(f);

        assert!(parse_session_file(&path, false).is_err());
    }
}