use crate::blocks::{BlockParser, ParserIterator, ParserOptions, Pipeline};
use anyhow::Result;
use bitcoin::block::Header;
use bitcoin::hashes::Hash;
use bitcoin::{Block, OutPoint, Transaction, TxIn, TxOut, Txid};
use dashmap::DashMap;
use log::info;
use rand::prelude::SmallRng;
use rand::{Error, RngCore, SeedableRng};
use scalable_cuckoo_filter::{DefaultHasher, ScalableCuckooFilter, ScalableCuckooFilterBuilder};
use std::fs;
use std::fs::File;
use std::io::{BufReader, BufWriter};
use std::iter::Zip;
use std::slice::Iter;
use std::sync::{Arc, Mutex};
/// A parsed block whose transactions are wrapped in `UtxoTransaction` so that
/// spent-output and output-status information can be attached to them.
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct UtxoBlock {
/// Header taken unchanged from the source block.
pub header: Header,
/// The block's transactions, in the same order as in the source block.
pub txdata: Vec<UtxoTransaction>,
}
impl UtxoBlock {
fn new(block: Block) -> Self {
Self {
header: block.header,
txdata: block.txdata.into_iter().map(UtxoTransaction::new).collect(),
}
}
pub fn to_block(self) -> Block {
Block {
header: self.header,
txdata: self.txdata.into_iter().map(|tx| tx.transaction).collect(),
}
}
}
/// A transaction together with its precomputed txid and UTXO bookkeeping.
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct UtxoTransaction {
/// The underlying transaction.
pub transaction: Transaction,
/// Result of `transaction.compute_txid()`, computed once at construction.
pub txid: Txid,
// Outputs consumed by this transaction's inputs, pushed in input order.
// Empty right after `new`; filled in by the pipeline's second stage.
inputs: Vec<TxOut>,
// Status of each of this transaction's outputs, pushed in output order.
// Empty right after `new`; filled in by the pipeline's first stage.
outputs: Vec<OutputStatus>,
}
impl UtxoTransaction {
fn new(transaction: Transaction) -> UtxoTransaction {
Self {
txid: transaction.compute_txid(),
transaction,
inputs: vec![],
outputs: vec![],
}
}
pub fn input(&self) -> Zip<Iter<'_, TxIn>, Iter<'_, TxOut>> {
self.transaction.input.iter().zip(self.inputs.iter())
}
pub fn output(&self) -> Zip<Iter<'_, TxOut>, Iter<'_, OutputStatus>> {
self.transaction.output.iter().zip(self.outputs.iter())
}
}
/// Whether a transaction output is classified as spent or unspent.
///
/// In this file the classification comes from a membership test against the
/// UTXO filter (see `UtxoPipeline::status`).
#[derive(Clone, Debug, Eq, PartialEq, Copy)]
pub enum OutputStatus {
/// The output's short outpoint was not found in the UTXO filter.
Spent,
/// The output's short outpoint was found in the UTXO filter.
Unspent,
}
// Per-block outpoints as produced by `UtxoFilter::outpoints`:
// `(inputs, outputs)`, i.e. the outpoints spent and the outpoints created.
type ShortOutPoints = (Vec<ShortOutPoint>, Vec<ShortOutPoint>);
// The cuckoo filter holding short outpoints, using `FastRng` as its RNG type.
type ShortOutPointFilter = ScalableCuckooFilter<ShortOutPoint, DefaultHasher, FastRng>;
/// Configuration for parsing blocks with UTXO information attached.
///
/// Built with `UtxoParser::new` and adjusted through the builder-style
/// methods `estimated_utxos`, `end_height` and `with_opts`.
#[derive(Clone, Debug)]
pub struct UtxoParser {
// Path of the serialized UTXO filter; created on demand by `parse`.
filter_file: String,
// Directory handed to `BlockParser::new_with_opts`.
blocks_dir: String,
// Initial capacity used when building a new filter (default 250_000_000).
estimated_utxos: usize,
// Forwarded to `BlockParser::end_height` (default `usize::MAX`).
end_height: usize,
// Options forwarded to `BlockParser::new_with_opts`.
options: ParserOptions,
}
impl UtxoParser {
    /// Creates a parser reading blocks from `blocks_dir` and using
    /// `filter_file` as the location of the serialized UTXO filter.
    ///
    /// Defaults: 250 million estimated UTXOs, no end height limit
    /// (`usize::MAX`) and default `ParserOptions`.
    pub fn new(blocks_dir: &str, filter_file: &str) -> Self {
        Self {
            filter_file: filter_file.to_string(),
            blocks_dir: blocks_dir.to_string(),
            estimated_utxos: 250_000_000,
            end_height: usize::MAX,
            options: Default::default(),
        }
    }

