1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::Result;
6use chrono::{DateTime, Utc};
7use serde::{Deserialize, Serialize};
8use tokio::sync::{mpsc, RwLock};
9use uuid::Uuid;
10
11use super::artifact::Artifact;
12use super::context::AgentContextRef;
13use super::output::AgentOutput;
14
15#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct AgentMessage {
18 pub id: Uuid,
20 pub msg_type: MessageType,
22 pub sender: AgentEndpoint,
24 pub receivers: Vec<AgentEndpoint>,
26 pub payload: MessagePayload,
28 pub correlation_id: Option<Uuid>,
30 pub timestamp: DateTime<Utc>,
32 pub ttl: Duration,
34 pub delivery_mode: DeliveryMode,
36}
37
38impl AgentMessage {
39 pub fn new(
41 sender: AgentEndpoint,
42 receivers: Vec<AgentEndpoint>,
43 payload: MessagePayload,
44 ) -> Self {
45 Self {
46 id: Uuid::new_v4(),
47 msg_type: MessageType::Request,
48 sender,
49 receivers,
50 payload,
51 correlation_id: None,
52 timestamp: Utc::now(),
53 ttl: Duration::from_secs(300),
54 delivery_mode: DeliveryMode::AtLeastOnce,
55 }
56 }
57
58 pub fn response_to(original: &AgentMessage, payload: MessagePayload) -> Self {
60 Self {
61 id: Uuid::new_v4(),
62 msg_type: MessageType::Response,
63 sender: original
64 .receivers
65 .first()
66 .cloned()
67 .unwrap_or(original.sender.clone()),
68 receivers: vec![original.sender.clone()],
69 payload,
70 correlation_id: Some(original.id),
71 timestamp: Utc::now(),
72 ttl: Duration::from_secs(300),
73 delivery_mode: DeliveryMode::AtLeastOnce,
74 }
75 }
76
77 pub fn publish(sender: AgentEndpoint, payload: MessagePayload) -> Self {
79 Self {
80 id: Uuid::new_v4(),
81 msg_type: MessageType::Publish,
82 sender,
83 receivers: Vec::new(),
84 payload,
85 correlation_id: None,
86 timestamp: Utc::now(),
87 ttl: Duration::from_secs(60),
88 delivery_mode: DeliveryMode::Broadcast,
89 }
90 }
91}
92
93#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
95pub struct AgentEndpoint {
96 pub agent_id: String,
98 pub phase: Option<String>,
100 pub task_id: Option<String>,
102 pub item_id: Option<String>,
104}
105
106impl AgentEndpoint {
107 pub fn agent(agent_id: &str) -> Self {
109 Self {
110 agent_id: agent_id.to_string(),
111 phase: None,
112 task_id: None,
113 item_id: None,
114 }
115 }
116
117 pub fn for_phase(agent_id: &str, phase: &str) -> Self {
119 Self {
120 agent_id: agent_id.to_string(),
121 phase: Some(phase.to_string()),
122 task_id: None,
123 item_id: None,
124 }
125 }
126
127 pub fn for_task_item(agent_id: &str, task_id: &str, item_id: &str) -> Self {
129 Self {
130 agent_id: agent_id.to_string(),
131 phase: None,
132 task_id: Some(task_id.to_string()),
133 item_id: Some(item_id.to_string()),
134 }
135 }
136}
137
138#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
140pub enum MessageType {
141 Request,
143 Response,
145 Ack,
147 Publish,
149 Forward,
151}
152
153#[derive(Debug, Clone, Serialize, Deserialize)]
155pub enum DeliveryMode {
156 FireAndForget,
158 AtLeastOnce,
160 ExactlyOnce,
162 Broadcast,
164}
165
166#[derive(Debug, Clone, Serialize, Deserialize)]
168pub enum MessagePayload {
169 ExecutionRequest(ExecutionRequest),
171 ExecutionResult(ExecutionResult),
173 Artifact(Artifact),
175 ContextUpdate(ContextUpdate),
177 ControlSignal(ControlSignal),
179 Custom(serde_json::Value),
181}
182
183#[derive(Debug, Clone, Serialize, Deserialize)]
185pub struct ExecutionRequest {
186 pub command: String,
188 pub context: AgentContextRef,
190 pub input_artifacts: Vec<Artifact>,
192 pub expectations: Option<ExecutionExpectations>,
194}
195
196#[derive(Debug, Clone, Serialize, Deserialize)]
198pub struct ExecutionResult {
199 pub run_id: Uuid,
201 pub output: AgentOutput,
203 pub success: bool,
205 pub error: Option<String>,
207}
208
209#[derive(Debug, Clone, Serialize, Deserialize)]
211pub struct ExecutionExpectations {
212 pub output_schema: Option<serde_json::Value>,
214 pub validation_rules: Vec<ValidationRule>,
216 pub quality_threshold: f32,
218}
219
220#[derive(Debug, Clone, Serialize, Deserialize)]
222pub struct ValidationRule {
223 pub name: String,
225 pub expression: String,
227 pub error_message: String,
229}
230
231#[derive(Debug, Clone, Serialize, Deserialize)]
233pub struct ContextUpdate {
234 pub key: String,
236 pub value: serde_json::Value,
238 pub operation: ContextUpdateOp,
240}
241
242#[derive(Debug, Clone, Serialize, Deserialize)]
244pub enum ContextUpdateOp {
245 Set,
247 Append,
249 Remove,
251}
252
253#[derive(Debug, Clone, Serialize, Deserialize)]
255pub struct ControlSignal {
256 pub signal: Signal,
258 pub reason: Option<String>,
260}
261
262#[derive(Debug, Clone, Serialize, Deserialize)]
264pub enum Signal {
265 Cancel,
267 Pause,
269 Resume,
271 Retry,
273 Skip,
275}
276
277pub struct MessageBus {
279 tx: mpsc::Sender<AgentMessage>,
280 message_store: Arc<RwLock<HashMap<Uuid, AgentMessage>>>,
281}
282
283impl MessageBus {
284 pub fn new() -> Self {
286 let (tx, _rx) = mpsc::channel(1000);
287 Self {
288 tx,
289 message_store: Arc::new(RwLock::new(HashMap::new())),
290 }
291 }
292
293 pub async fn publish(&self, msg: AgentMessage) -> Result<Uuid> {
295 let msg_id = msg.id;
296
297 {
298 let mut store = self.message_store.write().await;
299 store.insert(msg_id, msg.clone());
300 }
301
302 for _receiver in &msg.receivers {
303 let _ = self.tx.send(msg.clone()).await;
304 }
305
306 Ok(msg_id)
307 }
308}
309
310impl Default for MessageBus {
311 fn default() -> Self {
312 Self::new()
313 }
314}
315
316#[cfg(test)]
317mod tests {
318 use super::*;
319
320 #[test]
321 fn test_agent_endpoint_constructors() {
322 let ep1 = AgentEndpoint::agent("qa_agent");
323 assert_eq!(ep1.agent_id, "qa_agent");
324 assert!(ep1.phase.is_none());
325
326 let ep2 = AgentEndpoint::for_phase("impl_agent", "implement");
327 assert_eq!(ep2.agent_id, "impl_agent");
328 assert_eq!(ep2.phase.as_deref(), Some("implement"));
329
330 let ep3 = AgentEndpoint::for_task_item("agent1", "task1", "item1");
331 assert_eq!(ep3.task_id.as_deref(), Some("task1"));
332 assert_eq!(ep3.item_id.as_deref(), Some("item1"));
333 }
334
335 #[test]
336 fn test_agent_message_new() {
337 let sender = AgentEndpoint::agent("sender");
338 let receiver = AgentEndpoint::agent("receiver");
339 let msg = AgentMessage::new(
340 sender.clone(),
341 vec![receiver.clone()],
342 MessagePayload::Custom(serde_json::json!("hello")),
343 );
344 assert_eq!(msg.msg_type, MessageType::Request);
345 assert_eq!(msg.sender.agent_id, "sender");
346 assert_eq!(msg.receivers.len(), 1);
347 }
348
349 #[test]
350 fn test_agent_message_response_to() {
351 let original = AgentMessage::new(
352 AgentEndpoint::agent("alice"),
353 vec![AgentEndpoint::agent("bob")],
354 MessagePayload::Custom(serde_json::json!("req")),
355 );
356 let response =
357 AgentMessage::response_to(&original, MessagePayload::Custom(serde_json::json!("resp")));
358 assert_eq!(response.msg_type, MessageType::Response);
359 assert_eq!(response.correlation_id, Some(original.id));
360 assert_eq!(response.sender.agent_id, "bob");
361 assert_eq!(response.receivers[0].agent_id, "alice");
362 }
363
364 #[test]
365 fn test_agent_message_publish() {
366 let msg = AgentMessage::publish(
367 AgentEndpoint::agent("broadcaster"),
368 MessagePayload::Custom(serde_json::json!("broadcast")),
369 );
370 assert_eq!(msg.msg_type, MessageType::Publish);
371 assert!(msg.receivers.is_empty());
372 }
373
374 #[tokio::test]
375 async fn test_message_bus_publish_stores_message() {
376 let bus = MessageBus::new();
377 let msg = AgentMessage::new(
378 AgentEndpoint::agent("sender"),
379 vec![AgentEndpoint::agent("receiver")],
380 MessagePayload::Custom(serde_json::json!({"data": "test"})),
381 );
382 let msg_id = msg.id;
383 let result = bus.publish(msg).await;
384 assert!(result.is_ok());
385 assert_eq!(result.unwrap(), msg_id);
386
387 let store = bus.message_store.read().await;
389 assert!(store.contains_key(&msg_id));
390 assert_eq!(store[&msg_id].sender.agent_id, "sender");
391 }
392
393 #[tokio::test]
394 async fn test_message_bus_publish_broadcast_no_panic() {
395 let bus = MessageBus::new();
396 let msg = AgentMessage::publish(
398 AgentEndpoint::agent("broadcaster"),
399 MessagePayload::Custom(serde_json::json!("event")),
400 );
401 let msg_id = msg.id;
402 let result = bus.publish(msg).await;
403 assert!(result.is_ok());
404 assert_eq!(result.unwrap(), msg_id);
405
406 let store = bus.message_store.read().await;
408 assert!(store.contains_key(&msg_id));
409 }
410}