Skip to main content

reddb_server/runtime/
impl_replication_commit.rs

1use super::*;
2
3impl RedDBRuntime {
4    /// PLAN.md Phase 11.4 — owned snapshot of every registered
5    /// replica's state on this primary. Returns empty vec on
6    /// non-primary instances or when no replicas are registered yet.
7    pub fn primary_replica_snapshots(&self) -> Vec<crate::replication::primary::ReplicaState> {
8        self.inner
9            .db
10            .replication
11            .as_ref()
12            .map(|repl| repl.replica_snapshots())
13            .unwrap_or_default()
14    }
15
16    /// Issue #839 — the primary's current logical-WAL head LSN, used as
17    /// the reference point for per-replica lag. `0` on non-primary
18    /// instances or before the logical spool has any records.
19    pub fn primary_logical_head_lsn(&self) -> u64 {
20        self.inner
21            .db
22            .replication
23            .as_ref()
24            .map(|repl| repl.current_logical_lsn())
25            .unwrap_or(0)
26    }
27
28    /// Issue #839 — count of pulls that forced a full re-bootstrap since
29    /// process start. The primary operator alert signal; always `0` on a
30    /// non-primary instance.
31    pub fn replication_full_resync_count(&self) -> u64 {
32        self.inner
33            .db
34            .replication
35            .as_ref()
36            .map(|repl| repl.full_resync_count())
37            .unwrap_or(0)
38    }
39
40    /// Issue #839 — count of pulls served as a partial (incremental)
41    /// resync since process start. Always `0` on a non-primary instance.
42    pub fn replication_partial_resync_count(&self) -> u64 {
43        self.inner
44            .db
45            .replication
46            .as_ref()
47            .map(|repl| repl.partial_resync_count())
48            .unwrap_or(0)
49    }
50
51    pub fn enforce_primary_replica_retention_limits(
52        &self,
53    ) -> Vec<(String, reddb_file::ReplicationSlotInvalidationCause)> {
54        self.inner
55            .db
56            .replication
57            .as_ref()
58            .map(|repl| repl.enforce_retention_limits(crate::utils::now_unix_millis() as u128))
59            .unwrap_or_default()
60    }
61
62    /// Issue #839 — this node's stable identity, surfaced as the leader
63    /// identity in `/replication/status` when the node is the primary.
64    /// Reuses the same persisted id a replica advertises to the primary,
65    /// so a cluster has one stable name per node regardless of role.
66    pub fn node_id(&self) -> String {
67        self.resolve_replica_id()
68    }
69
70    /// Issue #826 — re-evaluate write-admission flow control from the
71    /// live primary replica registry and return the resulting throttle
72    /// state. Computes the max lag across in-quorum replicas (async
73    /// read-replicas excluded) against the primary's current LSN and
74    /// engages/releases the `WriteGate` throttle accordingly.
75    ///
76    /// No-op (returns `false`) on non-primary instances or when flow
77    /// control is disabled (soft target `0`). Cheap enough to call on
78    /// the replica-ack path and from `/metrics` scrapes so the throttle
79    /// tracks lag without a dedicated background loop.
80    pub fn refresh_replication_flow_control(&self) -> bool {
81        let flow = self.inner.write_gate.flow_control();
82        if !flow.is_enabled() {
83            return false;
84        }
85        let Some(repl) = self.inner.db.replication.as_ref() else {
86            return false;
87        };
88        let primary_lsn = repl.current_logical_lsn();
89        let replicas = repl.replica_snapshots();
90        flow.observe(&replicas, primary_lsn)
91    }
92
93    /// PLAN.md Phase 11.4 — active commit policy. Reads
94    /// `RED_PRIMARY_COMMIT_POLICY` once at runtime construction;
95    /// future env reloads will need a reload endpoint. Default is
96    /// `Local` — current behavior, no replica blocking.
97    pub fn commit_policy(&self) -> crate::replication::CommitPolicy {
98        crate::replication::CommitPolicy::from_env()
99    }
100
101    pub fn primary_replica_durability(&self) -> reddb_file::ReplicationDurability {
102        Self::primary_replica_durability_for_policy(self.commit_policy())
103    }
104
105    pub(crate) fn primary_replica_durability_for_policy(
106        policy: crate::replication::CommitPolicy,
107    ) -> reddb_file::ReplicationDurability {
108        match policy {
109            crate::replication::CommitPolicy::AckN(n) if n > 0 => {
110                reddb_file::ReplicationDurability::RemoteFlush {
111                    quorum: u16::try_from(n).unwrap_or(u16::MAX),
112                }
113            }
114            _ => reddb_file::ReplicationDurability::Async,
115        }
116    }
117
118    /// PLAN.md Phase 11.5 — accessor for replica-side apply error
119    /// counters (gap / divergence / apply / decode / apply_miss). Returned
120    /// snapshot is consistent across the counters; the labels match
121    /// `reddb_replica_apply_errors_total{kind}`. Issue #814 adds the
122    /// `apply_miss` kind for deletes against a missing target.
123    pub fn replica_apply_error_counts(
124        &self,
125    ) -> [(crate::replication::logical::ApplyErrorKind, u64); 6] {
126        self.inner.replica_apply_metrics.snapshot()
127    }
128
129    /// PLAN.md Phase 11.4 — observability snapshot of every
130    /// replica's durable LSN as known to the commit waiter. Empty
131    /// vec on non-primary instances or when no replica has acked.
132    pub fn commit_waiter_snapshot(&self) -> Vec<(String, u64)> {
133        self.inner
134            .db
135            .replication
136            .as_ref()
137            .map(|repl| repl.commit_waiter.snapshot())
138            .unwrap_or_default()
139    }
140
141    /// PLAN.md Phase 11.4 — `(reached, timed_out, not_required, last_micros)`
142    /// counters for /metrics. Always-zero on non-primary instances.
143    pub fn commit_waiter_metrics_snapshot(&self) -> (u64, u64, u64, u64) {
144        self.inner
145            .db
146            .replication
147            .as_ref()
148            .map(|repl| repl.commit_waiter.metrics_snapshot())
149            .unwrap_or((0, 0, 0, 0))
150    }
151
152    /// Named commit watermark: highest LSN durable on the active
153    /// synchronous commit quorum. Returns 0 when the active policy does
154    /// not require replica durability.
155    pub fn commit_watermark(&self) -> u64 {
156        match self.primary_replica_durability() {
157            reddb_file::ReplicationDurability::RemoteWrite { quorum }
158            | reddb_file::ReplicationDurability::RemoteFlush { quorum }
159            | reddb_file::ReplicationDurability::RemoteApply { quorum }
160                if quorum > 0 =>
161            {
162                self.inner
163                    .db
164                    .replication
165                    .as_ref()
166                    .map(|repl| repl.commit_waiter.commit_watermark(u32::from(quorum)))
167                    .unwrap_or(0)
168            }
169            _ if matches!(
170                self.commit_policy(),
171                crate::replication::CommitPolicy::Quorum
172            ) =>
173            {
174                self.inner
175                    .db
176                    .quorum
177                    .as_ref()
178                    .map(|q| q.commit_watermark())
179                    .unwrap_or(0)
180            }
181            _ => 0,
182        }
183    }
184
185    /// PLAN.md Phase 11.4 — block until at least `count` replicas
186    /// have durably applied through `target_lsn`, or `timeout`
187    /// elapses. Returns the `AwaitOutcome` so the caller can decide
188    /// whether to surface a timeout error to the client or continue
189    /// (the policy mapping lives in the commit dispatcher).
190    ///
191    /// Used by the `ack_n` commit policy once the operator flips
192    /// `RED_PRIMARY_COMMIT_POLICY` away from `local`.
193    pub fn await_replica_acks(
194        &self,
195        target_lsn: u64,
196        count: u32,
197        timeout: std::time::Duration,
198    ) -> crate::replication::AwaitOutcome {
199        match &self.inner.db.replication {
200            Some(repl) => repl.commit_waiter.await_acks(target_lsn, count, timeout),
201            None => {
202                // No replication configured: policy must be `Local`.
203                // Treat as immediate `NotRequired` so callers don't
204                // block on a degenerate setup.
205                crate::replication::AwaitOutcome::NotRequired
206            }
207        }
208    }
209
210    /// PLAN.md Phase 11.4 — enforce the configured commit policy
211    /// against `post_lsn` (the LSN of the just-completed write).
212    /// Returns `Ok(AwaitOutcome)` on every successful enforcement
213    /// (including `Reached` and `TimedOut` when fail-on-timeout is
214    /// off). Returns `Err(ReadOnly)` only when a synchronous policy
215    /// misses its threshold and `RED_COMMIT_FAIL_ON_TIMEOUT=true` is
216    /// set.
217    ///
218    /// The HTTP / gRPC / wire surfaces map the error to 504 / wire
219    /// backoff. Default behaviour (env unset) logs warn and returns
220    /// success — matches PLAN.md "default v1 stays local" semantics
221    /// while still letting the operator opt into hard-blocking.
222    pub fn enforce_commit_policy(
223        &self,
224        post_lsn: u64,
225    ) -> RedDBResult<crate::replication::AwaitOutcome> {
226        let policy = self.commit_policy();
227        if matches!(policy, crate::replication::CommitPolicy::Quorum) {
228            return match self.inner.db.wait_for_replication_quorum(post_lsn) {
229                Ok(()) => Ok(crate::replication::AwaitOutcome::Reached(0)),
230                Err(err) => {
231                    tracing::warn!(
232                        target: "reddb::commit",
233                        post_lsn,
234                        error = %err,
235                        "quorum: timed out waiting for commit watermark"
236                    );
237                    let fail = std::env::var("RED_COMMIT_FAIL_ON_TIMEOUT")
238                        .ok()
239                        .map(|v| {
240                            let t = v.trim();
241                            t.eq_ignore_ascii_case("true")
242                                || t == "1"
243                                || t.eq_ignore_ascii_case("yes")
244                        })
245                        .unwrap_or(false);
246                    if fail {
247                        return Err(RedDBError::ReadOnly(format!(
248                            "commit policy timed out at lsn {post_lsn}: {err} (RED_COMMIT_FAIL_ON_TIMEOUT=true)"
249                        )));
250                    }
251                    Ok(crate::replication::AwaitOutcome::TimedOut {
252                        observed: 0,
253                        required: 1,
254                    })
255                }
256            };
257        }
258
259        let durability = Self::primary_replica_durability_for_policy(policy);
260        let n = match durability {
261            reddb_file::ReplicationDurability::RemoteWrite { quorum }
262            | reddb_file::ReplicationDurability::RemoteFlush { quorum }
263            | reddb_file::ReplicationDurability::RemoteApply { quorum }
264                if quorum > 0 =>
265            {
266                u32::from(quorum)
267            }
268            _ => return Ok(crate::replication::AwaitOutcome::NotRequired),
269        };
270        let timeout_ms = std::env::var("RED_REPLICATION_ACK_TIMEOUT_MS")
271            .ok()
272            .and_then(|v| v.parse::<u64>().ok())
273            .unwrap_or(5_000);
274        let outcome =
275            self.await_replica_acks(post_lsn, n, std::time::Duration::from_millis(timeout_ms));
276        {
277            use crate::runtime::control_events::{EventKind, Outcome, Sensitivity};
278            let (event_outcome, fields) = match &outcome {
279                crate::replication::AwaitOutcome::Reached(count) => (
280                    Outcome::Allowed,
281                    vec![
282                        (
283                            "post_lsn".to_string(),
284                            Sensitivity::raw(post_lsn.to_string()),
285                        ),
286                        ("required".to_string(), Sensitivity::raw(n.to_string())),
287                        ("observed".to_string(), Sensitivity::raw(count.to_string())),
288                        (
289                            "timeout_ms".to_string(),
290                            Sensitivity::raw(timeout_ms.to_string()),
291                        ),
292                    ],
293                ),
294                crate::replication::AwaitOutcome::TimedOut { observed, required } => (
295                    Outcome::Error,
296                    vec![
297                        (
298                            "post_lsn".to_string(),
299                            Sensitivity::raw(post_lsn.to_string()),
300                        ),
301                        (
302                            "required".to_string(),
303                            Sensitivity::raw(required.to_string()),
304                        ),
305                        (
306                            "observed".to_string(),
307                            Sensitivity::raw(observed.to_string()),
308                        ),
309                        (
310                            "timeout_ms".to_string(),
311                            Sensitivity::raw(timeout_ms.to_string()),
312                        ),
313                    ],
314                ),
315                crate::replication::AwaitOutcome::NotRequired => (Outcome::Allowed, Vec::new()),
316            };
317            if !fields.is_empty() {
318                self.emit_control_event(
319                    EventKind::ReplicationSafety,
320                    event_outcome,
321                    "replication_commit_policy",
322                    Some(format!("replication:lsn:{post_lsn}")),
323                    None,
324                    fields,
325                )?;
326            }
327        }
328        if let crate::replication::AwaitOutcome::TimedOut { observed, required } = &outcome {
329            tracing::warn!(
330                target: "reddb::commit",
331                post_lsn,
332                observed = *observed,
333                required = *required,
334                timeout_ms,
335                "ack_n: timed out waiting for replicas"
336            );
337            let fail = std::env::var("RED_COMMIT_FAIL_ON_TIMEOUT")
338                .ok()
339                .map(|v| {
340                    let t = v.trim();
341                    t.eq_ignore_ascii_case("true") || t == "1" || t.eq_ignore_ascii_case("yes")
342                })
343                .unwrap_or(false);
344            if fail {
345                return Err(RedDBError::ReadOnly(format!(
346                    "commit policy timed out at lsn {post_lsn}: observed={observed} required={required} (RED_COMMIT_FAIL_ON_TIMEOUT=true)"
347                )));
348            }
349        }
350        Ok(outcome)
351    }
352}