forest/db/car/
plain.rs

1// Copyright 2019-2025 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4//! # Varint frames
5//!
6//! CARs are made of concatenations of _varint frames_. Each varint frame is a concatenation of the
//! _body length_ as a
8//! [varint](https://docs.rs/integer-encoding/4.0.0/integer_encoding/trait.VarInt.html), and the
9//! _frame body_ itself. [`unsigned_varint::codec::UviBytes`] can be used to read frames
10//! piecewise into memory.
11//!
12//! ```text
13//!        varint frame
14//! │◄───────────────────────►│
15//! │                         │
16//! ├───────────┬─────────────┤
17//! │varint:    │             │
18//! │body length│frame body   │
19//! └───────────┼─────────────┤
20//!             │             │
21//! frame body ►│◄───────────►│
22//!     offset     =body length
23//! ```
24//!
25//! # CARv1 layout and seeking
26//!
27//! The first varint frame is a _header frame_, where the frame body is a [`CarHeader`] encoded
28//! using [`ipld_dagcbor`](serde_ipld_dagcbor).
29//!
30//! Subsequent varint frames are _block frames_, where the frame body is a concatenation of a
31//! [`Cid`] and the _block data_ addressed by that CID.
32//!
33//! ```text
34//! block frame ►│
35//! body offset  │
36//!              │  =body length
37//!              │◄────────────►│
38//!  ┌───────────┼───┬──────────┤
39//!  │body length│cid│block data│
40//!  └───────────┴───┼──────────┤
41//!                  │◄────────►│
42//!                  │  =block data length
43//!      block data  │
44//!          offset ►│
45//! ```
46//!
47//! ## Block ordering
48//! > _... a filecoin-deterministic car-file is currently implementation-defined as containing all
49//! > DAG-forming blocks in first-seen order, as a result of a depth-first DAG traversal starting
50//! > from a single root._
51//! - [CAR documentation](https://ipld.io/specs/transport/car/carv1/#determinism)
52//!
53//! # Future work
54//! - [`fadvise`](https://linux.die.net/man/2/posix_fadvise)-based APIs to pre-fetch parts of the
55//!   file, to improve random access performance.
56//! - Use an inner [`Blockstore`] for writes.
57//! - Use safe arithmetic for all operations - a malicious frame shouldn't cause a crash.
58//! - Theoretically, file-backed blockstores should be clonable (or even [`Sync`]) with very low
59//!   overhead, so that multiple threads could perform operations concurrently.
60//! - CARv2 support
61//! - A wrapper that abstracts over car formats for reading.
62
63use crate::chain::FilecoinSnapshotMetadata;
64use crate::cid_collections::CidHashMap;
65use crate::db::PersistentStore;
66use crate::utils::db::car_stream::{CarV1Header, CarV2Header};
67use crate::{
68    blocks::{Tipset, TipsetKey},
69    utils::encoding::from_slice_with_fallback,
70};
71use cid::Cid;
72use fvm_ipld_blockstore::Blockstore;
73use fvm_ipld_encoding::CborStore as _;
74use integer_encoding::{FixedIntReader, VarIntReader};
75use nunny::Vec as NonEmpty;
76use parking_lot::RwLock;
77use positioned_io::ReadAt;
78use std::{
79    io::{
80        self, BufReader,
81        ErrorKind::{InvalidData, Unsupported},
82        Read, Seek, SeekFrom,
83    },
84    iter,
85    sync::OnceLock,
86};
87use tokio::io::{AsyncWrite, AsyncWriteExt};
88use tracing::{debug, trace};
89
90/// **Note that all operations on this store are blocking**.
91///
92/// It can often be time, memory, or disk prohibitive to read large snapshots into a database like
93/// [`ParityDb`](crate::db::parity_db::ParityDb).
94///
95/// This is an implementer of [`Blockstore`] that simply wraps an uncompressed [CARv1
96/// file](https://ipld.io/specs/transport/car/carv1).
97///
98/// On creation, [`PlainCar`] builds an in-memory index of the [`Cid`]s in the file,
99/// and their offsets into that file.
100/// Note that it prepares its own buffer for doing so.
101///
102/// When a block is requested, [`PlainCar`] scrolls to that offset, and reads the block, on-demand.
103///
104/// Writes for new blocks (which don't exist in the CAR already) are not supported.
105///
106/// Random-access performance is expected to be poor, as the OS will have to load separate parts of
107/// the file from disk, and flush it for each read. However, (near) linear access should be pretty
108/// good, as file chunks will be pre-fetched.
109///
110/// See [module documentation](mod@self) for more.
111pub struct PlainCar<ReaderT> {
112    reader: ReaderT,
113    index: RwLock<CidHashMap<UncompressedBlockDataLocation>>,
114    version: u64,
115    header_v1: CarV1Header,
116    header_v2: Option<CarV2Header>,
117    metadata: OnceLock<Option<FilecoinSnapshotMetadata>>,
118}
119
120impl<ReaderT: super::RandomAccessFileReader> PlainCar<ReaderT> {
121    /// To be correct:
122    /// - `reader` must read immutable data. e.g if it is a file, it should be
123    ///   [`flock`](https://linux.die.net/man/2/flock)ed.
124    ///   [`Blockstore`] API calls may panic if this is not upheld.
125    #[tracing::instrument(level = "debug", skip_all)]
126    pub fn new(reader: ReaderT) -> io::Result<Self> {
127        let mut cursor = positioned_io::Cursor::new(&reader);
128        let position = cursor.position();
129        let header_v2 = read_v2_header(&mut cursor)?;
130        let (limit_position, version) =
131            if let Some(header_v2) = &header_v2 {
132                cursor.set_position(position.saturating_add(
133                    u64::try_from(header_v2.data_offset).map_err(io::Error::other)?,
134                ));
135                (
136                    Some(cursor.stream_position()?.saturating_add(
137                        u64::try_from(header_v2.data_size).map_err(io::Error::other)?,
138                    )),
139                    2,
140                )
141            } else {
142                cursor.set_position(position);
143                (None, 1)
144            };
145
146        let header_v1 = read_v1_header(&mut cursor)?;
147        // When indexing, we perform small reads of the length and CID before seeking
148        // Buffering these gives us a ~50% speedup (n=10): https://github.com/ChainSafe/forest/pull/3085#discussion_r1246897333
149        let mut buf_reader = BufReader::with_capacity(1024, cursor);
150
151        // now create the index
152        let index = iter::from_fn(|| {
153            read_block_data_location_and_skip(&mut buf_reader, limit_position).transpose()
154        })
155        .collect::<Result<CidHashMap<_>, _>>()?;
156
157        match index.len() {
158            0 => Err(io::Error::new(
159                InvalidData,
160                "CARv1 files must contain at least one block",
161            )),
162            num_blocks => {
163                debug!(num_blocks, "indexed CAR");
164                Ok(Self {
165                    reader,
166                    index: RwLock::new(index),
167                    version,
168                    header_v1,
169                    header_v2,
170                    metadata: OnceLock::new(),
171                })
172            }
173        }
174    }
175
176    pub fn metadata(&self) -> &Option<FilecoinSnapshotMetadata> {
177        self.metadata.get_or_init(|| {
178            if self.header_v1.roots.len() == super::V2_SNAPSHOT_ROOT_COUNT {
179                let maybe_metadata_cid = self.header_v1.roots.first();
180                if let Ok(Some(metadata)) =
181                    self.get_cbor::<FilecoinSnapshotMetadata>(maybe_metadata_cid)
182                {
183                    return Some(metadata);
184                }
185            }
186            None
187        })
188    }
189
190    pub fn head_tipset_key(&self) -> &NonEmpty<Cid> {
191        // head tipset key is stored in v2 snapshot metadata
192        // See <https://github.com/filecoin-project/FIPs/blob/98e33b9fa306959aa0131519eb4cc155522b2081/FRCs/frc-0108.md#v2-specification>
193        if let Some(metadata) = self.metadata() {
194            &metadata.head_tipset_key
195        } else {
196            &self.header_v1.roots
197        }
198    }
199
200    pub fn version(&self) -> u64 {
201        self.version
202    }
203
204    pub fn heaviest_tipset_key(&self) -> TipsetKey {
205        TipsetKey::from(self.head_tipset_key().clone())
206    }
207
208    pub fn heaviest_tipset(&self) -> anyhow::Result<Tipset> {
209        Tipset::load_required(self, &self.heaviest_tipset_key())
210    }
211
212    /// In an arbitrary order
213    #[cfg(test)]
214    pub fn cids(&self) -> Vec<Cid> {
215        self.index.read().keys().collect()
216    }
217
218    pub fn into_dyn(self) -> PlainCar<Box<dyn super::RandomAccessFileReader>> {
219        PlainCar {
220            reader: Box::new(self.reader),
221            index: self.index,
222            version: self.version,
223            header_v1: self.header_v1,
224            header_v2: self.header_v2,
225            metadata: self.metadata,
226        }
227    }
228
229    /// Gets a reader of the block data by its `Cid`
230    pub fn get_reader(&self, k: Cid) -> Option<impl Read> {
231        self.index
232            .read()
233            .get(&k)
234            .map(|UncompressedBlockDataLocation { offset, length }| {
235                positioned_io::Cursor::new_pos(&self.reader, *offset).take(*length as u64)
236            })
237    }
238}
239
240impl TryFrom<&'static [u8]> for PlainCar<&'static [u8]> {
241    type Error = io::Error;
242    fn try_from(bytes: &'static [u8]) -> io::Result<Self> {
243        PlainCar::new(bytes)
244    }
245}
246
247/// If you seek to `offset` (from the start of the file), and read `length` bytes,
248/// you should get data that corresponds to a [`Cid`] (but NOT the [`Cid`] itself).
249#[derive(Debug, serde::Serialize, serde::Deserialize)]
250pub struct UncompressedBlockDataLocation {
251    offset: u64,
252    length: u32,
253}
254
255impl<ReaderT> Blockstore for PlainCar<ReaderT>
256where
257    ReaderT: ReadAt,
258{
259    #[tracing::instrument(level = "trace", skip(self))]
260    fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
261        match self.index.read().get(k) {
262            Some(UncompressedBlockDataLocation { offset, length }) => {
263                trace!("fetching from disk");
264                let mut data = vec![0; usize::try_from(*length).unwrap()];
265                self.reader.read_exact_at(*offset, &mut data)?;
266                Ok(Some(data))
267            }
268            None => {
269                trace!("not found");
270                Ok(None)
271            }
272        }
273    }
274
275    /// Not supported, use [`super::ManyCar`] instead.
276    fn put_keyed(&self, _: &Cid, _: &[u8]) -> anyhow::Result<()> {
277        unreachable!("PlainCar is read-only, use ManyCar instead");
278    }
279}
280
281impl<ReaderT> PersistentStore for PlainCar<ReaderT>
282where
283    ReaderT: ReadAt,
284{
285    fn put_keyed_persistent(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
286        self.put_keyed(k, block)
287    }
288}
289
290pub async fn write_skip_frame_header_async(
291    mut writer: impl AsyncWrite + Unpin,
292    data_len: u32,
293) -> std::io::Result<()> {
294    writer
295        .write_all(&super::forest::ZSTD_SKIPPABLE_FRAME_MAGIC_HEADER)
296        .await?;
297    writer.write_all(&data_len.to_le_bytes()).await?;
298    Ok(())
299}
300
301fn cid_error_to_io_error(cid_error: cid::Error) -> io::Error {
302    match cid_error {
303        cid::Error::Io(io_error) => io_error,
304        other => io::Error::new(InvalidData, other),
305    }
306}
307
308/// <https://ipld.io/specs/transport/car/carv2/#header>
309/// ```text
310/// start ►│    reader end ►│
311///        ├──────┬─────────┤
312///        │pragma│v2 header│
313///        └──────┴─────────┘
314/// ```
315pub fn read_v2_header(mut reader: impl Read) -> io::Result<Option<CarV2Header>> {
316    /// <https://ipld.io/specs/transport/car/carv2/#pragma>
317    const CAR_V2_PRAGMA: [u8; 10] = [0xa1, 0x67, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x02];
318
319    let len = reader.read_fixedint::<u8>()? as usize;
320    if len == CAR_V2_PRAGMA.len() {
321        let mut buffer = vec![0; len];
322        reader.read_exact(&mut buffer)?;
323        if buffer[..] == CAR_V2_PRAGMA {
324            let mut characteristics = [0; 16];
325            reader.read_exact(&mut characteristics)?;
326            let data_offset: i64 = reader.read_fixedint()?;
327            let data_size: i64 = reader.read_fixedint()?;
328            let index_offset: i64 = reader.read_fixedint()?;
329            return Ok(Some(CarV2Header {
330                characteristics,
331                data_offset,
332                data_size,
333                index_offset,
334            }));
335        }
336    }
337    Ok(None)
338}
339
340/// ```text
341/// start ►│         reader end ►│
342///        ├───────────┬─────────┤
343///        │body length│v1 header│
344///        └───────────┴─────────┘
345/// ```
346#[tracing::instrument(level = "trace", skip_all, ret)]
347fn read_v1_header(mut reader: impl Read) -> io::Result<CarV1Header> {
348    let header_len = reader.read_varint()?;
349    let mut buffer = vec![0; header_len];
350    reader.read_exact(&mut buffer)?;
351    let header: CarV1Header =
352        from_slice_with_fallback(&buffer).map_err(|e| io::Error::new(InvalidData, e))?;
353    if header.version == 1 {
354        Ok(header)
355    } else {
356        Err(io::Error::new(
357            Unsupported,
358            format!("unsupported CAR version {}", header.version),
359        ))
360    }
361}
362
363/// Returns ([`Cid`], the `block data offset` and `block data length`)
364/// ```text
365/// start ►│              reader end ►│
366///        ├───────────┬───┬──────────┤
367///        │body length│cid│block data│
368///        └───────────┴───┼──────────┤
369///                        │◄────────►│
370///                        │  =block data length
371///            block data  │
372///                offset ►│
373/// ```
374/// Importantly, we seek `block data length`, rather than read any in.
375/// This allows us to keep indexing fast.
376///
377/// [`Ok(None)`] on EOF
378#[tracing::instrument(level = "trace", skip_all, ret)]
379fn read_block_data_location_and_skip(
380    mut reader: impl Read + Seek,
381    limit_position: Option<u64>,
382) -> io::Result<Option<(Cid, UncompressedBlockDataLocation)>> {
383    if let Some(limit_position) = limit_position
384        && reader.stream_position()? >= limit_position
385    {
386        return Ok(None);
387    }
388    let Some(body_length) = read_varint_body_length_or_eof(&mut reader)? else {
389        return Ok(None);
390    };
391    let frame_body_offset = reader.stream_position()?;
392    let mut reader = CountRead::new(&mut reader);
393    let cid = Cid::read_bytes(&mut reader).map_err(cid_error_to_io_error)?;
394
395    // counting the read bytes saves us a syscall for finding block data offset
396    let cid_length = reader.bytes_read();
397    let block_data_offset = frame_body_offset + u64::try_from(cid_length).unwrap();
398    let next_frame_offset = frame_body_offset + u64::from(body_length);
399    let block_data_length = u32::try_from(next_frame_offset - block_data_offset).unwrap();
400    reader
401        .into_inner()
402        .seek(SeekFrom::Start(next_frame_offset))?;
403    Ok(Some((
404        cid,
405        UncompressedBlockDataLocation {
406            offset: block_data_offset,
407            length: block_data_length,
408        },
409    )))
410}
411
412/// Reads `body length`, leaving the reader at the start of a varint frame,
413/// or returns [`Ok(None)`] if we've reached EOF
414/// ```text
415/// start ►│
416///        ├───────────┬─────────────┐
417///        │varint:    │             │
418///        │body length│frame body   │
419///        └───────────┼─────────────┘
420///        reader end ►│
421/// ```
422fn read_varint_body_length_or_eof(mut reader: impl Read) -> io::Result<Option<u32>> {
423    let mut byte = [0u8; 1]; // detect EOF
424    match reader.read(&mut byte)? {
425        0 => Ok(None),
426        1 => (byte.chain(reader)).read_varint().map(Some),
427        _ => unreachable!(),
428    }
429}
430
/// A [`Read`] adaptor that tallies how many bytes have been read through it.
///
/// This is useful for calculating the _block data length_ when the (_varint frame_) _body length_ is known.
struct CountRead<ReadT> {
    /// The wrapped reader.
    inner: ReadT,
    /// Running total of bytes returned by successful reads.
    count: usize,
}

