brk_reader 0.3.0-beta.2

A very fast Bitcoin block parser and iterator built on top of rust-bitcoin
Documentation
use std::{fs, ops::ControlFlow, sync::OnceLock, thread};

use bitcoin::block::Header;
use brk_error::{Error, Result};
use brk_types::{BlkMetadata, Height, ReadBlock};
use crossbeam::channel::{Receiver, Sender, bounded};
use parking_lot::Mutex;
use tracing::error;

use crate::{
    BlkIndexToBlkPath, BlockHash, XORBytes, XORIndex,
    canonical::CanonicalRange,
    parse::{parse_canonical_body, peek_canonical},
    pipeline::{CHANNEL_CAPACITY, reorder::ReorderState},
    scan::scan_bytes,
};

/// A block picked out by the reader and queued for a parser thread.
///
/// Built in `read_and_dispatch` for each scanned block that `peek_canonical`
/// matches, then destructured in `parser_loop`, which passes the fields on to
/// `parse_canonical_body`.
struct ScannedBlock {
    /// Metadata handed to the `scan_bytes` callback for this block.
    metadata: BlkMetadata,
    /// Owned copy (`to_vec`) of the block bytes handed to the scan callback.
    bytes: Vec<u8>,
    /// XOR index handed to the scan callback together with the bytes.
    xor_state: XORIndex,
    /// Offset of the block within the canonical range; `parser_loop` adds it
    /// to `canonical.start` to derive the block height.
    canonical_offset: u32,
    /// Header returned by `peek_canonical` for this block.
    header: Header,
}

/// Why the pipeline should stop, stored at most once in a shared `OnceLock`.
///
/// Once any value is set, the reader stops dispatching and parser threads
/// skip the blocks they still receive.
enum Stop {
    /// Set by a parser when `try_emit` returns `false` or the reorder state's
    /// `next_offset` has reached `canonical.len()`.
    Done,
    /// An error was recorded; `pipeline_forward` returns it to its caller.
    Failed(Error),
}

/// Runs the forward pipeline: the calling thread reads and scans blk files
/// while `parser_threads` scoped worker threads parse the dispatched blocks.
///
/// Blocks travel from the reader to the workers over a bounded channel of
/// `CHANNEL_CAPACITY` entries; parsed blocks go through a shared
/// `ReorderState` built around a clone of `send`.
///
/// # Errors
///
/// Returns, in order of precedence: the error from `read_and_dispatch`, the
/// error recorded as `Stop::Failed`, or whatever `ReorderState::finalize`
/// reports for `canonical.len()`.
pub(super) fn pipeline_forward(
    paths: &BlkIndexToBlkPath,
    first_blk_index: u16,
    xor_bytes: XORBytes,
    canonical: &CanonicalRange,
    send: &Sender<Result<ReadBlock>>,
    parser_threads: usize,
) -> Result<()> {
    let stop = OnceLock::<Stop>::new();
    let reorder = Mutex::new(ReorderState::new(send.clone()));
    let (block_tx, block_rx) = bounded::<ScannedBlock>(CHANNEL_CAPACITY);

    let reader_outcome = thread::scope(|scope| {
        for _ in 0..parser_threads {
            let worker_rx = block_rx.clone();
            scope.spawn(|| parser_loop(worker_rx, &reorder, &stop, canonical, xor_bytes));
        }
        // Only the workers' clones should keep the receiving side alive.
        drop(block_rx);

        let outcome =
            read_and_dispatch(paths, first_blk_index, xor_bytes, canonical, &block_tx, &stop);
        // Dropping the last sender ends every worker's receive loop, letting
        // the scope join them.
        drop(block_tx);
        outcome
    });
    reader_outcome?;

    match stop.into_inner() {
        Some(Stop::Failed(e)) => Err(e),
        Some(Stop::Done) | None => reorder.into_inner().finalize(canonical.len()),
    }
}

