1use std::sync::Arc;
4
5use async_trait::async_trait;
6use oxide_k::bus::{Command, Event, Message, MessageBus};
7use oxide_k::module::{Module, ModuleKind, ModuleMetadata};
8use oxide_k::{KernelError, Result as KernelResult};
9use serde::Deserialize;
10use tokio::sync::Mutex;
11use tokio::task::JoinHandle;
12
13use crate::local::{LocalMesh, MeshHandle};
14use crate::message::{PeerCapability, PeerMessage};
15
/// Module id given to every `MeshModule` by its constructors.
///
/// The bus listener only serves `Command::Invoke` messages whose `module_id` equals this value.
pub const DEFAULT_MODULE_ID: &str = "mesh";
18
/// Kernel module that connects a `LocalMesh` to the kernel `MessageBus`.
///
/// Both task handles are `None` until `Module::init` spawns the tasks, and are taken and
/// aborted again by `Module::stop`.
pub struct MeshModule {
    /// Module identifier; the constructors set it to `DEFAULT_MODULE_ID`.
    id: String,
    /// Mesh that the module joins as the `"kernel"` peer during `init`.
    mesh: LocalMesh,
    /// Handle returned by joining the mesh; `None` until `init` stores it.
    /// Shared with the listener task, which publishes through it.
    self_handle: Arc<Mutex<Option<MeshHandle>>>,
    /// Task serving `Command::Invoke` messages addressed to this module.
    listener: Option<JoinHandle<()>>,
    /// Task republishing inbound peer messages as bus events.
    forwarder: Option<JoinHandle<()>>,
}
28
29impl MeshModule {
30 pub fn new() -> Self {
32 Self::with_mesh(LocalMesh::new())
33 }
34
35 pub fn with_mesh(mesh: LocalMesh) -> Self {
37 Self {
38 id: DEFAULT_MODULE_ID.into(),
39 mesh,
40 self_handle: Arc::new(Mutex::new(None)),
41 listener: None,
42 forwarder: None,
43 }
44 }
45
46 pub fn mesh(&self) -> &LocalMesh {
48 &self.mesh
49 }
50}
51
52impl Default for MeshModule {
53 fn default() -> Self {
54 Self::new()
55 }
56}
57
58#[async_trait]
59impl Module for MeshModule {
60 fn metadata(&self) -> ModuleMetadata {
61 ModuleMetadata {
62 id: self.id.clone(),
63 name: "Oxide Mesh".into(),
64 version: env!("CARGO_PKG_VERSION").into(),
65 kind: ModuleKind::Native,
66 description: Some(
67 "Inter-agent communication mesh; routes peer messages and republishes inbound peer traffic onto the kernel bus.".into(),
68 ),
69 }
70 }
71
72 async fn init(&mut self, bus: MessageBus) -> KernelResult<()> {
73 let (mut peer, handle) = self
77 .mesh
78 .join(
79 "kernel",
80 vec![PeerCapability {
81 name: "kernel".into(),
82 version: Some(env!("CARGO_PKG_VERSION").into()),
83 }],
84 Vec::new(),
85 )
86 .await
87 .map_err(|e| KernelError::Other(anyhow::anyhow!(e)))?;
88 *self.self_handle.lock().await = Some(handle);
89
90 let id = self.id.clone();
91 let bus_for_inbound = bus.clone();
92 let forwarder = tokio::spawn(async move {
93 while let Some(msg) = peer.receiver.recv().await {
94 let kind = match &msg {
95 PeerMessage::Hello { .. } => "peer.hello",
96 PeerMessage::Broadcast { .. } => "peer.broadcast",
97 PeerMessage::Direct { .. } => "peer.direct",
98 PeerMessage::Task { .. } => "peer.task",
99 PeerMessage::Result { .. } => "peer.result",
100 };
101 let payload = match serde_json::to_value(&msg) {
102 Ok(v) => v,
103 Err(_) => continue,
104 };
105 let _ = bus_for_inbound
106 .emit_event(
107 id.clone(),
108 Event::Custom {
109 module_id: id.clone(),
110 kind: kind.into(),
111 payload,
112 },
113 )
114 .await;
115 }
116 });
117 self.forwarder = Some(forwarder);
118
119 let mut sub = bus.subscribe().await;
121 let mesh = self.mesh.clone();
122 let self_handle = self.self_handle.clone();
123 let id = self.id.clone();
124 let bus_emit = bus.clone();
125 let listener = tokio::spawn(async move {
126 while let Some(env) = sub.receiver.recv().await {
127 let Message::Command(Command::Invoke {
128 module_id,
129 method,
130 payload,
131 }) = env.message
132 else {
133 continue;
134 };
135 if module_id != id {
136 continue;
137 }
138 let result = dispatch(&mesh, &self_handle, &method, payload).await;
139 let event = match result {
140 Ok(value) => Event::Custom {
141 module_id: id.clone(),
142 kind: format!("{method}.ok"),
143 payload: value,
144 },
145 Err(err) => Event::Custom {
146 module_id: id.clone(),
147 kind: format!("{method}.err"),
148 payload: serde_json::json!({ "error": err.to_string() }),
149 },
150 };
151 let _ = bus_emit.emit_event(id.clone(), event).await;
152 }
153 });
154 self.listener = Some(listener);
155 Ok(())
156 }
157
158 async fn start(&mut self) -> KernelResult<()> {
159 Ok(())
160 }
161
162 async fn stop(&mut self) -> KernelResult<()> {
163 if let Some(h) = self.listener.take() {
164 h.abort();
165 }
166 if let Some(h) = self.forwarder.take() {
167 h.abort();
168 }
169 let _ = self.mesh.leave(&"kernel".to_string()).await;
170 Ok(())
171 }
172}
173
/// Payload of the `publish` method: the peer message to send through the kernel's mesh handle.
#[derive(Deserialize)]
struct PublishPayload {
    /// Message handed to `MeshHandle::publish`.
    message: PeerMessage,
}
178
/// Payload of the `directory` method; it carries no fields.
///
/// `dispatch` falls back to an empty value when parsing fails, so any payload is accepted.
#[derive(Deserialize)]
struct DirectoryPayload {}
181
182async fn dispatch(
183 mesh: &LocalMesh,
184 self_handle: &Arc<Mutex<Option<MeshHandle>>>,
185 method: &str,
186 payload: serde_json::Value,
187) -> KernelResult<serde_json::Value> {
188 let to_kernel = |e: crate::error::MeshError| KernelError::Other(anyhow::anyhow!(e));
189 match method {
190 "publish" => {
191 let p: PublishPayload = serde_json::from_value(payload)?;
192 let guard = self_handle.lock().await;
193 let handle = guard
194 .as_ref()
195 .ok_or_else(|| KernelError::Other(anyhow::anyhow!("mesh not initialised")))?;
196 handle.publish(p.message).await.map_err(to_kernel)?;
197 Ok(serde_json::json!({"ok": true}))
198 }
199 "directory" => {
200 let _p: DirectoryPayload =
201 serde_json::from_value(payload).unwrap_or(DirectoryPayload {});
202 let dir = mesh.directory().await;
203 Ok(serde_json::to_value(dir)?)
204 }
205 "peer_count" => Ok(serde_json::json!({"count": mesh.peer_count().await})),
206 other => Err(KernelError::Other(anyhow::anyhow!(
207 "unknown mesh method `{other}`"
208 ))),
209 }
210}
211
#[cfg(test)]
mod tests {
    use super::*;
    use oxide_k::bus::{Event, Message};
    use serde_json::json;
    use std::time::Duration;

