use crate::blocks::{Tipset, TipsetKey};
use crate::cid_collections::CidHashSet;
use crate::db::car::ManyCar;
use crate::db::car::forest::DEFAULT_FOREST_CAR_FRAME_SIZE;
use crate::ipld::{stream_chain, stream_graph};
use crate::shim::clock::ChainEpoch;
use crate::utils::db::car_stream::{CarBlock, CarStream};
use crate::utils::encoding::extract_cids;
use crate::utils::multihash::MultihashCode;
use crate::utils::stream::par_buffer;
use crate::{
chain::{
ChainEpochDelta,
index::{ChainIndex, ResolveNullTipset},
},
db::{Blockstore, Either, parity_db::ParityDb, parity_db_config::ParityDbConfig},
};
use anyhow::Context as _;
use cid::Cid;
use clap::Subcommand;
use futures::{StreamExt, TryStreamExt};
use fvm_ipld_encoding::DAG_CBOR;
use human_repr::HumanCount as _;
use indicatif::{ProgressBar, ProgressStyle};
use itertools::Itertools;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Instant;
use tokio::{
fs::File,
io::{AsyncWrite, AsyncWriteExt, BufReader},
};
// Selects which blockstore backend the `Blockstore` benchmark runs against.
// NOTE(review): `//` comments are used deliberately — `///` doc comments on a
// clap `ValueEnum` would become user-visible help text.
#[derive(Debug, Clone, Copy, clap::ValueEnum)]
pub enum DbType {
    // The crate's standard `ParityDb` with its default configuration.
    Parity,
    // The experimental `ParityDbOpt` wrapper defined in this file, which
    // splits blocks across two columns by CID codec/hash.
    ParityOpt,
}
// Benchmark sub-commands.
// NOTE(review): `//` comments are used deliberately — `///` doc comments on a
// clap derive would change the generated CLI help output.
#[derive(Debug, Subcommand)]
pub enum BenchmarkCommands {
    // Stream one or more CAR snapshots end-to-end, measuring raw throughput.
    CarStreaming {
        #[arg(required = true)]
        snapshot_files: Vec<PathBuf>,
        // When set, also parse and de-duplicate links of DAG-CBOR blocks
        // while streaming (extra CPU work per block).
        #[arg(long)]
        inspect: bool,
    },
    // Walk the IPLD graph reachable from the snapshots' heaviest tipset.
    GraphTraversal {
        #[arg(required = true)]
        snapshot_files: Vec<PathBuf>,
    },
    // Re-encode a snapshot with the forest CAR encoder.
    ForestEncoding {
        snapshot_file: PathBuf,
        // Compression level forwarded to the forest encoder.
        #[arg(long, default_value_t = 3)]
        compression_level: u16,
        // Frame size forwarded to the forest encoder.
        #[arg(long, default_value_t = DEFAULT_FOREST_CAR_FRAME_SIZE)]
        frame_size: usize,
    },
    // Export a chain segment from the given snapshots as a forest CAR.
    Export {
        #[arg(required = true)]
        snapshot_files: Vec<PathBuf>,
        #[arg(long, default_value_t = 3)]
        compression_level: u16,
        #[arg(long, default_value_t = DEFAULT_FOREST_CAR_FRAME_SIZE)]
        frame_size: usize,
        // Epoch of the export head; defaults to the heaviest tipset's epoch.
        #[arg(short, long)]
        epoch: Option<ChainEpoch>,
        // Used to compute the state-root lookup limit (`epoch - depth`).
        #[arg(short, long, default_value_t = 2000)]
        depth: ChainEpochDelta,
    },
    // Import a snapshot into a fresh temporary blockstore, then traverse it.
    Blockstore {
        #[arg(required = true)]
        snapshot_file: PathBuf,
        // Which blockstore backend to benchmark (see `DbType`).
        #[arg(long, default_value = "parity")]
        db: DbType,
    },
}
impl BenchmarkCommands {
    /// Dispatches the selected benchmark sub-command and awaits its result.
    pub async fn run(self) -> anyhow::Result<()> {
        match self {
            Self::CarStreaming {
                snapshot_files,
                inspect,
            } => {
                if inspect {
                    benchmark_car_streaming_inspect(snapshot_files).await
                } else {
                    benchmark_car_streaming(snapshot_files).await
                }
            }
            Self::GraphTraversal { snapshot_files } => {
                benchmark_graph_traversal(snapshot_files).await
            }
            Self::ForestEncoding {
                snapshot_file,
                compression_level,
                frame_size,
            } => benchmark_forest_encoding(snapshot_file, compression_level, frame_size).await,
            Self::Export {
                snapshot_files,
                compression_level,
                frame_size,
                epoch,
                depth,
            } => {
                benchmark_exporting(snapshot_files, compression_level, frame_size, epoch, depth)
                    .await
            }
            Self::Blockstore { snapshot_file, db } => {
                benchmark_blockstore(snapshot_file, db).await
            }
        }
    }
}
/// Measures raw CAR streaming throughput: concatenates the block streams of
/// all input snapshots and pipes every block payload into a
/// progress-reporting sink.
async fn benchmark_car_streaming(input: Vec<PathBuf>) -> anyhow::Result<()> {
    let mut sink = indicatif_sink("traversed");
    let blocks = futures::stream::iter(input)
        .then(File::open)
        .map_ok(BufReader::new)
        .and_then(CarStream::new)
        .try_flatten();
    let mut blocks = Box::pin(blocks);
    while let Some(block) = blocks.try_next().await? {
        sink.write_all(&block.data).await?;
    }
    Ok(())
}
async fn benchmark_car_streaming_inspect(input: Vec<PathBuf>) -> anyhow::Result<()> {
let mut sink = indicatif_sink("traversed");
let mut s = Box::pin(
futures::stream::iter(input)
.then(File::open)
.map_ok(BufReader::new)
.and_then(CarStream::new)
.try_flatten(),
);
while let Some(block) = s.try_next().await? {
let block: CarBlock = block;
if block.cid.codec() == DAG_CBOR {
let cid_vec = extract_cids(&block.data)?;
let _ = cid_vec.iter().unique().count();
}
sink.write_all(&block.data).await?
}
Ok(())
}
/// Measures graph traversal: streams every block reachable from the
/// store's heaviest tipset into a progress-reporting sink.
async fn benchmark_graph_traversal(input: Vec<PathBuf>) -> anyhow::Result<()> {
    let store = open_store(input)?;
    let head = store.heaviest_tipset()?;
    let mut graph = stream_graph(&store, head.chain(&store), 0, CidHashSet::default());
    let mut sink = indicatif_sink("traversed");
    while let Some(block) = graph.try_next().await? {
        sink.write_all(&block.data).await?;
    }
    Ok(())
}
/// Benchmarks re-encoding a CAR snapshot with the forest CAR encoder using
/// the given `compression_level` and `frame_size`.
async fn benchmark_forest_encoding(
    input: PathBuf,
    compression_level: u16,
    frame_size: usize,
) -> anyhow::Result<()> {
    let file = tokio::io::BufReader::new(File::open(&input).await?);
    let mut block_stream = CarStream::new(file).await?;
    // Take ownership of the header roots (needed by `Encoder::write` below)
    // while leaving a placeholder behind, so the stream can still be consumed.
    let roots = std::mem::replace(
        &mut block_stream.header_v1.roots,
        nunny::vec![Default::default()],
    );
    let mut dest = indicatif_sink("encoded");
    // `par_buffer(1024, …)` presumably pipelines up to 1024 blocks between
    // reading and compression — confirm against `par_buffer`'s docs.
    let frames = crate::db::car::forest::Encoder::compress_stream(
        frame_size,
        compression_level,
        par_buffer(1024, block_stream.map_err(anyhow::Error::from)),
    );
    crate::db::car::forest::Encoder::write(&mut dest, roots, frames).await?;
    dest.flush().await?;
    Ok(())
}
/// Benchmarks exporting a chain segment as a forest CAR.
///
/// `epoch` selects the export head (defaults to the heaviest tipset's epoch);
/// `depth` sets the state-root lookup limit to `epoch - depth`, which is
/// forwarded to `stream_chain` to bound the traversal.
async fn benchmark_exporting(
    input: Vec<PathBuf>,
    compression_level: u16,
    frame_size: usize,
    epoch: Option<ChainEpoch>,
    depth: ChainEpochDelta,
) -> anyhow::Result<()> {
    let store = Arc::new(open_store(input)?);
    let heaviest = store.heaviest_tipset()?;
    let idx = ChainIndex::new(store.clone());
    // Resolve the export head; a null tipset at the requested height is
    // resolved by falling back to an older tipset.
    let ts = idx.tipset_by_height(
        epoch.unwrap_or(heaviest.epoch()),
        heaviest,
        ResolveNullTipset::TakeOlder,
    )?;
    let stateroot_lookup_limit = ts.epoch() - depth;
    let mut dest = indicatif_sink("exported");
    let blocks = stream_chain(
        Arc::clone(&store),
        ts.clone().chain_owned(Arc::clone(&store)),
        stateroot_lookup_limit,
        CidHashSet::default(),
    );
    // `par_buffer(1024, …)` presumably pipelines up to 1024 blocks between
    // traversal and compression — confirm against `par_buffer`'s docs.
    let frames = crate::db::car::forest::Encoder::compress_stream(
        frame_size,
        compression_level,
        par_buffer(1024, blocks.map_err(anyhow::Error::from)),
    );
    // The exported CAR's roots are the CIDs of the selected head tipset.
    crate::db::car::forest::Encoder::write(&mut dest, ts.key().to_cids(), frames).await?;
    dest.flush().await?;
    Ok(())
}
/// Imports `snapshot` into a fresh temporary blockstore of the requested
/// backend, then benchmarks a full graph traversal over it.
async fn benchmark_blockstore(snapshot: PathBuf, db: DbType) -> anyhow::Result<()> {
    // The tempdir (and the database inside it) is removed when dropped.
    let db_dir = tempfile::tempdir()?;
    let store = open_blockstore(&db, db_dir.path())?;
    let head = benchmark_blockstore_import(&snapshot, &db, &store, db_dir.path()).await?;
    benchmark_blockstore_traversal(&store, &head).await?;
    Ok(())
}
/// Opens the requested blockstore backend at `db_path`, printing the path
/// for reference.
fn open_blockstore(db: &DbType, db_path: &Path) -> anyhow::Result<impl Blockstore> {
    println!("temp db path: {}", db_path.display());
    let store = match db {
        DbType::Parity => Either::Left(ParityDb::open(db_path, &ParityDbConfig::default())?),
        DbType::ParityOpt => Either::Right(ParityDbOpt::open(db_path)?),
    };
    Ok(store)
}
/// Streams every block of `snapshot` into `bs`, reporting the record count,
/// on-disk size of `bs_path` and elapsed time. Returns the snapshot's head
/// tipset key for the follow-up traversal benchmark.
async fn benchmark_blockstore_import(
    snapshot: &Path,
    db: &DbType,
    bs: &impl Blockstore,
    bs_path: &Path,
) -> anyhow::Result<TipsetKey> {
    let mut car_stream = CarStream::new_from_path(snapshot).await?;
    let head_tsk = car_stream.head_tipset_key();
    println!("head tipset key: {head_tsk}");
    println!("importing CAR into {db:?} blockstore...");
    let start = Instant::now();
    let mut n = 0;
    while let Some(CarBlock { cid, data }) = car_stream.try_next().await? {
        bs.put_keyed(&cid, &data)?;
        n += 1;
    }
    // Best effort: the reported size falls back to 0 if the directory
    // cannot be measured.
    let db_size = fs_extra::dir::get_size(bs_path).unwrap_or_default();
    println!(
        "imported {n} records into {db:?} blockstore(size={}), took {}",
        db_size.human_count_bytes(),
        humantime::format_duration(start.elapsed())
    );
    Ok(head_tsk)
}
/// Walks the whole graph reachable from `head_tsk` out of `bs`, counting the
/// records and reporting the elapsed time.
async fn benchmark_blockstore_traversal(
    bs: &impl Blockstore,
    head_tsk: &TipsetKey,
) -> anyhow::Result<()> {
    println!("Traversing the chain...");
    let head = Tipset::load_required(bs, head_tsk)?;
    let mut graph = stream_graph(bs, head.chain(bs), 0, CidHashSet::default());
    let mut sink = indicatif_sink("traversed");
    let start = Instant::now();
    let mut n = 0;
    while let Some(block) = graph.try_next().await? {
        sink.write_all(&block.data).await?;
        n += 1;
    }
    println!(
        "Traversed {n} records, took {}",
        humantime::format_duration(start.elapsed())
    );
    Ok(())
}
/// Builds an [`AsyncWrite`] that discards all bytes while displaying a live
/// spinner with total bytes written, throughput and elapsed time.
///
/// `task` is shown as the spinner prefix (e.g. "traversed", "encoded").
fn indicatif_sink(task: &'static str) -> impl AsyncWrite {
    let sink = tokio::io::sink();
    let pb = ProgressBar::new_spinner()
        .with_style(
            ProgressStyle::with_template(
                "{spinner} {prefix} {total_bytes} at {binary_bytes_per_sec} in {elapsed_precise}",
            )
            .expect("infallible"),
        )
        .with_prefix(task)
        // Leave the final stats visible when the benchmark completes.
        .with_finish(indicatif::ProgressFinish::AndLeave);
    // Redraw every 100ms even when no bytes are being written.
    pb.enable_steady_tick(std::time::Duration::from_secs_f32(0.1));
    pb.wrap_async_write(sink)
}
/// Opens the input CAR files as a single [`ManyCar`] store, showing a spinner
/// while the (potentially slow) open is in progress.
fn open_store(input: Vec<PathBuf>) -> anyhow::Result<ManyCar> {
    let spinner = indicatif::ProgressBar::new_spinner().with_style(
        indicatif::ProgressStyle::with_template("{spinner} opening block store")
            .expect("indicatif template must be valid"),
    );
    spinner.enable_steady_tick(std::time::Duration::from_secs_f32(0.1));
    let store = ManyCar::try_from(input).context("couldn't read input CAR file")?;
    spinner.finish_and_clear();
    Ok(store)
}
/// Minimal `parity_db` wrapper backing the `ParityOpt` benchmark variant.
/// Per the `Blockstore` impl below: column 0 stores DAG-CBOR/Blake2b-256
/// blocks keyed by the bare multihash digest, column 1 stores everything
/// else keyed by the full CID bytes.
struct ParityDbOpt {
    db: parity_db::Db,
}
impl ParityDbOpt {
    /// Opens (or creates) the database at `path` with two LZ4-compressed
    /// columns — see the struct-level comment for the column layout.
    fn open(path: impl Into<PathBuf>) -> anyhow::Result<Self> {
        let opts = parity_db::Options {
            path: path.into(),
            // Durability: fsync both the write-ahead log and the data files.
            sync_wal: true,
            sync_data: true,
            stats: false,
            salt: None,
            columns: vec![
                // Column 0. NOTE(review): `preimage`/`uniform` presumably
                // declare that keys are hash preimages with a uniform
                // distribution — confirm against parity-db's docs.
                parity_db::ColumnOptions {
                    preimage: true,
                    uniform: true,
                    compression: parity_db::CompressionType::Lz4,
                    ..Default::default()
                },
                // Column 1: keyed by full CID bytes, so not `uniform`.
                parity_db::ColumnOptions {
                    preimage: true,
                    compression: parity_db::CompressionType::Lz4,
                    ..Default::default()
                },
            ],
            // Presumably maps column 0 to a 128-byte minimum value size for
            // compression — confirm against parity-db's docs.
            compression_threshold: [(0, 128)].into_iter().collect(),
        };
        let db = parity_db::Db::open_or_create(&opts)?;
        Ok(Self { db })
    }
}
impl Blockstore for ParityDbOpt {
    /// Fetches a block, routing DAG-CBOR/Blake2b-256 CIDs to column 0 (keyed
    /// by the bare multihash digest) and all other CIDs to column 1 (keyed by
    /// the full CID bytes).
    fn get(&self, k: &cid::Cid) -> anyhow::Result<Option<Vec<u8>>> {
        Ok(if is_dag_cbor_blake2b256(k) {
            self.db.get(0, k.hash().digest())?
        } else {
            self.db.get(1, &k.to_bytes())?
        })
    }
    /// Stores a block under the same column/key scheme as [`Self::get`].
    fn put_keyed(&self, k: &cid::Cid, block: &[u8]) -> anyhow::Result<()> {
        if is_dag_cbor_blake2b256(k) {
            self.db
                .commit([(0, k.hash().digest(), Some(block.to_vec()))])?;
        } else {
            self.db.commit([(1, k.to_bytes(), Some(block.to_vec()))])?;
        }
        Ok(())
    }
}
/// `true` iff the CID uses the DAG-CBOR codec and a Blake2b-256 multihash —
/// the combination `ParityDbOpt` stores in its digest-keyed column.
fn is_dag_cbor_blake2b256(cid: &Cid) -> bool {
    let codec_matches = cid.codec() == DAG_CBOR;
    let hash_matches = cid.hash().code() == u64::from(MultihashCode::Blake2b256);
    codec_matches && hash_matches
}