1use anyhow::{Context, Result};
2use chrono::{DateTime, Utc};
3use std::collections::HashMap;
4use std::fs::File;
5use std::io::{BufRead, BufReader};
6use std::path::Path;
7
8use super::models::{ContentBlock, DataQuality, JournalEntry, ValidatedTurn};
9
10pub fn parse_session_file(path: &Path, is_agent: bool) -> Result<(Vec<ValidatedTurn>, DataQuality)> {
15 let file =
16 File::open(path).with_context(|| format!("failed to open session file: {}", path.display()))?;
17 let reader = BufReader::new(file);
18
19 let mut quality = DataQuality::default();
20 let mut turns = Vec::new();
21 let mut request_id_index: HashMap<String, usize> = HashMap::new();
22 let now = Utc::now();
23 let mut last_user_text: Option<String> = None;
24
25 for line_result in reader.lines() {
26 let line = line_result.with_context(|| format!("failed to read line from {}", path.display()))?;
27 quality.total_lines += 1;
28
29 let entry: JournalEntry = match serde_json::from_str(&line) {
31 Ok(e) => e,
32 Err(_) => {
33 quality.skipped_parse_error += 1;
34 continue;
35 }
36 };
37
38 let msg = match entry {
40 JournalEntry::Assistant(msg) => msg,
41 JournalEntry::User(user_msg) => {
42 let content_val = user_msg.message.as_ref()
45 .and_then(|m| m.get("content"));
46 if let Some(content) = content_val {
47 let text = if let Some(s) = content.as_str() {
48 s.to_string()
50 } else if let Some(arr) = content.as_array() {
51 arr.iter()
53 .filter_map(|b| {
54 if b.get("type").and_then(|t| t.as_str()) == Some("text") {
55 b.get("text").and_then(|t| t.as_str()).map(|s| s.to_string())
56 } else {
57 None
58 }
59 })
60 .collect::<Vec<_>>()
61 .join("\n")
62 } else {
63 String::new()
64 };
65
66 if !text.is_empty() {
67 let truncated = if text.len() > 500 {
68 format!("{}...", &text[..text.floor_char_boundary(500)])
69 } else {
70 text
71 };
72 last_user_text = Some(truncated);
73 }
74 }
75 continue;
76 }
77 JournalEntry::QueueOperation(_) => continue,
78 };
79
80 let api = match msg.message {
81 Some(api) => api,
82 None => {
83 quality.skipped_invalid += 1;
84 continue;
85 }
86 };
87
88 if !is_agent && msg.is_sidechain == Some(true) {
92 quality.skipped_sidechain += 1;
93 continue;
94 }
95
96 if api.model.as_deref() == Some("<synthetic>") {
98 quality.skipped_synthetic += 1;
99 continue;
100 }
101
102 let model = match api.model {
104 Some(m) => m,
105 None => {
106 quality.skipped_invalid += 1;
107 continue;
108 }
109 };
110
111 let usage = match api.usage {
113 Some(u) => u,
114 None => {
115 quality.skipped_invalid += 1;
116 continue;
117 }
118 };
119
120 let total_tokens = usage.input_tokens.unwrap_or(0)
122 + usage.output_tokens.unwrap_or(0)
123 + usage.cache_creation_input_tokens.unwrap_or(0)
124 + usage.cache_read_input_tokens.unwrap_or(0);
125 if total_tokens == 0 {
126 quality.skipped_invalid += 1;
127 continue;
128 }
129
130 let timestamp_str = match &msg.timestamp {
132 Some(ts) if !ts.is_empty() => ts.as_str(),
133 _ => {
134 quality.skipped_invalid += 1;
135 continue;
136 }
137 };
138 let timestamp: DateTime<Utc> = match timestamp_str.parse() {
139 Ok(ts) if ts <= now => ts,
140 _ => {
141 quality.skipped_invalid += 1;
142 continue;
143 }
144 };
145
146 let uuid = msg.uuid.unwrap_or_default();
151 let request_id_key = msg.request_id.clone().unwrap_or_default();
152
153 let mut content_types = Vec::new();
155 let mut assistant_text_parts = Vec::new();
156 let mut tool_names = Vec::new();
157
158 if let Some(ref blocks) = api.content {
159 for b in blocks {
160 match b {
161 ContentBlock::Text { text } => {
162 content_types.push("text".to_string());
163 if let Some(t) = text {
164 assistant_text_parts.push(t.clone());
165 }
166 }
167 ContentBlock::ToolUse { name, .. } => {
168 content_types.push("tool_use".to_string());
169 if let Some(n) = name {
170 tool_names.push(n.clone());
171 }
172 }
173 ContentBlock::Other => {
174 content_types.push("other".to_string());
175 }
176 }
177 }
178 }
179
180 let assistant_text = if assistant_text_parts.is_empty() {
182 None
183 } else {
184 let full = assistant_text_parts.join("\n");
185 Some(if full.len() > 500 {
186 format!("{}...", &full[..full.floor_char_boundary(500)])
187 } else {
188 full
189 })
190 };
191
192 let turn = ValidatedTurn {
194 uuid,
195 request_id: msg.request_id,
196 timestamp,
197 model,
198 usage,
199 stop_reason: api.stop_reason,
200 content_types,
201 is_agent,
202 agent_id: msg.agent_id,
203 user_text: last_user_text.take(),
204 assistant_text,
205 tool_names,
206 };
207
208 if !request_id_key.is_empty() {
210 if let Some(&idx) = request_id_index.get(&request_id_key) {
211 turns[idx] = turn;
212 quality.duplicate_turns += 1;
213 continue;
214 }
215 request_id_index.insert(request_id_key, turns.len());
216 }
217 turns.push(turn);
218 }
219
220 quality.valid_turns = turns.len();
221
222 Ok((turns, quality))
223}
224
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// A well-formed main-chain assistant entry with non-zero usage and request id `r1`.
    const VALID_ASSISTANT: &str = r#"{"type":"assistant","uuid":"u1","timestamp":"2026-03-16T10:00:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":3,"output_tokens":100,"cache_creation_input_tokens":500,"cache_read_input_tokens":10000},"content":[{"type":"text","text":"hi"}]},"sessionId":"s1","cwd":"/tmp","gitBranch":"","userType":"external","isSidechain":false,"parentUuid":null,"requestId":"r1"}"#;

    /// Writes each entry of `lines` on its own line into a fresh temporary file.
    fn write_jsonl(lines: &[&str]) -> NamedTempFile {
        let mut tmp = NamedTempFile::new().unwrap();
        lines
            .iter()
            .for_each(|entry| writeln!(tmp, "{}", entry).unwrap());
        tmp.flush().unwrap();
        tmp
    }

    /// Writes `lines` to a temporary file and parses it as a non-agent session.
    fn parse_lines(lines: &[&str]) -> (Vec<ValidatedTurn>, DataQuality) {
        let fixture = write_jsonl(lines);
        parse_session_file(fixture.path(), false).unwrap()
    }

    #[test]
    fn parse_valid_assistant_turn() {
        let (parsed, stats) = parse_lines(&[VALID_ASSISTANT]);

        assert_eq!(parsed.len(), 1);
        assert_eq!(stats.valid_turns, 1);

        let first = &parsed[0];
        assert_eq!(first.model, "claude-opus-4-6");
        assert_eq!(first.uuid, "u1");
        assert!(!first.is_agent);
        assert_eq!(first.content_types, vec!["text"]);
    }

    #[test]
    fn filters_synthetic_messages() {
        let synthetic = r#"{"type":"assistant","uuid":"u1","timestamp":"2026-03-16T10:00:00Z","message":{"model":"<synthetic>","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":0,"output_tokens":0,"cache_creation_input_tokens":0,"cache_read_input_tokens":0},"content":[{"type":"text","text":"hi"}]},"sessionId":"s1","cwd":"/tmp","gitBranch":"","userType":"external","isSidechain":false,"parentUuid":null,"requestId":"r1"}"#;
        let (parsed, stats) = parse_lines(&[synthetic]);

        assert_eq!(parsed.len(), 0);
        assert_eq!(stats.skipped_synthetic, 1);
    }

    #[test]
    fn filters_zero_usage() {
        let zero_usage = r#"{"type":"assistant","uuid":"u1","timestamp":"2026-03-16T10:00:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":0,"output_tokens":0,"cache_creation_input_tokens":0,"cache_read_input_tokens":0},"content":[{"type":"text","text":"hi"}]},"sessionId":"s1","cwd":"/tmp","gitBranch":"","userType":"external","isSidechain":false,"parentUuid":null,"requestId":"r1"}"#;
        let (parsed, stats) = parse_lines(&[zero_usage]);

        assert_eq!(parsed.len(), 0);
        assert_eq!(stats.skipped_invalid, 1);
    }

    #[test]
    fn deduplicates_turns() {
        // The same entry twice shares request id `r1`, so only one turn survives.
        let (parsed, stats) = parse_lines(&[VALID_ASSISTANT, VALID_ASSISTANT]);

        assert_eq!(parsed.len(), 1);
        assert_eq!(stats.duplicate_turns, 1);
    }

    #[test]
    fn skips_malformed_lines() {
        let (parsed, stats) = parse_lines(&["not valid json at all", VALID_ASSISTANT]);

        assert_eq!(parsed.len(), 1);
        assert_eq!(stats.skipped_parse_error, 1);
    }

    #[test]
    fn filters_sidechain_turns() {
        let sidechain = r#"{"type":"assistant","uuid":"u2","timestamp":"2026-03-16T10:00:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":3,"output_tokens":100,"cache_creation_input_tokens":500,"cache_read_input_tokens":10000},"content":[{"type":"text","text":"abandoned"}]},"sessionId":"s1","cwd":"/tmp","gitBranch":"","userType":"external","isSidechain":true,"parentUuid":"p1","requestId":"r2"}"#;
        let (parsed, stats) = parse_lines(&[sidechain, VALID_ASSISTANT]);

        assert_eq!(parsed.len(), 1, "sidechain turn should be filtered out");
        assert_eq!(stats.skipped_sidechain, 1);
        assert_eq!(parsed[0].uuid, "u1", "only main-chain turn should remain");
    }
}