Skip to main content

noxu_dbi/
replica_ack.rs

//! Replica-acknowledgment coordination trait used by `Transaction::commit`
//! to honour `ReplicaAckPolicy` when an environment is replicated.
//!
//! This module lives in `noxu-dbi` (a dependency of both `noxu-db` and
//! `noxu-rep`) so that `noxu-db::Transaction` can call into a
//! replication-aware ack coordinator without `noxu-db` taking a direct
//! dependency on `noxu-rep`. `noxu-rep::ReplicatedEnvironment`
//! implements this trait; users wire an instance into a `noxu-db::Environment`
//! via `Environment::set_replica_coordinator()`.
//!
//! Closes finding F1 of `docs/src/internal/api-audit-2026-05-rep.md`.
12
13use std::sync::Arc;
14use std::time::Duration;
15
/// The replica-acknowledgment policy as seen by the commit/durability path.
///
/// This is a dependency-free mirror of `noxu_db::durability::ReplicaAckPolicy`
/// and `noxu_rep::commit_durability::ReplicaAckPolicy`, so `noxu-dbi` does not
/// need to depend on either crate. The enum is exhaustive: adding a variant is
/// a breaking change for downstream `match` expressions.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ReplicaAckPolicyKind {
    /// Every electable replica must acknowledge before the commit returns.
    All,
    /// A simple majority of the electable nodes (the master included) must
    /// acknowledge.
    SimpleMajority,
    /// No replica acknowledgment is required; the commit returns as soon as
    /// the master has fsynced locally.
    None,
}

impl ReplicaAckPolicyKind {
    /// Returns how many *peer* acknowledgments this policy demands.
    ///
    /// `electable_count` is the total number of electable nodes, the master
    /// included. The master's own write already counts as one acknowledgment,
    /// so the result excludes the master:
    ///
    /// * `All` needs every other electable node: `electable_count - 1`
    ///   (0 when the group is empty).
    /// * `SimpleMajority` needs `floor(n / 2) + 1` nodes in total, which is
    ///   `floor(n / 2)` peers.
    /// * `None` never waits on peers.
    pub fn required_acks(self, electable_count: u32) -> u32 {
        match self {
            // An empty group needs nothing; saturate instead of underflowing.
            Self::All => electable_count.saturating_sub(1),
            // Majority size (n / 2 + 1) minus the master's own ack. For
            // n <= 1 this already evaluates to 0 (a lone master).
            Self::SimpleMajority => electable_count / 2,
            Self::None => 0,
        }
    }
}
61
/// Reason an ack-wait did not satisfy the durability contract.
///
/// This is a fieldless enum, so it is `Copy` and `Hash` like
/// `ReplicaAckPolicyKind`, which makes it cheap to match on and to use as a
/// map/set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum AckWaitErrorKind {
    /// `ack_timeout` elapsed before enough replicas acknowledged the
    /// commit. The commit is durably written locally but does not meet
    /// the configured replication policy.
    Timeout,
    /// Commit was attempted on a replica node, which is not permitted.
    NotMaster,
    /// The replicated environment is shutting down and cannot wait for
    /// acks.
    Shutdown,
}

/// Error returned by [`ReplicaAckCoordinator::await_replica_acks`] when the
/// replica-ack policy could not be satisfied.
///
/// This covers two cases:
///
/// * The deadline expired before enough acks arrived
///   ([`AckWaitErrorKind::Timeout`]).
/// * The coordinator was in a state where it could not wait at all
///   ([`AckWaitErrorKind::NotMaster`], [`AckWaitErrorKind::Shutdown`]).
///
/// The type derives `PartialEq`/`Eq` so callers and tests can compare errors
/// directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AckWaitError {
    /// Kind of failure (timeout / not master / shutdown).
    pub kind: AckWaitErrorKind,
    /// Number of acks required by the policy.
    pub needed: u32,
    /// Number of acks actually received before the wait ended.
    /// NOTE(review): for `NotMaster`/`Shutdown` the value is whatever the
    /// implementation reports; it is not interpreted by `Display`.
    pub received: u32,
}

impl std::fmt::Display for AckWaitError {
    /// Formats the error for humans.
    ///
    /// Only `Timeout` includes the ack counts; the other kinds describe the
    /// coordinator's state.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self.kind {
            AckWaitErrorKind::Timeout => write!(
                f,
                "replica ack timeout: needed {}, received {}",
                self.needed, self.received,
            ),
            AckWaitErrorKind::NotMaster => f.write_str("commit attempted on non-master node"),
            AckWaitErrorKind::Shutdown => f.write_str("replicated environment is shutting down"),
        }
    }
}