/// Worker body: receives scanned blocks until the channel disconnects,
/// parses each one, and feeds the result to the shared reorder state.
///
/// * A parse failure is logged and recorded as `Stop::Failed`.
/// * `Stop::Done` is recorded when `try_emit` returns `false` or the reorder
///   state's `next_offset` reaches `canonical.len()`.
///
/// After a stop has been recorded the loop keeps receiving but skips the
/// work (presumably so a reader blocked on the bounded channel can still make
/// progress — confirm).
fn parser_loop(
    parser_recv: Receiver<ScannedBlock>,
    reorder: &Mutex<ReorderState>,
    stop: &OnceLock<Stop>,
    canonical: &CanonicalRange,
    xor_bytes: XORBytes,
) {
    for scanned in parser_recv {
        if stop.get().is_some() {
            continue;
        }

        let ScannedBlock { metadata, bytes, xor_state, canonical_offset, header } = scanned;
        let height = Height::from(*canonical.start + canonical_offset);

        let parsed = parse_canonical_body(bytes, metadata, xor_state, xor_bytes, height, header);
        let block = match parsed {
            Err(e) => {
                error!("parse_canonical_body failed at height {height}: {e}");
                let _ = stop.set(Stop::Failed(e));
                continue;
            }
            Ok(block) => block,
        };

        // The lock is held only for the emit and the progress check.
        let keep_going = {
            let mut state = reorder.lock();
            state.try_emit(canonical_offset, block)
                && (state.next_offset as usize) < canonical.len()
        };
        if !keep_going {
            let _ = stop.set(Stop::Done);
        }
    }
}

/// Reader body: loads each blk file from `first_blk_index` onward, scans it,
/// and sends every block that belongs to `canonical` to the parser channel.
///
/// For each block handed to the `scan_bytes` callback:
/// * blocks that `peek_canonical` does not match are skipped;
/// * a block whose previous-hash fails `canonical.verify_prev` records
///   `Stop::Failed` and ends the scan;
/// * otherwise an owned copy of the block is sent on `parser_send`.
///
/// The function returns `Ok(())` as soon as `stop` holds a value, checked
/// before each file is read. A failed send (no receiver left) also records
/// `Stop::Failed`, so the outer loop stops instead of reading every remaining
/// file with nobody left to parse it.
///
/// # Errors
///
/// Returns the I/O error from `fs::read` if a blk file cannot be read. Other
/// failures are reported through `stop`, not through the return value.
fn read_and_dispatch(
    paths: &BlkIndexToBlkPath,
    first_blk_index: u16,
    xor_bytes: XORBytes,
    canonical: &CanonicalRange,
    parser_send: &Sender<ScannedBlock>,
    stop: &OnceLock<Stop>,
) -> Result<()> {
    for (&blk_index, blk_path) in paths.range(first_blk_index..) {
        if stop.get().is_some() {
            return Ok(());
        }
        let mut bytes = fs::read(blk_path)?;
        scan_bytes(
            &mut bytes,
            blk_index,
            // NOTE(review): presumably the starting offset within the file — confirm.
            0,
            xor_bytes,
            |metadata, block_bytes, xor_state| {
                if stop.get().is_some() {
                    return ControlFlow::Break(());
                }
                let Some((canonical_offset, header)) =
                    peek_canonical(block_bytes, xor_state, xor_bytes, canonical)
                else {
                    return ControlFlow::Continue(());
                };
                if !canonical
                    .verify_prev(canonical_offset, &BlockHash::from(header.prev_blockhash))
                {
                    let _ = stop.set(Stop::Failed(Error::Internal(
                        "forward pipeline: canonical batch stitched across a reorg",
                    )));
                    return ControlFlow::Break(());
                }
                let scanned = ScannedBlock {
                    metadata,
                    bytes: block_bytes.to_vec(),
                    xor_state,
                    canonical_offset,
                    header,
                };
                if parser_send.send(scanned).is_err() {
                    // Every receiver is gone, so no block can be parsed any
                    // more. Record the failure so the outer loop stops too,
                    // not just this scan. `set` is a no-op if a stop reason
                    // was already recorded.
                    let _ = stop.set(Stop::Failed(Error::Internal(
                        "forward pipeline: parser channel disconnected before completion",
                    )));
                    return ControlFlow::Break(());
                }
                ControlFlow::Continue(())
            },
        );
    }
    Ok(())
}