portalis_core/
message.rs

1//! Message passing infrastructure for agent communication
2//!
3//! Following London School TDD: agents communicate via messages, not direct calls.
4//! This enables easy mocking and testing of agent interactions.
5
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use tokio::sync::mpsc;
9use uuid::Uuid;
10
11use crate::{AgentId, Error, Result};
12
13/// Unique message identifier
14#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
15pub struct MessageId(Uuid);
16
17impl MessageId {
18    pub fn new() -> Self {
19        Self(Uuid::new_v4())
20    }
21}
22
23impl Default for MessageId {
24    fn default() -> Self {
25        Self::new()
26    }
27}
28
29/// Message payload types
30#[derive(Debug, Clone, Serialize, Deserialize)]
31pub enum MessagePayload {
32    /// Start processing
33    Start,
34    /// Data payload (JSON-encoded)
35    Data(serde_json::Value),
36    /// Task completed successfully
37    Complete,
38    /// Task failed with error
39    Error(String),
40    /// Request for status
41    StatusRequest,
42    /// Status response
43    StatusResponse(String),
44}
45
46/// Message envelope for agent communication
47#[derive(Debug, Clone, Serialize, Deserialize)]
48pub struct Message {
49    pub id: MessageId,
50    pub from: AgentId,
51    pub to: AgentId,
52    pub payload: MessagePayload,
53    pub timestamp: i64,
54}
55
56impl Message {
57    /// Create a new message
58    pub fn new(from: AgentId, to: AgentId, payload: MessagePayload) -> Self {
59        Self {
60            id: MessageId::new(),
61            from,
62            to,
63            payload,
64            timestamp: chrono::Utc::now().timestamp(),
65        }
66    }
67
68    /// Create a data message
69    pub fn data(from: AgentId, to: AgentId, data: serde_json::Value) -> Self {
70        Self::new(from, to, MessagePayload::Data(data))
71    }
72
73    /// Create a start message
74    pub fn start(from: AgentId, to: AgentId) -> Self {
75        Self::new(from, to, MessagePayload::Start)
76    }
77
78    /// Create a complete message
79    pub fn complete(from: AgentId, to: AgentId) -> Self {
80        Self::new(from, to, MessagePayload::Complete)
81    }
82
83    /// Create an error message
84    pub fn error(from: AgentId, to: AgentId, error: impl Into<String>) -> Self {
85        Self::new(from, to, MessagePayload::Error(error.into()))
86    }
87}
88
89/// Message bus for agent communication
90///
91/// Following London School TDD:
92/// - Agents don't call each other directly
93/// - All communication goes through the message bus
94/// - Easy to mock for testing
95pub struct MessageBus {
96    channels: HashMap<AgentId, mpsc::UnboundedSender<Message>>,
97}
98
99impl MessageBus {
100    /// Create a new message bus
101    pub fn new() -> Self {
102        Self {
103            channels: HashMap::new(),
104        }
105    }
106
107    /// Register an agent with the message bus
108    pub fn register(&mut self, agent_id: AgentId) -> mpsc::UnboundedReceiver<Message> {
109        let (tx, rx) = mpsc::unbounded_channel();
110        self.channels.insert(agent_id, tx);
111        rx
112    }
113
114    /// Unregister an agent from the message bus
115    pub fn unregister(&mut self, agent_id: &AgentId) {
116        self.channels.remove(agent_id);
117    }
118
119    /// Send a message to a specific agent
120    pub async fn send(&self, message: Message) -> Result<()> {
121        let channel = self.channels
122            .get(&message.to)
123            .ok_or_else(|| Error::MessageBus(format!("Agent {} not registered", message.to)))?;
124
125        channel
126            .send(message)
127            .map_err(|e| Error::MessageBus(format!("Failed to send message: {}", e)))?;
128
129        Ok(())
130    }
131
132    /// Broadcast a message to all registered agents
133    pub async fn broadcast(&self, message: Message) -> Result<()> {
134        for channel in self.channels.values() {
135            channel
136                .send(message.clone())
137                .map_err(|e| Error::MessageBus(format!("Failed to broadcast: {}", e)))?;
138        }
139        Ok(())
140    }
141
142    /// Get the number of registered agents
143    pub fn agent_count(&self) -> usize {
144        self.channels.len()
145    }
146}
147
148impl Default for MessageBus {
149    fn default() -> Self {
150        Self::new()
151    }
152}
153
#[cfg(test)]
mod tests {
    use super::*;

    /// The convenience constructor keeps sender, recipient and payload intact.
    #[test]
    fn test_message_creation() {
        let from = AgentId::new();
        let to = AgentId::new();
        let msg = Message::start(from, to);

        assert_eq!(msg.from, from);
        assert_eq!(msg.to, to);
        assert!(matches!(msg.payload, MessagePayload::Start));
    }

    /// Registering and unregistering is reflected in `agent_count`.
    #[test]
    fn test_message_bus_registration() {
        let mut bus = MessageBus::new();
        let agent_id = AgentId::new();

        let _rx = bus.register(agent_id);
        assert_eq!(bus.agent_count(), 1);

        bus.unregister(&agent_id);
        assert_eq!(bus.agent_count(), 0);
    }

    /// A message sent through the bus arrives on the recipient's receiver.
    #[tokio::test]
    async fn test_message_bus_send() {
        let mut bus = MessageBus::new();
        let agent1 = AgentId::new();
        let agent2 = AgentId::new();

        let mut rx = bus.register(agent2);
        // `register` returns a receiver; keep it alive so agent1 stays reachable.
        let _rx1 = bus.register(agent1);

        let msg = Message::start(agent1, agent2);
        bus.send(msg.clone()).await.unwrap();

        let received = rx.recv().await.unwrap();
        assert_eq!(received.id, msg.id);
        assert_eq!(received.from, agent1);
    }

    /// Sending to an agent that was never registered is reported as an error.
    #[tokio::test]
    async fn test_message_bus_send_to_unregistered_agent() {
        let bus = MessageBus::new();
        let msg = Message::start(AgentId::new(), AgentId::new());

        assert!(bus.send(msg).await.is_err());
    }

    /// A broadcast reaches every registered agent.
    #[tokio::test]
    async fn test_message_bus_broadcast() {
        let mut bus = MessageBus::new();
        let sender = AgentId::new();
        let agent_a = AgentId::new();
        let agent_b = AgentId::new();

        let mut rx_a = bus.register(agent_a);
        let mut rx_b = bus.register(agent_b);

        let msg = Message::start(sender, agent_a);
        bus.broadcast(msg.clone()).await.unwrap();

        assert_eq!(rx_a.recv().await.unwrap().id, msg.id);
        assert_eq!(rx_b.recv().await.unwrap().id, msg.id);
    }
}