noxu_dbi/replica_ack.rs
//! Replica-acknowledgment coordination trait used by `Transaction::commit`
//! to honour `ReplicaAckPolicy` when an environment is replicated.
//!
//! This module lives in the `noxu-dbi` crate (which both `noxu-db` and
//! `noxu-rep` depend on) so that `noxu_db::Transaction` can call into a
//! replication-aware ack coordinator without `noxu-db` taking a direct
//! dependency on `noxu-rep`. `noxu_rep::ReplicatedEnvironment`
//! implements this trait; users wire an instance into a
//! `noxu_db::Environment` via `Environment::set_replica_coordinator()`.
//!
//! Closes finding F1 of `docs/src/internal/api-audit-2026-05-rep.md`.
12
13use std::sync::Arc;
14use std::time::Duration;
15
/// Replica-acknowledgment policy as seen by the commit durability path.
///
/// A dependency-free mirror of `noxu_db::durability::ReplicaAckPolicy`
/// and `noxu_rep::commit_durability::ReplicaAckPolicy`. The set of
/// variants is part of the stable API: adding a variant is a breaking
/// change.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ReplicaAckPolicyKind {
    /// Every electable replica must acknowledge before the commit
    /// returns.
    All,
    /// A simple majority of the electable nodes (the master included)
    /// must acknowledge.
    SimpleMajority,
    /// No replica acknowledgment is required; the commit returns as
    /// soon as the master has fsynced locally.
    None,
}

impl ReplicaAckPolicyKind {
    /// Returns how many *peer* replica acks this policy demands.
    ///
    /// `electable_count` is the total number of electable nodes,
    /// master included. The master's own write already counts as one
    /// ack, so it is never part of the returned figure:
    ///
    /// * `All` needs `electable_count - 1` peers (0 for an empty group).
    /// * `SimpleMajority` needs `electable_count / 2` peers, i.e. the
    ///   majority `electable_count / 2 + 1` minus the master.
    /// * `None` needs no peers at all.
    pub fn required_acks(self, electable_count: u32) -> u32 {
        match self {
            // Saturating so that an empty group yields 0 instead of
            // underflowing.
            Self::All => electable_count.saturating_sub(1),
            // (n / 2 + 1) - 1 == n / 2, which is also 0 for n <= 1.
            Self::SimpleMajority => electable_count / 2,
            Self::None => 0,
        }
    }
}
61
/// Reason an ack-wait did not satisfy the durability contract.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum AckWaitErrorKind {
    /// `ack_timeout` elapsed before enough replicas acknowledged the
    /// commit. The commit is durably written locally but does not meet
    /// the configured replication policy.
    Timeout,
    /// Commit was attempted on a replica node, which is not permitted.
    NotMaster,
    /// The replicated environment is shutting down and cannot wait for
    /// acks.
    Shutdown,
}

/// Error returned by [`ReplicaAckCoordinator::await_replica_acks`] when
/// the configured number of replica acks could not be obtained within
/// the supplied timeout.
///
/// Comparable with `==` (all fields are `Eq`), so callers and tests can
/// match a whole error value instead of comparing field by field.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AckWaitError {
    /// Kind of failure (timeout / not master / shutdown).
    pub kind: AckWaitErrorKind,
    /// Number of acks required by the policy.
    pub needed: u32,
    /// Number of acks actually received before the deadline.
    pub received: u32,
}

impl std::fmt::Display for AckWaitError {
    /// Renders a one-line, human-readable description. Only the
    /// `Timeout` message includes the `needed` / `received` counters;
    /// the other kinds print a fixed message.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self.kind {
            AckWaitErrorKind::Timeout => write!(
                f,
                "replica ack timeout: needed {}, received {}",
                self.needed, self.received,
            ),
            AckWaitErrorKind::NotMaster => {
                f.write_str("commit attempted on non-master node")
            }
            AckWaitErrorKind::Shutdown => {
                f.write_str("replicated environment is shutting down")
            }
        }
    }
}