    /// Builds a capability called `name` with no version.
    fn capability(name: &str) -> PeerCapability {
        PeerCapability {
            name: name.into(),
            version: None,
        }
    }

    /// Creates a module on `mesh`, then runs `init` and `start` against `bus`.
    async fn started_module(mesh: &LocalMesh, bus: &MessageBus) -> MeshModule {
        let mut module = MeshModule::with_mesh(mesh.clone());
        Module::init(&mut module, bus.clone()).await.unwrap();
        Module::start(&mut module).await.unwrap();
        module
    }

    /// A `publish` invoke sent over the bus reaches another peer as a broadcast from `kernel`.
    #[tokio::test]
    async fn module_publishes_broadcast_via_bus() {
        let mesh = LocalMesh::new();
        // Peer `x` joins first so it is present when the kernel broadcasts.
        let (mut peer_x, _handle_x) = mesh
            .join("x", vec![capability("x")], vec![])
            .await
            .unwrap();

        let bus = MessageBus::new();
        let mut module = started_module(&mesh, &bus).await;

        let msg = PeerMessage::broadcast("kernel", "topic", json!({"hi": 1}));
        let invoke = Command::Invoke {
            module_id: DEFAULT_MODULE_ID.into(),
            method: "publish".into(),
            payload: json!({"message": msg}),
        };
        bus.send_command("test", invoke).await.unwrap();

        let received = tokio::time::timeout(Duration::from_millis(500), peer_x.receiver.recv())
            .await
            .unwrap()
            .unwrap();
        match received {
            PeerMessage::Broadcast { from, .. } => assert_eq!(from, "kernel"),
            other => panic!("unexpected: {other:?}"),
        }
        Module::stop(&mut module).await.unwrap();
    }

    /// A direct message sent to `kernel` shows up on the bus as a `peer.direct` event.
    #[tokio::test]
    async fn forwarder_emits_peer_event_for_direct() {
        let mesh = LocalMesh::new();
        let bus = MessageBus::new();
        // Subscribe before the module starts so no forwarded event can be missed.
        let mut events = bus.subscribe().await;
        let mut module = started_module(&mesh, &bus).await;

        let (_peer_a, handle_a) = mesh
            .join("a", vec![capability("a")], vec![])
            .await
            .unwrap();
        handle_a
            .publish(PeerMessage::direct("a", "kernel", json!({"ping": true})))
            .await
            .unwrap();

        let mut saw = false;
        for _ in 0..15 {
            let next =
                tokio::time::timeout(Duration::from_millis(300), events.receiver.recv()).await;
            // A timeout or a closed subscription ends the search.
            let Ok(Some(envelope)) = next else {
                break;
            };
            let Message::Event(Event::Custom { kind, payload, .. }) = envelope.message else {
                continue;
            };
            if kind == "peer.direct" {
                assert_eq!(payload["from"], json!("a"));
                saw = true;
                break;
            }
        }
        assert!(saw, "expected peer.direct event");
        Module::stop(&mut module).await.unwrap();
    }
}