1use crate::chain::{
5 ChainEpochDelta,
6 index::{ChainIndex, ResolveNullTipset},
7};
8use crate::db::car::ManyCar;
9use crate::db::car::forest::DEFAULT_FOREST_CAR_FRAME_SIZE;
10use crate::ipld::{stream_chain, stream_graph};
11use crate::shim::clock::ChainEpoch;
12use crate::utils::db::car_stream::{CarBlock, CarStream};
13use crate::utils::encoding::extract_cids;
14use crate::utils::stream::par_buffer;
15use anyhow::Context as _;
16use clap::Subcommand;
17use futures::{StreamExt, TryStreamExt};
18use fvm_ipld_encoding::DAG_CBOR;
19use indicatif::{ProgressBar, ProgressStyle};
20use itertools::Itertools;
21use std::ops::Deref;
22use std::path::PathBuf;
23use std::sync::Arc;
24use tokio::{
25 fs::File,
26 io::{AsyncWrite, AsyncWriteExt, BufReader},
27};
28
29#[derive(Debug, Subcommand)]
30pub enum BenchmarkCommands {
31 CarStreaming {
33 #[arg(required = true)]
35 snapshot_files: Vec<PathBuf>,
36 #[arg(long)]
38 inspect: bool,
39 },
40 GraphTraversal {
42 #[arg(required = true)]
44 snapshot_files: Vec<PathBuf>,
45 },
46 ForestEncoding {
48 snapshot_file: PathBuf,
50 #[arg(long, default_value_t = 3)]
51 compression_level: u16,
52 #[arg(long, default_value_t = DEFAULT_FOREST_CAR_FRAME_SIZE)]
54 frame_size: usize,
55 },
56 Export {
58 #[arg(required = true)]
60 snapshot_files: Vec<PathBuf>,
61 #[arg(long, default_value_t = 3)]
62 compression_level: u16,
63 #[arg(long, default_value_t = DEFAULT_FOREST_CAR_FRAME_SIZE)]
65 frame_size: usize,
66 #[arg(short, long)]
69 epoch: Option<ChainEpoch>,
70 #[arg(short, long, default_value_t = 2000)]
72 depth: ChainEpochDelta,
73 },
74}
75
76impl BenchmarkCommands {
77 pub async fn run(self) -> anyhow::Result<()> {
78 match self {
79 Self::CarStreaming {
80 snapshot_files,
81 inspect,
82 } => match inspect {
83 true => benchmark_car_streaming_inspect(snapshot_files).await,
84 false => benchmark_car_streaming(snapshot_files).await,
85 },
86 Self::GraphTraversal { snapshot_files } => {
87 benchmark_graph_traversal(snapshot_files).await
88 }
89 Self::ForestEncoding {
90 snapshot_file,
91 compression_level,
92 frame_size,
93 } => benchmark_forest_encoding(snapshot_file, compression_level, frame_size).await,
94 Self::Export {
95 snapshot_files,
96 compression_level,
97 frame_size,
98 epoch,
99 depth,
100 } => {
101 benchmark_exporting(snapshot_files, compression_level, frame_size, epoch, depth)
102 .await
103 }
104 }
105 }
106}
107
108async fn benchmark_car_streaming(input: Vec<PathBuf>) -> anyhow::Result<()> {
111 let mut sink = indicatif_sink("traversed");
112
113 let mut s = Box::pin(
114 futures::stream::iter(input)
115 .then(File::open)
116 .map_ok(BufReader::new)
117 .and_then(CarStream::new)
118 .try_flatten(),
119 );
120 while let Some(block) = s.try_next().await? {
121 sink.write_all(&block.data).await?
122 }
123 Ok(())
124}
125
126async fn benchmark_car_streaming_inspect(input: Vec<PathBuf>) -> anyhow::Result<()> {
130 let mut sink = indicatif_sink("traversed");
131 let mut s = Box::pin(
132 futures::stream::iter(input)
133 .then(File::open)
134 .map_ok(BufReader::new)
135 .and_then(CarStream::new)
136 .try_flatten(),
137 );
138 while let Some(block) = s.try_next().await? {
139 let block: CarBlock = block;
140 if block.cid.codec() == DAG_CBOR {
141 let cid_vec = extract_cids(&block.data)?;
142 let _ = cid_vec.iter().unique().count();
143 }
144 sink.write_all(&block.data).await?
145 }
146 Ok(())
147}
148
149async fn benchmark_graph_traversal(input: Vec<PathBuf>) -> anyhow::Result<()> {
152 let store = open_store(input)?;
153 let heaviest = store.heaviest_tipset()?;
154
155 let mut sink = indicatif_sink("traversed");
156
157 let mut s = stream_graph(&store, heaviest.chain(&store), 0);
158 while let Some(block) = s.try_next().await? {
159 sink.write_all(&block.data).await?
160 }
161
162 Ok(())
163}
164
165async fn benchmark_forest_encoding(
167 input: PathBuf,
168 compression_level: u16,
169 frame_size: usize,
170) -> anyhow::Result<()> {
171 let file = tokio::io::BufReader::new(File::open(&input).await?);
172
173 let mut block_stream = CarStream::new(file).await?;
174 let roots = std::mem::replace(
175 &mut block_stream.header_v1.roots,
176 nunny::vec![Default::default()],
177 );
178
179 let mut dest = indicatif_sink("encoded");
180
181 let frames = crate::db::car::forest::Encoder::compress_stream(
182 frame_size,
183 compression_level,
184 par_buffer(1024, block_stream.map_err(anyhow::Error::from)),
185 );
186 crate::db::car::forest::Encoder::write(&mut dest, roots, frames).await?;
187 dest.flush().await?;
188 Ok(())
189}
190
191async fn benchmark_exporting(
195 input: Vec<PathBuf>,
196 compression_level: u16,
197 frame_size: usize,
198 epoch: Option<ChainEpoch>,
199 depth: ChainEpochDelta,
200) -> anyhow::Result<()> {
201 let store = Arc::new(open_store(input)?);
202 let heaviest = store.heaviest_tipset()?;
203 let idx = ChainIndex::new(&store);
204 let ts = idx.tipset_by_height(
205 epoch.unwrap_or(heaviest.epoch()),
206 Arc::new(heaviest),
207 ResolveNullTipset::TakeOlder,
208 )?;
209 let stateroot_lookup_limit = ts.epoch() - depth;
212
213 let mut dest = indicatif_sink("exported");
214
215 let blocks = stream_chain(
216 Arc::clone(&store),
217 ts.deref().clone().chain_owned(Arc::clone(&store)),
218 stateroot_lookup_limit,
219 );
220
221 let frames = crate::db::car::forest::Encoder::compress_stream(
222 frame_size,
223 compression_level,
224 par_buffer(1024, blocks.map_err(anyhow::Error::from)),
225 );
226 crate::db::car::forest::Encoder::write(&mut dest, ts.key().to_cids(), frames).await?;
227 dest.flush().await?;
228 Ok(())
229}
230
231fn indicatif_sink(task: &'static str) -> impl AsyncWrite {
233 let sink = tokio::io::sink();
234 let pb = ProgressBar::new_spinner()
235 .with_style(
236 ProgressStyle::with_template(
237 "{spinner} {prefix} {total_bytes} at {binary_bytes_per_sec} in {elapsed_precise}",
238 )
239 .expect("infallible"),
240 )
241 .with_prefix(task)
242 .with_finish(indicatif::ProgressFinish::AndLeave);
243 pb.enable_steady_tick(std::time::Duration::from_secs_f32(0.1));
244 pb.wrap_async_write(sink)
245}
246
247fn open_store(input: Vec<PathBuf>) -> anyhow::Result<ManyCar> {
251 let pb = indicatif::ProgressBar::new_spinner().with_style(
252 indicatif::ProgressStyle::with_template("{spinner} opening block store")
253 .expect("indicatif template must be valid"),
254 );
255 pb.enable_steady_tick(std::time::Duration::from_secs_f32(0.1));
256
257 let store = ManyCar::try_from(input).context("couldn't read input CAR file")?;
258
259 pb.finish_and_clear();
260
261 Ok(store)
262}