Skip to main content

oxide_mesh/
kernel.rs

1//! `oxide-k` integration: expose the mesh on the kernel bus.
2
3use std::sync::Arc;
4
5use async_trait::async_trait;
6use oxide_k::bus::{Command, Event, Message, MessageBus};
7use oxide_k::module::{Module, ModuleKind, ModuleMetadata};
8use oxide_k::{KernelError, Result as KernelResult};
9use serde::Deserialize;
10use tokio::sync::Mutex;
11use tokio::task::JoinHandle;
12
13use crate::local::{LocalMesh, MeshHandle};
14use crate::message::{PeerCapability, PeerMessage};
15
16/// Default module id.
17pub const DEFAULT_MODULE_ID: &str = "mesh";
18
19/// Mesh module wrapping a [`LocalMesh`].
20pub struct MeshModule {
21    id: String,
22    mesh: LocalMesh,
23    /// Identity this module uses when it publishes on behalf of the kernel.
24    self_handle: Arc<Mutex<Option<MeshHandle>>>,
25    listener: Option<JoinHandle<()>>,
26    forwarder: Option<JoinHandle<()>>,
27}
28
29impl MeshModule {
30    /// Build with default id wrapping a fresh [`LocalMesh`].
31    pub fn new() -> Self {
32        Self::with_mesh(LocalMesh::new())
33    }
34
35    /// Build with an explicit mesh.
36    pub fn with_mesh(mesh: LocalMesh) -> Self {
37        Self {
38            id: DEFAULT_MODULE_ID.into(),
39            mesh,
40            self_handle: Arc::new(Mutex::new(None)),
41            listener: None,
42            forwarder: None,
43        }
44    }
45
46    /// Read-only access to the underlying mesh.
47    pub fn mesh(&self) -> &LocalMesh {
48        &self.mesh
49    }
50}
51
52impl Default for MeshModule {
53    fn default() -> Self {
54        Self::new()
55    }
56}
57
58#[async_trait]
59impl Module for MeshModule {
60    fn metadata(&self) -> ModuleMetadata {
61        ModuleMetadata {
62            id: self.id.clone(),
63            name: "Oxide Mesh".into(),
64            version: env!("CARGO_PKG_VERSION").into(),
65            kind: ModuleKind::Native,
66            description: Some(
67                "Inter-agent communication mesh; routes peer messages and republishes inbound peer traffic onto the kernel bus.".into(),
68            ),
69        }
70    }
71
72    async fn init(&mut self, bus: MessageBus) -> KernelResult<()> {
73        // Register the kernel itself as a peer on the mesh so external
74        // peers can address it by name. The receiver is forwarded onto the
75        // kernel bus.
76        let (mut peer, handle) = self
77            .mesh
78            .join(
79                "kernel",
80                vec![PeerCapability {
81                    name: "kernel".into(),
82                    version: Some(env!("CARGO_PKG_VERSION").into()),
83                }],
84                Vec::new(),
85            )
86            .await
87            .map_err(|e| KernelError::Other(anyhow::anyhow!(e)))?;
88        *self.self_handle.lock().await = Some(handle);
89
90        let id = self.id.clone();
91        let bus_for_inbound = bus.clone();
92        let forwarder = tokio::spawn(async move {
93            while let Some(msg) = peer.receiver.recv().await {
94                let kind = match &msg {
95                    PeerMessage::Hello { .. } => "peer.hello",
96                    PeerMessage::Broadcast { .. } => "peer.broadcast",
97                    PeerMessage::Direct { .. } => "peer.direct",
98                    PeerMessage::Task { .. } => "peer.task",
99                    PeerMessage::Result { .. } => "peer.result",
100                };
101                let payload = match serde_json::to_value(&msg) {
102                    Ok(v) => v,
103                    Err(_) => continue,
104                };
105                let _ = bus_for_inbound
106                    .emit_event(
107                        id.clone(),
108                        Event::Custom {
109                            module_id: id.clone(),
110                            kind: kind.into(),
111                            payload,
112                        },
113                    )
114                    .await;
115            }
116        });
117        self.forwarder = Some(forwarder);
118
119        // Listen on the bus for outbound commands.
120        let mut sub = bus.subscribe().await;
121        let mesh = self.mesh.clone();
122        let self_handle = self.self_handle.clone();
123        let id = self.id.clone();
124        let bus_emit = bus.clone();
125        let listener = tokio::spawn(async move {
126            while let Some(env) = sub.receiver.recv().await {
127                let Message::Command(Command::Invoke {
128                    module_id,
129                    method,
130                    payload,
131                }) = env.message
132                else {
133                    continue;
134                };
135                if module_id != id {
136                    continue;
137                }
138                let result = dispatch(&mesh, &self_handle, &method, payload).await;
139                let event = match result {
140                    Ok(value) => Event::Custom {
141                        module_id: id.clone(),
142                        kind: format!("{method}.ok"),
143                        payload: value,
144                    },
145                    Err(err) => Event::Custom {
146                        module_id: id.clone(),
147                        kind: format!("{method}.err"),
148                        payload: serde_json::json!({ "error": err.to_string() }),
149                    },
150                };
151                let _ = bus_emit.emit_event(id.clone(), event).await;
152            }
153        });
154        self.listener = Some(listener);
155        Ok(())
156    }
157
158    async fn start(&mut self) -> KernelResult<()> {
159        Ok(())
160    }
161
162    async fn stop(&mut self) -> KernelResult<()> {
163        if let Some(h) = self.listener.take() {
164            h.abort();
165        }
166        if let Some(h) = self.forwarder.take() {
167            h.abort();
168        }
169        let _ = self.mesh.leave(&"kernel".to_string()).await;
170        Ok(())
171    }
172}
173
174#[derive(Deserialize)]
175struct PublishPayload {
176    message: PeerMessage,
177}
178
179#[derive(Deserialize)]
180struct DirectoryPayload {}
181
182async fn dispatch(
183    mesh: &LocalMesh,
184    self_handle: &Arc<Mutex<Option<MeshHandle>>>,
185    method: &str,
186    payload: serde_json::Value,
187) -> KernelResult<serde_json::Value> {
188    let to_kernel = |e: crate::error::MeshError| KernelError::Other(anyhow::anyhow!(e));
189    match method {
190        "publish" => {
191            let p: PublishPayload = serde_json::from_value(payload)?;
192            let guard = self_handle.lock().await;
193            let handle = guard
194                .as_ref()
195                .ok_or_else(|| KernelError::Other(anyhow::anyhow!("mesh not initialised")))?;
196            handle.publish(p.message).await.map_err(to_kernel)?;
197            Ok(serde_json::json!({"ok": true}))
198        }
199        "directory" => {
200            let _p: DirectoryPayload =
201                serde_json::from_value(payload).unwrap_or(DirectoryPayload {});
202            let dir = mesh.directory().await;
203            Ok(serde_json::to_value(dir)?)
204        }
205        "peer_count" => Ok(serde_json::json!({"count": mesh.peer_count().await})),
206        other => Err(KernelError::Other(anyhow::anyhow!(
207            "unknown mesh method `{other}`"
208        ))),
209    }
210}
211
212#[cfg(test)]
213mod tests {
214    use super::*;
215    use oxide_k::bus::{Event, Message};
216    use serde_json::json;
217
218    #[tokio::test]
219    async fn module_publishes_broadcast_via_bus() {
220        let mesh = LocalMesh::new();
221        let (mut peer_x, _h) = mesh
222            .join(
223                "x",
224                vec![PeerCapability {
225                    name: "x".into(),
226                    version: None,
227                }],
228                vec![],
229            )
230            .await
231            .unwrap();
232
233        let mut module = MeshModule::with_mesh(mesh.clone());
234        let bus = MessageBus::new();
235        Module::init(&mut module, bus.clone()).await.unwrap();
236        Module::start(&mut module).await.unwrap();
237
238        let msg = PeerMessage::broadcast("kernel", "topic", json!({"hi": 1}));
239        bus.send_command(
240            "test",
241            Command::Invoke {
242                module_id: DEFAULT_MODULE_ID.into(),
243                method: "publish".into(),
244                payload: json!({"message": msg}),
245            },
246        )
247        .await
248        .unwrap();
249
250        let received = tokio::time::timeout(
251            std::time::Duration::from_millis(500),
252            peer_x.receiver.recv(),
253        )
254        .await
255        .unwrap()
256        .unwrap();
257        match received {
258            PeerMessage::Broadcast { from, .. } => assert_eq!(from, "kernel"),
259            other => panic!("unexpected: {other:?}"),
260        }
261        Module::stop(&mut module).await.unwrap();
262    }
263
264    #[tokio::test]
265    async fn forwarder_emits_peer_event_for_direct() {
266        let mesh = LocalMesh::new();
267        let mut module = MeshModule::with_mesh(mesh.clone());
268        let bus = MessageBus::new();
269        let mut sub = bus.subscribe().await;
270        Module::init(&mut module, bus.clone()).await.unwrap();
271        Module::start(&mut module).await.unwrap();
272
273        // External peer joins after kernel registers itself.
274        let (_pa, handle_a) = mesh
275            .join(
276                "a",
277                vec![PeerCapability {
278                    name: "a".into(),
279                    version: None,
280                }],
281                vec![],
282            )
283            .await
284            .unwrap();
285        handle_a
286            .publish(PeerMessage::direct("a", "kernel", json!({"ping": true})))
287            .await
288            .unwrap();
289
290        let mut saw = false;
291        for _ in 0..15 {
292            match tokio::time::timeout(std::time::Duration::from_millis(300), sub.receiver.recv())
293                .await
294            {
295                Ok(Some(env)) => {
296                    if let Message::Event(Event::Custom { kind, payload, .. }) = env.message {
297                        if kind == "peer.direct" {
298                            assert_eq!(payload["from"], json!("a"));
299                            saw = true;
300                            break;
301                        }
302                    }
303                }
304                _ => break,
305            }
306        }
307        assert!(saw, "expected peer.direct event");
308        Module::stop(&mut module).await.unwrap();
309    }
310}