Skip to main content

arcly_http/data/
mod.rs

//! Datasource contracts: tenant-scoped pools plus read/write splitting.
//!
//! The framework defines the *shape* of data access; the application
//! implements it with SQLx, SeaORM, or anything else. This is the same
//! boundary rule that keeps OAuth providers and session stores on the app
//! side. `core/` is untouched: the registry is an ordinary DI singleton.
//!
//! ## Hot-path guarantees
//!
//! - [`DataSourceRegistry`] is **frozen at boot**: tenant→pool lookup is an
//!   immutable `HashMap` read. No locks.
//! - Replica selection inside a [`DataSource`] implementation should be a
//!   single `AtomicUsize` round-robin — never a locked list.
//! - [`ReadAfterWritePin`] is one `AtomicBool` per request, used to route
//!   post-write reads to the primary (replica-lag protection).
//!
//! ## Read-after-write rule
//!
//! Within one request, every `Read` issued after the first `Write`
//! acquisition must go to the primary, because a replica may not have
//! replayed the write yet. Handlers opt in by creating a
//! [`ReadAfterWritePin`] and passing it to [`DataSourceRegistry::acquire`]:
//!
//! ```ignore
//! let pin = ReadAfterWritePin::new();
//! let ds  = registry.for_tenant(ctx.tenant());
//! let w   = registry.acquire(ds, AccessIntent::Write, &pin).await?; // trips the pin
//! let r   = registry.acquire(ds, AccessIntent::Read,  &pin).await?; // → primary
//! ```
30
31pub mod db;
32pub mod drivers;
33/// Migration runner uses the SQLx query facade; SeaORM/Diesel ledger support
34/// arrives with their facade methods.
35#[cfg(feature = "db-sqlx")]
36pub mod migrate;
37pub mod outbox;
38pub mod tx;
39
40use std::collections::HashMap;
41use std::sync::atomic::{AtomicBool, Ordering};
42
43use futures::future::BoxFuture;
44
45use crate::web::tenant::TenantConfig;
46
47// ─── Intent & errors ──────────────────────────────────────────────────────────
48
/// What a caller plans to do with the connection it is asking for.
///
/// [`AccessIntent::Write`] is always routed to the primary.
/// [`AccessIntent::Read`] may be served by a replica, except once a
/// [`ReadAfterWritePin`] for the same request has recorded a write — from then
/// on reads are upgraded to the primary.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AccessIntent {
    /// Query only; a replica is acceptable.
    Read,
    /// Mutation; must reach the primary.
    Write,
}
58
/// A classified datasource failure.
///
/// Carrying a [`DataErrorKind`] lets callers separate "retry, possibly on
/// another replica" from "this will never work" — the distinction that retry
/// policies, circuit breakers, and replica failover branch on.
#[non_exhaustive]
#[derive(Debug)]
pub struct DataError {
    /// Failure class; drives [`DataError::is_retryable`].
    pub kind: DataErrorKind,
    /// Human-readable detail supplied by whoever raised the error.
    pub message: String,
}

/// Classes of datasource failure.
///
/// `#[non_exhaustive]`: more classes may be added, so any `match` on this
/// type needs a `_` arm.
#[non_exhaustive]
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum DataErrorKind {
    /// Misconfiguration (missing driver feature, bad URL). Permanent until
    /// someone fixes the configuration.
    Config,
    /// Connection or pool failure. **Retryable**: a later attempt or another
    /// replica may succeed.
    Connection,
    /// The statement itself failed (syntax, constraint, type). Permanent.
    Query,
    /// The operation timed out. **Retryable**.
    Timeout,
    /// Optimistic or duplicate conflict (unique violation, version clash).
    /// Permanent for this payload, but kept apart from `Query` so idempotent
    /// upsert paths can branch on it.
    Conflict,
    /// Anything the driver reported that no other class covers.
    Other,
}

impl DataError {
    /// Creates an error of class `kind` carrying `message`.
    pub fn new(kind: DataErrorKind, message: impl Into<String>) -> Self {
        let message: String = message.into();
        DataError { kind, message }
    }

    /// A [`DataErrorKind::Config`] error.
    pub fn config(m: impl Into<String>) -> Self {
        DataError::new(DataErrorKind::Config, m)
    }

    /// A [`DataErrorKind::Connection`] error (retryable).
    pub fn connection(m: impl Into<String>) -> Self {
        DataError::new(DataErrorKind::Connection, m)
    }

    /// A [`DataErrorKind::Query`] error.
    pub fn query(m: impl Into<String>) -> Self {
        DataError::new(DataErrorKind::Query, m)
    }

    /// A [`DataErrorKind::Timeout`] error (retryable).
    pub fn timeout(m: impl Into<String>) -> Self {
        DataError::new(DataErrorKind::Timeout, m)
    }

    /// A [`DataErrorKind::Conflict`] error.
    pub fn conflict(m: impl Into<String>) -> Self {
        DataError::new(DataErrorKind::Conflict, m)
    }

