1use crate::blocks::{Tipset, TipsetKey};
5use crate::cid_collections::CidHashSet;
6use crate::db::car::ManyCar;
7use crate::db::car::forest::DEFAULT_FOREST_CAR_FRAME_SIZE;
8use crate::ipld::{stream_chain, stream_graph};
9use crate::shim::clock::ChainEpoch;
10use crate::utils::db::car_stream::{CarBlock, CarStream};
11use crate::utils::encoding::extract_cids;
12use crate::utils::multihash::MultihashCode;
13use crate::utils::stream::par_buffer;
14use crate::{
15 chain::{
16 ChainEpochDelta,
17 index::{ChainIndex, ResolveNullTipset},
18 },
19 db::{Blockstore, Either, parity_db::ParityDb, parity_db_config::ParityDbConfig},
20};
21use anyhow::Context as _;
22use cid::Cid;
23use clap::Subcommand;
24use futures::{StreamExt, TryStreamExt};
25use fvm_ipld_encoding::DAG_CBOR;
26use human_repr::HumanCount as _;
27use indicatif::{ProgressBar, ProgressStyle};
28use itertools::Itertools;
29use std::path::{Path, PathBuf};
30use std::sync::Arc;
31use std::time::Instant;
32use tokio::{
33 fs::File,
34 io::{AsyncWrite, AsyncWriteExt, BufReader},
35};
36
/// Blockstore backend selected by the `Blockstore` benchmark subcommand.
// NOTE: variants carry plain `//` comments on purpose — `///` doc comments on
// `clap::ValueEnum` variants would become user-visible CLI help text.
#[derive(Debug, Clone, Copy, clap::ValueEnum)]
pub enum DbType {
    // Stock ParityDb opened with Forest's default `ParityDbConfig`.
    Parity,
    // ParityDb with locally tuned column options (see `ParityDbOpt` in this file).
    ParityOpt,
}
42
// Benchmark subcommands exercising Forest's snapshot I/O paths.
// NOTE: plain `//` comments are used throughout — `///` doc comments on
// clap-derived items would become user-visible help text.
#[derive(Debug, Subcommand)]
pub enum BenchmarkCommands {
    // Measure raw CAR streaming throughput: decode blocks and copy their
    // payloads into a byte-counting sink.
    CarStreaming {
        #[arg(required = true)]
        snapshot_files: Vec<PathBuf>,
        // Additionally parse DAG-CBOR blocks and extract their CID links while
        // streaming, to measure the inspection overhead.
        #[arg(long)]
        inspect: bool,
    },
    // Measure graph traversal speed over the snapshot's DAG.
    GraphTraversal {
        #[arg(required = true)]
        snapshot_files: Vec<PathBuf>,
    },
    // Measure re-encoding a snapshot into the compressed forest CAR format.
    ForestEncoding {
        snapshot_file: PathBuf,
        // zstd compression level (3 mirrors zstd's default trade-off).
        #[arg(long, default_value_t = 3)]
        compression_level: u16,
        // Target uncompressed bytes per compression frame.
        #[arg(long, default_value_t = DEFAULT_FOREST_CAR_FRAME_SIZE)]
        frame_size: usize,
    },
    // Measure a full export: chain walk plus forest-CAR encoding.
    Export {
        #[arg(required = true)]
        snapshot_files: Vec<PathBuf>,
        #[arg(long, default_value_t = 3)]
        compression_level: u16,
        #[arg(long, default_value_t = DEFAULT_FOREST_CAR_FRAME_SIZE)]
        frame_size: usize,
        // Epoch to export from; defaults to the heaviest tipset's epoch.
        #[arg(short, long)]
        epoch: Option<ChainEpoch>,
        // Number of epochs below `epoch` used as the state-root lookup limit.
        #[arg(short, long, default_value_t = 2000)]
        depth: ChainEpochDelta,
    },
    // Measure importing a snapshot into a blockstore backend, then
    // traversing the chain back out of it.
    Blockstore {
        #[arg(required = true)]
        snapshot_file: PathBuf,
        // Backend to benchmark; see `DbType`.
        #[arg(long, default_value = "parity")]
        db: DbType,
    },
}
97
impl BenchmarkCommands {
    /// Dispatches the selected benchmark subcommand to its implementation.
    /// Each arm consumes `self`, moving the parsed arguments into the
    /// corresponding `benchmark_*` function.
    pub async fn run(self) -> anyhow::Result<()> {
        match self {
            Self::CarStreaming {
                snapshot_files,
                inspect,
            } => match inspect {
                // `--inspect` adds DAG-CBOR link extraction to the stream loop.
                true => benchmark_car_streaming_inspect(snapshot_files).await,
                false => benchmark_car_streaming(snapshot_files).await,
            },
            Self::GraphTraversal { snapshot_files } => {
                benchmark_graph_traversal(snapshot_files).await
            }
            Self::ForestEncoding {
                snapshot_file,
                compression_level,
                frame_size,
            } => benchmark_forest_encoding(snapshot_file, compression_level, frame_size).await,
            Self::Export {
                snapshot_files,
                compression_level,
                frame_size,
                epoch,
                depth,
            } => {
                benchmark_exporting(snapshot_files, compression_level, frame_size, epoch, depth)
                    .await
            }
            Self::Blockstore { snapshot_file, db } => benchmark_blockstore(snapshot_file, db).await,
        }
    }
}
130
131async fn benchmark_car_streaming(input: Vec<PathBuf>) -> anyhow::Result<()> {
134 let mut sink = indicatif_sink("traversed");
135
136 let mut s = Box::pin(
137 futures::stream::iter(input)
138 .then(File::open)
139 .map_ok(BufReader::new)
140 .and_then(CarStream::new)
141 .try_flatten(),
142 );
143 while let Some(block) = s.try_next().await? {
144 sink.write_all(&block.data).await?
145 }
146 Ok(())
147}
148
149async fn benchmark_car_streaming_inspect(input: Vec<PathBuf>) -> anyhow::Result<()> {
153 let mut sink = indicatif_sink("traversed");
154 let mut s = Box::pin(
155 futures::stream::iter(input)
156 .then(File::open)
157 .map_ok(BufReader::new)
158 .and_then(CarStream::new)
159 .try_flatten(),
160 );
161 while let Some(block) = s.try_next().await? {
162 let block: CarBlock = block;
163 if block.cid.codec() == DAG_CBOR {
164 let cid_vec = extract_cids(&block.data)?;
165 let _ = cid_vec.iter().unique().count();
166 }
167 sink.write_all(&block.data).await?
168 }
169 Ok(())
170}
171
172async fn benchmark_graph_traversal(input: Vec<PathBuf>) -> anyhow::Result<()> {
175 let store = open_store(input)?;
176 let heaviest = store.heaviest_tipset()?;
177
178 let mut sink = indicatif_sink("traversed");
179
180 let mut s = stream_graph(&store, heaviest.chain(&store), 0, CidHashSet::default());
181 while let Some(block) = s.try_next().await? {
182 sink.write_all(&block.data).await?
183 }
184
185 Ok(())
186}
187
/// Benchmarks re-encoding a plain CAR snapshot into the compressed,
/// frame-based forest CAR format, writing the output to a progress sink.
async fn benchmark_forest_encoding(
    input: PathBuf,
    compression_level: u16,
    frame_size: usize,
) -> anyhow::Result<()> {
    let file = tokio::io::BufReader::new(File::open(&input).await?);

    let mut block_stream = CarStream::new(file).await?;
    // Take ownership of the header roots (needed by `Encoder::write` below),
    // swapping in a single placeholder CID so the stream's header stays
    // structurally valid while the blocks are consumed.
    let roots = std::mem::replace(
        &mut block_stream.header_v1.roots,
        nunny::vec![Default::default()],
    );

    let mut dest = indicatif_sink("encoded");

    // `par_buffer` buffers up to 1024 blocks so CAR decoding and compression
    // can proceed concurrently.
    let frames = crate::db::car::forest::Encoder::compress_stream(
        frame_size,
        compression_level,
        par_buffer(1024, block_stream.map_err(anyhow::Error::from)),
    );
    crate::db::car::forest::Encoder::write(&mut dest, roots, frames).await?;
    dest.flush().await?;
    Ok(())
}
213
/// Benchmarks a snapshot export: resolves the target tipset, walks the chain
/// from it, and re-encodes the traversed blocks as a forest CAR written to a
/// progress sink.
async fn benchmark_exporting(
    input: Vec<PathBuf>,
    compression_level: u16,
    frame_size: usize,
    epoch: Option<ChainEpoch>,
    depth: ChainEpochDelta,
) -> anyhow::Result<()> {
    let store = Arc::new(open_store(input)?);
    let heaviest = store.heaviest_tipset()?;
    let idx = ChainIndex::new(store.clone());
    // Export from the requested epoch, defaulting to the chain head; a null
    // tipset at that height resolves to the nearest older non-null one.
    let ts = idx.tipset_by_height(
        epoch.unwrap_or(heaviest.epoch()),
        heaviest,
        ResolveNullTipset::TakeOlder,
    )?;
    // Passed to `stream_chain` as its traversal limit — presumably the lowest
    // epoch for which state roots are included in the export (TODO confirm
    // against `stream_chain`'s contract).
    let stateroot_lookup_limit = ts.epoch() - depth;

    let mut dest = indicatif_sink("exported");

    let blocks = stream_chain(
        Arc::clone(&store),
        ts.clone().chain_owned(Arc::clone(&store)),
        stateroot_lookup_limit,
        CidHashSet::default(),
    );

    // Compression runs concurrently with the chain walk, buffering up to
    // 1024 blocks of look-ahead.
    let frames = crate::db::car::forest::Encoder::compress_stream(
        frame_size,
        compression_level,
        par_buffer(1024, blocks.map_err(anyhow::Error::from)),
    );
    // The exported CAR's roots are the CIDs of the tipset we exported from.
    crate::db::car::forest::Encoder::write(&mut dest, ts.key().to_cids(), frames).await?;
    dest.flush().await?;
    Ok(())
}
254
255async fn benchmark_blockstore(snapshot: PathBuf, db: DbType) -> anyhow::Result<()> {
256 let tmp_db_path = tempfile::tempdir()?;
257 let bs = open_blockstore(&db, tmp_db_path.path())?;
258 let head_tsk = benchmark_blockstore_import(&snapshot, &db, &bs, tmp_db_path.path()).await?;
259 benchmark_blockstore_traversal(&bs, &head_tsk).await?;
260 Ok(())
261}
262
263fn open_blockstore(db: &DbType, db_path: &Path) -> anyhow::Result<impl Blockstore> {
264 println!("temp db path: {}", db_path.display());
265 Ok(match db {
266 DbType::Parity => Either::Left(ParityDb::open(db_path, &ParityDbConfig::default())?),
267 DbType::ParityOpt => Either::Right(ParityDbOpt::open(db_path)?),
268 })
269}
270
271async fn benchmark_blockstore_import(
272 snapshot: &Path,
273 db: &DbType,
274 bs: &impl Blockstore,
275 bs_path: &Path,
276) -> anyhow::Result<TipsetKey> {
277 let mut car_stream = CarStream::new_from_path(snapshot).await?;
278 let head_tsk = car_stream.head_tipset_key();
279 println!("head tipset key: {head_tsk}");
280 println!("importing CAR into {db:?} blockstore...");
281 let start = Instant::now();
282 let mut n = 0;
283 while let Some(CarBlock { cid, data }) = car_stream.try_next().await? {
284 bs.put_keyed(&cid, &data)?;
285 n += 1;
286 }
287 let db_size = fs_extra::dir::get_size(bs_path).unwrap_or_default();
288 println!(
289 "imported {n} records into {db:?} blockstore(size={}), took {}",
290 db_size.human_count_bytes(),
291 humantime::format_duration(start.elapsed())
292 );
293 Ok(head_tsk)
294}
295
296async fn benchmark_blockstore_traversal(
297 bs: &impl Blockstore,
298 head_tsk: &TipsetKey,
299) -> anyhow::Result<()> {
300 println!("Traversing the chain...");
301 let head = Tipset::load_required(bs, head_tsk)?;
302 let mut sink = indicatif_sink("traversed");
303 let start = Instant::now();
304 let mut s = stream_graph(bs, head.chain(bs), 0, CidHashSet::default());
305 let mut n = 0;
306 while let Some(block) = s.try_next().await? {
307 sink.write_all(&block.data).await?;
308 n += 1;
309 }
310 println!(
311 "Traversed {n} records, took {}",
312 humantime::format_duration(start.elapsed())
313 );
314 Ok(())
315}
316
317fn indicatif_sink(task: &'static str) -> impl AsyncWrite {
319 let sink = tokio::io::sink();
320 let pb = ProgressBar::new_spinner()
321 .with_style(
322 ProgressStyle::with_template(
323 "{spinner} {prefix} {total_bytes} at {binary_bytes_per_sec} in {elapsed_precise}",
324 )
325 .expect("infallible"),
326 )
327 .with_prefix(task)
328 .with_finish(indicatif::ProgressFinish::AndLeave);
329 pb.enable_steady_tick(std::time::Duration::from_secs_f32(0.1));
330 pb.wrap_async_write(sink)
331}
332
333fn open_store(input: Vec<PathBuf>) -> anyhow::Result<ManyCar> {
337 let pb = indicatif::ProgressBar::new_spinner().with_style(
338 indicatif::ProgressStyle::with_template("{spinner} opening block store")
339 .expect("indicatif template must be valid"),
340 );
341 pb.enable_steady_tick(std::time::Duration::from_secs_f32(0.1));
342
343 let store = ManyCar::try_from(input).context("couldn't read input CAR file")?;
344
345 pb.finish_and_clear();
346
347 Ok(store)
348}
349
/// Minimal ParityDb wrapper with benchmark-tuned column options, used to
/// compare against Forest's stock `ParityDb` configuration
/// (selected via `DbType::ParityOpt`).
struct ParityDbOpt {
    // Underlying two-column parity-db instance; see `ParityDbOpt::open`.
    db: parity_db::Db,
}
353
impl ParityDbOpt {
    /// Opens (or creates) a two-column ParityDb at `path` with durable
    /// writes (`sync_wal` + `sync_data`) and LZ4 compression.
    fn open(path: impl Into<PathBuf>) -> anyhow::Result<Self> {
        let opts = parity_db::Options {
            path: path.into(),
            sync_wal: true,
            sync_data: true,
            stats: false,
            salt: None,
            columns: vec![
                // Column 0: blocks keyed by their bare multihash digest (see
                // `put_keyed`). `preimage` + `uniform` suit hash-derived keys.
                parity_db::ColumnOptions {
                    preimage: true,
                    uniform: true,
                    compression: parity_db::CompressionType::Lz4,
                    ..Default::default()
                },
                // Column 1: everything else, keyed by the full CID bytes,
                // which are not uniformly distributed.
                parity_db::ColumnOptions {
                    preimage: true,
                    compression: parity_db::CompressionType::Lz4,
                    ..Default::default()
                },
            ],
            // Only column 0 gets a threshold here: values of at least 128
            // bytes are compressed — NOTE(review): confirm the per-column
            // semantics against the parity-db docs.
            compression_threshold: [(0, 128)].into_iter().collect(),
        };
        let db = parity_db::Db::open_or_create(&opts)?;
        Ok(Self { db })
    }
}
381
impl Blockstore for ParityDbOpt {
    /// Fetches a block, mirroring `put_keyed`'s routing: DAG-CBOR/Blake2b
    /// CIDs live in column 0 under their bare digest, everything else in
    /// column 1 under the full CID bytes.
    fn get(&self, k: &cid::Cid) -> anyhow::Result<Option<Vec<u8>>> {
        Ok(if is_dag_cbor_blake2b256(k) {
            self.db.get(0, k.hash().digest())?
        } else {
            self.db.get(1, &k.to_bytes())?
        })
    }

    /// Stores a block under its CID, routed by CID shape.
    fn put_keyed(&self, k: &cid::Cid, block: &[u8]) -> anyhow::Result<()> {
        if is_dag_cbor_blake2b256(k) {
            // Column 0 keys are the multihash digest only: codec and hash
            // function are implied by the column, saving key bytes.
            self.db
                .commit([(0, k.hash().digest(), Some(block.to_vec()))])?;
        } else {
            self.db.commit([(1, k.to_bytes(), Some(block.to_vec()))])?;
        }
        Ok(())
    }
}
401
402fn is_dag_cbor_blake2b256(cid: &Cid) -> bool {
403 cid.codec() == DAG_CBOR && cid.hash().code() == u64::from(MultihashCode::Blake2b256)
404}