1use std::fs;
12use std::path::{Path, PathBuf};
13
14use anyhow::{Context, Result};
15use maildir::Maildir;
16use serde::{Deserialize, Serialize};
17
/// A single message exchanged through a member's maildir inbox.
///
/// The serde-derived representation is what `to_json_bytes` writes and
/// `from_json_bytes` reads back.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InboxMessage {
    /// Identifier of the stored message.
    ///
    /// Skipped by serde, so it is never part of the JSON payload. The
    /// `new_send` / `new_assign` constructors leave it empty and
    /// `from_json_bytes` fills it in from its `id` argument.
    #[serde(skip)]
    pub id: String,
    /// Name of the sender.
    pub from: String,
    /// Name of the recipient; `deliver_to_inbox` uses it to pick the inbox
    /// directory the message is stored in.
    pub to: String,
    /// Message text (for `new_assign`, the task that is being assigned).
    pub body: String,
    /// Whether this is a plain send or an assignment.
    pub msg_type: MessageType,
    /// Creation time in whole seconds since the Unix epoch (see `now_unix`).
    pub timestamp: u64,
}
35
/// Kind of an [`InboxMessage`].
///
/// Because of `rename_all = "lowercase"`, the variants are serialized as the
/// strings `"send"` and `"assign"`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum MessageType {
    /// Message created by `InboxMessage::new_send`.
    Send,
    /// Message created by `InboxMessage::new_assign`; its body is the task.
    Assign,
}
43
/// Totals reported by `purge_delivered_messages_for_all`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct InboxPurgeSummary {
    /// Number of member directories under the inboxes root that were visited.
    pub roles: usize,
    /// Total number of message files removed across those directories.
    pub messages: usize,
}
49
50impl InboxMessage {
51 pub fn new_send(from: &str, to: &str, body: &str) -> Self {
53 Self {
54 id: String::new(),
55 from: from.to_string(),
56 to: to.to_string(),
57 body: body.to_string(),
58 msg_type: MessageType::Send,
59 timestamp: now_unix(),
60 }
61 }
62
63 pub fn new_assign(from: &str, to: &str, task: &str) -> Self {
65 Self {
66 id: String::new(),
67 from: from.to_string(),
68 to: to.to_string(),
69 body: task.to_string(),
70 msg_type: MessageType::Assign,
71 timestamp: now_unix(),
72 }
73 }
74
75 pub fn to_json_bytes(&self) -> Result<Vec<u8>> {
77 serde_json::to_vec(self).context("failed to serialize inbox message")
78 }
79
80 pub fn from_json_bytes(data: &[u8], id: &str) -> Result<Self> {
82 let mut msg: Self =
83 serde_json::from_slice(data).context("failed to deserialize inbox message")?;
84 msg.id = id.to_string();
85 Ok(msg)
86 }
87}
88
/// Returns the directory that holds all member inboxes for a project:
/// `<project_root>/.batty/inboxes`.
pub fn inboxes_root(project_root: &Path) -> PathBuf {
    let mut root = project_root.to_path_buf();
    root.push(".batty");
    root.push("inboxes");
    root
}
93
94fn member_maildir(inboxes_root: &Path, member: &str) -> Maildir {
96 Maildir::from(inboxes_root.join(member))
97}
98
99pub fn init_inbox(inboxes_root: &Path, member: &str) -> Result<()> {
101 let md = member_maildir(inboxes_root, member);
102 md.create_dirs()
103 .with_context(|| format!("failed to create inbox dirs for '{member}'"))?;
104 Ok(())
105}
106
107pub fn deliver_to_inbox(inboxes_root: &Path, msg: &InboxMessage) -> Result<String> {
112 let md = member_maildir(inboxes_root, &msg.to);
113 md.create_dirs()
115 .with_context(|| format!("failed to create inbox dirs for '{}'", msg.to))?;
116 let data = msg.to_json_bytes()?;
117 let id = md
118 .store_new(&data)
119 .with_context(|| format!("failed to store message in inbox for '{}'", msg.to))?;
120 Ok(id)
121}
122
123pub fn pending_messages(inboxes_root: &Path, member: &str) -> Result<Vec<InboxMessage>> {
127 let md = member_maildir(inboxes_root, member);
128 let mut messages = Vec::new();
129
130 for entry in md.list_new() {
131 let entry = match entry {
132 Ok(e) => e,
133 Err(e) => {
134 tracing::warn!(member, error = %e, "skipping unreadable inbox entry");
135 continue;
136 }
137 };
138 let id = entry.id().to_string();
139 let data = match std::fs::read(entry.path()) {
140 Ok(d) => d,
141 Err(e) => {
142 tracing::warn!(member, id = %id, error = %e, "failed to read inbox message");
143 continue;
144 }
145 };
146 match InboxMessage::from_json_bytes(&data, &id) {
147 Ok(msg) => messages.push(msg),
148 Err(e) => {
149 tracing::warn!(member, id = %id, error = %e, "skipping malformed inbox message");
150 }
151 }
152 }
153
154 messages.sort_by_key(|m| m.timestamp);
156 Ok(messages)
157}
158
159pub fn pending_message_count(inboxes_root: &Path, member: &str) -> Result<usize> {
161 let new_dir = inboxes_root.join(member).join("new");
162 if !new_dir.is_dir() {
163 return Ok(0);
164 }
165
166 let mut count = 0usize;
167 for entry in std::fs::read_dir(&new_dir)
168 .with_context(|| format!("failed to read {}", new_dir.display()))?
169 {
170 let entry = entry.with_context(|| format!("failed to read {}", new_dir.display()))?;
171 let file_type = entry
172 .file_type()
173 .with_context(|| format!("failed to inspect {}", entry.path().display()))?;
174 if file_type.is_file() {
175 count += 1;
176 }
177 }
178
179 Ok(count)
180}
181
182pub fn mark_delivered(inboxes_root: &Path, member: &str, id: &str) -> Result<()> {
184 let md = member_maildir(inboxes_root, member);
185 md.move_new_to_cur(id)
186 .with_context(|| format!("failed to mark message '{id}' as delivered for '{member}'"))?;
187 Ok(())
188}
189
190pub fn all_messages(inboxes_root: &Path, member: &str) -> Result<Vec<(InboxMessage, bool)>> {
192 let md = member_maildir(inboxes_root, member);
193 let mut messages = Vec::new();
194
195 for entry in md.list_new() {
197 let entry = match entry {
198 Ok(e) => e,
199 Err(_) => continue,
200 };
201 let id = entry.id().to_string();
202 let data = match std::fs::read(entry.path()) {
203 Ok(d) => d,
204 Err(_) => continue,
205 };
206 if let Ok(msg) = InboxMessage::from_json_bytes(&data, &id) {
207 messages.push((msg, false)); }
209 }
210
211 for entry in md.list_cur() {
213 let entry = match entry {
214 Ok(e) => e,
215 Err(_) => continue,
216 };
217 let id = entry.id().to_string();
218 let data = match std::fs::read(entry.path()) {
219 Ok(d) => d,
220 Err(_) => continue,
221 };
222 if let Ok(msg) = InboxMessage::from_json_bytes(&data, &id) {
223 messages.push((msg, true)); }
225 }
226
227 messages.sort_by_key(|(m, _)| m.timestamp);
228 Ok(messages)
229}
230
231pub fn delete_message(inboxes_root: &Path, member: &str, id: &str) -> Result<()> {
233 let md = member_maildir(inboxes_root, member);
234 md.delete(id)
235 .with_context(|| format!("failed to delete message '{id}' from '{member}' inbox"))?;
236 Ok(())
237}
238
239pub fn purge_delivered_messages(
241 inboxes_root: &Path,
242 member: &str,
243 before: Option<u64>,
244 purge_all: bool,
245) -> Result<usize> {
246 let cur_dir = inboxes_root.join(member).join("cur");
247 if !cur_dir.is_dir() {
248 return Ok(0);
249 }
250
251 let mut removed = 0usize;
252 for entry in
253 fs::read_dir(&cur_dir).with_context(|| format!("failed to read {}", cur_dir.display()))?
254 {
255 let entry = entry.with_context(|| format!("failed to read {}", cur_dir.display()))?;
256 let path = entry.path();
257 let file_type = entry
258 .file_type()
259 .with_context(|| format!("failed to inspect {}", path.display()))?;
260 if !file_type.is_file() {
261 continue;
262 }
263
264 let should_delete = if purge_all {
265 true
266 } else if let Some(cutoff) = before {
267 let data = match fs::read(&path) {
268 Ok(data) => data,
269 Err(_) => continue,
270 };
271 let Some(id) = path.file_name().and_then(|name| name.to_str()) else {
272 continue;
273 };
274 match InboxMessage::from_json_bytes(&data, id) {
275 Ok(message) => message.timestamp < cutoff,
276 Err(_) => false,
277 }
278 } else {
279 false
280 };
281
282 if should_delete {
283 fs::remove_file(&path)
284 .with_context(|| format!("failed to remove {}", path.display()))?;
285 removed += 1;
286 }
287 }
288
289 Ok(removed)
290}
291
292pub fn purge_delivered_messages_for_all(
294 inboxes_root: &Path,
295 before: Option<u64>,
296 purge_all: bool,
297) -> Result<InboxPurgeSummary> {
298 if !inboxes_root.is_dir() {
299 return Ok(InboxPurgeSummary {
300 roles: 0,
301 messages: 0,
302 });
303 }
304
305 let mut roles = 0usize;
306 let mut messages = 0usize;
307 for entry in fs::read_dir(inboxes_root)
308 .with_context(|| format!("failed to read {}", inboxes_root.display()))?
309 {
310 let entry = entry.with_context(|| format!("failed to read {}", inboxes_root.display()))?;
311 let path = entry.path();
312 let file_type = entry
313 .file_type()
314 .with_context(|| format!("failed to inspect {}", path.display()))?;
315 if !file_type.is_dir() {
316 continue;
317 }
318
319 let Some(member) = path.file_name().and_then(|name| name.to_str()) else {
320 continue;
321 };
322 roles += 1;
323 messages += purge_delivered_messages(inboxes_root, member, before, purge_all)?;
324 }
325
326 Ok(InboxPurgeSummary { roles, messages })
327}
328
/// Current wall-clock time in whole seconds since the Unix epoch.
///
/// Falls back to `0` if the system clock reports a time before the epoch.
fn now_unix() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_secs())
        .unwrap_or(0)
}
335
#[cfg(test)]
mod tests {
    // Unit tests for the inbox module. Every filesystem test works inside its
    // own temporary directory, which is used directly as the inboxes root.
    use super::*;

