rs_car/
lib.rs

1//! Rust implementation of the [CAR specifications](https://ipld.io/specs/transport/car/),
2//! both [CARv1](https://ipld.io/specs/transport/car/carv1/) and [CARv2](https://ipld.io/specs/transport/car/carv2/).
3//!
4//! # Usage
5//!
//! - To stream blocks, use [`CarReader::new()`]
//! - To read all blocks into memory, use [`car_read_all`]
8//!
9
10use std::{
11    pin::Pin,
12    task::{Context, Poll},
13};
14
15use futures::{future::BoxFuture, AsyncRead, Stream, StreamExt};
16pub use ipld_core::cid::Cid;
17
18use crate::{
19    block_cid::assert_block_cid,
20    car_block::decode_block,
21    car_header::{read_car_header, StreamEnd},
22};
23pub use crate::{car_header::CarHeader, error::CarDecodeError};
24
25mod block_cid;
26mod car_block;
27mod car_header;
28mod carv1_header;
29mod carv2_header;
30mod error;
31mod varint;
32
33/// Decodes a CAR stream yielding its blocks and optionally verifying integrity.
34/// Supports CARv1 and CARv2 formats.
35///
36/// - To get a block streamer [`CarReader::new()`]
37/// - To read all blocks in memory [car_read_all]
38pub struct CarReader<'a, R> {
39    // r: &'a mut R,
40    pub header: CarHeader,
41    read_bytes: usize,
42    validate_block_hash: bool,
43    decode_header_future: Option<DecodeBlockFuture<'a, R>>,
44}
45
46impl<'a, R> CarReader<'a, R>
47where
48    R: AsyncRead + Send + Unpin,
49{
50    /// Decodes a CAR stream up to the header. Returns a `Stream` type that yields
51    /// blocks. The CAR header is available in [`CarReader.header`].
52    ///
53    /// # Examples
54    /// ```
55    /// use rs_car::{CarReader, CarDecodeError};
56    /// use futures::StreamExt;
57    ///
58    /// #[async_std::main]
59    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
60    ///   let mut r = async_std::fs::File::open("./tests/custom_fixtures/helloworld.car").await?;
61    ///
62    ///   let mut car_reader = CarReader::new(&mut r, true).await?;
63    ///   println!("{:?}", car_reader.header);
64    ///
65    ///   while let Some(item) = car_reader.next().await {
66    ///     let (cid, block) = item?;
67    ///     println!("{:?} {} bytes", cid, block.len());
68    ///   }
69    ///
70    ///   Ok(())
71    /// }
72    /// ```
73    pub async fn new(
74        r: &'a mut R,
75        validate_block_hash: bool,
76    ) -> Result<CarReader<'a, R>, CarDecodeError> {
77        let header = read_car_header(r).await?;
78        return Ok(CarReader {
79            header,
80            read_bytes: 0,
81            validate_block_hash,
82            decode_header_future: Some(Box::pin(decode_block(r))),
83        });
84    }
85}
86
87/// Decodes a CAR stream buffering all blocks in memory. For a Stream API use [CarReader].
88///
89/// # Examples
90///
91/// ```
92/// use rs_car::car_read_all;
93///
94/// #[async_std::main]
95/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
96///   let mut r = async_std::fs::File::open("./tests/custom_fixtures/helloworld.car").await?;
97///
98///   let (blocks, header) = car_read_all(&mut r, true).await?;
99///   println!("{:?}", header);
100///
101///   for (cid, block) in blocks {
102///     println!("{:?} {} bytes", cid, block.len());
103///   }
104///
105///   Ok(())
106/// }
107/// ```
108pub async fn car_read_all<R: AsyncRead + Unpin + Send>(
109    r: &mut R,
110    validate_block_hash: bool,
111) -> Result<(Vec<(Cid, Vec<u8>)>, CarHeader), CarDecodeError> {
112    let mut decoder = CarReader::new(r, validate_block_hash).await?;
113    let mut items: Vec<(Cid, Vec<u8>)> = vec![];
114
115    while let Some(item) = decoder.next().await {
116        let item = item?;
117        items.push(item);
118    }
119
120    Ok((items, decoder.header))
121}
122
123type DecodeBlockFuture<'a, R> =
124    BoxFuture<'a, Result<(&'a mut R, Cid, Vec<u8>, usize), CarDecodeError>>;
125
126impl<'a, R> Stream for CarReader<'a, R>
127where
128    R: AsyncRead + Send + Unpin + 'a,
129{
130    type Item = Result<(Cid, Vec<u8>), CarDecodeError>;
131
132    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
133        let me = Pin::into_inner(self);
134
135        if let StreamEnd::AfterNBytes(blocks_len) = me.header.eof_stream {
136            if me.read_bytes >= blocks_len {
137                return Poll::Ready(None);
138            }
139        }
140
141        match &mut me.decode_header_future {
142            Some(decode_future) => match decode_future.as_mut().poll(cx) {
143                Poll::Pending => Poll::Pending,
144                Poll::Ready(Ok((r, cid, block, block_len))) => {
145                    if me.validate_block_hash {
146                        assert_block_cid(&cid, &block)?;
147                    }
148                    me.read_bytes += block_len;
149                    me.decode_header_future = Some(Box::pin(decode_block(r)));
150                    Poll::Ready(Some(Ok((cid, block))))
151                }
152                Poll::Ready(Err(CarDecodeError::BlockStartEOF))
153                    if me.header.eof_stream == StreamEnd::OnBlockEOF =>
154                {
155                    Poll::Ready(None)
156                }
157                Poll::Ready(Err(err)) => {
158                    me.decode_header_future = None;
159                    Poll::Ready(Some(Err(err)))
160                }
161            },
162            None => Poll::Ready(None),
163        }
164    }
165}
166
#[cfg(test)]
mod tests {
    use std::{collections::HashMap, str::FromStr};

