Skip to main content

batty_cli/team/
inbox.rs

1//! Maildir-based inbox messaging system.
2//!
3//! Each team member gets a Maildir at `.batty/inboxes/<member>/` with
4//! `new/`, `cur/`, `tmp/` subdirectories. Messages are JSON blobs stored
5//! atomically via the `maildir` crate.
6//!
7//! - `new/` — undelivered messages (daemon picks these up)
8//! - `cur/` — delivered messages (moved here after tmux injection)
9//! - `tmp/` — atomic write staging (managed by `maildir` crate)
10
11use std::fs;
12use std::path::{Path, PathBuf};
13
14use anyhow::{Context, Result};
15use maildir::Maildir;
16use serde::{Deserialize, Serialize};
17
18/// A message stored in a member's inbox.
19#[derive(Debug, Clone, Serialize, Deserialize)]
20pub struct InboxMessage {
21    /// Unique message ID (assigned by maildir filename, not serialized in body).
22    #[serde(skip)]
23    pub id: String,
24    /// Sender name (e.g., "human", "architect", "manager-1").
25    pub from: String,
26    /// Recipient name.
27    pub to: String,
28    /// Message body text.
29    pub body: String,
30    /// Message type: "send" or "assign".
31    pub msg_type: MessageType,
32    /// Unix timestamp (seconds since epoch).
33    pub timestamp: u64,
34}
35
36/// Type of inbox message.
37#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
38#[serde(rename_all = "lowercase")]
39pub enum MessageType {
40    Send,
41    Assign,
42}
43
44#[derive(Debug, Clone, Copy, PartialEq, Eq)]
45pub struct InboxPurgeSummary {
46    pub roles: usize,
47    pub messages: usize,
48}
49
50impl InboxMessage {
51    /// Create a new send-type message.
52    pub fn new_send(from: &str, to: &str, body: &str) -> Self {
53        Self {
54            id: String::new(),
55            from: from.to_string(),
56            to: to.to_string(),
57            body: body.to_string(),
58            msg_type: MessageType::Send,
59            timestamp: now_unix(),
60        }
61    }
62
63    /// Create a new assign-type message.
64    pub fn new_assign(from: &str, to: &str, task: &str) -> Self {
65        Self {
66            id: String::new(),
67            from: from.to_string(),
68            to: to.to_string(),
69            body: task.to_string(),
70            msg_type: MessageType::Assign,
71            timestamp: now_unix(),
72        }
73    }
74
75    /// Serialize to JSON bytes for storage.
76    pub fn to_json_bytes(&self) -> Result<Vec<u8>> {
77        serde_json::to_vec(self).context("failed to serialize inbox message")
78    }
79
80    /// Deserialize from JSON bytes read from a maildir file.
81    pub fn from_json_bytes(data: &[u8], id: &str) -> Result<Self> {
82        let mut msg: Self =
83            serde_json::from_slice(data).context("failed to deserialize inbox message")?;
84        msg.id = id.to_string();
85        Ok(msg)
86    }
87}
88
89/// Resolve the inboxes root directory: `.batty/inboxes/`.
90pub fn inboxes_root(project_root: &Path) -> PathBuf {
91    project_root.join(".batty").join("inboxes")
92}
93
94/// Get the Maildir for a specific member.
95fn member_maildir(inboxes_root: &Path, member: &str) -> Maildir {
96    Maildir::from(inboxes_root.join(member))
97}
98
99/// Initialize a member's inbox (create `new/`, `cur/`, `tmp/` dirs).
100pub fn init_inbox(inboxes_root: &Path, member: &str) -> Result<()> {
101    let md = member_maildir(inboxes_root, member);
102    md.create_dirs()
103        .with_context(|| format!("failed to create inbox dirs for '{member}'"))?;
104    Ok(())
105}
106
107/// Deliver a message to a member's inbox.
108///
109/// The message is atomically written to `new/` via the maildir crate
110/// (write to `tmp/`, rename to `new/`). Returns the maildir message ID.
111pub fn deliver_to_inbox(inboxes_root: &Path, msg: &InboxMessage) -> Result<String> {
112    let md = member_maildir(inboxes_root, &msg.to);
113    // Ensure dirs exist (idempotent)
114    md.create_dirs()
115        .with_context(|| format!("failed to create inbox dirs for '{}'", msg.to))?;
116    let data = msg.to_json_bytes()?;
117    let id = md
118        .store_new(&data)
119        .with_context(|| format!("failed to store message in inbox for '{}'", msg.to))?;
120    Ok(id)
121}
122
123/// List all pending (undelivered) messages in a member's inbox.
124///
125/// These are messages in `new/` that haven't been delivered to the agent yet.
126pub fn pending_messages(inboxes_root: &Path, member: &str) -> Result<Vec<InboxMessage>> {
127    let md = member_maildir(inboxes_root, member);
128    let mut messages = Vec::new();
129
130    for entry in md.list_new() {
131        let entry = match entry {
132            Ok(e) => e,
133            Err(e) => {
134                tracing::warn!(member, error = %e, "skipping unreadable inbox entry");
135                continue;
136            }
137        };
138        let id = entry.id().to_string();
139        let data = match std::fs::read(entry.path()) {
140            Ok(d) => d,
141            Err(e) => {
142                tracing::warn!(member, id = %id, error = %e, "failed to read inbox message");
143                continue;
144            }
145        };
146        match InboxMessage::from_json_bytes(&data, &id) {
147            Ok(msg) => messages.push(msg),
148            Err(e) => {
149                tracing::warn!(member, id = %id, error = %e, "skipping malformed inbox message");
150            }
151        }
152    }
153
154    // Sort by timestamp (oldest first for FIFO delivery)
155    messages.sort_by_key(|m| m.timestamp);
156    Ok(messages)
157}
158
159/// Count undelivered messages in `new/` for a member.
160pub fn pending_message_count(inboxes_root: &Path, member: &str) -> Result<usize> {
161    let new_dir = inboxes_root.join(member).join("new");
162    if !new_dir.is_dir() {
163        return Ok(0);
164    }
165
166    let mut count = 0usize;
167    for entry in std::fs::read_dir(&new_dir)
168        .with_context(|| format!("failed to read {}", new_dir.display()))?
169    {
170        let entry = entry.with_context(|| format!("failed to read {}", new_dir.display()))?;
171        let file_type = entry
172            .file_type()
173            .with_context(|| format!("failed to inspect {}", entry.path().display()))?;
174        if file_type.is_file() {
175            count += 1;
176        }
177    }
178
179    Ok(count)
180}
181
182/// Mark a message as delivered (move from `new/` to `cur/`).
183pub fn mark_delivered(inboxes_root: &Path, member: &str, id: &str) -> Result<()> {
184    let md = member_maildir(inboxes_root, member);
185    md.move_new_to_cur(id)
186        .with_context(|| format!("failed to mark message '{id}' as delivered for '{member}'"))?;
187    Ok(())
188}
189
190/// List all messages (both pending and delivered) for a member.
191pub fn all_messages(inboxes_root: &Path, member: &str) -> Result<Vec<(InboxMessage, bool)>> {
192    let md = member_maildir(inboxes_root, member);
193    let mut messages = Vec::new();
194
195    // new/ = pending (not yet delivered)
196    for entry in md.list_new() {
197        let entry = match entry {
198            Ok(e) => e,
199            Err(_) => continue,
200        };
201        let id = entry.id().to_string();
202        let data = match std::fs::read(entry.path()) {
203            Ok(d) => d,
204            Err(_) => continue,
205        };
206        if let Ok(msg) = InboxMessage::from_json_bytes(&data, &id) {
207            messages.push((msg, false)); // false = not delivered
208        }
209    }
210
211    // cur/ = delivered
212    for entry in md.list_cur() {
213        let entry = match entry {
214            Ok(e) => e,
215            Err(_) => continue,
216        };
217        let id = entry.id().to_string();
218        let data = match std::fs::read(entry.path()) {
219            Ok(d) => d,
220            Err(_) => continue,
221        };
222        if let Ok(msg) = InboxMessage::from_json_bytes(&data, &id) {
223            messages.push((msg, true)); // true = delivered
224        }
225    }
226
227    messages.sort_by_key(|(m, _)| m.timestamp);
228    Ok(messages)
229}
230
231/// Delete a message from a member's inbox (from either new/ or cur/).
232pub fn delete_message(inboxes_root: &Path, member: &str, id: &str) -> Result<()> {
233    let md = member_maildir(inboxes_root, member);
234    md.delete(id)
235        .with_context(|| format!("failed to delete message '{id}' from '{member}' inbox"))?;
236    Ok(())
237}
238
239/// Purge delivered messages from a member inbox.
240pub fn purge_delivered_messages(
241    inboxes_root: &Path,
242    member: &str,
243    before: Option<u64>,
244    purge_all: bool,
245) -> Result<usize> {
246    let cur_dir = inboxes_root.join(member).join("cur");
247    if !cur_dir.is_dir() {
248        return Ok(0);
249    }
250
251    let mut removed = 0usize;
252    for entry in
253        fs::read_dir(&cur_dir).with_context(|| format!("failed to read {}", cur_dir.display()))?
254    {
255        let entry = entry.with_context(|| format!("failed to read {}", cur_dir.display()))?;
256        let path = entry.path();
257        let file_type = entry
258            .file_type()
259            .with_context(|| format!("failed to inspect {}", path.display()))?;
260        if !file_type.is_file() {
261            continue;
262        }
263
264        let should_delete = if purge_all {
265            true
266        } else if let Some(cutoff) = before {
267            let data = match fs::read(&path) {
268                Ok(data) => data,
269                Err(_) => continue,
270            };
271            let Some(id) = path.file_name().and_then(|name| name.to_str()) else {
272                continue;
273            };
274            match InboxMessage::from_json_bytes(&data, id) {
275                Ok(message) => message.timestamp < cutoff,
276                Err(_) => false,
277            }
278        } else {
279            false
280        };
281
282        if should_delete {
283            fs::remove_file(&path)
284                .with_context(|| format!("failed to remove {}", path.display()))?;
285            removed += 1;
286        }
287    }
288
289    Ok(removed)
290}
291
292/// Purge delivered messages from every member inbox under `.batty/inboxes/`.
293pub fn purge_delivered_messages_for_all(
294    inboxes_root: &Path,
295    before: Option<u64>,
296    purge_all: bool,
297) -> Result<InboxPurgeSummary> {
298    if !inboxes_root.is_dir() {
299        return Ok(InboxPurgeSummary {
300            roles: 0,
301            messages: 0,
302        });
303    }
304
305    let mut roles = 0usize;
306    let mut messages = 0usize;
307    for entry in fs::read_dir(inboxes_root)
308        .with_context(|| format!("failed to read {}", inboxes_root.display()))?
309    {
310        let entry = entry.with_context(|| format!("failed to read {}", inboxes_root.display()))?;
311        let path = entry.path();
312        let file_type = entry
313            .file_type()
314            .with_context(|| format!("failed to inspect {}", path.display()))?;
315        if !file_type.is_dir() {
316            continue;
317        }
318
319        let Some(member) = path.file_name().and_then(|name| name.to_str()) else {
320            continue;
321        };
322        roles += 1;
323        messages += purge_delivered_messages(inboxes_root, member, before, purge_all)?;
324    }
325
326    Ok(InboxPurgeSummary { roles, messages })
327}
328
329fn now_unix() -> u64 {
330    std::time::SystemTime::now()
331        .duration_since(std::time::UNIX_EPOCH)
332        .unwrap_or_default()
333        .as_secs()
334}
335
336#[cfg(test)]
337mod tests {
338    use super::*;
339
340    #[test]
341    fn inbox_message_send_roundtrip() {
342        let msg = InboxMessage::new_send("human", "architect", "hello world");
343        assert_eq!(msg.from, "human");
344        assert_eq!(msg.to, "architect");
345        assert_eq!(msg.body, "hello world");
346        assert_eq!(msg.msg_type, MessageType::Send);
347        assert!(msg.timestamp > 0);
348
349        let bytes = msg.to_json_bytes().unwrap();
350        let parsed = InboxMessage::from_json_bytes(&bytes, "test-id").unwrap();
351        assert_eq!(parsed.id, "test-id");
352        assert_eq!(parsed.from, "human");
353        assert_eq!(parsed.to, "architect");
354        assert_eq!(parsed.body, "hello world");
355    }
356
357    #[test]
358    fn inbox_message_assign_roundtrip() {
359        let msg = InboxMessage::new_assign("black-lead", "eng-1-1", "fix the auth bug");
360        assert_eq!(msg.msg_type, MessageType::Assign);
361        assert_eq!(msg.from, "black-lead");
362        assert_eq!(msg.to, "eng-1-1");
363        assert_eq!(msg.body, "fix the auth bug");
364
365        let bytes = msg.to_json_bytes().unwrap();
366        let parsed = InboxMessage::from_json_bytes(&bytes, "assign-id").unwrap();
367        assert_eq!(parsed.msg_type, MessageType::Assign);
368    }
369
370    #[test]
371    fn init_inbox_creates_dirs() {
372        let tmp = tempfile::tempdir().unwrap();
373        let root = tmp.path();
374        init_inbox(root, "architect").unwrap();
375
376        assert!(root.join("architect").join("new").is_dir());
377        assert!(root.join("architect").join("cur").is_dir());
378        assert!(root.join("architect").join("tmp").is_dir());
379    }
380
381    #[test]
382    fn init_inbox_is_idempotent() {
383        let tmp = tempfile::tempdir().unwrap();
384        let root = tmp.path();
385        init_inbox(root, "architect").unwrap();
386        init_inbox(root, "architect").unwrap(); // should not error
387    }
388
389    #[test]
390    fn deliver_and_read_pending() {
391        let tmp = tempfile::tempdir().unwrap();
392        let root = tmp.path();
393        init_inbox(root, "architect").unwrap();
394
395        let msg = InboxMessage::new_send("human", "architect", "hello");
396        let id = deliver_to_inbox(root, &msg).unwrap();
397        assert!(!id.is_empty());
398
399        let pending = pending_messages(root, "architect").unwrap();
400        assert_eq!(pending.len(), 1);
401        assert_eq!(pending[0].from, "human");
402        assert_eq!(pending[0].body, "hello");
403        assert_eq!(pending[0].id, id);
404    }
405
406    #[test]
407    fn deliver_creates_dirs_automatically() {
408        let tmp = tempfile::tempdir().unwrap();
409        let root = tmp.path();
410        // Don't call init_inbox — deliver_to_inbox should create dirs
411        let msg = InboxMessage::new_send("human", "manager", "hi");
412        let id = deliver_to_inbox(root, &msg).unwrap();
413        assert!(!id.is_empty());
414
415        let pending = pending_messages(root, "manager").unwrap();
416        assert_eq!(pending.len(), 1);
417    }
418
419    #[test]
420    fn mark_delivered_moves_to_cur() {
421        let tmp = tempfile::tempdir().unwrap();
422        let root = tmp.path();
423        init_inbox(root, "eng-1").unwrap();
424
425        let msg = InboxMessage::new_send("manager", "eng-1", "do this");
426        let id = deliver_to_inbox(root, &msg).unwrap();
427
428        // Before: in new/
429        assert_eq!(pending_messages(root, "eng-1").unwrap().len(), 1);
430
431        mark_delivered(root, "eng-1", &id).unwrap();
432
433        // After: not in new/ anymore
434        assert_eq!(pending_messages(root, "eng-1").unwrap().len(), 0);
435
436        // But visible in all_messages as delivered
437        let all = all_messages(root, "eng-1").unwrap();
438        assert_eq!(all.len(), 1);
439        assert!(all[0].1); // delivered = true
440    }
441
442    #[test]
443    fn pending_message_count_tracks_new_messages_only() {
444        let tmp = tempfile::tempdir().unwrap();
445        let root = tmp.path();
446        init_inbox(root, "eng-1").unwrap();
447
448        let msg1 = InboxMessage::new_send("manager", "eng-1", "first");
449        let msg2 = InboxMessage::new_send("manager", "eng-1", "second");
450        let id1 = deliver_to_inbox(root, &msg1).unwrap();
451        deliver_to_inbox(root, &msg2).unwrap();
452
453        assert_eq!(pending_message_count(root, "eng-1").unwrap(), 2);
454
455        mark_delivered(root, "eng-1", &id1).unwrap();
456        assert_eq!(pending_message_count(root, "eng-1").unwrap(), 1);
457    }
458
459    #[test]
460    fn multiple_messages_ordered_by_timestamp() {
461        let tmp = tempfile::tempdir().unwrap();
462        let root = tmp.path();
463        init_inbox(root, "arch").unwrap();
464
465        // Deliver messages with different timestamps
466        let mut msg1 = InboxMessage::new_send("human", "arch", "first");
467        msg1.timestamp = 1000;
468        let mut msg2 = InboxMessage::new_send("human", "arch", "second");
469        msg2.timestamp = 2000;
470        let mut msg3 = InboxMessage::new_send("human", "arch", "third");
471        msg3.timestamp = 1500;
472
473        deliver_to_inbox(root, &msg1).unwrap();
474        deliver_to_inbox(root, &msg2).unwrap();
475        deliver_to_inbox(root, &msg3).unwrap();
476
477        let pending = pending_messages(root, "arch").unwrap();
478        assert_eq!(pending.len(), 3);
479        assert_eq!(pending[0].body, "first");
480        assert_eq!(pending[1].body, "third");
481        assert_eq!(pending[2].body, "second");
482    }
483
484    #[test]
485    fn all_messages_combines_new_and_cur() {
486        let tmp = tempfile::tempdir().unwrap();
487        let root = tmp.path();
488        init_inbox(root, "mgr").unwrap();
489
490        let msg1 = InboxMessage::new_send("arch", "mgr", "directive");
491        let id1 = deliver_to_inbox(root, &msg1).unwrap();
492        let msg2 = InboxMessage::new_send("eng-1", "mgr", "done");
493        deliver_to_inbox(root, &msg2).unwrap();
494
495        // Deliver first, leave second pending
496        mark_delivered(root, "mgr", &id1).unwrap();
497
498        let all = all_messages(root, "mgr").unwrap();
499        assert_eq!(all.len(), 2);
500
501        let delivered: Vec<_> = all.iter().filter(|(_, d)| *d).collect();
502        let pending: Vec<_> = all.iter().filter(|(_, d)| !*d).collect();
503        assert_eq!(delivered.len(), 1);
504        assert_eq!(pending.len(), 1);
505    }
506
507    #[test]
508    fn delete_message_removes_from_inbox() {
509        let tmp = tempfile::tempdir().unwrap();
510        let root = tmp.path();
511        init_inbox(root, "eng").unwrap();
512
513        let msg = InboxMessage::new_send("mgr", "eng", "task");
514        let id = deliver_to_inbox(root, &msg).unwrap();
515
516        assert_eq!(pending_messages(root, "eng").unwrap().len(), 1);
517        delete_message(root, "eng", &id).unwrap();
518        assert_eq!(pending_messages(root, "eng").unwrap().len(), 0);
519    }
520
521    #[test]
522    fn pending_messages_empty_inbox() {
523        let tmp = tempfile::tempdir().unwrap();
524        let root = tmp.path();
525        init_inbox(root, "empty").unwrap();
526
527        let pending = pending_messages(root, "empty").unwrap();
528        assert!(pending.is_empty());
529    }
530
531    #[test]
532    fn inboxes_root_path() {
533        let root = std::path::Path::new("/tmp/project");
534        assert_eq!(
535            inboxes_root(root),
536            PathBuf::from("/tmp/project/.batty/inboxes")
537        );
538    }
539
540    #[test]
541    fn malformed_json_skipped() {
542        let tmp = tempfile::tempdir().unwrap();
543        let root = tmp.path();
544        init_inbox(root, "bad").unwrap();
545
546        // Write a non-JSON file directly into new/
547        let new_dir = root.join("bad").join("new");
548        std::fs::write(new_dir.join("1234567890.bad.localhost"), "not json").unwrap();
549
550        // Should not panic, just skip the bad entry
551        let pending = pending_messages(root, "bad").unwrap();
552        assert!(pending.is_empty());
553    }
554
555    #[test]
556    fn purge_delivered_messages_before_timestamp_only_removes_older_entries() {
557        let tmp = tempfile::tempdir().unwrap();
558        let root = tmp.path();
559        init_inbox(root, "eng").unwrap();
560
561        let mut old_msg = InboxMessage::new_send("mgr", "eng", "old");
562        old_msg.timestamp = 10;
563        let old_id = deliver_to_inbox(root, &old_msg).unwrap();
564        mark_delivered(root, "eng", &old_id).unwrap();
565
566        let mut new_msg = InboxMessage::new_send("mgr", "eng", "new");
567        new_msg.timestamp = 20;
568        let new_id = deliver_to_inbox(root, &new_msg).unwrap();
569        mark_delivered(root, "eng", &new_id).unwrap();
570
571        let removed = purge_delivered_messages(root, "eng", Some(15), false).unwrap();
572        assert_eq!(removed, 1);
573
574        let remaining = all_messages(root, "eng").unwrap();
575        assert_eq!(remaining.len(), 1);
576        assert_eq!(remaining[0].0.id, new_id);
577        assert!(remaining[0].1);
578    }
579
580    #[test]
581    fn purge_delivered_messages_all_removes_every_cur_entry() {
582        let tmp = tempfile::tempdir().unwrap();
583        let root = tmp.path();
584        init_inbox(root, "eng").unwrap();
585
586        for body in ["one", "two"] {
587            let msg = InboxMessage::new_send("mgr", "eng", body);
588            let id = deliver_to_inbox(root, &msg).unwrap();
589            mark_delivered(root, "eng", &id).unwrap();
590        }
591
592        let removed = purge_delivered_messages(root, "eng", None, true).unwrap();
593        assert_eq!(removed, 2);
594        assert!(all_messages(root, "eng").unwrap().is_empty());
595    }
596
597    #[test]
598    fn purge_delivered_messages_for_all_scans_every_member_inbox() {
599        let tmp = tempfile::tempdir().unwrap();
600        let root = tmp.path();
601        init_inbox(root, "eng-1").unwrap();
602        init_inbox(root, "eng-2").unwrap();
603
604        let msg1 = InboxMessage::new_send("mgr", "eng-1", "first");
605        let id1 = deliver_to_inbox(root, &msg1).unwrap();
606        mark_delivered(root, "eng-1", &id1).unwrap();
607
608        let msg2 = InboxMessage::new_send("mgr", "eng-2", "second");
609        let id2 = deliver_to_inbox(root, &msg2).unwrap();
610        mark_delivered(root, "eng-2", &id2).unwrap();
611
612        let summary = purge_delivered_messages_for_all(root, None, true).unwrap();
613        assert_eq!(
614            summary,
615            InboxPurgeSummary {
616                roles: 2,
617                messages: 2
618            }
619        );
620        assert!(all_messages(root, "eng-1").unwrap().is_empty());
621        assert!(all_messages(root, "eng-2").unwrap().is_empty());
622    }
623
624    fn production_unwrap_expect_count(source: &str) -> usize {
625        let prod = if let Some(pos) = source.find("\n#[cfg(test)]\nmod tests") {
626            &source[..pos]
627        } else {
628            source
629        };
630        prod.lines()
631            .filter(|line| {
632                let trimmed = line.trim();
633                !trimmed.starts_with("#[cfg(test)]")
634                    && (trimmed.contains(".unwrap(") || trimmed.contains(".expect("))
635            })
636            .count()
637    }
638
639    #[test]
640    fn production_inbox_has_no_unwrap_or_expect_calls() {
641        let src = include_str!("inbox.rs");
642        assert_eq!(
643            production_unwrap_expect_count(src),
644            0,
645            "production inbox.rs should avoid unwrap/expect"
646        );
647    }
648}