Skip to main content

reddb_server/cluster/
commit_resolution.rs

1//! Commit policy resolution for multi-writer clusters (issue #1001, PRD #987).
2//!
3//! A cluster has one global default [`CommitPolicy`], and a collection may
4//! declare a stricter or looser override when its model semantics justify it
5//! (see the [clustering glossary](../../../.red/context/clustering.md) entries
6//! *Commit policy* and *Ephemeral-local commit*). This module is the single
7//! deterministic place that combines those two inputs into the **effective**
8//! policy a write actually commits under, and enforces the one safety rule that
9//! the raw [`CommitPolicy`] type cannot express on its own:
10//!
11//! > Durable transactional, queue, audit, config, and vault collections must not
12//! > *silently* use local-only acknowledgement once HA intent is declared.
13//! > Only collections explicitly declared ephemeral/cache-like may opt into
14//! > `local` commit, and they do so with documented failover semantics.
15//!
16//! ## Why a resolver rather than a field on the collection
17//!
18//! The effective policy is a function of three independent inputs — the cluster
19//! default, the per-collection override, and whether the deployment has declared
20//! HA intent — and the guardrail couples all three. Resolving them ad hoc at each
21//! call site (write admission *and* failover eligibility both need the answer)
22//! would let the two paths drift, so a misconfigured durable collection could be
23//! admitted with `local` on the write path while failover still believed it was
24//! quorum-durable. A single pure resolver keeps both paths reading the same
25//! decision and makes the guardrail testable in isolation.
26//!
27//! ## Resolution
28//!
29//! 1. The effective policy is the collection override if present, otherwise the
30//!    cluster default ([`ResolutionSource`] records which won).
31//! 2. If the effective policy is local-only acknowledgement (`Local`, or the
32//!    degenerate `AckN(0)` which [the policy docs](super::super::replication::commit_policy)
33//!    define as equivalent to `Local`) **and** HA intent is declared:
34//!    - a **durable** model ([`CollectionDataModel::is_durable`]) is rejected with
35//!      [`CommitPolicyViolation::DurableLocalUnderHa`] — fail closed, the caller
36//!      must not admit writes under a silently-degraded policy.
37//!    - an **ephemeral/cache-like** model is allowed, tagged
38//!      [`GuardrailDisposition::EphemeralLocalAllowed`] so the decision is
39//!      explicit in the audit trail.
//! 3. Otherwise the resolution succeeds. When HA intent is not declared the
//!    guardrail is [`GuardrailDisposition::NotApplicable`], whatever the model
//!    or policy. When HA intent is declared and the effective policy is not
//!    local-only, the guardrail is [`GuardrailDisposition::Satisfied`] — for
//!    durable and ephemeral/cache-like models alike.
44//!
45//! The resolved policy also reports its **failover eligibility**
46//! ([`CommitPolicyResolution::failover_eligibility`]): a durable policy means a
47//! candidate may be promoted only if its log covers the range commit watermark,
48//! while a local-ack policy carries an explicit data-loss window — the documented
49//! failover semantics ephemeral/cache collections accept in exchange for `local`.
50
51use crate::replication::CommitPolicy;
52
/// Durability model a collection declares for itself.
///
/// [`Transactional`](Self::Transactional), [`Queue`](Self::Queue),
/// [`Audit`](Self::Audit), [`Config`](Self::Config) and [`Vault`](Self::Vault)
/// are **durable**: their data must survive a single-node loss.
/// [`Ephemeral`](Self::Ephemeral) and [`Cache`](Self::Cache) are explicitly
/// **local-eligible**: losing their most recent unreplicated writes on failover
/// is an accepted trade for lower write latency.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CollectionDataModel {
    /// Durable transactional records — the default model for user data.
    Transactional,
    /// Durable work queue (at-least-once delivery semantics).
    Queue,
    /// Append-only audit log.
    Audit,
    /// Cluster or application configuration.
    Config,
    /// Secret / credential material.
    Vault,
    /// Explicitly ephemeral data that carries no durability expectation.
    Ephemeral,
    /// Cache-like data that can be rebuilt from a source of truth.
    Cache,
}

