Skip to main content

forest/chain/
mod.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4mod snapshot_format;
5pub mod store;
6#[cfg(test)]
7mod tests;
8mod weight;
9
10pub use self::{snapshot_format::*, store::*, weight::*};
11
12use crate::blocks::{Tipset, TipsetKey};
13use crate::cid_collections::CidHashSet;
14use crate::db::car::forest::{self, ForestCarFrame, finalize_frame};
15use crate::db::{SettingsStore, SettingsStoreExt};
16use crate::ipld::stream_chain;
17use crate::utils::db::car_stream::{CarBlock, CarBlockWrite};
18use crate::utils::io::{AsyncWriterWithChecksum, Checksum};
19use crate::utils::multihash::MultihashCode;
20use crate::utils::stream::par_buffer;
21use anyhow::Context as _;
22use cid::Cid;
23use digest::Digest;
24use futures::StreamExt as _;
25use fvm_ipld_blockstore::Blockstore;
26use fvm_ipld_encoding::DAG_CBOR;
27use multihash_derive::MultihashDigest as _;
28use nunny::Vec as NonEmpty;
29use std::io::{Read, Seek, SeekFrom};
30use std::sync::Arc;
31use tokio::io::{AsyncWrite, AsyncWriteExt, BufWriter};
32
33#[derive(Debug, Clone, Default)]
34pub struct ExportOptions {
35    pub skip_checksum: bool,
36    pub seen: CidHashSet,
37}
38
39pub async fn export_from_head<D: Digest>(
40    db: &Arc<impl Blockstore + SettingsStore + Send + Sync + 'static>,
41    lookup_depth: ChainEpochDelta,
42    writer: impl AsyncWrite + Unpin,
43    options: Option<ExportOptions>,
44) -> anyhow::Result<(Tipset, Option<digest::Output<D>>)> {
45    let head_key = SettingsStoreExt::read_obj::<TipsetKey>(db, crate::db::setting_keys::HEAD_KEY)?
46        .context("chain head key not found")?;
47    let head_ts = Tipset::load_required(&db, &head_key)?;
48    let digest = export::<D>(db, &head_ts, lookup_depth, writer, options).await?;
49    Ok((head_ts, digest))
50}
51
52/// Exports a Filecoin snapshot in v1 format
53/// See <https://github.com/filecoin-project/FIPs/blob/98e33b9fa306959aa0131519eb4cc155522b2081/FRCs/frc-0108.md#v1-specification>
54pub async fn export<D: Digest>(
55    db: &Arc<impl Blockstore + Send + Sync + 'static>,
56    tipset: &Tipset,
57    lookup_depth: ChainEpochDelta,
58    writer: impl AsyncWrite + Unpin,
59    options: Option<ExportOptions>,
60) -> anyhow::Result<Option<digest::Output<D>>> {
61    let roots = tipset.key().to_cids();
62    export_to_forest_car::<D>(roots, None, db, tipset, lookup_depth, writer, options).await
63}
64
65/// Exports a Filecoin snapshot in v2 format
66/// See <https://github.com/filecoin-project/FIPs/blob/98e33b9fa306959aa0131519eb4cc155522b2081/FRCs/frc-0108.md#v2-specification>
67pub async fn export_v2<D: Digest, F: Seek + Read>(
68    db: &Arc<impl Blockstore + Send + Sync + 'static>,
69    mut f3: Option<(Cid, F)>,
70    tipset: &Tipset,
71    lookup_depth: ChainEpochDelta,
72    writer: impl AsyncWrite + Unpin,
73    options: Option<ExportOptions>,
74) -> anyhow::Result<Option<digest::Output<D>>> {
75    // validate f3 data
76    if let Some((f3_cid, f3_data)) = &mut f3 {
77        f3_data.seek(SeekFrom::Start(0))?;
78        let expected_cid = crate::f3::snapshot::get_f3_snapshot_cid(f3_data)?;
79        anyhow::ensure!(
80            f3_cid == &expected_cid,
81            "f3 snapshot integrity check failed, actual cid: {f3_cid}, expected cid: {expected_cid}"
82        );
83    }
84
85    let head = tipset.key().to_cids();
86    let f3_cid = f3.as_ref().map(|(cid, _)| *cid);
87    let snap_meta = FilecoinSnapshotMetadata::new_v2(head, f3_cid);
88    let snap_meta_cbor_encoded = fvm_ipld_encoding::to_vec(&snap_meta)?;
89    let snap_meta_block = CarBlock {
90        cid: Cid::new_v1(
91            DAG_CBOR,
92            MultihashCode::Blake2b256.digest(&snap_meta_cbor_encoded),
93        ),
94        data: snap_meta_cbor_encoded,
95    };
96    let roots = nunny::vec![snap_meta_block.cid];
97    let mut prefix_data_frames = vec![{
98        let mut encoder = forest::new_encoder(forest::DEFAULT_FOREST_CAR_COMPRESSION_LEVEL)?;
99        snap_meta_block.write(&mut encoder)?;
100        anyhow::Ok((
101            vec![snap_meta_block.cid],
102            finalize_frame(forest::DEFAULT_FOREST_CAR_COMPRESSION_LEVEL, &mut encoder)?,
103        ))
104    }];
105
106    if let Some((f3_cid, mut f3_data)) = f3 {
107        let f3_data_len = f3_data.seek(SeekFrom::End(0))?;
108        f3_data.seek(SeekFrom::Start(0))?;
109        prefix_data_frames.push({
110            let mut encoder = forest::new_encoder(forest::DEFAULT_FOREST_CAR_COMPRESSION_LEVEL)?;
111            encoder.write_car_block(f3_cid, f3_data_len, &mut f3_data)?;
112            anyhow::Ok((
113                vec![f3_cid],
114                finalize_frame(forest::DEFAULT_FOREST_CAR_COMPRESSION_LEVEL, &mut encoder)?,
115            ))
116        });
117    }
118
119    export_to_forest_car::<D>(
120        roots,
121        Some(prefix_data_frames),
122        db,
123        tipset,
124        lookup_depth,
125        writer,
126        options,
127    )
128    .await
129}
130
131#[allow(clippy::too_many_arguments)]
132async fn export_to_forest_car<D: Digest>(
133    roots: NonEmpty<Cid>,
134    prefix_data_frames: Option<Vec<anyhow::Result<ForestCarFrame>>>,
135    db: &Arc<impl Blockstore + Send + Sync + 'static>,
136    tipset: &Tipset,
137    lookup_depth: ChainEpochDelta,
138    writer: impl AsyncWrite + Unpin,
139    options: Option<ExportOptions>,
140) -> anyhow::Result<Option<digest::Output<D>>> {
141    let ExportOptions {
142        skip_checksum,
143        seen,
144    } = options.unwrap_or_default();
145
146    let stateroot_lookup_limit = tipset.epoch() - lookup_depth;
147
148    // Wrap writer in optional checksum calculator
149    let mut writer = AsyncWriterWithChecksum::<D, _>::new(BufWriter::new(writer), !skip_checksum);
150
151    // Stream stateroots in range (stateroot_lookup_limit+1)..=tipset.epoch(). Also
152    // stream all block headers until genesis.
153    let blocks = par_buffer(
154        // Queue 1k blocks. This is enough to saturate the compressor and blocks
155        // are small enough that keeping 1k in memory isn't a problem. Average
156        // block size is between 1kb and 2kb.
157        1024,
158        stream_chain(
159            Arc::clone(db),
160            tipset.clone().chain_owned(Arc::clone(db)),
161            stateroot_lookup_limit,
162        )
163        .with_seen(seen)
164        .track_progress(true),
165    );
166
167    // Encode Ipld key-value pairs in zstd frames
168    let block_frames = forest::Encoder::compress_stream_default(blocks);
169    let frames = futures::stream::iter(prefix_data_frames.unwrap_or_default()).chain(block_frames);
170
171    // Write zstd frames and include a skippable index
172    forest::Encoder::write(&mut writer, roots, frames).await?;
173
174    // Flush to ensure everything has been successfully written
175    writer.flush().await.context("failed to flush")?;
176
177    let digest = writer.finalize().map_err(|e| Error::Other(e.to_string()))?;
178
179    Ok(digest)
180}