Skip to main content

forest/db/car/
plain.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4//! # Varint frames
5//!
6//! CARs are made of concatenations of _varint frames_. Each varint frame is a concatenation of the
7//! _body length_ as a
8//! [varint](https://docs.rs/integer-encoding/4.0.0/integer_encoding/trait.VarInt.html), and the
9//! _frame body_ itself. [`unsigned_varint::codec::UviBytes`] can be used to read frames
10//! piecewise into memory.
11//!
12//! ```text
13//!        varint frame
14//! │◄───────────────────────►│
15//! │                         │
16//! ├───────────┬─────────────┤
17//! │varint:    │             │
18//! │body length│frame body   │
19//! └───────────┼─────────────┤
20//!             │             │
21//! frame body ►│◄───────────►│
22//!     offset     =body length
23//! ```
24//!
25//! # CARv1 layout and seeking
26//!
27//! The first varint frame is a _header frame_, where the frame body is a [`CarHeader`] encoded
28//! using [`ipld_dagcbor`](serde_ipld_dagcbor).
29//!
30//! Subsequent varint frames are _block frames_, where the frame body is a concatenation of a
31//! [`Cid`] and the _block data_ addressed by that CID.
32//!
33//! ```text
34//! block frame ►│
35//! body offset  │
36//!              │  =body length
37//!              │◄────────────►│
38//!  ┌───────────┼───┬──────────┤
39//!  │body length│cid│block data│
40//!  └───────────┴───┼──────────┤
41//!                  │◄────────►│
42//!                  │  =block data length
43//!      block data  │
44//!          offset ►│
45//! ```
46//!
47//! ## Block ordering
48//! > _... a filecoin-deterministic car-file is currently implementation-defined as containing all
49//! > DAG-forming blocks in first-seen order, as a result of a depth-first DAG traversal starting
50//! > from a single root._
51//! - [CAR documentation](https://ipld.io/specs/transport/car/carv1/#determinism)
52//!
53//! # Future work
54//! - [`fadvise`](https://linux.die.net/man/2/posix_fadvise)-based APIs to pre-fetch parts of the
55//!   file, to improve random access performance.
56//! - Use an inner [`Blockstore`] for writes.
57//! - Use safe arithmetic for all operations - a malicious frame shouldn't cause a crash.
58//! - Theoretically, file-backed blockstores should be clonable (or even [`Sync`]) with very low
59//!   overhead, so that multiple threads could perform operations concurrently.
60//! - CARv2 support
61//! - A wrapper that abstracts over car formats for reading.
62
63use crate::chain::FilecoinSnapshotMetadata;
64use crate::cid_collections::CidHashMap;
65use crate::db::PersistentStore;
66use crate::utils::db::car_stream::{CarV1Header, CarV2Header};
67use crate::{
68    blocks::{Tipset, TipsetKey},
69    utils::encoding::from_slice_with_fallback,
70};
71use cid::Cid;
72use fvm_ipld_blockstore::Blockstore;
73use fvm_ipld_encoding::CborStore as _;
74use integer_encoding::{FixedIntReader, VarIntReader};
75use nunny::Vec as NonEmpty;
76use parking_lot::RwLock;
77use positioned_io::ReadAt;
78use std::{
79    io::{
80        self, BufReader,
81        ErrorKind::{InvalidData, Unsupported},
82        Read, Seek, SeekFrom,
83    },
84    iter,
85    sync::OnceLock,
86};
87use tokio::io::{AsyncWrite, AsyncWriteExt};
88use tracing::{debug, trace};
89
90/// **Note that all operations on this store are blocking**.
91///
92/// It can often be time, memory, or disk prohibitive to read large snapshots into a database like
93/// [`ParityDb`](crate::db::parity_db::ParityDb).
94///
95/// This is an implementer of [`Blockstore`] that simply wraps an uncompressed [CARv1
96/// file](https://ipld.io/specs/transport/car/carv1).
97///
98/// On creation, [`PlainCar`] builds an in-memory index of the [`Cid`]s in the file,
99/// and their offsets into that file.
100/// Note that it prepares its own buffer for doing so.
101///
102/// When a block is requested, [`PlainCar`] scrolls to that offset, and reads the block, on-demand.
103///
104/// Writes for new blocks (which don't exist in the CAR already) are not supported.
105///
106/// Random-access performance is expected to be poor, as the OS will have to load separate parts of
107/// the file from disk, and flush it for each read. However, (near) linear access should be pretty
108/// good, as file chunks will be pre-fetched.
109///
110/// See [module documentation](mod@self) for more.
111pub struct PlainCar<ReaderT> {
112    reader: ReaderT,
113    index: RwLock<CidHashMap<UncompressedBlockDataLocation>>,
114    version: u64,
115    header_v1: CarV1Header,
116    header_v2: Option<CarV2Header>,
117    metadata: OnceLock<Option<FilecoinSnapshotMetadata>>,
118}
119
120impl<ReaderT: super::RandomAccessFileReader> PlainCar<ReaderT> {
121    /// To be correct:
122    /// - `reader` must read immutable data. e.g if it is a file, it should be
123    ///   [`flock`](https://linux.die.net/man/2/flock)ed.
124    ///   [`Blockstore`] API calls may panic if this is not upheld.
125    #[tracing::instrument(level = "debug", skip_all)]
126    pub fn new(reader: ReaderT) -> io::Result<Self> {
127        let mut cursor = positioned_io::Cursor::new(&reader);
128        let position = cursor.position();
129        let header_v2 = read_v2_header(&mut cursor)?;
130        let (limit_position, version) =
131            if let Some(header_v2) = &header_v2 {
132                cursor.set_position(position.saturating_add(
133                    u64::try_from(header_v2.data_offset).map_err(io::Error::other)?,
134                ));
135                (
136                    Some(cursor.stream_position()?.saturating_add(
137                        u64::try_from(header_v2.data_size).map_err(io::Error::other)?,
138                    )),
139                    2,
140                )
141            } else {
142                cursor.set_position(position);
143                (None, 1)
144            };
145
146        let header_v1 = read_v1_header(&mut cursor)?;
147        // When indexing, we perform small reads of the length and CID before seeking
148        // Buffering these gives us a ~50% speedup (n=10): https://github.com/ChainSafe/forest/pull/3085#discussion_r1246897333
149        let mut buf_reader = BufReader::with_capacity(1024, cursor);
150
151        // now create the index
152        let index = iter::from_fn(|| {
153            read_block_data_location_and_skip(&mut buf_reader, limit_position).transpose()
154        })
155        .collect::<Result<CidHashMap<_>, _>>()?;
156
157        match index.len() {
158            0 => Err(io::Error::new(
159                InvalidData,
160                "CARv1 files must contain at least one block",
161            )),
162            num_blocks => {
163                debug!(num_blocks, "indexed CAR");
164                Ok(Self {
165                    reader,
166                    index: RwLock::new(index),
167                    version,
168                    header_v1,
169                    header_v2,
170                    metadata: OnceLock::new(),
171                })
172            }
173        }
174    }
175
176    pub fn metadata(&self) -> Option<&FilecoinSnapshotMetadata> {
177        self.metadata
178            .get_or_init(|| {
179                if self.header_v1.roots.len() == super::V2_SNAPSHOT_ROOT_COUNT {
180                    let maybe_metadata_cid = self.header_v1.roots.first();
181                    if let Ok(Some(metadata)) =
182                        self.get_cbor::<FilecoinSnapshotMetadata>(maybe_metadata_cid)
183                    {
184                        return Some(metadata);
185                    }
186                }
187                None
188            })
189            .as_ref()
190    }
191
192    pub fn head_tipset_key(&self) -> &NonEmpty<Cid> {
193        // head tipset key is stored in v2 snapshot metadata
194        // See <https://github.com/filecoin-project/FIPs/blob/98e33b9fa306959aa0131519eb4cc155522b2081/FRCs/frc-0108.md#v2-specification>
195        if let Some(metadata) = self.metadata() {
196            &metadata.head_tipset_key
197        } else {
198            &self.header_v1.roots
199        }
200    }
201
202    pub fn version(&self) -> u64 {
203        self.version
204    }
205
206    pub fn heaviest_tipset_key(&self) -> TipsetKey {
207        TipsetKey::from(self.head_tipset_key().clone())
208    }
209
210    pub fn heaviest_tipset(&self) -> anyhow::Result<Tipset> {
211        Tipset::load_required(self, &self.heaviest_tipset_key())
212    }
213
214    /// In an arbitrary order
215    #[cfg(test)]
216    pub fn cids(&self) -> Vec<Cid> {
217        self.index.read().keys().collect()
218    }
219
220    pub fn into_dyn(self) -> PlainCar<Box<dyn super::RandomAccessFileReader>> {
221        PlainCar {
222            reader: Box::new(self.reader),
223            index: self.index,
224            version: self.version,
225            header_v1: self.header_v1,
226            header_v2: self.header_v2,
227            metadata: self.metadata,
228        }
229    }
230
231    /// Gets a reader of the block data by its `Cid`
232    pub fn get_reader(&self, k: Cid) -> Option<impl Read> {
233        self.index
234            .read()
235            .get(&k)
236            .map(|UncompressedBlockDataLocation { offset, length }| {
237                positioned_io::Cursor::new_pos(&self.reader, *offset).take(u64::from(*length))
238            })
239    }
240}
241
242impl TryFrom<&'static [u8]> for PlainCar<&'static [u8]> {
243    type Error = io::Error;
244    fn try_from(bytes: &'static [u8]) -> io::Result<Self> {
245        PlainCar::new(bytes)
246    }
247}
248
249/// If you seek to `offset` (from the start of the file), and read `length` bytes,
250/// you should get data that corresponds to a [`Cid`] (but NOT the [`Cid`] itself).
251#[derive(Debug, serde::Serialize, serde::Deserialize)]
252pub struct UncompressedBlockDataLocation {
253    offset: u64,
254    length: u32,
255}
256
257impl<ReaderT> Blockstore for PlainCar<ReaderT>
258where
259    ReaderT: ReadAt,
260{
261    #[tracing::instrument(level = "trace", skip(self))]
262    fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
263        match self.index.read().get(k) {
264            Some(UncompressedBlockDataLocation { offset, length }) => {
265                trace!("fetching from disk");
266                let mut data = vec![0; usize::try_from(*length).unwrap()];
267                self.reader.read_exact_at(*offset, &mut data)?;
268                Ok(Some(data))
269            }
270            None => {
271                trace!("not found");
272                Ok(None)
273            }
274        }
275    }
276
277    /// Not supported, use [`super::ManyCar`] instead.
278    fn put_keyed(&self, _: &Cid, _: &[u8]) -> anyhow::Result<()> {
279        unreachable!("PlainCar is read-only, use ManyCar instead");
280    }
281}
282
283impl<ReaderT> PersistentStore for PlainCar<ReaderT>
284where
285    ReaderT: ReadAt,
286{
287    fn put_keyed_persistent(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
288        self.put_keyed(k, block)
289    }
290}
291
292pub async fn write_skip_frame_header_async(
293    mut writer: impl AsyncWrite + Unpin,
294    data_len: u32,
295) -> std::io::Result<()> {
296    writer
297        .write_all(&super::forest::ZSTD_SKIPPABLE_FRAME_MAGIC_HEADER)
298        .await?;
299    writer.write_all(&data_len.to_le_bytes()).await?;
300    Ok(())
301}
302
303fn cid_error_to_io_error(cid_error: cid::Error) -> io::Error {
304    match cid_error {
305        cid::Error::Io(io_error) => io_error,
306        other => io::Error::new(InvalidData, other),
307    }
308}
309
310/// <https://ipld.io/specs/transport/car/carv2/#header>
311/// ```text
312/// start ►│    reader end ►│
313///        ├──────┬─────────┤
314///        │pragma│v2 header│
315///        └──────┴─────────┘
316/// ```
317pub fn read_v2_header(mut reader: impl Read) -> io::Result<Option<CarV2Header>> {
318    /// <https://ipld.io/specs/transport/car/carv2/#pragma>
319    const CAR_V2_PRAGMA: [u8; 10] = [0xa1, 0x67, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x02];
320
321    let len = reader.read_fixedint::<u8>()? as usize;
322    if len == CAR_V2_PRAGMA.len() {
323        let mut buffer = vec![0; len];
324        reader.read_exact(&mut buffer)?;
325        if buffer[..] == CAR_V2_PRAGMA {
326            let mut characteristics = [0; 16];
327            reader.read_exact(&mut characteristics)?;
328            let data_offset: i64 = reader.read_fixedint()?;
329            let data_size: i64 = reader.read_fixedint()?;
330            let index_offset: i64 = reader.read_fixedint()?;
331            return Ok(Some(CarV2Header {
332                characteristics,
333                data_offset,
334                data_size,
335                index_offset,
336            }));
337        }
338    }
339    Ok(None)
340}
341
342/// ```text
343/// start ►│         reader end ►│
344///        ├───────────┬─────────┤
345///        │body length│v1 header│
346///        └───────────┴─────────┘
347/// ```
348#[tracing::instrument(level = "trace", skip_all, ret)]
349fn read_v1_header(mut reader: impl Read) -> io::Result<CarV1Header> {
350    let header_len = reader.read_varint()?;
351    let mut buffer = vec![0; header_len];
352    reader.read_exact(&mut buffer)?;
353    let header: CarV1Header =
354        from_slice_with_fallback(&buffer).map_err(|e| io::Error::new(InvalidData, e))?;
355    if header.version == 1 {
356        Ok(header)
357    } else {
358        Err(io::Error::new(
359            Unsupported,
360            format!("unsupported CAR version {}", header.version),
361        ))
362    }
363}
364
365/// Returns ([`Cid`], the `block data offset` and `block data length`)
366/// ```text
367/// start ►│              reader end ►│
368///        ├───────────┬───┬──────────┤
369///        │body length│cid│block data│
370///        └───────────┴───┼──────────┤
371///                        │◄────────►│
372///                        │  =block data length
373///            block data  │
374///                offset ►│
375/// ```
376/// Importantly, we seek `block data length`, rather than read any in.
377/// This allows us to keep indexing fast.
378///
379/// [`Ok(None)`] on EOF
380#[tracing::instrument(level = "trace", skip_all, ret)]
381fn read_block_data_location_and_skip(
382    mut reader: impl Read + Seek,
383    limit_position: Option<u64>,
384) -> io::Result<Option<(Cid, UncompressedBlockDataLocation)>> {
385    if let Some(limit_position) = limit_position
386        && reader.stream_position()? >= limit_position
387    {
388        return Ok(None);
389    }
390    let Some(body_length) = read_varint_body_length_or_eof(&mut reader)? else {
391        return Ok(None);
392    };
393    let frame_body_offset = reader.stream_position()?;
394    let mut reader = CountRead::new(&mut reader);
395    let cid = Cid::read_bytes(&mut reader).map_err(cid_error_to_io_error)?;
396
397    // counting the read bytes saves us a syscall for finding block data offset
398    let cid_length = reader.bytes_read();
399    let block_data_offset = frame_body_offset + u64::try_from(cid_length).unwrap();
400    let next_frame_offset = frame_body_offset + u64::from(body_length);
401    let block_data_length = u32::try_from(next_frame_offset - block_data_offset).unwrap();
402    reader
403        .into_inner()
404        .seek(SeekFrom::Start(next_frame_offset))?;
405    Ok(Some((
406        cid,
407        UncompressedBlockDataLocation {
408            offset: block_data_offset,
409            length: block_data_length,
410        },
411    )))
412}
413
414/// Reads `body length`, leaving the reader at the start of a varint frame,
415/// or returns [`Ok(None)`] if we've reached EOF
416/// ```text
417/// start ►│
418///        ├───────────┬─────────────┐
419///        │varint:    │             │
420///        │body length│frame body   │
421///        └───────────┼─────────────┘
422///        reader end ►│
423/// ```
424fn read_varint_body_length_or_eof(mut reader: impl Read) -> io::Result<Option<u32>> {
425    let mut byte = [0u8; 1]; // detect EOF
426    match reader.read(&mut byte)? {
427        0 => Ok(None),
428        1 => (byte.chain(reader)).read_varint().map(Some),
429        _ => unreachable!(),
430    }
431}
432
/// A [`Read`] adapter that tallies the bytes read through it.
///
/// Knowing how many bytes decoding a value consumed makes it possible to derive the _block data
/// length_ from the (_varint frame_) _body length_ without asking the underlying reader for
/// its position.
struct CountRead<ReadT> {
    inner: ReadT,
    count: usize,
}

