ipfrs_storage/
car.rs

//! CAR (Content Addressable aRchive) format support.
//!
//! CAR files are the standard format for packaging content-addressed data.
//! They contain a header with root CIDs followed by a series of blocks.
//!
//! # Format
//!
//! ```text
//! | Header (CBOR) | Block 1 | Block 2 | ... | Block N |
//! ```
//!
//! The header is prefixed with its length as an unsigned varint.
//! Each block is likewise prefixed with a varint length (covering the CID
//! and the data together) and contains:
//! - CID bytes
//! - Block data
//!
//! # Example
//!
//! ```rust,ignore
//! use ipfrs_storage::car::{CarWriter, CarReader};
//!
//! // Export blocks to CAR file
//! let mut writer = CarWriter::create(&path, roots).await?;
//! for block in blocks {
//!     writer.write_block(&block).await?;
//! }
//! writer.finish().await?;
//!
//! // Import blocks from CAR file
//! let mut reader = CarReader::open(&path).await?;
//! while let Some(block) = reader.read_block().await? {
//!     store.put(&block).await?;
//! }
//! ```
34
35use crate::traits::BlockStore;
36use bytes::Bytes;
37use ipfrs_core::{Block, Cid, Error, Result};
38use std::path::Path;
39use tokio::fs::File;
40use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter};
41
42/// CAR file version
43pub const CAR_VERSION: u64 = 1;
44
45/// CAR file header
46#[derive(Debug, Clone)]
47pub struct CarHeader {
48    /// CAR version (currently 1)
49    pub version: u64,
50    /// Root CIDs
51    pub roots: Vec<Cid>,
52}
53
54impl CarHeader {
55    /// Create a new CAR header
56    pub fn new(roots: Vec<Cid>) -> Self {
57        Self {
58            version: CAR_VERSION,
59            roots,
60        }
61    }
62
63    /// Encode header to CBOR bytes
64    pub fn to_cbor(&self) -> Result<Vec<u8>> {
65        // Simple CBOR encoding for header
66        // { "version": 1, "roots": [<cid bytes>...] }
67        let mut buf = Vec::new();
68
69        // Map with 2 entries
70        buf.push(0xa2); // map(2)
71
72        // Key: "version"
73        buf.push(0x67); // text(7)
74        buf.extend_from_slice(b"version");
75        // Value: version number
76        buf.push(0x01); // unsigned(1)
77
78        // Key: "roots"
79        buf.push(0x65); // text(5)
80        buf.extend_from_slice(b"roots");
81
82        // Value: array of CIDs
83        let roots_len = self.roots.len();
84        if roots_len < 24 {
85            buf.push(0x80 | roots_len as u8); // array(n)
86        } else if roots_len < 256 {
87            buf.push(0x98); // array(1 byte)
88            buf.push(roots_len as u8);
89        } else {
90            return Err(Error::InvalidData("Too many roots".to_string()));
91        }
92
93        for root in &self.roots {
94            let cid_bytes = root.to_bytes();
95            // CID as byte string (tag 42 for CID)
96            buf.push(0xd8); // tag(42)
97            buf.push(0x2a);
98            // Byte string
99            if cid_bytes.len() < 24 {
100                buf.push(0x40 | cid_bytes.len() as u8);
101            } else if cid_bytes.len() < 256 {
102                buf.push(0x58);
103                buf.push(cid_bytes.len() as u8);
104            } else {
105                buf.push(0x59);
106                buf.extend_from_slice(&(cid_bytes.len() as u16).to_be_bytes());
107            }
108            buf.extend_from_slice(&cid_bytes);
109        }
110
111        Ok(buf)
112    }
113
114    /// Decode header from CBOR bytes
115    pub fn from_cbor(data: &[u8]) -> Result<Self> {
116        // Simple CBOR parsing - just enough for CAR headers
117
118        // Expect map
119        if data.is_empty() || (data[0] & 0xe0) != 0xa0 {
120            return Err(Error::InvalidData("Expected CBOR map".to_string()));
121        }
122
123        let (map_len, mut pos) = if data[0] == 0xa2 {
124            (2, 1)
125        } else {
126            return Err(Error::InvalidData("Expected map(2)".to_string()));
127        };
128
129        let mut version = 1u64;
130        let mut roots = Vec::new();
131
132        for _ in 0..map_len {
133            // Read key (text string)
134            let (key, new_pos) = read_cbor_text(&data[pos..])?;
135            pos += new_pos;
136
137            match key.as_str() {
138                "version" => {
139                    // Read unsigned int
140                    if pos >= data.len() {
141                        return Err(Error::InvalidData("Unexpected end".to_string()));
142                    }
143                    let (v, new_pos) = read_cbor_uint(&data[pos..])?;
144                    version = v;
145                    pos += new_pos;
146                }
147                "roots" => {
148                    // Read array of CIDs
149                    let (r, new_pos) = read_cbor_roots(&data[pos..])?;
150                    roots = r;
151                    pos += new_pos;
152                }
153                _ => {
154                    // Skip unknown keys
155                    let new_pos = skip_cbor_value(&data[pos..])?;
156                    pos += new_pos;
157                }
158            }
159        }
160
161        Ok(Self { version, roots })
162    }
163}
164
165/// Read CBOR text string
166fn read_cbor_text(data: &[u8]) -> Result<(String, usize)> {
167    if data.is_empty() {
168        return Err(Error::InvalidData("Unexpected end".to_string()));
169    }
170
171    let major = data[0] >> 5;
172    if major != 3 {
173        // text string
174        return Err(Error::InvalidData("Expected text string".to_string()));
175    }
176
177    let (len, header_len) = read_cbor_len(data)?;
178    let total_len = header_len + len;
179
180    if data.len() < total_len {
181        return Err(Error::InvalidData("Text string too short".to_string()));
182    }
183
184    let text = String::from_utf8(data[header_len..total_len].to_vec())
185        .map_err(|e| Error::InvalidData(format!("Invalid UTF-8: {e}")))?;
186
187    Ok((text, total_len))
188}
189
190/// Read CBOR unsigned int
191fn read_cbor_uint(data: &[u8]) -> Result<(u64, usize)> {
192    if data.is_empty() {
193        return Err(Error::InvalidData("Unexpected end".to_string()));
194    }
195
196    let major = data[0] >> 5;
197    if major != 0 {
198        return Err(Error::InvalidData("Expected unsigned int".to_string()));
199    }
200
201    let (val, len) = read_cbor_len(data)?;
202    Ok((val as u64, len))
203}
204
205/// Read CBOR length prefix
206fn read_cbor_len(data: &[u8]) -> Result<(usize, usize)> {
207    if data.is_empty() {
208        return Err(Error::InvalidData("Unexpected end".to_string()));
209    }
210
211    let additional = data[0] & 0x1f;
212
213    match additional {
214        0..=23 => Ok((additional as usize, 1)),
215        24 => {
216            if data.len() < 2 {
217                return Err(Error::InvalidData("Length too short".to_string()));
218            }
219            Ok((data[1] as usize, 2))
220        }
221        25 => {
222            if data.len() < 3 {
223                return Err(Error::InvalidData("Length too short".to_string()));
224            }
225            Ok((u16::from_be_bytes([data[1], data[2]]) as usize, 3))
226        }
227        26 => {
228            if data.len() < 5 {
229                return Err(Error::InvalidData("Length too short".to_string()));
230            }
231            Ok((
232                u32::from_be_bytes([data[1], data[2], data[3], data[4]]) as usize,
233                5,
234            ))
235        }
236        _ => Err(Error::InvalidData(
237            "Unsupported length encoding".to_string(),
238        )),
239    }
240}
241
242/// Read array of CIDs
243fn read_cbor_roots(data: &[u8]) -> Result<(Vec<Cid>, usize)> {
244    if data.is_empty() {
245        return Err(Error::InvalidData("Unexpected end".to_string()));
246    }
247
248    let major = data[0] >> 5;
249    if major != 4 {
250        // array
251        return Err(Error::InvalidData("Expected array".to_string()));
252    }
253
254    let (arr_len, header_len) = read_cbor_len(data)?;
255    let mut pos = header_len;
256    let mut roots = Vec::with_capacity(arr_len);
257
258    for _ in 0..arr_len {
259        // Skip tag if present (tag 42 for CID)
260        if pos < data.len() && data[pos] == 0xd8 {
261            pos += 2; // Skip tag
262        }
263
264        // Read byte string
265        if pos >= data.len() {
266            return Err(Error::InvalidData("Unexpected end in roots".to_string()));
267        }
268
269        let major = data[pos] >> 5;
270        if major != 2 {
271            // byte string
272            return Err(Error::InvalidData(
273                "Expected byte string for CID".to_string(),
274            ));
275        }
276
277        let (len, header) = read_cbor_len(&data[pos..])?;
278        pos += header;
279
280        if pos + len > data.len() {
281            return Err(Error::InvalidData("CID bytes too short".to_string()));
282        }
283
284        let cid = Cid::try_from(data[pos..pos + len].to_vec())
285            .map_err(|e| Error::Cid(format!("Invalid CID: {e}")))?;
286        roots.push(cid);
287        pos += len;
288    }
289
290    Ok((roots, pos))
291}
292
293/// Skip a CBOR value
294fn skip_cbor_value(data: &[u8]) -> Result<usize> {
295    if data.is_empty() {
296        return Err(Error::InvalidData("Unexpected end".to_string()));
297    }
298
299    let major = data[0] >> 5;
300    let (len, header_len) = read_cbor_len(data)?;
301
302    match major {
303        0 | 1 => Ok(header_len),       // unsigned/negative int
304        2 | 3 => Ok(header_len + len), // byte/text string
305        4 => {
306            // array
307            let mut pos = header_len;
308            for _ in 0..len {
309                pos += skip_cbor_value(&data[pos..])?;
310            }
311            Ok(pos)
312        }
313        5 => {
314            // map
315            let mut pos = header_len;
316            for _ in 0..len {
317                pos += skip_cbor_value(&data[pos..])?; // key
318                pos += skip_cbor_value(&data[pos..])?; // value
319            }
320            Ok(pos)
321        }
322        6 => {
323            // tag
324            Ok(header_len + skip_cbor_value(&data[header_len..])?)
325        }
326        7 => Ok(header_len), // simple/float
327        _ => Err(Error::InvalidData("Unknown CBOR major type".to_string())),
328    }
329}
330
/// Serialize `value` as an unsigned varint: seven payload bits per byte,
/// least-significant group first, with the high bit set on every byte except
/// the last.
fn encode_varint(mut value: u64) -> Vec<u8> {
    let mut out = Vec::new();
    loop {
        let group = (value & 0x7f) as u8;
        value >>= 7;
        if value == 0 {
            // Final group: continuation bit stays clear.
            out.push(group);
            return out;
        }
        out.push(group | 0x80);
    }
}
341
342/// Decode unsigned varint
343fn decode_varint(data: &[u8]) -> Result<(u64, usize)> {
344    let mut result: u64 = 0;
345    let mut shift = 0;
346
347    for (i, &byte) in data.iter().enumerate() {
348        result |= ((byte & 0x7f) as u64) << shift;
349        if byte & 0x80 == 0 {
350            return Ok((result, i + 1));
351        }
352        shift += 7;
353        if shift >= 64 {
354            return Err(Error::InvalidData("Varint too long".to_string()));
355        }
356    }
357
358    Err(Error::InvalidData("Incomplete varint".to_string()))
359}
360
361/// CAR file writer
362pub struct CarWriter {
363    writer: BufWriter<File>,
364    blocks_written: u64,
365    bytes_written: u64,
366}
367
368impl CarWriter {
369    /// Create a new CAR writer
370    pub async fn create(path: &Path, roots: Vec<Cid>) -> Result<Self> {
371        let file = File::create(path)
372            .await
373            .map_err(|e| Error::Storage(format!("Failed to create CAR file: {e}")))?;
374
375        let mut writer = BufWriter::new(file);
376
377        // Write header
378        let header = CarHeader::new(roots);
379        let header_bytes = header.to_cbor()?;
380        let header_len = encode_varint(header_bytes.len() as u64);
381
382        writer
383            .write_all(&header_len)
384            .await
385            .map_err(|e| Error::Storage(format!("Failed to write header length: {e}")))?;
386        writer
387            .write_all(&header_bytes)
388            .await
389            .map_err(|e| Error::Storage(format!("Failed to write header: {e}")))?;
390
391        let bytes_written = (header_len.len() + header_bytes.len()) as u64;
392
393        Ok(Self {
394            writer,
395            blocks_written: 0,
396            bytes_written,
397        })
398    }
399
400    /// Write a block to the CAR file
401    pub async fn write_block(&mut self, block: &Block) -> Result<()> {
402        let cid_bytes = block.cid().to_bytes();
403        let data = block.data();
404
405        // Block format: varint(cid_len + data_len) | cid | data
406        let block_len = cid_bytes.len() + data.len();
407        let len_bytes = encode_varint(block_len as u64);
408
409        self.writer
410            .write_all(&len_bytes)
411            .await
412            .map_err(|e| Error::Storage(format!("Failed to write block length: {e}")))?;
413        self.writer
414            .write_all(&cid_bytes)
415            .await
416            .map_err(|e| Error::Storage(format!("Failed to write CID: {e}")))?;
417        self.writer
418            .write_all(data)
419            .await
420            .map_err(|e| Error::Storage(format!("Failed to write block data: {e}")))?;
421
422        self.blocks_written += 1;
423        self.bytes_written += (len_bytes.len() + block_len) as u64;
424
425        Ok(())
426    }
427
428    /// Finish writing and close the file
429    pub async fn finish(mut self) -> Result<CarWriteStats> {
430        self.writer
431            .flush()
432            .await
433            .map_err(|e| Error::Storage(format!("Failed to flush CAR file: {e}")))?;
434
435        Ok(CarWriteStats {
436            blocks_written: self.blocks_written,
437            bytes_written: self.bytes_written,
438        })
439    }
440
441    /// Get current statistics
442    pub fn stats(&self) -> CarWriteStats {
443        CarWriteStats {
444            blocks_written: self.blocks_written,
445            bytes_written: self.bytes_written,
446        }
447    }
448}
449
/// Statistics from CAR writing.
///
/// A plain pair of counters, so it is cheap to copy and can be compared or
/// default-constructed (all zeros).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct CarWriteStats {
    /// Number of blocks written to the CAR file.
    pub blocks_written: u64,
    /// Number of bytes written, including the header and length prefixes.
    pub bytes_written: u64,
}
456
457/// CAR file reader
458pub struct CarReader {
459    reader: BufReader<File>,
460    header: CarHeader,
461    blocks_read: u64,
462    bytes_read: u64,
463}
464
465impl CarReader {
466    /// Open a CAR file for reading
467    pub async fn open(path: &Path) -> Result<Self> {
468        let file = File::open(path)
469            .await
470            .map_err(|e| Error::Storage(format!("Failed to open CAR file: {e}")))?;
471
472        let mut reader = BufReader::new(file);
473
474        // Read header length (varint)
475        let mut header_len_buf = [0u8; 10];
476        let mut header_len_size = 0;
477
478        for i in 0..10 {
479            reader
480                .read_exact(&mut header_len_buf[i..i + 1])
481                .await
482                .map_err(|e| Error::Storage(format!("Failed to read header length: {e}")))?;
483            header_len_size = i + 1;
484            if header_len_buf[i] & 0x80 == 0 {
485                break;
486            }
487        }
488
489        let (header_len, _) = decode_varint(&header_len_buf[..header_len_size])?;
490
491        // Read header
492        let mut header_bytes = vec![0u8; header_len as usize];
493        reader
494            .read_exact(&mut header_bytes)
495            .await
496            .map_err(|e| Error::Storage(format!("Failed to read header: {e}")))?;
497
498        let header = CarHeader::from_cbor(&header_bytes)?;
499
500        let bytes_read = (header_len_size + header_len as usize) as u64;
501
502        Ok(Self {
503            reader,
504            header,
505            blocks_read: 0,
506            bytes_read,
507        })
508    }
509
510    /// Get the CAR header
511    pub fn header(&self) -> &CarHeader {
512        &self.header
513    }
514
515    /// Get root CIDs
516    pub fn roots(&self) -> &[Cid] {
517        &self.header.roots
518    }
519
520    /// Read the next block from the CAR file
521    pub async fn read_block(&mut self) -> Result<Option<Block>> {
522        // Read block length (varint)
523        let mut len_buf = [0u8; 10];
524        let mut len_size = 0;
525
526        #[allow(clippy::needless_range_loop)]
527        for i in 0..10 {
528            let mut byte_buf = [0u8; 1];
529            match self.reader.read(&mut byte_buf).await {
530                Ok(0) => {
531                    if i == 0 {
532                        return Ok(None); // End of file
533                    }
534                    return Err(Error::Storage("Incomplete block length".to_string()));
535                }
536                Ok(_) => {
537                    len_buf[i] = byte_buf[0];
538                }
539                Err(e) => return Err(Error::Storage(format!("Failed to read block length: {e}"))),
540            }
541            len_size = i + 1;
542            if len_buf[i] & 0x80 == 0 {
543                break;
544            }
545        }
546
547        let (block_len, _) = decode_varint(&len_buf[..len_size])?;
548
549        // Read block data (CID + data)
550        let mut block_data = vec![0u8; block_len as usize];
551        self.reader
552            .read_exact(&mut block_data)
553            .await
554            .map_err(|e| Error::Storage(format!("Failed to read block data: {e}")))?;
555
556        // Parse CID from beginning of block data
557        let cid = Cid::try_from(block_data.clone())
558            .map_err(|e| Error::Cid(format!("Invalid CID in CAR: {e}")))?;
559
560        let cid_len = cid.to_bytes().len();
561        let data = Bytes::copy_from_slice(&block_data[cid_len..]);
562
563        self.blocks_read += 1;
564        self.bytes_read += (len_size + block_len as usize) as u64;
565
566        Ok(Some(Block::from_parts(cid, data)))
567    }
568
569    /// Get read statistics
570    pub fn stats(&self) -> CarReadStats {
571        CarReadStats {
572            blocks_read: self.blocks_read,
573            bytes_read: self.bytes_read,
574        }
575    }
576}
577
/// Statistics from CAR reading.
///
/// A plain pair of counters, so it is cheap to copy and can be compared or
/// default-constructed (all zeros).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct CarReadStats {
    /// Number of blocks read from the CAR file.
    pub blocks_read: u64,
    /// Number of bytes consumed, including the header and length prefixes.
    pub bytes_read: u64,
}
584
585/// Export blocks to a CAR file
586pub async fn export_to_car<S: BlockStore>(
587    store: &S,
588    path: &Path,
589    roots: Vec<Cid>,
590) -> Result<CarWriteStats> {
591    let mut writer = CarWriter::create(path, roots.clone()).await?;
592
593    // Get all CIDs reachable from roots
594    let all_cids = store.list_cids()?;
595
596    for cid in all_cids {
597        if let Some(block) = store.get(&cid).await? {
598            writer.write_block(&block).await?;
599        }
600    }
601
602    writer.finish().await
603}
604
605/// Import blocks from a CAR file
606pub async fn import_from_car<S: BlockStore>(store: &S, path: &Path) -> Result<CarReadStats> {
607    let mut reader = CarReader::open(path).await?;
608
609    while let Some(block) = reader.read_block().await? {
610        store.put(&block).await?;
611    }
612
613    Ok(reader.stats())
614}
615
#[cfg(test)]
mod tests {
    use super::*;
    use crate::blockstore::{BlockStoreConfig, SledBlockStore};
    use std::path::PathBuf;

