1use std::collections::HashSet;
18use std::io::ErrorKind;
19use std::path::PathBuf;
20
21use chrono::{DateTime, Utc};
22use serde::{Deserialize, Serialize};
23
24use crate::error::{atomic_write, Result, StoreError};
25
26#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
28pub struct MsgId(pub String);
29
30impl MsgId {
31 pub fn new() -> Self {
32 MsgId(uuid::Uuid::new_v4().to_string())
33 }
34 pub fn as_str(&self) -> &str {
35 &self.0
36 }
37}
38
39impl Default for MsgId {
40 fn default() -> Self {
41 Self::new()
42 }
43}
44
45#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
47pub struct AgentRef {
48 pub session_id: String,
49 #[serde(default, skip_serializing_if = "Option::is_none")]
50 pub role: Option<String>,
51}
52
53#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
55#[serde(rename_all = "snake_case")]
56pub enum InboxKind {
57 Task,
58 Ask,
59 Handoff,
60 Reply,
61 McpRequest,
65 McpReply,
67}
68
69#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
71#[serde(rename_all = "snake_case")]
72pub enum AskMode {
73 #[default]
76 Query,
77 Steer,
81}
82
83#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
85pub struct AskBody {
86 pub question: String,
87 #[serde(default)]
88 pub mode: AskMode,
89}
90
91#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
93pub struct ReplyBody {
94 pub answer: String,
95}
96
97#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
99pub struct InboxMessage {
100 pub id: MsgId,
101 pub from: AgentRef,
102 pub kind: InboxKind,
103 pub body: serde_json::Value,
104 pub created_at: DateTime<Utc>,
105 #[serde(default, skip_serializing_if = "Option::is_none")]
109 pub correlation_id: Option<MsgId>,
110}
111
112#[derive(Debug, Clone)]
114pub struct Delivered {
115 pub msg: InboxMessage,
116 pub cur_path: PathBuf,
117}
118
/// File-backed mailbox rooted at a single directory.
///
/// The root contains three sub-directories: `new/` (delivered, not yet drained), `cur/`
/// (drained, not yet acknowledged) and `corrupt/` (files that could not be loaded).
pub struct Mailbox {
    /// Root directory of the mailbox.
    dir: PathBuf,
}
123
124impl Mailbox {
125 pub fn at(dir: impl Into<PathBuf>) -> Self {
126 Self { dir: dir.into() }
127 }
128
129 fn new_dir(&self) -> PathBuf {
130 self.dir.join("new")
131 }
132 fn cur_dir(&self) -> PathBuf {
133 self.dir.join("cur")
134 }
135 fn corrupt_dir(&self) -> PathBuf {
136 self.dir.join("corrupt")
137 }
138
139 pub async fn ensure_dirs(&self) -> Result<()> {
140 for d in [self.new_dir(), self.cur_dir(), self.corrupt_dir()] {
141 tokio::fs::create_dir_all(&d)
142 .await
143 .map_err(|e| StoreError::io(&d, e))?;
144 }
145 Ok(())
146 }
147
148 pub async fn deliver(&self, msg: &InboxMessage) -> Result<MsgId> {
152 let bytes = serde_json::to_vec_pretty(msg).map_err(|e| StoreError::decode(&self.dir, e))?;
153 let nanos = msg.created_at.timestamp_nanos_opt().unwrap_or(0).max(0);
154 let name = format!("{nanos:020}-{}.json", msg.id.0);
156 atomic_write(&self.new_dir().join(&name), &bytes).await?;
158 Ok(msg.id.clone())
159 }
160
161 pub async fn drain(&self) -> Result<Vec<Delivered>> {
166 self.ensure_dirs().await?;
167 let names = self.sorted_json_names(&self.new_dir()).await?;
168 let mut out = Vec::new();
169 for name in names {
170 let src = self.new_dir().join(&name);
171 let dst = self.cur_dir().join(&name);
172 if tokio::fs::rename(&src, &dst).await.is_err() {
174 continue;
175 }
176 match read_msg(&dst).await {
177 Ok(msg) => out.push(Delivered { msg, cur_path: dst }),
178 Err(_) => {
179 let _ = tokio::fs::rename(&dst, &self.corrupt_dir().join(&name)).await;
180 }
181 }
182 }
183 Ok(out)
184 }
185
186 pub async fn ack_delivered(&self, delivered: &Delivered) -> Result<()> {
189 match tokio::fs::remove_file(&delivered.cur_path).await {
190 Ok(()) => Ok(()),
191 Err(e) if e.kind() == ErrorKind::NotFound => Ok(()),
192 Err(e) => Err(StoreError::io(&delivered.cur_path, e)),
193 }
194 }
195
196 pub async fn ack(&self, id: &MsgId) -> Result<()> {
200 let needle = format!("-{}.json", id.0);
201 let cur = self.cur_dir();
202 let mut rd = match tokio::fs::read_dir(&cur).await {
203 Ok(rd) => rd,
204 Err(e) if e.kind() == ErrorKind::NotFound => return Ok(()),
205 Err(e) => return Err(StoreError::io(&cur, e)),
206 };
207 while let Some(ent) = rd.next_entry().await.map_err(|e| StoreError::io(&cur, e))? {
208 let fname = ent.file_name().to_string_lossy().into_owned();
209 if fname.ends_with(&needle) {
210 tokio::fs::remove_file(ent.path())
211 .await
212 .map_err(|e| StoreError::io(ent.path(), e))?;
213 return Ok(());
214 }
215 }
216 Ok(())
217 }
218
219 pub async fn recover(&self) -> Result<Vec<Delivered>> {
221 self.ensure_dirs().await?;
222 let names = self.sorted_json_names(&self.cur_dir()).await?;
223 let mut out = Vec::new();
224 for name in names {
225 let path = self.cur_dir().join(&name);
226 match read_msg(&path).await {
227 Ok(msg) => out.push(Delivered {
228 msg,
229 cur_path: path,
230 }),
231 Err(_) => {
232 let _ = tokio::fs::rename(&path, &self.corrupt_dir().join(&name)).await;
233 }
234 }
235 }
236 Ok(out)
237 }
238
239 pub async fn is_empty(&self) -> Result<bool> {
241 Ok(self.sorted_json_names(&self.new_dir()).await?.is_empty())
242 }
243
244 async fn sorted_json_names(&self, dir: &std::path::Path) -> Result<Vec<String>> {
245 let mut rd = match tokio::fs::read_dir(dir).await {
246 Ok(rd) => rd,
247 Err(e) if e.kind() == ErrorKind::NotFound => return Ok(Vec::new()),
248 Err(e) => return Err(StoreError::io(dir, e)),
249 };
250 let mut names = Vec::new();
251 while let Some(ent) = rd.next_entry().await.map_err(|e| StoreError::io(dir, e))? {
252 let fname = ent.file_name().to_string_lossy().into_owned();
253 if fname.starts_with('.') || !fname.ends_with(".json") {
254 continue; }
256 names.push(fname);
257 }
258 names.sort();
259 Ok(names)
260 }
261}
262
263async fn read_msg(path: &std::path::Path) -> Result<InboxMessage> {
264 let bytes = tokio::fs::read(path)
265 .await
266 .map_err(|e| StoreError::io(path, e))?;
267 serde_json::from_slice(&bytes).map_err(|e| StoreError::decode(path, e))
268}
269
270#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
277#[serde(from = "Vec<MsgId>", into = "Vec<MsgId>")]
278pub struct AdmittedSet {
279 order: std::collections::VecDeque<MsgId>,
280 index: HashSet<MsgId>,
281}
282
/// Maximum number of ids an [`AdmittedSet`] retains; inserting beyond this evicts the oldest.
pub const ADMITTED_SET_CAPACITY: usize = 4_096;
285
286impl AdmittedSet {
287 pub fn contains(&self, id: &MsgId) -> bool {
288 self.index.contains(id)
289 }
290 pub fn insert(&mut self, id: MsgId) -> bool {
292 if !self.index.insert(id.clone()) {
293 return false;
294 }
295 self.order.push_back(id);
296 while self.order.len() > ADMITTED_SET_CAPACITY {
297 if let Some(evicted) = self.order.pop_front() {
298 self.index.remove(&evicted);
299 }
300 }
301 true
302 }
303 pub fn len(&self) -> usize {
304 self.order.len()
305 }
306 pub fn is_empty(&self) -> bool {
307 self.order.is_empty()
308 }
309}
310
311impl From<Vec<MsgId>> for AdmittedSet {
312 fn from(ids: Vec<MsgId>) -> Self {
313 let mut set = AdmittedSet::default();
314 for id in ids {
315 set.insert(id);
316 }
317 set
318 }
319}
320
321impl From<AdmittedSet> for Vec<MsgId> {
322 fn from(set: AdmittedSet) -> Self {
323 set.order.into_iter().collect()
324 }
325}
326
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;
    use serde_json::json;
    use tempfile::TempDir;

    /// Builds a mailbox inside a fresh temporary directory. The `TempDir` guard is returned
    /// so the directory stays alive for the duration of the test.
    fn mailbox() -> (TempDir, Mailbox) {
        let dir = TempDir::new().unwrap();
        let mb = Mailbox::at(dir.path().join("mailbox"));
        (dir, mb)
    }

    /// Builds a `Task` message from session "parent" whose body is `{ "seq": seq }`.
    fn msg(seq: u32) -> InboxMessage {
        InboxMessage {
            id: MsgId::new(),
            from: AgentRef {
                session_id: "parent".into(),
                role: None,
            },
            kind: InboxKind::Task,
            body: json!({ "seq": seq }),
            created_at: Utc::now(),
            correlation_id: None,
        }
    }

    #[test]
    fn ask_reply_bodies_and_correlation_round_trip() {
        // AskBody round-trips and serializes its mode in snake_case.
        let ask = AskBody {
            question: "what did you find?".into(),
            mode: AskMode::Query,
        };
        let ask_json = serde_json::to_value(&ask).unwrap();
        assert_eq!(ask_json["mode"], "query");
        assert_eq!(serde_json::from_value::<AskBody>(ask_json).unwrap(), ask);

        // A missing `mode` falls back to the default.
        let defaulted: AskBody = serde_json::from_value(json!({ "question": "q" })).unwrap();
        assert_eq!(defaulted.mode, AskMode::Query);
        assert_eq!(
            serde_json::from_value::<AskMode>(json!("steer")).unwrap(),
            AskMode::Steer
        );

        // A reply carries the id of the ask it answers through serialization.
        let ask_id = MsgId::new();
        let reply = InboxMessage {
            id: MsgId::new(),
            from: AgentRef {
                session_id: "child".into(),
                role: None,
            },
            kind: InboxKind::Reply,
            body: serde_json::to_value(ReplyBody {
                answer: "found X".into(),
            })
            .unwrap(),
            created_at: Utc::now(),
            correlation_id: Some(ask_id.clone()),
        };
        let round: InboxMessage =
            serde_json::from_value(serde_json::to_value(&reply).unwrap()).unwrap();
        assert_eq!(round.correlation_id, Some(ask_id));
        assert_eq!(round.kind, InboxKind::Reply);

        // Messages written without a `correlation_id` field still decode.
        let legacy: InboxMessage = serde_json::from_value(json!({
            "id": MsgId::new(),
            "from": { "session_id": "p" },
            "kind": "task",
            "body": {},
            "created_at": Utc::now().to_rfc3339(),
        }))
        .unwrap();
        assert_eq!(legacy.correlation_id, None);
    }

    #[tokio::test]
    async fn deliver_then_drain_then_ack() {
        let (_d, mb) = mailbox();
        let m = msg(1);
        mb.deliver(&m).await.unwrap();

        assert!(!mb.is_empty().await.unwrap());
        let batch = mb.drain().await.unwrap();
        assert_eq!(batch.len(), 1);
        assert_eq!(batch[0].msg.id, m.id);
        // Draining empties `new/`.
        assert!(mb.is_empty().await.unwrap());

        // Once acknowledged, nothing is left in `cur/` to recover.
        mb.ack(&m.id).await.unwrap();
        assert!(mb.recover().await.unwrap().is_empty());
    }

    #[tokio::test]
    async fn multi_writer_no_loss() {
        let (_d, mb) = mailbox();
        mb.ensure_dirs().await.unwrap();
        let dir = mb.dir.clone();

        // 50 concurrent writers, each using its own `Mailbox` handle on the same directory.
        let mut handles = Vec::new();
        for i in 0..50u32 {
            let d = dir.clone();
            handles.push(tokio::spawn(async move {
                let mb = Mailbox::at(d);
                mb.deliver(&msg(i)).await.unwrap();
            }));
        }
        for h in handles {
            h.await.unwrap();
        }

        let batch = mb.drain().await.unwrap();
        assert_eq!(batch.len(), 50);
        let ids: HashSet<_> = batch.iter().map(|d| d.msg.id.clone()).collect();
        assert_eq!(ids.len(), 50);
    }

    #[tokio::test]
    async fn drain_is_time_ordered() {
        let (_d, mb) = mailbox();
        let base = Utc.timestamp_opt(1_700_000_000, 0).unwrap();
        for i in 0..5u32 {
            let mut m = msg(i);
            m.created_at = base + chrono::Duration::seconds(i as i64);
            mb.deliver(&m).await.unwrap();
        }
        let batch = mb.drain().await.unwrap();
        let seqs: Vec<u32> = batch
            .iter()
            .map(|d| d.msg.body["seq"].as_u64().unwrap() as u32)
            .collect();
        assert_eq!(seqs, vec![0, 1, 2, 3, 4]);
    }

    #[tokio::test]
    async fn recover_returns_unacked_leftovers() {
        let (_d, mb) = mailbox();
        let m = msg(1);
        mb.deliver(&m).await.unwrap();
        // Drained but deliberately never acknowledged.
        let batch = mb.drain().await.unwrap();
        assert_eq!(batch.len(), 1);

        // A new handle on the same directory sees the leftover.
        let mb2 = Mailbox::at(mb.dir.clone());
        let recovered = mb2.recover().await.unwrap();
        assert_eq!(recovered.len(), 1);
        assert_eq!(recovered[0].msg.id, m.id);
    }

    #[tokio::test]
    async fn corrupt_file_is_quarantined() {
        let (_d, mb) = mailbox();
        mb.ensure_dirs().await.unwrap();
        mb.deliver(&msg(1)).await.unwrap();
        // Plant a file with a valid name but undecodable contents next to the real message.
        tokio::fs::write(
            mb.new_dir().join("00000000000000000001-bogus.json"),
            b"not json",
        )
        .await
        .unwrap();

        // Only the valid message is handed out.
        let batch = mb.drain().await.unwrap();
        assert_eq!(batch.len(), 1);

        // The bogus file ended up in `corrupt/`.
        let mut rd = tokio::fs::read_dir(mb.corrupt_dir()).await.unwrap();
        let mut corrupt = 0;
        while rd.next_entry().await.unwrap().is_some() {
            corrupt += 1;
        }
        assert_eq!(corrupt, 1);
    }

    // Purely synchronous, so no async runtime is needed.
    #[test]
    fn admitted_set_dedupes() {
        let mut seen = AdmittedSet::default();
        let id = MsgId::new();
        assert!(seen.insert(id.clone()));
        assert!(seen.contains(&id));
        // A second insert of the same id is rejected and does not grow the set.
        assert!(!seen.insert(id.clone()));
        assert_eq!(seen.len(), 1);
    }

    #[test]
    fn admitted_set_is_bounded_and_serde_round_trips() {
        let mut seen = AdmittedSet::default();
        let first = MsgId::new();
        seen.insert(first.clone());
        for _ in 0..ADMITTED_SET_CAPACITY {
            seen.insert(MsgId::new());
        }
        // One over capacity was inserted, so the oldest id has been evicted.
        assert_eq!(seen.len(), ADMITTED_SET_CAPACITY);
        assert!(!seen.contains(&first));

        // The serialized list rebuilds into an equivalent set.
        let json = serde_json::to_string(&seen).unwrap();
        let restored: AdmittedSet = serde_json::from_str(&json).unwrap();
        assert_eq!(restored.len(), seen.len());
        let probe = Vec::<MsgId>::from(seen.clone())[0].clone();
        assert!(restored.contains(&probe));
    }

    #[tokio::test]
    async fn ack_delivered_removes_by_path() {
        let (_d, mb) = mailbox();
        mb.deliver(&msg(1)).await.unwrap();
        let batch = mb.drain().await.unwrap();
        mb.ack_delivered(&batch[0]).await.unwrap();
        assert!(mb.recover().await.unwrap().is_empty());
        // Acknowledging again is a no-op, not an error.
        mb.ack_delivered(&batch[0]).await.unwrap();
    }
}