Skip to main content

arcly_http/data/
outbox.rs

//! Transactional Outbox — reliable event publishing without dual writes.
//!
//! ## The dual-write problem
//!
//! "Commit the order, then publish `order.created`" spans two systems with no
//! shared transaction: a crash between the two steps leaves the database and
//! the event stream disagreeing — silently. The outbox pattern closes the gap
//! by writing the event into an **outbox table inside the same database
//! transaction** as the business data, then letting a background relay publish
//! it afterwards with at-least-once semantics (consumers dedupe on
//! `idempotency_key`).
//!
//! ## Division of labour
//!
//! - The app implements [`TransactionalDataSource`] (open an outbox-capable
//!   transaction), [`OutboxTx`] (enqueue inside that DB transaction),
//!   [`OutboxStore`] (fetch/ack pending rows), and [`OutboxPublisher`]
//!   (Kafka / NATS / webhook).
//! - The framework ships [`with_transaction`] (begin → work → commit/rollback)
//!   and [`OutboxRelay`] (background poll-publish-ack loop, spawned from
//!   `ArclyPlugin::on_start`).
//!
//! Nothing here touches the request hot path beyond the app's own
//! transaction; the relay is a single background task.
23
24use std::sync::Arc;
25use std::time::Duration;
26
27use futures::future::BoxFuture;
28
29use crate::data::{DataError, DataSource};
30
31// ─── Entry ────────────────────────────────────────────────────────────────────
32
33/// One event enqueued in the same transaction as the business write.
34#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
35pub struct OutboxEntry {
36    /// Store-assigned sequence (0 before persistence).
37    pub id: u64,
38    pub topic: String,
39    pub payload: serde_json::Value,
40    /// Stable per-event key — consumers dedupe on this under at-least-once.
41    pub idempotency_key: String,
42    pub tenant: Option<String>,
43    /// W3C trace context of the producing request — consumers continue the
44    /// trace instead of starting an orphan root (`TraceContext::from_traceparent`).
45    #[serde(default)]
46    pub traceparent: Option<String>,
47}
48
49// ─── Transaction-side contracts ───────────────────────────────────────────────
50
51/// A live database transaction that can also enqueue outbox entries.
52pub trait OutboxTx: Send {
53    /// INSERT into the outbox table under THIS transaction — atomic with the
54    /// business write. Visible to the relay only after commit.
55    fn enqueue<'a>(&'a mut self, entry: OutboxEntry) -> BoxFuture<'a, Result<(), DataError>>;
56
57    fn commit(self: Box<Self>) -> BoxFuture<'static, Result<(), DataError>>;
58    fn rollback(self: Box<Self>) -> BoxFuture<'static, Result<(), DataError>>;
59}
60
61/// A datasource that can open outbox-capable transactions.
62pub trait TransactionalDataSource: DataSource {
63    fn begin(&self) -> BoxFuture<'_, Result<Box<dyn OutboxTx>, DataError>>;
64}
65
66/// Run `work` inside one transaction: commit on `Ok`, roll back on `Err`.
67///
68/// ```ignore
69/// let order = with_transaction(ds, |tx| Box::pin(async move {
70///     let order = repo.insert_order(tx, &dto).await?;
71///     tx.enqueue(OutboxEntry { topic: "order.created".into() /* ... */ }).await?;
72///     Ok(order)
73/// })).await?;
74/// ```
75pub async fn with_transaction<D, T, F>(ds: &D, work: F) -> Result<T, DataError>
76where
77    D: TransactionalDataSource + ?Sized,
78    F: for<'t> FnOnce(&'t mut Box<dyn OutboxTx>) -> BoxFuture<'t, Result<T, DataError>>,
79{
80    let mut tx = ds.begin().await?;
81    match work(&mut tx).await {
82        Ok(v) => {
83            tx.commit().await?;
84            Ok(v)
85        }
86        Err(e) => {
87            // Roll back best-effort; surface the original error.
88            let _ = tx.rollback().await;
89            Err(e)
90        }
91    }
92}
93
94// ─── Relay-side contracts ─────────────────────────────────────────────────────
95
96/// Committed-but-unpublished entries, fetched/acked by the relay.
97pub trait OutboxStore: Send + Sync + 'static {
98    /// Oldest committed-but-unpublished entries, up to `limit`, in order.
99    fn fetch_pending(&self, limit: usize) -> BoxFuture<'_, Result<Vec<OutboxEntry>, DataError>>;
100    /// Mark one entry delivered (idempotent — the relay may retry).
101    fn mark_published(&self, id: u64) -> BoxFuture<'_, Result<(), DataError>>;
102}
103
104/// Downstream event transport (Kafka, NATS, webhook fan-out, …).
105pub trait OutboxPublisher: Send + Sync + 'static {
106    fn publish<'a>(&'a self, entry: &'a OutboxEntry) -> BoxFuture<'a, Result<(), String>>;
107}
108
109// ─── Relay ────────────────────────────────────────────────────────────────────
110
111/// Background poll → publish → ack loop. **At-least-once**: an entry is acked
112/// only after a successful publish, so a crash between the two replays it —
113/// consumers must dedupe on `idempotency_key`.
114pub struct OutboxRelay {
115    pub store: Arc<dyn OutboxStore>,
116    pub publisher: Arc<dyn OutboxPublisher>,
117    pub poll: Duration,
118    pub batch: usize,
119    /// Leader election: when set, only the replica holding this lock drains
120    /// the outbox each tick — others skip silently. Eliminates duplicate
121    /// publishing across a fleet (consumers still dedupe on idempotency_key
122    /// for the at-least-once edge cases).
123    pub leader: Option<(
124        crate::resilience::DistributedLock,
125        Arc<dyn crate::resilience::DLockBackend>,
126    )>,
127}
128
129impl OutboxRelay {
130    /// Spawn the relay. Call from `ArclyPlugin::on_start`; the task dies with
131    /// the runtime, so graceful shutdown needs no extra plumbing.
132    pub fn spawn(self) {
133        tokio::spawn(async move {
134            let mut tick = tokio::time::interval(self.poll);
135            loop {
136                tick.tick().await;
137
138                // Leader election (optional): skip the tick unless we win.
139                // The guard is held for the whole batch and dropped after.
140                let _leader_guard = match &self.leader {
141                    None => None,
142                    Some((lock, backend)) => match lock.try_lock(backend).await {
143                        Some(g) => Some(g),
144                        None => continue, // another replica is draining
145                    },
146                };
147
148                let pending = match self.store.fetch_pending(self.batch).await {
149                    Ok(p) => p,
150                    Err(e) => {
151                        tracing::warn!(error = %e, "outbox fetch failed — retrying next tick");
152                        continue;
153                    }
154                };
155                for entry in &pending {
156                    match self.publisher.publish(entry).await {
157                        Ok(()) => {
158                            if let Err(e) = self.store.mark_published(entry.id).await {
159                                tracing::warn!(id = entry.id, error = %e,
160                                    "outbox ack failed — entry will be republished");
161                            } else {
162                                metrics::counter!("outbox_published_total").increment(1);
163                            }
164                        }
165                        Err(e) => {
166                            metrics::counter!("outbox_publish_errors_total").increment(1);
167                            tracing::warn!(id = entry.id, topic = %entry.topic, error = %e,
168                                "outbox publish failed — will retry");
169                            // Stop the batch: preserve per-topic ordering.
170                            break;
171                        }
172                    }
173                }
174            }
175        });
176    }
177}