rs_car_sync/
lib.rs

1//! Rust implementation of the [CAR specifications](https://ipld.io/specs/transport/car/),
2//! both [CARv1](https://ipld.io/specs/transport/car/carv1/) and [CARv2](https://ipld.io/specs/transport/car/carv2/).
3//!
4//! # Usage
5//!
//! - To get a block iterator, use [`CarReader::new()`]
//! - To read all blocks into memory, use [`car_read_all`]
8//!
9
10use std::io::Read;
11
12pub use ipld_core::cid::Cid;
13
14use crate::{
15    block_cid::assert_block_cid,
16    car_block::decode_block,
17    car_header::{read_car_header, StreamEnd},
18};
19pub use crate::{car_header::CarHeader, error::CarDecodeError};
20
21mod block_cid;
22mod car_block;
23mod car_header;
24mod carv1_header;
25mod carv2_header;
26mod error;
27mod varint;
28
/// Decodes a CAR stream yielding its blocks and optionally verifying integrity.
/// Supports CARv1 and CARv2 formats.
///
/// - To get a block iterator, use [`CarReader::new()`]
/// - To read all blocks into memory, use [`car_read_all`]
pub struct CarReader<'a, R> {
    /// Header decoded from the start of the stream by [`CarReader::new()`].
    pub header: CarHeader,
    /// Running total of the block lengths reported by `decode_block`; compared against the
    /// `StreamEnd::AfterNBytes` limit to decide when iteration stops.
    read_bytes: usize,
    /// When `true`, every decoded block is checked against its CID with `assert_block_cid`.
    validate_block_hash: bool,
    /// Underlying byte source: the header is read from it first, then the blocks.
    reader: &'a mut R,
}
41
42impl<'a, R> CarReader<'a, R>
43where
44    R: Read,
45{
46    /// Decodes a CAR stream up to the header. Returns a `Stream` type that yields
47    /// blocks. The CAR header is available in [`CarReader.header`].
48    ///
49    /// # Examples
50    /// ```
51    /// use rs_car_sync::{CarReader, CarDecodeError};
52    ///
53    ///
54    /// fn main() -> Result<(), Box<dyn std::error::Error>> {
55    ///   let mut r = std::fs::File::open("./tests/custom_fixtures/helloworld.car")?;
56    ///
57    ///   let mut car_reader = CarReader::new(&mut r, true)?;
58    ///   println!("{:?}", car_reader.header);
59    ///
60    ///   while let Some(item) = car_reader.next() {
61    ///     let (cid, block) = item?;
62    ///     println!("{:?} {} bytes", cid, block.len());
63    ///   }
64    ///
65    ///   Ok(())
66    /// }
67    /// ```
68    pub fn new(
69        reader: &'a mut R,
70        validate_block_hash: bool,
71    ) -> Result<CarReader<'a, R>, CarDecodeError> {
72        let header = read_car_header(reader)?;
73        return Ok(CarReader {
74            header,
75            read_bytes: 0,
76            validate_block_hash,
77            reader,
78        });
79    }
80}
81
82/// Decodes a CAR stream buffering all blocks in memory. For a Stream API use [CarReader].
83///
84/// # Examples
85///
86/// ```
87/// use rs_car_sync::car_read_all;
88///
89/// fn main() -> Result<(), Box<dyn std::error::Error>> {
90///   let mut r = std::fs::File::open("./tests/custom_fixtures/helloworld.car")?;
91///
92///   let (blocks, header) = car_read_all(&mut r, true)?;
93///   println!("{:?}", header);
94///
95///   for (cid, block) in blocks {
96///     println!("{:?} {} bytes", cid, block.len());
97///   }
98///
99///   Ok(())
100/// }
101/// ```
102pub fn car_read_all<R: Read>(
103    r: &mut R,
104    validate_block_hash: bool,
105) -> Result<(Vec<(Cid, Vec<u8>)>, CarHeader), CarDecodeError> {
106    let mut decoder = CarReader::new(r, validate_block_hash)?;
107    let mut items: Vec<(Cid, Vec<u8>)> = vec![];
108
109    while let Some(item) = decoder.next() {
110        let item = item?;
111        items.push(item);
112    }
113
114    Ok((items, decoder.header))
115}
116
117impl<'a, R> Iterator for CarReader<'a, R>
118where
119    R: Read + 'a,
120{
121    type Item = Result<(Cid, Vec<u8>), CarDecodeError>;
122
123    fn next(&mut self) -> Option<Self::Item> {
124        if let StreamEnd::AfterNBytes(blocks_len) = self.header.eof_stream {
125            if self.read_bytes >= blocks_len {
126                return None;
127            }
128        }
129        match decode_block(&mut self.reader) {
130            Ok((_r, cid, block, block_len)) => {
131                if self.validate_block_hash {
132                    if let Err(e) = assert_block_cid(&cid, &block) {
133                        return Some(Err(e));
134                    }
135                }
136                self.read_bytes += block_len;
137                Some(Ok((cid, block)))
138            }
139            Err(CarDecodeError::BlockStartEOF)
140                if self.header.eof_stream == StreamEnd::OnBlockEOF =>
141            {
142                None
143            }
144            Err(err) => Some(Err(err)),
145        }
146    }
147}
148
149#[cfg(test)]
150mod tests {
151    use std::{collections::HashMap, str::FromStr};
152
153    use serde::{Deserialize, Serialize};
154
155    use super::*;
156    use crate::car_header::CarVersion;
157
    /// Expected contents of a CAR file, deserialized from a JSON fixture in
    /// `run_car_basic_test`.
    #[derive(Debug, Deserialize, Serialize)]
    struct ExpectedCarv1 {
        /// Expected header fields.
        header: ExpectedCarv1Header,
        /// Expected blocks, in the order they are compared against the decoded stream.
        blocks: Vec<ExpectedCarBlock>,
    }
163
    /// Header section of the JSON fixture.
    #[derive(Debug, Deserialize, Serialize)]
    struct ExpectedCarv1Header {
        /// Root CIDs, compared against the decoded header's `roots`.
        roots: Vec<ExpectedCid>,
        /// Version number, compared against the decoded header's version cast to `u8`.
        version: u8,
    }
169
170    #[derive(Debug, Deserialize, Serialize)]
171    #[allow(non_snake_case)]
172    struct ExpectedCarBlock {
173        cid: ExpectedCid,
174        blockLength: usize,
175    }
176
    /// CID as it appears in the JSON fixtures: a string map whose `"/"` entry holds the CID
    /// string (see `parse_expected_cid`).
    type ExpectedCid = HashMap<String, String>;
178
179    fn parse_expected_cids(cids: &Vec<ExpectedCid>) -> Vec<Cid> {
180        cids.iter().map(parse_expected_cid).collect()
181    }
182
183    fn parse_expected_cid(cid: &ExpectedCid) -> Cid {
184        Cid::from_str(cid.get("/").unwrap()).unwrap()
185    }
186
187    #[test]
188    fn decode_carv1_helloworld_no_stream() {
189        let car_filepath = "./tests/custom_fixtures/helloworld.car";
190        let mut file = std::fs::File::open(car_filepath).unwrap();
191        let (blocks, header) = car_read_all(&mut file, true).unwrap();
192
193        let root_cid = Cid::from_str("QmUU2HcUBVSXkfWPUc3WUSeCMrWWeEJTuAgR9uyWBhh9Nf").unwrap();
194        let root_block = hex::decode("0a110802120b68656c6c6f776f726c640a180b").unwrap();
195
196        assert_eq!(blocks, vec!((root_cid, root_block)));
197        assert_eq!(header.version, CarVersion::V1);
198        assert_eq!(header.roots, vec!(root_cid));
199    }
200
201    #[test]
202    fn decode_carv1_helloworld_stream() {
203        let car_filepath = "./tests/custom_fixtures/helloworld.car";
204        let mut file = std::fs::File::open(car_filepath).unwrap();
205        let (blocks, header) = car_read_all(&mut file, true).unwrap();
206
207        let root_cid = Cid::from_str("QmUU2HcUBVSXkfWPUc3WUSeCMrWWeEJTuAgR9uyWBhh9Nf").unwrap();
208        let root_block = hex::decode("0a110802120b68656c6c6f776f726c640a180b").unwrap();
209
210        assert_eq!(blocks, vec!((root_cid, root_block)));
211        assert_eq!(header.version, CarVersion::V1);
212        assert_eq!(header.roots, vec!(root_cid));
213    }
214
    /// Decodes the spec fixture `carv1-basic.car` and checks it against `carv1-basic.json`.
    #[test]
    fn decode_carv1_basic() {
        // Annotated hex dump of the fixture: the header first, then each block as
        // <length varint> <cid> <data>. In the annotations `len` is the decoded length
        // varint (cid + data) and `block_len` is the length of the data alone.
        //
        // 63a265726f6f747382d82a582500
        // 01711220f88bc853804cf294fe417e4fa83028689fcdb1b1592c5102e1474dbc200fab8b - v1 header root (bafyreihyrpefhacm6kkp4ql6j6udakdit7g3dmkzfriqfykhjw6cad5lrm)
        // d82a582500
        // 0171122069ea0740f9807a28f4d932c62e7c1c83be055e55072c90266ab3e79df63a365b - v1 header root (bafyreidj5idub6mapiupjwjsyyxhyhedxycv4vihfsicm2vt46o7morwlm)
        // 6776657273696f6e01
        // 5b - block 0 len = 91, block_len = 55
        // 01711220f88bc853804cf294fe417e4fa83028689fcdb1b1592c5102e1474dbc200fab8b - block 0 cid (bafyreihyrpefhacm6kkp4ql6j6udakdit7g3dmkzfriqfykhjw6cad5lrm)
        // a2646c696e6bd82a582300122002acecc5de2438ea4126a3010ecb1f8a599c8eff22fff1a1dcffe999b27fd3de646e616d6564626c6970 - block 0 data
        // 8301 - block 1 len = 131, block_len = 97
        // 122002acecc5de2438ea4126a3010ecb1f8a599c8eff22fff1a1dcffe999b27fd3de - block 1 cid (QmNX6Tffavsya4xgBi2VJQnSuqy9GsxongxZZ9uZBqp16d)
        // 122e0a2401551220b6fbd675f98e2abd22d4ed29fdc83150fedc48597e92dd1a7a24381d44a274511204626561721804122f0a22122079a982de3c9907953d4d323cee1d0fb1ed8f45f8ef02870c0cb9e09246bd530a12067365636f6e64189501 - block 1 data
        // 28 - block 2 len = 40, block_len = 4
        // 01551220b6fbd675f98e2abd22d4ed29fdc83150fedc48597e92dd1a7a24381d44a27451 - block 2 cid (bafkreifw7plhl6mofk6sfvhnfh64qmkq73oeqwl6sloru6rehaoujituke)
        // 63636363 - block 2 data
        // 8001 - block 3 len = 128, block_len = 94
        // 122079a982de3c9907953d4d323cee1d0fb1ed8f45f8ef02870c0cb9e09246bd530a - block 3 cid (QmWXZxVQ9yZfhQxLD35eDR8LiMRsYtHxYqTFCBbJoiJVys)
        // 122d0a240155122081cc5b17018674b401b42f35ba07bb79e211239c23bffe658da1577e3e6468771203646f671804122d0a221220e7dc486e97e6ebe5cdabab3e392bdad128b6e09acc94bb4e2aa2af7b986d24d0120566697273741833 - block 3 data
        // 28 - block 4 len = 40, block_len = 4
        // 0155122081cc5b17018674b401b42f35ba07bb79e211239c23bffe658da1577e3e646877 - block 4 cid (bafkreiebzrnroamgos2adnbpgw5apo3z4iishhbdx77gldnbk57d4zdio4)
        // 62626262 - block 4 data
        // 51 - block 5 len = 81, block_len = 47
        // 1220e7dc486e97e6ebe5cdabab3e392bdad128b6e09acc94bb4e2aa2af7b986d24d0 - block 5 cid (QmdwjhxpxzcMsR3qUuj7vUL8pbA7MgR3GAxWi2GLHjsKCT)
        // 122d0a240155122061be55a8e2f6b4e172338bddf184d6dbee29c98853e0a0485ecee7f27b9af0b412036361741804 - block 5 data
        // 28 - block 6 len = 40, block_len = 4
        // 0155122061be55a8e2f6b4e172338bddf184d6dbee29c98853e0a0485ecee7f27b9af0b4 - block 6 cid (bafkreidbxzk2ryxwwtqxem4l3xyyjvw35yu4tcct4cqeqxwo47zhxgxqwq)
        // 61616161 - block 6 data
        // 36 - block 7 len = 54, block_len = 18
        // 0171122069ea0740f9807a28f4d932c62e7c1c83be055e55072c90266ab3e79df63a365b - block 7 cid (bafyreidj5idub6mapiupjwjsyyxhyhedxycv4vihfsicm2vt46o7morwlm)
        // a2646c696e6bf6646e616d65656c696d626f - block 7 data
        run_car_basic_test(
            "./tests/spec_fixtures/carv1-basic.car",
            "./tests/spec_fixtures/carv1-basic.json",
        );
    }
251
    /// Decodes the spec fixture `carv2-basic.car` and checks it against `carv2-basic.json`.
    #[test]
    fn decode_carv2_basic() {
        // Annotated hex dump of the fixture: the v2 pragma and header, then the embedded v1
        // header and each block as <length varint> <cid> <data>. In the annotations `len`
        // is the decoded length varint (cid + data) and `block_len` is the length of the
        // data alone.
        //
        // 0aa16776657273696f6e02  - v2 pragma
        // 00000000000000000000000000000000  - v2 header characteristics
        // 3300000000000000  - v2 header data_offset
        // c001000000000000  - v2 header data_size
        // f301000000000000  - v2 header index_offset
        // 38a265726f6f747381d82a582300
        // 1220fb16f5083412ef1371d031ed4aa239903d84efdadf1ba3cd678e6475b1a232f8 - v1 header root (QmfEoLyB5NndqeKieExd1rtJzTduQUPEV8TwAYcUiy3H5Z)
        // 6776657273696f6e01
        // 51 - block 0 len = 81, block_len = 47
        // 1220fb16f5083412ef1371d031ed4aa239903d84efdadf1ba3cd678e6475b1a232f8 - block 0 cid (QmfEoLyB5NndqeKieExd1rtJzTduQUPEV8TwAYcUiy3H5Z)
        // 122d0a221220d9c0d5376d26f1931f7ad52d7acc00fc1090d2edb0808bf61eeb0a152826f6261204f09f8da418a401 - block 0 data
        // 8501 -  block 1 len = 133, block_len = 99
        // 1220d9c0d5376d26f1931f7ad52d7acc00fc1090d2edb0808bf61eeb0a152826f626 - block 1 cid (QmczfirA7VEH7YVvKPTPoU69XM3qY4DC39nnTsWd4K3SkM)
        // 12310a221220d745b7757f5b4593eeab7820306c7bc64eb496a7410a0d07df7a34ffec4b97f1120962617272656c657965183a122e0a2401551220a2e1c40da1ae335d4dffe729eb4d5ca23b74b9e51fc535f4a804a261080c294d1204f09f90a11807 - block 1 data
        // 58 - block 2 len = 88, block_len = 54
        // 1220d745b7757f5b4593eeab7820306c7bc64eb496a7410a0d07df7a34ffec4b97f1 - block 2 cid (Qmcpz2FHJD7VAhg1fxFXdYJKePtkx1BsHuCrAgWVnaHMTE)
        // 12340a2401551220b474a99a2705e23cf905a484ec6d14ef58b56bbe62e9292783466ec363b5072d120a666973686d6f6e6765721804 - block 2 data
        // 28 - block 3 len = 40, block_len = 4
        // 01551220b474a99a2705e23cf905a484ec6d14ef58b56bbe62e9292783466ec363b5072d - block 3 cid (bafkreifuosuzujyf4i6psbneqtwg2fhplc2wxptc5euspa2gn3bwhnihfu)
        // 66697368 - block 3 data
        // 2b - block 4 len = 43, block_len = 7
        // 01551220a2e1c40da1ae335d4dffe729eb4d5ca23b74b9e51fc535f4a804a261080c294d - block 4 cid (bafkreifc4hca3inognou377hfhvu2xfchn2ltzi7yu27jkaeujqqqdbjju)
        // 6c6f6273746572 - block 4 data
        // NOTE(review): the remaining bytes follow the data section and so presumably form
        // the v2 index located by index_offset above — confirm against the CARv2 spec.
        // 0100000028000000c800000000000000a2e1c40da1ae335d4dffe729eb4d5ca23b74b9e51fc535f4a804a261080c294d9401000000000000b474a99a2705e23cf905a484ec6d14ef58b56bbe62e9292783466ec363b5072d6b01000000000000d745b7757f5b4593eeab7820306c7bc64eb496a7410a0d07df7a34ffec4b97f11201000000000000d9c0d5376d26f1931f7ad52d7acc00fc1090d2edb0808bf61eeb0a152826f6268b00000000000000fb16f5083412ef1371d031ed4aa239903d84efdadf1ba3cd678e6475b1a232f83900000000000000

        run_car_basic_test(
            "./tests/spec_fixtures/carv2-basic.car",
            "./tests/spec_fixtures/carv2-basic.json",
        );
    }
284
285    fn run_car_basic_test(car_filepath: &str, car_json_expected: &str) {
286        let expected_car = std::fs::read_to_string(car_json_expected).unwrap();
287        let expected_car: ExpectedCarv1 = serde_json::from_str(&expected_car).unwrap();
288
289        let mut file = std::fs::File::open(car_filepath).unwrap();
290        let mut streamer = CarReader::new(&mut file, true).unwrap();
291
292        // Assert header v1
293        assert_eq!(streamer.header.version as u8, expected_car.header.version);
294        assert_eq!(
295            streamer.header.roots,
296            parse_expected_cids(&expected_car.header.roots)
297        );
298
299        // Consume stream and read all blocks into memory
300        let mut blocks: Vec<(Cid, Vec<u8>)> = vec![];
301        while let Some(item) = streamer.next() {
302            let item = item.unwrap();
303            blocks.push(item);
304        }
305
306        // Assert block's cids, with validate_block_hash == true guarantees block's integrity
307        let block_cids = blocks.iter().map(|block| block.0).collect::<Vec<Cid>>();
308        let expected_block_cids = expected_car
309            .blocks
310            .iter()
311            .map(|block| parse_expected_cid(&block.cid))
312            .collect::<Vec<Cid>>();
313        assert_eq!(block_cids, expected_block_cids);
314    }
315}