Skip to main content

clawft_types/
agent_bus.rs

1//! Inter-agent communication types.
2//!
3//! Defines [`InterAgentMessage`] for agent-to-agent communication
4//! and [`MessagePayload`] for structured/binary content transport.
5//!
6//! These types are used by the [`AgentBus`](clawft_core::agent_bus::AgentBus)
7//! for per-agent inbox delivery with TTL enforcement.
8
9use std::time::Duration;
10
11use serde::{Deserialize, Serialize};
12use serde_json::Value;
13use uuid::Uuid;
14
15/// A message exchanged between agents via the [`AgentBus`].
16///
17/// Each message has a unique ID, sender/recipient agent IDs, a task
18/// description, and an arbitrary JSON payload. Messages can optionally
19/// reference a parent message via `reply_to` for request/response patterns.
20///
21/// # TTL enforcement
22///
23/// Messages have a time-to-live. If undelivered within this duration,
24/// they are dropped and logged at `warn` level.
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct InterAgentMessage {
27    /// Unique message identifier.
28    pub id: Uuid,
29
30    /// Agent ID of the sender.
31    pub from_agent: String,
32
33    /// Agent ID of the recipient.
34    pub to_agent: String,
35
36    /// Task description or intent.
37    pub task: String,
38
39    /// Arbitrary JSON payload.
40    pub payload: Value,
41
42    /// If this is a reply, the ID of the original message.
43    #[serde(default, skip_serializing_if = "Option::is_none")]
44    pub reply_to: Option<Uuid>,
45
46    /// Time-to-live: message expires if undelivered within this duration.
47    #[serde(
48        serialize_with = "serialize_duration_secs",
49        deserialize_with = "deserialize_duration_secs"
50    )]
51    pub ttl: Duration,
52
53    /// Timestamp when the message was created (milliseconds since epoch).
54    #[serde(default = "now_millis")]
55    pub created_at_ms: i64,
56}
57
58impl InterAgentMessage {
59    /// Create a new inter-agent message.
60    pub fn new(
61        from_agent: impl Into<String>,
62        to_agent: impl Into<String>,
63        task: impl Into<String>,
64        payload: Value,
65        ttl: Duration,
66    ) -> Self {
67        Self {
68            id: Uuid::new_v4(),
69            from_agent: from_agent.into(),
70            to_agent: to_agent.into(),
71            task: task.into(),
72            payload,
73            reply_to: None,
74            ttl,
75            created_at_ms: now_millis(),
76        }
77    }
78
79    /// Create a reply to an existing message.
80    pub fn reply(
81        original: &InterAgentMessage,
82        payload: Value,
83        ttl: Duration,
84    ) -> Self {
85        Self {
86            id: Uuid::new_v4(),
87            from_agent: original.to_agent.clone(),
88            to_agent: original.from_agent.clone(),
89            task: format!("reply to: {}", original.task),
90            payload,
91            reply_to: Some(original.id),
92            ttl,
93            created_at_ms: now_millis(),
94        }
95    }
96
97    /// Check whether this message has expired based on its TTL.
98    pub fn is_expired(&self) -> bool {
99        let elapsed_ms = now_millis() - self.created_at_ms;
100        elapsed_ms > self.ttl.as_millis() as i64
101    }
102}
103
104/// Structured and binary payload types for future canvas/voice support.
105///
106/// This enum provides forward-compatibility for rich content types
107/// beyond plain text.
108#[non_exhaustive]
109#[derive(Debug, Clone, Serialize, Deserialize)]
110#[serde(tag = "type", rename_all = "snake_case")]
111pub enum MessagePayload {
112    /// Plain text content.
113    Text { content: String },
114
115    /// Structured JSON content.
116    Structured { content: Value },
117
118    /// Binary content with MIME type (e.g., image/png, audio/wav).
119    Binary {
120        mime_type: String,
121        #[serde(with = "base64_bytes")]
122        data: Vec<u8>,
123    },
124}
125
126/// Errors from the agent bus.
127#[non_exhaustive]
128#[derive(Debug, thiserror::Error)]
129pub enum AgentBusError {
130    /// The target agent is not registered on the bus.
131    #[error("agent not found: {0}")]
132    AgentNotFound(String),
133
134    /// The agent's inbox is full (backpressure).
135    #[error("inbox full for agent: {0}")]
136    InboxFull(String),
137
138    /// The message has expired (TTL exceeded).
139    #[error("message expired (ttl: {ttl:?})")]
140    MessageExpired { ttl: Duration },
141}
142
143fn now_millis() -> i64 {
144    chrono::Utc::now().timestamp_millis()
145}
146
147fn serialize_duration_secs<S>(d: &Duration, s: S) -> Result<S::Ok, S::Error>
148where
149    S: serde::Serializer,
150{
151    s.serialize_u64(d.as_secs())
152}
153
154fn deserialize_duration_secs<'de, D>(d: D) -> Result<Duration, D::Error>
155where
156    D: serde::Deserializer<'de>,
157{
158    let secs = u64::deserialize(d)?;
159    Ok(Duration::from_secs(secs))
160}
161
162/// Base64 serialization for binary payloads.
163mod base64_bytes {
164    use serde::{Deserialize, Deserializer, Serialize, Serializer};
165
166    pub fn serialize<S>(data: &[u8], serializer: S) -> Result<S::Ok, S::Error>
167    where
168        S: Serializer,
169    {
170        // Simple hex encoding (no external base64 dep needed).
171        let hex: String = data.iter().map(|b| format!("{b:02x}")).collect();
172        hex.serialize(serializer)
173    }
174
175    pub fn deserialize<'de, D>(deserializer: D) -> Result<Vec<u8>, D::Error>
176    where
177        D: Deserializer<'de>,
178    {
179        let hex = String::deserialize(deserializer)?;
180        (0..hex.len())
181            .step_by(2)
182            .map(|i| {
183                u8::from_str_radix(&hex[i..i + 2], 16)
184                    .map_err(serde::de::Error::custom)
185            })
186            .collect()
187    }
188}
189
190#[cfg(test)]
191mod tests {
192    use super::*;
193
194    #[test]
195    fn inter_agent_message_new() {
196        let msg = InterAgentMessage::new(
197            "agent-a",
198            "agent-b",
199            "summarize document",
200            serde_json::json!({"doc_id": 42}),
201            Duration::from_secs(60),
202        );
203        assert_eq!(msg.from_agent, "agent-a");
204        assert_eq!(msg.to_agent, "agent-b");
205        assert_eq!(msg.task, "summarize document");
206        assert!(msg.reply_to.is_none());
207        assert!(!msg.is_expired());
208    }
209
210    #[test]
211    fn inter_agent_message_reply() {
212        let original = InterAgentMessage::new(
213            "agent-a",
214            "agent-b",
215            "task",
216            Value::Null,
217            Duration::from_secs(60),
218        );
219        let reply = InterAgentMessage::reply(
220            &original,
221            serde_json::json!({"result": "done"}),
222            Duration::from_secs(30),
223        );
224        assert_eq!(reply.from_agent, "agent-b");
225        assert_eq!(reply.to_agent, "agent-a");
226        assert_eq!(reply.reply_to, Some(original.id));
227    }
228
229    #[test]
230    fn inter_agent_message_serde_roundtrip() {
231        let msg = InterAgentMessage::new(
232            "a",
233            "b",
234            "test",
235            serde_json::json!({"key": "value"}),
236            Duration::from_secs(120),
237        );
238        let json = serde_json::to_string(&msg).unwrap();
239        let restored: InterAgentMessage = serde_json::from_str(&json).unwrap();
240        assert_eq!(restored.from_agent, "a");
241        assert_eq!(restored.to_agent, "b");
242        assert_eq!(restored.task, "test");
243        assert_eq!(restored.ttl, Duration::from_secs(120));
244    }
245
246    #[test]
247    fn message_payload_text() {
248        let p = MessagePayload::Text {
249            content: "hello".into(),
250        };
251        let json = serde_json::to_string(&p).unwrap();
252        let restored: MessagePayload = serde_json::from_str(&json).unwrap();
253        match restored {
254            MessagePayload::Text { content } => assert_eq!(content, "hello"),
255            _ => panic!("expected Text variant"),
256        }
257    }
258
259    #[test]
260    fn message_payload_structured() {
261        let p = MessagePayload::Structured {
262            content: serde_json::json!({"key": "value"}),
263        };
264        let json = serde_json::to_string(&p).unwrap();
265        let restored: MessagePayload = serde_json::from_str(&json).unwrap();
266        match restored {
267            MessagePayload::Structured { content } => {
268                assert_eq!(content["key"], "value");
269            }
270            _ => panic!("expected Structured variant"),
271        }
272    }
273
274    #[test]
275    fn message_payload_binary() {
276        let p = MessagePayload::Binary {
277            mime_type: "image/png".into(),
278            data: vec![0x89, 0x50, 0x4e, 0x47],
279        };
280        let json = serde_json::to_string(&p).unwrap();
281        let restored: MessagePayload = serde_json::from_str(&json).unwrap();
282        match restored {
283            MessagePayload::Binary { mime_type, data } => {
284                assert_eq!(mime_type, "image/png");
285                assert_eq!(data, vec![0x89, 0x50, 0x4e, 0x47]);
286            }
287            _ => panic!("expected Binary variant"),
288        }
289    }
290
291    #[test]
292    fn agent_bus_error_display() {
293        let err = AgentBusError::AgentNotFound("agent-x".into());
294        assert_eq!(err.to_string(), "agent not found: agent-x");
295
296        let err = AgentBusError::InboxFull("agent-y".into());
297        assert_eq!(err.to_string(), "inbox full for agent: agent-y");
298
299        let err = AgentBusError::MessageExpired {
300            ttl: Duration::from_secs(30),
301        };
302        assert_eq!(err.to_string(), "message expired (ttl: 30s)");
303    }
304}