blvm-node 0.1.33

Bitcoin Commons BLVM: Minimal Bitcoin node implementation using blvm-protocol and blvm-consensus
//! Block pipeline: `filter_block_before_store` ModuleAPI routing during IBD flush.

use std::sync::Arc;

use async_trait::async_trait;
use blvm_node::module::inter_module::api::ModuleAPI;
use blvm_node::module::inter_module::{ModuleApiRegistry, ModuleRouter};
use blvm_node::module::pipeline::{install_block_pipeline, try_filter_block_before_store};
use blvm_node::module::traits::ModuleError;
use blvm_protocol::{
    Block, BlockHeader, OutPoint, Transaction, TransactionInput, TransactionOutput,
};

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct FilterBlockRequest {
    height: u64,
    block: Block,
    witnesses: Vec<Vec<blvm_protocol::segwit::Witness>>,
}

#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct FilterBlockResponse {
    block: Block,
    witnesses: Vec<Vec<blvm_protocol::segwit::Witness>>,
    stripped_txids: Vec<String>,
    filtered: bool,
}

/// Test double: strips non-empty witness stacks (mimics selective-sync IBD filter).
struct StripWitnessApi;

#[async_trait]
impl ModuleAPI for StripWitnessApi {
    async fn handle_request(
        &self,
        method: &str,
        params: &[u8],
        _caller: &str,
    ) -> Result<Vec<u8>, ModuleError> {
        if method != "filter_block_before_store" {
            return Err(ModuleError::OperationError(format!(
                "unknown method {method}"
            )));
        }
        let mut req: FilterBlockRequest = bincode::deserialize(params)
            .map_err(|e| ModuleError::OperationError(format!("bad params: {e}")))?;
        let mut stripped = Vec::new();
        for (i, tx_witnesses) in req.witnesses.iter_mut().enumerate() {
            let had_data = tx_witnesses.iter().any(|stack| !stack.is_empty());
            if had_data {
                let tx =
                    req.block.transactions.get(i).ok_or_else(|| {
                        ModuleError::OperationError("tx index out of range".into())
                    })?;
                stripped.push(hex::encode(blvm_protocol::block::calculate_tx_id(tx)));
                *tx_witnesses = tx.inputs.iter().map(|_| Vec::new()).collect();
            }
        }
        let response = FilterBlockResponse {
            block: req.block,
            witnesses: req.witnesses,
            stripped_txids: stripped.clone(),
            filtered: !stripped.is_empty(),
        };
        Ok(bincode::serialize(&response)
            .map_err(|e| ModuleError::SerializationError(format!("serialize response: {e}")))?)
    }

    fn list_methods(&self) -> Vec<String> {
        vec!["filter_block_before_store".to_string()]
    }

    fn api_version(&self) -> u32 {
        1
    }
}

fn sample_block_with_witnesses() -> (Block, Arc<Vec<Vec<blvm_protocol::segwit::Witness>>>) {
    let coinbase = Transaction {
        version: 1,
        inputs: vec![TransactionInput {
            prevout: OutPoint {
                hash: [0; 32],
                index: 0xffffffff,
            },
            script_sig: vec![0x01, 0x00],
            sequence: 0xffffffff,
        }]
        .into(),
        outputs: vec![TransactionOutput {
            value: 50_0000_0000,
            script_pubkey: vec![0x51],
        }]
        .into(),
        lock_time: 0,
    };
    let flagged = Transaction {
        version: 1,
        inputs: vec![TransactionInput {
            prevout: OutPoint {
                hash: [1; 32],
                index: 0,
            },
            script_sig: vec![],
            sequence: 0xffffffff,
        }]
        .into(),
        outputs: vec![TransactionOutput {
            value: 1_000,
            script_pubkey: vec![0x51],
        }]
        .into(),
        lock_time: 0,
    };
    let block = Block {
        header: BlockHeader {
            version: 1,
            prev_block_hash: [0; 32],
            merkle_root: [0; 32],
            timestamp: 1,
            bits: 0x1d00ffff,
            nonce: 0,
        },
        transactions: vec![coinbase, flagged].into(),
    };
    let witnesses = Arc::new(vec![vec![vec![]], vec![vec![vec![0x01, 0x02, 0x03]]]]);
    (block, witnesses)
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn block_pipeline_routes_filter_block_before_store() {
    let (block, witnesses) = sample_block_with_witnesses();

    // Without pipeline installed: pass-through.
    let witnesses_clone = Arc::clone(&witnesses);
    let block_clone = block.clone();
    let (_, w_pass) = tokio::task::spawn_blocking(move || {
        try_filter_block_before_store(1, block_clone, witnesses_clone)
    })
    .await
    .expect("spawn_blocking");
    assert!(w_pass[1].iter().any(|stack| !stack.is_empty()));

    let registry = Arc::new(ModuleApiRegistry::new());
    let router = Arc::new(ModuleRouter::new(Arc::clone(&registry)));
    registry
        .register_api("selective-sync_test".to_string(), Arc::new(StripWitnessApi))
        .await
        .expect("register_api");
    install_block_pipeline(router);

    let (_, w_filtered) =
        tokio::task::spawn_blocking(move || try_filter_block_before_store(1, block, witnesses))
            .await
            .expect("spawn_blocking");
    assert!(w_filtered[1].iter().all(|stack| stack.is_empty()));
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn block_pipeline_fail_open_without_registered_module() {
    let (block, witnesses) = sample_block_with_witnesses();
    let registry = Arc::new(ModuleApiRegistry::new());
    let router = Arc::new(ModuleRouter::new(Arc::clone(&registry)));
    install_block_pipeline(router);

    let (_, w) =
        tokio::task::spawn_blocking(move || try_filter_block_before_store(1, block, witnesses))
            .await
            .expect("spawn_blocking");
    assert!(
        w[1].iter().any(|stack| !stack.is_empty()),
        "expected fail-open pass-through when no module registers filter_block_before_store"
    );
}