use futures::{SinkExt, StreamExt};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::sync::mpsc;
use tokio_util::codec::{FramedRead, FramedWrite, LengthDelimitedCodec};
use tracing::{debug, error, info, warn};
use async_trait::async_trait;
use crate::module::api::events::EventManager;
use crate::module::api::hub::ModuleApiHub;
use crate::module::ipc::module_ipc_length_codec;
use crate::module::ipc::protocol::{
CliSpec, InvocationMessage, InvocationResultMessage, InvocationResultPayload, InvocationType,
ModuleMessage, RequestMessage, RequestPayload, ResponseMessage, ResponsePayload,
};
use crate::module::traits::{module_error_msg, EventType, ModuleError, NodeAPI};
use tokio::sync::oneshot;
/// Fallback executor for modules that are not reachable over an IPC connection.
///
/// `ModuleIpcServer::invoke_cli` and `ModuleIpcServer::invoke_rpc` delegate to the
/// installed invoker (see `ModuleIpcServer::with_wasm_invoker`) when no IPC-connected
/// module has registered a CLI spec under the requested name.
#[cfg(feature = "wasm-modules")]
#[async_trait]
pub trait WasmInvoker: Send + Sync {
/// Runs CLI `subcommand` of the module called `module_name` with `args` and returns
/// the invocation payload produced by the module.
async fn invoke_cli(
&self,
module_name: &str,
subcommand: &str,
args: Vec<String>,
) -> Result<InvocationResultPayload, ModuleError>;
/// Calls RPC `method` of the module called `module_name` with JSON `params` and
/// returns the JSON result.
async fn invoke_rpc(
&self,
module_name: &str,
method: &str,
params: serde_json::Value,
) -> Result<serde_json::Value, ModuleError>;
}
pub struct ModuleIpcServer {
socket_path: PathBuf,
connection_count: std::sync::atomic::AtomicUsize,
event_manager: Option<Arc<crate::module::api::events::EventManager>>,
api_hub: Option<Arc<tokio::sync::Mutex<ModuleApiHub>>>,
rpc_channels: Arc<
tokio::sync::RwLock<
HashMap<
String,
mpsc::UnboundedSender<(
u64,
String,
serde_json::Value,
mpsc::UnboundedSender<Result<serde_json::Value, crate::rpc::errors::RpcError>>,
)>,
>,
>,
>,
cli_registry: Arc<tokio::sync::RwLock<HashMap<String, CliSpec>>>,
outgoing_tx_by_module:
Arc<tokio::sync::RwLock<HashMap<String, mpsc::UnboundedSender<bytes::Bytes>>>>,
pending_invocations:
Arc<tokio::sync::Mutex<HashMap<u64, oneshot::Sender<InvocationResultMessage>>>>,
rpc_pending: Arc<
tokio::sync::Mutex<
HashMap<
u64,
mpsc::UnboundedSender<Result<serde_json::Value, crate::rpc::errors::RpcError>>,
>,
>,
>,
next_invocation_id: Arc<AtomicU64>,
#[cfg(feature = "wasm-modules")]
wasm_invoker: Option<Arc<dyn WasmInvoker>>,
}
struct ModuleConnection<R: tokio::io::AsyncRead> {
module_id: String,
reader: FramedRead<R, LengthDelimitedCodec>,
outgoing_tx: Option<mpsc::UnboundedSender<bytes::Bytes>>,
subscriptions: Vec<EventType>,
event_tx: Option<mpsc::Sender<ModuleMessage>>,
writer_task_handle: Option<tokio::task::JoinHandle<()>>,
rpc_request_tx: Option<
mpsc::UnboundedSender<(
u64,
String,
serde_json::Value,
mpsc::UnboundedSender<Result<serde_json::Value, crate::rpc::errors::RpcError>>,
)>,
>,
}
impl ModuleIpcServer {
pub fn new<P: AsRef<Path>>(socket_path: P) -> Self {
Self {
socket_path: socket_path.as_ref().to_path_buf(),
connection_count: std::sync::atomic::AtomicUsize::new(0),
event_manager: None,
api_hub: None,
rpc_channels: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
cli_registry: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
outgoing_tx_by_module: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
pending_invocations: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
rpc_pending: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
next_invocation_id: Arc::new(AtomicU64::new(1)),
#[cfg(feature = "wasm-modules")]
wasm_invoker: None,
}
}
/// Builder-style setter: installs the fallback invoker used when no IPC-connected
/// module matches an invocation.
#[cfg(feature = "wasm-modules")]
pub fn with_wasm_invoker(self, invoker: Arc<dyn WasmInvoker>) -> Self {
    let mut server = self;
    server.wasm_invoker = Some(invoker);
    server
}
/// Returns a clone of the RPC request channel registered for `module_id`, or `None`
/// when no connection is registered under that id.
pub async fn get_rpc_channel(
    &self,
    module_id: &str,
) -> Option<
    mpsc::UnboundedSender<(
        u64,
        String,
        serde_json::Value,
        mpsc::UnboundedSender<Result<serde_json::Value, crate::rpc::errors::RpcError>>,
    )>,
> {
    self.rpc_channels.read().await.get(module_id).cloned()
}
/// Returns a snapshot (clone) of all registered CLI specs, keyed by module id.
pub async fn get_cli_specs(&self) -> HashMap<String, CliSpec> {
    self.cli_registry.read().await.clone()
}
/// Registers `spec` for `module_id`, replacing any spec previously stored under that id.
pub async fn register_cli_spec(&self, module_id: String, spec: CliSpec) {
    self.cli_registry.write().await.insert(module_id, spec);
}
/// Removes the CLI spec stored for `module_id`; does nothing if there is none.
pub async fn unregister_cli_spec(&self, module_id: &str) {
    self.cli_registry.write().await.remove(module_id);
}
/// Removes every registered CLI spec whose `name` equals `module_name`, regardless of
/// the module id it was registered under.
pub async fn unregister_cli_spec_by_name(&self, module_name: &str) {
    let mut registry = self.cli_registry.write().await;
    // Filter in place instead of collecting cloned keys and removing them one by one.
    registry.retain(|_, spec| spec.name != module_name);
}
pub async fn invoke_cli(
&self,
module_name: &str,
subcommand: &str,
args: Vec<String>,
) -> Result<InvocationResultPayload, ModuleError> {
let maybe_ipc = {
let registry = self.cli_registry.read().await;
let module_id = registry
.iter()
.find(|(_, spec)| spec.name == module_name)
.map(|(id, _)| id.clone());
let by_module = self.outgoing_tx_by_module.read().await;
module_id.and_then(|id| by_module.get(&id).cloned())
};
if let Some(outgoing_tx) = maybe_ipc {
let correlation_id = self.next_invocation_id.fetch_add(1, Ordering::SeqCst);
let (tx, rx) = oneshot::channel();
{
let mut pending = self.pending_invocations.lock().await;
pending.insert(correlation_id, tx);
}
let invocation = InvocationMessage {
correlation_id,
invocation_type: InvocationType::Cli {
subcommand: subcommand.to_string(),
args,
},
};
let bytes = bincode::serialize(&ModuleMessage::Invocation(invocation))
.map_err(|e| ModuleError::SerializationError(e.to_string()))?;
outgoing_tx.send(bytes::Bytes::from(bytes)).map_err(|_| {
ModuleError::OperationError(module_error_msg::MODULE_CONNECTION_CLOSED.to_string())
})?;
let result = tokio::time::timeout(tokio::time::Duration::from_secs(60), rx)
.await
.map_err(|_| {
ModuleError::OperationError(
module_error_msg::MODULE_DID_NOT_RESPOND_CLI_60S.to_string(),
)
})?
.map_err(|_| {
ModuleError::OperationError(
module_error_msg::INVOCATION_RESPONSE_CHANNEL_CLOSED.to_string(),
)
})?;
if !result.success {
return Err(ModuleError::Cli(
result.error.unwrap_or_else(|| "Unknown error".to_string()),
));
}
return result.payload.ok_or_else(|| {
ModuleError::OperationError(
module_error_msg::MODULE_RETURNED_SUCCESS_BUT_NO_PAYLOAD.to_string(),
)
});
}
#[cfg(feature = "wasm-modules")]
if let Some(ref invoker) = self.wasm_invoker {
return invoker.invoke_cli(module_name, subcommand, args).await;
}
Err(ModuleError::OperationError(format!(
"No module with CLI name '{module_name}' is loaded"
)))
}
pub async fn invoke_rpc(
&self,
module_name: &str,
method: &str,
params: serde_json::Value,
) -> Result<serde_json::Value, ModuleError> {
let maybe_ipc = {
let registry = self.cli_registry.read().await;
let module_id = registry
.iter()
.find(|(_, spec)| spec.name == module_name)
.map(|(id, _)| id.clone());
let by_module = self.outgoing_tx_by_module.read().await;
module_id.and_then(|id| by_module.get(&id).cloned())
};
if let Some(outgoing_tx) = maybe_ipc {
let correlation_id = self.next_invocation_id.fetch_add(1, Ordering::SeqCst);
let (tx, rx) = oneshot::channel();
{
let mut pending = self.pending_invocations.lock().await;
pending.insert(correlation_id, tx);
}
let invocation = InvocationMessage {
correlation_id,
invocation_type: InvocationType::Rpc {
method: method.to_string(),
params,
},
};
let bytes = bincode::serialize(&ModuleMessage::Invocation(invocation))
.map_err(|e| ModuleError::SerializationError(e.to_string()))?;
outgoing_tx.send(bytes::Bytes::from(bytes)).map_err(|_| {
ModuleError::OperationError(module_error_msg::MODULE_CONNECTION_CLOSED.to_string())
})?;
let result = tokio::time::timeout(tokio::time::Duration::from_secs(60), rx)
.await
.map_err(|_| {
ModuleError::OperationError(
module_error_msg::MODULE_DID_NOT_RESPOND_RPC_60S.to_string(),
)
})?
.map_err(|_| {
ModuleError::OperationError(
module_error_msg::INVOCATION_RESPONSE_CHANNEL_CLOSED.to_string(),
)
})?;
if !result.success {
return Err(ModuleError::OperationError(
result.error.unwrap_or_else(|| "Unknown error".to_string()),
));
}
match result.payload {
Some(InvocationResultPayload::Rpc(value)) => return Ok(value),
Some(_) => {
return Err(ModuleError::OperationError(
module_error_msg::MODULE_RETURNED_WRONG_PAYLOAD_TYPE_RPC.to_string(),
));
}
None => {
return Err(ModuleError::OperationError(
module_error_msg::MODULE_RETURNED_SUCCESS_BUT_NO_PAYLOAD.to_string(),
));
}
}
}
#[cfg(feature = "wasm-modules")]
if let Some(ref invoker) = self.wasm_invoker {
return invoker.invoke_rpc(module_name, method, params).await;
}
Err(ModuleError::OperationError(format!(
"No module with CLI name '{module_name}' is loaded"
)))
}
/// Builder-style setter: attaches the event manager used for module event subscriptions.
pub fn with_event_manager(self, event_manager: Arc<EventManager>) -> Self {
    let mut server = self;
    server.event_manager = Some(event_manager);
    server
}
/// Builder-style setter: attaches the API hub through which module requests are routed.
pub fn with_api_hub(self, api_hub: Arc<tokio::sync::Mutex<ModuleApiHub>>) -> Self {
    let mut server = self;
    server.api_hub = Some(api_hub);
    server
}
/// Starts the IPC server using the transport of the current platform: a Unix domain
/// socket on Unix targets, a named pipe on Windows.
///
/// `node_api` is the node API implementation used to serve module requests.
/// Exactly one of the two `return` statements below is compiled in, depending on the
/// target platform.
pub async fn start<A: NodeAPI + Send + Sync + 'static>(
&mut self,
node_api: Arc<A>,
) -> Result<(), ModuleError> {
#[cfg(unix)]
return self.start_unix(node_api).await;
#[cfg(windows)]
return self.start_windows(node_api).await;
}
#[cfg(unix)]
async fn start_unix<A: NodeAPI + Send + Sync + 'static>(
&mut self,
node_api: Arc<A>,
) -> Result<(), ModuleError> {
use tokio::net::{UnixListener, UnixStream};
if self.socket_path.exists() {
std::fs::remove_file(&self.socket_path)
.map_err(|e| ModuleError::IpcError(format!("Failed to remove old socket: {e}")))?;
}
if let Some(parent) = self.socket_path.parent() {
std::fs::create_dir_all(parent).map_err(|e| {
ModuleError::IpcError(format!("Failed to create socket directory: {e}"))
})?;
}
let listener = UnixListener::bind(&self.socket_path)
.map_err(|e| ModuleError::IpcError(format!("Failed to bind socket: {e}")))?;
{
use std::os::unix::fs::PermissionsExt;
let perms = std::fs::Permissions::from_mode(0o600);
if let Err(e) = std::fs::set_permissions(&self.socket_path, perms) {
warn!("Failed to set IPC socket permissions: {}", e);
}
}
info!("Module IPC server listening on {:?}", self.socket_path);
loop {
match listener.accept().await {
Ok((stream, _)) => {
debug!("New module connection");
self.handle_connection(stream, Arc::clone(&node_api))
.await?;
}
Err(e) => error!("Failed to accept module connection: {}", e),
}
}
}
/// Serves module connections over a Windows named pipe derived from `socket_path`.
///
/// A new pipe instance is created for every connection; connections are handled one at
/// a time, each until it disconnects.
///
/// # Errors
///
/// Returns `ModuleError::IpcError` when the directory or a pipe instance cannot be
/// created, or when waiting for a client fails. Failures of an individual connection
/// (for example a bad handshake) are logged and the server keeps accepting.
#[cfg(windows)]
async fn start_windows<A: NodeAPI + Send + Sync + 'static>(
    &mut self,
    node_api: Arc<A>,
) -> Result<(), ModuleError> {
    use tokio::net::windows::named_pipe::ServerOptions;

    let pipe_name = path_to_pipe_name(&self.socket_path);
    if let Some(parent) = self.socket_path.parent() {
        std::fs::create_dir_all(parent).map_err(|e| {
            ModuleError::IpcError(format!("Failed to create socket directory: {e}"))
        })?;
    }
    info!("Module IPC server listening on pipe {}", pipe_name);
    loop {
        let server = ServerOptions::new()
            .first_pipe_instance(false)
            .create(&pipe_name)
            .map_err(|e| ModuleError::IpcError(format!("Failed to create named pipe: {e}")))?;
        server
            .connect()
            .await
            .map_err(|e| ModuleError::IpcError(format!("Failed to connect pipe: {e}")))?;
        debug!("New module connection (named pipe)");
        // A failed handshake affects only that connection; keep serving others.
        if let Err(e) = self
            .handle_connection(server, Arc::clone(&node_api))
            .await
        {
            error!("Module connection failed: {}", e);
        }
    }
}
/// Serves a single module connection from handshake until disconnect.
///
/// Steps:
/// 1. Read the first frame. A `Handshake` request is acknowledged and supplies the
///    module id; any other request gets a generated fallback id (that request itself is
///    not answered).
/// 2. Spawn the writer task (plus an event-forwarding task) and the RPC forwarding task.
/// 3. Initialise the module's data directory and register the connection's channels.
/// 4. Process incoming frames until the stream ends or a message fails.
/// 5. Unregister the connection and unsubscribe the module from events.
///
/// # Errors
///
/// Returns an error only for handshake-phase failures: unreadable or undecodable first
/// frame, a first message that is not a request, a module id that does not map to a
/// safe data directory name, or a failure to send the acknowledgement. Errors after the
/// handshake end the connection but yield `Ok(())`.
async fn handle_connection<S, A>(
    &mut self,
    stream: S,
    node_api: Arc<A>,
) -> Result<(), ModuleError>
where
    S: AsyncRead + AsyncWrite + Unpin + Send + 'static,
    A: NodeAPI + Send + Sync,
{
    /// Derives the data directory name from a module id (the part before the first
    /// `_`) and rejects anything that is not a single plain path component. The id is
    /// supplied by the connecting module, so `..`, separators or absolute paths must
    /// not be allowed to escape the modules data directory.
    fn data_dir_name_for(module_id: &str) -> Result<String, ModuleError> {
        let name = module_id.split('_').next().unwrap_or(module_id);
        let mut components = Path::new(name).components();
        match (components.next(), components.next()) {
            (Some(std::path::Component::Normal(_)), None) => Ok(name.to_string()),
            _ => Err(ModuleError::IpcError(format!(
                "Module id {module_id:?} does not map to a valid data directory name"
            ))),
        }
    }

    let (read_half, write_half) = tokio::io::split(stream);
    let mut reader = FramedRead::new(read_half, module_ipc_length_codec());
    let mut writer = FramedWrite::new(write_half, module_ipc_length_codec());

    let (module_id, data_dir_name) = match reader.next().await {
        Some(Ok(frame)) => {
            let message: ModuleMessage = bincode::deserialize(frame.as_ref())
                .map_err(|e| ModuleError::SerializationError(e.to_string()))?;
            match message {
                ModuleMessage::Request(request) => {
                    if let RequestPayload::Handshake {
                        module_id,
                        module_name,
                        version,
                    } = request.payload
                    {
                        info!(
                            "Module handshake: id={}, name={}, version={}",
                            module_id, module_name, version
                        );
                        // Validate before acknowledging so a rejected module never
                        // sees a successful handshake.
                        let data_dir_name = data_dir_name_for(&module_id)?;
                        let ack = ResponseMessage {
                            correlation_id: request.correlation_id,
                            success: true,
                            payload: Some(ResponsePayload::HandshakeAck {
                                node_version: env!("CARGO_PKG_VERSION").to_string(),
                            }),
                            error: None,
                        };
                        let ack_bytes = bincode::serialize(&ModuleMessage::Response(ack))
                            .map_err(|e| ModuleError::SerializationError(e.to_string()))?;
                        writer
                            .send(bytes::Bytes::from(ack_bytes))
                            .await
                            .map_err(|e| {
                                ModuleError::IpcError(format!(
                                    "Failed to send handshake ack: {e}"
                                ))
                            })?;
                        (module_id, data_dir_name)
                    } else {
                        warn!("Module did not send handshake, using fallback ID");
                        let timestamp = crate::utils::current_timestamp_nanos();
                        let count = self.connection_count.fetch_add(1, Ordering::SeqCst);
                        let fallback_id = format!("module_{count}_{timestamp}");
                        let data_dir_name = data_dir_name_for(&fallback_id)?;
                        (fallback_id, data_dir_name)
                    }
                }
                _ => {
                    return Err(ModuleError::IpcError(
                        "First message must be a handshake request".to_string(),
                    ));
                }
            }
        }
        Some(Err(e)) => {
            return Err(ModuleError::IpcError(format!(
                "Failed to read handshake: {e}"
            )));
        }
        None => {
            return Err(ModuleError::IpcError(
                "Connection closed before handshake".to_string(),
            ));
        }
    };

    let (outgoing_tx, mut outgoing_rx) = mpsc::unbounded_channel::<bytes::Bytes>();
    type RpcResponseSender = mpsc::UnboundedSender<
        std::result::Result<serde_json::Value, crate::rpc::errors::RpcError>,
    >;
    let (rpc_request_tx, mut rpc_request_rx) =
        mpsc::unbounded_channel::<(u64, String, serde_json::Value, RpcResponseSender)>();
    let (event_tx, mut event_rx) = mpsc::channel(100);

    let outgoing_tx_for_events = outgoing_tx.clone();
    let module_id_writer_task = module_id.clone();
    let event_manager_clone = self.event_manager.clone();
    let writer_task_handle = tokio::spawn(async move {
        let module_id_event_fwd = module_id_writer_task.clone();
        // Event forwarder: serializes events and queues them on the outgoing channel.
        tokio::spawn(async move {
            while let Some(event_message) = event_rx.recv().await {
                match bincode::serialize(&event_message) {
                    Ok(bytes) => {
                        if outgoing_tx_for_events
                            .send(bytes::Bytes::from(bytes))
                            .is_err()
                        {
                            break;
                        }
                    }
                    Err(e) => {
                        warn!(
                            "Failed to serialize event for module {}: {}",
                            module_id_event_fwd, e
                        );
                    }
                }
            }
            if let Some(event_mgr) = event_manager_clone {
                if let Err(e) = event_mgr.unsubscribe_module(&module_id_event_fwd).await {
                    warn!(
                        "Failed to unsubscribe module {} from events: {}",
                        module_id_event_fwd, e
                    );
                }
            }
        });
        // Writer: drains the outgoing queue into the framed stream.
        while let Some(bytes) = outgoing_rx.recv().await {
            if let Err(e) = writer.send(bytes).await {
                warn!(
                    "Failed to send message to module {}: {}",
                    module_id_writer_task, e
                );
                break;
            }
        }
    });

    let base_data_dir = std::path::PathBuf::from("data/modules");
    let module_data_dir = base_data_dir.join(&data_dir_name);
    if let Err(e) = std::fs::create_dir_all(&module_data_dir) {
        warn!(
            "Failed to create module data directory {:?}: {}",
            module_data_dir, e
        );
    }
    if let Err(e) = node_api
        .initialize_module(module_id.clone(), module_data_dir, base_data_dir)
        .await
    {
        warn!(
            "Failed to initialize module {} filesystem/storage: {}",
            module_id, e
        );
    }

    {
        let mut by_module = self.outgoing_tx_by_module.write().await;
        by_module.insert(module_id.clone(), outgoing_tx.clone());
    }
    {
        let mut channels = self.rpc_channels.write().await;
        channels.insert(module_id.clone(), rpc_request_tx.clone());
    }

    let outgoing_tx_for_rpc = outgoing_tx.clone();
    let rpc_pending = Arc::clone(&self.rpc_pending);
    // RPC forwarder: turns queued RPC requests into invocation frames for the module.
    tokio::spawn(async move {
        while let Some((correlation_id, method, params, response_tx)) =
            rpc_request_rx.recv().await
        {
            let invocation = InvocationMessage {
                correlation_id,
                invocation_type: InvocationType::Rpc { method, params },
            };
            let frame = match bincode::serialize(&ModuleMessage::Invocation(invocation)) {
                Ok(frame) => frame,
                Err(e) => {
                    // Fail the caller right away instead of leaving it waiting.
                    let _ = response_tx.send(Err(crate::rpc::errors::RpcError::internal_error(
                        format!("Failed to serialize RPC invocation: {e}"),
                    )));
                    continue;
                }
            };
            rpc_pending.lock().await.insert(correlation_id, response_tx);
            if outgoing_tx_for_rpc.send(bytes::Bytes::from(frame)).is_err() {
                // The writer is gone: unregister the waiter and report the failure.
                if let Some(waiter) = rpc_pending.lock().await.remove(&correlation_id) {
                    let _ = waiter.send(Err(crate::rpc::errors::RpcError::internal_error(
                        "Module connection closed".to_string(),
                    )));
                }
                break;
            }
        }
    });

    let mut connection = ModuleConnection {
        module_id: module_id.clone(),
        reader,
        outgoing_tx: Some(outgoing_tx),
        subscriptions: Vec::new(),
        event_tx: Some(event_tx),
        writer_task_handle: Some(writer_task_handle),
        rpc_request_tx: Some(rpc_request_tx),
    };

    while let Some(result) = connection.reader.next().await {
        match result {
            Ok(bytes) => {
                let node_api_clone = Arc::clone(&node_api);
                match self
                    .handle_message(bytes.as_ref(), &mut connection, node_api_clone)
                    .await
                {
                    Ok(()) => {}
                    Err(e) => {
                        error!("Error handling message: {}", e);
                        break;
                    }
                }
            }
            Err(e) => {
                error!("Error reading from module {}: {}", module_id, e);
                break;
            }
        }
    }

    info!("Module {} disconnected", module_id);
    {
        let mut by_module = self.outgoing_tx_by_module.write().await;
        by_module.remove(&module_id);
    }
    {
        let mut channels = self.rpc_channels.write().await;
        channels.remove(&module_id);
    }
    {
        let mut registry = self.cli_registry.write().await;
        registry.remove(&module_id);
    }
    drop(connection.outgoing_tx);
    if let Some(handle) = connection.writer_task_handle.take() {
        handle.abort();
    }
    if let Some(event_mgr) = &self.event_manager {
        if let Err(e) = event_mgr.unsubscribe_module(&module_id).await {
            warn!(
                "Failed to unsubscribe module {} from events: {}",
                module_id, e
            );
        }
    }
    Ok(())
}
async fn handle_message<R: tokio::io::AsyncRead, A: NodeAPI + Send + Sync>(
&mut self,
bytes: &[u8],
connection: &mut ModuleConnection<R>,
node_api: Arc<A>,
) -> Result<(), ModuleError> {
let message: ModuleMessage = bincode::deserialize(bytes)
.map_err(|e| ModuleError::SerializationError(e.to_string()))?;
match message {
ModuleMessage::Request(request) => {
if let RequestPayload::RegisterCliSpec { spec } = &request.payload {
let module_id = connection.module_id.clone();
let mut registry = self.cli_registry.write().await;
registry.insert(module_id.clone(), spec.clone());
debug!("Module {} registered CLI spec: {}", module_id, spec.name);
let response = ResponseMessage::success(
request.correlation_id,
ResponsePayload::Bool(true),
);
let response_message = ModuleMessage::Response(response);
let response_bytes = bincode::serialize(&response_message)
.map_err(|e| ModuleError::SerializationError(e.to_string()))?;
if let Some(tx) = &connection.outgoing_tx {
tx.send(bytes::Bytes::from(response_bytes)).map_err(|e| {
ModuleError::IpcError(format!("Failed to send response: {e}"))
})?;
}
return Ok(());
}
if let RequestPayload::SubscribeEvents { ref event_types } = request.payload {
if let Some(event_mgr) = &self.event_manager {
if let Some(event_tx) = &connection.event_tx {
let module_id = connection.module_id.clone();
let event_tx_clone = event_tx.clone();
event_mgr
.subscribe_module(
module_id.clone(),
event_types.clone(),
event_tx_clone,
)
.await?;
connection.subscriptions = event_types.clone();
debug!(
"Module {} subscribed to events: {:?}",
module_id, event_types
);
}
}
}
let response = if let Some(hub) = &self.api_hub {
let mut hub_guard = hub.lock().await;
hub_guard
.handle_request(&connection.module_id, request.clone())
.await?
} else {
self.process_request(&request, node_api).await?
};
let response_message = ModuleMessage::Response(response);
let response_bytes = bincode::serialize(&response_message)
.map_err(|e| ModuleError::SerializationError(e.to_string()))?;
if let Some(tx) = &connection.outgoing_tx {
tx.send(bytes::Bytes::from(response_bytes)).map_err(|e| {
ModuleError::IpcError(format!("Failed to send response: {e}"))
})?;
}
}
ModuleMessage::Response(_) => {
warn!("Received response from module (unexpected)");
}
ModuleMessage::Event(_) => {
warn!("Received event from module (unexpected)");
}
ModuleMessage::InvocationResult(result) => {
{
let mut rpc_pending = self.rpc_pending.lock().await;
if let Some(response_tx) = rpc_pending.remove(&result.correlation_id) {
let response = if result.success {
result
.payload
.map(|p| {
if let InvocationResultPayload::Rpc(v) = p {
Ok(v)
} else {
Err(crate::rpc::errors::RpcError::internal_error(
"Wrong payload type".to_string(),
))
}
})
.unwrap_or_else(|| {
Err(crate::rpc::errors::RpcError::internal_error(
"No RPC payload".to_string(),
))
})
} else {
Err(crate::rpc::errors::RpcError::internal_error(
result.error.unwrap_or_else(|| "Unknown error".to_string()),
))
};
let _ = response_tx.send(response);
return Ok(());
}
}
let mut pending = self.pending_invocations.lock().await;
if let Some(tx) = pending.remove(&result.correlation_id) {
let _ = tx.send(result);
}
return Ok(());
}
ModuleMessage::Invocation(_) => {
warn!("Received Invocation from module (unexpected; node sends to module)");
return Ok(());
}
ModuleMessage::Log(log_msg) => {
use crate::module::ipc::protocol::LogLevel;
let module_id_str = log_msg.module_id.clone();
let message_str = log_msg.message.clone();
match log_msg.level {
LogLevel::Trace => {
tracing::trace!(
module_id = %module_id_str,
"{}",
message_str
);
}
LogLevel::Debug => {
tracing::debug!(
module_id = %module_id_str,
"{}",
message_str
);
}
LogLevel::Info => {
tracing::info!(
module_id = %module_id_str,
"{}",
message_str
);
}
LogLevel::Warn => {
tracing::warn!(
module_id = %module_id_str,
"{}",
message_str
);
}
LogLevel::Error => {
tracing::error!(
module_id = %module_id_str,
"{}",
message_str
);
}
}
return Ok(());
}
}
Ok(())
}
async fn process_request<A: NodeAPI + Send + Sync>(
&self,
request: &RequestMessage,
node_api: Arc<A>,
) -> Result<ResponseMessage, ModuleError> {
use crate::module::ipc::protocol::{RequestPayload, ResponsePayload};
match &request.payload {
RequestPayload::Handshake { .. } => {
Ok(ResponseMessage::success(
request.correlation_id,
ResponsePayload::HandshakeAck {
node_version: env!("CARGO_PKG_VERSION").to_string(),
},
))
}
RequestPayload::GetBlock { hash } => {
let block = node_api.get_block(hash).await?;
Ok(ResponseMessage::success(
request.correlation_id,
ResponsePayload::Block(block),
))
}
RequestPayload::GetBlockHeader { hash } => {
let header = node_api.get_block_header(hash).await?;
Ok(ResponseMessage::success(
request.correlation_id,
ResponsePayload::BlockHeader(header),
))
}
RequestPayload::GetTransaction { hash } => {
let tx = node_api.get_transaction(hash).await?;
Ok(ResponseMessage::success(
request.correlation_id,
ResponsePayload::Transaction(tx),
))
}
RequestPayload::HasTransaction { hash } => {
let exists = node_api.has_transaction(hash).await?;
Ok(ResponseMessage::success(
request.correlation_id,
ResponsePayload::Bool(exists),
))
}
RequestPayload::GetChainTip => {
let tip = node_api.get_chain_tip().await?;
Ok(ResponseMessage::success(
request.correlation_id,
ResponsePayload::Hash(tip),
))
}
RequestPayload::GetBlockHeight => {
let height = node_api.get_block_height().await?;
Ok(ResponseMessage::success(
request.correlation_id,
ResponsePayload::U64(height),
))
}
RequestPayload::GetUtxo { outpoint } => {
let utxo = node_api.get_utxo(outpoint).await?;
Ok(ResponseMessage::success(
request.correlation_id,
ResponsePayload::Utxo(utxo),
))
}
RequestPayload::SubscribeEvents { event_types } => {
if let Some(_event_mgr) = &self.event_manager {
debug!("Module subscribing to events: {:?}", event_types);
}
Ok(ResponseMessage::success(
request.correlation_id,
ResponsePayload::SubscribeAck,
))
}
RequestPayload::GetMempoolTransactions => {
let txs = node_api.get_mempool_transactions().await?;
Ok(ResponseMessage::success(
request.correlation_id,
ResponsePayload::MempoolTransactions(txs),
))
}
RequestPayload::GetMempoolTransaction { tx_hash } => {
let tx = node_api.get_mempool_transaction(tx_hash).await?;
Ok(ResponseMessage::success(
request.correlation_id,
ResponsePayload::MempoolTransaction(tx),
))
}
RequestPayload::GetMempoolSize => {
let size = node_api.get_mempool_size().await?;
Ok(ResponseMessage::success(
request.correlation_id,
ResponsePayload::MempoolSize(size),
))
}
RequestPayload::GetNetworkStats => {
let stats = node_api.get_network_stats().await?;
Ok(ResponseMessage::success(
request.correlation_id,
ResponsePayload::NetworkStats(stats),
))
}
RequestPayload::GetNetworkPeers => {
let peers = node_api.get_network_peers().await?;
Ok(ResponseMessage::success(
request.correlation_id,
ResponsePayload::NetworkPeers(peers),
))
}
RequestPayload::GetChainInfo => {
let info = node_api.get_chain_info().await?;
Ok(ResponseMessage::success(
request.correlation_id,
ResponsePayload::ChainInfo(info),
))
}
RequestPayload::GetBlockByHeight { height } => {
let block = node_api.get_block_by_height(*height).await?;
Ok(ResponseMessage::success(
request.correlation_id,
ResponsePayload::BlockByHeight(block),
))
}
RequestPayload::GetLightningNodeUrl => {
let url = node_api.get_lightning_node_url().await?;
Ok(ResponseMessage::success(
request.correlation_id,
ResponsePayload::LightningNodeUrl(url),
))
}
RequestPayload::GetLightningInfo => {
let info = node_api.get_lightning_info().await?;
Ok(ResponseMessage::success(
request.correlation_id,
ResponsePayload::LightningInfo(info),
))
}
RequestPayload::GetPaymentState { payment_id } => {
let state = node_api.get_payment_state(payment_id).await?;
Ok(ResponseMessage::success(
request.correlation_id,
ResponsePayload::PaymentState(state),
))
}
RequestPayload::CheckTransactionInMempool { tx_hash } => {
let exists = node_api.check_transaction_in_mempool(tx_hash).await?;
Ok(ResponseMessage::success(
request.correlation_id,
ResponsePayload::CheckTransactionInMempool(exists),
))
}
RequestPayload::GetFeeEstimate { target_blocks } => {
let fee_rate = node_api.get_fee_estimate(*target_blocks).await?;
Ok(ResponseMessage::success(
request.correlation_id,
ResponsePayload::FeeEstimate(fee_rate),
))
}
RequestPayload::ReadFile { path } => {
let data = node_api.read_file(path.clone()).await?;
Ok(ResponseMessage::success(
request.correlation_id,
ResponsePayload::FileData(data),
))
}
RequestPayload::WriteFile { path, data } => {
node_api.write_file(path.clone(), data.clone()).await?;
Ok(ResponseMessage::success(
request.correlation_id,
ResponsePayload::Bool(true),
))
}
RequestPayload::DeleteFile { path } => {
node_api.delete_file(path.clone()).await?;
Ok(ResponseMessage::success(
request.correlation_id,
ResponsePayload::Bool(true),
))
}
RequestPayload::ListDirectory { path } => {
let entries = node_api.list_directory(path.clone()).await?;
Ok(ResponseMessage::success(
request.correlation_id,
ResponsePayload::DirectoryListing(entries),
))
}
RequestPayload::CreateDirectory { path } => {
node_api.create_directory(path.clone()).await?;
Ok(ResponseMessage::success(
request.correlation_id,
ResponsePayload::Bool(true),
))
}
RequestPayload::GetFileMetadata { path } => {
let metadata = node_api.get_file_metadata(path.clone()).await?;
Ok(ResponseMessage::success(
request.correlation_id,
ResponsePayload::FileMetadata(metadata),
))
}
RequestPayload::RegisterRpcEndpoint {
method,
description,
} => {
node_api
.register_rpc_endpoint(method.clone(), description.clone())
.await?;
Ok(ResponseMessage::success(
request.correlation_id,
ResponsePayload::RpcEndpointRegistered,
))
}
RequestPayload::UnregisterRpcEndpoint { method } => {
node_api.unregister_rpc_endpoint(method).await?;
Ok(ResponseMessage::success(
request.correlation_id,
ResponsePayload::RpcEndpointUnregistered,
))
}
RequestPayload::RegisterTimer {
interval_seconds: _,
} => {
Err(ModuleError::OperationError(
module_error_msg::TIMER_REGISTRATION_REQUIRES_CALLBACK_IPC.to_string(),
))
}
RequestPayload::CancelTimer { timer_id: _ } => {
Err(ModuleError::OperationError(
module_error_msg::TIMER_CANCELLATION_NOT_SUPPORTED_IPC.to_string(),
))
}
RequestPayload::ScheduleTask { delay_seconds: _ } => {
Err(ModuleError::OperationError(
module_error_msg::TASK_SCHEDULING_REQUIRES_CALLBACK_IPC.to_string(),
))
}
RequestPayload::ReportMetric { metric } => {
node_api.report_metric(metric.clone()).await?;
Ok(ResponseMessage::success(
request.correlation_id,
ResponsePayload::MetricReported,
))
}
RequestPayload::GetModuleMetrics { module_id } => {
let metrics = node_api.get_module_metrics(module_id).await?;
Ok(ResponseMessage::success(
request.correlation_id,
ResponsePayload::ModuleMetrics(metrics),
))
}
RequestPayload::SendMeshPacketToPeer {
peer_addr,
packet_data,
} => {
node_api
.send_mesh_packet_to_peer(peer_addr.clone(), packet_data.clone())
.await?;
Ok(ResponseMessage::success(
request.correlation_id,
ResponsePayload::Bool(true),
))
}
RequestPayload::SendStratumV2MessageToPeer {
peer_addr,
message_data,
} => {
node_api
.send_stratum_v2_message_to_peer(peer_addr.clone(), message_data.clone())
.await?;
Ok(ResponseMessage::success(
request.correlation_id,
ResponsePayload::Bool(true),
))
}
RequestPayload::GetBlockTemplate {
rules,
coinbase_script,
coinbase_address,
} => {
let template = node_api
.get_block_template(
rules.clone(),
coinbase_script.clone(),
coinbase_address.clone(),
)
.await?;
Ok(ResponseMessage::success(
request.correlation_id,
ResponsePayload::BlockTemplate(template),
))
}
RequestPayload::SubmitBlock { block } => {
let result = node_api.submit_block(block.clone()).await?;
Ok(ResponseMessage::success(
request.correlation_id,
ResponsePayload::SubmitBlockResult(result),
))
}
RequestPayload::MergeBlockServeDenylist { block_hashes } => {
node_api
.merge_block_serve_denylist(block_hashes.as_slice())
.await?;
Ok(ResponseMessage::success(
request.correlation_id,
ResponsePayload::BlockServeDenylistMerged,
))
}
RequestPayload::GetBlockServeDenylistSnapshot => {
let s = node_api.get_block_serve_denylist_snapshot().await?;
Ok(ResponseMessage::success(
request.correlation_id,
ResponsePayload::BlockServeDenylistSnapshot(s),
))
}
RequestPayload::ClearBlockServeDenylist => {
node_api.clear_block_serve_denylist().await?;
Ok(ResponseMessage::success(
request.correlation_id,
ResponsePayload::Bool(true),
))
}
RequestPayload::ReplaceBlockServeDenylist { block_hashes } => {
node_api
.replace_block_serve_denylist(block_hashes.as_slice())
.await?;
Ok(ResponseMessage::success(
request.correlation_id,
ResponsePayload::Bool(true),
))
}
RequestPayload::MergeTxServeDenylist { tx_hashes } => {
node_api
.merge_tx_serve_denylist(tx_hashes.as_slice())
.await?;
Ok(ResponseMessage::success(
request.correlation_id,
ResponsePayload::TxServeDenylistMerged,
))
}
RequestPayload::GetTxServeDenylistSnapshot => {
let s = node_api.get_tx_serve_denylist_snapshot().await?;
Ok(ResponseMessage::success(
request.correlation_id,
ResponsePayload::TxServeDenylistSnapshot(s),
))
}
RequestPayload::ClearTxServeDenylist => {
node_api.clear_tx_serve_denylist().await?;
Ok(ResponseMessage::success(
request.correlation_id,
ResponsePayload::Bool(true),
))
}
RequestPayload::ReplaceTxServeDenylist { tx_hashes } => {
node_api
.replace_tx_serve_denylist(tx_hashes.as_slice())
.await?;
Ok(ResponseMessage::success(
request.correlation_id,
ResponsePayload::Bool(true),
))
}
RequestPayload::GetSyncStatus => {
let s = node_api.get_sync_status().await?;
Ok(ResponseMessage::success(
request.correlation_id,
ResponsePayload::NodeSyncStatus(s),
))
}
RequestPayload::BanPeer {
peer_addr,
ban_duration_seconds,
} => {
node_api
.ban_peer(peer_addr.as_str(), *ban_duration_seconds)
.await?;
Ok(ResponseMessage::success(
request.correlation_id,
ResponsePayload::Bool(true),
))
}
RequestPayload::SetBlockServeMaintenanceMode { enabled } => {
node_api.set_block_serve_maintenance_mode(*enabled).await?;
Ok(ResponseMessage::success(
request.correlation_id,
ResponsePayload::Bool(true),
))
}
_ => Ok(ResponseMessage::error(
request.correlation_id,
format!("Unimplemented request payload: {:?}", request.payload),
)),
}
}
}
/// Maps a socket path to a Windows named-pipe name.
///
/// The file stem of `path` is used (falling back to `blvm-module` when it is missing or
/// not valid UTF-8); every non-alphanumeric character is replaced by `-` and the result
/// is prefixed with `\\.\pipe\blvm-`.
#[cfg(windows)]
fn path_to_pipe_name(path: &Path) -> String {
    let stem = match path.file_stem().and_then(|s| s.to_str()) {
        Some(stem) => stem,
        None => "blvm-module",
    };
    let sanitized: String = stem
        .chars()
        .map(|c| if c.is_alphanumeric() { c } else { '-' })
        .collect();
    format!(r"\\.\pipe\blvm-{}", sanitized)
}