impl CollectionDataModel {
    /// Returns `true` for the durable models, i.e. every model that is not
    /// local-eligible. Durable models may never silently acknowledge a write
    /// locally once HA intent is declared.
    pub fn is_durable(self) -> bool {
        !self.allows_ephemeral_local()
    }

    /// Returns `true` only for `Ephemeral` and `Cache`, the models that may opt
    /// into local commit even under declared HA intent.
    pub fn allows_ephemeral_local(self) -> bool {
        // Exhaustive on purpose: adding a variant must force an explicit
        // durable / local-eligible classification here.
        match self {
            Self::Ephemeral | Self::Cache => true,
            Self::Transactional | Self::Queue | Self::Audit | Self::Config | Self::Vault => false,
        }
    }

    /// Lowercase name of the model, as embedded in
    /// [`CommitPolicyViolation`] messages.
    pub fn label(self) -> &'static str {
        match self {
            Self::Cache => "cache",
            Self::Ephemeral => "ephemeral",
            Self::Vault => "vault",
            Self::Config => "config",
            Self::Audit => "audit",
            Self::Queue => "queue",
            Self::Transactional => "transactional",
        }
    }
}
103
/// Whether the deployment has declared HA intent.
///
/// The guardrail restricts local-only acknowledgement only once intent is
/// [`Declared`](Self::Declared); a single-writer / non-HA deployment resolves
/// policies without restriction. The default is [`None`](Self::None).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum HaIntent {
    /// Multi-writer HA mode: durable models may not silently use `local`.
    Declared,
    /// No HA intent declared — the guardrail does not apply.
    #[default]
    None,
}

impl HaIntent {
    /// Returns `true` only for [`Declared`](Self::Declared).
    pub fn is_declared(self) -> bool {
        self == Self::Declared
    }

    /// Reads the `RED_CLUSTER_HA_INTENT` environment variable and interprets it
    /// with [`parse`](Self::parse). If the variable cannot be read (for example
    /// because it is unset) the result is [`None`](Self::None), so the guardrail
    /// stays off unless it is opted into.
    pub fn from_env() -> Self {
        // `parse` trims its input, so the raw value is handed over untouched.
        std::env::var("RED_CLUSTER_HA_INTENT").map_or(Self::None, |raw| Self::parse(&raw))
    }

    /// Interprets `raw` as an HA-intent flag.
    ///
    /// After trimming surrounding whitespace, `true`, `1`, `yes` and `declared`
    /// (ASCII case-insensitive) yield [`Declared`](Self::Declared); every other
    /// value, including the empty string, yields [`None`](Self::None).
    pub fn parse(raw: &str) -> Self {
        let candidate = raw.trim();
        // "1" has no letters, so a case-insensitive comparison is exact for it.
        let truthy = ["true", "1", "yes", "declared"]
            .iter()
            .any(|&word| candidate.eq_ignore_ascii_case(word));
        if truthy {
            Self::Declared
        } else {
            Self::None
        }
    }
}
144
/// Identifies which input supplied the effective policy.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ResolutionSource {
    /// The collection declared no override, so the cluster-wide default applied.
    ClusterDefault,
    /// The collection's own override applied.
    CollectionOverride,
}

