use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::{debug, info, warn};
use crate::module::ipc::protocol::{
RequestMessage, RequestPayload, ResponseMessage, ResponsePayload,
};
use crate::module::security::{PermissionChecker, RequestValidator};
use crate::module::traits::{ModuleError, NodeAPI};
pub struct ModuleApiHub {
node_api: Arc<dyn NodeAPI + Send + Sync>,
permission_checker: PermissionChecker,
request_validator: RequestValidator,
#[allow(dead_code)]
audit_log: VecDeque<AuditEntry>,
#[allow(dead_code)]
max_audit_entries: usize,
rate_limiters: Arc<Mutex<HashMap<String, ModuleRateLimiter>>>,
default_rate_limit_burst: u32,
default_rate_limit_rate: u32,
}
#[derive(Debug, Clone)]
#[allow(dead_code)] pub struct AuditEntry {
pub module_id: String,
pub api_call: String,
pub timestamp: u64,
pub success: bool,
}
struct ModuleRateLimiter {
tokens: u32,
burst_limit: u32,
rate: u32,
last_refill: u64,
}
impl ModuleRateLimiter {
fn new(burst_limit: u32, rate: u32) -> Self {
let now = crate::utils::current_timestamp();
Self {
tokens: burst_limit,
burst_limit,
rate,
last_refill: now,
}
}
fn check_and_consume(&mut self) -> bool {
let now = crate::utils::current_timestamp();
let elapsed = now.saturating_sub(self.last_refill);
if elapsed > 0 {
let tokens_to_add = (elapsed as u32).saturating_mul(self.rate);
self.tokens = self
.tokens
.saturating_add(tokens_to_add)
.min(self.burst_limit);
self.last_refill = now;
}
if self.tokens > 0 {
self.tokens -= 1;
true
} else {
false
}
}
}
impl ModuleApiHub {
pub fn new<A: NodeAPI + Send + Sync + 'static>(node_api: Arc<A>) -> Self {
Self {
node_api,
permission_checker: PermissionChecker::new(),
request_validator: RequestValidator::new(),
audit_log: VecDeque::new(),
max_audit_entries: 1000,
rate_limiters: Arc::new(Mutex::new(HashMap::new())),
default_rate_limit_burst: 1000, default_rate_limit_rate: 100, }
}
fn create_rate_limiter(&self) -> ModuleRateLimiter {
ModuleRateLimiter::new(self.default_rate_limit_burst, self.default_rate_limit_rate)
}
async fn check_module_rate_limit(&self, module_id: &str) -> bool {
let mut limiters = self.rate_limiters.lock().await;
let limiter = limiters
.entry(module_id.to_string())
.or_insert_with(|| self.create_rate_limiter());
limiter.check_and_consume()
}
pub fn register_module_permissions(
&mut self,
module_id: String,
permissions: crate::module::security::permissions::PermissionSet,
) {
self.permission_checker
.register_module_permissions(module_id, permissions);
}
pub async fn unregister_module(&mut self, module_name: &str) {
self.permission_checker
.unregister_module_permissions(module_name);
let mut limiters = self.rate_limiters.lock().await;
limiters.retain(|id, _| id != module_name && !id.starts_with(&format!("{module_name}_")));
}
pub async fn handle_request(
&mut self,
module_id: &str,
request: RequestMessage,
) -> Result<ResponseMessage, ModuleError> {
debug!(
"API hub handling request from module {}: {:?}",
module_id, request.payload
);
self.permission_checker
.check_api_call(module_id, &request.payload)?;
match self
.request_validator
.validate_request(module_id, &request.payload)?
{
crate::module::security::ValidationResult::Allowed => {}
crate::module::security::ValidationResult::Denied(reason) => {
return Err(ModuleError::OperationError(format!(
"Request denied: {reason}"
)));
}
}
if let RequestPayload::Handshake {
module_id: handshake_id,
module_name,
version,
} = &request.payload
{
if handshake_id != module_id {
return Err(ModuleError::OperationError(format!(
"Handshake module_id mismatch: expected {module_id}, got {handshake_id}"
)));
}
info!(
"Handshake from module {} (name={}, version={})",
module_id, module_name, version
);
return Ok(ResponseMessage::success(
request.correlation_id,
ResponsePayload::HandshakeAck {
node_version: env!("CARGO_PKG_VERSION").to_string(),
},
));
}
let operation_id = Self::get_operation_id(&request.payload);
self.request_validator
.validate_resource_limits(module_id, operation_id)?;
let response = match &request.payload {
RequestPayload::Handshake { .. } => {
return Err(crate::module::traits::ModuleError::IpcError(
"Handshake should be handled at connection level".to_string(),
));
}
RequestPayload::RegisterCliSpec { .. } => {
return Err(crate::module::traits::ModuleError::IpcError(
"RegisterCliSpec should be handled at connection level".to_string(),
));
}
RequestPayload::GetBlock { hash } => {
let block = self.node_api.get_block(hash).await?;
ResponsePayload::Block(block)
}
RequestPayload::GetBlockHeader { hash } => {
let header = self.node_api.get_block_header(hash).await?;
ResponsePayload::BlockHeader(header)
}
RequestPayload::GetTransaction { hash } => {
let tx = self.node_api.get_transaction(hash).await?;
ResponsePayload::Transaction(tx)
}
RequestPayload::HasTransaction { hash } => {
let exists = self.node_api.has_transaction(hash).await?;
ResponsePayload::Bool(exists)
}
RequestPayload::GetChainTip => {
let tip = self.node_api.get_chain_tip().await?;
ResponsePayload::Hash(tip)
}
RequestPayload::GetBlockHeight => {
let height = self.node_api.get_block_height().await?;
ResponsePayload::U64(height)
}
RequestPayload::GetUtxo { outpoint } => {
let utxo = self.node_api.get_utxo(outpoint).await?;
ResponsePayload::Utxo(utxo)
}
RequestPayload::SubscribeEvents { event_types: _ } => {
ResponsePayload::SubscribeAck
}
RequestPayload::GetMempoolTransactions => {
let txs = self.node_api.get_mempool_transactions().await?;
ResponsePayload::MempoolTransactions(txs)
}
RequestPayload::GetMempoolTransaction { tx_hash } => {
let tx = self.node_api.get_mempool_transaction(tx_hash).await?;
ResponsePayload::MempoolTransaction(tx)
}
RequestPayload::GetMempoolSize => {
let size = self.node_api.get_mempool_size().await?;
ResponsePayload::MempoolSize(size)
}
RequestPayload::GetNetworkStats => {
let stats = self.node_api.get_network_stats().await?;
ResponsePayload::NetworkStats(stats)
}
RequestPayload::GetNetworkPeers => {
let peers = self.node_api.get_network_peers().await?;
ResponsePayload::NetworkPeers(peers)
}
RequestPayload::GetChainInfo => {
let info = self.node_api.get_chain_info().await?;
ResponsePayload::ChainInfo(info)
}
RequestPayload::GetBlockByHeight { height } => {
let block = self.node_api.get_block_by_height(*height).await?;
ResponsePayload::BlockByHeight(block)
}
RequestPayload::GetLightningNodeUrl => {
let url = self.node_api.get_lightning_node_url().await?;
ResponsePayload::LightningNodeUrl(url)
}
RequestPayload::GetLightningInfo => {
let info = self.node_api.get_lightning_info().await?;
ResponsePayload::LightningInfo(info)
}
RequestPayload::GetPaymentState { payment_id } => {
let state = self.node_api.get_payment_state(payment_id).await?;
ResponsePayload::PaymentState(state)
}
RequestPayload::CheckTransactionInMempool { tx_hash } => {
let exists = self.node_api.check_transaction_in_mempool(tx_hash).await?;
ResponsePayload::CheckTransactionInMempool(exists)
}
RequestPayload::GetFeeEstimate { target_blocks } => {
let fee_rate = self.node_api.get_fee_estimate(*target_blocks).await?;
ResponsePayload::FeeEstimate(fee_rate)
}
RequestPayload::ReadFile { path } => {
crate::module::api::node_api::NodeApiImpl::set_current_module_id(
module_id.to_string(),
);
let result = self.node_api.read_file(path.clone()).await;
crate::module::api::node_api::NodeApiImpl::clear_current_module_id();
let data = result?;
ResponsePayload::FileData(data)
}
RequestPayload::WriteFile { path, data } => {
crate::module::api::node_api::NodeApiImpl::set_current_module_id(
module_id.to_string(),
);
let result = self.node_api.write_file(path.clone(), data.clone()).await;
crate::module::api::node_api::NodeApiImpl::clear_current_module_id();
result?;
ResponsePayload::Bool(true)
}
RequestPayload::DeleteFile { path } => {
crate::module::api::node_api::NodeApiImpl::set_current_module_id(
module_id.to_string(),
);
let result = self.node_api.delete_file(path.clone()).await;
crate::module::api::node_api::NodeApiImpl::clear_current_module_id();
result?;
ResponsePayload::Bool(true)
}
RequestPayload::ListDirectory { path } => {
crate::module::api::node_api::NodeApiImpl::set_current_module_id(
module_id.to_string(),
);
let result = self.node_api.list_directory(path.clone()).await;
crate::module::api::node_api::NodeApiImpl::clear_current_module_id();
let entries = result?;
ResponsePayload::DirectoryListing(entries)
}
RequestPayload::CreateDirectory { path } => {
crate::module::api::node_api::NodeApiImpl::set_current_module_id(
module_id.to_string(),
);
let result = self.node_api.create_directory(path.clone()).await;
crate::module::api::node_api::NodeApiImpl::clear_current_module_id();
result?;
ResponsePayload::Bool(true)
}
RequestPayload::GetFileMetadata { path } => {
crate::module::api::node_api::NodeApiImpl::set_current_module_id(
module_id.to_string(),
);
let result = self.node_api.get_file_metadata(path.clone()).await;
crate::module::api::node_api::NodeApiImpl::clear_current_module_id();
let metadata = result?;
ResponsePayload::FileMetadata(metadata)
}
RequestPayload::ReportMetric { metric } => {
self.node_api.report_metric(metric.clone()).await?;
ResponsePayload::MetricReported
}
RequestPayload::GetModuleMetrics { module_id } => {
let metrics = self.node_api.get_module_metrics(module_id).await?;
ResponsePayload::ModuleMetrics(metrics)
}
RequestPayload::GetAllMetrics => {
let metrics = self.node_api.get_all_metrics().await?;
ResponsePayload::AllMetrics(metrics)
}
RequestPayload::DiscoverModules => {
let modules = self.node_api.discover_modules().await?;
ResponsePayload::ModuleList(modules)
}
RequestPayload::GetModuleInfo {
module_id: target_module_id,
} => {
let info = self.node_api.get_module_info(target_module_id).await?;
ResponsePayload::ModuleInfo(info)
}
RequestPayload::IsModuleAvailable {
module_id: target_module_id,
} => {
let available = self.node_api.is_module_available(target_module_id).await?;
ResponsePayload::ModuleAvailable(available)
}
RequestPayload::PublishEvent {
event_type,
payload,
} => {
self.node_api
.publish_event(*event_type, payload.clone())
.await?;
ResponsePayload::EventPublished
}
RequestPayload::CallModule {
target_module_id,
method,
params,
} => {
let response = self
.node_api
.call_module(target_module_id.as_deref(), method, params.clone())
.await?;
ResponsePayload::ModuleApiResponse(response)
}
RequestPayload::RegisterModuleApi { .. } => {
return Err(crate::module::traits::ModuleError::OperationError(
"Module API registration must be done via register_module_api() method, not IPC".to_string(),
));
}
RequestPayload::UnregisterModuleApi => {
self.node_api.unregister_module_api().await?;
ResponsePayload::ModuleApiUnregistered
}
RequestPayload::GetModuleHealth { module_id } => {
let health = self.node_api.get_module_health(module_id).await?;
ResponsePayload::ModuleHealth(health)
}
RequestPayload::GetAllModuleHealth => {
let health = self.node_api.get_all_module_health().await?;
ResponsePayload::AllModuleHealth(health)
}
RequestPayload::ReportModuleHealth { health } => {
self.node_api.report_module_health(health.clone()).await?;
ResponsePayload::HealthReported
}
RequestPayload::SendMeshPacketToPeer {
peer_addr,
packet_data,
} => {
self.node_api
.send_mesh_packet_to_peer(peer_addr.clone(), packet_data.clone())
.await?;
ResponsePayload::Bool(true)
}
RequestPayload::SendStratumV2MessageToPeer {
peer_addr,
message_data,
} => {
self.node_api
.send_stratum_v2_message_to_peer(peer_addr.clone(), message_data.clone())
.await?;
ResponsePayload::Bool(true)
}
RequestPayload::GetBlockTemplate {
rules,
coinbase_script,
coinbase_address,
} => {
let template = self
.node_api
.get_block_template(
rules.clone(),
coinbase_script.clone(),
coinbase_address.clone(),
)
.await?;
ResponsePayload::BlockTemplate(template)
}
RequestPayload::SubmitBlock { block } => {
let result = self.node_api.submit_block(block.clone()).await?;
ResponsePayload::SubmitBlockResult(result)
}
RequestPayload::MergeBlockServeDenylist { block_hashes } => {
self.node_api
.merge_block_serve_denylist(block_hashes.as_slice())
.await?;
ResponsePayload::BlockServeDenylistMerged
}
RequestPayload::GetBlockServeDenylistSnapshot => {
let s = self.node_api.get_block_serve_denylist_snapshot().await?;
ResponsePayload::BlockServeDenylistSnapshot(s)
}
RequestPayload::ClearBlockServeDenylist => {
self.node_api.clear_block_serve_denylist().await?;
ResponsePayload::Bool(true)
}
RequestPayload::ReplaceBlockServeDenylist { block_hashes } => {
self.node_api
.replace_block_serve_denylist(block_hashes.as_slice())
.await?;
ResponsePayload::Bool(true)
}
RequestPayload::MergeTxServeDenylist { tx_hashes } => {
self.node_api
.merge_tx_serve_denylist(tx_hashes.as_slice())
.await?;
ResponsePayload::TxServeDenylistMerged
}
RequestPayload::GetTxServeDenylistSnapshot => {
let s = self.node_api.get_tx_serve_denylist_snapshot().await?;
ResponsePayload::TxServeDenylistSnapshot(s)
}
RequestPayload::ClearTxServeDenylist => {
self.node_api.clear_tx_serve_denylist().await?;
ResponsePayload::Bool(true)
}
RequestPayload::ReplaceTxServeDenylist { tx_hashes } => {
self.node_api
.replace_tx_serve_denylist(tx_hashes.as_slice())
.await?;
ResponsePayload::Bool(true)
}
RequestPayload::GetSyncStatus => {
let s = self.node_api.get_sync_status().await?;
ResponsePayload::NodeSyncStatus(s)
}
RequestPayload::BanPeer {
peer_addr,
ban_duration_seconds,
} => {
self.node_api
.ban_peer(peer_addr.as_str(), *ban_duration_seconds)
.await?;
ResponsePayload::Bool(true)
}
RequestPayload::SetBlockServeMaintenanceMode { enabled } => {
self.node_api
.set_block_serve_maintenance_mode(*enabled)
.await?;
ResponsePayload::Bool(true)
}
RequestPayload::RegisterRpcEndpoint {
method,
description,
} => {
self.node_api
.register_rpc_endpoint(method.clone(), description.clone())
.await?;
ResponsePayload::RpcEndpointRegistered
}
RequestPayload::UnregisterRpcEndpoint { method } => {
self.node_api.unregister_rpc_endpoint(method).await?;
ResponsePayload::RpcEndpointUnregistered
}
RequestPayload::RegisterTimer { .. } => {
return Err(crate::module::traits::ModuleError::OperationError(
"Timer registration requires a callback which cannot be serialized over IPC. \
Use module-side timer management (e.g. tokio::spawn with sleep)."
.to_string(),
));
}
RequestPayload::CancelTimer { .. } => {
return Err(crate::module::traits::ModuleError::OperationError(
"Timer cancellation not supported over IPC. Use module-side timer management."
.to_string(),
));
}
RequestPayload::ScheduleTask { .. } => {
return Err(crate::module::traits::ModuleError::OperationError(
"Task scheduling requires a callback which cannot be serialized over IPC. \
Use module-side task management (e.g. tokio::spawn)."
.to_string(),
));
}
#[allow(unreachable_patterns)]
other => {
return Err(crate::module::traits::ModuleError::OperationError(format!(
"Unimplemented request payload: {other:?}"
)))
}
};
self.log_audit(module_id.to_string(), operation_id.to_string(), true);
Ok(ResponseMessage::success(request.correlation_id, response))
}
#[inline]
fn get_operation_id(payload: &RequestPayload) -> &'static str {
match payload {
RequestPayload::Handshake { .. } => "handshake",
RequestPayload::GetBlock { .. } => "get_block",
RequestPayload::GetBlockHeader { .. } => "get_block_header",
RequestPayload::GetTransaction { .. } => "get_transaction",
RequestPayload::HasTransaction { .. } => "has_transaction",
RequestPayload::GetChainTip => "get_chain_tip",
RequestPayload::GetBlockHeight => "get_block_height",
RequestPayload::GetUtxo { .. } => "get_utxo",
RequestPayload::SubscribeEvents { .. } => "subscribe_events",
RequestPayload::GetMempoolTransactions => "get_mempool_transactions",
RequestPayload::GetMempoolTransaction { .. } => "get_mempool_transaction",
RequestPayload::GetMempoolSize => "get_mempool_size",
RequestPayload::GetNetworkStats => "get_network_stats",
RequestPayload::GetNetworkPeers => "get_network_peers",
RequestPayload::GetChainInfo => "get_chain_info",
RequestPayload::GetBlockByHeight { .. } => "get_block_by_height",
RequestPayload::GetLightningNodeUrl => "get_lightning_node_url",
RequestPayload::GetLightningInfo => "get_lightning_info",
RequestPayload::GetPaymentState { .. } => "get_payment_state",
RequestPayload::CheckTransactionInMempool { .. } => "check_transaction_in_mempool",
RequestPayload::GetFeeEstimate { .. } => "get_fee_estimate",
RequestPayload::GetBlockTemplate { .. } => "get_block_template",
RequestPayload::SubmitBlock { .. } => "submit_block",
RequestPayload::MergeBlockServeDenylist { .. } => "merge_block_serve_denylist",
RequestPayload::GetBlockServeDenylistSnapshot => "get_block_serve_denylist_snapshot",
RequestPayload::ClearBlockServeDenylist => "clear_block_serve_denylist",
RequestPayload::ReplaceBlockServeDenylist { .. } => "replace_block_serve_denylist",
RequestPayload::MergeTxServeDenylist { .. } => "merge_tx_serve_denylist",
RequestPayload::GetTxServeDenylistSnapshot => "get_tx_serve_denylist_snapshot",
RequestPayload::ClearTxServeDenylist => "clear_tx_serve_denylist",
RequestPayload::ReplaceTxServeDenylist { .. } => "replace_tx_serve_denylist",
RequestPayload::GetSyncStatus => "get_sync_status",
RequestPayload::BanPeer { .. } => "ban_peer",
RequestPayload::SetBlockServeMaintenanceMode { .. } => {
"set_block_serve_maintenance_mode"
}
RequestPayload::ReadFile { .. } => "read_file",
RequestPayload::WriteFile { .. } => "write_file",
RequestPayload::DeleteFile { .. } => "delete_file",
RequestPayload::ListDirectory { .. } => "list_directory",
RequestPayload::CreateDirectory { .. } => "create_directory",
RequestPayload::GetFileMetadata { .. } => "get_file_metadata",
RequestPayload::RegisterRpcEndpoint { .. } => "register_rpc_endpoint",
RequestPayload::UnregisterRpcEndpoint { .. } => "unregister_rpc_endpoint",
RequestPayload::RegisterTimer { .. } => "register_timer",
RequestPayload::CancelTimer { .. } => "cancel_timer",
RequestPayload::ScheduleTask { .. } => "schedule_task",
RequestPayload::ReportMetric { .. } => "report_metric",
RequestPayload::GetModuleMetrics { .. } => "get_module_metrics",
RequestPayload::GetAllMetrics => "get_all_metrics",
RequestPayload::DiscoverModules => "discover_modules",
RequestPayload::GetModuleInfo { .. } => "get_module_info",
RequestPayload::IsModuleAvailable { .. } => "is_module_available",
RequestPayload::PublishEvent { .. } => "publish_event",
RequestPayload::CallModule { .. } => "call_module",
RequestPayload::RegisterModuleApi { .. } => "register_module_api",
RequestPayload::UnregisterModuleApi => "unregister_module_api",
RequestPayload::GetModuleHealth { .. } => "get_module_health",
RequestPayload::GetAllModuleHealth => "get_all_module_health",
RequestPayload::ReportModuleHealth { .. } => "report_module_health",
RequestPayload::SendMeshPacketToPeer { .. } => "send_mesh_packet_to_peer",
RequestPayload::SendStratumV2MessageToPeer { .. } => "send_stratum_v2_message_to_peer",
RequestPayload::RegisterCliSpec { .. } => "register_cli_spec",
}
}
fn log_audit(&mut self, module_id: String, api_call: String, success: bool) {
let timestamp = crate::utils::current_timestamp();
let entry = AuditEntry {
module_id: module_id.clone(),
api_call: api_call.clone(),
timestamp,
success,
};
self.audit_log.push_back(entry.clone());
while self.audit_log.len() > self.max_audit_entries {
self.audit_log.pop_front();
}
use tracing::{info, warn};
if success {
info!(
target: "blvm_node::module::audit",
module_id = %module_id,
api_call = %api_call,
timestamp = timestamp,
"Module API call"
);
} else {
warn!(
target: "blvm_node::module::audit",
module_id = %module_id,
api_call = %api_call,
timestamp = timestamp,
"Module API call failed"
);
}
}
pub fn get_audit_log(&self, limit: usize) -> Vec<AuditEntry> {
let start = self.audit_log.len().saturating_sub(limit);
self.audit_log.range(start..).cloned().collect()
}
}