1use crate::block::Block;
35use crate::cid::{Cid, SerializableCid};
36use crate::compression::CompressionAlgorithm;
37use crate::error::{Error, Result};
38use bytes::Bytes;
39use std::io::{Read, Write};
40
/// CAR format version written into every header and required when decoding one.
const CAR_VERSION: u64 = 1;

/// Longest possible varint encoding of a `u64`: seven payload bits per byte,
/// so ceil(64 / 7) = 10 bytes.
const MAX_VARINT_SIZE: usize = (u64::BITS as usize + 6) / 7;
46
47#[derive(Debug, Clone)]
49pub struct CarHeader {
50 pub version: u64,
52 pub roots: Vec<Cid>,
54}
55
56impl CarHeader {
57 pub fn new(roots: Vec<Cid>) -> Self {
74 Self {
75 version: CAR_VERSION,
76 roots,
77 }
78 }
79
80 fn encode(&self) -> Result<Bytes> {
82 use crate::ipld::Ipld;
83 use std::collections::BTreeMap;
84
85 let mut map = BTreeMap::new();
86 map.insert("version".to_string(), Ipld::Integer(self.version as i128));
87
88 let roots: Vec<Ipld> = self
89 .roots
90 .iter()
91 .map(|cid| Ipld::Link(SerializableCid(*cid)))
92 .collect();
93 map.insert("roots".to_string(), Ipld::List(roots));
94
95 let ipld = Ipld::Map(map);
96 ipld.to_dag_cbor().map(Bytes::from)
97 }
98
99 fn decode(data: &[u8]) -> Result<Self> {
101 use crate::ipld::Ipld;
102
103 let ipld = Ipld::from_dag_cbor(data)?;
104
105 let map = match ipld {
106 Ipld::Map(m) => m,
107 _ => {
108 return Err(Error::Deserialization(
109 "CAR header must be a map".to_string(),
110 ))
111 }
112 };
113
114 let version = match map.get("version") {
115 Some(Ipld::Integer(v)) => *v as u64,
116 _ => {
117 return Err(Error::Deserialization(
118 "CAR header missing version".to_string(),
119 ))
120 }
121 };
122
123 if version != CAR_VERSION {
124 return Err(Error::Deserialization(format!(
125 "Unsupported CAR version: {}",
126 version
127 )));
128 }
129
130 let roots = match map.get("roots") {
131 Some(Ipld::List(list)) => list
132 .iter()
133 .map(|item| match item {
134 Ipld::Link(SerializableCid(cid)) => Ok(*cid),
135 _ => Err(Error::Deserialization(
136 "Invalid root CID in header".to_string(),
137 )),
138 })
139 .collect::<Result<Vec<Cid>>>()?,
140 _ => {
141 return Err(Error::Deserialization(
142 "CAR header missing roots".to_string(),
143 ))
144 }
145 };
146
147 Ok(Self { version, roots })
148 }
149}
150
/// Running byte and block counters kept by a `CarWriter`.
#[derive(Clone, Debug, Default)]
pub struct CarCompressionStats {
    /// Number of blocks recorded.
    pub blocks_processed: usize,
    /// Total payload size before compression.
    pub uncompressed_bytes: usize,
    /// Total payload size as written (after compression, when enabled).
    pub compressed_bytes: usize,
    /// Number of blocks whose payload was written in compressed form.
    pub blocks_compressed: usize,
}

impl CarCompressionStats {
    /// Returns a record with every counter at zero.
    pub fn new() -> Self {
        Default::default()
    }

    /// Written size divided by original size; `1.0` when nothing has been recorded.
    pub fn compression_ratio(&self) -> f64 {
        match self.uncompressed_bytes {
            0 => 1.0,
            total => self.compressed_bytes as f64 / total as f64,
        }
    }

    /// Bytes avoided by compression, clamped to zero when the output grew.
    pub fn bytes_saved(&self) -> usize {
        self.uncompressed_bytes.saturating_sub(self.compressed_bytes)
    }

