Skip to main content

batty_cli/team/
inbox.rs

1//! Maildir-based inbox messaging system.
2//!
3//! Each team member gets a Maildir at `.batty/inboxes/<member>/` with
4//! `new/`, `cur/`, `tmp/` subdirectories. Messages are JSON blobs stored
5//! atomically via the `maildir` crate.
6//!
7//! - `new/` — undelivered messages (daemon picks these up)
8//! - `cur/` — delivered messages (moved here after tmux injection)
9//! - `tmp/` — atomic write staging (managed by `maildir` crate)
10
11use std::fs;
12use std::hash::{DefaultHasher, Hash, Hasher};
13use std::path::{Path, PathBuf};
14use std::time::Duration;
15
16use anyhow::{Context, Result};
17use maildir::Maildir;
18use serde::{Deserialize, Serialize};
19
20use crate::team::supervisory_notice::{
21    SupervisoryPressure, classify_supervisory_pressure_normalized, extract_task_id, normalized_body,
22};
23
24/// A message stored in a member's inbox.
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct InboxMessage {
27    /// Unique message ID (assigned by maildir filename, not serialized in body).
28    #[serde(skip)]
29    pub id: String,
30    /// Sender name (e.g., "human", "architect", "manager-1").
31    pub from: String,
32    /// Recipient name.
33    pub to: String,
34    /// Message body text.
35    pub body: String,
36    /// Message type: "send" or "assign".
37    pub msg_type: MessageType,
38    /// Unix timestamp (seconds since epoch).
39    pub timestamp: u64,
40}
41
42/// Type of inbox message.
43#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
44#[serde(rename_all = "lowercase")]
45pub enum MessageType {
46    Send,
47    Assign,
48}
49
50#[derive(Debug, Clone, Copy, PartialEq, Eq)]
51pub struct InboxPurgeSummary {
52    pub roles: usize,
53    pub messages: usize,
54}
55
56impl InboxMessage {
57    /// Create a new send-type message.
58    pub fn new_send(from: &str, to: &str, body: &str) -> Self {
59        Self {
60            id: String::new(),
61            from: from.to_string(),
62            to: to.to_string(),
63            body: body.to_string(),
64            msg_type: MessageType::Send,
65            timestamp: now_unix(),
66        }
67    }
68
69    /// Create a new assign-type message.
70    pub fn new_assign(from: &str, to: &str, task: &str) -> Self {
71        Self {
72            id: String::new(),
73            from: from.to_string(),
74            to: to.to_string(),
75            body: task.to_string(),
76            msg_type: MessageType::Assign,
77            timestamp: now_unix(),
78        }
79    }
80
81    /// Serialize to JSON bytes for storage.
82    pub fn to_json_bytes(&self) -> Result<Vec<u8>> {
83        serde_json::to_vec(self).context("failed to serialize inbox message")
84    }
85
86    /// Deserialize from JSON bytes read from a maildir file.
87    pub fn from_json_bytes(data: &[u8], id: &str) -> Result<Self> {
88        let mut msg: Self =
89            serde_json::from_slice(data).context("failed to deserialize inbox message")?;
90        msg.id = id.to_string();
91        Ok(msg)
92    }
93
94    /// Return how long the message has been in the inbox.
95    pub fn age(&self) -> Duration {
96        Duration::from_secs(now_unix().saturating_sub(self.timestamp))
97    }
98}
99
100/// Resolve the inboxes root directory: `.batty/inboxes/`.
101pub fn inboxes_root(project_root: &Path) -> PathBuf {
102    project_root.join(".batty").join("inboxes")
103}
104
105/// Get the Maildir for a specific member.
106fn member_maildir(inboxes_root: &Path, member: &str) -> Maildir {
107    Maildir::from(inboxes_root.join(member))
108}
109
110/// Initialize a member's inbox (create `new/`, `cur/`, `tmp/` dirs).
111pub fn init_inbox(inboxes_root: &Path, member: &str) -> Result<()> {
112    let md = member_maildir(inboxes_root, member);
113    md.create_dirs()
114        .with_context(|| format!("failed to create inbox dirs for '{member}'"))?;
115    Ok(())
116}
117
118/// Deliver a message to a member's inbox.
119///
120/// The message is atomically written to `new/` via the maildir crate
121/// (write to `tmp/`, rename to `new/`). Returns the maildir message ID.
122pub fn deliver_to_inbox(inboxes_root: &Path, msg: &InboxMessage) -> Result<String> {
123    let md = member_maildir(inboxes_root, &msg.to);
124    // Ensure dirs exist (idempotent)
125    md.create_dirs()
126        .with_context(|| format!("failed to create inbox dirs for '{}'", msg.to))?;
127    let data = msg.to_json_bytes()?;
128    let id = md
129        .store_new(&data)
130        .with_context(|| format!("failed to store message in inbox for '{}'", msg.to))?;
131    Ok(id)
132}
133
134/// Read recent messages for a member across both pending and delivered states.
135pub fn read_recent_messages(
136    inboxes_root: &Path,
137    member: &str,
138    max_age: Duration,
139) -> Result<Vec<InboxMessage>> {
140    let cutoff = now_unix().saturating_sub(max_age.as_secs());
141    let mut messages: Vec<InboxMessage> = all_messages(inboxes_root, member)?
142        .into_iter()
143        .map(|(message, _)| message)
144        .filter(|message| message.timestamp >= cutoff)
145        .collect();
146    messages.sort_by_key(|message| message.timestamp);
147    Ok(messages)
148}
149
150/// Produce a stable signature for duplicate detection.
151pub fn message_signature(body: &str) -> u64 {
152    let normalized = body.split_whitespace().collect::<Vec<_>>().join(" ");
153    let preview: String = normalized.chars().take(200).collect();
154    let mut hasher = DefaultHasher::new();
155    preview.hash(&mut hasher);
156    hasher.finish()
157}
158
159/// Return the most recent duplicate message seen within the provided window.
160pub fn find_recent_duplicate(
161    inboxes_root: &Path,
162    member: &str,
163    new_msg: &InboxMessage,
164    max_age: Duration,
165) -> Result<Option<InboxMessage>> {
166    let signature = message_signature(&new_msg.body);
167    let duplicate = read_recent_messages(inboxes_root, member, max_age)?
168        .into_iter()
169        .rev()
170        .find(|existing| {
171            existing.from == new_msg.from
172                && existing.msg_type == new_msg.msg_type
173                && message_signature(&existing.body) == signature
174        });
175    Ok(duplicate)
176}
177
178/// Expire pending messages older than the provided age by marking them delivered.
179pub fn expire_stale_pending_messages(
180    inboxes_root: &Path,
181    member: &str,
182    max_age: Duration,
183) -> Result<Vec<InboxMessage>> {
184    let mut expired = Vec::new();
185    for message in pending_messages(inboxes_root, member)? {
186        if message.age() > max_age {
187            mark_delivered(inboxes_root, member, &message.id)?;
188            expired.push(message);
189        }
190    }
191    Ok(expired)
192}
193
194/// List all pending (undelivered) messages in a member's inbox.
195///
196/// These are messages in `new/` that haven't been delivered to the agent yet.
197pub fn pending_messages(inboxes_root: &Path, member: &str) -> Result<Vec<InboxMessage>> {
198    let md = member_maildir(inboxes_root, member);
199    let mut messages = Vec::new();
200
201    for entry in md.list_new() {
202        let entry = match entry {
203            Ok(e) => e,
204            Err(e) => {
205                tracing::warn!(member, error = %e, "skipping unreadable inbox entry");
206                continue;
207            }
208        };
209        let id = entry.id().to_string();
210        let data = match std::fs::read(entry.path()) {
211            Ok(d) => d,
212            Err(e) => {
213                tracing::warn!(member, id = %id, error = %e, "failed to read inbox message");
214                continue;
215            }
216        };
217        match InboxMessage::from_json_bytes(&data, &id) {
218            Ok(msg) => messages.push(msg),
219            Err(e) => {
220                tracing::warn!(member, id = %id, error = %e, "skipping malformed inbox message");
221            }
222        }
223    }
224
225    // Sort by timestamp (oldest first for FIFO delivery)
226    messages.sort_by_key(|m| m.timestamp);
227    Ok(messages)
228}
229
230/// Count undelivered messages in `new/` for a member.
231pub fn pending_message_count(inboxes_root: &Path, member: &str) -> Result<usize> {
232    let new_dir = inboxes_root.join(member).join("new");
233    if !new_dir.is_dir() {
234        return Ok(0);
235    }
236
237    let mut count = 0usize;
238    for entry in std::fs::read_dir(&new_dir)
239        .with_context(|| format!("failed to read {}", new_dir.display()))?
240    {
241        let entry = entry.with_context(|| format!("failed to read {}", new_dir.display()))?;
242        let file_type = entry
243            .file_type()
244            .with_context(|| format!("failed to inspect {}", entry.path().display()))?;
245        if file_type.is_file() {
246            count += 1;
247        }
248    }
249
250    Ok(count)
251}
252
253/// Mark a message as delivered (move from `new/` to `cur/`).
254pub fn mark_delivered(inboxes_root: &Path, member: &str, id: &str) -> Result<()> {
255    let md = member_maildir(inboxes_root, member);
256    md.move_new_to_cur(id)
257        .with_context(|| format!("failed to mark message '{id}' as delivered for '{member}'"))?;
258    Ok(())
259}
260
261/// List all messages (both pending and delivered) for a member.
262pub fn all_messages(inboxes_root: &Path, member: &str) -> Result<Vec<(InboxMessage, bool)>> {
263    let md = member_maildir(inboxes_root, member);
264    let mut messages = Vec::new();
265
266    // new/ = pending (not yet delivered)
267    for entry in md.list_new() {
268        let entry = match entry {
269            Ok(e) => e,
270            Err(_) => continue,
271        };
272        let id = entry.id().to_string();
273        let data = match std::fs::read(entry.path()) {
274            Ok(d) => d,
275            Err(_) => continue,
276        };
277        if let Ok(msg) = InboxMessage::from_json_bytes(&data, &id) {
278            messages.push((msg, false)); // false = not delivered
279        }
280    }
281
282    // cur/ = delivered
283    for entry in md.list_cur() {
284        let entry = match entry {
285            Ok(e) => e,
286            Err(_) => continue,
287        };
288        let id = entry.id().to_string();
289        let data = match std::fs::read(entry.path()) {
290            Ok(d) => d,
291            Err(_) => continue,
292        };
293        if let Ok(msg) = InboxMessage::from_json_bytes(&data, &id) {
294            messages.push((msg, true)); // true = delivered
295        }
296    }
297
298    messages.sort_by_key(|(m, _)| m.timestamp);
299    Ok(messages)
300}
301
302/// Delete a message from a member's inbox (from either new/ or cur/).
303pub fn delete_message(inboxes_root: &Path, member: &str, id: &str) -> Result<()> {
304    let md = member_maildir(inboxes_root, member);
305    md.delete(id)
306        .with_context(|| format!("failed to delete message '{id}' from '{member}' inbox"))?;
307    Ok(())
308}
309
310/// Purge delivered messages from a member inbox.
311pub fn purge_delivered_messages(
312    inboxes_root: &Path,
313    member: &str,
314    before: Option<u64>,
315    purge_all: bool,
316) -> Result<usize> {
317    let cur_dir = inboxes_root.join(member).join("cur");
318    if !cur_dir.is_dir() {
319        return Ok(0);
320    }
321
322    let mut removed = 0usize;
323    for entry in
324        fs::read_dir(&cur_dir).with_context(|| format!("failed to read {}", cur_dir.display()))?
325    {
326        let entry = entry.with_context(|| format!("failed to read {}", cur_dir.display()))?;
327        let path = entry.path();
328        let file_type = entry
329            .file_type()
330            .with_context(|| format!("failed to inspect {}", path.display()))?;
331        if !file_type.is_file() {
332            continue;
333        }
334
335        let should_delete = if purge_all {
336            true
337        } else if let Some(cutoff) = before {
338            let data = match fs::read(&path) {
339                Ok(data) => data,
340                Err(_) => continue,
341            };
342            let Some(id) = path.file_name().and_then(|name| name.to_str()) else {
343                continue;
344            };
345            match InboxMessage::from_json_bytes(&data, id) {
346                Ok(message) => message.timestamp < cutoff,
347                Err(_) => false,
348            }
349        } else {
350            false
351        };
352
353        if should_delete {
354            fs::remove_file(&path)
355                .with_context(|| format!("failed to remove {}", path.display()))?;
356            removed += 1;
357        }
358    }
359
360    Ok(removed)
361}
362
363/// Purge delivered messages from every member inbox under `.batty/inboxes/`.
364pub fn purge_delivered_messages_for_all(
365    inboxes_root: &Path,
366    before: Option<u64>,
367    purge_all: bool,
368) -> Result<InboxPurgeSummary> {
369    if !inboxes_root.is_dir() {
370        return Ok(InboxPurgeSummary {
371            roles: 0,
372            messages: 0,
373        });
374    }
375
376    let mut roles = 0usize;
377    let mut messages = 0usize;
378    for entry in fs::read_dir(inboxes_root)
379        .with_context(|| format!("failed to read {}", inboxes_root.display()))?
380    {
381        let entry = entry.with_context(|| format!("failed to read {}", inboxes_root.display()))?;
382        let path = entry.path();
383        let file_type = entry
384            .file_type()
385            .with_context(|| format!("failed to inspect {}", path.display()))?;
386        if !file_type.is_dir() {
387            continue;
388        }
389
390        let Some(member) = path.file_name().and_then(|name| name.to_str()) else {
391            continue;
392        };
393        roles += 1;
394        messages += purge_delivered_messages(inboxes_root, member, before, purge_all)?;
395    }
396
397    Ok(InboxPurgeSummary { roles, messages })
398}
399
400fn now_unix() -> u64 {
401    std::time::SystemTime::now()
402        .duration_since(std::time::UNIX_EPOCH)
403        .unwrap_or_default()
404        .as_secs()
405}
406
407// ---------------------------------------------------------------------------
408// Message classification and digest
409// ---------------------------------------------------------------------------
410
411/// Category of an inbox message, used for priority sorting and collapsing.
412#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
413pub enum MessageCategory {
414    /// Escalation — highest priority, always shown individually.
415    Escalation = 0,
416    /// Review request or review-related message.
417    ReviewRequest = 1,
418    /// Blocker report from an engineer.
419    Blocker = 2,
420    /// Status update for a task.
421    Status = 3,
422    /// Idle or review nudge — lowest priority.
423    Nudge = 4,
424}
425
426/// Classify a message body into a category.
427pub fn classify_message(msg: &InboxMessage) -> MessageCategory {
428    let body = normalized_body(&msg.body);
429
430    // Escalation detection
431    if body.contains("escalat")
432        || body.contains("task_escalated")
433        || (body.contains("blocker") && body.contains("escalat"))
434    {
435        return MessageCategory::Escalation;
436    }
437
438    if let Some(pressure) = classify_supervisory_pressure_normalized(&body) {
439        match pressure {
440            SupervisoryPressure::ReviewNudge | SupervisoryPressure::IdleNudge => {
441                return MessageCategory::Nudge;
442            }
443            SupervisoryPressure::ReviewBacklog => return MessageCategory::ReviewRequest,
444            SupervisoryPressure::DispatchRecovery
445            | SupervisoryPressure::UtilizationRecovery
446            | SupervisoryPressure::TriageBacklog => return MessageCategory::Blocker,
447            SupervisoryPressure::RecoveryUpdate => {}
448            SupervisoryPressure::StatusUpdate => return MessageCategory::Status,
449        }
450    }
451
452    // Blocker detection
453    if body.contains("blocked on") || body.contains("blocker:") || body.starts_with("blocked:") {
454        return MessageCategory::Blocker;
455    }
456
457    // Review request detection
458    if body.contains("ready for review")
459        || body.contains("awaiting manual review")
460        || body.contains("requires manual review")
461        || body.contains("review request")
462        || body.contains("ready_for_review")
463        || body.starts_with("review backlog detected:")
464        || body.starts_with("review:")
465    {
466        return MessageCategory::ReviewRequest;
467    }
468
469    // Status update detection
470    if body.contains("status update")
471        || body.contains("progress update")
472        || body.contains("completion packet")
473        || body.starts_with("status:")
474    {
475        return MessageCategory::Status;
476    }
477
478    // Default: treat as status (middle priority)
479    MessageCategory::Status
480}
481
482/// A single entry in the digested inbox view.
483#[derive(Debug, Clone)]
484pub struct DigestEntry {
485    /// The representative message (latest in the group).
486    pub message: InboxMessage,
487    /// Whether the representative message was delivered.
488    pub delivered: bool,
489    /// Category of this entry.
490    pub category: MessageCategory,
491    /// How many raw messages this entry represents (1 = no collapsing).
492    pub collapsed_count: usize,
493}
494
495/// Digest a list of inbox messages: collapse nudges per sender, status
496/// updates per task (keep latest), and priority-sort the result.
497///
498/// Returns `(digest_entries, raw_count)` where `raw_count` is the original
499/// message count before collapsing.
500pub fn digest_messages(messages: &[(InboxMessage, bool)]) -> (Vec<DigestEntry>, usize) {
501    use std::collections::HashMap;
502
503    let raw_count = messages.len();
504    if messages.is_empty() {
505        return (Vec::new(), 0);
506    }
507
508    // Group messages by (category, grouping key).
509    // - Nudges group by sender ("from").
510    // - Status updates group by task ID (if extractable), else by sender.
511    // - Everything else stays individual.
512    let mut groups: HashMap<(MessageCategory, String), Vec<(usize, MessageCategory)>> =
513        HashMap::new();
514
515    let classified: Vec<MessageCategory> = messages
516        .iter()
517        .map(|(msg, _)| classify_message(msg))
518        .collect();
519
520    for (idx, cat) in classified.iter().enumerate() {
521        let (msg, _) = &messages[idx];
522        let key = match cat {
523            MessageCategory::Nudge => {
524                // Group nudges by sender
525                format!("nudge:{}", msg.from)
526            }
527            MessageCategory::Status => {
528                // Group status by task ID if available, else by sender
529                match extract_task_id(&msg.body) {
530                    Some(tid) => format!("status:task#{tid}"),
531                    None => format!("status:from:{}", msg.from),
532                }
533            }
534            // Escalations, review requests, blockers stay individual
535            _ => format!("individual:{idx}"),
536        };
537        groups.entry((*cat, key)).or_default().push((idx, *cat));
538    }
539
540    // Build digest entries: for each group, keep only the latest message.
541    let mut entries: Vec<DigestEntry> = Vec::new();
542    for ((_cat, _key), indices) in &groups {
543        let count = indices.len();
544        // Latest = highest timestamp (messages are sorted by timestamp, so last index)
545        let Some(&(latest_idx, category)) = indices
546            .iter()
547            .max_by_key(|(idx, _)| messages[*idx].0.timestamp)
548        else {
549            continue;
550        };
551        let (msg, delivered) = &messages[latest_idx];
552        entries.push(DigestEntry {
553            message: msg.clone(),
554            delivered: *delivered,
555            category,
556            collapsed_count: count,
557        });
558    }
559
560    // Priority sort: by category (asc = escalation first), then by timestamp (desc = newest first)
561    entries.sort_by(|a, b| {
562        a.category
563            .cmp(&b.category)
564            .then_with(|| b.message.timestamp.cmp(&a.message.timestamp))
565    });
566
567    (entries, raw_count)
568}
569
570/// Extract `#NN` task references from a message body. Used by
571/// [`demote_stale_escalations`] (#612) to determine whether an
572/// escalation body is still actionable against the current board
573/// state.
574pub fn extract_task_ids_from_body(body: &str) -> Vec<u32> {
575    let bytes = body.as_bytes();
576    let mut ids = Vec::new();
577    let mut i = 0;
578    while i < bytes.len() {
579        if bytes[i] == b'#' && i + 1 < bytes.len() && bytes[i + 1].is_ascii_digit() {
580            let start = i + 1;
581            let mut end = start;
582            while end < bytes.len() && bytes[end].is_ascii_digit() {
583                end += 1;
584            }
585            if let Ok(num) = std::str::from_utf8(&bytes[start..end])
586                && let Ok(value) = num.parse::<u32>()
587            {
588                ids.push(value);
589            }
590            i = end;
591        } else {
592            i += 1;
593        }
594    }
595    ids.sort_unstable();
596    ids.dedup();
597    ids
598}
599
600/// Demote digest entries whose referenced tasks are already `done`,
601/// `archived`, or otherwise cleared on the board. Escalations for
602/// cleared tasks are re-categorised as [`MessageCategory::Status`]
603/// so they no longer occupy the top of the inbox digest (#612).
604///
605/// Entries without an extractable task reference are left alone.
606/// Entries whose referenced tasks are still active (todo/in-progress/
607/// review/blocked) are left alone.
608pub fn demote_stale_escalations(
609    entries: Vec<DigestEntry>,
610    board_dir: &std::path::Path,
611) -> Vec<DigestEntry> {
612    let Ok(tasks) = crate::task::load_tasks_from_dir(&board_dir.join("tasks")) else {
613        return entries;
614    };
615    let stale_ids: std::collections::HashSet<u32> = tasks
616        .iter()
617        .filter(|t| matches!(t.status.as_str(), "done" | "archived"))
618        .map(|t| t.id)
619        .collect();
620    if stale_ids.is_empty() {
621        return entries;
622    }
623
624    entries
625        .into_iter()
626        .map(|mut entry| {
627            if !matches!(
628                entry.category,
629                MessageCategory::Escalation | MessageCategory::Blocker
630            ) {
631                return entry;
632            }
633            let referenced = extract_task_ids_from_body(&entry.message.body);
634            if referenced.is_empty() {
635                return entry;
636            }
637            let all_stale = referenced.iter().all(|id| stale_ids.contains(id));
638            if all_stale {
639                entry.category = MessageCategory::Status;
640            }
641            entry
642        })
643        .collect()
644}
645
646#[cfg(test)]
647mod tests {
648    use super::*;
649
650    #[test]
651    fn inbox_message_send_roundtrip() {
652        let msg = InboxMessage::new_send("human", "architect", "hello world");
653        assert_eq!(msg.from, "human");
654        assert_eq!(msg.to, "architect");
655        assert_eq!(msg.body, "hello world");
656        assert_eq!(msg.msg_type, MessageType::Send);
657        assert!(msg.timestamp > 0);
658
659        let bytes = msg.to_json_bytes().unwrap();
660        let parsed = InboxMessage::from_json_bytes(&bytes, "test-id").unwrap();
661        assert_eq!(parsed.id, "test-id");
662        assert_eq!(parsed.from, "human");
663        assert_eq!(parsed.to, "architect");
664        assert_eq!(parsed.body, "hello world");
665    }
666
667    #[test]
668    fn inbox_message_assign_roundtrip() {
669        let msg = InboxMessage::new_assign("black-lead", "eng-1-1", "fix the auth bug");
670        assert_eq!(msg.msg_type, MessageType::Assign);
671        assert_eq!(msg.from, "black-lead");
672        assert_eq!(msg.to, "eng-1-1");
673        assert_eq!(msg.body, "fix the auth bug");
674
675        let bytes = msg.to_json_bytes().unwrap();
676        let parsed = InboxMessage::from_json_bytes(&bytes, "assign-id").unwrap();
677        assert_eq!(parsed.msg_type, MessageType::Assign);
678    }
679
680    #[test]
681    fn inbox_message_age_uses_timestamp() {
682        let mut msg = InboxMessage::new_send("human", "architect", "hello world");
683        msg.timestamp = now_unix().saturating_sub(60);
684
685        assert!(msg.age() >= Duration::from_secs(60));
686    }
687
688    #[test]
689    fn init_inbox_creates_dirs() {
690        let tmp = tempfile::tempdir().unwrap();
691        let root = tmp.path();
692        init_inbox(root, "architect").unwrap();
693
694        assert!(root.join("architect").join("new").is_dir());
695        assert!(root.join("architect").join("cur").is_dir());
696        assert!(root.join("architect").join("tmp").is_dir());
697    }
698
699    #[test]
700    fn init_inbox_is_idempotent() {
701        let tmp = tempfile::tempdir().unwrap();
702        let root = tmp.path();
703        init_inbox(root, "architect").unwrap();
704        init_inbox(root, "architect").unwrap(); // should not error
705    }
706
707    #[test]
708    fn deliver_and_read_pending() {
709        let tmp = tempfile::tempdir().unwrap();
710        let root = tmp.path();
711        init_inbox(root, "architect").unwrap();
712
713        let msg = InboxMessage::new_send("human", "architect", "hello");
714        let id = deliver_to_inbox(root, &msg).unwrap();
715        assert!(!id.is_empty());
716
717        let pending = pending_messages(root, "architect").unwrap();
718        assert_eq!(pending.len(), 1);
719        assert_eq!(pending[0].from, "human");
720        assert_eq!(pending[0].body, "hello");
721        assert_eq!(pending[0].id, id);
722    }
723
724    #[test]
725    fn deliver_creates_dirs_automatically() {
726        let tmp = tempfile::tempdir().unwrap();
727        let root = tmp.path();
728        // Don't call init_inbox — deliver_to_inbox should create dirs
729        let msg = InboxMessage::new_send("human", "manager", "hi");
730        let id = deliver_to_inbox(root, &msg).unwrap();
731        assert!(!id.is_empty());
732
733        let pending = pending_messages(root, "manager").unwrap();
734        assert_eq!(pending.len(), 1);
735    }
736
737    #[test]
738    fn read_recent_messages_filters_old_entries() {
739        let tmp = tempfile::tempdir().unwrap();
740        let root = tmp.path();
741        init_inbox(root, "manager").unwrap();
742
743        let mut old = InboxMessage::new_send("eng-1", "manager", "old");
744        old.timestamp = now_unix().saturating_sub(601);
745        deliver_to_inbox(root, &old).unwrap();
746
747        let mut recent = InboxMessage::new_send("eng-2", "manager", "recent");
748        recent.timestamp = now_unix().saturating_sub(60);
749        let recent_id = deliver_to_inbox(root, &recent).unwrap();
750        mark_delivered(root, "manager", &recent_id).unwrap();
751
752        let messages = read_recent_messages(root, "manager", Duration::from_secs(300)).unwrap();
753        assert_eq!(messages.len(), 1);
754        assert_eq!(messages[0].body, "recent");
755    }
756
757    #[test]
758    fn message_signature_normalizes_whitespace() {
759        let compact = "Task #42 failed after retries";
760        let noisy = "Task   #42\nfailed   after   retries";
761
762        assert_eq!(message_signature(compact), message_signature(noisy));
763    }
764
765    #[test]
766    fn find_recent_duplicate_matches_same_sender_and_body() {
767        let tmp = tempfile::tempdir().unwrap();
768        let root = tmp.path();
769        init_inbox(root, "manager").unwrap();
770
771        let mut existing = InboxMessage::new_send("eng-1", "manager", "status update");
772        existing.timestamp = now_unix().saturating_sub(30);
773        let existing_id = deliver_to_inbox(root, &existing).unwrap();
774        mark_delivered(root, "manager", &existing_id).unwrap();
775
776        let candidate = InboxMessage::new_send("eng-1", "manager", "status   update");
777        let duplicate =
778            find_recent_duplicate(root, "manager", &candidate, Duration::from_secs(300)).unwrap();
779
780        assert!(duplicate.is_some());
781        assert_eq!(duplicate.unwrap().from, "eng-1");
782    }
783
784    #[test]
785    fn find_recent_duplicate_ignores_old_or_different_sender_messages() {
786        let tmp = tempfile::tempdir().unwrap();
787        let root = tmp.path();
788        init_inbox(root, "manager").unwrap();
789
790        let mut old = InboxMessage::new_send("eng-1", "manager", "status update");
791        old.timestamp = now_unix().saturating_sub(601);
792        deliver_to_inbox(root, &old).unwrap();
793
794        let recent_other_sender = InboxMessage::new_send("eng-2", "manager", "status update");
795        deliver_to_inbox(root, &recent_other_sender).unwrap();
796
797        let candidate = InboxMessage::new_send("eng-1", "manager", "status update");
798        let duplicate =
799            find_recent_duplicate(root, "manager", &candidate, Duration::from_secs(300)).unwrap();
800
801        assert!(duplicate.is_none());
802    }
803
804    #[test]
805    fn mark_delivered_moves_to_cur() {
806        let tmp = tempfile::tempdir().unwrap();
807        let root = tmp.path();
808        init_inbox(root, "eng-1").unwrap();
809
810        let msg = InboxMessage::new_send("manager", "eng-1", "do this");
811        let id = deliver_to_inbox(root, &msg).unwrap();
812
813        // Before: in new/
814        assert_eq!(pending_messages(root, "eng-1").unwrap().len(), 1);
815
816        mark_delivered(root, "eng-1", &id).unwrap();
817
818        // After: not in new/ anymore
819        assert_eq!(pending_messages(root, "eng-1").unwrap().len(), 0);
820
821        // But visible in all_messages as delivered
822        let all = all_messages(root, "eng-1").unwrap();
823        assert_eq!(all.len(), 1);
824        assert!(all[0].1); // delivered = true
825    }
826
827    #[test]
828    fn pending_message_count_tracks_new_messages_only() {
829        let tmp = tempfile::tempdir().unwrap();
830        let root = tmp.path();
831        init_inbox(root, "eng-1").unwrap();
832
833        let msg1 = InboxMessage::new_send("manager", "eng-1", "first");
834        let msg2 = InboxMessage::new_send("manager", "eng-1", "second");
835        let id1 = deliver_to_inbox(root, &msg1).unwrap();
836        deliver_to_inbox(root, &msg2).unwrap();
837
838        assert_eq!(pending_message_count(root, "eng-1").unwrap(), 2);
839
840        mark_delivered(root, "eng-1", &id1).unwrap();
841        assert_eq!(pending_message_count(root, "eng-1").unwrap(), 1);
842    }
843
844    #[test]
845    fn multiple_messages_ordered_by_timestamp() {
846        let tmp = tempfile::tempdir().unwrap();
847        let root = tmp.path();
848        init_inbox(root, "arch").unwrap();
849
850        // Deliver messages with different timestamps
851        let mut msg1 = InboxMessage::new_send("human", "arch", "first");
852        msg1.timestamp = 1000;
853        let mut msg2 = InboxMessage::new_send("human", "arch", "second");
854        msg2.timestamp = 2000;
855        let mut msg3 = InboxMessage::new_send("human", "arch", "third");
856        msg3.timestamp = 1500;
857
858        deliver_to_inbox(root, &msg1).unwrap();
859        deliver_to_inbox(root, &msg2).unwrap();
860        deliver_to_inbox(root, &msg3).unwrap();
861
862        let pending = pending_messages(root, "arch").unwrap();
863        assert_eq!(pending.len(), 3);
864        assert_eq!(pending[0].body, "first");
865        assert_eq!(pending[1].body, "third");
866        assert_eq!(pending[2].body, "second");
867    }
868
869    #[test]
870    fn all_messages_combines_new_and_cur() {
871        let tmp = tempfile::tempdir().unwrap();
872        let root = tmp.path();
873        init_inbox(root, "mgr").unwrap();
874
875        let msg1 = InboxMessage::new_send("arch", "mgr", "directive");
876        let id1 = deliver_to_inbox(root, &msg1).unwrap();
877        let msg2 = InboxMessage::new_send("eng-1", "mgr", "done");
878        deliver_to_inbox(root, &msg2).unwrap();
879
880        // Deliver first, leave second pending
881        mark_delivered(root, "mgr", &id1).unwrap();
882
883        let all = all_messages(root, "mgr").unwrap();
884        assert_eq!(all.len(), 2);
885
886        let delivered: Vec<_> = all.iter().filter(|(_, d)| *d).collect();
887        let pending: Vec<_> = all.iter().filter(|(_, d)| !*d).collect();
888        assert_eq!(delivered.len(), 1);
889        assert_eq!(pending.len(), 1);
890    }
891
892    #[test]
893    fn delete_message_removes_from_inbox() {
894        let tmp = tempfile::tempdir().unwrap();
895        let root = tmp.path();
896        init_inbox(root, "eng").unwrap();
897
898        let msg = InboxMessage::new_send("mgr", "eng", "task");
899        let id = deliver_to_inbox(root, &msg).unwrap();
900
901        assert_eq!(pending_messages(root, "eng").unwrap().len(), 1);
902        delete_message(root, "eng", &id).unwrap();
903        assert_eq!(pending_messages(root, "eng").unwrap().len(), 0);
904    }
905
906    #[test]
907    fn expire_stale_pending_messages_marks_old_entries_delivered() {
908        let tmp = tempfile::tempdir().unwrap();
909        let root = tmp.path();
910        init_inbox(root, "manager").unwrap();
911
912        let mut old = InboxMessage::new_send("eng-1", "manager", "old");
913        old.timestamp = now_unix().saturating_sub(900);
914        let old_id = deliver_to_inbox(root, &old).unwrap();
915
916        let mut fresh = InboxMessage::new_send("eng-2", "manager", "fresh");
917        fresh.timestamp = now_unix().saturating_sub(30);
918        deliver_to_inbox(root, &fresh).unwrap();
919
920        let expired =
921            expire_stale_pending_messages(root, "manager", Duration::from_secs(600)).unwrap();
922        assert_eq!(expired.len(), 1);
923        assert_eq!(expired[0].id, old_id);
924
925        let pending = pending_messages(root, "manager").unwrap();
926        assert_eq!(pending.len(), 1);
927        assert_eq!(pending[0].body, "fresh");
928
929        let all = all_messages(root, "manager").unwrap();
930        assert_eq!(all.len(), 2);
931        assert!(
932            all.iter()
933                .any(|(message, delivered)| message.body == "old" && *delivered)
934        );
935    }
936
937    #[test]
938    fn pending_messages_empty_inbox() {
939        let tmp = tempfile::tempdir().unwrap();
940        let root = tmp.path();
941        init_inbox(root, "empty").unwrap();
942
943        let pending = pending_messages(root, "empty").unwrap();
944        assert!(pending.is_empty());
945    }
946
947    #[test]
948    fn inboxes_root_path() {
949        let root = std::path::Path::new("/tmp/project");
950        assert_eq!(
951            inboxes_root(root),
952            PathBuf::from("/tmp/project/.batty/inboxes")
953        );
954    }
955
956    #[test]
957    fn malformed_json_skipped() {
958        let tmp = tempfile::tempdir().unwrap();
959        let root = tmp.path();
960        init_inbox(root, "bad").unwrap();
961
962        // Write a non-JSON file directly into new/
963        let new_dir = root.join("bad").join("new");
964        std::fs::write(new_dir.join("1234567890.bad.localhost"), "not json").unwrap();
965
966        // Should not panic, just skip the bad entry
967        let pending = pending_messages(root, "bad").unwrap();
968        assert!(pending.is_empty());
969    }
970
971    #[test]
972    fn purge_delivered_messages_before_timestamp_only_removes_older_entries() {
973        let tmp = tempfile::tempdir().unwrap();
974        let root = tmp.path();
975        init_inbox(root, "eng").unwrap();
976
977        let mut old_msg = InboxMessage::new_send("mgr", "eng", "old");
978        old_msg.timestamp = 10;
979        let old_id = deliver_to_inbox(root, &old_msg).unwrap();
980        mark_delivered(root, "eng", &old_id).unwrap();
981
982        let mut new_msg = InboxMessage::new_send("mgr", "eng", "new");
983        new_msg.timestamp = 20;
984        let new_id = deliver_to_inbox(root, &new_msg).unwrap();
985        mark_delivered(root, "eng", &new_id).unwrap();
986
987        let removed = purge_delivered_messages(root, "eng", Some(15), false).unwrap();
988        assert_eq!(removed, 1);
989
990        let remaining = all_messages(root, "eng").unwrap();
991        assert_eq!(remaining.len(), 1);
992        assert_eq!(remaining[0].0.id, new_id);
993        assert!(remaining[0].1);
994    }
995
996    #[test]
997    fn purge_delivered_messages_all_removes_every_cur_entry() {
998        let tmp = tempfile::tempdir().unwrap();
999        let root = tmp.path();
1000        init_inbox(root, "eng").unwrap();
1001
1002        for body in ["one", "two"] {
1003            let msg = InboxMessage::new_send("mgr", "eng", body);
1004            let id = deliver_to_inbox(root, &msg).unwrap();
1005            mark_delivered(root, "eng", &id).unwrap();
1006        }
1007
1008        let removed = purge_delivered_messages(root, "eng", None, true).unwrap();
1009        assert_eq!(removed, 2);
1010        assert!(all_messages(root, "eng").unwrap().is_empty());
1011    }
1012
1013    #[test]
1014    fn purge_delivered_messages_for_all_scans_every_member_inbox() {
1015        let tmp = tempfile::tempdir().unwrap();
1016        let root = tmp.path();
1017        init_inbox(root, "eng-1").unwrap();
1018        init_inbox(root, "eng-2").unwrap();
1019
1020        let msg1 = InboxMessage::new_send("mgr", "eng-1", "first");
1021        let id1 = deliver_to_inbox(root, &msg1).unwrap();
1022        mark_delivered(root, "eng-1", &id1).unwrap();
1023
1024        let msg2 = InboxMessage::new_send("mgr", "eng-2", "second");
1025        let id2 = deliver_to_inbox(root, &msg2).unwrap();
1026        mark_delivered(root, "eng-2", &id2).unwrap();
1027
1028        let summary = purge_delivered_messages_for_all(root, None, true).unwrap();
1029        assert_eq!(
1030            summary,
1031            InboxPurgeSummary {
1032                roles: 2,
1033                messages: 2
1034            }
1035        );
1036        assert!(all_messages(root, "eng-1").unwrap().is_empty());
1037        assert!(all_messages(root, "eng-2").unwrap().is_empty());
1038    }
1039
1040    // ---- Message classification tests ----
1041
1042    fn make_msg(from: &str, to: &str, body: &str, ts: u64) -> InboxMessage {
1043        let mut msg = InboxMessage::new_send(from, to, body);
1044        msg.timestamp = ts;
1045        msg
1046    }
1047
1048    #[test]
1049    fn classify_idle_nudge() {
1050        let msg = make_msg(
1051            "daemon",
1052            "eng-1",
1053            "Idle nudge: you have been idle past your configured timeout. Move forward.",
1054            100,
1055        );
1056        assert_eq!(classify_message(&msg), MessageCategory::Nudge);
1057    }
1058
1059    #[test]
1060    fn classify_review_nudge() {
1061        let msg = make_msg(
1062            "daemon",
1063            "manager",
1064            "Review nudge: task #42 awaiting review",
1065            100,
1066        );
1067        assert_eq!(classify_message(&msg), MessageCategory::Nudge);
1068    }
1069
1070    #[test]
1071    fn classify_escalation() {
1072        let msg = make_msg(
1073            "eng-1",
1074            "manager",
1075            "Task #42 escalated: build failures",
1076            100,
1077        );
1078        assert_eq!(classify_message(&msg), MessageCategory::Escalation);
1079    }
1080
1081    #[test]
1082    fn classify_blocker() {
1083        let msg = make_msg("eng-1", "manager", "Blocked on #42: missing API key", 100);
1084        assert_eq!(classify_message(&msg), MessageCategory::Blocker);
1085    }
1086
1087    #[test]
1088    fn classify_review_request() {
1089        let msg = make_msg("eng-1", "manager", "Task #42 ready for review", 100);
1090        assert_eq!(classify_message(&msg), MessageCategory::ReviewRequest);
1091    }
1092
1093    #[test]
1094    fn classify_manual_review_notice_as_review_request() {
1095        let msg = make_msg(
1096            "eng-1",
1097            "manager",
1098            "[eng-1] Task #42 passed tests but requires manual review.\nTitle: Inbox routing",
1099            100,
1100        );
1101        assert_eq!(classify_message(&msg), MessageCategory::ReviewRequest);
1102    }
1103
1104    #[test]
1105    fn classify_review_backlog_notice_as_review_request() {
1106        let msg = make_msg(
1107            "daemon",
1108            "manager",
1109            "Review backlog detected: direct-report work is waiting for your review on Task #42.",
1110            100,
1111        );
1112        assert_eq!(classify_message(&msg), MessageCategory::ReviewRequest);
1113    }
1114
1115    #[test]
1116    fn classify_status_update() {
1117        let msg = make_msg(
1118            "eng-1",
1119            "manager",
1120            "Status update on task #42: tests passing",
1121            100,
1122        );
1123        assert_eq!(classify_message(&msg), MessageCategory::Status);
1124    }
1125
1126    #[test]
1127    fn classify_generic_message_as_status() {
1128        let msg = make_msg("eng-1", "manager", "Hello, just checking in", 100);
1129        assert_eq!(classify_message(&msg), MessageCategory::Status);
1130    }
1131
1132    #[test]
1133    fn classify_nudge_with_idle_action_text() {
1134        let msg = make_msg("daemon", "eng-1", "If you are idle, take action NOW", 100);
1135        assert_eq!(classify_message(&msg), MessageCategory::Nudge);
1136    }
1137
1138    // ---- extract_task_id tests ----
1139
1140    #[test]
1141    fn extract_task_id_hash_pattern() {
1142        assert_eq!(extract_task_id("Task #42 is done"), Some("42".to_string()));
1143    }
1144
1145    #[test]
1146    fn extract_task_id_from_json() {
1147        assert_eq!(
1148            extract_task_id(r#"{"task_id": 99, "status": "done"}"#),
1149            Some("99".to_string())
1150        );
1151    }
1152
1153    #[test]
1154    fn extract_task_id_none_when_missing() {
1155        assert_eq!(extract_task_id("no task reference here"), None);
1156    }
1157
1158    // ---- digest_messages tests ----
1159
1160    #[test]
1161    fn digest_empty_messages() {
1162        let (entries, raw) = digest_messages(&[]);
1163        assert!(entries.is_empty());
1164        assert_eq!(raw, 0);
1165    }
1166
1167    #[test]
1168    fn digest_collapses_nudges_per_sender() {
1169        let msgs: Vec<(InboxMessage, bool)> = vec![
1170            (
1171                make_msg("daemon", "eng-1", "Idle nudge: move forward", 100),
1172                true,
1173            ),
1174            (
1175                make_msg("daemon", "eng-1", "Idle nudge: move forward", 200),
1176                true,
1177            ),
1178            (
1179                make_msg("daemon", "eng-1", "Idle nudge: move forward", 300),
1180                true,
1181            ),
1182        ];
1183
1184        let (entries, raw_count) = digest_messages(&msgs);
1185        assert_eq!(raw_count, 3);
1186        assert_eq!(
1187            entries.len(),
1188            1,
1189            "3 nudges from same sender should collapse to 1"
1190        );
1191        assert_eq!(entries[0].collapsed_count, 3);
1192        assert_eq!(entries[0].message.timestamp, 300, "should keep latest");
1193        assert_eq!(entries[0].category, MessageCategory::Nudge);
1194    }
1195
1196    #[test]
1197    fn digest_keeps_nudges_separate_per_sender() {
1198        // Manager inbox with nudges from different sources
1199        let msgs: Vec<(InboxMessage, bool)> = vec![
1200            (
1201                make_msg("daemon", "manager", "Idle nudge: eng-1 is idle", 100),
1202                true,
1203            ),
1204            (
1205                make_msg(
1206                    "architect",
1207                    "manager",
1208                    "Review nudge: task #42 awaiting review",
1209                    200,
1210                ),
1211                true,
1212            ),
1213        ];
1214
1215        let (entries, _) = digest_messages(&msgs);
1216        assert_eq!(
1217            entries.len(),
1218            2,
1219            "nudges from different senders stay separate"
1220        );
1221    }
1222
1223    #[test]
1224    fn digest_collapses_status_updates_per_task() {
1225        let msgs: Vec<(InboxMessage, bool)> = vec![
1226            (
1227                make_msg(
1228                    "eng-1",
1229                    "manager",
1230                    "Status update on task #42: compiling",
1231                    100,
1232                ),
1233                true,
1234            ),
1235            (
1236                make_msg(
1237                    "eng-1",
1238                    "manager",
1239                    "Status update on task #42: tests passing",
1240                    200,
1241                ),
1242                true,
1243            ),
1244            (
1245                make_msg("eng-1", "manager", "Status update on task #42: done", 300),
1246                true,
1247            ),
1248        ];
1249
1250        let (entries, raw_count) = digest_messages(&msgs);
1251        assert_eq!(raw_count, 3);
1252        assert_eq!(
1253            entries.len(),
1254            1,
1255            "3 status updates for same task should collapse"
1256        );
1257        assert_eq!(entries[0].collapsed_count, 3);
1258        assert_eq!(entries[0].message.timestamp, 300, "should keep latest");
1259    }
1260
1261    #[test]
1262    fn digest_keeps_status_separate_per_task() {
1263        let msgs: Vec<(InboxMessage, bool)> = vec![
1264            (
1265                make_msg("eng-1", "manager", "Status update on task #42: done", 100),
1266                true,
1267            ),
1268            (
1269                make_msg(
1270                    "eng-1",
1271                    "manager",
1272                    "Status update on task #99: compiling",
1273                    200,
1274                ),
1275                true,
1276            ),
1277        ];
1278
1279        let (entries, _) = digest_messages(&msgs);
1280        assert_eq!(entries.len(), 2, "status for different tasks stay separate");
1281    }
1282
1283    #[test]
1284    fn extract_task_ids_from_body_pulls_multiple_refs() {
1285        assert_eq!(extract_task_ids_from_body("Task #42 escalated"), vec![42]);
1286        assert_eq!(
1287            extract_task_ids_from_body("Blocker on #42 and #43 cascading into #44"),
1288            vec![42, 43, 44]
1289        );
1290        assert_eq!(
1291            extract_task_ids_from_body("no refs here"),
1292            Vec::<u32>::new()
1293        );
1294        assert_eq!(
1295            extract_task_ids_from_body("#1 #1 #2 — duplicates"),
1296            vec![1, 2]
1297        );
1298    }
1299
1300    /// Regression for #612: escalations whose referenced tasks are
1301    /// already `done` on the board should be demoted from Escalation
1302    /// to Status so they no longer occupy the top of the digest.
1303    #[test]
1304    fn demote_stale_escalations_moves_done_task_escalations_off_top() {
1305        let tmp = tempfile::tempdir().unwrap();
1306        let tasks_dir = tmp.path().join("tasks");
1307        std::fs::create_dir_all(&tasks_dir).unwrap();
1308
1309        // Done task #42 and live task #43
1310        std::fs::write(
1311            tasks_dir.join("042-done.md"),
1312            "---\nid: 42\ntitle: done\nstatus: done\npriority: high\nclass: standard\n---\n",
1313        )
1314        .unwrap();
1315        std::fs::write(
1316            tasks_dir.join("043-live.md"),
1317            "---\nid: 43\ntitle: live\nstatus: in-progress\npriority: high\nclass: standard\n---\n",
1318        )
1319        .unwrap();
1320
1321        let entries = vec![
1322            DigestEntry {
1323                message: make_msg("eng-1", "manager", "Task #42 escalated: old blocker", 100),
1324                delivered: false,
1325                category: MessageCategory::Escalation,
1326                collapsed_count: 1,
1327            },
1328            DigestEntry {
1329                message: make_msg("eng-2", "manager", "Task #43 escalated: real blocker", 200),
1330                delivered: false,
1331                category: MessageCategory::Escalation,
1332                collapsed_count: 1,
1333            },
1334        ];
1335
1336        let filtered = demote_stale_escalations(entries, tmp.path());
1337        assert_eq!(
1338            filtered[0].category,
1339            MessageCategory::Status,
1340            "#42 is done, its escalation should be demoted"
1341        );
1342        assert_eq!(
1343            filtered[1].category,
1344            MessageCategory::Escalation,
1345            "#43 is still live, its escalation should stay at top priority"
1346        );
1347    }
1348
1349    #[test]
1350    fn demote_stale_escalations_preserves_entries_without_task_refs() {
1351        let tmp = tempfile::tempdir().unwrap();
1352        let tasks_dir = tmp.path().join("tasks");
1353        std::fs::create_dir_all(&tasks_dir).unwrap();
1354        std::fs::write(
1355            tasks_dir.join("042-done.md"),
1356            "---\nid: 42\ntitle: done\nstatus: done\npriority: high\nclass: standard\n---\n",
1357        )
1358        .unwrap();
1359
1360        let entries = vec![DigestEntry {
1361            message: make_msg("eng-1", "manager", "Generic escalation with no ID", 100),
1362            delivered: false,
1363            category: MessageCategory::Escalation,
1364            collapsed_count: 1,
1365        }];
1366
1367        let filtered = demote_stale_escalations(entries, tmp.path());
1368        assert_eq!(
1369            filtered[0].category,
1370            MessageCategory::Escalation,
1371            "entries without task refs should not be demoted"
1372        );
1373    }
1374
1375    #[test]
1376    fn digest_never_collapses_escalations() {
1377        let msgs: Vec<(InboxMessage, bool)> = vec![
1378            (
1379                make_msg(
1380                    "eng-1",
1381                    "manager",
1382                    "Task #42 escalated: build failures",
1383                    100,
1384                ),
1385                false,
1386            ),
1387            (
1388                make_msg("eng-2", "manager", "Task #42 escalated: tests broken", 200),
1389                false,
1390            ),
1391        ];
1392
1393        let (entries, _) = digest_messages(&msgs);
1394        assert_eq!(entries.len(), 2, "escalations are never collapsed");
1395        assert_eq!(entries[0].category, MessageCategory::Escalation);
1396        assert_eq!(entries[1].category, MessageCategory::Escalation);
1397    }
1398
1399    #[test]
1400    fn digest_priority_sorts_escalations_first_nudges_last() {
1401        let msgs: Vec<(InboxMessage, bool)> = vec![
1402            (
1403                make_msg("daemon", "manager", "Idle nudge: move forward", 400),
1404                true,
1405            ),
1406            (
1407                make_msg("eng-1", "manager", "Status update on task #42: done", 300),
1408                true,
1409            ),
1410            (
1411                make_msg("eng-1", "manager", "Blocked on #99: missing key", 200),
1412                true,
1413            ),
1414            (
1415                make_msg("eng-2", "manager", "Task #50 escalated: critical", 100),
1416                false,
1417            ),
1418            (
1419                make_msg("eng-1", "manager", "Task #42 ready for review", 350),
1420                true,
1421            ),
1422        ];
1423
1424        let (entries, _) = digest_messages(&msgs);
1425
1426        let categories: Vec<MessageCategory> = entries.iter().map(|e| e.category).collect();
1427        // Verify ordering: Escalation < ReviewRequest < Blocker < Status < Nudge
1428        for i in 1..categories.len() {
1429            assert!(
1430                categories[i - 1] <= categories[i],
1431                "category at {} ({:?}) should come before or equal category at {} ({:?})",
1432                i - 1,
1433                categories[i - 1],
1434                i,
1435                categories[i]
1436            );
1437        }
1438        assert_eq!(categories[0], MessageCategory::Escalation);
1439        assert_eq!(*categories.last().unwrap(), MessageCategory::Nudge);
1440    }
1441
1442    #[test]
1443    fn classify_message_prioritizes_dispatch_and_utilization_alerts() {
1444        let dispatch = InboxMessage::new_send(
1445            "daemon",
1446            "manager",
1447            "Dispatch recovery needed: idle reports and runnable work are waiting.",
1448        );
1449        let utilization = InboxMessage::new_send(
1450            "daemon",
1451            "manager",
1452            "Utilization recovery needed: 2 idle engineer(s), top task #42.",
1453        );
1454
1455        assert_eq!(classify_message(&dispatch), MessageCategory::Blocker);
1456        assert_eq!(classify_message(&utilization), MessageCategory::Blocker);
1457    }
1458
1459    #[test]
1460    fn classify_recovery_update_keeps_existing_status_bucket() {
1461        let recovery = InboxMessage::new_send(
1462            "manager",
1463            "architect",
1464            "Recovery: lane blocked while waiting on upstream review ownership.",
1465        );
1466
1467        assert_eq!(classify_message(&recovery), MessageCategory::Status);
1468    }
1469
1470    #[test]
1471    fn digest_mixed_scenario_achieves_significant_reduction() {
1472        // Simulate a typical noisy session: 5 nudges (same eng), 4 status updates (same task),
1473        // 1 escalation, 1 review request, 1 blocker = 12 raw messages
1474        let mut msgs: Vec<(InboxMessage, bool)> = Vec::new();
1475        for i in 0..5 {
1476            msgs.push((
1477                make_msg("daemon", "eng-1", "Idle nudge: move forward", 100 + i),
1478                true,
1479            ));
1480        }
1481        for i in 0..4 {
1482            msgs.push((
1483                make_msg(
1484                    "eng-1",
1485                    "manager",
1486                    &format!("Status update on task #42: step {i}"),
1487                    200 + i,
1488                ),
1489                true,
1490            ));
1491        }
1492        msgs.push((
1493            make_msg(
1494                "eng-2",
1495                "manager",
1496                "Task #99 escalated: critical failure",
1497                300,
1498            ),
1499            false,
1500        ));
1501        msgs.push((
1502            make_msg("eng-1", "manager", "Task #42 ready for review", 350),
1503            true,
1504        ));
1505        msgs.push((
1506            make_msg("eng-3", "manager", "Blocked on #55: need credentials", 320),
1507            true,
1508        ));
1509
1510        let (entries, raw_count) = digest_messages(&msgs);
1511        assert_eq!(raw_count, 12);
1512        // Expected: 1 escalation + 1 review + 1 blocker + 1 status(collapsed 4) + 1 nudge(collapsed 5) = 5 entries
1513        assert_eq!(entries.len(), 5);
1514        // Reduction: 12 -> 5 = 58% reduction, close to the 60% target
1515        let reduction_pct = ((raw_count - entries.len()) as f64 / raw_count as f64) * 100.0;
1516        assert!(
1517            reduction_pct >= 50.0,
1518            "Expected 50%+ reduction, got {reduction_pct:.0}%"
1519        );
1520    }
1521
1522    #[test]
1523    fn digest_preserves_delivered_status_of_latest() {
1524        let msgs: Vec<(InboxMessage, bool)> = vec![
1525            (make_msg("daemon", "eng-1", "Idle nudge: old", 100), true),
1526            (
1527                make_msg("daemon", "eng-1", "Idle nudge: latest", 200),
1528                false,
1529            ),
1530        ];
1531
1532        let (entries, _) = digest_messages(&msgs);
1533        assert_eq!(entries.len(), 1);
1534        assert!(
1535            !entries[0].delivered,
1536            "should use delivered status of latest message"
1537        );
1538    }
1539
1540    fn production_unwrap_expect_count(source: &str) -> usize {
1541        let prod = if let Some(pos) = source.find("\n#[cfg(test)]\nmod tests") {
1542            &source[..pos]
1543        } else {
1544            source
1545        };
1546        prod.lines()
1547            .filter(|line| {
1548                let trimmed = line.trim();
1549                !trimmed.starts_with("#[cfg(test)]")
1550                    && (trimmed.contains(".unwrap(") || trimmed.contains(".expect("))
1551            })
1552            .count()
1553    }
1554
1555    #[test]
1556    fn production_inbox_has_no_unwrap_or_expect_calls() {
1557        let src = include_str!("inbox.rs");
1558        assert_eq!(
1559            production_unwrap_expect_count(src),
1560            0,
1561            "production inbox.rs should avoid unwrap/expect"
1562        );
1563    }
1564}