1use crate::varint::{read_varint, write_varint};
5use anyhow::{Context, Result};
6use std::collections::{BTreeMap, HashMap};
7use std::fs::File;
8use std::io::{BufReader, BufWriter, Read, Seek, SeekFrom, Write};
9use std::path::Path;
10
/// Location of one part inside the archive file.
#[derive(Debug, Clone)]
struct Part {
    // Absolute byte offset of the part's metadata varint in the file
    // (the payload follows immediately after the varint).
    offset: u64,
    // Length in bytes of the payload only, excluding the metadata varint.
    size: u64,
}

impl Part {
    /// Describes a payload of `size` bytes whose metadata varint starts at `offset`.
    fn new(offset: u64, size: u64) -> Self {
        Self { offset, size }
    }
}
23
24#[derive(Debug)]
26struct Stream {
27 stream_name: String,
28 cur_id: usize,
29 raw_size: u64,
30 packed_size: u64,
31 packed_data_size: u64,
32 parts: Vec<Part>,
33}
34
35impl Stream {
36 fn new(stream_name: String) -> Self {
37 Stream {
38 stream_name,
39 cur_id: 0,
40 raw_size: 0,
41 packed_size: 0,
42 packed_data_size: 0,
43 parts: Vec::new(),
44 }
45 }
46}
47
/// Multi-stream archive container.
///
/// An archive handle is either a reader or a writer (never both).
/// Data is stored as "parts" — a metadata varint followed by a payload —
/// appended to the file; a footer written on close indexes every
/// stream's parts so a reader can locate them.
pub struct Archive {
    // true = reader mode, false = writer mode; fixed at construction.
    input_mode: bool,
    // Raw handle; used directly by `deserialize` for footer seeks.
    file: Option<File>,
    // Buffered reader (reader mode only).
    reader: Option<BufReader<File>>,
    // Buffered writer (writer mode only).
    writer: Option<BufWriter<File>>,
    // Current logical write position, maintained by `add_part`.
    f_offset: u64,
    // Stream table; the index into this Vec IS the stream ID.
    streams: Vec<Stream>,
    // Name -> stream-ID lookup, kept in sync with `streams`.
    stream_map: HashMap<String, usize>,
    // Parts queued by `add_part_buffered`, keyed by stream ID so
    // `flush_buffers` writes them grouped per stream in ID order.
    write_buffer: BTreeMap<usize, Vec<(Vec<u8>, u64)>>,
}
62
63impl Archive {
64 pub fn new_reader() -> Self {
66 Archive {
67 input_mode: true,
68 file: None,
69 reader: None,
70 writer: None,
71 f_offset: 0,
72 streams: Vec::new(),
73 stream_map: HashMap::new(),
74 write_buffer: BTreeMap::new(),
75 }
76 }
77
78 pub fn new_writer() -> Self {
80 Archive {
81 input_mode: false,
82 file: None,
83 reader: None,
84 writer: None,
85 f_offset: 0,
86 streams: Vec::new(),
87 stream_map: HashMap::new(),
88 write_buffer: BTreeMap::new(),
89 }
90 }
91
    /// Opens the archive file at `path` in the mode chosen at construction.
    ///
    /// Reader mode requires the file to exist and immediately loads the
    /// footer (stream/part index) via `deserialize`. Writer mode creates
    /// the file, truncating any existing one.
    pub fn open<P: AsRef<Path>>(&mut self, path: P) -> Result<()> {
        if self.input_mode {
            let file = File::open(path).context("Failed to open archive for reading")?;
            // The reader and `file` share one OS-level cursor (`try_clone`);
            // `deserialize` rewinds both to offset 0 before returning.
            self.reader = Some(BufReader::new(file.try_clone()?));
            self.file = Some(file);
            self.deserialize()?;
        } else {
            let file = File::create(path).context("Failed to create archive for writing")?;
            // 4 MiB write buffer: parts are often small, so batching keeps
            // the syscall count down.
            self.writer = Some(BufWriter::with_capacity(4 * 1024 * 1024, file.try_clone()?));
            self.file = Some(file);
        }
        self.f_offset = 0;
        Ok(())
    }
109
110 pub fn close(&mut self) -> Result<()> {
112 if !self.input_mode {
113 if let Some(ref mut writer) = self.writer {
114 writer.flush()?;
115 }
116 self.serialize()?;
117 }
118
119 self.reader = None;
120 self.writer = None;
121 self.file = None;
122 Ok(())
123 }
124
125 pub fn register_stream(&mut self, stream_name: &str) -> usize {
127 if let Some(&id) = self.stream_map.get(stream_name) {
129 return id;
130 }
131
132 let id = self.streams.len();
133 self.streams.push(Stream::new(stream_name.to_string()));
134 self.stream_map.insert(stream_name.to_string(), id);
135 id
136 }
137
138 pub fn get_stream_id(&self, stream_name: &str) -> Option<usize> {
140 self.stream_map.get(stream_name).copied()
141 }
142
143 pub fn get_stream_names(&self) -> Vec<String> {
145 self.streams.iter().map(|s| s.stream_name.clone()).collect()
146 }
147
148 pub fn add_part(&mut self, stream_id: usize, data: &[u8], metadata: u64) -> Result<()> {
150 if stream_id >= self.streams.len() {
151 anyhow::bail!("Invalid stream ID: {stream_id}");
152 }
153
154 let writer = self
155 .writer
156 .as_mut()
157 .context("Archive not open for writing")?;
158
159 let part_offset = self.f_offset;
161
162 let mut metadata_buf = Vec::new();
164 write_varint(&mut metadata_buf, metadata)?;
165 writer.write_all(&metadata_buf)?;
166 self.f_offset += metadata_buf.len() as u64;
167
168 writer.write_all(data)?;
170 self.f_offset += data.len() as u64;
171
172 self.streams[stream_id]
174 .parts
175 .push(Part::new(part_offset, data.len() as u64));
176
177 let total_size = self.f_offset - part_offset;
179 self.streams[stream_id].packed_size += total_size;
180 self.streams[stream_id].packed_data_size += data.len() as u64;
181
182 Ok(())
183 }
184
185 pub fn add_part_buffered(&mut self, stream_id: usize, data: Vec<u8>, metadata: u64) {
189 self.write_buffer
190 .entry(stream_id)
191 .or_default()
192 .push((data, metadata));
193 }
194
195 pub fn flush_buffers(&mut self) -> Result<()> {
198 let buffer = std::mem::take(&mut self.write_buffer);
201 for (stream_id, parts) in buffer {
202 for (data, metadata) in parts {
203 self.add_part(stream_id, &data, metadata)?;
204 }
205 }
206 Ok(())
207 }
208
209 pub fn set_raw_size(&mut self, stream_id: usize, raw_size: u64) {
211 if stream_id < self.streams.len() {
212 self.streams[stream_id].raw_size = raw_size;
213 }
214 }
215
216 pub fn get_raw_size(&self, stream_id: usize) -> u64 {
218 if stream_id < self.streams.len() {
219 self.streams[stream_id].raw_size
220 } else {
221 0
222 }
223 }
224
225 pub fn get_packed_size(&self, stream_id: usize) -> u64 {
227 if stream_id < self.streams.len() {
228 self.streams[stream_id].packed_size
229 } else {
230 0
231 }
232 }
233
234 pub fn get_packed_data_size(&self, stream_id: usize) -> u64 {
236 if stream_id < self.streams.len() {
237 self.streams[stream_id].packed_data_size
238 } else {
239 0
240 }
241 }
242
243 pub fn get_stream_name(&self, stream_id: usize) -> Option<&str> {
245 if stream_id < self.streams.len() {
246 Some(&self.streams[stream_id].stream_name)
247 } else {
248 None
249 }
250 }
251
252 pub fn get_num_streams(&self) -> usize {
254 self.streams.len()
255 }
256
257 pub fn get_num_parts(&self, stream_id: usize) -> usize {
259 if stream_id < self.streams.len() {
260 self.streams[stream_id].parts.len()
261 } else {
262 0
263 }
264 }
265
266 pub fn get_part(&mut self, stream_id: usize) -> Result<Option<(Vec<u8>, u64)>> {
268 if stream_id >= self.streams.len() {
269 anyhow::bail!("Invalid stream ID: {stream_id}");
270 }
271
272 let stream = &mut self.streams[stream_id];
273 if stream.cur_id >= stream.parts.len() {
274 return Ok(None); }
276
277 let part = stream.parts[stream.cur_id].clone();
278 stream.cur_id += 1;
279
280 self.read_part_data(&part)
281 }
282
283 pub fn get_part_by_id(&mut self, stream_id: usize, part_id: usize) -> Result<(Vec<u8>, u64)> {
285 if stream_id >= self.streams.len() {
286 anyhow::bail!("Invalid stream ID: {stream_id}");
287 }
288
289 let stream = &self.streams[stream_id];
290 if part_id >= stream.parts.len() {
291 anyhow::bail!("Invalid part ID: {part_id}");
292 }
293
294 let part = stream.parts[part_id].clone();
295 self.read_part_data(&part)
296 .map(|opt| opt.expect("Part should exist"))
297 }
298
299 fn read_part_data(&mut self, part: &Part) -> Result<Option<(Vec<u8>, u64)>> {
301 if part.size == 0 {
302 return Ok(Some((Vec::new(), 0)));
303 }
304
305 let reader = self
306 .reader
307 .as_mut()
308 .context("Archive not open for reading")?;
309
310 reader.seek(SeekFrom::Start(part.offset))?;
312
313 let (metadata, _) = read_varint(reader)?;
315
316 let mut data = vec![0u8; part.size as usize];
318 reader.read_exact(&mut data)?;
319
320 Ok(Some((data, metadata)))
321 }
322
323 fn serialize(&mut self) -> Result<()> {
325 let writer = self
326 .writer
327 .as_mut()
328 .context("Archive not open for writing")?;
329
330 let mut footer = Vec::new();
331
332 write_varint(&mut footer, self.streams.len() as u64)?;
334
335 for stream in &mut self.streams {
337 footer.extend_from_slice(stream.stream_name.as_bytes());
339 footer.push(0);
340
341 write_varint(&mut footer, stream.parts.len() as u64)?;
343
344 write_varint(&mut footer, stream.raw_size)?;
346
347 for part in &stream.parts {
349 write_varint(&mut footer, part.offset)?;
350 write_varint(&mut footer, part.size)?;
351 }
352
353 stream.packed_size += footer.len() as u64;
355 }
356
357 writer.write_all(&footer)?;
359
360 let footer_size = footer.len() as u64;
362 writer.write_all(&footer_size.to_le_bytes())?;
363
364 writer.flush()?;
365 Ok(())
366 }
367
368 fn deserialize(&mut self) -> Result<()> {
370 let file = self.file.as_mut().context("Archive not open")?;
371
372 let file_size = file.metadata()?.len();
374
375 file.seek(SeekFrom::End(-8))?;
377 let mut footer_size_bytes = [0u8; 8];
378 file.read_exact(&mut footer_size_bytes)?;
379 let footer_size = u64::from_le_bytes(footer_size_bytes);
380
381 file.seek(SeekFrom::Start(file_size - 8 - footer_size))?;
383
384 let mut footer = vec![0u8; footer_size as usize];
386 file.read_exact(&mut footer)?;
387
388 let mut cursor = std::io::Cursor::new(&footer);
390
391 let (num_streams, _) = read_varint(&mut cursor)?;
393
394 self.streams.clear();
395 self.stream_map.clear();
396
397 for i in 0..num_streams {
398 let mut stream_name = String::new();
400 loop {
401 let mut byte = [0u8; 1];
402 cursor.read_exact(&mut byte)?;
403 if byte[0] == 0 {
404 break;
405 }
406 stream_name.push(byte[0] as char);
407 }
408
409 let (num_parts, _) = read_varint(&mut cursor)?;
411
412 let (raw_size, _) = read_varint(&mut cursor)?;
414
415 let mut stream = Stream::new(stream_name.clone());
417 stream.raw_size = raw_size;
418
419 for _ in 0..num_parts {
421 let (offset, _) = read_varint(&mut cursor)?;
422 let (size, _) = read_varint(&mut cursor)?;
423 stream.parts.push(Part::new(offset, size));
424 }
425
426 stream.cur_id = 0;
428
429 self.streams.push(stream);
430 self.stream_map.insert(stream_name, i as usize);
431 }
432
433 file.seek(SeekFrom::Start(0))?;
435 if let Some(ref mut reader) = self.reader {
436 reader.seek(SeekFrom::Start(0))?;
437 }
438
439 Ok(())
440 }
441}
442
/// Best-effort finalization when the handle goes out of scope.
impl Drop for Archive {
    fn drop(&mut self) {
        // Errors are deliberately discarded — `drop` cannot surface them.
        // Call `close()` explicitly to observe write/flush failures.
        let _ = self.close();
    }
}
448
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;

    // NOTE(review): both tests use fixed file names in the current working
    // directory; concurrent runs from the same directory could collide —
    // consider a temp-dir helper.

    // Round-trip: write two parts to one stream, close, reopen and read
    // them back sequentially, verifying payloads, metadata and raw size.
    #[test]
    fn test_archive_write_read() {
        let path = "test_archive.agc";

        {
            let mut archive = Archive::new_writer();
            archive.open(path).unwrap();

            let stream_id = archive.register_stream("test_stream");
            archive.add_part(stream_id, b"Hello", 42).unwrap();
            archive.add_part(stream_id, b"World", 99).unwrap();
            archive.set_raw_size(stream_id, 100);

            archive.close().unwrap();
        }

        {
            let mut archive = Archive::new_reader();
            archive.open(path).unwrap();

            let stream_id = archive.get_stream_id("test_stream").unwrap();
            assert_eq!(archive.get_num_parts(stream_id), 2);
            assert_eq!(archive.get_raw_size(stream_id), 100);

            let (data1, meta1) = archive.get_part(stream_id).unwrap().unwrap();
            assert_eq!(data1, b"Hello");
            assert_eq!(meta1, 42);

            let (data2, meta2) = archive.get_part(stream_id).unwrap().unwrap();
            assert_eq!(data2, b"World");
            assert_eq!(meta2, 99);

            // Cursor exhausted: a third sequential read yields None.
            assert!(archive.get_part(stream_id).unwrap().is_none());
        }

        fs::remove_file(path).unwrap();
    }

    // Interleaved writes across two streams, then random access via
    // `get_part_by_id` after reopening.
    #[test]
    fn test_multiple_streams() {
        let path = "test_multi_stream.agc";

        {
            let mut archive = Archive::new_writer();
            archive.open(path).unwrap();

            let stream1 = archive.register_stream("stream1");
            let stream2 = archive.register_stream("stream2");

            archive.add_part(stream1, b"Data1", 1).unwrap();
            archive.add_part(stream2, b"Data2", 2).unwrap();
            archive.add_part(stream1, b"Data3", 3).unwrap();

            archive.close().unwrap();
        }

        {
            let mut archive = Archive::new_reader();
            archive.open(path).unwrap();

            let stream1 = archive.get_stream_id("stream1").unwrap();
            let stream2 = archive.get_stream_id("stream2").unwrap();

            assert_eq!(archive.get_num_parts(stream1), 2);
            assert_eq!(archive.get_num_parts(stream2), 1);

            let (data, meta) = archive.get_part_by_id(stream1, 0).unwrap();
            assert_eq!(data, b"Data1");
            assert_eq!(meta, 1);

            let (data, meta) = archive.get_part_by_id(stream2, 0).unwrap();
            assert_eq!(data, b"Data2");
            assert_eq!(meta, 2);
        }

        fs::remove_file(path).unwrap();
    }
}