bitcoin_block_parser/blocks.rs
//! Contains [`BlockParser`] for parsing bitcoin [`Block`] from the `blocks` directory.

use crate::headers::ParsedHeader;
use crate::xor::XorReader;
use crate::HeaderParser;
use anyhow::Result;
use bitcoin::consensus::Decodable;
use bitcoin::{Block, Transaction};
use crossbeam_channel::{bounded, Receiver, Sender};
use log::info;
use std::cmp::min;
use std::collections::HashMap;
use std::fs::File;
use std::io::BufReader;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Instant;
use threadpool::ThreadPool;

/// Multithreaded parser for [`bitcoin::Block`].
///
/// # Examples
/// Call `parse()` with a `Fn(Block) -> T` closure to get back a [`ParserIterator<T>`]. The
/// closure runs on multiple threads.
/// ```no_run
/// use bitcoin_block_parser::blocks::*;
///
/// let parser = BlockParser::new("/home/user/.bitcoin/blocks/").unwrap();
/// let iterator = parser.parse(|block| block.total_size() as u64);
/// println!("Total blockchain size: {}", iterator.sum::<u64>());
/// ```
///
/// You can call `start_height()` and `end_height()` to constrain the block range, and
/// `ordered()` to ensure the iterator returns blocks in height order:
/// ```no_run
/// use bitcoin_block_parser::blocks::*;
///
/// let parser = BlockParser::new("/home/user/.bitcoin/blocks/").unwrap();
/// let iterator = parser
///     .start_height(100_000)
///     .end_height(100_010)
///     .parse(|block| block.block_hash())
///     .ordered();
///
/// println!("In-order block hashes from 100,000 to 100,010:");
/// for block_hash in iterator {
///     println!("{}", block_hash);
/// }
/// ```
#[derive(Clone, Debug)]
pub struct BlockParser {
    /// The parsed headers used for locating the blocks
    headers: Vec<ParsedHeader>,
    /// A logger for reporting on the progress of the parsing
    logger: ParserLogger,
    /// Options that can affect memory and CPU performance
    options: ParserOptions,
    /// The block height to start parsing at
    start_height: usize,
    /// The block height to end parsing at
    end_height: usize,
}

impl BlockParser {
    /// Creates a new parser given the `blocks` directory where the `*.blk` files are located.
    ///
    /// - Returns an `Err` if unable to parse the `blk` files.
    /// - You can [specify the blocks directory](https://en.bitcoin.it/wiki/Data_directory) when
    ///   running `bitcoind`.
    pub fn new(blocks_dir: &str) -> Result<Self> {
        Self::new_with_opts(blocks_dir, ParserOptions::default())
    }

    /// Creates a parser with custom [`ParserOptions`].
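    ///
    /// # Example
    /// A minimal sketch overriding a single option while keeping the other defaults (the value
    /// `8` is illustrative, not a tuned recommendation):
    /// ```no_run
    /// use bitcoin_block_parser::blocks::*;
    ///
    /// // Keep the defaults except for `num_threads`
    /// let options = ParserOptions { num_threads: 8, ..Default::default() };
    /// let parser = BlockParser::new_with_opts("/home/user/.bitcoin/blocks/", options).unwrap();
    /// ```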
    pub fn new_with_opts(blocks_dir: &str, options: ParserOptions) -> Result<Self> {
        let headers = HeaderParser::parse(blocks_dir)?;
        Ok(Self {
            headers,
            logger: ParserLogger::new(),
            options,
            start_height: 0,
            end_height: usize::MAX,
        })
    }

    /// Sets the *inclusive* start of block heights to parse.
    ///
    /// * `start_height` - must be less than the total number of blocks; `0` starts at the
    ///   genesis block.
    pub fn start_height(mut self, start_height: usize) -> Self {
        self.start_height = start_height;
        self
    }

    /// Sets the *inclusive* end of block heights to parse.
    ///
    /// * `end_height` - the height to end at; [`usize::MAX`] stops at the last block
    ///   available.
    pub fn end_height(mut self, end_height: usize) -> Self {
        self.end_height = end_height;
        self
    }

    /// Parses every [`bitcoin::Block`] into type `T` and returns a [`ParserIterator<T>`].
    /// Results will be in random order due to multithreading.
    ///
    /// * `extract` - a closure that runs on multiple threads. For best performance perform as
    ///   much computation and data reduction here as possible.
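    ///
    /// # Example
    /// A sketch of data reduction inside `extract`: returning only the transaction count keeps
    /// the values sent between threads small.
    /// ```no_run
    /// use bitcoin_block_parser::blocks::*;
    ///
    /// let parser = BlockParser::new("/home/user/.bitcoin/blocks/").unwrap();
    /// let total_txs: usize = parser.parse(|block| block.txdata.len()).sum();
    /// println!("Total transactions: {}", total_txs);
    /// ```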
    pub fn parse<T: Send + 'static>(
        &self,
        extract: impl Fn(Block) -> T + Clone + Send + 'static,
    ) -> ParserIterator<T> {
        let end_height = min(self.end_height, self.headers.len() - 1);
        let header_range = self.headers[self.start_height..=end_height].to_vec();
        let pool = ThreadPool::new(self.options.num_threads);
        let (tx, rx) = bounded(self.options.channel_size);

        for (index, header) in header_range.into_iter().enumerate() {
            let logger = self.logger.clone();
            let tx = tx.clone();
            let extract = extract.clone();
            let start_height = self.start_height;
            pool.execute(move || {
                let extract = match Self::parse_block(&header) {
                    Ok(block) => extract(block),
                    // Panic here because a blk file is corrupted, nothing else we can do
                    Err(e) => panic!("Error reading {:?} - {:?}", header.path, e),
                };
                let height = start_height + index;
                let _ = tx.send((height, extract));
                logger.increment();
            });
        }
        ParserIterator {
            rx,
            options: self.options.clone(),
            start_height: self.start_height,
        }
    }

    /// Helper function for reading a block from the filesystem given the header.
    fn parse_block(header: &ParsedHeader) -> Result<Block> {
        let reader = BufReader::new(File::open(&header.path)?);
        let mut reader = BufReader::new(XorReader::new(reader, header.xor_mask));
        reader.seek_relative(header.offset as i64)?;
        Ok(Block {
            header: header.inner,
            txdata: Vec::<Transaction>::consensus_decode_from_finite_reader(&mut reader)?,
        })
    }
}

/// Options that affect the performance of [`BlockParser`] and [`ParserIterator`].
///
/// Changing these is generally unnecessary unless you really need to tune performance.
#[derive(Clone, Debug)]
pub struct ParserOptions {
    /// How many items will be parsed in each [`ParserIterator::pipeline`] batch.
    pub pipeline_size: usize,
    /// The size of all [`crossbeam_channel::bounded`] channels that communicate between threads.
    pub channel_size: usize,
    /// The number of threads that will be spawned when running a multithreaded function.
    pub num_threads: usize,
}

impl Default for ParserOptions {
    /// Returns sane defaults that will be optimal for most workloads.
    fn default() -> Self {
        Self {
            pipeline_size: 1,
            channel_size: 100,
            num_threads: 64,
        }
    }
}

/// Iterator returned from [`BlockParser::parse`] that allows for advanced transformations.
pub struct ParserIterator<T> {
    /// The receiver coming from a previous transformation step. `usize` is the block height.
    rx: Receiver<(usize, T)>,
    /// Options for tuning performance.
    options: ParserOptions,
    /// The block height the parser started at.
    start_height: usize,
}

impl<A: Send + 'static> ParserIterator<A> {
    /// Creates a new iterator from an existing one, given a new receiver.
    fn create<T>(&self, rx: Receiver<(usize, T)>) -> ParserIterator<T> {
        ParserIterator::<T> {
            rx,
            options: self.options.clone(),
            start_height: self.start_height,
        }
    }

    /// Pairs each item in this iterator with its block height.
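    ///
    /// # Example
    /// A sketch printing heights alongside block hashes:
    /// ```no_run
    /// use bitcoin_block_parser::blocks::*;
    ///
    /// let parser = BlockParser::new("/home/user/.bitcoin/blocks/").unwrap();
    /// for (height, hash) in parser.parse(|block| block.block_hash()).with_height() {
    ///     println!("Block {}: {}", height, hash);
    /// }
    /// ```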
    pub fn with_height(&self) -> ParserIterator<(usize, A)> {
        let (tx, rx) = bounded(self.options.channel_size);
        let parser = self.create(rx);
        let rx_a = self.rx.clone();

        thread::spawn(move || {
            for (height, a) in rx_a {
                let _ = tx.send((height, (height, a)));
            }
        });
        parser
    }

    /// Orders the results by block height, at the cost of a small increase in memory and
    /// runtime.
    ///
    /// # Example
    /// Using the `ordered` function to get the first 10 block hashes in height order:
    /// ```no_run
    /// use bitcoin_block_parser::blocks::*;
    /// use bitcoin::BlockHash;
    ///
    /// let parser = BlockParser::new("/home/user/.bitcoin/blocks/").unwrap();
    /// let ordered: ParserIterator<BlockHash> = parser.parse(|block| block.block_hash()).ordered();
    /// let first_10: Vec<BlockHash> = ordered.take(10).collect();
    /// ```
    pub fn ordered(&self) -> ParserIterator<A> {
        let (tx, rx) = bounded(self.options.channel_size);
        let parser = self.create(rx);
        let rx_a = self.rx.clone();
        let start_height = self.start_height;

        thread::spawn(move || {
            let mut current_height = start_height;
            let mut unordered: HashMap<usize, A> = HashMap::default();

            for (height, a) in rx_a {
                unordered.insert(height, a);
                while let Some(ordered) = unordered.remove(&current_height) {
                    let _ = tx.send((current_height, ordered));
                    current_height += 1;
                }
            }
        });
        parser
    }

    /// Performs a map function using multiple threads.
    /// * Useful if you need to perform an additional map after [`BlockParser::parse`].
    /// * More performant than calling [`Iterator::map`] on the [`ParserIterator`].
    /// * Returns results in random order.
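    ///
    /// # Example
    /// A sketch that extracts transactions during parsing, then counts them on a second pool of
    /// threads:
    /// ```no_run
    /// use bitcoin_block_parser::blocks::*;
    ///
    /// let parser = BlockParser::new("/home/user/.bitcoin/blocks/").unwrap();
    /// let iterator = parser
    ///     .parse(|block| block.txdata)
    ///     .map_parallel(|txdata| txdata.len());
    /// println!("Total transactions: {}", iterator.sum::<usize>());
    /// ```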
    pub fn map_parallel<B: Send + 'static>(
        &self,
        function: impl Fn(A) -> B + Clone + Send + 'static,
    ) -> ParserIterator<B> {
        let pool = ThreadPool::new(self.options.num_threads);
        let (tx_b, rx_b) = bounded(self.options.pipeline_size * self.options.num_threads);

        for _ in 0..self.options.num_threads {
            let tx_b = tx_b.clone();
            let rx_a = self.rx.clone();
            let function = function.clone();
            pool.execute(move || {
                for (height, a) in rx_a {
                    let _ = tx_b.send((height, function(a)));
                }
            });
        }

        self.create(rx_b)
    }

    /// Pipelines allow you to perform two functions on the same batch of blocks.
    /// Useful when you want multithreaded performance while processing blocks in order.
    ///
    /// # Example
    /// For example, if calculating the size difference between consecutive blocks you could use
    /// the following code:
    /// ```no_run
    /// use bitcoin_block_parser::blocks::*;
    /// use std::collections::HashMap;
    /// use std::convert::identity;
    /// use bitcoin::BlockHash;
    /// use bitcoin::hashes::Hash;
    ///
    /// let parser = BlockParser::new("/home/user/.bitcoin/blocks/").unwrap();
    /// let mut block_sizes: HashMap<BlockHash, isize> = HashMap::new();
    /// let mut differences = vec![];
    /// // Initial block size for the genesis block
    /// block_sizes.insert(BlockHash::all_zeros(), 0);
    ///
    /// for block in parser.parse(identity).ordered() {
    ///     // Store this block's size so the next block can look it up
    ///     let block_size = block.total_size() as isize;
    ///     block_sizes.insert(block.block_hash(), block_size);
    ///     // Look up the previous size to compute the difference
    ///     let prev_block_hash = block.header.prev_blockhash;
    ///     let prev_size = block_sizes.remove(&prev_block_hash);
    ///     differences.push(block_size - prev_size.unwrap());
    /// }
    ///
    /// let max_difference = differences.into_iter().max().unwrap();
    /// println!("Maximum increase in block size: {}", max_difference);
    /// ```
    ///
    /// The previous code runs on a single thread. If we want to leverage multithreading we can
    /// use `pipeline` functions for a large speed-up:
    /// ```no_run
    /// use bitcoin_block_parser::blocks::*;
    /// use dashmap::DashMap;
    /// use std::convert::identity;
    /// use std::sync::Arc;
    /// use bitcoin::BlockHash;
    /// use bitcoin::hashes::Hash;
    ///
    /// let parser = BlockParser::new("/home/user/.bitcoin/blocks/").unwrap();
    /// // State shared across all threads
    /// let block_sizes: Arc<DashMap<BlockHash, isize>> = Arc::new(DashMap::new());
    /// let block_sizes_clone = block_sizes.clone();
    /// // Initial block size for the genesis block
    /// block_sizes.insert(BlockHash::all_zeros(), 0);
    ///
    /// let iterator = parser.parse(identity).ordered().pipeline_fn(
    ///     move |block| {
    ///         // Store this block's size in the shared state
    ///         let block_size = block.total_size() as isize;
    ///         block_sizes.insert(block.block_hash(), block_size);
    ///         (block.header.prev_blockhash, block_size)
    ///     },
    ///     move |(prev_block_hash, block_size)| {
    ///         // Look up the previous size to compute the difference
    ///         let prev_size = block_sizes_clone.remove(&prev_block_hash);
    ///         block_size - prev_size.unwrap().1
    ///     },
    /// );
    ///
    /// let max_difference = iterator.max().unwrap();
    /// println!("Maximum increase in block size: {}", max_difference);
    /// ```
    pub fn pipeline_fn<B: Send + 'static, C: Send + 'static>(
        &self,
        f1: impl Fn(A) -> B + Clone + Send + 'static,
        f2: impl Fn(B) -> C + Clone + Send + 'static,
    ) -> ParserIterator<C> {
        let pipeline = PipelineClosure { f1, f2 };
        self.pipeline(&pipeline)
    }

    /// Runs the [`ParserIterator::pipeline_fn`] functions through a type implementing the
    /// [`Pipeline`] trait, for convenience and cleaner code.
    pub fn pipeline<B: Send + 'static, C: Send + 'static>(
        &self,
        pipeline: &(impl Pipeline<A, B, C> + Clone + Send + 'static),
    ) -> ParserIterator<C> {
        let pool_a = ThreadPool::new(self.options.num_threads);
        let pool_b = ThreadPool::new(self.options.num_threads);
        let rx_a = self.rx.clone();
        let opts = self.options.clone();
        let pipeline = pipeline.clone();
        let (tx_b, rx_b) = bounded(self.options.pipeline_size * self.options.num_threads);
        let (tx_c, rx_c) = bounded(self.options.pipeline_size * self.options.num_threads);
        let run = Arc::new(AtomicBool::new(true));

        thread::spawn(move || {
            while run.load(Ordering::Relaxed) {
                let p1 = pipeline.clone();
                let p2 = pipeline.clone();
                Self::run_pipeline(&opts, &pool_a, &run, &rx_a, &tx_b, &move |a| p1.first(a));
                pool_a.join();
                pipeline.between();
                Self::run_pipeline(&opts, &pool_b, &run, &rx_b, &tx_c, &move |b| p2.second(b));
            }
        });

        self.create(rx_c)
    }

    /// Helper for running the pipeline functions on multiple threads.
    fn run_pipeline<X: Send + 'static, Y: Send + 'static>(
        options: &ParserOptions,
        pool: &ThreadPool,
        running: &Arc<AtomicBool>,
        rx: &Receiver<(usize, X)>,
        tx: &Sender<(usize, Y)>,
        function: &(impl Fn(X) -> Y + Clone + Send + 'static),
    ) {
        // Spawns `num_threads` workers, each running `function` on `pipeline_size` items
        for _ in 0..options.num_threads {
            let running = running.clone();
            let tx = tx.clone();
            let rx = rx.clone();
            let function = function.clone();
            let pipeline_size = options.pipeline_size;
            pool.execute(move || {
                for _ in 0..pipeline_size {
                    match rx.recv() {
                        Ok((height, x)) => {
                            let _ = tx.send((height, function(x)));
                        }
                        Err(_) => {
                            // Signal to the pipeline thread that we have consumed all input
                            running.store(false, Ordering::Relaxed);
                        }
                    };
                }
            });
        }
    }
}

impl<T> Iterator for ParserIterator<T> {
    type Item = T;

    fn next(&mut self) -> Option<Self::Item> {
        // Blocks until the next item arrives or all senders have disconnected
        self.rx.recv().ok().map(|(_, t)| t)
    }
}

/// Implement this trait for calling [`ParserIterator::pipeline`].
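///
/// # Example
/// A minimal sketch of an implementation; the `TxCounter` type and its field are hypothetical,
/// for illustration only:
/// ```no_run
/// use bitcoin_block_parser::blocks::Pipeline;
/// use bitcoin::Block;
/// use std::sync::atomic::{AtomicUsize, Ordering};
/// use std::sync::Arc;
///
/// // Hypothetical pipeline that tallies transactions across blocks
/// #[derive(Clone)]
/// struct TxCounter {
///     total: Arc<AtomicUsize>,
/// }
///
/// impl Pipeline<Block, usize, usize> for TxCounter {
///     fn first(&self, block: Block) -> usize {
///         // Runs in parallel on a batch of blocks
///         let count = block.txdata.len();
///         self.total.fetch_add(count, Ordering::Relaxed);
///         count
///     }
///
///     fn between(&self) {
///         // Runs once the whole batch has finished `first()`
///     }
///
///     fn second(&self, count: usize) -> usize {
///         // Runs in parallel on the same batch after `between()`
///         count
///     }
/// }
/// ```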
pub trait Pipeline<A, B, C> {
    /// Transforms a batch of inputs in parallel
    fn first(&self, a: A) -> B;

    /// Runs once the batch in `first()` has finished completely
    fn between(&self) {}

    /// Transforms the same batch processed in `first()` in parallel
    fn second(&self, b: B) -> C;
}

/// Helper for turning closures into a [`Pipeline`] implementation.
#[derive(Clone)]
struct PipelineClosure<F1, F2> {
    f1: F1,
    f2: F2,
}

impl<F1, F2, A, B, C> Pipeline<A, B, C> for PipelineClosure<F1, F2>
where
    F1: Fn(A) -> B,
    F2: Fn(B) -> C,
{
    fn first(&self, a: A) -> B {
        (self.f1)(a)
    }

    fn second(&self, b: B) -> C {
        (self.f2)(b)
    }
}

/// Logs the progress of the parsing every 10K blocks in a thread-safe manner.
#[derive(Clone, Debug)]
struct ParserLogger {
    num_parsed: Arc<AtomicUsize>,
    start: Instant,
    log_at: usize,
}

impl ParserLogger {
    fn new() -> Self {
        Self {
            num_parsed: Arc::new(Default::default()),
            start: Instant::now(),
            log_at: 10_000,
        }
    }

    fn increment(&self) {
        let num = self.num_parsed.fetch_add(1, Ordering::Relaxed);

        if num == 0 {
            info!("Starting to parse blocks...");
        } else if num % self.log_at == 0 {
            let elapsed = (Instant::now() - self.start).as_secs();
            let blocks = format!("{}K blocks parsed,", num / 1000);
            info!("{} {}m{}s elapsed", blocks, elapsed / 60, elapsed % 60);
        }
    }
}