brk_indexer 0.2.3

A Bitcoin indexer built on top of brk_reader
Documentation
use brk_cohort::ByAddrType;
use brk_error::{Error, Result};
use brk_store::Store;
use brk_types::{
    AddrIndexOutPoint, AddrIndexTxIndex, OutPoint, OutputType, TxInIndex, TxIndex, Txid,
    TxidPrefix, TypeIndex, Unit, Vin, Vout,
};
use rayon::prelude::*;
use rustc_hash::{FxHashMap, FxHashSet};
use tracing::error;
use vecdb::{PcoVec, WritableVec};

use super::{BlockProcessor, ComputedTx, InputSource, SameBlockOutputInfo};
use crate::InputsVecs;

impl<'a> BlockProcessor<'a> {
    pub fn process_inputs(
        &self,
        txs: &[ComputedTx],
        txid_prefix_to_tx_index: &mut FxHashMap<TxidPrefix, TxIndex>,
    ) -> Result<Vec<(TxInIndex, InputSource)>> {
        txid_prefix_to_tx_index.clear();
        txid_prefix_to_tx_index.extend(txs.iter().map(|ct| (ct.txid_prefix, ct.tx_index)));

        let base_tx_index = self.indexes.tx_index;
        let base_txin_index = self.indexes.txin_index;

        let total_inputs: usize = self.block.txdata.iter().map(|tx| tx.input.len()).sum();
        let mut items = Vec::with_capacity(total_inputs);
        for (index, tx) in self.block.txdata.iter().enumerate() {
            for (vin, txin) in tx.input.iter().enumerate() {
                items.push((TxIndex::from(index), Vin::from(vin), txin, tx));
            }
        }

        let txid_prefix_to_tx_index = &*txid_prefix_to_tx_index;

        let txins = items
            .into_par_iter()
            .enumerate()
            .map(
                |(block_txin_index, (block_tx_index, vin, txin, tx))| -> Result<(TxInIndex, InputSource)> {
                    let tx_index = base_tx_index + block_tx_index;
                    let txin_index = base_txin_index + TxInIndex::from(block_txin_index);

                    if tx.is_coinbase() {
                        return Ok((
                            txin_index,
                            InputSource::SameBlock {
                                tx_index,
                                vin,
                                outpoint: OutPoint::COINBASE,
                            },
                        ));
                    }

                    let outpoint = txin.previous_output;
                    let txid = Txid::from(outpoint.txid);
                    let txid_prefix = TxidPrefix::from(&txid);
                    let vout = Vout::from(outpoint.vout);

                    if let Some(&same_block_tx_index) = txid_prefix_to_tx_index
                        .get(&txid_prefix) {
                        let outpoint = OutPoint::new(same_block_tx_index, vout);
                        return Ok((
                            txin_index,
                            InputSource::SameBlock {
                                tx_index,
                                vin,
                                outpoint,
                            },
                        ));
                    }

                    let store_result = self
                        .stores
                        .txid_prefix_to_tx_index
                        .get(&txid_prefix)?
                        .map(|v| *v);

                    let prev_tx_index = match store_result {
                        Some(tx_index) if tx_index < self.indexes.tx_index => tx_index,
                        _ => {
                            error!(
                                "UnknownTxid: txid={}, prefix={:?}, store_result={:?}, current_tx_index={:?}",
                                txid, txid_prefix, store_result, self.indexes.tx_index
                            );
                            return Err(Error::UnknownTxid);
                        }
                    };

                    let txout_index = self
                        .vecs
                        .transactions
                        .first_txout_index
                        .get_pushed_or_read(prev_tx_index, &self.readers.tx_index_to_first_txout_index)
                        .ok_or(Error::Internal("Missing txout_index"))?
                        + vout;

                    let outpoint = OutPoint::new(prev_tx_index, vout);

                    let output_type = self
                        .vecs
                        .outputs
                        .output_type
                        .get_pushed_or_read(txout_index, &self.readers.txout_index_to_output_type)
                        .ok_or(Error::Internal("Missing output_type"))?;

                    let type_index = self
                        .vecs
                        .outputs
                        .type_index
                        .get_pushed_or_read(txout_index, &self.readers.txout_index_to_type_index)
                        .ok_or(Error::Internal("Missing type_index"))?;

                    Ok((
                        txin_index,
                        InputSource::PreviousBlock {
                            vin,
                            tx_index,
                            outpoint,
                            output_type,
                            type_index,
                        },
                    ))
                },
            )
            .collect::<Result<Vec<_>>>()?;

        Ok(txins)
    }

    pub fn collect_same_block_spent_outpoints(
        txins: &[(TxInIndex, InputSource)],
        out: &mut FxHashSet<OutPoint>,
    ) {
        out.clear();
        out.extend(
            txins
                .iter()
                .filter_map(|(_, input_source)| match input_source {
                    InputSource::SameBlock { outpoint, .. } if !outpoint.is_coinbase() => {
                        Some(*outpoint)
                    }
                    _ => None,
                }),
        );
    }
}

pub(super) fn finalize_inputs(
    first_txin_index: &mut PcoVec<TxIndex, TxInIndex>,
    inputs: &mut InputsVecs,
    addr_tx_index_stores: &mut ByAddrType<Store<AddrIndexTxIndex, Unit>>,
    addr_outpoint_stores: &mut ByAddrType<Store<AddrIndexOutPoint, Unit>>,
    txins: Vec<(TxInIndex, InputSource)>,
    same_block_output_info: &mut FxHashMap<OutPoint, SameBlockOutputInfo>,
) -> Result<()> {
    for (txin_index, input_source) in txins {
        let (vin, tx_index, outpoint, output_type, type_index) = match input_source {
            InputSource::PreviousBlock {
                vin,
                tx_index,
                outpoint,
                output_type,
                type_index,
            } => (vin, tx_index, outpoint, output_type, type_index),
            InputSource::SameBlock {
                tx_index,
                vin,
                outpoint,
            } => {
                if outpoint.is_coinbase() {
                    (
                        vin,
                        tx_index,
                        outpoint,
                        OutputType::Unknown,
                        TypeIndex::COINBASE,
                    )
                } else {
                    let info = same_block_output_info
                        .remove(&outpoint)
                        .ok_or(Error::Internal("Same-block output not found"))
                        .inspect_err(|_| {
                            error!(
                                ?outpoint,
                                remaining = same_block_output_info.len(),
                                "Same-block output not found"
                            );
                        })?;
                    (vin, tx_index, outpoint, info.output_type, info.type_index)
                }
            }
        };

        if vin.is_zero() {
            first_txin_index.checked_push(tx_index, txin_index)?;
        }

        inputs.tx_index.checked_push(txin_index, tx_index)?;
        inputs.outpoint.checked_push(txin_index, outpoint)?;
        inputs.output_type.checked_push(txin_index, output_type)?;
        inputs.type_index.checked_push(txin_index, type_index)?;

        if !output_type.is_addr() {
            continue;
        }
        let addr_type = output_type;
        let addr_index = type_index;

        addr_tx_index_stores
            .get_mut_unwrap(addr_type)
            .insert(AddrIndexTxIndex::from((addr_index, tx_index)), Unit);

        addr_outpoint_stores
            .get_mut_unwrap(addr_type)
            .remove(AddrIndexOutPoint::from((addr_index, outpoint)));
    }

    Ok(())
}