impl ResolutionSource {
    /// Snake-case name of the source, as embedded in
    /// [`CommitPolicyViolation`] messages.
    pub fn label(self) -> &'static str {
        match self {
            ResolutionSource::CollectionOverride => "collection_override",
            ResolutionSource::ClusterDefault => "cluster_default",
        }
    }
}
162
/// Outcome of the ephemeral-local guardrail for a resolution that succeeded.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GuardrailDisposition {
    /// HA intent was not declared, so the guardrail did not run.
    NotApplicable,
    /// HA intent is declared and the effective policy is not local-only
    /// acknowledgement: the guardrail ran and was satisfied.
    Satisfied,
    /// An ephemeral/cache-like model was explicitly permitted to use local
    /// commit under declared HA intent (documented failover semantics apply).
    EphemeralLocalAllowed,
}
175
/// What a resolved commit policy implies for failover.
///
/// Failover eligibility consumes this value: a durable policy gates promotion
/// on watermark coverage, whereas a local-ack policy admits an explicit
/// data-loss window on the promoted node.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FailoverEligibility {
    /// Durable effective policy: a candidate may be promoted only when its
    /// applied log covers the range commit watermark.
    RequiresWatermarkCoverage,
    /// Local-only effective policy: the promoted candidate may lack the failed
    /// owner's most recent local-only writes — an accepted, documented loss
    /// window for ephemeral/cache-like data.
    LocalAckDataLossWindow,
}
189
190/// The deterministic outcome of resolving a cluster default + collection
191/// override + HA intent against a collection's data model.
192#[derive(Debug, Clone, Copy, PartialEq, Eq)]
193pub struct CommitPolicyResolution {
194    /// The policy the write actually commits under.
195    pub effective: CommitPolicy,
196    /// Which input supplied [`effective`](Self::effective).
197    pub source: ResolutionSource,
198    /// How the guardrail dispositioned this resolution.
199    pub guardrail: GuardrailDisposition,
200}
201
202impl CommitPolicyResolution {
203    /// `true` when the effective policy requires durability beyond the local WAL,
204    /// i.e. failover must gate promotion on range-commit-watermark coverage.
205    pub fn requires_durable_watermark(&self) -> bool {
206        !is_local_ack(self.effective)
207    }
208
209    /// Failover implication of the resolved policy. See [`FailoverEligibility`].
210    pub fn failover_eligibility(&self) -> FailoverEligibility {
211        if self.requires_durable_watermark() {
212            FailoverEligibility::RequiresWatermarkCoverage
213        } else {
214            FailoverEligibility::LocalAckDataLossWindow
215        }
216    }
217}
218
219/// Rejection raised when resolution would silently degrade a durable model to
220/// local-only acknowledgement under declared HA intent. The caller must fail
221/// closed rather than admit writes under the degraded policy.
222#[derive(Debug, Clone, Copy, PartialEq, Eq)]
223pub enum CommitPolicyViolation {
224    /// A durable model resolved to local-only acknowledgement under declared HA
225    /// intent. `source` records whether the offending policy came from the
226    /// cluster default or the collection's own override.
227    DurableLocalUnderHa {
228        model: CollectionDataModel,
229        source: ResolutionSource,
230    },
231}
232
233impl CommitPolicyViolation {
234    pub fn message(&self) -> String {
235        match self {
236            Self::DurableLocalUnderHa { model, source } => format!(
237                "durable collection model '{}' may not use local-only commit acknowledgement \
238                 under declared HA intent (policy source: {})",
239                model.label(),
240                source.label()
241            ),
242        }
243    }
244}
245
246impl std::fmt::Display for CommitPolicyViolation {
247    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
248        f.write_str(&self.message())
249    }
250}
251
252impl std::error::Error for CommitPolicyViolation {}
253
254/// `true` when `policy` acknowledges a commit on local WAL durability alone:
255/// `Local`, or the degenerate `AckN(0)` the policy docs define as equivalent.
256pub fn is_local_ack(policy: CommitPolicy) -> bool {
257    matches!(policy, CommitPolicy::Local | CommitPolicy::AckN(0))
258}
259
260/// Deterministically resolve the effective commit policy for one collection.
261///
262/// `cluster_default` is the global default; `collection_override` is the
263/// collection's declared override (if any); `model` is the collection's
264/// durability model; `ha_intent` is whether the deployment declared HA intent.
265///
266/// Returns the resolved policy, or [`CommitPolicyViolation`] when the guardrail
267/// rejects a durable model degraded to local-only acknowledgement under HA
268/// intent. The function is pure and side-effect free.
269pub fn resolve_commit_policy(
270    cluster_default: CommitPolicy,
271    collection_override: Option<CommitPolicy>,
272    model: CollectionDataModel,
273    ha_intent: HaIntent,
274) -> Result<CommitPolicyResolution, CommitPolicyViolation> {
275    let (effective, source) = match collection_override {
276        Some(p) => (p, ResolutionSource::CollectionOverride),
277        None => (cluster_default, ResolutionSource::ClusterDefault),
278    };
279
280    let guardrail = if !ha_intent.is_declared() {
281        // No HA intent: the guardrail does not constrain the resolution.
282        GuardrailDisposition::NotApplicable
283    } else if is_local_ack(effective) {
284        if model.is_durable() {
285            return Err(CommitPolicyViolation::DurableLocalUnderHa { model, source });
286        }
287        // Ephemeral/cache-like: explicitly permitted to opt into local commit.
288        GuardrailDisposition::EphemeralLocalAllowed
289    } else {
290        // Durable model under declared HA intent with a genuinely durable policy.
291        GuardrailDisposition::Satisfied
292    };
293
294    Ok(CommitPolicyResolution {
295        effective,
296        source,
297        guardrail,
298    })
299}
300
/// Unit tests for the resolver, the guardrail and the small helper types.
/// Tests marked `AC:` map to acceptance criteria of the originating issue.
#[cfg(test)]
mod tests {
    use super::*;

