atomr_patterns/outbox/
mod.rs1mod journal_offset_store;
10
11pub use journal_offset_store::JournalOffsetStore;
12
13use std::collections::HashMap;
14use std::marker::PhantomData;
15use std::sync::atomic::{AtomicU64, Ordering};
16use std::sync::Arc;
17use std::time::Duration;
18
19use async_trait::async_trait;
20use atomr_core::actor::ActorSystem;
21use atomr_persistence_query::ReadJournal;
22use parking_lot::Mutex;
23use tokio::sync::oneshot;
24
25use crate::topology::Topology;
26use crate::PatternError;
27
/// Entry point for assembling an outbox over events of type `E`.
///
/// The type stores no data: its only field is a `PhantomData<E>` marker that
/// ties the event type to the pattern. Its associated function `builder`
/// hands out an [`OutboxBuilder`].
pub struct OutboxPattern<E>(PhantomData<E>);
30
31impl<E: Clone + Send + 'static> OutboxPattern<E> {
32 pub fn builder() -> OutboxBuilder<E> {
33 OutboxBuilder {
34 name: None,
35 read_journal: None,
36 poll_interval: Duration::from_millis(50),
37 decode: None,
38 publish: None,
39 offset_store: None,
40 }
41 }
42}
43
44type Decoder<E> = Arc<dyn Fn(&[u8]) -> Result<E, String> + Send + Sync>;
45type Publisher<E> = Arc<dyn Fn(E) -> futures::future::BoxFuture<'static, bool> + Send + Sync>;
46
/// Storage for the per-persistence-id progress of an outbox.
///
/// The polling loop calls [`load`](OutboxOffsetStore::load) once when it
/// starts and [`save`](OutboxOffsetStore::save) with its full offset map at
/// the end of each polling pass that managed to list persistence ids.
pub trait OutboxOffsetStore: Send + Sync + 'static {
    /// Returns the stored offsets, keyed by persistence id.
    ///
    /// Each value is the sequence number of the last event already handled
    /// for that id; reading resumes at the following sequence number.
    fn load(&self) -> HashMap<String, u64>;

    /// Records the given offsets.
    ///
    /// Neither method returns a `Result`, so an implementation has no way to
    /// report a failure to the caller through this interface.
    fn save(&self, offsets: &HashMap<String, u64>);
}
53
54#[derive(Default)]
58pub struct InMemoryOffsetStore {
59 inner: Arc<Mutex<HashMap<String, u64>>>,
60}
61
62impl InMemoryOffsetStore {
63 pub fn new() -> Self {
64 Self::default()
65 }
66
67 pub fn snapshot(&self) -> HashMap<String, u64> {
68 self.inner.lock().clone()
69 }
70}
71
72impl OutboxOffsetStore for InMemoryOffsetStore {
73 fn load(&self) -> HashMap<String, u64> {
74 self.inner.lock().clone()
75 }
76 fn save(&self, offsets: &HashMap<String, u64>) {
77 let mut guard = self.inner.lock();
78 for (k, v) in offsets {
79 guard.insert(k.clone(), *v);
80 }
81 }
82}
83
84pub struct OutboxBuilder<E> {
85 name: Option<String>,
86 read_journal: Option<Arc<dyn ReadJournal>>,
87 poll_interval: Duration,
88 decode: Option<Decoder<E>>,
89 publish: Option<Publisher<E>>,
90 offset_store: Option<Arc<dyn OutboxOffsetStore>>,
91}
92
93impl<E: Clone + Send + 'static> OutboxBuilder<E> {
94 pub fn name(mut self, n: impl Into<String>) -> Self {
95 self.name = Some(n.into());
96 self
97 }
98
99 pub fn read_journal<R: ReadJournal>(mut self, rj: Arc<R>) -> Self {
100 self.read_journal = Some(rj);
101 self
102 }
103
104 pub fn poll_interval(mut self, d: Duration) -> Self {
105 self.poll_interval = d;
106 self
107 }
108
109 pub fn decode<F>(mut self, f: F) -> Self
110 where
111 F: Fn(&[u8]) -> Result<E, String> + Send + Sync + 'static,
112 {
113 self.decode = Some(Arc::new(f));
114 self
115 }
116
117 pub fn publish<F, Fut>(mut self, f: F) -> Self
118 where
119 F: Fn(E) -> Fut + Send + Sync + 'static,
120 Fut: std::future::Future<Output = bool> + Send + 'static,
121 {
122 let f = Arc::new(f);
123 self.publish = Some(Arc::new(move |e| {
124 let f = f.clone();
125 Box::pin(async move { f(e).await })
126 }));
127 self
128 }
129
130 pub fn offset_store<S: OutboxOffsetStore>(mut self, s: Arc<S>) -> Self {
131 self.offset_store = Some(s);
132 self
133 }
134
135 pub fn build(self) -> Result<OutboxTopology<E>, PatternError<()>> {
136 Ok(OutboxTopology {
137 name: self.name.unwrap_or_else(|| "outbox".into()),
138 read_journal: self.read_journal.ok_or(PatternError::NotConfigured("read_journal"))?,
139 poll_interval: self.poll_interval,
140 decode: self.decode.ok_or(PatternError::NotConfigured("decode"))?,
141 publish: self.publish.ok_or(PatternError::NotConfigured("publish"))?,
142 offset_store: self.offset_store.unwrap_or_else(|| Arc::new(InMemoryOffsetStore::new())),
143 })
144 }
145}
146
147pub struct OutboxTopology<E> {
148 #[allow(dead_code)]
149 name: String,
150 read_journal: Arc<dyn ReadJournal>,
151 poll_interval: Duration,
152 decode: Decoder<E>,
153 publish: Publisher<E>,
154 offset_store: Arc<dyn OutboxOffsetStore>,
155}
156
157pub struct OutboxHandles {
158 pub published: Arc<AtomicU64>,
159 stopper: oneshot::Sender<()>,
160}
161
162impl OutboxHandles {
163 pub fn stop(self) {
165 let _ = self.stopper.send(());
166 }
167
168 pub fn published(&self) -> u64 {
169 self.published.load(Ordering::Acquire)
170 }
171}
172
173#[async_trait]
174impl<E: Clone + Send + 'static> Topology for OutboxTopology<E> {
175 type Handles = OutboxHandles;
176
177 async fn materialize(self, _system: &ActorSystem) -> Result<OutboxHandles, PatternError<()>> {
178 let OutboxTopology { name, read_journal, poll_interval, decode, publish, offset_store } = self;
179 let published = Arc::new(AtomicU64::new(0));
180 let published_clone = published.clone();
181 let (stop_tx, mut stop_rx) = oneshot::channel();
182 tokio::spawn(async move {
183 let mut pid_offsets = offset_store.load();
184 loop {
185 if stop_rx.try_recv().is_ok() {
186 break;
187 }
188 let pids = match read_journal.all_persistence_ids().await {
189 Ok(p) => p,
190 Err(e) => {
191 tracing::warn!(outbox = %name, error = ?e, "list pids failed");
192 tokio::time::sleep(poll_interval).await;
193 continue;
194 }
195 };
196 for pid in pids {
197 let from = pid_offsets.get(&pid).copied().unwrap_or(0).saturating_add(1);
198 let events = match read_journal.events_by_persistence_id(&pid, from, u64::MAX).await {
199 Ok(e) => e,
200 Err(e) => {
201 tracing::warn!(outbox = %name, pid = %pid, error = ?e, "read failed");
202 continue;
203 }
204 };
205 for env in events {
206 match (decode)(&env.payload) {
207 Ok(event) => {
208 let ok = (publish)(event).await;
209 if ok {
210 pid_offsets.insert(pid.clone(), env.sequence_nr);
211 published_clone.fetch_add(1, Ordering::AcqRel);
212 } else {
213 break;
215 }
216 }
217 Err(s) => {
218 tracing::warn!(outbox = %name, error = %s, "decode failed");
219 pid_offsets.insert(pid.clone(), env.sequence_nr);
220 }
221 }
222 }
223 }
224 offset_store.save(&pid_offsets);
225 tokio::time::sleep(poll_interval).await;
226 }
227 });
228 Ok(OutboxHandles { published, stopper: stop_tx })
229 }
230}