Skip to main content

forest/utils/db/
car_stream.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use crate::blocks::TipsetKey;
5use crate::chain::FilecoinSnapshotMetadata;
6use crate::db::car::plain::read_v2_header;
7use crate::utils::io::skip_bytes;
8use crate::utils::multihash::prelude::*;
9use async_compression::tokio::bufread::ZstdDecoder;
10use bytes::{Buf, BufMut, Bytes, BytesMut};
11use cid::Cid;
12use futures::ready;
13use futures::{Stream, sink::Sink};
14use fvm_ipld_encoding::to_vec;
15use integer_encoding::{VarInt, VarIntAsyncReader as _};
16use nunny::Vec as NonEmpty;
17use pin_project_lite::pin_project;
18use serde::{Deserialize, Serialize};
19use std::io::{self, Cursor, Read, SeekFrom, Write};
20use std::path::Path;
21use std::pin::Pin;
22use std::task::{Context, Poll};
23use tokio::io::{
24    AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite,
25    Take,
26};
27use tokio_util::codec::{Encoder, FramedRead};
28use tokio_util::either::Either;
29use unsigned_varint::codec::UviBytes;
30
31use crate::utils::encoding::from_slice_with_fallback;
32
/// Upper bound on the length of a single varint-prefixed frame: 512 MiB.
///
/// Enforced both by `read_frame` and by the codec returned from `uvi_bytes`.
const MAX_FRAME_LEN: usize = 512 << 20;
35
/// Header of a CARv1 payload: the first varint-prefixed frame of the stream.
///
/// It is serialized with `fvm_ipld_encoding` when writing (`CarWriter::new_carv1`), and
/// decoded by `read_v1_header` when reading.
/// See <https://ipld.io/specs/transport/car/carv1/#header>.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct CarV1Header {
    /// The roots array must contain one or more CIDs,
    /// each of which should be present somewhere in the remainder of the CAR.
    /// See <https://ipld.io/specs/transport/car/carv1/#constraints>
    pub roots: NonEmpty<Cid>,
    /// CAR format version. `read_v1_header` rejects any value other than `1`.
    pub version: u64,
}
44
/// Fixed-size CARv2 header, as laid out in the CARv2 specification:
/// <https://ipld.io/specs/transport/car/carv2/#header>
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct CarV2Header {
    /// Raw characteristics bytes. They are not interpreted in this file.
    pub characteristics: [u8; 16],
    /// Offset of the inner CARv1 payload. `CarStream::new_with_header_v2` skips this many
    /// bytes before reading the CARv1 header, and rejects negative values.
    pub data_offset: i64,
    /// Length of the inner CARv1 payload. `CarStream` never reads past it, and negative
    /// values are rejected.
    pub data_size: i64,
    /// Offset of the CARv2 index section. It is not used in this file.
    pub index_offset: i64,
}
53
/// A single CAR block: a CID together with the raw block bytes it identifies.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct CarBlock {
    /// CID of `data`. `CarBlock::validate` checks that it matches the data's hash.
    pub cid: Cid,
    /// Block payload: the part of a CAR frame that follows the encoded CID.
    pub data: Bytes,
}
59
60impl CarBlock {
61    // Write a varint frame containing the cid and the data
62    pub fn write(&self, writer: &mut impl Write) -> io::Result<()> {
63        writer.write_car_block(
64            self.cid,
65            self.data.len() as u64,
66            &mut Cursor::new(&self.data),
67        )
68    }
69
70    pub fn from_bytes(bytes: impl Into<Bytes>) -> io::Result<CarBlock> {
71        let bytes: Bytes = bytes.into();
72        let mut cursor = bytes.reader();
73        let cid = Cid::read_bytes(&mut cursor)
74            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
75        let data = cursor.into_inner();
76        Ok(CarBlock { cid, data })
77    }
78
79    pub fn valid(&self) -> bool {
80        self.validate().is_ok()
81    }
82
83    pub fn validate(&self) -> anyhow::Result<()> {
84        let actual = {
85            let code = MultihashCode::try_from(self.cid.hash().code())?;
86            Cid::new_v1(self.cid.codec(), code.digest(&self.data))
87        };
88        anyhow::ensure!(
89            actual == self.cid,
90            "CID/Block mismatch for block {}, actual: {actual}",
91            self.cid
92        );
93        Ok(())
94    }
95}
96
/// Writes a single CAR block frame: `varint(cid.encoded_len() + data_len) || cid || data`.
///
/// A blanket implementation is provided for every [`Write`] type. `data_len` is written
/// into the frame's length prefix, so it must equal the number of bytes `data` supplies.
pub trait CarBlockWrite {
    fn write_car_block(&mut self, cid: Cid, data_len: u64, data: &mut impl Read) -> io::Result<()>;
}
100
101impl<T: Write> CarBlockWrite for T {
102    fn write_car_block(&mut self, cid: Cid, data_len: u64, data: &mut impl Read) -> io::Result<()> {
103        let frame_length = cid.encoded_len() as u64 + data_len;
104        self.write_all(&frame_length.encode_var_vec())?;
105        cid.write_bytes(&mut *self)
106            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
107        std::io::copy(data, self)?;
108        Ok(())
109    }
110}
111
pin_project! {
    /// Stream of CAR blocks. If the input data is compressed with zstd, it will
    /// automatically be decompressed.
    /// Note that [`CarStream`] automatically skips the metadata block and F3 data
    /// block defined in [`FRC-0108`](https://github.com/filecoin-project/FIPs/blob/master/FRCs/frc-0108.md).
    pub struct CarStream<ReaderT> {
        // Varint-framed reader over the (possibly zstd-decompressed) CARv1 payload. When a
        // CARv2 header is present, it is limited to that header's `data_size`.
        #[pin]
        reader: FramedRead<Take<Either<ReaderT, ZstdDecoder<ReaderT>>>,UviBytes>,
        // Parsed CARv1 header.
        pub header_v1: CarV1Header,
        // CARv2 header, when one was extracted or supplied at construction.
        pub header_v2: Option<CarV2Header>,
        // FRC-0108 snapshot metadata, when the first block was recognized as such.
        pub metadata: Option<FilecoinSnapshotMetadata>,
        // First block, read eagerly during construction for validation. It is yielded
        // before any block decoded from `reader`.
        first_block: Option<CarBlock>,
    }
}
126
127// This method checks the header in order to see whether or not we are operating on a zstd
128// archive. The zstd header has a maximum size of 18 bytes:
129// https://github.com/facebook/zstd/blob/dev/doc/zstd_compression_format.md#zstandard-frames.
130fn is_zstd(buf: &[u8]) -> bool {
131    zstd::zstd_safe::get_frame_content_size(buf).is_ok()
132}
133
134impl<ReaderT: AsyncBufRead + Unpin> CarStream<ReaderT> {
135    /// Create a stream with automatic but unsafe CARv2 header extraction.
136    ///
137    /// Note that if the input is zstd compressed, the CARv2 header extraction
138    /// is on a best efforts basis. It could fail when `reader.fill_buf()` is insufficient
139    /// for decoding the first zstd frame, and treat input as CARv1, because this method
140    /// does not require the input to be [`tokio::io::AsyncSeek`].
141    /// It's recommended to use [`CarStream::new`] for zstd compressed CARv2 input.
142    #[allow(dead_code)]
143    pub async fn new_unsafe(mut reader: ReaderT) -> io::Result<Self> {
144        let header_v2 = Self::try_decode_header_v2_from_fill_buf(reader.fill_buf().await?)
145            // treat input as CARv1 if zstd decoding failed
146            .ok()
147            .flatten();
148        Self::new_with_header_v2(reader, header_v2).await
149    }
150
151    /// Create a stream with pre-extracted CARv2 header
152    pub async fn new_with_header_v2(
153        mut reader: ReaderT,
154        header_v2: Option<CarV2Header>,
155    ) -> io::Result<Self> {
156        let is_compressed = is_zstd(reader.fill_buf().await?);
157        let mut reader = if is_compressed {
158            let mut zstd = ZstdDecoder::new(reader);
159            zstd.multiple_members(true);
160            Either::Right(zstd)
161        } else {
162            Either::Left(reader)
163        };
164
165        // Skip v2 header bytes
166        if let Some(header_v2) = &header_v2 {
167            reader = skip_bytes(
168                reader,
169                u64::try_from(header_v2.data_offset).map_err(std::io::Error::other)?,
170            )
171            .await?;
172        }
173
174        let max_car_v1_bytes = header_v2
175            .as_ref()
176            .map(|h| u64::try_from(h.data_size).map_err(std::io::Error::other))
177            .transpose()?
178            .unwrap_or(u64::MAX);
179        let mut reader = reader.take(max_car_v1_bytes);
180        let header_v1 = read_v1_header(&mut reader).await?;
181
182        // Read the first block and check if it is valid. This check helps to
183        // catch invalid CAR files as soon as we open.
184        if let Some(block) = read_car_block(&mut reader).await? {
185            if !block.valid() {
186                return Err(io::Error::new(
187                    io::ErrorKind::InvalidData,
188                    "invalid first block",
189                ));
190            }
191
192            let (first_block, metadata) = if header_v1.roots.len()
193                == crate::db::car::V2_SNAPSHOT_ROOT_COUNT
194            {
195                let maybe_metadata_cid = header_v1.roots.first();
196                if maybe_metadata_cid == &block.cid
197                    && let Ok(metadata) =
198                        fvm_ipld_encoding::from_slice::<FilecoinSnapshotMetadata>(&block.data)
199                {
200                    // Skip the F3 block in the block stream
201                    if metadata.f3_data.is_some() {
202                        // 16GiB
203                        const MAX_F3_FRAME_LEN: u64 = 16 * 1024 * 1024 * 1024;
204                        let len: u64 = reader.read_varint_async().await?;
205                        if len > MAX_F3_FRAME_LEN {
206                            return Err(io::Error::new(
207                                io::ErrorKind::InvalidData,
208                                format!(
209                                    "f3 block frame length too large: {len} > {MAX_F3_FRAME_LEN}"
210                                ),
211                            ));
212                        }
213                        reader = skip_bytes(reader, len).await?;
214                    }
215
216                    // Skip the metadata block in the block stream
217                    (None, Some(metadata))
218                } else {
219                    (Some(block), None)
220                }
221            } else {
222                (Some(block), None)
223            };
224
225            Ok(CarStream {
226                reader: FramedRead::new(reader, uvi_bytes()),
227                header_v1,
228                header_v2,
229                metadata,
230                first_block,
231            })
232        } else {
233            Ok(CarStream {
234                reader: FramedRead::new(reader, uvi_bytes()),
235                header_v1,
236                header_v2,
237                metadata: None,
238                first_block: None,
239            })
240        }
241    }
242
243    /// Extracts CARv2 header from the input, returns the reader and CARv2 header.
244    ///
245    /// Note that position of the input reader has to be reset before calling [`CarStream::new_with_header_v2`].
246    /// Use [`CarStream::extract_header_v2_and_reset_reader_position`] to automatically reset stream position.
247    pub async fn extract_header_v2(
248        mut reader: ReaderT,
249    ) -> io::Result<(ReaderT, Option<CarV2Header>)> {
250        let is_compressed = is_zstd(reader.fill_buf().await?);
251        let mut reader = if is_compressed {
252            let mut zstd = ZstdDecoder::new(reader);
253            zstd.multiple_members(true);
254            Either::Right(zstd)
255        } else {
256            Either::Left(reader)
257        };
258        let mut possible_header_bytes = [0; 51];
259        reader.read_exact(&mut possible_header_bytes).await?;
260        let header_v2 = read_v2_header(possible_header_bytes.as_slice())?;
261        let reader = match reader {
262            Either::Left(reader) => reader,
263            Either::Right(zstd) => zstd.into_inner(),
264        };
265        Ok((reader, header_v2))
266    }
267
268    fn try_decode_header_v2_from_fill_buf(fill_buf: &[u8]) -> io::Result<Option<CarV2Header>> {
269        let is_compressed = is_zstd(fill_buf);
270        let fill_buf_reader = if is_compressed {
271            itertools::Either::Right(zstd::Decoder::new(fill_buf)?)
272        } else {
273            itertools::Either::Left(fill_buf)
274        };
275        read_v2_header(fill_buf_reader)
276    }
277}
278
279impl<ReaderT: AsyncBufRead + AsyncSeek + Unpin> CarStream<ReaderT> {
280    /// Create a stream with automatic CARv2 header extraction.
281    pub async fn new(reader: ReaderT) -> io::Result<Self> {
282        let (reader, header_v2) = Self::extract_header_v2_and_reset_reader_position(reader).await?;
283        Self::new_with_header_v2(reader, header_v2).await
284    }
285
286    /// Extracts CARv2 header from the input, resets the reader position and returns the reader and CARv2 header.
287    pub async fn extract_header_v2_and_reset_reader_position(
288        mut reader: ReaderT,
289    ) -> io::Result<(ReaderT, Option<CarV2Header>)> {
290        let stream_position = reader.stream_position().await?;
291        let (mut reader, header_v2) = Self::extract_header_v2(reader).await?;
292        reader.seek(SeekFrom::Start(stream_position)).await?;
293        Ok((reader, header_v2))
294    }
295}
296
297impl<ReaderT> CarStream<ReaderT> {
298    pub fn head_tipset_key(&self) -> TipsetKey {
299        self.metadata
300            .as_ref()
301            .map(|m| m.head_tipset_key.clone())
302            .unwrap_or_else(|| self.header_v1.roots.clone())
303            .into()
304    }
305}
306
307impl CarStream<tokio::io::BufReader<tokio::fs::File>> {
308    pub async fn new_from_path(path: impl AsRef<Path>) -> io::Result<Self> {
309        Self::new(tokio::io::BufReader::new(
310            tokio::fs::File::open(path.as_ref()).await?,
311        ))
312        .await
313    }
314}
315
316impl<ReaderT: AsyncBufRead> Stream for CarStream<ReaderT> {
317    type Item = io::Result<CarBlock>;
318
319    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
320        let this = self.project();
321        if let Some(block) = this.first_block.take() {
322            return Poll::Ready(Some(Ok(block)));
323        }
324        let item = futures::ready!(this.reader.poll_next(cx));
325        Poll::Ready(item.map(|ret| ret.and_then(CarBlock::from_bytes)))
326    }
327}
328
pin_project! {
    /// [`Sink`] of [`CarBlock`]s that writes a CARv1 stream to an [`AsyncWrite`].
    ///
    /// Encoded bytes are staged in an internal buffer. They are written to the inner
    /// writer by `poll_ready`, which `poll_flush` and `poll_close` also drive.
    pub struct CarWriter<W> {
        // Destination writer.
        #[pin]
        inner: W,
        // Encoded bytes not yet written to `inner`. It initially holds the CARv1 header frame.
        buffer: BytesMut,
    }
}
336
337impl<W: AsyncWrite> CarWriter<W> {
338    pub fn new_carv1(roots: NonEmpty<Cid>, writer: W) -> io::Result<Self> {
339        let car_header = CarV1Header { roots, version: 1 };
340
341        let mut header_uvi_frame = BytesMut::new();
342        uvi_bytes().encode(Bytes::from(to_vec(&car_header)?), &mut header_uvi_frame)?;
343
344        Ok(Self {
345            inner: writer,
346            buffer: header_uvi_frame,
347        })
348    }
349}
350
351impl<W: AsyncWrite> Sink<CarBlock> for CarWriter<W> {
352    type Error = io::Error;
353
354    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
355        let mut this = self.as_mut().project();
356
357        while !this.buffer.is_empty() {
358            this = self.as_mut().project();
359            let bytes_written = ready!(this.inner.poll_write(cx, this.buffer))?;
360            this.buffer.advance(bytes_written);
361        }
362        Poll::Ready(Ok(()))
363    }
364    fn start_send(self: Pin<&mut Self>, item: CarBlock) -> Result<(), Self::Error> {
365        item.write(&mut self.project().buffer.writer())
366    }
367    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
368        ready!(self.as_mut().poll_ready(cx))?;
369        self.project().inner.poll_flush(cx)
370    }
371    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
372        ready!(self.as_mut().poll_ready(cx))?;
373        self.project().inner.poll_shutdown(cx)
374    }
375}
376
377async fn read_v1_header<ReaderT: AsyncRead + Unpin>(
378    reader: &mut ReaderT,
379) -> std::io::Result<CarV1Header> {
380    let Some(frame) = read_frame(reader).await? else {
381        return Err(std::io::Error::new(
382            std::io::ErrorKind::UnexpectedEof,
383            "failed to decode v1 header frame",
384        ));
385    };
386    let header = from_slice_with_fallback::<CarV1Header>(&frame).map_err(std::io::Error::other)?;
387    if header.version != 1 {
388        return Err(std::io::Error::other(format!(
389            "unexpected header version {}, 1 expected",
390            header.version
391        )));
392    }
393    Ok(header)
394}
395
396async fn read_frame<ReaderT: AsyncRead + Unpin>(
397    reader: &mut ReaderT,
398) -> std::io::Result<Option<Vec<u8>>> {
399    let len: usize = match reader.read_varint_async().await {
400        Ok(len) if len > MAX_FRAME_LEN => {
401            return Err(std::io::Error::new(
402                std::io::ErrorKind::InvalidData,
403                format!("frame too large: {len} > {MAX_FRAME_LEN}"),
404            ));
405        }
406        Ok(len) => len,
407        Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
408        Err(e) => return Err(e),
409    };
410    let mut bytes = vec![0; len];
411    reader.read_exact(&mut bytes[..]).await?;
412    Ok(Some(bytes))
413}
414
415async fn read_car_block<ReaderT: AsyncRead + Unpin>(
416    reader: &mut ReaderT,
417) -> std::io::Result<Option<CarBlock>> {
418    read_frame(reader)
419        .await?
420        .map(CarBlock::from_bytes)
421        .transpose()
422}
423
424pub fn uvi_bytes() -> UviBytes {
425    let mut decoder = UviBytes::default();
426    decoder.set_max_len(MAX_FRAME_LEN);
427    decoder
428}
429
430#[cfg(test)]
431mod tests;