impl<ReadT> CountRead<ReadT> {
    /// Wraps `inner` with the tally starting at zero.
    pub fn new(inner: ReadT) -> Self {
        Self { inner, count: 0 }
    }

    /// Total number of bytes read through this adapter so far.
    pub fn bytes_read(&self) -> usize {
        self.count
    }

    /// Discards the tally and hands back the wrapped reader.
    pub fn into_inner(self) -> ReadT {
        let Self { inner, .. } = self;
        inner
    }
}

impl<ReadT> Read for CountRead<ReadT>
where
    ReadT: Read,
{
    /// Delegates to the wrapped reader and adds the bytes it produced to the tally.
    /// A failed read leaves the tally untouched.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let produced = self.inner.read(buf)?;
        self.count += produced;
        Ok(produced)
    }
}
463
#[cfg(test)]
mod tests {
    use super::*;
    use crate::utils::db::{
        car_stream::{CarStream, CarV1Header},
        car_util::load_car,
    };
    use futures::TryStreamExt as _;
    use fvm_ipld_blockstore::{Blockstore, MemoryBlockstore};
    use std::io::Cursor;
    use std::sync::LazyLock;
    use tokio::io::{AsyncBufRead, AsyncSeek, BufReader};
    use tokio_test::block_on;

    /// Every block served by a [`PlainCar`] over the CARv1 fixture must match the same block
    /// loaded into in-memory reference stores, through both `get` and `get_reader`.
    #[test]
    fn test_uncompressed_v1() {
        let car = chain4_car();
        let car_backed = PlainCar::new(car).unwrap();

        assert_eq!(car_backed.version(), 1);
        assert_eq!(car_backed.head_tipset_key().len(), 1);
        assert_eq!(car_backed.cids().len(), 1222);

        let from_plain = reference(Cursor::new(car));
        let from_zst = reference(Cursor::new(chain4_car_zst()));
        let from_zst_unsafe = reference_unsafe(chain4_car_zst());
        for cid in car_backed.cids() {
            let actual = car_backed.get(&cid).unwrap().unwrap();

            let mut via_reader = vec![];
            car_backed
                .get_reader(cid)
                .unwrap()
                .read_to_end(&mut via_reader)
                .unwrap();

            assert_eq!(from_plain.get(&cid).unwrap().unwrap(), actual);
            assert_eq!(from_zst.get(&cid).unwrap().unwrap(), actual);
            assert_eq!(from_zst_unsafe.get(&cid).unwrap().unwrap(), actual);
            assert_eq!(via_reader, actual);
        }
    }

