Skip to main content

batty_cli/team/
inbox.rs

1//! Maildir-based inbox messaging system.
2//!
3//! Each team member gets a Maildir at `.batty/inboxes/<member>/` with
4//! `new/`, `cur/`, `tmp/` subdirectories. Messages are JSON blobs stored
5//! atomically via the `maildir` crate.
6//!
7//! - `new/` — undelivered messages (daemon picks these up)
8//! - `cur/` — delivered messages (moved here after tmux injection)
9//! - `tmp/` — atomic write staging (managed by `maildir` crate)
10
11use std::fs;
12use std::path::{Path, PathBuf};
13
14use anyhow::{Context, Result};
15use maildir::Maildir;
16use serde::{Deserialize, Serialize};
17
18/// A message stored in a member's inbox.
19#[derive(Debug, Clone, Serialize, Deserialize)]
20pub struct InboxMessage {
21    /// Unique message ID (assigned by maildir filename, not serialized in body).
22    #[serde(skip)]
23    pub id: String,
24    /// Sender name (e.g., "human", "architect", "manager-1").
25    pub from: String,
26    /// Recipient name.
27    pub to: String,
28    /// Message body text.
29    pub body: String,
30    /// Message type: "send" or "assign".
31    pub msg_type: MessageType,
32    /// Unix timestamp (seconds since epoch).
33    pub timestamp: u64,
34}
35
36/// Type of inbox message.
37#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
38#[serde(rename_all = "lowercase")]
39pub enum MessageType {
40    Send,
41    Assign,
42}
43
44#[derive(Debug, Clone, Copy, PartialEq, Eq)]
45pub struct InboxPurgeSummary {
46    pub roles: usize,
47    pub messages: usize,
48}
49
50impl InboxMessage {
51    /// Create a new send-type message.
52    pub fn new_send(from: &str, to: &str, body: &str) -> Self {
53        Self {
54            id: String::new(),
55            from: from.to_string(),
56            to: to.to_string(),
57            body: body.to_string(),
58            msg_type: MessageType::Send,
59            timestamp: now_unix(),
60        }
61    }
62
63    /// Create a new assign-type message.
64    pub fn new_assign(from: &str, to: &str, task: &str) -> Self {
65        Self {
66            id: String::new(),
67            from: from.to_string(),
68            to: to.to_string(),
69            body: task.to_string(),
70            msg_type: MessageType::Assign,
71            timestamp: now_unix(),
72        }
73    }
74
75    /// Serialize to JSON bytes for storage.
76    pub fn to_json_bytes(&self) -> Result<Vec<u8>> {
77        serde_json::to_vec(self).context("failed to serialize inbox message")
78    }
79
80    /// Deserialize from JSON bytes read from a maildir file.
81    pub fn from_json_bytes(data: &[u8], id: &str) -> Result<Self> {
82        let mut msg: Self =
83            serde_json::from_slice(data).context("failed to deserialize inbox message")?;
84        msg.id = id.to_string();
85        Ok(msg)
86    }
87}
88
89/// Resolve the inboxes root directory: `.batty/inboxes/`.
90pub fn inboxes_root(project_root: &Path) -> PathBuf {
91    project_root.join(".batty").join("inboxes")
92}
93
94/// Get the Maildir for a specific member.
95fn member_maildir(inboxes_root: &Path, member: &str) -> Maildir {
96    Maildir::from(inboxes_root.join(member))
97}
98
99/// Initialize a member's inbox (create `new/`, `cur/`, `tmp/` dirs).
100pub fn init_inbox(inboxes_root: &Path, member: &str) -> Result<()> {
101    let md = member_maildir(inboxes_root, member);
102    md.create_dirs()
103        .with_context(|| format!("failed to create inbox dirs for '{member}'"))?;
104    Ok(())
105}
106
107/// Deliver a message to a member's inbox.
108///
109/// The message is atomically written to `new/` via the maildir crate
110/// (write to `tmp/`, rename to `new/`). Returns the maildir message ID.
111pub fn deliver_to_inbox(inboxes_root: &Path, msg: &InboxMessage) -> Result<String> {
112    let md = member_maildir(inboxes_root, &msg.to);
113    // Ensure dirs exist (idempotent)
114    md.create_dirs()
115        .with_context(|| format!("failed to create inbox dirs for '{}'", msg.to))?;
116    let data = msg.to_json_bytes()?;
117    let id = md
118        .store_new(&data)
119        .with_context(|| format!("failed to store message in inbox for '{}'", msg.to))?;
120    Ok(id)
121}
122
123/// List all pending (undelivered) messages in a member's inbox.
124///
125/// These are messages in `new/` that haven't been delivered to the agent yet.
126pub fn pending_messages(inboxes_root: &Path, member: &str) -> Result<Vec<InboxMessage>> {
127    let md = member_maildir(inboxes_root, member);
128    let mut messages = Vec::new();
129
130    for entry in md.list_new() {
131        let entry = match entry {
132            Ok(e) => e,
133            Err(e) => {
134                tracing::warn!(member, error = %e, "skipping unreadable inbox entry");
135                continue;
136            }
137        };
138        let id = entry.id().to_string();
139        let data = match std::fs::read(entry.path()) {
140            Ok(d) => d,
141            Err(e) => {
142                tracing::warn!(member, id = %id, error = %e, "failed to read inbox message");
143                continue;
144            }
145        };
146        match InboxMessage::from_json_bytes(&data, &id) {
147            Ok(msg) => messages.push(msg),
148            Err(e) => {
149                tracing::warn!(member, id = %id, error = %e, "skipping malformed inbox message");
150            }
151        }
152    }
153
154    // Sort by timestamp (oldest first for FIFO delivery)
155    messages.sort_by_key(|m| m.timestamp);
156    Ok(messages)
157}
158
159/// Count undelivered messages in `new/` for a member.
160pub fn pending_message_count(inboxes_root: &Path, member: &str) -> Result<usize> {
161    let new_dir = inboxes_root.join(member).join("new");
162    if !new_dir.is_dir() {
163        return Ok(0);
164    }
165
166    let mut count = 0usize;
167    for entry in std::fs::read_dir(&new_dir)
168        .with_context(|| format!("failed to read {}", new_dir.display()))?
169    {
170        let entry = entry.with_context(|| format!("failed to read {}", new_dir.display()))?;
171        let file_type = entry
172            .file_type()
173            .with_context(|| format!("failed to inspect {}", entry.path().display()))?;
174        if file_type.is_file() {
175            count += 1;
176        }
177    }
178
179    Ok(count)
180}
181
182/// Mark a message as delivered (move from `new/` to `cur/`).
183pub fn mark_delivered(inboxes_root: &Path, member: &str, id: &str) -> Result<()> {
184    let md = member_maildir(inboxes_root, member);
185    md.move_new_to_cur(id)
186        .with_context(|| format!("failed to mark message '{id}' as delivered for '{member}'"))?;
187    Ok(())
188}
189
190/// List all messages (both pending and delivered) for a member.
191pub fn all_messages(inboxes_root: &Path, member: &str) -> Result<Vec<(InboxMessage, bool)>> {
192    let md = member_maildir(inboxes_root, member);
193    let mut messages = Vec::new();
194
195    // new/ = pending (not yet delivered)
196    for entry in md.list_new() {
197        let entry = match entry {
198            Ok(e) => e,
199            Err(_) => continue,
200        };
201        let id = entry.id().to_string();
202        let data = match std::fs::read(entry.path()) {
203            Ok(d) => d,
204            Err(_) => continue,
205        };
206        if let Ok(msg) = InboxMessage::from_json_bytes(&data, &id) {
207            messages.push((msg, false)); // false = not delivered
208        }
209    }
210
211    // cur/ = delivered
212    for entry in md.list_cur() {
213        let entry = match entry {
214            Ok(e) => e,
215            Err(_) => continue,
216        };
217        let id = entry.id().to_string();
218        let data = match std::fs::read(entry.path()) {
219            Ok(d) => d,
220            Err(_) => continue,
221        };
222        if let Ok(msg) = InboxMessage::from_json_bytes(&data, &id) {
223            messages.push((msg, true)); // true = delivered
224        }
225    }
226
227    messages.sort_by_key(|(m, _)| m.timestamp);
228    Ok(messages)
229}
230
231/// Delete a message from a member's inbox (from either new/ or cur/).
232#[allow(dead_code)] // Exercised in tests; runtime deletion flow is not wired yet.
233pub fn delete_message(inboxes_root: &Path, member: &str, id: &str) -> Result<()> {
234    let md = member_maildir(inboxes_root, member);
235    md.delete(id)
236        .with_context(|| format!("failed to delete message '{id}' from '{member}' inbox"))?;
237    Ok(())
238}
239
240/// Purge delivered messages from a member inbox.
241pub fn purge_delivered_messages(
242    inboxes_root: &Path,
243    member: &str,
244    before: Option<u64>,
245    purge_all: bool,
246) -> Result<usize> {
247    let cur_dir = inboxes_root.join(member).join("cur");
248    if !cur_dir.is_dir() {
249        return Ok(0);
250    }
251
252    let mut removed = 0usize;
253    for entry in
254        fs::read_dir(&cur_dir).with_context(|| format!("failed to read {}", cur_dir.display()))?
255    {
256        let entry = entry.with_context(|| format!("failed to read {}", cur_dir.display()))?;
257        let path = entry.path();
258        let file_type = entry
259            .file_type()
260            .with_context(|| format!("failed to inspect {}", path.display()))?;
261        if !file_type.is_file() {
262            continue;
263        }
264
265        let should_delete = if purge_all {
266            true
267        } else if let Some(cutoff) = before {
268            let data = match fs::read(&path) {
269                Ok(data) => data,
270                Err(_) => continue,
271            };
272            let Some(id) = path.file_name().and_then(|name| name.to_str()) else {
273                continue;
274            };
275            match InboxMessage::from_json_bytes(&data, id) {
276                Ok(message) => message.timestamp < cutoff,
277                Err(_) => false,
278            }
279        } else {
280            false
281        };
282
283        if should_delete {
284            fs::remove_file(&path)
285                .with_context(|| format!("failed to remove {}", path.display()))?;
286            removed += 1;
287        }
288    }
289
290    Ok(removed)
291}
292
293/// Purge delivered messages from every member inbox under `.batty/inboxes/`.
294pub fn purge_delivered_messages_for_all(
295    inboxes_root: &Path,
296    before: Option<u64>,
297    purge_all: bool,
298) -> Result<InboxPurgeSummary> {
299    if !inboxes_root.is_dir() {
300        return Ok(InboxPurgeSummary {
301            roles: 0,
302            messages: 0,
303        });
304    }
305
306    let mut roles = 0usize;
307    let mut messages = 0usize;
308    for entry in fs::read_dir(inboxes_root)
309        .with_context(|| format!("failed to read {}", inboxes_root.display()))?
310    {
311        let entry = entry.with_context(|| format!("failed to read {}", inboxes_root.display()))?;
312        let path = entry.path();
313        let file_type = entry
314            .file_type()
315            .with_context(|| format!("failed to inspect {}", path.display()))?;
316        if !file_type.is_dir() {
317            continue;
318        }
319
320        let Some(member) = path.file_name().and_then(|name| name.to_str()) else {
321            continue;
322        };
323        roles += 1;
324        messages += purge_delivered_messages(inboxes_root, member, before, purge_all)?;
325    }
326
327    Ok(InboxPurgeSummary { roles, messages })
328}
329
330fn now_unix() -> u64 {
331    std::time::SystemTime::now()
332        .duration_since(std::time::UNIX_EPOCH)
333        .unwrap_or_default()
334        .as_secs()
335}
336
337#[cfg(test)]
338mod tests {
339    use super::*;
340
341    #[test]
342    fn inbox_message_send_roundtrip() {
343        let msg = InboxMessage::new_send("human", "architect", "hello world");
344        assert_eq!(msg.from, "human");
345        assert_eq!(msg.to, "architect");
346        assert_eq!(msg.body, "hello world");
347        assert_eq!(msg.msg_type, MessageType::Send);
348        assert!(msg.timestamp > 0);
349
350        let bytes = msg.to_json_bytes().unwrap();
351        let parsed = InboxMessage::from_json_bytes(&bytes, "test-id").unwrap();
352        assert_eq!(parsed.id, "test-id");
353        assert_eq!(parsed.from, "human");
354        assert_eq!(parsed.to, "architect");
355        assert_eq!(parsed.body, "hello world");
356    }
357
358    #[test]
359    fn inbox_message_assign_roundtrip() {
360        let msg = InboxMessage::new_assign("black-lead", "eng-1-1", "fix the auth bug");
361        assert_eq!(msg.msg_type, MessageType::Assign);
362        assert_eq!(msg.from, "black-lead");
363        assert_eq!(msg.to, "eng-1-1");
364        assert_eq!(msg.body, "fix the auth bug");
365
366        let bytes = msg.to_json_bytes().unwrap();
367        let parsed = InboxMessage::from_json_bytes(&bytes, "assign-id").unwrap();
368        assert_eq!(parsed.msg_type, MessageType::Assign);
369    }
370
371    #[test]
372    fn init_inbox_creates_dirs() {
373        let tmp = tempfile::tempdir().unwrap();
374        let root = tmp.path();
375        init_inbox(root, "architect").unwrap();
376
377        assert!(root.join("architect").join("new").is_dir());
378        assert!(root.join("architect").join("cur").is_dir());
379        assert!(root.join("architect").join("tmp").is_dir());
380    }
381
382    #[test]
383    fn init_inbox_is_idempotent() {
384        let tmp = tempfile::tempdir().unwrap();
385        let root = tmp.path();
386        init_inbox(root, "architect").unwrap();
387        init_inbox(root, "architect").unwrap(); // should not error
388    }
389
390    #[test]
391    fn deliver_and_read_pending() {
392        let tmp = tempfile::tempdir().unwrap();
393        let root = tmp.path();
394        init_inbox(root, "architect").unwrap();
395
396        let msg = InboxMessage::new_send("human", "architect", "hello");
397        let id = deliver_to_inbox(root, &msg).unwrap();
398        assert!(!id.is_empty());
399
400        let pending = pending_messages(root, "architect").unwrap();
401        assert_eq!(pending.len(), 1);
402        assert_eq!(pending[0].from, "human");
403        assert_eq!(pending[0].body, "hello");
404        assert_eq!(pending[0].id, id);
405    }
406
407    #[test]
408    fn deliver_creates_dirs_automatically() {
409        let tmp = tempfile::tempdir().unwrap();
410        let root = tmp.path();
411        // Don't call init_inbox — deliver_to_inbox should create dirs
412        let msg = InboxMessage::new_send("human", "manager", "hi");
413        let id = deliver_to_inbox(root, &msg).unwrap();
414        assert!(!id.is_empty());
415
416        let pending = pending_messages(root, "manager").unwrap();
417        assert_eq!(pending.len(), 1);
418    }
419
420    #[test]
421    fn mark_delivered_moves_to_cur() {
422        let tmp = tempfile::tempdir().unwrap();
423        let root = tmp.path();
424        init_inbox(root, "eng-1").unwrap();
425
426        let msg = InboxMessage::new_send("manager", "eng-1", "do this");
427        let id = deliver_to_inbox(root, &msg).unwrap();
428
429        // Before: in new/
430        assert_eq!(pending_messages(root, "eng-1").unwrap().len(), 1);
431
432        mark_delivered(root, "eng-1", &id).unwrap();
433
434        // After: not in new/ anymore
435        assert_eq!(pending_messages(root, "eng-1").unwrap().len(), 0);
436
437        // But visible in all_messages as delivered
438        let all = all_messages(root, "eng-1").unwrap();
439        assert_eq!(all.len(), 1);
440        assert!(all[0].1); // delivered = true
441    }
442
443    #[test]
444    fn pending_message_count_tracks_new_messages_only() {
445        let tmp = tempfile::tempdir().unwrap();
446        let root = tmp.path();
447        init_inbox(root, "eng-1").unwrap();
448
449        let msg1 = InboxMessage::new_send("manager", "eng-1", "first");
450        let msg2 = InboxMessage::new_send("manager", "eng-1", "second");
451        let id1 = deliver_to_inbox(root, &msg1).unwrap();
452        deliver_to_inbox(root, &msg2).unwrap();
453
454        assert_eq!(pending_message_count(root, "eng-1").unwrap(), 2);
455
456        mark_delivered(root, "eng-1", &id1).unwrap();
457        assert_eq!(pending_message_count(root, "eng-1").unwrap(), 1);
458    }
459
460    #[test]
461    fn multiple_messages_ordered_by_timestamp() {
462        let tmp = tempfile::tempdir().unwrap();
463        let root = tmp.path();
464        init_inbox(root, "arch").unwrap();
465
466        // Deliver messages with different timestamps
467        let mut msg1 = InboxMessage::new_send("human", "arch", "first");
468        msg1.timestamp = 1000;
469        let mut msg2 = InboxMessage::new_send("human", "arch", "second");
470        msg2.timestamp = 2000;
471        let mut msg3 = InboxMessage::new_send("human", "arch", "third");
472        msg3.timestamp = 1500;
473
474        deliver_to_inbox(root, &msg1).unwrap();
475        deliver_to_inbox(root, &msg2).unwrap();
476        deliver_to_inbox(root, &msg3).unwrap();
477
478        let pending = pending_messages(root, "arch").unwrap();
479        assert_eq!(pending.len(), 3);
480        assert_eq!(pending[0].body, "first");
481        assert_eq!(pending[1].body, "third");
482        assert_eq!(pending[2].body, "second");
483    }
484
485    #[test]
486    fn all_messages_combines_new_and_cur() {
487        let tmp = tempfile::tempdir().unwrap();
488        let root = tmp.path();
489        init_inbox(root, "mgr").unwrap();
490
491        let msg1 = InboxMessage::new_send("arch", "mgr", "directive");
492        let id1 = deliver_to_inbox(root, &msg1).unwrap();
493        let msg2 = InboxMessage::new_send("eng-1", "mgr", "done");
494        deliver_to_inbox(root, &msg2).unwrap();
495
496        // Deliver first, leave second pending
497        mark_delivered(root, "mgr", &id1).unwrap();
498
499        let all = all_messages(root, "mgr").unwrap();
500        assert_eq!(all.len(), 2);
501
502        let delivered: Vec<_> = all.iter().filter(|(_, d)| *d).collect();
503        let pending: Vec<_> = all.iter().filter(|(_, d)| !*d).collect();
504        assert_eq!(delivered.len(), 1);
505        assert_eq!(pending.len(), 1);
506    }
507
508    #[test]
509    fn delete_message_removes_from_inbox() {
510        let tmp = tempfile::tempdir().unwrap();
511        let root = tmp.path();
512        init_inbox(root, "eng").unwrap();
513
514        let msg = InboxMessage::new_send("mgr", "eng", "task");
515        let id = deliver_to_inbox(root, &msg).unwrap();
516
517        assert_eq!(pending_messages(root, "eng").unwrap().len(), 1);
518        delete_message(root, "eng", &id).unwrap();
519        assert_eq!(pending_messages(root, "eng").unwrap().len(), 0);
520    }
521
522    #[test]
523    fn pending_messages_empty_inbox() {
524        let tmp = tempfile::tempdir().unwrap();
525        let root = tmp.path();
526        init_inbox(root, "empty").unwrap();
527
528        let pending = pending_messages(root, "empty").unwrap();
529        assert!(pending.is_empty());
530    }
531
532    #[test]
533    fn inboxes_root_path() {
534        let root = std::path::Path::new("/tmp/project");
535        assert_eq!(
536            inboxes_root(root),
537            PathBuf::from("/tmp/project/.batty/inboxes")
538        );
539    }
540
541    #[test]
542    fn malformed_json_skipped() {
543        let tmp = tempfile::tempdir().unwrap();
544        let root = tmp.path();
545        init_inbox(root, "bad").unwrap();
546
547        // Write a non-JSON file directly into new/
548        let new_dir = root.join("bad").join("new");
549        std::fs::write(new_dir.join("1234567890.bad.localhost"), "not json").unwrap();
550
551        // Should not panic, just skip the bad entry
552        let pending = pending_messages(root, "bad").unwrap();
553        assert!(pending.is_empty());
554    }
555
556    #[test]
557    fn purge_delivered_messages_before_timestamp_only_removes_older_entries() {
558        let tmp = tempfile::tempdir().unwrap();
559        let root = tmp.path();
560        init_inbox(root, "eng").unwrap();
561
562        let mut old_msg = InboxMessage::new_send("mgr", "eng", "old");
563        old_msg.timestamp = 10;
564        let old_id = deliver_to_inbox(root, &old_msg).unwrap();
565        mark_delivered(root, "eng", &old_id).unwrap();
566
567        let mut new_msg = InboxMessage::new_send("mgr", "eng", "new");
568        new_msg.timestamp = 20;
569        let new_id = deliver_to_inbox(root, &new_msg).unwrap();
570        mark_delivered(root, "eng", &new_id).unwrap();
571
572        let removed = purge_delivered_messages(root, "eng", Some(15), false).unwrap();
573        assert_eq!(removed, 1);
574
575        let remaining = all_messages(root, "eng").unwrap();
576        assert_eq!(remaining.len(), 1);
577        assert_eq!(remaining[0].0.id, new_id);
578        assert!(remaining[0].1);
579    }
580
581    #[test]
582    fn purge_delivered_messages_all_removes_every_cur_entry() {
583        let tmp = tempfile::tempdir().unwrap();
584        let root = tmp.path();
585        init_inbox(root, "eng").unwrap();
586
587        for body in ["one", "two"] {
588            let msg = InboxMessage::new_send("mgr", "eng", body);
589            let id = deliver_to_inbox(root, &msg).unwrap();
590            mark_delivered(root, "eng", &id).unwrap();
591        }
592
593        let removed = purge_delivered_messages(root, "eng", None, true).unwrap();
594        assert_eq!(removed, 2);
595        assert!(all_messages(root, "eng").unwrap().is_empty());
596    }
597
598    #[test]
599    fn purge_delivered_messages_for_all_scans_every_member_inbox() {
600        let tmp = tempfile::tempdir().unwrap();
601        let root = tmp.path();
602        init_inbox(root, "eng-1").unwrap();
603        init_inbox(root, "eng-2").unwrap();
604
605        let msg1 = InboxMessage::new_send("mgr", "eng-1", "first");
606        let id1 = deliver_to_inbox(root, &msg1).unwrap();
607        mark_delivered(root, "eng-1", &id1).unwrap();
608
609        let msg2 = InboxMessage::new_send("mgr", "eng-2", "second");
610        let id2 = deliver_to_inbox(root, &msg2).unwrap();
611        mark_delivered(root, "eng-2", &id2).unwrap();
612
613        let summary = purge_delivered_messages_for_all(root, None, true).unwrap();
614        assert_eq!(
615            summary,
616            InboxPurgeSummary {
617                roles: 2,
618                messages: 2
619            }
620        );
621        assert!(all_messages(root, "eng-1").unwrap().is_empty());
622        assert!(all_messages(root, "eng-2").unwrap().is_empty());
623    }
624
625    fn production_unwrap_expect_count(source: &str) -> usize {
626        let prod = if let Some(pos) = source.find("\n#[cfg(test)]\nmod tests") {
627            &source[..pos]
628        } else {
629            source
630        };
631        prod.lines()
632            .filter(|line| {
633                let trimmed = line.trim();
634                !trimmed.starts_with("#[cfg(test)]")
635                    && (trimmed.contains(".unwrap(") || trimmed.contains(".expect("))
636            })
637            .count()
638    }
639
640    #[test]
641    fn production_inbox_has_no_unwrap_or_expect_calls() {
642        let src = include_str!("inbox.rs");
643        assert_eq!(
644            production_unwrap_expect_count(src),
645            0,
646            "production inbox.rs should avoid unwrap/expect"
647        );
648    }
649}