arcly_http/data/mod.rs
1//! Datasource contracts: tenant-scoped pools + read/write splitting.
2//!
3//! The framework defines the *shape* of data access; the app implements it
4//! with SQLx, SeaORM, or anything else — the same boundary rule that keeps
5//! OAuth providers and session stores app-side. `core/` is untouched: the
6//! registry is an ordinary DI singleton.
7//!
8//! ## Hot-path guarantees
9//!
10//! - [`DataSourceRegistry`] is **frozen at boot**: tenant→pool lookup is an
11//! immutable `HashMap` read. No locks.
12//! - Replica selection inside a [`DataSource`] implementation should be a
13//! single `AtomicUsize` round-robin — never a locked list.
14//! - [`ReadAfterWritePin`] is one `AtomicBool` per request, used to route
15//! post-write reads to the primary (replica-lag protection).
16//!
17//! ## Read-after-write rule
18//!
19//! Within one request, after the first `Write` acquisition every subsequent
20//! `Read` must go to the primary — a replica may not have replayed the write
21//! yet. Handlers opt in by creating a [`ReadAfterWritePin`] and passing it to
22//! [`DataSourceRegistry::acquire`]:
23//!
24//! ```ignore
25//! let pin = ReadAfterWritePin::new();
26//! let ds = registry.for_tenant(ctx.tenant());
27//! let w = registry.acquire(ds, AccessIntent::Write, &pin).await?; // pins
28//! let r = registry.acquire(ds, AccessIntent::Read, &pin).await?; // → primary
29//! ```
30
31pub mod db;
32pub mod drivers;
33/// Migration runner uses the SQLx query facade; SeaORM/Diesel ledger support
34/// arrives with their facade methods.
35#[cfg(feature = "db-sqlx")]
36pub mod migrate;
37pub mod outbox;
38pub mod tx;
39
40use std::collections::HashMap;
41use std::sync::atomic::{AtomicBool, Ordering};
42
43use futures::future::BoxFuture;
44
45use crate::web::tenant::TenantConfig;
46
47// ─── Intent & errors ──────────────────────────────────────────────────────────
48
/// The kind of access a caller is requesting from a datasource.
///
/// A `Write` is always served by the primary. A `Read` is allowed to land on
/// a replica, except when the request's [`ReadAfterWritePin`] has already
/// been tripped — in that case it is upgraded to the primary as well.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AccessIntent {
    /// Read-only access; may be served by a replica.
    Read,
    /// Mutating access; always routed to the primary.
    Write,
}
58
/// Error type produced by datasource operations such as
/// [`DataSource::acquire`].
///
/// Wraps a free-form message. The `Display` output is that message prefixed
/// with `datasource error: `.
#[derive(Debug)]
pub struct DataError(pub String);

impl std::error::Error for DataError {}

impl std::fmt::Display for DataError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(message) = self;
        write!(f, "datasource error: {}", message)
    }
}
68
69// ─── DataSource trait ─────────────────────────────────────────────────────────
70
71/// One logical database: a primary plus optional read replicas.
72///
73/// Implementations own pooling (SQLx `PgPool`s etc.) and replica selection.
74/// `acquire(Write)` MUST return a primary connection; `acquire(Read)` MAY
75/// return a replica.
76pub trait DataSource: Send + Sync + 'static {
77 type Conn: Send;
78
79 fn acquire(&self, intent: AccessIntent) -> BoxFuture<'_, Result<Self::Conn, DataError>>;
80
81 /// Logical name (matches `TenantConfig::datasource`). For logs/metrics.
82 fn name(&self) -> &'static str;
83}
84
85// ─── Read-after-write pin ─────────────────────────────────────────────────────
86
/// Per-request replica-lag protection: once any `Write` intent has passed
/// through this pin, all subsequent `Read`s are upgraded to the primary.
///
/// The whole state is one `AtomicBool` — create a pin per request (cheap, no
/// allocation beyond the stack) and never share it across requests.
///
/// Implements `Debug` so the pin can be included in diagnostics and in
/// `#[derive(Debug)]` request-context types.
#[derive(Debug, Default)]
pub struct ReadAfterWritePin {
    /// `true` once a `Write` intent has been seen; starts out `false`.
    wrote: AtomicBool,
}
96
97impl ReadAfterWritePin {
98 pub fn new() -> Self {
99 Self::default()
100 }
101
102 /// Effective intent after applying the pin, recording writes as they pass.
103 pub fn apply(&self, intent: AccessIntent) -> AccessIntent {
104 match intent {
105 AccessIntent::Write => {
106 self.wrote.store(true, Ordering::Relaxed);
107 AccessIntent::Write
108 }
109 AccessIntent::Read if self.wrote.load(Ordering::Relaxed) => AccessIntent::Write,
110 AccessIntent::Read => AccessIntent::Read,
111 }
112 }
113}
114
115// ─── Registry ─────────────────────────────────────────────────────────────────
116
117/// Frozen tenant→datasource map. Built once at boot, provided via DI,
118/// read lock-free on every request.
119pub struct DataSourceRegistry<D: DataSource> {
120 default: D,
121 by_name: HashMap<&'static str, D>,
122}
123
124impl<D: DataSource> DataSourceRegistry<D> {
125 pub fn new(default: D) -> Self {
126 Self {
127 default,
128 by_name: HashMap::new(),
129 }
130 }
131
132 /// Register a named datasource (boot-time only — consumes `self`).
133 pub fn with(mut self, name: &'static str, ds: D) -> Self {
134 self.by_name.insert(name, ds);
135 self
136 }
137
138 /// The datasource for this request's tenant (joins the tenant layer by
139 /// `TenantConfig::datasource` name). Unknown/absent tenant → default.
140 pub fn for_tenant(&self, tenant: Option<&TenantConfig>) -> &D {
141 tenant
142 .and_then(|t| self.by_name.get(t.datasource.as_str()))
143 .unwrap_or(&self.default)
144 }
145
146 /// Iterate all datasources (default first as "", then named in
147 /// deterministic order) — used by the migration runner.
148 pub fn iter(&self) -> impl Iterator<Item = (&'static str, &D)> {
149 let mut named: Vec<_> = self.by_name.iter().map(|(k, v)| (*k, v)).collect();
150 named.sort_by_key(|(k, _)| *k);
151 std::iter::once(("", &self.default)).chain(named)
152 }
153
154 /// Acquire a connection with the read-after-write pin applied.
155 pub async fn acquire(
156 &self,
157 ds: &D,
158 intent: AccessIntent,
159 pin: &ReadAfterWritePin,
160 ) -> Result<D::Conn, DataError> {
161 ds.acquire(pin.apply(intent)).await
162 }
163}