use anyhow::Result;
use anyhow::{anyhow, bail};
use bitcoin::block::Header;
use bitcoin::consensus::Decodable;
use bitcoin::hashes::Hash;
use bitcoin::{Amount, Block, BlockHash, OutPoint, Transaction, Txid};
use rustc_hash::FxHashMap;
use scalable_cuckoo_filter::ScalableCuckooFilter;
use std::collections::{BTreeMap, HashMap};
use std::fs::File;
use std::io::{BufReader, BufWriter, Read};
use std::iter::Zip;
use std::path::{Path, PathBuf};
use std::slice::Iter;
use std::sync::mpsc;
use std::sync::mpsc::{Receiver, SyncSender};
use std::time::Instant;
use std::{fs, thread};
use threadpool::ThreadPool;
// Worker threads for concurrent blk-file reads; I/O bound, so well above core count.
const NUM_FILE_THREADS: usize = 100;
// Bound on in-flight blocks per sync_channel, limiting memory while decoding.
const BLOCK_BUFFER: usize = 100;
// Initial capacity hint for the unspent-outpoint cuckoo filter.
const APPROX_UNSPENT_TXS: usize = 300_000_000;
// Target false-positive probability for the cuckoo filter.
const FILTER_COLLISION_PROB: f64 = 0.000_000_000_001;
// Bytes of the txid kept by `BlockParser::truncate` to shrink the outpoint map.
const SHORT_TXID_BYTES: usize = 12;
// Progress is logged every this many blocks.
const LOG_BLOCKS: u32 = 10_000;
// Serialized size of a Bitcoin block header.
const HEADER_SIZE: usize = 80;
// Bytes preceding each block in a blk file: 4 magic bytes + 4-byte LE block size.
const PRE_HEADER_SIZE: usize = 8;
// Filter answering "is this outpoint still unspent at chain tip?" (probabilistic).
type UnspentFilter = ScalableCuckooFilter<OutPoint>;
// Every channel carries blocks wrapped in a Result so errors flow to the consumer.
type ResultBlock = Result<ParsedBlock>;
/// A decoded block plus precomputed txids and, depending on which `parse_*`
/// method produced it, per-transaction input amounts and output spent-statuses.
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct ParsedBlock {
pub block: Block,
// Txid of each transaction, parallel to `block.txdata` (computed once in parse_block).
txids: Vec<Txid>,
// Populated only by parse_i()/parse_io(): the amount each input spends, keyed by txid.
input_amounts: BTreeMap<Txid, Vec<Amount>>,
// Populated only by parse_o()/parse_io(): spent/unspent status of each output, keyed by txid.
output_status: BTreeMap<Txid, Vec<OutStatus>>,
}
impl ParsedBlock {
    /// Iterates over each transaction paired with its precomputed txid, so
    /// callers never need to re-hash via `Transaction::compute_txid`.
    pub fn transactions(&self) -> Zip<Iter<'_, Transaction>, Iter<'_, Txid>> {
        self.block.txdata.iter().zip(self.txids.iter())
    }

    /// Returns the spent/unspent status of each output of `txid`.
    ///
    /// # Errors
    /// Errors when statuses were not populated for this block.
    pub fn output_status(&self, txid: &Txid) -> Result<&Vec<OutStatus>> {
        // `ok_or_else` defers building the error message to the failure path
        // (the eager `ok_or(anyhow!(..))` allocated it on every call).
        self.output_status
            .get(txid)
            .ok_or_else(|| anyhow!("Output status not found, try calling parse_o() or parse_io()"))
    }

