use blvm_protocol::segwit::Witness;
use blvm_protocol::Block;
use std::sync::{Arc, OnceLock, RwLock};
use std::time::Duration;
use tracing::{debug, warn};
use crate::module::inter_module::router::ModuleRouter;
use crate::module::traits::ModuleError;
pub const FILTER_BLOCK_BEFORE_STORE: &str = "filter_block_before_store";
const NODE_CALLER_ID: &str = "blvm-node";
const FILTER_TIMEOUT: Duration = Duration::from_secs(5);
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct FilterBlockRequest {
height: u64,
block: Block,
witnesses: Vec<Vec<Witness>>,
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct FilterBlockResponse {
block: Block,
witnesses: Vec<Vec<Witness>>,
stripped_txids: Vec<String>,
filtered: bool,
}
static PIPELINE_ROUTER: RwLock<Option<Arc<ModuleRouter>>> = RwLock::new(None);
static RUNTIME_HANDLE: OnceLock<tokio::runtime::Handle> = OnceLock::new();
pub fn install_block_pipeline(router: Arc<ModuleRouter>) {
let _ = RUNTIME_HANDLE.set(tokio::runtime::Handle::current());
*PIPELINE_ROUTER.write().expect("pipeline router lock") = Some(router);
debug!("Block pipeline installed");
}
pub fn try_filter_block_before_store(
height: u64,
block: Block,
witnesses: Arc<Vec<Vec<Witness>>>,
) -> (Block, Arc<Vec<Vec<Witness>>>) {
let router = PIPELINE_ROUTER
.read()
.expect("pipeline router lock")
.clone();
let Some(router) = router else {
return (block, witnesses);
};
let Some(runtime_handle) = RUNTIME_HANDLE.get() else {
return (block, witnesses);
};
filter_sync(runtime_handle, &router, height, block, witnesses)
}
fn filter_sync(
runtime_handle: &tokio::runtime::Handle,
router: &Arc<ModuleRouter>,
height: u64,
block: Block,
witnesses: Arc<Vec<Vec<Witness>>>,
) -> (Block, Arc<Vec<Vec<Witness>>>) {
let request = FilterBlockRequest {
height,
block: block.clone(),
witnesses: witnesses.as_ref().clone(),
};
let params = match bincode::serialize(&request) {
Ok(params) => params,
Err(e) => {
warn!("filter_block_before_store serialize failed at height {height}: {e}");
return (block, witnesses);
}
};
let router = Arc::clone(router);
let fut = async move {
router
.route_call(NODE_CALLER_ID, None, FILTER_BLOCK_BEFORE_STORE, ¶ms)
.await
};
let response_bytes = match runtime_handle.block_on(tokio::time::timeout(FILTER_TIMEOUT, fut)) {
Ok(Ok(bytes)) => bytes,
Ok(Err(e)) => {
if !matches!(
&e,
ModuleError::OperationError(msg) if msg.contains("not found")
) {
warn!("filter_block_before_store failed-open at height {height}: {e}");
}
return (block, witnesses);
}
Err(_) => {
warn!(
"filter_block_before_store timed out after {}s at height {height}; storing unfiltered",
FILTER_TIMEOUT.as_secs()
);
return (block, witnesses);
}
};
let response: FilterBlockResponse = match bincode::deserialize(&response_bytes) {
Ok(response) => response,
Err(e) => {
warn!("filter_block_before_store bad response at height {height}: {e}");
return (block, witnesses);
}
};
if response.filtered {
debug!(
"filter_block_before_store height={height} stripped {} tx(s)",
response.stripped_txids.len()
);
}
(response.block, Arc::new(response.witnesses))
}