Skip to main content

mako_engine/
inbox.rs

1//! Inbox deduplication for inbound messages.
2//!
3//! In AS4-based market communication, messages may be retransmitted by the
4//! sender (AS4 retry logic). Without deduplication, a retry would create a
5//! duplicate process or trigger a duplicate state transition.
6//!
7//! The inbox assigns a stable **idempotency key** to each inbound message
8//! (typically the UNH message reference + sender GLN) and refuses to process
9//! the same key twice.
10//!
11//! # Usage
12//!
13//! ```rust,ignore
14//! use mako_engine::inbox::{InboxStore, InMemoryInboxStore, inbox_key};
15//!
16//! let inbox = InMemoryInboxStore::new();
//! let key = inbox_key(&sender_party_id, &message_ref)?;
18//! if !inbox.accept(&key).await? {
19//!     return Ok(()); // duplicate — drop silently
20//! }
21//! // process the message ...
22//! ```
23
24#[cfg(any(test, feature = "testing"))]
25use std::{collections::HashSet, sync::Arc};
26#[cfg(any(test, feature = "testing"))]
27use tokio::sync::RwLock;
28
29use crate::error::EngineError;
30
/// Upper bound, in bytes, on the **caller-supplied** inbox deduplication key.
///
/// The AS4 specification caps `MessageId` values at 255 bytes. Together with
/// the sender GLN (13 digits) and a separator that comes to ≤ 270 bytes in
/// practice, so this limit leaves a generous margin.
///
/// The value is the 512-byte budget for the stored SlateDB key (`ib/{key}`)
/// minus the 3 bytes taken by the `ib/` prefix, i.e. 509.
pub const MAX_INBOX_KEY_LEN: usize = 512 - "ib/".len();
39
40// ── InboxStore trait ──────────────────────────────────────────────────────────
41
42/// Async idempotency store for inbound messages.
43///
44/// Implement this trait to plug in persistent deduplication storage (e.g.
45/// PostgreSQL, redb). Use [`InMemoryInboxStore`] for tests and development.
46#[allow(async_fn_in_trait)]
47pub trait InboxStore: Send + Sync {
48    /// Check whether `key` has been seen before and, if not, register it.
49    ///
50    /// Returns `Ok(true)` when `key` is **new** (the message should be
51    /// processed). Returns `Ok(false)` when `key` was already seen (duplicate
52    /// — the caller should drop the message).
53    ///
54    /// Implementations must guarantee atomic check-and-set semantics under
55    /// concurrent callers.
56    ///
57    /// # Errors
58    ///
59    /// Returns [`EngineError::Store`] on storage failure.
60    async fn accept(&self, key: &str) -> Result<bool, EngineError>;
61}
62
63// ── InMemoryInboxStore ────────────────────────────────────────────────────────
64
/// [`InboxStore`] implementation that keeps every key in process memory.
///
/// Seen keys live in a `HashSet` guarded by a `RwLock`. The set sits behind an
/// `Arc`, so all clones of a store share one deduplication state.
///
/// Nothing is persisted: production deployments should swap in a durable
/// backend (e.g. a PostgreSQL table or a redb database) so that the state
/// survives process restarts.
///
/// Compiled only under `#[cfg(test)]` or when the `testing` feature is enabled.
#[cfg(any(test, feature = "testing"))]
#[derive(Clone, Debug, Default)]
pub struct InMemoryInboxStore {
    /// Keys accepted so far; shared between all clones of this store.
    seen: Arc<RwLock<HashSet<String>>>,
}
79
#[cfg(any(test, feature = "testing"))]
impl InMemoryInboxStore {
    /// Construct a store that has not registered any key yet.
    #[must_use]
    pub fn new() -> Self {
        Self {
            seen: Arc::default(),
        }
    }

    /// Number of keys registered so far.
    pub async fn len(&self) -> usize {
        let seen = self.seen.read().await;
        seen.len()
    }

