bitcoin_block_parser/blocks.rs

//! Contains [`BlockParser`] for parsing bitcoin [`Block`] from the `blocks` directory.

use crate::headers::ParsedHeader;
use crate::xor::XorReader;
use crate::HeaderParser;
use anyhow::Result;
use bitcoin::consensus::Decodable;
use bitcoin::{Block, Transaction};
use crossbeam_channel::{bounded, Receiver, Sender};
use log::info;
use std::cmp::min;
use std::collections::HashMap;
use std::fs::File;
use std::io::BufReader;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Instant;
use threadpool::ThreadPool;

/// Multithreaded parser for [`bitcoin::Block`].
///
/// # Examples
/// Call `parse()` with a `Fn(Block) -> T` closure to get back a [`ParserIterator<T>`].  The
/// closure runs on multiple threads.
/// ```no_run
/// use bitcoin_block_parser::blocks::*;
///
/// let parser = BlockParser::new("/home/user/.bitcoin/blocks/").unwrap();
/// let iterator = parser.parse(|block| block.total_size() as u64);
/// println!("Total blockchain size: {}", iterator.sum::<u64>());
/// ```
///
/// You can call `start_height()` to constrain the block range and `ordered()` to ensure the
/// iterator returns blocks in height order:
/// ```no_run
/// use bitcoin_block_parser::blocks::*;
///
/// let parser = BlockParser::new("/home/user/.bitcoin/blocks/").unwrap();
/// let iterator = parser
///     .start_height(100_000)
///     .end_height(100_010)
///     .parse(|block| block.block_hash())
///     .ordered();
///
/// println!("In-order block hashes from 100,000 to 100,010:");
/// for block_hash in iterator {
///     println!("{}", block_hash);
/// }
/// ```
#[derive(Clone, Debug)]
pub struct BlockParser {
    /// The parsed headers used for locating the blocks
    headers: Vec<ParsedHeader>,
    /// A logger for reporting on the progress of the parsing
    logger: ParserLogger,
    /// Options that affect memory and CPU performance
    options: ParserOptions,
    /// The block height range to start at
    start_height: usize,
    /// The block height range to end at
    end_height: usize,
}

impl BlockParser {
    /// Creates a new parser given the `blocks` directory where the `*.blk` files are located.
    ///
    /// - Returns an `Err` if unable to parse the `blk` files.
    /// - You can [specify the blocks directory](https://en.bitcoin.it/wiki/Data_directory) when
    ///   running `bitcoind`.
    pub fn new(blocks_dir: &str) -> Result<Self> {
        Self::new_with_opts(blocks_dir, ParserOptions::default())
    }

    /// Creates a parser with custom [`ParserOptions`].
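    ///
    /// # Example
    /// A minimal sketch of overriding one of the defaults (the thread count here is
    /// illustrative, not a tuned value):
    /// ```no_run
    /// use bitcoin_block_parser::blocks::*;
    ///
    /// let options = ParserOptions { num_threads: 16, ..ParserOptions::default() };
    /// let parser = BlockParser::new_with_opts("/home/user/.bitcoin/blocks/", options).unwrap();
    /// ```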
    pub fn new_with_opts(blocks_dir: &str, options: ParserOptions) -> Result<Self> {
        let headers = HeaderParser::parse(blocks_dir)?;
        Ok(Self {
            headers,
            logger: ParserLogger::new(),
            options,
            start_height: 0,
            end_height: usize::MAX,
        })
    }

    /// Sets the *inclusive* start of block heights to parse.
    ///
    /// * `start_height` - must be less than the total number of blocks; `0` will start at the
    ///    genesis block.
    pub fn start_height(mut self, start_height: usize) -> Self {
        self.start_height = start_height;
        self
    }

    /// Sets the *inclusive* end of block heights to parse.
    ///
    /// * `end_height` - the height to end at; [`usize::MAX`] will stop at the last block
    ///    available.
    pub fn end_height(mut self, end_height: usize) -> Self {
        self.end_height = end_height;
        self
    }

    /// Parses all [`bitcoin::Block`] into type `T` and returns a [`ParserIterator<T>`].  Results
    /// will be in random order due to multithreading.
    ///
    /// * `extract` - a closure that runs on multiple threads.  For best performance, perform as
    ///    much computation and data reduction here as possible.
    pub fn parse<T: Send + 'static>(
        &self,
        extract: impl Fn(Block) -> T + Clone + Send + 'static,
    ) -> ParserIterator<T> {
        let end_height = min(self.end_height, self.headers.len() - 1);
        let header_range = self.headers[self.start_height..=end_height].to_vec();
        let pool = ThreadPool::new(self.options.num_threads);
        let (tx, rx) = bounded(self.options.channel_size);

        for (index, header) in header_range.into_iter().enumerate() {
            let logger = self.logger.clone();
            let tx = tx.clone();
            let extract = extract.clone();
            let start_height = self.start_height;
            pool.execute(move || {
                let extracted = match Self::parse_block(&header) {
                    Ok(block) => extract(block),
                    // Panic here because the blk file is corrupted, nothing else we can do
                    Err(e) => panic!("Error reading {:?} - {:?}", header.path, e),
                };
                let height = start_height + index;
                let _ = tx.send((height, extracted));
                logger.increment();
            });
        }
        ParserIterator {
            rx,
            options: self.options.clone(),
            start_height: self.start_height,
        }
    }

    /// Helper function for reading a block from the filesystem given the header.
    fn parse_block(header: &ParsedHeader) -> Result<Block> {
        let reader = BufReader::new(File::open(&header.path)?);
        let mut reader = BufReader::new(XorReader::new(reader, header.xor_mask));
        reader.seek_relative(header.offset as i64)?;
        Ok(Block {
            header: header.inner,
            txdata: Vec::<Transaction>::consensus_decode_from_finite_reader(&mut reader)?,
        })
    }
}

/// Options that affect the performance of [`BlockParser`] and [`ParserIterator`].
///
/// Generally, changing these is unnecessary unless you really need to tune performance.
#[derive(Clone, Debug)]
pub struct ParserOptions {
    /// How many items will be parsed in each [`ParserIterator::pipeline`] batch.
    pub pipeline_size: usize,
    /// The size of all [`crossbeam_channel::bounded`] channels that communicate between threads.
    pub channel_size: usize,
    /// The number of threads that will be spawned when running a multithreaded function.
    pub num_threads: usize,
}

impl Default for ParserOptions {
    /// Returns sane defaults that will be optimal for most workloads.
    fn default() -> Self {
        Self {
            pipeline_size: 1,
            channel_size: 100,
            num_threads: 64,
        }
    }
}

/// Iterator returned from [`BlockParser::parse`] that allows for advanced transformations.
pub struct ParserIterator<T> {
    /// The receiver coming from a previous transformation step.  `usize` is the block height.
    rx: Receiver<(usize, T)>,
    /// Options for tuning performance.
    options: ParserOptions,
    /// The block height the parser started at.
    start_height: usize,
}

impl<A: Send + 'static> ParserIterator<A> {
    /// Create a new iterator from an existing one, given a new receiver.
    fn create<T>(&self, rx: Receiver<(usize, T)>) -> ParserIterator<T> {
        ParserIterator::<T> {
            rx,
            options: self.options.clone(),
            start_height: self.start_height,
        }
    }

    /// Pairs each item in this iterator with its block height.
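    ///
    /// # Example
    /// A minimal sketch printing each block's height alongside its hash (order is arbitrary):
    /// ```no_run
    /// use bitcoin_block_parser::blocks::*;
    ///
    /// let parser = BlockParser::new("/home/user/.bitcoin/blocks/").unwrap();
    /// for (height, hash) in parser.parse(|block| block.block_hash()).with_height() {
    ///     println!("{}: {}", height, hash);
    /// }
    /// ```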
    pub fn with_height(&self) -> ParserIterator<(usize, A)> {
        let (tx, rx) = bounded(self.options.channel_size);
        let parser = self.create(rx);
        let rx_a = self.rx.clone();

        thread::spawn(move || {
            for (height, a) in rx_a {
                let _ = tx.send((height, (height, a)));
            }
        });
        parser
    }

    /// Orders the results by block height, at the cost of a small increase in
    /// memory and runtime.
    ///
    /// # Example
    /// Using the `ordered` function to get the first 10 block hashes in height order:
    /// ```no_run
    /// use bitcoin_block_parser::blocks::*;
    /// use bitcoin::BlockHash;
    ///
    /// let parser = BlockParser::new("/home/user/.bitcoin/blocks/").unwrap();
    /// let ordered: ParserIterator<BlockHash> = parser.parse(|block| block.block_hash()).ordered();
    /// let first_10: Vec<BlockHash> = ordered.take(10).collect();
    /// ```
    pub fn ordered(&self) -> ParserIterator<A> {
        let (tx, rx) = bounded(self.options.channel_size);
        let parser = self.create(rx);
        let rx_a = self.rx.clone();
        let start_height = self.start_height;

        thread::spawn(move || {
            let mut current_height = start_height;
            let mut unordered: HashMap<usize, A> = HashMap::default();

            for (height, a) in rx_a {
                unordered.insert(height, a);
                while let Some(ordered) = unordered.remove(&current_height) {
                    let _ = tx.send((current_height, ordered));
                    current_height += 1;
                }
            }
        });
        parser
    }

    /// Performs a map function using multiple threads.
    /// * Useful if you need to perform an additional map after [`BlockParser::parse`].
    /// * More performant than calling [`Iterator::map`] on the [`ParserIterator`].
    /// * Returns results in random order.
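    ///
    /// # Example
    /// A minimal sketch that parses full blocks and counts transactions on the worker threads:
    /// ```no_run
    /// use bitcoin_block_parser::blocks::*;
    /// use std::convert::identity;
    ///
    /// let parser = BlockParser::new("/home/user/.bitcoin/blocks/").unwrap();
    /// let tx_counts = parser.parse(identity).map_parallel(|block| block.txdata.len() as u64);
    /// println!("Total transactions: {}", tx_counts.sum::<u64>());
    /// ```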
    pub fn map_parallel<B: Send + 'static>(
        &self,
        function: impl Fn(A) -> B + Clone + Send + 'static,
    ) -> ParserIterator<B> {
        let pool = ThreadPool::new(self.options.num_threads);
        let (tx_b, rx_b) = bounded(self.options.pipeline_size * self.options.num_threads);

        for _ in 0..self.options.num_threads {
            let tx_b = tx_b.clone();
            let rx_a = self.rx.clone();
            let function = function.clone();
            pool.execute(move || {
                for (height, a) in rx_a {
                    let _ = tx_b.send((height, function(a)));
                }
            });
        }

        self.create(rx_b)
    }

    /// Pipelines allow you to perform two functions on the same batch of blocks.
    /// Useful when you want multithreaded performance while processing blocks in-order.
    ///
    /// # Example
    /// If calculating the size difference between consecutive blocks, you could use the
    /// following code:
    /// ```no_run
    /// use bitcoin_block_parser::blocks::*;
    /// use std::collections::HashMap;
    /// use std::convert::identity;
    /// use bitcoin::BlockHash;
    /// use bitcoin::hashes::Hash;
    ///
    /// let parser = BlockParser::new("/home/user/.bitcoin/blocks/").unwrap();
    /// let mut block_sizes: HashMap<BlockHash, isize> = HashMap::new();
    /// let mut differences = vec![];
    /// // Initial block size for the genesis block (its `prev_blockhash` is all zeros)
    /// block_sizes.insert(BlockHash::all_zeros(), 0);
    ///
    /// for block in parser.parse(identity).ordered() {
    ///     // Store this block's size so the next block can look it up
    ///     let block_size = block.total_size() as isize;
    ///     block_sizes.insert(block.block_hash(), block_size);
    ///     // Look up the previous size to compute the difference
    ///     let prev_block_hash = block.header.prev_blockhash;
    ///     let prev_size = block_sizes.remove(&prev_block_hash);
    ///     differences.push(block_size - prev_size.unwrap());
    /// }
    ///
    /// let max_difference = differences.into_iter().max().unwrap();
    /// println!("Maximum increase in block size: {}", max_difference);
    /// ```
    ///
    /// The previous code runs on a single thread.  If we want to leverage multithreading we can use
    /// `pipeline` functions for a large speed-up:
    /// ```no_run
    /// use bitcoin_block_parser::blocks::*;
    /// use dashmap::DashMap;
    /// use std::convert::identity;
    /// use std::sync::Arc;
    /// use bitcoin::BlockHash;
    /// use bitcoin::hashes::Hash;
    ///
    /// let parser = BlockParser::new("/home/user/.bitcoin/blocks/").unwrap();
    /// // State shared across all threads
    /// let block_sizes: Arc<DashMap<BlockHash, isize>> = Arc::new(DashMap::new());
    /// let block_sizes_clone = block_sizes.clone();
    /// // Initial block size for the genesis block
    /// block_sizes.insert(BlockHash::all_zeros(), 0);
    ///
    /// let iterator = parser.parse(identity).ordered().pipeline_fn(
    ///     move |block| {
    ///         // Store this block's size in the shared state
    ///         let block_size = block.total_size() as isize;
    ///         block_sizes.insert(block.block_hash(), block_size);
    ///         (block.header.prev_blockhash, block_size)
    ///     },
    ///     move |(prev_block_hash, block_size)| {
    ///         // Look up the previous size to compute the difference
    ///         let prev_size = block_sizes_clone.remove(&prev_block_hash);
    ///         block_size - prev_size.unwrap().1
    ///     },
    /// );
    ///
    /// let max_difference = iterator.max().unwrap();
    /// println!("Maximum increase in block size: {}", max_difference);
    /// ```
    pub fn pipeline_fn<B: Send + 'static, C: Send + 'static>(
        &self,
        f1: impl Fn(A) -> B + Clone + Send + 'static,
        f2: impl Fn(B) -> C + Clone + Send + 'static,
    ) -> ParserIterator<C> {
        let pipeline = PipelineClosure { f1, f2 };
        self.pipeline(&pipeline)
    }

    /// Like [`ParserIterator::pipeline_fn`], but takes an implementation of the [`Pipeline`]
    /// trait for convenience / cleaner code.
    pub fn pipeline<B: Send + 'static, C: Send + 'static>(
        &self,
        pipeline: &(impl Pipeline<A, B, C> + Clone + Send + 'static),
    ) -> ParserIterator<C> {
        let pool_a = ThreadPool::new(self.options.num_threads);
        let pool_b = ThreadPool::new(self.options.num_threads);
        let rx_a = self.rx.clone();
        let opts = self.options.clone();
        let pipeline = pipeline.clone();
        let (tx_b, rx_b) = bounded(self.options.pipeline_size * self.options.num_threads);
        let (tx_c, rx_c) = bounded(self.options.pipeline_size * self.options.num_threads);
        let run = Arc::new(AtomicBool::new(true));

        thread::spawn(move || {
            while run.load(Ordering::Relaxed) {
                let p1 = pipeline.clone();
                let p2 = pipeline.clone();
                Self::run_pipeline(&opts, &pool_a, &run, &rx_a, &tx_b, &move |a| p1.first(a));
                pool_a.join();
                pipeline.between();
                Self::run_pipeline(&opts, &pool_b, &run, &rx_b, &tx_c, &move |b| p2.second(b));
            }
        });

        self.create(rx_c)
    }

    /// Helper for running the pipeline functions on multiple threads.
    fn run_pipeline<X: Send + 'static, Y: Send + 'static>(
        options: &ParserOptions,
        pool: &ThreadPool,
        running: &Arc<AtomicBool>,
        rx: &Receiver<(usize, X)>,
        tx: &Sender<(usize, Y)>,
        function: &(impl Fn(X) -> Y + Clone + Send + 'static),
    ) {
        // Spawn `num_threads` workers, each running `function` on `pipeline_size` items
        for _ in 0..options.num_threads {
            let running = running.clone();
            let tx = tx.clone();
            let rx = rx.clone();
            let function = function.clone();
            let pipeline_size = options.pipeline_size;
            pool.execute(move || {
                for _ in 0..pipeline_size {
                    match rx.recv() {
                        Ok((height, x)) => {
                            let _ = tx.send((height, function(x)));
                        }
                        Err(_) => {
                            // Signal to the pipeline thread that we have consumed all input
                            running.store(false, Ordering::Relaxed);
                        }
                    };
                }
            });
        }
    }
}

impl<T> Iterator for ParserIterator<T> {
    type Item = T;

    fn next(&mut self) -> Option<Self::Item> {
        self.rx.recv().ok().map(|(_, t)| t)
    }
}

/// Implement this trait to call [`ParserIterator::pipeline`].
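///
/// # Example
/// A minimal sketch of a [`Pipeline`] implementation; the `TxCounter` type here is illustrative.
/// It counts each block's transactions in `first()` and converts the counts in `second()`:
/// ```no_run
/// use bitcoin_block_parser::blocks::*;
/// use bitcoin::Block;
/// use std::convert::identity;
///
/// #[derive(Clone)]
/// struct TxCounter;
///
/// impl Pipeline<Block, usize, u64> for TxCounter {
///     fn first(&self, block: Block) -> usize {
///         block.txdata.len()
///     }
///     fn second(&self, count: usize) -> u64 {
///         count as u64
///     }
/// }
///
/// let parser = BlockParser::new("/home/user/.bitcoin/blocks/").unwrap();
/// let iterator = parser.parse(identity).pipeline(&TxCounter);
/// println!("Total transactions: {}", iterator.sum::<u64>());
/// ```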
pub trait Pipeline<A, B, C> {
    /// Transforms a batch of inputs in parallel.
    fn first(&self, a: A) -> B;

    /// Runs once the batch in `first()` has finished completely.
    fn between(&self) {}

    /// Transforms the same batch processed in `first()` in parallel.
    fn second(&self, b: B) -> C;
}

/// Helper for turning closures into a pipeline trait.
#[derive(Clone)]
struct PipelineClosure<F1, F2> {
    f1: F1,
    f2: F2,
}

impl<F1, F2, A, B, C> Pipeline<A, B, C> for PipelineClosure<F1, F2>
where
    F1: Fn(A) -> B,
    F2: Fn(B) -> C,
{
    fn first(&self, a: A) -> B {
        (self.f1)(a)
    }

    fn second(&self, b: B) -> C {
        (self.f2)(b)
    }
}

/// Logs the progress of the parsing every 10K blocks in a thread-safe manner.
#[derive(Clone, Debug)]
struct ParserLogger {
    num_parsed: Arc<AtomicUsize>,
    start: Instant,
    log_at: usize,
}

impl ParserLogger {
    fn new() -> Self {
        Self {
            num_parsed: Arc::new(Default::default()),
            start: Instant::now(),
            log_at: 10_000,
        }
    }

    fn increment(&self) {
        let num = self.num_parsed.fetch_add(1, Ordering::Relaxed);

        if num == 0 {
            info!("Starting to parse blocks...");
        } else if num % self.log_at == 0 {
            let elapsed = (Instant::now() - self.start).as_secs();
            let blocks = format!("{}K blocks parsed,", num / 1000);
            info!("{} {}m{}s elapsed", blocks, elapsed / 60, elapsed % 60);
        }
    }
}