Skip to main content

agent_orchestrator/collab/
message.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use anyhow::Result;
6use chrono::{DateTime, Utc};
7use serde::{Deserialize, Serialize};
8use tokio::sync::{mpsc, RwLock};
9use uuid::Uuid;
10
11use super::artifact::Artifact;
12use super::context::AgentContextRef;
13use super::output::AgentOutput;
14
15/// Message envelope exchanged between collaborating agents.
16#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct AgentMessage {
18    /// Unique message identifier.
19    pub id: Uuid,
20    /// Delivery semantic of the message.
21    pub msg_type: MessageType,
22    /// Sending endpoint.
23    pub sender: AgentEndpoint,
24    /// Intended receiving endpoints.
25    pub receivers: Vec<AgentEndpoint>,
26    /// Message body.
27    pub payload: MessagePayload,
28    /// Correlation identifier used to tie responses to requests.
29    pub correlation_id: Option<Uuid>,
30    /// Creation timestamp.
31    pub timestamp: DateTime<Utc>,
32    /// Time-to-live before the message should be discarded.
33    pub ttl: Duration,
34    /// Delivery guarantees requested for the transport.
35    pub delivery_mode: DeliveryMode,
36}
37
38impl AgentMessage {
39    /// Creates a request message addressed to one or more receivers.
40    pub fn new(
41        sender: AgentEndpoint,
42        receivers: Vec<AgentEndpoint>,
43        payload: MessagePayload,
44    ) -> Self {
45        Self {
46            id: Uuid::new_v4(),
47            msg_type: MessageType::Request,
48            sender,
49            receivers,
50            payload,
51            correlation_id: None,
52            timestamp: Utc::now(),
53            ttl: Duration::from_secs(300),
54            delivery_mode: DeliveryMode::AtLeastOnce,
55        }
56    }
57
58    /// Builds a response message for a prior request.
59    pub fn response_to(original: &AgentMessage, payload: MessagePayload) -> Self {
60        Self {
61            id: Uuid::new_v4(),
62            msg_type: MessageType::Response,
63            sender: original
64                .receivers
65                .first()
66                .cloned()
67                .unwrap_or(original.sender.clone()),
68            receivers: vec![original.sender.clone()],
69            payload,
70            correlation_id: Some(original.id),
71            timestamp: Utc::now(),
72            ttl: Duration::from_secs(300),
73            delivery_mode: DeliveryMode::AtLeastOnce,
74        }
75    }
76
77    /// Builds a broadcast-style publish message.
78    pub fn publish(sender: AgentEndpoint, payload: MessagePayload) -> Self {
79        Self {
80            id: Uuid::new_v4(),
81            msg_type: MessageType::Publish,
82            sender,
83            receivers: Vec::new(),
84            payload,
85            correlation_id: None,
86            timestamp: Utc::now(),
87            ttl: Duration::from_secs(60),
88            delivery_mode: DeliveryMode::Broadcast,
89        }
90    }
91}
92
93/// Address of an agent or a specific agent execution scope.
94#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
95pub struct AgentEndpoint {
96    /// Agent identifier.
97    pub agent_id: String,
98    /// Optional phase scope.
99    pub phase: Option<String>,
100    /// Optional task scope.
101    pub task_id: Option<String>,
102    /// Optional task-item scope.
103    pub item_id: Option<String>,
104}
105
106impl AgentEndpoint {
107    /// Creates an endpoint scoped only to an agent identifier.
108    pub fn agent(agent_id: &str) -> Self {
109        Self {
110            agent_id: agent_id.to_string(),
111            phase: None,
112            task_id: None,
113            item_id: None,
114        }
115    }
116
117    /// Creates an endpoint scoped to a specific agent phase.
118    pub fn for_phase(agent_id: &str, phase: &str) -> Self {
119        Self {
120            agent_id: agent_id.to_string(),
121            phase: Some(phase.to_string()),
122            task_id: None,
123            item_id: None,
124        }
125    }
126
127    /// Creates an endpoint scoped to a specific task item.
128    pub fn for_task_item(agent_id: &str, task_id: &str, item_id: &str) -> Self {
129        Self {
130            agent_id: agent_id.to_string(),
131            phase: None,
132            task_id: Some(task_id.to_string()),
133            item_id: Some(item_id.to_string()),
134        }
135    }
136}
137
138/// High-level message intent.
139#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
140pub enum MessageType {
141    /// Request expecting a follow-up response.
142    Request,
143    /// Response correlated to a request.
144    Response,
145    /// Acknowledgement without a payload-specific result.
146    Ack,
147    /// Publish/subscribe broadcast.
148    Publish,
149    /// Relay or delegated message.
150    Forward,
151}
152
153/// Delivery guarantees requested by the sender.
154#[derive(Debug, Clone, Serialize, Deserialize)]
155pub enum DeliveryMode {
156    /// No delivery confirmation required.
157    FireAndForget,
158    /// Message may be delivered more than once but should not be lost.
159    AtLeastOnce,
160    /// Transport should avoid duplicate delivery.
161    ExactlyOnce,
162    /// Broadcast to all interested subscribers.
163    Broadcast,
164}
165
166/// Message body variants supported by the collaboration layer.
167#[derive(Debug, Clone, Serialize, Deserialize)]
168pub enum MessagePayload {
169    /// Command or phase execution request.
170    ExecutionRequest(ExecutionRequest),
171    /// Result of an execution request.
172    ExecutionResult(ExecutionResult),
173    /// Standalone artifact transmission.
174    Artifact(Artifact),
175    /// Shared-context mutation event.
176    ContextUpdate(ContextUpdate),
177    /// Runtime control signal.
178    ControlSignal(ControlSignal),
179    /// Extensible custom JSON payload.
180    Custom(serde_json::Value),
181}
182
183/// Request payload that asks another agent to perform work.
184#[derive(Debug, Clone, Serialize, Deserialize)]
185pub struct ExecutionRequest {
186    /// Command or prompt to execute.
187    pub command: String,
188    /// Serialized execution context.
189    pub context: AgentContextRef,
190    /// Input artifacts made available to the receiver.
191    pub input_artifacts: Vec<Artifact>,
192    /// Optional expectations for validation and scoring.
193    pub expectations: Option<ExecutionExpectations>,
194}
195
196/// Response payload carrying execution output.
197#[derive(Debug, Clone, Serialize, Deserialize)]
198pub struct ExecutionResult {
199    /// Run identifier of the delegated execution.
200    pub run_id: Uuid,
201    /// Structured output produced by the execution.
202    pub output: AgentOutput,
203    /// Success flag derived from validation and exit status.
204    pub success: bool,
205    /// Optional error message when execution failed.
206    pub error: Option<String>,
207}
208
209/// Validation expectations associated with an execution request.
210#[derive(Debug, Clone, Serialize, Deserialize)]
211pub struct ExecutionExpectations {
212    /// Optional JSON schema used to validate structured output.
213    pub output_schema: Option<serde_json::Value>,
214    /// Additional validation rules evaluated by the caller.
215    pub validation_rules: Vec<ValidationRule>,
216    /// Minimum acceptable quality score.
217    pub quality_threshold: f32,
218}
219
220/// A single named validation rule.
221#[derive(Debug, Clone, Serialize, Deserialize)]
222pub struct ValidationRule {
223    /// Rule identifier.
224    pub name: String,
225    /// Expression or predicate body.
226    pub expression: String,
227    /// Error message emitted when validation fails.
228    pub error_message: String,
229}
230
231/// Shared-context mutation payload.
232#[derive(Debug, Clone, Serialize, Deserialize)]
233pub struct ContextUpdate {
234    /// Shared-state key being updated.
235    pub key: String,
236    /// Value applied by the operation.
237    pub value: serde_json::Value,
238    /// Mutation operation kind.
239    pub operation: ContextUpdateOp,
240}
241
242/// Supported shared-context mutation operations.
243#[derive(Debug, Clone, Serialize, Deserialize)]
244pub enum ContextUpdateOp {
245    /// Replace the current value.
246    Set,
247    /// Append to an existing collection-like value.
248    Append,
249    /// Remove the key entirely.
250    Remove,
251}
252
253/// Control-plane signal sent between agents.
254#[derive(Debug, Clone, Serialize, Deserialize)]
255pub struct ControlSignal {
256    /// Signal verb.
257    pub signal: Signal,
258    /// Optional human-readable reason.
259    pub reason: Option<String>,
260}
261
262/// Runtime control actions understood by the collaboration layer.
263#[derive(Debug, Clone, Serialize, Deserialize)]
264pub enum Signal {
265    /// Cancel work in progress.
266    Cancel,
267    /// Pause work in progress.
268    Pause,
269    /// Resume paused work.
270    Resume,
271    /// Request a retry.
272    Retry,
273    /// Skip the current work item.
274    Skip,
275}
276
277/// In-memory message bus used by collaboration flows.
278pub struct MessageBus {
279    tx: mpsc::Sender<AgentMessage>,
280    message_store: Arc<RwLock<HashMap<Uuid, AgentMessage>>>,
281}
282
283impl MessageBus {
284    /// Creates an empty message bus.
285    pub fn new() -> Self {
286        let (tx, _rx) = mpsc::channel(1000);
287        Self {
288            tx,
289            message_store: Arc::new(RwLock::new(HashMap::new())),
290        }
291    }
292
293    /// Publish a message to the bus
294    pub async fn publish(&self, msg: AgentMessage) -> Result<Uuid> {
295        let msg_id = msg.id;
296
297        {
298            let mut store = self.message_store.write().await;
299            store.insert(msg_id, msg.clone());
300        }
301
302        for _receiver in &msg.receivers {
303            let _ = self.tx.send(msg.clone()).await;
304        }
305
306        Ok(msg_id)
307    }
308}
309
310impl Default for MessageBus {
311    fn default() -> Self {
312        Self::new()
313    }
314}
315
#[cfg(test)]
mod tests {
    use super::*;

