1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
//! Module integration helper
//!
//! Provides a simple, unified API for modules to integrate with the node.
//! Handles connection, handshake, and provides NodeAPI + event receiver.
use crate::module::api::NodeApiIpc;
use crate::module::ipc::protocol::{
CliSpec, InvocationMessage, InvocationResultMessage, MessageType, ModuleMessage,
RequestMessage, RequestPayload, ResponsePayload,
};
use crate::module::ipc::ModuleIpcClient;
use crate::module::traits::{EventType, ModuleError, NodeAPI};
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::{mpsc, oneshot};
/// Simple helper for module integration
/// Handles connection, handshake, and provides NodeAPI + event receiver + invocation handling
pub struct ModuleIntegration {
module_id: String,
node_api: Arc<dyn NodeAPI>,
event_receiver: tokio::sync::broadcast::Receiver<ModuleMessage>,
/// Receiver for CLI/RPC invocations from the node (module handles and sends result)
invocation_receiver:
Option<mpsc::Receiver<(InvocationMessage, oneshot::Sender<InvocationResultMessage>)>>,
}
impl ModuleIntegration {
/// Connect to node via IPC (for out-of-process modules)
///
/// If `cli_spec` is provided, sends it to the node after handshake so the node
/// can expose the module's CLI commands via getmoduleclispecs/runmodulecli.
pub async fn connect(
socket_path: PathBuf,
module_id: String,
module_name: String,
version: String,
cli_spec: Option<CliSpec>,
) -> Result<Self, ModuleError> {
use tracing::{error, info, warn};
info!(
"Connecting to node IPC socket: {:?} (module: {})",
socket_path, module_name
);
// Connect to IPC
let mut ipc_client = ModuleIpcClient::connect(&socket_path).await?;
info!("Connected to IPC socket");
// Handshake
let correlation_id = ipc_client.next_correlation_id();
let request = RequestMessage {
correlation_id,
request_type: MessageType::Handshake,
payload: RequestPayload::Handshake {
module_id: module_id.clone(),
module_name,
version,
},
};
let response = ipc_client.request(request).await?;
match response.payload {
Some(ResponsePayload::HandshakeAck { node_version }) => {
info!("Handshake successful! Node version: {}", node_version);
}
_ => {
return Err(ModuleError::IpcError(
"Invalid handshake response".to_string(),
));
}
}
// Register CLI spec if provided (for dynamic CLI via getmoduleclispecs)
if let Some(spec) = cli_spec {
let correlation_id = ipc_client.next_correlation_id();
let request = RequestMessage {
correlation_id,
request_type: MessageType::RegisterCliSpec,
payload: RequestPayload::RegisterCliSpec { spec },
};
let response = ipc_client.request(request).await?;
if !response.success {
return Err(ModuleError::IpcError(
response
.error
.unwrap_or_else(|| "Failed to register CLI spec".to_string()),
));
}
info!("CLI spec registered with node");
}
// Use broadcast channel for events (allows multiple receivers)
let (broadcast_tx, broadcast_rx) = tokio::sync::broadcast::channel(1000);
let broadcast_tx_arc = Arc::new(broadcast_tx);
let ipc_client_arc = Arc::new(tokio::sync::Mutex::new(ipc_client));
// Channel for invocations: spawn sends (Invocation, oneshot_tx), module receives and responds
let (invocation_tx, invocation_rx) = mpsc::channel(32);
// Spawn message receiver: forwards Events to broadcast, dispatches Invocations to module
let ipc_for_receive = Arc::clone(&ipc_client_arc);
let broadcast_tx_for_events = Arc::clone(&broadcast_tx_arc);
tokio::spawn(async move {
loop {
match ipc_for_receive.lock().await.receive_message().await {
Ok(Some(ModuleMessage::Event(e))) => {
let _ = broadcast_tx_for_events.send(ModuleMessage::Event(e));
}
Ok(Some(ModuleMessage::Invocation(invocation))) => {
let (result_tx, result_rx) = oneshot::channel();
if invocation_tx.send((invocation, result_tx)).await.is_err() {
warn!("Invocation channel closed, dropping invocation");
continue;
}
match result_rx.await {
Ok(result) => {
if let Err(e) = ipc_for_receive
.lock()
.await
.send_invocation_result(result)
.await
{
error!("Failed to send invocation result: {}", e);
}
}
Err(_) => {
warn!("Invocation result channel dropped");
}
}
}
Ok(Some(_)) => {
// Response, Log, InvocationResult - unexpected here, ignore
}
Ok(None) => break,
Err(e) => {
error!("Error receiving message: {}", e);
break;
}
}
}
});
// Create a receiver from the broadcast channel for ModuleIntegration
let event_receiver = broadcast_rx;
// Create NodeAPI over IPC
let mut node_api_impl = NodeApiIpc::new(ipc_client_arc, module_id.clone());
node_api_impl.set_event_broadcast(broadcast_tx_arc);
let node_api = Arc::new(node_api_impl);
Ok(Self {
module_id,
node_api,
event_receiver,
invocation_receiver: Some(invocation_rx),
})
}
/// Create from existing NodeAPI (for in-process modules)
pub fn from_node_api(module_id: String, node_api: Arc<dyn NodeAPI>) -> Self {
// In-process: events handled differently (via callbacks or direct subscription)
// For now, create empty broadcast receiver - can be enhanced later
let (_tx, rx) = tokio::sync::broadcast::channel(1);
Self {
module_id,
node_api,
event_receiver: rx,
invocation_receiver: None,
}
}
/// Subscribe to events
///
/// Note: For out-of-process modules, events are delivered via the event_receiver
/// set up during connect(). This method sends the subscription request via IPC.
/// The receiver is already set up and will receive events automatically.
pub async fn subscribe_events(&self, event_types: Vec<EventType>) -> Result<(), ModuleError> {
// Call NodeAPI's subscribe_events which handles the IPC request
// The receiver returned is the same one we already have in event_receiver
let _receiver = self.node_api.subscribe_events(event_types).await?;
// We already have the receiver via event_receiver(), so we can ignore the returned one
Ok(())
}
/// Get NodeAPI instance
pub fn node_api(&self) -> Arc<dyn NodeAPI> {
Arc::clone(&self.node_api)
}
/// Get event receiver (creates a new receiver from the broadcast channel)
pub fn event_receiver(&self) -> tokio::sync::broadcast::Receiver<ModuleMessage> {
// Create a new receiver from the broadcast channel
// Note: This requires access to the broadcast sender, which we don't have here
// For now, return a clone of the receiver (broadcast receivers can be cloned)
self.event_receiver.resubscribe()
}
/// Get module ID
pub fn module_id(&self) -> &str {
&self.module_id
}
/// Get receiver for CLI/RPC invocations from the node.
///
/// When the node sends an Invocation (e.g. from `runmodulecli`), it will be delivered here.
/// The module must receive, run the handler, and send the result via the oneshot.
/// Returns None for in-process modules.
pub fn invocation_receiver(
&mut self,
) -> Option<&mut mpsc::Receiver<(InvocationMessage, oneshot::Sender<InvocationResultMessage>)>>
{
self.invocation_receiver.as_mut()
}
}