Skip to main content

atomr_patterns/outbox/
mod.rs

//! Transactional Outbox pattern.
//!
//! Tail-follows the [`atomr_persistence_query::ReadJournal`] and
//! re-emits events into a publish callback, persisting the last
//! handled offset per persistence id so a restart resumes where it
//! left off instead of replaying the whole journal. Offsets are saved
//! once per poll pass, so delivery is at-least-once rather than
//! exactly-once. Use this when you have a side effect (e.g. publishing
//! to Kafka, hitting a webhook) that must occur "at-least-once after
//! every successful aggregate write".
8
9mod journal_offset_store;
10
11pub use journal_offset_store::JournalOffsetStore;
12
13use std::collections::HashMap;
14use std::marker::PhantomData;
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::sync::Arc;
17use std::time::Duration;
18
19use async_trait::async_trait;
20use atomr_core::actor::ActorSystem;
21use atomr_persistence_query::ReadJournal;
22use parking_lot::Mutex;
23use tokio::sync::oneshot;
24
25use crate::topology::Topology;
26use crate::PatternError;
27
28/// Public, zero-sized handle to the outbox pattern.
29pub struct OutboxPattern<E>(PhantomData<E>);
30
31impl<E: Clone + Send + 'static> OutboxPattern<E> {
32    pub fn builder() -> OutboxBuilder<E> {
33        OutboxBuilder {
34            name: None,
35            read_journal: None,
36            poll_interval: Duration::from_millis(50),
37            decode: None,
38            publish: None,
39            offset_store: None,
40        }
41    }
42}
43
/// Shared callback that turns a journal payload (`&[u8]`) into an event
/// of type `E`, or returns an error message when the payload cannot be
/// decoded.
type Decoder<E> = Arc<dyn Fn(&[u8]) -> Result<E, String> + Send + Sync>;
/// Shared async callback that publishes one event. The publisher loop
/// treats `true` as success (the offset advances) and `false` as
/// "stop here and retry on a later poll".
type Publisher<E> = Arc<dyn Fn(E) -> futures::future::BoxFuture<'static, bool> + Send + Sync>;
46
/// Pluggable per-pid offset persistence. Offsets are exchanged as a map
/// keyed by persistence_id: `load` returns it, `save` receives it.
pub trait OutboxOffsetStore: Send + Sync + 'static {
    /// Returns the stored offsets, keyed by persistence id. The
    /// publisher loop calls this once, before its first poll.
    fn load(&self) -> HashMap<String, u64>;
    /// Stores the given offsets. The publisher loop passes its whole
    /// offset map at the end of each poll pass. There is no return
    /// value, so a failed write cannot be reported to the caller.
    fn save(&self, offsets: &HashMap<String, u64>);
}
53
54/// In-memory offset store — useful for tests. State is kept in a
55/// `Mutex<HashMap>`; survives restarts of the publisher loop, but not
56/// process restarts.
57#[derive(Default)]
58pub struct InMemoryOffsetStore {
59    inner: Arc<Mutex<HashMap<String, u64>>>,
60}
61
62impl InMemoryOffsetStore {
63    pub fn new() -> Self {
64        Self::default()
65    }
66
67    pub fn snapshot(&self) -> HashMap<String, u64> {
68        self.inner.lock().clone()
69    }
70}
71
72impl OutboxOffsetStore for InMemoryOffsetStore {
73    fn load(&self) -> HashMap<String, u64> {
74        self.inner.lock().clone()
75    }
76    fn save(&self, offsets: &HashMap<String, u64>) {
77        let mut guard = self.inner.lock();
78        for (k, v) in offsets {
79            guard.insert(k.clone(), *v);
80        }
81    }
82}
83
84pub struct OutboxBuilder<E> {
85    name: Option<String>,
86    read_journal: Option<Arc<dyn ReadJournal>>,
87    poll_interval: Duration,
88    decode: Option<Decoder<E>>,
89    publish: Option<Publisher<E>>,
90    offset_store: Option<Arc<dyn OutboxOffsetStore>>,
91}
92
93impl<E: Clone + Send + 'static> OutboxBuilder<E> {
94    pub fn name(mut self, n: impl Into<String>) -> Self {
95        self.name = Some(n.into());
96        self
97    }
98
99    pub fn read_journal<R: ReadJournal>(mut self, rj: Arc<R>) -> Self {
100        self.read_journal = Some(rj);
101        self
102    }
103
104    pub fn poll_interval(mut self, d: Duration) -> Self {
105        self.poll_interval = d;
106        self
107    }
108
109    pub fn decode<F>(mut self, f: F) -> Self
110    where
111        F: Fn(&[u8]) -> Result<E, String> + Send + Sync + 'static,
112    {
113        self.decode = Some(Arc::new(f));
114        self
115    }
116
117    pub fn publish<F, Fut>(mut self, f: F) -> Self
118    where
119        F: Fn(E) -> Fut + Send + Sync + 'static,
120        Fut: std::future::Future<Output = bool> + Send + 'static,
121    {
122        let f = Arc::new(f);
123        self.publish = Some(Arc::new(move |e| {
124            let f = f.clone();
125            Box::pin(async move { f(e).await })
126        }));
127        self
128    }
129
130    pub fn offset_store<S: OutboxOffsetStore>(mut self, s: Arc<S>) -> Self {
131        self.offset_store = Some(s);
132        self
133    }
134
135    pub fn build(self) -> Result<OutboxTopology<E>, PatternError<()>> {
136        Ok(OutboxTopology {
137            name: self.name.unwrap_or_else(|| "outbox".into()),
138            read_journal: self.read_journal.ok_or(PatternError::NotConfigured("read_journal"))?,
139            poll_interval: self.poll_interval,
140            decode: self.decode.ok_or(PatternError::NotConfigured("decode"))?,
141            publish: self.publish.ok_or(PatternError::NotConfigured("publish"))?,
142            offset_store: self.offset_store.unwrap_or_else(|| Arc::new(InMemoryOffsetStore::new())),
143        })
144    }
145}
146
/// Fully configured outbox, produced by `OutboxBuilder::build` and
/// consumed when it is materialized into a running publisher loop.
pub struct OutboxTopology<E> {
    // Label attached to the loop's log lines.
    // NOTE(review): `materialize` reads this field for its log output,
    // so the `dead_code` allowance looks unnecessary — confirm before
    // removing.
    #[allow(dead_code)]
    name: String,
    // Journal that persistence ids and events are read from.
    read_journal: Arc<dyn ReadJournal>,
    // Pause between poll passes.
    poll_interval: Duration,
    // Turns an event payload into an `E`.
    decode: Decoder<E>,
    // Publishes one decoded event; resolves to `true` on success.
    publish: Publisher<E>,
    // Where per-pid offsets are loaded from and saved to.
    offset_store: Arc<dyn OutboxOffsetStore>,
}
156
157pub struct OutboxHandles {
158    pub published: Arc<AtomicU64>,
159    stopper: oneshot::Sender<()>,
160}
161
162impl OutboxHandles {
163    /// Stop the publisher loop. Idempotent — second call is a no-op.
164    pub fn stop(self) {
165        let _ = self.stopper.send(());
166    }
167
168    pub fn published(&self) -> u64 {
169        self.published.load(Ordering::Acquire)
170    }
171}
172
173#[async_trait]
174impl<E: Clone + Send + 'static> Topology for OutboxTopology<E> {
175    type Handles = OutboxHandles;
176
177    async fn materialize(self, _system: &ActorSystem) -> Result<OutboxHandles, PatternError<()>> {
178        let OutboxTopology { name, read_journal, poll_interval, decode, publish, offset_store } = self;
179        let published = Arc::new(AtomicU64::new(0));
180        let published_clone = published.clone();
181        let (stop_tx, mut stop_rx) = oneshot::channel();
182        tokio::spawn(async move {
183            let mut pid_offsets = offset_store.load();
184            loop {
185                if stop_rx.try_recv().is_ok() {
186                    break;
187                }
188                let pids = match read_journal.all_persistence_ids().await {
189                    Ok(p) => p,
190                    Err(e) => {
191                        tracing::warn!(outbox = %name, error = ?e, "list pids failed");
192                        tokio::time::sleep(poll_interval).await;
193                        continue;
194                    }
195                };
196                for pid in pids {
197                    let from = pid_offsets.get(&pid).copied().unwrap_or(0).saturating_add(1);
198                    let events = match read_journal.events_by_persistence_id(&pid, from, u64::MAX).await {
199                        Ok(e) => e,
200                        Err(e) => {
201                            tracing::warn!(outbox = %name, pid = %pid, error = ?e, "read failed");
202                            continue;
203                        }
204                    };
205                    for env in events {
206                        match (decode)(&env.payload) {
207                            Ok(event) => {
208                                let ok = (publish)(event).await;
209                                if ok {
210                                    pid_offsets.insert(pid.clone(), env.sequence_nr);
211                                    published_clone.fetch_add(1, Ordering::AcqRel);
212                                } else {
213                                    // Stop advancing; retry next tick.
214                                    break;
215                                }
216                            }
217                            Err(s) => {
218                                tracing::warn!(outbox = %name, error = %s, "decode failed");
219                                pid_offsets.insert(pid.clone(), env.sequence_nr);
220                            }
221                        }
222                    }
223                }
224                offset_store.save(&pid_offsets);
225                tokio::time::sleep(poll_interval).await;
226            }
227        });
228        Ok(OutboxHandles { published, stopper: stop_tx })
229    }
230}