1use serde::{Deserialize, Serialize};
7use tokio::sync::broadcast;
8
9#[derive(Debug, Clone, Serialize, Deserialize)]
11pub struct InterAgentMessage {
12 pub from: String,
14 pub to: Option<String>,
16 pub message_type: String,
18 pub payload: serde_json::Value,
20 pub timestamp_ms: u64,
22}
23
24impl InterAgentMessage {
25 pub fn direct(
27 from: impl Into<String>,
28 to: impl Into<String>,
29 message_type: impl Into<String>,
30 payload: serde_json::Value,
31 ) -> Self {
32 Self {
33 from: from.into(),
34 to: Some(to.into()),
35 message_type: message_type.into(),
36 payload,
37 timestamp_ms: std::time::SystemTime::now()
38 .duration_since(std::time::UNIX_EPOCH)
39 .unwrap_or_default()
40 .as_millis() as u64,
41 }
42 }
43
44 pub fn broadcast(
46 from: impl Into<String>,
47 message_type: impl Into<String>,
48 payload: serde_json::Value,
49 ) -> Self {
50 Self {
51 from: from.into(),
52 to: None,
53 message_type: message_type.into(),
54 payload,
55 timestamp_ms: std::time::SystemTime::now()
56 .duration_since(std::time::UNIX_EPOCH)
57 .unwrap_or_default()
58 .as_millis() as u64,
59 }
60 }
61
62 pub fn is_for(&self, agent_id: &str) -> bool {
64 self.to.as_deref() == Some(agent_id) || self.to.is_none()
65 }
66}
67
68#[derive(Clone)]
73pub struct MessageBus {
74 sender: broadcast::Sender<InterAgentMessage>,
75 capacity: usize,
76}
77
78impl MessageBus {
79 pub fn new(capacity: usize) -> Self {
81 let (tx, _rx) = broadcast::channel(capacity);
82 Self {
83 sender: tx,
84 capacity,
85 }
86 }
87
88 pub fn publish(&self, msg: InterAgentMessage) -> usize {
92 self.sender.send(msg).unwrap_or(0)
93 }
94
95 pub fn subscribe(&self) -> broadcast::Receiver<InterAgentMessage> {
97 self.sender.subscribe()
98 }
99
100 pub fn subscriber_count(&self) -> usize {
102 self.sender.receiver_count()
103 }
104
105 pub fn capacity(&self) -> usize {
107 self.capacity
108 }
109}
110
111#[cfg(test)]
112mod tests {
113 use super::*;
114 use serde_json::json;
115
116 #[test]
117 fn test_direct_message() {
118 let msg = InterAgentMessage::direct(
119 "agent-1",
120 "agent-2",
121 "task_complete",
122 json!({"result": "ok"}),
123 );
124 assert_eq!(msg.from, "agent-1");
125 assert_eq!(msg.to, Some("agent-2".to_string()));
126 assert!(msg.is_for("agent-2"));
127 assert!(!msg.is_for("agent-1"));
128 assert!(!msg.is_for("agent-3"));
129 }
130
131 #[test]
132 fn test_broadcast_message() {
133 let msg =
134 InterAgentMessage::broadcast("agent-1", "status_update", json!({"status": "idle"}));
135 assert_eq!(msg.from, "agent-1");
136 assert!(msg.to.is_none());
137 assert!(msg.is_for("agent-2"));
138 assert!(msg.is_for("agent-3"));
139 }
140
141 #[tokio::test]
142 async fn test_message_bus_pub_sub() {
143 let bus = MessageBus::new(16);
144 let mut rx = bus.subscribe();
145
146 let msg = InterAgentMessage::broadcast("agent-1", "ping", json!("pong"));
147 bus.publish(msg.clone());
148
149 let received = rx.try_recv().expect("should receive message");
150 assert_eq!(received.from, "agent-1");
151 assert_eq!(received.message_type, "ping");
152 }
153
154 #[tokio::test]
155 async fn test_message_bus_multiple_subscribers() {
156 let bus = MessageBus::new(16);
157 let mut rx1 = bus.subscribe();
158 let mut rx2 = bus.subscribe();
159
160 assert_eq!(bus.subscriber_count(), 2);
161
162 let msg = InterAgentMessage::broadcast("coordinator", "start", json!({}));
163 let count = bus.publish(msg);
164 assert_eq!(count, 2);
165
166 assert!(rx1.try_recv().is_ok());
167 assert!(rx2.try_recv().is_ok());
168 }
169
170 #[test]
171 fn test_message_serialization() {
172 let msg = InterAgentMessage::direct("a", "b", "test", json!({"key": "value"}));
173 let json = serde_json::to_string(&msg).unwrap();
174 let deserialized: InterAgentMessage = serde_json::from_str(&json).unwrap();
175 assert_eq!(deserialized.from, "a");
176 assert_eq!(deserialized.to, Some("b".to_string()));
177 }
178}