    /// Wraps a JSON value in the `Custom` payload variant used by every test here.
    fn custom_payload(value: serde_json::Value) -> MessagePayload {
        MessagePayload::Custom(value)
    }

    #[test]
    fn test_agent_endpoint_constructors() {
        // Agent-only scope.
        let agent_only = AgentEndpoint::agent("qa_agent");
        assert_eq!(agent_only.agent_id, "qa_agent");
        assert!(agent_only.phase.is_none());

        // Phase scope.
        let phase_scoped = AgentEndpoint::for_phase("impl_agent", "implement");
        assert_eq!(phase_scoped.agent_id, "impl_agent");
        assert_eq!(phase_scoped.phase.as_deref(), Some("implement"));

        // Task-item scope.
        let item_scoped = AgentEndpoint::for_task_item("agent1", "task1", "item1");
        assert_eq!(item_scoped.task_id.as_deref(), Some("task1"));
        assert_eq!(item_scoped.item_id.as_deref(), Some("item1"));
    }

    #[test]
    fn test_agent_message_new() {
        let request = AgentMessage::new(
            AgentEndpoint::agent("sender"),
            vec![AgentEndpoint::agent("receiver")],
            custom_payload(serde_json::json!("hello")),
        );

        assert_eq!(request.msg_type, MessageType::Request);
        assert_eq!(request.sender.agent_id, "sender");
        assert_eq!(request.receivers.len(), 1);
    }

