rs_car/lib.rs
1//! Rust implementation of the [CAR specifications](https://ipld.io/specs/transport/car/),
2//! both [CARv1](https://ipld.io/specs/transport/car/carv1/) and [CARv2](https://ipld.io/specs/transport/car/carv2/).
3//!
4//! # Usage
5//!
6//! - To get a block streamer [`CarReader::new()`]
7//! - To read all blocks in memory [car_read_all]
8//!
9
10use std::{
11 pin::Pin,
12 task::{Context, Poll},
13};
14
15use futures::{future::BoxFuture, AsyncRead, Stream, StreamExt};
16pub use ipld_core::cid::Cid;
17
18use crate::{
19 block_cid::assert_block_cid,
20 car_block::decode_block,
21 car_header::{read_car_header, StreamEnd},
22};
23pub use crate::{car_header::CarHeader, error::CarDecodeError};
24
25mod block_cid;
26mod car_block;
27mod car_header;
28mod carv1_header;
29mod carv2_header;
30mod error;
31mod varint;
32
/// Decodes a CAR stream yielding its blocks and optionally verifying integrity.
/// Supports CARv1 and CARv2 formats.
///
/// - To get a block streamer [`CarReader::new()`]
/// - To read all blocks in memory [car_read_all]
pub struct CarReader<'a, R> {
    /// Header decoded from the start of the stream by [`CarReader::new()`].
    pub header: CarHeader,
    // Running total of the byte counts reported by decoded blocks; compared against
    // `StreamEnd::AfterNBytes` to detect the end of the block section.
    read_bytes: usize,
    // When true, each decoded block is checked against its CID before being yielded.
    validate_block_hash: bool,
    // Future decoding the next block (despite the name it is built from `decode_block`);
    // `None` once no further block will be decoded.
    decode_header_future: Option<DecodeBlockFuture<'a, R>>,
}
45
46impl<'a, R> CarReader<'a, R>
47where
48 R: AsyncRead + Send + Unpin,
49{
50 /// Decodes a CAR stream up to the header. Returns a `Stream` type that yields
51 /// blocks. The CAR header is available in [`CarReader.header`].
52 ///
53 /// # Examples
54 /// ```
55 /// use rs_car::{CarReader, CarDecodeError};
56 /// use futures::StreamExt;
57 ///
58 /// #[async_std::main]
59 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
60 /// let mut r = async_std::fs::File::open("./tests/custom_fixtures/helloworld.car").await?;
61 ///
62 /// let mut car_reader = CarReader::new(&mut r, true).await?;
63 /// println!("{:?}", car_reader.header);
64 ///
65 /// while let Some(item) = car_reader.next().await {
66 /// let (cid, block) = item?;
67 /// println!("{:?} {} bytes", cid, block.len());
68 /// }
69 ///
70 /// Ok(())
71 /// }
72 /// ```
73 pub async fn new(
74 r: &'a mut R,
75 validate_block_hash: bool,
76 ) -> Result<CarReader<'a, R>, CarDecodeError> {
77 let header = read_car_header(r).await?;
78 return Ok(CarReader {
79 header,
80 read_bytes: 0,
81 validate_block_hash,
82 decode_header_future: Some(Box::pin(decode_block(r))),
83 });
84 }
85}
86
87/// Decodes a CAR stream buffering all blocks in memory. For a Stream API use [CarReader].
88///
89/// # Examples
90///
91/// ```
92/// use rs_car::car_read_all;
93///
94/// #[async_std::main]
95/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
96/// let mut r = async_std::fs::File::open("./tests/custom_fixtures/helloworld.car").await?;
97///
98/// let (blocks, header) = car_read_all(&mut r, true).await?;
99/// println!("{:?}", header);
100///
101/// for (cid, block) in blocks {
102/// println!("{:?} {} bytes", cid, block.len());
103/// }
104///
105/// Ok(())
106/// }
107/// ```
108pub async fn car_read_all<R: AsyncRead + Unpin + Send>(
109 r: &mut R,
110 validate_block_hash: bool,
111) -> Result<(Vec<(Cid, Vec<u8>)>, CarHeader), CarDecodeError> {
112 let mut decoder = CarReader::new(r, validate_block_hash).await?;
113 let mut items: Vec<(Cid, Vec<u8>)> = vec![];
114
115 while let Some(item) = decoder.next().await {
116 let item = item?;
117 items.push(item);
118 }
119
120 Ok((items, decoder.header))
121}
122
/// Boxed future built from `decode_block`. On success it resolves to the reader (handed back
/// so the next block can be decoded from it), the block's CID, the block's bytes and a byte
/// count that `CarReader` adds to its running `read_bytes` total.
type DecodeBlockFuture<'a, R> =
    BoxFuture<'a, Result<(&'a mut R, Cid, Vec<u8>, usize), CarDecodeError>>;