    /// A `Send` message survives a JSON round trip and picks up the id passed
    /// to `from_json_bytes`.
    #[test]
    fn inbox_message_send_roundtrip() {
        let msg = InboxMessage::new_send("human", "architect", "hello world");
        assert_eq!(msg.from, "human");
        assert_eq!(msg.to, "architect");
        assert_eq!(msg.body, "hello world");
        assert_eq!(msg.msg_type, MessageType::Send);
        assert!(msg.timestamp > 0);

        let bytes = msg.to_json_bytes().unwrap();
        let parsed = InboxMessage::from_json_bytes(&bytes, "test-id").unwrap();
        assert_eq!(parsed.id, "test-id");
        assert_eq!(parsed.from, "human");
        assert_eq!(parsed.to, "architect");
        assert_eq!(parsed.body, "hello world");
    }

    /// An `Assign` message keeps its type across a JSON round trip.
    #[test]
    fn inbox_message_assign_roundtrip() {
        let msg = InboxMessage::new_assign("black-lead", "eng-1-1", "fix the auth bug");
        assert_eq!(msg.msg_type, MessageType::Assign);
        assert_eq!(msg.from, "black-lead");
        assert_eq!(msg.to, "eng-1-1");
        assert_eq!(msg.body, "fix the auth bug");

        let bytes = msg.to_json_bytes().unwrap();
        let parsed = InboxMessage::from_json_bytes(&bytes, "assign-id").unwrap();
        assert_eq!(parsed.msg_type, MessageType::Assign);
    }

