1use std::collections::VecDeque;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU64, Ordering};
4
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7use tokio::sync::{Mutex, mpsc, watch};
8
/// Canonical path string of the top-level agent; [`AgentPath::root`] wraps exactly this value.
pub const ROOT_AGENT_PATH: &str = "/root";
10
11#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
12pub struct AgentPath(String);
13
14impl AgentPath {
15 pub fn from_string(raw: impl AsRef<str>) -> Result<Self, String> {
16 let normalized = normalize_agent_path(raw.as_ref())?;
17 Ok(Self(normalized))
18 }
19
20 pub fn root() -> Self {
21 Self(ROOT_AGENT_PATH.to_owned())
22 }
23
24 pub fn as_str(&self) -> &str {
25 self.0.as_str()
26 }
27
28 pub fn join(&self, child: impl AsRef<str>) -> Result<Self, String> {
29 let child = child.as_ref().trim();
30 if child.is_empty() {
31 return Err("agent_path_invalid: child segment must not be empty".to_owned());
32 }
33 if child.contains('/') {
34 return Err("agent_path_invalid: child segment must not contain `/`".to_owned());
35 }
36 if !is_valid_segment(child) {
37 return Err(format!(
38 "agent_path_invalid: child segment `{child}` contains unsupported characters"
39 ));
40 }
41 Self::from_string(format!("{}/{}", self.0, child))
42 }
43}
44
45impl Default for AgentPath {
46 fn default() -> Self {
47 Self::root()
48 }
49}
50
51impl AsRef<str> for AgentPath {
52 fn as_ref(&self) -> &str {
53 self.as_str()
54 }
55}
56
57#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
58pub enum MailboxContent {
59 DelegateResult {
60 session_id: String,
61 frozen_result: Value,
62 },
63 StatusNotification {
64 reason: String,
65 },
66}
67
68#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
69pub struct InterAgentMessage {
70 pub author: AgentPath,
71 pub recipient: AgentPath,
72 pub content: MailboxContent,
73 pub trigger_turn: bool,
74}
75
76#[derive(Debug)]
77struct AgentMailboxState {
78 receiver: Mutex<mpsc::UnboundedReceiver<InterAgentMessage>>,
79 sequence: AtomicU64,
80 notifier: watch::Sender<u64>,
81}
82
83#[derive(Debug, Clone)]
84pub struct AgentMailbox {
85 sender: mpsc::UnboundedSender<InterAgentMessage>,
86 state: Arc<AgentMailboxState>,
87}
88
89impl AgentMailbox {
90 pub fn new() -> Self {
91 let (sender, receiver) = mpsc::unbounded_channel();
92 let (notifier, _) = watch::channel(0_u64);
93 Self {
94 sender,
95 state: Arc::new(AgentMailboxState {
96 receiver: Mutex::new(receiver),
97 sequence: AtomicU64::new(0),
98 notifier,
99 }),
100 }
101 }
102
103 pub fn send(&self, msg: InterAgentMessage) -> Result<(), String> {
104 self.sender
105 .send(msg)
106 .map_err(|error| format!("agent_mailbox_closed: {error}"))?;
107 let next_seq = self.state.sequence.fetch_add(1, Ordering::Relaxed) + 1;
108 let _ = self.state.notifier.send(next_seq);
109 Ok(())
110 }
111
112 pub fn subscribe(&self) -> watch::Receiver<u64> {
113 self.state.notifier.subscribe()
114 }
115
116 pub async fn drain(&self) -> Vec<InterAgentMessage> {
117 let mut receiver = self.state.receiver.lock().await;
118 let mut drained = VecDeque::new();
119 while let Ok(message) = receiver.try_recv() {
120 drained.push_back(message);
121 }
122 drained.into_iter().collect()
123 }
124}
125
126impl Default for AgentMailbox {
127 fn default() -> Self {
128 Self::new()
129 }
130}
131
132fn normalize_agent_path(raw: &str) -> Result<String, String> {
133 let trimmed = raw.trim();
134 if trimmed.is_empty() {
135 return Err("agent_path_invalid: path must not be empty".to_owned());
136 }
137 if !trimmed.starts_with('/') {
138 return Err("agent_path_invalid: path must start with `/`".to_owned());
139 }
140
141 let mut segments = Vec::new();
142 for segment in trimmed.split('/').skip(1) {
143 if segment.is_empty() {
144 return Err("agent_path_invalid: empty path segment".to_owned());
145 }
146 if !is_valid_segment(segment) {
147 return Err(format!(
148 "agent_path_invalid: segment `{segment}` contains unsupported characters"
149 ));
150 }
151 segments.push(segment);
152 }
153
154 if segments.is_empty() {
155 return Err("agent_path_invalid: root segment is required".to_owned());
156 }
157
158 Ok(format!("/{}", segments.join("/")))
159}
160
/// Reports whether every character of `segment` is an ASCII letter, an ASCII digit, or one of
/// `_`, `-`, `.`, `:`.
///
/// An empty segment passes vacuously; the callers in this file reject emptiness before calling.
fn is_valid_segment(segment: &str) -> bool {
    segment.chars().all(|ch| {
        matches!(ch, 'a'..='z' | 'A'..='Z' | '0'..='9' | '_' | '-' | '.' | ':')
    })
}
166
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds a root-to-root status notification that does not trigger a turn.
    fn status(reason: &str) -> InterAgentMessage {
        InterAgentMessage {
            author: AgentPath::root(),
            recipient: AgentPath::root(),
            content: MailboxContent::StatusNotification {
                reason: reason.to_owned(),
            },
            trigger_turn: false,
        }
    }

    #[tokio::test]
    async fn mailbox_send_subscribe_drain_lifecycle() {
        let mailbox = AgentMailbox::new();
        let mut subscription = mailbox.subscribe();

        let author = AgentPath::root();
        let recipient = author.join("task");
        // Compare through `as_str` so a failed join reports its error text instead of being
        // silently replaced by a fallback path.
        assert_eq!(recipient.as_ref().map(AgentPath::as_str), Ok("/root/task"));
        let recipient = recipient.unwrap_or_default();

        let message = InterAgentMessage {
            author,
            recipient,
            content: MailboxContent::StatusNotification {
                reason: "child_completed".to_owned(),
            },
            trigger_turn: true,
        };
        assert_eq!(mailbox.send(message.clone()), Ok(()));

        assert!(subscription.changed().await.is_ok());

        assert_eq!(mailbox.drain().await, vec![message]);
        // Draining consumes the queue, so a second drain finds nothing.
        assert!(mailbox.drain().await.is_empty());
    }

    #[tokio::test]
    async fn mailbox_sequence_increments() {
        let mailbox = AgentMailbox::new();
        let mut subscription = mailbox.subscribe();

        assert_eq!(mailbox.send(status("first")), Ok(()));
        assert!(subscription.changed().await.is_ok());
        let first_seq = *subscription.borrow();

        let second = InterAgentMessage {
            author: AgentPath::root(),
            recipient: AgentPath::root(),
            content: MailboxContent::DelegateResult {
                session_id: "child-1".to_owned(),
                frozen_result: json!({"status": "ok"}),
            },
            trigger_turn: true,
        };
        assert_eq!(mailbox.send(second), Ok(()));
        assert!(subscription.changed().await.is_ok());
        let second_seq = *subscription.borrow();

        assert!(
            second_seq > first_seq,
            "sequence went from {first_seq} to {second_seq}"
        );
    }

    #[tokio::test]
    async fn mailbox_supports_multiple_senders() {
        let mailbox = AgentMailbox::new();
        let mailbox_2 = mailbox.clone();

        assert_eq!(mailbox.send(status("a")), Ok(()));
        assert_eq!(mailbox_2.send(status("b")), Ok(()));

        // Clones share one queue, so either handle sees both messages in send order.
        assert_eq!(mailbox.drain().await, vec![status("a"), status("b")]);
    }

    #[test]
    fn agent_path_validates_and_joins() {
        let root = AgentPath::from_string(ROOT_AGENT_PATH);
        assert_eq!(root, Ok(AgentPath::root()));
        let root = root.unwrap_or_default();

        let child = root.join("subtask");
        assert_eq!(child.as_ref().map(AgentPath::as_str), Ok("/root/subtask"));

        assert!(AgentPath::from_string("root/subtask").is_err());
        assert!(AgentPath::from_string("/root//subtask").is_err());
    }

    #[test]
    fn agent_path_join_rejects_invalid_segments() {
        let root = AgentPath::root();

        assert!(root.join("").is_err());
        assert!(root.join("   ").is_err());
        assert!(root.join("a/b").is_err());
        assert!(root.join("a b").is_err());
    }
}