blvm-node 0.1.34

Bitcoin Commons BLVM: Minimal Bitcoin node implementation using blvm-protocol and blvm-consensus
//! Generic block pipeline hooks via registered ModuleAPI methods.

use blvm_protocol::segwit::Witness;
use blvm_protocol::Block;
use std::sync::{Arc, OnceLock, RwLock};
use std::time::Duration;
use tracing::{debug, warn};

use crate::module::inter_module::router::ModuleRouter;
use crate::module::traits::ModuleError;

/// Name of the module API method the node routes each block through before storing it.
pub const FILTER_BLOCK_BEFORE_STORE: &str = "filter_block_before_store";
/// Caller identifier the node presents to the module router when issuing the filter call.
const NODE_CALLER_ID: &str = "blvm-node";
/// Longest the node waits for the filter call to finish before storing the block unfiltered.
const FILTER_TIMEOUT: Duration = Duration::from_secs(5);

/// Wire layout of the bincode-encoded parameters sent with a `filter_block_before_store` call.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct FilterBlockRequest {
    /// Height supplied by the caller for the block being offered to the filter.
    height: u64,
    /// The block offered to the module for filtering.
    block: Block,
    /// Witness data accompanying `block`.
    // NOTE(review): presumably one inner `Vec<Witness>` per transaction — confirm with the caller.
    witnesses: Vec<Vec<Witness>>,
}

/// Reply expected from a module handling `filter_block_before_store`, decoded with bincode.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct FilterBlockResponse {
    /// Block the node uses in place of the one it sent.
    block: Block,
    /// Witness data the node uses in place of the witnesses it sent.
    witnesses: Vec<Vec<Witness>>,
    /// Presumably the ids of the transactions the module removed; the node only logs the count.
    // NOTE(review): the string encoding of these ids is not visible here — confirm with modules.
    stripped_txids: Vec<String>,
    /// Whether the module reports having changed anything. This only gates a debug log:
    /// `block` and `witnesses` are adopted from the reply regardless of this flag.
    filtered: bool,
}

/// Router used to reach the filter module. `None` until `install_block_pipeline` stores one;
/// each later install overwrites the previous router.
static PIPELINE_ROUTER: RwLock<Option<Arc<ModuleRouter>>> = RwLock::new(None);
/// Tokio runtime handle on which the filter call is driven. Set by `install_block_pipeline`;
/// being a `OnceLock`, only the first install's handle is kept.
static RUNTIME_HANDLE: OnceLock<tokio::runtime::Handle> = OnceLock::new();

/// Register `router` as the block pipeline's module router and remember the tokio runtime
/// this function is called from.
///
/// The runtime handle is recorded only once: on repeated calls the router is replaced but the
/// handle captured by the first call stays in effect.
///
/// # Panics
/// Panics if called outside a tokio runtime context, or if the router lock is poisoned.
pub fn install_block_pipeline(router: Arc<ModuleRouter>) {
    let current_runtime = tokio::runtime::Handle::current();
    // `set` fails when a handle is already stored; the existing one is kept on purpose.
    let _ = RUNTIME_HANDLE.set(current_runtime);

    {
        let mut slot = PIPELINE_ROUTER.write().expect("pipeline router lock");
        *slot = Some(router);
    }

    debug!("Block pipeline installed");
}

/// Run the `filter_block_before_store` hook on a block that is about to be stored.
///
/// Returns `block` and `witnesses` unchanged when no pipeline is installed (no router or no
/// runtime handle has been recorded). Otherwise the work is delegated to `filter_sync`, which
/// fails open on errors and timeouts.
pub fn try_filter_block_before_store(
    height: u64,
    block: Block,
    witnesses: Arc<Vec<Vec<Witness>>>,
) -> (Block, Arc<Vec<Vec<Witness>>>) {
    // Copy the router pointer out so the read lock is released before any filtering happens.
    let installed_router = {
        let slot = PIPELINE_ROUTER.read().expect("pipeline router lock");
        slot.as_ref().map(Arc::clone)
    };

    match (installed_router, RUNTIME_HANDLE.get()) {
        (Some(module_router), Some(handle)) => {
            filter_sync(handle, &module_router, height, block, witnesses)
        }
        _ => (block, witnesses),
    }
}