    /// Every durable model; must stay in sync with `CollectionDataModel::is_durable`.
    const DURABLE: [CollectionDataModel; 5] = [
        CollectionDataModel::Transactional,
        CollectionDataModel::Queue,
        CollectionDataModel::Audit,
        CollectionDataModel::Config,
        CollectionDataModel::Vault,
    ];
    /// Every local-eligible model.
    const LOCAL_ELIGIBLE: [CollectionDataModel; 2] =
        [CollectionDataModel::Ephemeral, CollectionDataModel::Cache];

    #[test]
    fn data_model_durability_partition() {
        for m in DURABLE {
            assert!(m.is_durable(), "{} should be durable", m.label());
            assert!(!m.allows_ephemeral_local());
        }
        for m in LOCAL_ELIGIBLE {
            assert!(!m.is_durable(), "{} should not be durable", m.label());
            assert!(m.allows_ephemeral_local());
        }
    }

    #[test]
    fn is_local_ack_treats_ack0_as_local() {
        assert!(is_local_ack(CommitPolicy::Local));
        assert!(is_local_ack(CommitPolicy::AckN(0)));
        assert!(!is_local_ack(CommitPolicy::AckN(1)));
        assert!(!is_local_ack(CommitPolicy::Quorum));
        assert!(!is_local_ack(CommitPolicy::RemoteWal));
    }

    // AC: default quorum behavior — cluster default applies with no override.
    #[test]
    fn cluster_default_quorum_applies_without_override() {
        let r = resolve_commit_policy(
            CommitPolicy::Quorum,
            None,
            CollectionDataModel::Transactional,
            HaIntent::Declared,
        )
        .expect("quorum default is durable under HA");
        assert_eq!(r.effective, CommitPolicy::Quorum);
        assert_eq!(r.source, ResolutionSource::ClusterDefault);
        assert_eq!(r.guardrail, GuardrailDisposition::Satisfied);
        assert_eq!(
            r.failover_eligibility(),
            FailoverEligibility::RequiresWatermarkCoverage
        );
    }

    // AC: collection override — a stricter/looser override beats the default.
    #[test]
    fn collection_override_beats_cluster_default() {
        let r = resolve_commit_policy(
            CommitPolicy::AckN(1),
            Some(CommitPolicy::Quorum),
            CollectionDataModel::Audit,
            HaIntent::Declared,
        )
        .expect("override quorum is durable");
        assert_eq!(r.effective, CommitPolicy::Quorum);
        assert_eq!(r.source, ResolutionSource::CollectionOverride);
        assert_eq!(r.guardrail, GuardrailDisposition::Satisfied);
    }