125
126impl<'a, R> Stream for CarReader<'a, R>
127where
128 R: AsyncRead + Send + Unpin + 'a,
129{
130 type Item = Result<(Cid, Vec<u8>), CarDecodeError>;
131
132 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
133 let me = Pin::into_inner(self);
134
135 if let StreamEnd::AfterNBytes(blocks_len) = me.header.eof_stream {
136 if me.read_bytes >= blocks_len {
137 return Poll::Ready(None);
138 }
139 }
140
141 match &mut me.decode_header_future {
142 Some(decode_future) => match decode_future.as_mut().poll(cx) {
143 Poll::Pending => Poll::Pending,
144 Poll::Ready(Ok((r, cid, block, block_len))) => {
145 if me.validate_block_hash {
146 assert_block_cid(&cid, &block)?;
147 }
148 me.read_bytes += block_len;
149 me.decode_header_future = Some(Box::pin(decode_block(r)));
150 Poll::Ready(Some(Ok((cid, block))))
151 }
152 Poll::Ready(Err(CarDecodeError::BlockStartEOF))
153 if me.header.eof_stream == StreamEnd::OnBlockEOF =>
154 {
155 Poll::Ready(None)
156 }
157 Poll::Ready(Err(err)) => {
158 me.decode_header_future = None;
159 Poll::Ready(Some(Err(err)))
160 }
161 },
162 None => Poll::Ready(None),
163 }
164 }
165}
166
#[cfg(test)]
mod tests {
    use std::{collections::HashMap, str::FromStr};

    use futures::executor;
    use serde::{Deserialize, Serialize};

    use super::*;
    use crate::car_header::CarVersion;

    /// Expected contents of a CAR fixture, deserialized from the JSON file next to it.
    #[derive(Debug, Deserialize, Serialize)]
    struct ExpectedCarv1 {
        header: ExpectedCarv1Header,
        blocks: Vec<ExpectedCarBlock>,
    }

    /// Expected CARv1 header: root CIDs and version number.
    #[derive(Debug, Deserialize, Serialize)]
    struct ExpectedCarv1Header {
        roots: Vec<ExpectedCid>,
        version: u8,
    }

    /// Expected block entry of a fixture.
    #[derive(Debug, Deserialize, Serialize)]
    struct ExpectedCarBlock {
        cid: ExpectedCid,
        // The JSON key is camelCase; rename it instead of silencing the naming lint
        #[serde(rename = "blockLength")]
        block_length: usize,
    }

    /// CID as it appears in the fixtures: a map whose "/" key holds the CID string.
    type ExpectedCid = HashMap<String, String>;

    /// Parses every fixture CID of `cids`, panicking on malformed entries.
    fn parse_expected_cids(cids: &[ExpectedCid]) -> Vec<Cid> {
        cids.iter().map(parse_expected_cid).collect()
    }

    /// Parses the CID string stored under the "/" key, panicking if missing or invalid.
    fn parse_expected_cid(cid: &ExpectedCid) -> Cid {
        Cid::from_str(cid.get("/").unwrap()).unwrap()
    }