    /// Returns the amount spent by each input of `txid`.
    ///
    /// # Errors
    /// Errors when amounts were not populated for this block.
    pub fn input_amount(&self, txid: &Txid) -> Result<&Vec<Amount>> {
        self.input_amounts
            .get(txid)
            .ok_or_else(|| anyhow!("Input amount not found, try calling parse_i() or parse_io()"))
    }
}
/// Whether a transaction output is eventually spent later in the chain,
/// as determined by the unspent-outpoint filter.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum OutStatus {
// The output is spent by some later transaction.
Spent,
// The output is still unspent at the end of the scanned chain.
Unspent
}
/// Streams `ParsedBlock`s from disk given a pre-computed, chain-ordered
/// list of block locations (see `BlockLocation::parse`).
pub struct BlockParser {
// Block locations expected to be in chain order, starting at genesis for
// the methods that require it (parse_i/parse_io/write_filter).
locations: Vec<BlockLocation>
}
impl BlockParser {
    /// Creates a parser over `locations`, typically produced by `BlockLocation::parse`.
    pub fn new(locations: &[BlockLocation]) -> Self {
        Self { locations: locations.to_vec() }
    }

    /// Parses blocks in `locations` order; neither input amounts nor output
    /// statuses are populated.
    pub fn parse(&self) -> Receiver<ResultBlock> {
        self.parse_ordered(None)
    }

    /// Parses blocks and populates input amounts; `locations` must start at genesis.
    pub fn parse_i(&self) -> Receiver<ResultBlock> {
        self.parse_txin_amounts(None)
    }

    /// Parses blocks and populates output spent-statuses from a filter file
    /// previously produced by [`Self::write_filter`].
    pub fn parse_o(&self, filter_file: &str) -> Receiver<ResultBlock> {
        self.parse_ordered(Some(filter_file.to_string()))
    }

    /// Parses blocks and populates both input amounts and output spent-statuses.
    pub fn parse_io(&self, filter_file: &str) -> Receiver<ResultBlock> {
        self.parse_txin_amounts(Some(filter_file.to_string()))
    }

    /// Runs a full chain scan and writes a cuckoo filter of every still-unspent
    /// outpoint to `output`.
    ///
    /// # Errors
    /// Errors when `locations` does not start at genesis, or on I/O, decoding,
    /// or serialization failure.
    pub fn write_filter(&self, output: &str) -> Result<()> {
        self.check_genesis()?;
        let writer = BufWriter::new(File::create(output)?);
        let mut filter = ScalableCuckooFilter::new(APPROX_UNSPENT_TXS, FILTER_COLLISION_PROB);
        for parsed in self.parse_ordered(None) {
            let parsed = parsed?;
            for (tx, txid) in parsed.transactions() {
                // Insert every newly created output, then remove everything this
                // transaction spends; what survives the scan is the unspent set.
                for (index, _) in tx.output.iter().enumerate() {
                    let outpoint = OutPoint::new(*txid, index as u32);
                    filter.insert(&outpoint);
                }
                for input in &tx.input {
                    filter.remove(&input.previous_output);
                }
            }
        }
        filter.shrink_to_fit();
        println!("Wrote {} filter with {} items", output, filter.len());
        postcard::to_io(&filter, writer)?;
        Ok(())
    }

    /// Decodes blocks concurrently on a thread pool, then re-emits them in
    /// `locations` order. When `filter_file` is given, output statuses are
    /// populated from it.
    fn parse_ordered(&self, filter_file: Option<String>) -> Receiver<ResultBlock> {
        let (tx_blocks, rx_blocks) = mpsc::sync_channel(BLOCK_BUFFER);
        let pool = ThreadPool::new(NUM_FILE_THREADS);
        // Fan out: each worker decodes one block and tags it with its index so
        // the ordering thread can restore the original sequence.
        for (index, location) in self.locations.clone().into_iter().enumerate() {
            let tx = tx_blocks.clone();
            pool.execute(move || {
                // A send error only means the receiver hung up; stop quietly.
                let _ = tx.send((index as u32, Self::parse_block(location)));
            });
        }
        // Drop the original sender so `rx_blocks` terminates once all workers finish.
        drop(tx_blocks);
        let (tx, rx) = mpsc::sync_channel(BLOCK_BUFFER);
        thread::spawn(move || {
            if let Err(e) = Self::parse_ordered_helper(filter_file, tx.clone(), rx_blocks) {
                let _ = tx.send(Err(e));
            }
        });
        rx
    }

