use crate::headers::ParsedHeader;
use crate::xor::XorReader;
use crate::HeaderParser;
use anyhow::Result;
use bitcoin::consensus::Decodable;
use bitcoin::{Block, Transaction};
use crossbeam_channel::{bounded, Receiver, Sender};
use log::info;
use std::cmp::min;
use std::collections::HashMap;
use std::fs::File;
use std::io::BufReader;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Instant;
use threadpool::ThreadPool;
/// Parses raw Bitcoin block files in parallel, driven by a set of
/// pre-parsed block headers that locate each block on disk.
#[derive(Clone, Debug)]
pub struct BlockParser {
    // Block headers produced by `HeaderParser::parse`; indexed by height
    // (assumed chain order — the slice in `parse` relies on it).
    headers: Vec<ParsedHeader>,
    // Progress logger shared (via Arc internally) across worker threads.
    logger: ParserLogger,
    // Thread-count / channel-capacity tuning knobs.
    options: ParserOptions,
    // First block height to parse (inclusive).
    start_height: usize,
    // Last block height to parse (inclusive); usize::MAX means "to the tip".
    end_height: usize,
}
impl BlockParser {
    /// Creates a parser over the block files in `blocks_dir` with default options.
    ///
    /// # Errors
    /// Returns any error raised while parsing the block headers.
    pub fn new(blocks_dir: &str) -> Result<Self> {
        Self::new_with_opts(blocks_dir, ParserOptions::default())
    }

    /// Creates a parser over the block files in `blocks_dir` with the given
    /// options. Headers are parsed eagerly here; blocks are parsed lazily by
    /// [`Self::parse`].
    ///
    /// # Errors
    /// Returns any error raised while parsing the block headers.
    pub fn new_with_opts(blocks_dir: &str, options: ParserOptions) -> Result<Self> {
        let headers = HeaderParser::parse(blocks_dir)?;
        Ok(Self {
            headers,
            logger: ParserLogger::new(),
            options,
            start_height: 0,
            // usize::MAX is clamped to the actual tip inside `parse`.
            end_height: usize::MAX,
        })
    }

    /// Sets the first block height to parse (inclusive). Builder-style.
    pub fn start_height(mut self, start_height: usize) -> Self {
        self.start_height = start_height;
        self
    }

    /// Sets the last block height to parse (inclusive). Builder-style.
    pub fn end_height(mut self, end_height: usize) -> Self {
        self.end_height = end_height;
        self
    }