    /// A [`DataErrorKind::Other`] error.
    pub fn other(m: impl Into<String>) -> Self {
        DataError::new(DataErrorKind::Other, m)
    }

    /// Whether a retry (or failover to another replica) can plausibly succeed.
    ///
    /// Only [`DataErrorKind::Connection`] and [`DataErrorKind::Timeout`]
    /// qualify; every other class is treated as permanent.
    pub fn is_retryable(&self) -> bool {
        const TRANSIENT: [DataErrorKind; 2] = [DataErrorKind::Connection, DataErrorKind::Timeout];
        TRANSIENT.contains(&self.kind)
    }
}

impl std::fmt::Display for DataError {
    // Renders as `datasource error (<Kind>): <message>`, using the kind's
    // `Debug` name.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let DataError { kind, message } = self;
        write!(f, "datasource error ({:?}): {}", kind, message)
    }
}

impl std::error::Error for DataError {}
132
133// ─── DataSource trait ─────────────────────────────────────────────────────────
134
135/// One logical database: a primary plus optional read replicas.
136///
137/// Implementations own pooling (SQLx `PgPool`s etc.) and replica selection.
138/// `acquire(Write)` MUST return a primary connection; `acquire(Read)` MAY
139/// return a replica.
140pub trait DataSource: Send + Sync + 'static {
141    type Conn: Send;
142
143    fn acquire(&self, intent: AccessIntent) -> BoxFuture<'_, Result<Self::Conn, DataError>>;
144
145    /// Logical name (matches `TenantConfig::datasource`). For logs/metrics.
146    fn name(&self) -> &'static str;
147}
148
149// ─── Read-after-write pin ─────────────────────────────────────────────────────
150
151/// Per-request replica-lag protection: once any `Write` is acquired through
152/// this pin, all subsequent `Read`s are upgraded to the primary.
153///
154/// One `AtomicBool` — create it per request (cheap, no allocation beyond the
155/// stack), never share across requests.
156#[derive(Default)]
157pub struct ReadAfterWritePin {
158    wrote: AtomicBool,
159}
160
161impl ReadAfterWritePin {
162    pub fn new() -> Self {
163        Self::default()
164    }
165
166    /// Effective intent after applying the pin, recording writes as they pass.
167    pub fn apply(&self, intent: AccessIntent) -> AccessIntent {
168        match intent {
169            AccessIntent::Write => {
170                self.wrote.store(true, Ordering::Relaxed);
171                AccessIntent::Write
172            }
173            AccessIntent::Read if self.wrote.load(Ordering::Relaxed) => AccessIntent::Write,
174            AccessIntent::Read => AccessIntent::Read,
175        }
176    }
177}
178
179// ─── Registry ─────────────────────────────────────────────────────────────────
180
181/// Frozen tenant→datasource map. Built once at boot, provided via DI,
182/// read lock-free on every request.
183pub struct DataSourceRegistry<D: DataSource> {
184    default: D,
185    by_name: HashMap<&'static str, D>,
186}
187
188impl<D: DataSource> DataSourceRegistry<D> {
189    pub fn new(default: D) -> Self {
190        Self {
191            default,
192            by_name: HashMap::new(),
193        }
194    }
195
196    /// Register a named datasource (boot-time only — consumes `self`).
197    pub fn with(mut self, name: &'static str, ds: D) -> Self {
198        self.by_name.insert(name, ds);
199        self
200    }
201
202    /// The datasource for this request's tenant (joins the tenant layer by
203    /// `TenantConfig::datasource` name). Unknown/absent tenant → default.
204    pub fn for_tenant(&self, tenant: Option<&TenantConfig>) -> &D {
205        tenant
206            .and_then(|t| self.by_name.get(t.datasource.as_str()))
207            .unwrap_or(&self.default)
208    }
209
210    /// Iterate all datasources (default first as "", then named in
211    /// deterministic order) — used by the migration runner.
212    pub fn iter(&self) -> impl Iterator<Item = (&'static str, &D)> {
213        let mut named: Vec<_> = self.by_name.iter().map(|(k, v)| (*k, v)).collect();
214        named.sort_by_key(|(k, _)| *k);
215        std::iter::once(("", &self.default)).chain(named)
216    }
217
218    /// Acquire a connection with the read-after-write pin applied.
219    pub async fn acquire(
220        &self,
221        ds: &D,
222        intent: AccessIntent,
223        pin: &ReadAfterWritePin,
224    ) -> Result<D::Conn, DataError> {
225        ds.acquire(pin.apply(intent)).await
226    }
227}
228
#[cfg(test)]
mod tests {
    use super::*;
    use crate::web::tenant::{TenantConfig, TenantId};

    #[test]
    fn pin_upgrades_reads_after_first_write() {
        let pin = ReadAfterWritePin::new();
        assert_eq!(pin.apply(AccessIntent::Read), AccessIntent::Read);
        assert_eq!(pin.apply(AccessIntent::Write), AccessIntent::Write);
        // Every read after a write goes to the primary.
        assert_eq!(pin.apply(AccessIntent::Read), AccessIntent::Write);
        assert_eq!(pin.apply(AccessIntent::Read), AccessIntent::Write);
    }

