Skip to main content

blvm_sdk/module/
runner.rs

1//! Module runner and invocation context.
2//!
3//! Provides `InvocationContext` for CLI/RPC handlers, `run_async` for sync-over-async CLI,
4//! and `run_module` for the unified connect/dispatch/loop lifecycle.
5
6use blvm_node::module::integration::ModuleIntegration;
7use blvm_node::module::ipc::protocol::{
8    CliSpec, InvocationMessage, InvocationResultMessage, ModuleMessage,
9};
10use blvm_node::module::traits::{ModuleError, NodeAPI};
11use blvm_node::storage::database::Database;
12use std::path::Path;
13use std::sync::Arc;
14use tokio::time::{sleep, Duration};
15use tracing::info;
16
17use crate::module::storage::{DatabaseStorageAdapter, ModuleStorage, ModuleStorageDatabaseBridge};
18
19/// Run an async future from a sync context (e.g. CLI handler).
20/// Blocks the current thread and executes the future on the current runtime.
21/// Use when `#[command]` methods need to call async APIs.
22///
23/// When the future only returns `Ok(_)` with no error path, use `Ok::<_, String>(...)` to fix inference.
24pub fn run_async<F, T, E>(f: F) -> Result<T, ModuleError>
25where
26    F: std::future::Future<Output = Result<T, E>>,
27    E: std::fmt::Display,
28{
29    tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(f))
30        .map_err(|e| ModuleError::Other(e.to_string()))
31}
32
33/// Context passed to CLI handlers for database and config access.
34///
35/// Uses `ModuleStorage` internally; `ctx.db()` returns the database interface for compatibility.
36/// When connected to a node, `node_api()` provides access to blockchain data (e.g. get_transaction).
37#[derive(Clone)]
38pub struct InvocationContext {
39    db: Arc<dyn Database>,
40    node_api: Option<Arc<dyn NodeAPI>>,
41}
42
43impl InvocationContext {
44    /// Create a new invocation context from module storage.
45    pub fn from_storage(storage: Arc<dyn ModuleStorage>) -> Self {
46        let db = Arc::new(ModuleStorageDatabaseBridge::new(storage));
47        Self { db, node_api: None }
48    }
49
50    /// Create a new invocation context from a database (legacy; wraps in ModuleStorage).
51    pub fn new(db: Arc<dyn Database>) -> Self {
52        let storage = Arc::new(DatabaseStorageAdapter::new(db));
53        Self::from_storage(storage)
54    }
55
56    /// Create invocation context with NodeAPI for CLI commands that need blockchain access.
57    pub fn with_node_api(db: Arc<dyn Database>, node_api: Arc<dyn NodeAPI>) -> Self {
58        let storage = Arc::new(DatabaseStorageAdapter::new(db));
59        Self {
60            db: Arc::new(ModuleStorageDatabaseBridge::new(storage)),
61            node_api: Some(node_api),
62        }
63    }
64
65    /// Get the module's database.
66    pub fn db(&self) -> &Arc<dyn Database> {
67        &self.db
68    }
69
70    /// Get NodeAPI when connected to node (for fetch-by-txid, etc.).
71    pub fn node_api(&self) -> Option<Arc<dyn NodeAPI>> {
72        self.node_api.clone()
73    }
74}
75
76/// Run a module with automatic connect, registration, event subscription, and dispatch.
77///
78/// Handles the full lifecycle: connect → register CLI/RPC/events → loop (invocations + events) → unload on disconnect.
79#[allow(clippy::too_many_arguments)] // Explicit wiring for embedders; splitting would obscure the lifecycle.
80pub async fn run_module<M, C, F, FE, Fut>(
81    socket_path: impl AsRef<Path>,
82    module_id: &str,
83    module_name: &str,
84    version: &str,
85    cli_spec: CliSpec,
86    rpc_methods: &[&str],
87    event_types: Vec<blvm_node::module::traits::EventType>,
88    dispatch: F,
89    on_event: FE,
90    module: M,
91    cli: C,
92    db: Arc<dyn Database>,
93) -> Result<(), ModuleError>
94where
95    F: Fn(InvocationMessage, InvocationContext, &M, &C) -> InvocationResultMessage,
96    FE: Fn(blvm_node::module::ipc::protocol::EventMessage, &M, &InvocationContext) -> Fut,
97    Fut: std::future::Future<Output = Result<(), ModuleError>> + Send,
98{
99    let socket_path = socket_path.as_ref().to_path_buf();
100
101    match ModuleIntegration::connect(
102        socket_path.clone(),
103        module_id.to_string(),
104        module_name.to_string(),
105        version.to_string(),
106        Some(cli_spec),
107    )
108    .await
109    {
110        Ok(mut integration) => {
111            info!("Connected to node");
112
113            let node_api = integration.node_api();
114            for method in rpc_methods {
115                node_api
116                    .register_rpc_endpoint((*method).to_string(), String::new())
117                    .await?;
118            }
119
120            integration.subscribe_events(event_types).await?;
121
122            let mut event_rx = integration.event_receiver();
123            let invocation_rx = integration.invocation_receiver().ok_or_else(|| {
124                ModuleError::IpcError(
125                    "Invocation receiver not available for this module integration".to_string(),
126                )
127            })?;
128            let ctx = InvocationContext::with_node_api(db, node_api);
129
130            loop {
131                tokio::select! {
132                    msg = event_rx.recv() => {
133                        if let Ok(ModuleMessage::Event(e)) = msg {
134                            let _ = on_event(e, &module, &ctx).await;
135                        }
136                    }
137                    inv = invocation_rx.recv() => {
138                        if let Some((invocation, result_tx)) = inv {
139                            let result = dispatch(invocation, ctx.clone(), &module, &cli);
140                            let _ = result_tx.send(result);
141                        } else {
142                            info!("Invocation channel closed, module unloading");
143                            break;
144                        }
145                    }
146                    _ = sleep(Duration::from_secs(30)) => {
147                        info!("Module running");
148                    }
149                }
150            }
151        }
152        Err(e) => {
153            info!("Node not running, standalone mode: {}", e);
154            loop {
155                sleep(Duration::from_secs(5)).await;
156            }
157        }
158    }
159
160    Ok(())
161}
162
163/// Run a module where (module, cli) are created after connect.
164///
165/// Use when the module depends on NodeAPI (e.g. datum creates DatumServer with node_api).
166/// The setup receives (node_api, db, data_dir) and returns (module, cli).
167#[allow(clippy::too_many_arguments)]
168pub async fn run_module_with_setup<M, C, F, FE, Fut, FSetup, FutSetup>(
169    socket_path: impl AsRef<Path>,
170    module_id: &str,
171    module_name: &str,
172    version: &str,
173    cli_spec: CliSpec,
174    rpc_methods: &[&str],
175    event_types: Vec<blvm_node::module::traits::EventType>,
176    dispatch: F,
177    on_event: FE,
178    setup: FSetup,
179    db: Arc<dyn Database>,
180    data_dir: &Path,
181) -> Result<(), ModuleError>
182where
183    F: Fn(InvocationMessage, InvocationContext, &M, &C) -> InvocationResultMessage,
184    FE: Fn(blvm_node::module::ipc::protocol::EventMessage, &M, &InvocationContext) -> Fut,
185    Fut: std::future::Future<Output = Result<(), ModuleError>> + Send,
186    FSetup: Fn(Arc<dyn NodeAPI>, Arc<dyn Database>, &Path) -> FutSetup,
187    FutSetup: std::future::Future<Output = Result<(M, C), ModuleError>> + Send,
188{
189    let socket_path = socket_path.as_ref().to_path_buf();
190
191    match ModuleIntegration::connect(
192        socket_path.clone(),
193        module_id.to_string(),
194        module_name.to_string(),
195        version.to_string(),
196        Some(cli_spec),
197    )
198    .await
199    {
200        Ok(mut integration) => {
201            info!("Connected to node");
202
203            let node_api = integration.node_api();
204            for method in rpc_methods {
205                node_api
206                    .register_rpc_endpoint((*method).to_string(), String::new())
207                    .await?;
208            }
209
210            integration.subscribe_events(event_types).await?;
211
212            let (module, cli) = setup(node_api.clone(), Arc::clone(&db), data_dir).await?;
213            let module = Arc::new(module);
214
215            let mut event_rx = integration.event_receiver();
216            let invocation_rx = integration.invocation_receiver().ok_or_else(|| {
217                ModuleError::IpcError(
218                    "Invocation receiver not available for this module integration".to_string(),
219                )
220            })?;
221            let ctx = InvocationContext::with_node_api(Arc::clone(&db), node_api);
222
223            loop {
224                tokio::select! {
225                    msg = event_rx.recv() => {
226                        if let Ok(ModuleMessage::Event(e)) = msg {
227                            let _ = on_event(e, &*module, &ctx).await;
228                        }
229                    }
230                    inv = invocation_rx.recv() => {
231                        if let Some((invocation, result_tx)) = inv {
232                            let result = dispatch(invocation, ctx.clone(), &*module, &cli);
233                            let _ = result_tx.send(result);
234                        } else {
235                            info!("Invocation channel closed, module unloading");
236                            break;
237                        }
238                    }
239                    _ = sleep(Duration::from_secs(30)) => {
240                        info!("Module running");
241                    }
242                }
243            }
244        }
245        Err(e) => {
246            info!("Node not running, standalone mode: {}", e);
247            loop {
248                sleep(Duration::from_secs(5)).await;
249            }
250        }
251    }
252
253    Ok(())
254}
255
256/// Run a module with optional on_connect (setup) and on_tick (periodic) callbacks.
257#[allow(clippy::too_many_arguments)]
258pub async fn run_module_with_tick<M, C, F, FE, Fut, FConnect, FutConnect, FTick, FutTick>(
259    socket_path: impl AsRef<Path>,
260    module_id: &str,
261    module_name: &str,
262    version: &str,
263    cli_spec: CliSpec,
264    rpc_methods: &[&str],
265    event_types: Vec<blvm_node::module::traits::EventType>,
266    dispatch: F,
267    on_event: FE,
268    on_connect: Option<FConnect>,
269    on_tick: Option<FTick>,
270    module: M,
271    cli: C,
272    db: Arc<dyn Database>,
273) -> Result<(), ModuleError>
274where
275    F: Fn(InvocationMessage, InvocationContext, &M, &C) -> InvocationResultMessage,
276    FE: Fn(blvm_node::module::ipc::protocol::EventMessage, &M, &InvocationContext) -> Fut,
277    Fut: std::future::Future<Output = Result<(), ModuleError>> + Send,
278    FConnect: Fn(Arc<dyn NodeAPI>, Arc<dyn Database>) -> FutConnect,
279    FutConnect: std::future::Future<Output = Result<(), ModuleError>> + Send,
280    FTick: Fn(Arc<dyn NodeAPI>, Arc<dyn Database>) -> FutTick,
281    FutTick: std::future::Future<Output = ()> + Send,
282{
283    let socket_path = socket_path.as_ref().to_path_buf();
284
285    match ModuleIntegration::connect(
286        socket_path.clone(),
287        module_id.to_string(),
288        module_name.to_string(),
289        version.to_string(),
290        Some(cli_spec),
291    )
292    .await
293    {
294        Ok(mut integration) => {
295            info!("Connected to node");
296
297            let node_api = integration.node_api();
298            for method in rpc_methods {
299                node_api
300                    .register_rpc_endpoint((*method).to_string(), String::new())
301                    .await?;
302            }
303
304            integration.subscribe_events(event_types).await?;
305
306            if let Some(ref connect) = on_connect {
307                connect(node_api.clone(), Arc::clone(&db)).await?;
308            }
309
310            let mut event_rx = integration.event_receiver();
311            let invocation_rx = integration.invocation_receiver().ok_or_else(|| {
312                ModuleError::IpcError(
313                    "Invocation receiver not available for this module integration".to_string(),
314                )
315            })?;
316            let ctx = InvocationContext::with_node_api(Arc::clone(&db), Arc::clone(&node_api));
317
318            loop {
319                tokio::select! {
320                    msg = event_rx.recv() => {
321                        if let Ok(ModuleMessage::Event(e)) = msg {
322                            let _ = on_event(e, &module, &ctx).await;
323                        }
324                    }
325                    inv = invocation_rx.recv() => {
326                        if let Some((invocation, result_tx)) = inv {
327                            let result = dispatch(invocation, ctx.clone(), &module, &cli);
328                            let _ = result_tx.send(result);
329                        } else {
330                            info!("Invocation channel closed, module unloading");
331                            break;
332                        }
333                    }
334                    _ = sleep(Duration::from_secs(30)) => {
335                        if let Some(ref tick) = on_tick {
336                            tick(node_api.clone(), Arc::clone(&db)).await;
337                        }
338                        info!("Module running");
339                    }
340                }
341            }
342        }
343        Err(e) => {
344            info!("Node not running, standalone mode: {}", e);
345            loop {
346                sleep(Duration::from_secs(5)).await;
347            }
348        }
349    }
350
351    Ok(())
352}