1use anyhow::{Context, Result};
2use chrono::{DateTime, Utc};
3use std::collections::HashSet;
4use std::fs::File;
5use std::io::{BufRead, BufReader};
6use std::path::Path;
7
8use super::models::{ContentBlock, DataQuality, JournalEntry, ValidatedTurn};
9
10pub fn parse_session_file(path: &Path, is_agent: bool) -> Result<(Vec<ValidatedTurn>, DataQuality)> {
15 let file =
16 File::open(path).with_context(|| format!("failed to open session file: {}", path.display()))?;
17 let reader = BufReader::new(file);
18
19 let mut quality = DataQuality::default();
20 let mut turns = Vec::new();
21 let mut seen_keys = HashSet::new();
22 let now = Utc::now();
23 let mut last_user_text: Option<String> = None;
24
25 for line_result in reader.lines() {
26 let line = line_result.with_context(|| format!("failed to read line from {}", path.display()))?;
27 quality.total_lines += 1;
28
29 let entry: JournalEntry = match serde_json::from_str(&line) {
31 Ok(e) => e,
32 Err(_) => {
33 quality.skipped_parse_error += 1;
34 continue;
35 }
36 };
37
38 let msg = match entry {
40 JournalEntry::Assistant(msg) => msg,
41 JournalEntry::User(user_msg) => {
42 let content_val = user_msg.message.as_ref()
45 .and_then(|m| m.get("content"));
46 if let Some(content) = content_val {
47 let text = if let Some(s) = content.as_str() {
48 s.to_string()
50 } else if let Some(arr) = content.as_array() {
51 arr.iter()
53 .filter_map(|b| {
54 if b.get("type").and_then(|t| t.as_str()) == Some("text") {
55 b.get("text").and_then(|t| t.as_str()).map(|s| s.to_string())
56 } else {
57 None
58 }
59 })
60 .collect::<Vec<_>>()
61 .join("\n")
62 } else {
63 String::new()
64 };
65
66 if !text.is_empty() {
67 let truncated = if text.len() > 500 {
68 format!("{}...", &text[..text.floor_char_boundary(500)])
69 } else {
70 text
71 };
72 last_user_text = Some(truncated);
73 }
74 }
75 continue;
76 }
77 JournalEntry::QueueOperation(_) => continue,
78 };
79
80 let api = match msg.message {
81 Some(api) => api,
82 None => {
83 quality.skipped_invalid += 1;
84 continue;
85 }
86 };
87
88 if !is_agent && msg.is_sidechain == Some(true) {
92 quality.skipped_sidechain += 1;
93 continue;
94 }
95
96 if api.model.as_deref() == Some("<synthetic>") {
98 quality.skipped_synthetic += 1;
99 continue;
100 }
101
102 let model = match api.model {
104 Some(m) => m,
105 None => {
106 quality.skipped_invalid += 1;
107 continue;
108 }
109 };
110
111 let usage = match api.usage {
113 Some(u) => u,
114 None => {
115 quality.skipped_invalid += 1;
116 continue;
117 }
118 };
119
120 let total_tokens = usage.input_tokens.unwrap_or(0)
122 + usage.output_tokens.unwrap_or(0)
123 + usage.cache_creation_input_tokens.unwrap_or(0)
124 + usage.cache_read_input_tokens.unwrap_or(0);
125 if total_tokens == 0 {
126 quality.skipped_invalid += 1;
127 continue;
128 }
129
130 let timestamp_str = match &msg.timestamp {
132 Some(ts) if !ts.is_empty() => ts.as_str(),
133 _ => {
134 quality.skipped_invalid += 1;
135 continue;
136 }
137 };
138 let timestamp: DateTime<Utc> = match timestamp_str.parse() {
139 Ok(ts) if ts <= now => ts,
140 _ => {
141 quality.skipped_invalid += 1;
142 continue;
143 }
144 };
145
146 let uuid = msg.uuid.unwrap_or_default();
148 let dedup_key = format!("{}:{}", uuid, msg.request_id.as_deref().unwrap_or(""));
149 if !seen_keys.insert(dedup_key) {
150 quality.duplicate_turns += 1;
151 continue;
152 }
153
154 let mut content_types = Vec::new();
156 let mut assistant_text_parts = Vec::new();
157 let mut tool_names = Vec::new();
158
159 if let Some(ref blocks) = api.content {
160 for b in blocks {
161 match b {
162 ContentBlock::Text { text } => {
163 content_types.push("text".to_string());
164 if let Some(t) = text {
165 assistant_text_parts.push(t.clone());
166 }
167 }
168 ContentBlock::ToolUse { name, .. } => {
169 content_types.push("tool_use".to_string());
170 if let Some(n) = name {
171 tool_names.push(n.clone());
172 }
173 }
174 ContentBlock::Other => {
175 content_types.push("other".to_string());
176 }
177 }
178 }
179 }
180
181 let assistant_text = if assistant_text_parts.is_empty() {
183 None
184 } else {
185 let full = assistant_text_parts.join("\n");
186 Some(if full.len() > 500 {
187 format!("{}...", &full[..full.floor_char_boundary(500)])
188 } else {
189 full
190 })
191 };
192
193 turns.push(ValidatedTurn {
195 uuid,
196 request_id: msg.request_id,
197 timestamp,
198 model,
199 usage,
200 stop_reason: api.stop_reason,
201 content_types,
202 is_agent,
203 agent_id: msg.agent_id,
204 user_text: last_user_text.take(),
205 assistant_text,
206 tool_names,
207 });
208 }
209
210 quality.valid_turns = turns.len();
211
212 Ok((turns, quality))
213}
214
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    // Fixture timestamps sit well in the past because the parser rejects turns dated after
    // `Utc::now()`. A recent date would make these tests fail on machines whose clock lags.
    const VALID_ASSISTANT: &str = r#"{"type":"assistant","uuid":"u1","timestamp":"2025-01-15T10:00:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":3,"output_tokens":100,"cache_creation_input_tokens":500,"cache_read_input_tokens":10000},"content":[{"type":"text","text":"hi"}]},"sessionId":"s1","cwd":"/tmp","gitBranch":"","userType":"external","isSidechain":false,"parentUuid":null,"requestId":"r1"}"#;

    const SIDECHAIN_ASSISTANT: &str = r#"{"type":"assistant","uuid":"u2","timestamp":"2025-01-15T10:00:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":3,"output_tokens":100,"cache_creation_input_tokens":500,"cache_read_input_tokens":10000},"content":[{"type":"text","text":"abandoned"}]},"sessionId":"s1","cwd":"/tmp","gitBranch":"","userType":"external","isSidechain":true,"parentUuid":"p1","requestId":"r2"}"#;

    /// Writes each fixture to a fresh temp file, one JSON document per line.
    fn write_jsonl(lines: &[&str]) -> NamedTempFile {
        let mut f = NamedTempFile::new().unwrap();
        for line in lines {
            writeln!(f, "{}", line).unwrap();
        }
        f.flush().unwrap();
        f
    }

    #[test]
    fn parse_valid_assistant_turn() {
        let f = write_jsonl(&[VALID_ASSISTANT]);
        let (turns, quality) = parse_session_file(f.path(), false).unwrap();

        assert_eq!(turns.len(), 1);
        assert_eq!(quality.valid_turns, 1);
        assert_eq!(turns[0].model, "claude-opus-4-6");
        assert_eq!(turns[0].uuid, "u1");
        assert!(!turns[0].is_agent);
        assert_eq!(turns[0].content_types, vec!["text"]);
    }

    #[test]
    fn filters_synthetic_messages() {
        let synthetic = r#"{"type":"assistant","uuid":"u1","timestamp":"2025-01-15T10:00:00Z","message":{"model":"<synthetic>","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":0,"output_tokens":0,"cache_creation_input_tokens":0,"cache_read_input_tokens":0},"content":[{"type":"text","text":"hi"}]},"sessionId":"s1","cwd":"/tmp","gitBranch":"","userType":"external","isSidechain":false,"parentUuid":null,"requestId":"r1"}"#;
        let f = write_jsonl(&[synthetic]);
        let (turns, quality) = parse_session_file(f.path(), false).unwrap();

        assert_eq!(turns.len(), 0);
        assert_eq!(quality.skipped_synthetic, 1);
    }

    #[test]
    fn filters_zero_usage() {
        let zero_usage = r#"{"type":"assistant","uuid":"u1","timestamp":"2025-01-15T10:00:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":0,"output_tokens":0,"cache_creation_input_tokens":0,"cache_read_input_tokens":0},"content":[{"type":"text","text":"hi"}]},"sessionId":"s1","cwd":"/tmp","gitBranch":"","userType":"external","isSidechain":false,"parentUuid":null,"requestId":"r1"}"#;
        let f = write_jsonl(&[zero_usage]);
        let (turns, quality) = parse_session_file(f.path(), false).unwrap();

        assert_eq!(turns.len(), 0);
        assert_eq!(quality.skipped_invalid, 1);
    }

    #[test]
    fn rejects_future_timestamps() {
        let future = VALID_ASSISTANT.replace("2025-01-15T10:00:00Z", "2999-01-01T00:00:00Z");
        let f = write_jsonl(&[future.as_str()]);
        let (turns, quality) = parse_session_file(f.path(), false).unwrap();

        assert_eq!(turns.len(), 0);
        assert_eq!(quality.skipped_invalid, 1);
    }

    #[test]
    fn deduplicates_turns() {
        let f = write_jsonl(&[VALID_ASSISTANT, VALID_ASSISTANT]);
        let (turns, quality) = parse_session_file(f.path(), false).unwrap();

        assert_eq!(turns.len(), 1);
        assert_eq!(quality.duplicate_turns, 1);
    }

    #[test]
    fn skips_malformed_lines() {
        let f = write_jsonl(&["not valid json at all", VALID_ASSISTANT]);
        let (turns, quality) = parse_session_file(f.path(), false).unwrap();

        assert_eq!(turns.len(), 1);
        assert_eq!(quality.skipped_parse_error, 1);
    }

    #[test]
    fn filters_sidechain_turns() {
        let f = write_jsonl(&[SIDECHAIN_ASSISTANT, VALID_ASSISTANT]);
        let (turns, quality) = parse_session_file(f.path(), false).unwrap();

        assert_eq!(turns.len(), 1, "sidechain turn should be filtered out");
        assert_eq!(quality.skipped_sidechain, 1);
        assert_eq!(turns[0].uuid, "u1", "only main-chain turn should remain");
    }

    #[test]
    fn keeps_sidechain_turns_for_agent_files() {
        let f = write_jsonl(&[SIDECHAIN_ASSISTANT]);
        let (turns, quality) = parse_session_file(f.path(), true).unwrap();

        assert_eq!(turns.len(), 1, "agent files keep sidechain turns");
        assert_eq!(quality.skipped_sidechain, 0);
        assert!(turns[0].is_agent);
    }

    #[test]
    fn attaches_preceding_user_text_to_next_turn() {
        let user = r#"{"type":"user","uuid":"q1","timestamp":"2025-01-15T09:59:00Z","message":{"role":"user","content":[{"type":"text","text":"first"},{"type":"tool_result","tool_use_id":"t1","content":"ignored"},{"type":"text","text":"second"}]},"sessionId":"s1","cwd":"/tmp","gitBranch":"","userType":"external","isSidechain":false,"parentUuid":null}"#;
        let second = VALID_ASSISTANT
            .replace(r#""uuid":"u1""#, r#""uuid":"u2""#)
            .replace(r#""requestId":"r1""#, r#""requestId":"r2""#);
        let f = write_jsonl(&[user, VALID_ASSISTANT, second.as_str()]);
        let (turns, _) = parse_session_file(f.path(), false).unwrap();

        assert_eq!(turns.len(), 2);
        assert_eq!(turns[0].user_text.as_deref(), Some("first\nsecond"));
        assert!(
            turns[1].user_text.is_none(),
            "user text is consumed by the first turn it attaches to"
        );
    }

    #[test]
    fn truncates_assistant_text_on_char_boundary() {
        // "a" plus 300 two-byte chars is 601 bytes. Byte 500 falls inside an 'é', so the
        // preview backs off to byte 499 before appending "...".
        let long_text = format!("a{}", "é".repeat(300));
        let line = VALID_ASSISTANT.replace(r#""text":"hi""#, &format!(r#""text":"{long_text}""#));
        let f = write_jsonl(&[line.as_str()]);
        let (turns, _) = parse_session_file(f.path(), false).unwrap();

        let preview = turns[0].assistant_text.as_deref().unwrap();
        assert_eq!(preview.len(), 499 + 3);
        assert!(preview.ends_with("..."));
        assert!(long_text.starts_with(&preview[..499]));
    }
}