/// Synchronously run the `filter_block_before_store` module hook for a single block.
///
/// The request (height, block, witnesses) is bincode-encoded, dispatched through `router` under
/// the `NODE_CALLER_ID` identity, and driven to completion on `runtime_handle`, waiting at most
/// `FILTER_TIMEOUT`.
///
/// # Returns
/// On success, the block and witnesses carried by the module's reply (adopted whether or not
/// the reply sets `filtered`). On any failure — request encoding error, routing error, timeout,
/// or an undecodable reply — the original `block` and `witnesses` are returned untouched, i.e.
/// the hook is fail-open. Routing errors whose message contains "not found" are treated as
/// "no module registered" and are not logged.
///
/// # Panics
/// This blocks the calling thread with `Handle::block_on`. Per tokio's documentation that call
/// panics when made from within an asynchronous execution context, so this function must be
/// invoked from a plain (or blocking-pool) thread. Tokio also documents that on a
/// `current_thread` runtime `Handle::block_on` cannot itself drive timers, so the timeout only
/// fires there while another thread is running the runtime.
fn filter_sync(
    runtime_handle: &tokio::runtime::Handle,
    router: &Arc<ModuleRouter>,
    height: u64,
    block: Block,
    witnesses: Arc<Vec<Vec<Witness>>>,
) -> (Block, Arc<Vec<Vec<Witness>>>) {
    // bincode writes a struct as its fields in declaration order with no framing (exactly like
    // a tuple), and serde serializes a reference like the value it points to. Encoding this
    // borrowed tuple therefore yields the same bytes as a
    // `FilterBlockRequest { height, block, witnesses }` without deep-cloning the block and
    // every witness just to serialize them. Keep the element order in sync with that struct.
    let params = match bincode::serialize(&(height, &block, &*witnesses)) {
        Ok(params) => params,
        Err(e) => {
            warn!("filter_block_before_store serialize failed at height {height}: {e}");
            return (block, witnesses);
        }
    };

    // The future owns its own router handle and the encoded request, so it borrows nothing
    // from this stack frame.
    let router = Arc::clone(router);
    let fut = async move {
        router
            .route_call(NODE_CALLER_ID, None, FILTER_BLOCK_BEFORE_STORE, &params)
            .await
    };

    // Outer `Result` is the timeout, inner `Result` is the routed call itself.
    let response_bytes = match runtime_handle.block_on(tokio::time::timeout(FILTER_TIMEOUT, fut)) {
        Ok(Ok(bytes)) => bytes,
        Ok(Err(e)) => {
            // A "not found" operation error just means no module registered the hook; that is
            // the normal case and not worth a warning.
            if !matches!(
                &e,
                ModuleError::OperationError(msg) if msg.contains("not found")
            ) {
                warn!("filter_block_before_store failed-open at height {height}: {e}");
            }
            return (block, witnesses);
        }
        Err(_) => {
            warn!(
                "filter_block_before_store timed out after {}s at height {height}; storing unfiltered",
                FILTER_TIMEOUT.as_secs()
            );
            return (block, witnesses);
        }
    };

    let response: FilterBlockResponse = match bincode::deserialize(&response_bytes) {
        Ok(response) => response,
        Err(e) => {
            warn!("filter_block_before_store bad response at height {height}: {e}");
            return (block, witnesses);
        }
    };

    if response.filtered {
        debug!(
            "filter_block_before_store height={height} stripped {} tx(s)",
            response.stripped_txids.len()
        );
    }

    (response.block, Arc::new(response.witnesses))
}