reddb_server/runtime/impl_replication_commit.rs
1use super::*;
2
3impl RedDBRuntime {
4 /// PLAN.md Phase 11.4 — owned snapshot of every registered
5 /// replica's state on this primary. Returns empty vec on
6 /// non-primary instances or when no replicas are registered yet.
7 pub fn primary_replica_snapshots(&self) -> Vec<crate::replication::primary::ReplicaState> {
8 self.inner
9 .db
10 .replication
11 .as_ref()
12 .map(|repl| repl.replica_snapshots())
13 .unwrap_or_default()
14 }
15
16 /// Issue #839 — the primary's current logical-WAL head LSN, used as
17 /// the reference point for per-replica lag. `0` on non-primary
18 /// instances or before the logical spool has any records.
19 pub fn primary_logical_head_lsn(&self) -> u64 {
20 self.inner
21 .db
22 .replication
23 .as_ref()
24 .map(|repl| repl.current_logical_lsn())
25 .unwrap_or(0)
26 }
27
28 /// Issue #839 — count of pulls that forced a full re-bootstrap since
29 /// process start. The primary operator alert signal; always `0` on a
30 /// non-primary instance.
31 pub fn replication_full_resync_count(&self) -> u64 {
32 self.inner
33 .db
34 .replication
35 .as_ref()
36 .map(|repl| repl.full_resync_count())
37 .unwrap_or(0)
38 }
39
40 /// Issue #839 — count of pulls served as a partial (incremental)
41 /// resync since process start. Always `0` on a non-primary instance.
42 pub fn replication_partial_resync_count(&self) -> u64 {
43 self.inner
44 .db
45 .replication
46 .as_ref()
47 .map(|repl| repl.partial_resync_count())
48 .unwrap_or(0)
49 }
50
51 pub fn enforce_primary_replica_retention_limits(
52 &self,
53 ) -> Vec<(String, reddb_file::ReplicationSlotInvalidationCause)> {
54 self.inner
55 .db
56 .replication
57 .as_ref()
58 .map(|repl| repl.enforce_retention_limits(crate::utils::now_unix_millis() as u128))
59 .unwrap_or_default()
60 }
61
62 /// Issue #839 — this node's stable identity, surfaced as the leader
63 /// identity in `/replication/status` when the node is the primary.
64 /// Reuses the same persisted id a replica advertises to the primary,
65 /// so a cluster has one stable name per node regardless of role.
66 pub fn node_id(&self) -> String {
67 self.resolve_replica_id()
68 }
69
70 /// Issue #826 — re-evaluate write-admission flow control from the
71 /// live primary replica registry and return the resulting throttle
72 /// state. Computes the max lag across in-quorum replicas (async
73 /// read-replicas excluded) against the primary's current LSN and
74 /// engages/releases the `WriteGate` throttle accordingly.
75 ///
76 /// No-op (returns `false`) on non-primary instances or when flow
77 /// control is disabled (soft target `0`). Cheap enough to call on
78 /// the replica-ack path and from `/metrics` scrapes so the throttle
79 /// tracks lag without a dedicated background loop.
80 pub fn refresh_replication_flow_control(&self) -> bool {
81 let flow = self.inner.write_gate.flow_control();
82 if !flow.is_enabled() {
83 return false;
84 }
85 let Some(repl) = self.inner.db.replication.as_ref() else {
86 return false;
87 };
88 let primary_lsn = repl.current_logical_lsn();
89 let replicas = repl.replica_snapshots();
90 flow.observe(&replicas, primary_lsn)
91 }
92
93 /// PLAN.md Phase 11.4 — active commit policy. Reads
94 /// `RED_PRIMARY_COMMIT_POLICY` once at runtime construction;
95 /// future env reloads will need a reload endpoint. Default is
96 /// `Local` — current behavior, no replica blocking.
97 pub fn commit_policy(&self) -> crate::replication::CommitPolicy {
98 crate::replication::CommitPolicy::from_env()
99 }
100
101 /// Issue #1001 — resolve the *effective* commit policy for one collection by
102 /// combining the cluster default ([`commit_policy`](Self::commit_policy)),
103 /// the collection's declared override, the collection data model, and the
104 /// deployment's HA intent (`RED_CLUSTER_HA_INTENT`).
105 ///
106 /// Both write admission and failover eligibility call this so they read the
107 /// same decision: a durable model (transactional/queue/audit/config/vault)
108 /// may not silently use local-only acknowledgement under declared HA intent
109 /// — that returns [`CommitPolicyViolation`] and the caller must fail closed.
110 /// Explicitly ephemeral/cache-like collections may opt into local commit
111 /// with the documented failover-eligibility data-loss window.
112 pub fn resolve_commit_policy(
113 &self,
114 model: crate::cluster::CollectionDataModel,
115 collection_override: Option<crate::replication::CommitPolicy>,
116 ) -> Result<crate::cluster::CommitPolicyResolution, crate::cluster::CommitPolicyViolation> {
117 crate::cluster::resolve_commit_policy(
118 self.commit_policy(),
119 collection_override,
120 model,
121 crate::cluster::HaIntent::from_env(),
122 )
123 }
124
125 pub fn primary_replica_durability(&self) -> reddb_file::ReplicationDurability {
126 Self::primary_replica_durability_for_policy(self.commit_policy())
127 }
128
129 pub(crate) fn primary_replica_durability_for_policy(
130 policy: crate::replication::CommitPolicy,
131 ) -> reddb_file::ReplicationDurability {
132 match policy {
133 crate::replication::CommitPolicy::AckN(n) if n > 0 => {
134 reddb_file::ReplicationDurability::RemoteFlush {
135 quorum: u16::try_from(n).unwrap_or(u16::MAX),
136 }
137 }
138 _ => reddb_file::ReplicationDurability::Async,
139 }
140 }
141
142 /// PLAN.md Phase 11.5 — accessor for replica-side apply error
143 /// counters (gap / divergence / apply / decode / apply_miss). Returned
144 /// snapshot is consistent across the counters; the labels match
145 /// `reddb_replica_apply_errors_total{kind}`. Issue #814 adds the
146 /// `apply_miss` kind for deletes against a missing target.
147 pub fn replica_apply_error_counts(
148 &self,
149 ) -> [(crate::replication::logical::ApplyErrorKind, u64); 6] {
150 self.inner.replica_apply_metrics.snapshot()
151 }
152
153 /// PLAN.md Phase 11.4 — observability snapshot of every
154 /// replica's durable LSN as known to the commit waiter. Empty
155 /// vec on non-primary instances or when no replica has acked.
156 pub fn commit_waiter_snapshot(&self) -> Vec<(String, u64)> {
157 self.inner
158 .db
159 .replication
160 .as_ref()
161 .map(|repl| repl.commit_waiter.snapshot())
162 .unwrap_or_default()
163 }
164
165 /// PLAN.md Phase 11.4 — `(reached, timed_out, not_required, last_micros)`
166 /// counters for /metrics. Always-zero on non-primary instances.
167 pub fn commit_waiter_metrics_snapshot(&self) -> (u64, u64, u64, u64) {
168 self.inner
169 .db
170 .replication
171 .as_ref()
172 .map(|repl| repl.commit_waiter.metrics_snapshot())
173 .unwrap_or((0, 0, 0, 0))
174 }
175
176 /// Named commit watermark: highest LSN durable on the active
177 /// synchronous commit quorum. Returns 0 when the active policy does
178 /// not require replica durability.
179 pub fn commit_watermark(&self) -> u64 {
180 match self.primary_replica_durability() {
181 reddb_file::ReplicationDurability::RemoteWrite { quorum }
182 | reddb_file::ReplicationDurability::RemoteFlush { quorum }
183 | reddb_file::ReplicationDurability::RemoteApply { quorum }
184 if quorum > 0 =>
185 {
186 self.inner
187 .db
188 .replication
189 .as_ref()
190 .map(|repl| repl.commit_waiter.commit_watermark(u32::from(quorum)))
191 .unwrap_or(0)
192 }
193 _ if matches!(
194 self.commit_policy(),
195 crate::replication::CommitPolicy::Quorum
196 ) =>
197 {
198 self.inner
199 .db
200 .quorum
201 .as_ref()
202 .map(|q| q.commit_watermark())
203 .unwrap_or(0)
204 }
205 _ => 0,
206 }
207 }
208
209 /// PLAN.md Phase 11.4 — block until at least `count` replicas
210 /// have durably applied through `target_lsn`, or `timeout`
211 /// elapses. Returns the `AwaitOutcome` so the caller can decide
212 /// whether to surface a timeout error to the client or continue
213 /// (the policy mapping lives in the commit dispatcher).
214 ///
215 /// Used by the `ack_n` commit policy once the operator flips
216 /// `RED_PRIMARY_COMMIT_POLICY` away from `local`.
217 pub fn await_replica_acks(
218 &self,
219 target_lsn: u64,
220 count: u32,
221 timeout: std::time::Duration,
222 ) -> crate::replication::AwaitOutcome {
223 match &self.inner.db.replication {
224 Some(repl) => repl.commit_waiter.await_acks(target_lsn, count, timeout),
225 None => {
226 // No replication configured: policy must be `Local`.
227 // Treat as immediate `NotRequired` so callers don't
228 // block on a degenerate setup.
229 crate::replication::AwaitOutcome::NotRequired
230 }
231 }
232 }
233
234 /// PLAN.md Phase 11.4 — enforce the configured commit policy
235 /// against `post_lsn` (the LSN of the just-completed write).
236 /// Returns `Ok(AwaitOutcome)` on every successful enforcement
237 /// (including `Reached` and `TimedOut` when fail-on-timeout is
238 /// off). Returns `Err(ReadOnly)` only when a synchronous policy
239 /// misses its threshold and `RED_COMMIT_FAIL_ON_TIMEOUT=true` is
240 /// set.
241 ///
242 /// The HTTP / gRPC / wire surfaces map the error to 504 / wire
243 /// backoff. Default behaviour (env unset) logs warn and returns
244 /// success — matches PLAN.md "default v1 stays local" semantics
245 /// while still letting the operator opt into hard-blocking.
246 pub fn enforce_commit_policy(
247 &self,
248 post_lsn: u64,
249 ) -> RedDBResult<crate::replication::AwaitOutcome> {
250 let policy = self.commit_policy();
251 if matches!(policy, crate::replication::CommitPolicy::Quorum) {
252 return match self.inner.db.wait_for_replication_quorum(post_lsn) {
253 Ok(()) => Ok(crate::replication::AwaitOutcome::Reached(0)),
254 Err(err) => {
255 tracing::warn!(
256 target: "reddb::commit",
257 post_lsn,
258 error = %err,
259 "quorum: timed out waiting for commit watermark"
260 );
261 let fail = std::env::var("RED_COMMIT_FAIL_ON_TIMEOUT")
262 .ok()
263 .map(|v| {
264 let t = v.trim();
265 t.eq_ignore_ascii_case("true")
266 || t == "1"
267 || t.eq_ignore_ascii_case("yes")
268 })
269 .unwrap_or(false);
270 if fail {
271 return Err(RedDBError::ReadOnly(format!(
272 "commit policy timed out at lsn {post_lsn}: {err} (RED_COMMIT_FAIL_ON_TIMEOUT=true)"
273 )));
274 }
275 Ok(crate::replication::AwaitOutcome::TimedOut {
276 observed: 0,
277 required: 1,
278 })
279 }
280 };
281 }
282
283 let durability = Self::primary_replica_durability_for_policy(policy);
284 let n = match durability {
285 reddb_file::ReplicationDurability::RemoteWrite { quorum }
286 | reddb_file::ReplicationDurability::RemoteFlush { quorum }
287 | reddb_file::ReplicationDurability::RemoteApply { quorum }
288 if quorum > 0 =>
289 {
290 u32::from(quorum)
291 }
292 _ => return Ok(crate::replication::AwaitOutcome::NotRequired),
293 };
294 let timeout_ms = std::env::var("RED_REPLICATION_ACK_TIMEOUT_MS")
295 .ok()
296 .and_then(|v| v.parse::<u64>().ok())
297 .unwrap_or(5_000);
298 let outcome =
299 self.await_replica_acks(post_lsn, n, std::time::Duration::from_millis(timeout_ms));
300 {
301 use crate::runtime::control_events::{EventKind, Outcome, Sensitivity};
302 let (event_outcome, fields) = match &outcome {
303 crate::replication::AwaitOutcome::Reached(count) => (
304 Outcome::Allowed,
305 vec![
306 (
307 "post_lsn".to_string(),
308 Sensitivity::raw(post_lsn.to_string()),
309 ),
310 ("required".to_string(), Sensitivity::raw(n.to_string())),
311 ("observed".to_string(), Sensitivity::raw(count.to_string())),
312 (
313 "timeout_ms".to_string(),
314 Sensitivity::raw(timeout_ms.to_string()),
315 ),
316 ],
317 ),
318 crate::replication::AwaitOutcome::TimedOut { observed, required } => (
319 Outcome::Error,
320 vec![
321 (
322 "post_lsn".to_string(),
323 Sensitivity::raw(post_lsn.to_string()),
324 ),
325 (
326 "required".to_string(),
327 Sensitivity::raw(required.to_string()),
328 ),
329 (
330 "observed".to_string(),
331 Sensitivity::raw(observed.to_string()),
332 ),
333 (
334 "timeout_ms".to_string(),
335 Sensitivity::raw(timeout_ms.to_string()),
336 ),
337 ],
338 ),
339 crate::replication::AwaitOutcome::NotRequired => (Outcome::Allowed, Vec::new()),
340 };
341 if !fields.is_empty() {
342 self.emit_control_event(
343 EventKind::ReplicationSafety,
344 event_outcome,
345 "replication_commit_policy",
346 Some(format!("replication:lsn:{post_lsn}")),
347 None,
348 fields,
349 )?;
350 }
351 }
352 if let crate::replication::AwaitOutcome::TimedOut { observed, required } = &outcome {
353 tracing::warn!(
354 target: "reddb::commit",
355 post_lsn,
356 observed = *observed,
357 required = *required,
358 timeout_ms,
359 "ack_n: timed out waiting for replicas"
360 );
361 let fail = std::env::var("RED_COMMIT_FAIL_ON_TIMEOUT")
362 .ok()
363 .map(|v| {
364 let t = v.trim();
365 t.eq_ignore_ascii_case("true") || t == "1" || t.eq_ignore_ascii_case("yes")
366 })
367 .unwrap_or(false);
368 if fail {
369 return Err(RedDBError::ReadOnly(format!(
370 "commit policy timed out at lsn {post_lsn}: observed={observed} required={required} (RED_COMMIT_FAIL_ON_TIMEOUT=true)"
371 )));
372 }
373 }
374 Ok(outcome)
375 }
376}