    /// Parses blocks on a thread pool, applying `extract` to each decoded
    /// block. Results arrive on the returned iterator in completion order
    /// (not height order); chain `ParserIterator::ordered` if order matters.
    ///
    /// An empty header set or an inverted range (`start_height > end_height`)
    /// yields an iterator that produces nothing instead of panicking.
    ///
    /// # Panics
    /// Worker threads panic if a block file cannot be read or decoded.
    pub fn parse<T: Send + 'static>(
        &self,
        extract: impl Fn(Block) -> T + Clone + Send + 'static,
    ) -> ParserIterator<T> {
        // Clamp to the highest known header. saturating_sub guards the
        // `len() - 1` underflow when no headers were found, and the explicit
        // range check guards the slice-out-of-bounds panic.
        let end_height = min(self.end_height, self.headers.len().saturating_sub(1));
        let header_range = if self.headers.is_empty() || self.start_height > end_height {
            Vec::new()
        } else {
            self.headers[self.start_height..=end_height].to_vec()
        };
        let pool = ThreadPool::new(self.options.num_threads);
        let (tx, rx) = bounded(self.options.channel_size);
        for (index, header) in header_range.into_iter().enumerate() {
            let logger = self.logger.clone();
            let tx = tx.clone();
            let extract = extract.clone();
            let start_height = self.start_height;
            pool.execute(move || {
                let extracted = match Self::parse_block(&header) {
                    Ok(block) => extract(block),
                    Err(e) => panic!("Error reading {:?} - {:?}", header.path, e),
                };
                let height = start_height + index;
                // The receiver may already be dropped; ignore send failures.
                let _ = tx.send((height, extracted));
                logger.increment();
            });
        }
        ParserIterator {
            rx,
            options: self.options.clone(),
            start_height: self.start_height,
        }
    }

    /// Reads and decodes one block from disk at the offset recorded in its
    /// header, un-XORing the file contents with the per-file mask.
    ///
    /// # Errors
    /// Returns I/O errors from opening/seeking the file and consensus-decode
    /// errors from malformed transaction data.
    fn parse_block(header: &ParsedHeader) -> Result<Block> {
        let reader = BufReader::new(File::open(&header.path)?);
        let mut reader = BufReader::new(XorReader::new(reader, header.xor_mask));
        // Skip to the block's position inside the block file.
        reader.seek_relative(header.offset as i64)?;
        Ok(Block {
            header: header.inner,
            txdata: Vec::<Transaction>::consensus_decode_from_finite_reader(&mut reader)?,
        })
    }
}
/// Tuning knobs for the parallel parsing machinery.
#[derive(Clone, Debug)]
pub struct ParserOptions {
    /// Number of items each pipeline worker consumes per scheduled batch
    /// (see `ParserIterator::pipeline`).
    pub pipeline_size: usize,
    /// Capacity of the bounded output channel in `BlockParser::parse`
    /// and of the derived-iterator channels (provides back-pressure).
    pub channel_size: usize,
    /// Worker threads per thread pool.
    pub num_threads: usize,
}
impl Default for ParserOptions {
    /// Default tuning: single-item pipeline batches, a 100-slot bounded
    /// channel, and 64 worker threads per pool.
    fn default() -> Self {
        let num_threads = 64;
        let channel_size = 100;
        let pipeline_size = 1;
        Self {
            pipeline_size,
            channel_size,
            num_threads,
        }
    }
}
/// Streaming handle over `(height, value)` pairs produced by worker threads.
/// Iterating yields the values; heights are available via `with_height`.
pub struct ParserIterator<T> {
    // Receiving end of the bounded channel the workers feed.
    rx: Receiver<(usize, T)>,
    // Carried along so derived iterators reuse the same tuning parameters.
    options: ParserOptions,
    // Height of the first block in the underlying range (used by `ordered`).
    start_height: usize,
}
impl<A: Send + 'static> ParserIterator<A> {
    /// Wraps `rx` in a new iterator that inherits this one's options and
    /// starting height.
    fn create<T>(&self, rx: Receiver<(usize, T)>) -> ParserIterator<T> {
        ParserIterator::<T> {
            rx,
            options: self.options.clone(),
            start_height: self.start_height,
        }
    }
    /// Tags every item with its block height, yielding `(height, item)`.
    pub fn with_height(&self) -> ParserIterator<(usize, A)> {
        let (tx, rx) = bounded(self.options.channel_size);
        let parser = self.create(rx);
        let rx_a = self.rx.clone();
        thread::spawn(move || {
            for (height, a) in rx_a {
                // Keep the outer height tag and also embed it in the payload.
                let _ = tx.send((height, (height, a)));
            }
        });
        parser
    }
    /// Re-emits items in ascending height order. Out-of-order arrivals are
    /// buffered in a map until the next expected height shows up.
    ///
    /// NOTE(review): if some height is never delivered (e.g. a worker died
    /// without sending), this buffers everything after it indefinitely —
    /// upstream workers panic on error, which presumably aborts the run;
    /// confirm that assumption holds for all producers.
    pub fn ordered(&self) -> ParserIterator<A> {
        let (tx, rx) = bounded(self.options.channel_size);
        let parser = self.create(rx);
        let rx_a = self.rx.clone();
        let start_height = self.start_height;
        thread::spawn(move || {
            let mut current_height = start_height;
            let mut unordered: HashMap<usize, A> = HashMap::default();
            for (height, a) in rx_a {
                unordered.insert(height, a);
                // Flush every consecutive height we now have buffered.
                while let Some(ordered) = unordered.remove(&current_height) {
                    let _ = tx.send((current_height, ordered));
                    current_height += 1;
                }
            }
        });
        parser
    }
    /// Applies `function` to every item on `num_threads` pooled workers.
    /// Output arrives in completion order, not height order.
    pub fn map_parallel<B: Send + 'static>(
        &self,
        function: impl Fn(A) -> B + Clone + Send + 'static,
    ) -> ParserIterator<B> {
        let pool = ThreadPool::new(self.options.num_threads);
        let (tx_b, rx_b) = bounded(self.options.pipeline_size * self.options.num_threads);
        for _ in 0..self.options.num_threads {
            let tx_b = tx_b.clone();
            let rx_a = self.rx.clone();
            let function = function.clone();
            pool.execute(move || {
                // Workers share `rx_a`; each loops until the channel closes.
                for (height, a) in rx_a {
                    let _ = tx_b.send((height, function(a)));
                }
            });
        }
        self.create(rx_b)
    }
    /// Convenience wrapper over [`Self::pipeline`] built from two closures;
    /// the between-stage hook is a no-op.
    pub fn pipeline_fn<B: Send + 'static, C: Send + 'static>(
        &self,
        f1: impl Fn(A) -> B + Clone + Send + 'static,
        f2: impl Fn(B) -> C + Clone + Send + 'static,
    ) -> ParserIterator<C> {
        let pipeline = PipelineClosure { f1, f2 };
        self.pipeline(&pipeline)
    }
    /// Runs a two-stage pipeline. Each scheduler-loop iteration schedules a
    /// first-stage batch, joins the pool so the batch fully drains, invokes
    /// `between()`, then schedules the matching second-stage batch (which is
    /// NOT joined, so it overlaps the next first-stage batch). The loop ends
    /// once a worker observes the upstream channel disconnected.
    pub fn pipeline<B: Send + 'static, C: Send + 'static>(
        &self,
        pipeline: &(impl Pipeline<A, B, C> + Clone + Send + 'static),
    ) -> ParserIterator<C> {
        let pool_a = ThreadPool::new(self.options.num_threads);
        let pool_b = ThreadPool::new(self.options.num_threads);
        let rx_a = self.rx.clone();
        let opts = self.options.clone();
        let pipeline = pipeline.clone();
        let (tx_b, rx_b) = bounded(self.options.pipeline_size * self.options.num_threads);
        let (tx_c, rx_c) = bounded(self.options.pipeline_size * self.options.num_threads);
        // Cleared by `run_pipeline` workers when an input channel disconnects.
        let run = Arc::new(AtomicBool::new(true));
        thread::spawn(move || {
            while run.load(Ordering::Relaxed) {
                let p1 = pipeline.clone();
                let p2 = pipeline.clone();
                Self::run_pipeline(&opts, &pool_a, &run, &rx_a, &tx_b, &move |a| p1.first(a));
                // Wait for the whole first-stage batch before the hook runs.
                pool_a.join();
                pipeline.between();
                Self::run_pipeline(&opts, &pool_b, &run, &rx_b, &tx_c, &move |b| p2.second(b));
            }
        });
        self.create(rx_c)
    }
    /// Schedules one batch on `pool`: `num_threads` jobs, each pulling up to
    /// `pipeline_size` items from `rx`, applying `function`, and forwarding
    /// the result to `tx`. A disconnected `rx` flips `running` off so the
    /// caller's scheduling loop terminates.
    fn run_pipeline<X: Send + 'static, Y: Send + 'static>(
        options: &ParserOptions,
        pool: &ThreadPool,
        running: &Arc<AtomicBool>,
        rx: &Receiver<(usize, X)>,
        tx: &Sender<(usize, Y)>,
        function: &(impl Fn(X) -> Y + Clone + Send + 'static),
    ) {
        for _ in 0..options.num_threads {
            let running = running.clone();
            let tx = tx.clone();
            let rx = rx.clone();
            let function = function.clone();
            let pipeline_size = options.pipeline_size;
            pool.execute(move || {
                for _ in 0..pipeline_size {
                    match rx.recv() {
                        Ok((height, x)) => {
                            let _ = tx.send((height, function(x)));
                        }
                        Err(_) => {
                            // Upstream disconnected: tell the scheduler to stop.
                            running.store(false, Ordering::Relaxed);
                        }
                    };
                }
            });
        }
    }
}
impl<T> Iterator for ParserIterator<T> {
    type Item = T;

    /// Blocks until the next value arrives, discarding the height tag.
    /// Returns `None` once every sender has disconnected.
    fn next(&mut self) -> Option<Self::Item> {
        match self.rx.recv() {
            Ok((_, value)) => Some(value),
            Err(_) => None,
        }
    }
}
/// A two-stage transformation `A -> B -> C` with an optional hook that runs
/// between the stages (see `ParserIterator::pipeline` for scheduling).
pub trait Pipeline<A, B, C> {
    /// First stage: transforms one input item.
    fn first(&self, a: A) -> B;
    /// Hook invoked after a first-stage batch drains and before the matching
    /// second-stage batch is scheduled. Default: no-op.
    fn between(&self) {}
    /// Second stage: transforms one first-stage result.
    fn second(&self, b: B) -> C;
}
/// Adapts a pair of closures into a `Pipeline` (the `between` hook stays the
/// default no-op). Built by `ParserIterator::pipeline_fn`.
#[derive(Clone)]
struct PipelineClosure<F1, F2> {
    // First-stage closure.
    f1: F1,
    // Second-stage closure.
    f2: F2,
}
impl<F1, F2, A, B, C> Pipeline<A, B, C> for PipelineClosure<F1, F2>
where
    F1: Fn(A) -> B,
    F2: Fn(B) -> C,
{
    /// Delegates to the stored first-stage closure.
    fn first(&self, a: A) -> B {
        let stage_one = &self.f1;
        stage_one(a)
    }

    /// Delegates to the stored second-stage closure.
    fn second(&self, b: B) -> C {
        let stage_two = &self.f2;
        stage_two(b)
    }
}
/// Thread-shared progress logger: counts parsed blocks and periodically logs
/// throughput. Cloning shares the same counter via the inner `Arc`.
#[derive(Clone, Debug)]
struct ParserLogger {
    // Total number of blocks parsed so far, shared across worker threads.
    num_parsed: Arc<AtomicUsize>,
    // When this logger was created; used for the elapsed-time log line.
    start: Instant,
    // Emit a progress line every `log_at` blocks.
    log_at: usize,
}
impl ParserLogger {
fn new() -> Self {
Self {
num_parsed: Arc::new(Default::default()),
start: Instant::now(),
log_at: 10_000,
}
}
fn increment(&self) {
let num = self.num_parsed.fetch_add(1, Ordering::Relaxed);
if num == 0 {
info!("Starting to parse blocks...");
} else if num % self.log_at == 0 {
let elapsed = (Instant::now() - self.start).as_secs();
let blocks = format!("{}K blocks parsed,", num / 1000);
info!("{} {}m{}s elapsed", blocks, elapsed / 60, elapsed % 60);
}
}
}