// brk_indexer 0.2.3
//
// A Bitcoin indexer built on top of brk_reader.
// Documentation: the crate-level docs are included from ../README.md below.
#![doc = include_str!("../README.md")]

use std::{fs, path::Path, thread, time::Instant};

use brk_error::Result;
use brk_iterator::Blocks;
use brk_rpc::Client;
use brk_types::Height;
use fjall::PersistMode;
use tracing::{debug, info};
use vecdb::{Exit, ReadOnlyClone, ReadableVec, Ro, Rw, StorageMode};
mod constants;
mod indexes;
mod processor;
mod readers;
mod stores;
mod vecs;

use constants::*;
use indexes::IndexesExt;
use processor::{BlockBuffers, BlockProcessor};
use readers::Readers;

pub use brk_types::Indexes;
pub use stores::Stores;
pub use vecs::*;

/// Bundles the two storage halves used by the indexer.
///
/// The `M` parameter selects the storage mode of the vecs and defaults to
/// [`Rw`]; a `Indexer<Ro>` is produced by the [`ReadOnlyClone`] impl.
pub struct Indexer<M = Rw>
where
    M: StorageMode,
{
    /// Vec-based storage, parameterised by the storage mode `M`.
    pub vecs: Vecs<M>,
    /// Store-based storage; not parameterised by the storage mode.
    pub stores: Stores,
}

impl ReadOnlyClone for Indexer {
    type ReadOnly = Indexer<Ro>;

    /// Builds an `Indexer<Ro>` from this indexer: the vecs go through their
    /// own `read_only_clone`, the stores are cloned as they are.
    fn read_only_clone(&self) -> Self::ReadOnly {
        let vecs = self.vecs.read_only_clone();
        let stores = self.stores.clone();
        Indexer { vecs, stores }
    }
}

impl Indexer {
    /// Opens the indexer stored under `outputs_dir/indexed`.
    ///
    /// If the import fails with a data error (as reported by
    /// `is_data_error`), the `indexed` directory is deleted and the import is
    /// attempted one more time. Lock errors are returned without touching the
    /// data on disk.
    ///
    /// # Errors
    ///
    /// Returns any error raised while adjusting the open-files limit, while
    /// importing the vecs or the stores, or while removing the directory
    /// before the retry.
    pub fn forced_import(outputs_dir: &Path) -> Result<Self> {
        Self::forced_import_inner(outputs_dir, true)
    }

    /// Implementation of [`Self::forced_import`].
    ///
    /// `can_retry` decides whether a data error may trigger a wipe of the
    /// `indexed` directory followed by a retry. The retry itself is run with
    /// `can_retry == false`, so the recursion is at most one level deep.
    fn forced_import_inner(outputs_dir: &Path, can_retry: bool) -> Result<Self> {
        info!("Increasing number of open files limit...");
        let (soft_limit, hard_limit) = rlimit::getrlimit(rlimit::Resource::NOFILE)?;
        // Aim for at least 10_000 descriptors, but never request more than the
        // hard limit: a soft limit above the hard limit is rejected by
        // `setrlimit`, which would abort the import on systems whose hard
        // limit is below 10_000.
        let wanted_soft_limit = soft_limit.max(10_000).min(hard_limit);
        rlimit::setrlimit(rlimit::Resource::NOFILE, wanted_soft_limit, hard_limit)?;

        info!("Importing indexer...");

        let indexed_path = outputs_dir.join("indexed");

        // Both halves are imported from the same directory with the same
        // version; each import is timed separately for the logs.
        let try_import = || -> Result<Self> {
            let i = Instant::now();
            let vecs = Vecs::forced_import(&indexed_path, VERSION)?;
            info!("Imported vecs in {:?}", i.elapsed());

            let i = Instant::now();
            let stores = Stores::forced_import(&indexed_path, VERSION)?;
            info!("Imported stores in {:?}", i.elapsed());

            Ok(Self { vecs, stores })
        };

        match try_import() {
            Ok(result) => Ok(result),
            Err(err) if err.is_lock_error() => {
                // Lock errors are transient - another process has the database open.
                // Don't delete data, just return the error.
                Err(err)
            }
            Err(err) if can_retry && err.is_data_error() => {
                // Data corruption or version mismatch - safe to delete and retry
                info!("{err:?}, deleting {indexed_path:?} and retrying");
                fs::remove_dir_all(&indexed_path)?;
                Self::forced_import_inner(outputs_dir, false)
            }
            Err(err) => Err(err),
        }
    }

