1use blvm_node::module::integration::ModuleIntegration;
7use blvm_node::module::ipc::protocol::{
8 CliSpec, InvocationMessage, InvocationResultMessage, ModuleMessage,
9};
10use blvm_node::module::traits::{ModuleError, NodeAPI};
11use blvm_node::storage::database::Database;
12use std::path::Path;
13use std::sync::Arc;
14use tokio::time::{sleep, Duration};
15use tracing::info;
16
17use crate::module::storage::{DatabaseStorageAdapter, ModuleStorage, ModuleStorageDatabaseBridge};
18
19pub fn run_async<F, T, E>(f: F) -> Result<T, ModuleError>
25where
26 F: std::future::Future<Output = Result<T, E>>,
27 E: std::fmt::Display,
28{
29 tokio::task::block_in_place(|| tokio::runtime::Handle::current().block_on(f))
30 .map_err(|e| ModuleError::Other(e.to_string()))
31}
32
33#[derive(Clone)]
38pub struct InvocationContext {
39 db: Arc<dyn Database>,
40 node_api: Option<Arc<dyn NodeAPI>>,
41}
42
43impl InvocationContext {
44 pub fn from_storage(storage: Arc<dyn ModuleStorage>) -> Self {
46 let db = Arc::new(ModuleStorageDatabaseBridge::new(storage));
47 Self { db, node_api: None }
48 }
49
50 pub fn new(db: Arc<dyn Database>) -> Self {
52 let storage = Arc::new(DatabaseStorageAdapter::new(db));
53 Self::from_storage(storage)
54 }
55
56 pub fn with_node_api(db: Arc<dyn Database>, node_api: Arc<dyn NodeAPI>) -> Self {
58 let storage = Arc::new(DatabaseStorageAdapter::new(db));
59 Self {
60 db: Arc::new(ModuleStorageDatabaseBridge::new(storage)),
61 node_api: Some(node_api),
62 }
63 }
64
65 pub fn db(&self) -> &Arc<dyn Database> {
67 &self.db
68 }
69
70 pub fn node_api(&self) -> Option<Arc<dyn NodeAPI>> {
72 self.node_api.clone()
73 }
74}
75
76#[allow(clippy::too_many_arguments)] pub async fn run_module<M, C, F, FE, Fut>(
81 socket_path: impl AsRef<Path>,
82 module_id: &str,
83 module_name: &str,
84 version: &str,
85 cli_spec: CliSpec,
86 rpc_methods: &[&str],
87 event_types: Vec<blvm_node::module::traits::EventType>,
88 dispatch: F,
89 on_event: FE,
90 module: M,
91 cli: C,
92 db: Arc<dyn Database>,
93) -> Result<(), ModuleError>
94where
95 F: Fn(InvocationMessage, InvocationContext, &M, &C) -> InvocationResultMessage,
96 FE: Fn(blvm_node::module::ipc::protocol::EventMessage, &M, &InvocationContext) -> Fut,
97 Fut: std::future::Future<Output = Result<(), ModuleError>> + Send,
98{
99 let socket_path = socket_path.as_ref().to_path_buf();
100
101 match ModuleIntegration::connect(
102 socket_path.clone(),
103 module_id.to_string(),
104 module_name.to_string(),
105 version.to_string(),
106 Some(cli_spec),
107 )
108 .await
109 {
110 Ok(mut integration) => {
111 info!("Connected to node");
112
113 let node_api = integration.node_api();
114 for method in rpc_methods {
115 node_api
116 .register_rpc_endpoint((*method).to_string(), String::new())
117 .await?;
118 }
119
120 integration.subscribe_events(event_types).await?;
121
122 let mut event_rx = integration.event_receiver();
123 let invocation_rx = integration.invocation_receiver().ok_or_else(|| {
124 ModuleError::IpcError(
125 "Invocation receiver not available for this module integration".to_string(),
126 )
127 })?;
128 let ctx = InvocationContext::with_node_api(db, node_api);
129
130 loop {
131 tokio::select! {
132 msg = event_rx.recv() => {
133 if let Ok(ModuleMessage::Event(e)) = msg {
134 let _ = on_event(e, &module, &ctx).await;
135 }
136 }
137 inv = invocation_rx.recv() => {
138 if let Some((invocation, result_tx)) = inv {
139 let result = dispatch(invocation, ctx.clone(), &module, &cli);
140 let _ = result_tx.send(result);
141 } else {
142 info!("Invocation channel closed, module unloading");
143 break;
144 }
145 }
146 _ = sleep(Duration::from_secs(30)) => {
147 info!("Module running");
148 }
149 }
150 }
151 }
152 Err(e) => {
153 info!("Node not running, standalone mode: {}", e);
154 loop {
155 sleep(Duration::from_secs(5)).await;
156 }
157 }
158 }
159
160 Ok(())
161}
162
163#[allow(clippy::too_many_arguments)]
168pub async fn run_module_with_setup<M, C, F, FE, Fut, FSetup, FutSetup>(
169 socket_path: impl AsRef<Path>,
170 module_id: &str,
171 module_name: &str,
172 version: &str,
173 cli_spec: CliSpec,
174 rpc_methods: &[&str],
175 event_types: Vec<blvm_node::module::traits::EventType>,
176 dispatch: F,
177 on_event: FE,
178 setup: FSetup,
179 db: Arc<dyn Database>,
180 data_dir: &Path,
181) -> Result<(), ModuleError>
182where
183 F: Fn(InvocationMessage, InvocationContext, &M, &C) -> InvocationResultMessage,
184 FE: Fn(blvm_node::module::ipc::protocol::EventMessage, &M, &InvocationContext) -> Fut,
185 Fut: std::future::Future<Output = Result<(), ModuleError>> + Send,
186 FSetup: Fn(Arc<dyn NodeAPI>, Arc<dyn Database>, &Path) -> FutSetup,
187 FutSetup: std::future::Future<Output = Result<(M, C), ModuleError>> + Send,
188{
189 let socket_path = socket_path.as_ref().to_path_buf();
190
191 match ModuleIntegration::connect(
192 socket_path.clone(),
193 module_id.to_string(),
194 module_name.to_string(),
195 version.to_string(),
196 Some(cli_spec),
197 )
198 .await
199 {
200 Ok(mut integration) => {
201 info!("Connected to node");
202
203 let node_api = integration.node_api();
204 for method in rpc_methods {
205 node_api
206 .register_rpc_endpoint((*method).to_string(), String::new())
207 .await?;
208 }
209
210 integration.subscribe_events(event_types).await?;
211
212 let (module, cli) = setup(node_api.clone(), Arc::clone(&db), data_dir).await?;
213 let module = Arc::new(module);
214
215 let mut event_rx = integration.event_receiver();
216 let invocation_rx = integration.invocation_receiver().ok_or_else(|| {
217 ModuleError::IpcError(
218 "Invocation receiver not available for this module integration".to_string(),
219 )
220 })?;
221 let ctx = InvocationContext::with_node_api(Arc::clone(&db), node_api);
222
223 loop {
224 tokio::select! {
225 msg = event_rx.recv() => {
226 if let Ok(ModuleMessage::Event(e)) = msg {
227 let _ = on_event(e, &*module, &ctx).await;
228 }
229 }
230 inv = invocation_rx.recv() => {
231 if let Some((invocation, result_tx)) = inv {
232 let result = dispatch(invocation, ctx.clone(), &*module, &cli);
233 let _ = result_tx.send(result);
234 } else {
235 info!("Invocation channel closed, module unloading");
236 break;
237 }
238 }
239 _ = sleep(Duration::from_secs(30)) => {
240 info!("Module running");
241 }
242 }
243 }
244 }
245 Err(e) => {
246 info!("Node not running, standalone mode: {}", e);
247 loop {
248 sleep(Duration::from_secs(5)).await;
249 }
250 }
251 }
252
253 Ok(())
254}
255
256#[allow(clippy::too_many_arguments)]
258pub async fn run_module_with_tick<M, C, F, FE, Fut, FConnect, FutConnect, FTick, FutTick>(
259 socket_path: impl AsRef<Path>,
260 module_id: &str,
261 module_name: &str,
262 version: &str,
263 cli_spec: CliSpec,
264 rpc_methods: &[&str],
265 event_types: Vec<blvm_node::module::traits::EventType>,
266 dispatch: F,
267 on_event: FE,
268 on_connect: Option<FConnect>,
269 on_tick: Option<FTick>,
270 module: M,
271 cli: C,
272 db: Arc<dyn Database>,
273) -> Result<(), ModuleError>
274where
275 F: Fn(InvocationMessage, InvocationContext, &M, &C) -> InvocationResultMessage,
276 FE: Fn(blvm_node::module::ipc::protocol::EventMessage, &M, &InvocationContext) -> Fut,
277 Fut: std::future::Future<Output = Result<(), ModuleError>> + Send,
278 FConnect: Fn(Arc<dyn NodeAPI>, Arc<dyn Database>) -> FutConnect,
279 FutConnect: std::future::Future<Output = Result<(), ModuleError>> + Send,
280 FTick: Fn(Arc<dyn NodeAPI>, Arc<dyn Database>) -> FutTick,
281 FutTick: std::future::Future<Output = ()> + Send,
282{
283 let socket_path = socket_path.as_ref().to_path_buf();
284
285 match ModuleIntegration::connect(
286 socket_path.clone(),
287 module_id.to_string(),
288 module_name.to_string(),
289 version.to_string(),
290 Some(cli_spec),
291 )
292 .await
293 {
294 Ok(mut integration) => {
295 info!("Connected to node");
296
297 let node_api = integration.node_api();
298 for method in rpc_methods {
299 node_api
300 .register_rpc_endpoint((*method).to_string(), String::new())
301 .await?;
302 }
303
304 integration.subscribe_events(event_types).await?;
305
306 if let Some(ref connect) = on_connect {
307 connect(node_api.clone(), Arc::clone(&db)).await?;
308 }
309
310 let mut event_rx = integration.event_receiver();
311 let invocation_rx = integration.invocation_receiver().ok_or_else(|| {
312 ModuleError::IpcError(
313 "Invocation receiver not available for this module integration".to_string(),
314 )
315 })?;
316 let ctx = InvocationContext::with_node_api(Arc::clone(&db), Arc::clone(&node_api));
317
318 loop {
319 tokio::select! {
320 msg = event_rx.recv() => {
321 if let Ok(ModuleMessage::Event(e)) = msg {
322 let _ = on_event(e, &module, &ctx).await;
323 }
324 }
325 inv = invocation_rx.recv() => {
326 if let Some((invocation, result_tx)) = inv {
327 let result = dispatch(invocation, ctx.clone(), &module, &cli);
328 let _ = result_tx.send(result);
329 } else {
330 info!("Invocation channel closed, module unloading");
331 break;
332 }
333 }
334 _ = sleep(Duration::from_secs(30)) => {
335 if let Some(ref tick) = on_tick {
336 tick(node_api.clone(), Arc::clone(&db)).await;
337 }
338 info!("Module running");
339 }
340 }
341 }
342 }
343 Err(e) => {
344 info!("Node not running, standalone mode: {}", e);
345 loop {
346 sleep(Duration::from_secs(5)).await;
347 }
348 }
349 }
350
351 Ok(())
352}