use std::collections::HashSet;
use std::sync::Arc;
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, RwLock};
use uuid::Uuid;
use crate::error::{KernelError, Result};
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct Capability(pub String);
impl Capability {
pub fn new(s: impl Into<String>) -> Self {
Self(s.into())
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum Command {
Start {
module_id: String,
},
Stop {
module_id: String,
},
Invoke {
module_id: String,
method: String,
payload: serde_json::Value,
},
Ping,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum Event {
ModuleStarted {
module_id: String,
},
ModuleStopped {
module_id: String,
},
Custom {
module_id: String,
kind: String,
payload: serde_json::Value,
},
Pong {
from: String,
},
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "kind", rename_all = "snake_case")]
pub enum Message {
Command(Command),
Event(Event),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Envelope {
pub id: Uuid,
pub source: String,
pub correlation_id: Option<Uuid>,
pub timestamp: DateTime<Utc>,
pub message: Message,
}
impl Envelope {
pub fn new(source: impl Into<String>, message: Message) -> Self {
Self {
id: Uuid::new_v4(),
source: source.into(),
correlation_id: None,
timestamp: Utc::now(),
message,
}
}
#[must_use]
pub fn with_correlation_id(mut self, correlation_id: Uuid) -> Self {
self.correlation_id = Some(correlation_id);
self
}
}
const DEFAULT_SUBSCRIBER_CAPACITY: usize = 128;
pub struct Subscription {
pub receiver: mpsc::Receiver<Envelope>,
pub id: Uuid,
}
#[derive(Clone, Default)]
pub struct MessageBus {
inner: Arc<BusInner>,
}
#[derive(Default)]
struct BusInner {
subscribers: RwLock<Vec<Subscriber>>,
granted: RwLock<HashSet<String>>,
}
struct Subscriber {
id: Uuid,
tx: mpsc::Sender<Envelope>,
}
impl MessageBus {
pub fn new() -> Self {
Self::default()
}
pub async fn grant_capability(&self, cap: Capability) {
self.inner.granted.write().await.insert(cap.0);
}
pub async fn revoke_capability(&self, cap: &Capability) {
self.inner.granted.write().await.remove(&cap.0);
}
pub async fn publish_with_capability(
&self,
envelope: Envelope,
cap: &Capability,
) -> Result<()> {
let granted = self.inner.granted.read().await;
if !granted.contains(&cap.0) {
return Err(KernelError::Denied {
publisher: envelope.source.clone(),
capability: cap.0.clone(),
});
}
drop(granted);
self.publish(envelope).await
}
pub async fn subscribe(&self) -> Subscription {
let (tx, rx) = mpsc::channel(DEFAULT_SUBSCRIBER_CAPACITY);
let id = Uuid::new_v4();
self.inner
.subscribers
.write()
.await
.push(Subscriber { id, tx });
Subscription { receiver: rx, id }
}
pub async fn publish(&self, envelope: Envelope) -> Result<()> {
let mut subs = self.inner.subscribers.write().await;
let mut alive = Vec::with_capacity(subs.len());
for sub in subs.drain(..) {
if sub.tx.is_closed() {
tracing::debug!(subscriber = %sub.id, "dropping closed subscriber");
continue;
}
match sub.tx.try_send(envelope.clone()) {
Ok(()) => alive.push(sub),
Err(mpsc::error::TrySendError::Full(_)) => {
tracing::warn!(subscriber = %sub.id, "subscriber channel full; message dropped");
alive.push(sub);
}
Err(mpsc::error::TrySendError::Closed(_)) => {
tracing::debug!(subscriber = %sub.id, "subscriber closed during publish");
}
}
}
*subs = alive;
Ok(())
}
pub async fn send_command(&self, source: impl Into<String>, command: Command) -> Result<Uuid> {
let envelope = Envelope::new(source, Message::Command(command));
let id = envelope.id;
self.publish(envelope).await?;
Ok(id)
}
pub async fn emit_event(&self, source: impl Into<String>, event: Event) -> Result<Uuid> {
let envelope = Envelope::new(source, Message::Event(event));
let id = envelope.id;
self.publish(envelope).await?;
Ok(id)
}
pub async fn subscriber_count(&self) -> usize {
self.inner.subscribers.read().await.len()
}
}
impl std::fmt::Debug for MessageBus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("MessageBus").finish_non_exhaustive()
}
}
impl<T> From<mpsc::error::SendError<T>> for KernelError {
fn from(err: mpsc::error::SendError<T>) -> Self {
KernelError::Bus(format!("channel send failed: {err}"))
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn publish_delivers_to_all_subscribers() {
let bus = MessageBus::new();
let mut sub_a = bus.subscribe().await;
let mut sub_b = bus.subscribe().await;
assert_eq!(bus.subscriber_count().await, 2);
bus.send_command("test", Command::Ping).await.unwrap();
let a = sub_a.receiver.recv().await.expect("a received");
let b = sub_b.receiver.recv().await.expect("b received");
assert!(matches!(a.message, Message::Command(Command::Ping)));
assert!(matches!(b.message, Message::Command(Command::Ping)));
}
#[tokio::test]
async fn closed_subscribers_are_pruned() {
let bus = MessageBus::new();
{
let _sub = bus.subscribe().await;
assert_eq!(bus.subscriber_count().await, 1);
}
bus.emit_event("test", Event::Pong { from: "x".into() })
.await
.unwrap();
assert_eq!(bus.subscriber_count().await, 0);
}
#[tokio::test]
async fn envelope_carries_provenance() {
let bus = MessageBus::new();
let mut sub = bus.subscribe().await;
bus.emit_event(
"kernel",
Event::ModuleStarted {
module_id: "echo".into(),
},
)
.await
.unwrap();
let env = sub.receiver.recv().await.unwrap();
assert_eq!(env.source, "kernel");
assert!(env.id != Uuid::nil());
match env.message {
Message::Event(Event::ModuleStarted { module_id }) => {
assert_eq!(module_id, "echo");
}
other => panic!("unexpected message: {other:?}"),
}
}
#[tokio::test]
async fn correlation_id_round_trips() {
let bus = MessageBus::new();
let mut sub = bus.subscribe().await;
let cid = Uuid::new_v4();
let env = Envelope::new("test", Message::Command(Command::Ping)).with_correlation_id(cid);
bus.publish(env).await.unwrap();
let received = sub.receiver.recv().await.unwrap();
assert_eq!(received.correlation_id, Some(cid));
}
#[tokio::test]
async fn granted_capability_allows_publish() {
let bus = MessageBus::new();
let mut sub = bus.subscribe().await;
let cap = Capability::new("browser:navigate");
bus.grant_capability(cap.clone()).await;
let env = Envelope::new("browser-module", Message::Command(Command::Ping));
bus.publish_with_capability(env, &cap).await.unwrap();
let recv = sub.receiver.recv().await.unwrap();
assert!(matches!(recv.message, Message::Command(Command::Ping)));
}
#[tokio::test]
async fn unganted_capability_returns_denied() {
let bus = MessageBus::new();
let cap = Capability::new("llm:complete");
let env = Envelope::new("llm-module", Message::Command(Command::Ping));
let err = bus.publish_with_capability(env, &cap).await.unwrap_err();
assert!(
matches!(err, KernelError::Denied { .. }),
"expected Denied, got {err}"
);
}
#[tokio::test]
async fn revoked_capability_is_denied() {
let bus = MessageBus::new();
let cap = Capability::new("mirror:sync");
bus.grant_capability(cap.clone()).await;
bus.revoke_capability(&cap).await;
let env = Envelope::new("mirror-module", Message::Command(Command::Ping));
let err = bus.publish_with_capability(env, &cap).await.unwrap_err();
assert!(matches!(err, KernelError::Denied { .. }));
}
#[tokio::test]
async fn unguarded_publish_bypasses_acl() {
let bus = MessageBus::new();
let mut sub = bus.subscribe().await;
bus.send_command("kernel-internal", Command::Ping)
.await
.unwrap();
let recv = sub.receiver.recv().await.unwrap();
assert!(matches!(recv.message, Message::Command(Command::Ping)));
}
}