Skip to main content

oxi_sdk/
message_bus.rs

//! Inter-agent message bus for multi-agent communication.
//!
//! Provides a broadcast-based message bus that agents can use to
//! communicate with each other in an oxios environment.
5
6use serde::{Deserialize, Serialize};
7use tokio::sync::broadcast;
8
9/// A message sent between agents.
10#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct InterAgentMessage {
12    /// Sender agent ID.
13    pub from: String,
14    /// Recipient agent ID. `None` means broadcast to all subscribers.
15    pub to: Option<String>,
16    /// Message type (e.g. "task_complete", "delegation", "status").
17    pub message_type: String,
18    /// Message payload (arbitrary JSON).
19    pub payload: serde_json::Value,
20    /// Unix timestamp in milliseconds.
21    pub timestamp_ms: u64,
22}
23
24impl InterAgentMessage {
25    /// Create a new directed message.
26    pub fn direct(
27        from: impl Into<String>,
28        to: impl Into<String>,
29        message_type: impl Into<String>,
30        payload: serde_json::Value,
31    ) -> Self {
32        Self {
33            from: from.into(),
34            to: Some(to.into()),
35            message_type: message_type.into(),
36            payload,
37            timestamp_ms: std::time::SystemTime::now()
38                .duration_since(std::time::UNIX_EPOCH)
39                .unwrap_or_default()
40                .as_millis() as u64,
41        }
42    }
43
44    /// Create a broadcast message.
45    pub fn broadcast(
46        from: impl Into<String>,
47        message_type: impl Into<String>,
48        payload: serde_json::Value,
49    ) -> Self {
50        Self {
51            from: from.into(),
52            to: None,
53            message_type: message_type.into(),
54            payload,
55            timestamp_ms: std::time::SystemTime::now()
56                .duration_since(std::time::UNIX_EPOCH)
57                .unwrap_or_default()
58                .as_millis() as u64,
59        }
60    }
61
62    /// Check if this message is intended for the given agent.
63    pub fn is_for(&self, agent_id: &str) -> bool {
64        self.to.as_deref() == Some(agent_id) || self.to.is_none()
65    }
66}
67
68/// Broadcast-based message bus for inter-agent communication.
69///
70/// Agents subscribe to the bus and receive messages addressed to them
71/// or broadcast messages. Thread-safe and async-compatible.
72#[derive(Clone)]
73pub struct MessageBus {
74    sender: broadcast::Sender<InterAgentMessage>,
75    capacity: usize,
76}
77
78impl MessageBus {
79    /// Create a new message bus with the given channel capacity.
80    pub fn new(capacity: usize) -> Self {
81        let (tx, _rx) = broadcast::channel(capacity);
82        Self {
83            sender: tx,
84            capacity,
85        }
86    }
87
88    /// Publish a message to the bus.
89    ///
90    /// Returns the number of receivers that received the message.
91    pub fn publish(&self, msg: InterAgentMessage) -> usize {
92        self.sender.send(msg).unwrap_or(0)
93    }
94
95    /// Subscribe to all messages on the bus.
96    pub fn subscribe(&self) -> broadcast::Receiver<InterAgentMessage> {
97        self.sender.subscribe()
98    }
99
100    /// Get the number of active subscribers.
101    pub fn subscriber_count(&self) -> usize {
102        self.sender.receiver_count()
103    }
104
105    /// Get the configured capacity.
106    pub fn capacity(&self) -> usize {
107        self.capacity
108    }
109}
110
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// A directed message matches only its recipient.
    #[test]
    fn test_direct_message() {
        let msg = InterAgentMessage::direct(
            "agent-1",
            "agent-2",
            "task_complete",
            json!({"result": "ok"}),
        );
        assert_eq!(msg.from, "agent-1");
        assert_eq!(msg.to, Some("agent-2".to_string()));
        assert!(msg.is_for("agent-2"));
        assert!(!msg.is_for("agent-1"));
        assert!(!msg.is_for("agent-3"));
    }

    /// A broadcast message has no recipient and matches every agent.
    #[test]
    fn test_broadcast_message() {
        let msg =
            InterAgentMessage::broadcast("agent-1", "status_update", json!({"status": "idle"}));
        assert_eq!(msg.from, "agent-1");
        assert!(msg.to.is_none());
        assert!(msg.is_for("agent-2"));
        assert!(msg.is_for("agent-3"));
    }

    /// A subscriber receives exactly what was published.
    #[tokio::test]
    async fn test_message_bus_pub_sub() {
        let bus = MessageBus::new(16);
        let mut rx = bus.subscribe();

        let msg = InterAgentMessage::broadcast("agent-1", "ping", json!("pong"));
        // The message is not used after publishing, so it is moved, not cloned.
        assert_eq!(bus.publish(msg), 1);

        let received = rx.try_recv().expect("should receive message");
        assert_eq!(received.from, "agent-1");
        assert_eq!(received.message_type, "ping");
        assert_eq!(received.payload, json!("pong"));
    }

    /// Every subscriber gets its own copy of a published message.
    #[tokio::test]
    async fn test_message_bus_multiple_subscribers() {
        let bus = MessageBus::new(16);
        let mut rx1 = bus.subscribe();
        let mut rx2 = bus.subscribe();

        assert_eq!(bus.subscriber_count(), 2);

        let msg = InterAgentMessage::broadcast("coordinator", "start", json!({}));
        let count = bus.publish(msg);
        assert_eq!(count, 2);

        assert!(rx1.try_recv().is_ok());
        assert!(rx2.try_recv().is_ok());
    }

    /// Publishing with no subscribers reports zero receivers.
    #[test]
    fn test_publish_without_subscribers() {
        let bus = MessageBus::new(4);
        assert_eq!(bus.subscriber_count(), 0);

        let msg = InterAgentMessage::broadcast("agent-1", "ping", json!(null));
        assert_eq!(bus.publish(msg), 0);
    }

    /// The capacity getter reports the value passed to `new`.
    #[test]
    fn test_capacity_getter() {
        assert_eq!(MessageBus::new(16).capacity(), 16);
    }

    /// A message survives a JSON round trip with every field intact.
    #[test]
    fn test_message_serialization() {
        let msg = InterAgentMessage::direct("a", "b", "test", json!({"key": "value"}));
        let json = serde_json::to_string(&msg).unwrap();
        let deserialized: InterAgentMessage = serde_json::from_str(&json).unwrap();
        assert_eq!(deserialized.from, "a");
        assert_eq!(deserialized.to, Some("b".to_string()));
        assert_eq!(deserialized.message_type, "test");
        assert_eq!(deserialized.payload, json!({"key": "value"}));
        assert_eq!(deserialized.timestamp_ms, msg.timestamp_ms);
    }
}