brk_parser/
lib.rs

1#![doc = include_str!("../README.md")]
2#![doc = "\n## Example\n\n```rust"]
3#![doc = include_str!("../examples/main.rs")]
4#![doc = "```"]
5
6use std::{cmp::Ordering, collections::BTreeMap, fs, ops::ControlFlow, path::PathBuf, thread};
7
8use bitcoin::{Block, BlockHash};
9use bitcoincore_rpc::RpcApi;
10use blk_index_to_blk_path::*;
11use blk_recap::BlkRecap;
12use brk_core::Height;
13use crossbeam::channel::{Receiver, bounded};
14use rayon::prelude::*;
15
16pub use bitcoin;
17pub use bitcoincore_rpc as rpc;
18
19mod blk_index_to_blk_path;
20mod blk_index_to_blk_recap;
21mod blk_metadata;
22mod blk_recap;
23mod block_state;
24mod error;
25mod utils;
26mod xor_bytes;
27mod xor_index;
28
29use blk_index_to_blk_recap::*;
30use blk_metadata::*;
31use block_state::*;
32pub use error::*;
33use utils::*;
34use xor_bytes::*;
35use xor_index::*;
36
37pub const NUMBER_OF_UNSAFE_BLOCKS: usize = 1000;
38
39const MAGIC_BYTES: [u8; 4] = [249, 190, 180, 217];
40const BOUND_CAP: usize = 50;
41
42pub struct Parser {
43    blocks_dir: PathBuf,
44    rpc: &'static bitcoincore_rpc::Client,
45}
46
47impl Parser {
48    pub fn new(blocks_dir: PathBuf, rpc: &'static bitcoincore_rpc::Client) -> Self {
49        Self { blocks_dir, rpc }
50    }
51
52    pub fn get(&self, height: Height) -> Block {
53        self.parse(Some(height), Some(height))
54            .iter()
55            .next()
56            .unwrap()
57            .1
58    }
59
60    ///
61    /// Returns a crossbeam channel receiver that receives `(Height, Block, BlockHash)` tuples from an **inclusive** range (`start` and `end`)
62    ///
63    /// For an example checkout `./main.rs`
64    ///
65    pub fn parse(
66        &self,
67        start: Option<Height>,
68        end: Option<Height>,
69    ) -> Receiver<(Height, Block, BlockHash)> {
70        let blocks_dir = self.blocks_dir.as_path();
71        let rpc = self.rpc;
72
73        let (send_bytes, recv_bytes) = bounded(BOUND_CAP);
74        let (send_block, recv_block) = bounded(BOUND_CAP);
75        let (send_height_block_hash, recv_height_block_hash) = bounded(BOUND_CAP);
76
77        let blk_index_to_blk_path = BlkIndexToBlkPath::scan(blocks_dir);
78
79        let (mut blk_index_to_blk_recap, blk_index) =
80            BlkIndexToBlkRecap::import(blocks_dir, &blk_index_to_blk_path, start);
81
82        let xor_bytes = XORBytes::from(blocks_dir);
83
84        thread::spawn(move || {
85            let xor_bytes = xor_bytes;
86
87            blk_index_to_blk_path
88                .range(blk_index..)
89                .try_for_each(move |(blk_index, blk_path)| {
90                    let mut xor_i = XORIndex::default();
91
92                    let blk_index = *blk_index;
93
94                    let blk_metadata = BlkMetadata::new(blk_index, blk_path.as_path());
95
96                    let mut blk_bytes_ = fs::read(blk_path).unwrap();
97                    let blk_bytes = blk_bytes_.as_mut_slice();
98                    let blk_bytes_len = blk_bytes.len();
99
100                    let mut current_4bytes = [0; 4];
101
102                    let mut i = 0;
103
104                    'parent: loop {
105                        loop {
106                            if i == blk_bytes_len {
107                                break 'parent;
108                            }
109
110                            current_4bytes.rotate_left(1);
111
112                            current_4bytes[3] = xor_i.byte(blk_bytes[i], &xor_bytes);
113                            i += 1;
114
115                            if current_4bytes == MAGIC_BYTES {
116                                break;
117                            }
118                        }
119
120                        let len = u32::from_le_bytes(
121                            xor_i
122                                .bytes(&mut blk_bytes[i..(i + 4)], &xor_bytes)
123                                .try_into()
124                                .unwrap(),
125                        ) as usize;
126                        i += 4;
127
128                        let block_bytes = (blk_bytes[i..(i + len)]).to_vec();
129
130                        if send_bytes
131                            .send((blk_metadata, BlockState::Raw(block_bytes), xor_i))
132                            .is_err()
133                        {
134                            return ControlFlow::Break(());
135                        }
136
137                        i += len;
138                        xor_i.add_assign(len);
139                    }
140
141                    ControlFlow::Continue(())
142                });
143        });
144
145        thread::spawn(move || {
146            let xor_bytes = xor_bytes;
147
148            let mut bulk = vec![];
149
150            let drain_and_send = |bulk: &mut Vec<_>| {
151                // Using a vec and sending after to not end up with stuck threads in par iter
152                bulk.par_iter_mut().for_each(|(_, block_state, xor_i)| {
153                    BlockState::decode(block_state, xor_i, &xor_bytes);
154                });
155
156                bulk.drain(..)
157                    .try_for_each(|(blk_metadata, block_state, _)| {
158                        let block = match block_state {
159                            BlockState::Decoded(block) => block,
160                            _ => unreachable!(),
161                        };
162
163                        if send_block.send((blk_metadata, block)).is_err() {
164                            return ControlFlow::Break(());
165                        }
166
167                        ControlFlow::Continue(())
168                    })
169            };
170
171            recv_bytes.iter().try_for_each(|tuple| {
172                bulk.push(tuple);
173
174                if bulk.len() < BOUND_CAP / 2 {
175                    return ControlFlow::Continue(());
176                }
177
178                // Sending in bulk to not lock threads in standby
179                drain_and_send(&mut bulk)
180            });
181
182            drain_and_send(&mut bulk)
183        });
184
185        thread::spawn(move || {
186            let mut current_height = start.unwrap_or_default();
187
188            let mut future_blocks = BTreeMap::default();
189
190            recv_block
191                .iter()
192                .try_for_each(|(blk_metadata, block)| -> ControlFlow<(), _> {
193                    let hash = block.block_hash();
194                    let header = rpc.get_block_header_info(&hash);
195
196                    if header.is_err() {
197                        return ControlFlow::Continue(());
198                    }
199                    let header = header.unwrap();
200                    if header.confirmations <= 0 {
201                        return ControlFlow::Continue(());
202                    }
203
204                    let height = Height::from(header.height);
205
206                    let len = blk_index_to_blk_recap.tree.len();
207                    if blk_metadata.index == len as u16 || blk_metadata.index + 1 == len as u16 {
208                        match (len as u16).cmp(&blk_metadata.index) {
209                            Ordering::Equal => {
210                                if len % 21 == 0 {
211                                    blk_index_to_blk_recap.export();
212                                }
213                            }
214                            Ordering::Less => panic!(),
215                            Ordering::Greater => {}
216                        }
217
218                        blk_index_to_blk_recap
219                            .tree
220                            .entry(blk_metadata.index)
221                            .and_modify(|recap| {
222                                if recap.max_height < height {
223                                    recap.max_height = height;
224                                }
225                            })
226                            .or_insert(BlkRecap {
227                                max_height: height,
228                                modified_time: blk_metadata.modified_time,
229                            });
230                    }
231
232                    let mut opt = if current_height == height {
233                        Some((block, hash))
234                    } else {
235                        if start.is_none_or(|start| start <= height)
236                            && end.is_none_or(|end| end >= height)
237                        {
238                            future_blocks.insert(height, (block, hash));
239                        }
240                        None
241                    };
242
243                    while let Some((block, hash)) = opt.take().or_else(|| {
244                        if !future_blocks.is_empty() {
245                            future_blocks.remove(&current_height)
246                        } else {
247                            None
248                        }
249                    }) {
250                        if end.is_some_and(|end| end < current_height) {
251                            return ControlFlow::Break(());
252                        }
253
254                        send_height_block_hash
255                            .send((current_height, block, hash))
256                            .unwrap();
257
258                        if end.is_some_and(|end| end == current_height) {
259                            return ControlFlow::Break(());
260                        }
261
262                        current_height.increment();
263                    }
264
265                    ControlFlow::Continue(())
266                });
267
268            blk_index_to_blk_recap.export();
269        });
270
271        recv_height_block_hash
272    }
273}