use blvm_node::module::integration::ModuleIntegration;
use blvm_node::module::ipc::protocol::{
CliSpec, InvocationMessage, InvocationResultMessage, ModuleMessage,
};
use blvm_node::module::traits::{ModuleError, NodeAPI};
use blvm_node::storage::database::Database;
use std::path::Path;
use std::sync::Arc;
use tokio::time::{sleep, Duration};
use tracing::info;
use crate::module::storage::{DatabaseStorageAdapter, ModuleStorage, ModuleStorageDatabaseBridge};
/// Drives the future `f` to completion from synchronous code and returns its result.
///
/// The future is polled via `Handle::current().block_on(..)` inside
/// `tokio::task::block_in_place`, so the calling thread blocks until `f` finishes.
///
/// # Errors
///
/// If `f` resolves to `Err(e)`, the error is rendered with its `Display`
/// implementation and returned as `ModuleError::Other`.
///
/// # Panics
///
/// Per the Tokio documentation, `block_in_place` panics on a current-thread
/// runtime and `Handle::current` panics when no runtime is active.
pub fn run_async<F, T, E>(f: F) -> Result<T, ModuleError>
where
    F: std::future::Future<Output = Result<T, E>>,
    E: std::fmt::Display,
{
    let outcome = tokio::task::block_in_place(|| {
        let handle = tokio::runtime::Handle::current();
        handle.block_on(f)
    });
    match outcome {
        Ok(value) => Ok(value),
        Err(err) => Err(ModuleError::Other(err.to_string())),
    }
}
/// Context handed to invocation and event handlers.
///
/// Cloning copies the `Arc` handles only; the underlying database and node
/// API objects are shared between clones.
#[derive(Clone)]
pub struct InvocationContext {
    /// Database handle exposed to handlers. Always a `ModuleStorageDatabaseBridge`
    /// built by one of the constructors, never the caller's original `Arc`.
    db: Arc<dyn Database>,
    /// Node API handle; `None` unless the context was built with
    /// [`InvocationContext::with_node_api`].
    node_api: Option<Arc<dyn NodeAPI>>,
}

impl InvocationContext {
    /// Builds a context on top of a module storage backend, without a node API.
    ///
    /// The storage is wrapped in a `ModuleStorageDatabaseBridge` so it can be
    /// used through the `Database` trait.
    pub fn from_storage(storage: Arc<dyn ModuleStorage>) -> Self {
        Self {
            db: Arc::new(ModuleStorageDatabaseBridge::new(storage)),
            node_api: None,
        }
    }

    /// Builds a context from a database handle, without a node API.
    ///
    /// The database is first wrapped in a `DatabaseStorageAdapter` and then
    /// passed through [`InvocationContext::from_storage`].
    pub fn new(db: Arc<dyn Database>) -> Self {
        Self::from_storage(Arc::new(DatabaseStorageAdapter::new(db)))
    }

    /// Builds a context from a database handle and attaches `node_api`.
    ///
    /// The database goes through the same adapter/bridge wrapping as in
    /// [`InvocationContext::new`].
    pub fn with_node_api(db: Arc<dyn Database>, node_api: Arc<dyn NodeAPI>) -> Self {
        Self {
            node_api: Some(node_api),
            ..Self::new(db)
        }
    }

    /// Returns the (bridged) database handle.
    pub fn db(&self) -> &Arc<dyn Database> {
        &self.db
    }

