1use anyhow::{Context, Result};
2use chrono::{DateTime, Utc};
3use std::collections::HashSet;
4use std::fs::File;
5use std::io::{BufRead, BufReader};
6use std::path::Path;
7
8use super::models::{ContentBlock, DataQuality, JournalEntry, ValidatedTurn};
9
10pub fn parse_session_file(path: &Path, is_agent: bool) -> Result<(Vec<ValidatedTurn>, DataQuality)> {
15 let file =
16 File::open(path).with_context(|| format!("failed to open session file: {}", path.display()))?;
17 let reader = BufReader::new(file);
18
19 let mut quality = DataQuality::default();
20 let mut turns = Vec::new();
21 let mut seen_keys = HashSet::new();
22 let now = Utc::now();
23 let mut last_user_text: Option<String> = None;
24
25 for line_result in reader.lines() {
26 let line = line_result.with_context(|| format!("failed to read line from {}", path.display()))?;
27 quality.total_lines += 1;
28
29 let entry: JournalEntry = match serde_json::from_str(&line) {
31 Ok(e) => e,
32 Err(_) => {
33 quality.skipped_parse_error += 1;
34 continue;
35 }
36 };
37
38 let msg = match entry {
40 JournalEntry::Assistant(msg) => msg,
41 JournalEntry::User(user_msg) => {
42 let content_val = user_msg.message.as_ref()
45 .and_then(|m| m.get("content"));
46 if let Some(content) = content_val {
47 let text = if let Some(s) = content.as_str() {
48 s.to_string()
50 } else if let Some(arr) = content.as_array() {
51 arr.iter()
53 .filter_map(|b| {
54 if b.get("type").and_then(|t| t.as_str()) == Some("text") {
55 b.get("text").and_then(|t| t.as_str()).map(|s| s.to_string())
56 } else {
57 None
58 }
59 })
60 .collect::<Vec<_>>()
61 .join("\n")
62 } else {
63 String::new()
64 };
65
66 if !text.is_empty() {
67 let truncated = if text.len() > 500 {
68 format!("{}...", &text[..text.floor_char_boundary(500)])
69 } else {
70 text
71 };
72 last_user_text = Some(truncated);
73 }
74 }
75 continue;
76 }
77 JournalEntry::QueueOperation(_) => continue,
78 };
79
80 let api = match msg.message {
81 Some(api) => api,
82 None => {
83 quality.skipped_invalid += 1;
84 continue;
85 }
86 };
87
88 if msg.is_sidechain == Some(true) {
90 quality.skipped_sidechain += 1;
91 continue;
92 }
93
94 if api.model.as_deref() == Some("<synthetic>") {
96 quality.skipped_synthetic += 1;
97 continue;
98 }
99
100 let model = match api.model {
102 Some(m) => m,
103 None => {
104 quality.skipped_invalid += 1;
105 continue;
106 }
107 };
108
109 let usage = match api.usage {
111 Some(u) => u,
112 None => {
113 quality.skipped_invalid += 1;
114 continue;
115 }
116 };
117
118 let total_tokens = usage.input_tokens.unwrap_or(0)
120 + usage.output_tokens.unwrap_or(0)
121 + usage.cache_creation_input_tokens.unwrap_or(0)
122 + usage.cache_read_input_tokens.unwrap_or(0);
123 if total_tokens == 0 {
124 quality.skipped_invalid += 1;
125 continue;
126 }
127
128 let timestamp_str = match &msg.timestamp {
130 Some(ts) if !ts.is_empty() => ts.as_str(),
131 _ => {
132 quality.skipped_invalid += 1;
133 continue;
134 }
135 };
136 let timestamp: DateTime<Utc> = match timestamp_str.parse() {
137 Ok(ts) if ts <= now => ts,
138 _ => {
139 quality.skipped_invalid += 1;
140 continue;
141 }
142 };
143
144 let uuid = msg.uuid.unwrap_or_default();
146 let dedup_key = format!("{}:{}", uuid, msg.request_id.as_deref().unwrap_or(""));
147 if !seen_keys.insert(dedup_key) {
148 quality.duplicate_turns += 1;
149 continue;
150 }
151
152 let mut content_types = Vec::new();
154 let mut assistant_text_parts = Vec::new();
155 let mut tool_names = Vec::new();
156
157 if let Some(ref blocks) = api.content {
158 for b in blocks {
159 match b {
160 ContentBlock::Text { text } => {
161 content_types.push("text".to_string());
162 if let Some(t) = text {
163 assistant_text_parts.push(t.clone());
164 }
165 }
166 ContentBlock::ToolUse { name, .. } => {
167 content_types.push("tool_use".to_string());
168 if let Some(n) = name {
169 tool_names.push(n.clone());
170 }
171 }
172 ContentBlock::Other => {
173 content_types.push("other".to_string());
174 }
175 }
176 }
177 }
178
179 let assistant_text = if assistant_text_parts.is_empty() {
181 None
182 } else {
183 let full = assistant_text_parts.join("\n");
184 Some(if full.len() > 500 {
185 format!("{}...", &full[..full.floor_char_boundary(500)])
186 } else {
187 full
188 })
189 };
190
191 turns.push(ValidatedTurn {
193 uuid,
194 request_id: msg.request_id,
195 timestamp,
196 model,
197 usage,
198 stop_reason: api.stop_reason,
199 content_types,
200 is_agent,
201 agent_id: msg.agent_id,
202 user_text: last_user_text.take(),
203 assistant_text,
204 tool_names,
205 });
206 }
207
208 quality.valid_turns = turns.len();
209
210 Ok((turns, quality))
211}
212
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// A well-formed main-chain assistant entry with non-zero usage and a past timestamp.
    const VALID_ASSISTANT: &str = r#"{"type":"assistant","uuid":"u1","timestamp":"2026-03-16T10:00:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":3,"output_tokens":100,"cache_creation_input_tokens":500,"cache_read_input_tokens":10000},"content":[{"type":"text","text":"hi"}]},"sessionId":"s1","cwd":"/tmp","gitBranch":"","userType":"external","isSidechain":false,"parentUuid":null,"requestId":"r1"}"#;

    /// Writes `lines` to a temporary JSONL file, one entry per line.
    fn write_jsonl(lines: &[&str]) -> NamedTempFile {
        let mut f = NamedTempFile::new().unwrap();
        for line in lines {
            writeln!(f, "{}", line).unwrap();
        }
        f.flush().unwrap();
        f
    }

    #[test]
    fn parse_valid_assistant_turn() {
        let f = write_jsonl(&[VALID_ASSISTANT]);
        let (turns, quality) = parse_session_file(f.path(), false).unwrap();

        assert_eq!(turns.len(), 1);
        assert_eq!(quality.valid_turns, 1);
        assert_eq!(turns[0].model, "claude-opus-4-6");
        assert_eq!(turns[0].uuid, "u1");
        assert!(!turns[0].is_agent);
        assert_eq!(turns[0].content_types, vec!["text"]);
    }

    #[test]
    fn filters_synthetic_messages() {
        let synthetic = r#"{"type":"assistant","uuid":"u1","timestamp":"2026-03-16T10:00:00Z","message":{"model":"<synthetic>","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":0,"output_tokens":0,"cache_creation_input_tokens":0,"cache_read_input_tokens":0},"content":[{"type":"text","text":"hi"}]},"sessionId":"s1","cwd":"/tmp","gitBranch":"","userType":"external","isSidechain":false,"parentUuid":null,"requestId":"r1"}"#;
        let f = write_jsonl(&[synthetic]);
        let (turns, quality) = parse_session_file(f.path(), false).unwrap();

        assert_eq!(turns.len(), 0);
        assert_eq!(quality.skipped_synthetic, 1);
    }

    #[test]
    fn filters_zero_usage() {
        let zero_usage = r#"{"type":"assistant","uuid":"u1","timestamp":"2026-03-16T10:00:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":0,"output_tokens":0,"cache_creation_input_tokens":0,"cache_read_input_tokens":0},"content":[{"type":"text","text":"hi"}]},"sessionId":"s1","cwd":"/tmp","gitBranch":"","userType":"external","isSidechain":false,"parentUuid":null,"requestId":"r1"}"#;
        let f = write_jsonl(&[zero_usage]);
        let (turns, quality) = parse_session_file(f.path(), false).unwrap();

        assert_eq!(turns.len(), 0);
        assert_eq!(quality.skipped_invalid, 1);
    }

    #[test]
    fn deduplicates_turns() {
        let f = write_jsonl(&[VALID_ASSISTANT, VALID_ASSISTANT]);
        let (turns, quality) = parse_session_file(f.path(), false).unwrap();

        assert_eq!(turns.len(), 1);
        assert_eq!(quality.duplicate_turns, 1);
    }

    #[test]
    fn skips_malformed_lines() {
        let f = write_jsonl(&["not valid json at all", VALID_ASSISTANT]);
        let (turns, quality) = parse_session_file(f.path(), false).unwrap();

        assert_eq!(turns.len(), 1);
        assert_eq!(quality.skipped_parse_error, 1);
    }

    #[test]
    fn filters_sidechain_turns() {
        let sidechain = r#"{"type":"assistant","uuid":"u2","timestamp":"2026-03-16T10:00:00Z","message":{"model":"claude-opus-4-6","role":"assistant","stop_reason":"end_turn","usage":{"input_tokens":3,"output_tokens":100,"cache_creation_input_tokens":500,"cache_read_input_tokens":10000},"content":[{"type":"text","text":"abandoned"}]},"sessionId":"s1","cwd":"/tmp","gitBranch":"","userType":"external","isSidechain":true,"parentUuid":"p1","requestId":"r2"}"#;
        let f = write_jsonl(&[sidechain, VALID_ASSISTANT]);
        let (turns, quality) = parse_session_file(f.path(), false).unwrap();

        assert_eq!(turns.len(), 1, "sidechain turn should be filtered out");
        assert_eq!(quality.skipped_sidechain, 1);
        assert_eq!(turns[0].uuid, "u1", "only main-chain turn should remain");
    }

    #[test]
    fn keeps_assistant_text_and_agent_flag() {
        let f = write_jsonl(&[VALID_ASSISTANT]);
        let (turns, _) = parse_session_file(f.path(), true).unwrap();

        assert_eq!(turns.len(), 1);
        assert!(turns[0].is_agent);
        assert_eq!(turns[0].assistant_text.as_deref(), Some("hi"));
        assert!(turns[0].user_text.is_none(), "no preceding user entry");
    }

    #[test]
    fn truncates_long_assistant_text_on_char_boundary() {
        // One ASCII byte followed by two-byte characters puts byte 500 mid-character,
        // so the preview must back off to byte 499.
        let long = format!("a{}", "é".repeat(300));
        let line = VALID_ASSISTANT.replace(r#""text":"hi""#, &format!(r#""text":"{}""#, long));
        let f = write_jsonl(&[line.as_str()]);
        let (turns, _) = parse_session_file(f.path(), false).unwrap();

        assert_eq!(turns.len(), 1);
        assert_eq!(turns[0].assistant_text, Some(format!("a{}...", "é".repeat(249))));
    }

    #[test]
    fn rejects_future_timestamps() {
        let future = VALID_ASSISTANT.replace("2026-03-16T10:00:00Z", "2999-01-01T00:00:00Z");
        let f = write_jsonl(&[future.as_str()]);
        let (turns, quality) = parse_session_file(f.path(), false).unwrap();

        assert_eq!(turns.len(), 0);
        assert_eq!(quality.skipped_invalid, 1);
    }
}