Skip to main content

agent_teams/messaging/
mod.rs

1//! Inbox-based messaging for agent teams.
2//!
3//! Provides the [`InboxManager`] trait and its file-system implementation
4//! [`FileInboxManager`], which stores messages as JSON arrays in
5//! `~/.claude/teams/{team}/inboxes/{agent}.json`.
6
7pub mod structured;
8
9use std::path::{Path, PathBuf};
10use std::time::Duration;
11
12use async_trait::async_trait;
13use tokio::time::Instant;
14use tracing::{debug, warn};
15
16use crate::error::{Error, Result};
17use crate::models::InboxMessage;
18use crate::util::atomic_write::atomic_write_json;
19use crate::util::file_lock::FileLock;
20use crate::util::validate_name;
21
22/// Trait for inbox-based messaging between agents.
23#[async_trait]
24pub trait InboxManager: Send + Sync {
25    /// Send a message to the recipient specified in `message.to`.
26    async fn send_message(&self, team: &str, message: InboxMessage) -> Result<()>;
27
28    /// Broadcast a plain-text message to all `members` except the sender.
29    async fn broadcast(
30        &self,
31        team: &str,
32        from: &str,
33        content: &str,
34        members: &[String],
35    ) -> Result<()>;
36
37    /// Read all messages in an agent's inbox.
38    async fn read_inbox(&self, team: &str, agent: &str) -> Result<Vec<InboxMessage>>;
39
40    /// Read only unread messages in an agent's inbox.
41    async fn read_unread(&self, team: &str, agent: &str) -> Result<Vec<InboxMessage>>;
42
43    /// Mark a specific message as read.
44    async fn mark_read(&self, team: &str, agent: &str, message_id: &str) -> Result<()>;
45
46    /// Poll for new unread messages, blocking up to `timeout`.
47    async fn poll_inbox(
48        &self,
49        team: &str,
50        agent: &str,
51        timeout: Duration,
52    ) -> Result<Vec<InboxMessage>>;
53
54    /// Clear all messages from an agent's inbox.
55    async fn clear_inbox(&self, team: &str, agent: &str) -> Result<()>;
56}
57
/// [`InboxManager`] implementation that keeps each inbox in a file on disk.
///
/// Files are placed under `base_dir` as follows:
/// ```text
/// {base_dir}/{team}/inboxes/{agent}.json   # JSON array of messages
/// {base_dir}/{team}/inboxes/{agent}.lock   # lock file guarding the inbox
/// ```
#[derive(Clone, Debug)]
pub struct FileInboxManager {
    /// Root directory under which the per-team directories live.
    base_dir: PathBuf,
}
69
70impl Default for FileInboxManager {
71    fn default() -> Self {
72        let base_dir = dirs::home_dir()
73            .unwrap_or_else(|| PathBuf::from("."))
74            .join(".claude")
75            .join("teams");
76        Self { base_dir }
77    }
78}
79
80impl FileInboxManager {
81    /// Create a new `FileInboxManager` rooted at `base_dir`.
82    pub fn new(base_dir: impl Into<PathBuf>) -> Self {
83        Self {
84            base_dir: base_dir.into(),
85        }
86    }
87
88    /// Directory containing all inboxes for a team.
89    fn inbox_dir(&self, team: &str) -> PathBuf {
90        self.base_dir.join(team).join("inboxes")
91    }
92
93    /// Path to a specific agent's inbox JSON file.
94    fn inbox_path(&self, team: &str, agent: &str) -> PathBuf {
95        self.inbox_dir(team).join(format!("{agent}.json"))
96    }
97
98    /// Path to a specific agent's inbox lock file.
99    fn lock_path(&self, team: &str, agent: &str) -> PathBuf {
100        self.inbox_dir(team).join(format!("{agent}.lock"))
101    }
102
103    /// Ensure the inbox directory for a team exists.
104    fn ensure_inbox_dir(inbox_dir: &Path) -> Result<()> {
105        std::fs::create_dir_all(inbox_dir)?;
106        Ok(())
107    }
108
109    /// Read the inbox file, returning an empty vec if the file doesn't exist.
110    fn read_inbox_file(path: &Path) -> Result<Vec<InboxMessage>> {
111        match std::fs::read_to_string(path) {
112            Ok(data) => {
113                let messages: Vec<InboxMessage> = serde_json::from_str(&data)?;
114                Ok(messages)
115            }
116            Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Vec::new()),
117            Err(e) => Err(e.into()),
118        }
119    }
120}
121
122#[async_trait]
123impl InboxManager for FileInboxManager {
124    async fn send_message(&self, team: &str, message: InboxMessage) -> Result<()> {
125        validate_name(team)?;
126        validate_name(&message.to)?;
127
128        let recipient = message.to.clone();
129        let inbox_dir = self.inbox_dir(team);
130        let lock_path = self.lock_path(team, &recipient);
131        let inbox_path = self.inbox_path(team, &recipient);
132
133        debug!(team, to = %recipient, id = %message.id, "sending message");
134
135        // File locking is blocking, so offload to a blocking thread.
136        tokio::task::spawn_blocking(move || {
137            Self::ensure_inbox_dir(&inbox_dir)?;
138            let _lock = FileLock::acquire(&lock_path)?;
139            let mut messages = Self::read_inbox_file(&inbox_path)?;
140            messages.push(message);
141            atomic_write_json(&inbox_path, &messages)?;
142            Ok(())
143        })
144        .await
145        .map_err(|e| Error::JoinError(format!("{e}")))?
146    }
147
148    async fn broadcast(
149        &self,
150        team: &str,
151        from: &str,
152        content: &str,
153        members: &[String],
154    ) -> Result<()> {
155        validate_name(team)?;
156
157        debug!(team, from, count = members.len(), "broadcasting message");
158
159        for member in members {
160            if member == from {
161                continue;
162            }
163            let msg = InboxMessage::new(from, member.as_str(), content);
164            self.send_message(team, msg).await?;
165        }
166        Ok(())
167    }
168
169    async fn read_inbox(&self, team: &str, agent: &str) -> Result<Vec<InboxMessage>> {
170        validate_name(team)?;
171        validate_name(agent)?;
172
173        let inbox_dir = self.inbox_dir(team);
174        let lock_path = self.lock_path(team, agent);
175        let inbox_path = self.inbox_path(team, agent);
176
177        tokio::task::spawn_blocking(move || {
178            Self::ensure_inbox_dir(&inbox_dir)?;
179            let _lock = FileLock::acquire(&lock_path)?;
180            Self::read_inbox_file(&inbox_path)
181        })
182        .await
183        .map_err(|e| Error::JoinError(format!("{e}")))?
184    }
185
186    async fn read_unread(&self, team: &str, agent: &str) -> Result<Vec<InboxMessage>> {
187        validate_name(team)?;
188        validate_name(agent)?;
189
190        let all = self.read_inbox(team, agent).await?;
191        Ok(all.into_iter().filter(|m| !m.read).collect())
192    }
193
194    async fn mark_read(&self, team: &str, agent: &str, message_id: &str) -> Result<()> {
195        validate_name(team)?;
196        validate_name(agent)?;
197
198        let inbox_dir = self.inbox_dir(team);
199        let lock_path = self.lock_path(team, agent);
200        let inbox_path = self.inbox_path(team, agent);
201        let message_id = message_id.to_owned();
202
203        debug!(team, agent, id = %message_id, "marking message as read");
204
205        tokio::task::spawn_blocking(move || {
206            Self::ensure_inbox_dir(&inbox_dir)?;
207            let _lock = FileLock::acquire(&lock_path)?;
208            let mut messages = Self::read_inbox_file(&inbox_path)?;
209
210            let found = messages.iter_mut().find(|m| m.id == message_id);
211            match found {
212                Some(msg) => {
213                    msg.read = true;
214                    atomic_write_json(&inbox_path, &messages)?;
215                    Ok(())
216                }
217                None => {
218                    warn!(id = %message_id, "message not found in inbox");
219                    Ok(())
220                }
221            }
222        })
223        .await
224        .map_err(|e| Error::JoinError(format!("{e}")))?
225    }
226
227    async fn poll_inbox(
228        &self,
229        team: &str,
230        agent: &str,
231        timeout: Duration,
232    ) -> Result<Vec<InboxMessage>> {
233        validate_name(team)?;
234        validate_name(agent)?;
235
236        let deadline = Instant::now() + timeout;
237
238        loop {
239            let unread = self.read_unread(team, agent).await?;
240            if !unread.is_empty() {
241                return Ok(unread);
242            }
243
244            if Instant::now() >= deadline {
245                return Ok(Vec::new());
246            }
247
248            // Sleep before polling again, but don't overshoot the deadline.
249            let remaining = deadline.saturating_duration_since(Instant::now());
250            let sleep_dur = remaining.min(Duration::from_millis(500));
251            if sleep_dur.is_zero() {
252                return Ok(Vec::new());
253            }
254            tokio::time::sleep(sleep_dur).await;
255        }
256    }
257
258    async fn clear_inbox(&self, team: &str, agent: &str) -> Result<()> {
259        validate_name(team)?;
260        validate_name(agent)?;
261
262        let inbox_dir = self.inbox_dir(team);
263        let lock_path = self.lock_path(team, agent);
264        let inbox_path = self.inbox_path(team, agent);
265
266        debug!(team, agent, "clearing inbox");
267
268        tokio::task::spawn_blocking(move || {
269            Self::ensure_inbox_dir(&inbox_dir)?;
270            let _lock = FileLock::acquire(&lock_path)?;
271            let empty: Vec<InboxMessage> = Vec::new();
272            atomic_write_json(&inbox_path, &empty)?;
273            Ok(())
274        })
275        .await
276        .map_err(|e| Error::JoinError(format!("{e}")))?
277    }
278}
279
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::StructuredMessage;

    /// Create a scratch directory and a manager rooted in it.
    ///
    /// The `TempDir` guard is handed back so the directory stays alive for
    /// the whole test body.
    fn fixture() -> (tempfile::TempDir, FileInboxManager) {
        let tmp = tempfile::tempdir().unwrap();
        let manager = FileInboxManager::new(tmp.path());
        (tmp, manager)
    }

    #[tokio::test]
    async fn send_and_read_single_message() {
        let (_tmp, mgr) = fixture();

        mgr.send_message(
            "test-team",
            InboxMessage::new("lead", "worker-1", "Hello worker!"),
        )
        .await
        .unwrap();

        let inbox = mgr.read_inbox("test-team", "worker-1").await.unwrap();
        assert_eq!(inbox.len(), 1);

        let only = &inbox[0];
        assert_eq!(only.from, "lead");
        assert_eq!(only.to, "worker-1");
        assert_eq!(only.content, "Hello worker!");
        assert!(!only.read);
    }

    #[tokio::test]
    async fn send_multiple_and_read_unread() {
        let (_tmp, mgr) = fixture();

        for body in ["Task 1", "Task 2"] {
            let msg = InboxMessage::new("lead", "worker-1", body);
            mgr.send_message("test-team", msg).await.unwrap();
        }

        let unread = mgr.read_unread("test-team", "worker-1").await.unwrap();
        assert_eq!(unread.len(), 2);
    }

    #[tokio::test]
    async fn mark_read_filters_unread() {
        let (_tmp, mgr) = fixture();

        let msg = InboxMessage::new("lead", "worker-1", "Read me");
        let id = msg.id.clone();
        mgr.send_message("test-team", msg).await.unwrap();

        mgr.mark_read("test-team", "worker-1", &id).await.unwrap();

        // Nothing is left unread...
        let unread = mgr.read_unread("test-team", "worker-1").await.unwrap();
        assert!(unread.is_empty());

        // ...yet the message itself remains in the inbox, now flagged as read.
        let everything = mgr.read_inbox("test-team", "worker-1").await.unwrap();
        assert_eq!(everything.len(), 1);
        assert!(everything[0].read);
    }

    #[tokio::test]
    async fn broadcast_sends_to_all_except_sender() {
        let (_tmp, mgr) = fixture();

        let members: Vec<String> = ["lead", "worker-1", "worker-2"]
            .iter()
            .map(|name| name.to_string())
            .collect();
        mgr.broadcast("test-team", "lead", "Announcement!", &members)
            .await
            .unwrap();

        // The sender must not receive its own broadcast.
        let lead_inbox = mgr.read_inbox("test-team", "lead").await.unwrap();
        assert!(lead_inbox.is_empty());

        // Every other member gets exactly one copy.
        for worker in ["worker-1", "worker-2"] {
            let inbox = mgr.read_inbox("test-team", worker).await.unwrap();
            assert_eq!(inbox.len(), 1);
            assert_eq!(inbox[0].content, "Announcement!");
        }
    }

    #[tokio::test]
    async fn clear_inbox_removes_all() {
        let (_tmp, mgr) = fixture();

        for body in ["1", "2"] {
            mgr.send_message("t", InboxMessage::new("a", "b", body))
                .await
                .unwrap();
        }

        mgr.clear_inbox("t", "b").await.unwrap();

        let remaining = mgr.read_inbox("t", "b").await.unwrap();
        assert!(remaining.is_empty());
    }

    #[tokio::test]
    async fn poll_returns_immediately_when_messages_exist() {
        let (_tmp, mgr) = fixture();

        mgr.send_message("t", InboxMessage::new("a", "b", "hi"))
            .await
            .unwrap();

        let started = Instant::now();
        let msgs = mgr
            .poll_inbox("t", "b", Duration::from_secs(5))
            .await
            .unwrap();
        let waited = started.elapsed();

        assert_eq!(msgs.len(), 1);
        assert!(waited < Duration::from_secs(1), "poll should return immediately");
    }

    #[tokio::test]
    async fn poll_times_out_with_empty_result() {
        let (_tmp, mgr) = fixture();

        let started = Instant::now();
        let msgs = mgr
            .poll_inbox("t", "agent", Duration::from_millis(600))
            .await
            .unwrap();
        let waited = started.elapsed();

        assert!(msgs.is_empty());
        assert!(waited >= Duration::from_millis(500), "should wait near timeout");
        assert!(waited < Duration::from_secs(3), "should not wait too long");
    }

    #[tokio::test]
    async fn read_empty_inbox() {
        let (_tmp, mgr) = fixture();

        let inbox = mgr.read_inbox("team", "nobody").await.unwrap();
        assert!(inbox.is_empty());
    }

    #[tokio::test]
    async fn structured_message_round_trip_via_inbox() {
        let (_tmp, mgr) = fixture();

        let assignment = StructuredMessage::TaskAssignment {
            task_id: "42".into(),
            subject: "Fix the bug".into(),
            description: Some("It's broken".into()),
            assigned_by: None,
            timestamp: None,
        };
        let msg = InboxMessage::from_structured("lead", "worker-1", &assignment).unwrap();
        mgr.send_message("t", msg).await.unwrap();

        let inbox = mgr.read_inbox("t", "worker-1").await.unwrap();
        assert_eq!(inbox.len(), 1);

        match inbox[0].try_as_structured().unwrap() {
            StructuredMessage::TaskAssignment {
                task_id, subject, ..
            } => {
                assert_eq!(task_id, "42");
                assert_eq!(subject, "Fix the bug");
            }
            other => panic!("expected TaskAssignment, got {other:?}"),
        }
    }

    #[tokio::test]
    async fn mark_read_nonexistent_message_is_ok() {
        let (_tmp, mgr) = fixture();

        // With no inbox on disk at all, marking an unknown id must neither
        // panic nor return an error.
        mgr.mark_read("t", "agent", "nonexistent-id").await.unwrap();
    }
}