    /// Build a block whose CID is derived from `data`.
    fn make_test_block(data: &[u8]) -> Block {
        Block::new(Bytes::copy_from_slice(data)).unwrap()
    }

    /// Scratch location under the platform temp directory.
    ///
    /// The process id is part of the name so concurrent test runs do not
    /// share files, and no Unix-only `/tmp` path is assumed.
    fn scratch_path(name: &str) -> PathBuf {
        std::env::temp_dir().join(format!("ipfrs-car-{}-{name}", std::process::id()))
    }

    #[test]
    fn test_varint_encode_decode() {
        let test_values = [0, 1, 127, 128, 255, 256, 16383, 16384, 1000000];

        for &val in &test_values {
            let encoded = encode_varint(val);
            let (decoded, _) = decode_varint(&encoded).unwrap();
            assert_eq!(val, decoded, "Failed for value {val}");
        }
    }

    #[test]
    fn test_car_header_roundtrip() {
        let block1 = make_test_block(b"test1");
        let block2 = make_test_block(b"test2");
        let roots = vec![*block1.cid(), *block2.cid()];

        let header = CarHeader::new(roots.clone());
        let cbor = header.to_cbor().unwrap();
        let decoded = CarHeader::from_cbor(&cbor).unwrap();

        assert_eq!(decoded.version, CAR_VERSION);
        assert_eq!(decoded.roots.len(), 2);
        assert_eq!(decoded.roots[0], roots[0]);
        assert_eq!(decoded.roots[1], roots[1]);
    }

