Skip to main content

oxide_k/
bus.rs

1//! # Secure Message Bus
2//!
3//! An asynchronous message-passing layer for inter-module communication.
4//!
5//! The bus is built on top of `tokio::sync::mpsc` channels. Modules publish
6//! [`Envelope`]s onto the bus and receive routed messages on their own receivers.
7//! Every envelope carries provenance metadata (source, timestamp, correlation id)
8//! so the kernel can enforce access control and provide an audit trail.
9//!
10//! ## Topology
11//!
12//! ```text
13//! +--------+   send()   +-----------+   route   +-----------+
14//! | Sender | ---------> | MessageBus| --------> | Subscriber|
15//! +--------+            +-----------+           +-----------+
16//! ```
17//!
//! For the bootstrap implementation the bus performs broadcast-style routing:
//! every subscriber receives a clone of each published envelope. This is the
//! simplest model that satisfies the kernel's needs for now. A coarse,
//! bus-wide capability check is already available through
//! [`MessageBus::publish_with_capability`]; targeted routing and
//! finer-grained access control will be layered on top in later iterations.
23
24use std::collections::HashSet;
25use std::sync::Arc;
26
27use chrono::{DateTime, Utc};
28use serde::{Deserialize, Serialize};
29use tokio::sync::{mpsc, RwLock};
30use uuid::Uuid;
31
32use crate::error::{KernelError, Result};
33
34// ---------------------------------------------------------------------------
35// Capability tokens
36// ---------------------------------------------------------------------------
37
/// A capability token that grants a module the right to publish on the bus.
///
/// Capabilities are plain strings (e.g. `"browser:navigate"`, `"llm:complete"`)
/// wrapped in a newtype; the inner string is public. Equality and hashing are
/// derived, so two capabilities match only when their strings are identical.
///
/// The bus maintains a **grant set**; [`MessageBus::publish_with_capability`]
/// rejects any token not present in the set. Use
/// [`MessageBus::grant_capability`] to register allowed tokens and
/// [`MessageBus::revoke_capability`] to remove them again.
///
/// The unguarded [`MessageBus::publish`] remains available for internal
/// kernel use and backward compatibility.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Capability(pub String);
50
51impl Capability {
52    /// Create a capability from any string.
53    pub fn new(s: impl Into<String>) -> Self {
54        Self(s.into())
55    }
56}
57
/// A command instructs a module (or the kernel) to perform an action.
///
/// Serialized as an internally tagged enum: the variant name is written in
/// `snake_case` under the `type` key, next to the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum Command {
    /// Request a module to start.
    Start {
        /// The target module id.
        module_id: String,
    },
    /// Request a module to stop.
    Stop {
        /// The target module id.
        module_id: String,
    },
    /// Invoke a named method on a module with an arbitrary JSON payload.
    Invoke {
        /// The target module id.
        module_id: String,
        /// The method name to invoke.
        method: String,
        /// Method arguments encoded as JSON.
        payload: serde_json::Value,
    },
    /// A heartbeat ping used to verify subscribers are alive.
    Ping,
}
84
85/// An event reports something that has already happened.
86#[derive(Debug, Clone, Serialize, Deserialize)]
87#[serde(tag = "type", rename_all = "snake_case")]
88pub enum Event {
89    /// A module has finished starting.
90    ModuleStarted {
91        /// The module id that just started.
92        module_id: String,
93    },
94    /// A module has stopped.
95    ModuleStopped {
96        /// The module id that just stopped.
97        module_id: String,
98    },
99    /// A module emitted a generic, semi-structured event.
100    Custom {
101        /// The module id that emitted the event.
102        module_id: String,
103        /// Event kind, free-form for now.
104        kind: String,
105        /// Payload encoded as JSON.
106        payload: serde_json::Value,
107    },
108    /// A heartbeat pong matching a [`Command::Ping`].
109    Pong {
110        /// The id of the responder.
111        from: String,
112    },
113}
114
/// The payload carried by an [`Envelope`].
///
/// Serialized as an internally tagged enum: the variant name is written in
/// `snake_case` under the `kind` key. The wrapped [`Command`] or [`Event`] is
/// flattened into the same object, so its own `type` tag and fields sit next
/// to `kind`.
///
/// Because the tag shares one object with the inner payload, no field of the
/// wrapped enums may serialize under the key `kind`; doing so produces a
/// duplicate key that cannot be deserialized.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum Message {
    /// A command sent to a module.
    Command(Command),
    /// An event emitted by a module.
    Event(Event),
}
124
/// An envelope wraps a [`Message`] with provenance metadata.
///
/// Every message routed through the bus is wrapped in an [`Envelope`]. The
/// metadata fields make the bus auditable and enable future capability checks.
///
/// Envelopes are `Clone`: the bus hands each subscriber its own copy.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Envelope {
    /// Unique identifier of this envelope.
    pub id: Uuid,
    /// Source module / subsystem name.
    pub source: String,
    /// Optional correlation id to tie a command to its response event.
    /// `None` unless set through [`Envelope::with_correlation_id`].
    pub correlation_id: Option<Uuid>,
    /// Time the envelope was created.
    pub timestamp: DateTime<Utc>,
    /// The actual message payload.
    pub message: Message,
}
142
143impl Envelope {
144    /// Wrap a [`Message`] in a new envelope tagged with the given source.
145    pub fn new(source: impl Into<String>, message: Message) -> Self {
146        Self {
147            id: Uuid::new_v4(),
148            source: source.into(),
149            correlation_id: None,
150            timestamp: Utc::now(),
151            message,
152        }
153    }
154
155    /// Builder helper to attach a correlation id.
156    #[must_use]
157    pub fn with_correlation_id(mut self, correlation_id: Uuid) -> Self {
158        self.correlation_id = Some(correlation_id);
159        self
160    }
161}
162
/// The capacity of every subscriber channel.
///
/// Small enough to surface back-pressure quickly, large enough to absorb short
/// bursts. Once a subscriber has this many undelivered envelopes queued,
/// further publishes are dropped for that subscriber rather than waited on.
/// The value is provisional and is expected to be tuned against real
/// workloads.
const DEFAULT_SUBSCRIBER_CAPACITY: usize = 128;
168
/// A subscription returned by [`MessageBus::subscribe`].
///
/// Holding the [`Subscription`] alive keeps the underlying channel open. When it
/// is dropped, the subscriber is removed from the bus on the next publish.
///
/// Envelopes are read from [`Subscription::receiver`], typically with
/// `receiver.recv().await`.
pub struct Subscription {
    /// Receiver end of the subscriber channel.
    pub receiver: mpsc::Receiver<Envelope>,
    /// Stable id for this subscription. Useful for diagnostics; the bus logs
    /// this id when it drops messages for, or prunes, the subscriber.
    pub id: Uuid,
}
179
/// The kernel-internal message bus.
///
/// `MessageBus` is cheaply cloneable; clones share the same underlying state and
/// can be handed out to modules and subsystems freely.
///
/// The default value is an empty bus: no subscribers and no granted
/// capabilities.
#[derive(Clone, Default)]
pub struct MessageBus {
    /// Reference-counted shared state; cloning the bus only clones this `Arc`.
    inner: Arc<BusInner>,
}
188
/// State shared by every clone of a [`MessageBus`].
#[derive(Default)]
struct BusInner {
    /// Sender halves of all registered subscriber channels. Closed entries
    /// are pruned while publishing.
    subscribers: RwLock<Vec<Subscriber>>,
    /// Granted capability tokens. Only `publish_with_capability` consults
    /// this set; the unguarded `publish` never checks it, whether or not the
    /// set is empty.
    granted: RwLock<HashSet<String>>,
}
196
/// Bus-side half of a subscription: the sender paired with a
/// [`Subscription`]'s receiver.
struct Subscriber {
    /// Same id as the matching [`Subscription::id`]; used in log output.
    id: Uuid,
    /// Sender end of the subscriber channel.
    tx: mpsc::Sender<Envelope>,
}
201
202impl MessageBus {
203    /// Construct a new, empty bus.
204    pub fn new() -> Self {
205        Self::default()
206    }
207
208    // -----------------------------------------------------------------------
209    // Capability management
210    // -----------------------------------------------------------------------
211
212    /// Grant a capability token. After this call, any source holding `cap`
213    /// may call [`Self::publish_with_capability`] successfully.
214    pub async fn grant_capability(&self, cap: Capability) {
215        self.inner.granted.write().await.insert(cap.0);
216    }
217
218    /// Revoke a previously granted capability.
219    pub async fn revoke_capability(&self, cap: &Capability) {
220        self.inner.granted.write().await.remove(&cap.0);
221    }
222
223    /// Publish an [`Envelope`] **only if `cap` has been granted**.
224    ///
225    /// Returns [`KernelError::Denied`] if the capability is not in the grant
226    /// set. On success, routes the envelope identically to [`Self::publish`].
227    pub async fn publish_with_capability(
228        &self,
229        envelope: Envelope,
230        cap: &Capability,
231    ) -> Result<()> {
232        let granted = self.inner.granted.read().await;
233        if !granted.contains(&cap.0) {
234            return Err(KernelError::Denied {
235                publisher: envelope.source.clone(),
236                capability: cap.0.clone(),
237            });
238        }
239        drop(granted);
240        self.publish(envelope).await
241    }
242
243    // -----------------------------------------------------------------------
244    // Subscribers
245    // -----------------------------------------------------------------------
246
247    /// Register a new subscriber and return a [`Subscription`] handle.
248    pub async fn subscribe(&self) -> Subscription {
249        let (tx, rx) = mpsc::channel(DEFAULT_SUBSCRIBER_CAPACITY);
250        let id = Uuid::new_v4();
251        self.inner
252            .subscribers
253            .write()
254            .await
255            .push(Subscriber { id, tx });
256        Subscription { receiver: rx, id }
257    }
258
259    /// Publish an [`Envelope`] to every subscriber.
260    ///
261    /// Subscribers whose channels are closed are silently dropped. If a
262    /// subscriber's channel is full the message is dropped *for that
263    /// subscriber only* and a warning is traced; this prevents one slow
264    /// subscriber from blocking the whole bus.
265    pub async fn publish(&self, envelope: Envelope) -> Result<()> {
266        let mut subs = self.inner.subscribers.write().await;
267        let mut alive = Vec::with_capacity(subs.len());
268
269        for sub in subs.drain(..) {
270            if sub.tx.is_closed() {
271                tracing::debug!(subscriber = %sub.id, "dropping closed subscriber");
272                continue;
273            }
274            match sub.tx.try_send(envelope.clone()) {
275                Ok(()) => alive.push(sub),
276                Err(mpsc::error::TrySendError::Full(_)) => {
277                    tracing::warn!(subscriber = %sub.id, "subscriber channel full; message dropped");
278                    alive.push(sub);
279                }
280                Err(mpsc::error::TrySendError::Closed(_)) => {
281                    tracing::debug!(subscriber = %sub.id, "subscriber closed during publish");
282                }
283            }
284        }
285
286        *subs = alive;
287        Ok(())
288    }
289
290    /// Convenience helper: wrap a [`Command`] in an [`Envelope`] and publish it.
291    pub async fn send_command(&self, source: impl Into<String>, command: Command) -> Result<Uuid> {
292        let envelope = Envelope::new(source, Message::Command(command));
293        let id = envelope.id;
294        self.publish(envelope).await?;
295        Ok(id)
296    }
297
298    /// Convenience helper: wrap an [`Event`] in an [`Envelope`] and publish it.
299    pub async fn emit_event(&self, source: impl Into<String>, event: Event) -> Result<Uuid> {
300        let envelope = Envelope::new(source, Message::Event(event));
301        let id = envelope.id;
302        self.publish(envelope).await?;
303        Ok(id)
304    }
305
306    /// Returns the number of live subscribers (best-effort).
307    pub async fn subscriber_count(&self) -> usize {
308        self.inner.subscribers.read().await.len()
309    }
310}
311
312impl std::fmt::Debug for MessageBus {
313    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
314        f.debug_struct("MessageBus").finish_non_exhaustive()
315    }
316}
317
318// `From<mpsc::error::SendError<T>>` is hard to implement generically because
319// `T` would have to be `'static`. Instead we expose a small helper that callers
320// can use when they explicitly want to convert a send failure.
321impl<T> From<mpsc::error::SendError<T>> for KernelError {
322    fn from(err: mpsc::error::SendError<T>) -> Self {
323        KernelError::Bus(format!("channel send failed: {err}"))
324    }
325}
326
#[cfg(test)]
mod tests {
    use super::*;