    use futures::executor;
    use serde::{Deserialize, Serialize};

    use super::*;
    use crate::car_header::CarVersion;

    /// Expected contents of a spec fixture, as stored in its companion `.json` file.
    /// Used for both the CARv1 and the CARv2 fixtures.
    #[derive(Debug, Deserialize, Serialize)]
    struct ExpectedCarv1 {
        header: ExpectedCarv1Header,
        blocks: Vec<ExpectedCarBlock>,
    }

    #[derive(Debug, Deserialize, Serialize)]
    struct ExpectedCarv1Header {
        roots: Vec<ExpectedCid>,
        version: u8,
    }

    #[derive(Debug, Deserialize, Serialize)]
    struct ExpectedCarBlock {
        cid: ExpectedCid,
        // The fixture JSON uses camelCase keys; map onto an idiomatic field name.
        #[serde(rename = "blockLength")]
        block_length: usize,
    }

    /// A JSON link object of the form `{"/": "<cid string>"}`.
    type ExpectedCid = HashMap<String, String>;

    fn parse_expected_cids(cids: &[ExpectedCid]) -> Vec<Cid> {
        cids.iter().map(parse_expected_cid).collect()
    }

    fn parse_expected_cid(cid: &ExpectedCid) -> Cid {
        let cid_str = cid.get("/").expect("fixture CID must have a \"/\" key");
        Cid::from_str(cid_str).expect("fixture CID must parse")
    }

    #[test]
    fn decode_carv1_helloworld_no_stream() {
        executor::block_on(async {
            let car_filepath = "./tests/custom_fixtures/helloworld.car";
            let mut file = async_std::fs::File::open(car_filepath).await.unwrap();
            let (blocks, header) = car_read_all(&mut file, true).await.unwrap();

            let root_cid = Cid::from_str("QmUU2HcUBVSXkfWPUc3WUSeCMrWWeEJTuAgR9uyWBhh9Nf").unwrap();
            let root_block = hex::decode("0a110802120b68656c6c6f776f726c640a180b").unwrap();

            assert_eq!(blocks, vec![(root_cid, root_block)]);
            assert_eq!(header.version, CarVersion::V1);
            assert_eq!(header.roots, vec![root_cid]);
        })
    }

    #[test]
    fn decode_carv1_helloworld_stream() {
        // Exercise the streaming API directly (`car_read_all` is covered above).
        executor::block_on(async {
            let car_filepath = "./tests/custom_fixtures/helloworld.car";
            let mut file = async_std::fs::File::open(car_filepath).await.unwrap();
            let mut car_reader = CarReader::new(&mut file, true).await.unwrap();

            let root_cid = Cid::from_str("QmUU2HcUBVSXkfWPUc3WUSeCMrWWeEJTuAgR9uyWBhh9Nf").unwrap();
            let root_block = hex::decode("0a110802120b68656c6c6f776f726c640a180b").unwrap();

            // The header is available before any block has been polled.
            assert_eq!(car_reader.header.version, CarVersion::V1);
            assert_eq!(car_reader.header.roots, vec![root_cid]);

            let mut blocks: Vec<(Cid, Vec<u8>)> = vec![];
            while let Some(item) = car_reader.next().await {
                blocks.push(item.unwrap());
            }

            assert_eq!(blocks, vec![(root_cid, root_block)]);
        })
    }