    /// [`bytes_saved`](Self::bytes_saved) as a percentage of the original size;
    /// `0.0` when nothing has been recorded.
    pub fn compression_percentage(&self) -> f64 {
        if self.uncompressed_bytes == 0 {
            return 0.0;
        }
        let fraction = self.bytes_saved() as f64 / self.uncompressed_bytes as f64;
        fraction * 100.0
    }
}
196
197pub struct CarWriterBuilder {
212 roots: Vec<Cid>,
213 compression: Option<(CompressionAlgorithm, i32)>,
214}
215
216impl CarWriterBuilder {
217 pub fn new(roots: Vec<Cid>) -> Self {
219 Self {
220 roots,
221 compression: None,
222 }
223 }
224
225 pub fn with_compression(mut self, algorithm: CompressionAlgorithm, level: i32) -> Self {
232 self.compression = Some((algorithm, level));
233 self
234 }
235
236 pub fn build<W: Write>(self, writer: W) -> Result<CarWriter<W>> {
238 CarWriter::new_with_options(writer, self.roots, self.compression)
239 }
240}
241
242pub struct CarWriter<W: Write> {
247 writer: W,
248 header_written: bool,
249 compression: Option<(CompressionAlgorithm, i32)>,
250 stats: CarCompressionStats,
251}
252
253impl<W: Write> CarWriter<W> {
254 pub fn new(writer: W, roots: Vec<Cid>) -> Result<Self> {
273 Self::new_with_options(writer, roots, None)
274 }
275
276 fn new_with_options(
286 writer: W,
287 roots: Vec<Cid>,
288 compression: Option<(CompressionAlgorithm, i32)>,
289 ) -> Result<Self> {
290 let mut car_writer = Self {
291 writer,
292 header_written: false,
293 compression,
294 stats: CarCompressionStats::new(),
295 };
296 car_writer.write_header(&CarHeader::new(roots))?;
297 Ok(car_writer)
298 }
299
300 fn write_header(&mut self, header: &CarHeader) -> Result<()> {
302 let header_bytes = header.encode()?;
303 let header_len = header_bytes.len();
304
305 write_varint(&mut self.writer, header_len as u64)?;
307
308 self.writer.write_all(&header_bytes)?;
310
311 self.header_written = true;
312 Ok(())
313 }
314
315 pub fn write_block(&mut self, block: &Block) -> Result<()> {
335 if !self.header_written {
336 return Err(Error::InvalidData("CAR header not written".to_string()));
337 }
338
339 let cid_bytes = block.cid().to_bytes();
341 let data = block.data();
342
343 self.stats.blocks_processed += 1;
345 self.stats.uncompressed_bytes += data.len();
346
347 let (final_data, was_compressed) = if let Some((algorithm, level)) = self.compression {
349 let compressed = crate::compression::compress(data, algorithm, level as u8)?;
350 let is_compressed = algorithm != CompressionAlgorithm::None;
352 if is_compressed {
353 self.stats.blocks_compressed += 1;
354 }
355 self.stats.compressed_bytes += compressed.len();
356 (compressed, is_compressed)
357 } else {
358 self.stats.compressed_bytes += data.len();
359 (data.clone(), false)
360 };
361
362 let total_len = cid_bytes.len() + 1 + final_data.len();
364
365 write_varint(&mut self.writer, total_len as u64)?;
367
368 self.writer.write_all(&cid_bytes)?;
370
371 self.writer.write_all(&[was_compressed as u8])?;
373
374 self.writer.write_all(&final_data)?;
376
377 Ok(())
378 }
379
380 pub fn stats(&self) -> &CarCompressionStats {
399 &self.stats
400 }
401
402 pub fn finish(mut self) -> Result<()> {
404 self.writer.flush()?;
405 Ok(())
406 }
407}
408
409pub struct CarReader<R: Read> {
413 reader: R,
414 header: CarHeader,
415}
416
417impl<R: Read> CarReader<R> {
418 pub fn new(mut reader: R) -> Result<Self> {
442 let header_len = read_varint(&mut reader)?;
444
445 let mut header_bytes = vec![0u8; header_len as usize];
447 reader.read_exact(&mut header_bytes)?;
448
449 let header = CarHeader::decode(&header_bytes)?;
451
452 Ok(Self { reader, header })
453 }
454
455 pub fn roots(&self) -> &[Cid] {
457 &self.header.roots
458 }
459
460 pub fn read_block(&mut self) -> Result<Option<Block>> {
482 let total_len = match read_varint_opt(&mut self.reader) {
484 Ok(Some(len)) => len,
485 Ok(None) => return Ok(None), Err(e) => return Err(e),
487 };
488
489 let mut block_bytes = vec![0u8; total_len as usize];
491 self.reader.read_exact(&mut block_bytes)?;
492
493 let mut cursor = &block_bytes[..];
494
495 let cid = Cid::read_bytes(&mut cursor)
497 .map_err(|e| Error::Cid(format!("Failed to parse CID: {}", e)))?;
498
499 let (is_compressed, data_start) = if !cursor.is_empty() {
501 let flag = cursor[0];
502 if flag == 0 || flag == 1 {
503 (flag == 1, 1)
505 } else {
506 (false, 0)
508 }
509 } else {
510 return Err(Error::Deserialization("Empty block data".to_string()));
511 };
512
513 let raw_data = &cursor[data_start..];
515
516 let final_data = if is_compressed {
518 let raw_bytes = Bytes::from(raw_data.to_vec());
519 crate::compression::decompress(&raw_bytes, CompressionAlgorithm::Zstd).or_else(
522 |_| crate::compression::decompress(&raw_bytes, CompressionAlgorithm::Lz4),
523 )?
524 } else {
525 Bytes::from(raw_data.to_vec())
526 };
527
528 let block = Block::new(final_data)?;
530
531 if block.cid() != &cid {
533 return Err(Error::InvalidData(format!(
534 "Block CID mismatch: expected {}, got {}",
535 cid,
536 block.cid()
537 )));
538 }
539
540 Ok(Some(block))
541 }
542
543 pub fn read_all_blocks(&mut self) -> Result<Vec<Block>> {
564 let mut blocks = Vec::new();
565
566 while let Some(block) = self.read_block()? {
567 blocks.push(block);
568 }
569
570 Ok(blocks)
571 }
572}
573
574fn write_varint<W: Write>(writer: &mut W, mut value: u64) -> Result<()> {
576 let mut buf = [0u8; MAX_VARINT_SIZE];
577 let mut i = 0;
578
579 loop {
580 let mut byte = (value & 0x7F) as u8;
581 value >>= 7;
582
583 if value != 0 {
584 byte |= 0x80;
585 }
586
587 buf[i] = byte;
588 i += 1;
589
590 if value == 0 {
591 break;
592 }
593 }
594
595 writer.write_all(&buf[..i])?;
596 Ok(())
597}
598
599fn read_varint<R: Read>(reader: &mut R) -> Result<u64> {
601 read_varint_opt(reader)?
602 .ok_or_else(|| Error::Deserialization("Unexpected EOF reading varint".to_string()))
603}
604
605fn read_varint_opt<R: Read>(reader: &mut R) -> Result<Option<u64>> {
607 let mut result = 0u64;
608 let mut shift = 0;
609 let mut buf = [0u8; 1];
610
611 for _ in 0..MAX_VARINT_SIZE {
612 match reader.read_exact(&mut buf) {
613 Ok(()) => {}
614 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof && shift == 0 => {
615 return Ok(None); }
617 Err(e) => return Err(Error::from(e)),
618 }
619
620 let byte = buf[0];
621 result |= ((byte & 0x7F) as u64) << shift;
622
623 if byte & 0x80 == 0 {
624 return Ok(Some(result));
625 }
626
627 shift += 7;
628
629 if shift >= 64 {
630 return Err(Error::Deserialization("Varint too large".to_string()));
631 }
632 }
633
634 Err(Error::Deserialization(
635 "Varint exceeds maximum size".to_string(),
636 ))
637}
638
#[cfg(test)]
mod tests {
    use super::*;
    use crate::block::Block;
    use crate::cid::{Cid, CidBuilder};
    use crate::compression::CompressionAlgorithm;
    use bytes::Bytes;

