brk_parser/
lib.rs

1#![doc = include_str!("../README.md")]
2#![doc = "\n## Example\n\n```rust"]
3#![doc = include_str!("../examples/main.rs")]
4#![doc = "```"]
5
6use std::{cmp::Ordering, collections::BTreeMap, fs, ops::ControlFlow, path::PathBuf, thread};
7
8use bitcoin::{Block, BlockHash};
9use bitcoincore_rpc::RpcApi;
10use blk_index_to_blk_path::*;
11use blk_recap::BlkRecap;
12use brk_core::Height;
13use crossbeam::channel::{Receiver, bounded};
14use rayon::prelude::*;
15
16mod blk_index_to_blk_path;
17mod blk_index_to_blk_recap;
18mod blk_metadata;
19mod blk_recap;
20mod block_state;
21mod error;
22mod utils;
23mod xor_bytes;
24mod xor_index;
25
26use blk_index_to_blk_recap::*;
27use blk_metadata::*;
28use block_state::*;
29pub use error::*;
30use utils::*;
31use xor_bytes::*;
32use xor_index::*;
33
/// Exported block-count threshold; it is not used anywhere in this file.
///
/// NOTE(review): presumably the number of most recent blocks that callers
/// should treat as still subject to reorganisation — confirm with the callers.
pub const NUMBER_OF_UNSAFE_BLOCKS: usize = 1_000;

