reddb_server/runtime/impl_replication_commit.rs
1use super::*;
2
3impl RedDBRuntime {
4 /// PLAN.md Phase 11.4 — owned snapshot of every registered
5 /// replica's state on this primary. Returns empty vec on
6 /// non-primary instances or when no replicas are registered yet.
7 pub fn primary_replica_snapshots(&self) -> Vec<crate::replication::primary::ReplicaState> {
8 self.inner
9 .db
10 .replication
11 .as_ref()
12 .map(|repl| repl.replica_snapshots())
13 .unwrap_or_default()
14 }
15
16 /// Issue #839 — the primary's current logical-WAL head LSN, used as
17 /// the reference point for per-replica lag. `0` on non-primary
18 /// instances or before the logical spool has any records.
19 pub fn primary_logical_head_lsn(&self) -> u64 {
20 self.inner
21 .db
22 .replication
23 .as_ref()
24 .map(|repl| repl.current_logical_lsn())
25 .unwrap_or(0)
26 }
27
28 /// Issue #839 — count of pulls that forced a full re-bootstrap since
29 /// process start. The primary operator alert signal; always `0` on a
30 /// non-primary instance.
31 pub fn replication_full_resync_count(&self) -> u64 {
32 self.inner
33 .db
34 .replication
35 .as_ref()
36 .map(|repl| repl.full_resync_count())
37 .unwrap_or(0)
38 }
39
40 /// Issue #839 — count of pulls served as a partial (incremental)
41 /// resync since process start. Always `0` on a non-primary instance.
42 pub fn replication_partial_resync_count(&self) -> u64 {
43 self.inner
44 .db
45 .replication
46 .as_ref()
47 .map(|repl| repl.partial_resync_count())
48 .unwrap_or(0)
49 }
50
51 pub fn enforce_primary_replica_retention_limits(
52 &self,
53 ) -> Vec<(String, reddb_file::ReplicationSlotInvalidationCause)> {
54 self.inner
55 .db
56 .replication
57 .as_ref()
58 .map(|repl| repl.enforce_retention_limits(crate::utils::now_unix_millis() as u128))
59 .unwrap_or_default()
60 }
61
62 /// Issue #839 — this node's stable identity, surfaced as the leader
63 /// identity in `/replication/status` when the node is the primary.
64 /// Reuses the same persisted id a replica advertises to the primary,
65 /// so a cluster has one stable name per node regardless of role.
66 pub fn node_id(&self) -> String {
67 self.resolve_replica_id()
68 }
69
70 /// Issue #826 — re-evaluate write-admission flow control from the
71 /// live primary replica registry and return the resulting throttle
72 /// state. Computes the max lag across in-quorum replicas (async
73 /// read-replicas excluded) against the primary's current LSN and
74 /// engages/releases the `WriteGate` throttle accordingly.
75 ///
76 /// No-op (returns `false`) on non-primary instances or when flow
77 /// control is disabled (soft target `0`). Cheap enough to call on
78 /// the replica-ack path and from `/metrics` scrapes so the throttle
79 /// tracks lag without a dedicated background loop.
80 pub fn refresh_replication_flow_control(&self) -> bool {
81 let flow = self.inner.write_gate.flow_control();
82 if !flow.is_enabled() {
83 return false;
84 }
85 let Some(repl) = self.inner.db.replication.as_ref() else {
86 return false;
87 };
88 let primary_lsn = repl.current_logical_lsn();
89 let replicas = repl.replica_snapshots();
90 flow.observe(&replicas, primary_lsn)
91 }
92
93 /// PLAN.md Phase 11.4 — active commit policy. Reads
94 /// `RED_PRIMARY_COMMIT_POLICY` once at runtime construction;
95 /// future env reloads will need a reload endpoint. Default is
96 /// `Local` — current behavior, no replica blocking.
97 pub fn commit_policy(&self) -> crate::replication::CommitPolicy {
98 crate::replication::CommitPolicy::from_env()
99 }
100
101 pub fn primary_replica_durability(&self) -> reddb_file::ReplicationDurability {
102 Self::primary_replica_durability_for_policy(self.commit_policy())
103 }
104
105 pub(crate) fn primary_replica_durability_for_policy(
106 policy: crate::replication::CommitPolicy,
107 ) -> reddb_file::ReplicationDurability {
108 match policy {
109 crate::replication::CommitPolicy::AckN(n) if n > 0 => {
110 reddb_file::ReplicationDurability::RemoteFlush {
111 quorum: u16::try_from(n).unwrap_or(u16::MAX),
112 }
113 }
114 _ => reddb_file::ReplicationDurability::Async,
115 }
116 }
117
118 /// PLAN.md Phase 11.5 — accessor for replica-side apply error
119 /// counters (gap / divergence / apply / decode / apply_miss). Returned
120 /// snapshot is consistent across the counters; the labels match
121 /// `reddb_replica_apply_errors_total{kind}`. Issue #814 adds the
122 /// `apply_miss` kind for deletes against a missing target.
123 pub fn replica_apply_error_counts(
124 &self,
125 ) -> [(crate::replication::logical::ApplyErrorKind, u64); 6] {
126 self.inner.replica_apply_metrics.snapshot()
127 }
128
129 /// PLAN.md Phase 11.4 — observability snapshot of every
130 /// replica's durable LSN as known to the commit waiter. Empty
131 /// vec on non-primary instances or when no replica has acked.
132 pub fn commit_waiter_snapshot(&self) -> Vec<(String, u64)> {
133 self.inner
134 .db
135 .replication
136 .as_ref()
137 .map(|repl| repl.commit_waiter.snapshot())
138 .unwrap_or_default()
139 }
140
141 /// PLAN.md Phase 11.4 — `(reached, timed_out, not_required, last_micros)`
142 /// counters for /metrics. Always-zero on non-primary instances.
143 pub fn commit_waiter_metrics_snapshot(&self) -> (u64, u64, u64, u64) {
144 self.inner
145 .db
146 .replication
147 .as_ref()
148 .map(|repl| repl.commit_waiter.metrics_snapshot())
149 .unwrap_or((0, 0, 0, 0))
150 }
151
152 /// Named commit watermark: highest LSN durable on the active
153 /// synchronous commit quorum. Returns 0 when the active policy does
154 /// not require replica durability.
155 pub fn commit_watermark(&self) -> u64 {
156 match self.primary_replica_durability() {
157 reddb_file::ReplicationDurability::RemoteWrite { quorum }
158 | reddb_file::ReplicationDurability::RemoteFlush { quorum }
159 | reddb_file::ReplicationDurability::RemoteApply { quorum }
160 if quorum > 0 =>
161 {
162 self.inner
163 .db
164 .replication
165 .as_ref()
166 .map(|repl| repl.commit_waiter.commit_watermark(u32::from(quorum)))
167 .unwrap_or(0)
168 }
169 _ if matches!(
170 self.commit_policy(),
171 crate::replication::CommitPolicy::Quorum
172 ) =>
173 {
174 self.inner
175 .db
176 .quorum
177 .as_ref()
178 .map(|q| q.commit_watermark())
179 .unwrap_or(0)
180 }
181 _ => 0,
182 }
183 }
184
185 /// PLAN.md Phase 11.4 — block until at least `count` replicas
186 /// have durably applied through `target_lsn`, or `timeout`
187 /// elapses. Returns the `AwaitOutcome` so the caller can decide
188 /// whether to surface a timeout error to the client or continue
189 /// (the policy mapping lives in the commit dispatcher).
190 ///
191 /// Used by the `ack_n` commit policy once the operator flips
192 /// `RED_PRIMARY_COMMIT_POLICY` away from `local`.
193 pub fn await_replica_acks(
194 &self,
195 target_lsn: u64,
196 count: u32,
197 timeout: std::time::Duration,
198 ) -> crate::replication::AwaitOutcome {
199 match &self.inner.db.replication {
200 Some(repl) => repl.commit_waiter.await_acks(target_lsn, count, timeout),
201 None => {
202 // No replication configured: policy must be `Local`.
203 // Treat as immediate `NotRequired` so callers don't
204 // block on a degenerate setup.
205 crate::replication::AwaitOutcome::NotRequired
206 }
207 }
208 }
209
210 /// PLAN.md Phase 11.4 — enforce the configured commit policy
211 /// against `post_lsn` (the LSN of the just-completed write).
212 /// Returns `Ok(AwaitOutcome)` on every successful enforcement
213 /// (including `Reached` and `TimedOut` when fail-on-timeout is
214 /// off). Returns `Err(ReadOnly)` only when a synchronous policy
215 /// misses its threshold and `RED_COMMIT_FAIL_ON_TIMEOUT=true` is
216 /// set.
217 ///
218 /// The HTTP / gRPC / wire surfaces map the error to 504 / wire
219 /// backoff. Default behaviour (env unset) logs warn and returns
220 /// success — matches PLAN.md "default v1 stays local" semantics
221 /// while still letting the operator opt into hard-blocking.
222 pub fn enforce_commit_policy(
223 &self,
224 post_lsn: u64,
225 ) -> RedDBResult<crate::replication::AwaitOutcome> {
226 let policy = self.commit_policy();
227 if matches!(policy, crate::replication::CommitPolicy::Quorum) {
228 return match self.inner.db.wait_for_replication_quorum(post_lsn) {
229 Ok(()) => Ok(crate::replication::AwaitOutcome::Reached(0)),
230 Err(err) => {
231 tracing::warn!(
232 target: "reddb::commit",
233 post_lsn,
234 error = %err,
235 "quorum: timed out waiting for commit watermark"
236 );
237 let fail = std::env::var("RED_COMMIT_FAIL_ON_TIMEOUT")
238 .ok()
239 .map(|v| {
240 let t = v.trim();
241 t.eq_ignore_ascii_case("true")
242 || t == "1"
243 || t.eq_ignore_ascii_case("yes")
244 })
245 .unwrap_or(false);
246 if fail {
247 return Err(RedDBError::ReadOnly(format!(
248 "commit policy timed out at lsn {post_lsn}: {err} (RED_COMMIT_FAIL_ON_TIMEOUT=true)"
249 )));
250 }
251 Ok(crate::replication::AwaitOutcome::TimedOut {
252 observed: 0,
253 required: 1,
254 })
255 }
256 };
257 }
258
259 let durability = Self::primary_replica_durability_for_policy(policy);
260 let n = match durability {
261 reddb_file::ReplicationDurability::RemoteWrite { quorum }
262 | reddb_file::ReplicationDurability::RemoteFlush { quorum }
263 | reddb_file::ReplicationDurability::RemoteApply { quorum }
264 if quorum > 0 =>
265 {
266 u32::from(quorum)
267 }
268 _ => return Ok(crate::replication::AwaitOutcome::NotRequired),
269 };
270 let timeout_ms = std::env::var("RED_REPLICATION_ACK_TIMEOUT_MS")
271 .ok()
272 .and_then(|v| v.parse::<u64>().ok())
273 .unwrap_or(5_000);
274 let outcome =
275 self.await_replica_acks(post_lsn, n, std::time::Duration::from_millis(timeout_ms));
276 {
277 use crate::runtime::control_events::{EventKind, Outcome, Sensitivity};
278 let (event_outcome, fields) = match &outcome {
279 crate::replication::AwaitOutcome::Reached(count) => (
280 Outcome::Allowed,
281 vec![
282 (
283 "post_lsn".to_string(),
284 Sensitivity::raw(post_lsn.to_string()),
285 ),
286 ("required".to_string(), Sensitivity::raw(n.to_string())),
287 ("observed".to_string(), Sensitivity::raw(count.to_string())),
288 (
289 "timeout_ms".to_string(),
290 Sensitivity::raw(timeout_ms.to_string()),
291 ),
292 ],
293 ),
294 crate::replication::AwaitOutcome::TimedOut { observed, required } => (
295 Outcome::Error,
296 vec![
297 (
298 "post_lsn".to_string(),
299 Sensitivity::raw(post_lsn.to_string()),
300 ),
301 (
302 "required".to_string(),
303 Sensitivity::raw(required.to_string()),
304 ),
305 (
306 "observed".to_string(),
307 Sensitivity::raw(observed.to_string()),
308 ),
309 (
310 "timeout_ms".to_string(),
311 Sensitivity::raw(timeout_ms.to_string()),
312 ),
313 ],
314 ),
315 crate::replication::AwaitOutcome::NotRequired => (Outcome::Allowed, Vec::new()),
316 };
317 if !fields.is_empty() {
318 self.emit_control_event(
319 EventKind::ReplicationSafety,
320 event_outcome,
321 "replication_commit_policy",
322 Some(format!("replication:lsn:{post_lsn}")),
323 None,
324 fields,
325 )?;
326 }
327 }
328 if let crate::replication::AwaitOutcome::TimedOut { observed, required } = &outcome {
329 tracing::warn!(
330 target: "reddb::commit",
331 post_lsn,
332 observed = *observed,
333 required = *required,
334 timeout_ms,
335 "ack_n: timed out waiting for replicas"
336 );
337 let fail = std::env::var("RED_COMMIT_FAIL_ON_TIMEOUT")
338 .ok()
339 .map(|v| {
340 let t = v.trim();
341 t.eq_ignore_ascii_case("true") || t == "1" || t.eq_ignore_ascii_case("yes")
342 })
343 .unwrap_or(false);
344 if fail {
345 return Err(RedDBError::ReadOnly(format!(
346 "commit policy timed out at lsn {post_lsn}: observed={observed} required={required} (RED_COMMIT_FAIL_ON_TIMEOUT=true)"
347 )));
348 }
349 }
350 Ok(outcome)
351 }
352}