    // AC: local commit allowed for ephemeral/cache-like data under HA intent.
    #[test]
    fn local_commit_allowed_for_ephemeral_cache_under_ha() {
        for m in LOCAL_ELIGIBLE {
            // via cluster default
            let r = resolve_commit_policy(CommitPolicy::Local, None, m, HaIntent::Declared)
                .unwrap_or_else(|e| panic!("{} local should be allowed: {e}", m.label()));
            assert_eq!(r.effective, CommitPolicy::Local);
            assert_eq!(r.guardrail, GuardrailDisposition::EphemeralLocalAllowed);
            assert_eq!(
                r.failover_eligibility(),
                FailoverEligibility::LocalAckDataLossWindow
            );
            assert!(!r.requires_durable_watermark());

            // via explicit override, and the AckN(0) degenerate form
            let r = resolve_commit_policy(
                CommitPolicy::Quorum,
                Some(CommitPolicy::AckN(0)),
                m,
                HaIntent::Declared,
            )
            .expect("ack_n=0 is local-eligible for ephemeral/cache");
            assert_eq!(r.guardrail, GuardrailDisposition::EphemeralLocalAllowed);
        }
    }

    // A local-eligible model is not forced onto local commit: with a durable
    // effective policy under HA intent the guardrail reports `Satisfied` and
    // failover is gated on watermark coverage, exactly as for durable models.
    #[test]
    fn durable_policy_on_local_eligible_model_is_satisfied_under_ha() {
        for m in LOCAL_ELIGIBLE {
            let r = resolve_commit_policy(CommitPolicy::Quorum, None, m, HaIntent::Declared)
                .expect("a durable policy is never rejected");
            assert_eq!(r.effective, CommitPolicy::Quorum);
            assert_eq!(r.source, ResolutionSource::ClusterDefault);
            assert_eq!(r.guardrail, GuardrailDisposition::Satisfied);
            assert!(r.requires_durable_watermark());
            assert_eq!(
                r.failover_eligibility(),
                FailoverEligibility::RequiresWatermarkCoverage
            );
        }
    }

    // AC: local commit rejected for durable models under HA intent.
    #[test]
    fn local_commit_rejected_for_durable_models_under_ha() {
        for m in DURABLE {
            // via cluster default
            let err = resolve_commit_policy(CommitPolicy::Local, None, m, HaIntent::Declared)
                .expect_err("durable local must be rejected under HA");
            assert_eq!(
                err,
                CommitPolicyViolation::DurableLocalUnderHa {
                    model: m,
                    source: ResolutionSource::ClusterDefault,
                }
            );
            assert!(err.message().contains(m.label()));

            // via override, including the AckN(0) degenerate form
            let err = resolve_commit_policy(
                CommitPolicy::Quorum,
                Some(CommitPolicy::AckN(0)),
                m,
                HaIntent::Declared,
            )
            .expect_err("durable ack_n=0 override must be rejected under HA");
            assert_eq!(
                err,
                CommitPolicyViolation::DurableLocalUnderHa {
                    model: m,
                    source: ResolutionSource::CollectionOverride,
                }
            );
        }
    }

    // The violation's `Display` output and `message()` must agree, and both
    // must name the policy source so operators can find the offending setting.
    #[test]
    fn violation_display_matches_message() {
        let err = CommitPolicyViolation::DurableLocalUnderHa {
            model: CollectionDataModel::Vault,
            source: ResolutionSource::ClusterDefault,
        };
        assert_eq!(err.to_string(), err.message());
        assert!(err.message().contains("vault"));
        assert!(err.message().contains("cluster_default"));
    }

    // Guardrail only bites under declared HA intent: a non-HA deployment may use
    // local commit for any model.
    #[test]
    fn local_commit_allowed_for_durable_when_ha_not_declared() {
        for m in DURABLE {
            let r = resolve_commit_policy(CommitPolicy::Local, None, m, HaIntent::None)
                .expect("guardrail off without HA intent");
            assert_eq!(r.effective, CommitPolicy::Local);
            assert_eq!(r.guardrail, GuardrailDisposition::NotApplicable);
        }
    }