    /// Same comparison as the CARv1 test, for the CARv2 fixture.
    #[test]
    fn test_uncompressed_v2() {
        let car = carv2_car();
        let car_backed = PlainCar::new(car).unwrap();

        assert_eq!(car_backed.version(), 2);
        assert_eq!(car_backed.head_tipset_key().len(), 1);
        assert_eq!(car_backed.cids().len(), 7153);

        let from_plain = reference(Cursor::new(car));
        let from_zst = reference(Cursor::new(carv2_car_zst()));
        let from_zst_unsafe = reference_unsafe(carv2_car_zst());
        for cid in car_backed.cids() {
            let actual = car_backed.get(&cid).unwrap().unwrap();

            assert_eq!(from_plain.get(&cid).unwrap().unwrap(), actual);
            assert_eq!(from_zst.get(&cid).unwrap().unwrap(), actual);
            assert_eq!(from_zst_unsafe.get(&cid).unwrap().unwrap(), actual);
        }
    }

    /// Loads a CAR into a fresh in-memory blockstore with `load_car`.
    fn reference(reader: impl AsyncBufRead + AsyncSeek + Unpin) -> MemoryBlockstore {
        let store = MemoryBlockstore::new();
        block_on(load_car(&store, reader)).unwrap();
        store
    }

