1use blvm_node::module::integration::ModuleIntegration;
7use blvm_node::module::ipc::protocol::{
8 CliSpec, InvocationMessage, InvocationResultMessage, ModuleMessage,
9};
10use blvm_node::module::traits::{ModuleError, NodeAPI};
11use blvm_node::storage::database::Database;
12use std::path::Path;
13use std::sync::Arc;
14use tokio::time::{sleep, Duration};
15use tracing::info;
16
17use crate::module::storage::{DatabaseStorageAdapter, ModuleStorage, ModuleStorageDatabaseBridge};
18
19fn is_overrideable_core_rpc_method(method: &str) -> bool {
22 blvm_node::rpc::methods::OVERRIDABLE_CORE_RPC_METHODS.contains(&method)
23}
24
25pub fn run_async<F, T, E>(f: F) -> Result<T, ModuleError>
31where
32 F: std::future::Future<Output = Result<T, E>>,
33 E: std::fmt::Display,
34{
35 tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(f))
36 .map_err(|e| ModuleError::Other(e.to_string()))
37}
38
39#[derive(Clone)]
44pub struct InvocationContext {
45 db: Arc<dyn Database>,
46 node_api: Option<Arc<dyn NodeAPI>>,
47}
48
49impl InvocationContext {
50 pub fn from_storage(storage: Arc<dyn ModuleStorage>) -> Self {
52 let db = Arc::new(ModuleStorageDatabaseBridge::new(storage));
53 Self { db, node_api: None }
54 }
55
56 pub fn new(db: Arc<dyn Database>) -> Self {
58 let storage = Arc::new(DatabaseStorageAdapter::new(db));
59 Self::from_storage(storage)
60 }
61
62 pub fn with_node_api(db: Arc<dyn Database>, node_api: Arc<dyn NodeAPI>) -> Self {
64 let storage = Arc::new(DatabaseStorageAdapter::new(db));
65 Self {
66 db: Arc::new(ModuleStorageDatabaseBridge::new(storage)),
67 node_api: Some(node_api),
68 }
69 }
70
71 pub fn db(&self) -> &Arc<dyn Database> {
73 &self.db
74 }
75
76 pub fn node_api(&self) -> Option<Arc<dyn NodeAPI>> {
78 self.node_api.clone()
79 }
80}
81
82#[allow(clippy::too_many_arguments)] pub async fn run_module<M, C, F, FE, Fut>(
87 socket_path: impl AsRef<Path>,
88 module_id: &str,
89 module_name: &str,
90 version: &str,
91 cli_spec: CliSpec,
92 rpc_methods: &[&str],
93 event_types: Vec<blvm_node::module::traits::EventType>,
94 dispatch: F,
95 on_event: FE,
96 module: M,
97 cli: C,
98 db: Arc<dyn Database>,
99) -> Result<(), ModuleError>
100where
101 F: Fn(InvocationMessage, InvocationContext, &M, &C) -> InvocationResultMessage,
102 FE: Fn(blvm_node::module::ipc::protocol::EventMessage, &M, &InvocationContext) -> Fut,
103 Fut: std::future::Future<Output = Result<(), ModuleError>> + Send,
104{
105 let socket_path = socket_path.as_ref().to_path_buf();
106
107 match ModuleIntegration::connect(
108 socket_path.clone(),
109 module_id.to_string(),
110 module_name.to_string(),
111 version.to_string(),
112 Some(cli_spec),
113 )
114 .await
115 {
116 Ok(mut integration) => {
117 info!("Connected to node");
118
119 let node_api = integration.node_api();
120 for method in rpc_methods {
121 if is_overrideable_core_rpc_method(method) {
122 continue;
123 }
124 node_api
125 .register_rpc_endpoint((*method).to_string(), String::new())
126 .await?;
127 }
128
129 integration.subscribe_events(event_types).await?;
130
131 let mut event_rx = integration.event_receiver();
132 let invocation_rx = integration.invocation_receiver().ok_or_else(|| {
133 ModuleError::IpcError(
134 "Invocation receiver not available for this module integration".to_string(),
135 )
136 })?;
137 let ctx = InvocationContext::with_node_api(db, node_api);
138
139 loop {
140 tokio::select! {
141 msg = event_rx.recv() => {
142 if let Ok(ModuleMessage::Event(e)) = msg {
143 let _ = on_event(e, &module, &ctx).await;
144 }
145 }
146 inv = invocation_rx.recv() => {
147 if let Some((invocation, result_tx)) = inv {
148 let result = dispatch(invocation, ctx.clone(), &module, &cli);
149 let _ = result_tx.send(result);
150 } else {
151 info!("Invocation channel closed, module unloading");
152 break;
153 }
154 }
155 _ = sleep(Duration::from_secs(30)) => {
156 info!("Module running");
157 }
158 }
159 }
160 }
161 Err(e) => {
162 info!("Node not running, standalone mode: {}", e);
163 loop {
164 sleep(Duration::from_secs(5)).await;
165 }
166 }
167 }
168
169 Ok(())
170}
171
172#[allow(clippy::too_many_arguments)]
177pub async fn run_module_with_setup<M, C, F, FE, Fut, FSetup, FutSetup>(
178 socket_path: impl AsRef<Path>,
179 module_id: &str,
180 module_name: &str,
181 version: &str,
182 cli_spec: CliSpec,
183 rpc_methods: &[&str],
184 event_types: Vec<blvm_node::module::traits::EventType>,
185 dispatch: F,
186 on_event: FE,
187 setup: FSetup,
188 db: Arc<dyn Database>,
189 data_dir: &Path,
190) -> Result<(), ModuleError>
191where
192 F: Fn(InvocationMessage, InvocationContext, &M, &C) -> InvocationResultMessage,
193 FE: Fn(blvm_node::module::ipc::protocol::EventMessage, &M, &InvocationContext) -> Fut,
194 Fut: std::future::Future<Output = Result<(), ModuleError>> + Send,
195 FSetup: Fn(Arc<dyn NodeAPI>, Arc<dyn Database>, &Path) -> FutSetup,
196 FutSetup: std::future::Future<Output = Result<(M, C), ModuleError>> + Send,
197{
198 let socket_path = socket_path.as_ref().to_path_buf();
199
200 match ModuleIntegration::connect(
201 socket_path.clone(),
202 module_id.to_string(),
203 module_name.to_string(),
204 version.to_string(),
205 Some(cli_spec),
206 )
207 .await
208 {
209 Ok(mut integration) => {
210 info!("Connected to node");
211
212 let node_api = integration.node_api();
213 for method in rpc_methods {
214 if is_overrideable_core_rpc_method(method) {
215 continue;
216 }
217 node_api
218 .register_rpc_endpoint((*method).to_string(), String::new())
219 .await?;
220 }
221
222 integration.subscribe_events(event_types).await?;
223
224 let (module, cli) = setup(node_api.clone(), Arc::clone(&db), data_dir).await?;
225 let module = Arc::new(module);
226
227 let mut event_rx = integration.event_receiver();
228 let invocation_rx = integration.invocation_receiver().ok_or_else(|| {
229 ModuleError::IpcError(
230 "Invocation receiver not available for this module integration".to_string(),
231 )
232 })?;
233 let ctx = InvocationContext::with_node_api(Arc::clone(&db), node_api);
234
235 loop {
236 tokio::select! {
237 msg = event_rx.recv() => {
238 if let Ok(ModuleMessage::Event(e)) = msg {
239 let _ = on_event(e, &*module, &ctx).await;
240 }
241 }
242 inv = invocation_rx.recv() => {
243 if let Some((invocation, result_tx)) = inv {
244 let result = dispatch(invocation, ctx.clone(), &*module, &cli);
245 let _ = result_tx.send(result);
246 } else {
247 info!("Invocation channel closed, module unloading");
248 break;
249 }
250 }
251 _ = sleep(Duration::from_secs(30)) => {
252 info!("Module running");
253 }
254 }
255 }
256 }
257 Err(e) => {
258 info!("Node not running, standalone mode: {}", e);
259 loop {
260 sleep(Duration::from_secs(5)).await;
261 }
262 }
263 }
264
265 Ok(())
266}
267
268#[allow(clippy::too_many_arguments)]
270pub async fn run_module_with_tick<M, C, F, FE, Fut, FConnect, FutConnect, FTick, FutTick>(
271 socket_path: impl AsRef<Path>,
272 module_id: &str,
273 module_name: &str,
274 version: &str,
275 cli_spec: CliSpec,
276 rpc_methods: &[&str],
277 event_types: Vec<blvm_node::module::traits::EventType>,
278 dispatch: F,
279 on_event: FE,
280 on_connect: Option<FConnect>,
281 on_tick: Option<FTick>,
282 module: M,
283 cli: C,
284 db: Arc<dyn Database>,
285) -> Result<(), ModuleError>
286where
287 F: Fn(InvocationMessage, InvocationContext, &M, &C) -> InvocationResultMessage,
288 FE: Fn(blvm_node::module::ipc::protocol::EventMessage, &M, &InvocationContext) -> Fut,
289 Fut: std::future::Future<Output = Result<(), ModuleError>> + Send,
290 FConnect: Fn(Arc<dyn NodeAPI>, Arc<dyn Database>) -> FutConnect,
291 FutConnect: std::future::Future<Output = Result<(), ModuleError>> + Send,
292 FTick: Fn(Arc<dyn NodeAPI>, Arc<dyn Database>) -> FutTick,
293 FutTick: std::future::Future<Output = ()> + Send,
294{
295 let socket_path = socket_path.as_ref().to_path_buf();
296
297 match ModuleIntegration::connect(
298 socket_path.clone(),
299 module_id.to_string(),
300 module_name.to_string(),
301 version.to_string(),
302 Some(cli_spec),
303 )
304 .await
305 {
306 Ok(mut integration) => {
307 info!("Connected to node");
308
309 let node_api = integration.node_api();
310 for method in rpc_methods {
311 if is_overrideable_core_rpc_method(method) {
312 continue;
313 }
314 node_api
315 .register_rpc_endpoint((*method).to_string(), String::new())
316 .await?;
317 }
318
319 integration.subscribe_events(event_types).await?;
320
321 if let Some(ref connect) = on_connect {
322 connect(node_api.clone(), Arc::clone(&db)).await?;
323 }
324
325 let mut event_rx = integration.event_receiver();
326 let invocation_rx = integration.invocation_receiver().ok_or_else(|| {
327 ModuleError::IpcError(
328 "Invocation receiver not available for this module integration".to_string(),
329 )
330 })?;
331 let ctx = InvocationContext::with_node_api(Arc::clone(&db), Arc::clone(&node_api));
332
333 loop {
334 tokio::select! {
335 msg = event_rx.recv() => {
336 if let Ok(ModuleMessage::Event(e)) = msg {
337 let _ = on_event(e, &module, &ctx).await;
338 }
339 }
340 inv = invocation_rx.recv() => {
341 if let Some((invocation, result_tx)) = inv {
342 let result = dispatch(invocation, ctx.clone(), &module, &cli);
343 let _ = result_tx.send(result);
344 } else {
345 info!("Invocation channel closed, module unloading");
346 break;
347 }
348 }
349 _ = sleep(Duration::from_secs(30)) => {
350 if let Some(ref tick) = on_tick {
351 tick(node_api.clone(), Arc::clone(&db)).await;
352 }
353 info!("Module running");
354 }
355 }
356 }
357 }
358 Err(e) => {
359 info!("Node not running, standalone mode: {}", e);
360 loop {
361 sleep(Duration::from_secs(5)).await;
362 }
363 }
364 }
365
366 Ok(())
367}