    /// Every subscriber gets its own copy of a published message.
    #[tokio::test]
    async fn publish_delivers_to_all_subscribers() {
        let bus = MessageBus::new();
        let mut sub_a = bus.subscribe().await;
        let mut sub_b = bus.subscribe().await;
        assert_eq!(bus.subscriber_count().await, 2);

        bus.send_command("test", Command::Ping).await.unwrap();

        let a = sub_a.receiver.recv().await.expect("a received");
        let b = sub_b.receiver.recv().await.expect("b received");
        assert!(matches!(a.message, Message::Command(Command::Ping)));
        assert!(matches!(b.message, Message::Command(Command::Ping)));
    }

    /// A dropped subscription disappears from the bus on the next publish.
    #[tokio::test]
    async fn closed_subscribers_are_pruned() {
        let bus = MessageBus::new();
        {
            let _sub = bus.subscribe().await;
            assert_eq!(bus.subscriber_count().await, 1);
        }
        // Dropping the subscription closes the receiver end. The bus prunes it
        // on the next publish.
        bus.emit_event("test", Event::Pong { from: "x".into() })
            .await
            .unwrap();
        assert_eq!(bus.subscriber_count().await, 0);
    }

    /// A subscriber that never drains its channel loses the overflowing
    /// message but is not removed from the bus.
    #[tokio::test]
    async fn full_subscriber_drops_message_but_stays_subscribed() {
        let bus = MessageBus::new();
        let mut sub = bus.subscribe().await;

        // Publish one more message than the channel can hold.
        for _ in 0..=DEFAULT_SUBSCRIBER_CAPACITY {
            bus.send_command("test", Command::Ping).await.unwrap();
        }
        assert_eq!(bus.subscriber_count().await, 1);

        let mut delivered = 0;
        while sub.receiver.try_recv().is_ok() {
            delivered += 1;
        }
        assert_eq!(delivered, DEFAULT_SUBSCRIBER_CAPACITY);
    }

