1use crate::blocks::{Tipset, TipsetKey};
5use crate::db::car::ManyCar;
6use crate::db::car::forest::DEFAULT_FOREST_CAR_FRAME_SIZE;
7use crate::ipld::{stream_chain, stream_graph};
8use crate::shim::clock::ChainEpoch;
9use crate::utils::db::car_stream::{CarBlock, CarStream};
10use crate::utils::encoding::extract_cids;
11use crate::utils::multihash::MultihashCode;
12use crate::utils::stream::par_buffer;
13use crate::{
14 chain::{
15 ChainEpochDelta,
16 index::{ChainIndex, ResolveNullTipset},
17 },
18 db::{Blockstore, Either, parity_db::ParityDb, parity_db_config::ParityDbConfig},
19};
20use anyhow::Context as _;
21use cid::Cid;
22use clap::Subcommand;
23use futures::{StreamExt, TryStreamExt};
24use fvm_ipld_encoding::DAG_CBOR;
25use human_repr::HumanCount as _;
26use indicatif::{ProgressBar, ProgressStyle};
27use itertools::Itertools;
28use std::path::{Path, PathBuf};
29use std::sync::Arc;
30use std::time::Instant;
31use tokio::{
32 fs::File,
33 io::{AsyncWrite, AsyncWriteExt, BufReader},
34};
35
// Selects which key-value backend the `blockstore` benchmark exercises.
// NOTE: `//` comments are deliberate — `///` doc comments on a `ValueEnum`
// would surface in clap-generated help text and change CLI output.
#[derive(Debug, Clone, Copy, clap::ValueEnum)]
pub enum DbType {
    // Forest's default ParityDb configuration (`ParityDbConfig::default()`).
    Parity,
    // Experimental two-column ParityDb wrapper (`ParityDbOpt` below).
    ParityOpt,
}
41
// Sub-commands of the benchmark suite. `//` comments are deliberate:
// `///` doc comments would be picked up by clap as user-visible help text.
#[derive(Debug, Subcommand)]
pub enum BenchmarkCommands {
    // Stream every block out of one or more CAR snapshots sequentially.
    CarStreaming {
        #[arg(required = true)]
        snapshot_files: Vec<PathBuf>,
        // Also parse DAG-CBOR payloads and count their distinct embedded CIDs.
        #[arg(long)]
        inspect: bool,
    },
    // Walk the full block graph reachable from the snapshots' heaviest tipset.
    GraphTraversal {
        #[arg(required = true)]
        snapshot_files: Vec<PathBuf>,
    },
    // Re-encode a snapshot into the compressed forest CAR format.
    ForestEncoding {
        snapshot_file: PathBuf,
        // Compression level passed to the forest CAR encoder.
        #[arg(long, default_value_t = 3)]
        compression_level: u16,
        // Target size of each compressed frame — see DEFAULT_FOREST_CAR_FRAME_SIZE.
        #[arg(long, default_value_t = DEFAULT_FOREST_CAR_FRAME_SIZE)]
        frame_size: usize,
    },
    // Export a bounded slice of the chain as a forest CAR.
    Export {
        #[arg(required = true)]
        snapshot_files: Vec<PathBuf>,
        #[arg(long, default_value_t = 3)]
        compression_level: u16,
        #[arg(long, default_value_t = DEFAULT_FOREST_CAR_FRAME_SIZE)]
        frame_size: usize,
        // Epoch to export from; defaults to the heaviest tipset's epoch.
        #[arg(short, long)]
        epoch: Option<ChainEpoch>,
        // Number of epochs below `epoch` used as the state-root lookup limit.
        #[arg(short, long, default_value_t = 2000)]
        depth: ChainEpochDelta,
    },
    // Import a snapshot into a fresh blockstore, then traverse it from the head.
    Blockstore {
        #[arg(required = true)]
        snapshot_file: PathBuf,
        // Which database backend to benchmark.
        #[arg(long, default_value = "parity")]
        db: DbType,
    },
}
96
97impl BenchmarkCommands {
98 pub async fn run(self) -> anyhow::Result<()> {
99 match self {
100 Self::CarStreaming {
101 snapshot_files,
102 inspect,
103 } => match inspect {
104 true => benchmark_car_streaming_inspect(snapshot_files).await,
105 false => benchmark_car_streaming(snapshot_files).await,
106 },
107 Self::GraphTraversal { snapshot_files } => {
108 benchmark_graph_traversal(snapshot_files).await
109 }
110 Self::ForestEncoding {
111 snapshot_file,
112 compression_level,
113 frame_size,
114 } => benchmark_forest_encoding(snapshot_file, compression_level, frame_size).await,
115 Self::Export {
116 snapshot_files,
117 compression_level,
118 frame_size,
119 epoch,
120 depth,
121 } => {
122 benchmark_exporting(snapshot_files, compression_level, frame_size, epoch, depth)
123 .await
124 }
125 Self::Blockstore { snapshot_file, db } => benchmark_blockstore(snapshot_file, db).await,
126 }
127 }
128}
129
130async fn benchmark_car_streaming(input: Vec<PathBuf>) -> anyhow::Result<()> {
133 let mut sink = indicatif_sink("traversed");
134
135 let mut s = Box::pin(
136 futures::stream::iter(input)
137 .then(File::open)
138 .map_ok(BufReader::new)
139 .and_then(CarStream::new)
140 .try_flatten(),
141 );
142 while let Some(block) = s.try_next().await? {
143 sink.write_all(&block.data).await?
144 }
145 Ok(())
146}
147
148async fn benchmark_car_streaming_inspect(input: Vec<PathBuf>) -> anyhow::Result<()> {
152 let mut sink = indicatif_sink("traversed");
153 let mut s = Box::pin(
154 futures::stream::iter(input)
155 .then(File::open)
156 .map_ok(BufReader::new)
157 .and_then(CarStream::new)
158 .try_flatten(),
159 );
160 while let Some(block) = s.try_next().await? {
161 let block: CarBlock = block;
162 if block.cid.codec() == DAG_CBOR {
163 let cid_vec = extract_cids(&block.data)?;
164 let _ = cid_vec.iter().unique().count();
165 }
166 sink.write_all(&block.data).await?
167 }
168 Ok(())
169}
170
171async fn benchmark_graph_traversal(input: Vec<PathBuf>) -> anyhow::Result<()> {
174 let store = open_store(input)?;
175 let heaviest = store.heaviest_tipset()?;
176
177 let mut sink = indicatif_sink("traversed");
178
179 let mut s = stream_graph(&store, heaviest.chain(&store), 0);
180 while let Some(block) = s.try_next().await? {
181 sink.write_all(&block.data).await?
182 }
183
184 Ok(())
185}
186
/// Benchmarks re-encoding a CAR snapshot into the forest CAR format with the
/// given `compression_level` and `frame_size`, writing the result to a
/// progress-reporting null sink.
async fn benchmark_forest_encoding(
    input: PathBuf,
    compression_level: u16,
    frame_size: usize,
) -> anyhow::Result<()> {
    let file = tokio::io::BufReader::new(File::open(&input).await?);

    let mut block_stream = CarStream::new(file).await?;
    // Take ownership of the original roots for the output header, leaving a
    // single default CID behind (`nunny` vectors cannot be empty, so a
    // placeholder is required).
    let roots = std::mem::replace(
        &mut block_stream.header_v1.roots,
        nunny::vec![Default::default()],
    );

    let mut dest = indicatif_sink("encoded");

    // `par_buffer` buffers up to 1024 blocks between the CAR reader and the
    // compressor so the two stages can overlap.
    let frames = crate::db::car::forest::Encoder::compress_stream(
        frame_size,
        compression_level,
        par_buffer(1024, block_stream.map_err(anyhow::Error::from)),
    );
    crate::db::car::forest::Encoder::write(&mut dest, roots, frames).await?;
    dest.flush().await?;
    Ok(())
}
212
/// Benchmarks a snapshot export: resolves the target tipset (at `epoch`, or
/// the heaviest tipset when unset), then streams the chain with a state-root
/// lookup limit `depth` epochs below it and re-encodes the result as a forest
/// CAR into a progress-reporting null sink.
async fn benchmark_exporting(
    input: Vec<PathBuf>,
    compression_level: u16,
    frame_size: usize,
    epoch: Option<ChainEpoch>,
    depth: ChainEpochDelta,
) -> anyhow::Result<()> {
    let store = Arc::new(open_store(input)?);
    let heaviest = store.heaviest_tipset()?;
    let idx = ChainIndex::new(&store);
    // If the requested epoch lands on a null tipset, resolve to an older one.
    let ts = idx.tipset_by_height(
        epoch.unwrap_or(heaviest.epoch()),
        heaviest,
        ResolveNullTipset::TakeOlder,
    )?;
    // Passed to `stream_chain` below; presumably bounds how far back state
    // roots are walked — confirm against `stream_chain`'s contract.
    let stateroot_lookup_limit = ts.epoch() - depth;

    let mut dest = indicatif_sink("exported");

    let blocks = stream_chain(
        Arc::clone(&store),
        ts.clone().chain_owned(Arc::clone(&store)),
        stateroot_lookup_limit,
    );

    // Same compression pipeline as the `forest-encoding` benchmark.
    let frames = crate::db::car::forest::Encoder::compress_stream(
        frame_size,
        compression_level,
        par_buffer(1024, blocks.map_err(anyhow::Error::from)),
    );
    crate::db::car::forest::Encoder::write(&mut dest, ts.key().to_cids(), frames).await?;
    dest.flush().await?;
    Ok(())
}
252
253async fn benchmark_blockstore(snapshot: PathBuf, db: DbType) -> anyhow::Result<()> {
254 let tmp_db_path = tempfile::tempdir()?;
255 let bs = open_blockstore(&db, tmp_db_path.path())?;
256 let head_tsk = benchmark_blockstore_import(&snapshot, &db, &bs, tmp_db_path.path()).await?;
257 benchmark_blockstore_traversal(&bs, &head_tsk).await?;
258 Ok(())
259}
260
261fn open_blockstore(db: &DbType, db_path: &Path) -> anyhow::Result<impl Blockstore> {
262 println!("temp db path: {}", db_path.display());
263 Ok(match db {
264 DbType::Parity => Either::Left(ParityDb::open(db_path, &ParityDbConfig::default())?),
265 DbType::ParityOpt => Either::Right(ParityDbOpt::open(db_path)?),
266 })
267}
268
269async fn benchmark_blockstore_import(
270 snapshot: &Path,
271 db: &DbType,
272 bs: &impl Blockstore,
273 bs_path: &Path,
274) -> anyhow::Result<TipsetKey> {
275 let mut car_stream = CarStream::new_from_path(snapshot).await?;
276 let head_tsk = car_stream.head_tipset_key();
277 println!("head tipset key: {head_tsk}");
278 println!("importing CAR into {db:?} blockstore...");
279 let start = Instant::now();
280 let mut n = 0;
281 while let Some(CarBlock { cid, data }) = car_stream.try_next().await? {
282 bs.put_keyed(&cid, &data)?;
283 n += 1;
284 }
285 let db_size = fs_extra::dir::get_size(bs_path).unwrap_or_default();
286 println!(
287 "imported {n} records into {db:?} blockstore(size={}), took {}",
288 db_size.human_count_bytes(),
289 humantime::format_duration(start.elapsed())
290 );
291 Ok(head_tsk)
292}
293
294async fn benchmark_blockstore_traversal(
295 bs: &impl Blockstore,
296 head_tsk: &TipsetKey,
297) -> anyhow::Result<()> {
298 println!("Traversing the chain...");
299 let head = Tipset::load_required(bs, head_tsk)?;
300 let mut sink = indicatif_sink("traversed");
301 let start = Instant::now();
302 let mut s = stream_graph(bs, head.chain(bs), 0);
303 let mut n = 0;
304 while let Some(block) = s.try_next().await? {
305 sink.write_all(&block.data).await?;
306 n += 1;
307 }
308 println!(
309 "Traversed {n} records, took {}",
310 humantime::format_duration(start.elapsed())
311 );
312 Ok(())
313}
314
315fn indicatif_sink(task: &'static str) -> impl AsyncWrite {
317 let sink = tokio::io::sink();
318 let pb = ProgressBar::new_spinner()
319 .with_style(
320 ProgressStyle::with_template(
321 "{spinner} {prefix} {total_bytes} at {binary_bytes_per_sec} in {elapsed_precise}",
322 )
323 .expect("infallible"),
324 )
325 .with_prefix(task)
326 .with_finish(indicatif::ProgressFinish::AndLeave);
327 pb.enable_steady_tick(std::time::Duration::from_secs_f32(0.1));
328 pb.wrap_async_write(sink)
329}
330
331fn open_store(input: Vec<PathBuf>) -> anyhow::Result<ManyCar> {
335 let pb = indicatif::ProgressBar::new_spinner().with_style(
336 indicatif::ProgressStyle::with_template("{spinner} opening block store")
337 .expect("indicatif template must be valid"),
338 );
339 pb.enable_steady_tick(std::time::Duration::from_secs_f32(0.1));
340
341 let store = ManyCar::try_from(input).context("couldn't read input CAR file")?;
342
343 pb.finish_and_clear();
344
345 Ok(store)
346}
347
/// Experimental ParityDb wrapper used only by the `blockstore` benchmark.
/// Column 0 holds DAG-CBOR/Blake2b-256 blocks keyed by the bare multihash
/// digest; column 1 holds everything else keyed by the full CID bytes (see
/// the `Blockstore` impl below).
struct ParityDbOpt {
    db: parity_db::Db,
}
351
impl ParityDbOpt {
    /// Opens (or creates) a two-column ParityDb at `path`.
    ///
    /// Column 0 is `preimage + uniform` (keys are uniform-length hash
    /// preimage digests); column 1 is `preimage` only, since full CID keys
    /// vary in length. Both columns use LZ4 compression.
    fn open(path: impl Into<PathBuf>) -> anyhow::Result<Self> {
        let opts = parity_db::Options {
            path: path.into(),
            // Durable writes: sync both the write-ahead log and data files.
            sync_wal: true,
            sync_data: true,
            stats: false,
            salt: None,
            columns: vec![
                parity_db::ColumnOptions {
                    preimage: true,
                    uniform: true,
                    compression: parity_db::CompressionType::Lz4,
                    ..Default::default()
                },
                parity_db::ColumnOptions {
                    preimage: true,
                    compression: parity_db::CompressionType::Lz4,
                    ..Default::default()
                },
            ],
            // NOTE(review): assumed to set a 128-byte minimum value size for
            // compression in column 0 — confirm against parity-db docs.
            compression_threshold: [(0, 128)].into_iter().collect(),
        };
        let db = parity_db::Db::open_or_create(&opts)?;
        Ok(Self { db })
    }
}
379
380impl Blockstore for ParityDbOpt {
381 fn get(&self, k: &cid::Cid) -> anyhow::Result<Option<Vec<u8>>> {
382 Ok(if is_dag_cbor_blake2b256(k) {
383 self.db.get(0, k.hash().digest())?
384 } else {
385 self.db.get(1, &k.to_bytes())?
386 })
387 }
388
389 fn put_keyed(&self, k: &cid::Cid, block: &[u8]) -> anyhow::Result<()> {
390 if is_dag_cbor_blake2b256(k) {
391 self.db
392 .commit([(0, k.hash().digest(), Some(block.to_vec()))])?;
393 } else {
394 self.db.commit([(1, k.to_bytes(), Some(block.to_vec()))])?;
395 }
396 Ok(())
397 }
398}
399
/// Returns `true` when `cid` uses the DAG-CBOR codec and a Blake2b-256
/// multihash — the CID shape `ParityDbOpt` stores in its digest-keyed column.
fn is_dag_cbor_blake2b256(cid: &Cid) -> bool {
    cid.codec() == DAG_CBOR && cid.hash().code() == u64::from(MultihashCode::Blake2b256)
}