// forest/tool/subcommands/benchmark_cmd.rs

// Copyright 2019-2026 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

4use crate::blocks::{Tipset, TipsetKey};
5use crate::db::car::ManyCar;
6use crate::db::car::forest::DEFAULT_FOREST_CAR_FRAME_SIZE;
7use crate::ipld::{stream_chain, stream_graph};
8use crate::shim::clock::ChainEpoch;
9use crate::utils::db::car_stream::{CarBlock, CarStream};
10use crate::utils::encoding::extract_cids;
11use crate::utils::multihash::MultihashCode;
12use crate::utils::stream::par_buffer;
13use crate::{
14    chain::{
15        ChainEpochDelta,
16        index::{ChainIndex, ResolveNullTipset},
17    },
18    db::{Blockstore, Either, parity_db::ParityDb, parity_db_config::ParityDbConfig},
19};
20use anyhow::Context as _;
21use cid::Cid;
22use clap::Subcommand;
23use futures::{StreamExt, TryStreamExt};
24use fvm_ipld_encoding::DAG_CBOR;
25use human_repr::HumanCount as _;
26use indicatif::{ProgressBar, ProgressStyle};
27use itertools::Itertools;
28use std::path::{Path, PathBuf};
29use std::sync::Arc;
30use std::time::Instant;
31use tokio::{
32    fs::File,
33    io::{AsyncWrite, AsyncWriteExt, BufReader},
34};
35
/// Key-value blockstore backend to benchmark with the `Blockstore` subcommand.
#[derive(Debug, Clone, Copy, clap::ValueEnum)]
pub enum DbType {
    /// Stock ParityDb opened with Forest's default `ParityDbConfig`.
    Parity,
    /// ParityDb opened with the benchmark-specific column options of
    /// [`ParityDbOpt`] (digest-keyed column for DAG-CBOR/Blake2b-256 blocks).
    ParityOpt,
}
41
42#[derive(Debug, Subcommand)]
43pub enum BenchmarkCommands {
44    /// Benchmark streaming data from a CAR archive
45    CarStreaming {
46        /// Snapshot input files (`.car.`, `.car.zst`, `.forest.car.zst`)
47        #[arg(required = true)]
48        snapshot_files: Vec<PathBuf>,
49        /// Whether or not we want to expect [`ipld_core::ipld::Ipld`] data for each block.
50        #[arg(long)]
51        inspect: bool,
52    },
53    /// Depth-first traversal of the Filecoin graph
54    GraphTraversal {
55        /// Snapshot input files (`.car.`, `.car.zst`, `.forest.car.zst`)
56        #[arg(required = true)]
57        snapshot_files: Vec<PathBuf>,
58    },
59    /// Encoding of a `.forest.car.zst` file
60    ForestEncoding {
61        /// Snapshot input file (`.car.`, `.car.zst`, `.forest.car.zst`)
62        snapshot_file: PathBuf,
63        #[arg(long, default_value_t = 3)]
64        compression_level: u16,
65        /// End zstd frames after they exceed this length
66        #[arg(long, default_value_t = DEFAULT_FOREST_CAR_FRAME_SIZE)]
67        frame_size: usize,
68    },
69    /// Exporting a `.forest.car.zst` file from HEAD
70    Export {
71        /// Snapshot input files (`.car.`, `.car.zst`, `.forest.car.zst`)
72        #[arg(required = true)]
73        snapshot_files: Vec<PathBuf>,
74        #[arg(long, default_value_t = 3)]
75        compression_level: u16,
76        /// End zstd frames after they exceed this length
77        #[arg(long, default_value_t = DEFAULT_FOREST_CAR_FRAME_SIZE)]
78        frame_size: usize,
79        /// Latest epoch that has to be exported for this snapshot, the upper bound. This value
80        /// cannot be greater than the latest epoch available in the input snapshot.
81        #[arg(short, long)]
82        epoch: Option<ChainEpoch>,
83        /// How many state-roots to include. Lower limit is 900 for `calibnet` and `mainnet`.
84        #[arg(short, long, default_value_t = 2000)]
85        depth: ChainEpochDelta,
86    },
87    /// Benchmark key-value blockstore
88    Blockstore {
89        /// Snapshot input file (`.car.`, `.car.zst`, `.forest.car.zst`)
90        #[arg(required = true)]
91        snapshot_file: PathBuf,
92        #[arg(long, default_value = "parity")]
93        db: DbType,
94    },
95}
96
97impl BenchmarkCommands {
98    pub async fn run(self) -> anyhow::Result<()> {
99        match self {
100            Self::CarStreaming {
101                snapshot_files,
102                inspect,
103            } => match inspect {
104                true => benchmark_car_streaming_inspect(snapshot_files).await,
105                false => benchmark_car_streaming(snapshot_files).await,
106            },
107            Self::GraphTraversal { snapshot_files } => {
108                benchmark_graph_traversal(snapshot_files).await
109            }
110            Self::ForestEncoding {
111                snapshot_file,
112                compression_level,
113                frame_size,
114            } => benchmark_forest_encoding(snapshot_file, compression_level, frame_size).await,
115            Self::Export {
116                snapshot_files,
117                compression_level,
118                frame_size,
119                epoch,
120                depth,
121            } => {
122                benchmark_exporting(snapshot_files, compression_level, frame_size, epoch, depth)
123                    .await
124            }
125            Self::Blockstore { snapshot_file, db } => benchmark_blockstore(snapshot_file, db).await,
126        }
127    }
128}
129
130// Concatenate a set of CAR files and measure how quickly we can stream the
131// blocks.
132async fn benchmark_car_streaming(input: Vec<PathBuf>) -> anyhow::Result<()> {
133    let mut sink = indicatif_sink("traversed");
134
135    let mut s = Box::pin(
136        futures::stream::iter(input)
137            .then(File::open)
138            .map_ok(BufReader::new)
139            .and_then(CarStream::new)
140            .try_flatten(),
141    );
142    while let Some(block) = s.try_next().await? {
143        sink.write_all(&block.data).await?
144    }
145    Ok(())
146}
147
148// Concatenate a set of CAR files and measure how quickly we can stream the
149// blocks, while inspecting them. This a benchmark we could use for setting
150// realistic expectations in terms of DFS graph travels, for example.
151async fn benchmark_car_streaming_inspect(input: Vec<PathBuf>) -> anyhow::Result<()> {
152    let mut sink = indicatif_sink("traversed");
153    let mut s = Box::pin(
154        futures::stream::iter(input)
155            .then(File::open)
156            .map_ok(BufReader::new)
157            .and_then(CarStream::new)
158            .try_flatten(),
159    );
160    while let Some(block) = s.try_next().await? {
161        let block: CarBlock = block;
162        if block.cid.codec() == DAG_CBOR {
163            let cid_vec = extract_cids(&block.data)?;
164            let _ = cid_vec.iter().unique().count();
165        }
166        sink.write_all(&block.data).await?
167    }
168    Ok(())
169}
170
171// Open a set of CAR files as a block store and do a DFS traversal of all
172// reachable nodes.
173async fn benchmark_graph_traversal(input: Vec<PathBuf>) -> anyhow::Result<()> {
174    let store = open_store(input)?;
175    let heaviest = store.heaviest_tipset()?;
176
177    let mut sink = indicatif_sink("traversed");
178
179    let mut s = stream_graph(&store, heaviest.chain(&store), 0);
180    while let Some(block) = s.try_next().await? {
181        sink.write_all(&block.data).await?
182    }
183
184    Ok(())
185}
186
187// Encode a file to the ForestCAR.zst format and measure throughput.
188async fn benchmark_forest_encoding(
189    input: PathBuf,
190    compression_level: u16,
191    frame_size: usize,
192) -> anyhow::Result<()> {
193    let file = tokio::io::BufReader::new(File::open(&input).await?);
194
195    let mut block_stream = CarStream::new(file).await?;
196    let roots = std::mem::replace(
197        &mut block_stream.header_v1.roots,
198        nunny::vec![Default::default()],
199    );
200
201    let mut dest = indicatif_sink("encoded");
202
203    let frames = crate::db::car::forest::Encoder::compress_stream(
204        frame_size,
205        compression_level,
206        par_buffer(1024, block_stream.map_err(anyhow::Error::from)),
207    );
208    crate::db::car::forest::Encoder::write(&mut dest, roots, frames).await?;
209    dest.flush().await?;
210    Ok(())
211}
212
213// Exporting combines a graph traversal with ForestCAR.zst encoding. Ideally, it
214// should be no lower than `min(benchmark_graph_traversal,
215// benchmark_forest_encoding)`.
216async fn benchmark_exporting(
217    input: Vec<PathBuf>,
218    compression_level: u16,
219    frame_size: usize,
220    epoch: Option<ChainEpoch>,
221    depth: ChainEpochDelta,
222) -> anyhow::Result<()> {
223    let store = Arc::new(open_store(input)?);
224    let heaviest = store.heaviest_tipset()?;
225    let idx = ChainIndex::new(&store);
226    let ts = idx.tipset_by_height(
227        epoch.unwrap_or(heaviest.epoch()),
228        heaviest,
229        ResolveNullTipset::TakeOlder,
230    )?;
231    // We don't do any sanity checking for 'depth'. The output is discarded so
232    // there's no need.
233    let stateroot_lookup_limit = ts.epoch() - depth;
234
235    let mut dest = indicatif_sink("exported");
236
237    let blocks = stream_chain(
238        Arc::clone(&store),
239        ts.clone().chain_owned(Arc::clone(&store)),
240        stateroot_lookup_limit,
241    );
242
243    let frames = crate::db::car::forest::Encoder::compress_stream(
244        frame_size,
245        compression_level,
246        par_buffer(1024, blocks.map_err(anyhow::Error::from)),
247    );
248    crate::db::car::forest::Encoder::write(&mut dest, ts.key().to_cids(), frames).await?;
249    dest.flush().await?;
250    Ok(())
251}
252
253async fn benchmark_blockstore(snapshot: PathBuf, db: DbType) -> anyhow::Result<()> {
254    let tmp_db_path = tempfile::tempdir()?;
255    let bs = open_blockstore(&db, tmp_db_path.path())?;
256    let head_tsk = benchmark_blockstore_import(&snapshot, &db, &bs, tmp_db_path.path()).await?;
257    benchmark_blockstore_traversal(&bs, &head_tsk).await?;
258    Ok(())
259}
260
261fn open_blockstore(db: &DbType, db_path: &Path) -> anyhow::Result<impl Blockstore> {
262    println!("temp db path: {}", db_path.display());
263    Ok(match db {
264        DbType::Parity => Either::Left(ParityDb::open(db_path, &ParityDbConfig::default())?),
265        DbType::ParityOpt => Either::Right(ParityDbOpt::open(db_path)?),
266    })
267}
268
269async fn benchmark_blockstore_import(
270    snapshot: &Path,
271    db: &DbType,
272    bs: &impl Blockstore,
273    bs_path: &Path,
274) -> anyhow::Result<TipsetKey> {
275    let mut car_stream = CarStream::new_from_path(snapshot).await?;
276    let head_tsk = car_stream.head_tipset_key();
277    println!("head tipset key: {head_tsk}");
278    println!("importing CAR into {db:?} blockstore...");
279    let start = Instant::now();
280    let mut n = 0;
281    while let Some(CarBlock { cid, data }) = car_stream.try_next().await? {
282        bs.put_keyed(&cid, &data)?;
283        n += 1;
284    }
285    let db_size = fs_extra::dir::get_size(bs_path).unwrap_or_default();
286    println!(
287        "imported {n} records into {db:?} blockstore(size={}), took {}",
288        db_size.human_count_bytes(),
289        humantime::format_duration(start.elapsed())
290    );
291    Ok(head_tsk)
292}
293
294async fn benchmark_blockstore_traversal(
295    bs: &impl Blockstore,
296    head_tsk: &TipsetKey,
297) -> anyhow::Result<()> {
298    println!("Traversing the chain...");
299    let head = Tipset::load_required(bs, head_tsk)?;
300    let mut sink = indicatif_sink("traversed");
301    let start = Instant::now();
302    let mut s = stream_graph(bs, head.chain(bs), 0);
303    let mut n = 0;
304    while let Some(block) = s.try_next().await? {
305        sink.write_all(&block.data).await?;
306        n += 1;
307    }
308    println!(
309        "Traversed {n} records, took {}",
310        humantime::format_duration(start.elapsed())
311    );
312    Ok(())
313}
314
315// Sink with attached progress indicator
316fn indicatif_sink(task: &'static str) -> impl AsyncWrite {
317    let sink = tokio::io::sink();
318    let pb = ProgressBar::new_spinner()
319        .with_style(
320            ProgressStyle::with_template(
321                "{spinner} {prefix} {total_bytes} at {binary_bytes_per_sec} in {elapsed_precise}",
322            )
323            .expect("infallible"),
324        )
325        .with_prefix(task)
326        .with_finish(indicatif::ProgressFinish::AndLeave);
327    pb.enable_steady_tick(std::time::Duration::from_secs_f32(0.1));
328    pb.wrap_async_write(sink)
329}
330
331// Opening a block store may take a long time (CAR files have to be indexed,
332// CAR.zst files have to be decompressed). Show a progress indicator and clear
333// it when done.
334fn open_store(input: Vec<PathBuf>) -> anyhow::Result<ManyCar> {
335    let pb = indicatif::ProgressBar::new_spinner().with_style(
336        indicatif::ProgressStyle::with_template("{spinner} opening block store")
337            .expect("indicatif template must be valid"),
338    );
339    pb.enable_steady_tick(std::time::Duration::from_secs_f32(0.1));
340
341    let store = ManyCar::try_from(input).context("couldn't read input CAR file")?;
342
343    pb.finish_and_clear();
344
345    Ok(store)
346}
347
/// ParityDb wrapper with benchmark-specific column options; see
/// [`ParityDbOpt::open`] for the column layout.
struct ParityDbOpt {
    db: parity_db::Db,
}
351
impl ParityDbOpt {
    /// Opens (or creates) a `parity_db` database at `path` with two columns:
    /// column 0 for DAG-CBOR/Blake2b-256 blocks keyed by the raw hash digest,
    /// column 1 for everything else keyed by the full CID bytes (see the
    /// `Blockstore` impl below).
    fn open(path: impl Into<PathBuf>) -> anyhow::Result<Self> {
        let opts = parity_db::Options {
            path: path.into(),
            sync_wal: true,
            sync_data: true,
            stats: false,
            salt: None,
            columns: vec![
                // Column 0: keys are hash digests, so they are already
                // uniformly distributed.
                parity_db::ColumnOptions {
                    preimage: true,
                    uniform: true,
                    compression: parity_db::CompressionType::Lz4,
                    ..Default::default()
                },
                // Column 1: full-CID keys.
                parity_db::ColumnOptions {
                    preimage: true,
                    compression: parity_db::CompressionType::Lz4,
                    ..Default::default()
                },
            ],
            // Compression threshold of 128 bytes for column 0.
            compression_threshold: [(0, 128)].into_iter().collect(),
        };
        let db = parity_db::Db::open_or_create(&opts)?;
        Ok(Self { db })
    }
}
379
380impl Blockstore for ParityDbOpt {
381    fn get(&self, k: &cid::Cid) -> anyhow::Result<Option<Vec<u8>>> {
382        Ok(if is_dag_cbor_blake2b256(k) {
383            self.db.get(0, k.hash().digest())?
384        } else {
385            self.db.get(1, &k.to_bytes())?
386        })
387    }
388
389    fn put_keyed(&self, k: &cid::Cid, block: &[u8]) -> anyhow::Result<()> {
390        if is_dag_cbor_blake2b256(k) {
391            self.db
392                .commit([(0, k.hash().digest(), Some(block.to_vec()))])?;
393        } else {
394            self.db.commit([(1, k.to_bytes(), Some(block.to_vec()))])?;
395        }
396        Ok(())
397    }
398}
399
400fn is_dag_cbor_blake2b256(cid: &Cid) -> bool {
401    cid.codec() == DAG_CBOR && cid.hash().code() == u64::from(MultihashCode::Blake2b256)
402}