1use std::fs;
12use std::path::{Path, PathBuf};
13
14use anyhow::{Context, Result};
15use maildir::Maildir;
16use serde::{Deserialize, Serialize};
17
18#[derive(Debug, Clone, Serialize, Deserialize)]
20pub struct InboxMessage {
21 #[serde(skip)]
23 pub id: String,
24 pub from: String,
26 pub to: String,
28 pub body: String,
30 pub msg_type: MessageType,
32 pub timestamp: u64,
34}
35
36#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
38#[serde(rename_all = "lowercase")]
39pub enum MessageType {
40 Send,
41 Assign,
42}
43
44#[derive(Debug, Clone, Copy, PartialEq, Eq)]
45pub struct InboxPurgeSummary {
46 pub roles: usize,
47 pub messages: usize,
48}
49
50impl InboxMessage {
51 pub fn new_send(from: &str, to: &str, body: &str) -> Self {
53 Self {
54 id: String::new(),
55 from: from.to_string(),
56 to: to.to_string(),
57 body: body.to_string(),
58 msg_type: MessageType::Send,
59 timestamp: now_unix(),
60 }
61 }
62
63 pub fn new_assign(from: &str, to: &str, task: &str) -> Self {
65 Self {
66 id: String::new(),
67 from: from.to_string(),
68 to: to.to_string(),
69 body: task.to_string(),
70 msg_type: MessageType::Assign,
71 timestamp: now_unix(),
72 }
73 }
74
75 pub fn to_json_bytes(&self) -> Result<Vec<u8>> {
77 serde_json::to_vec(self).context("failed to serialize inbox message")
78 }
79
80 pub fn from_json_bytes(data: &[u8], id: &str) -> Result<Self> {
82 let mut msg: Self =
83 serde_json::from_slice(data).context("failed to deserialize inbox message")?;
84 msg.id = id.to_string();
85 Ok(msg)
86 }
87}
88
89pub fn inboxes_root(project_root: &Path) -> PathBuf {
91 project_root.join(".batty").join("inboxes")
92}
93
94fn member_maildir(inboxes_root: &Path, member: &str) -> Maildir {
96 Maildir::from(inboxes_root.join(member))
97}
98
99pub fn init_inbox(inboxes_root: &Path, member: &str) -> Result<()> {
101 let md = member_maildir(inboxes_root, member);
102 md.create_dirs()
103 .with_context(|| format!("failed to create inbox dirs for '{member}'"))?;
104 Ok(())
105}
106
107pub fn deliver_to_inbox(inboxes_root: &Path, msg: &InboxMessage) -> Result<String> {
112 let md = member_maildir(inboxes_root, &msg.to);
113 md.create_dirs()
115 .with_context(|| format!("failed to create inbox dirs for '{}'", msg.to))?;
116 let data = msg.to_json_bytes()?;
117 let id = md
118 .store_new(&data)
119 .with_context(|| format!("failed to store message in inbox for '{}'", msg.to))?;
120 Ok(id)
121}
122
123pub fn pending_messages(inboxes_root: &Path, member: &str) -> Result<Vec<InboxMessage>> {
127 let md = member_maildir(inboxes_root, member);
128 let mut messages = Vec::new();
129
130 for entry in md.list_new() {
131 let entry = match entry {
132 Ok(e) => e,
133 Err(e) => {
134 tracing::warn!(member, error = %e, "skipping unreadable inbox entry");
135 continue;
136 }
137 };
138 let id = entry.id().to_string();
139 let data = match std::fs::read(entry.path()) {
140 Ok(d) => d,
141 Err(e) => {
142 tracing::warn!(member, id = %id, error = %e, "failed to read inbox message");
143 continue;
144 }
145 };
146 match InboxMessage::from_json_bytes(&data, &id) {
147 Ok(msg) => messages.push(msg),
148 Err(e) => {
149 tracing::warn!(member, id = %id, error = %e, "skipping malformed inbox message");
150 }
151 }
152 }
153
154 messages.sort_by_key(|m| m.timestamp);
156 Ok(messages)
157}
158
159pub fn pending_message_count(inboxes_root: &Path, member: &str) -> Result<usize> {
161 let new_dir = inboxes_root.join(member).join("new");
162 if !new_dir.is_dir() {
163 return Ok(0);
164 }
165
166 let mut count = 0usize;
167 for entry in std::fs::read_dir(&new_dir)
168 .with_context(|| format!("failed to read {}", new_dir.display()))?
169 {
170 let entry = entry.with_context(|| format!("failed to read {}", new_dir.display()))?;
171 let file_type = entry
172 .file_type()
173 .with_context(|| format!("failed to inspect {}", entry.path().display()))?;
174 if file_type.is_file() {
175 count += 1;
176 }
177 }
178
179 Ok(count)
180}
181
182pub fn mark_delivered(inboxes_root: &Path, member: &str, id: &str) -> Result<()> {
184 let md = member_maildir(inboxes_root, member);
185 md.move_new_to_cur(id)
186 .with_context(|| format!("failed to mark message '{id}' as delivered for '{member}'"))?;
187 Ok(())
188}
189
190pub fn all_messages(inboxes_root: &Path, member: &str) -> Result<Vec<(InboxMessage, bool)>> {
192 let md = member_maildir(inboxes_root, member);
193 let mut messages = Vec::new();
194
195 for entry in md.list_new() {
197 let entry = match entry {
198 Ok(e) => e,
199 Err(_) => continue,
200 };
201 let id = entry.id().to_string();
202 let data = match std::fs::read(entry.path()) {
203 Ok(d) => d,
204 Err(_) => continue,
205 };
206 if let Ok(msg) = InboxMessage::from_json_bytes(&data, &id) {
207 messages.push((msg, false)); }
209 }
210
211 for entry in md.list_cur() {
213 let entry = match entry {
214 Ok(e) => e,
215 Err(_) => continue,
216 };
217 let id = entry.id().to_string();
218 let data = match std::fs::read(entry.path()) {
219 Ok(d) => d,
220 Err(_) => continue,
221 };
222 if let Ok(msg) = InboxMessage::from_json_bytes(&data, &id) {
223 messages.push((msg, true)); }
225 }
226
227 messages.sort_by_key(|(m, _)| m.timestamp);
228 Ok(messages)
229}
230
231#[allow(dead_code)] pub fn delete_message(inboxes_root: &Path, member: &str, id: &str) -> Result<()> {
234 let md = member_maildir(inboxes_root, member);
235 md.delete(id)
236 .with_context(|| format!("failed to delete message '{id}' from '{member}' inbox"))?;
237 Ok(())
238}
239
240pub fn purge_delivered_messages(
242 inboxes_root: &Path,
243 member: &str,
244 before: Option<u64>,
245 purge_all: bool,
246) -> Result<usize> {
247 let cur_dir = inboxes_root.join(member).join("cur");
248 if !cur_dir.is_dir() {
249 return Ok(0);
250 }
251
252 let mut removed = 0usize;
253 for entry in
254 fs::read_dir(&cur_dir).with_context(|| format!("failed to read {}", cur_dir.display()))?
255 {
256 let entry = entry.with_context(|| format!("failed to read {}", cur_dir.display()))?;
257 let path = entry.path();
258 let file_type = entry
259 .file_type()
260 .with_context(|| format!("failed to inspect {}", path.display()))?;
261 if !file_type.is_file() {
262 continue;
263 }
264
265 let should_delete = if purge_all {
266 true
267 } else if let Some(cutoff) = before {
268 let data = match fs::read(&path) {
269 Ok(data) => data,
270 Err(_) => continue,
271 };
272 let Some(id) = path.file_name().and_then(|name| name.to_str()) else {
273 continue;
274 };
275 match InboxMessage::from_json_bytes(&data, id) {
276 Ok(message) => message.timestamp < cutoff,
277 Err(_) => false,
278 }
279 } else {
280 false
281 };
282
283 if should_delete {
284 fs::remove_file(&path)
285 .with_context(|| format!("failed to remove {}", path.display()))?;
286 removed += 1;
287 }
288 }
289
290 Ok(removed)
291}
292
293pub fn purge_delivered_messages_for_all(
295 inboxes_root: &Path,
296 before: Option<u64>,
297 purge_all: bool,
298) -> Result<InboxPurgeSummary> {
299 if !inboxes_root.is_dir() {
300 return Ok(InboxPurgeSummary {
301 roles: 0,
302 messages: 0,
303 });
304 }
305
306 let mut roles = 0usize;
307 let mut messages = 0usize;
308 for entry in fs::read_dir(inboxes_root)
309 .with_context(|| format!("failed to read {}", inboxes_root.display()))?
310 {
311 let entry = entry.with_context(|| format!("failed to read {}", inboxes_root.display()))?;
312 let path = entry.path();
313 let file_type = entry
314 .file_type()
315 .with_context(|| format!("failed to inspect {}", path.display()))?;
316 if !file_type.is_dir() {
317 continue;
318 }
319
320 let Some(member) = path.file_name().and_then(|name| name.to_str()) else {
321 continue;
322 };
323 roles += 1;
324 messages += purge_delivered_messages(inboxes_root, member, before, purge_all)?;
325 }
326
327 Ok(InboxPurgeSummary { roles, messages })
328}
329
330fn now_unix() -> u64 {
331 std::time::SystemTime::now()
332 .duration_since(std::time::UNIX_EPOCH)
333 .unwrap_or_default()
334 .as_secs()
335}
336
337#[cfg(test)]
338mod tests {
339 use super::*;
340
341 #[test]
342 fn inbox_message_send_roundtrip() {
343 let msg = InboxMessage::new_send("human", "architect", "hello world");
344 assert_eq!(msg.from, "human");
345 assert_eq!(msg.to, "architect");
346 assert_eq!(msg.body, "hello world");
347 assert_eq!(msg.msg_type, MessageType::Send);
348 assert!(msg.timestamp > 0);
349
350 let bytes = msg.to_json_bytes().unwrap();
351 let parsed = InboxMessage::from_json_bytes(&bytes, "test-id").unwrap();
352 assert_eq!(parsed.id, "test-id");
353 assert_eq!(parsed.from, "human");
354 assert_eq!(parsed.to, "architect");
355 assert_eq!(parsed.body, "hello world");
356 }
357
358 #[test]
359 fn inbox_message_assign_roundtrip() {
360 let msg = InboxMessage::new_assign("black-lead", "eng-1-1", "fix the auth bug");
361 assert_eq!(msg.msg_type, MessageType::Assign);
362 assert_eq!(msg.from, "black-lead");
363 assert_eq!(msg.to, "eng-1-1");
364 assert_eq!(msg.body, "fix the auth bug");
365
366 let bytes = msg.to_json_bytes().unwrap();
367 let parsed = InboxMessage::from_json_bytes(&bytes, "assign-id").unwrap();
368 assert_eq!(parsed.msg_type, MessageType::Assign);
369 }
370
371 #[test]
372 fn init_inbox_creates_dirs() {
373 let tmp = tempfile::tempdir().unwrap();
374 let root = tmp.path();
375 init_inbox(root, "architect").unwrap();
376
377 assert!(root.join("architect").join("new").is_dir());
378 assert!(root.join("architect").join("cur").is_dir());
379 assert!(root.join("architect").join("tmp").is_dir());
380 }
381
382 #[test]
383 fn init_inbox_is_idempotent() {
384 let tmp = tempfile::tempdir().unwrap();
385 let root = tmp.path();
386 init_inbox(root, "architect").unwrap();
387 init_inbox(root, "architect").unwrap(); }
389
390 #[test]
391 fn deliver_and_read_pending() {
392 let tmp = tempfile::tempdir().unwrap();
393 let root = tmp.path();
394 init_inbox(root, "architect").unwrap();
395
396 let msg = InboxMessage::new_send("human", "architect", "hello");
397 let id = deliver_to_inbox(root, &msg).unwrap();
398 assert!(!id.is_empty());
399
400 let pending = pending_messages(root, "architect").unwrap();
401 assert_eq!(pending.len(), 1);
402 assert_eq!(pending[0].from, "human");
403 assert_eq!(pending[0].body, "hello");
404 assert_eq!(pending[0].id, id);
405 }
406
407 #[test]
408 fn deliver_creates_dirs_automatically() {
409 let tmp = tempfile::tempdir().unwrap();
410 let root = tmp.path();
411 let msg = InboxMessage::new_send("human", "manager", "hi");
413 let id = deliver_to_inbox(root, &msg).unwrap();
414 assert!(!id.is_empty());
415
416 let pending = pending_messages(root, "manager").unwrap();
417 assert_eq!(pending.len(), 1);
418 }
419
420 #[test]
421 fn mark_delivered_moves_to_cur() {
422 let tmp = tempfile::tempdir().unwrap();
423 let root = tmp.path();
424 init_inbox(root, "eng-1").unwrap();
425
426 let msg = InboxMessage::new_send("manager", "eng-1", "do this");
427 let id = deliver_to_inbox(root, &msg).unwrap();
428
429 assert_eq!(pending_messages(root, "eng-1").unwrap().len(), 1);
431
432 mark_delivered(root, "eng-1", &id).unwrap();
433
434 assert_eq!(pending_messages(root, "eng-1").unwrap().len(), 0);
436
437 let all = all_messages(root, "eng-1").unwrap();
439 assert_eq!(all.len(), 1);
440 assert!(all[0].1); }
442
443 #[test]
444 fn pending_message_count_tracks_new_messages_only() {
445 let tmp = tempfile::tempdir().unwrap();
446 let root = tmp.path();
447 init_inbox(root, "eng-1").unwrap();
448
449 let msg1 = InboxMessage::new_send("manager", "eng-1", "first");
450 let msg2 = InboxMessage::new_send("manager", "eng-1", "second");
451 let id1 = deliver_to_inbox(root, &msg1).unwrap();
452 deliver_to_inbox(root, &msg2).unwrap();
453
454 assert_eq!(pending_message_count(root, "eng-1").unwrap(), 2);
455
456 mark_delivered(root, "eng-1", &id1).unwrap();
457 assert_eq!(pending_message_count(root, "eng-1").unwrap(), 1);
458 }
459
460 #[test]
461 fn multiple_messages_ordered_by_timestamp() {
462 let tmp = tempfile::tempdir().unwrap();
463 let root = tmp.path();
464 init_inbox(root, "arch").unwrap();
465
466 let mut msg1 = InboxMessage::new_send("human", "arch", "first");
468 msg1.timestamp = 1000;
469 let mut msg2 = InboxMessage::new_send("human", "arch", "second");
470 msg2.timestamp = 2000;
471 let mut msg3 = InboxMessage::new_send("human", "arch", "third");
472 msg3.timestamp = 1500;
473
474 deliver_to_inbox(root, &msg1).unwrap();
475 deliver_to_inbox(root, &msg2).unwrap();
476 deliver_to_inbox(root, &msg3).unwrap();
477
478 let pending = pending_messages(root, "arch").unwrap();
479 assert_eq!(pending.len(), 3);
480 assert_eq!(pending[0].body, "first");
481 assert_eq!(pending[1].body, "third");
482 assert_eq!(pending[2].body, "second");
483 }
484
485 #[test]
486 fn all_messages_combines_new_and_cur() {
487 let tmp = tempfile::tempdir().unwrap();
488 let root = tmp.path();
489 init_inbox(root, "mgr").unwrap();
490
491 let msg1 = InboxMessage::new_send("arch", "mgr", "directive");
492 let id1 = deliver_to_inbox(root, &msg1).unwrap();
493 let msg2 = InboxMessage::new_send("eng-1", "mgr", "done");
494 deliver_to_inbox(root, &msg2).unwrap();
495
496 mark_delivered(root, "mgr", &id1).unwrap();
498
499 let all = all_messages(root, "mgr").unwrap();
500 assert_eq!(all.len(), 2);
501
502 let delivered: Vec<_> = all.iter().filter(|(_, d)| *d).collect();
503 let pending: Vec<_> = all.iter().filter(|(_, d)| !*d).collect();
504 assert_eq!(delivered.len(), 1);
505 assert_eq!(pending.len(), 1);
506 }
507
508 #[test]
509 fn delete_message_removes_from_inbox() {
510 let tmp = tempfile::tempdir().unwrap();
511 let root = tmp.path();
512 init_inbox(root, "eng").unwrap();
513
514 let msg = InboxMessage::new_send("mgr", "eng", "task");
515 let id = deliver_to_inbox(root, &msg).unwrap();
516
517 assert_eq!(pending_messages(root, "eng").unwrap().len(), 1);
518 delete_message(root, "eng", &id).unwrap();
519 assert_eq!(pending_messages(root, "eng").unwrap().len(), 0);
520 }
521
522 #[test]
523 fn pending_messages_empty_inbox() {
524 let tmp = tempfile::tempdir().unwrap();
525 let root = tmp.path();
526 init_inbox(root, "empty").unwrap();
527
528 let pending = pending_messages(root, "empty").unwrap();
529 assert!(pending.is_empty());
530 }
531
532 #[test]
533 fn inboxes_root_path() {
534 let root = std::path::Path::new("/tmp/project");
535 assert_eq!(
536 inboxes_root(root),
537 PathBuf::from("/tmp/project/.batty/inboxes")
538 );
539 }
540
541 #[test]
542 fn malformed_json_skipped() {
543 let tmp = tempfile::tempdir().unwrap();
544 let root = tmp.path();
545 init_inbox(root, "bad").unwrap();
546
547 let new_dir = root.join("bad").join("new");
549 std::fs::write(new_dir.join("1234567890.bad.localhost"), "not json").unwrap();
550
551 let pending = pending_messages(root, "bad").unwrap();
553 assert!(pending.is_empty());
554 }
555
556 #[test]
557 fn purge_delivered_messages_before_timestamp_only_removes_older_entries() {
558 let tmp = tempfile::tempdir().unwrap();
559 let root = tmp.path();
560 init_inbox(root, "eng").unwrap();
561
562 let mut old_msg = InboxMessage::new_send("mgr", "eng", "old");
563 old_msg.timestamp = 10;
564 let old_id = deliver_to_inbox(root, &old_msg).unwrap();
565 mark_delivered(root, "eng", &old_id).unwrap();
566
567 let mut new_msg = InboxMessage::new_send("mgr", "eng", "new");
568 new_msg.timestamp = 20;
569 let new_id = deliver_to_inbox(root, &new_msg).unwrap();
570 mark_delivered(root, "eng", &new_id).unwrap();
571
572 let removed = purge_delivered_messages(root, "eng", Some(15), false).unwrap();
573 assert_eq!(removed, 1);
574
575 let remaining = all_messages(root, "eng").unwrap();
576 assert_eq!(remaining.len(), 1);
577 assert_eq!(remaining[0].0.id, new_id);
578 assert!(remaining[0].1);
579 }
580
581 #[test]
582 fn purge_delivered_messages_all_removes_every_cur_entry() {
583 let tmp = tempfile::tempdir().unwrap();
584 let root = tmp.path();
585 init_inbox(root, "eng").unwrap();
586
587 for body in ["one", "two"] {
588 let msg = InboxMessage::new_send("mgr", "eng", body);
589 let id = deliver_to_inbox(root, &msg).unwrap();
590 mark_delivered(root, "eng", &id).unwrap();
591 }
592
593 let removed = purge_delivered_messages(root, "eng", None, true).unwrap();
594 assert_eq!(removed, 2);
595 assert!(all_messages(root, "eng").unwrap().is_empty());
596 }
597
598 #[test]
599 fn purge_delivered_messages_for_all_scans_every_member_inbox() {
600 let tmp = tempfile::tempdir().unwrap();
601 let root = tmp.path();
602 init_inbox(root, "eng-1").unwrap();
603 init_inbox(root, "eng-2").unwrap();
604
605 let msg1 = InboxMessage::new_send("mgr", "eng-1", "first");
606 let id1 = deliver_to_inbox(root, &msg1).unwrap();
607 mark_delivered(root, "eng-1", &id1).unwrap();
608
609 let msg2 = InboxMessage::new_send("mgr", "eng-2", "second");
610 let id2 = deliver_to_inbox(root, &msg2).unwrap();
611 mark_delivered(root, "eng-2", &id2).unwrap();
612
613 let summary = purge_delivered_messages_for_all(root, None, true).unwrap();
614 assert_eq!(
615 summary,
616 InboxPurgeSummary {
617 roles: 2,
618 messages: 2
619 }
620 );
621 assert!(all_messages(root, "eng-1").unwrap().is_empty());
622 assert!(all_messages(root, "eng-2").unwrap().is_empty());
623 }
624
625 fn production_unwrap_expect_count(source: &str) -> usize {
626 let prod = if let Some(pos) = source.find("\n#[cfg(test)]\nmod tests") {
627 &source[..pos]
628 } else {
629 source
630 };
631 prod.lines()
632 .filter(|line| {
633 let trimmed = line.trim();
634 !trimmed.starts_with("#[cfg(test)]")
635 && (trimmed.contains(".unwrap(") || trimmed.contains(".expect("))
636 })
637 .count()
638 }
639
640 #[test]
641 fn production_inbox_has_no_unwrap_or_expect_calls() {
642 let src = include_str!("inbox.rs");
643 assert_eq!(
644 production_unwrap_expect_count(src),
645 0,
646 "production inbox.rs should avoid unwrap/expect"
647 );
648 }
649}