ragc_common/
archive.rs

1// Archive I/O
2// Custom binary format for storing compressed genome data in streams/parts
3
4use crate::varint::{read_varint, write_varint};
5use anyhow::{Context, Result};
6use std::collections::HashMap;
7use std::fs::File;
8use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
9use std::path::Path;
10
/// Location of a single part inside the archive file.
#[derive(Debug, Clone)]
struct Part {
    /// Byte offset in the file at which the part starts.
    offset: u64,
    /// Length in bytes of the part's data.
    size: u64,
}

impl Part {
    /// Builds a part record from its file offset and its size.
    fn new(offset: u64, size: u64) -> Self {
        Self { offset, size }
    }
}
23
24/// A stream containing multiple parts
25#[derive(Debug)]
26struct Stream {
27    stream_name: String,
28    cur_id: usize,
29    raw_size: u64,
30    packed_size: u64,
31    packed_data_size: u64,
32    parts: Vec<Part>,
33}
34
35impl Stream {
36    fn new(stream_name: String) -> Self {
37        Stream {
38            stream_name,
39            cur_id: 0,
40            raw_size: 0,
41            packed_size: 0,
42            packed_data_size: 0,
43            parts: Vec::new(),
44        }
45    }
46}
47
/// Archive for reading/writing AGC data
///
/// An archive is created in either input (read) or output (write) mode and
/// then attached to a file with `open`. Data is organised as named streams,
/// each holding an ordered list of parts.
pub struct Archive {
    /// `true` when the archive was created for reading, `false` for writing.
    input_mode: bool,
    /// Underlying file handle; `None` until `open` is called and after `close`.
    file: Option<File>,
    /// Buffered reader over a clone of `file`; set by `open` in read mode.
    reader: Option<BufReader<File>>,
    /// Buffered writer over a clone of `file`; set by `open` in write mode.
    writer: Option<BufWriter<File>>,
    /// Number of bytes written so far by `add_part`; reset to 0 by `open`.
    f_offset: u64,
    /// All known streams; a stream's ID is its index in this vector.
    streams: Vec<Stream>,
    /// Lookup from stream name to its index in `streams`.
    stream_map: HashMap<String, usize>,
}
58
59impl Archive {
60    /// Create a new archive in input (read) mode
61    pub fn new_reader() -> Self {
62        Archive {
63            input_mode: true,
64            file: None,
65            reader: None,
66            writer: None,
67            f_offset: 0,
68            streams: Vec::new(),
69            stream_map: HashMap::new(),
70        }
71    }
72
73    /// Create a new archive in output (write) mode
74    pub fn new_writer() -> Self {
75        Archive {
76            input_mode: false,
77            file: None,
78            reader: None,
79            writer: None,
80            f_offset: 0,
81            streams: Vec::new(),
82            stream_map: HashMap::new(),
83        }
84    }
85
86    /// Open an archive file
87    pub fn open<P: AsRef<Path>>(&mut self, path: P) -> Result<()> {
88        if self.input_mode {
89            let file = File::open(path).context("Failed to open archive for reading")?;
90            self.reader = Some(BufReader::new(file.try_clone()?));
91            self.file = Some(file);
92            self.deserialize()?;
93        } else {
94            let file = File::create(path).context("Failed to create archive for writing")?;
95            self.writer = Some(BufWriter::new(file.try_clone()?));
96            self.file = Some(file);
97        }
98        self.f_offset = 0;
99        Ok(())
100    }
101
102    /// Close the archive (writes footer in write mode)
103    pub fn close(&mut self) -> Result<()> {
104        if !self.input_mode {
105            if let Some(ref mut writer) = self.writer {
106                writer.flush()?;
107            }
108            self.serialize()?;
109        }
110
111        self.reader = None;
112        self.writer = None;
113        self.file = None;
114        Ok(())
115    }
116
117    /// Register a new stream and return its ID
118    pub fn register_stream(&mut self, stream_name: &str) -> usize {
119        // Check if already registered
120        if let Some(&id) = self.stream_map.get(stream_name) {
121            return id;
122        }
123
124        let id = self.streams.len();
125        self.streams.push(Stream::new(stream_name.to_string()));
126        self.stream_map.insert(stream_name.to_string(), id);
127        id
128    }
129
130    /// Get stream ID by name (returns None if not found)
131    pub fn get_stream_id(&self, stream_name: &str) -> Option<usize> {
132        self.stream_map.get(stream_name).copied()
133    }
134
135    /// Get list of all stream names
136    pub fn get_stream_names(&self) -> Vec<String> {
137        self.streams.iter().map(|s| s.stream_name.clone()).collect()
138    }
139
140    /// Add a part to a stream
141    pub fn add_part(&mut self, stream_id: usize, data: &[u8], metadata: u64) -> Result<()> {
142        if stream_id >= self.streams.len() {
143            anyhow::bail!("Invalid stream ID: {stream_id}");
144        }
145
146        let writer = self
147            .writer
148            .as_mut()
149            .context("Archive not open for writing")?;
150
151        // Record part offset (before writing anything)
152        let part_offset = self.f_offset;
153
154        // Write metadata as varint
155        let mut metadata_buf = Vec::new();
156        write_varint(&mut metadata_buf, metadata)?;
157        writer.write_all(&metadata_buf)?;
158        self.f_offset += metadata_buf.len() as u64;
159
160        // Write data
161        writer.write_all(data)?;
162        self.f_offset += data.len() as u64;
163
164        // Record part (size is only the data size, not including metadata)
165        self.streams[stream_id]
166            .parts
167            .push(Part::new(part_offset, data.len() as u64));
168
169        // packed_size includes both metadata and data
170        let total_size = self.f_offset - part_offset;
171        self.streams[stream_id].packed_size += total_size;
172        self.streams[stream_id].packed_data_size += data.len() as u64;
173
174        Ok(())
175    }
176
177    /// Set raw (uncompressed) size for a stream
178    pub fn set_raw_size(&mut self, stream_id: usize, raw_size: u64) {
179        if stream_id < self.streams.len() {
180            self.streams[stream_id].raw_size = raw_size;
181        }
182    }
183
184    /// Get raw size for a stream
185    pub fn get_raw_size(&self, stream_id: usize) -> u64 {
186        if stream_id < self.streams.len() {
187            self.streams[stream_id].raw_size
188        } else {
189            0
190        }
191    }
192
193    /// Get number of streams
194    pub fn get_num_streams(&self) -> usize {
195        self.streams.len()
196    }
197
198    /// Get number of parts in a stream
199    pub fn get_num_parts(&self, stream_id: usize) -> usize {
200        if stream_id < self.streams.len() {
201            self.streams[stream_id].parts.len()
202        } else {
203            0
204        }
205    }
206
207    /// Get the next part from a stream (sequential reading)
208    pub fn get_part(&mut self, stream_id: usize) -> Result<Option<(Vec<u8>, u64)>> {
209        if stream_id >= self.streams.len() {
210            anyhow::bail!("Invalid stream ID: {stream_id}");
211        }
212
213        let stream = &mut self.streams[stream_id];
214        if stream.cur_id >= stream.parts.len() {
215            return Ok(None); // No more parts
216        }
217
218        let part = stream.parts[stream.cur_id].clone();
219        stream.cur_id += 1;
220
221        self.read_part_data(&part)
222    }
223
224    /// Get a specific part by ID from a stream (random access)
225    pub fn get_part_by_id(&mut self, stream_id: usize, part_id: usize) -> Result<(Vec<u8>, u64)> {
226        if stream_id >= self.streams.len() {
227            anyhow::bail!("Invalid stream ID: {stream_id}");
228        }
229
230        let stream = &self.streams[stream_id];
231        if part_id >= stream.parts.len() {
232            anyhow::bail!("Invalid part ID: {part_id}");
233        }
234
235        let part = stream.parts[part_id].clone();
236        self.read_part_data(&part)
237            .map(|opt| opt.expect("Part should exist"))
238    }
239
240    /// Read part data from file
241    fn read_part_data(&mut self, part: &Part) -> Result<Option<(Vec<u8>, u64)>> {
242        if part.size == 0 {
243            return Ok(Some((Vec::new(), 0)));
244        }
245
246        let reader = self
247            .reader
248            .as_mut()
249            .context("Archive not open for reading")?;
250
251        // Seek to part offset
252        reader.seek(SeekFrom::Start(part.offset))?;
253
254        // Read metadata
255        let (metadata, _) = read_varint(reader)?;
256
257        // Read data (part.size is the data size, not including metadata)
258        let mut data = vec![0u8; part.size as usize];
259        reader.read_exact(&mut data)?;
260
261        Ok(Some((data, metadata)))
262    }
263
264    /// Serialize footer to file (write mode)
265    fn serialize(&mut self) -> Result<()> {
266        let writer = self
267            .writer
268            .as_mut()
269            .context("Archive not open for writing")?;
270
271        let mut footer = Vec::new();
272
273        // Write number of streams
274        write_varint(&mut footer, self.streams.len() as u64)?;
275
276        // Write each stream's metadata
277        for stream in &mut self.streams {
278            // Stream name (null-terminated string)
279            footer.extend_from_slice(stream.stream_name.as_bytes());
280            footer.push(0);
281
282            // Number of parts
283            write_varint(&mut footer, stream.parts.len() as u64)?;
284
285            // Raw size
286            write_varint(&mut footer, stream.raw_size)?;
287
288            // Part offsets and sizes
289            for part in &stream.parts {
290                write_varint(&mut footer, part.offset)?;
291                write_varint(&mut footer, part.size)?;
292            }
293
294            // Update packed size to include footer overhead
295            stream.packed_size += footer.len() as u64;
296        }
297
298        // Write footer
299        writer.write_all(&footer)?;
300
301        // Write footer size as fixed 8-byte value
302        let footer_size = footer.len() as u64;
303        writer.write_all(&footer_size.to_le_bytes())?;
304
305        writer.flush()?;
306        Ok(())
307    }
308
309    /// Deserialize footer from file (read mode)
310    fn deserialize(&mut self) -> Result<()> {
311        let file = self.file.as_mut().context("Archive not open")?;
312
313        // Get file size
314        let file_size = file.metadata()?.len();
315
316        // Read footer size (last 8 bytes)
317        file.seek(SeekFrom::End(-8))?;
318        let mut footer_size_bytes = [0u8; 8];
319        file.read_exact(&mut footer_size_bytes)?;
320        let footer_size = u64::from_le_bytes(footer_size_bytes);
321
322        // Seek to start of footer
323        file.seek(SeekFrom::Start(file_size - 8 - footer_size))?;
324
325        // Read footer into buffer
326        let mut footer = vec![0u8; footer_size as usize];
327        file.read_exact(&mut footer)?;
328
329        // Parse footer
330        let mut cursor = std::io::Cursor::new(&footer);
331
332        // Read number of streams
333        let (num_streams, _) = read_varint(&mut cursor)?;
334
335        self.streams.clear();
336        self.stream_map.clear();
337
338        for i in 0..num_streams {
339            // Read stream name (null-terminated)
340            let mut stream_name = String::new();
341            loop {
342                let mut byte = [0u8; 1];
343                cursor.read_exact(&mut byte)?;
344                if byte[0] == 0 {
345                    break;
346                }
347                stream_name.push(byte[0] as char);
348            }
349
350            // Read number of parts
351            let (num_parts, _) = read_varint(&mut cursor)?;
352
353            // Read raw size
354            let (raw_size, _) = read_varint(&mut cursor)?;
355
356            // Create stream
357            let mut stream = Stream::new(stream_name.clone());
358            stream.raw_size = raw_size;
359
360            // Read parts
361            for _ in 0..num_parts {
362                let (offset, _) = read_varint(&mut cursor)?;
363                let (size, _) = read_varint(&mut cursor)?;
364                stream.parts.push(Part::new(offset, size));
365            }
366
367            // Reset cur_id for reading
368            stream.cur_id = 0;
369
370            self.streams.push(stream);
371            self.stream_map.insert(stream_name, i as usize);
372        }
373
374        // Seek back to beginning for reading parts
375        file.seek(SeekFrom::Start(0))?;
376        if let Some(ref mut reader) = self.reader {
377            reader.seek(SeekFrom::Start(0))?;
378        }
379
380        Ok(())
381    }
382}
383
384impl Drop for Archive {
385    fn drop(&mut self) {
386        let _ = self.close();
387    }
388}
389
#[cfg(test)]
mod tests {
    use super::*;
    use std::env;
    use std::fs;
    use std::path::PathBuf;
    use std::process;

