mako_engine/inbox.rs
1//! Inbox deduplication for inbound messages.
2//!
3//! In AS4-based market communication, messages may be retransmitted by the
4//! sender (AS4 retry logic). Without deduplication, a retry would create a
5//! duplicate process or trigger a duplicate state transition.
6//!
7//! The inbox assigns a stable **idempotency key** to each inbound message
8//! (typically the UNH message reference + sender GLN) and refuses to process
9//! the same key twice.
10//!
11//! # Usage
12//!
13//! ```rust,ignore
14//! use mako_engine::inbox::{InboxStore, InMemoryInboxStore, inbox_key};
15//!
16//! let inbox = InMemoryInboxStore::new();
//! let key = inbox_key(&sender_party_id, &message_ref)?;
//! if !inbox.accept(&key).await? {
//!     return Ok(()); // duplicate — drop silently
//! }
21//! // process the message ...
22//! ```
23
24#[cfg(any(test, feature = "testing"))]
25use std::{collections::HashSet, sync::Arc};
26#[cfg(any(test, feature = "testing"))]
27use tokio::sync::RwLock;
28
29use crate::error::EngineError;
30
/// Upper bound, in bytes, on the **caller-supplied** inbox deduplication key.
///
/// The AS4 specification limits `MessageId` values to 255 bytes; together
/// with the sender GLN (13 digits) and a separator, real keys stay ≤ 270
/// bytes in practice. The limit itself is derived from the 512-byte ceiling
/// on the stored SlateDB key (`ib/{key}`): subtracting the 3-byte `ib/`
/// prefix leaves 509 bytes, a generous margin over real-world keys.
pub const MAX_INBOX_KEY_LEN: usize = 512 - "ib/".len();
39
40// ── InboxStore trait ──────────────────────────────────────────────────────────
41
/// Async idempotency store for inbound messages.
///
/// Implement this trait to plug in persistent deduplication storage (e.g.
/// PostgreSQL, redb). Use [`InMemoryInboxStore`] for tests and development.
///
/// Keys are plain strings as far as the store is concerned; [`inbox_key`]
/// builds the canonical `<sender>:<message_ref>` form.
// `async fn` in a public trait triggers the `async_fn_in_trait` lint because
// callers cannot name auto-trait bounds (such as `Send`) on the returned
// future.
// NOTE(review): presumably suppressed on purpose to keep the signature
// simple — confirm no caller needs a `Send` bound on the returned future.
#[allow(async_fn_in_trait)]
pub trait InboxStore: Send + Sync {
    /// Check whether `key` has been seen before and, if not, register it.
    ///
    /// Returns `Ok(true)` when `key` is **new** (the message should be
    /// processed). Returns `Ok(false)` when `key` was already seen (duplicate
    /// — the caller should drop the message).
    ///
    /// Implementations must guarantee atomic check-and-set semantics under
    /// concurrent callers.
    ///
    /// # Errors
    ///
    /// Returns [`EngineError::Store`] on storage failure.
    ///
    /// [`InMemoryInboxStore`] additionally returns an inbox error for keys
    /// longer than [`MAX_INBOX_KEY_LEN`] bytes.
    async fn accept(&self, key: &str) -> Result<bool, EngineError>;
}
62
63// ── InMemoryInboxStore ────────────────────────────────────────────────────────
64
/// An in-memory [`InboxStore`] for tests and development.
///
/// Backed by a `HashSet` protected by a `RwLock`. Cloning shares the underlying
/// data via `Arc` — all clones see the same deduplication state.
///
/// For production use, replace with a persistent backend (e.g. a PostgreSQL
/// table or a redb database) to survive process restarts.
///
/// The type exposes no way to remove a key, so the set only grows for as
/// long as any clone is alive.
///
/// Only available in `#[cfg(test)]` or with the `testing` feature enabled.
#[cfg(any(test, feature = "testing"))]
#[derive(Debug, Default, Clone)]
pub struct InMemoryInboxStore {
    /// Every key accepted so far. The `Arc` is what makes clones share one
    /// set; the async `RwLock` serialises writers in `accept`.
    seen: Arc<RwLock<HashSet<String>>>,
}
79
#[cfg(any(test, feature = "testing"))]
impl InMemoryInboxStore {
    /// Construct a store that has not seen any key yet.
    #[must_use]
    pub fn new() -> Self {
        Self {
            seen: Arc::new(RwLock::new(HashSet::new())),
        }
    }

    /// Number of keys accepted so far.
    pub async fn len(&self) -> usize {
        let registered = self.seen.read().await;
        registered.len()
    }