    #[test]
    fn decode_carv1_basic() {
        // 63a265726f6f747382d82a582500
        // 01711220f88bc853804cf294fe417e4fa83028689fcdb1b1592c5102e1474dbc200fab8b - v1 header root (bafyreihyrpefhacm6kkp4ql6j6udakdit7g3dmkzfriqfykhjw6cad5lrm)
        // d82a582500
        // 0171122069ea0740f9807a28f4d932c62e7c1c83be055e55072c90266ab3e79df63a365b - v1 header root (bafyreidj5idub6mapiupjwjsyyxhyhedxycv4vihfsicm2vt46o7morwlm)
        // 6776657273696f6e01
        // 5b - block 0 len = 91, block_len = 55
        // 01711220f88bc853804cf294fe417e4fa83028689fcdb1b1592c5102e1474dbc200fab8b - block 0 cid (bafyreihyrpefhacm6kkp4ql6j6udakdit7g3dmkzfriqfykhjw6cad5lrm)
        // a2646c696e6bd82a582300122002acecc5de2438ea4126a3010ecb1f8a599c8eff22fff1a1dcffe999b27fd3de646e616d6564626c6970 - block 0 data
        // 8301 - block 1 len = 131, block_len = 97
        // 122002acecc5de2438ea4126a3010ecb1f8a599c8eff22fff1a1dcffe999b27fd3de - block 1 cid (QmNX6Tffavsya4xgBi2VJQnSuqy9GsxongxZZ9uZBqp16d)
        // 122e0a2401551220b6fbd675f98e2abd22d4ed29fdc83150fedc48597e92dd1a7a24381d44a274511204626561721804122f0a22122079a982de3c9907953d4d323cee1d0fb1ed8f45f8ef02870c0cb9e09246bd530a12067365636f6e64189501 - block 1 data
        // 28 - block 2 len = 40, block_len = 4
        // 01551220b6fbd675f98e2abd22d4ed29fdc83150fedc48597e92dd1a7a24381d44a27451 - block 2 cid (bafkreifw7plhl6mofk6sfvhnfh64qmkq73oeqwl6sloru6rehaoujituke)
        // 63636363 - block 2 data
        // 8001 - block 3 len = 128, block_len = 94
        // 122079a982de3c9907953d4d323cee1d0fb1ed8f45f8ef02870c0cb9e09246bd530a - block 3 cid (QmWXZxVQ9yZfhQxLD35eDR8LiMRsYtHxYqTFCBbJoiJVys)
        // 122d0a240155122081cc5b17018674b401b42f35ba07bb79e211239c23bffe658da1577e3e6468771203646f671804122d0a221220e7dc486e97e6ebe5cdabab3e392bdad128b6e09acc94bb4e2aa2af7b986d24d0120566697273741833 - block 3 data
        // 28 - block 4 len = 40, block_len = 4
        // 0155122081cc5b17018674b401b42f35ba07bb79e211239c23bffe658da1577e3e646877 - block 4 cid (bafkreiebzrnroamgos2adnbpgw5apo3z4iishhbdx77gldnbk57d4zdio4)
        // 62626262 - block 4 data
        // 51 - block 5 len = 81, block_len = 47
        // 1220e7dc486e97e6ebe5cdabab3e392bdad128b6e09acc94bb4e2aa2af7b986d24d0 - block 5 cid (QmdwjhxpxzcMsR3qUuj7vUL8pbA7MgR3GAxWi2GLHjsKCT)
        // 122d0a240155122061be55a8e2f6b4e172338bddf184d6dbee29c98853e0a0485ecee7f27b9af0b412036361741804 - block 5 data
        // 28 - block 6 len = 40, block_len = 4
        // 0155122061be55a8e2f6b4e172338bddf184d6dbee29c98853e0a0485ecee7f27b9af0b4 - block 6 cid (bafkreidbxzk2ryxwwtqxem4l3xyyjvw35yu4tcct4cqeqxwo47zhxgxqwq)
        // 61616161 - block 6 data
        // 36 - block 7 len = 54, block_len = 18
        // 0171122069ea0740f9807a28f4d932c62e7c1c83be055e55072c90266ab3e79df63a365b - block 7 cid (bafyreidj5idub6mapiupjwjsyyxhyhedxycv4vihfsicm2vt46o7morwlm)
        // a2646c696e6bf6646e616d65656c696d626f - block 7 data
        executor::block_on(async {
            run_car_basic_test(
                "./tests/spec_fixtures/carv1-basic.car",
                "./tests/spec_fixtures/carv1-basic.json",
            )
            .await;
        })
    }