    /// Datasource that only reports its name; every acquisition succeeds.
    struct FakeDs(&'static str);
    impl DataSource for FakeDs {
        type Conn = ();
        fn acquire(&self, _: AccessIntent) -> BoxFuture<'_, Result<(), DataError>> {
            Box::pin(async { Ok(()) })
        }
        fn name(&self) -> &'static str {
            self.0
        }
    }

    /// Datasource whose "connection" is the intent it was asked for, so a
    /// test can observe the routing decision the registry made.
    struct EchoDs;
    impl DataSource for EchoDs {
        type Conn = AccessIntent;
        fn acquire(&self, intent: AccessIntent) -> BoxFuture<'_, Result<AccessIntent, DataError>> {
            Box::pin(async move { Ok(intent) })
        }
        fn name(&self) -> &'static str {
            "echo"
        }
    }

    /// Polls `fut` once with a no-op waker and returns its output.
    ///
    /// # Panics
    /// If the future is still pending after the first poll. The fake
    /// datasources here resolve immediately, so pending indicates a bug.
    fn poll_ready<F: std::future::Future>(fut: F) -> F::Output {
        use std::sync::Arc;
        use std::task::{Context, Poll, Wake, Waker};

        struct NoopWake;
        impl Wake for NoopWake {
            fn wake(self: Arc<Self>) {}
        }

        let waker = Waker::from(Arc::new(NoopWake));
        let mut cx = Context::from_waker(&waker);
        let mut fut = Box::pin(fut);
        match std::future::Future::poll(fut.as_mut(), &mut cx) {
            Poll::Ready(out) => out,
            Poll::Pending => panic!("fake datasource future should resolve on first poll"),
        }
    }

    fn tenant(ds: &str) -> TenantConfig {
        TenantConfig {
            id: TenantId::new("t"),
            display_name: "T".into(),
            datasource: ds.into(),
        }
    }

    #[test]
    fn registry_routes_by_tenant_datasource_with_default_fallback() {
        let reg = DataSourceRegistry::new(FakeDs("default"))
            .with("acme", FakeDs("acme"))
            .with("globex", FakeDs("globex"));

        assert_eq!(reg.for_tenant(Some(&tenant("acme"))).name(), "acme");
        assert_eq!(reg.for_tenant(Some(&tenant("unknown"))).name(), "default");
        assert_eq!(reg.for_tenant(None).name(), "default");
    }

    #[test]
    fn registry_with_same_name_replaces_earlier_datasource() {
        let reg = DataSourceRegistry::new(FakeDs("default"))
            .with("acme", FakeDs("old"))
            .with("acme", FakeDs("new"));

        assert_eq!(reg.for_tenant(Some(&tenant("acme"))).name(), "new");
        assert_eq!(reg.iter().count(), 2);
    }

    #[test]
    fn registry_acquire_routes_post_write_reads_to_primary() {
        let reg = DataSourceRegistry::new(EchoDs);
        let ds = reg.for_tenant(None);
        let pin = ReadAfterWritePin::new();
        let route = |intent: AccessIntent| {
            poll_ready(reg.acquire(ds, intent, &pin)).expect("EchoDs never fails")
        };

        // Before any write, reads may use a replica.
        assert_eq!(route(AccessIntent::Read), AccessIntent::Read);
        assert_eq!(route(AccessIntent::Write), AccessIntent::Write);
        // After the write, the registry must hand the datasource a primary intent.
        assert_eq!(route(AccessIntent::Read), AccessIntent::Write);
    }

    #[test]
    fn data_error_taxonomy_classifies_retryability() {
        assert!(DataError::connection("pool down").is_retryable());
        assert!(DataError::timeout("slow").is_retryable());
        assert!(!DataError::query("syntax").is_retryable());
        assert!(!DataError::config("bad url").is_retryable());
        assert!(!DataError::conflict("dup key").is_retryable());
        let e = DataError::connection("x");
        assert_eq!(e.kind, DataErrorKind::Connection);
        assert!(e.to_string().contains("Connection"));
    }

    #[test]
    fn data_error_other_and_display_format() {
        let e = DataError::other("unknown");
        assert_eq!(e.kind, DataErrorKind::Other);
        assert!(!e.is_retryable());
        assert_eq!(
            DataError::timeout("slow").to_string(),
            "datasource error (Timeout): slow"
        );
    }

    #[test]
    fn registry_iter_is_deterministic_default_first() {
        let reg = DataSourceRegistry::new(FakeDs("default"))
            .with("b", FakeDs("b"))
            .with("a", FakeDs("a"));
        let order: Vec<&str> = reg.iter().map(|(n, _)| n).collect();
        assert_eq!(order, vec!["", "a", "b"]);
    }
}