1use crate::cursor::TranscriptCursor;
2use crate::filter::{classify_record, update_progress_last, FilterAction};
3use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5use std::io::{Read, Seek, SeekFrom, Write};
6use std::path::Path;
7
8const DEFAULT_MAX_BYTES: u64 = 4 * 1024 * 1024; pub type IndexWriterFn = dyn Fn(&str, u64, u64, &serde_json::Value) -> anyhow::Result<()>;
12
/// Summary of a single `ingest_transcript_delta` pass over one session transcript.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct IngestStats {
    /// Non-empty JSONL lines examined in this pass (parse attempts).
    pub records_read: usize,
    /// Records classified `Keep` and appended to the per-session store file.
    pub records_kept: usize,
    /// Records not stored: filtered out, progress-only, or unparseable.
    pub records_dropped: usize,
    /// Bytes of complete lines consumed from the transcript this pass.
    pub bytes_read: u64,
    /// Kept-record counts keyed by the record's `"type"` field (`"unknown"` if absent).
    pub kept_by_type: HashMap<String, usize>,
    /// Dropped-record counts keyed by `"type"` (plus the synthetic `"parse_error"` key).
    pub dropped_by_type: HashMap<String, usize>,
    /// Transcript byte offset where this pass started reading.
    pub from_offset: u64,
    /// Transcript byte offset the cursor advanced to after this pass.
    pub to_offset: u64,
}
24
25pub fn ingest_transcript_delta(
34 project_dir: &Path,
35 session_id: &str,
36 transcript_path: &Path,
37 index_writer: Option<&IndexWriterFn>,
38) -> anyhow::Result<IngestStats> {
39 let state_dir = project_dir.join("state");
40 std::fs::create_dir_all(&state_dir)?;
41
42 let lock_path = state_dir.join(format!("ingest.{session_id}.lock"));
44 let _lock = edda_store::lock_file(&lock_path)?;
45
46 let mut cursor = TranscriptCursor::load(&state_dir, session_id)?.unwrap_or(TranscriptCursor {
48 offset: 0,
49 file_size: 0,
50 mtime_unix: 0,
51 updated_at_unix: 0,
52 });
53
54 let meta = std::fs::metadata(transcript_path)?;
56 let file_size = meta.len();
57
58 cursor.detect_truncation(file_size);
60
61 if cursor.offset >= file_size {
62 return Ok(IngestStats {
64 records_read: 0,
65 records_kept: 0,
66 records_dropped: 0,
67 bytes_read: 0,
68 kept_by_type: HashMap::new(),
69 dropped_by_type: HashMap::new(),
70 from_offset: cursor.offset,
71 to_offset: cursor.offset,
72 });
73 }
74
75 let max_bytes: u64 = std::env::var("EDDA_TRANSCRIPT_MAX_BYTES")
76 .ok()
77 .and_then(|v| v.parse().ok())
78 .unwrap_or(DEFAULT_MAX_BYTES);
79
80 let mut file = std::fs::File::open(transcript_path)?;
82 file.seek(SeekFrom::Start(cursor.offset))?;
83
84 let bytes_to_read = (file_size - cursor.offset).min(max_bytes);
85 let mut buf = vec![0u8; bytes_to_read as usize];
86 let actually_read = file.read(&mut buf)?;
87 buf.truncate(actually_read);
88
89 let consumable_len = match buf.iter().rposition(|&b| b == b'\n') {
91 Some(pos) => pos + 1,
92 None => 0, };
94
95 if consumable_len == 0 {
96 return Ok(IngestStats {
97 records_read: 0,
98 records_kept: 0,
99 records_dropped: 0,
100 bytes_read: 0,
101 kept_by_type: HashMap::new(),
102 dropped_by_type: HashMap::new(),
103 from_offset: cursor.offset,
104 to_offset: cursor.offset,
105 });
106 }
107
108 let from_offset = cursor.offset;
109 let data = &buf[..consumable_len];
110
111 let transcripts_dir = project_dir.join("transcripts");
113 std::fs::create_dir_all(&transcripts_dir)?;
114 let store_path = transcripts_dir.join(format!("{session_id}.jsonl"));
115 let mut store_file = std::fs::OpenOptions::new()
116 .create(true)
117 .append(true)
118 .open(&store_path)?;
119
120 let progress_path = state_dir.join(format!("progress_last.{session_id}.json"));
122 let mut progress_map: HashMap<String, serde_json::Value> = if progress_path.exists() {
123 let content = std::fs::read_to_string(&progress_path)?;
124 serde_json::from_str(&content).unwrap_or_default()
125 } else {
126 HashMap::new()
127 };
128
129 let mut stats = IngestStats {
130 records_read: 0,
131 records_kept: 0,
132 records_dropped: 0,
133 bytes_read: consumable_len as u64,
134 kept_by_type: HashMap::new(),
135 dropped_by_type: HashMap::new(),
136 from_offset,
137 to_offset: from_offset + consumable_len as u64,
138 };
139
140 for raw_line in data.split(|&b| b == b'\n') {
142 if raw_line.is_empty() {
143 continue;
144 }
145
146 stats.records_read += 1;
147
148 let parsed: serde_json::Value = match serde_json::from_slice(raw_line) {
149 Ok(v) => v,
150 Err(_) => {
151 stats.records_dropped += 1;
152 *stats
153 .dropped_by_type
154 .entry("parse_error".into())
155 .or_insert(0) += 1;
156 continue;
157 }
158 };
159
160 let record_type = parsed
161 .get("type")
162 .and_then(|v| v.as_str())
163 .unwrap_or("unknown")
164 .to_string();
165
166 match classify_record(&parsed) {
167 FilterAction::Keep => {
168 let store_offset = store_file.seek(SeekFrom::End(0)).unwrap_or(0);
170
171 store_file.write_all(raw_line)?;
173 store_file.write_all(b"\n")?;
174
175 let store_len = raw_line.len() as u64 + 1; if let Some(writer) = index_writer {
179 let raw_str = std::str::from_utf8(raw_line).unwrap_or("");
180 writer(raw_str, store_offset, store_len, &parsed)?;
181 }
182
183 stats.records_kept += 1;
184 *stats.kept_by_type.entry(record_type).or_insert(0) += 1;
185 }
186 FilterAction::Progress => {
187 update_progress_last(&mut progress_map, &parsed);
188 stats.records_dropped += 1;
189 *stats.dropped_by_type.entry(record_type).or_insert(0) += 1;
190 }
191 FilterAction::Drop => {
192 stats.records_dropped += 1;
193 *stats.dropped_by_type.entry(record_type).or_insert(0) += 1;
194 }
195 }
196 }
197
198 let progress_json = serde_json::to_string_pretty(&progress_map)?;
200 edda_store::write_atomic(&progress_path, progress_json.as_bytes())?;
201
202 cursor.offset = stats.to_offset;
204 cursor.file_size = file_size;
205 cursor.updated_at_unix = std::time::SystemTime::now()
206 .duration_since(std::time::UNIX_EPOCH)
207 .map(|d| d.as_secs() as i64)
208 .unwrap_or(0);
209 cursor.save(&state_dir, session_id)?;
210
211 Ok(stats)
212}
213
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;

    /// Write the given JSONL lines (newline-terminated) to a transcript file.
    fn write_transcript(dir: &Path, lines: &[&str]) -> std::path::PathBuf {
        let path = dir.join("transcript.jsonl");
        let mut body = String::new();
        for line in lines {
            body.push_str(line);
            body.push('\n');
        }
        std::fs::write(&path, body).unwrap();
        path
    }

    #[test]
    fn ingest_basic_keep_and_drop() {
        let tmp = tempfile::tempdir().unwrap();
        let project_dir = tmp.path().join("project");
        std::fs::create_dir_all(&project_dir).unwrap();

        // One keepable user record, one keepable assistant record, one
        // progress record, and one droppable system record.
        let transcript = write_transcript(
            tmp.path(),
            &[
                r#"{"type":"user","uuid":"u1","message":{"content":"hello"}}"#,
                r#"{"type":"assistant","uuid":"a1","parentUuid":"u1","message":{"content":[{"type":"text","text":"hi"}]}}"#,
                r#"{"type":"progress","toolUseID":"t1","data":{"output":"running"}}"#,
                r#"{"type":"system","subtype":"turn_duration","duration_ms":100}"#,
            ],
        );

        let stats = ingest_transcript_delta(&project_dir, "sess1", &transcript, None).unwrap();

        assert_eq!(stats.records_read, 4);
        assert_eq!(stats.records_kept, 2);
        assert_eq!(stats.records_dropped, 2);

        // Only the kept records should land in the store file, in order.
        let store_path = project_dir.join("transcripts").join("sess1.jsonl");
        let stored = std::fs::read_to_string(&store_path).unwrap();
        let stored_lines: Vec<&str> = stored.lines().collect();
        assert_eq!(stored_lines.len(), 2);
        assert!(stored_lines[0].contains("\"type\":\"user\""));
        assert!(stored_lines[1].contains("\"type\":\"assistant\""));
    }

    #[test]
    fn ingest_cursor_based_delta() {
        let tmp = tempfile::tempdir().unwrap();
        let project_dir = tmp.path().join("project");
        std::fs::create_dir_all(&project_dir).unwrap();

        let transcript_path = tmp.path().join("transcript.jsonl");

        // First pass: one record.
        {
            let mut f = std::fs::File::create(&transcript_path).unwrap();
            writeln!(
                f,
                r#"{{"type":"user","uuid":"u1","message":{{"content":"first"}}}}"#
            )
            .unwrap();
        }
        let first =
            ingest_transcript_delta(&project_dir, "sess1", &transcript_path, None).unwrap();
        assert_eq!(first.records_kept, 1);

        // Second pass: append one more record and ingest only the delta.
        {
            let mut f = std::fs::OpenOptions::new()
                .append(true)
                .open(&transcript_path)
                .unwrap();
            writeln!(
                f,
                r#"{{"type":"user","uuid":"u2","message":{{"content":"second"}}}}"#
            )
            .unwrap();
        }
        let second =
            ingest_transcript_delta(&project_dir, "sess1", &transcript_path, None).unwrap();
        assert_eq!(second.records_kept, 1);
        // The second pass must resume exactly where the first one stopped.
        assert_eq!(second.from_offset, first.to_offset);

        let store_path = project_dir.join("transcripts").join("sess1.jsonl");
        let stored = std::fs::read_to_string(&store_path).unwrap();
        assert_eq!(stored.lines().count(), 2);
    }

    #[test]
    fn ingest_with_index_writer() {
        let tmp = tempfile::tempdir().unwrap();
        let project_dir = tmp.path().join("project");
        std::fs::create_dir_all(&project_dir).unwrap();

        let transcript = write_transcript(
            tmp.path(),
            &[r#"{"type":"user","uuid":"u1","message":{"content":"hello"}}"#],
        );

        // Single-threaded test, so a Cell is enough to count invocations.
        let calls = std::cell::Cell::new(0usize);
        let writer = |_raw: &str,
                      _offset: u64,
                      _len: u64,
                      _json: &serde_json::Value|
         -> anyhow::Result<()> {
            calls.set(calls.get() + 1);
            Ok(())
        };

        ingest_transcript_delta(&project_dir, "sess1", &transcript, Some(&writer)).unwrap();

        assert_eq!(calls.get(), 1);
    }
}