    /// Reads the helloworld fixture through the buffering `car_read_all` API.
    #[test]
    fn decode_carv1_helloworld_no_stream() {
        executor::block_on(async {
            let car_filepath = "./tests/custom_fixtures/helloworld.car";
            let mut file = async_std::fs::File::open(car_filepath).await.unwrap();
            let (blocks, header) = car_read_all(&mut file, true).await.unwrap();

            let root_cid = Cid::from_str("QmUU2HcUBVSXkfWPUc3WUSeCMrWWeEJTuAgR9uyWBhh9Nf").unwrap();
            let root_block = hex::decode("0a110802120b68656c6c6f776f726c640a180b").unwrap();

            assert_eq!(blocks, vec![(root_cid, root_block)]);
            assert_eq!(header.version, CarVersion::V1);
            assert_eq!(header.roots, vec![root_cid]);
        })
    }

    /// Reads the helloworld fixture block by block through the `Stream` API of `CarReader`.
    #[test]
    fn decode_carv1_helloworld_stream() {
        executor::block_on(async {
            let car_filepath = "./tests/custom_fixtures/helloworld.car";
            let mut file = async_std::fs::File::open(car_filepath).await.unwrap();
            let mut streamer = CarReader::new(&mut file, true).await.unwrap();

            let root_cid = Cid::from_str("QmUU2HcUBVSXkfWPUc3WUSeCMrWWeEJTuAgR9uyWBhh9Nf").unwrap();
            let root_block = hex::decode("0a110802120b68656c6c6f776f726c640a180b").unwrap();

            assert_eq!(streamer.header.version, CarVersion::V1);
            assert_eq!(streamer.header.roots, vec![root_cid]);

            // The fixture holds exactly one block, after which the stream must end
            let (cid, block) = streamer.next().await.unwrap().unwrap();
            assert_eq!(cid, root_cid);
            assert_eq!(block, root_block);
            assert!(streamer.next().await.is_none());
        })
    }

