1#![doc = include_str!("../README.md")]
2#![doc = "\n## Example\n\n```rust"]
3#![doc = include_str!("../examples/main.rs")]
4#![doc = "```"]
5
6use std::{cmp::Ordering, collections::BTreeMap, fs, ops::ControlFlow, path::PathBuf, thread};
7
8use bitcoin::{Block, BlockHash};
9use bitcoincore_rpc::RpcApi;
10use blk_index_to_blk_path::*;
11use blk_recap::BlkRecap;
12use brk_core::Height;
13use crossbeam::channel::{Receiver, bounded};
14use rayon::prelude::*;
15
16mod blk_index_to_blk_path;
17mod blk_index_to_blk_recap;
18mod blk_metadata;
19mod blk_recap;
20mod block_state;
21mod error;
22mod utils;
23mod xor_bytes;
24mod xor_index;
25
26use blk_index_to_blk_recap::*;
27use blk_metadata::*;
28use block_state::*;
29pub use error::*;
30use utils::*;
31use xor_bytes::*;
32use xor_index::*;
33
34pub const NUMBER_OF_UNSAFE_BLOCKS: usize = 1000;
35
36const MAGIC_BYTES: [u8; 4] = [249, 190, 180, 217];
37const BOUND_CAP: usize = 50;
38
39pub struct Parser {
40 blocks_dir: PathBuf,
41 rpc: &'static bitcoincore_rpc::Client,
42}
43
44impl Parser {
45 pub fn new(blocks_dir: PathBuf, rpc: &'static bitcoincore_rpc::Client) -> Self {
46 Self { blocks_dir, rpc }
47 }
48
49 pub fn get(&self, height: Height) -> Block {
50 self.parse(Some(height), Some(height))
51 .iter()
52 .next()
53 .unwrap()
54 .1
55 }
56
57 pub fn parse(
63 &self,
64 start: Option<Height>,
65 end: Option<Height>,
66 ) -> Receiver<(Height, Block, BlockHash)> {
67 let blocks_dir = self.blocks_dir.as_path();
68 let rpc = self.rpc;
69
70 let (send_bytes, recv_bytes) = bounded(BOUND_CAP);
71 let (send_block, recv_block) = bounded(BOUND_CAP);
72 let (send_height_block_hash, recv_height_block_hash) = bounded(BOUND_CAP);
73
74 let blk_index_to_blk_path = BlkIndexToBlkPath::scan(blocks_dir);
75
76 let (mut blk_index_to_blk_recap, blk_index) =
77 BlkIndexToBlkRecap::import(blocks_dir, &blk_index_to_blk_path, start);
78
79 let xor_bytes = XORBytes::from(blocks_dir);
80
81 thread::spawn(move || {
82 let xor_bytes = xor_bytes;
83
84 let _ = blk_index_to_blk_path.range(blk_index..).try_for_each(
85 move |(blk_index, blk_path)| {
86 let mut xor_i = XORIndex::default();
87
88 let blk_index = *blk_index;
89
90 let blk_metadata = BlkMetadata::new(blk_index, blk_path.as_path());
91
92 let mut blk_bytes_ = fs::read(blk_path).unwrap();
93 let blk_bytes = blk_bytes_.as_mut_slice();
94 let blk_bytes_len = blk_bytes.len();
95
96 let mut current_4bytes = [0; 4];
97
98 let mut i = 0;
99
100 'parent: loop {
101 loop {
102 if i == blk_bytes_len {
103 break 'parent;
104 }
105
106 current_4bytes.rotate_left(1);
107
108 current_4bytes[3] = xor_i.byte(blk_bytes[i], &xor_bytes);
109 i += 1;
110
111 if current_4bytes == MAGIC_BYTES {
112 break;
113 }
114 }
115
116 let len = u32::from_le_bytes(
117 xor_i
118 .bytes(&mut blk_bytes[i..(i + 4)], &xor_bytes)
119 .try_into()
120 .unwrap(),
121 ) as usize;
122 i += 4;
123
124 let block_bytes = (blk_bytes[i..(i + len)]).to_vec();
125
126 if send_bytes
127 .send((blk_metadata, BlockState::Raw(block_bytes), xor_i))
128 .is_err()
129 {
130 return ControlFlow::Break(());
131 }
132
133 i += len;
134 xor_i.add_assign(len);
135 }
136
137 ControlFlow::Continue(())
138 },
139 );
140 });
141
142 thread::spawn(move || {
143 let xor_bytes = xor_bytes;
144
145 let mut bulk = vec![];
146
147 let drain_and_send = |bulk: &mut Vec<_>| {
148 bulk.par_iter_mut().for_each(|(_, block_state, xor_i)| {
150 BlockState::decode(block_state, xor_i, &xor_bytes);
151 });
152
153 bulk.drain(..)
154 .try_for_each(|(blk_metadata, block_state, _)| {
155 let block = match block_state {
156 BlockState::Decoded(block) => block,
157 _ => unreachable!(),
158 };
159
160 if send_block.send((blk_metadata, block)).is_err() {
161 return ControlFlow::Break(());
162 }
163
164 ControlFlow::Continue(())
165 })
166 };
167
168 recv_bytes.iter().try_for_each(|tuple| {
169 bulk.push(tuple);
170
171 if bulk.len() < BOUND_CAP / 2 {
172 return ControlFlow::Continue(());
173 }
174
175 drain_and_send(&mut bulk)
177 })?;
178
179 drain_and_send(&mut bulk)
180 });
181
182 thread::spawn(move || {
183 let mut current_height = start.unwrap_or_default();
184
185 let mut future_blocks = BTreeMap::default();
186
187 let _ = recv_block
188 .iter()
189 .try_for_each(|(blk_metadata, block)| -> ControlFlow<(), _> {
190 let hash = block.block_hash();
191 let header = rpc.get_block_header_info(&hash);
192
193 if header.is_err() {
194 return ControlFlow::Continue(());
195 }
196 let header = header.unwrap();
197 if header.confirmations <= 0 {
198 return ControlFlow::Continue(());
199 }
200
201 let height = Height::from(header.height);
202
203 let len = blk_index_to_blk_recap.tree.len();
204 if blk_metadata.index == len as u16 || blk_metadata.index + 1 == len as u16 {
205 match (len as u16).cmp(&blk_metadata.index) {
206 Ordering::Equal => {
207 if len % 21 == 0 {
208 blk_index_to_blk_recap.export();
209 }
210 }
211 Ordering::Less => panic!(),
212 Ordering::Greater => {}
213 }
214
215 blk_index_to_blk_recap
216 .tree
217 .entry(blk_metadata.index)
218 .and_modify(|recap| {
219 if recap.max_height < height {
220 recap.max_height = height;
221 }
222 })
223 .or_insert(BlkRecap {
224 max_height: height,
225 modified_time: blk_metadata.modified_time,
226 });
227 }
228
229 let mut opt = if current_height == height {
230 Some((block, hash))
231 } else {
232 if start.is_none_or(|start| start <= height)
233 && end.is_none_or(|end| end >= height)
234 {
235 future_blocks.insert(height, (block, hash));
236 }
237 None
238 };
239
240 while let Some((block, hash)) = opt.take().or_else(|| {
241 if !future_blocks.is_empty() {
242 future_blocks.remove(¤t_height)
243 } else {
244 None
245 }
246 }) {
247 if end.is_some_and(|end| end < current_height) {
248 return ControlFlow::Break(());
249 }
250
251 send_height_block_hash
252 .send((current_height, block, hash))
253 .unwrap();
254
255 if end.is_some_and(|end| end == current_height) {
256 return ControlFlow::Break(());
257 }
258
259 current_height.increment();
260 }
261
262 ControlFlow::Continue(())
263 });
264
265 blk_index_to_blk_recap.export();
266 });
267
268 recv_height_block_hash
269 }
270}