    /// Returns a scratch file path inside the system temp directory.
    ///
    /// The process ID is part of the file name so that concurrent test runs
    /// do not clobber each other and no artifacts land in the source tree.
    fn scratch_path(name: &str) -> PathBuf {
        env::temp_dir().join(format!("ragc_archive_{}_{name}", process::id()))
    }

    /// Writes two parts to one stream and reads them back sequentially.
    #[test]
    fn test_archive_write_read() {
        let path = scratch_path("test_archive.agc");

        // Write
        {
            let mut archive = Archive::new_writer();
            archive.open(&path).unwrap();

            let stream_id = archive.register_stream("test_stream");
            archive.add_part(stream_id, b"Hello", 42).unwrap();
            archive.add_part(stream_id, b"World", 99).unwrap();
            archive.set_raw_size(stream_id, 100);

            archive.close().unwrap();
        }

        // Read
        {
            let mut archive = Archive::new_reader();
            archive.open(&path).unwrap();

            let stream_id = archive.get_stream_id("test_stream").unwrap();
            assert_eq!(archive.get_num_parts(stream_id), 2);
            assert_eq!(archive.get_raw_size(stream_id), 100);

            let (data1, meta1) = archive.get_part(stream_id).unwrap().unwrap();
            assert_eq!(data1, b"Hello");
            assert_eq!(meta1, 42);

            let (data2, meta2) = archive.get_part(stream_id).unwrap().unwrap();
            assert_eq!(data2, b"World");
            assert_eq!(meta2, 99);

            // All parts consumed: the next sequential read yields nothing.
            assert!(archive.get_part(stream_id).unwrap().is_none());
        }

        fs::remove_file(&path).unwrap();
    }