    #[tokio::test]
    async fn test_car_write_read() {
        let path = scratch_path("write-read.car");
        let _ = std::fs::remove_file(&path);

        let block1 = make_test_block(b"hello world");
        let block2 = make_test_block(b"goodbye world");
        let roots = vec![*block1.cid()];

        // Write CAR
        {
            let mut writer = CarWriter::create(&path, roots.clone()).await.unwrap();
            writer.write_block(&block1).await.unwrap();
            writer.write_block(&block2).await.unwrap();
            let stats = writer.finish().await.unwrap();
            assert_eq!(stats.blocks_written, 2);
        }

        // Read CAR
        {
            let mut reader = CarReader::open(&path).await.unwrap();
            assert_eq!(reader.roots().len(), 1);
            assert_eq!(reader.roots()[0], *block1.cid());

            let read_block1 = reader.read_block().await.unwrap().unwrap();
            assert_eq!(read_block1.cid(), block1.cid());
            assert_eq!(read_block1.data(), block1.data());

            let read_block2 = reader.read_block().await.unwrap().unwrap();
            assert_eq!(read_block2.cid(), block2.cid());
            assert_eq!(read_block2.data(), block2.data());

            // No more blocks
            assert!(reader.read_block().await.unwrap().is_none());
        }

        let _ = std::fs::remove_file(&path);
    }