/// Marker that precedes every block stored in a `blk*.dat` file
/// (`F9 BE B4 D9`, the Bitcoin mainnet network magic).
const MAGIC_BYTES: [u8; 4] = [0xF9, 0xBE, 0xB4, 0xD9];
/// Capacity of each bounded channel linking the parsing stages; the decoding
/// stage also works in batches of half this size.
const BOUND_CAP: usize = 50;
38
39pub struct Parser {
40    blocks_dir: PathBuf,
41    rpc: &'static bitcoincore_rpc::Client,
42}
43
44impl Parser {
45    pub fn new(blocks_dir: PathBuf, rpc: &'static bitcoincore_rpc::Client) -> Self {
46        Self { blocks_dir, rpc }
47    }
48
49    pub fn get(&self, height: Height) -> Block {
50        self.parse(Some(height), Some(height))
51            .iter()
52            .next()
53            .unwrap()
54            .1
55    }
56
57    ///
58    /// Returns a crossbeam channel receiver that receives `(Height, Block, BlockHash)` tuples from an **inclusive** range (`start` and `end`)
59    ///
60    /// For an example checkout `./main.rs`
61    ///
62    pub fn parse(
63        &self,
64        start: Option<Height>,
65        end: Option<Height>,
66    ) -> Receiver<(Height, Block, BlockHash)> {
67        let blocks_dir = self.blocks_dir.as_path();
68        let rpc = self.rpc;
69
70        let (send_bytes, recv_bytes) = bounded(BOUND_CAP);
71        let (send_block, recv_block) = bounded(BOUND_CAP);
72        let (send_height_block_hash, recv_height_block_hash) = bounded(BOUND_CAP);
73
74        let blk_index_to_blk_path = BlkIndexToBlkPath::scan(blocks_dir);
75
76        let (mut blk_index_to_blk_recap, blk_index) =
77            BlkIndexToBlkRecap::import(blocks_dir, &blk_index_to_blk_path, start);
78
79        let xor_bytes = XORBytes::from(blocks_dir);
80
81        thread::spawn(move || {
82            let xor_bytes = xor_bytes;
83
84            let _ = blk_index_to_blk_path.range(blk_index..).try_for_each(
85                move |(blk_index, blk_path)| {
86                    let mut xor_i = XORIndex::default();
87
88                    let blk_index = *blk_index;
89
90                    let blk_metadata = BlkMetadata::new(blk_index, blk_path.as_path());
91
92                    let mut blk_bytes_ = fs::read(blk_path).unwrap();
93                    let blk_bytes = blk_bytes_.as_mut_slice();
94                    let blk_bytes_len = blk_bytes.len();
95
96                    let mut current_4bytes = [0; 4];
97
98                    let mut i = 0;
99
100                    'parent: loop {
101                        loop {
102                            if i == blk_bytes_len {
103                                break 'parent;
104                            }
105
106                            current_4bytes.rotate_left(1);
107
108                            current_4bytes[3] = xor_i.byte(blk_bytes[i], &xor_bytes);
109                            i += 1;
110
111                            if current_4bytes == MAGIC_BYTES {
112                                break;
113                            }
114                        }
115
116                        let len = u32::from_le_bytes(
117                            xor_i
118                                .bytes(&mut blk_bytes[i..(i + 4)], &xor_bytes)
119                                .try_into()
120                                .unwrap(),
121                        ) as usize;
122                        i += 4;
123
124                        let block_bytes = (blk_bytes[i..(i + len)]).to_vec();
125
126                        if send_bytes
127                            .send((blk_metadata, BlockState::Raw(block_bytes), xor_i))
128                            .is_err()
129                        {
130                            return ControlFlow::Break(());
131                        }
132
133                        i += len;
134                        xor_i.add_assign(len);
135                    }
136
137                    ControlFlow::Continue(())
138                },
139            );
140        });
141
142        thread::spawn(move || {
143            let xor_bytes = xor_bytes;
144
145            let mut bulk = vec![];
146
147            let drain_and_send = |bulk: &mut Vec<_>| {
148                // Using a vec and sending after to not end up with stuck threads in par iter
149                bulk.par_iter_mut().for_each(|(_, block_state, xor_i)| {
150                    BlockState::decode(block_state, xor_i, &xor_bytes);
151                });
152
153                bulk.drain(..)
154                    .try_for_each(|(blk_metadata, block_state, _)| {
155                        let block = match block_state {
156                            BlockState::Decoded(block) => block,
157                            _ => unreachable!(),
158                        };
159
160                        if send_block.send((blk_metadata, block)).is_err() {
161                            return ControlFlow::Break(());
162                        }
163
164                        ControlFlow::Continue(())
165                    })
166            };
167
168            recv_bytes.iter().try_for_each(|tuple| {
169                bulk.push(tuple);
170
171                if bulk.len() < BOUND_CAP / 2 {
172                    return ControlFlow::Continue(());
173                }
174
175                // Sending in bulk to not lock threads in standby
176                drain_and_send(&mut bulk)
177            })?;
178
179            drain_and_send(&mut bulk)
180        });
181
182        thread::spawn(move || {
183            let mut current_height = start.unwrap_or_default();
184
185            let mut future_blocks = BTreeMap::default();
186
187            let _ = recv_block
188                .iter()
189                .try_for_each(|(blk_metadata, block)| -> ControlFlow<(), _> {
190                    let hash = block.block_hash();
191                    let header = rpc.get_block_header_info(&hash);
192
193                    if header.is_err() {
194                        return ControlFlow::Continue(());
195                    }
196                    let header = header.unwrap();
197                    if header.confirmations <= 0 {
198                        return ControlFlow::Continue(());
199                    }
200
201                    let height = Height::from(header.height);
202
203                    let len = blk_index_to_blk_recap.tree.len();
204                    if blk_metadata.index == len as u16 || blk_metadata.index + 1 == len as u16 {
205                        match (len as u16).cmp(&blk_metadata.index) {
206                            Ordering::Equal => {
207                                if len % 21 == 0 {
208                                    blk_index_to_blk_recap.export();
209                                }
210                            }
211                            Ordering::Less => panic!(),
212                            Ordering::Greater => {}
213                        }
214
215                        blk_index_to_blk_recap
216                            .tree
217                            .entry(blk_metadata.index)
218                            .and_modify(|recap| {
219                                if recap.max_height < height {
220                                    recap.max_height = height;
221                                }
222                            })
223                            .or_insert(BlkRecap {
224                                max_height: height,
225                                modified_time: blk_metadata.modified_time,
226                            });
227                    }
228
229                    let mut opt = if current_height == height {
230                        Some((block, hash))
231                    } else {
232                        if start.is_none_or(|start| start <= height)
233                            && end.is_none_or(|end| end >= height)
234                        {
235                            future_blocks.insert(height, (block, hash));
236                        }
237                        None
238                    };
239
240                    while let Some((block, hash)) = opt.take().or_else(|| {
241                        if !future_blocks.is_empty() {
242                            future_blocks.remove(&current_height)
243                        } else {
244                            None
245                        }
246                    }) {
247                        if end.is_some_and(|end| end < current_height) {
248                            return ControlFlow::Break(());
249                        }
250
251                        send_height_block_hash
252                            .send((current_height, block, hash))
253                            .unwrap();
254
255                        if end.is_some_and(|end| end == current_height) {
256                            return ControlFlow::Break(());
257                        }
258
259                        current_height.increment();
260                    }
261
262                    ControlFlow::Continue(())
263                });
264
265            blk_index_to_blk_recap.export();
266        });
267
268        recv_height_block_hash
269    }
270}