    /// Interleaves parts of two streams and reads them back by part ID.
    #[test]
    fn test_multiple_streams() {
        let path = scratch_path("test_multi_stream.agc");

        // Write
        {
            let mut archive = Archive::new_writer();
            archive.open(&path).unwrap();

            let stream1 = archive.register_stream("stream1");
            let stream2 = archive.register_stream("stream2");

            archive.add_part(stream1, b"Data1", 1).unwrap();
            archive.add_part(stream2, b"Data2", 2).unwrap();
            archive.add_part(stream1, b"Data3", 3).unwrap();

            archive.close().unwrap();
        }

        // Read
        {
            let mut archive = Archive::new_reader();
            archive.open(&path).unwrap();

            let stream1 = archive.get_stream_id("stream1").unwrap();
            let stream2 = archive.get_stream_id("stream2").unwrap();

            assert_eq!(archive.get_num_parts(stream1), 2);
            assert_eq!(archive.get_num_parts(stream2), 1);

            let (data, meta) = archive.get_part_by_id(stream1, 0).unwrap();
            assert_eq!(data, b"Data1");
            assert_eq!(meta, 1);

            let (data, meta) = archive.get_part_by_id(stream2, 0).unwrap();
            assert_eq!(data, b"Data2");
            assert_eq!(meta, 2);
        }

        fs::remove_file(&path).unwrap();
    }
}
476}