    /// `true` while the store has not registered any key.
    pub async fn is_empty(&self) -> bool {
        let seen = self.seen.read().await;
        seen.is_empty()
    }
}
98
#[cfg(any(test, feature = "testing"))]
impl InboxStore for InMemoryInboxStore {
    /// Record `key` in the in-memory set.
    ///
    /// Keys longer than [`MAX_INBOX_KEY_LEN`] bytes are refused with an error
    /// built by `EngineError::inbox` and are never stored. Otherwise the key
    /// is inserted under the write lock, and the result is `true` only when
    /// the set did not already contain it.
    async fn accept(&self, key: &str) -> Result<bool, EngineError> {
        let key_len = key.len();
        if key_len <= MAX_INBOX_KEY_LEN {
            // The write lock covers lookup and insertion as a single step.
            let mut seen = self.seen.write().await;
            return Ok(seen.insert(key.to_owned()));
        }
        Err(EngineError::inbox(format!(
            "inbox key is {key_len} bytes, exceeds maximum of {MAX_INBOX_KEY_LEN}"
        )))
    }
}
111
112// ── InboxKey helpers ──────────────────────────────────────────────────────────
113
/// Compose the canonical inbox key `<sender>:<message_ref>` from the sender's
/// party id and an EDIFACT message reference.
///
/// A message reference on its own is not a sufficient key, because different
/// senders may use the same reference numbering; pairing it with the sender
/// makes the key unique per market participant per message.
///
/// # Errors
///
/// Returns a static error string when `sender_party_id` or `message_ref` is
/// empty; if both are empty, the `sender_party_id` error is reported.
/// BDEW codes and GLNs are always 13 digits, EIC codes are 16 characters, and
/// UNH message references are always non-empty, so an empty component points
/// to a parsing error upstream and must not silently pass deduplication.
pub fn inbox_key(sender_party_id: &str, message_ref: &str) -> Result<String, &'static str> {
    match (sender_party_id.is_empty(), message_ref.is_empty()) {
        // The sender is checked first, so it wins when both parts are empty.
        (true, _) => Err("inbox_key: sender_party_id must not be empty"),
        (false, true) => Err("inbox_key: message_ref must not be empty"),
        (false, false) => Ok([sender_party_id, message_ref].join(":")),
    }
}
136
/// Unit tests for [`InMemoryInboxStore`] and [`inbox_key`].
///
/// Besides the basic accept/duplicate paths, these cover the
/// [`MAX_INBOX_KEY_LEN`] boundary and the shared state between clones.
#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn new_message_is_accepted() {
        let inbox = InMemoryInboxStore::new();
        assert!(inbox.accept("sender:ref-001").await.unwrap());
    }

    #[tokio::test]
    async fn duplicate_message_is_rejected() {
        let inbox = InMemoryInboxStore::new();
        assert!(inbox.accept("sender:ref-001").await.unwrap());
        assert!(
            !inbox.accept("sender:ref-001").await.unwrap(),
            "second accept should return false"
        );
    }

    #[tokio::test]
    async fn different_senders_same_ref_are_independent() {
        let inbox = InMemoryInboxStore::new();
        assert!(
            inbox
                .accept(&inbox_key("sender-A", "ref-001").unwrap())
                .await
                .unwrap()
        );
        assert!(
            inbox
                .accept(&inbox_key("sender-B", "ref-001").unwrap())
                .await
                .unwrap()
        );
    }

    #[tokio::test]
    async fn key_at_max_length_is_accepted() {
        // The limit is inclusive: exactly MAX_INBOX_KEY_LEN bytes must pass.
        let inbox = InMemoryInboxStore::new();
        let key = "k".repeat(MAX_INBOX_KEY_LEN);
        assert!(inbox.accept(&key).await.unwrap());
    }

    #[tokio::test]
    async fn oversized_key_is_rejected_and_not_stored() {
        // One byte over the limit must fail and must leave the store untouched.
        let inbox = InMemoryInboxStore::new();
        let key = "k".repeat(MAX_INBOX_KEY_LEN + 1);
        assert!(inbox.accept(&key).await.is_err());
        assert!(inbox.is_empty().await);
    }

    #[tokio::test]
    async fn clones_share_deduplication_state() {
        // Clones share the same underlying set, so a key accepted through one
        // handle is a duplicate through the other.
        let inbox = InMemoryInboxStore::new();
        let clone = inbox.clone();
        assert!(inbox.accept("sender:ref-001").await.unwrap());
        assert!(!clone.accept("sender:ref-001").await.unwrap());
        assert_eq!(clone.len().await, 1);
    }

    #[tokio::test]
    async fn len_and_is_empty_track_registered_keys() {
        let inbox = InMemoryInboxStore::new();
        assert!(inbox.is_empty().await);
        assert_eq!(inbox.len().await, 0);

        assert!(inbox.accept("sender:ref-001").await.unwrap());
        // A duplicate must not grow the set.
        assert!(!inbox.accept("sender:ref-001").await.unwrap());
        assert!(!inbox.is_empty().await);
        assert_eq!(inbox.len().await, 1);
    }

    #[test]
    fn inbox_key_rejects_empty_party_id() {
        assert!(inbox_key("", "ref-001").is_err());
    }

    #[test]
    fn inbox_key_rejects_empty_ref() {
        assert!(inbox_key("4012345000023", "").is_err());
    }

    #[test]
    fn inbox_key_formats_correctly() {
        assert_eq!(
            inbox_key("4012345000023", "MSG-001").unwrap(),
            "4012345000023:MSG-001",
        );
    }
}