Skip to main content

forest/tool/subcommands/
benchmark_cmd.rs

1// Copyright 2019-2026 ChainSafe Systems
2// SPDX-License-Identifier: Apache-2.0, MIT
3
4use crate::blocks::{Tipset, TipsetKey};
5use crate::cid_collections::CidHashSet;
6use crate::db::car::ManyCar;
7use crate::db::car::forest::DEFAULT_FOREST_CAR_FRAME_SIZE;
8use crate::ipld::{stream_chain, stream_graph};
9use crate::shim::clock::ChainEpoch;
10use crate::utils::db::car_stream::{CarBlock, CarStream};
11use crate::utils::encoding::extract_cids;
12use crate::utils::multihash::MultihashCode;
13use crate::utils::stream::par_buffer;
14use crate::{
15    chain::{
16        ChainEpochDelta,
17        index::{ChainIndex, ResolveNullTipset},
18    },
19    db::{Blockstore, Either, parity_db::ParityDb, parity_db_config::ParityDbConfig},
20};
21use anyhow::Context as _;
22use cid::Cid;
23use clap::Subcommand;
24use futures::{StreamExt, TryStreamExt};
25use fvm_ipld_encoding::DAG_CBOR;
26use human_repr::HumanCount as _;
27use indicatif::{ProgressBar, ProgressStyle};
28use itertools::Itertools;
29use std::path::{Path, PathBuf};
30use std::sync::Arc;
31use std::time::Instant;
32use tokio::{
33    fs::File,
34    io::{AsyncWrite, AsyncWriteExt, BufReader},
35};
36
/// Which key-value store implementation to benchmark.
#[derive(Debug, Clone, Copy, clap::ValueEnum)]
pub enum DbType {
    /// [`ParityDb`] opened with Forest's default [`ParityDbConfig`].
    Parity,
    /// [`ParityDbOpt`]: a raw `parity-db` configuration with CID-aware column routing.
    ParityOpt,
}
42
/// Micro-benchmarks for snapshot streaming, graph traversal, encoding,
/// exporting and key-value store performance.
#[derive(Debug, Subcommand)]
pub enum BenchmarkCommands {
    /// Benchmark streaming data from a CAR archive
    CarStreaming {
        /// Snapshot input files (`.car`, `.car.zst`, `.forest.car.zst`)
        #[arg(required = true)]
        snapshot_files: Vec<PathBuf>,
        /// Whether or not we want to inspect [`ipld_core::ipld::Ipld`] data for each block.
        #[arg(long)]
        inspect: bool,
    },
    /// Depth-first traversal of the Filecoin graph
    GraphTraversal {
        /// Snapshot input files (`.car`, `.car.zst`, `.forest.car.zst`)
        #[arg(required = true)]
        snapshot_files: Vec<PathBuf>,
    },
    /// Encoding of a `.forest.car.zst` file
    ForestEncoding {
        /// Snapshot input file (`.car`, `.car.zst`, `.forest.car.zst`)
        snapshot_file: PathBuf,
        /// zstd compression level passed through to the encoder
        #[arg(long, default_value_t = 3)]
        compression_level: u16,
        /// End zstd frames after they exceed this length
        #[arg(long, default_value_t = DEFAULT_FOREST_CAR_FRAME_SIZE)]
        frame_size: usize,
    },
    /// Exporting a `.forest.car.zst` file from HEAD
    Export {
        /// Snapshot input files (`.car`, `.car.zst`, `.forest.car.zst`)
        #[arg(required = true)]
        snapshot_files: Vec<PathBuf>,
        /// zstd compression level passed through to the encoder
        #[arg(long, default_value_t = 3)]
        compression_level: u16,
        /// End zstd frames after they exceed this length
        #[arg(long, default_value_t = DEFAULT_FOREST_CAR_FRAME_SIZE)]
        frame_size: usize,
        /// Latest epoch that has to be exported for this snapshot, the upper bound. This value
        /// cannot be greater than the latest epoch available in the input snapshot.
        #[arg(short, long)]
        epoch: Option<ChainEpoch>,
        /// How many state-roots to include. Lower limit is 900 for `calibnet` and `mainnet`.
        #[arg(short, long, default_value_t = 2000)]
        depth: ChainEpochDelta,
    },
    /// Benchmark key-value blockstore
    Blockstore {
        /// Snapshot input file (`.car`, `.car.zst`, `.forest.car.zst`)
        #[arg(required = true)]
        snapshot_file: PathBuf,
        /// Which blockstore implementation to exercise
        #[arg(long, default_value = "parity")]
        db: DbType,
    },
}
97
98impl BenchmarkCommands {
99    pub async fn run(self) -> anyhow::Result<()> {
100        match self {
101            Self::CarStreaming {
102                snapshot_files,
103                inspect,
104            } => match inspect {
105                true => benchmark_car_streaming_inspect(snapshot_files).await,
106                false => benchmark_car_streaming(snapshot_files).await,
107            },
108            Self::GraphTraversal { snapshot_files } => {
109                benchmark_graph_traversal(snapshot_files).await
110            }
111            Self::ForestEncoding {
112                snapshot_file,
113                compression_level,
114                frame_size,
115            } => benchmark_forest_encoding(snapshot_file, compression_level, frame_size).await,
116            Self::Export {
117                snapshot_files,
118                compression_level,
119                frame_size,
120                epoch,
121                depth,
122            } => {
123                benchmark_exporting(snapshot_files, compression_level, frame_size, epoch, depth)
124                    .await
125            }
126            Self::Blockstore { snapshot_file, db } => benchmark_blockstore(snapshot_file, db).await,
127        }
128    }
129}
130
131// Concatenate a set of CAR files and measure how quickly we can stream the
132// blocks.
133async fn benchmark_car_streaming(input: Vec<PathBuf>) -> anyhow::Result<()> {
134    let mut sink = indicatif_sink("traversed");
135
136    let mut s = Box::pin(
137        futures::stream::iter(input)
138            .then(File::open)
139            .map_ok(BufReader::new)
140            .and_then(CarStream::new)
141            .try_flatten(),
142    );
143    while let Some(block) = s.try_next().await? {
144        sink.write_all(&block.data).await?
145    }
146    Ok(())
147}
148
149// Concatenate a set of CAR files and measure how quickly we can stream the
150// blocks, while inspecting them. This a benchmark we could use for setting
151// realistic expectations in terms of DFS graph travels, for example.
152async fn benchmark_car_streaming_inspect(input: Vec<PathBuf>) -> anyhow::Result<()> {
153    let mut sink = indicatif_sink("traversed");
154    let mut s = Box::pin(
155        futures::stream::iter(input)
156            .then(File::open)
157            .map_ok(BufReader::new)
158            .and_then(CarStream::new)
159            .try_flatten(),
160    );
161    while let Some(block) = s.try_next().await? {
162        let block: CarBlock = block;
163        if block.cid.codec() == DAG_CBOR {
164            let cid_vec = extract_cids(&block.data)?;
165            let _ = cid_vec.iter().unique().count();
166        }
167        sink.write_all(&block.data).await?
168    }
169    Ok(())
170}
171
172// Open a set of CAR files as a block store and do a DFS traversal of all
173// reachable nodes.
174async fn benchmark_graph_traversal(input: Vec<PathBuf>) -> anyhow::Result<()> {
175    let store = open_store(input)?;
176    let heaviest = store.heaviest_tipset()?;
177
178    let mut sink = indicatif_sink("traversed");
179
180    let mut s = stream_graph(&store, heaviest.chain(&store), 0, CidHashSet::default());
181    while let Some(block) = s.try_next().await? {
182        sink.write_all(&block.data).await?
183    }
184
185    Ok(())
186}
187
// Encode a file to the ForestCAR.zst format and measure throughput. The
// encoded output goes to a progress-reporting sink and is discarded.
async fn benchmark_forest_encoding(
    input: PathBuf,
    compression_level: u16,
    frame_size: usize,
) -> anyhow::Result<()> {
    let file = tokio::io::BufReader::new(File::open(&input).await?);

    let mut block_stream = CarStream::new(file).await?;
    // Move the roots out of the header so they can be written to the output;
    // a single default CID is swapped in as a placeholder — presumably because
    // the `nunny` root list must stay non-empty. TODO confirm.
    let roots = std::mem::replace(
        &mut block_stream.header_v1.roots,
        nunny::vec![Default::default()],
    );

    // Sink with an attached progress bar; bytes written are not persisted.
    let mut dest = indicatif_sink("encoded");

    // Compress the block stream into zstd frames, buffering up to 1024 blocks
    // so compression can overlap with reading/decoding.
    let frames = crate::db::car::forest::Encoder::compress_stream(
        frame_size,
        compression_level,
        par_buffer(1024, block_stream.map_err(anyhow::Error::from)),
    );
    crate::db::car::forest::Encoder::write(&mut dest, roots, frames).await?;
    dest.flush().await?;
    Ok(())
}
213
// Exporting combines a graph traversal with ForestCAR.zst encoding. Ideally, it
// should be no lower than `min(benchmark_graph_traversal,
// benchmark_forest_encoding)`.
async fn benchmark_exporting(
    input: Vec<PathBuf>,
    compression_level: u16,
    frame_size: usize,
    epoch: Option<ChainEpoch>,
    depth: ChainEpochDelta,
) -> anyhow::Result<()> {
    let store = Arc::new(open_store(input)?);
    let heaviest = store.heaviest_tipset()?;
    let idx = ChainIndex::new(store.clone());
    // Resolve the export head: the requested epoch, or HEAD when none was
    // given. Null tipsets resolve to the nearest older non-null tipset.
    let ts = idx.tipset_by_height(
        epoch.unwrap_or(heaviest.epoch()),
        heaviest,
        ResolveNullTipset::TakeOlder,
    )?;
    // We don't do any sanity checking for 'depth'. The output is discarded so
    // there's no need.
    let stateroot_lookup_limit = ts.epoch() - depth;

    // Sink with an attached progress bar; the exported bytes are discarded.
    let mut dest = indicatif_sink("exported");

    // Stream the chain starting at `ts`; `stateroot_lookup_limit` bounds how
    // far back state-roots are walked — see `stream_chain` for the semantics.
    let blocks = stream_chain(
        Arc::clone(&store),
        ts.clone().chain_owned(Arc::clone(&store)),
        stateroot_lookup_limit,
        CidHashSet::default(),
    );

    // Compress the traversed blocks into zstd frames, buffering up to 1024
    // blocks so compression overlaps with traversal.
    let frames = crate::db::car::forest::Encoder::compress_stream(
        frame_size,
        compression_level,
        par_buffer(1024, blocks.map_err(anyhow::Error::from)),
    );
    crate::db::car::forest::Encoder::write(&mut dest, ts.key().to_cids(), frames).await?;
    dest.flush().await?;
    Ok(())
}
254
255async fn benchmark_blockstore(snapshot: PathBuf, db: DbType) -> anyhow::Result<()> {
256    let tmp_db_path = tempfile::tempdir()?;
257    let bs = open_blockstore(&db, tmp_db_path.path())?;
258    let head_tsk = benchmark_blockstore_import(&snapshot, &db, &bs, tmp_db_path.path()).await?;
259    benchmark_blockstore_traversal(&bs, &head_tsk).await?;
260    Ok(())
261}
262
263fn open_blockstore(db: &DbType, db_path: &Path) -> anyhow::Result<impl Blockstore> {
264    println!("temp db path: {}", db_path.display());
265    Ok(match db {
266        DbType::Parity => Either::Left(ParityDb::open(db_path, &ParityDbConfig::default())?),
267        DbType::ParityOpt => Either::Right(ParityDbOpt::open(db_path)?),
268    })
269}
270
271async fn benchmark_blockstore_import(
272    snapshot: &Path,
273    db: &DbType,
274    bs: &impl Blockstore,
275    bs_path: &Path,
276) -> anyhow::Result<TipsetKey> {
277    let mut car_stream = CarStream::new_from_path(snapshot).await?;
278    let head_tsk = car_stream.head_tipset_key();
279    println!("head tipset key: {head_tsk}");
280    println!("importing CAR into {db:?} blockstore...");
281    let start = Instant::now();
282    let mut n = 0;
283    while let Some(CarBlock { cid, data }) = car_stream.try_next().await? {
284        bs.put_keyed(&cid, &data)?;
285        n += 1;
286    }
287    let db_size = fs_extra::dir::get_size(bs_path).unwrap_or_default();
288    println!(
289        "imported {n} records into {db:?} blockstore(size={}), took {}",
290        db_size.human_count_bytes(),
291        humantime::format_duration(start.elapsed())
292    );
293    Ok(head_tsk)
294}
295
296async fn benchmark_blockstore_traversal(
297    bs: &impl Blockstore,
298    head_tsk: &TipsetKey,
299) -> anyhow::Result<()> {
300    println!("Traversing the chain...");
301    let head = Tipset::load_required(bs, head_tsk)?;
302    let mut sink = indicatif_sink("traversed");
303    let start = Instant::now();
304    let mut s = stream_graph(bs, head.chain(bs), 0, CidHashSet::default());
305    let mut n = 0;
306    while let Some(block) = s.try_next().await? {
307        sink.write_all(&block.data).await?;
308        n += 1;
309    }
310    println!(
311        "Traversed {n} records, took {}",
312        humantime::format_duration(start.elapsed())
313    );
314    Ok(())
315}
316
317// Sink with attached progress indicator
318fn indicatif_sink(task: &'static str) -> impl AsyncWrite {
319    let sink = tokio::io::sink();
320    let pb = ProgressBar::new_spinner()
321        .with_style(
322            ProgressStyle::with_template(
323                "{spinner} {prefix} {total_bytes} at {binary_bytes_per_sec} in {elapsed_precise}",
324            )
325            .expect("infallible"),
326        )
327        .with_prefix(task)
328        .with_finish(indicatif::ProgressFinish::AndLeave);
329    pb.enable_steady_tick(std::time::Duration::from_secs_f32(0.1));
330    pb.wrap_async_write(sink)
331}
332
333// Opening a block store may take a long time (CAR files have to be indexed,
334// CAR.zst files have to be decompressed). Show a progress indicator and clear
335// it when done.
336fn open_store(input: Vec<PathBuf>) -> anyhow::Result<ManyCar> {
337    let pb = indicatif::ProgressBar::new_spinner().with_style(
338        indicatif::ProgressStyle::with_template("{spinner} opening block store")
339            .expect("indicatif template must be valid"),
340    );
341    pb.enable_steady_tick(std::time::Duration::from_secs_f32(0.1));
342
343    let store = ManyCar::try_from(input).context("couldn't read input CAR file")?;
344
345    pb.finish_and_clear();
346
347    Ok(store)
348}
349
/// Thin wrapper around a raw [`parity_db::Db`], configured in
/// [`ParityDbOpt::open`] with columns tuned for CID-keyed block data.
struct ParityDbOpt {
    // Underlying parity-db handle: column 0 holds DAG-CBOR/Blake2b-256
    // blocks keyed by raw digest, column 1 holds everything else.
    db: parity_db::Db,
}
353
impl ParityDbOpt {
    /// Opens (or creates) a `parity-db` database at `path` with two columns:
    ///
    /// * column 0 — `preimage` + `uniform`: for DAG-CBOR/Blake2b-256 CIDs,
    ///   keyed by the raw multihash digest (see [`is_dag_cbor_blake2b256`]).
    /// * column 1 — `preimage` only: fallback for all other CIDs, keyed by
    ///   the full CID bytes (variable length, hence not `uniform`).
    fn open(path: impl Into<PathBuf>) -> anyhow::Result<Self> {
        let opts = parity_db::Options {
            path: path.into(),
            // Synchronous WAL and data writes for durability.
            sync_wal: true,
            sync_data: true,
            stats: false,
            salt: None,
            columns: vec![
                parity_db::ColumnOptions {
                    preimage: true,
                    uniform: true,
                    compression: parity_db::CompressionType::Lz4,
                    ..Default::default()
                },
                parity_db::ColumnOptions {
                    preimage: true,
                    compression: parity_db::CompressionType::Lz4,
                    ..Default::default()
                },
            ],
            // NOTE(review): maps column 0 to a 128-byte compression threshold,
            // presumably skipping LZ4 for smaller values — confirm against
            // parity-db's `Options` docs.
            compression_threshold: [(0, 128)].into_iter().collect(),
        };
        let db = parity_db::Db::open_or_create(&opts)?;
        Ok(Self { db })
    }
}
381
impl Blockstore for ParityDbOpt {
    /// Fetch a block, routing DAG-CBOR/Blake2b-256 CIDs to column 0 (keyed
    /// by raw digest) and everything else to column 1 (keyed by full CID
    /// bytes). Returns `None` when the key is absent.
    fn get(&self, k: &cid::Cid) -> anyhow::Result<Option<Vec<u8>>> {
        Ok(if is_dag_cbor_blake2b256(k) {
            self.db.get(0, k.hash().digest())?
        } else {
            self.db.get(1, &k.to_bytes())?
        })
    }

    /// Store a block under its CID, using the same column routing as
    /// [`Self::get`]. Each put is committed individually.
    fn put_keyed(&self, k: &cid::Cid, block: &[u8]) -> anyhow::Result<()> {
        if is_dag_cbor_blake2b256(k) {
            self.db
                .commit([(0, k.hash().digest(), Some(block.to_vec()))])?;
        } else {
            self.db.commit([(1, k.to_bytes(), Some(block.to_vec()))])?;
        }
        Ok(())
    }
}
401
402fn is_dag_cbor_blake2b256(cid: &Cid) -> bool {
403    cid.codec() == DAG_CBOR && cid.hash().code() == u64::from(MultihashCode::Blake2b256)
404}