    /// Indexes the blocks that follow the last indexed block, with
    /// `check_collisions` disabled.
    ///
    /// Returns the indexes the run started from (see [`Self::index_`]).
    pub fn index(&mut self, blocks: &Blocks, client: &Client, exit: &Exit) -> Result<Indexes> {
        self.index_(blocks, client, exit, false)
    }

    /// Same as [`Self::index`], but with `check_collisions` enabled on the
    /// block processor.
    pub fn checked_index(
        &mut self,
        blocks: &Blocks,
        client: &Client,
        exit: &Exit,
    ) -> Result<Indexes> {
        self.index_(blocks, client, exit, true)
    }

    /// Shared implementation of [`Self::index`] and [`Self::checked_index`].
    ///
    /// Steps, in order:
    /// 1. Wait for background tasks of the vecs database.
    /// 2. Work out the starting indexes from the last stored block hash, or
    ///    start from `Indexes::default()` when nothing is stored or when the
    ///    stored data is inconsistent (in which case vecs and stores are
    ///    reset).
    /// 3. Roll back stores and vecs to the starting indexes.
    /// 4. Process every block yielded by `blocks.after(prev_hash)`, exporting
    ///    whenever the height is a non-zero multiple of
    ///    `SNAPSHOT_BLOCK_RANGE`.
    /// 5. Write the vecs, then hand the pending store ingests, the store
    ///    persist and the vecs flush/compact to a background task.
    ///
    /// Returns the *starting* indexes (not the final ones). When the starting
    /// height is already past the client's last height, returns early without
    /// touching anything.
    fn index_(
        &mut self,
        blocks: &Blocks,
        client: &Client,
        exit: &Exit,
        check_collisions: bool,
    ) -> Result<Indexes> {
        // Make sure the background work started by a previous run is finished.
        self.vecs.db.sync_bg_tasks()?;

        debug!("Starting indexing...");

        let last_blockhash = self.vecs.blocks.blockhash.collect_last();
        // Rollback sim
        // let last_blockhash = self
        //     .vecs
        //     .blocks
        //     .blockhash
        //     .collect_one_at(self.vecs.blocks.blockhash.len() - 2);
        debug!("Last block hash found.");

        let (starting_indexes, prev_hash) = if let Some(hash) = last_blockhash {
            // NOTE(review): presumably this walks back to the closest block the
            // node still considers valid (reorg handling) - confirm in brk_rpc.
            let (height, hash) = client.get_closest_valid_height(hash)?;
            match Indexes::from_vecs_and_stores(height.incremented(), &mut self.vecs, &self.stores)
            {
                Some(starting_indexes) => {
                    if starting_indexes.height > client.get_last_height()? {
                        info!("Up to date, nothing to index.");
                        return Ok(starting_indexes);
                    }
                    (starting_indexes, Some(hash))
                }
                None => {
                    info!("Data inconsistency detected, resetting indexer...");
                    self.vecs.reset()?;
                    self.stores.reset()?;
                    (Indexes::default(), None)
                }
            }
        } else {
            // Nothing indexed yet: start from scratch.
            (Indexes::default(), None)
        };
        debug!("Starting indexes set.");

        // The exit lock is held for the whole rollback and released right after.
        let lock = exit.lock();
        self.stores
            .rollback_if_needed(&mut self.vecs, &starting_indexes)?;
        debug!("Rollback stores done.");
        self.vecs.rollback_if_needed(&starting_indexes)?;
        debug!("Rollback vecs done.");
        drop(lock);

        // Cloned because we want to return starting indexes for the computer
        let mut indexes = starting_indexes.clone();
        debug!("Indexes cloned.");

        // Export on every non-zero multiple of SNAPSHOT_BLOCK_RANGE.
        let is_export_height =
            |height: Height| -> bool { height != 0 && height % SNAPSHOT_BLOCK_RANGE == 0 };

        // Commits the stores and flushes the vecs on two scoped threads while
        // holding the exit lock; the first error of either side is returned.
        let export = move |stores: &mut Stores, vecs: &mut Vecs, height: Height| -> Result<()> {
            info!("Exporting...");
            let i = Instant::now();
            let _lock = exit.lock();
            thread::scope(|s| -> Result<()> {
                let stores_res = s.spawn(|| -> Result<()> {
                    let i = Instant::now();
                    stores.commit(height)?;
                    info!("Stores exported in {:?}", i.elapsed());
                    Ok(())
                });
                let vecs_res = s.spawn(|| -> Result<()> {
                    let i = Instant::now();
                    vecs.flush(height)?;
                    info!("Vecs exported in {:?}", i.elapsed());
                    Ok(())
                });
                // `unwrap` re-raises a panic from the worker thread.
                stores_res.join().unwrap()?;
                vecs_res.join().unwrap()?;
                Ok(())
            })?;
            info!("Exported in {:?}", i.elapsed());
            Ok(())
        };

        let mut readers = Readers::new(&self.vecs);
        let mut buffers = BlockBuffers::default();

        let vecs = &mut self.vecs;
        let stores = &mut self.stores;

        for block in blocks.after(prev_hash)? {
            let height = block.height();

            info!("Indexing block {height}...");

            indexes.height = height;

            let mut processor = BlockProcessor {
                block: &block,
                height,
                check_collisions,
                indexes: &mut indexes,
                vecs,
                stores,
                readers: &readers,
            };

            processor.process_block_metadata()?;

            let txs = processor.compute_txids()?;

            processor.push_block_size_and_weight(&txs)?;

            // Inputs and outputs are processed in parallel; both results are
            // checked only after both closures have finished.
            let (txins_result, txouts_result) = rayon::join(
                || processor.process_inputs(&txs, &mut buffers.txid_prefix_map),
                || processor.process_outputs(),
            );
            let txins = txins_result?;
            let txouts = txouts_result?;

            // Counts are captured now because `txins`/`txouts` are moved into
            // `finalize_and_store_metadata` below.
            let tx_count = block.txdata.len();
            let input_count = txins.len();
            let output_count = txouts.len();

            BlockProcessor::collect_same_block_spent_outpoints(
                &txins,
                &mut buffers.same_block_spent,
            );

            processor.check_txid_collisions(&txs)?;

            processor.finalize_and_store_metadata(
                txs,
                txouts,
                txins,
                &buffers.same_block_spent,
                &mut buffers.already_added_addrs,
                &mut buffers.same_block_output_info,
            )?;

            processor.update_indexes(tx_count, input_count, output_count);

            if is_export_height(height) {
                // The readers are dropped before the export and rebuilt from
                // the vecs afterwards.
                drop(readers);
                export(stores, vecs, height)?;
                readers = Readers::new(vecs);
            }
        }

        drop(readers);

        // Final write: the exit lock taken here is moved into the background
        // task and is only released once that task has finished.
        let lock = exit.lock();
        let tasks = self.stores.take_all_pending_ingests(indexes.height)?;
        self.vecs.stamped_write(indexes.height)?;
        let fjall_db = self.stores.db.clone();

        self.vecs.db.run_bg(move |db| {
            let _lock = lock;

            if !tasks.is_empty() {
                let i = Instant::now();
                for task in tasks {
                    task().map_err(vecdb::RawDBError::other)?;
                }
                info!("Stores committed in {:?}", i.elapsed());

                let i = Instant::now();
                fjall_db
                    .persist(PersistMode::SyncData)
                    .map_err(vecdb::RawDBError::other)?;
                info!("Stores persisted in {:?}", i.elapsed());
            }

            db.flush()?;
            db.compact()?;
            Ok(())
        });

        Ok(starting_indexes)
    }
}