    #[test]
    fn decode_carv1_basic() {
        // 63a265726f6f747382d82a582500
        // 01711220f88bc853804cf294fe417e4fa83028689fcdb1b1592c5102e1474dbc200fab8b - v1 header root (bafyreihyrpefhacm6kkp4ql6j6udakdit7g3dmkzfriqfykhjw6cad5lrm)
        // d82a582500
        // 0171122069ea0740f9807a28f4d932c62e7c1c83be055e55072c90266ab3e79df63a365b - v1 header root (bafyreidj5idub6mapiupjwjsyyxhyhedxycv4vihfsicm2vt46o7morwlm)
        // 6776657273696f6e01
        // 5b - block 0 len = 91, block_len = 55
        // 01711220f88bc853804cf294fe417e4fa83028689fcdb1b1592c5102e1474dbc200fab8b - block 0 cid (bafyreihyrpefhacm6kkp4ql6j6udakdit7g3dmkzfriqfykhjw6cad5lrm)
        // a2646c696e6bd82a582300122002acecc5de2438ea4126a3010ecb1f8a599c8eff22fff1a1dcffe999b27fd3de646e616d6564626c6970 - block 0 data
        // 8301 - block 1 len = 131, block_len = 97
        // 122002acecc5de2438ea4126a3010ecb1f8a599c8eff22fff1a1dcffe999b27fd3de - block 1 cid (QmNX6Tffavsya4xgBi2VJQnSuqy9GsxongxZZ9uZBqp16d)
        // 122e0a2401551220b6fbd675f98e2abd22d4ed29fdc83150fedc48597e92dd1a7a24381d44a274511204626561721804122f0a22122079a982de3c9907953d4d323cee1d0fb1ed8f45f8ef02870c0cb9e09246bd530a12067365636f6e64189501 - block 1 data
        // 28 - block 2 len = 40, block_len = 4
        // 01551220b6fbd675f98e2abd22d4ed29fdc83150fedc48597e92dd1a7a24381d44a27451 - block 2 cid (bafkreifw7plhl6mofk6sfvhnfh64qmkq73oeqwl6sloru6rehaoujituke)
        // 63636363 - block 2 data
        // 8001 - block 3 len = 128, block_len = 94
        // 122079a982de3c9907953d4d323cee1d0fb1ed8f45f8ef02870c0cb9e09246bd530a - block 3 cid (QmWXZxVQ9yZfhQxLD35eDR8LiMRsYtHxYqTFCBbJoiJVys)
        // 122d0a240155122081cc5b17018674b401b42f35ba07bb79e211239c23bffe658da1577e3e6468771203646f671804122d0a221220e7dc486e97e6ebe5cdabab3e392bdad128b6e09acc94bb4e2aa2af7b986d24d0120566697273741833 - block 3 data
        // 28 - block 4 len = 40, block_len = 4
        // 0155122081cc5b17018674b401b42f35ba07bb79e211239c23bffe658da1577e3e646877 - block 4 cid (bafkreiebzrnroamgos2adnbpgw5apo3z4iishhbdx77gldnbk57d4zdio4)
        // 62626262 - block 4 data
        // 51 - block 5 len = 81, block_len = 47
        // 1220e7dc486e97e6ebe5cdabab3e392bdad128b6e09acc94bb4e2aa2af7b986d24d0 - block 5 cid (QmdwjhxpxzcMsR3qUuj7vUL8pbA7MgR3GAxWi2GLHjsKCT)
        // 122d0a240155122061be55a8e2f6b4e172338bddf184d6dbee29c98853e0a0485ecee7f27b9af0b412036361741804 - block 5 data
        // 28 - block 6 len = 40, block_len = 4
        // 0155122061be55a8e2f6b4e172338bddf184d6dbee29c98853e0a0485ecee7f27b9af0b4 - block 6 cid (bafkreidbxzk2ryxwwtqxem4l3xyyjvw35yu4tcct4cqeqxwo47zhxgxqwq)
        // 61616161 - block 6 data
        // 36 - block 7 len = 54, block_len = 18
        // 0171122069ea0740f9807a28f4d932c62e7c1c83be055e55072c90266ab3e79df63a365b - block 7 cid (bafyreidj5idub6mapiupjwjsyyxhyhedxycv4vihfsicm2vt46o7morwlm)
        // a2646c696e6bf6646e616d65656c696d626f - block 7 data
        executor::block_on(async {
            run_car_basic_test(
                "./tests/spec_fixtures/carv1-basic.car",
                "./tests/spec_fixtures/carv1-basic.json",
            )
            .await;
        })
    }