    /// Loads a CAR into a fresh in-memory blockstore with [`load_car_unsafe`].
    fn reference_unsafe(reader: impl AsyncBufRead + Unpin) -> MemoryBlockstore {
        let store = MemoryBlockstore::new();
        block_on(load_car_unsafe(&store, reader)).unwrap();
        store
    }

    /// Streams every block of `reader` into `db` using `CarStream::new_unsafe`, and returns the
    /// CARv1 header of the stream.
    pub async fn load_car_unsafe<R>(db: &impl Blockstore, reader: R) -> anyhow::Result<CarV1Header>
    where
        R: AsyncBufRead + Unpin,
    {
        let mut blocks = CarStream::new_unsafe(BufReader::new(reader)).await?;
        while let Some(block) = blocks.try_next().await? {
            db.put_keyed(&block.cid, &block.data)?;
        }
        Ok(blocks.header_v1)
    }

    /// The zstd-compressed CARv1 fixture, embedded at compile time.
    fn chain4_car_zst() -> &'static [u8] {
        include_bytes!("../../../test-snapshots/chain4.car.zst")
    }

    /// The CARv1 fixture, decompressed once on first use.
    fn chain4_car() -> &'static [u8] {
        static CAR: LazyLock<Vec<u8>> =
            LazyLock::new(|| zstd::decode_all(chain4_car_zst()).unwrap());
        CAR.as_slice()
    }

    /// The zstd-compressed CARv2 fixture, embedded at compile time.
    fn carv2_car_zst() -> &'static [u8] {
        include_bytes!("../../../test-snapshots/carv2.car.zst")
    }

    /// The CARv2 fixture, decompressed once on first use.
    fn carv2_car() -> &'static [u8] {
        static CAR: LazyLock<Vec<u8>> =
            LazyLock::new(|| zstd::decode_all(carv2_car_zst()).unwrap());
        CAR.as_slice()
    }
}