ragc_common/
archive.rs

1// Archive I/O
2// Custom binary format for storing compressed genome data in streams/parts
3
4use crate::varint::{read_varint, write_varint};
5use anyhow::{Context, Result};
6use std::collections::{BTreeMap, HashMap};
7use std::fs::File;
8use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
9use std::path::Path;
10
/// Location of a single part inside the archive file: the byte offset of
/// its metadata varint and the size of the data payload that follows it.
#[derive(Debug, Clone)]
struct Part {
    offset: u64,
    size: u64,
}

impl Part {
    /// Build a part record from its file offset and data size.
    fn new(offset: u64, size: u64) -> Self {
        Self { offset, size }
    }
}
23
24/// A stream containing multiple parts
25#[derive(Debug)]
26struct Stream {
27    stream_name: String,
28    cur_id: usize,
29    raw_size: u64,
30    packed_size: u64,
31    packed_data_size: u64,
32    parts: Vec<Part>,
33}
34
35impl Stream {
36    fn new(stream_name: String) -> Self {
37        Stream {
38            stream_name,
39            cur_id: 0,
40            raw_size: 0,
41            packed_size: 0,
42            packed_data_size: 0,
43            parts: Vec::new(),
44        }
45    }
46}
47
/// Archive for reading/writing AGC data
pub struct Archive {
    /// True when constructed via `new_reader` (read mode); false for write mode.
    input_mode: bool,
    /// Raw file handle; used directly by footer deserialization.
    file: Option<File>,
    /// Buffered reader (populated in read mode only).
    reader: Option<BufReader<File>>,
    /// Buffered writer (populated in write mode only).
    writer: Option<BufWriter<File>>,
    /// Current write offset: total bytes written to the file so far.
    f_offset: u64,
    /// All registered streams, indexed by stream ID.
    streams: Vec<Stream>,
    /// Maps stream name -> index into `streams`.
    stream_map: HashMap<String, usize>,
    /// Write buffer for parallel Phase 3 (C++ AGC: m_buffer)
    /// Maps stream_id -> Vec of (data, metadata) tuples
    /// BTreeMap ensures writes are flushed in sorted stream_id order for determinism
    write_buffer: BTreeMap<usize, Vec<(Vec<u8>, u64)>>,
}
62
63impl Archive {
64    /// Create a new archive in input (read) mode
65    pub fn new_reader() -> Self {
66        Archive {
67            input_mode: true,
68            file: None,
69            reader: None,
70            writer: None,
71            f_offset: 0,
72            streams: Vec::new(),
73            stream_map: HashMap::new(),
74            write_buffer: BTreeMap::new(),
75        }
76    }
77
78    /// Create a new archive in output (write) mode
79    pub fn new_writer() -> Self {
80        Archive {
81            input_mode: false,
82            file: None,
83            reader: None,
84            writer: None,
85            f_offset: 0,
86            streams: Vec::new(),
87            stream_map: HashMap::new(),
88            write_buffer: BTreeMap::new(),
89        }
90    }
91
92    /// Open an archive file
93    pub fn open<P: AsRef<Path>>(&mut self, path: P) -> Result<()> {
94        if self.input_mode {
95            let file = File::open(path).context("Failed to open archive for reading")?;
96            self.reader = Some(BufReader::new(file.try_clone()?));
97            self.file = Some(file);
98            self.deserialize()?;
99        } else {
100            let file = File::create(path).context("Failed to create archive for writing")?;
101            // Use 4MB buffer to batch writes and minimize syscalls
102            // Default 8KB buffer causes ~200 syscalls for typical archives
103            self.writer = Some(BufWriter::with_capacity(4 * 1024 * 1024, file.try_clone()?));
104            self.file = Some(file);
105        }
106        self.f_offset = 0;
107        Ok(())
108    }
109
110    /// Close the archive (writes footer in write mode)
111    pub fn close(&mut self) -> Result<()> {
112        if !self.input_mode {
113            if let Some(ref mut writer) = self.writer {
114                writer.flush()?;
115            }
116            self.serialize()?;
117        }
118
119        self.reader = None;
120        self.writer = None;
121        self.file = None;
122        Ok(())
123    }
124
125    /// Register a new stream and return its ID
126    pub fn register_stream(&mut self, stream_name: &str) -> usize {
127        // Check if already registered
128        if let Some(&id) = self.stream_map.get(stream_name) {
129            return id;
130        }
131
132        let id = self.streams.len();
133        self.streams.push(Stream::new(stream_name.to_string()));
134        self.stream_map.insert(stream_name.to_string(), id);
135        id
136    }
137
138    /// Get stream ID by name (returns None if not found)
139    pub fn get_stream_id(&self, stream_name: &str) -> Option<usize> {
140        self.stream_map.get(stream_name).copied()
141    }
142
143    /// Get list of all stream names
144    pub fn get_stream_names(&self) -> Vec<String> {
145        self.streams.iter().map(|s| s.stream_name.clone()).collect()
146    }
147
148    /// Add a part to a stream
149    pub fn add_part(&mut self, stream_id: usize, data: &[u8], metadata: u64) -> Result<()> {
150        if stream_id >= self.streams.len() {
151            anyhow::bail!("Invalid stream ID: {stream_id}");
152        }
153
154        let writer = self
155            .writer
156            .as_mut()
157            .context("Archive not open for writing")?;
158
159        // Record part offset (before writing anything)
160        let part_offset = self.f_offset;
161
162        // Write metadata as varint
163        let mut metadata_buf = Vec::new();
164        write_varint(&mut metadata_buf, metadata)?;
165        writer.write_all(&metadata_buf)?;
166        self.f_offset += metadata_buf.len() as u64;
167
168        // Write data
169        writer.write_all(data)?;
170        self.f_offset += data.len() as u64;
171
172        // Record part (size is only the data size, not including metadata)
173        self.streams[stream_id]
174            .parts
175            .push(Part::new(part_offset, data.len() as u64));
176
177        // packed_size includes both metadata and data
178        let total_size = self.f_offset - part_offset;
179        self.streams[stream_id].packed_size += total_size;
180        self.streams[stream_id].packed_data_size += data.len() as u64;
181
182        Ok(())
183    }
184
185    /// Buffer a part for later writing (C++ AGC: AddPartBuffered)
186    /// This allows parallel compression while maintaining deterministic write order.
187    /// Call flush_buffers() after all parts are buffered to write in sorted stream_id order.
188    pub fn add_part_buffered(&mut self, stream_id: usize, data: Vec<u8>, metadata: u64) {
189        self.write_buffer
190            .entry(stream_id)
191            .or_default()
192            .push((data, metadata));
193    }
194
195    /// Flush all buffered parts to disk in sorted stream_id order (C++ AGC: flush_out_buffers)
196    /// This ensures deterministic output regardless of which thread buffered the data.
197    pub fn flush_buffers(&mut self) -> Result<()> {
198        // BTreeMap iterates in sorted key order (stream_id)
199        // This matches C++ AGC's flush_out_buffers() which iterates m_buffer (std::map)
200        let buffer = std::mem::take(&mut self.write_buffer);
201        for (stream_id, parts) in buffer {
202            for (data, metadata) in parts {
203                self.add_part(stream_id, &data, metadata)?;
204            }
205        }
206        Ok(())
207    }
208
209    /// Set raw (uncompressed) size for a stream
210    pub fn set_raw_size(&mut self, stream_id: usize, raw_size: u64) {
211        if stream_id < self.streams.len() {
212            self.streams[stream_id].raw_size = raw_size;
213        }
214    }
215
216    /// Get raw size for a stream
217    pub fn get_raw_size(&self, stream_id: usize) -> u64 {
218        if stream_id < self.streams.len() {
219            self.streams[stream_id].raw_size
220        } else {
221            0
222        }
223    }
224
225    /// Get packed (compressed) size for a stream
226    pub fn get_packed_size(&self, stream_id: usize) -> u64 {
227        if stream_id < self.streams.len() {
228            self.streams[stream_id].packed_size
229        } else {
230            0
231        }
232    }
233
234    /// Get packed data size (compressed data only, without metadata)
235    pub fn get_packed_data_size(&self, stream_id: usize) -> u64 {
236        if stream_id < self.streams.len() {
237            self.streams[stream_id].packed_data_size
238        } else {
239            0
240        }
241    }
242
243    /// Get stream name by ID
244    pub fn get_stream_name(&self, stream_id: usize) -> Option<&str> {
245        if stream_id < self.streams.len() {
246            Some(&self.streams[stream_id].stream_name)
247        } else {
248            None
249        }
250    }
251
252    /// Get number of streams
253    pub fn get_num_streams(&self) -> usize {
254        self.streams.len()
255    }
256
257    /// Get number of parts in a stream
258    pub fn get_num_parts(&self, stream_id: usize) -> usize {
259        if stream_id < self.streams.len() {
260            self.streams[stream_id].parts.len()
261        } else {
262            0
263        }
264    }
265
266    /// Get the next part from a stream (sequential reading)
267    pub fn get_part(&mut self, stream_id: usize) -> Result<Option<(Vec<u8>, u64)>> {
268        if stream_id >= self.streams.len() {
269            anyhow::bail!("Invalid stream ID: {stream_id}");
270        }
271
272        let stream = &mut self.streams[stream_id];
273        if stream.cur_id >= stream.parts.len() {
274            return Ok(None); // No more parts
275        }
276
277        let part = stream.parts[stream.cur_id].clone();
278        stream.cur_id += 1;
279
280        self.read_part_data(&part)
281    }
282
283    /// Get a specific part by ID from a stream (random access)
284    pub fn get_part_by_id(&mut self, stream_id: usize, part_id: usize) -> Result<(Vec<u8>, u64)> {
285        if stream_id >= self.streams.len() {
286            anyhow::bail!("Invalid stream ID: {stream_id}");
287        }
288
289        let stream = &self.streams[stream_id];
290        if part_id >= stream.parts.len() {
291            anyhow::bail!("Invalid part ID: {part_id}");
292        }
293
294        let part = stream.parts[part_id].clone();
295        self.read_part_data(&part)
296            .map(|opt| opt.expect("Part should exist"))
297    }
298
299    /// Read part data from file
300    fn read_part_data(&mut self, part: &Part) -> Result<Option<(Vec<u8>, u64)>> {
301        if part.size == 0 {
302            return Ok(Some((Vec::new(), 0)));
303        }
304
305        let reader = self
306            .reader
307            .as_mut()
308            .context("Archive not open for reading")?;
309
310        // Seek to part offset
311        reader.seek(SeekFrom::Start(part.offset))?;
312
313        // Read metadata
314        let (metadata, _) = read_varint(reader)?;
315
316        // Read data (part.size is the data size, not including metadata)
317        let mut data = vec![0u8; part.size as usize];
318        reader.read_exact(&mut data)?;
319
320        Ok(Some((data, metadata)))
321    }
322
323    /// Serialize footer to file (write mode)
324    fn serialize(&mut self) -> Result<()> {
325        let writer = self
326            .writer
327            .as_mut()
328            .context("Archive not open for writing")?;
329
330        let mut footer = Vec::new();
331
332        // Write number of streams
333        write_varint(&mut footer, self.streams.len() as u64)?;
334
335        // Write each stream's metadata
336        for stream in &mut self.streams {
337            // Stream name (null-terminated string)
338            footer.extend_from_slice(stream.stream_name.as_bytes());
339            footer.push(0);
340
341            // Number of parts
342            write_varint(&mut footer, stream.parts.len() as u64)?;
343
344            // Raw size
345            write_varint(&mut footer, stream.raw_size)?;
346
347            // Part offsets and sizes
348            for part in &stream.parts {
349                write_varint(&mut footer, part.offset)?;
350                write_varint(&mut footer, part.size)?;
351            }
352
353            // Update packed size to include footer overhead
354            stream.packed_size += footer.len() as u64;
355        }
356
357        // Write footer
358        writer.write_all(&footer)?;
359
360        // Write footer size as fixed 8-byte value
361        let footer_size = footer.len() as u64;
362        writer.write_all(&footer_size.to_le_bytes())?;
363
364        writer.flush()?;
365        Ok(())
366    }
367
368    /// Deserialize footer from file (read mode)
369    fn deserialize(&mut self) -> Result<()> {
370        let file = self.file.as_mut().context("Archive not open")?;
371
372        // Get file size
373        let file_size = file.metadata()?.len();
374
375        // Read footer size (last 8 bytes)
376        file.seek(SeekFrom::End(-8))?;
377        let mut footer_size_bytes = [0u8; 8];
378        file.read_exact(&mut footer_size_bytes)?;
379        let footer_size = u64::from_le_bytes(footer_size_bytes);
380
381        // Seek to start of footer
382        file.seek(SeekFrom::Start(file_size - 8 - footer_size))?;
383
384        // Read footer into buffer
385        let mut footer = vec![0u8; footer_size as usize];
386        file.read_exact(&mut footer)?;
387
388        // Parse footer
389        let mut cursor = std::io::Cursor::new(&footer);
390
391        // Read number of streams
392        let (num_streams, _) = read_varint(&mut cursor)?;
393
394        self.streams.clear();
395        self.stream_map.clear();
396
397        for i in 0..num_streams {
398            // Read stream name (null-terminated)
399            let mut stream_name = String::new();
400            loop {
401                let mut byte = [0u8; 1];
402                cursor.read_exact(&mut byte)?;
403                if byte[0] == 0 {
404                    break;
405                }
406                stream_name.push(byte[0] as char);
407            }
408
409            // Read number of parts
410            let (num_parts, _) = read_varint(&mut cursor)?;
411
412            // Read raw size
413            let (raw_size, _) = read_varint(&mut cursor)?;
414
415            // Create stream
416            let mut stream = Stream::new(stream_name.clone());
417            stream.raw_size = raw_size;
418
419            // Read parts
420            for _ in 0..num_parts {
421                let (offset, _) = read_varint(&mut cursor)?;
422                let (size, _) = read_varint(&mut cursor)?;
423                stream.parts.push(Part::new(offset, size));
424            }
425
426            // Reset cur_id for reading
427            stream.cur_id = 0;
428
429            self.streams.push(stream);
430            self.stream_map.insert(stream_name, i as usize);
431        }
432
433        // Seek back to beginning for reading parts
434        file.seek(SeekFrom::Start(0))?;
435        if let Some(ref mut reader) = self.reader {
436            reader.seek(SeekFrom::Start(0))?;
437        }
438
439        Ok(())
440    }
441}
442
443impl Drop for Archive {
444    fn drop(&mut self) {
445        let _ = self.close();
446    }
447}
448
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;

    /// Fixture path under the system temp dir, made unique per process so
    /// tests no longer write relative paths into the working directory and
    /// concurrent runs from the same cwd cannot clobber each other.
    fn temp_path(name: &str) -> std::path::PathBuf {
        std::env::temp_dir().join(format!("ragc_{}_{}", std::process::id(), name))
    }

    #[test]
    fn test_archive_write_read() {
        let path = temp_path("test_archive.agc");

        // Write two parts with distinct metadata into one stream.
        {
            let mut archive = Archive::new_writer();
            archive.open(&path).unwrap();

            let stream_id = archive.register_stream("test_stream");
            archive.add_part(stream_id, b"Hello", 42).unwrap();
            archive.add_part(stream_id, b"World", 99).unwrap();
            archive.set_raw_size(stream_id, 100);

            archive.close().unwrap();
        }

        // Read them back and verify order, payloads, and metadata.
        {
            let mut archive = Archive::new_reader();
            archive.open(&path).unwrap();

            let stream_id = archive.get_stream_id("test_stream").unwrap();
            assert_eq!(archive.get_num_parts(stream_id), 2);
            assert_eq!(archive.get_raw_size(stream_id), 100);

            let (data1, meta1) = archive.get_part(stream_id).unwrap().unwrap();
            assert_eq!(data1, b"Hello");
            assert_eq!(meta1, 42);

            let (data2, meta2) = archive.get_part(stream_id).unwrap().unwrap();
            assert_eq!(data2, b"World");
            assert_eq!(meta2, 99);

            // Sequential reads past the last part yield None.
            assert!(archive.get_part(stream_id).unwrap().is_none());
        }

        fs::remove_file(&path).unwrap();
    }

    #[test]
    fn test_multiple_streams() {
        let path = temp_path("test_multi_stream.agc");

        // Interleave writes across two streams.
        {
            let mut archive = Archive::new_writer();
            archive.open(&path).unwrap();

            let stream1 = archive.register_stream("stream1");
            let stream2 = archive.register_stream("stream2");

            archive.add_part(stream1, b"Data1", 1).unwrap();
            archive.add_part(stream2, b"Data2", 2).unwrap();
            archive.add_part(stream1, b"Data3", 3).unwrap();

            archive.close().unwrap();
        }

        // Random access by part ID must be independent of write interleaving.
        {
            let mut archive = Archive::new_reader();
            archive.open(&path).unwrap();

            let stream1 = archive.get_stream_id("stream1").unwrap();
            let stream2 = archive.get_stream_id("stream2").unwrap();

            assert_eq!(archive.get_num_parts(stream1), 2);
            assert_eq!(archive.get_num_parts(stream2), 1);

            let (data, meta) = archive.get_part_by_id(stream1, 0).unwrap();
            assert_eq!(data, b"Data1");
            assert_eq!(meta, 1);

            let (data, meta) = archive.get_part_by_id(stream2, 0).unwrap();
            assert_eq!(data, b"Data2");
            assert_eq!(meta, 2);
        }

        fs::remove_file(&path).unwrap();
    }
}