Skip to main content

reddb_server/runtime/
impl_replication_commit.rs

1use super::*;
2
3impl RedDBRuntime {
4    /// PLAN.md Phase 11.4 — owned snapshot of every registered
5    /// replica's state on this primary. Returns empty vec on
6    /// non-primary instances or when no replicas are registered yet.
7    pub fn primary_replica_snapshots(&self) -> Vec<crate::replication::primary::ReplicaState> {
8        self.inner
9            .db
10            .replication
11            .as_ref()
12            .map(|repl| repl.replica_snapshots())
13            .unwrap_or_default()
14    }
15
16    /// Issue #839 — the primary's current logical-WAL head LSN, used as
17    /// the reference point for per-replica lag. `0` on non-primary
18    /// instances or before the logical spool has any records.
19    pub fn primary_logical_head_lsn(&self) -> u64 {
20        self.inner
21            .db
22            .replication
23            .as_ref()
24            .map(|repl| repl.current_logical_lsn())
25            .unwrap_or(0)
26    }
27
28    /// Issue #839 — count of pulls that forced a full re-bootstrap since
29    /// process start. The primary operator alert signal; always `0` on a
30    /// non-primary instance.
31    pub fn replication_full_resync_count(&self) -> u64 {
32        self.inner
33            .db
34            .replication
35            .as_ref()
36            .map(|repl| repl.full_resync_count())
37            .unwrap_or(0)
38    }
39
40    /// Issue #839 — count of pulls served as a partial (incremental)
41    /// resync since process start. Always `0` on a non-primary instance.
42    pub fn replication_partial_resync_count(&self) -> u64 {
43        self.inner
44            .db
45            .replication
46            .as_ref()
47            .map(|repl| repl.partial_resync_count())
48            .unwrap_or(0)
49    }
50
51    pub fn enforce_primary_replica_retention_limits(
52        &self,
53    ) -> Vec<(String, reddb_file::ReplicationSlotInvalidationCause)> {
54        self.inner
55            .db
56            .replication
57            .as_ref()
58            .map(|repl| repl.enforce_retention_limits(crate::utils::now_unix_millis() as u128))
59            .unwrap_or_default()
60    }
61
62    /// Issue #839 — this node's stable identity, surfaced as the leader
63    /// identity in `/replication/status` when the node is the primary.
64    /// Reuses the same persisted id a replica advertises to the primary,
65    /// so a cluster has one stable name per node regardless of role.
66    pub fn node_id(&self) -> String {
67        self.resolve_replica_id()
68    }
69
70    /// Issue #826 — re-evaluate write-admission flow control from the
71    /// live primary replica registry and return the resulting throttle
72    /// state. Computes the max lag across in-quorum replicas (async
73    /// read-replicas excluded) against the primary's current LSN and
74    /// engages/releases the `WriteGate` throttle accordingly.
75    ///
76    /// No-op (returns `false`) on non-primary instances or when flow
77    /// control is disabled (soft target `0`). Cheap enough to call on
78    /// the replica-ack path and from `/metrics` scrapes so the throttle
79    /// tracks lag without a dedicated background loop.
80    pub fn refresh_replication_flow_control(&self) -> bool {
81        let flow = self.inner.write_gate.flow_control();
82        if !flow.is_enabled() {
83            return false;
84        }
85        let Some(repl) = self.inner.db.replication.as_ref() else {
86            return false;
87        };
88        let primary_lsn = repl.current_logical_lsn();
89        let replicas = repl.replica_snapshots();
90        flow.observe(&replicas, primary_lsn)
91    }
92
93    /// PLAN.md Phase 11.4 — active commit policy. Reads
94    /// `RED_PRIMARY_COMMIT_POLICY` once at runtime construction;
95    /// future env reloads will need a reload endpoint. Default is
96    /// `Local` — current behavior, no replica blocking.
97    pub fn commit_policy(&self) -> crate::replication::CommitPolicy {
98        crate::replication::CommitPolicy::from_env()
99    }
100
101    /// Issue #1001 — resolve the *effective* commit policy for one collection by
102    /// combining the cluster default ([`commit_policy`](Self::commit_policy)),
103    /// the collection's declared override, the collection data model, and the
104    /// deployment's HA intent (`RED_CLUSTER_HA_INTENT`).
105    ///
106    /// Both write admission and failover eligibility call this so they read the
107    /// same decision: a durable model (transactional/queue/audit/config/vault)
108    /// may not silently use local-only acknowledgement under declared HA intent
109    /// — that returns [`CommitPolicyViolation`] and the caller must fail closed.
110    /// Explicitly ephemeral/cache-like collections may opt into local commit
111    /// with the documented failover-eligibility data-loss window.
112    pub fn resolve_commit_policy(
113        &self,
114        model: crate::cluster::CollectionDataModel,
115        collection_override: Option<crate::replication::CommitPolicy>,
116    ) -> Result<crate::cluster::CommitPolicyResolution, crate::cluster::CommitPolicyViolation> {
117        crate::cluster::resolve_commit_policy(
118            self.commit_policy(),
119            collection_override,
120            model,
121            crate::cluster::HaIntent::from_env(),
122        )
123    }
124
125    pub fn primary_replica_durability(&self) -> reddb_file::ReplicationDurability {
126        Self::primary_replica_durability_for_policy(self.commit_policy())
127    }
128
129    pub(crate) fn primary_replica_durability_for_policy(
130        policy: crate::replication::CommitPolicy,
131    ) -> reddb_file::ReplicationDurability {
132        match policy {
133            crate::replication::CommitPolicy::AckN(n) if n > 0 => {
134                reddb_file::ReplicationDurability::RemoteFlush {
135                    quorum: u16::try_from(n).unwrap_or(u16::MAX),
136                }
137            }
138            _ => reddb_file::ReplicationDurability::Async,
139        }
140    }
141
142    /// PLAN.md Phase 11.5 — accessor for replica-side apply error
143    /// counters (gap / divergence / apply / decode / apply_miss). Returned
144    /// snapshot is consistent across the counters; the labels match
145    /// `reddb_replica_apply_errors_total{kind}`. Issue #814 adds the
146    /// `apply_miss` kind for deletes against a missing target.
147    pub fn replica_apply_error_counts(
148        &self,
149    ) -> [(crate::replication::logical::ApplyErrorKind, u64); 6] {
150        self.inner.replica_apply_metrics.snapshot()
151    }
152
153    /// PLAN.md Phase 11.4 — observability snapshot of every
154    /// replica's durable LSN as known to the commit waiter. Empty
155    /// vec on non-primary instances or when no replica has acked.
156    pub fn commit_waiter_snapshot(&self) -> Vec<(String, u64)> {
157        self.inner
158            .db
159            .replication
160            .as_ref()
161            .map(|repl| repl.commit_waiter.snapshot())
162            .unwrap_or_default()
163    }
164
165    /// PLAN.md Phase 11.4 — `(reached, timed_out, not_required, last_micros)`
166    /// counters for /metrics. Always-zero on non-primary instances.
167    pub fn commit_waiter_metrics_snapshot(&self) -> (u64, u64, u64, u64) {
168        self.inner
169            .db
170            .replication
171            .as_ref()
172            .map(|repl| repl.commit_waiter.metrics_snapshot())
173            .unwrap_or((0, 0, 0, 0))
174    }
175
176    /// Named commit watermark: highest LSN durable on the active
177    /// synchronous commit quorum. Returns 0 when the active policy does
178    /// not require replica durability.
179    pub fn commit_watermark(&self) -> u64 {
180        match self.primary_replica_durability() {
181            reddb_file::ReplicationDurability::RemoteWrite { quorum }
182            | reddb_file::ReplicationDurability::RemoteFlush { quorum }
183            | reddb_file::ReplicationDurability::RemoteApply { quorum }
184                if quorum > 0 =>
185            {
186                self.inner
187                    .db
188                    .replication
189                    .as_ref()
190                    .map(|repl| repl.commit_waiter.commit_watermark(u32::from(quorum)))
191                    .unwrap_or(0)
192            }
193            _ if matches!(
194                self.commit_policy(),
195                crate::replication::CommitPolicy::Quorum
196            ) =>
197            {
198                self.inner
199                    .db
200                    .quorum
201                    .as_ref()
202                    .map(|q| q.commit_watermark())
203                    .unwrap_or(0)
204            }
205            _ => 0,
206        }
207    }
208
209    /// PLAN.md Phase 11.4 — block until at least `count` replicas
210    /// have durably applied through `target_lsn`, or `timeout`
211    /// elapses. Returns the `AwaitOutcome` so the caller can decide
212    /// whether to surface a timeout error to the client or continue
213    /// (the policy mapping lives in the commit dispatcher).
214    ///
215    /// Used by the `ack_n` commit policy once the operator flips
216    /// `RED_PRIMARY_COMMIT_POLICY` away from `local`.
217    pub fn await_replica_acks(
218        &self,
219        target_lsn: u64,
220        count: u32,
221        timeout: std::time::Duration,
222    ) -> crate::replication::AwaitOutcome {
223        match &self.inner.db.replication {
224            Some(repl) => repl.commit_waiter.await_acks(target_lsn, count, timeout),
225            None => {
226                // No replication configured: policy must be `Local`.
227                // Treat as immediate `NotRequired` so callers don't
228                // block on a degenerate setup.
229                crate::replication::AwaitOutcome::NotRequired
230            }
231        }
232    }
233
234    /// PLAN.md Phase 11.4 — enforce the configured commit policy
235    /// against `post_lsn` (the LSN of the just-completed write).
236    /// Returns `Ok(AwaitOutcome)` on every successful enforcement
237    /// (including `Reached` and `TimedOut` when fail-on-timeout is
238    /// off). Returns `Err(ReadOnly)` only when a synchronous policy
239    /// misses its threshold and `RED_COMMIT_FAIL_ON_TIMEOUT=true` is
240    /// set.
241    ///
242    /// The HTTP / gRPC / wire surfaces map the error to 504 / wire
243    /// backoff. Default behaviour (env unset) logs warn and returns
244    /// success — matches PLAN.md "default v1 stays local" semantics
245    /// while still letting the operator opt into hard-blocking.
246    pub fn enforce_commit_policy(
247        &self,
248        post_lsn: u64,
249    ) -> RedDBResult<crate::replication::AwaitOutcome> {
250        let policy = self.commit_policy();
251        if matches!(policy, crate::replication::CommitPolicy::Quorum) {
252            return match self.inner.db.wait_for_replication_quorum(post_lsn) {
253                Ok(()) => Ok(crate::replication::AwaitOutcome::Reached(0)),
254                Err(err) => {
255                    tracing::warn!(
256                        target: "reddb::commit",
257                        post_lsn,
258                        error = %err,
259                        "quorum: timed out waiting for commit watermark"
260                    );
261                    let fail = std::env::var("RED_COMMIT_FAIL_ON_TIMEOUT")
262                        .ok()
263                        .map(|v| {
264                            let t = v.trim();
265                            t.eq_ignore_ascii_case("true")
266                                || t == "1"
267                                || t.eq_ignore_ascii_case("yes")
268                        })
269                        .unwrap_or(false);
270                    if fail {
271                        return Err(RedDBError::ReadOnly(format!(
272                            "commit policy timed out at lsn {post_lsn}: {err} (RED_COMMIT_FAIL_ON_TIMEOUT=true)"
273                        )));
274                    }
275                    Ok(crate::replication::AwaitOutcome::TimedOut {
276                        observed: 0,
277                        required: 1,
278                    })
279                }
280            };
281        }
282
283        let durability = Self::primary_replica_durability_for_policy(policy);
284        let n = match durability {
285            reddb_file::ReplicationDurability::RemoteWrite { quorum }
286            | reddb_file::ReplicationDurability::RemoteFlush { quorum }
287            | reddb_file::ReplicationDurability::RemoteApply { quorum }
288                if quorum > 0 =>
289            {
290                u32::from(quorum)
291            }
292            _ => return Ok(crate::replication::AwaitOutcome::NotRequired),
293        };
294        let timeout_ms = std::env::var("RED_REPLICATION_ACK_TIMEOUT_MS")
295            .ok()
296            .and_then(|v| v.parse::<u64>().ok())
297            .unwrap_or(5_000);
298        let outcome =
299            self.await_replica_acks(post_lsn, n, std::time::Duration::from_millis(timeout_ms));
300        {
301            use crate::runtime::control_events::{EventKind, Outcome, Sensitivity};
302            let (event_outcome, fields) = match &outcome {
303                crate::replication::AwaitOutcome::Reached(count) => (
304                    Outcome::Allowed,
305                    vec![
306                        (
307                            "post_lsn".to_string(),
308                            Sensitivity::raw(post_lsn.to_string()),
309                        ),
310                        ("required".to_string(), Sensitivity::raw(n.to_string())),
311                        ("observed".to_string(), Sensitivity::raw(count.to_string())),
312                        (
313                            "timeout_ms".to_string(),
314                            Sensitivity::raw(timeout_ms.to_string()),
315                        ),
316                    ],
317                ),
318                crate::replication::AwaitOutcome::TimedOut { observed, required } => (
319                    Outcome::Error,
320                    vec![
321                        (
322                            "post_lsn".to_string(),
323                            Sensitivity::raw(post_lsn.to_string()),
324                        ),
325                        (
326                            "required".to_string(),
327                            Sensitivity::raw(required.to_string()),
328                        ),
329                        (
330                            "observed".to_string(),
331                            Sensitivity::raw(observed.to_string()),
332                        ),
333                        (
334                            "timeout_ms".to_string(),
335                            Sensitivity::raw(timeout_ms.to_string()),
336                        ),
337                    ],
338                ),
339                crate::replication::AwaitOutcome::NotRequired => (Outcome::Allowed, Vec::new()),
340            };
341            if !fields.is_empty() {
342                self.emit_control_event(
343                    EventKind::ReplicationSafety,
344                    event_outcome,
345                    "replication_commit_policy",
346                    Some(format!("replication:lsn:{post_lsn}")),
347                    None,
348                    fields,
349                )?;
350            }
351        }
352        if let crate::replication::AwaitOutcome::TimedOut { observed, required } = &outcome {
353            tracing::warn!(
354                target: "reddb::commit",
355                post_lsn,
356                observed = *observed,
357                required = *required,
358                timeout_ms,
359                "ack_n: timed out waiting for replicas"
360            );
361            let fail = std::env::var("RED_COMMIT_FAIL_ON_TIMEOUT")
362                .ok()
363                .map(|v| {
364                    let t = v.trim();
365                    t.eq_ignore_ascii_case("true") || t == "1" || t.eq_ignore_ascii_case("yes")
366                })
367                .unwrap_or(false);
368            if fail {
369                return Err(RedDBError::ReadOnly(format!(
370                    "commit policy timed out at lsn {post_lsn}: observed={observed} required={required} (RED_COMMIT_FAIL_ON_TIMEOUT=true)"
371                )));
372            }
373        }
374        Ok(outcome)
375    }
376}