    #[test]
    fn decode_carv2_basic() {
        // 0aa16776657273696f6e02 - v2 pragma
        // 00000000000000000000000000000000 - v2 header characteristics
        // 3300000000000000 - v2 header data_offset
        // c001000000000000 - v2 header data_size
        // f301000000000000 - v2 header index_offset
        // 38a265726f6f747381d82a582300
        // 1220fb16f5083412ef1371d031ed4aa239903d84efdadf1ba3cd678e6475b1a232f8 - v1 header root (QmfEoLyB5NndqeKieExd1rtJzTduQUPEV8TwAYcUiy3H5Z)
        // 6776657273696f6e01
        // 51 - block 0 len = 81, block_len = 47
        // 1220fb16f5083412ef1371d031ed4aa239903d84efdadf1ba3cd678e6475b1a232f8 - block 0 cid (QmfEoLyB5NndqeKieExd1rtJzTduQUPEV8TwAYcUiy3H5Z)
        // 122d0a221220d9c0d5376d26f1931f7ad52d7acc00fc1090d2edb0808bf61eeb0a152826f6261204f09f8da418a401 - block 0 data
        // 8501 - block 1 len = 133, block_len = 99
        // 1220d9c0d5376d26f1931f7ad52d7acc00fc1090d2edb0808bf61eeb0a152826f626 - block 1 cid (QmczfirA7VEH7YVvKPTPoU69XM3qY4DC39nnTsWd4K3SkM)
        // 12310a221220d745b7757f5b4593eeab7820306c7bc64eb496a7410a0d07df7a34ffec4b97f1120962617272656c657965183a122e0a2401551220a2e1c40da1ae335d4dffe729eb4d5ca23b74b9e51fc535f4a804a261080c294d1204f09f90a11807 - block 1 data
        // 58 - block 2 len = 88, block_len = 54
        // 1220d745b7757f5b4593eeab7820306c7bc64eb496a7410a0d07df7a34ffec4b97f1 - block 2 cid (Qmcpz2FHJD7VAhg1fxFXdYJKePtkx1BsHuCrAgWVnaHMTE)
        // 12340a2401551220b474a99a2705e23cf905a484ec6d14ef58b56bbe62e9292783466ec363b5072d120a666973686d6f6e6765721804 - block 2 data
        // 28 - block 3 len = 40, block_len = 4
        // 01551220b474a99a2705e23cf905a484ec6d14ef58b56bbe62e9292783466ec363b5072d - block 3 cid (bafkreifuosuzujyf4i6psbneqtwg2fhplc2wxptc5euspa2gn3bwhnihfu)
        // 66697368 - block 3 data
        // 2b - block 4 len = 43, block_len = 7
        // 01551220a2e1c40da1ae335d4dffe729eb4d5ca23b74b9e51fc535f4a804a261080c294d - block 4 cid (bafkreifc4hca3inognou377hfhvu2xfchn2ltzi7yu27jkaeujqqqdbjju)
        // 6c6f6273746572 - block 4 data
        // v2 index, starting at index_offset (0x33 + 0x1c0 = 0x1f3):
        // 0100000028000000c800000000000000a2e1c40da1ae335d4dffe729eb4d5ca23b74b9e51fc535f4a804a261080c294d9401000000000000b474a99a2705e23cf905a484ec6d14ef58b56bbe62e9292783466ec363b5072d6b01000000000000d745b7757f5b4593eeab7820306c7bc64eb496a7410a0d07df7a34ffec4b97f11201000000000000d9c0d5376d26f1931f7ad52d7acc00fc1090d2edb0808bf61eeb0a152826f6268b00000000000000fb16f5083412ef1371d031ed4aa239903d84efdadf1ba3cd678e6475b1a232f83900000000000000

        executor::block_on(async {
            run_car_basic_test(
                "./tests/spec_fixtures/carv2-basic.car",
                "./tests/spec_fixtures/carv2-basic.json",
            )
            .await;
        })
    }

    /// Streams `car_filepath` with hash validation enabled and compares its header
    /// and block CIDs against the expectations stored in `car_json_expected`.
    async fn run_car_basic_test(car_filepath: &str, car_json_expected: &str) {
        let expected_car = std::fs::read_to_string(car_json_expected).unwrap();
        let expected_car: ExpectedCarv1 = serde_json::from_str(&expected_car).unwrap();

        let mut file = async_std::fs::File::open(car_filepath).await.unwrap();
        let mut streamer = CarReader::new(&mut file, true).await.unwrap();

        // Assert header (version and roots), for both CARv1 and CARv2 fixtures
        assert_eq!(streamer.header.version as u8, expected_car.header.version);
        assert_eq!(
            streamer.header.roots,
            parse_expected_cids(&expected_car.header.roots)
        );

        // Consume stream and read all blocks into memory
        let mut blocks: Vec<(Cid, Vec<u8>)> = vec![];
        while let Some(item) = streamer.next().await {
            blocks.push(item.unwrap());
        }

        // Assert block's cids, with validate_block_hash == true guarantees block's integrity
        let block_cids = blocks.iter().map(|block| block.0).collect::<Vec<Cid>>();
        let expected_block_cids = expected_car
            .blocks
            .iter()
            .map(|block| parse_expected_cid(&block.cid))
            .collect::<Vec<Cid>>();
        assert_eq!(block_cids, expected_block_cids);
    }
}