Skip to main content

forest/chain/
mod.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4pub mod ec_finality;
5mod snapshot_format;
6pub mod store;
7#[cfg(test)]
8mod tests;
9mod weight;
10
11pub use self::{snapshot_format::*, store::*, weight::*};
12
13use crate::blocks::{Tipset, TipsetKey};
14use crate::cid_collections::CidHashSetLike;
15use crate::db::car::forest::{self, ForestCarFrame, finalize_frame};
16use crate::db::{SettingsStore, SettingsStoreExt};
17use crate::ipld::stream_chain;
18use crate::utils::ShallowClone as _;
19use crate::utils::db::car_stream::{CarBlock, CarBlockWrite};
20use crate::utils::io::{AsyncWriterWithChecksum, Checksum};
21use crate::utils::multihash::MultihashCode;
22use crate::utils::stream::par_buffer;
23use anyhow::Context as _;
24use cid::Cid;
25use futures::StreamExt as _;
26use fvm_ipld_blockstore::Blockstore;
27use fvm_ipld_encoding::DAG_CBOR;
28use multihash_derive::MultihashDigest as _;
29use nunny::Vec as NonEmpty;
30use sha2::digest::{self, Digest};
31use std::io::{Read, Seek, SeekFrom};
32use std::sync::Arc;
33use tokio::io::{AsyncWrite, AsyncWriteExt, BufWriter};
34
35pub struct ExportOptions<S> {
36    pub skip_checksum: bool,
37    pub include_receipts: bool,
38    pub include_events: bool,
39    pub include_tipset_keys: bool,
40    pub seen: S,
41}
42
43impl<S: Default> Default for ExportOptions<S> {
44    fn default() -> Self {
45        Self {
46            skip_checksum: Default::default(),
47            include_receipts: Default::default(),
48            include_events: Default::default(),
49            include_tipset_keys: Default::default(),
50            seen: Default::default(),
51        }
52    }
53}
54
55pub async fn export_from_head<D: Digest, S: CidHashSetLike + Send + Sync + 'static>(
56    db: &Arc<impl Blockstore + SettingsStore + Send + Sync + 'static>,
57    lookup_depth: ChainEpochDelta,
58    writer: impl AsyncWrite + Unpin,
59    options: ExportOptions<S>,
60) -> anyhow::Result<(Tipset, Option<digest::Output<D>>)> {
61    let head_key = SettingsStoreExt::read_obj::<TipsetKey>(db, crate::db::setting_keys::HEAD_KEY)?
62        .context("chain head key not found")?;
63    let head_ts = Tipset::load_required(&db, &head_key)?;
64    let digest = export::<D, S>(db, &head_ts, lookup_depth, writer, options).await?;
65    Ok((head_ts, digest))
66}
67
68/// Exports a Filecoin snapshot in v1 format
69/// See <https://github.com/filecoin-project/FIPs/blob/98e33b9fa306959aa0131519eb4cc155522b2081/FRCs/frc-0108.md#v1-specification>
70pub async fn export<D: Digest, S: CidHashSetLike + Send + Sync + 'static>(
71    db: &Arc<impl Blockstore + Send + Sync + 'static>,
72    tipset: &Tipset,
73    lookup_depth: ChainEpochDelta,
74    writer: impl AsyncWrite + Unpin,
75    options: ExportOptions<S>,
76) -> anyhow::Result<Option<digest::Output<D>>> {
77    let roots = tipset.key().to_cids();
78    export_to_forest_car::<D, S>(roots, None, db, tipset, lookup_depth, writer, options).await
79}
80
81/// Exports a Filecoin snapshot in v2 format
82/// See <https://github.com/filecoin-project/FIPs/blob/98e33b9fa306959aa0131519eb4cc155522b2081/FRCs/frc-0108.md#v2-specification>
83pub async fn export_v2<D: Digest, F: Seek + Read, S: CidHashSetLike + Send + Sync + 'static>(
84    db: &Arc<impl Blockstore + Send + Sync + 'static>,
85    mut f3: Option<(Cid, F)>,
86    tipset: &Tipset,
87    lookup_depth: ChainEpochDelta,
88    writer: impl AsyncWrite + Unpin,
89    options: ExportOptions<S>,
90) -> anyhow::Result<Option<digest::Output<D>>> {
91    // validate f3 data
92    if let Some((f3_cid, f3_data)) = &mut f3 {
93        f3_data.seek(SeekFrom::Start(0))?;
94        let expected_cid = crate::f3::snapshot::get_f3_snapshot_cid(f3_data)?;
95        anyhow::ensure!(
96            f3_cid == &expected_cid,
97            "f3 snapshot integrity check failed, actual cid: {f3_cid}, expected cid: {expected_cid}"
98        );
99    }
100
101    let head = tipset.key().to_cids();
102    let f3_cid = f3.as_ref().map(|(cid, _)| *cid);
103    let snap_meta = FilecoinSnapshotMetadata::new_v2(head, f3_cid);
104    let snap_meta_cbor_encoded = fvm_ipld_encoding::to_vec(&snap_meta)?;
105    let snap_meta_block = CarBlock {
106        cid: Cid::new_v1(
107            DAG_CBOR,
108            MultihashCode::Blake2b256.digest(&snap_meta_cbor_encoded),
109        ),
110        data: snap_meta_cbor_encoded,
111    };
112    let roots = nunny::vec![snap_meta_block.cid];
113    let mut prefix_data_frames = vec![{
114        let mut encoder = forest::new_encoder(forest::DEFAULT_FOREST_CAR_COMPRESSION_LEVEL)?;
115        snap_meta_block.write(&mut encoder)?;
116        anyhow::Ok((
117            vec![snap_meta_block.cid],
118            finalize_frame(forest::DEFAULT_FOREST_CAR_COMPRESSION_LEVEL, &mut encoder)?,
119        ))
120    }];
121
122    if let Some((f3_cid, mut f3_data)) = f3 {
123        let f3_data_len = f3_data.seek(SeekFrom::End(0))?;
124        f3_data.seek(SeekFrom::Start(0))?;
125        prefix_data_frames.push({
126            let mut encoder = forest::new_encoder(forest::DEFAULT_FOREST_CAR_COMPRESSION_LEVEL)?;
127            encoder.write_car_block(f3_cid, f3_data_len, &mut f3_data)?;
128            anyhow::Ok((
129                vec![f3_cid],
130                finalize_frame(forest::DEFAULT_FOREST_CAR_COMPRESSION_LEVEL, &mut encoder)?,
131            ))
132        });
133    }
134
135    export_to_forest_car::<D, S>(
136        roots,
137        Some(prefix_data_frames),
138        db,
139        tipset,
140        lookup_depth,
141        writer,
142        options,
143    )
144    .await
145}
146
147#[allow(clippy::too_many_arguments)]
148async fn export_to_forest_car<D: Digest, S: CidHashSetLike + Send + Sync + 'static>(
149    roots: NonEmpty<Cid>,
150    prefix_data_frames: Option<Vec<anyhow::Result<ForestCarFrame>>>,
151    db: &Arc<impl Blockstore + Send + Sync + 'static>,
152    tipset: &Tipset,
153    lookup_depth: ChainEpochDelta,
154    writer: impl AsyncWrite + Unpin,
155    ExportOptions {
156        skip_checksum,
157        include_receipts,
158        include_events,
159        include_tipset_keys,
160        seen,
161    }: ExportOptions<S>,
162) -> anyhow::Result<Option<digest::Output<D>>> {
163    if include_events && !include_receipts {
164        anyhow::bail!("message receipts must be included when events are included");
165    }
166
167    let stateroot_lookup_limit = tipset.epoch() - lookup_depth;
168
169    // Wrap writer in optional checksum calculator
170    let mut writer = AsyncWriterWithChecksum::<D, _>::new(BufWriter::new(writer), !skip_checksum);
171
172    // Stream stateroots in range (stateroot_lookup_limit+1)..=tipset.epoch(). Also
173    // stream all block headers until genesis.
174    let blocks = par_buffer(
175        // Queue 1k blocks. This is enough to saturate the compressor and blocks
176        // are small enough that keeping 1k in memory isn't a problem. Average
177        // block size is between 1kb and 2kb.
178        1024,
179        stream_chain(
180            db.shallow_clone(),
181            tipset.shallow_clone().chain_owned(db.shallow_clone()),
182            stateroot_lookup_limit,
183            seen,
184        )
185        .with_message_receipts(include_receipts)
186        .with_events(include_events)
187        .with_tipset_keys(include_tipset_keys)
188        .track_progress(true),
189    );
190
191    // Encode Ipld key-value pairs in zstd frames
192    let block_frames = forest::Encoder::compress_stream_default(blocks);
193    let frames = futures::stream::iter(prefix_data_frames.unwrap_or_default()).chain(block_frames);
194
195    // Write zstd frames and include a skippable index
196    forest::Encoder::write(&mut writer, roots, frames).await?;
197
198    // Flush to ensure everything has been successfully written
199    writer.flush().await.context("failed to flush")?;
200
201    let digest = writer.finalize().map_err(|e| Error::Other(e.to_string()))?;
202
203    Ok(digest)
204}