    /// Sets the initial capacity used when a new filter has to be created.
    pub fn estimated_utxos(mut self, estimated_utxos: usize) -> Self {
        self.estimated_utxos = estimated_utxos;
        self
    }

    /// Sets the end height forwarded to the underlying `BlockParser`.
    pub fn end_height(mut self, end_height: usize) -> Self {
        self.end_height = end_height;
        self
    }

    /// Replaces the options forwarded to the underlying `BlockParser`.
    pub fn with_opts(mut self, options: ParserOptions) -> Self {
        self.options = options;
        self
    }

    /// Parses the blocks and maps every `UtxoBlock` through `extract`.
    ///
    /// If the filter file does not exist yet it is created first via
    /// [`Self::create_filter`]; otherwise the existing file is reused. The
    /// filter is then loaded and the blocks are run, in order, through the
    /// two-stage UTXO pipeline.
    ///
    /// # Errors
    ///
    /// Returns an error if checking for, creating, opening or deserializing
    /// the filter file fails, or if the `BlockParser` cannot be constructed.
    pub fn parse<T: Send + 'static>(
        self,
        extract: impl Fn(UtxoBlock) -> T + Clone + Send + 'static,
    ) -> Result<ParserIterator<T>> {
        if !fs::exists(&self.filter_file)? {
            self.create_filter()?;
        } else {
            info!("Found UTXO filter '{}'", self.filter_file);
        }

        let reader = BufReader::new(File::open(&self.filter_file)?);
        let filter = bincode::deserialize_from(reader)?;
        let pipeline = UtxoPipeline::new(filter, extract);
        Ok(
            BlockParser::new_with_opts(&self.blocks_dir, self.options.clone())?
                .end_height(self.end_height)
                .parse(UtxoBlock::new)
                .ordered()
                .pipeline(&pipeline),
        )
    }

    /// Builds the UTXO filter from the blocks and writes it to the filter
    /// file, returning a clone of this parser.
    ///
    /// Blocks are processed in order; for each block its created outpoints
    /// are added to the filter and its spent outpoints are removed.
    ///
    /// # Errors
    ///
    /// Returns an error if the `BlockParser` cannot be constructed, the
    /// filter mutex is poisoned, or the file cannot be created, written or
    /// flushed.
    ///
    /// # Panics
    ///
    /// Panics if the filter is still shared when the pass over the blocks
    /// has finished.
    pub fn create_filter(&self) -> Result<Self> {
        info!("Creating UTXO filter '{}'", self.filter_file);
        let filter = UtxoFilter::new(self.estimated_utxos);
        BlockParser::new_with_opts(&self.blocks_dir, self.options.clone())?
            .end_height(self.end_height)
            .parse(UtxoFilter::outpoints)
            .ordered()
            .map(&|outpoints| filter.update(outpoints))
            .for_each(|_| {});

        let filter = Arc::try_unwrap(filter.filter).expect("Arc still referenced");
        let mut filter = Mutex::into_inner(filter)?;
        filter.shrink_to_fit();

        // Serialize through a mutable borrow and flush explicitly: a
        // `BufWriter` that is merely dropped discards any error from its
        // final flush, which could leave a truncated filter file behind
        // while this function still reports success.
        let mut writer = BufWriter::new(File::create(&self.filter_file)?);
        bincode::serialize_into(&mut writer, &filter)?;
        std::io::Write::flush(&mut writer)?;

        info!("Finished creating UTXO filter '{}'", self.filter_file);
        Ok(self.clone())
    }
}
// Shared, mutex-protected cuckoo filter used while building the UTXO set.
// Cloning yields another handle to the same underlying filter.
#[derive(Clone)]
struct UtxoFilter {
// The filter being built; unwrapped again in `UtxoParser::create_filter`.
filter: Arc<Mutex<ShortOutPointFilter>>,
}
impl UtxoFilter {
    /// Creates an empty filter with the given initial capacity, a target
    /// false-positive probability of 1e-12 and a `FastRng` default instance.
    fn new(filter_capacity: usize) -> UtxoFilter {
        Self {
            filter: Arc::new(Mutex::new(
                ScalableCuckooFilterBuilder::default()
                    .initial_capacity(filter_capacity)
                    .false_positive_probability(0.000_000_000_001)
                    .rng(FastRng::default())
                    .finish(),
            )),
        }
    }

