1use blvm_node::module::integration::ModuleIntegration;
7use blvm_node::module::ipc::protocol::{
8 CliSpec, InvocationMessage, InvocationResultMessage, ModuleMessage,
9};
10use blvm_node::module::traits::{ModuleError, NodeAPI};
11use blvm_node::storage::database::Database;
12use std::path::Path;
13use std::sync::Arc;
14use tokio::time::{sleep, Duration};
15use tracing::info;
16
17use crate::module::storage::{DatabaseStorageAdapter, ModuleStorage, ModuleStorageDatabaseBridge};
18
19fn is_overrideable_core_rpc_method(method: &str) -> bool {
22 blvm_node::rpc::methods::OVERRIDABLE_CORE_RPC_METHODS.contains(&method)
23}
24
25pub fn run_async<F, T, E>(f: F) -> Result<T, ModuleError>
31where
32 F: std::future::Future<Output = Result<T, E>>,
33 E: std::fmt::Display,
34{
35 tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(f))
36 .map_err(|e| ModuleError::Other(e.to_string()))
37}
38
39#[derive(Clone)]
44pub struct InvocationContext {
45 db: Arc<dyn Database>,
46 node_api: Option<Arc<dyn NodeAPI>>,
47}
48
49impl InvocationContext {
50 pub fn from_storage(storage: Arc<dyn ModuleStorage>) -> Self {
52 let db = Arc::new(ModuleStorageDatabaseBridge::new(storage));
53 Self { db, node_api: None }
54 }
55
56 pub fn new(db: Arc<dyn Database>) -> Self {
58 let storage = Arc::new(DatabaseStorageAdapter::new(db));
59 Self::from_storage(storage)
60 }
61
62 pub fn with_node_api(db: Arc<dyn Database>, node_api: Arc<dyn NodeAPI>) -> Self {
64 let storage = Arc::new(DatabaseStorageAdapter::new(db));
65 Self {
66 db: Arc::new(ModuleStorageDatabaseBridge::new(storage)),
67 node_api: Some(node_api),
68 }
69 }
70
71 pub fn db(&self) -> &Arc<dyn Database> {
73 &self.db
74 }
75
76 pub fn node_api(&self) -> Option<Arc<dyn NodeAPI>> {
78 self.node_api.clone()
79 }
80}
81
82#[allow(clippy::too_many_arguments)] pub async fn run_module<M, C, F, FE, Fut>(
87 socket_path: impl AsRef<Path>,
88 module_id: &str,
89 module_name: &str,
90 version: &str,
91 cli_spec: CliSpec,
92 rpc_methods: &[&str],
93 event_types: Vec<blvm_node::module::traits::EventType>,
94 dispatch: F,
95 on_event: FE,
96 module: M,
97 cli: C,
98 db: Arc<dyn Database>,
99) -> Result<(), ModuleError>
100where
101 F: Fn(InvocationMessage, InvocationContext, &M, &C) -> InvocationResultMessage,
102 FE: Fn(blvm_node::module::ipc::protocol::EventMessage, &M, &InvocationContext) -> Fut,
103 Fut: std::future::Future<Output = Result<(), ModuleError>> + Send,
104{
105 let socket_path = socket_path.as_ref().to_path_buf();
106
107 match ModuleIntegration::connect(
108 socket_path.clone(),
109 module_id.to_string(),
110 module_name.to_string(),
111 version.to_string(),
112 Some(cli_spec),
113 )
114 .await
115 {
116 Ok(mut integration) => {
117 info!("Connected to node");
118
119 let node_api = integration.node_api();
120 for method in rpc_methods {
121 if is_overrideable_core_rpc_method(method) {
122 continue;
123 }
124 node_api
125 .register_rpc_endpoint((*method).to_string(), String::new())
126 .await?;
127 }
128
129 integration.subscribe_events(event_types).await?;
130
131 let mut event_rx = integration.event_receiver();
132 let invocation_rx = integration.invocation_receiver().ok_or_else(|| {
133 ModuleError::IpcError(
134 "Invocation receiver not available for this module integration".to_string(),
135 )
136 })?;
137 let ctx = InvocationContext::with_node_api(db, node_api);
138
139 loop {
140 tokio::select! {
141 msg = event_rx.recv() => {
142 if let Ok(ModuleMessage::Event(e)) = msg {
143 let _ = on_event(e, &module, &ctx).await;
144 }
145 }
146 inv = invocation_rx.recv() => {
147 if let Some((invocation, result_tx)) = inv {
148 let result = dispatch(invocation, ctx.clone(), &module, &cli);
149 let _ = result_tx.send(result);
150 } else {
151 info!("Invocation channel closed, module unloading");
152 break;
153 }
154 }
155 _ = sleep(Duration::from_secs(30)) => {
156 info!("Module running");
157 }
158 }
159 }
160 }
161 Err(e) => {
162 info!("Node not running, standalone mode: {}", e);
163 loop {
164 sleep(Duration::from_secs(5)).await;
165 }
166 }
167 }
168
169 Ok(())
170}
171
172#[allow(clippy::too_many_arguments)]
177pub async fn run_module_with_setup<M, C, F, FE, Fut, FSetup, FutSetup>(
178 socket_path: impl AsRef<Path>,
179 module_id: &str,
180 module_name: &str,
181 version: &str,
182 cli_spec: CliSpec,
183 rpc_methods: &[&str],
184 event_types: Vec<blvm_node::module::traits::EventType>,
185 dispatch: F,
186 on_event: FE,
187 setup: FSetup,
188 db: Arc<dyn Database>,
189 data_dir: &Path,
190) -> Result<(), ModuleError>
191where
192 F: Fn(InvocationMessage, InvocationContext, &M, &C) -> InvocationResultMessage,
193 FE: Fn(blvm_node::module::ipc::protocol::EventMessage, &M, &InvocationContext) -> Fut,
194 Fut: std::future::Future<Output = Result<(), ModuleError>> + Send,
195 FSetup: Fn(Arc<dyn NodeAPI>, Arc<dyn Database>, &Path) -> FutSetup,
196 FutSetup: std::future::Future<Output = Result<(M, C), ModuleError>> + Send,
197{
198 let socket_path = socket_path.as_ref().to_path_buf();
199
200 match ModuleIntegration::connect(
201 socket_path.clone(),
202 module_id.to_string(),
203 module_name.to_string(),
204 version.to_string(),
205 Some(cli_spec),
206 )
207 .await
208 {
209 Ok(mut integration) => {
210 info!("Connected to node");
211
212 let node_api = integration.node_api();
213 for method in rpc_methods {
214 if is_overrideable_core_rpc_method(method) {
215 continue;
216 }
217 node_api
218 .register_rpc_endpoint((*method).to_string(), String::new())
219 .await?;
220 }
221
222 integration.subscribe_events(event_types).await?;
223
224 let (module, cli) = setup(node_api.clone(), Arc::clone(&db), data_dir).await?;
225 let module = Arc::new(module);
226
227 let mut event_rx = integration.event_receiver();
228 let invocation_rx = integration.invocation_receiver().ok_or_else(|| {
229 ModuleError::IpcError(
230 "Invocation receiver not available for this module integration".to_string(),
231 )
232 })?;
233 let ctx = InvocationContext::with_node_api(Arc::clone(&db), node_api);
234
235 loop {
236 tokio::select! {
237 msg = event_rx.recv() => {
238 if let Ok(ModuleMessage::Event(e)) = msg {
239 let _ = on_event(e, &*module, &ctx).await;
240 }
241 }
242 inv = invocation_rx.recv() => {
243 if let Some((invocation, result_tx)) = inv {
244 let result = dispatch(invocation, ctx.clone(), &*module, &cli);
245 let _ = result_tx.send(result);
246 } else {
247 info!("Invocation channel closed, module unloading");
248 break;
249 }
250 }
251 _ = sleep(Duration::from_secs(30)) => {
252 info!("Module running");
253 }
254 }
255 }
256 }
257 Err(e) => {
258 info!("Node not running, standalone mode: {}", e);
259 loop {
260 sleep(Duration::from_secs(5)).await;
261 }
262 }
263 }
264
265 Ok(())
266}
267
268#[allow(clippy::too_many_arguments)]
270pub async fn run_module_with_setup_and_api<M, C, F, FE, Fut, FSetup, FutSetup>(
271 socket_path: impl AsRef<Path>,
272 module_id: &str,
273 module_name: &str,
274 version: &str,
275 cli_spec: CliSpec,
276 rpc_methods: &[&str],
277 event_types: Vec<blvm_node::module::traits::EventType>,
278 dispatch: F,
279 on_event: FE,
280 setup: FSetup,
281 db: Arc<dyn Database>,
282 data_dir: &Path,
283) -> Result<(), ModuleError>
284where
285 F: Fn(InvocationMessage, InvocationContext, &M, &C) -> InvocationResultMessage,
286 FE: Fn(blvm_node::module::ipc::protocol::EventMessage, &M, &InvocationContext) -> Fut,
287 Fut: std::future::Future<Output = Result<(), ModuleError>> + Send,
288 FSetup: Fn(Arc<dyn NodeAPI>, Arc<dyn Database>, &Path) -> FutSetup,
289 FutSetup: std::future::Future<
290 Output = Result<
291 (
292 M,
293 C,
294 Arc<dyn blvm_node::module::inter_module::api::ModuleAPI>,
295 ),
296 ModuleError,
297 >,
298 > + Send,
299{
300 use blvm_node::module::ipc::protocol::{InvocationResultPayload, InvocationType};
301
302 let socket_path = socket_path.as_ref().to_path_buf();
303
304 match ModuleIntegration::connect(
305 socket_path.clone(),
306 module_id.to_string(),
307 module_name.to_string(),
308 version.to_string(),
309 Some(cli_spec),
310 )
311 .await
312 {
313 Ok(mut integration) => {
314 info!("Connected to node");
315
316 let node_api = integration.node_api();
317 for method in rpc_methods {
318 if is_overrideable_core_rpc_method(method) {
319 continue;
320 }
321 node_api
322 .register_rpc_endpoint((*method).to_string(), String::new())
323 .await?;
324 }
325
326 integration.subscribe_events(event_types).await?;
327
328 let (module, cli, module_api) =
329 setup(node_api.clone(), Arc::clone(&db), data_dir).await?;
330 let module = Arc::new(module);
331 let module_api = Arc::clone(&module_api);
332
333 if let Err(e) = node_api.register_module_api(module_api.clone()).await {
334 return Err(ModuleError::Other(format!(
335 "Failed to register module API descriptor: {e}"
336 )));
337 }
338 info!("Module API descriptor registered with node");
339
340 let mut event_rx = integration.event_receiver();
341 let invocation_rx = integration.invocation_receiver().ok_or_else(|| {
342 ModuleError::IpcError(
343 "Invocation receiver not available for this module integration".to_string(),
344 )
345 })?;
346 let ctx = InvocationContext::with_node_api(Arc::clone(&db), node_api);
347
348 loop {
349 tokio::select! {
350 msg = event_rx.recv() => {
351 if let Ok(ModuleMessage::Event(e)) = msg {
352 let _ = on_event(e, &*module, &ctx).await;
353 }
354 }
355 inv = invocation_rx.recv() => {
356 if let Some((invocation, result_tx)) = inv {
357 let result = match &invocation.invocation_type {
358 InvocationType::ModuleApi { method, params, caller_module_id } => {
359 match module_api
360 .handle_request(method, params, caller_module_id)
361 .await
362 {
363 Ok(data) => InvocationResultMessage {
364 correlation_id: invocation.correlation_id,
365 success: true,
366 payload: Some(InvocationResultPayload::ModuleApi(data)),
367 error: None,
368 },
369 Err(e) => InvocationResultMessage {
370 correlation_id: invocation.correlation_id,
371 success: false,
372 payload: None,
373 error: Some(e.to_string()),
374 },
375 }
376 }
377 _ => dispatch(invocation, ctx.clone(), &*module, &cli),
378 };
379 let _ = result_tx.send(result);
380 } else {
381 info!("Invocation channel closed, module unloading");
382 break;
383 }
384 }
385 _ = sleep(Duration::from_secs(30)) => {
386 info!("Module running");
387 }
388 }
389 }
390 }
391 Err(e) => {
392 info!("Node not running, standalone mode: {}", e);
393 loop {
394 sleep(Duration::from_secs(5)).await;
395 }
396 }
397 }
398
399 Ok(())
400}
401
402#[allow(clippy::too_many_arguments)]
404pub async fn run_module_with_tick<M, C, F, FE, Fut, FConnect, FutConnect, FTick, FutTick>(
405 socket_path: impl AsRef<Path>,
406 module_id: &str,
407 module_name: &str,
408 version: &str,
409 cli_spec: CliSpec,
410 rpc_methods: &[&str],
411 event_types: Vec<blvm_node::module::traits::EventType>,
412 dispatch: F,
413 on_event: FE,
414 on_connect: Option<FConnect>,
415 on_tick: Option<FTick>,
416 module: M,
417 cli: C,
418 db: Arc<dyn Database>,
419) -> Result<(), ModuleError>
420where
421 F: Fn(InvocationMessage, InvocationContext, &M, &C) -> InvocationResultMessage,
422 FE: Fn(blvm_node::module::ipc::protocol::EventMessage, &M, &InvocationContext) -> Fut,
423 Fut: std::future::Future<Output = Result<(), ModuleError>> + Send,
424 FConnect: Fn(Arc<dyn NodeAPI>, Arc<dyn Database>) -> FutConnect,
425 FutConnect: std::future::Future<Output = Result<(), ModuleError>> + Send,
426 FTick: Fn(Arc<dyn NodeAPI>, Arc<dyn Database>) -> FutTick,
427 FutTick: std::future::Future<Output = ()> + Send,
428{
429 let socket_path = socket_path.as_ref().to_path_buf();
430
431 match ModuleIntegration::connect(
432 socket_path.clone(),
433 module_id.to_string(),
434 module_name.to_string(),
435 version.to_string(),
436 Some(cli_spec),
437 )
438 .await
439 {
440 Ok(mut integration) => {
441 info!("Connected to node");
442
443 let node_api = integration.node_api();
444 for method in rpc_methods {
445 if is_overrideable_core_rpc_method(method) {
446 continue;
447 }
448 node_api
449 .register_rpc_endpoint((*method).to_string(), String::new())
450 .await?;
451 }
452
453 integration.subscribe_events(event_types).await?;
454
455 if let Some(ref connect) = on_connect {
456 connect(node_api.clone(), Arc::clone(&db)).await?;
457 }
458
459 let mut event_rx = integration.event_receiver();
460 let invocation_rx = integration.invocation_receiver().ok_or_else(|| {
461 ModuleError::IpcError(
462 "Invocation receiver not available for this module integration".to_string(),
463 )
464 })?;
465 let ctx = InvocationContext::with_node_api(Arc::clone(&db), Arc::clone(&node_api));
466
467 loop {
468 tokio::select! {
469 msg = event_rx.recv() => {
470 if let Ok(ModuleMessage::Event(e)) = msg {
471 let _ = on_event(e, &module, &ctx).await;
472 }
473 }
474 inv = invocation_rx.recv() => {
475 if let Some((invocation, result_tx)) = inv {
476 let result = dispatch(invocation, ctx.clone(), &module, &cli);
477 let _ = result_tx.send(result);
478 } else {
479 info!("Invocation channel closed, module unloading");
480 break;
481 }
482 }
483 _ = sleep(Duration::from_secs(30)) => {
484 if let Some(ref tick) = on_tick {
485 tick(node_api.clone(), Arc::clone(&db)).await;
486 }
487 info!("Module running");
488 }
489 }
490 }
491 }
492 Err(e) => {
493 info!("Node not running, standalone mode: {}", e);
494 loop {
495 sleep(Duration::from_secs(5)).await;
496 }
497 }
498 }
499
500 Ok(())
501}