    /// Source, id and payload survive the trip through the bus.
    #[tokio::test]
    async fn envelope_carries_provenance() {
        let bus = MessageBus::new();
        let mut sub = bus.subscribe().await;

        bus.emit_event(
            "kernel",
            Event::ModuleStarted {
                module_id: "echo".into(),
            },
        )
        .await
        .unwrap();

        let env = sub.receiver.recv().await.unwrap();
        assert_eq!(env.source, "kernel");
        assert_ne!(env.id, Uuid::nil());
        match env.message {
            Message::Event(Event::ModuleStarted { module_id }) => {
                assert_eq!(module_id, "echo");
            }
            other => panic!("unexpected message: {other:?}"),
        }
    }

    /// A correlation id attached by the sender reaches the subscriber.
    #[tokio::test]
    async fn correlation_id_round_trips() {
        let bus = MessageBus::new();
        let mut sub = bus.subscribe().await;
        let cid = Uuid::new_v4();

        let env = Envelope::new("test", Message::Command(Command::Ping)).with_correlation_id(cid);
        bus.publish(env).await.unwrap();

        let received = sub.receiver.recv().await.unwrap();
        assert_eq!(received.correlation_id, Some(cid));
    }

    // -------------------------------------------------------------------
    // Capability ACL tests (R-19)
    // -------------------------------------------------------------------