    #[test]
    fn decode_carv2_basic() {
        // 0aa16776657273696f6e02 - v2 pragma
        // 00000000000000000000000000000000 - v2 header characteristics
        // 3300000000000000 - v2 header data_offset
        // c001000000000000 - v2 header data_size
        // f301000000000000 - v2 header index_offset
        // 38a265726f6f747381d82a582300
        // 1220fb16f5083412ef1371d031ed4aa239903d84efdadf1ba3cd678e6475b1a232f8 - v1 header root (QmfEoLyB5NndqeKieExd1rtJzTduQUPEV8TwAYcUiy3H5Z)
        // 6776657273696f6e01
        // 51 - block 0 len = 81, block_len = 47
        // 1220fb16f5083412ef1371d031ed4aa239903d84efdadf1ba3cd678e6475b1a232f8 - block 0 cid (QmfEoLyB5NndqeKieExd1rtJzTduQUPEV8TwAYcUiy3H5Z)
        // 122d0a221220d9c0d5376d26f1931f7ad52d7acc00fc1090d2edb0808bf61eeb0a152826f6261204f09f8da418a401 - block 0 data
        // 8501 - block 1 len = 133, block_len = 99
        // 1220d9c0d5376d26f1931f7ad52d7acc00fc1090d2edb0808bf61eeb0a152826f626 - block 1 cid (QmczfirA7VEH7YVvKPTPoU69XM3qY4DC39nnTsWd4K3SkM)
        // 12310a221220d745b7757f5b4593eeab7820306c7bc64eb496a7410a0d07df7a34ffec4b97f1120962617272656c657965183a122e0a2401551220a2e1c40da1ae335d4dffe729eb4d5ca23b74b9e51fc535f4a804a261080c294d1204f09f90a11807 - block 1 data
        // 58 - block 2 len = 88, block_len = 54
        // 1220d745b7757f5b4593eeab7820306c7bc64eb496a7410a0d07df7a34ffec4b97f1 - block 2 cid (Qmcpz2FHJD7VAhg1fxFXdYJKePtkx1BsHuCrAgWVnaHMTE)
        // 12340a2401551220b474a99a2705e23cf905a484ec6d14ef58b56bbe62e9292783466ec363b5072d120a666973686d6f6e6765721804 - block 2 data
        // 28 - block 3 len = 40, block_len = 4
        // 01551220b474a99a2705e23cf905a484ec6d14ef58b56bbe62e9292783466ec363b5072d - block 3 cid (bafkreifuosuzujyf4i6psbneqtwg2fhplc2wxptc5euspa2gn3bwhnihfu)
        // 66697368 - block 3 data
        // 2b - block 4 len = 43, block_len = 7
        // 01551220a2e1c40da1ae335d4dffe729eb4d5ca23b74b9e51fc535f4a804a261080c294d - block 4 cid (bafkreifc4hca3inognou377hfhvu2xfchn2ltzi7yu27jkaeujqqqdbjju)
        // 6c6f6273746572 - block 4 data
        // 0100000028000000c800000000000000a2e1c40da1ae335d4dffe729eb4d5ca23b74b9e51fc535f4a804a261080c294d9401000000000000b474a99a2705e23cf905a484ec6d14ef58b56bbe62e9292783466ec363b5072d6b01000000000000d745b7757f5b4593eeab7820306c7bc64eb496a7410a0d07df7a34ffec4b97f11201000000000000d9c0d5376d26f1931f7ad52d7acc00fc1090d2edb0808bf61eeb0a152826f6268b00000000000000fb16f5083412ef1371d031ed4aa239903d84efdadf1ba3cd678e6475b1a232f83900000000000000

        executor::block_on(async {
            run_car_basic_test(
                "./tests/spec_fixtures/carv2-basic.car",
                "./tests/spec_fixtures/carv2-basic.json",
            )
            .await;
        })
    }

    /// Streams `car_filepath` with hash validation enabled and compares the header and the
    /// sequence of block CIDs against the expectations stored in `car_json_expected`.
    async fn run_car_basic_test(car_filepath: &str, car_json_expected: &str) {
        let expected_car = std::fs::read_to_string(car_json_expected).unwrap();
        let expected_car: ExpectedCarv1 = serde_json::from_str(&expected_car).unwrap();

        let mut file = async_std::fs::File::open(car_filepath).await.unwrap();
        let mut streamer = CarReader::new(&mut file, true).await.unwrap();

        // Assert header v1
        assert_eq!(streamer.header.version as u8, expected_car.header.version);
        assert_eq!(
            streamer.header.roots,
            parse_expected_cids(&expected_car.header.roots)
        );

        // Consume stream and read all blocks into memory
        let mut blocks: Vec<(Cid, Vec<u8>)> = vec![];
        while let Some(item) = streamer.next().await {
            let item = item.unwrap();
            blocks.push(item);
        }

        // Assert block's cids, with validate_block_hash == true guarantees block's integrity
        let block_cids = blocks.iter().map(|block| block.0).collect::<Vec<Cid>>();
        let expected_block_cids = expected_car
            .blocks
            .iter()
            .map(|block| parse_expected_cid(&block.cid))
            .collect::<Vec<Cid>>();
        assert_eq!(block_cids, expected_block_cids);
    }
}