bitcoin_block_parser/
utxos.rs

1//! Contains [`UtxoParser`] for tracking input amounts and output statuses in [`UtxoBlock`].
2
3use crate::blocks::{BlockParser, ParserIterator, ParserOptions, Pipeline};
4use anyhow::Result;
5use bitcoin::block::Header;
6use bitcoin::hashes::Hash;
7use bitcoin::{Block, OutPoint, Transaction, TxIn, TxOut, Txid};
8use dashmap::DashMap;
9use log::info;
10use rand::prelude::SmallRng;
11use rand::{Error, RngCore, SeedableRng};
12use scalable_cuckoo_filter::{DefaultHasher, ScalableCuckooFilter, ScalableCuckooFilterBuilder};
13use std::fs;
14use std::fs::File;
15use std::io::{BufReader, BufWriter};
16use std::iter::Zip;
17use std::slice::Iter;
18use std::sync::{Arc, Mutex};
19
20/// A block that has been parsed tracking input amounts and output status
21#[derive(Clone, Eq, PartialEq, Debug)]
22pub struct UtxoBlock {
23    /// The block header
24    pub header: Header,
25    /// List of transactions contained in the block
26    pub txdata: Vec<UtxoTransaction>,
27}
28
29impl UtxoBlock {
30    /// Construct from a bitcoin [`Block`].
31    fn new(block: Block) -> Self {
32        Self {
33            header: block.header,
34            txdata: block.txdata.into_iter().map(UtxoTransaction::new).collect(),
35        }
36    }
37
38    /// Convert back into a [`bitcoin::Block`].
39    pub fn to_block(self) -> Block {
40        Block {
41            header: self.header,
42            txdata: self.txdata.into_iter().map(|tx| tx.transaction).collect(),
43        }
44    }
45}
46
47/// A transaction that has been parsed tracking input amounts and output status
48#[derive(Clone, Eq, PartialEq, Debug)]
49pub struct UtxoTransaction {
50    /// Underlying bitcoin transaction [`Transaction`]
51    pub transaction: Transaction,
52    /// Precomputed [`Txid`]
53    pub txid: Txid,
54    /// Tracks the input amounts in-order of inputs
55    inputs: Vec<TxOut>,
56    /// Tracks the output statuses in-order of outputs
57    outputs: Vec<OutputStatus>,
58}
59
60impl UtxoTransaction {
61    /// Construct from a bitcoin [`Transaction`].
62    fn new(transaction: Transaction) -> UtxoTransaction {
63        Self {
64            txid: transaction.compute_txid(),
65            transaction,
66            inputs: vec![],
67            outputs: vec![],
68        }
69    }
70
71    /// Returns the [`TxIn`] of the transaction zipped with the input [`TxOut`].
72    pub fn input(&self) -> Zip<Iter<'_, TxIn>, Iter<'_, TxOut>> {
73        self.transaction.input.iter().zip(self.inputs.iter())
74    }
75
76    /// Returns the [`TxOut`] of the transaction zipped with the output [`OutputStatus`].
77    pub fn output(&self) -> Zip<Iter<'_, TxOut>, Iter<'_, OutputStatus>> {
78        self.transaction.output.iter().zip(self.outputs.iter())
79    }
80}
81
/// Whether a [`TxOut`] is consumed by some transaction input within the parsed range of blocks.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OutputStatus {
    /// The output is spent by a later transaction.
    Spent,
    /// No later transaction spends the output (it is a UTXO).
    Unspent,
}
90
91type ShortOutPoints = (Vec<ShortOutPoint>, Vec<ShortOutPoint>);
92type ShortOutPointFilter = ScalableCuckooFilter<ShortOutPoint, DefaultHasher, FastRng>;
93
94/// Multithreaded parser that returns a [`ParserIterator`] of [`UtxoBlock`]
95/// * Tracks the [`TxOut`] of every [`TxIn`]
96/// * Tracks the [`OutputStatus`] for every [`TxOut`]
97///
98/// # Examples
99/// Computing the largest mining fee requires knowing the input amounts of every transaction.
100/// Call [`UtxoParser::parse`] to get a [`UtxoBlock`] that tracks input amounts.
101/// ```no_run
102/// use std::cmp::max;
103/// use bitcoin::Amount;
104/// use bitcoin_block_parser::utxos::*;
105///
106/// let parser = UtxoParser::new("/home/user/.bitcoin/blocks/", "filter.bin");
107/// let fees = parser.parse(|block| {
108///     let mut max_mining_fee = Amount::ZERO;
109///     for tx in block.txdata.into_iter() {
110///         // For every transaction sum up the input and output amounts
111///         let inputs: Amount = tx.input().map(|(_, out)| out.value).sum();
112///         let outputs: Amount = tx.output().map(|(out, _)| out.value).sum();
113///         if !tx.transaction.is_coinbase() {
114///             // Subtract outputs amount from inputs amount to get the fee
115///             max_mining_fee = max(inputs - outputs, max_mining_fee);
116///         }
117///     }
118///     max_mining_fee
119/// }).unwrap();
120/// println!("Maximum mining fee: {}", fees.max().unwrap());
121/// ```
122///
123/// Computing the largest UTXO requires knowing the [`OutputStatus`] to determine whether a
124/// [`TxOut`] was spent or unspent.
125/// ```no_run
126/// use std::cmp::max;
127/// use bitcoin::Amount;
128/// use bitcoin_block_parser::utxos::*;
129///
130/// let parser = UtxoParser::new("/home/user/.bitcoin/blocks/", "filter.bin");
131/// let amounts = parser.parse(|block| {
132///     let mut max_unspent_tx = Amount::ZERO;
133///     for tx in block.txdata.into_iter() {
134///         for (output, status) in tx.output() {
135///             if status == &OutputStatus::Unspent {
136///                 max_unspent_tx = max(output.value, max_unspent_tx);
137///             }
138///         }
139///     }
140///     max_unspent_tx
141/// }).unwrap();
142/// println!("Maximum unspent output: {}", amounts.max().unwrap());
143/// ```
144#[derive(Clone, Debug)]
145pub struct UtxoParser {
146    /// Filter file that contains all UTXOs
147    filter_file: String,
148    /// Underlying parser for parsing the blocks.
149    blocks_dir: String,
150    /// Used to allocate the initial capacity of shared state.
151    estimated_utxos: usize,
152    /// The block height range to end at
153    end_height: usize,
154    /// Options for the underlying parser
155    options: ParserOptions,
156}
157
158impl UtxoParser {
159    /// Creates a new parser.
160    ///
161    /// - `blocks_dir` - directory where the `*.blk` files are located.
162    /// - `filter_file` - file that will store the UTXO filter.
163    ///
164    /// Returns an `Err` if unable to parse the `blk` files.
165    /// You can [specify the blocks directory](https://en.bitcoin.it/wiki/Data_directory) when
166    ///   running `bitcoind`.
167    pub fn new(blocks_dir: &str, filter_file: &str) -> Self {
168        Self {
169            filter_file: filter_file.to_string(),
170            blocks_dir: blocks_dir.to_string(),
171            estimated_utxos: 250_000_000,
172            end_height: usize::MAX,
173            options: Default::default(),
174        }
175    }
176
177    /// Set the estimated amount of UTXOs in the range of blocks you are parsing.
178    ///
179    /// Used to lower the memory usage of shared state objects.
180    pub fn estimated_utxos(mut self, estimated_utxos: usize) -> Self {
181        self.estimated_utxos = estimated_utxos;
182        self
183    }
184
185    /// Sets the *inclusive* end of block heights to parse.
186    /// Parsing always starts at the genesis block in order to track the transaction graph properly.
187    ///
188    /// * `end_height` - the height to end at, [`usize::MAX`] will stop at the last block
189    ///    available.
190    pub fn end_height(mut self, end_height: usize) -> Self {
191        self.end_height = end_height;
192        self
193    }
194
195    /// Creates a parser with custom [`ParserOptions`].
196    pub fn with_opts(mut self, options: ParserOptions) -> Self {
197        self.options = options;
198        self
199    }
200
201    /// Parse all [`UtxoBlock`] into type `T` and return a [`ParserIterator<T>`].  Results will
202    /// be in random order due to multithreading.
203    ///
204    /// * `extract` - a closure that runs on multiple threads.  For best performance perform as much
205    ///    computation and data reduction here as possible.
206    pub fn parse<T: Send + 'static>(
207        self,
208        extract: impl Fn(UtxoBlock) -> T + Clone + Send + 'static,
209    ) -> Result<ParserIterator<T>> {
210        if !fs::exists(&self.filter_file)? {
211            self.create_filter()?;
212        } else {
213            info!("Found UTXO filter '{}'", self.filter_file);
214        }
215
216        let reader = BufReader::new(File::open(&self.filter_file)?);
217        let filter = bincode::deserialize_from(reader)?;
218        let pipeline = UtxoPipeline::new(filter, extract);
219
220        Ok(
221            BlockParser::new_with_opts(&self.blocks_dir, self.options.clone())?
222                .end_height(self.end_height)
223                .parse(UtxoBlock::new)
224                .ordered()
225                .pipeline(&pipeline),
226        )
227    }
228
229    /// Force the creation of a new `filter_file`.
230    pub fn create_filter(&self) -> Result<Self> {
231        info!("Creating UTXO filter '{}'", self.filter_file);
232        let filter = UtxoFilter::new(self.estimated_utxos);
233        BlockParser::new_with_opts(&self.blocks_dir, self.options.clone())?
234            .end_height(self.end_height)
235            .parse(UtxoFilter::outpoints)
236            .ordered()
237            .map(&|outpoints| filter.update(outpoints))
238            .for_each(|_| {});
239
240        let filter = Arc::try_unwrap(filter.filter).expect("Arc still referenced");
241        let mut filter = Mutex::into_inner(filter)?;
242        filter.shrink_to_fit();
243        let writer = BufWriter::new(File::create(&self.filter_file)?);
244        bincode::serialize_into(writer, &filter)?;
245        info!("Finished creating UTXO filter '{}'", self.filter_file);
246        Ok(self.clone())
247    }
248}
249
250/// Contains the filter data that tracks all unspent outputs in a memory-efficient manner.
251#[derive(Clone)]
252struct UtxoFilter {
253    filter: Arc<Mutex<ShortOutPointFilter>>,
254}
255
256impl UtxoFilter {
257    /// Construct with an initial `filter_capacity`.
258    fn new(filter_capacity: usize) -> UtxoFilter {
259        Self {
260            filter: Arc::new(Mutex::new(
261                ScalableCuckooFilterBuilder::default()
262                    .initial_capacity(filter_capacity)
263                    .false_positive_probability(0.000_000_000_001)
264                    .rng(FastRng::default())
265                    .finish(),
266            )),
267        }
268    }
269
270    /// Returns [`ShortOutPoint`] for all inputs and outputs.
271    fn outpoints(block: Block) -> ShortOutPoints {
272        let mut inputs = vec![];
273        let mut outputs = vec![];
274        for tx in block.txdata.iter() {
275            let txid = tx.compute_txid();
276            for input in &tx.input {
277                inputs.push(ShortOutPoint::from_outpoint(&input.previous_output));
278            }
279
280            for (index, _) in tx.output.iter().enumerate() {
281                outputs.push(ShortOutPoint::new(index, &txid));
282            }
283        }
284        (inputs, outputs)
285    }
286
287    /// Given the results of `outpoints()` update the filter.
288    pub fn update(&self, outpoints: ShortOutPoints) {
289        let mut filter = self.filter.lock().expect("Lock poisoned");
290        let (inputs, outputs) = outpoints;
291        for outpoint in outputs {
292            // insert outpoints for every output
293            filter.insert(&outpoint);
294        }
295        for input in inputs {
296            // remove outpoints that are spent in a subsequent transaction
297            filter.remove(&input);
298        }
299    }
300}
301
302/// Pipeline for multithreaded tracking of the input amounts and output statuses.
303#[derive(Clone)]
304struct UtxoPipeline<F> {
305    /// Filter containing all unspent outpoints (UTXOs)
306    filter: Arc<ShortOutPointFilter>,
307    /// Tracks the outputs for every input.
308    outputs: Arc<DashMap<ShortOutPoint, TxOut>>,
309    /// Extract function that maps the [`UtxoBlock`] to a new type
310    extract: F,
311}
312
313impl<F> UtxoPipeline<F> {
314    /// Construct a new pipeline with an optional `filter` and initial `hashmap_capacity`.
315    fn new(filter: ShortOutPointFilter, extract: F) -> Self {
316        Self {
317            filter: Arc::new(filter),
318            outputs: Arc::new(DashMap::new()),
319            extract,
320        }
321    }
322
323    /// Returns the [`OutputStatus`] of an outpoint
324    fn status(&self, outpoint: &ShortOutPoint) -> OutputStatus {
325        if self.filter.contains(outpoint) {
326            OutputStatus::Unspent
327        } else {
328            OutputStatus::Spent
329        }
330    }
331}
332
333impl<F, T> Pipeline<UtxoBlock, UtxoBlock, T> for UtxoPipeline<F>
334where
335    F: Fn(UtxoBlock) -> T + Clone + Send + 'static,
336{
337    fn first(&self, mut block: UtxoBlock) -> UtxoBlock {
338        for tx in &mut block.txdata {
339            for (index, output) in tx.transaction.output.iter().enumerate() {
340                let outpoint = ShortOutPoint::new(index, &tx.txid);
341                let status = self.status(&outpoint);
342                // if an outpoint is unspent we don't need to track it (saving memory)
343                if status != OutputStatus::Unspent {
344                    self.outputs.insert(outpoint, output.clone());
345                }
346                tx.outputs.push(status);
347            }
348        }
349        block
350    }
351
352    fn second(&self, mut block: UtxoBlock) -> T {
353        for tx in &mut block.txdata {
354            for input in tx.transaction.input.iter() {
355                if tx.transaction.is_coinbase() {
356                    // coinbase transactions will not have a previous input
357                    tx.inputs.push(TxOut::NULL);
358                } else {
359                    let outpoint = ShortOutPoint::from_outpoint(&input.previous_output);
360                    let (_, out) = self.outputs.remove(&outpoint).expect("Missing outpoint");
361                    tx.inputs.push(out);
362                }
363            }
364        }
365        (self.extract)(block)
366    }
367}
368
369/// Shortened [`OutPoint`] to save memory (14 bytes instead of 36 bytes)
370///
371/// - 2 bytes represent far more than the maximum tx outputs (2^16)
372/// - 12 byte subset of the txid is unlikely to generate collisions even with 1 billion txs (~6.3e-12)
373#[derive(Eq, PartialEq, Hash, Debug, Clone)]
374struct ShortOutPoint(pub Vec<u8>);
375impl ShortOutPoint {
376    /// Shorten an existing [`OutPoint`].
377    fn from_outpoint(outpoint: &OutPoint) -> ShortOutPoint {
378        Self::new(outpoint.vout as usize, &outpoint.txid)
379    }
380
381    /// Create a new [`ShortOutPoint`] given its transaction id and index.
382    fn new(vout: usize, txid: &Txid) -> ShortOutPoint {
383        let mut bytes = vec![];
384        bytes.extend_from_slice(&vout.to_le_bytes()[0..2]);
385        bytes.extend_from_slice(&txid.as_byte_array()[0..12]);
386        ShortOutPoint(bytes)
387    }
388}
389
390/// Wrapper for [`SmallRng`] since it doesn't implement [`Default`] required to deserialize.
391#[derive(Debug)]
392struct FastRng(SmallRng);
393impl Default for FastRng {
394    fn default() -> Self {
395        Self(SmallRng::seed_from_u64(0x2c76c58e13b3a812))
396    }
397}
398impl RngCore for FastRng {
399    fn next_u32(&mut self) -> u32 {
400        self.0.next_u32()
401    }
402
403    fn next_u64(&mut self) -> u64 {
404        self.0.next_u64()
405    }
406
407    fn fill_bytes(&mut self, dest: &mut [u8]) {
408        self.0.fill_bytes(dest)
409    }
410
411    fn try_fill_bytes(&mut self, dest: &mut [u8]) -> std::result::Result<(), Error> {
412        self.0.try_fill_bytes(dest)
413    }
414}