    /// Returns a new handle to the node API, or `None` if the context has none.
    pub fn node_api(&self) -> Option<Arc<dyn NodeAPI>> {
        self.node_api.as_ref().map(Arc::clone)
    }
}
/// Connects a module to the node and runs its event/invocation loop.
///
/// On a successful connection this function:
/// 1. registers every name in `rpc_methods` through
///    `NodeAPI::register_rpc_endpoint` (with an empty string as second argument),
/// 2. subscribes to `event_types`,
/// 3. loops, forwarding each received `ModuleMessage::Event` to `on_event` and
///    each invocation to `dispatch`, sending the dispatch result back on the
///    sender that accompanied the invocation.
///
/// The loop ends, and `Ok(())` is returned, once the invocation channel
/// yields `None`.
///
/// If the connection attempt fails, the error is logged and the function
/// enters "standalone mode": it sleeps in 5-second steps forever and never
/// returns.
///
/// # Errors
///
/// Propagates failures from RPC endpoint registration and event subscription,
/// and returns `ModuleError::IpcError` if the integration exposes no
/// invocation receiver. Errors returned by `on_event` are logged as warnings
/// and do not stop the loop.
// The signature mirrors the full set of values needed to register and run a
// module, hence the large argument count.
#[allow(clippy::too_many_arguments)]
pub async fn run_module<M, C, F, FE, Fut>(
    socket_path: impl AsRef<Path>,
    module_id: &str,
    module_name: &str,
    version: &str,
    cli_spec: CliSpec,
    rpc_methods: &[&str],
    event_types: Vec<blvm_node::module::traits::EventType>,
    dispatch: F,
    on_event: FE,
    module: M,
    cli: C,
    db: Arc<dyn Database>,
) -> Result<(), ModuleError>
where
    F: Fn(InvocationMessage, InvocationContext, &M, &C) -> InvocationResultMessage,
    FE: Fn(blvm_node::module::ipc::protocol::EventMessage, &M, &InvocationContext) -> Fut,
    Fut: std::future::Future<Output = Result<(), ModuleError>> + Send,
{
    let socket_path = socket_path.as_ref().to_path_buf();
    match ModuleIntegration::connect(
        socket_path.clone(),
        module_id.to_string(),
        module_name.to_string(),
        version.to_string(),
        Some(cli_spec),
    )
    .await
    {
        Ok(mut integration) => {
            info!("Connected to node");
            let node_api = integration.node_api();
            for method in rpc_methods {
                node_api
                    .register_rpc_endpoint((*method).to_string(), String::new())
                    .await?;
            }
            integration.subscribe_events(event_types).await?;
            let mut event_rx = integration.event_receiver();
            let invocation_rx = integration.invocation_receiver().ok_or_else(|| {
                ModuleError::IpcError(
                    "Invocation receiver not available for this module integration".to_string(),
                )
            })?;
            let ctx = InvocationContext::with_node_api(db, node_api);
            loop {
                tokio::select! {
                    msg = event_rx.recv() => {
                        // Anything other than a successfully received event
                        // (other message kinds, receive errors) is skipped.
                        // NOTE(review): if `recv()` keeps returning an error
                        // immediately once the event channel is closed, this
                        // branch would spin — confirm against the receiver type.
                        if let Ok(ModuleMessage::Event(event)) = msg {
                            // A failing handler must not take the module down,
                            // but the failure should be visible in the logs.
                            if let Err(err) = on_event(event, &module, &ctx).await {
                                tracing::warn!("Event handler returned an error: {}", err);
                            }
                        }
                    }
                    inv = invocation_rx.recv() => {
                        if let Some((invocation, result_tx)) = inv {
                            let result = dispatch(invocation, ctx.clone(), &module, &cli);
                            // Best effort: a failed send is ignored (presumably
                            // the requester is no longer waiting — confirm).
                            let _ = result_tx.send(result);
                        } else {
                            info!("Invocation channel closed, module unloading");
                            break;
                        }
                    }
                    // The sleep is re-created on every pass, so this fires only
                    // after 30 seconds without any event or invocation.
                    _ = sleep(Duration::from_secs(30)) => {
                        info!("Module running");
                    }
                }
            }
        }
        Err(e) => {
            info!("Node not running, standalone mode: {}", e);
            // Standalone mode: stay alive indefinitely without a node.
            loop {
                sleep(Duration::from_secs(5)).await;
            }
        }
    }
    Ok(())
}
/// Like a plain module runner, but builds the module and CLI state *after*
/// connecting to the node.
///
/// On a successful connection this function:
/// 1. registers every name in `rpc_methods` through
///    `NodeAPI::register_rpc_endpoint` (with an empty string as second argument),
/// 2. subscribes to `event_types`,
/// 3. calls `setup(node_api, db, data_dir)` to obtain the `(module, cli)` pair,
/// 4. loops, forwarding each received `ModuleMessage::Event` to `on_event` and
///    each invocation to `dispatch`, sending the dispatch result back on the
///    sender that accompanied the invocation.
///
/// The loop ends, and `Ok(())` is returned, once the invocation channel
/// yields `None`.
///
/// If the connection attempt fails, the error is logged and the function
/// sleeps in 5-second steps forever; `setup` is never called in that case and
/// the function never returns.
///
/// # Errors
///
/// Propagates failures from RPC endpoint registration, event subscription and
/// `setup`, and returns `ModuleError::IpcError` if the integration exposes no
/// invocation receiver. Errors returned by `on_event` are logged as warnings
/// and do not stop the loop.
// The signature mirrors the full set of values needed to register and run a
// module, hence the large argument count.
#[allow(clippy::too_many_arguments)]
pub async fn run_module_with_setup<M, C, F, FE, Fut, FSetup, FutSetup>(
    socket_path: impl AsRef<Path>,
    module_id: &str,
    module_name: &str,
    version: &str,
    cli_spec: CliSpec,
    rpc_methods: &[&str],
    event_types: Vec<blvm_node::module::traits::EventType>,
    dispatch: F,
    on_event: FE,
    setup: FSetup,
    db: Arc<dyn Database>,
    data_dir: &Path,
) -> Result<(), ModuleError>
where
    F: Fn(InvocationMessage, InvocationContext, &M, &C) -> InvocationResultMessage,
    FE: Fn(blvm_node::module::ipc::protocol::EventMessage, &M, &InvocationContext) -> Fut,
    Fut: std::future::Future<Output = Result<(), ModuleError>> + Send,
    FSetup: Fn(Arc<dyn NodeAPI>, Arc<dyn Database>, &Path) -> FutSetup,
    FutSetup: std::future::Future<Output = Result<(M, C), ModuleError>> + Send,
{
    let socket_path = socket_path.as_ref().to_path_buf();
    match ModuleIntegration::connect(
        socket_path.clone(),
        module_id.to_string(),
        module_name.to_string(),
        version.to_string(),
        Some(cli_spec),
    )
    .await
    {
        Ok(mut integration) => {
            info!("Connected to node");
            let node_api = integration.node_api();
            for method in rpc_methods {
                node_api
                    .register_rpc_endpoint((*method).to_string(), String::new())
                    .await?;
            }
            integration.subscribe_events(event_types).await?;
            // The module is only ever used by reference below, so it is kept
            // as a plain local rather than behind an `Arc`.
            let (module, cli) = setup(node_api.clone(), Arc::clone(&db), data_dir).await?;
            let mut event_rx = integration.event_receiver();
            let invocation_rx = integration.invocation_receiver().ok_or_else(|| {
                ModuleError::IpcError(
                    "Invocation receiver not available for this module integration".to_string(),
                )
            })?;
            let ctx = InvocationContext::with_node_api(Arc::clone(&db), node_api);
            loop {
                tokio::select! {
                    msg = event_rx.recv() => {
                        // Anything other than a successfully received event
                        // (other message kinds, receive errors) is skipped.
                        // NOTE(review): if `recv()` keeps returning an error
                        // immediately once the event channel is closed, this
                        // branch would spin — confirm against the receiver type.
                        if let Ok(ModuleMessage::Event(event)) = msg {
                            // A failing handler must not take the module down,
                            // but the failure should be visible in the logs.
                            if let Err(err) = on_event(event, &module, &ctx).await {
                                tracing::warn!("Event handler returned an error: {}", err);
                            }
                        }
                    }
                    inv = invocation_rx.recv() => {
                        if let Some((invocation, result_tx)) = inv {
                            let result = dispatch(invocation, ctx.clone(), &module, &cli);
                            // Best effort: a failed send is ignored (presumably
                            // the requester is no longer waiting — confirm).
                            let _ = result_tx.send(result);
                        } else {
                            info!("Invocation channel closed, module unloading");
                            break;
                        }
                    }
                    // The sleep is re-created on every pass, so this fires only
                    // after 30 seconds without any event or invocation.
                    _ = sleep(Duration::from_secs(30)) => {
                        info!("Module running");
                    }
                }
            }
        }
        Err(e) => {
            info!("Node not running, standalone mode: {}", e);
            // Standalone mode: stay alive indefinitely without a node.
            loop {
                sleep(Duration::from_secs(5)).await;
            }
        }
    }
    Ok(())
}
#[allow(clippy::too_many_arguments)]
pub async fn run_module_with_tick<M, C, F, FE, Fut, FConnect, FutConnect, FTick, FutTick>(
socket_path: impl AsRef<Path>,
module_id: &str,
module_name: &str,
version: &str,
cli_spec: CliSpec,
rpc_methods: &[&str],
event_types: Vec<blvm_node::module::traits::EventType>,
dispatch: F,
on_event: FE,
on_connect: Option<FConnect>,
on_tick: Option<FTick>,
module: M,
cli: C,
db: Arc<dyn Database>,
) -> Result<(), ModuleError>
where
F: Fn(InvocationMessage, InvocationContext, &M, &C) -> InvocationResultMessage,
FE: Fn(blvm_node::module::ipc::protocol::EventMessage, &M, &InvocationContext) -> Fut,
Fut: std::future::Future<Output = Result<(), ModuleError>> + Send,
FConnect: Fn(Arc<dyn NodeAPI>, Arc<dyn Database>) -> FutConnect,
FutConnect: std::future::Future<Output = Result<(), ModuleError>> + Send,
FTick: Fn(Arc<dyn NodeAPI>, Arc<dyn Database>) -> FutTick,
FutTick: std::future::Future<Output = ()> + Send,
{
let socket_path = socket_path.as_ref().to_path_buf();
match ModuleIntegration::connect(
socket_path.clone(),
module_id.to_string(),
module_name.to_string(),
version.to_string(),
Some(cli_spec),
)
.await
{
Ok(mut integration) => {
info!("Connected to node");
let node_api = integration.node_api();
for method in rpc_methods {
node_api
.register_rpc_endpoint((*method).to_string(), String::new())
.await?;
}
integration.subscribe_events(event_types).await?;
if let Some(ref connect) = on_connect {
connect(node_api.clone(), Arc::clone(&db)).await?;
}
let mut event_rx = integration.event_receiver();
let invocation_rx = integration.invocation_receiver().ok_or_else(|| {
ModuleError::IpcError(
"Invocation receiver not available for this module integration".to_string(),
)
})?;
let ctx = InvocationContext::with_node_api(Arc::clone(&db), Arc::clone(&node_api));
loop {
tokio::select! {
msg = event_rx.recv() => {
if let Ok(ModuleMessage::Event(e)) = msg {
let _ = on_event(e, &module, &ctx).await;
}
}
inv = invocation_rx.recv() => {
if let Some((invocation, result_tx)) = inv {
let result = dispatch(invocation, ctx.clone(), &module, &cli);
let _ = result_tx.send(result);
} else {
info!("Invocation channel closed, module unloading");
break;
}
}
_ = sleep(Duration::from_secs(30)) => {
if let Some(ref tick) = on_tick {
tick(node_api.clone(), Arc::clone(&db)).await;
}
info!("Module running");
}
}
}
}
Err(e) => {
info!("Node not running, standalone mode: {}", e);
loop {
sleep(Duration::from_secs(5)).await;
}
}
}
Ok(())
}