    #[test]
    fn test_agent_message_response_to() {
        let request = AgentMessage::new(
            AgentEndpoint::agent("alice"),
            vec![AgentEndpoint::agent("bob")],
            custom_payload(serde_json::json!("req")),
        );

        let reply = AgentMessage::response_to(&request, custom_payload(serde_json::json!("resp")));

        assert_eq!(reply.msg_type, MessageType::Response);
        assert_eq!(reply.correlation_id, Some(request.id));
        // The reply travels in the opposite direction of the request.
        assert_eq!(reply.sender.agent_id, "bob");
        assert_eq!(reply.receivers[0].agent_id, "alice");
    }

    #[test]
    fn test_agent_message_publish() {
        let announcement = AgentMessage::publish(
            AgentEndpoint::agent("broadcaster"),
            custom_payload(serde_json::json!("broadcast")),
        );

        assert_eq!(announcement.msg_type, MessageType::Publish);
        assert!(announcement.receivers.is_empty());
    }

    #[tokio::test]
    async fn test_message_bus_publish_stores_message() {
        let bus = MessageBus::new();
        let request = AgentMessage::new(
            AgentEndpoint::agent("sender"),
            vec![AgentEndpoint::agent("receiver")],
            custom_payload(serde_json::json!({"data": "test"})),
        );
        let msg_id = request.id;

        let result = bus.publish(request).await;
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), msg_id);

        // The published message must be retrievable from the store by its id.
        let store = bus.message_store.read().await;
        assert!(store.contains_key(&msg_id));
        assert_eq!(store[&msg_id].sender.agent_id, "sender");
    }

    #[tokio::test]
    async fn test_message_bus_publish_broadcast_no_panic() {
        let bus = MessageBus::new();
        // A publish message has no receivers; publishing it must neither panic nor hang.
        let announcement = AgentMessage::publish(
            AgentEndpoint::agent("broadcaster"),
            custom_payload(serde_json::json!("event")),
        );
        let msg_id = announcement.id;

        let result = bus.publish(announcement).await;
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), msg_id);

        // Storing does not depend on there being any receivers.
        let store = bus.message_store.read().await;
        assert!(store.contains_key(&msg_id));
    }
}
410}