Skip to main content

blvm_sdk/module/
runner.rs

1//! Module runner and invocation context.
2//!
3//! Provides `InvocationContext` for CLI/RPC handlers, `run_async` for sync-over-async CLI,
4//! and `run_module` for the unified connect/dispatch/loop lifecycle.
5
6use blvm_node::module::integration::ModuleIntegration;
7use blvm_node::module::ipc::protocol::{
8    CliSpec, InvocationMessage, InvocationResultMessage, ModuleMessage,
9};
10use blvm_node::module::traits::{ModuleError, NodeAPI};
11use blvm_node::storage::database::Database;
12use std::path::Path;
13use std::sync::Arc;
14use tokio::time::{sleep, Duration};
15use tracing::info;
16
17use crate::module::storage::{DatabaseStorageAdapter, ModuleStorage, ModuleStorageDatabaseBridge};
18
19/// Core RPC allowlist: registered with [`NodeAPI::register_core_rpc_override`] in module setup,
20/// not via [`NodeAPI::register_rpc_endpoint`] (the RPC server rejects extension registration for them).
21fn is_overrideable_core_rpc_method(method: &str) -> bool {
22    blvm_node::rpc::methods::OVERRIDABLE_CORE_RPC_METHODS.contains(&method)
23}
24
25/// Run an async future from a sync context (e.g. CLI handler).
26/// Blocks the current thread and executes the future on the current runtime.
27/// Use when `#[command]` methods need to call async APIs.
28///
29/// When the future only returns `Ok(_)` with no error path, use `Ok::<_, String>(...)` to fix inference.
30pub fn run_async<F, T, E>(f: F) -> Result<T, ModuleError>
31where
32    F: std::future::Future<Output = Result<T, E>>,
33    E: std::fmt::Display,
34{
35    tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(f))
36        .map_err(|e| ModuleError::Other(e.to_string()))
37}
38
39/// Context passed to CLI handlers for database and config access.
40///
41/// Uses `ModuleStorage` internally; `ctx.db()` returns the database interface for compatibility.
42/// When connected to a node, `node_api()` provides access to blockchain data (e.g. get_transaction).
43#[derive(Clone)]
44pub struct InvocationContext {
45    db: Arc<dyn Database>,
46    node_api: Option<Arc<dyn NodeAPI>>,
47}
48
49impl InvocationContext {
50    /// Create a new invocation context from module storage.
51    pub fn from_storage(storage: Arc<dyn ModuleStorage>) -> Self {
52        let db = Arc::new(ModuleStorageDatabaseBridge::new(storage));
53        Self { db, node_api: None }
54    }
55
56    /// Create a new invocation context from a database (legacy; wraps in ModuleStorage).
57    pub fn new(db: Arc<dyn Database>) -> Self {
58        let storage = Arc::new(DatabaseStorageAdapter::new(db));
59        Self::from_storage(storage)
60    }
61
62    /// Create invocation context with NodeAPI for CLI commands that need blockchain access.
63    pub fn with_node_api(db: Arc<dyn Database>, node_api: Arc<dyn NodeAPI>) -> Self {
64        let storage = Arc::new(DatabaseStorageAdapter::new(db));
65        Self {
66            db: Arc::new(ModuleStorageDatabaseBridge::new(storage)),
67            node_api: Some(node_api),
68        }
69    }
70
71    /// Get the module's database.
72    pub fn db(&self) -> &Arc<dyn Database> {
73        &self.db
74    }
75
76    /// Get NodeAPI when connected to node (for fetch-by-txid, etc.).
77    pub fn node_api(&self) -> Option<Arc<dyn NodeAPI>> {
78        self.node_api.clone()
79    }
80}
81
82/// Run a module with automatic connect, registration, event subscription, and dispatch.
83///
84/// Handles the full lifecycle: connect → register CLI/RPC/events → loop (invocations + events) → unload on disconnect.
85#[allow(clippy::too_many_arguments)] // Explicit wiring for embedders; splitting would obscure the lifecycle.
86pub async fn run_module<M, C, F, FE, Fut>(
87    socket_path: impl AsRef<Path>,
88    module_id: &str,
89    module_name: &str,
90    version: &str,
91    cli_spec: CliSpec,
92    rpc_methods: &[&str],
93    event_types: Vec<blvm_node::module::traits::EventType>,
94    dispatch: F,
95    on_event: FE,
96    module: M,
97    cli: C,
98    db: Arc<dyn Database>,
99) -> Result<(), ModuleError>
100where
101    F: Fn(InvocationMessage, InvocationContext, &M, &C) -> InvocationResultMessage,
102    FE: Fn(blvm_node::module::ipc::protocol::EventMessage, &M, &InvocationContext) -> Fut,
103    Fut: std::future::Future<Output = Result<(), ModuleError>> + Send,
104{
105    let socket_path = socket_path.as_ref().to_path_buf();
106
107    match ModuleIntegration::connect(
108        socket_path.clone(),
109        module_id.to_string(),
110        module_name.to_string(),
111        version.to_string(),
112        Some(cli_spec),
113    )
114    .await
115    {
116        Ok(mut integration) => {
117            info!("Connected to node");
118
119            let node_api = integration.node_api();
120            for method in rpc_methods {
121                if is_overrideable_core_rpc_method(method) {
122                    continue;
123                }
124                node_api
125                    .register_rpc_endpoint((*method).to_string(), String::new())
126                    .await?;
127            }
128
129            integration.subscribe_events(event_types).await?;
130
131            let mut event_rx = integration.event_receiver();
132            let invocation_rx = integration.invocation_receiver().ok_or_else(|| {
133                ModuleError::IpcError(
134                    "Invocation receiver not available for this module integration".to_string(),
135                )
136            })?;
137            let ctx = InvocationContext::with_node_api(db, node_api);
138
139            loop {
140                tokio::select! {
141                    msg = event_rx.recv() => {
142                        if let Ok(ModuleMessage::Event(e)) = msg {
143                            let _ = on_event(e, &module, &ctx).await;
144                        }
145                    }
146                    inv = invocation_rx.recv() => {
147                        if let Some((invocation, result_tx)) = inv {
148                            let result = dispatch(invocation, ctx.clone(), &module, &cli);
149                            let _ = result_tx.send(result);
150                        } else {
151                            info!("Invocation channel closed, module unloading");
152                            break;
153                        }
154                    }
155                    _ = sleep(Duration::from_secs(30)) => {
156                        info!("Module running");
157                    }
158                }
159            }
160        }
161        Err(e) => {
162            info!("Node not running, standalone mode: {}", e);
163            loop {
164                sleep(Duration::from_secs(5)).await;
165            }
166        }
167    }
168
169    Ok(())
170}
171
172/// Run a module where (module, cli) are created after connect.
173///
174/// Use when the module depends on NodeAPI (e.g. datum creates DatumServer with node_api).
175/// The setup receives (node_api, db, data_dir) and returns (module, cli).
176#[allow(clippy::too_many_arguments)]
177pub async fn run_module_with_setup<M, C, F, FE, Fut, FSetup, FutSetup>(
178    socket_path: impl AsRef<Path>,
179    module_id: &str,
180    module_name: &str,
181    version: &str,
182    cli_spec: CliSpec,
183    rpc_methods: &[&str],
184    event_types: Vec<blvm_node::module::traits::EventType>,
185    dispatch: F,
186    on_event: FE,
187    setup: FSetup,
188    db: Arc<dyn Database>,
189    data_dir: &Path,
190) -> Result<(), ModuleError>
191where
192    F: Fn(InvocationMessage, InvocationContext, &M, &C) -> InvocationResultMessage,
193    FE: Fn(blvm_node::module::ipc::protocol::EventMessage, &M, &InvocationContext) -> Fut,
194    Fut: std::future::Future<Output = Result<(), ModuleError>> + Send,
195    FSetup: Fn(Arc<dyn NodeAPI>, Arc<dyn Database>, &Path) -> FutSetup,
196    FutSetup: std::future::Future<Output = Result<(M, C), ModuleError>> + Send,
197{
198    let socket_path = socket_path.as_ref().to_path_buf();
199
200    match ModuleIntegration::connect(
201        socket_path.clone(),
202        module_id.to_string(),
203        module_name.to_string(),
204        version.to_string(),
205        Some(cli_spec),
206    )
207    .await
208    {
209        Ok(mut integration) => {
210            info!("Connected to node");
211
212            let node_api = integration.node_api();
213            for method in rpc_methods {
214                if is_overrideable_core_rpc_method(method) {
215                    continue;
216                }
217                node_api
218                    .register_rpc_endpoint((*method).to_string(), String::new())
219                    .await?;
220            }
221
222            integration.subscribe_events(event_types).await?;
223
224            let (module, cli) = setup(node_api.clone(), Arc::clone(&db), data_dir).await?;
225            let module = Arc::new(module);
226
227            let mut event_rx = integration.event_receiver();
228            let invocation_rx = integration.invocation_receiver().ok_or_else(|| {
229                ModuleError::IpcError(
230                    "Invocation receiver not available for this module integration".to_string(),
231                )
232            })?;
233            let ctx = InvocationContext::with_node_api(Arc::clone(&db), node_api);
234
235            loop {
236                tokio::select! {
237                    msg = event_rx.recv() => {
238                        if let Ok(ModuleMessage::Event(e)) = msg {
239                            let _ = on_event(e, &*module, &ctx).await;
240                        }
241                    }
242                    inv = invocation_rx.recv() => {
243                        if let Some((invocation, result_tx)) = inv {
244                            let result = dispatch(invocation, ctx.clone(), &*module, &cli);
245                            let _ = result_tx.send(result);
246                        } else {
247                            info!("Invocation channel closed, module unloading");
248                            break;
249                        }
250                    }
251                    _ = sleep(Duration::from_secs(30)) => {
252                        info!("Module running");
253                    }
254                }
255            }
256        }
257        Err(e) => {
258            info!("Node not running, standalone mode: {}", e);
259            loop {
260                sleep(Duration::from_secs(5)).await;
261            }
262        }
263    }
264
265    Ok(())
266}
267
268/// Run a module with optional on_connect (setup) and on_tick (periodic) callbacks.
269#[allow(clippy::too_many_arguments)]
270pub async fn run_module_with_tick<M, C, F, FE, Fut, FConnect, FutConnect, FTick, FutTick>(
271    socket_path: impl AsRef<Path>,
272    module_id: &str,
273    module_name: &str,
274    version: &str,
275    cli_spec: CliSpec,
276    rpc_methods: &[&str],
277    event_types: Vec<blvm_node::module::traits::EventType>,
278    dispatch: F,
279    on_event: FE,
280    on_connect: Option<FConnect>,
281    on_tick: Option<FTick>,
282    module: M,
283    cli: C,
284    db: Arc<dyn Database>,
285) -> Result<(), ModuleError>
286where
287    F: Fn(InvocationMessage, InvocationContext, &M, &C) -> InvocationResultMessage,
288    FE: Fn(blvm_node::module::ipc::protocol::EventMessage, &M, &InvocationContext) -> Fut,
289    Fut: std::future::Future<Output = Result<(), ModuleError>> + Send,
290    FConnect: Fn(Arc<dyn NodeAPI>, Arc<dyn Database>) -> FutConnect,
291    FutConnect: std::future::Future<Output = Result<(), ModuleError>> + Send,
292    FTick: Fn(Arc<dyn NodeAPI>, Arc<dyn Database>) -> FutTick,
293    FutTick: std::future::Future<Output = ()> + Send,
294{
295    let socket_path = socket_path.as_ref().to_path_buf();
296
297    match ModuleIntegration::connect(
298        socket_path.clone(),
299        module_id.to_string(),
300        module_name.to_string(),
301        version.to_string(),
302        Some(cli_spec),
303    )
304    .await
305    {
306        Ok(mut integration) => {
307            info!("Connected to node");
308
309            let node_api = integration.node_api();
310            for method in rpc_methods {
311                if is_overrideable_core_rpc_method(method) {
312                    continue;
313                }
314                node_api
315                    .register_rpc_endpoint((*method).to_string(), String::new())
316                    .await?;
317            }
318
319            integration.subscribe_events(event_types).await?;
320
321            if let Some(ref connect) = on_connect {
322                connect(node_api.clone(), Arc::clone(&db)).await?;
323            }
324
325            let mut event_rx = integration.event_receiver();
326            let invocation_rx = integration.invocation_receiver().ok_or_else(|| {
327                ModuleError::IpcError(
328                    "Invocation receiver not available for this module integration".to_string(),
329                )
330            })?;
331            let ctx = InvocationContext::with_node_api(Arc::clone(&db), Arc::clone(&node_api));
332
333            loop {
334                tokio::select! {
335                    msg = event_rx.recv() => {
336                        if let Ok(ModuleMessage::Event(e)) = msg {
337                            let _ = on_event(e, &module, &ctx).await;
338                        }
339                    }
340                    inv = invocation_rx.recv() => {
341                        if let Some((invocation, result_tx)) = inv {
342                            let result = dispatch(invocation, ctx.clone(), &module, &cli);
343                            let _ = result_tx.send(result);
344                        } else {
345                            info!("Invocation channel closed, module unloading");
346                            break;
347                        }
348                    }
349                    _ = sleep(Duration::from_secs(30)) => {
350                        if let Some(ref tick) = on_tick {
351                            tick(node_api.clone(), Arc::clone(&db)).await;
352                        }
353                        info!("Module running");
354                    }
355                }
356            }
357        }
358        Err(e) => {
359            info!("Node not running, standalone mode: {}", e);
360            loop {
361                sleep(Duration::from_secs(5)).await;
362            }
363        }
364    }
365
366    Ok(())
367}