    /// Extracts the short outpoints spent (`inputs`) and created (`outputs`)
    /// by `block`, returned as `(inputs, outputs)`.
    ///
    /// Coinbase inputs are skipped: they reference the null outpoint, which
    /// is never inserted into the filter. Asking a cuckoo filter to remove a
    /// key it does not contain can evict an unrelated entry whose fingerprint
    /// happens to collide, so such keys must not reach `update`.
    fn outpoints(block: Block) -> ShortOutPoints {
        let mut inputs = vec![];
        let mut outputs = vec![];
        for tx in &block.txdata {
            let txid = tx.compute_txid();
            if !tx.is_coinbase() {
                inputs.extend(
                    tx.input
                        .iter()
                        .map(|input| ShortOutPoint::from_outpoint(&input.previous_output)),
                );
            }
            outputs.extend((0..tx.output.len()).map(|index| ShortOutPoint::new(index, &txid)));
        }
        (inputs, outputs)
    }

    /// Applies one block's outpoints to the filter.
    ///
    /// Created outpoints are inserted before spent ones are removed, so an
    /// output that is created and spent within the same block ends up
    /// removed.
    ///
    /// # Panics
    ///
    /// Panics if the filter mutex is poisoned.
    pub fn update(&self, outpoints: ShortOutPoints) {
        let mut filter = self.filter.lock().expect("Lock poisoned");
        let (inputs, outputs) = outpoints;
        for outpoint in outputs {
            filter.insert(&outpoint);
        }
        for input in inputs {
            filter.remove(&input);
        }
    }
}
// Two-stage pipeline that annotates blocks with UTXO information and then
// hands them to the user-supplied `extract` function.
#[derive(Clone)]
struct UtxoPipeline<F> {
// Loaded UTXO filter; only queried (`contains`) by this type.
filter: Arc<ShortOutPointFilter>,
// Outputs classified as spent, keyed by short outpoint. Entries are inserted
// in `first` and removed in `second` when the spending input is processed.
outputs: Arc<DashMap<ShortOutPoint, TxOut>>,
// Called on every fully annotated block at the end of `second`.
extract: F,
}
impl<F> UtxoPipeline<F> {
    /// Creates a pipeline around a loaded filter, with an empty map of
    /// pending outputs and the given extraction function.
    fn new(filter: ShortOutPointFilter, extract: F) -> Self {
        let filter = Arc::new(filter);
        let outputs = Arc::new(DashMap::new());
        Self {
            filter,
            outputs,
            extract,
        }
    }

