1mod snapshot_format;
5pub mod store;
6#[cfg(test)]
7mod tests;
8mod weight;
9
10pub use self::{snapshot_format::*, store::*, weight::*};
11
12use crate::blocks::{Tipset, TipsetKey};
13use crate::cid_collections::CidHashSet;
14use crate::db::car::forest::{self, ForestCarFrame, finalize_frame};
15use crate::db::{SettingsStore, SettingsStoreExt};
16use crate::ipld::stream_chain;
17use crate::utils::db::car_stream::{CarBlock, CarBlockWrite};
18use crate::utils::io::{AsyncWriterWithChecksum, Checksum};
19use crate::utils::multihash::MultihashCode;
20use crate::utils::stream::par_buffer;
21use anyhow::Context as _;
22use cid::Cid;
23use futures::StreamExt as _;
24use fvm_ipld_blockstore::Blockstore;
25use fvm_ipld_encoding::DAG_CBOR;
26use multihash_derive::MultihashDigest as _;
27use nunny::Vec as NonEmpty;
28use sha2::digest::{self, Digest};
29use std::io::{Read, Seek, SeekFrom};
30use std::sync::Arc;
31use tokio::io::{AsyncWrite, AsyncWriteExt, BufWriter};
32
33#[derive(Debug, Clone, Default)]
34pub struct ExportOptions {
35 pub skip_checksum: bool,
36 pub include_receipts: bool,
37 pub include_events: bool,
38 pub seen: CidHashSet,
39}
40
41pub async fn export_from_head<D: Digest>(
42 db: &Arc<impl Blockstore + SettingsStore + Send + Sync + 'static>,
43 lookup_depth: ChainEpochDelta,
44 writer: impl AsyncWrite + Unpin,
45 options: Option<ExportOptions>,
46) -> anyhow::Result<(Tipset, Option<digest::Output<D>>)> {
47 let head_key = SettingsStoreExt::read_obj::<TipsetKey>(db, crate::db::setting_keys::HEAD_KEY)?
48 .context("chain head key not found")?;
49 let head_ts = Tipset::load_required(&db, &head_key)?;
50 let digest = export::<D>(db, &head_ts, lookup_depth, writer, options).await?;
51 Ok((head_ts, digest))
52}
53
54pub async fn export<D: Digest>(
57 db: &Arc<impl Blockstore + Send + Sync + 'static>,
58 tipset: &Tipset,
59 lookup_depth: ChainEpochDelta,
60 writer: impl AsyncWrite + Unpin,
61 options: Option<ExportOptions>,
62) -> anyhow::Result<Option<digest::Output<D>>> {
63 let roots = tipset.key().to_cids();
64 export_to_forest_car::<D>(roots, None, db, tipset, lookup_depth, writer, options).await
65}
66
67pub async fn export_v2<D: Digest, F: Seek + Read>(
70 db: &Arc<impl Blockstore + Send + Sync + 'static>,
71 mut f3: Option<(Cid, F)>,
72 tipset: &Tipset,
73 lookup_depth: ChainEpochDelta,
74 writer: impl AsyncWrite + Unpin,
75 options: Option<ExportOptions>,
76) -> anyhow::Result<Option<digest::Output<D>>> {
77 if let Some((f3_cid, f3_data)) = &mut f3 {
79 f3_data.seek(SeekFrom::Start(0))?;
80 let expected_cid = crate::f3::snapshot::get_f3_snapshot_cid(f3_data)?;
81 anyhow::ensure!(
82 f3_cid == &expected_cid,
83 "f3 snapshot integrity check failed, actual cid: {f3_cid}, expected cid: {expected_cid}"
84 );
85 }
86
87 let head = tipset.key().to_cids();
88 let f3_cid = f3.as_ref().map(|(cid, _)| *cid);
89 let snap_meta = FilecoinSnapshotMetadata::new_v2(head, f3_cid);
90 let snap_meta_cbor_encoded = fvm_ipld_encoding::to_vec(&snap_meta)?;
91 let snap_meta_block = CarBlock {
92 cid: Cid::new_v1(
93 DAG_CBOR,
94 MultihashCode::Blake2b256.digest(&snap_meta_cbor_encoded),
95 ),
96 data: snap_meta_cbor_encoded,
97 };
98 let roots = nunny::vec![snap_meta_block.cid];
99 let mut prefix_data_frames = vec![{
100 let mut encoder = forest::new_encoder(forest::DEFAULT_FOREST_CAR_COMPRESSION_LEVEL)?;
101 snap_meta_block.write(&mut encoder)?;
102 anyhow::Ok((
103 vec![snap_meta_block.cid],
104 finalize_frame(forest::DEFAULT_FOREST_CAR_COMPRESSION_LEVEL, &mut encoder)?,
105 ))
106 }];
107
108 if let Some((f3_cid, mut f3_data)) = f3 {
109 let f3_data_len = f3_data.seek(SeekFrom::End(0))?;
110 f3_data.seek(SeekFrom::Start(0))?;
111 prefix_data_frames.push({
112 let mut encoder = forest::new_encoder(forest::DEFAULT_FOREST_CAR_COMPRESSION_LEVEL)?;
113 encoder.write_car_block(f3_cid, f3_data_len, &mut f3_data)?;
114 anyhow::Ok((
115 vec![f3_cid],
116 finalize_frame(forest::DEFAULT_FOREST_CAR_COMPRESSION_LEVEL, &mut encoder)?,
117 ))
118 });
119 }
120
121 export_to_forest_car::<D>(
122 roots,
123 Some(prefix_data_frames),
124 db,
125 tipset,
126 lookup_depth,
127 writer,
128 options,
129 )
130 .await
131}
132
133#[allow(clippy::too_many_arguments)]
134async fn export_to_forest_car<D: Digest>(
135 roots: NonEmpty<Cid>,
136 prefix_data_frames: Option<Vec<anyhow::Result<ForestCarFrame>>>,
137 db: &Arc<impl Blockstore + Send + Sync + 'static>,
138 tipset: &Tipset,
139 lookup_depth: ChainEpochDelta,
140 writer: impl AsyncWrite + Unpin,
141 options: Option<ExportOptions>,
142) -> anyhow::Result<Option<digest::Output<D>>> {
143 let ExportOptions {
144 skip_checksum,
145 include_receipts,
146 include_events,
147 seen,
148 } = options.unwrap_or_default();
149
150 if include_events && !include_receipts {
151 anyhow::bail!("message receipts must be included when events are included");
152 }
153
154 let stateroot_lookup_limit = tipset.epoch() - lookup_depth;
155
156 let mut writer = AsyncWriterWithChecksum::<D, _>::new(BufWriter::new(writer), !skip_checksum);
158
159 let blocks = par_buffer(
162 1024,
166 stream_chain(
167 Arc::clone(db),
168 tipset.clone().chain_owned(Arc::clone(db)),
169 stateroot_lookup_limit,
170 )
171 .with_seen(seen)
172 .with_message_receipts(include_receipts)
173 .with_events(include_events)
174 .track_progress(true),
175 );
176
177 let block_frames = forest::Encoder::compress_stream_default(blocks);
179 let frames = futures::stream::iter(prefix_data_frames.unwrap_or_default()).chain(block_frames);
180
181 forest::Encoder::write(&mut writer, roots, frames).await?;
183
184 writer.flush().await.context("failed to flush")?;
186
187 let digest = writer.finalize().map_err(|e| Error::Other(e.to_string()))?;
188
189 Ok(digest)
190}