    /// Buffers out-of-order blocks from `rx` and forwards them to `tx` strictly
    /// by index, applying the unspent filter (when present) to each block.
    fn parse_ordered_helper(filter_file: Option<String>,
                            tx: SyncSender<ResultBlock>,
                            rx: Receiver<(u32, ResultBlock)>) -> Result<()> {
        let filter = match filter_file {
            None => None,
            Some(file) => {
                let mut reader = BufReader::new(File::open(file)?);
                let mut buffer = vec![];
                reader.read_to_end(&mut buffer)?;
                let filter: UnspentFilter = postcard::from_bytes(&buffer)?;
                Some(filter)
            }
        };
        let mut current_index = 0;
        // Blocks that arrived ahead of `current_index`, waiting their turn.
        let mut unordered = FxHashMap::default();
        let start = Instant::now();
        for (index, block) in rx {
            unordered.insert(index, block);
            // Flush every consecutive block that is now available.
            // (Fixed: this line previously contained the mojibake `¤t_index`
            // where `&current_index` was intended.)
            while let Some(parsed) = unordered.remove(&current_index) {
                current_index += 1;
                let _ = tx.send(Self::update_outputs(parsed, &filter));
                if current_index % LOG_BLOCKS == 0 {
                    let elapsed = (Instant::now() - start).as_secs();
                    print!("{}0K blocks parsed,", current_index / LOG_BLOCKS);
                    println!(" {}m{}s elapsed", elapsed / 60, elapsed % 60);
                }
            }
        }
        println!("Parsed {} total blocks", current_index);
        Ok(())
    }

    /// Marks each output of every transaction as spent or unspent by probing
    /// the filter; a pass-through when no filter was supplied.
    fn update_outputs(parsed: ResultBlock, filter: &Option<UnspentFilter>) -> ResultBlock {
        if let Some(filter) = filter {
            let mut output_status: BTreeMap<Txid, Vec<OutStatus>> = BTreeMap::new();
            let mut parsed = parsed?;
            for (tx, txid) in parsed.transactions() {
                for (index, _) in tx.output.iter().enumerate() {
                    let entry = output_status.entry(*txid).or_default();
                    // NOTE(review): cuckoo filters admit false positives, so a
                    // spent output may very rarely be reported `Unspent` —
                    // bounded by FILTER_COLLISION_PROB.
                    if filter.contains(&OutPoint::new(*txid, index as u32)) {
                        entry.push(OutStatus::Unspent);
                    } else {
                        entry.push(OutStatus::Spent);
                    }
                }
            }
            parsed.output_status = output_status;
            return Ok(parsed);
        }
        parsed
    }

    /// Like `parse_ordered`, but additionally annotates every input with the
    /// amount it spends; requires starting at genesis so all outputs are seen.
    fn parse_txin_amounts(&self, filter_file: Option<String>) -> Receiver<ResultBlock> {
        let (tx, rx) = mpsc::sync_channel(BLOCK_BUFFER);
        let rx_blocks = self.parse_ordered(filter_file);
        // Amount tracking is only sound from the first block; surface the error
        // to the caller instead of silently producing wrong amounts.
        if let Err(e) = self.check_genesis() {
            let _ = tx.send(Err(e));
        }
        thread::spawn(move || {
            if let Err(e) = Self::parse_txin_amounts_helper(tx.clone(), rx_blocks) {
                let _ = tx.send(Err(e));
            }
        });
        rx
    }

