// bitcoin_block_parser/headers.rs
use crate::xor::{XorReader, XOR_MASK_LEN};
use anyhow::bail;
use anyhow::Result;
use bitcoin::block::Header;
use bitcoin::consensus::Decodable;
use bitcoin::hashes::Hash;
use bitcoin::BlockHash;
use rustc_hash::FxHashMap;
use std::fs;
use std::fs::File;
use std::io::{BufReader, Read};
use std::path::{Path, PathBuf};
use std::sync::mpsc;
use threadpool::ThreadPool;
/// Number of bytes read ahead of every serialized header in a BLK file.
///
/// The parser interprets the last four of these bytes as the little-endian `u32` size of the
/// record that follows; the first four are read but not inspected here (presumably the
/// network magic — confirm against the BLK file format).
const PRE_HEADER_SIZE: usize = 8;
/// A block header decoded from a BLK file together with where it was found.
#[derive(Clone, Debug)]
pub struct ParsedHeader {
    /// The consensus-decoded header.
    pub inner: Header,
    /// Byte position within `path` immediately after this header, counted from the start of
    /// the file (pre-header and header bytes included).
    pub offset: usize,
    /// Hash of `inner`, as computed by `Header::block_hash`.
    pub hash: BlockHash,
    /// The BLK file this header was read from.
    pub path: PathBuf,
    /// The XOR mask that was applied while reading `path`.
    pub xor_mask: [u8; XOR_MASK_LEN],
}
/// Stateless entry point for header parsing; it carries no data and only groups the parsing
/// routines as associated functions.
pub struct HeaderParser;
impl HeaderParser {
/// Parses the block headers of every `blk*` file in `blocks_dir` and returns them in chain
/// order, starting from the header whose previous-block hash is all zeros.
///
/// Each file is parsed as a separate job on a thread pool and the results are gathered over
/// a channel. Headers are indexed by their previous-block hash; when two headers claim the
/// same parent, the one whose chain of descendants is longer is kept. Headers that are not
/// reachable from the all-zero starting hash are not returned.
///
/// # Errors
///
/// Returns an error if the XOR mask cannot be read, if the directory cannot be listed or
/// contains no `blk*` files, or if parsing any file fails.
pub fn parse(blocks_dir: &str) -> Result<Vec<ParsedHeader>> {
    let xor_mask = Self::read_xor_mask(blocks_dir)?;
    let (tx, rx) = mpsc::channel();
    let pool = ThreadPool::new(100);

    // `path` is already owned by the loop, so it is moved into the job without cloning.
    for path in Self::blk_files(blocks_dir)? {
        let tx = tx.clone();
        pool.execute(move || {
            let results = Self::parse_headers_file(path, xor_mask);
            // Sending only fails once the receiver is gone, i.e. `parse` has already
            // returned with an error; there is nobody left to report to.
            let _ = tx.send(results);
        });
    }
    // Drop the original sender so the receive loop ends once every job has reported.
    drop(tx);

    // Index headers by the hash of their parent; a displaced entry means two headers
    // share a parent and has to be resolved once the whole index is built.
    let mut locations = FxHashMap::default();
    let mut collisions: Vec<ParsedHeader> = vec![];
    for received in rx {
        for header in received? {
            if let Some(collision) = locations.insert(header.inner.prev_blockhash, header) {
                collisions.push(collision);
            }
        }
    }

    for collision in collisions {
        Self::resolve_collisions(&mut locations, collision);
    }
    Ok(Self::order_headers(locations))
}
/// Scans one BLK file and returns a [`ParsedHeader`] for every header it can decode.
///
/// The file is read through an [`XorReader`] using `xor_mask`. Each record is consumed as a
/// `PRE_HEADER_SIZE`-byte pre-header (whose last four bytes are the little-endian record
/// size) followed by a header; the remainder of the record is skipped with a relative seek.
/// Scanning stops at the first pre-header that cannot be read in full.
///
/// # Errors
///
/// Returns an error if the file cannot be opened or if skipping over a record fails.
fn parse_headers_file(
    path: PathBuf,
    xor_mask: [u8; XOR_MASK_LEN],
) -> anyhow::Result<Vec<ParsedHeader>> {
    let buffer_size = PRE_HEADER_SIZE + Header::SIZE;
    let reader = XorReader::new(File::open(&path)?, xor_mask);
    let mut reader = BufReader::with_capacity(buffer_size, reader);

    // Mirrors the reader's position in the file.
    let mut offset = 0;
    // The pre-header has a fixed size, so a stack array is enough.
    let mut pre_header = [0_u8; PRE_HEADER_SIZE];
    let mut headers = vec![];

    while reader.read_exact(&mut pre_header).is_ok() {
        offset += pre_header.len();
        if let Ok(header) = Header::consensus_decode(&mut reader) {
            headers.push(ParsedHeader {
                inner: header,
                offset: offset + Header::SIZE,
                hash: header.block_hash(),
                path: path.clone(),
                xor_mask,
            });
            // The record size counts the header that was just decoded, so only the rest
            // of the record remains to be skipped.
            let size = u32::from_le_bytes(pre_header[4..].try_into()?) as usize;
            reader.seek_relative(size.saturating_sub(Header::SIZE) as i64)?;
            // The reader has moved by at least `Header::SIZE` bytes even if the size field
            // claims less; advance by the same amount so `offset` never drifts.
            offset += size.max(Header::SIZE);
        }
    }
    Ok(headers)
}
/// Lists every entry of `dir` whose file name starts with `blk`.
///
/// The entries are returned in the order the directory iterator yields them.
///
/// # Errors
///
/// Returns an error if the directory cannot be read, if any entry cannot be inspected, or
/// if no matching entry is found.
fn blk_files(dir: &str) -> anyhow::Result<Vec<PathBuf>> {
    let mut files = vec![];
    for entry in fs::read_dir(Path::new(dir))? {
        let entry = entry?;
        // Compare lossily: only the ASCII prefix matters, and an unrelated non-UTF-8 file
        // name in the directory must not abort the whole scan with a panic.
        if entry.file_name().to_string_lossy().starts_with("blk") {
            files.push(entry.path());
        }
    }

    if files.is_empty() {
        bail!("No BLK files found in dir {:?}", dir);
    }
    Ok(files)
}
/// Reads the XOR mask stored in `xor.dat` inside `dir`.
///
/// When the file does not exist an all-zero mask is returned (presumably a no-op for the
/// XOR reader — confirm against `XorReader`).
///
/// # Errors
///
/// Returns an error if the file exists but cannot be opened, or if it holds fewer than
/// `XOR_MASK_LEN` bytes.
fn read_xor_mask<P: AsRef<Path>>(dir: P) -> anyhow::Result<[u8; XOR_MASK_LEN]> {
    let path = dir.as_ref().join("xor.dat");

    // Open the file directly instead of probing with `Path::exists`, which reports `false`
    // for any metadata error (e.g. permission denied) and would silently select the zero
    // mask. Only a genuinely missing file falls back to the default.
    let mut file = match File::open(&path) {
        Ok(file) => file,
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
            return Ok([0_u8; XOR_MASK_LEN]);
        }
        Err(err) => return Err(err.into()),
    };