impl<ReadT> CountRead<ReadT> {
    /// Wraps `inner` with the tally starting at zero.
    pub fn new(inner: ReadT) -> Self {
        Self { inner, count: 0 }
    }

    /// Total number of bytes read through this adaptor so far.
    pub fn bytes_read(&self) -> usize {
        self.count
    }

    /// Discards the tally and hands back the wrapped reader.
    pub fn into_inner(self) -> ReadT {
        self.inner
    }
}

impl<ReadT: Read> Read for CountRead<ReadT> {
    /// Forwards to the wrapped reader; only successful reads add to the tally.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        self.inner.read(buf).inspect(|n| self.count += *n)
    }
}
461
#[cfg(test)]
mod tests {
    use super::*;
    use crate::utils::db::{
        car_stream::{CarStream, CarV1Header},
        car_util::load_car,
    };
    use futures::{TryStreamExt as _, executor::block_on};
    use fvm_ipld_blockstore::{Blockstore, MemoryBlockstore};
    use std::io::Cursor;
    use std::sync::LazyLock;
    use tokio::io::{AsyncBufRead, AsyncSeek, BufReader};

    /// Every block served by a [`PlainCar`] over the CARv1 fixture must match the reference
    /// blockstores, both through `get` and through `get_reader`.
    #[test]
    fn test_uncompressed_v1() {
        let raw = chain4_car();
        let store = PlainCar::new(raw).unwrap();

        assert_eq!(store.version(), 1);
        assert_eq!(store.head_tipset_key().len(), 1);
        assert_eq!(store.cids().len(), 1222);

        let references = [
            reference(Cursor::new(raw)),
            reference(Cursor::new(chain4_car_zst())),
            reference_unsafe(chain4_car_zst()),
        ];
        for cid in store.cids() {
            let actual = store.get(&cid).unwrap().unwrap();
            for expected in &references {
                assert_eq!(expected.get(&cid).unwrap().unwrap(), actual);
            }

            let mut streamed = vec![];
            store
                .get_reader(cid)
                .unwrap()
                .read_to_end(&mut streamed)
                .unwrap();
            assert_eq!(streamed, actual);
        }
    }