    /// Consumes ordered blocks, recording the value of every possibly-spent
    /// output and resolving each input against that record.
    fn parse_txin_amounts_helper(tx: SyncSender<ResultBlock>, rx: Receiver<ResultBlock>) -> Result<()> {
        // Truncated outpoint -> value. Entries are removed once spent, and
        // outputs the filter already knows stay unspent are never inserted,
        // keeping the map's footprint down.
        let mut outpoints = FxHashMap::default();
        for parsed in rx {
            let mut parsed = parsed?;
            let mut input_amounts: BTreeMap<Txid, Vec<Amount>> = BTreeMap::new();
            // `transaction` (not `tx`) so the channel sender isn't shadowed.
            for (transaction, txid) in parsed.transactions() {
                for (index, output) in transaction.output.iter().enumerate() {
                    let outpoint = OutPoint::new(*txid, index as u32);
                    match parsed.output_status(txid).ok().and_then(|status| status.get(index)) {
                        // Known-unspent outputs will never be looked up again.
                        Some(OutStatus::Unspent) => {},
                        _ => { outpoints.insert(Self::truncate(&outpoint), output.value); }
                    }
                }
                for input in &transaction.input {
                    let entry = input_amounts.entry(*txid).or_default();
                    if let Some(amount) = outpoints.remove(&Self::truncate(&input.previous_output)) {
                        entry.push(amount);
                    } else if transaction.is_coinbase() {
                        // Coinbase inputs spend no prior output; record zero.
                        entry.push(Amount::ZERO);
                    } else {
                        bail!("Input amount not found for {:?}", input.previous_output);
                    }
                }
            }
            parsed.input_amounts = input_amounts;
            let _ = tx.send(Ok(parsed));
        }
        Ok(())
    }

    /// Ensures the location list starts at genesis (prev hash is all zeros).
    fn check_genesis(&self) -> Result<()> {
        match self.locations.first() {
            Some(first) if first.prev == BlockHash::all_zeros() => Ok(()),
            _ => bail!("Calling this function must start at the genesis block")
        }
    }

    /// Decodes a single block from its recorded file offset.
    fn parse_block(location: BlockLocation) -> ResultBlock {
        let mut reader = BufReader::new(File::open(&location.path)?);
        reader.seek_relative(location.offset as i64)?;
        let block = Block::consensus_decode(&mut reader)?;
        Ok(ParsedBlock {
            // Compute txids once, here, so downstream consumers never re-hash.
            txids: block.txdata.iter().map(|tx| tx.compute_txid()).collect(),
            block,
            input_amounts: Default::default(),
            output_status: Default::default(),
        })
    }

    /// Shortens an outpoint to 2 bytes of `vout` plus the first SHORT_TXID_BYTES
    /// of the txid, trading a tiny collision risk for a much smaller map key.
    fn truncate(outpoint: &OutPoint) -> Vec<u8> {
        let mut bytes = vec![];
        bytes.extend_from_slice(&outpoint.vout.to_le_bytes()[0..2]);
        bytes.extend_from_slice(&outpoint.txid.as_byte_array()[0..SHORT_TXID_BYTES]);
        bytes
    }
}
/// Where a single block lives on disk, plus its header hashes for chain ordering.
#[derive(Clone, Debug)]
pub struct BlockLocation {
// Byte offset of the serialized block within `path` (just past the 8-byte pre-header).
pub offset: usize,
// Hash of the previous block's header; all zeros for the genesis block.
pub prev: BlockHash,
// Hash of this block's header.
pub hash: BlockHash,
// The blk file containing this block.
pub path: PathBuf,
}
impl BlockLocation {
    /// Scans every `blk*.dat` file in `blocks_dir` in parallel and returns block
    /// locations ordered from genesis onward, with forks resolved in favor of
    /// the chain that extends further.
    pub fn parse(blocks_dir: &str) -> Result<Vec<BlockLocation>> {
        let (tx, rx) = mpsc::channel();
        let pool = ThreadPool::new(NUM_FILE_THREADS);
        for path in Self::blk_files(blocks_dir)? {
            // `path` is already owned here; no clone needed before moving it.
            let tx = tx.clone();
            pool.execute(move || {
                let results = Self::parse_headers_file(path);
                let _ = tx.send(results);
            });
        }
        // Drop the original sender so the receive loop ends when workers finish.
        drop(tx);
        // Keyed by *previous* hash: `locations[prev]` is the block building on `prev`.
        let mut locations: HashMap<BlockHash, BlockLocation> = HashMap::new();
        let mut collisions: Vec<BlockLocation> = vec![];
        for received in rx {
            for header in received? {
                // Two blocks sharing a parent indicate a fork; remember the
                // displaced one and resolve after all headers are loaded.
                if let Some(collision) = locations.insert(header.prev, header) {
                    collisions.push(collision)
                }
            }
        }
        for collision in collisions {
            Self::resolve_collisions(&mut locations, collision);
        }
        Ok(Self::order_headers(locations))
    }

