forest/utils/db/
car_stream.rs

1// Copyright 2019-2025 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use crate::chain::FilecoinSnapshotMetadata;
5use crate::db::car::plain::read_v2_header;
6use crate::utils::io::skip_bytes;
7use crate::utils::multihash::prelude::*;
8use async_compression::tokio::bufread::ZstdDecoder;
9use bytes::{Buf, BufMut, Bytes, BytesMut};
10use cid::Cid;
11use futures::ready;
12use futures::{Stream, sink::Sink};
13use fvm_ipld_encoding::to_vec;
14use integer_encoding::{VarInt, VarIntAsyncReader as _};
15use nunny::Vec as NonEmpty;
16use pin_project_lite::pin_project;
17use serde::{Deserialize, Serialize};
18use std::io::{self, Cursor, Read, SeekFrom, Write};
19use std::path::Path;
20use std::pin::Pin;
21use std::task::{Context, Poll};
22use tokio::io::{
23    AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite,
24    Take,
25};
26use tokio_util::codec::{Encoder, FramedRead};
27use tokio_util::either::Either;
28use unsigned_varint::codec::UviBytes;
29
30use crate::utils::encoding::from_slice_with_fallback;
31
/// Largest frame, in bytes, accepted when reading a length-prefixed frame
/// and configured on the varint codec (512 MiB).
const MAX_FRAME_LEN: usize = 512 * 1024 * 1024;
34
/// Header of a CARv1 archive: the root CIDs plus the format version.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct CarV1Header {
    /// The roots array must contain one or more CIDs,
    /// each of which should be present somewhere in the remainder of the CAR.
    /// See <https://ipld.io/specs/transport/car/carv1/#constraints>
    pub roots: NonEmpty<Cid>,
    /// CAR format version. Header decoding in this file rejects anything
    /// other than `1`.
    pub version: u64,
}
43
/// Fixed-size CARv2 header.
///
/// <https://ipld.io/specs/transport/car/carv2/#header>
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct CarV2Header {
    /// Characteristics bitfield (16 bytes); not interpreted in this file.
    pub characteristics: [u8; 16],
    /// Number of bytes skipped from the start of the input before the CARv1
    /// payload is read.
    pub data_offset: i64,
    /// Upper bound on the number of CARv1 payload bytes that are read.
    pub data_size: i64,
    /// Offset of the index section; not used in this file.
    pub index_offset: i64,
}
52
/// A single CAR block: a CID together with the block's raw bytes.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct CarBlock {
    /// Content identifier the block is stored under.
    pub cid: Cid,
    /// Raw block payload (the frame bytes that follow the CID).
    pub data: Vec<u8>,
}
58
59impl CarBlock {
60    // Write a varint frame containing the cid and the data
61    pub fn write(&self, writer: &mut impl Write) -> io::Result<()> {
62        writer.write_car_block(
63            self.cid,
64            self.data.len() as u64,
65            &mut Cursor::new(&self.data),
66        )
67    }
68
69    pub fn from_bytes(bytes: impl Into<Bytes>) -> io::Result<CarBlock> {
70        let bytes: Bytes = bytes.into();
71        let mut cursor = bytes.reader();
72        let cid = Cid::read_bytes(&mut cursor)
73            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
74        let bytes = cursor.into_inner();
75        Ok(CarBlock {
76            cid,
77            data: bytes.to_vec(),
78        })
79    }
80
81    pub fn valid(&self) -> bool {
82        self.validate().is_ok()
83    }
84
85    pub fn validate(&self) -> anyhow::Result<()> {
86        let actual = {
87            let code = MultihashCode::try_from(self.cid.hash().code())?;
88            Cid::new_v1(self.cid.codec(), code.digest(&self.data))
89        };
90        anyhow::ensure!(
91            actual == self.cid,
92            "CID/Block mismatch for block {}, actual: {actual}",
93            self.cid
94        );
95        Ok(())
96    }
97}
98
/// Extension trait for writing a single CAR block frame.
pub trait CarBlockWrite {
    /// Writes one frame: a varint length prefix, the CID, then the payload
    /// read from `data`.
    ///
    /// `data_len` is the payload size in bytes; together with the encoded CID
    /// length it determines the value of the length prefix.
    fn write_car_block(&mut self, cid: Cid, data_len: u64, data: &mut impl Read) -> io::Result<()>;
}
102
103impl<T: Write> CarBlockWrite for T {
104    fn write_car_block(&mut self, cid: Cid, data_len: u64, data: &mut impl Read) -> io::Result<()> {
105        let frame_length = cid.encoded_len() as u64 + data_len;
106        self.write_all(&frame_length.encode_var_vec())?;
107        cid.write_bytes(&mut *self)
108            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
109        std::io::copy(data, self)?;
110        Ok(())
111    }
112}
113
pin_project! {
    /// Stream of CAR blocks. If the input data is compressed with zstd, it will
    /// automatically be decompressed.
    /// Note that [`CarStream`] automatically skips the metadata block and F3 data
    /// block defined in [`FRC-0108`](https://github.com/filecoin-project/FIPs/blob/master/FRCs/frc-0108.md)
    pub struct CarStream<ReaderT> {
        // Varint-framed view over the (possibly zstd-decoded) input, limited
        // to the CARv1 payload size when a CARv2 header is present.
        #[pin]
        reader: FramedRead<Take<Either<ReaderT, ZstdDecoder<ReaderT>>>,UviBytes>,
        // CARv1 header decoded while the stream was being constructed.
        pub header_v1: CarV1Header,
        // CARv2 header, if one was supplied or detected.
        pub header_v2: Option<CarV2Header>,
        // Block read eagerly during construction; yielded before anything
        // else by the `Stream` implementation.
        first_block: Option<CarBlock>,
    }
}
127
128// This method checks the header in order to see whether or not we are operating on a zstd
129// archive. The zstd header has a maximum size of 18 bytes:
130// https://github.com/facebook/zstd/blob/dev/doc/zstd_compression_format.md#zstandard-frames.
131fn is_zstd(buf: &[u8]) -> bool {
132    zstd::zstd_safe::get_frame_content_size(buf).is_ok()
133}
134
135impl<ReaderT: AsyncBufRead + Unpin> CarStream<ReaderT> {
136    /// Create a stream with automatic but unsafe CARv2 header extraction.
137    ///
138    /// Note that if the input is zstd compressed, the CARv2 header extraction
139    /// is on a best efforts basis. It could fail when `reader.fill_buf()` is insufficient
140    /// for decoding the first zstd frame, and treat input as CARv1, because this method
141    /// does not require the input to be [`tokio::io::AsyncSeek`].
142    /// It's recommended to use [`CarStream::new`] for zstd compressed CARv2 input.
143    #[allow(dead_code)]
144    pub async fn new_unsafe(mut reader: ReaderT) -> io::Result<Self> {
145        let header_v2 = Self::try_decode_header_v2_from_fill_buf(reader.fill_buf().await?)
146            // treat input as CARv1 if zstd decoding failed
147            .ok()
148            .flatten();
149        Self::new_with_header_v2(reader, header_v2).await
150    }
151
152    /// Create a stream with pre-extracted CARv2 header
153    pub async fn new_with_header_v2(
154        mut reader: ReaderT,
155        header_v2: Option<CarV2Header>,
156    ) -> io::Result<Self> {
157        let is_compressed = is_zstd(reader.fill_buf().await?);
158        let mut reader = if is_compressed {
159            let mut zstd = ZstdDecoder::new(reader);
160            zstd.multiple_members(true);
161            Either::Right(zstd)
162        } else {
163            Either::Left(reader)
164        };
165
166        // Skip v2 header bytes
167        if let Some(header_v2) = &header_v2 {
168            reader = skip_bytes(
169                reader,
170                u64::try_from(header_v2.data_offset).map_err(std::io::Error::other)?,
171            )
172            .await?;
173        }
174
175        let max_car_v1_bytes = header_v2
176            .as_ref()
177            .map(|h| u64::try_from(h.data_size).map_err(std::io::Error::other))
178            .transpose()?
179            .unwrap_or(u64::MAX);
180        let mut reader = reader.take(max_car_v1_bytes);
181        let header_v1 = read_v1_header(&mut reader).await?;
182
183        // Read the first block and check if it is valid. This check helps to
184        // catch invalid CAR files as soon as we open.
185        if let Some(block) = read_car_block(&mut reader).await? {
186            if !block.valid() {
187                return Err(io::Error::new(
188                    io::ErrorKind::InvalidData,
189                    "invalid first block",
190                ));
191            }
192
193            let first_block = if header_v1.roots.len() == crate::db::car::V2_SNAPSHOT_ROOT_COUNT {
194                let maybe_metadata_cid = header_v1.roots.first();
195                if maybe_metadata_cid == &block.cid
196                    && let Ok(metadata) =
197                        fvm_ipld_encoding::from_slice::<FilecoinSnapshotMetadata>(&block.data)
198                {
199                    // Skip the F3 block in the block stream
200                    if metadata.f3_data.is_some() {
201                        // 16GiB
202                        const MAX_F3_FRAME_LEN: u64 = 16 * 1024 * 1024 * 1024;
203                        let len: u64 = reader.read_varint_async().await?;
204                        if len > MAX_F3_FRAME_LEN {
205                            return Err(io::Error::new(
206                                io::ErrorKind::InvalidData,
207                                format!(
208                                    "f3 block frame length too large: {len} > {MAX_F3_FRAME_LEN}"
209                                ),
210                            ));
211                        }
212                        reader = skip_bytes(reader, len).await?;
213                    }
214
215                    // Skip the metadata block in the block stream
216                    None
217                } else {
218                    Some(block)
219                }
220            } else {
221                Some(block)
222            };
223
224            Ok(CarStream {
225                reader: FramedRead::new(reader, uvi_bytes()),
226                header_v1,
227                header_v2,
228                first_block,
229            })
230        } else {
231            Ok(CarStream {
232                reader: FramedRead::new(reader, uvi_bytes()),
233                header_v1,
234                header_v2,
235                first_block: None,
236            })
237        }
238    }
239
240    /// Extracts CARv2 header from the input, returns the reader and CARv2 header.
241    ///
242    /// Note that position of the input reader has to be reset before calling [`CarStream::new_with_header_v2`].
243    /// Use [`CarStream::extract_header_v2_and_reset_reader_position`] to automatically reset stream position.
244    pub async fn extract_header_v2(
245        mut reader: ReaderT,
246    ) -> io::Result<(ReaderT, Option<CarV2Header>)> {
247        let is_compressed = is_zstd(reader.fill_buf().await?);
248        let mut reader = if is_compressed {
249            let mut zstd = ZstdDecoder::new(reader);
250            zstd.multiple_members(true);
251            Either::Right(zstd)
252        } else {
253            Either::Left(reader)
254        };
255        let mut possible_header_bytes = [0; 51];
256        reader.read_exact(&mut possible_header_bytes).await?;
257        let header_v2 = read_v2_header(possible_header_bytes.as_slice())?;
258        let reader = match reader {
259            Either::Left(reader) => reader,
260            Either::Right(zstd) => zstd.into_inner(),
261        };
262        Ok((reader, header_v2))
263    }
264
265    fn try_decode_header_v2_from_fill_buf(fill_buf: &[u8]) -> io::Result<Option<CarV2Header>> {
266        let is_compressed = is_zstd(fill_buf);
267        let fill_buf_reader = if is_compressed {
268            itertools::Either::Right(zstd::Decoder::new(fill_buf)?)
269        } else {
270            itertools::Either::Left(fill_buf)
271        };
272        read_v2_header(fill_buf_reader)
273    }
274}
275
276impl<ReaderT: AsyncBufRead + AsyncSeek + Unpin> CarStream<ReaderT> {
277    /// Create a stream with automatic CARv2 header extraction.
278    pub async fn new(reader: ReaderT) -> io::Result<Self> {
279        let (reader, header_v2) = Self::extract_header_v2_and_reset_reader_position(reader).await?;
280        Self::new_with_header_v2(reader, header_v2).await
281    }
282
283    /// Extracts CARv2 header from the input, resets the reader position and returns the reader and CARv2 header.
284    pub async fn extract_header_v2_and_reset_reader_position(
285        mut reader: ReaderT,
286    ) -> io::Result<(ReaderT, Option<CarV2Header>)> {
287        let stream_position = reader.stream_position().await?;
288        let (mut reader, header_v2) = Self::extract_header_v2(reader).await?;
289        reader.seek(SeekFrom::Start(stream_position)).await?;
290        Ok((reader, header_v2))
291    }
292}
293
294impl CarStream<tokio::io::BufReader<tokio::fs::File>> {
295    pub async fn new_from_path(path: impl AsRef<Path>) -> io::Result<Self> {
296        Self::new(tokio::io::BufReader::new(
297            tokio::fs::File::open(path.as_ref()).await?,
298        ))
299        .await
300    }
301}
302
303impl<ReaderT: AsyncBufRead> Stream for CarStream<ReaderT> {
304    type Item = io::Result<CarBlock>;
305
306    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
307        let this = self.project();
308        if let Some(block) = this.first_block.take() {
309            return Poll::Ready(Some(Ok(block)));
310        }
311        let item = futures::ready!(this.reader.poll_next(cx));
312        Poll::Ready(item.map(|ret| ret.and_then(CarBlock::from_bytes)))
313    }
314}
315
pin_project! {
    /// Sink that serializes [`CarBlock`]s into an asynchronous writer.
    pub struct CarWriter<W> {
        // Destination writer.
        #[pin]
        inner: W,
        // Encoded bytes that have not been handed to `inner` yet.
        buffer: BytesMut,
    }
}
323
324impl<W: AsyncWrite> CarWriter<W> {
325    pub fn new_carv1(roots: NonEmpty<Cid>, writer: W) -> io::Result<Self> {
326        let car_header = CarV1Header { roots, version: 1 };
327
328        let mut header_uvi_frame = BytesMut::new();
329        uvi_bytes().encode(Bytes::from(to_vec(&car_header)?), &mut header_uvi_frame)?;
330
331        Ok(Self {
332            inner: writer,
333            buffer: header_uvi_frame,
334        })
335    }
336}
337
338impl<W: AsyncWrite> Sink<CarBlock> for CarWriter<W> {
339    type Error = io::Error;
340
341    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
342        let mut this = self.as_mut().project();
343
344        while !this.buffer.is_empty() {
345            this = self.as_mut().project();
346            let bytes_written = ready!(this.inner.poll_write(cx, this.buffer))?;
347            this.buffer.advance(bytes_written);
348        }
349        Poll::Ready(Ok(()))
350    }
351    fn start_send(self: Pin<&mut Self>, item: CarBlock) -> Result<(), Self::Error> {
352        item.write(&mut self.project().buffer.writer())
353    }
354    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
355        ready!(self.as_mut().poll_ready(cx))?;
356        self.project().inner.poll_flush(cx)
357    }
358    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
359        ready!(self.as_mut().poll_ready(cx))?;
360        self.project().inner.poll_shutdown(cx)
361    }
362}
363
364async fn read_v1_header<ReaderT: AsyncRead + Unpin>(
365    reader: &mut ReaderT,
366) -> std::io::Result<CarV1Header> {
367    let Some(frame) = read_frame(reader).await? else {
368        return Err(std::io::Error::new(
369            std::io::ErrorKind::UnexpectedEof,
370            "failed to decode v1 header frame",
371        ));
372    };
373    let header = from_slice_with_fallback::<CarV1Header>(&frame).map_err(std::io::Error::other)?;
374    if header.version != 1 {
375        return Err(std::io::Error::other(format!(
376            "unexpected header version {}, 1 expected",
377            header.version
378        )));
379    }
380    Ok(header)
381}
382
383async fn read_frame<ReaderT: AsyncRead + Unpin>(
384    reader: &mut ReaderT,
385) -> std::io::Result<Option<Vec<u8>>> {
386    let len: usize = match reader.read_varint_async().await {
387        Ok(len) if len > MAX_FRAME_LEN => {
388            return Err(std::io::Error::new(
389                std::io::ErrorKind::InvalidData,
390                format!("frame too large: {len} > {MAX_FRAME_LEN}"),
391            ));
392        }
393        Ok(len) => len,
394        Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
395        Err(e) => return Err(e),
396    };
397    let mut bytes = vec![0; len];
398    reader.read_exact(&mut bytes[..]).await?;
399    Ok(Some(bytes))
400}
401
402async fn read_car_block<ReaderT: AsyncRead + Unpin>(
403    reader: &mut ReaderT,
404) -> std::io::Result<Option<CarBlock>> {
405    read_frame(reader)
406        .await?
407        .map(CarBlock::from_bytes)
408        .transpose()
409}
410
411pub fn uvi_bytes() -> UviBytes {
412    let mut decoder = UviBytes::default();
413    decoder.set_max_len(MAX_FRAME_LEN);
414    decoder
415}
416
// Unit tests live in a separate `tests` module file and are only compiled
// for test builds.
#[cfg(test)]
mod tests;