forest/tool/subcommands/benchmark_cmd.rs

// Copyright 2019-2025 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use crate::chain::{
    ChainEpochDelta,
    index::{ChainIndex, ResolveNullTipset},
};
use crate::db::car::ManyCar;
use crate::db::car::forest::DEFAULT_FOREST_CAR_FRAME_SIZE;
use crate::ipld::{stream_chain, stream_graph};
use crate::shim::clock::ChainEpoch;
use crate::utils::db::car_stream::{CarBlock, CarStream};
use crate::utils::encoding::extract_cids;
use crate::utils::stream::par_buffer;
use anyhow::Context as _;
use clap::Subcommand;
use futures::{StreamExt, TryStreamExt};
use fvm_ipld_encoding::DAG_CBOR;
use indicatif::{ProgressBar, ProgressStyle};
use itertools::Itertools;
use std::ops::Deref;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::{
    fs::File,
    io::{AsyncWrite, AsyncWriteExt, BufReader},
};

#[derive(Debug, Subcommand)]
pub enum BenchmarkCommands {
    /// Benchmark streaming data from a CAR archive
    CarStreaming {
        /// Snapshot input files (`.car`, `.car.zst`, `.forest.car.zst`)
        #[arg(required = true)]
        snapshot_files: Vec<PathBuf>,
        /// Whether to also inspect each block, parsing its
        /// [`ipld_core::ipld::Ipld`] data and extracting the CIDs it links to.
        #[arg(long)]
        inspect: bool,
    },
    /// Depth-first traversal of the Filecoin graph
    GraphTraversal {
        /// Snapshot input files (`.car`, `.car.zst`, `.forest.car.zst`)
        #[arg(required = true)]
        snapshot_files: Vec<PathBuf>,
    },
    /// Encoding of a `.forest.car.zst` file
    ForestEncoding {
        /// Snapshot input file (`.car`, `.car.zst`, `.forest.car.zst`)
        snapshot_file: PathBuf,
        #[arg(long, default_value_t = 3)]
        compression_level: u16,
        /// End zstd frames after they exceed this length
        #[arg(long, default_value_t = DEFAULT_FOREST_CAR_FRAME_SIZE)]
        frame_size: usize,
    },
    /// Exporting a `.forest.car.zst` file from HEAD
    Export {
        /// Snapshot input files (`.car`, `.car.zst`, `.forest.car.zst`)
        #[arg(required = true)]
        snapshot_files: Vec<PathBuf>,
        #[arg(long, default_value_t = 3)]
        compression_level: u16,
        /// End zstd frames after they exceed this length
        #[arg(long, default_value_t = DEFAULT_FOREST_CAR_FRAME_SIZE)]
        frame_size: usize,
        /// The latest epoch to export (the upper bound). It cannot be greater
        /// than the latest epoch available in the input snapshot.
        #[arg(short, long)]
        epoch: Option<ChainEpoch>,
        /// How many state-roots to include. The lower limit is 900 for `calibnet` and `mainnet`.
        #[arg(short, long, default_value_t = 2000)]
        depth: ChainEpochDelta,
    },
}

impl BenchmarkCommands {
    pub async fn run(self) -> anyhow::Result<()> {
        match self {
            Self::CarStreaming {
                snapshot_files,
                inspect,
            } => match inspect {
                true => benchmark_car_streaming_inspect(snapshot_files).await,
                false => benchmark_car_streaming(snapshot_files).await,
            },
            Self::GraphTraversal { snapshot_files } => {
                benchmark_graph_traversal(snapshot_files).await
            }
            Self::ForestEncoding {
                snapshot_file,
                compression_level,
                frame_size,
            } => benchmark_forest_encoding(snapshot_file, compression_level, frame_size).await,
            Self::Export {
                snapshot_files,
                compression_level,
                frame_size,
                epoch,
                depth,
            } => {
                benchmark_exporting(snapshot_files, compression_level, frame_size, epoch, depth)
                    .await
            }
        }
    }
}
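
// Example invocations, assuming these subcommands are mounted under the
// `forest-tool benchmark` CLI (snapshot paths are placeholders):
//
//   forest-tool benchmark car-streaming --inspect snapshot.forest.car.zst
//   forest-tool benchmark graph-traversal snapshot.forest.car.zst
//   forest-tool benchmark forest-encoding snapshot.car.zst
//   forest-tool benchmark export --depth 2000 snapshot.forest.car.zst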

// Concatenate a set of CAR files and measure how quickly we can stream the
// blocks.
async fn benchmark_car_streaming(input: Vec<PathBuf>) -> anyhow::Result<()> {
    let mut sink = indicatif_sink("traversed");

    let mut s = Box::pin(
        futures::stream::iter(input)
            .then(File::open)
            .map_ok(BufReader::new)
            .and_then(CarStream::new)
            .try_flatten(),
    );
    while let Some(block) = s.try_next().await? {
        sink.write_all(&block.data).await?
    }
    Ok(())
}

// Concatenate a set of CAR files and measure how quickly we can stream the
// blocks while inspecting them. This is a benchmark we could use for setting
// realistic expectations for DFS graph traversals, for example.
async fn benchmark_car_streaming_inspect(input: Vec<PathBuf>) -> anyhow::Result<()> {
    let mut sink = indicatif_sink("traversed");
    let mut s = Box::pin(
        futures::stream::iter(input)
            .then(File::open)
            .map_ok(BufReader::new)
            .and_then(CarStream::new)
            .try_flatten(),
    );
    while let Some(block) = s.try_next().await? {
        let block: CarBlock = block;
        if block.cid.codec() == DAG_CBOR {
            let cid_vec = extract_cids(&block.data)?;
            let _ = cid_vec.iter().unique().count();
        }
        sink.write_all(&block.data).await?
    }
    Ok(())
}
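
// A minimal sketch of the "inspect" step above in isolation: parse a single
// DAG-CBOR payload and count the distinct CIDs it links to. Illustrative
// only; this helper is not wired into any subcommand.
#[allow(dead_code)]
fn count_distinct_links(data: &[u8]) -> anyhow::Result<usize> {
    // `extract_cids` collects every CID referenced by the DAG-CBOR blob;
    // duplicates are possible, hence `unique()`.
    let cids = extract_cids(data)?;
    Ok(cids.iter().unique().count())
}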

// Open a set of CAR files as a block store and do a DFS traversal of all
// reachable nodes.
async fn benchmark_graph_traversal(input: Vec<PathBuf>) -> anyhow::Result<()> {
    let store = open_store(input)?;
    let heaviest = store.heaviest_tipset()?;

    let mut sink = indicatif_sink("traversed");

    let mut s = stream_graph(&store, heaviest.chain(&store), 0);
    while let Some(block) = s.try_next().await? {
        sink.write_all(&block.data).await?
    }

    Ok(())
}

// Encode a file to the ForestCAR.zst format and measure throughput.
async fn benchmark_forest_encoding(
    input: PathBuf,
    compression_level: u16,
    frame_size: usize,
) -> anyhow::Result<()> {
    let file = tokio::io::BufReader::new(File::open(&input).await?);

    let mut block_stream = CarStream::new(file).await?;
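    // Take the roots out of the header (leaving a placeholder behind) so they
    // can be handed to the encoder while the stream is still being consumed.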
    let roots = std::mem::replace(
        &mut block_stream.header_v1.roots,
        nunny::vec![Default::default()],
    );

    let mut dest = indicatif_sink("encoded");
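
    // `par_buffer` keeps up to 1024 decoded blocks in flight ahead of the
    // compressor, decoupling CAR decoding from zstd compression.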
    let frames = crate::db::car::forest::Encoder::compress_stream(
        frame_size,
        compression_level,
        par_buffer(1024, block_stream.map_err(anyhow::Error::from)),
    );
    crate::db::car::forest::Encoder::write(&mut dest, roots, frames).await?;
    dest.flush().await?;
    Ok(())
}
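
// To keep the encoded output instead of discarding it, `dest` above could be
// a real file; a hypothetical variant (the path is a placeholder):
//
//     let mut dest = tokio::fs::File::create("out.forest.car.zst").await?;
//     crate::db::car::forest::Encoder::write(&mut dest, roots, frames).await?;
//     dest.flush().await?;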

// Exporting combines a graph traversal with ForestCAR.zst encoding. Ideally,
// its throughput should be no lower than `min(benchmark_graph_traversal,
// benchmark_forest_encoding)`.
async fn benchmark_exporting(
    input: Vec<PathBuf>,
    compression_level: u16,
    frame_size: usize,
    epoch: Option<ChainEpoch>,
    depth: ChainEpochDelta,
) -> anyhow::Result<()> {
    let store = Arc::new(open_store(input)?);
    let heaviest = store.heaviest_tipset()?;
    let idx = ChainIndex::new(&store);
    let ts = idx.tipset_by_height(
        epoch.unwrap_or(heaviest.epoch()),
        Arc::new(heaviest),
        ResolveNullTipset::TakeOlder,
    )?;
    // We don't do any sanity checking for `depth`. The output is discarded,
    // so there's no need. E.g., exporting at epoch 1_000_000 with the default
    // depth of 2000 walks state-roots back to epoch 998_000.
    let stateroot_lookup_limit = ts.epoch() - depth;

    let mut dest = indicatif_sink("exported");

    let blocks = stream_chain(
        Arc::clone(&store),
        ts.deref().clone().chain_owned(Arc::clone(&store)),
        stateroot_lookup_limit,
    );

    let frames = crate::db::car::forest::Encoder::compress_stream(
        frame_size,
        compression_level,
        par_buffer(1024, blocks.map_err(anyhow::Error::from)),
    );
    crate::db::car::forest::Encoder::write(&mut dest, ts.key().to_cids(), frames).await?;
    dest.flush().await?;
    Ok(())
}

// Sink with attached progress indicator
fn indicatif_sink(task: &'static str) -> impl AsyncWrite {
    let sink = tokio::io::sink();
    let pb = ProgressBar::new_spinner()
        .with_style(
            ProgressStyle::with_template(
                "{spinner} {prefix} {total_bytes} at {binary_bytes_per_sec} in {elapsed_precise}",
            )
            .expect("infallible"),
        )
        .with_prefix(task)
        .with_finish(indicatif::ProgressFinish::AndLeave);
    pb.enable_steady_tick(std::time::Duration::from_secs_f32(0.1));
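    // Wrapping the writer makes every `write` advance the progress bar's byte
    // count, so `{total_bytes}` and `{binary_bytes_per_sec}` double as a
    // throughput meter.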
    pb.wrap_async_write(sink)
}

// Opening a block store may take a long time (CAR files have to be indexed,
// CAR.zst files have to be decompressed). Show a progress indicator and clear
// it when done.
fn open_store(input: Vec<PathBuf>) -> anyhow::Result<ManyCar> {
    let pb = indicatif::ProgressBar::new_spinner().with_style(
        indicatif::ProgressStyle::with_template("{spinner} opening block store")
            .expect("indicatif template must be valid"),
    );
    pb.enable_steady_tick(std::time::Duration::from_secs_f32(0.1));

    let store = ManyCar::try_from(input).context("couldn't read input CAR file")?;

    pb.finish_and_clear();

    Ok(store)
}