    /// Every block served by a [`PlainCar`] over the CARv2 fixture must match the reference
    /// blockstores.
    #[test]
    fn test_uncompressed_v2() {
        let raw = carv2_car();
        let store = PlainCar::new(raw).unwrap();

        assert_eq!(store.version(), 2);
        assert_eq!(store.head_tipset_key().len(), 1);
        assert_eq!(store.cids().len(), 7153);

        let references = [
            reference(Cursor::new(raw)),
            reference(Cursor::new(carv2_car_zst())),
            reference_unsafe(carv2_car_zst()),
        ];
        for cid in store.cids() {
            let actual = store.get(&cid).unwrap().unwrap();
            for expected in &references {
                assert_eq!(expected.get(&cid).unwrap().unwrap(), actual);
            }
        }
    }

    /// Loads a CAR into a fresh in-memory blockstore via [`load_car`].
    fn reference(reader: impl AsyncBufRead + AsyncSeek + Unpin) -> MemoryBlockstore {
        let store = MemoryBlockstore::new();
        block_on(load_car(&store, reader)).unwrap();
        store
    }

    /// Loads a CAR into a fresh in-memory blockstore via [`load_car_unsafe`].
    fn reference_unsafe(reader: impl AsyncBufRead + Unpin) -> MemoryBlockstore {
        let store = MemoryBlockstore::new();
        block_on(load_car_unsafe(&store, reader)).unwrap();
        store
    }

