brk_computer 0.2.5

A Bitcoin dataset computer built on top of brk_indexer
Documentation
use std::{cmp::Ordering, collections::BTreeSet};

use brk_error::Result;
use brk_types::Height;
use tracing::{debug, warn};
use vecdb::Stamp;

use super::super::{
    AddrsDataVecs,
    addr::AnyAddrIndexesVecs,
    cohorts::{AddrCohorts, UTXOCohorts},
};

/// Result of state recovery.
pub struct RecoveredState {
    /// Height to start processing from. Zero means fresh start.
    pub starting_height: Height,
}

/// Perform state recovery for resuming from checkpoint.
///
/// Rolls back state vectors and imports cohort states.
/// Validates that all rollbacks and imports are consistent.
/// Returns Height::ZERO if any validation fails (triggers fresh start).
pub(crate) fn recover_state(
    height: Height,
    chain_state_rollback: vecdb::Result<Stamp>,
    any_addr_indexes: &mut AnyAddrIndexesVecs,
    addrs_data: &mut AddrsDataVecs,
    utxo_cohorts: &mut UTXOCohorts,
    addr_cohorts: &mut AddrCohorts,
) -> Result<RecoveredState> {
    let stamp = Stamp::from(height);

    // Rollback address state vectors
    let addr_indexes_rollback = any_addr_indexes.rollback_before(stamp);
    let addr_data_rollback = addrs_data.rollback_before(stamp);

    // Verify rollback consistency - all must agree on the same height
    let consistent_height = rollback_states(
        chain_state_rollback,
        addr_indexes_rollback,
        addr_data_rollback,
    );

    // If rollbacks are inconsistent, start fresh
    if consistent_height.is_zero() {
        warn!("Rollback consistency check failed: inconsistent heights");
        return Ok(RecoveredState {
            starting_height: Height::ZERO,
        });
    }

    // Rollback can land at an earlier height (multi-block change file), which is fine.
    // But if it lands AHEAD of target, that means rollback failed (missing change files).
    if consistent_height > height {
        warn!(
            "Rollback failed: still at {} but target was {}, falling back to fresh start",
            consistent_height, height
        );
        return Ok(RecoveredState {
            starting_height: Height::ZERO,
        });
    }

    if consistent_height != height {
        debug!(
            "Rollback landed at {} instead of {}, will resume from there",
            consistent_height, height
        );
    }

    // Import UTXO cohort states - all must succeed
    debug!(
        "importing UTXO cohort states at height {}",
        consistent_height
    );
    if !utxo_cohorts.import_separate_states(consistent_height) {
        warn!(
            "UTXO cohort state import failed at height {}",
            consistent_height
        );
        return Ok(RecoveredState {
            starting_height: Height::ZERO,
        });
    }
    debug!("UTXO cohort states imported");

    // Import address cohort states - all must succeed
    debug!(
        "importing addr cohort states at height {}",
        consistent_height
    );
    if !addr_cohorts.import_separate_states(consistent_height) {
        warn!(
            "Addr cohort state import failed at height {}",
            consistent_height
        );
        return Ok(RecoveredState {
            starting_height: Height::ZERO,
        });
    }
    debug!("addr cohort states imported");

    Ok(RecoveredState {
        starting_height: consistent_height,
    })
}

/// Reset all state for fresh start.
///
/// Resets all state vectors and cohort states.
pub(crate) fn reset_state(
    any_addr_indexes: &mut AnyAddrIndexesVecs,
    addrs_data: &mut AddrsDataVecs,
    utxo_cohorts: &mut UTXOCohorts,
    addr_cohorts: &mut AddrCohorts,
) -> Result<RecoveredState> {
    // Reset address state
    any_addr_indexes.reset()?;
    addrs_data.reset()?;

    // Reset cohort state heights
    utxo_cohorts.reset_separate_state_heights();
    addr_cohorts.reset_separate_state_heights();

    // Reset cost_basis_data for all cohorts
    utxo_cohorts.reset_separate_cost_basis_data()?;
    addr_cohorts.reset_separate_cost_basis_data()?;

    // Reset in-memory caches (fenwick, tick_tock positions)
    utxo_cohorts.reset_caches();

    Ok(RecoveredState {
        starting_height: Height::ZERO,
    })
}

/// Check if we can resume from a checkpoint or need to start fresh.
///
/// - `min_available`: minimum height we have data for across all stateful vecs
/// - `resume_target`: the height we want to resume processing from
pub(crate) fn determine_start_mode(min_available: Height, resume_target: Height) -> StartMode {
    // No data to resume from
    if resume_target.is_zero() {
        return StartMode::Fresh;
    }

    match min_available.cmp(&resume_target) {
        Ordering::Greater => unreachable!("min_available > resume_target"),
        Ordering::Equal => StartMode::Resume(resume_target),
        Ordering::Less => StartMode::Fresh,
    }
}

/// Whether to resume from checkpoint or start fresh.
pub enum StartMode {
    /// Resume from the given height.
    Resume(Height),
    /// Start from height 0.
    Fresh,
}

/// Rollback state vectors to before a given stamp.
///
/// Returns the consistent starting height if ALL rollbacks succeed and agree,
/// otherwise returns Height::ZERO (need fresh start).
fn rollback_states(
    chain_state_rollback: vecdb::Result<Stamp>,
    addr_indexes_rollbacks: Result<Vec<Stamp>>,
    addr_data_rollbacks: Result<[Stamp; 2]>,
) -> Height {
    let mut heights: BTreeSet<Height> = BTreeSet::new();

    // All rollbacks must succeed - any error means fresh start
    let Ok(s) = chain_state_rollback else {
        warn!("chain_state rollback failed: {:?}", chain_state_rollback);
        return Height::ZERO;
    };
    let chain_height = Height::from(s).incremented();
    debug!(
        "chain_state rolled back to stamp {:?}, height {}",
        s, chain_height
    );
    heights.insert(chain_height);

    let Ok(stamps) = addr_indexes_rollbacks else {
        warn!("addr_indexes rollback failed: {:?}", addr_indexes_rollbacks);
        return Height::ZERO;
    };
    for (i, s) in stamps.iter().enumerate() {
        let h = Height::from(*s).incremented();
        debug!(
            "addr_indexes[{}] rolled back to stamp {:?}, height {}",
            i, s, h
        );
        heights.insert(h);
    }

    let Ok(stamps) = addr_data_rollbacks else {
        warn!("addr_data rollback failed: {:?}", addr_data_rollbacks);
        return Height::ZERO;
    };
    for (i, s) in stamps.iter().enumerate() {
        let h = Height::from(*s).incremented();
        debug!(
            "addr_data[{}] rolled back to stamp {:?}, height {}",
            i, s, h
        );
        heights.insert(h);
    }

    // All must agree on the same height
    if heights.len() == 1 {
        heights.pop_first().unwrap()
    } else {
        warn!("Rollback heights inconsistent: {:?}", heights);
        Height::ZERO
    }
}