pub mod ec_finality;
mod snapshot_format;
pub mod store;
#[cfg(test)]
mod tests;
mod weight;
pub use self::{snapshot_format::*, store::*, weight::*};
use crate::blocks::{Tipset, TipsetKey};
use crate::cid_collections::CidHashSet;
use crate::db::car::forest::{self, ForestCarFrame, finalize_frame};
use crate::db::{SettingsStore, SettingsStoreExt};
use crate::ipld::stream_chain;
use crate::utils::ShallowClone as _;
use crate::utils::db::car_stream::{CarBlock, CarBlockWrite};
use crate::utils::io::{AsyncWriterWithChecksum, Checksum};
use crate::utils::multihash::MultihashCode;
use crate::utils::stream::par_buffer;
use anyhow::Context as _;
use cid::Cid;
use futures::StreamExt as _;
use fvm_ipld_blockstore::Blockstore;
use fvm_ipld_encoding::DAG_CBOR;
use multihash_derive::MultihashDigest as _;
use nunny::Vec as NonEmpty;
use sha2::digest::{self, Digest};
use std::io::{Read, Seek, SeekFrom};
use std::sync::Arc;
use tokio::io::{AsyncWrite, AsyncWriteExt, BufWriter};
/// Options controlling what a snapshot export includes and whether the
/// written archive is checksummed.
#[derive(Debug, Clone, Default)]
pub struct ExportOptions {
/// When `true`, skip computing a digest of the written archive
/// (`export*` then returns `None` for the checksum).
pub skip_checksum: bool,
/// Include message receipts in the exported chain data.
pub include_receipts: bool,
/// Include events in the exported chain data. Requires
/// `include_receipts` to also be `true` — the export fails otherwise.
pub include_events: bool,
/// Include tipset keys in the exported chain data.
pub include_tipset_keys: bool,
/// CIDs to treat as already exported; presumably blocks in this set are
/// skipped by the chain stream — confirm against `stream_chain`'s
/// `with_seen` semantics.
pub seen: CidHashSet,
}
/// Exports a snapshot rooted at the current chain head.
///
/// The head tipset key is read from the settings store under
/// [`crate::db::setting_keys::HEAD_KEY`]; a missing key is an error.
/// Returns the head tipset together with the checksum of the written
/// archive (or `None` when checksumming was skipped via `options`).
pub async fn export_from_head<D: Digest>(
    db: &Arc<impl Blockstore + SettingsStore + Send + Sync + 'static>,
    lookup_depth: ChainEpochDelta,
    writer: impl AsyncWrite + Unpin,
    options: Option<ExportOptions>,
) -> anyhow::Result<(Tipset, Option<digest::Output<D>>)> {
    let head_key: TipsetKey = SettingsStoreExt::read_obj(db, crate::db::setting_keys::HEAD_KEY)?
        .context("chain head key not found")?;
    let head = Tipset::load_required(&db, &head_key)?;
    let checksum = export::<D>(db, &head, lookup_depth, writer, options).await?;
    Ok((head, checksum))
}
/// Exports a v1 snapshot rooted at `tipset` to `writer`.
///
/// A v1 snapshot carries no metadata block: the CAR roots are the tipset's
/// own block CIDs and no prefix frames are written ahead of the chain data.
/// Returns the checksum of the written archive, or `None` when checksumming
/// was skipped via `options`.
pub async fn export<D: Digest>(
    db: &Arc<impl Blockstore + Send + Sync + 'static>,
    tipset: &Tipset,
    lookup_depth: ChainEpochDelta,
    writer: impl AsyncWrite + Unpin,
    options: Option<ExportOptions>,
) -> anyhow::Result<Option<digest::Output<D>>> {
    export_to_forest_car::<D>(
        tipset.key().to_cids(),
        None,
        db,
        tipset,
        lookup_depth,
        writer,
        options,
    )
    .await
}
/// Exports a v2 snapshot rooted at `tipset`, optionally embedding an F3
/// snapshot, to `writer`.
///
/// Unlike [`export`], the single CAR root is the CID of a
/// [`FilecoinSnapshotMetadata`] block (DAG-CBOR, Blake2b-256) that records
/// the head tipset CIDs and the optional F3 snapshot CID. The metadata
/// block — and, if present, the F3 payload — are each written as their own
/// pre-compressed frame ahead of the chain data.
///
/// If `f3` is supplied, the reader is rewound and its content hashed; the
/// export fails when the recomputed CID does not match the supplied one.
pub async fn export_v2<D: Digest, F: Seek + Read>(
    db: &Arc<impl Blockstore + Send + Sync + 'static>,
    mut f3: Option<(Cid, F)>,
    tipset: &Tipset,
    lookup_depth: ChainEpochDelta,
    writer: impl AsyncWrite + Unpin,
    options: Option<ExportOptions>,
) -> anyhow::Result<Option<digest::Output<D>>> {
    // Integrity check: rewind and hash the F3 payload, then compare against
    // the caller-supplied CID before anything is written.
    if let Some((f3_cid, f3_data)) = &mut f3 {
        f3_data.seek(SeekFrom::Start(0))?;
        let expected_cid = crate::f3::snapshot::get_f3_snapshot_cid(f3_data)?;
        anyhow::ensure!(
            f3_cid == &expected_cid,
            "f3 snapshot integrity check failed, actual cid: {f3_cid}, expected cid: {expected_cid}"
        );
    }
    let head = tipset.key().to_cids();
    let f3_cid = f3.as_ref().map(|(cid, _)| *cid);
    // Build the metadata block; its CID doubles as the snapshot's only root.
    let snap_meta = FilecoinSnapshotMetadata::new_v2(head, f3_cid);
    let snap_meta_cbor_encoded = fvm_ipld_encoding::to_vec(&snap_meta)?;
    let snap_meta_block = CarBlock {
        cid: Cid::new_v1(
            DAG_CBOR,
            MultihashCode::Blake2b256.digest(&snap_meta_cbor_encoded),
        ),
        data: snap_meta_cbor_encoded,
    };
    let roots = nunny::vec![snap_meta_block.cid];
    // The metadata block is compressed into its own standalone frame so it
    // precedes all chain data in the archive.
    let mut prefix_data_frames = vec![{
        let mut encoder = forest::new_encoder(forest::DEFAULT_FOREST_CAR_COMPRESSION_LEVEL)?;
        snap_meta_block.write(&mut encoder)?;
        anyhow::Ok((
            vec![snap_meta_block.cid],
            finalize_frame(forest::DEFAULT_FOREST_CAR_COMPRESSION_LEVEL, &mut encoder)?,
        ))
    }];
    if let Some((f3_cid, mut f3_data)) = f3 {
        // Seek to the end to learn the payload length, then rewind so the
        // whole payload is streamed into the frame from the start.
        let f3_data_len = f3_data.seek(SeekFrom::End(0))?;
        f3_data.seek(SeekFrom::Start(0))?;
        prefix_data_frames.push({
            let mut encoder = forest::new_encoder(forest::DEFAULT_FOREST_CAR_COMPRESSION_LEVEL)?;
            encoder.write_car_block(f3_cid, f3_data_len, &mut f3_data)?;
            anyhow::Ok((
                vec![f3_cid],
                finalize_frame(forest::DEFAULT_FOREST_CAR_COMPRESSION_LEVEL, &mut encoder)?,
            ))
        });
    }
    export_to_forest_car::<D>(
        roots,
        Some(prefix_data_frames),
        db,
        tipset,
        lookup_depth,
        writer,
        options,
    )
    .await
}
#[allow(clippy::too_many_arguments)]
async fn export_to_forest_car<D: Digest>(
roots: NonEmpty<Cid>,
prefix_data_frames: Option<Vec<anyhow::Result<ForestCarFrame>>>,
db: &Arc<impl Blockstore + Send + Sync + 'static>,
tipset: &Tipset,
lookup_depth: ChainEpochDelta,
writer: impl AsyncWrite + Unpin,
options: Option<ExportOptions>,
) -> anyhow::Result<Option<digest::Output<D>>> {
let ExportOptions {
skip_checksum,
include_receipts,
include_events,
include_tipset_keys,
seen,
} = options.unwrap_or_default();
if include_events && !include_receipts {
anyhow::bail!("message receipts must be included when events are included");
}
let stateroot_lookup_limit = tipset.epoch() - lookup_depth;
let mut writer = AsyncWriterWithChecksum::<D, _>::new(BufWriter::new(writer), !skip_checksum);
let blocks = par_buffer(
1024,
stream_chain(
db.shallow_clone(),
tipset.shallow_clone().chain_owned(db.shallow_clone()),
stateroot_lookup_limit,
)
.with_seen(seen)
.with_message_receipts(include_receipts)
.with_events(include_events)
.with_tipset_keys(include_tipset_keys)
.track_progress(true),
);
let block_frames = forest::Encoder::compress_stream_default(blocks);
let frames = futures::stream::iter(prefix_data_frames.unwrap_or_default()).chain(block_frames);
forest::Encoder::write(&mut writer, roots, frames).await?;
writer.flush().await.context("failed to flush")?;
let digest = writer.finalize().map_err(|e| Error::Other(e.to_string()))?;
Ok(digest)
}