Skip to main content

bamboo_subagent/
mailbox.rs

1//! Maildir-style persistent mailbox (design §3.4).
2//!
3//! ```text
4//! mailbox/
5//!   new/      delivered, unprocessed   <unix_nanos>-<msgid>.json
6//!   cur/      claimed, being processed
7//!   corrupt/  quarantined parse failures
8//! ```
9//!
10//! - **Multi-writer / single-reader, lock-free.** Senders [`deliver`](Mailbox::deliver) via
11//!   atomic temp+rename into `new/`; the owning actor [`drain`](Mailbox::drain)s by renaming
12//!   `new/ -> cur/` (claim), processes, then [`ack`](Mailbox::ack)s (delete from `cur/`).
13//! - **Crash-safe, at-least-once.** A crash between claim and ack leaves the message in `cur/`;
14//!   [`recover`](Mailbox::recover) re-yields it on next activation. Dedupe is the consumer's job
15//!   (see [`AdmittedSet`]), keyed by [`MsgId`].
16
17use std::collections::HashSet;
18use std::io::ErrorKind;
19use std::path::PathBuf;
20
21use chrono::{DateTime, Utc};
22use serde::{Deserialize, Serialize};
23
24use crate::error::{atomic_write, Result, StoreError};
25
26/// Idempotency key for a delivered message.
27#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
28pub struct MsgId(pub String);
29
30impl MsgId {
31    pub fn new() -> Self {
32        MsgId(uuid::Uuid::new_v4().to_string())
33    }
34    pub fn as_str(&self) -> &str {
35        &self.0
36    }
37}
38
39impl Default for MsgId {
40    fn default() -> Self {
41        Self::new()
42    }
43}
44
45/// Sender identity attached to an inbox message.
46#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
47pub struct AgentRef {
48    pub session_id: String,
49    #[serde(default, skip_serializing_if = "Option::is_none")]
50    pub role: Option<String>,
51}
52
53/// In-band message kind (control signals like `cancel` do NOT travel here — they are out-of-band).
54#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
55#[serde(rename_all = "snake_case")]
56pub enum InboxKind {
57    Task,
58    Ask,
59    Handoff,
60    Reply,
61    /// A worker→orchestrator MCP proxy request (fetch the proxiable tool manifest,
62    /// or invoke one of those tools). The orchestrator runs the real (host-bound)
63    /// MCP server and answers with [`InboxKind::McpReply`].
64    McpRequest,
65    /// The orchestrator's answer to an [`InboxKind::McpRequest`].
66    McpReply,
67}
68
69/// How a sub-agent should answer an [`InboxKind::Ask`].
70#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
71#[serde(rename_all = "snake_case")]
72pub enum AskMode {
73    /// Summarize/extract: answer from the agent's current state via an ephemeral
74    /// side-query (a clone of the session), leaving the live task untouched.
75    #[default]
76    Query,
77    /// Insert into the live conversation / redirect the goal: the question
78    /// becomes a real user turn in the agent's running session, and the
79    /// resulting assistant message is the answer.
80    Steer,
81}
82
83/// Body of an [`InboxKind::Ask`] message (carried in `InboxMessage.body`).
84#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
85pub struct AskBody {
86    pub question: String,
87    #[serde(default)]
88    pub mode: AskMode,
89}
90
91/// Body of an [`InboxKind::Reply`] message (carried in `InboxMessage.body`).
92#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
93pub struct ReplyBody {
94    pub answer: String,
95}
96
97/// A message addressed to an actor's mailbox. `body` is the opaque chat payload (domain `Message`).
98#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
99pub struct InboxMessage {
100    pub id: MsgId,
101    pub from: AgentRef,
102    pub kind: InboxKind,
103    pub body: serde_json::Value,
104    pub created_at: DateTime<Utc>,
105    /// For an [`InboxKind::Reply`], the [`MsgId`] of the [`InboxKind::Ask`] it
106    /// answers — lets a parent match a reply to the ask it is awaiting. `None`
107    /// for unsolicited messages (Task/Ask/Handoff).
108    #[serde(default, skip_serializing_if = "Option::is_none")]
109    pub correlation_id: Option<MsgId>,
110}
111
112/// A claimed message plus its location in `cur/` (for `ack`).
113#[derive(Debug, Clone)]
114pub struct Delivered {
115    pub msg: InboxMessage,
116    pub cur_path: PathBuf,
117}
118
/// Per-actor mailbox rooted at a `mailbox/` directory that contains the
/// `new/`, `cur/` and `corrupt/` subdirectories.
pub struct Mailbox {
    /// Root directory; the subdirectories are joined onto it.
    dir: PathBuf,
}
123
124impl Mailbox {
125    pub fn at(dir: impl Into<PathBuf>) -> Self {
126        Self { dir: dir.into() }
127    }
128
129    fn new_dir(&self) -> PathBuf {
130        self.dir.join("new")
131    }
132    fn cur_dir(&self) -> PathBuf {
133        self.dir.join("cur")
134    }
135    fn corrupt_dir(&self) -> PathBuf {
136        self.dir.join("corrupt")
137    }
138
139    pub async fn ensure_dirs(&self) -> Result<()> {
140        for d in [self.new_dir(), self.cur_dir(), self.corrupt_dir()] {
141            tokio::fs::create_dir_all(&d)
142                .await
143                .map_err(|e| StoreError::io(&d, e))?;
144        }
145        Ok(())
146    }
147
148    // ---- sender side (multi-writer, lock-free) ----------------------------
149
150    /// Atomically deliver `msg` into `new/`. Safe under concurrent writers.
151    pub async fn deliver(&self, msg: &InboxMessage) -> Result<MsgId> {
152        let bytes = serde_json::to_vec_pretty(msg).map_err(|e| StoreError::decode(&self.dir, e))?;
153        let nanos = msg.created_at.timestamp_nanos_opt().unwrap_or(0).max(0);
154        // 20-digit zero-padded prefix => lexicographic order == time order; msgid breaks ties.
155        let name = format!("{nanos:020}-{}.json", msg.id.0);
156        // atomic_write puts its temp in new/ as a hidden `.`-file that drain skips.
157        atomic_write(&self.new_dir().join(&name), &bytes).await?;
158        Ok(msg.id.clone())
159    }
160
161    // ---- receiver side (single reader = the actor) ------------------------
162
163    /// Claim and return all pending messages in `new/`, in delivery order.
164    /// Each is renamed `new/ -> cur/`; corrupt files are quarantined and skipped.
165    pub async fn drain(&self) -> Result<Vec<Delivered>> {
166        self.ensure_dirs().await?;
167        let names = self.sorted_json_names(&self.new_dir()).await?;
168        let mut out = Vec::new();
169        for name in names {
170            let src = self.new_dir().join(&name);
171            let dst = self.cur_dir().join(&name);
172            // claim; if it's already gone (lost race), skip.
173            if tokio::fs::rename(&src, &dst).await.is_err() {
174                continue;
175            }
176            match read_msg(&dst).await {
177                Ok(msg) => out.push(Delivered { msg, cur_path: dst }),
178                Err(_) => {
179                    let _ = tokio::fs::rename(&dst, &self.corrupt_dir().join(&name)).await;
180                }
181            }
182        }
183        Ok(out)
184    }
185
186    /// Acknowledge a processed message by its claimed location (O(1); preferred —
187    /// [`Delivered::cur_path`] carries it). Idempotent (no-op if already gone).
188    pub async fn ack_delivered(&self, delivered: &Delivered) -> Result<()> {
189        match tokio::fs::remove_file(&delivered.cur_path).await {
190            Ok(()) => Ok(()),
191            Err(e) if e.kind() == ErrorKind::NotFound => Ok(()),
192            Err(e) => Err(StoreError::io(&delivered.cur_path, e)),
193        }
194    }
195
196    /// Acknowledge a processed message by id: delete it from `cur/`.
197    /// O(n) directory scan — prefer [`ack_delivered`](Self::ack_delivered)
198    /// when you still hold the [`Delivered`]. Idempotent (no-op if gone).
199    pub async fn ack(&self, id: &MsgId) -> Result<()> {
200        let needle = format!("-{}.json", id.0);
201        let cur = self.cur_dir();
202        let mut rd = match tokio::fs::read_dir(&cur).await {
203            Ok(rd) => rd,
204            Err(e) if e.kind() == ErrorKind::NotFound => return Ok(()),
205            Err(e) => return Err(StoreError::io(&cur, e)),
206        };
207        while let Some(ent) = rd.next_entry().await.map_err(|e| StoreError::io(&cur, e))? {
208            let fname = ent.file_name().to_string_lossy().into_owned();
209            if fname.ends_with(&needle) {
210                tokio::fs::remove_file(ent.path())
211                    .await
212                    .map_err(|e| StoreError::io(ent.path(), e))?;
213                return Ok(());
214            }
215        }
216        Ok(())
217    }
218
219    /// Re-yield messages left in `cur/` by a previous (crashed) activation, in order.
220    pub async fn recover(&self) -> Result<Vec<Delivered>> {
221        self.ensure_dirs().await?;
222        let names = self.sorted_json_names(&self.cur_dir()).await?;
223        let mut out = Vec::new();
224        for name in names {
225            let path = self.cur_dir().join(&name);
226            match read_msg(&path).await {
227                Ok(msg) => out.push(Delivered {
228                    msg,
229                    cur_path: path,
230                }),
231                Err(_) => {
232                    let _ = tokio::fs::rename(&path, &self.corrupt_dir().join(&name)).await;
233                }
234            }
235        }
236        Ok(out)
237    }
238
239    /// True if `new/` has no pending messages.
240    pub async fn is_empty(&self) -> Result<bool> {
241        Ok(self.sorted_json_names(&self.new_dir()).await?.is_empty())
242    }
243
244    async fn sorted_json_names(&self, dir: &std::path::Path) -> Result<Vec<String>> {
245        let mut rd = match tokio::fs::read_dir(dir).await {
246            Ok(rd) => rd,
247            Err(e) if e.kind() == ErrorKind::NotFound => return Ok(Vec::new()),
248            Err(e) => return Err(StoreError::io(dir, e)),
249        };
250        let mut names = Vec::new();
251        while let Some(ent) = rd.next_entry().await.map_err(|e| StoreError::io(dir, e))? {
252            let fname = ent.file_name().to_string_lossy().into_owned();
253            if fname.starts_with('.') || !fname.ends_with(".json") {
254                continue; // skip hidden temp files / non-messages
255            }
256            names.push(fname);
257        }
258        names.sort();
259        Ok(names)
260    }
261}
262
263async fn read_msg(path: &std::path::Path) -> Result<InboxMessage> {
264    let bytes = tokio::fs::read(path)
265        .await
266        .map_err(|e| StoreError::io(path, e))?;
267    serde_json::from_slice(&bytes).map_err(|e| StoreError::decode(path, e))
268}
269
270/// Consumer-side dedupe set for at-least-once delivery; persist with the session state.
271///
272/// **Bounded**: keeps the most recent [`ADMITTED_SET_CAPACITY`] ids, evicting the
273/// oldest. Redelivery only ever happens for *recently* claimed-but-unacked
274/// messages, so a bounded recency window is sufficient — and a long-lived actor
275/// can't grow it without limit.
276#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
277#[serde(from = "Vec<MsgId>", into = "Vec<MsgId>")]
278pub struct AdmittedSet {
279    order: std::collections::VecDeque<MsgId>,
280    index: HashSet<MsgId>,
281}
282
/// Maximum number of ids an [`AdmittedSet`] keeps. Inserting beyond this
/// evicts the oldest id.
pub const ADMITTED_SET_CAPACITY: usize = 4096;
285
286impl AdmittedSet {
287    pub fn contains(&self, id: &MsgId) -> bool {
288        self.index.contains(id)
289    }
290    /// Record `id` as admitted. Returns `true` if newly inserted (i.e. should admit now).
291    pub fn insert(&mut self, id: MsgId) -> bool {
292        if !self.index.insert(id.clone()) {
293            return false;
294        }
295        self.order.push_back(id);
296        while self.order.len() > ADMITTED_SET_CAPACITY {
297            if let Some(evicted) = self.order.pop_front() {
298                self.index.remove(&evicted);
299            }
300        }
301        true
302    }
303    pub fn len(&self) -> usize {
304        self.order.len()
305    }
306    pub fn is_empty(&self) -> bool {
307        self.order.is_empty()
308    }
309}
310
311impl From<Vec<MsgId>> for AdmittedSet {
312    fn from(ids: Vec<MsgId>) -> Self {
313        let mut set = AdmittedSet::default();
314        for id in ids {
315            set.insert(id);
316        }
317        set
318    }
319}
320
321impl From<AdmittedSet> for Vec<MsgId> {
322    fn from(set: AdmittedSet) -> Self {
323        set.order.into_iter().collect()
324    }
325}
326
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::TimeZone;
    use serde_json::json;
    use tempfile::TempDir;

    /// A mailbox inside a fresh temp dir. Keep the `TempDir` alive for the
    /// whole test.
    fn mailbox() -> (TempDir, Mailbox) {
        let dir = TempDir::new().unwrap();
        let mb = Mailbox::at(dir.path().join("mailbox"));
        (dir, mb)
    }

    /// A `Task` message from `"parent"` whose body carries `seq`.
    fn msg(seq: u32) -> InboxMessage {
        InboxMessage {
            id: MsgId::new(),
            from: AgentRef {
                session_id: "parent".into(),
                role: None,
            },
            kind: InboxKind::Task,
            body: json!({ "seq": seq }),
            created_at: Utc::now(),
            correlation_id: None,
        }
    }

    #[test]
    fn ask_reply_bodies_and_correlation_round_trip() {
        // Ask body: question + mode (default query).
        let ask = AskBody {
            question: "what did you find?".into(),
            mode: AskMode::Query,
        };
        let ask_json = serde_json::to_value(&ask).unwrap();
        assert_eq!(ask_json["mode"], "query");
        assert_eq!(serde_json::from_value::<AskBody>(ask_json).unwrap(), ask);
        // mode defaults to query when absent.
        let defaulted: AskBody = serde_json::from_value(json!({ "question": "q" })).unwrap();
        assert_eq!(defaulted.mode, AskMode::Query);
        // steer round-trips.
        assert_eq!(
            serde_json::from_value::<AskMode>(json!("steer")).unwrap(),
            AskMode::Steer
        );

        // Reply correlation: a Reply carries the Ask's id.
        let ask_id = MsgId::new();
        let reply = InboxMessage {
            id: MsgId::new(),
            from: AgentRef {
                session_id: "child".into(),
                role: None,
            },
            kind: InboxKind::Reply,
            body: serde_json::to_value(ReplyBody {
                answer: "found X".into(),
            })
            .unwrap(),
            created_at: Utc::now(),
            correlation_id: Some(ask_id.clone()),
        };
        let round: InboxMessage =
            serde_json::from_value(serde_json::to_value(&reply).unwrap()).unwrap();
        assert_eq!(round.correlation_id, Some(ask_id));
        assert_eq!(round.kind, InboxKind::Reply);
        // Back-compat: a message serialized without correlation_id still parses.
        let legacy: InboxMessage = serde_json::from_value(json!({
            "id": MsgId::new(),
            "from": { "session_id": "p" },
            "kind": "task",
            "body": {},
            "created_at": Utc::now().to_rfc3339(),
        }))
        .unwrap();
        assert_eq!(legacy.correlation_id, None);
    }

    #[tokio::test]
    async fn deliver_then_drain_then_ack() {
        let (_d, mb) = mailbox();
        let m = msg(1);
        mb.deliver(&m).await.unwrap();

        assert!(!mb.is_empty().await.unwrap());
        let batch = mb.drain().await.unwrap();
        assert_eq!(batch.len(), 1);
        assert_eq!(batch[0].msg.id, m.id);
        assert!(mb.is_empty().await.unwrap()); // moved out of new/

        mb.ack(&m.id).await.unwrap();
        // nothing left in cur/ -> recover yields nothing
        assert!(mb.recover().await.unwrap().is_empty());
    }

    #[tokio::test]
    async fn multi_writer_no_loss() {
        let (_d, mb) = mailbox();
        mb.ensure_dirs().await.unwrap();
        let dir = mb.dir.clone();

        let mut handles = Vec::new();
        for i in 0..50u32 {
            let d = dir.clone();
            handles.push(tokio::spawn(async move {
                let mb = Mailbox::at(d);
                mb.deliver(&msg(i)).await.unwrap();
            }));
        }
        for h in handles {
            h.await.unwrap();
        }

        let batch = mb.drain().await.unwrap();
        assert_eq!(batch.len(), 50);
        let ids: HashSet<_> = batch.iter().map(|d| d.msg.id.clone()).collect();
        assert_eq!(ids.len(), 50); // all unique, none lost
    }

    #[tokio::test]
    async fn drain_is_time_ordered() {
        let (_d, mb) = mailbox();
        let base = Utc.timestamp_opt(1_700_000_000, 0).unwrap();
        for i in 0..5u32 {
            let mut m = msg(i);
            m.created_at = base + chrono::Duration::seconds(i as i64);
            mb.deliver(&m).await.unwrap();
        }
        let batch = mb.drain().await.unwrap();
        let seqs: Vec<u32> = batch
            .iter()
            .map(|d| d.msg.body["seq"].as_u64().unwrap() as u32)
            .collect();
        assert_eq!(seqs, vec![0, 1, 2, 3, 4]);
    }

    #[tokio::test]
    async fn recover_returns_unacked_leftovers() {
        let (_d, mb) = mailbox();
        let m = msg(1);
        mb.deliver(&m).await.unwrap();
        let batch = mb.drain().await.unwrap(); // claimed into cur/, not acked
        assert_eq!(batch.len(), 1);

        // simulate crash + reactivation: a fresh handle on the same dir
        let mb2 = Mailbox::at(mb.dir.clone());
        let recovered = mb2.recover().await.unwrap();
        assert_eq!(recovered.len(), 1);
        assert_eq!(recovered[0].msg.id, m.id);
    }

    #[tokio::test]
    async fn corrupt_file_is_quarantined() {
        let (_d, mb) = mailbox();
        mb.ensure_dirs().await.unwrap();
        // a well-formed message + a bogus one
        mb.deliver(&msg(1)).await.unwrap();
        tokio::fs::write(
            mb.new_dir().join("00000000000000000001-bogus.json"),
            b"not json",
        )
        .await
        .unwrap();

        let batch = mb.drain().await.unwrap();
        assert_eq!(batch.len(), 1); // the good one came through
        let mut rd = tokio::fs::read_dir(mb.corrupt_dir()).await.unwrap();
        let mut corrupt = 0;
        while rd.next_entry().await.unwrap().is_some() {
            corrupt += 1;
        }
        assert_eq!(corrupt, 1); // the bogus one quarantined
    }

    #[test]
    fn admitted_set_dedupes() {
        let mut seen = AdmittedSet::default();
        let id = MsgId::new();
        assert!(seen.insert(id.clone())); // first time -> admit
        assert!(seen.contains(&id));
        assert!(!seen.insert(id.clone())); // redelivery -> skip
        assert_eq!(seen.len(), 1);
    }

    #[test]
    fn admitted_set_is_bounded_and_serde_round_trips() {
        let mut seen = AdmittedSet::default();
        let first = MsgId::new();
        seen.insert(first.clone());
        for _ in 0..ADMITTED_SET_CAPACITY {
            seen.insert(MsgId::new());
        }
        // capacity respected; the oldest id was evicted
        assert_eq!(seen.len(), ADMITTED_SET_CAPACITY);
        assert!(!seen.contains(&first));

        // serde round-trip preserves membership (index rebuilt on load)
        let json = serde_json::to_string(&seen).unwrap();
        let restored: AdmittedSet = serde_json::from_str(&json).unwrap();
        assert_eq!(restored.len(), seen.len());
        let probe = Vec::<MsgId>::from(seen.clone())[0].clone();
        assert!(restored.contains(&probe));
    }

    #[test]
    fn admitted_set_from_vec_dedupes_and_keeps_newest() {
        // Oldest-first ids, two more than the capacity, with a duplicate of the first.
        let ids: Vec<MsgId> = (0..ADMITTED_SET_CAPACITY + 2)
            .map(|i| MsgId(format!("id-{i}")))
            .collect();
        let mut with_dup = ids.clone();
        with_dup.insert(1, ids[0].clone());

        let set = AdmittedSet::from(with_dup);
        // The duplicate is ignored; the two oldest unique ids are evicted.
        assert_eq!(set.len(), ADMITTED_SET_CAPACITY);
        assert!(!set.contains(&ids[0]));
        assert!(!set.contains(&ids[1]));
        assert!(set.contains(&ids[2]));
        assert!(set.contains(ids.last().unwrap()));
    }

    #[tokio::test]
    async fn ack_delivered_removes_by_path() {
        let (_d, mb) = mailbox();
        mb.deliver(&msg(1)).await.unwrap();
        let batch = mb.drain().await.unwrap();
        mb.ack_delivered(&batch[0]).await.unwrap();
        assert!(mb.recover().await.unwrap().is_empty()); // cur/ empty
        // Idempotent: acking an already-removed message is a no-op.
        mb.ack_delivered(&batch[0]).await.unwrap();
    }
}