    /// `init_inbox` creates the `new`, `cur` and `tmp` sub-directories.
    #[test]
    fn init_inbox_creates_dirs() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        init_inbox(root, "architect").unwrap();

        assert!(root.join("architect").join("new").is_dir());
        assert!(root.join("architect").join("cur").is_dir());
        assert!(root.join("architect").join("tmp").is_dir());
    }

    /// Calling `init_inbox` twice for the same member succeeds both times.
    #[test]
    fn init_inbox_is_idempotent() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        init_inbox(root, "architect").unwrap();
        init_inbox(root, "architect").unwrap(); // the second call must not fail
    }

    /// A delivered message shows up in `pending_messages` with the id that
    /// `deliver_to_inbox` returned.
    #[test]
    fn deliver_and_read_pending() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        init_inbox(root, "architect").unwrap();

        let msg = InboxMessage::new_send("human", "architect", "hello");
        let id = deliver_to_inbox(root, &msg).unwrap();
        assert!(!id.is_empty());

        let pending = pending_messages(root, "architect").unwrap();
        assert_eq!(pending.len(), 1);
        assert_eq!(pending[0].from, "human");
        assert_eq!(pending[0].body, "hello");
        assert_eq!(pending[0].id, id);
    }

    /// Delivery works without a prior `init_inbox` call.
    #[test]
    fn deliver_creates_dirs_automatically() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        let msg = InboxMessage::new_send("human", "manager", "hi");
        // No init_inbox here: delivery itself has to create the directories.
        let id = deliver_to_inbox(root, &msg).unwrap();
        assert!(!id.is_empty());

        let pending = pending_messages(root, "manager").unwrap();
        assert_eq!(pending.len(), 1);
    }

    /// `mark_delivered` removes a message from the pending list while
    /// `all_messages` still returns it, flagged as delivered.
    #[test]
    fn mark_delivered_moves_to_cur() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        init_inbox(root, "eng-1").unwrap();

        let msg = InboxMessage::new_send("manager", "eng-1", "do this");
        let id = deliver_to_inbox(root, &msg).unwrap();

        // Pending before delivery is acknowledged.
        assert_eq!(pending_messages(root, "eng-1").unwrap().len(), 1);

        mark_delivered(root, "eng-1", &id).unwrap();

        // No longer pending afterwards.
        assert_eq!(pending_messages(root, "eng-1").unwrap().len(), 0);

        // Still present in the full listing.
        let all = all_messages(root, "eng-1").unwrap();
        assert_eq!(all.len(), 1);
        assert!(all[0].1); // delivered flag
    }

    /// `pending_message_count` drops by one when a message is marked
    /// delivered.
    #[test]
    fn pending_message_count_tracks_new_messages_only() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        init_inbox(root, "eng-1").unwrap();

        let msg1 = InboxMessage::new_send("manager", "eng-1", "first");
        let msg2 = InboxMessage::new_send("manager", "eng-1", "second");
        let id1 = deliver_to_inbox(root, &msg1).unwrap();
        deliver_to_inbox(root, &msg2).unwrap();

        assert_eq!(pending_message_count(root, "eng-1").unwrap(), 2);

        mark_delivered(root, "eng-1", &id1).unwrap();
        assert_eq!(pending_message_count(root, "eng-1").unwrap(), 1);
    }

    /// Pending messages come back sorted by timestamp, not by delivery order.
    #[test]
    fn multiple_messages_ordered_by_timestamp() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        init_inbox(root, "arch").unwrap();

        let mut msg1 = InboxMessage::new_send("human", "arch", "first");
        // Timestamps are overridden so the expected order is deterministic.
        msg1.timestamp = 1000;
        let mut msg2 = InboxMessage::new_send("human", "arch", "second");
        msg2.timestamp = 2000;
        let mut msg3 = InboxMessage::new_send("human", "arch", "third");
        msg3.timestamp = 1500;

        deliver_to_inbox(root, &msg1).unwrap();
        deliver_to_inbox(root, &msg2).unwrap();
        deliver_to_inbox(root, &msg3).unwrap();

        let pending = pending_messages(root, "arch").unwrap();
        assert_eq!(pending.len(), 3);
        assert_eq!(pending[0].body, "first");
        assert_eq!(pending[1].body, "third");
        assert_eq!(pending[2].body, "second");
    }

    /// `all_messages` returns pending and delivered messages together, each
    /// with the matching flag.
    #[test]
    fn all_messages_combines_new_and_cur() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        init_inbox(root, "mgr").unwrap();

        let msg1 = InboxMessage::new_send("arch", "mgr", "directive");
        let id1 = deliver_to_inbox(root, &msg1).unwrap();
        let msg2 = InboxMessage::new_send("eng-1", "mgr", "done");
        deliver_to_inbox(root, &msg2).unwrap();

        // Only the first message is marked delivered.
        mark_delivered(root, "mgr", &id1).unwrap();

        let all = all_messages(root, "mgr").unwrap();
        assert_eq!(all.len(), 2);

        let delivered: Vec<_> = all.iter().filter(|(_, d)| *d).collect();
        let pending: Vec<_> = all.iter().filter(|(_, d)| !*d).collect();
        assert_eq!(delivered.len(), 1);
        assert_eq!(pending.len(), 1);
    }

    /// `delete_message` removes a pending message from the inbox.
    #[test]
    fn delete_message_removes_from_inbox() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        init_inbox(root, "eng").unwrap();

        let msg = InboxMessage::new_send("mgr", "eng", "task");
        let id = deliver_to_inbox(root, &msg).unwrap();

        assert_eq!(pending_messages(root, "eng").unwrap().len(), 1);
        delete_message(root, "eng", &id).unwrap();
        assert_eq!(pending_messages(root, "eng").unwrap().len(), 0);
    }

    /// A freshly initialized inbox has no pending messages.
    #[test]
    fn pending_messages_empty_inbox() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        init_inbox(root, "empty").unwrap();

        let pending = pending_messages(root, "empty").unwrap();
        assert!(pending.is_empty());
    }

    /// `inboxes_root` appends `.batty/inboxes` to the project root.
    #[test]
    fn inboxes_root_path() {
        let root = std::path::Path::new("/tmp/project");
        assert_eq!(
            inboxes_root(root),
            PathBuf::from("/tmp/project/.batty/inboxes")
        );
    }

    /// A file in `new/` that is not valid JSON is skipped instead of failing
    /// the whole listing.
    #[test]
    fn malformed_json_skipped() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        init_inbox(root, "bad").unwrap();

        let new_dir = root.join("bad").join("new");
        // Write a non-JSON file straight into new/, bypassing deliver_to_inbox.
        std::fs::write(new_dir.join("1234567890.bad.localhost"), "not json").unwrap();

        let pending = pending_messages(root, "bad").unwrap();
        // The malformed entry is skipped without an error.
        assert!(pending.is_empty());
    }

    /// With a cutoff, only delivered messages older than the cutoff are
    /// removed.
    #[test]
    fn purge_delivered_messages_before_timestamp_only_removes_older_entries() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        init_inbox(root, "eng").unwrap();

        let mut old_msg = InboxMessage::new_send("mgr", "eng", "old");
        old_msg.timestamp = 10;
        let old_id = deliver_to_inbox(root, &old_msg).unwrap();
        mark_delivered(root, "eng", &old_id).unwrap();

        let mut new_msg = InboxMessage::new_send("mgr", "eng", "new");
        new_msg.timestamp = 20;
        let new_id = deliver_to_inbox(root, &new_msg).unwrap();
        mark_delivered(root, "eng", &new_id).unwrap();

        let removed = purge_delivered_messages(root, "eng", Some(15), false).unwrap();
        assert_eq!(removed, 1);

        let remaining = all_messages(root, "eng").unwrap();
        assert_eq!(remaining.len(), 1);
        assert_eq!(remaining[0].0.id, new_id);
        assert!(remaining[0].1);
    }

    /// With `purge_all`, every delivered message is removed regardless of its
    /// timestamp.
    #[test]
    fn purge_delivered_messages_all_removes_every_cur_entry() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        init_inbox(root, "eng").unwrap();

        for body in ["one", "two"] {
            let msg = InboxMessage::new_send("mgr", "eng", body);
            let id = deliver_to_inbox(root, &msg).unwrap();
            mark_delivered(root, "eng", &id).unwrap();
        }

        let removed = purge_delivered_messages(root, "eng", None, true).unwrap();
        assert_eq!(removed, 2);
        assert!(all_messages(root, "eng").unwrap().is_empty());
    }

    /// The "for all" variant visits every member inbox and sums the results.
    #[test]
    fn purge_delivered_messages_for_all_scans_every_member_inbox() {
        let tmp = tempfile::tempdir().unwrap();
        let root = tmp.path();
        init_inbox(root, "eng-1").unwrap();
        init_inbox(root, "eng-2").unwrap();

        let msg1 = InboxMessage::new_send("mgr", "eng-1", "first");
        let id1 = deliver_to_inbox(root, &msg1).unwrap();
        mark_delivered(root, "eng-1", &id1).unwrap();

        let msg2 = InboxMessage::new_send("mgr", "eng-2", "second");
        let id2 = deliver_to_inbox(root, &msg2).unwrap();
        mark_delivered(root, "eng-2", &id2).unwrap();

        let summary = purge_delivered_messages_for_all(root, None, true).unwrap();
        assert_eq!(
            summary,
            InboxPurgeSummary {
                roles: 2,
                messages: 2
            }
        );
        assert!(all_messages(root, "eng-1").unwrap().is_empty());
        assert!(all_messages(root, "eng-2").unwrap().is_empty());
    }

    /// Counts the lines of `source` that call unwrap or expect, looking only
    /// at the text before the test module.
    ///
    /// The production part is everything before the first occurrence of the
    /// `#[cfg(test)]` line immediately followed by `mod tests`; if that marker
    /// is not found, the whole source is scanned. This is a plain substring
    /// match per line, so comments and string literals count as well.
    fn production_unwrap_expect_count(source: &str) -> usize {
        let prod = if let Some(pos) = source.find("\n#[cfg(test)]\nmod tests") {
            &source[..pos]
        } else {
            source
        };
        prod.lines()
            .filter(|line| {
                let trimmed = line.trim();
                !trimmed.starts_with("#[cfg(test)]")
                    && (trimmed.contains(".unwrap(") || trimmed.contains(".expect("))
            })
            .count()
    }

    /// Guards the production half of this file against panicking unwrap or
    /// expect calls by scanning its own source text.
    #[test]
    fn production_inbox_has_no_unwrap_or_expect_calls() {
        let src = include_str!("inbox.rs");
        assert_eq!(
            production_unwrap_expect_count(src),
            0,
            "production inbox.rs should avoid unwrap/expect"
        );
    }
}