    /// Classifies an outpoint: `Unspent` if the filter contains it,
    /// `Spent` otherwise.
    fn status(&self, outpoint: &ShortOutPoint) -> OutputStatus {
        match self.filter.contains(outpoint) {
            true => OutputStatus::Unspent,
            false => OutputStatus::Spent,
        }
    }
}
impl<F, T> Pipeline<UtxoBlock, UtxoBlock, T> for UtxoPipeline<F>
where
    F: Fn(UtxoBlock) -> T + Clone + Send + 'static,
{
    /// First stage: records an `OutputStatus` for every output of every
    /// transaction. Outputs that are not `Unspent` are additionally cloned
    /// into the shared map so the second stage can attach them to the input
    /// that spends them.
    fn first(&self, mut block: UtxoBlock) -> UtxoBlock {
        for tx in block.txdata.iter_mut() {
            let txid = &tx.txid;
            let statuses = tx
                .transaction
                .output
                .iter()
                .enumerate()
                .map(|(vout, txout)| {
                    let key = ShortOutPoint::new(vout, txid);
                    let status = self.status(&key);
                    if status != OutputStatus::Unspent {
                        self.outputs.insert(key, txout.clone());
                    }
                    status
                });
            tx.outputs.extend(statuses);
        }
        block
    }

    /// Second stage: records, for every input, the output it spends and then
    /// passes the block to `extract`. Coinbase inputs get `TxOut::NULL`; all
    /// other inputs take their output out of the shared map.
    ///
    /// # Panics
    ///
    /// Panics with "Missing outpoint" if a non-coinbase input's outpoint is
    /// not present in the shared map.
    fn second(&self, mut block: UtxoBlock) -> T {
        for tx in block.txdata.iter_mut() {
            // Whether the transaction is a coinbase does not depend on the
            // input being visited, so it is evaluated once per transaction.
            let coinbase = tx.transaction.is_coinbase();
            let spent = tx.transaction.input.iter().map(|txin| {
                if coinbase {
                    return TxOut::NULL;
                }
                let key = ShortOutPoint::from_outpoint(&txin.previous_output);
                self.outputs
                    .remove(&key)
                    .map(|(_, txout)| txout)
                    .expect("Missing outpoint")
            });
            tx.inputs.extend(spent);
        }
        (self.extract)(block)
    }
}
// Compact 14-byte key for an outpoint: the two low-order little-endian bytes
// of the output index followed by the first 12 bytes of the txid's byte array
// (see `ShortOutPoint::new`).
#[derive(Eq, PartialEq, Hash, Debug, Clone)]
struct ShortOutPoint(pub Vec<u8>);
impl ShortOutPoint {
    /// Builds the short key for a full `OutPoint`.
    fn from_outpoint(outpoint: &OutPoint) -> ShortOutPoint {
        let index = outpoint.vout as usize;
        Self::new(index, &outpoint.txid)
    }

    /// Builds the short key from an output index and a txid: the two
    /// low-order little-endian bytes of `vout` followed by the first 12 bytes
    /// of the txid's byte array. Higher bytes of `vout` are dropped.
    fn new(vout: usize, txid: &Txid) -> ShortOutPoint {
        let vout_le = vout.to_le_bytes();
        let txid_bytes = txid.as_byte_array();
        let key = [&vout_le[..2], &txid_bytes[..12]].concat();
        ShortOutPoint(key)
    }
}
// Newtype around `SmallRng`, used as the RNG type parameter of
// `ShortOutPointFilter`. Its `Default` impl seeds the generator with a fixed
// constant.
#[derive(Debug)]
struct FastRng(SmallRng);
impl Default for FastRng {
fn default() -> Self {
Self(SmallRng::seed_from_u64(0x2c76c58e13b3a812))
}
}
// Every method forwards unchanged to the wrapped `SmallRng`.
impl RngCore for FastRng {
    fn next_u32(&mut self) -> u32 {
        let Self(inner) = self;
        inner.next_u32()
    }

    fn next_u64(&mut self) -> u64 {
        let Self(inner) = self;
        inner.next_u64()
    }

    fn fill_bytes(&mut self, dest: &mut [u8]) {
        let Self(inner) = self;
        inner.fill_bytes(dest)
    }

    fn try_fill_bytes(&mut self, dest: &mut [u8]) -> std::result::Result<(), Error> {
        let Self(inner) = self;
        inner.try_fill_bytes(dest)
    }
}