arcly_http/data/outbox.rs
1//! Transactional Outbox — reliable event publishing without dual writes.
2//!
3//! ## The dual-write problem
4//!
5//! "Commit the order, then publish `order.created`" is two systems with no
6//! shared transaction: crash between them and the database and the event
7//! stream disagree — silently. The outbox pattern closes the gap by writing
8//! the event into an **outbox table inside the same database transaction** as
9//! the business data, then letting a background relay publish it afterwards
10//! with at-least-once semantics (consumers dedupe on `idempotency_key`).
11//!
12//! ## Division of labour
13//!
14//! - The app implements [`OutboxTx`] (enqueue inside its DB transaction),
15//! [`OutboxStore`] (fetch/ack pending rows), and [`OutboxPublisher`]
16//! (Kafka / NATS / webhook).
17//! - The framework ships [`with_transaction`] (begin → work → commit/rollback)
18//! and [`OutboxRelay`] (background poll-publish-ack loop, spawned from
19//! `ArclyPlugin::on_start`).
20//!
21//! Nothing here touches the request hot path beyond the app's own
22//! transaction; the relay is a single background task.
23
24use std::sync::Arc;
25use std::time::Duration;
26
27use futures::future::BoxFuture;
28
29use crate::data::{DataError, DataSource};
30
31// ─── Entry ────────────────────────────────────────────────────────────────────
32
33/// One event enqueued in the same transaction as the business write.
34#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
35pub struct OutboxEntry {
36 /// Store-assigned sequence (0 before persistence).
37 pub id: u64,
38 pub topic: String,
39 pub payload: serde_json::Value,
40 /// Stable per-event key — consumers dedupe on this under at-least-once.
41 pub idempotency_key: String,
42 pub tenant: Option<String>,
43 /// W3C trace context of the producing request — consumers continue the
44 /// trace instead of starting an orphan root (`TraceContext::from_traceparent`).
45 #[serde(default)]
46 pub traceparent: Option<String>,
47}
48
49// ─── Transaction-side contracts ───────────────────────────────────────────────
50
51/// A live database transaction that can also enqueue outbox entries.
52pub trait OutboxTx: Send {
53 /// INSERT into the outbox table under THIS transaction — atomic with the
54 /// business write. Visible to the relay only after commit.
55 fn enqueue<'a>(&'a mut self, entry: OutboxEntry) -> BoxFuture<'a, Result<(), DataError>>;
56
57 fn commit(self: Box<Self>) -> BoxFuture<'static, Result<(), DataError>>;
58 fn rollback(self: Box<Self>) -> BoxFuture<'static, Result<(), DataError>>;
59}
60
61/// A datasource that can open outbox-capable transactions.
62pub trait TransactionalDataSource: DataSource {
63 fn begin(&self) -> BoxFuture<'_, Result<Box<dyn OutboxTx>, DataError>>;
64}
65
66/// Run `work` inside one transaction: commit on `Ok`, roll back on `Err`.
67///
68/// ```ignore
69/// let order = with_transaction(ds, |tx| Box::pin(async move {
70/// let order = repo.insert_order(tx, &dto).await?;
71/// tx.enqueue(OutboxEntry { topic: "order.created".into() /* ... */ }).await?;
72/// Ok(order)
73/// })).await?;
74/// ```
75pub async fn with_transaction<D, T, F>(ds: &D, work: F) -> Result<T, DataError>
76where
77 D: TransactionalDataSource + ?Sized,
78 F: for<'t> FnOnce(&'t mut Box<dyn OutboxTx>) -> BoxFuture<'t, Result<T, DataError>>,
79{
80 let mut tx = ds.begin().await?;
81 match work(&mut tx).await {
82 Ok(v) => {
83 tx.commit().await?;
84 Ok(v)
85 }
86 Err(e) => {
87 // Roll back best-effort; surface the original error.
88 let _ = tx.rollback().await;
89 Err(e)
90 }
91 }
92}
93
94// ─── Relay-side contracts ─────────────────────────────────────────────────────
95
96/// Committed-but-unpublished entries, fetched/acked by the relay.
97pub trait OutboxStore: Send + Sync + 'static {
98 /// Oldest committed-but-unpublished entries, up to `limit`, in order.
99 fn fetch_pending(&self, limit: usize) -> BoxFuture<'_, Result<Vec<OutboxEntry>, DataError>>;
100 /// Mark one entry delivered (idempotent — the relay may retry).
101 fn mark_published(&self, id: u64) -> BoxFuture<'_, Result<(), DataError>>;
102}
103
104/// Downstream event transport (Kafka, NATS, webhook fan-out, …).
105pub trait OutboxPublisher: Send + Sync + 'static {
106 fn publish<'a>(
107 &'a self,
108 entry: &'a OutboxEntry,
109 ) -> BoxFuture<'a, Result<(), crate::messaging::BoxError>>;
110}
111
112// ─── Relay ────────────────────────────────────────────────────────────────────
113
114/// Background poll → publish → ack loop. **At-least-once**: an entry is acked
115/// only after a successful publish, so a crash between the two replays it —
116/// consumers must dedupe on `idempotency_key`.
117#[non_exhaustive]
118pub struct OutboxRelay {
119 pub store: Arc<dyn OutboxStore>,
120 pub publisher: Arc<dyn OutboxPublisher>,
121 pub poll: Duration,
122 pub batch: usize,
123 /// Leader election: when set, only the replica holding this lock drains
124 /// the outbox each tick — others skip silently. Eliminates duplicate
125 /// publishing across a fleet (consumers still dedupe on idempotency_key
126 /// for the at-least-once edge cases).
127 pub leader: Option<(
128 crate::resilience::DistributedLock,
129 Arc<dyn crate::resilience::DLockBackend>,
130 )>,
131}
132
133impl OutboxRelay {
134 /// Defaults: 500ms poll, batch 32, no leader election.
135 pub fn new(store: Arc<dyn OutboxStore>, publisher: Arc<dyn OutboxPublisher>) -> Self {
136 Self {
137 store,
138 publisher,
139 poll: Duration::from_millis(500),
140 batch: 32,
141 leader: None,
142 }
143 }
144 pub fn poll(mut self, v: Duration) -> Self {
145 self.poll = v;
146 self
147 }
148 pub fn batch(mut self, v: usize) -> Self {
149 self.batch = v;
150 self
151 }
152 /// Enable leader election: only the lock holder drains each tick.
153 pub fn leader(
154 mut self,
155 lock: crate::resilience::DistributedLock,
156 backend: Arc<dyn crate::resilience::DLockBackend>,
157 ) -> Self {
158 self.leader = Some((lock, backend));
159 self
160 }
161
162 /// Spawn the relay. Call from `ArclyPlugin::on_start`; the task dies with
163 /// the runtime, so graceful shutdown needs no extra plumbing.
164 ///
165 /// **Drain-aware**: once a shutdown signal flips the process drain flag
166 /// the relay stops claiming new batches — pending rows are durable and
167 /// the next leader picks them up. **Backoff**: consecutive fetch
168 /// failures double the wait (up to 8× the poll interval) instead of
169 /// hammering a struggling store at full poll rate.
170 pub fn spawn(self) {
171 tokio::spawn(async move {
172 let mut tick = tokio::time::interval(self.poll);
173 tick.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
174 let mut consecutive_fetch_errors: u32 = 0;
175 loop {
176 tick.tick().await;
177
178 // Shutdown in progress: leave pending rows for the next
179 // leader instead of starting work mid-drain.
180 if crate::observability::health::is_draining() {
181 tracing::info!("outbox relay: drain flag set — stopping");
182 return;
183 }
184
185 // Error backoff: skip ticks after repeated fetch failures.
186 if consecutive_fetch_errors > 0 {
187 let skip = (1u32 << consecutive_fetch_errors.min(3)) - 1; // 1,3,7
188 for _ in 0..skip {
189 tick.tick().await;
190 }
191 }
192
193 // Leader election (optional): skip the tick unless we win.
194 // The guard is held for the whole batch and dropped after.
195 let _leader_guard = match &self.leader {
196 None => None,
197 Some((lock, backend)) => match lock.try_lock(backend).await {
198 Some(g) => Some(g),
199 None => continue, // another replica is draining
200 },
201 };
202
203 let pending = match self.store.fetch_pending(self.batch).await {
204 Ok(p) => {
205 consecutive_fetch_errors = 0;
206 p
207 }
208 Err(e) => {
209 consecutive_fetch_errors = consecutive_fetch_errors.saturating_add(1);
210 metrics::counter!("outbox_fetch_errors_total").increment(1);
211 tracing::warn!(error = %e, attempts = consecutive_fetch_errors,
212 "outbox fetch failed — backing off");
213 continue;
214 }
215 };
216 for entry in &pending {
217 match self.publisher.publish(entry).await {
218 Ok(()) => {
219 if let Err(e) = self.store.mark_published(entry.id).await {
220 tracing::warn!(id = entry.id, error = %e,
221 "outbox ack failed — entry will be republished");
222 } else {
223 metrics::counter!("outbox_published_total").increment(1);
224 }
225 }
226 Err(e) => {
227 metrics::counter!("outbox_publish_errors_total").increment(1);
228 tracing::warn!(id = entry.id, topic = %entry.topic, error = %e,
229 "outbox publish failed — will retry");
230 // Stop the batch: preserve per-topic ordering.
231 break;
232 }
233 }
234 }
235 }
236 });
237 }
238}