    /// Copies every block yielded by [`CarStream::new_unsafe`] into `db` and returns the
    /// stream's CARv1 header.
    pub async fn load_car_unsafe<R>(db: &impl Blockstore, reader: R) -> anyhow::Result<CarV1Header>
    where
        R: AsyncBufRead + Unpin,
    {
        let mut blocks = CarStream::new_unsafe(BufReader::new(reader)).await?;
        while let Some(block) = blocks.try_next().await? {
            db.put_keyed(&block.cid, &block.data)?;
        }
        Ok(blocks.header_v1)
    }

    /// The zstd-compressed CARv1 fixture, embedded at compile time.
    fn chain4_car_zst() -> &'static [u8] {
        include_bytes!("../../../test-snapshots/chain4.car.zst")
    }

    /// The CARv1 fixture, decompressed once on first use.
    fn chain4_car() -> &'static [u8] {
        static DECOMPRESSED: LazyLock<Vec<u8>> =
            LazyLock::new(|| zstd::decode_all(chain4_car_zst()).unwrap());
        DECOMPRESSED.as_slice()
    }

    /// The zstd-compressed CARv2 fixture, embedded at compile time.
    fn carv2_car_zst() -> &'static [u8] {
        include_bytes!("../../../test-snapshots/carv2.car.zst")
    }

    /// The CARv2 fixture, decompressed once on first use.
    fn carv2_car() -> &'static [u8] {
        static DECOMPRESSED: LazyLock<Vec<u8>> =
            LazyLock::new(|| zstd::decode_all(carv2_car_zst()).unwrap());
        DECOMPRESSED.as_slice()
    }
}