Skip to main content

sqlmodel_pool/
replica.rs

1//! Read replica routing for connection pools.
2//!
3//! Provides `ReplicaPool` which routes read queries to replica databases
4//! and write queries to the primary database.
5
6use std::future::Future;
7use std::sync::atomic::{AtomicUsize, Ordering};
8
9use asupersync::{Cx, Outcome};
10use sqlmodel_core::{Connection, Error};
11
12use crate::{Pool, PooledConnection};
13
14/// Strategy for selecting which replica to use for reads.
15#[derive(Debug, Clone, Copy, PartialEq, Eq)]
16pub enum ReplicaStrategy {
17    /// Rotate through replicas in order.
18    RoundRobin,
19    /// Pick a random replica each time.
20    Random,
21}
22
23/// A pool that routes reads to replicas and writes to a primary.
24///
25/// # Example
26///
27/// ```ignore
28/// let primary = Pool::new(primary_config);
29/// let replica1 = Pool::new(replica1_config);
30/// let replica2 = Pool::new(replica2_config);
31///
32/// let pool = ReplicaPool::new(primary, vec![replica1, replica2]);
33///
34/// // Reads go to replicas (factory provided at acquire time)
35/// let conn = pool.acquire_read(&cx, || connect_replica()).await?;
36///
37/// // Writes go to primary
38/// let conn = pool.acquire_write(&cx, || connect_primary()).await?;
39/// ```
40pub struct ReplicaPool<C: Connection> {
41    /// Primary pool (for writes and explicit primary reads).
42    primary: Pool<C>,
43    /// Replica pools (for reads).
44    replicas: Vec<Pool<C>>,
45    /// Selection strategy for replicas.
46    strategy: ReplicaStrategy,
47    /// Counter for round-robin selection.
48    round_robin_counter: AtomicUsize,
49}
50
51impl<C: Connection> ReplicaPool<C> {
52    /// Create a new replica pool with round-robin strategy.
53    pub fn new(primary: Pool<C>, replicas: Vec<Pool<C>>) -> Self {
54        Self {
55            primary,
56            replicas,
57            strategy: ReplicaStrategy::RoundRobin,
58            round_robin_counter: AtomicUsize::new(0),
59        }
60    }
61
62    /// Create a new replica pool with a specific strategy.
63    pub fn with_strategy(
64        primary: Pool<C>,
65        replicas: Vec<Pool<C>>,
66        strategy: ReplicaStrategy,
67    ) -> Self {
68        Self {
69            primary,
70            replicas,
71            strategy,
72            round_robin_counter: AtomicUsize::new(0),
73        }
74    }
75
76    /// Acquire a connection for read operations.
77    ///
78    /// If replicas are available, selects one based on the configured strategy.
79    /// Falls back to the primary if no replicas are configured.
80    pub async fn acquire_read<F, Fut>(
81        &self,
82        cx: &Cx,
83        factory: F,
84    ) -> Outcome<PooledConnection<C>, Error>
85    where
86        F: Fn() -> Fut,
87        Fut: Future<Output = Outcome<C, Error>>,
88    {
89        if self.replicas.is_empty() {
90            return self.primary.acquire(cx, factory).await;
91        }
92
93        let idx = self.select_replica();
94        self.replicas[idx].acquire(cx, factory).await
95    }
96
97    /// Acquire a connection for write operations (always uses primary).
98    pub async fn acquire_write<F, Fut>(
99        &self,
100        cx: &Cx,
101        factory: F,
102    ) -> Outcome<PooledConnection<C>, Error>
103    where
104        F: Fn() -> Fut,
105        Fut: Future<Output = Outcome<C, Error>>,
106    {
107        self.primary.acquire(cx, factory).await
108    }
109
110    /// Acquire a connection from the primary (for read-after-write consistency).
111    pub async fn acquire_primary<F, Fut>(
112        &self,
113        cx: &Cx,
114        factory: F,
115    ) -> Outcome<PooledConnection<C>, Error>
116    where
117        F: Fn() -> Fut,
118        Fut: Future<Output = Outcome<C, Error>>,
119    {
120        self.primary.acquire(cx, factory).await
121    }
122
123    /// Get a reference to the primary pool.
124    pub fn primary(&self) -> &Pool<C> {
125        &self.primary
126    }
127
128    /// Get the replica pools.
129    pub fn replicas(&self) -> &[Pool<C>] {
130        &self.replicas
131    }
132
133    /// Get the number of replicas.
134    pub fn replica_count(&self) -> usize {
135        self.replicas.len()
136    }
137
138    /// Get the current strategy.
139    pub fn strategy(&self) -> ReplicaStrategy {
140        self.strategy
141    }
142
143    fn select_replica(&self) -> usize {
144        match self.strategy {
145            ReplicaStrategy::RoundRobin => {
146                let idx = self.round_robin_counter.fetch_add(1, Ordering::Relaxed);
147                idx % self.replicas.len()
148            }
149            ReplicaStrategy::Random => {
150                // Mix counter bits to approximate uniform distribution without
151                // pulling in a random number generator dependency.
152                let seq = self.round_robin_counter.fetch_add(1, Ordering::Relaxed);
153                // Multiplicative hash using golden ratio constant.
154                // Use 32-bit mixing and cast to usize for portability across architectures.
155                #[allow(clippy::cast_possible_truncation)]
156                let seq32 = seq as u32;
157                let mixed = seq32.wrapping_mul(2_654_435_761_u32);
158                (mixed as usize) % self.replicas.len()
159            }
160        }
161    }
162}
163
164impl<C: Connection> std::fmt::Debug for ReplicaPool<C> {
165    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
166        f.debug_struct("ReplicaPool")
167            .field("primary", &"Pool { .. }")
168            .field("replicas", &self.replicas.len())
169            .field("strategy", &self.strategy)
170            .field(
171                "round_robin_counter",
172                &self.round_robin_counter.load(Ordering::Relaxed),
173            )
174            .finish()
175    }
176}