    // Without HA intent the disposition is `NotApplicable` for every model,
    // whether the effective policy is local-only or durable.
    #[test]
    fn guardrail_not_applicable_for_any_model_without_ha() {
        for m in DURABLE.iter().chain(LOCAL_ELIGIBLE.iter()).copied() {
            for policy in [CommitPolicy::Local, CommitPolicy::Quorum] {
                let r = resolve_commit_policy(policy, None, m, HaIntent::None)
                    .expect("guardrail off without HA intent");
                assert_eq!(r.effective, policy);
                assert_eq!(r.guardrail, GuardrailDisposition::NotApplicable);
            }
        }
    }

    // AC: failover watermark implications follow the resolved policy.
    #[test]
    fn failover_watermark_implications_track_resolved_policy() {
        // Durable resolved policy → promotion gated on watermark coverage.
        let durable = resolve_commit_policy(
            CommitPolicy::AckN(2),
            None,
            CollectionDataModel::Queue,
            HaIntent::Declared,
        )
        .unwrap();
        assert!(durable.requires_durable_watermark());
        assert_eq!(
            durable.failover_eligibility(),
            FailoverEligibility::RequiresWatermarkCoverage
        );

        // Local resolved policy (ephemeral) → explicit data-loss window.
        let local = resolve_commit_policy(
            CommitPolicy::Local,
            None,
            CollectionDataModel::Cache,
            HaIntent::Declared,
        )
        .unwrap();
        assert!(!local.requires_durable_watermark());
        assert_eq!(
            local.failover_eligibility(),
            FailoverEligibility::LocalAckDataLossWindow
        );
    }

    #[test]
    fn resolution_is_deterministic() {
        let inputs = (
            CommitPolicy::AckN(1),
            Some(CommitPolicy::Quorum),
            CollectionDataModel::Vault,
            HaIntent::Declared,
        );
        let a = resolve_commit_policy(inputs.0, inputs.1, inputs.2, inputs.3);
        let b = resolve_commit_policy(inputs.0, inputs.1, inputs.2, inputs.3);
        assert_eq!(a, b);
    }

    #[test]
    fn ha_intent_parse() {
        assert_eq!(HaIntent::parse("true"), HaIntent::Declared);
        assert_eq!(HaIntent::parse("1"), HaIntent::Declared);
        assert_eq!(HaIntent::parse("YES"), HaIntent::Declared);
        assert_eq!(HaIntent::parse("declared"), HaIntent::Declared);
        // Surrounding whitespace and mixed case are tolerated.
        assert_eq!(HaIntent::parse("  True\n"), HaIntent::Declared);
        assert_eq!(HaIntent::parse("\tDeclared "), HaIntent::Declared);
        assert_eq!(HaIntent::parse("false"), HaIntent::None);
        assert_eq!(HaIntent::parse(""), HaIntent::None);
        assert_eq!(HaIntent::parse("   "), HaIntent::None);
        assert_eq!(HaIntent::parse("nonsense"), HaIntent::None);
        assert_eq!(HaIntent::default(), HaIntent::None);
        assert!(HaIntent::Declared.is_declared());
        assert!(!HaIntent::None.is_declared());
    }

    // Labels end up in violation messages, so their exact text is pinned here.
    #[test]
    fn source_and_model_labels() {
        assert_eq!(ResolutionSource::ClusterDefault.label(), "cluster_default");
        assert_eq!(
            ResolutionSource::CollectionOverride.label(),
            "collection_override"
        );
        assert_eq!(CollectionDataModel::Transactional.label(), "transactional");
        assert_eq!(CollectionDataModel::Queue.label(), "queue");
        assert_eq!(CollectionDataModel::Audit.label(), "audit");
        assert_eq!(CollectionDataModel::Config.label(), "config");
        assert_eq!(CollectionDataModel::Vault.label(), "vault");
        assert_eq!(CollectionDataModel::Ephemeral.label(), "ephemeral");
        assert_eq!(CollectionDataModel::Cache.label(), "cache");
    }
}