Skip to main content

loong_kernel/
mailbox.rs

1use std::collections::VecDeque;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicU64, Ordering};
4
5use serde::{Deserialize, Serialize};
6use serde_json::Value;
7use tokio::sync::{Mutex, mpsc, watch};
8
/// Textual path of the root agent; this is the value wrapped by [`AgentPath::root`].
pub const ROOT_AGENT_PATH: &str = "/root";
10
11#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
12pub struct AgentPath(String);
13
14impl AgentPath {
15    pub fn from_string(raw: impl AsRef<str>) -> Result<Self, String> {
16        let normalized = normalize_agent_path(raw.as_ref())?;
17        Ok(Self(normalized))
18    }
19
20    pub fn root() -> Self {
21        Self(ROOT_AGENT_PATH.to_owned())
22    }
23
24    pub fn as_str(&self) -> &str {
25        self.0.as_str()
26    }
27
28    pub fn join(&self, child: impl AsRef<str>) -> Result<Self, String> {
29        let child = child.as_ref().trim();
30        if child.is_empty() {
31            return Err("agent_path_invalid: child segment must not be empty".to_owned());
32        }
33        if child.contains('/') {
34            return Err("agent_path_invalid: child segment must not contain `/`".to_owned());
35        }
36        if !is_valid_segment(child) {
37            return Err(format!(
38                "agent_path_invalid: child segment `{child}` contains unsupported characters"
39            ));
40        }
41        Self::from_string(format!("{}/{}", self.0, child))
42    }
43}
44
45impl Default for AgentPath {
46    fn default() -> Self {
47        Self::root()
48    }
49}
50
51impl AsRef<str> for AgentPath {
52    fn as_ref(&self) -> &str {
53        self.as_str()
54    }
55}
56
57#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
58pub enum MailboxContent {
59    DelegateResult {
60        session_id: String,
61        frozen_result: Value,
62    },
63    StatusNotification {
64        reason: String,
65    },
66}
67
68#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
69pub struct InterAgentMessage {
70    pub author: AgentPath,
71    pub recipient: AgentPath,
72    pub content: MailboxContent,
73    pub trigger_turn: bool,
74}
75
76#[derive(Debug)]
77struct AgentMailboxState {
78    receiver: Mutex<mpsc::UnboundedReceiver<InterAgentMessage>>,
79    sequence: AtomicU64,
80    notifier: watch::Sender<u64>,
81}
82
83#[derive(Debug, Clone)]
84pub struct AgentMailbox {
85    sender: mpsc::UnboundedSender<InterAgentMessage>,
86    state: Arc<AgentMailboxState>,
87}
88
89impl AgentMailbox {
90    pub fn new() -> Self {
91        let (sender, receiver) = mpsc::unbounded_channel();
92        let (notifier, _) = watch::channel(0_u64);
93        Self {
94            sender,
95            state: Arc::new(AgentMailboxState {
96                receiver: Mutex::new(receiver),
97                sequence: AtomicU64::new(0),
98                notifier,
99            }),
100        }
101    }
102
103    pub fn send(&self, msg: InterAgentMessage) -> Result<(), String> {
104        self.sender
105            .send(msg)
106            .map_err(|error| format!("agent_mailbox_closed: {error}"))?;
107        let next_seq = self.state.sequence.fetch_add(1, Ordering::Relaxed) + 1;
108        let _ = self.state.notifier.send(next_seq);
109        Ok(())
110    }
111
112    pub fn subscribe(&self) -> watch::Receiver<u64> {
113        self.state.notifier.subscribe()
114    }
115
116    pub async fn drain(&self) -> Vec<InterAgentMessage> {
117        let mut receiver = self.state.receiver.lock().await;
118        let mut drained = VecDeque::new();
119        while let Ok(message) = receiver.try_recv() {
120            drained.push_back(message);
121        }
122        drained.into_iter().collect()
123    }
124}
125
126impl Default for AgentMailbox {
127    fn default() -> Self {
128        Self::new()
129    }
130}
131
132fn normalize_agent_path(raw: &str) -> Result<String, String> {
133    let trimmed = raw.trim();
134    if trimmed.is_empty() {
135        return Err("agent_path_invalid: path must not be empty".to_owned());
136    }
137    if !trimmed.starts_with('/') {
138        return Err("agent_path_invalid: path must start with `/`".to_owned());
139    }
140
141    let mut segments = Vec::new();
142    for segment in trimmed.split('/').skip(1) {
143        if segment.is_empty() {
144            return Err("agent_path_invalid: empty path segment".to_owned());
145        }
146        if !is_valid_segment(segment) {
147            return Err(format!(
148                "agent_path_invalid: segment `{segment}` contains unsupported characters"
149            ));
150        }
151        segments.push(segment);
152    }
153
154    if segments.is_empty() {
155        return Err("agent_path_invalid: root segment is required".to_owned());
156    }
157
158    Ok(format!("/{}", segments.join("/")))
159}
160
/// Reports whether `segment` is usable as a single agent-path segment.
///
/// A segment is valid when it is non-empty and consists only of ASCII
/// letters, ASCII digits, `_`, `-`, `.` or `:`.
fn is_valid_segment(segment: &str) -> bool {
    // `all` is vacuously true for an empty string, so emptiness is rejected
    // explicitly rather than relying on every caller to check it first.
    !segment.is_empty()
        && segment
            .chars()
            .all(|ch| ch.is_ascii_alphanumeric() || matches!(ch, '_' | '-' | '.' | ':'))
}
166
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Builds a root-to-root status notification carrying `reason`.
    fn status_message(reason: &str, trigger_turn: bool) -> InterAgentMessage {
        InterAgentMessage {
            author: AgentPath::root(),
            recipient: AgentPath::root(),
            content: MailboxContent::StatusNotification {
                reason: reason.to_owned(),
            },
            trigger_turn,
        }
    }

    /// A sent message wakes the subscriber and can then be drained.
    #[tokio::test]
    async fn mailbox_send_subscribe_drain_lifecycle() {
        let mailbox = AgentMailbox::new();
        let mut subscription = mailbox.subscribe();

        let author = AgentPath::root();
        // Falls back to the root path (the `Default`) if the join fails.
        let recipient = author.join("task").unwrap_or_default();
        let message = InterAgentMessage {
            author,
            recipient,
            content: MailboxContent::StatusNotification {
                reason: "child_completed".to_owned(),
            },
            trigger_turn: true,
        };

        assert!(mailbox.send(message).is_ok());
        assert!(subscription.changed().await.is_ok());
        assert_eq!(mailbox.drain().await.len(), 1);
    }

    /// Each send publishes a larger sequence number than the previous one.
    #[tokio::test]
    async fn mailbox_sequence_increments() {
        let mailbox = AgentMailbox::new();
        let mut subscription = mailbox.subscribe();

        assert!(mailbox.send(status_message("first", false)).is_ok());
        assert!(subscription.changed().await.is_ok());
        let first_seq = *subscription.borrow();

        let delegate_result = InterAgentMessage {
            author: AgentPath::root(),
            recipient: AgentPath::root(),
            content: MailboxContent::DelegateResult {
                session_id: "child-1".to_owned(),
                frozen_result: json!({"status": "ok"}),
            },
            trigger_turn: true,
        };
        assert!(mailbox.send(delegate_result).is_ok());
        assert!(subscription.changed().await.is_ok());
        let second_seq = *subscription.borrow();

        assert!(second_seq > first_seq);
    }

    /// Messages sent through different clones land in the same queue.
    #[tokio::test]
    async fn mailbox_supports_multiple_senders() {
        let mailbox = AgentMailbox::new();
        let mailbox_2 = mailbox.clone();

        assert!(mailbox.send(status_message("a", false)).is_ok());
        assert!(mailbox_2.send(status_message("b", false)).is_ok());

        assert_eq!(mailbox.drain().await.len(), 2);
    }

    /// Parsing and joining accept well-formed paths and reject malformed ones.
    #[test]
    fn agent_path_validates_and_joins() {
        let parsed_root = AgentPath::from_string(ROOT_AGENT_PATH);
        assert!(parsed_root.is_ok());
        let root = parsed_root.unwrap_or_default();

        let joined = root.join("subtask");
        assert!(joined.is_ok());
        let child = joined.unwrap_or_default();
        assert_eq!(child.as_str(), "/root/subtask");

        // Missing leading slash, then an empty middle segment.
        for malformed in ["root/subtask", "/root//subtask"] {
            assert!(AgentPath::from_string(malformed).is_err());
        }
    }
}