Skip to main content

atomr_patterns/inbox/
mod.rs

//! Inbox pattern — idempotent receiver for messages arriving from
//! external sources.
//!
//! The mirror image of [`crate::outbox::OutboxPattern`]: outbox
//! guarantees at-least-once delivery *out* of a system; inbox makes
//! the *receiving* side idempotent so duplicates are silently
//! suppressed.
//!
//! ```ignore
//! let inbox = InboxPattern::<OrderEvent>::builder()
//!     .name("orders-inbox")
//!     .key(|e: &OrderEvent| e.id().to_string())
//!     .source(rx)
//!     .handler(|e: OrderEvent| async move { process(e).await; true })
//!     .store(Arc::new(InMemoryInboxStore::new()))
//!     .build()?
//!     .materialize(&system)
//!     .await?;
//! ```
20
21use std::collections::HashSet;
22use std::marker::PhantomData;
23use std::sync::Arc;
24
25use async_trait::async_trait;
26use atomr_core::actor::ActorSystem;
27use parking_lot::Mutex;
28use tokio::sync::mpsc::UnboundedReceiver;
29
30use crate::topology::Topology;
31use crate::PatternError;
32
33/// Persistent record of which idempotency keys have been processed.
34#[async_trait]
35pub trait InboxStore: Send + Sync + 'static {
36    /// True if `key` has already been recorded as processed.
37    async fn was_seen(&self, key: &str) -> bool;
38    /// Record `key` as processed. Idempotent — repeat calls are
39    /// no-ops.
40    async fn mark_seen(&self, key: &str);
41}
42
43/// In-memory reference implementation. Survives runner restarts
44/// within the same process; loses everything on process restart.
45#[derive(Default)]
46pub struct InMemoryInboxStore {
47    inner: Arc<Mutex<HashSet<String>>>,
48}
49
50impl InMemoryInboxStore {
51    pub fn new() -> Self {
52        Self::default()
53    }
54}
55
56#[async_trait]
57impl InboxStore for InMemoryInboxStore {
58    async fn was_seen(&self, key: &str) -> bool {
59        self.inner.lock().contains(key)
60    }
61    async fn mark_seen(&self, key: &str) {
62        self.inner.lock().insert(key.into());
63    }
64}
65
66type KeyFn<E> = Arc<dyn Fn(&E) -> String + Send + Sync + 'static>;
67type Handler<E> = Arc<dyn Fn(E) -> futures::future::BoxFuture<'static, bool> + Send + Sync>;
68
69/// Public, zero-sized handle to the inbox pattern.
70pub struct InboxPattern<E>(PhantomData<E>);
71
72impl<E: Send + 'static> InboxPattern<E> {
73    pub fn builder() -> InboxBuilder<E> {
74        InboxBuilder { name: None, key: None, source: None, handler: None, store: None }
75    }
76}
77
78pub struct InboxBuilder<E: Send + 'static> {
79    name: Option<String>,
80    key: Option<KeyFn<E>>,
81    source: Option<UnboundedReceiver<E>>,
82    handler: Option<Handler<E>>,
83    store: Option<Arc<dyn InboxStore>>,
84}
85
86impl<E: Send + 'static> InboxBuilder<E> {
87    pub fn name(mut self, n: impl Into<String>) -> Self {
88        self.name = Some(n.into());
89        self
90    }
91
92    /// Closure that derives the idempotency key from a message.
93    pub fn key<F>(mut self, f: F) -> Self
94    where
95        F: Fn(&E) -> String + Send + Sync + 'static,
96    {
97        self.key = Some(Arc::new(f));
98        self
99    }
100
101    /// Inbound message source.
102    pub fn source(mut self, rx: UnboundedReceiver<E>) -> Self {
103        self.source = Some(rx);
104        self
105    }
106
107    /// Handler for not-yet-seen messages. Returning `true` marks the
108    /// message as processed; `false` leaves it unmarked so a
109    /// retry can re-attempt processing.
110    pub fn handler<F, Fut>(mut self, f: F) -> Self
111    where
112        F: Fn(E) -> Fut + Send + Sync + 'static,
113        Fut: std::future::Future<Output = bool> + Send + 'static,
114    {
115        let f = Arc::new(f);
116        self.handler = Some(Arc::new(move |e| {
117            let f = f.clone();
118            Box::pin(async move { f(e).await })
119        }));
120        self
121    }
122
123    pub fn store<S: InboxStore>(mut self, store: Arc<S>) -> Self {
124        self.store = Some(store);
125        self
126    }
127
128    pub fn build(self) -> Result<InboxTopology<E>, PatternError<()>> {
129        Ok(InboxTopology {
130            name: self.name.unwrap_or_else(|| "inbox".into()),
131            key: self.key.ok_or(PatternError::NotConfigured("key"))?,
132            source: self.source.ok_or(PatternError::NotConfigured("source"))?,
133            handler: self.handler.ok_or(PatternError::NotConfigured("handler"))?,
134            store: self.store.unwrap_or_else(|| Arc::new(InMemoryInboxStore::new())),
135        })
136    }
137}
138
139pub struct InboxTopology<E: Send + 'static> {
140    name: String,
141    key: KeyFn<E>,
142    source: UnboundedReceiver<E>,
143    handler: Handler<E>,
144    store: Arc<dyn InboxStore>,
145}
146
/// Handles returned once an inbox has been materialized.
///
/// Derives `Debug` and `Clone` so the value can be logged, duplicated,
/// and used with `Result` helpers that require `T: Debug` (for example
/// `unwrap_err` / `expect_err` on the materialization result).
#[derive(Debug, Clone)]
pub struct InboxHandles {
    /// Name the inbox was configured with.
    pub name: String,
}
150
151#[async_trait]
152impl<E: Send + 'static> Topology for InboxTopology<E> {
153    type Handles = InboxHandles;
154
155    async fn materialize(self, _system: &ActorSystem) -> Result<InboxHandles, PatternError<()>> {
156        let InboxTopology { name, key, mut source, handler, store } = self;
157        let task_name = name.clone();
158        tokio::spawn(async move {
159            while let Some(msg) = source.recv().await {
160                let k = (key)(&msg);
161                if store.was_seen(&k).await {
162                    tracing::trace!(inbox = %task_name, key = %k, "duplicate suppressed");
163                    continue;
164                }
165                if (handler)(msg).await {
166                    store.mark_seen(&k).await;
167                } else {
168                    tracing::warn!(inbox = %task_name, key = %k, "handler returned false; key not marked");
169                }
170            }
171        });
172        Ok(InboxHandles { name })
173    }
174}