impl std::error::Error for AckWaitError {}
108
109/// Coordinates with a replication subsystem to satisfy a replica-ack
110/// policy on commit.
111///
112/// Implementations are typically `noxu_rep::ReplicatedEnvironment`. The
113/// `noxu-db::Environment` holds an `Option<Arc<dyn ReplicaAckCoordinator>>`;
114/// when present, `Transaction::commit_with_durability` calls
115/// [`Self::await_replica_acks`] after the local WAL fsync and propagates
116/// any error as `NoxuError::InsufficientReplicas`.
117pub trait ReplicaAckCoordinator: Send + Sync {
118    /// Block until at least `policy.required_acks(electable_count)`
119    /// replicas have acknowledged the most-recent local commit, or
120    /// until `timeout` elapses, whichever comes first.
121    ///
122    /// Returns `Ok(received_acks)` on success. Returns
123    /// [`AckWaitError`] if the deadline expires before the policy is
124    /// satisfied, or if this coordinator is not in a state where
125    /// commits may be acknowledged (replica node / shutting down).
126    ///
127    /// Implementations are responsible for assigning the commit VLSN
128    /// internally and for cleaning up internal tracking state on both
129    /// success and failure paths.
130    fn await_replica_acks(
131        &self,
132        policy: ReplicaAckPolicyKind,
133        timeout: Duration,
134    ) -> std::result::Result<u32, AckWaitError>;
135
136    /// Allocate the next commit VLSN and register `lsn` in the VLSN index.
137    ///
138    /// Called by `Environment::write_txn_commit_for_recovered` after
139    /// writing a `TxnCommit` WAL frame for a recovered prepared (XA)
140    /// transaction.  In a replicated environment the commit must be visible
141    /// to feeders and replicas, so it needs a real VLSN assigned and
142    /// registered in the `VlsnIndex`.
143    ///
144    /// Returns the allocated VLSN (> 0) on success, or 0
145    /// (`NULL_VLSN`) if this node is not in a replicated or master
146    /// state where VLSN assignment makes sense.
147    ///
148    /// The default implementation returns 0 (non-replicated env).  X-3 fix.
149    fn alloc_vlsn_for_recovered_commit(&self, _lsn: noxu_util::Lsn) -> u64 {
150        0
151    }
152
153    /// Pre-allocate the next VLSN for a recovered XA commit *without*
154    /// registering it in the VLSN index yet.
155    ///
156    /// R-3 fix: called BEFORE writing the `TxnCommit` WAL entry so the entry
157    /// can carry the allocated VLSN.  The caller then writes the entry and
158    /// calls `register_recovered_commit_vlsn` with the resulting commit LSN.
159    ///
160    /// Returns 0 (NULL_VLSN) for non-replicated environments.
161    fn pre_alloc_vlsn_for_recovered_commit(&self) -> u64 {
162        0
163    }
164
165    /// Register a previously pre-allocated VLSN in the VLSN index, mapping
166    /// it to the actual WAL commit LSN.
167    ///
168    /// R-3 fix: called AFTER writing the `TxnCommit` WAL entry with the
169    /// pre-allocated VLSN.  The `commit_lsn` is the LSN of the TxnCommit
170    /// entry just written to the log.
171    ///
172    /// No-op for non-replicated environments (default).
173    fn register_recovered_commit_vlsn(
174        &self,
175        _vlsn: u64,
176        _commit_lsn: noxu_util::Lsn,
177    ) {
178    }
179}
180
181/// Type alias used in `noxu-db::Environment` to hold the optional
182/// installed coordinator.
183pub type SharedReplicaAckCoordinator = Arc<dyn ReplicaAckCoordinator>;
184
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts `policy.required_acks(n) == expected` for every
    /// `(n, expected)` pair, naming the failing case in the panic message.
    fn check(policy: ReplicaAckPolicyKind, cases: &[(u32, u32)]) {
        for &(electable, expected) in cases {
            assert_eq!(
                policy.required_acks(electable),
                expected,
                "{:?}.required_acks({})",
                policy,
                electable
            );
        }
    }

    #[test]
    fn required_acks_all() {
        check(
            ReplicaAckPolicyKind::All,
            &[(0, 0), (1, 0), (2, 1), (3, 2), (5, 4)],
        );
    }

    #[test]
    fn required_acks_simple_majority() {
        // Even group sizes are where majority rounding matters:
        // 2 nodes -> majority 2 -> 1 peer; 4 -> 3 -> 2; 6 -> 4 -> 3.
        check(
            ReplicaAckPolicyKind::SimpleMajority,
            &[(0, 0), (1, 0), (2, 1), (3, 1), (4, 2), (5, 2), (6, 3)],
        );
    }

    #[test]
    fn required_acks_none() {
        check(ReplicaAckPolicyKind::None, &[(0, 0), (1, 0), (100, 0)]);
    }

    #[test]
    fn ack_wait_error_display() {
        let timeout = AckWaitError {
            kind: AckWaitErrorKind::Timeout,
            needed: 2,
            received: 1,
        };
        assert_eq!(
            timeout.to_string(),
            "replica ack timeout: needed 2, received 1"
        );

        let not_master = AckWaitError {
            kind: AckWaitErrorKind::NotMaster,
            needed: 0,
            received: 0,
        };
        assert_eq!(not_master.to_string(), "commit attempted on non-master node");

        let shutdown = AckWaitError {
            kind: AckWaitErrorKind::Shutdown,
            needed: 0,
            received: 0,
        };
        assert_eq!(
            shutdown.to_string(),
            "replicated environment is shutting down"
        );

        // Usable as a standard error trait object.
        let as_error: &dyn std::error::Error = &shutdown;
        assert_eq!(as_error.to_string(), shutdown.to_string());
    }
}