Skip to main content

forest/utils/db/
car_stream.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use crate::blocks::TipsetKey;
5use crate::chain::FilecoinSnapshotMetadata;
6use crate::db::car::plain::read_v2_header;
7use crate::utils::io::skip_bytes;
8use crate::utils::multihash::prelude::*;
9use async_compression::tokio::bufread::ZstdDecoder;
10use bytes::{Buf, BufMut, Bytes, BytesMut};
11use cid::Cid;
12use futures::ready;
13use futures::{Stream, sink::Sink};
14use fvm_ipld_encoding::to_vec;
15use integer_encoding::{VarInt, VarIntAsyncReader as _};
16use nunny::Vec as NonEmpty;
17use pin_project_lite::pin_project;
18use serde::{Deserialize, Serialize};
19use std::io::{self, Cursor, Read, SeekFrom, Write};
20use std::path::Path;
21use std::pin::Pin;
22use std::task::{Context, Poll};
23use tokio::io::{
24    AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite,
25    Take,
26};
27use tokio_util::codec::{Encoder, FramedRead};
28use tokio_util::either::Either;
29use unsigned_varint::codec::UviBytes;
30
31use crate::utils::encoding::from_slice_with_fallback;
32
/// Largest varint-prefixed frame (CAR header or block) the readers in this module accept: 512 MiB.
const MAX_FRAME_LEN: usize = 512 << 20;
35
/// Header of a CARv1 payload, stored as the first varint-prefixed frame of the payload.
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct CarV1Header {
    /// Root CIDs of the archive.
    ///
    /// The roots array must contain one or more CIDs,
    /// each of which should be present somewhere in the remainder of the CAR.
    /// See <https://ipld.io/specs/transport/car/carv1/#constraints>
    pub roots: NonEmpty<Cid>,
    /// CAR format version; the reader in this module only accepts `1`.
    pub version: u64,
}
44
/// Fixed-size CARv2 header describing where the inner CARv1 payload lives.
///
/// <https://ipld.io/specs/transport/car/carv2/#header>
#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct CarV2Header {
    /// Characteristics bitfield (see the CARv2 specification linked above).
    pub characteristics: [u8; 16],
    /// Number of bytes [`CarStream`] skips before the CARv1 payload begins.
    /// Must be non-negative; a negative value is rejected when the stream is opened.
    pub data_offset: i64,
    /// Length in bytes of the CARv1 payload; [`CarStream`] reads at most this many bytes.
    /// Must be non-negative; a negative value is rejected when the stream is opened.
    pub data_size: i64,
    /// Offset of the index (see the CARv2 specification); not read by [`CarStream`].
    pub index_offset: i64,
}
53
/// A single CAR block: a CID together with the raw bytes it addresses.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct CarBlock {
    /// Content identifier of `data`.
    pub cid: Cid,
    /// Raw block bytes; [`CarBlock::validate`] checks that they hash to `cid`.
    pub data: Vec<u8>,
}
59
60impl CarBlock {
61    // Write a varint frame containing the cid and the data
62    pub fn write(&self, writer: &mut impl Write) -> io::Result<()> {
63        writer.write_car_block(
64            self.cid,
65            self.data.len() as u64,
66            &mut Cursor::new(&self.data),
67        )
68    }
69
70    pub fn from_bytes(bytes: impl Into<Bytes>) -> io::Result<CarBlock> {
71        let bytes: Bytes = bytes.into();
72        let mut cursor = bytes.reader();
73        let cid = Cid::read_bytes(&mut cursor)
74            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
75        let bytes = cursor.into_inner();
76        Ok(CarBlock {
77            cid,
78            data: bytes.to_vec(),
79        })
80    }
81
82    pub fn valid(&self) -> bool {
83        self.validate().is_ok()
84    }
85
86    pub fn validate(&self) -> anyhow::Result<()> {
87        let actual = {
88            let code = MultihashCode::try_from(self.cid.hash().code())?;
89            Cid::new_v1(self.cid.codec(), code.digest(&self.data))
90        };
91        anyhow::ensure!(
92            actual == self.cid,
93            "CID/Block mismatch for block {}, actual: {actual}",
94            self.cid
95        );
96        Ok(())
97    }
98}
99
/// Writing of a single CAR block frame to a synchronous [`Write`] sink.
pub trait CarBlockWrite {
    /// Writes one frame for the block identified by `cid`, with its payload read from `data`.
    ///
    /// The varint length prefix is `cid.encoded_len() + data_len`, so `data_len` must equal
    /// the number of payload bytes that `data` provides.
    fn write_car_block(&mut self, cid: Cid, data_len: u64, data: &mut impl Read) -> io::Result<()>;
}
103
104impl<T: Write> CarBlockWrite for T {
105    fn write_car_block(&mut self, cid: Cid, data_len: u64, data: &mut impl Read) -> io::Result<()> {
106        let frame_length = cid.encoded_len() as u64 + data_len;
107        self.write_all(&frame_length.encode_var_vec())?;
108        cid.write_bytes(&mut *self)
109            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
110        std::io::copy(data, self)?;
111        Ok(())
112    }
113}
114
pin_project! {
    /// Stream of CAR blocks. If the input data is compressed with zstd, it will
    /// automatically be decompressed.
    /// Note that [`CarStream`] automatically skips the metadata block and F3 data
    /// block defined in [`FRC-0108`](https://github.com/filecoin-project/FIPs/blob/master/FRCs/frc-0108.md).
    pub struct CarStream<ReaderT> {
        // Varint-framed block decoder over the (optionally zstd-decoded) input. When a CARv2
        // header is present, it is limited to that header's `data_size` bytes.
        #[pin]
        reader: FramedRead<Take<Either<ReaderT, ZstdDecoder<ReaderT>>>,UviBytes>,
        // Header of the CARv1 payload.
        pub header_v1: CarV1Header,
        // CARv2 header, if the input was treated as CARv2.
        pub header_v2: Option<CarV2Header>,
        // FRC-0108 snapshot metadata decoded from the first block, if present. That block is
        // then not yielded by the stream.
        pub metadata: Option<FilecoinSnapshotMetadata>,
        // Block read and validated while opening the stream; yielded before any other frame.
        first_block: Option<CarBlock>,
    }
}
129
130// This method checks the header in order to see whether or not we are operating on a zstd
131// archive. The zstd header has a maximum size of 18 bytes:
132// https://github.com/facebook/zstd/blob/dev/doc/zstd_compression_format.md#zstandard-frames.
133fn is_zstd(buf: &[u8]) -> bool {
134    zstd::zstd_safe::get_frame_content_size(buf).is_ok()
135}
136
137impl<ReaderT: AsyncBufRead + Unpin> CarStream<ReaderT> {
138    /// Create a stream with automatic but unsafe CARv2 header extraction.
139    ///
140    /// Note that if the input is zstd compressed, the CARv2 header extraction
141    /// is on a best efforts basis. It could fail when `reader.fill_buf()` is insufficient
142    /// for decoding the first zstd frame, and treat input as CARv1, because this method
143    /// does not require the input to be [`tokio::io::AsyncSeek`].
144    /// It's recommended to use [`CarStream::new`] for zstd compressed CARv2 input.
145    #[allow(dead_code)]
146    pub async fn new_unsafe(mut reader: ReaderT) -> io::Result<Self> {
147        let header_v2 = Self::try_decode_header_v2_from_fill_buf(reader.fill_buf().await?)
148            // treat input as CARv1 if zstd decoding failed
149            .ok()
150            .flatten();
151        Self::new_with_header_v2(reader, header_v2).await
152    }
153
154    /// Create a stream with pre-extracted CARv2 header
155    pub async fn new_with_header_v2(
156        mut reader: ReaderT,
157        header_v2: Option<CarV2Header>,
158    ) -> io::Result<Self> {
159        let is_compressed = is_zstd(reader.fill_buf().await?);
160        let mut reader = if is_compressed {
161            let mut zstd = ZstdDecoder::new(reader);
162            zstd.multiple_members(true);
163            Either::Right(zstd)
164        } else {
165            Either::Left(reader)
166        };
167
168        // Skip v2 header bytes
169        if let Some(header_v2) = &header_v2 {
170            reader = skip_bytes(
171                reader,
172                u64::try_from(header_v2.data_offset).map_err(std::io::Error::other)?,
173            )
174            .await?;
175        }
176
177        let max_car_v1_bytes = header_v2
178            .as_ref()
179            .map(|h| u64::try_from(h.data_size).map_err(std::io::Error::other))
180            .transpose()?
181            .unwrap_or(u64::MAX);
182        let mut reader = reader.take(max_car_v1_bytes);
183        let header_v1 = read_v1_header(&mut reader).await?;
184
185        // Read the first block and check if it is valid. This check helps to
186        // catch invalid CAR files as soon as we open.
187        if let Some(block) = read_car_block(&mut reader).await? {
188            if !block.valid() {
189                return Err(io::Error::new(
190                    io::ErrorKind::InvalidData,
191                    "invalid first block",
192                ));
193            }
194
195            let (first_block, metadata) = if header_v1.roots.len()
196                == crate::db::car::V2_SNAPSHOT_ROOT_COUNT
197            {
198                let maybe_metadata_cid = header_v1.roots.first();
199                if maybe_metadata_cid == &block.cid
200                    && let Ok(metadata) =
201                        fvm_ipld_encoding::from_slice::<FilecoinSnapshotMetadata>(&block.data)
202                {
203                    // Skip the F3 block in the block stream
204                    if metadata.f3_data.is_some() {
205                        // 16GiB
206                        const MAX_F3_FRAME_LEN: u64 = 16 * 1024 * 1024 * 1024;
207                        let len: u64 = reader.read_varint_async().await?;
208                        if len > MAX_F3_FRAME_LEN {
209                            return Err(io::Error::new(
210                                io::ErrorKind::InvalidData,
211                                format!(
212                                    "f3 block frame length too large: {len} > {MAX_F3_FRAME_LEN}"
213                                ),
214                            ));
215                        }
216                        reader = skip_bytes(reader, len).await?;
217                    }
218
219                    // Skip the metadata block in the block stream
220                    (None, Some(metadata))
221                } else {
222                    (Some(block), None)
223                }
224            } else {
225                (Some(block), None)
226            };
227
228            Ok(CarStream {
229                reader: FramedRead::new(reader, uvi_bytes()),
230                header_v1,
231                header_v2,
232                metadata,
233                first_block,
234            })
235        } else {
236            Ok(CarStream {
237                reader: FramedRead::new(reader, uvi_bytes()),
238                header_v1,
239                header_v2,
240                metadata: None,
241                first_block: None,
242            })
243        }
244    }
245
246    /// Extracts CARv2 header from the input, returns the reader and CARv2 header.
247    ///
248    /// Note that position of the input reader has to be reset before calling [`CarStream::new_with_header_v2`].
249    /// Use [`CarStream::extract_header_v2_and_reset_reader_position`] to automatically reset stream position.
250    pub async fn extract_header_v2(
251        mut reader: ReaderT,
252    ) -> io::Result<(ReaderT, Option<CarV2Header>)> {
253        let is_compressed = is_zstd(reader.fill_buf().await?);
254        let mut reader = if is_compressed {
255            let mut zstd = ZstdDecoder::new(reader);
256            zstd.multiple_members(true);
257            Either::Right(zstd)
258        } else {
259            Either::Left(reader)
260        };
261        let mut possible_header_bytes = [0; 51];
262        reader.read_exact(&mut possible_header_bytes).await?;
263        let header_v2 = read_v2_header(possible_header_bytes.as_slice())?;
264        let reader = match reader {
265            Either::Left(reader) => reader,
266            Either::Right(zstd) => zstd.into_inner(),
267        };
268        Ok((reader, header_v2))
269    }
270
271    fn try_decode_header_v2_from_fill_buf(fill_buf: &[u8]) -> io::Result<Option<CarV2Header>> {
272        let is_compressed = is_zstd(fill_buf);
273        let fill_buf_reader = if is_compressed {
274            itertools::Either::Right(zstd::Decoder::new(fill_buf)?)
275        } else {
276            itertools::Either::Left(fill_buf)
277        };
278        read_v2_header(fill_buf_reader)
279    }
280}
281
282impl<ReaderT: AsyncBufRead + AsyncSeek + Unpin> CarStream<ReaderT> {
283    /// Create a stream with automatic CARv2 header extraction.
284    pub async fn new(reader: ReaderT) -> io::Result<Self> {
285        let (reader, header_v2) = Self::extract_header_v2_and_reset_reader_position(reader).await?;
286        Self::new_with_header_v2(reader, header_v2).await
287    }
288
289    /// Extracts CARv2 header from the input, resets the reader position and returns the reader and CARv2 header.
290    pub async fn extract_header_v2_and_reset_reader_position(
291        mut reader: ReaderT,
292    ) -> io::Result<(ReaderT, Option<CarV2Header>)> {
293        let stream_position = reader.stream_position().await?;
294        let (mut reader, header_v2) = Self::extract_header_v2(reader).await?;
295        reader.seek(SeekFrom::Start(stream_position)).await?;
296        Ok((reader, header_v2))
297    }
298}
299
300impl<ReaderT> CarStream<ReaderT> {
301    pub fn head_tipset_key(&self) -> TipsetKey {
302        self.metadata
303            .as_ref()
304            .map(|m| m.head_tipset_key.clone())
305            .unwrap_or_else(|| self.header_v1.roots.clone())
306            .into()
307    }
308}
309
310impl CarStream<tokio::io::BufReader<tokio::fs::File>> {
311    pub async fn new_from_path(path: impl AsRef<Path>) -> io::Result<Self> {
312        Self::new(tokio::io::BufReader::new(
313            tokio::fs::File::open(path.as_ref()).await?,
314        ))
315        .await
316    }
317}
318
319impl<ReaderT: AsyncBufRead> Stream for CarStream<ReaderT> {
320    type Item = io::Result<CarBlock>;
321
322    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
323        let this = self.project();
324        if let Some(block) = this.first_block.take() {
325            return Poll::Ready(Some(Ok(block)));
326        }
327        let item = futures::ready!(this.reader.poll_next(cx));
328        Poll::Ready(item.map(|ret| ret.and_then(CarBlock::from_bytes)))
329    }
330}
331
pin_project! {
    /// A [`Sink`] of [`CarBlock`]s that encodes them as CARv1 frames and writes them to an
    /// [`AsyncWrite`] destination.
    pub struct CarWriter<W> {
        // Destination writer.
        #[pin]
        inner: W,
        // Encoded bytes (the CARv1 header frame, then queued block frames) not yet written
        // to `inner`.
        buffer: BytesMut,
    }
}
339
340impl<W: AsyncWrite> CarWriter<W> {
341    pub fn new_carv1(roots: NonEmpty<Cid>, writer: W) -> io::Result<Self> {
342        let car_header = CarV1Header { roots, version: 1 };
343
344        let mut header_uvi_frame = BytesMut::new();
345        uvi_bytes().encode(Bytes::from(to_vec(&car_header)?), &mut header_uvi_frame)?;
346
347        Ok(Self {
348            inner: writer,
349            buffer: header_uvi_frame,
350        })
351    }
352}
353
354impl<W: AsyncWrite> Sink<CarBlock> for CarWriter<W> {
355    type Error = io::Error;
356
357    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
358        let mut this = self.as_mut().project();
359
360        while !this.buffer.is_empty() {
361            this = self.as_mut().project();
362            let bytes_written = ready!(this.inner.poll_write(cx, this.buffer))?;
363            this.buffer.advance(bytes_written);
364        }
365        Poll::Ready(Ok(()))
366    }
367    fn start_send(self: Pin<&mut Self>, item: CarBlock) -> Result<(), Self::Error> {
368        item.write(&mut self.project().buffer.writer())
369    }
370    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
371        ready!(self.as_mut().poll_ready(cx))?;
372        self.project().inner.poll_flush(cx)
373    }
374    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
375        ready!(self.as_mut().poll_ready(cx))?;
376        self.project().inner.poll_shutdown(cx)
377    }
378}
379
380async fn read_v1_header<ReaderT: AsyncRead + Unpin>(
381    reader: &mut ReaderT,
382) -> std::io::Result<CarV1Header> {
383    let Some(frame) = read_frame(reader).await? else {
384        return Err(std::io::Error::new(
385            std::io::ErrorKind::UnexpectedEof,
386            "failed to decode v1 header frame",
387        ));
388    };
389    let header = from_slice_with_fallback::<CarV1Header>(&frame).map_err(std::io::Error::other)?;
390    if header.version != 1 {
391        return Err(std::io::Error::other(format!(
392            "unexpected header version {}, 1 expected",
393            header.version
394        )));
395    }
396    Ok(header)
397}
398
399async fn read_frame<ReaderT: AsyncRead + Unpin>(
400    reader: &mut ReaderT,
401) -> std::io::Result<Option<Vec<u8>>> {
402    let len: usize = match reader.read_varint_async().await {
403        Ok(len) if len > MAX_FRAME_LEN => {
404            return Err(std::io::Error::new(
405                std::io::ErrorKind::InvalidData,
406                format!("frame too large: {len} > {MAX_FRAME_LEN}"),
407            ));
408        }
409        Ok(len) => len,
410        Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => return Ok(None),
411        Err(e) => return Err(e),
412    };
413    let mut bytes = vec![0; len];
414    reader.read_exact(&mut bytes[..]).await?;
415    Ok(Some(bytes))
416}
417
418async fn read_car_block<ReaderT: AsyncRead + Unpin>(
419    reader: &mut ReaderT,
420) -> std::io::Result<Option<CarBlock>> {
421    read_frame(reader)
422        .await?
423        .map(CarBlock::from_bytes)
424        .transpose()
425}
426
427pub fn uvi_bytes() -> UviBytes {
428    let mut decoder = UviBytes::default();
429    decoder.set_max_len(MAX_FRAME_LEN);
430    decoder
431}
432
433#[cfg(test)]
434mod tests;