    #[tokio::test]
    async fn granted_capability_allows_publish() {
        let bus = MessageBus::new();
        let mut sub = bus.subscribe().await;

        let cap = Capability::new("browser:navigate");
        bus.grant_capability(cap.clone()).await;

        let env = Envelope::new("browser-module", Message::Command(Command::Ping));
        bus.publish_with_capability(env, &cap).await.unwrap();

        let recv = sub.receiver.recv().await.unwrap();
        assert!(matches!(recv.message, Message::Command(Command::Ping)));
    }

    #[tokio::test]
    async fn ungranted_capability_returns_denied() {
        let bus = MessageBus::new();
        let cap = Capability::new("llm:complete");
        // Not granted — no call to grant_capability.
        let env = Envelope::new("llm-module", Message::Command(Command::Ping));
        let err = bus.publish_with_capability(env, &cap).await.unwrap_err();
        assert!(
            matches!(err, KernelError::Denied { .. }),
            "expected Denied, got {err}"
        );
    }

    #[tokio::test]
    async fn revoked_capability_is_denied() {
        let bus = MessageBus::new();
        let cap = Capability::new("mirror:sync");
        bus.grant_capability(cap.clone()).await;
        bus.revoke_capability(&cap).await;

        let env = Envelope::new("mirror-module", Message::Command(Command::Ping));
        let err = bus.publish_with_capability(env, &cap).await.unwrap_err();
        assert!(matches!(err, KernelError::Denied { .. }));
    }

    #[tokio::test]
    async fn unguarded_publish_bypasses_acl() {
        // The plain publish() must still work regardless of grant set.
        let bus = MessageBus::new();
        let mut sub = bus.subscribe().await;
        bus.send_command("kernel-internal", Command::Ping)
            .await
            .unwrap();
        let recv = sub.receiver.recv().await.unwrap();
        assert!(matches!(recv.message, Message::Command(Command::Ping)));
    }
}