Skip to main content

brk_reader/
lib.rs

1#![doc = include_str!("../README.md")]
2
3use std::{
4    collections::BTreeMap,
5    fs::{self, File},
6    io::{Read, Seek, SeekFrom},
7    ops::ControlFlow,
8    path::{Path, PathBuf},
9    sync::Arc,
10    thread,
11};
12
13use bitcoin::{block::Header, consensus::Decodable};
14use blk_index_to_blk_path::*;
15use brk_error::{Error, Result};
16use brk_rpc::Client;
17use brk_types::{BlkMetadata, BlkPosition, BlockHash, Height, ReadBlock};
18pub use crossbeam::channel::Receiver;
19use crossbeam::channel::bounded;
20use derive_more::Deref;
21use parking_lot::{RwLock, RwLockReadGuard};
22use rayon::prelude::*;
23use tracing::{error, warn};
24
25mod blk_index_to_blk_path;
26mod decode;
27mod xor_bytes;
28mod xor_index;
29
30use decode::*;
31pub use xor_bytes::*;
32pub use xor_index::*;
33
/// Network magic (`f9 be b4 d9`, Bitcoin mainnet) that opens every block record in a blk file.
const MAGIC_BYTES: [u8; 4] = [0xf9, 0xbe, 0xb4, 0xd9];
/// Capacity of the decoded and ordered block channels used by `ReaderInner::read`;
/// the raw-bytes channel is given half of it.
const BOUND_CAP: usize = 50;
36
37fn find_magic(bytes: &[u8], xor_i: &mut XORIndex, xor_bytes: XORBytes) -> Option<usize> {
38    let mut window = [0u8; 4];
39    for (i, &b) in bytes.iter().enumerate() {
40        window.rotate_left(1);
41        window[3] = xor_i.byte(b, xor_bytes);
42        if window == MAGIC_BYTES {
43            return Some(i + 1);
44        }
45    }
46    None
47}
48
/// Bitcoin BLK file reader.
///
/// Thread safe and free to clone: the state lives in a shared [`ReaderInner`] behind an
/// `Arc`, so a clone only bumps a reference count, and every [`ReaderInner`] method is
/// reachable directly on a `Reader` through `Deref`.
#[derive(Debug, Clone, Deref)]
pub struct Reader(Arc<ReaderInner>);
57
58impl Reader {
59    pub fn new(blocks_dir: PathBuf, client: &Client) -> Self {
60        Self(Arc::new(ReaderInner::new(blocks_dir, client.clone())))
61    }
62}
63
/// Shared state behind a [`Reader`].
#[derive(Debug)]
pub struct ReaderInner {
    /// Blk file index -> path map. Built at construction and replaced on every call to
    /// [`ReaderInner::read`], which rescans `blocks_dir`.
    blk_index_to_blk_path: Arc<RwLock<BlkIndexToBlkPath>>,
    /// XOR key applied to blk file contents, derived from `blocks_dir` at construction.
    xor_bytes: XORBytes,
    /// Directory holding the blk files.
    blocks_dir: PathBuf,
    /// Node RPC client, used to resolve block hashes, heights and header timestamps.
    client: Client,
}
71
72impl ReaderInner {
73    pub fn new(blocks_dir: PathBuf, client: Client) -> Self {
74        Self {
75            xor_bytes: XORBytes::from(blocks_dir.as_path()),
76            blk_index_to_blk_path: Arc::new(RwLock::new(BlkIndexToBlkPath::scan(
77                blocks_dir.as_path(),
78            ))),
79            blocks_dir,
80            client,
81        }
82    }
83
84    pub fn client(&self) -> &Client {
85        &self.client
86    }
87
88    pub fn blocks_dir(&self) -> &Path {
89        &self.blocks_dir
90    }
91
92    pub fn blk_index_to_blk_path(&self) -> RwLockReadGuard<'_, BlkIndexToBlkPath> {
93        self.blk_index_to_blk_path.read()
94    }
95
96    pub fn xor_bytes(&self) -> XORBytes {
97        self.xor_bytes
98    }
99
100    /// Read raw bytes from a blk file at the given position with XOR decoding
101    pub fn read_raw_bytes(&self, position: BlkPosition, size: usize) -> Result<Vec<u8>> {
102        let blk_paths = self.blk_index_to_blk_path();
103        let blk_path = blk_paths
104            .get(&position.blk_index())
105            .ok_or(Error::NotFound("Blk file not found".into()))?;
106
107        let mut file = File::open(blk_path)?;
108        file.seek(SeekFrom::Start(position.offset() as u64))?;
109
110        let mut buffer = vec![0u8; size];
111        file.read_exact(&mut buffer)?;
112
113        let mut xori = XORIndex::default();
114        xori.add_assign(position.offset() as usize);
115        xori.bytes(&mut buffer, self.xor_bytes);
116
117        Ok(buffer)
118    }
119
120    /// Returns a crossbeam channel receiver that streams `ReadBlock`s in chain order.
121    ///
122    /// Both `start` and `end` are inclusive. `None` means unbounded.
123    pub fn read(&self, start: Option<Height>, end: Option<Height>) -> Receiver<ReadBlock> {
124        let client = self.client.clone();
125
126        let (send_bytes, recv_bytes) = bounded(BOUND_CAP / 2);
127        let (send_block, recv_block) = bounded(BOUND_CAP);
128        let (send_ordered, recv_ordered) = bounded(BOUND_CAP);
129
130        let blk_index_to_blk_path = BlkIndexToBlkPath::scan(&self.blocks_dir);
131        *self.blk_index_to_blk_path.write() = blk_index_to_blk_path.clone();
132
133        let xor_bytes = self.xor_bytes;
134
135        let first_blk_index = self
136            .find_start_blk_index(start, &blk_index_to_blk_path, xor_bytes)
137            .unwrap_or_default();
138
139        let get_block_time = |h: Height| -> u32 {
140            self.client
141                .get_block_hash(*h as u64)
142                .ok()
143                .and_then(|hash| self.client.get_block_header(&hash).ok())
144                .map(|h| h.time)
145                .unwrap_or(0)
146        };
147
148        let start_time = start.filter(|h| **h > 0).map(&get_block_time).unwrap_or(0);
149        let end_time = end.map(&get_block_time).unwrap_or(0);
150
151        thread::spawn(move || {
152            let _ = blk_index_to_blk_path.range(first_blk_index..).try_for_each(
153                move |(blk_index, blk_path)| {
154                    let mut xor_i = XORIndex::default();
155
156                    let blk_index = *blk_index;
157
158                    let Ok(mut blk_bytes_) = fs::read(blk_path) else {
159                        error!("Failed to read blk file: {}", blk_path.display());
160                        return ControlFlow::Break(());
161                    };
162                    let blk_bytes = blk_bytes_.as_mut_slice();
163                    let mut i = 0;
164
165                    loop {
166                        let Some(offset) = find_magic(&blk_bytes[i..], &mut xor_i, xor_bytes)
167                        else {
168                            break;
169                        };
170                        i += offset;
171
172                        if i + 4 > blk_bytes.len() {
173                            warn!("Truncated blk file {blk_index}: not enough bytes for block length at offset {i}");
174                            break;
175                        }
176                        let len = u32::from_le_bytes(
177                            xor_i
178                                .bytes(&mut blk_bytes[i..(i + 4)], xor_bytes)
179                                .try_into()
180                                .unwrap(),
181                        ) as usize;
182                        i += 4;
183
184                        if i + len > blk_bytes.len() {
185                            warn!("Truncated blk file {blk_index}: block at offset {} claims {len} bytes but only {} remain", i - 4, blk_bytes.len() - i);
186                            break;
187                        }
188                        let position = BlkPosition::new(blk_index, i as u32);
189                        let metadata = BlkMetadata::new(position, len as u32);
190
191                        let block_bytes = (blk_bytes[i..(i + len)]).to_vec();
192
193                        if send_bytes.send((metadata, block_bytes, xor_i)).is_err() {
194                            return ControlFlow::Break(());
195                        }
196
197                        i += len;
198                        xor_i.add_assign(len);
199                    }
200
201                    ControlFlow::Continue(())
202                },
203            );
204        });
205
206        thread::spawn(move || {
207            // Private pool to prevent collision with the global pool
208            let parser_pool = rayon::ThreadPoolBuilder::new()
209                .num_threads(4.min(thread::available_parallelism().unwrap().get() / 2))
210                .build()
211                .expect("Failed to create parser thread pool");
212
213            parser_pool.install(|| {
214                let _ =
215                    recv_bytes
216                        .into_iter()
217                        .par_bridge()
218                        .try_for_each(|(metadata, bytes, xor_i)| {
219                            let position = metadata.position();
220                            match decode_block(
221                                bytes, metadata, &client, xor_i, xor_bytes, start, end, start_time,
222                                end_time,
223                            ) {
224                                Ok(Some(block)) => {
225                                    if send_block.send(block).is_err() {
226                                        return ControlFlow::Break(());
227                                    }
228                                }
229                                Ok(None) => {} // Block filtered out (outside range, unconfirmed)
230                                Err(e) => {
231                                    warn!("Failed to decode block at {position}: {e}");
232                                }
233                            }
234                            ControlFlow::Continue(())
235                        });
236            });
237        });
238
239        thread::spawn(move || {
240            let mut current_height = start.unwrap_or_default();
241            let mut prev_hash: Option<BlockHash> = None;
242            let mut future_blocks = BTreeMap::default();
243
244            let _ = recv_block
245                .iter()
246                .try_for_each(|block| -> ControlFlow<(), _> {
247                    let mut opt = if current_height == block.height() {
248                        Some(block)
249                    } else {
250                        future_blocks.insert(block.height(), block);
251                        None
252                    };
253
254                    while let Some(block) = opt.take().or_else(|| {
255                        if !future_blocks.is_empty() {
256                            future_blocks.remove(&current_height)
257                        } else {
258                            None
259                        }
260                    }) {
261                        if let Some(expected_prev) = prev_hash.as_ref() && block.header.prev_blockhash != expected_prev.into() {
262                            error!(
263                                "Chain discontinuity detected at height {}: expected prev_hash {}, got {}. Stopping iteration.",
264                                *block.height(),
265                                expected_prev,
266                                block.header.prev_blockhash
267                            );
268                            return ControlFlow::Break(());
269                        }
270
271                        prev_hash = Some(block.hash().clone());
272
273                        if send_ordered.send(block).is_err() {
274                            return ControlFlow::Break(());
275                        }
276
277                        current_height.increment();
278
279                        if end.is_some_and(|end| current_height > end) {
280                            return ControlFlow::Break(());
281                        }
282                    }
283
284                    ControlFlow::Continue(())
285                });
286        });
287
288        recv_ordered
289    }
290
291    fn find_start_blk_index(
292        &self,
293        target_start: Option<Height>,
294        blk_index_to_blk_path: &BlkIndexToBlkPath,
295        xor_bytes: XORBytes,
296    ) -> Result<u16> {
297        let Some(target_start) = target_start else {
298            return Ok(0);
299        };
300
301        // If start is a very recent block we only look back X blk file before the last
302        if let Ok(height) = self.client.get_last_height()
303            && (*height).saturating_sub(*target_start) <= 3
304        {
305            return Ok(blk_index_to_blk_path
306                .keys()
307                .rev()
308                .nth(2)
309                .copied()
310                .unwrap_or_default());
311        }
312
313        let blk_indices: Vec<u16> = blk_index_to_blk_path.keys().copied().collect();
314
315        if blk_indices.is_empty() {
316            return Ok(0);
317        }
318
319        let mut left = 0;
320        let mut right = blk_indices.len() - 1;
321        let mut best_start_idx = 0;
322
323        while left <= right {
324            let mid = (left + right) / 2;
325            let blk_index = blk_indices[mid];
326
327            if let Some(blk_path) = blk_index_to_blk_path.get(&blk_index) {
328                match self.get_first_block_height(blk_path, xor_bytes) {
329                    Ok(height) => {
330                        if height <= target_start {
331                            best_start_idx = mid;
332                            left = mid + 1;
333                        } else {
334                            if mid == 0 {
335                                break;
336                            }
337                            right = mid - 1;
338                        }
339                    }
340                    Err(_) => {
341                        left = mid + 1;
342                    }
343                }
344            } else {
345                break;
346            }
347        }
348
349        // buffer for worst-case scenarios when a block as far behind
350        let final_idx = best_start_idx.saturating_sub(21);
351
352        Ok(blk_indices.get(final_idx).copied().unwrap_or(0))
353    }
354
355    pub fn get_first_block_height(
356        &self,
357        blk_path: &PathBuf,
358        xor_bytes: XORBytes,
359    ) -> Result<Height> {
360        let mut file = File::open(blk_path)?;
361        let mut buf = [0u8; 4096];
362        let n = file.read(&mut buf)?;
363
364        let mut xor_i = XORIndex::default();
365        let magic_end = find_magic(&buf[..n], &mut xor_i, xor_bytes)
366            .ok_or_else(|| Error::NotFound("No magic bytes found".into()))?;
367
368        let size_end = magic_end + 4;
369        xor_i.bytes(&mut buf[magic_end..size_end], xor_bytes);
370
371        let header_end = size_end + 80;
372        xor_i.bytes(&mut buf[size_end..header_end], xor_bytes);
373
374        let header =
375            Header::consensus_decode(&mut std::io::Cursor::new(&buf[size_end..header_end]))?;
376
377        let height = self.client.get_block_info(&header.block_hash())?.height as u32;
378
379        Ok(Height::new(height))
380    }
381}