    #[tokio::test]
    async fn test_export_import_car() {
        let store_path = scratch_path("export-store");
        let store_path2 = scratch_path("import-store");
        let car_path = scratch_path("export.car");
        let _ = std::fs::remove_dir_all(&store_path);
        let _ = std::fs::remove_dir_all(&store_path2);
        let _ = std::fs::remove_file(&car_path);

        let config = BlockStoreConfig {
            path: store_path.clone(),
            cache_size: 1024 * 1024,
        };
        let store = SledBlockStore::new(config).unwrap();

        // Add blocks
        let block1 = make_test_block(b"block1");
        let block2 = make_test_block(b"block2");
        store.put(&block1).await.unwrap();
        store.put(&block2).await.unwrap();

        // Export: every stored block is written, not only the root
        let write_stats = export_to_car(&store, &car_path, vec![*block1.cid()])
            .await
            .unwrap();
        assert_eq!(write_stats.blocks_written, 2);

        // Create new store and import
        let config2 = BlockStoreConfig {
            path: store_path2.clone(),
            cache_size: 1024 * 1024,
        };
        let store2 = SledBlockStore::new(config2).unwrap();

        let read_stats = import_from_car(&store2, &car_path).await.unwrap();
        assert_eq!(read_stats.blocks_read, 2);

        // Verify blocks
        assert!(store2.has(block1.cid()).await.unwrap());
        assert!(store2.has(block2.cid()).await.unwrap());

        let _ = std::fs::remove_dir_all(&store_path);
        let _ = std::fs::remove_dir_all(&store_path2);
        let _ = std::fs::remove_file(&car_path);
    }
}