impl std::error::Error for AckWaitError {}
108
109/// Coordinates with a replication subsystem to satisfy a replica-ack
110/// policy on commit.
111///
112/// Implementations are typically `noxu_rep::ReplicatedEnvironment`. The
113/// `noxu-db::Environment` holds an `Option<Arc<dyn ReplicaAckCoordinator>>`;
114/// when present, `Transaction::commit_with_durability` calls
115/// [`Self::await_replica_acks`] after the local WAL fsync and propagates
116/// any error as `NoxuError::InsufficientReplicas`.
117pub trait ReplicaAckCoordinator: Send + Sync {
118 /// Block until at least `policy.required_acks(electable_count)`
119 /// replicas have acknowledged the most-recent local commit, or
120 /// until `timeout` elapses, whichever comes first.
121 ///
122 /// Returns `Ok(received_acks)` on success. Returns
123 /// [`AckWaitError`] if the deadline expires before the policy is
124 /// satisfied, or if this coordinator is not in a state where
125 /// commits may be acknowledged (replica node / shutting down).
126 ///
127 /// Implementations are responsible for assigning the commit VLSN
128 /// internally and for cleaning up internal tracking state on both
129 /// success and failure paths.
130 fn await_replica_acks(
131 &self,
132 policy: ReplicaAckPolicyKind,
133 timeout: Duration,
134 ) -> std::result::Result<u32, AckWaitError>;
135
136 /// Allocate the next commit VLSN and register `lsn` in the VLSN index.
137 ///
138 /// Called by `Environment::write_txn_commit_for_recovered` after
139 /// writing a `TxnCommit` WAL frame for a recovered prepared (XA)
140 /// transaction. In a replicated environment the commit must be visible
141 /// to feeders and replicas, so it needs a real VLSN assigned and
142 /// registered in the `VlsnIndex`.
143 ///
144 /// Returns the allocated VLSN (> 0) on success, or 0
145 /// (`NULL_VLSN`) if this node is not in a replicated or master
146 /// state where VLSN assignment makes sense.
147 ///
148 /// The default implementation returns 0 (non-replicated env). X-3 fix.
149 fn alloc_vlsn_for_recovered_commit(&self, _lsn: noxu_util::Lsn) -> u64 {
150 0
151 }
152
153 /// Pre-allocate the next VLSN for a recovered XA commit *without*
154 /// registering it in the VLSN index yet.
155 ///
156 /// R-3 fix: called BEFORE writing the `TxnCommit` WAL entry so the entry
157 /// can carry the allocated VLSN. The caller then writes the entry and
158 /// calls `register_recovered_commit_vlsn` with the resulting commit LSN.
159 ///
160 /// Returns 0 (NULL_VLSN) for non-replicated environments.
161 fn pre_alloc_vlsn_for_recovered_commit(&self) -> u64 {
162 0
163 }
164
165 /// Register a previously pre-allocated VLSN in the VLSN index, mapping
166 /// it to the actual WAL commit LSN.
167 ///
168 /// R-3 fix: called AFTER writing the `TxnCommit` WAL entry with the
169 /// pre-allocated VLSN. The `commit_lsn` is the LSN of the TxnCommit
170 /// entry just written to the log.
171 ///
172 /// No-op for non-replicated environments (default).
173 fn register_recovered_commit_vlsn(
174 &self,
175 _vlsn: u64,
176 _commit_lsn: noxu_util::Lsn,
177 ) {
178 }
179}
180
181/// Type alias used in `noxu-db::Environment` to hold the optional
182/// installed coordinator.
183pub type SharedReplicaAckCoordinator = Arc<dyn ReplicaAckCoordinator>;
184
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks `policy.required_acks(electable) == expected` for every
    /// `(electable, expected)` pair in `cases`.
    fn assert_required(policy: ReplicaAckPolicyKind, cases: &[(u32, u32)]) {
        for &(electable, expected) in cases {
            assert_eq!(
                policy.required_acks(electable),
                expected,
                "{:?} with {} electable node(s)",
                policy,
                electable,
            );
        }
    }

    #[test]
    fn required_acks_all() {
        // Everyone but the master must ack.
        assert_required(
            ReplicaAckPolicyKind::All,
            &[(0, 0), (1, 0), (3, 2), (5, 4)],
        );
    }

    #[test]
    fn required_acks_simple_majority() {
        // Majority of the group, minus the master's own vote.
        assert_required(
            ReplicaAckPolicyKind::SimpleMajority,
            &[(0, 0), (1, 0), (3, 1), (5, 2)],
        );
    }

    #[test]
    fn required_acks_none() {
        // Group size is irrelevant when no acks are required.
        assert_required(ReplicaAckPolicyKind::None, &[(0, 0), (100, 0)]);
    }
}