1use std::collections::HashSet;
25use std::sync::Arc;
26
27use chrono::{DateTime, Utc};
28use serde::{Deserialize, Serialize};
29use tokio::sync::{mpsc, RwLock};
30use uuid::Uuid;
31
32use crate::error::{KernelError, Result};
33
34#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
49pub struct Capability(pub String);
50
51impl Capability {
52 pub fn new(s: impl Into<String>) -> Self {
54 Self(s.into())
55 }
56}
57
58#[derive(Debug, Clone, Serialize, Deserialize)]
60#[serde(tag = "type", rename_all = "snake_case")]
61pub enum Command {
62 Start {
64 module_id: String,
66 },
67 Stop {
69 module_id: String,
71 },
72 Invoke {
74 module_id: String,
76 method: String,
78 payload: serde_json::Value,
80 },
81 Ping,
83}
84
85#[derive(Debug, Clone, Serialize, Deserialize)]
87#[serde(tag = "type", rename_all = "snake_case")]
88pub enum Event {
89 ModuleStarted {
91 module_id: String,
93 },
94 ModuleStopped {
96 module_id: String,
98 },
99 Custom {
101 module_id: String,
103 kind: String,
105 payload: serde_json::Value,
107 },
108 Pong {
110 from: String,
112 },
113}
114
115#[derive(Debug, Clone, Serialize, Deserialize)]
117#[serde(tag = "kind", rename_all = "snake_case")]
118pub enum Message {
119 Command(Command),
121 Event(Event),
123}
124
125#[derive(Debug, Clone, Serialize, Deserialize)]
130pub struct Envelope {
131 pub id: Uuid,
133 pub source: String,
135 pub correlation_id: Option<Uuid>,
137 pub timestamp: DateTime<Utc>,
139 pub message: Message,
141}
142
143impl Envelope {
144 pub fn new(source: impl Into<String>, message: Message) -> Self {
146 Self {
147 id: Uuid::new_v4(),
148 source: source.into(),
149 correlation_id: None,
150 timestamp: Utc::now(),
151 message,
152 }
153 }
154
155 #[must_use]
157 pub fn with_correlation_id(mut self, correlation_id: Uuid) -> Self {
158 self.correlation_id = Some(correlation_id);
159 self
160 }
161}
162
163const DEFAULT_SUBSCRIBER_CAPACITY: usize = 128;
168
169pub struct Subscription {
174 pub receiver: mpsc::Receiver<Envelope>,
176 pub id: Uuid,
178}
179
180#[derive(Clone, Default)]
185pub struct MessageBus {
186 inner: Arc<BusInner>,
187}
188
189#[derive(Default)]
190struct BusInner {
191 subscribers: RwLock<Vec<Subscriber>>,
192 granted: RwLock<HashSet<String>>,
195}
196
197struct Subscriber {
198 id: Uuid,
199 tx: mpsc::Sender<Envelope>,
200}
201
202impl MessageBus {
203 pub fn new() -> Self {
205 Self::default()
206 }
207
208 pub async fn grant_capability(&self, cap: Capability) {
215 self.inner.granted.write().await.insert(cap.0);
216 }
217
218 pub async fn revoke_capability(&self, cap: &Capability) {
220 self.inner.granted.write().await.remove(&cap.0);
221 }
222
223 pub async fn publish_with_capability(
228 &self,
229 envelope: Envelope,
230 cap: &Capability,
231 ) -> Result<()> {
232 let granted = self.inner.granted.read().await;
233 if !granted.contains(&cap.0) {
234 return Err(KernelError::Denied {
235 publisher: envelope.source.clone(),
236 capability: cap.0.clone(),
237 });
238 }
239 drop(granted);
240 self.publish(envelope).await
241 }
242
243 pub async fn subscribe(&self) -> Subscription {
249 let (tx, rx) = mpsc::channel(DEFAULT_SUBSCRIBER_CAPACITY);
250 let id = Uuid::new_v4();
251 self.inner
252 .subscribers
253 .write()
254 .await
255 .push(Subscriber { id, tx });
256 Subscription { receiver: rx, id }
257 }
258
259 pub async fn publish(&self, envelope: Envelope) -> Result<()> {
266 let mut subs = self.inner.subscribers.write().await;
267 let mut alive = Vec::with_capacity(subs.len());
268
269 for sub in subs.drain(..) {
270 if sub.tx.is_closed() {
271 tracing::debug!(subscriber = %sub.id, "dropping closed subscriber");
272 continue;
273 }
274 match sub.tx.try_send(envelope.clone()) {
275 Ok(()) => alive.push(sub),
276 Err(mpsc::error::TrySendError::Full(_)) => {
277 tracing::warn!(subscriber = %sub.id, "subscriber channel full; message dropped");
278 alive.push(sub);
279 }
280 Err(mpsc::error::TrySendError::Closed(_)) => {
281 tracing::debug!(subscriber = %sub.id, "subscriber closed during publish");
282 }
283 }
284 }
285
286 *subs = alive;
287 Ok(())
288 }
289
290 pub async fn send_command(&self, source: impl Into<String>, command: Command) -> Result<Uuid> {
292 let envelope = Envelope::new(source, Message::Command(command));
293 let id = envelope.id;
294 self.publish(envelope).await?;
295 Ok(id)
296 }
297
298 pub async fn emit_event(&self, source: impl Into<String>, event: Event) -> Result<Uuid> {
300 let envelope = Envelope::new(source, Message::Event(event));
301 let id = envelope.id;
302 self.publish(envelope).await?;
303 Ok(id)
304 }
305
306 pub async fn subscriber_count(&self) -> usize {
308 self.inner.subscribers.read().await.len()
309 }
310}
311
312impl std::fmt::Debug for MessageBus {
313 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
314 f.debug_struct("MessageBus").finish_non_exhaustive()
315 }
316}
317
318impl<T> From<mpsc::error::SendError<T>> for KernelError {
322 fn from(err: mpsc::error::SendError<T>) -> Self {
323 KernelError::Bus(format!("channel send failed: {err}"))
324 }
325}
326
327#[cfg(test)]
328mod tests {
329 use super::*;
330
331 #[tokio::test]
332 async fn publish_delivers_to_all_subscribers() {
333 let bus = MessageBus::new();
334 let mut sub_a = bus.subscribe().await;
335 let mut sub_b = bus.subscribe().await;
336 assert_eq!(bus.subscriber_count().await, 2);
337
338 bus.send_command("test", Command::Ping).await.unwrap();
339
340 let a = sub_a.receiver.recv().await.expect("a received");
341 let b = sub_b.receiver.recv().await.expect("b received");
342 assert!(matches!(a.message, Message::Command(Command::Ping)));
343 assert!(matches!(b.message, Message::Command(Command::Ping)));
344 }
345
346 #[tokio::test]
347 async fn closed_subscribers_are_pruned() {
348 let bus = MessageBus::new();
349 {
350 let _sub = bus.subscribe().await;
351 assert_eq!(bus.subscriber_count().await, 1);
352 }
353 bus.emit_event("test", Event::Pong { from: "x".into() })
356 .await
357 .unwrap();
358 assert_eq!(bus.subscriber_count().await, 0);
359 }
360
361 #[tokio::test]
362 async fn envelope_carries_provenance() {
363 let bus = MessageBus::new();
364 let mut sub = bus.subscribe().await;
365
366 bus.emit_event(
367 "kernel",
368 Event::ModuleStarted {
369 module_id: "echo".into(),
370 },
371 )
372 .await
373 .unwrap();
374
375 let env = sub.receiver.recv().await.unwrap();
376 assert_eq!(env.source, "kernel");
377 assert!(env.id != Uuid::nil());
378 match env.message {
379 Message::Event(Event::ModuleStarted { module_id }) => {
380 assert_eq!(module_id, "echo");
381 }
382 other => panic!("unexpected message: {other:?}"),
383 }
384 }
385
386 #[tokio::test]
387 async fn correlation_id_round_trips() {
388 let bus = MessageBus::new();
389 let mut sub = bus.subscribe().await;
390 let cid = Uuid::new_v4();
391
392 let env = Envelope::new("test", Message::Command(Command::Ping)).with_correlation_id(cid);
393 bus.publish(env).await.unwrap();
394
395 let received = sub.receiver.recv().await.unwrap();
396 assert_eq!(received.correlation_id, Some(cid));
397 }
398
399 #[tokio::test]
404 async fn granted_capability_allows_publish() {
405 let bus = MessageBus::new();
406 let mut sub = bus.subscribe().await;
407
408 let cap = Capability::new("browser:navigate");
409 bus.grant_capability(cap.clone()).await;
410
411 let env = Envelope::new("browser-module", Message::Command(Command::Ping));
412 bus.publish_with_capability(env, &cap).await.unwrap();
413
414 let recv = sub.receiver.recv().await.unwrap();
415 assert!(matches!(recv.message, Message::Command(Command::Ping)));
416 }
417
418 #[tokio::test]
419 async fn unganted_capability_returns_denied() {
420 let bus = MessageBus::new();
421 let cap = Capability::new("llm:complete");
422 let env = Envelope::new("llm-module", Message::Command(Command::Ping));
424 let err = bus.publish_with_capability(env, &cap).await.unwrap_err();
425 assert!(
426 matches!(err, KernelError::Denied { .. }),
427 "expected Denied, got {err}"
428 );
429 }
430
431 #[tokio::test]
432 async fn revoked_capability_is_denied() {
433 let bus = MessageBus::new();
434 let cap = Capability::new("mirror:sync");
435 bus.grant_capability(cap.clone()).await;
436 bus.revoke_capability(&cap).await;
437
438 let env = Envelope::new("mirror-module", Message::Command(Command::Ping));
439 let err = bus.publish_with_capability(env, &cap).await.unwrap_err();
440 assert!(matches!(err, KernelError::Denied { .. }));
441 }
442
443 #[tokio::test]
444 async fn unguarded_publish_bypasses_acl() {
445 let bus = MessageBus::new();
447 let mut sub = bus.subscribe().await;
448 bus.send_command("kernel-internal", Command::Ping)
449 .await
450 .unwrap();
451 let recv = sub.receiver.recv().await.unwrap();
452 assert!(matches!(recv.message, Message::Command(Command::Ping)));
453 }
454}