    /// Serializes `blocks` with an uncompressed writer created by `CarWriter::new`.
    fn write_plain<'a>(roots: Vec<Cid>, blocks: impl IntoIterator<Item = &'a Block>) -> Vec<u8> {
        let mut car = Vec::new();
        let mut writer = CarWriter::new(&mut car, roots).unwrap();
        for block in blocks {
            writer.write_block(block).unwrap();
        }
        writer.finish().unwrap();
        car
    }

    /// Serializes `blocks` with a compressing writer; also returns the stats
    /// observed just before `finish`.
    fn write_compressed<'a>(
        roots: Vec<Cid>,
        algorithm: CompressionAlgorithm,
        level: i32,
        blocks: impl IntoIterator<Item = &'a Block>,
    ) -> (Vec<u8>, CarCompressionStats) {
        let mut car = Vec::new();
        let mut writer = CarWriterBuilder::new(roots)
            .with_compression(algorithm, level)
            .build(&mut car)
            .unwrap();
        for block in blocks {
            writer.write_block(block).unwrap();
        }
        let stats = writer.stats().clone();
        writer.finish().unwrap();
        (car, stats)
    }

    /// Reads every block from an in-memory archive.
    fn read_all(car: &[u8]) -> Vec<Block> {
        CarReader::new(car).unwrap().read_all_blocks().unwrap()
    }

    /// Asserts that `actual` carries the same CID and payload as `expected`.
    fn assert_same_block(actual: &Block, expected: &Block) {
        assert_eq!(actual.cid(), expected.cid());
        assert_eq!(actual.data(), expected.data());
    }

    #[test]
    fn test_car_header_encode_decode() {
        let first = CidBuilder::new().build(b"test1").unwrap();
        let second = CidBuilder::new().build(b"test2").unwrap();

        let encoded = CarHeader::new(vec![first, second]).encode().unwrap();
        let decoded = CarHeader::decode(&encoded).unwrap();

        assert_eq!(decoded.version, 1);
        assert_eq!(decoded.roots, vec![first, second]);
    }

    #[test]
    fn test_car_write_read() {
        let first = Block::new(Bytes::from_static(b"Hello, CAR!")).unwrap();
        let second = Block::new(Bytes::from_static(b"CAR format test")).unwrap();

        let car = write_plain(vec![*first.cid()], [&first, &second]);
        let mut reader = CarReader::new(&car[..]).unwrap();

        assert_eq!(reader.roots().len(), 1);
        assert_eq!(reader.roots()[0], *first.cid());

        for expected in [&first, &second] {
            let actual = reader.read_block().unwrap().expect("block should be present");
            assert_same_block(&actual, expected);
        }
        assert!(reader.read_block().unwrap().is_none());
    }

    #[test]
    fn test_car_read_all_blocks() {
        let blocks: Vec<Block> = (0..5)
            .map(|i| Block::new(Bytes::from(format!("Block {}", i))).unwrap())
            .collect();

        let car = write_plain(vec![*blocks[0].cid()], &blocks);
        let read_back = read_all(&car);

        assert_eq!(read_back.len(), blocks.len());
        for (actual, expected) in read_back.iter().zip(&blocks) {
            assert_same_block(actual, expected);
        }
    }

    #[test]
    fn test_varint_roundtrip() {
        for value in [0, 1, 127, 128, 255, 256, 65535, 65536, u64::MAX] {
            let mut encoded = Vec::new();
            write_varint(&mut encoded, value).unwrap();

            let mut cursor = &encoded[..];
            assert_eq!(read_varint(&mut cursor).unwrap(), value);
        }
    }

    #[test]
    fn test_car_empty_roots() {
        let block = Block::new(Bytes::from_static(b"test")).unwrap();

        let car = write_plain(Vec::new(), [&block]);

        let reader = CarReader::new(&car[..]).unwrap();
        assert!(reader.roots().is_empty());
    }

    #[test]
    fn test_car_multiple_roots() {
        let roots = vec![
            CidBuilder::new().build(b"root1").unwrap(),
            CidBuilder::new().build(b"root2").unwrap(),
            CidBuilder::new().build(b"root3").unwrap(),
        ];
        let block = Block::new(Bytes::from_static(b"data")).unwrap();

        let car = write_plain(roots.clone(), [&block]);

        let reader = CarReader::new(&car[..]).unwrap();
        assert_eq!(reader.roots().len(), 3);
        for (actual, expected) in reader.roots().iter().zip(&roots) {
            assert_eq!(actual, expected);
        }
    }

    #[test]
    fn test_car_large_blocks() {
        const SIZE: usize = 1_000_000;
        let block = Block::new(Bytes::from(vec![0x42u8; SIZE])).unwrap();

        let car = write_plain(vec![*block.cid()], [&block]);

        let mut reader = CarReader::new(&car[..]).unwrap();
        let actual = reader.read_block().unwrap().unwrap();
        assert_eq!(actual.cid(), block.cid());
        assert_eq!(actual.data().len(), SIZE);
    }

    #[test]
    fn test_car_compression_zstd() {
        let first = Block::new(Bytes::from(vec![0x42u8; 1000])).unwrap();
        let second = Block::new(Bytes::from(vec![0xAAu8; 2000])).unwrap();

        let (car, stats) = write_compressed(
            vec![*first.cid()],
            CompressionAlgorithm::Zstd,
            3,
            [&first, &second],
        );

        assert_eq!(stats.blocks_processed, 2);
        assert_eq!(stats.blocks_compressed, 2);
        assert_eq!(stats.uncompressed_bytes, 3000);
        assert!(stats.compressed_bytes < stats.uncompressed_bytes);
        assert!(stats.compression_ratio() < 1.0);

        let read_back = read_all(&car);
        assert_eq!(read_back.len(), 2);
        assert_same_block(&read_back[0], &first);
        assert_same_block(&read_back[1], &second);
    }

    #[test]
    fn test_car_compression_lz4() {
        let block = Block::new(Bytes::from(vec![0x11u8; 5000])).unwrap();

        let (car, stats) =
            write_compressed(vec![*block.cid()], CompressionAlgorithm::Lz4, 1, [&block]);

        assert_eq!(stats.blocks_processed, 1);
        assert_eq!(stats.blocks_compressed, 1);
        assert!(stats.compressed_bytes < stats.uncompressed_bytes);

        let read_back = read_all(&car);
        assert_eq!(read_back.len(), 1);
        assert_same_block(&read_back[0], &block);
    }

    #[test]
    fn test_car_compression_none() {
        let block = Block::new(Bytes::from_static(b"test data")).unwrap();

        let (car, stats) =
            write_compressed(vec![*block.cid()], CompressionAlgorithm::None, 0, [&block]);

        assert_eq!(stats.blocks_processed, 1);
        // Selecting the `None` algorithm must not count as compression.
        assert_eq!(stats.blocks_compressed, 0);
        assert_eq!(stats.uncompressed_bytes, stats.compressed_bytes);
        assert_eq!(stats.compression_ratio(), 1.0);

        let read_back = read_all(&car);
        assert_eq!(read_back.len(), 1);
        assert_same_block(&read_back[0], &block);
    }

    #[test]
    fn test_car_compression_stats() {
        let blocks: Vec<Block> = (0..10)
            .map(|_| Block::new(Bytes::from(vec![0x42u8; 500])).unwrap())
            .collect();

        let (_car, stats) =
            write_compressed(vec![*blocks[0].cid()], CompressionAlgorithm::Zstd, 5, &blocks);

        assert_eq!(stats.blocks_processed, 10);
        assert_eq!(stats.blocks_compressed, 10);
        assert_eq!(stats.uncompressed_bytes, 5000);
        assert!(stats.bytes_saved() > 0);
        assert!(stats.compression_percentage() > 0.0);
    }

    #[test]
    fn test_car_mixed_compression_backward_compat() {
        let block = Block::new(Bytes::from_static(b"legacy data")).unwrap();

        let car = write_plain(vec![*block.cid()], [&block]);

        let mut reader = CarReader::new(&car[..]).unwrap();
        let actual = reader.read_block().unwrap().unwrap();
        assert_same_block(&actual, &block);
    }

    #[test]
    fn test_car_compression_large_file() {
        let block = Block::new(Bytes::from(vec![0x55u8; 100_000])).unwrap();

        let (car, stats) =
            write_compressed(vec![*block.cid()], CompressionAlgorithm::Zstd, 6, [&block]);

        assert_eq!(stats.uncompressed_bytes, 100_000);
        // Highly repetitive input should shrink dramatically.
        assert!(stats.compressed_bytes < 1_000);
        assert!(stats.compression_ratio() < 0.01);
        assert!(stats.compression_percentage() > 99.0);

        let mut reader = CarReader::new(&car[..]).unwrap();
        let actual = reader.read_block().unwrap().unwrap();
        assert_eq!(actual.cid(), block.cid());
        assert_eq!(actual.data().len(), 100_000);
    }

    #[test]
    fn test_car_builder_without_compression() {
        let block = Block::new(Bytes::from_static(b"test")).unwrap();

        let mut car = Vec::new();
        let mut writer = CarWriterBuilder::new(vec![*block.cid()])
            .build(&mut car)
            .unwrap();
        writer.write_block(&block).unwrap();
        writer.finish().unwrap();

        let mut reader = CarReader::new(&car[..]).unwrap();
        let actual = reader.read_block().unwrap().unwrap();
        assert_same_block(&actual, &block);
    }
}