Skip to main content

blvm_sdk/module/
runner.rs

1//! Module runner and invocation context.
2//!
3//! Provides `InvocationContext` for CLI/RPC handlers, `run_async` for sync-over-async CLI,
4//! and `run_module` for the unified connect/dispatch/loop lifecycle.
5
6use blvm_node::module::integration::ModuleIntegration;
7use blvm_node::module::ipc::protocol::{
8    CliSpec, InvocationMessage, InvocationResultMessage, ModuleMessage,
9};
10use blvm_node::module::traits::{ModuleError, NodeAPI};
11use blvm_node::storage::database::Database;
12use std::path::Path;
13use std::sync::Arc;
14use tokio::time::{sleep, Duration};
15use tracing::info;
16
17use crate::module::storage::{DatabaseStorageAdapter, ModuleStorage, ModuleStorageDatabaseBridge};
18
19/// Core RPC allowlist: registered with [`NodeAPI::register_core_rpc_override`] in module setup,
20/// not via [`NodeAPI::register_rpc_endpoint`] (the RPC server rejects extension registration for them).
21fn is_overrideable_core_rpc_method(method: &str) -> bool {
22    blvm_node::rpc::methods::OVERRIDABLE_CORE_RPC_METHODS.contains(&method)
23}
24
25/// Run an async future from a sync context (e.g. CLI handler).
26/// Blocks the current thread and executes the future on the current runtime.
27/// Use when `#[command]` methods need to call async APIs.
28///
29/// When the future only returns `Ok(_)` with no error path, use `Ok::<_, String>(...)` to fix inference.
30pub fn run_async<F, T, E>(f: F) -> Result<T, ModuleError>
31where
32    F: std::future::Future<Output = Result<T, E>>,
33    E: std::fmt::Display,
34{
35    tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(f))
36        .map_err(|e| ModuleError::Other(e.to_string()))
37}
38
39/// Context passed to CLI handlers for database and config access.
40///
41/// Uses `ModuleStorage` internally; `ctx.db()` returns the database interface for compatibility.
42/// When connected to a node, `node_api()` provides access to blockchain data (e.g. get_transaction).
43#[derive(Clone)]
44pub struct InvocationContext {
45    db: Arc<dyn Database>,
46    node_api: Option<Arc<dyn NodeAPI>>,
47}
48
49impl InvocationContext {
50    /// Create a new invocation context from module storage.
51    pub fn from_storage(storage: Arc<dyn ModuleStorage>) -> Self {
52        let db = Arc::new(ModuleStorageDatabaseBridge::new(storage));
53        Self { db, node_api: None }
54    }
55
56    /// Create a new invocation context from a database (legacy; wraps in ModuleStorage).
57    pub fn new(db: Arc<dyn Database>) -> Self {
58        let storage = Arc::new(DatabaseStorageAdapter::new(db));
59        Self::from_storage(storage)
60    }
61
62    /// Create invocation context with NodeAPI for CLI commands that need blockchain access.
63    pub fn with_node_api(db: Arc<dyn Database>, node_api: Arc<dyn NodeAPI>) -> Self {
64        let storage = Arc::new(DatabaseStorageAdapter::new(db));
65        Self {
66            db: Arc::new(ModuleStorageDatabaseBridge::new(storage)),
67            node_api: Some(node_api),
68        }
69    }
70
71    /// Get the module's database.
72    pub fn db(&self) -> &Arc<dyn Database> {
73        &self.db
74    }
75
76    /// Get NodeAPI when connected to node (for fetch-by-txid, etc.).
77    pub fn node_api(&self) -> Option<Arc<dyn NodeAPI>> {
78        self.node_api.clone()
79    }
80}
81
82/// Run a module with automatic connect, registration, event subscription, and dispatch.
83///
84/// Handles the full lifecycle: connect → register CLI/RPC/events → loop (invocations + events) → unload on disconnect.
85#[allow(clippy::too_many_arguments)] // Explicit wiring for embedders; splitting would obscure the lifecycle.
86pub async fn run_module<M, C, F, FE, Fut>(
87    socket_path: impl AsRef<Path>,
88    module_id: &str,
89    module_name: &str,
90    version: &str,
91    cli_spec: CliSpec,
92    rpc_methods: &[&str],
93    event_types: Vec<blvm_node::module::traits::EventType>,
94    dispatch: F,
95    on_event: FE,
96    module: M,
97    cli: C,
98    db: Arc<dyn Database>,
99) -> Result<(), ModuleError>
100where
101    F: Fn(InvocationMessage, InvocationContext, &M, &C) -> InvocationResultMessage,
102    FE: Fn(blvm_node::module::ipc::protocol::EventMessage, &M, &InvocationContext) -> Fut,
103    Fut: std::future::Future<Output = Result<(), ModuleError>> + Send,
104{
105    let socket_path = socket_path.as_ref().to_path_buf();
106
107    match ModuleIntegration::connect(
108        socket_path.clone(),
109        module_id.to_string(),
110        module_name.to_string(),
111        version.to_string(),
112        Some(cli_spec),
113    )
114    .await
115    {
116        Ok(mut integration) => {
117            info!("Connected to node");
118
119            let node_api = integration.node_api();
120            for method in rpc_methods {
121                if is_overrideable_core_rpc_method(method) {
122                    continue;
123                }
124                node_api
125                    .register_rpc_endpoint((*method).to_string(), String::new())
126                    .await?;
127            }
128
129            integration.subscribe_events(event_types).await?;
130
131            let mut event_rx = integration.event_receiver();
132            let invocation_rx = integration.invocation_receiver().ok_or_else(|| {
133                ModuleError::IpcError(
134                    "Invocation receiver not available for this module integration".to_string(),
135                )
136            })?;
137            let ctx = InvocationContext::with_node_api(db, node_api);
138
139            loop {
140                tokio::select! {
141                    msg = event_rx.recv() => {
142                        if let Ok(ModuleMessage::Event(e)) = msg {
143                            let _ = on_event(e, &module, &ctx).await;
144                        }
145                    }
146                    inv = invocation_rx.recv() => {
147                        if let Some((invocation, result_tx)) = inv {
148                            let result = dispatch(invocation, ctx.clone(), &module, &cli);
149                            let _ = result_tx.send(result);
150                        } else {
151                            info!("Invocation channel closed, module unloading");
152                            break;
153                        }
154                    }
155                    _ = sleep(Duration::from_secs(30)) => {
156                        info!("Module running");
157                    }
158                }
159            }
160        }
161        Err(e) => {
162            info!("Node not running, standalone mode: {}", e);
163            loop {
164                sleep(Duration::from_secs(5)).await;
165            }
166        }
167    }
168
169    Ok(())
170}
171
172/// Run a module where (module, cli) are created after connect.
173///
174/// Use when the module depends on NodeAPI (e.g. datum creates DatumServer with node_api).
175/// The setup receives (node_api, db, data_dir) and returns (module, cli).
176#[allow(clippy::too_many_arguments)]
177pub async fn run_module_with_setup<M, C, F, FE, Fut, FSetup, FutSetup>(
178    socket_path: impl AsRef<Path>,
179    module_id: &str,
180    module_name: &str,
181    version: &str,
182    cli_spec: CliSpec,
183    rpc_methods: &[&str],
184    event_types: Vec<blvm_node::module::traits::EventType>,
185    dispatch: F,
186    on_event: FE,
187    setup: FSetup,
188    db: Arc<dyn Database>,
189    data_dir: &Path,
190) -> Result<(), ModuleError>
191where
192    F: Fn(InvocationMessage, InvocationContext, &M, &C) -> InvocationResultMessage,
193    FE: Fn(blvm_node::module::ipc::protocol::EventMessage, &M, &InvocationContext) -> Fut,
194    Fut: std::future::Future<Output = Result<(), ModuleError>> + Send,
195    FSetup: Fn(Arc<dyn NodeAPI>, Arc<dyn Database>, &Path) -> FutSetup,
196    FutSetup: std::future::Future<Output = Result<(M, C), ModuleError>> + Send,
197{
198    let socket_path = socket_path.as_ref().to_path_buf();
199
200    match ModuleIntegration::connect(
201        socket_path.clone(),
202        module_id.to_string(),
203        module_name.to_string(),
204        version.to_string(),
205        Some(cli_spec),
206    )
207    .await
208    {
209        Ok(mut integration) => {
210            info!("Connected to node");
211
212            let node_api = integration.node_api();
213            for method in rpc_methods {
214                if is_overrideable_core_rpc_method(method) {
215                    continue;
216                }
217                node_api
218                    .register_rpc_endpoint((*method).to_string(), String::new())
219                    .await?;
220            }
221
222            integration.subscribe_events(event_types).await?;
223
224            let (module, cli) = setup(node_api.clone(), Arc::clone(&db), data_dir).await?;
225            let module = Arc::new(module);
226
227            let mut event_rx = integration.event_receiver();
228            let invocation_rx = integration.invocation_receiver().ok_or_else(|| {
229                ModuleError::IpcError(
230                    "Invocation receiver not available for this module integration".to_string(),
231                )
232            })?;
233            let ctx = InvocationContext::with_node_api(Arc::clone(&db), node_api);
234
235            loop {
236                tokio::select! {
237                    msg = event_rx.recv() => {
238                        if let Ok(ModuleMessage::Event(e)) = msg {
239                            let _ = on_event(e, &*module, &ctx).await;
240                        }
241                    }
242                    inv = invocation_rx.recv() => {
243                        if let Some((invocation, result_tx)) = inv {
244                            let result = dispatch(invocation, ctx.clone(), &*module, &cli);
245                            let _ = result_tx.send(result);
246                        } else {
247                            info!("Invocation channel closed, module unloading");
248                            break;
249                        }
250                    }
251                    _ = sleep(Duration::from_secs(30)) => {
252                        info!("Module running");
253                    }
254                }
255            }
256        }
257        Err(e) => {
258            info!("Node not running, standalone mode: {}", e);
259            loop {
260                sleep(Duration::from_secs(5)).await;
261            }
262        }
263    }
264
265    Ok(())
266}
267
268/// Like [`run_module_with_setup`], but setup also returns a [`ModuleAPI`] for IPC forwarding.
269#[allow(clippy::too_many_arguments)]
270pub async fn run_module_with_setup_and_api<M, C, F, FE, Fut, FSetup, FutSetup>(
271    socket_path: impl AsRef<Path>,
272    module_id: &str,
273    module_name: &str,
274    version: &str,
275    cli_spec: CliSpec,
276    rpc_methods: &[&str],
277    event_types: Vec<blvm_node::module::traits::EventType>,
278    dispatch: F,
279    on_event: FE,
280    setup: FSetup,
281    db: Arc<dyn Database>,
282    data_dir: &Path,
283) -> Result<(), ModuleError>
284where
285    F: Fn(InvocationMessage, InvocationContext, &M, &C) -> InvocationResultMessage,
286    FE: Fn(blvm_node::module::ipc::protocol::EventMessage, &M, &InvocationContext) -> Fut,
287    Fut: std::future::Future<Output = Result<(), ModuleError>> + Send,
288    FSetup: Fn(Arc<dyn NodeAPI>, Arc<dyn Database>, &Path) -> FutSetup,
289    FutSetup: std::future::Future<
290            Output = Result<
291                (
292                    M,
293                    C,
294                    Arc<dyn blvm_node::module::inter_module::api::ModuleAPI>,
295                ),
296                ModuleError,
297            >,
298        > + Send,
299{
300    use blvm_node::module::ipc::protocol::{InvocationResultPayload, InvocationType};
301
302    let socket_path = socket_path.as_ref().to_path_buf();
303
304    match ModuleIntegration::connect(
305        socket_path.clone(),
306        module_id.to_string(),
307        module_name.to_string(),
308        version.to_string(),
309        Some(cli_spec),
310    )
311    .await
312    {
313        Ok(mut integration) => {
314            info!("Connected to node");
315
316            let node_api = integration.node_api();
317            for method in rpc_methods {
318                if is_overrideable_core_rpc_method(method) {
319                    continue;
320                }
321                node_api
322                    .register_rpc_endpoint((*method).to_string(), String::new())
323                    .await?;
324            }
325
326            integration.subscribe_events(event_types).await?;
327
328            let (module, cli, module_api) =
329                setup(node_api.clone(), Arc::clone(&db), data_dir).await?;
330            let module = Arc::new(module);
331            let module_api = Arc::clone(&module_api);
332
333            if let Err(e) = node_api.register_module_api(module_api.clone()).await {
334                return Err(ModuleError::Other(format!(
335                    "Failed to register module API descriptor: {e}"
336                )));
337            }
338            info!("Module API descriptor registered with node");
339
340            let mut event_rx = integration.event_receiver();
341            let invocation_rx = integration.invocation_receiver().ok_or_else(|| {
342                ModuleError::IpcError(
343                    "Invocation receiver not available for this module integration".to_string(),
344                )
345            })?;
346            let ctx = InvocationContext::with_node_api(Arc::clone(&db), node_api);
347
348            loop {
349                tokio::select! {
350                    msg = event_rx.recv() => {
351                        if let Ok(ModuleMessage::Event(e)) = msg {
352                            let _ = on_event(e, &*module, &ctx).await;
353                        }
354                    }
355                    inv = invocation_rx.recv() => {
356                        if let Some((invocation, result_tx)) = inv {
357                            let result = match &invocation.invocation_type {
358                                InvocationType::ModuleApi { method, params, caller_module_id } => {
359                                    match module_api
360                                        .handle_request(method, params, caller_module_id)
361                                        .await
362                                    {
363                                        Ok(data) => InvocationResultMessage {
364                                            correlation_id: invocation.correlation_id,
365                                            success: true,
366                                            payload: Some(InvocationResultPayload::ModuleApi(data)),
367                                            error: None,
368                                        },
369                                        Err(e) => InvocationResultMessage {
370                                            correlation_id: invocation.correlation_id,
371                                            success: false,
372                                            payload: None,
373                                            error: Some(e.to_string()),
374                                        },
375                                    }
376                                }
377                                _ => dispatch(invocation, ctx.clone(), &*module, &cli),
378                            };
379                            let _ = result_tx.send(result);
380                        } else {
381                            info!("Invocation channel closed, module unloading");
382                            break;
383                        }
384                    }
385                    _ = sleep(Duration::from_secs(30)) => {
386                        info!("Module running");
387                    }
388                }
389            }
390        }
391        Err(e) => {
392            info!("Node not running, standalone mode: {}", e);
393            loop {
394                sleep(Duration::from_secs(5)).await;
395            }
396        }
397    }
398
399    Ok(())
400}
401
402/// Run a module with optional on_connect (setup) and on_tick (periodic) callbacks.
403#[allow(clippy::too_many_arguments)]
404pub async fn run_module_with_tick<M, C, F, FE, Fut, FConnect, FutConnect, FTick, FutTick>(
405    socket_path: impl AsRef<Path>,
406    module_id: &str,
407    module_name: &str,
408    version: &str,
409    cli_spec: CliSpec,
410    rpc_methods: &[&str],
411    event_types: Vec<blvm_node::module::traits::EventType>,
412    dispatch: F,
413    on_event: FE,
414    on_connect: Option<FConnect>,
415    on_tick: Option<FTick>,
416    module: M,
417    cli: C,
418    db: Arc<dyn Database>,
419) -> Result<(), ModuleError>
420where
421    F: Fn(InvocationMessage, InvocationContext, &M, &C) -> InvocationResultMessage,
422    FE: Fn(blvm_node::module::ipc::protocol::EventMessage, &M, &InvocationContext) -> Fut,
423    Fut: std::future::Future<Output = Result<(), ModuleError>> + Send,
424    FConnect: Fn(Arc<dyn NodeAPI>, Arc<dyn Database>) -> FutConnect,
425    FutConnect: std::future::Future<Output = Result<(), ModuleError>> + Send,
426    FTick: Fn(Arc<dyn NodeAPI>, Arc<dyn Database>) -> FutTick,
427    FutTick: std::future::Future<Output = ()> + Send,
428{
429    let socket_path = socket_path.as_ref().to_path_buf();
430
431    match ModuleIntegration::connect(
432        socket_path.clone(),
433        module_id.to_string(),
434        module_name.to_string(),
435        version.to_string(),
436        Some(cli_spec),
437    )
438    .await
439    {
440        Ok(mut integration) => {
441            info!("Connected to node");
442
443            let node_api = integration.node_api();
444            for method in rpc_methods {
445                if is_overrideable_core_rpc_method(method) {
446                    continue;
447                }
448                node_api
449                    .register_rpc_endpoint((*method).to_string(), String::new())
450                    .await?;
451            }
452
453            integration.subscribe_events(event_types).await?;
454
455            if let Some(ref connect) = on_connect {
456                connect(node_api.clone(), Arc::clone(&db)).await?;
457            }
458
459            let mut event_rx = integration.event_receiver();
460            let invocation_rx = integration.invocation_receiver().ok_or_else(|| {
461                ModuleError::IpcError(
462                    "Invocation receiver not available for this module integration".to_string(),
463                )
464            })?;
465            let ctx = InvocationContext::with_node_api(Arc::clone(&db), Arc::clone(&node_api));
466
467            loop {
468                tokio::select! {
469                    msg = event_rx.recv() => {
470                        if let Ok(ModuleMessage::Event(e)) = msg {
471                            let _ = on_event(e, &module, &ctx).await;
472                        }
473                    }
474                    inv = invocation_rx.recv() => {
475                        if let Some((invocation, result_tx)) = inv {
476                            let result = dispatch(invocation, ctx.clone(), &module, &cli);
477                            let _ = result_tx.send(result);
478                        } else {
479                            info!("Invocation channel closed, module unloading");
480                            break;
481                        }
482                    }
483                    _ = sleep(Duration::from_secs(30)) => {
484                        if let Some(ref tick) = on_tick {
485                            tick(node_api.clone(), Arc::clone(&db)).await;
486                        }
487                        info!("Module running");
488                    }
489                }
490            }
491        }
492        Err(e) => {
493            info!("Node not running, standalone mode: {}", e);
494            loop {
495                sleep(Duration::from_secs(5)).await;
496            }
497        }
498    }
499
500    Ok(())
501}