    /// Whether the store is still empty, i.e. no key has been accepted.
    pub async fn is_empty(&self) -> bool {
        let registered = self.seen.read().await;
        registered.is_empty()
    }
}
98
#[cfg(any(test, feature = "testing"))]
impl InboxStore for InMemoryInboxStore {
    /// Register `key` and report whether it was new.
    ///
    /// The length check runs before the write lock is requested, so an
    /// oversized key is rejected without touching the shared set.
    ///
    /// # Errors
    ///
    /// Returns an inbox error when `key` is longer than
    /// [`MAX_INBOX_KEY_LEN`] bytes.
    async fn accept(&self, key: &str) -> Result<bool, EngineError> {
        let key_len = key.len();
        if key_len <= MAX_INBOX_KEY_LEN {
            // `HashSet::insert` returns `true` only for a previously unseen
            // value, which is exactly the "new message" signal.
            let mut registered = self.seen.write().await;
            return Ok(registered.insert(key.to_owned()));
        }
        Err(EngineError::inbox(format!(
            "inbox key is {} bytes, exceeds maximum of {MAX_INBOX_KEY_LEN}",
            key_len
        )))
    }
}
111
112// ── InboxKey helpers ──────────────────────────────────────────────────────────
113
/// Combine the sender's party id and an EDIFACT message reference into the
/// canonical inbox key `<sender>:<message_ref>`.
///
/// The sender is part of the key because the message reference alone is not
/// unique: different market participants may use the same reference
/// numbering.
///
/// # Errors
///
/// Returns a static error message when `sender_party_id` or `message_ref` is
/// empty; the sender is checked first. BDEW codes and GLNs are always 13
/// digits, EIC codes are 16 chars, and UNH message references are always
/// non-empty, so an empty component indicates a parsing error upstream and
/// must not silently pass deduplication checks.
pub fn inbox_key(sender_party_id: &str, message_ref: &str) -> Result<String, &'static str> {
    match (sender_party_id.is_empty(), message_ref.is_empty()) {
        (true, _) => Err("inbox_key: sender_party_id must not be empty"),
        (false, true) => Err("inbox_key: message_ref must not be empty"),
        (false, false) => Ok([sender_party_id, message_ref].join(":")),
    }
}
136
#[cfg(test)]
mod tests {
    //! Unit tests for the in-memory inbox store and the key helper.

    use super::*;

    /// A key that has never been seen is accepted.
    #[tokio::test]
    async fn new_message_is_accepted() {
        let inbox = InMemoryInboxStore::new();
        assert!(inbox.accept("sender:ref-001").await.unwrap());
    }

    /// The second `accept` of the same key reports a duplicate.
    #[tokio::test]
    async fn duplicate_message_is_rejected() {
        let inbox = InMemoryInboxStore::new();
        assert!(inbox.accept("sender:ref-001").await.unwrap());
        assert!(
            !inbox.accept("sender:ref-001").await.unwrap(),
            "second accept should return false"
        );
    }

    /// The same message reference from two senders yields two distinct keys.
    #[tokio::test]
    async fn different_senders_same_ref_are_independent() {
        let inbox = InMemoryInboxStore::new();
        assert!(
            inbox
                .accept(&inbox_key("sender-A", "ref-001").unwrap())
                .await
                .unwrap()
        );
        assert!(
            inbox
                .accept(&inbox_key("sender-B", "ref-001").unwrap())
                .await
                .unwrap()
        );
    }

    /// A key of exactly `MAX_INBOX_KEY_LEN` bytes is still within the limit.
    #[tokio::test]
    async fn key_at_length_limit_is_accepted() {
        let inbox = InMemoryInboxStore::new();
        let key = "k".repeat(MAX_INBOX_KEY_LEN);
        assert!(inbox.accept(&key).await.unwrap());
    }

    /// One byte over the limit is an error, and nothing is registered.
    #[tokio::test]
    async fn oversized_key_is_rejected_and_not_stored() {
        let inbox = InMemoryInboxStore::new();
        let key = "k".repeat(MAX_INBOX_KEY_LEN + 1);
        assert!(inbox.accept(&key).await.is_err());
        assert!(inbox.is_empty().await);
    }

    /// Clones share the underlying set, so a key accepted through one handle
    /// is a duplicate through the other.
    #[tokio::test]
    async fn clones_share_deduplication_state() {
        let inbox = InMemoryInboxStore::new();
        let clone = inbox.clone();
        assert!(inbox.accept("sender:ref-001").await.unwrap());
        assert!(!clone.accept("sender:ref-001").await.unwrap());
        assert_eq!(clone.len().await, 1);
    }

    /// `len` and `is_empty` count distinct keys only; duplicates do not grow
    /// the set.
    #[tokio::test]
    async fn len_and_is_empty_track_accepted_keys() {
        let inbox = InMemoryInboxStore::new();
        assert!(inbox.is_empty().await);
        assert_eq!(inbox.len().await, 0);

        inbox.accept("sender:ref-001").await.unwrap();
        inbox.accept("sender:ref-001").await.unwrap();
        inbox.accept("sender:ref-002").await.unwrap();

        assert!(!inbox.is_empty().await);
        assert_eq!(inbox.len().await, 2);
    }

    #[test]
    fn inbox_key_rejects_empty_party_id() {
        assert!(inbox_key("", "ref-001").is_err());
    }

    #[test]
    fn inbox_key_rejects_empty_ref() {
        assert!(inbox_key("4012345000023", "").is_err());
    }

    #[test]
    fn inbox_key_formats_correctly() {
        assert_eq!(
            inbox_key("4012345000023", "MSG-001").unwrap(),
            "4012345000023:MSG-001",
        );
    }
}