use async_trait::async_trait;
use std::sync::Arc;
use tokio::sync::mpsc;
use tracing::debug;
// Thread-local override of "which module is this API call running on behalf
// of". When set (via `NodeApiImpl::set_current_module_id`) it takes precedence
// over the instance-level `module_id` field in `get_module_id`.
// NOTE(review): being thread-local, this only propagates across `.await`
// points if the task stays on the same thread — confirm callers set/clear it
// around synchronous sections only.
thread_local! {
    static CURRENT_MODULE_ID: std::cell::RefCell<Option<String>> = const { std::cell::RefCell::new(None) };
}
use crate::module::api::events::EventManager;
use crate::module::hooks::HookManager;
use crate::module::ipc::protocol::EventPayload;
use crate::module::ipc::protocol::ModuleMessage;
use crate::module::metrics::manager::{Metric, MetricsManager};
use crate::module::traits::{
module_error_msg, BlockServeDenylistSnapshot, ChainInfo, EventType, LightningInfo, MempoolSize,
ModuleError, ModuleInfo, ModuleState, NetworkStats, NodeAPI, PaymentState, PeerInfo,
SubmitBlockResult, SyncStatus, TxServeDenylistSnapshot,
};
use crate::network::{transport::TransportAddr, NetworkManager};
use crate::node::mempool::MempoolManager;
use crate::storage::Storage;
use crate::{Block, BlockHeader, Hash, OutPoint, Transaction, UTXO};
use hex;
/// Concrete [`NodeAPI`] implementation handed to modules.
///
/// Only `storage` is mandatory; every other dependency is optional and wired
/// in incrementally via the `with_*` constructors and `set_*` injectors. Each
/// trait method that needs an absent dependency fails with a descriptive
/// `ModuleError` instead of panicking.
pub struct NodeApiImpl {
    /// Blockchain storage backend (blocks, transactions, UTXOs, chain state).
    storage: Arc<Storage>,
    /// Event pub/sub manager used by `subscribe_events`/`publish_event`.
    event_manager: Option<Arc<EventManager>>,
    /// Identity of the module this instance serves (fallback when no
    /// thread-local override is set).
    module_id: Option<String>,
    /// Mempool access for transaction/fee queries.
    mempool_manager: Option<Arc<MempoolManager>>,
    /// Peer/network access for stats and peer listings.
    network_manager: Option<Arc<NetworkManager>>,
    /// RPC server used to (un)register module-provided endpoints.
    rpc_server: Option<Arc<crate::rpc::server::RpcServer>>,
    /// Hook manager; consulted first for cached mempool stats/fee estimates.
    hook_manager: Option<Arc<tokio::sync::RwLock<HookManager>>>,
    /// Timer/task scheduler for modules.
    timer_manager: Option<Arc<crate::module::timers::manager::TimerManager>>,
    /// Module identity used when registering timers/tasks.
    module_id_for_timers: Option<String>,
    /// Metrics sink for `report_metric` and metric queries.
    metrics_manager: Option<Arc<MetricsManager>>,
    /// Module identity used when reporting metrics.
    module_id_for_metrics: Option<String>,
    /// Legacy single-module filesystem sandbox (see `set_filesystem_access`).
    filesystem_sandbox: Option<Arc<crate::module::sandbox::filesystem::FileSystemSandbox>>,
    /// Legacy single-module data directory.
    module_data_dir: Option<std::path::PathBuf>,
    /// Per-module filesystem sandboxes, keyed by module id; populated by
    /// `initialize_module` and consulted by the file APIs.
    module_filesystem_sandboxes: Arc<
        tokio::sync::RwLock<
            std::collections::HashMap<
                String,
                Arc<crate::module::sandbox::filesystem::FileSystemSandbox>,
            >,
        >,
    >,
    /// Per-module private data directories, keyed by module id.
    module_data_dirs:
        Arc<tokio::sync::RwLock<std::collections::HashMap<String, std::path::PathBuf>>>,
    /// IPC server used to route module RPC endpoint traffic.
    ipc_server: Option<Arc<tokio::sync::Mutex<crate::module::ipc::server::ModuleIpcServer>>>,
    /// Sync coordinator; preferred source for `is_synced` in `get_chain_info`.
    sync_coordinator: Option<Arc<tokio::sync::Mutex<crate::node::sync::SyncCoordinator>>>,
    /// Payment lifecycle state machine backing `get_payment_state`.
    payment_state_machine: Option<Arc<crate::payment::state_machine::PaymentStateMachine>>,
    /// Module manager for discovery/metadata queries.
    module_manager: Option<Arc<tokio::sync::Mutex<crate::module::manager::ModuleManager>>>,
    /// Inter-module API registry (registration side).
    module_api_registry: Option<Arc<crate::module::inter_module::registry::ModuleApiRegistry>>,
    /// Inter-module call router (dispatch side).
    module_router: Option<Arc<crate::module::inter_module::router::ModuleRouter>>,
    /// Module identity used for inter-module API calls.
    current_module_id_for_api: Option<String>,
}
impl NodeApiImpl {
    /// Build an API instance backed only by `storage`; all optional
    /// dependencies start unset and can be injected later via `set_*`.
    ///
    /// Fix: removed a dead `storage_clone` binding that was never used and
    /// only produced an `unused_variables` warning.
    pub fn new(storage: Arc<Storage>) -> Self {
        Self {
            storage,
            event_manager: None,
            module_id: None,
            mempool_manager: None,
            network_manager: None,
            rpc_server: None,
            hook_manager: None,
            timer_manager: None,
            module_id_for_timers: None,
            metrics_manager: None,
            module_id_for_metrics: None,
            filesystem_sandbox: None,
            module_data_dir: None,
            module_filesystem_sandboxes: Arc::new(tokio::sync::RwLock::new(
                std::collections::HashMap::new(),
            )),
            module_data_dirs: Arc::new(tokio::sync::RwLock::new(std::collections::HashMap::new())),
            ipc_server: None,
            sync_coordinator: None,
            payment_state_machine: None,
            module_manager: None,
            module_api_registry: None,
            module_router: None,
            current_module_id_for_api: None,
        }
    }
    /// Like [`Self::new`], but with an event manager and the identity of the
    /// module this API instance serves.
    pub fn with_event_manager(
        storage: Arc<Storage>,
        event_manager: Arc<EventManager>,
        module_id: String,
    ) -> Self {
        Self {
            storage,
            event_manager: Some(event_manager),
            module_id: Some(module_id),
            mempool_manager: None,
            network_manager: None,
            rpc_server: None,
            hook_manager: None,
            timer_manager: None,
            module_id_for_timers: None,
            metrics_manager: None,
            module_id_for_metrics: None,
            filesystem_sandbox: None,
            module_data_dir: None,
            module_filesystem_sandboxes: Arc::new(tokio::sync::RwLock::new(
                std::collections::HashMap::new(),
            )),
            module_data_dirs: Arc::new(tokio::sync::RwLock::new(std::collections::HashMap::new())),
            ipc_server: None,
            sync_coordinator: None,
            payment_state_machine: None,
            module_manager: None,
            module_api_registry: None,
            module_router: None,
            current_module_id_for_api: None,
        }
    }
    /// Construct with the common node-level dependencies in one call.
    ///
    /// Fix: removed a dead `storage_clone` binding (same issue as in `new`).
    pub fn with_dependencies(
        storage: Arc<Storage>,
        event_manager: Option<Arc<EventManager>>,
        module_id: Option<String>,
        mempool_manager: Option<Arc<MempoolManager>>,
        network_manager: Option<Arc<NetworkManager>>,
    ) -> Self {
        Self {
            storage,
            event_manager,
            module_id,
            mempool_manager,
            network_manager,
            rpc_server: None,
            hook_manager: None,
            ipc_server: None,
            timer_manager: None,
            module_id_for_timers: None,
            metrics_manager: None,
            module_id_for_metrics: None,
            filesystem_sandbox: None,
            module_data_dir: None,
            module_filesystem_sandboxes: Arc::new(tokio::sync::RwLock::new(
                std::collections::HashMap::new(),
            )),
            module_data_dirs: Arc::new(tokio::sync::RwLock::new(std::collections::HashMap::new())),
            sync_coordinator: None,
            payment_state_machine: None,
            module_manager: None,
            module_api_registry: None,
            module_router: None,
            current_module_id_for_api: None,
        }
    }
    /// Inject the module manager used for module discovery/metadata queries.
    pub fn set_module_manager(
        &mut self,
        module_manager: Arc<tokio::sync::Mutex<crate::module::manager::ModuleManager>>,
    ) {
        self.module_manager = Some(module_manager);
    }
    /// Inject the inter-module API registry and its call router together.
    pub fn set_module_api_registry(
        &mut self,
        registry: Arc<crate::module::inter_module::registry::ModuleApiRegistry>,
        router: Arc<crate::module::inter_module::router::ModuleRouter>,
    ) {
        self.module_api_registry = Some(registry);
        self.module_router = Some(router);
    }
    /// Record which module identity to use for inter-module API calls.
    pub fn set_current_module_id_for_api(&mut self, module_id: String) {
        self.current_module_id_for_api = Some(module_id);
    }
    /// Register per-module filesystem state: a sandbox rooted at
    /// `base_data_dir` plus the module's private data directory. Overwrites
    /// any previous registration for the same `module_id`.
    pub async fn initialize_module(
        &self,
        module_id: String,
        module_data_dir: std::path::PathBuf,
        base_data_dir: std::path::PathBuf,
    ) -> Result<(), ModuleError> {
        let sandbox = Arc::new(crate::module::sandbox::filesystem::FileSystemSandbox::new(
            &base_data_dir,
        ));
        {
            let mut sandboxes = self.module_filesystem_sandboxes.write().await;
            sandboxes.insert(module_id.clone(), sandbox);
        }
        {
            let mut dirs = self.module_data_dirs.write().await;
            dirs.insert(module_id.clone(), module_data_dir);
        }
        Ok(())
    }
    /// Resolve the effective module id: the thread-local override wins,
    /// falling back to the instance-level `module_id`.
    fn get_module_id(&self) -> Option<String> {
        CURRENT_MODULE_ID
            .with(|id| id.borrow().clone())
            .or_else(|| self.module_id.clone())
    }
    /// Set the thread-local module-id override for the current thread.
    pub fn set_current_module_id(module_id: String) {
        CURRENT_MODULE_ID.with(|id| {
            *id.borrow_mut() = Some(module_id);
        });
    }
    /// Clear the thread-local module-id override for the current thread.
    pub fn clear_current_module_id() {
        CURRENT_MODULE_ID.with(|id| {
            *id.borrow_mut() = None;
        });
    }
    /// Legacy single-module filesystem wiring (sandbox + data dir).
    pub fn set_filesystem_access(
        &mut self,
        sandbox: Arc<crate::module::sandbox::filesystem::FileSystemSandbox>,
        data_dir: std::path::PathBuf,
    ) {
        self.filesystem_sandbox = Some(sandbox);
        self.module_data_dir = Some(data_dir);
    }
    /// Inject the hook manager (cached mempool stats / fee estimates).
    pub fn set_hook_manager(&mut self, hook_manager: Arc<tokio::sync::RwLock<HookManager>>) {
        self.hook_manager = Some(hook_manager);
    }
    /// Inject the timer manager plus the module identity timers run under.
    pub fn set_timer_manager(
        &mut self,
        timer_manager: Arc<crate::module::timers::manager::TimerManager>,
        module_id: String,
    ) {
        self.timer_manager = Some(timer_manager);
        self.module_id_for_timers = Some(module_id);
    }
    /// Inject the RPC server used for module endpoint registration.
    pub fn set_rpc_server(&mut self, rpc_server: Arc<crate::rpc::server::RpcServer>) {
        self.rpc_server = Some(rpc_server);
    }
    /// Inject the event manager and the serving module's identity.
    pub fn set_event_manager(&mut self, event_manager: Arc<EventManager>, module_id: String) {
        self.event_manager = Some(event_manager);
        self.module_id = Some(module_id);
    }
    /// Inject the mempool manager.
    pub fn set_mempool_manager(&mut self, mempool_manager: Arc<MempoolManager>) {
        self.mempool_manager = Some(mempool_manager);
    }
    /// Inject the network manager.
    pub fn set_network_manager(&mut self, network_manager: Arc<NetworkManager>) {
        self.network_manager = Some(network_manager);
    }
    /// Inject the sync coordinator (authoritative `is_synced` source).
    pub fn set_sync_coordinator(
        &mut self,
        sync_coordinator: Arc<tokio::sync::Mutex<crate::node::sync::SyncCoordinator>>,
    ) {
        self.sync_coordinator = Some(sync_coordinator);
    }
    /// Inject the payment state machine backing `get_payment_state`.
    pub fn set_payment_state_machine(
        &mut self,
        payment_state_machine: Arc<crate::payment::state_machine::PaymentStateMachine>,
    ) {
        self.payment_state_machine = Some(payment_state_machine);
    }
    /// Convert compact `bits` to a difficulty value, defaulting to 1.0 when
    /// the bits are malformed.
    fn calculate_difficulty_from_bits_helper(&self, bits: u64) -> f64 {
        blvm_protocol::pow::difficulty_from_bits(bits).unwrap_or(1.0)
    }
}
#[async_trait]
impl NodeAPI for NodeApiImpl {
async fn get_block(&self, hash: &Hash) -> Result<Option<Block>, ModuleError> {
tokio::task::spawn_blocking({
let storage = Arc::clone(&self.storage);
let hash = *hash;
move || {
storage
.blocks()
.get_block(&hash)
.map_err(|e| ModuleError::op_err("Failed to get block", e))
}
})
.await
.map_err(|e| ModuleError::op_err("Task join error", e))?
}
async fn get_block_header(&self, hash: &Hash) -> Result<Option<BlockHeader>, ModuleError> {
tokio::task::spawn_blocking({
let storage = Arc::clone(&self.storage);
let hash = *hash;
move || {
storage
.blocks()
.get_header(&hash)
.map_err(|e| ModuleError::op_err("Failed to get block header", e))
}
})
.await
.map_err(|e| ModuleError::op_err("Task join error", e))?
}
async fn get_transaction(&self, hash: &Hash) -> Result<Option<Transaction>, ModuleError> {
tokio::task::spawn_blocking({
let storage = Arc::clone(&self.storage);
let hash = *hash;
move || {
storage
.transactions()
.get_transaction(&hash)
.map_err(|e| ModuleError::op_err("Failed to get transaction", e))
}
})
.await
.map_err(|e| ModuleError::op_err("Task join error", e))?
}
async fn has_transaction(&self, hash: &Hash) -> Result<bool, ModuleError> {
tokio::task::spawn_blocking({
let storage = Arc::clone(&self.storage);
let hash = *hash;
move || {
storage
.transactions()
.has_transaction(&hash)
.map_err(|e| ModuleError::op_err("Failed to check transaction existence", e))
}
})
.await
.map_err(|e| ModuleError::op_err("Task join error", e))?
}
async fn get_block_height(&self) -> Result<u64, ModuleError> {
tokio::task::spawn_blocking({
let storage = Arc::clone(&self.storage);
move || {
storage
.chain()
.get_height()
.map_err(|e| ModuleError::op_err("Failed to get block height", e))?
.ok_or_else(|| {
ModuleError::OperationError(
module_error_msg::CHAIN_NOT_YET_INITIALIZED.to_string(),
)
})
}
})
.await
.map_err(|e| ModuleError::op_err("Task join error", e))?
}
async fn get_chain_tip(&self) -> Result<Hash, ModuleError> {
tokio::task::spawn_blocking({
let storage = Arc::clone(&self.storage);
move || {
storage
.chain()
.get_tip_hash()
.map_err(|e| ModuleError::op_err("Failed to get chain tip", e))?
.ok_or_else(|| {
ModuleError::OperationError(
module_error_msg::CHAIN_NOT_YET_INITIALIZED.to_string(),
)
})
}
})
.await
.map_err(|e| ModuleError::op_err("Task join error", e))?
}
async fn get_utxo(&self, outpoint: &OutPoint) -> Result<Option<UTXO>, ModuleError> {
let outpoint_clone = *outpoint;
tokio::task::spawn_blocking({
let storage = Arc::clone(&self.storage);
move || {
storage
.utxos()
.get_utxo(&outpoint_clone)
.map_err(|e| ModuleError::op_err("Failed to get UTXO", e))
}
})
.await
.map_err(|e| ModuleError::op_err("Task join error", e))?
}
async fn subscribe_events(
&self,
event_types: Vec<EventType>,
) -> Result<mpsc::Receiver<ModuleMessage>, ModuleError> {
let (tx, rx) = mpsc::channel(100);
if let (Some(event_manager), Some(module_id)) = (&self.event_manager, &self.module_id) {
event_manager
.subscribe_module(module_id.clone(), event_types, tx)
.await?;
} else {
tracing::debug!(
"Event manager not available for subscribe_events - returning empty receiver"
);
}
Ok(rx)
}
async fn get_mempool_transactions(&self) -> Result<Vec<Hash>, ModuleError> {
let mempool = self.mempool_manager.as_ref().ok_or_else(|| {
ModuleError::OperationError(module_error_msg::MEMPOOL_MANAGER_NOT_AVAILABLE.to_string())
})?;
Ok(mempool.transaction_hashes())
}
async fn get_mempool_transaction(
&self,
tx_hash: &Hash,
) -> Result<Option<Transaction>, ModuleError> {
let mempool = self.mempool_manager.as_ref().ok_or_else(|| {
ModuleError::OperationError(module_error_msg::MEMPOOL_MANAGER_NOT_AVAILABLE.to_string())
})?;
Ok(mempool.get_transaction(tx_hash))
}
async fn get_mempool_size(&self) -> Result<MempoolSize, ModuleError> {
if let Some(hook_mgr) = &self.hook_manager {
let hooks = hook_mgr.read().await;
if let Some(cached_stats) = hooks.get_mempool_stats_cached().await {
return Ok(cached_stats);
}
}
let mempool = self.mempool_manager.as_ref().ok_or_else(|| {
ModuleError::OperationError(module_error_msg::MEMPOOL_MANAGER_NOT_AVAILABLE.to_string())
})?;
let transaction_count = mempool.size();
let transactions = mempool.get_transactions();
let size_bytes: usize = transactions
.iter()
.map(|tx| {
bincode::serialize(tx).map(|bytes| bytes.len()).unwrap_or(0)
})
.sum();
let total_fee_sats = tokio::task::spawn_blocking({
let storage = Arc::clone(&self.storage);
let transactions_clone = transactions.clone();
move || {
let mut total_fee = 0u64;
for tx in transactions_clone {
if tx.inputs.is_empty() || tx.inputs[0].prevout.hash == [0u8; 32] {
continue;
}
let mut input_total = 0u64;
for input in &tx.inputs {
if let Ok(Some(utxo)) = storage.utxos().get_utxo(&input.prevout) {
input_total = input_total.saturating_add(utxo.value as u64);
}
}
let output_total: u64 = tx.outputs.iter().map(|out| out.value as u64).sum();
let fee = input_total.saturating_sub(output_total);
total_fee = total_fee.saturating_add(fee);
}
total_fee
}
})
.await
.map_err(|e| ModuleError::op_err("Task join error", e))?;
Ok(MempoolSize {
transaction_count,
size_bytes,
total_fee_sats,
})
}
    /// Basic network statistics. Peer count comes from the network manager;
    /// hash rate is estimated from the chain-tip difficulty. Byte counters
    /// are not tracked here and are always reported as 0.
    async fn get_network_stats(&self) -> Result<NetworkStats, ModuleError> {
        let network = self.network_manager.as_ref().ok_or_else(|| {
            ModuleError::OperationError(module_error_msg::NETWORK_MANAGER_NOT_AVAILABLE.to_string())
        })?;
        let peer_count = network.peer_count();
        // Chain-info load hits storage, so it runs on a blocking thread.
        let hash_rate = tokio::task::spawn_blocking({
            let storage = Arc::clone(&self.storage);
            move || {
                if let Ok(Some(chain_info)) = storage.chain().load_chain_info() {
                    let difficulty =
                        blvm_protocol::pow::difficulty_from_bits(chain_info.tip_header.bits)
                            .unwrap_or(1.0);
                    // Estimated hashes/sec = difficulty * 2^32 / block interval.
                    // NOTE(review): 600 looks like a 10-minute target block time
                    // in seconds — confirm against the consensus parameters.
                    difficulty * 4294967296.0 / 600.0
                } else {
                    // No chain info yet: report zero rather than failing.
                    0.0
                }
            }
        })
        .await
        .map_err(|e| ModuleError::op_err("Task join error", e))?;
        Ok(NetworkStats {
            peer_count,
            hash_rate,
            bytes_sent: 0,
            bytes_received: 0,
        })
    }
    /// Snapshot of currently connected peers with their transport addresses.
    async fn get_network_peers(&self) -> Result<Vec<PeerInfo>, ModuleError> {
        let network = self.network_manager.as_ref().ok_or_else(|| {
            ModuleError::OperationError(module_error_msg::NETWORK_MANAGER_NOT_AVAILABLE.to_string())
        })?;
        let peer_manager_guard = network.peer_manager().await;
        let mut peers = Vec::new();
        for transport_addr in peer_manager_guard.peer_addresses() {
            // A peer may disappear between listing addresses and this lookup;
            // missing entries are silently skipped.
            if let Some(peer) = peer_manager_guard.get_peer(&transport_addr) {
                // Human-readable address. Iroh node ids are raw bytes, so they
                // are hex-encoded with an "iroh:" prefix.
                let addr_str = match transport_addr {
                    TransportAddr::Tcp(addr) => addr.to_string(),
                    #[cfg(feature = "quinn")]
                    TransportAddr::Quinn(addr) => addr.to_string(),
                    #[cfg(feature = "iroh")]
                    TransportAddr::Iroh(ref node_id) => format!("iroh:{}", hex::encode(node_id)),
                };
                // Transport label; arms are cfg-gated to match the variants
                // compiled into TransportAddr.
                let transport_type = match transport_addr {
                    TransportAddr::Tcp(_) => "tcp".to_string(),
                    #[cfg(feature = "quinn")]
                    TransportAddr::Quinn(_) => "quinn".to_string(),
                    #[cfg(feature = "iroh")]
                    TransportAddr::Iroh(_) => "iroh".to_string(),
                };
                let version = peer.version();
                peers.push(PeerInfo {
                    addr: addr_str,
                    transport_type,
                    services: peer.services(),
                    version,
                    connected_since: peer.conntime(),
                });
            }
        }
        Ok(peers)
    }
async fn get_chain_info(&self) -> Result<ChainInfo, ModuleError> {
let tip = self.get_chain_tip().await?;
let height = self.get_block_height().await?;
let (difficulty, chain_work, is_synced) = tokio::task::spawn_blocking({
let storage = Arc::clone(&self.storage);
let tip_clone = tip;
move || {
let difficulty = if let Ok(Some(tip_header)) = storage.chain().get_tip_header() {
blvm_protocol::pow::difficulty_from_bits(tip_header.bits).unwrap_or(1.0) as u32
} else {
0
};
let chain_work = storage
.chain()
.get_chainwork(&tip_clone)
.ok()
.flatten()
.unwrap_or(0) as u64;
(difficulty, chain_work, true) }
})
.await
.map_err(|e| ModuleError::op_err("Task join error", e))?;
let is_synced = if let Some(ref sync_coord) = self.sync_coordinator {
let sync_guard = sync_coord.lock().await;
sync_guard.is_synced()
} else {
height > 0
};
Ok(ChainInfo {
tip_hash: tip,
height,
difficulty,
chain_work,
is_synced,
})
}
async fn get_block_by_height(&self, height: u64) -> Result<Option<Block>, ModuleError> {
tokio::task::spawn_blocking({
let storage = Arc::clone(&self.storage);
move || {
storage
.blocks()
.get_hash_by_height(height)
.map_err(|e| ModuleError::op_err("Failed to get hash by height", e))?
.and_then(|hash| {
storage
.blocks()
.get_block(&hash)
.map_err(|e| ModuleError::op_err("Failed to get block", e))
.transpose()
})
.transpose()
}
})
.await
.map_err(|e| ModuleError::op_err("Task join error", e))?
}
    /// Stub: no Lightning node URL is provided by this implementation; always
    /// returns `Ok(None)`.
    async fn get_lightning_node_url(&self) -> Result<Option<String>, ModuleError> {
        Ok(None)
    }
    /// Stub: no Lightning info is provided by this implementation; always
    /// returns `Ok(None)`.
    async fn get_lightning_info(&self) -> Result<Option<LightningInfo>, ModuleError> {
        Ok(None)
    }
async fn get_payment_state(
&self,
payment_id: &str,
) -> Result<Option<PaymentState>, ModuleError> {
let payment_state_machine = self.payment_state_machine.as_ref().ok_or_else(|| {
ModuleError::OperationError(
module_error_msg::PAYMENT_STATE_MACHINE_NOT_AVAILABLE.to_string(),
)
})?;
match payment_state_machine.get_payment_state(payment_id).await {
Ok(state) => {
match state {
crate::payment::state_machine::PaymentState::RequestCreated { request_id } => {
Ok(Some(PaymentState {
payment_id: request_id,
status: "pending".to_string(),
amount_sats: 0, tx_hash: None,
confirmations: None,
}))
}
crate::payment::state_machine::PaymentState::ProofCreated {
request_id,
..
}
| crate::payment::state_machine::PaymentState::ProofBroadcast {
request_id,
..
} => Ok(Some(PaymentState {
payment_id: request_id,
status: "pending".to_string(),
amount_sats: 0,
tx_hash: None,
confirmations: None,
})),
crate::payment::state_machine::PaymentState::InMempool {
request_id,
tx_hash,
} => Ok(Some(PaymentState {
payment_id: request_id,
status: "pending".to_string(),
amount_sats: 0,
tx_hash: Some(tx_hash),
confirmations: Some(0),
})),
crate::payment::state_machine::PaymentState::Settled {
request_id,
tx_hash,
confirmation_count,
..
} => Ok(Some(PaymentState {
payment_id: request_id,
status: "confirmed".to_string(),
amount_sats: 0,
tx_hash: Some(tx_hash),
confirmations: Some(confirmation_count),
})),
crate::payment::state_machine::PaymentState::ReorgPending {
request_id,
tx_hash,
..
} => Ok(Some(PaymentState {
payment_id: request_id,
status: "reorg_pending".to_string(),
amount_sats: 0,
tx_hash: Some(tx_hash),
confirmations: Some(0),
})),
crate::payment::state_machine::PaymentState::Failed {
request_id,
reason: _,
} => Ok(Some(PaymentState {
payment_id: request_id,
status: "failed".to_string(),
amount_sats: 0,
tx_hash: None,
confirmations: None,
})),
}
}
Err(_) => Ok(None), }
}
async fn check_transaction_in_mempool(&self, tx_hash: &Hash) -> Result<bool, ModuleError> {
let mempool = self.mempool_manager.as_ref().ok_or_else(|| {
ModuleError::OperationError(module_error_msg::MEMPOOL_MANAGER_NOT_AVAILABLE.to_string())
})?;
Ok(mempool.get_transaction(tx_hash).is_some())
}
async fn get_fee_estimate(&self, target_blocks: u32) -> Result<u64, ModuleError> {
if let Some(hook_mgr) = &self.hook_manager {
let hooks = hook_mgr.read().await;
if let Some(cached_estimate) = hooks.get_fee_estimate_cached(target_blocks).await {
return Ok(cached_estimate);
}
}
let mempool = self.mempool_manager.as_ref().ok_or_else(|| {
ModuleError::OperationError(module_error_msg::MEMPOOL_MANAGER_NOT_AVAILABLE.to_string())
})?;
let transactions = mempool.get_transactions();
if transactions.is_empty() {
return Ok(1);
}
let fee_rates = tokio::task::spawn_blocking({
let storage = Arc::clone(&self.storage);
let transactions_clone = transactions.clone();
move || {
let mut fee_rates = Vec::new();
for tx in transactions_clone {
if tx.inputs.is_empty() {
continue;
}
let mut input_total = 0u64;
for input in &tx.inputs {
if let Ok(Some(utxo)) = storage.utxos().get_utxo(&input.prevout) {
input_total = input_total.saturating_add(utxo.value as u64);
}
}
let output_total: u64 = tx.outputs.iter().map(|out| out.value as u64).sum();
let fee = input_total.saturating_sub(output_total);
let mut size = 8; for input in &tx.inputs {
size += 36 + input.script_sig.len() + 4; }
for output in &tx.outputs {
size += 8 + output.script_pubkey.len(); }
if size > 0 {
let fee_rate = fee / size as u64;
fee_rates.push(fee_rate);
}
}
fee_rates
}
})
.await
.map_err(|e| ModuleError::op_err("Task join error", e))?;
let mut fee_rates = fee_rates;
if fee_rates.is_empty() {
return Ok(1);
}
fee_rates.sort();
let median_idx = fee_rates.len() / 2;
let median_fee_rate = fee_rates[median_idx];
let adjusted_fee_rate = if target_blocks > 6 {
median_fee_rate / 2 } else if target_blocks > 1 {
median_fee_rate } else {
median_fee_rate * 2 };
Ok(adjusted_fee_rate.max(1)) }
    /// Register `method` as an RPC endpoint that is routed over IPC to the
    /// calling module. `description` is currently accepted but not used by
    /// the registration path.
    async fn register_rpc_endpoint(
        &self,
        method: String,
        description: String,
    ) -> Result<(), ModuleError> {
        let rpc_server = self.rpc_server.as_ref().ok_or_else(|| {
            ModuleError::OperationError(module_error_msg::RPC_SERVER_NOT_AVAILABLE.to_string())
        })?;
        let ipc_server = self.ipc_server.as_ref().ok_or_else(|| {
            ModuleError::OperationError(module_error_msg::IPC_SERVER_NOT_AVAILABLE.to_string())
        })?;
        let module_id = self.get_module_id().ok_or_else(|| {
            ModuleError::OperationError(module_error_msg::MODULE_ID_NOT_SET.to_string())
        })?;
        // Fetch the module's RPC channel, then drop the IPC server lock
        // explicitly before registering with the RPC server so the two locks
        // are never held at the same time.
        let ipc_server_guard = ipc_server.lock().await;
        let rpc_channel = ipc_server_guard
            .get_rpc_channel(&module_id)
            .await
            .ok_or_else(|| {
                ModuleError::OperationError(format!("RPC channel not found for module {module_id}"))
            })?;
        drop(ipc_server_guard);
        // Handler that bridges incoming RPC calls for `method` onto the
        // module's IPC channel.
        let handler = Arc::new(crate::module::rpc::ipc_handler::IpcRpcHandler::new(
            module_id.clone(),
            method.clone(),
            rpc_channel,
        ));
        rpc_server
            .register_module_endpoint(method.clone(), handler)
            .await
            .map_err(|e| ModuleError::op_err("Failed to register RPC endpoint", e))?;
        Ok(())
    }
async fn unregister_rpc_endpoint(&self, method: &str) -> Result<(), ModuleError> {
let rpc_server = self.rpc_server.as_ref().ok_or_else(|| {
ModuleError::OperationError(module_error_msg::RPC_SERVER_NOT_AVAILABLE.to_string())
})?;
rpc_server
.unregister_module_endpoint(method)
.await
.map_err(ModuleError::OperationError)
}
async fn register_timer(
&self,
interval_seconds: u64,
callback: Arc<dyn crate::module::timers::manager::TimerCallback>,
) -> Result<crate::module::timers::manager::TimerId, ModuleError> {
let timer_manager = self.timer_manager.as_ref().ok_or_else(|| {
ModuleError::OperationError(module_error_msg::TIMER_MANAGER_NOT_AVAILABLE.to_string())
})?;
let module_id = self.module_id_for_timers.as_ref().ok_or_else(|| {
ModuleError::OperationError(module_error_msg::MODULE_ID_NOT_SET_FOR_TIMER.to_string())
})?;
timer_manager
.register_timer(module_id.clone(), interval_seconds, callback)
.await
.map_err(ModuleError::OperationError)
}
async fn cancel_timer(
&self,
timer_id: crate::module::timers::manager::TimerId,
) -> Result<(), ModuleError> {
let timer_manager = self.timer_manager.as_ref().ok_or_else(|| {
ModuleError::OperationError(module_error_msg::TIMER_MANAGER_NOT_AVAILABLE.to_string())
})?;
timer_manager
.cancel_timer(timer_id)
.await
.map_err(ModuleError::OperationError)
}
async fn schedule_task(
&self,
delay_seconds: u64,
callback: Arc<dyn crate::module::timers::manager::TaskCallback>,
) -> Result<crate::module::timers::manager::TaskId, ModuleError> {
let timer_manager = self.timer_manager.as_ref().ok_or_else(|| {
ModuleError::OperationError(module_error_msg::TIMER_MANAGER_NOT_AVAILABLE.to_string())
})?;
let module_id = self.module_id_for_timers.as_ref().ok_or_else(|| {
ModuleError::OperationError(module_error_msg::MODULE_ID_NOT_SET_FOR_TASK.to_string())
})?;
timer_manager
.schedule_task(module_id.clone(), delay_seconds, callback)
.await
.map_err(ModuleError::OperationError)
}
async fn report_metric(&self, metric: Metric) -> Result<(), ModuleError> {
let metrics_manager = self.metrics_manager.as_ref().ok_or_else(|| {
ModuleError::OperationError(module_error_msg::METRICS_MANAGER_NOT_AVAILABLE.to_string())
})?;
let module_id = self.module_id_for_metrics.as_ref().ok_or_else(|| {
ModuleError::OperationError(module_error_msg::MODULE_ID_NOT_SET_FOR_METRICS.to_string())
})?;
metrics_manager
.report_metric(module_id.clone(), metric)
.await;
Ok(())
}
async fn get_module_metrics(&self, module_id: &str) -> Result<Vec<Metric>, ModuleError> {
let metrics_manager = self.metrics_manager.as_ref().ok_or_else(|| {
ModuleError::OperationError(module_error_msg::METRICS_MANAGER_NOT_AVAILABLE.to_string())
})?;
Ok(metrics_manager.get_module_metrics(module_id).await)
}
async fn get_all_metrics(
&self,
) -> Result<std::collections::HashMap<String, Vec<Metric>>, ModuleError> {
let metrics_manager = self.metrics_manager.as_ref().ok_or_else(|| {
ModuleError::OperationError(module_error_msg::METRICS_MANAGER_NOT_AVAILABLE.to_string())
})?;
Ok(metrics_manager.get_all_metrics().await)
}
async fn read_file(&self, path: String) -> Result<Vec<u8>, ModuleError> {
let module_id = self.get_module_id().ok_or_else(|| {
ModuleError::OperationError(module_error_msg::MODULE_ID_NOT_SET.to_string())
})?;
let sandbox = {
let sandboxes = self.module_filesystem_sandboxes.read().await;
sandboxes
.get(&module_id)
.ok_or_else(|| {
ModuleError::OperationError(format!(
"Filesystem sandbox not initialized for module {module_id}"
))
})?
.clone()
};
let data_dir = {
let dirs = self.module_data_dirs.read().await;
dirs.get(&module_id)
.ok_or_else(|| {
ModuleError::OperationError(format!(
"Module data directory not set for module {module_id}"
))
})?
.clone()
};
let full_path = if path.starts_with('/') {
sandbox.validate_path(&path)?
} else {
let joined = data_dir.join(&path);
sandbox.validate_path(&joined)?
};
tokio::fs::read(&full_path)
.await
.map_err(|e| ModuleError::op_err("Failed to read file", e))
}
async fn write_file(&self, path: String, data: Vec<u8>) -> Result<(), ModuleError> {
let module_id = self.get_module_id().ok_or_else(|| {
ModuleError::OperationError(module_error_msg::MODULE_ID_NOT_SET.to_string())
})?;
let sandbox = {
let sandboxes = self.module_filesystem_sandboxes.read().await;
sandboxes
.get(&module_id)
.ok_or_else(|| {
ModuleError::OperationError(format!(
"Filesystem sandbox not initialized for module {module_id}"
))
})?
.clone()
};
let data_dir = {
let dirs = self.module_data_dirs.read().await;
dirs.get(&module_id)
.ok_or_else(|| {
ModuleError::OperationError(format!(
"Module data directory not set for module {module_id}"
))
})?
.clone()
};
let full_path = if path.starts_with('/') {
sandbox.validate_path(&path)?
} else {
let joined = data_dir.join(&path);
sandbox.validate_path(&joined)?
};
if let Some(parent) = full_path.parent() {
tokio::fs::create_dir_all(parent)
.await
.map_err(|e| ModuleError::op_err("Failed to create directory", e))?;
}
tokio::fs::write(&full_path, data)
.await
.map_err(|e| ModuleError::op_err("Failed to write file", e))
}
async fn delete_file(&self, path: String) -> Result<(), ModuleError> {
let module_id = self.get_module_id().ok_or_else(|| {
ModuleError::OperationError(module_error_msg::MODULE_ID_NOT_SET.to_string())
})?;
let sandbox = {
let sandboxes = self.module_filesystem_sandboxes.read().await;
sandboxes
.get(&module_id)
.ok_or_else(|| {
ModuleError::OperationError(format!(
"Filesystem sandbox not initialized for module {module_id}"
))
})?
.clone()
};
let data_dir = {
let dirs = self.module_data_dirs.read().await;
dirs.get(&module_id)
.ok_or_else(|| {
ModuleError::OperationError(format!(
"Module data directory not set for module {module_id}"
))
})?
.clone()
};
let full_path = if path.starts_with('/') {
sandbox.validate_path(&path)?
} else {
let joined = data_dir.join(&path);
sandbox.validate_path(&joined)?
};
tokio::fs::remove_file(&full_path)
.await
.map_err(|e| ModuleError::op_err("Failed to delete file", e))
}
async fn list_directory(&self, path: String) -> Result<Vec<String>, ModuleError> {
let module_id = self.get_module_id().ok_or_else(|| {
ModuleError::OperationError(module_error_msg::MODULE_ID_NOT_SET.to_string())
})?;
let sandbox = {
let sandboxes = self.module_filesystem_sandboxes.read().await;
sandboxes
.get(&module_id)
.ok_or_else(|| {
ModuleError::OperationError(format!(
"Filesystem sandbox not initialized for module {module_id}"
))
})?
.clone()
};
let data_dir = {
let dirs = self.module_data_dirs.read().await;
dirs.get(&module_id)
.ok_or_else(|| {
ModuleError::OperationError(format!(
"Module data directory not set for module {module_id}"
))
})?
.clone()
};
let full_path = if path.starts_with('/') {
sandbox.validate_path(&path)?
} else {
let joined = data_dir.join(&path);
sandbox.validate_path(&joined)?
};
let mut entries = Vec::new();
let mut dir = tokio::fs::read_dir(&full_path)
.await
.map_err(|e| ModuleError::op_err("Failed to read directory", e))?;
while let Some(entry) = dir
.next_entry()
.await
.map_err(|e| ModuleError::op_err("Failed to read directory entry", e))?
{
if let Some(name) = entry.file_name().to_str() {
entries.push(name.to_string());
}
}
Ok(entries)
}
async fn create_directory(&self, path: String) -> Result<(), ModuleError> {
let module_id = self.get_module_id().ok_or_else(|| {
ModuleError::OperationError(module_error_msg::MODULE_ID_NOT_SET.to_string())
})?;
let sandbox = {
let sandboxes = self.module_filesystem_sandboxes.read().await;
sandboxes
.get(&module_id)
.ok_or_else(|| {
ModuleError::OperationError(format!(
"Filesystem sandbox not initialized for module {module_id}"
))
})?
.clone()
};
let data_dir = {
let dirs = self.module_data_dirs.read().await;
dirs.get(&module_id)
.ok_or_else(|| {
ModuleError::OperationError(format!(
"Module data directory not set for module {module_id}"
))
})?
.clone()
};
let full_path = if path.starts_with('/') {
sandbox.validate_path(&path)?
} else {
let joined = data_dir.join(&path);
sandbox.validate_path(&joined)?
};
tokio::fs::create_dir_all(&full_path)
.await
.map_err(|e| ModuleError::op_err("Failed to create directory", e))
}
async fn get_file_metadata(
&self,
path: String,
) -> Result<crate::module::ipc::protocol::FileMetadata, ModuleError> {
let module_id = self.get_module_id().ok_or_else(|| {
ModuleError::OperationError(module_error_msg::MODULE_ID_NOT_SET.to_string())
})?;
let sandbox = {
let sandboxes = self.module_filesystem_sandboxes.read().await;
sandboxes
.get(&module_id)
.ok_or_else(|| {
ModuleError::OperationError(format!(
"Filesystem sandbox not initialized for module {module_id}"
))
})?
.clone()
};
let data_dir = {
let dirs = self.module_data_dirs.read().await;
dirs.get(&module_id)
.ok_or_else(|| {
ModuleError::OperationError(format!(
"Module data directory not set for module {module_id}"
))
})?
.clone()
};
let full_path = if path.starts_with('/') {
sandbox.validate_path(&path)?
} else {
let joined = data_dir.join(&path);
sandbox.validate_path(&joined)?
};
let metadata = tokio::fs::metadata(&full_path)
.await
.map_err(|e| ModuleError::op_err("Failed to get file metadata", e))?;
let modified = metadata
.modified()
.ok()
.and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
.map(|d| d.as_secs());
let created = metadata
.created()
.ok()
.and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok())
.map(|d| d.as_secs());
Ok(crate::module::ipc::protocol::FileMetadata {
path: full_path.to_string_lossy().to_string(),
size: metadata.len(),
is_file: metadata.is_file(),
is_directory: metadata.is_dir(),
modified,
created,
})
}
    /// Trait-level entry point that forwards to the inherent
    /// `NodeApiImpl::initialize_module`. The fully-qualified call keeps the
    /// delegation unambiguous (inherent method, not this trait method).
    async fn initialize_module(
        &self,
        module_id: String,
        module_data_dir: std::path::PathBuf,
        base_data_dir: std::path::PathBuf,
    ) -> Result<(), ModuleError> {
        NodeApiImpl::initialize_module(self, module_id, module_data_dir, base_data_dir).await
    }
