//! edda_transcript/ingest.rs — cursor-based delta ingest of Claude transcript
//! JSONL files into the project store.

1use crate::cursor::TranscriptCursor;
2use crate::filter::{classify_record, update_progress_last, FilterAction};
3use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5use std::io::{Read, Seek, SeekFrom, Write};
6use std::path::Path;
7
/// Default cap on bytes consumed in a single ingest pass (4 MB).
/// Overridable at runtime via the `EDDA_TRANSCRIPT_MAX_BYTES` env var.
const DEFAULT_MAX_BYTES: u64 = 4 * 1024 * 1024; // 4MB

/// Callback type for index generation during ingest.
/// Arguments: `(raw_line, store_offset, store_len, parsed_json)`, where
/// `store_offset`/`store_len` locate the record inside the store file.
pub type IndexWriterFn = dyn Fn(&str, u64, u64, &serde_json::Value) -> anyhow::Result<()>;
12
/// Statistics produced by a single delta-ingest pass.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct IngestStats {
    /// Total complete JSONL lines examined this pass.
    pub records_read: usize,
    /// Records classified as Keep and appended verbatim to the store.
    pub records_kept: usize,
    /// Records not stored (filtered out, collapsed progress, or parse errors).
    pub records_dropped: usize,
    /// Bytes of complete lines consumed from the transcript this pass.
    pub bytes_read: u64,
    /// Kept-record counts keyed by the record's `type` field.
    pub kept_by_type: HashMap<String, usize>,
    /// Dropped-record counts keyed by `type` (or `"parse_error"` for bad JSON).
    pub dropped_by_type: HashMap<String, usize>,
    /// Transcript byte offset where this pass started reading.
    pub from_offset: u64,
    /// Transcript byte offset just past the last consumed line (new cursor).
    pub to_offset: u64,
}
24
25/// Perform cursor-based delta ingest from a Claude transcript JSONL file.
26///
27/// Reads from `transcript_path` starting at the cursor offset (or 0 if new),
28/// classifies records, writes kept records verbatim to the store,
29/// and returns ingest statistics.
30///
31/// If `index_writer` is Some, calls it for each kept record with
32/// (raw_line, store_offset, store_len, parsed_json) for index generation.
33pub fn ingest_transcript_delta(
34    project_dir: &Path,
35    session_id: &str,
36    transcript_path: &Path,
37    index_writer: Option<&IndexWriterFn>,
38) -> anyhow::Result<IngestStats> {
39    let state_dir = project_dir.join("state");
40    std::fs::create_dir_all(&state_dir)?;
41
42    // Session-level lock
43    let lock_path = state_dir.join(format!("ingest.{session_id}.lock"));
44    let _lock = edda_store::lock_file(&lock_path)?;
45
46    // Load or create cursor
47    let mut cursor = TranscriptCursor::load(&state_dir, session_id)?.unwrap_or(TranscriptCursor {
48        offset: 0,
49        file_size: 0,
50        mtime_unix: 0,
51        updated_at_unix: 0,
52    });
53
54    // Check file metadata
55    let meta = std::fs::metadata(transcript_path)?;
56    let file_size = meta.len();
57
58    // Truncation detection
59    cursor.detect_truncation(file_size);
60
61    if cursor.offset >= file_size {
62        // Nothing new to read
63        return Ok(IngestStats {
64            records_read: 0,
65            records_kept: 0,
66            records_dropped: 0,
67            bytes_read: 0,
68            kept_by_type: HashMap::new(),
69            dropped_by_type: HashMap::new(),
70            from_offset: cursor.offset,
71            to_offset: cursor.offset,
72        });
73    }
74
75    let max_bytes: u64 = std::env::var("EDDA_TRANSCRIPT_MAX_BYTES")
76        .ok()
77        .and_then(|v| v.parse().ok())
78        .unwrap_or(DEFAULT_MAX_BYTES);
79
80    // Open and seek
81    let mut file = std::fs::File::open(transcript_path)?;
82    file.seek(SeekFrom::Start(cursor.offset))?;
83
84    let bytes_to_read = (file_size - cursor.offset).min(max_bytes);
85    let mut buf = vec![0u8; bytes_to_read as usize];
86    let actually_read = file.read(&mut buf)?;
87    buf.truncate(actually_read);
88
89    // Partial line protection: only consume up to the last newline
90    let consumable_len = match buf.iter().rposition(|&b| b == b'\n') {
91        Some(pos) => pos + 1,
92        None => 0, // no complete line
93    };
94
95    if consumable_len == 0 {
96        return Ok(IngestStats {
97            records_read: 0,
98            records_kept: 0,
99            records_dropped: 0,
100            bytes_read: 0,
101            kept_by_type: HashMap::new(),
102            dropped_by_type: HashMap::new(),
103            from_offset: cursor.offset,
104            to_offset: cursor.offset,
105        });
106    }
107
108    let from_offset = cursor.offset;
109    let data = &buf[..consumable_len];
110
111    // Prepare store path (verbatim append)
112    let transcripts_dir = project_dir.join("transcripts");
113    std::fs::create_dir_all(&transcripts_dir)?;
114    let store_path = transcripts_dir.join(format!("{session_id}.jsonl"));
115    let mut store_file = std::fs::OpenOptions::new()
116        .create(true)
117        .append(true)
118        .open(&store_path)?;
119
120    // Load progress_last map
121    let progress_path = state_dir.join(format!("progress_last.{session_id}.json"));
122    let mut progress_map: HashMap<String, serde_json::Value> = if progress_path.exists() {
123        let content = std::fs::read_to_string(&progress_path)?;
124        serde_json::from_str(&content).unwrap_or_default()
125    } else {
126        HashMap::new()
127    };
128
129    let mut stats = IngestStats {
130        records_read: 0,
131        records_kept: 0,
132        records_dropped: 0,
133        bytes_read: consumable_len as u64,
134        kept_by_type: HashMap::new(),
135        dropped_by_type: HashMap::new(),
136        from_offset,
137        to_offset: from_offset + consumable_len as u64,
138    };
139
140    // Process line by line
141    for raw_line in data.split(|&b| b == b'\n') {
142        if raw_line.is_empty() {
143            continue;
144        }
145
146        stats.records_read += 1;
147
148        let parsed: serde_json::Value = match serde_json::from_slice(raw_line) {
149            Ok(v) => v,
150            Err(_) => {
151                stats.records_dropped += 1;
152                *stats
153                    .dropped_by_type
154                    .entry("parse_error".into())
155                    .or_insert(0) += 1;
156                continue;
157            }
158        };
159
160        let record_type = parsed
161            .get("type")
162            .and_then(|v| v.as_str())
163            .unwrap_or("unknown")
164            .to_string();
165
166        match classify_record(&parsed) {
167            FilterAction::Keep => {
168                // Record store_offset before write
169                let store_offset = store_file.seek(SeekFrom::End(0)).unwrap_or(0);
170
171                // Write raw line verbatim (CONTRACT BRIDGE-03)
172                store_file.write_all(raw_line)?;
173                store_file.write_all(b"\n")?;
174
175                let store_len = raw_line.len() as u64 + 1; // +1 for newline
176
177                // Call index writer if provided
178                if let Some(writer) = index_writer {
179                    let raw_str = std::str::from_utf8(raw_line).unwrap_or("");
180                    writer(raw_str, store_offset, store_len, &parsed)?;
181                }
182
183                stats.records_kept += 1;
184                *stats.kept_by_type.entry(record_type).or_insert(0) += 1;
185            }
186            FilterAction::Progress => {
187                update_progress_last(&mut progress_map, &parsed);
188                stats.records_dropped += 1;
189                *stats.dropped_by_type.entry(record_type).or_insert(0) += 1;
190            }
191            FilterAction::Drop => {
192                stats.records_dropped += 1;
193                *stats.dropped_by_type.entry(record_type).or_insert(0) += 1;
194            }
195        }
196    }
197
198    // Save progress_last map
199    let progress_json = serde_json::to_string_pretty(&progress_map)?;
200    edda_store::write_atomic(&progress_path, progress_json.as_bytes())?;
201
202    // Update and save cursor
203    cursor.offset = stats.to_offset;
204    cursor.file_size = file_size;
205    cursor.updated_at_unix = std::time::SystemTime::now()
206        .duration_since(std::time::UNIX_EPOCH)
207        .map(|d| d.as_secs() as i64)
208        .unwrap_or(0);
209    cursor.save(&state_dir, session_id)?;
210
211    Ok(stats)
212}
213
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;

    /// Write `lines` (one record per line) to `dir/transcript.jsonl` and
    /// return the file's path.
    fn write_transcript(dir: &Path, lines: &[&str]) -> std::path::PathBuf {
        let path = dir.join("transcript.jsonl");
        let mut body = String::new();
        for line in lines {
            body.push_str(line);
            body.push('\n');
        }
        std::fs::write(&path, body).unwrap();
        path
    }

    #[test]
    fn ingest_basic_keep_and_drop() {
        let tmp = tempfile::tempdir().unwrap();
        let project_dir = tmp.path().join("project");
        std::fs::create_dir_all(&project_dir).unwrap();

        let records = [
            r#"{"type":"user","uuid":"u1","message":{"content":"hello"}}"#,
            r#"{"type":"assistant","uuid":"a1","parentUuid":"u1","message":{"content":[{"type":"text","text":"hi"}]}}"#,
            r#"{"type":"progress","toolUseID":"t1","data":{"output":"running"}}"#,
            r#"{"type":"system","subtype":"turn_duration","duration_ms":100}"#,
        ];
        let transcript = write_transcript(tmp.path(), &records);

        let stats = ingest_transcript_delta(&project_dir, "sess1", &transcript, None).unwrap();

        assert_eq!(stats.records_read, 4);
        assert_eq!(stats.records_kept, 2); // user + assistant
        assert_eq!(stats.records_dropped, 2); // progress + turn_duration

        // The store must contain exactly the kept records, verbatim.
        let store = project_dir.join("transcripts").join("sess1.jsonl");
        let stored: Vec<String> = std::fs::read_to_string(&store)
            .unwrap()
            .lines()
            .map(str::to_owned)
            .collect();
        assert_eq!(stored.len(), 2);
        assert!(stored[0].contains("\"type\":\"user\""));
        assert!(stored[1].contains("\"type\":\"assistant\""));
    }

    #[test]
    fn ingest_cursor_based_delta() {
        let tmp = tempfile::tempdir().unwrap();
        let project_dir = tmp.path().join("project");
        std::fs::create_dir_all(&project_dir).unwrap();

        let transcript_path = tmp.path().join("transcript.jsonl");

        // First record, then an initial ingest pass.
        std::fs::write(
            &transcript_path,
            "{\"type\":\"user\",\"uuid\":\"u1\",\"message\":{\"content\":\"first\"}}\n",
        )
        .unwrap();
        let stats1 =
            ingest_transcript_delta(&project_dir, "sess1", &transcript_path, None).unwrap();
        assert_eq!(stats1.records_kept, 1);

        // Append a second record and ingest again.
        let mut appender = std::fs::OpenOptions::new()
            .append(true)
            .open(&transcript_path)
            .unwrap();
        writeln!(
            appender,
            r#"{{"type":"user","uuid":"u2","message":{{"content":"second"}}}}"#
        )
        .unwrap();
        drop(appender);

        let stats2 =
            ingest_transcript_delta(&project_dir, "sess1", &transcript_path, None).unwrap();
        assert_eq!(stats2.records_kept, 1); // only the new line
        assert_eq!(stats2.from_offset, stats1.to_offset); // cursor resumed exactly

        // Store should have 2 lines total.
        let store = project_dir.join("transcripts").join("sess1.jsonl");
        assert_eq!(std::fs::read_to_string(&store).unwrap().lines().count(), 2);
    }

    #[test]
    fn ingest_with_index_writer() {
        let tmp = tempfile::tempdir().unwrap();
        let project_dir = tmp.path().join("project");
        std::fs::create_dir_all(&project_dir).unwrap();

        let transcript = write_transcript(
            tmp.path(),
            &[r#"{"type":"user","uuid":"u1","message":{"content":"hello"}}"#],
        );

        let call_count = std::sync::Arc::new(std::sync::atomic::AtomicUsize::new(0));
        let counter = std::sync::Arc::clone(&call_count);

        let writer = move |_line: &str,
                           _store_offset: u64,
                           _store_len: u64,
                           _parsed: &serde_json::Value|
              -> anyhow::Result<()> {
            counter.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
            Ok(())
        };

        ingest_transcript_delta(&project_dir, "sess1", &transcript, Some(&writer)).unwrap();

        // One kept record -> exactly one index-writer invocation.
        assert_eq!(call_count.load(std::sync::atomic::Ordering::SeqCst), 1);
    }
}
330}