    let mut mask = [0_u8; XOR_MASK_LEN];
    file.read_exact(&mut mask)?;
    Ok(mask)
}
/// Decides which of two headers sharing the same parent stays in `headers`.
///
/// `headers` maps a parent hash to the header built on top of it. Starting from the entry
/// currently stored for the contested parent and from `collision`, both chains of
/// descendants are followed in lockstep until one of them has no further entry. If the
/// chain that began at `collision` still continues at that point, `collision` replaces the
/// stored entry; otherwise the map is left untouched.
///
/// # Panics
///
/// Panics if `headers` has no entry for the parent of `collision`.
fn resolve_collisions(
    headers: &mut FxHashMap<BlockHash, ParsedHeader>,
    collision: ParsedHeader,
) {
    let fork_point = collision.inner.prev_blockhash;
    let incumbent = headers.get(&fork_point).expect("exists");

    // Hashes are copied rather than borrowed, which keeps the walk free of map borrows.
    let mut incumbent_tip = incumbent.hash;
    let mut challenger_tip = collision.hash;
    loop {
        match (headers.get(&incumbent_tip), headers.get(&challenger_tip)) {
            (Some(next_incumbent), Some(next_challenger)) => {
                incumbent_tip = next_incumbent.hash;
                challenger_tip = next_challenger.hash;
            }
            // At least one of the two chains has run out of descendants.
            _ => break,
        }
    }

    // The challenger wins only if its chain is the one that still continues.
    if headers.contains_key(&challenger_tip) {
        headers.insert(fork_point, collision);
    }
}
/// Turns the parent-hash index into a list ordered along the chain.
///
/// The walk starts at the entry stored under the all-zero hash and repeatedly removes the
/// entry stored under the hash of the header just taken, stopping at the first hash that
/// has no entry. Entries never reached by the walk are discarded with the map.
fn order_headers(mut headers: FxHashMap<BlockHash, ParsedHeader>) -> Vec<ParsedHeader> {
    let mut cursor = BlockHash::all_zeros();
    std::iter::from_fn(|| {
        let header = headers.remove(&cursor)?;
        cursor = header.hash;
        Some(header)
    })
    .collect()
}
}