1pub mod structured;
8
9use std::path::{Path, PathBuf};
10use std::time::Duration;
11
12use async_trait::async_trait;
13use tokio::time::Instant;
14use tracing::{debug, warn};
15
16use crate::error::{Error, Result};
17use crate::models::InboxMessage;
18use crate::util::atomic_write::atomic_write_json;
19use crate::util::file_lock::FileLock;
20use crate::util::validate_name;
21
22#[async_trait]
24pub trait InboxManager: Send + Sync {
25 async fn send_message(&self, team: &str, message: InboxMessage) -> Result<()>;
27
28 async fn broadcast(
30 &self,
31 team: &str,
32 from: &str,
33 content: &str,
34 members: &[String],
35 ) -> Result<()>;
36
37 async fn read_inbox(&self, team: &str, agent: &str) -> Result<Vec<InboxMessage>>;
39
40 async fn read_unread(&self, team: &str, agent: &str) -> Result<Vec<InboxMessage>>;
42
43 async fn mark_read(&self, team: &str, agent: &str, message_id: &str) -> Result<()>;
45
46 async fn poll_inbox(
48 &self,
49 team: &str,
50 agent: &str,
51 timeout: Duration,
52 ) -> Result<Vec<InboxMessage>>;
53
54 async fn clear_inbox(&self, team: &str, agent: &str) -> Result<()>;
56}
57
/// Inbox manager that keeps each agent's messages in a JSON file on disk.
///
/// Files are laid out as `<base_dir>/<team>/inboxes/<agent>.json`, with a
/// sibling `<agent>.lock` file used for locking.
#[derive(Clone, Debug)]
pub struct FileInboxManager {
    /// Root directory that contains one sub-directory per team.
    base_dir: PathBuf,
}
69
70impl Default for FileInboxManager {
71 fn default() -> Self {
72 let base_dir = dirs::home_dir()
73 .unwrap_or_else(|| PathBuf::from("."))
74 .join(".claude")
75 .join("teams");
76 Self { base_dir }
77 }
78}
79
80impl FileInboxManager {
81 pub fn new(base_dir: impl Into<PathBuf>) -> Self {
83 Self {
84 base_dir: base_dir.into(),
85 }
86 }
87
88 fn inbox_dir(&self, team: &str) -> PathBuf {
90 self.base_dir.join(team).join("inboxes")
91 }
92
93 fn inbox_path(&self, team: &str, agent: &str) -> PathBuf {
95 self.inbox_dir(team).join(format!("{agent}.json"))
96 }
97
98 fn lock_path(&self, team: &str, agent: &str) -> PathBuf {
100 self.inbox_dir(team).join(format!("{agent}.lock"))
101 }
102
103 fn ensure_inbox_dir(inbox_dir: &Path) -> Result<()> {
105 std::fs::create_dir_all(inbox_dir)?;
106 Ok(())
107 }
108
109 fn read_inbox_file(path: &Path) -> Result<Vec<InboxMessage>> {
111 match std::fs::read_to_string(path) {
112 Ok(data) => {
113 let messages: Vec<InboxMessage> = serde_json::from_str(&data)?;
114 Ok(messages)
115 }
116 Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(Vec::new()),
117 Err(e) => Err(e.into()),
118 }
119 }
120}
121
122#[async_trait]
123impl InboxManager for FileInboxManager {
124 async fn send_message(&self, team: &str, message: InboxMessage) -> Result<()> {
125 validate_name(team)?;
126 validate_name(&message.to)?;
127
128 let recipient = message.to.clone();
129 let inbox_dir = self.inbox_dir(team);
130 let lock_path = self.lock_path(team, &recipient);
131 let inbox_path = self.inbox_path(team, &recipient);
132
133 debug!(team, to = %recipient, id = %message.id, "sending message");
134
135 tokio::task::spawn_blocking(move || {
137 Self::ensure_inbox_dir(&inbox_dir)?;
138 let _lock = FileLock::acquire(&lock_path)?;
139 let mut messages = Self::read_inbox_file(&inbox_path)?;
140 messages.push(message);
141 atomic_write_json(&inbox_path, &messages)?;
142 Ok(())
143 })
144 .await
145 .map_err(|e| Error::JoinError(format!("{e}")))?
146 }
147
148 async fn broadcast(
149 &self,
150 team: &str,
151 from: &str,
152 content: &str,
153 members: &[String],
154 ) -> Result<()> {
155 validate_name(team)?;
156
157 debug!(team, from, count = members.len(), "broadcasting message");
158
159 for member in members {
160 if member == from {
161 continue;
162 }
163 let msg = InboxMessage::new(from, member.as_str(), content);
164 self.send_message(team, msg).await?;
165 }
166 Ok(())
167 }
168
169 async fn read_inbox(&self, team: &str, agent: &str) -> Result<Vec<InboxMessage>> {
170 validate_name(team)?;
171 validate_name(agent)?;
172
173 let inbox_dir = self.inbox_dir(team);
174 let lock_path = self.lock_path(team, agent);
175 let inbox_path = self.inbox_path(team, agent);
176
177 tokio::task::spawn_blocking(move || {
178 Self::ensure_inbox_dir(&inbox_dir)?;
179 let _lock = FileLock::acquire(&lock_path)?;
180 Self::read_inbox_file(&inbox_path)
181 })
182 .await
183 .map_err(|e| Error::JoinError(format!("{e}")))?
184 }
185
186 async fn read_unread(&self, team: &str, agent: &str) -> Result<Vec<InboxMessage>> {
187 validate_name(team)?;
188 validate_name(agent)?;
189
190 let all = self.read_inbox(team, agent).await?;
191 Ok(all.into_iter().filter(|m| !m.read).collect())
192 }
193
194 async fn mark_read(&self, team: &str, agent: &str, message_id: &str) -> Result<()> {
195 validate_name(team)?;
196 validate_name(agent)?;
197
198 let inbox_dir = self.inbox_dir(team);
199 let lock_path = self.lock_path(team, agent);
200 let inbox_path = self.inbox_path(team, agent);
201 let message_id = message_id.to_owned();
202
203 debug!(team, agent, id = %message_id, "marking message as read");
204
205 tokio::task::spawn_blocking(move || {
206 Self::ensure_inbox_dir(&inbox_dir)?;
207 let _lock = FileLock::acquire(&lock_path)?;
208 let mut messages = Self::read_inbox_file(&inbox_path)?;
209
210 let found = messages.iter_mut().find(|m| m.id == message_id);
211 match found {
212 Some(msg) => {
213 msg.read = true;
214 atomic_write_json(&inbox_path, &messages)?;
215 Ok(())
216 }
217 None => {
218 warn!(id = %message_id, "message not found in inbox");
219 Ok(())
220 }
221 }
222 })
223 .await
224 .map_err(|e| Error::JoinError(format!("{e}")))?
225 }
226
227 async fn poll_inbox(
228 &self,
229 team: &str,
230 agent: &str,
231 timeout: Duration,
232 ) -> Result<Vec<InboxMessage>> {
233 validate_name(team)?;
234 validate_name(agent)?;
235
236 let deadline = Instant::now() + timeout;
237
238 loop {
239 let unread = self.read_unread(team, agent).await?;
240 if !unread.is_empty() {
241 return Ok(unread);
242 }
243
244 if Instant::now() >= deadline {
245 return Ok(Vec::new());
246 }
247
248 let remaining = deadline.saturating_duration_since(Instant::now());
250 let sleep_dur = remaining.min(Duration::from_millis(500));
251 if sleep_dur.is_zero() {
252 return Ok(Vec::new());
253 }
254 tokio::time::sleep(sleep_dur).await;
255 }
256 }
257
258 async fn clear_inbox(&self, team: &str, agent: &str) -> Result<()> {
259 validate_name(team)?;
260 validate_name(agent)?;
261
262 let inbox_dir = self.inbox_dir(team);
263 let lock_path = self.lock_path(team, agent);
264 let inbox_path = self.inbox_path(team, agent);
265
266 debug!(team, agent, "clearing inbox");
267
268 tokio::task::spawn_blocking(move || {
269 Self::ensure_inbox_dir(&inbox_dir)?;
270 let _lock = FileLock::acquire(&lock_path)?;
271 let empty: Vec<InboxMessage> = Vec::new();
272 atomic_write_json(&inbox_path, &empty)?;
273 Ok(())
274 })
275 .await
276 .map_err(|e| Error::JoinError(format!("{e}")))?
277 }
278}
279
#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::StructuredMessage;

    /// Builds a manager rooted at `dir` so each test works in its own
    /// temporary directory.
    fn make_manager(dir: &Path) -> FileInboxManager {
        FileInboxManager::new(dir)
    }

    /// A single sent message is stored with its fields intact and unread.
    #[tokio::test]
    async fn send_and_read_single_message() {
        let sandbox = tempfile::tempdir().unwrap();
        let manager = make_manager(sandbox.path());

        manager
            .send_message(
                "test-team",
                InboxMessage::new("lead", "worker-1", "Hello worker!"),
            )
            .await
            .unwrap();

        let received = manager.read_inbox("test-team", "worker-1").await.unwrap();
        assert_eq!(received.len(), 1);

        let only = &received[0];
        assert_eq!(only.from, "lead");
        assert_eq!(only.to, "worker-1");
        assert_eq!(only.content, "Hello worker!");
        assert!(!only.read);
    }

    /// Two freshly sent messages both show up as unread.
    #[tokio::test]
    async fn send_multiple_and_read_unread() {
        let sandbox = tempfile::tempdir().unwrap();
        let manager = make_manager(sandbox.path());

        let first = InboxMessage::new("lead", "worker-1", "Task 1");
        manager.send_message("test-team", first).await.unwrap();
        let second = InboxMessage::new("lead", "worker-1", "Task 2");
        manager.send_message("test-team", second).await.unwrap();

        let pending = manager.read_unread("test-team", "worker-1").await.unwrap();
        assert_eq!(pending.len(), 2);
    }

    /// Marking a message read hides it from `read_unread` but keeps it in
    /// the full inbox.
    #[tokio::test]
    async fn mark_read_filters_unread() {
        let sandbox = tempfile::tempdir().unwrap();
        let manager = make_manager(sandbox.path());

        let outgoing = InboxMessage::new("lead", "worker-1", "Read me");
        let id = outgoing.id.clone();
        manager.send_message("test-team", outgoing).await.unwrap();

        manager
            .mark_read("test-team", "worker-1", &id)
            .await
            .unwrap();

        let pending = manager.read_unread("test-team", "worker-1").await.unwrap();
        assert!(pending.is_empty());

        let everything = manager.read_inbox("test-team", "worker-1").await.unwrap();
        assert_eq!(everything.len(), 1);
        assert!(everything[0].read);
    }

    /// A broadcast reaches every listed member except the sender.
    #[tokio::test]
    async fn broadcast_sends_to_all_except_sender() {
        let sandbox = tempfile::tempdir().unwrap();
        let manager = make_manager(sandbox.path());

        let roster = vec![
            String::from("lead"),
            String::from("worker-1"),
            String::from("worker-2"),
        ];
        manager
            .broadcast("test-team", "lead", "Announcement!", &roster)
            .await
            .unwrap();

        let sender_inbox = manager.read_inbox("test-team", "lead").await.unwrap();
        assert!(sender_inbox.is_empty());

        let first_worker = manager.read_inbox("test-team", "worker-1").await.unwrap();
        assert_eq!(first_worker.len(), 1);
        assert_eq!(first_worker[0].content, "Announcement!");

        let second_worker = manager.read_inbox("test-team", "worker-2").await.unwrap();
        assert_eq!(second_worker.len(), 1);
        assert_eq!(second_worker[0].content, "Announcement!");
    }

    /// Clearing an inbox leaves it empty.
    #[tokio::test]
    async fn clear_inbox_removes_all() {
        let sandbox = tempfile::tempdir().unwrap();
        let manager = make_manager(sandbox.path());

        let first = InboxMessage::new("a", "b", "1");
        manager.send_message("t", first).await.unwrap();
        let second = InboxMessage::new("a", "b", "2");
        manager.send_message("t", second).await.unwrap();

        manager.clear_inbox("t", "b").await.unwrap();

        let remaining = manager.read_inbox("t", "b").await.unwrap();
        assert!(remaining.is_empty());
    }

    /// Polling returns without waiting when unread mail already exists.
    #[tokio::test]
    async fn poll_returns_immediately_when_messages_exist() {
        let sandbox = tempfile::tempdir().unwrap();
        let manager = make_manager(sandbox.path());

        manager
            .send_message("t", InboxMessage::new("a", "b", "hi"))
            .await
            .unwrap();

        let started = Instant::now();
        let found = manager
            .poll_inbox("t", "b", Duration::from_secs(5))
            .await
            .unwrap();
        let waited = started.elapsed();

        assert_eq!(found.len(), 1);
        assert!(waited < Duration::from_secs(1), "poll should return immediately");
    }

    /// Polling an empty inbox waits roughly the timeout, then returns
    /// nothing.
    #[tokio::test]
    async fn poll_times_out_with_empty_result() {
        let sandbox = tempfile::tempdir().unwrap();
        let manager = make_manager(sandbox.path());

        let started = Instant::now();
        let found = manager
            .poll_inbox("t", "agent", Duration::from_millis(600))
            .await
            .unwrap();
        let waited = started.elapsed();

        assert!(found.is_empty());
        assert!(waited >= Duration::from_millis(500), "should wait near timeout");
        assert!(waited < Duration::from_secs(3), "should not wait too long");
    }

    /// Reading an inbox that was never written yields an empty list.
    #[tokio::test]
    async fn read_empty_inbox() {
        let sandbox = tempfile::tempdir().unwrap();
        let manager = make_manager(sandbox.path());

        let received = manager.read_inbox("team", "nobody").await.unwrap();
        assert!(received.is_empty());
    }

    /// A structured payload survives being sent through and read back from
    /// the inbox.
    #[tokio::test]
    async fn structured_message_round_trip_via_inbox() {
        let sandbox = tempfile::tempdir().unwrap();
        let manager = make_manager(sandbox.path());

        let assignment = StructuredMessage::TaskAssignment {
            task_id: "42".into(),
            subject: "Fix the bug".into(),
            description: Some("It's broken".into()),
            assigned_by: None,
            timestamp: None,
        };
        let outgoing = InboxMessage::from_structured("lead", "worker-1", &assignment).unwrap();
        manager.send_message("t", outgoing).await.unwrap();

        let received = manager.read_inbox("t", "worker-1").await.unwrap();
        assert_eq!(received.len(), 1);

        let parsed = received[0].try_as_structured().unwrap();
        match parsed {
            StructuredMessage::TaskAssignment {
                task_id, subject, ..
            } => {
                assert_eq!(task_id, "42");
                assert_eq!(subject, "Fix the bug");
            }
            other => panic!("expected TaskAssignment, got {other:?}"),
        }
    }

    /// Marking an id that is not in the inbox succeeds without error.
    #[tokio::test]
    async fn mark_read_nonexistent_message_is_ok() {
        let sandbox = tempfile::tempdir().unwrap();
        let manager = make_manager(sandbox.path());

        manager
            .mark_read("t", "agent", "nonexistent-id")
            .await
            .unwrap();
    }
}