    /// Reads the 80-byte header of every block in one blk file, skipping the
    /// transaction payloads, and records where each block's data begins.
    fn parse_headers_file(path: PathBuf) -> Result<Vec<BlockLocation>> {
        let buffer_size = PRE_HEADER_SIZE + HEADER_SIZE;
        let mut reader = BufReader::with_capacity(buffer_size, File::open(&path)?);
        let mut offset = 0;
        // Pre-header: 4 magic bytes then a 4-byte little-endian block size.
        let mut buffer = vec![0; PRE_HEADER_SIZE];
        let mut headers = vec![];
        while reader.read_exact(&mut buffer).is_ok() {
            offset += buffer.len();
            if let Ok(header) = Header::consensus_decode(&mut reader) {
                headers.push(BlockLocation {
                    // `offset` is now the start of the serialized block itself —
                    // exactly where `BlockParser::parse_block` later seeks to.
                    offset,
                    prev: header.prev_blockhash,
                    hash: header.block_hash(),
                    path: path.clone(),
                });
                let size = u32::from_le_bytes(buffer[4..].try_into()?) as usize;
                // Skip the rest of the block; the header was already consumed.
                // `saturating_sub` guards against a corrupt size smaller than a header.
                reader.seek_relative((size.saturating_sub(HEADER_SIZE)) as i64)?;
                offset += size;
            }
        }
        Ok(headers)
    }

    /// Lists the `blk*` files in `dir`.
    ///
    /// # Errors
    /// Errors when the directory is unreadable or contains no blk files.
    fn blk_files(dir: &str) -> Result<Vec<PathBuf>> {
        let read_dir = fs::read_dir(Path::new(&dir))?;
        let mut files = vec![];
        for file in read_dir {
            let file = file?;
            // `to_string_lossy` instead of `into_string().expect(..)`: a
            // non-UTF-8 filename cannot match "blk*", so it is skipped rather
            // than panicking the whole scan.
            if file.file_name().to_string_lossy().starts_with("blk") {
                files.push(file.path())
            }
        }
        if files.is_empty() {
            bail!("No BLK files found in dir {:?}", dir);
        }
        Ok(files)
    }

    /// Resolves a fork: `collision` and the stored block share a parent. Both
    /// chains are walked forward in lockstep until one dead-ends; the stored
    /// block is replaced only when the collision's chain is the one that continues.
    fn resolve_collisions(headers: &mut HashMap<BlockHash, BlockLocation>, collision: BlockLocation) {
        let existing = headers.get(&collision.prev).expect("exists");
        let mut e_hash = &existing.hash;
        let mut c_hash = &collision.hash;
        // Advance both forks while each still has a child block in the map.
        while let (Some(e), Some(c)) = (headers.get(e_hash), headers.get(c_hash)) {
            e_hash = &e.hash;
            c_hash = &c.hash;
        }
        // The collision's fork is the one still extending, so it wins the slot.
        if headers.contains_key(c_hash) {
            headers.insert(collision.prev, collision);
        }
    }

    /// Orders headers into a chain by repeatedly following `prev -> block`
    /// links, starting from genesis (whose `prev` is all zeros).
    fn order_headers(mut headers: HashMap<BlockHash, BlockLocation>) -> Vec<BlockLocation> {
        let mut ordered_headers = vec![];
        let mut next_hash = BlockHash::all_zeros();
        while let Some(location) = headers.remove(&next_hash) {
            next_hash = location.hash;
            ordered_headers.push(location);
        }
        ordered_headers
    }
}