async fn discover_modules(
&self,
) -> Result<Vec<crate::module::traits::ModuleInfo>, ModuleError> {
let module_manager = self.module_manager.as_ref().ok_or_else(|| {
ModuleError::OperationError(module_error_msg::MODULE_MANAGER_NOT_AVAILABLE.to_string())
})?;
let manager = module_manager.lock().await;
let module_info_list = manager.get_all_module_info().await;
let mut result = Vec::new();
for (module_id, metadata, state) in module_info_list {
let module_name = module_id
.split('_')
.next()
.unwrap_or(&module_id)
.to_string();
result.push(crate::module::traits::ModuleInfo {
module_id: module_id.clone(),
module_name: metadata.name.clone(),
version: metadata.version.clone(),
capabilities: metadata.capabilities.clone(),
status: state,
api_version: 1, });
}
Ok(result)
}
/// Looks up metadata and state for a module, first by exact id and then by
/// the base name (the id prefix before the first `'_'`).
///
/// When only the base name matches, the returned `module_id` is resolved
/// back to the full registered id when one exists.
///
/// # Errors
/// Returns `OperationError` when the module manager has not been wired in.
async fn get_module_info(
    &self,
    module_id: &str,
) -> Result<Option<crate::module::traits::ModuleInfo>, ModuleError> {
    let module_manager = self.module_manager.as_ref().ok_or_else(|| {
        ModuleError::OperationError(module_error_msg::MODULE_MANAGER_NOT_AVAILABLE.to_string())
    })?;
    let manager = module_manager.lock().await;
    // Fast path: exact-id lookup. (An unused prefix-derived `module_name`
    // binding that shadowed the fallback below has been removed.)
    if let Some(metadata) = manager.get_module_metadata(module_id).await {
        let state = manager
            .get_module_state(module_id)
            .await
            .unwrap_or(crate::module::traits::ModuleState::Stopped);
        return Ok(Some(crate::module::traits::ModuleInfo {
            module_id: module_id.to_string(),
            module_name: metadata.name.clone(),
            version: metadata.version.clone(),
            capabilities: metadata.capabilities.clone(),
            status: state,
            api_version: 1,
        }));
    }
    // Fallback: base-name lookup, then map back to a full registered id.
    let module_name = module_id.split('_').next().unwrap_or(module_id);
    if let Some(metadata) = manager.get_module_metadata(module_name).await {
        let state = manager
            .get_module_state(module_name)
            .await
            .unwrap_or(crate::module::traits::ModuleState::Stopped);
        let modules = manager.list_modules().await;
        let actual_module_id = modules
            .iter()
            .find(|id| id.starts_with(&format!("{module_name}_")))
            .cloned()
            .unwrap_or_else(|| module_id.to_string());
        return Ok(Some(crate::module::traits::ModuleInfo {
            module_id: actual_module_id,
            module_name: metadata.name.clone(),
            version: metadata.version.clone(),
            capabilities: metadata.capabilities.clone(),
            status: state,
            api_version: 1,
        }));
    }
    Ok(None)
}
/// Reports whether a module is known to the manager, matching either the
/// full id or its base name (the id prefix before `'_'`).
async fn is_module_available(&self, module_id: &str) -> Result<bool, ModuleError> {
    let manager_handle = self.module_manager.as_ref().ok_or_else(|| {
        ModuleError::OperationError(module_error_msg::MODULE_MANAGER_NOT_AVAILABLE.to_string())
    })?;
    let guard = manager_handle.lock().await;
    // Exact id first; fall back to the base name.
    if guard.get_module_state(module_id).await.is_some() {
        return Ok(true);
    }
    let base_name = module_id.split('_').next().unwrap_or(module_id);
    Ok(guard.get_module_state(base_name).await.is_some())
}
/// Forwards an event to the event manager for delivery to subscribers.
async fn publish_event(
    &self,
    event_type: EventType,
    payload: EventPayload,
) -> Result<(), ModuleError> {
    match self.event_manager.as_ref() {
        Some(manager) => manager.publish_event(event_type, payload).await,
        None => Err(ModuleError::OperationError(
            module_error_msg::EVENT_MANAGER_NOT_AVAILABLE.to_string(),
        )),
    }
}
/// Routes an inter-module call through the module router.
///
/// The caller identity is the configured `module_id` when present, then the
/// per-call id; `"unknown"` is used when neither is set.
async fn call_module(
    &self,
    target_module_id: Option<&str>,
    method: &str,
    params: Vec<u8>,
) -> Result<Vec<u8>, ModuleError> {
    let Some(router) = self.module_router.as_ref() else {
        return Err(ModuleError::OperationError(
            module_error_msg::MODULE_ROUTER_NOT_AVAILABLE.to_string(),
        ));
    };
    let caller_module_id = match self
        .module_id
        .as_ref()
        .or(self.current_module_id_for_api.as_ref())
    {
        Some(id) => id.clone(),
        None => "unknown".to_string(),
    };
    router
        .route_call(&caller_module_id, target_module_id, method, &params)
        .await
}
/// Registers this module's API object in the shared registry under the
/// current module's id (per-call id takes priority over the configured id).
async fn register_module_api(
    &self,
    api: Arc<dyn crate::module::inter_module::api::ModuleAPI>,
) -> Result<(), ModuleError> {
    let Some(registry) = self.module_api_registry.as_ref() else {
        return Err(ModuleError::OperationError(
            module_error_msg::MODULE_API_REGISTRY_NOT_AVAILABLE.to_string(),
        ));
    };
    let module_id = match self
        .current_module_id_for_api
        .as_ref()
        .or(self.module_id.as_ref())
    {
        Some(id) => id.clone(),
        None => {
            return Err(ModuleError::OperationError(
                "Module ID not available for API registration".to_string(),
            ))
        }
    };
    registry.register_api(module_id, api).await
}
/// Removes this module's API object from the shared registry
/// (per-call id takes priority over the configured id).
async fn unregister_module_api(&self) -> Result<(), ModuleError> {
    let Some(registry) = self.module_api_registry.as_ref() else {
        return Err(ModuleError::OperationError(
            module_error_msg::MODULE_API_REGISTRY_NOT_AVAILABLE.to_string(),
        ));
    };
    let module_id = match self
        .current_module_id_for_api
        .as_ref()
        .or(self.module_id.as_ref())
    {
        Some(id) => id.clone(),
        None => {
            return Err(ModuleError::OperationError(
                "Module ID not available for API unregistration".to_string(),
            ))
        }
    };
    registry.unregister_api(&module_id).await
}
/// Delivers a raw mesh packet to a module through the inter-module call
/// mechanism. The packet bytes and the originating peer address travel
/// together as one bincode-encoded tuple.
async fn send_mesh_packet_to_module(
    &self,
    module_id: &str,
    packet_data: Vec<u8>,
    peer_addr: String,
) -> Result<(), ModuleError> {
    let params = match bincode::serialize(&(packet_data, peer_addr)) {
        Ok(bytes) => bytes,
        Err(e) => return Err(ModuleError::SerializationError(e.to_string())),
    };
    self.call_module(Some(module_id), "handle_mesh_packet", params)
        .await
        .map(|_| ())
}
async fn send_mesh_packet_to_peer(
&self,
peer_addr: String,
packet_data: Vec<u8>,
) -> Result<(), ModuleError> {
let network_manager = self.network_manager.as_ref().ok_or_else(|| {
ModuleError::OperationError(module_error_msg::NETWORK_MANAGER_NOT_AVAILABLE.to_string())
})?;
if let Ok(socket_addr) = peer_addr.parse::<std::net::SocketAddr>() {
network_manager
.send_to_peer(socket_addr, packet_data)
.await
.map_err(|e| ModuleError::op_err("Failed to send mesh packet", e))?;
} else {
use crate::network::transport::TransportAddr;
let transport_addr = if let Some(addr_str) = peer_addr.strip_prefix("tcp:") {
addr_str
.parse::<std::net::SocketAddr>()
.map(TransportAddr::Tcp)
.map_err(|e| ModuleError::op_err("Invalid TCP address", e))?
} else if peer_addr.starts_with("quinn:") {
#[cfg(feature = "quinn")]
{
let addr_str = &peer_addr[6..];
addr_str
.parse::<std::net::SocketAddr>()
.map(TransportAddr::Quinn)
.map_err(|e| ModuleError::op_err("Invalid Quinn address", e))?
}
#[cfg(not(feature = "quinn"))]
return Err(ModuleError::OperationError(
"Quinn transport not enabled".to_string(),
));
} else if peer_addr.starts_with("iroh:") {
#[cfg(feature = "iroh")]
{
let node_id_str = &peer_addr[5..];
let node_id_bytes = hex::decode(node_id_str)
.map_err(|e| ModuleError::op_err("Invalid Iroh node ID hex", e))?;
if node_id_bytes.len() != 32 {
return Err(ModuleError::OperationError(
"Iroh node ID must be 32 bytes".to_string(),
));
}
let mut node_id = [0u8; 32];
node_id.copy_from_slice(&node_id_bytes);
TransportAddr::Iroh(node_id.to_vec())
}
#[cfg(not(feature = "iroh"))]
return Err(ModuleError::OperationError(
"Iroh transport not enabled".to_string(),
));
} else {
return Err(ModuleError::OperationError(format!(
"Invalid peer address format: {peer_addr}"
)));
};
network_manager
.send_to_peer_by_transport(transport_addr, packet_data)
.await
.map_err(|e| ModuleError::op_err("Failed to send mesh packet", e))?;
}
Ok(())
}
async fn send_stratum_v2_message_to_peer(
&self,
peer_addr: String,
message_data: Vec<u8>,
) -> Result<(), ModuleError> {
let network_manager = self.network_manager.as_ref().ok_or_else(|| {
ModuleError::OperationError(module_error_msg::NETWORK_MANAGER_NOT_AVAILABLE.to_string())
})?;
if let Ok(socket_addr) = peer_addr.parse::<std::net::SocketAddr>() {
#[cfg(feature = "stratum-v2")]
let result = network_manager
.send_stratum_v2_to_peer(socket_addr, message_data)
.await;
#[cfg(not(feature = "stratum-v2"))]
let result = network_manager
.send_to_peer(socket_addr, message_data)
.await;
result.map_err(|e| ModuleError::op_err("Failed to send Stratum V2 message", e))?;
} else {
use crate::network::transport::TransportAddr;
let transport_addr = if let Some(addr_str) = peer_addr.strip_prefix("tcp:") {
addr_str
.parse::<std::net::SocketAddr>()
.map(TransportAddr::Tcp)
.map_err(|e| ModuleError::op_err("Invalid TCP address", e))?
} else if peer_addr.starts_with("quinn:") {
#[cfg(feature = "quinn")]
{
let addr_str = &peer_addr[6..];
addr_str
.parse::<std::net::SocketAddr>()
.map(TransportAddr::Quinn)
.map_err(|e| ModuleError::op_err("Invalid Quinn address", e))?
}
#[cfg(not(feature = "quinn"))]
return Err(ModuleError::OperationError(
"Quinn transport not enabled".to_string(),
));
} else if peer_addr.starts_with("iroh:") {
#[cfg(feature = "iroh")]
{
let node_id_str = &peer_addr[5..];
let node_id_bytes = hex::decode(node_id_str)
.map_err(|e| ModuleError::op_err("Invalid Iroh node ID hex", e))?;
if node_id_bytes.len() != 32 {
return Err(ModuleError::OperationError(
"Iroh node ID must be 32 bytes".to_string(),
));
}
let mut node_id = [0u8; 32];
node_id.copy_from_slice(&node_id_bytes);
TransportAddr::Iroh(node_id.to_vec())
}
#[cfg(not(feature = "iroh"))]
return Err(ModuleError::OperationError(
"Iroh transport not enabled".to_string(),
));
} else {
return Err(ModuleError::OperationError(format!(
"Invalid peer address format: {peer_addr}"
)));
};
network_manager
.send_to_peer_by_transport(transport_addr, message_data)
.await
.map_err(|e| ModuleError::op_err("Failed to send Stratum V2 message", e))?;
}
Ok(())
}
async fn get_module_health(
&self,
module_id: &str,
) -> Result<Option<crate::module::process::monitor::ModuleHealth>, ModuleError> {
let module_manager = self.module_manager.as_ref().ok_or_else(|| {
ModuleError::OperationError(module_error_msg::MODULE_MANAGER_NOT_AVAILABLE.to_string())
})?;
let manager = module_manager.lock().await;
let module_name = module_id.split('_').next().unwrap_or(module_id);
let state = manager.get_module_state(module_name).await;
match state {
Some(ModuleState::Running) => {
Ok(Some(crate::module::process::monitor::ModuleHealth::Healthy))
}
Some(ModuleState::Initializing) => {
Ok(Some(crate::module::process::monitor::ModuleHealth::Healthy))
}
Some(ModuleState::Stopped) => Ok(Some(
crate::module::process::monitor::ModuleHealth::Unresponsive,
)),
Some(ModuleState::Stopping) => Ok(Some(
crate::module::process::monitor::ModuleHealth::Unresponsive,
)),
Some(ModuleState::Error(err)) => Ok(Some(
crate::module::process::monitor::ModuleHealth::Crashed(err),
)),
None => Ok(None),
}
}
async fn get_all_module_health(
&self,
) -> Result<Vec<(String, crate::module::process::monitor::ModuleHealth)>, ModuleError> {
let module_manager = self.module_manager.as_ref().ok_or_else(|| {
ModuleError::OperationError(module_error_msg::MODULE_MANAGER_NOT_AVAILABLE.to_string())
})?;
let manager = module_manager.lock().await;
let modules = manager.list_modules().await;
let mut result = Vec::new();
for module_id in modules {
let module_name = module_id.split('_').next().unwrap_or(&module_id);
if let Some(state) = manager.get_module_state(module_name).await {
let health = match state {
ModuleState::Running => crate::module::process::monitor::ModuleHealth::Healthy,
ModuleState::Initializing => {
crate::module::process::monitor::ModuleHealth::Healthy
}
ModuleState::Stopped => {
crate::module::process::monitor::ModuleHealth::Unresponsive
}
ModuleState::Stopping => {
crate::module::process::monitor::ModuleHealth::Unresponsive
}
ModuleState::Error(err) => {
crate::module::process::monitor::ModuleHealth::Crashed(err)
}
};
result.push((module_id, health));
}
}
Ok(result)
}
/// Builds a mining block template from the current chain tip, recent
/// headers, mempool contents, and UTXO set.
///
/// `rules` is accepted for API compatibility but is not consulted here.
/// `coinbase_address` may be raw text or `hex:`-prefixed hex bytes.
///
/// # Errors
/// Fails when the chain is uninitialized, there is no tip, the mempool
/// manager is unavailable, or template construction itself fails.
async fn get_block_template(
    &self,
    rules: Vec<String>,
    coinbase_script: Option<Vec<u8>>,
    coinbase_address: Option<String>,
) -> Result<blvm_protocol::mining::BlockTemplate, ModuleError> {
    // Current chain height; a `None` height means the chain store was never
    // initialized.
    let height = self
        .storage
        .chain()
        .get_height()
        .map_err(|e| ModuleError::op_err("Failed to get height", e))?
        .ok_or_else(|| {
            ModuleError::OperationError(module_error_msg::CHAIN_NOT_INITIALIZED.to_string())
        })?;
    let prev_header = self
        .storage
        .chain()
        .get_tip_header()
        .map_err(|e| ModuleError::op_err("Failed to get tip header", e))?
        .ok_or_else(|| {
            ModuleError::OperationError(module_error_msg::NO_CHAIN_TIP.to_string())
        })?;
    // Collect up to 2016 recent headers (presumably for difficulty
    // retargeting inside create_block_template — TODO confirm). When the
    // fast path yields fewer than 2, rebuild by walking heights
    // 0..=min(height, 2015); individual lookup failures are skipped
    // best-effort rather than aborting.
    let prev_headers = if let Ok(recent) = self.storage.blocks().get_recent_headers(2016) {
        if recent.len() >= 2 {
            recent
        } else {
            let mut headers = Vec::new();
            if let Ok(Some(current_height)) = self.storage.chain().get_height() {
                for h in 0..=current_height.min(2015) {
                    if let Ok(Some(hash)) = self.storage.blocks().get_hash_by_height(h) {
                        if let Ok(Some(header)) = self.storage.blocks().get_header(&hash) {
                            headers.push(header);
                        }
                    }
                }
            }
            headers
        }
    } else {
        Vec::new()
    };
    let mempool_manager = self.mempool_manager.as_ref().ok_or_else(|| {
        ModuleError::OperationError(module_error_msg::MEMPOOL_MANAGER_NOT_AVAILABLE.to_string())
    })?;
    let mempool_txs = mempool_manager.get_transactions();
    let utxo_set = self
        .storage
        .utxos()
        .get_all_utxos()
        .map_err(|e| ModuleError::op_err("Failed to get UTXO set", e))?;
    // Missing script defaults to empty bytes. The address is passed through
    // as raw bytes; a "hex:" prefix selects hex decoding, and invalid hex
    // silently degrades to empty bytes (NOTE(review): looks like deliberate
    // best-effort — confirm before tightening).
    let coinbase_script_bytes = coinbase_script.unwrap_or_default();
    let coinbase_address_bytes = coinbase_address
        .map(|a| {
            a.strip_prefix("hex:")
                .map(|h| hex::decode(h).unwrap_or_default())
                .unwrap_or_else(|| a.into_bytes())
        })
        .unwrap_or_default();
    let template = blvm_protocol::mining::create_block_template(
        &utxo_set,
        &mempool_txs,
        height,
        &prev_header,
        &prev_headers,
        &coinbase_script_bytes,
        &coinbase_address_bytes,
    )
    .map_err(|e| ModuleError::op_err("Template creation failed", e))?;
    Ok(template)
}
/// Merges additional block hashes into the block-serve denylist.
/// An empty input is a no-op and skips the manager lookup entirely.
async fn merge_block_serve_denylist(&self, block_hashes: &[Hash]) -> Result<(), ModuleError> {
    if block_hashes.is_empty() {
        return Ok(());
    }
    let nm = self.network_manager.as_ref().ok_or_else(|| {
        ModuleError::OperationError(module_error_msg::NETWORK_MANAGER_NOT_AVAILABLE.to_string())
    })?;
    nm.merge_block_serve_denylist(block_hashes);
    Ok(())
}
/// Returns a point-in-time snapshot of the block-serve denylist.
async fn get_block_serve_denylist_snapshot(
    &self,
) -> Result<BlockServeDenylistSnapshot, ModuleError> {
    self.network_manager
        .as_ref()
        .map(|nm| nm.block_serve_denylist_snapshot())
        .ok_or_else(|| {
            ModuleError::OperationError(
                module_error_msg::NETWORK_MANAGER_NOT_AVAILABLE.to_string(),
            )
        })
}
/// Removes every entry from the block-serve denylist.
async fn clear_block_serve_denylist(&self) -> Result<(), ModuleError> {
    match self.network_manager.as_ref() {
        Some(nm) => {
            nm.clear_block_serve_denylist();
            Ok(())
        }
        None => Err(ModuleError::OperationError(
            module_error_msg::NETWORK_MANAGER_NOT_AVAILABLE.to_string(),
        )),
    }
}
/// Replaces the entire block-serve denylist with the given hashes.
async fn replace_block_serve_denylist(&self, block_hashes: &[Hash]) -> Result<(), ModuleError> {
    let nm = self.network_manager.as_ref().ok_or_else(|| {
        ModuleError::OperationError(module_error_msg::NETWORK_MANAGER_NOT_AVAILABLE.to_string())
    })?;
    nm.replace_block_serve_denylist(block_hashes);
    Ok(())
}
/// Merges additional transaction hashes into the tx-serve denylist.
/// An empty input is a no-op and skips the manager lookup entirely.
async fn merge_tx_serve_denylist(&self, tx_hashes: &[Hash]) -> Result<(), ModuleError> {
    if tx_hashes.is_empty() {
        return Ok(());
    }
    let nm = self.network_manager.as_ref().ok_or_else(|| {
        ModuleError::OperationError(module_error_msg::NETWORK_MANAGER_NOT_AVAILABLE.to_string())
    })?;
    nm.merge_tx_serve_denylist(tx_hashes);
    Ok(())
}
/// Returns a point-in-time snapshot of the tx-serve denylist.
async fn get_tx_serve_denylist_snapshot(&self) -> Result<TxServeDenylistSnapshot, ModuleError> {
    self.network_manager
        .as_ref()
        .map(|nm| nm.tx_serve_denylist_snapshot())
        .ok_or_else(|| {
            ModuleError::OperationError(
                module_error_msg::NETWORK_MANAGER_NOT_AVAILABLE.to_string(),
            )
        })
}
/// Removes every entry from the tx-serve denylist.
async fn clear_tx_serve_denylist(&self) -> Result<(), ModuleError> {
    match self.network_manager.as_ref() {
        Some(nm) => {
            nm.clear_tx_serve_denylist();
            Ok(())
        }
        None => Err(ModuleError::OperationError(
            module_error_msg::NETWORK_MANAGER_NOT_AVAILABLE.to_string(),
        )),
    }
}
/// Replaces the entire tx-serve denylist with the given hashes.
async fn replace_tx_serve_denylist(&self, tx_hashes: &[Hash]) -> Result<(), ModuleError> {
    let nm = self.network_manager.as_ref().ok_or_else(|| {
        ModuleError::OperationError(module_error_msg::NETWORK_MANAGER_NOT_AVAILABLE.to_string())
    })?;
    nm.replace_tx_serve_denylist(tx_hashes);
    Ok(())
}
async fn get_sync_status(&self) -> Result<SyncStatus, ModuleError> {
let Some(ref sc) = self.sync_coordinator else {
return Err(ModuleError::OperationError(
"sync coordinator not available".to_string(),
));
};
let coordinator = sc.lock().await;
let state = coordinator.current_sync_state();
let phase = state.as_event_str().to_string();
let error_message = match &state {
crate::node::sync::SyncState::Error(s) => Some(s.clone()),
_ => None,
};
Ok(SyncStatus {
phase,
progress: coordinator.progress(),
is_synced: coordinator.is_synced(),
error_message,
})
}
/// Bans a peer by socket address. `None` or `Some(0)` yields an unban
/// timestamp of 0; otherwise the unban time is now + duration, saturating
/// on overflow.
async fn ban_peer(
    &self,
    peer_addr: &str,
    ban_duration_seconds: Option<u64>,
) -> Result<(), ModuleError> {
    let nm = match self.network_manager.as_ref() {
        Some(nm) => Arc::clone(nm),
        None => {
            return Err(ModuleError::OperationError(
                module_error_msg::NETWORK_MANAGER_NOT_AVAILABLE.to_string(),
            ))
        }
    };
    let addr: std::net::SocketAddr = peer_addr
        .parse()
        .map_err(|e| ModuleError::OperationError(format!("invalid peer address: {e}")))?;
    let unban_ts = match ban_duration_seconds {
        None | Some(0) => 0u64,
        Some(secs) => crate::utils::current_timestamp().saturating_add(secs),
    };
    // The ban call is run off the async runtime via spawn_blocking.
    tokio::task::spawn_blocking(move || nm.ban_peer(addr, unban_ts))
        .await
        .map_err(|e| ModuleError::op_err("Task join error", e))?;
    Ok(())
}
/// Toggles the network manager's block-serve maintenance mode.
async fn set_block_serve_maintenance_mode(&self, enabled: bool) -> Result<(), ModuleError> {
    match self.network_manager.as_ref() {
        Some(nm) => {
            nm.set_block_serve_maintenance_mode(enabled);
            Ok(())
        }
        None => Err(ModuleError::OperationError(
            module_error_msg::NETWORK_MANAGER_NOT_AVAILABLE.to_string(),
        )),
    }
}
/// Submits a fully-formed block through the mining RPC path and classifies
/// the textual RPC outcome into a structured `SubmitBlockResult`.
async fn submit_block(&self, block: Block) -> Result<SubmitBlockResult, ModuleError> {
    use crate::rpc::mining::MiningRpc;
    use serde_json::json;
    let mempool_manager = self.mempool_manager.as_ref().ok_or_else(|| {
        ModuleError::OperationError(module_error_msg::MEMPOOL_MANAGER_NOT_AVAILABLE.to_string())
    })?;
    // Wire an RPC handler with the same storage/mempool (and network
    // manager, when one is available) as this node.
    let mut mining_rpc = MiningRpc::with_dependencies(self.storage.clone(), mempool_manager.clone());
    if let Some(nm) = &self.network_manager {
        mining_rpc = mining_rpc.with_network_manager(Some(Arc::clone(nm)));
    }
    // The RPC expects the block as a hex-encoded bincode blob.
    let block_hex = hex::encode(
        bincode::serialize(&block)
            .map_err(|e| ModuleError::SerializationError(e.to_string()))?,
    );
    let result = mining_rpc
        .submit_block(&json!([block_hex]))
        .await
        .map_err(|e| ModuleError::op_err("Failed to submit block", e))?;
    let result_str = result.as_str().unwrap_or("");
    if result_str.is_empty() || result_str == "null" {
        Ok(SubmitBlockResult::Accepted)
    } else if result_str.contains("duplicate") || result_str.contains("already") {
        Ok(SubmitBlockResult::Duplicate)
    } else {
        Ok(SubmitBlockResult::Rejected(result_str.to_string()))
    }
}
/// Accepts a health report from the current module (per-call id takes
/// priority over the configured id). The report is currently only logged;
/// no state is persisted here.
async fn report_module_health(
    &self,
    health: crate::module::process::monitor::ModuleHealth,
) -> Result<(), ModuleError> {
    let module_id = match self
        .current_module_id_for_api
        .as_ref()
        .or(self.module_id.as_ref())
    {
        Some(id) => id.clone(),
        None => {
            return Err(ModuleError::OperationError(
                "Module ID not available for health reporting".to_string(),
            ))
        }
    };
    debug!("Module {} reported health: {:?}", module_id, health);
    Ok(())
}
}
unsafe impl Sync for NodeApiImpl {}