reddb_server/cluster/commit_resolution.rs
1//! Commit policy resolution for multi-writer clusters (issue #1001, PRD #987).
2//!
3//! A cluster has one global default [`CommitPolicy`], and a collection may
4//! declare a stricter or looser override when its model semantics justify it
5//! (see the [clustering glossary](../../../.red/context/clustering.md) entries
6//! *Commit policy* and *Ephemeral-local commit*). This module is the single
7//! deterministic place that combines those two inputs into the **effective**
8//! policy a write actually commits under, and enforces the one safety rule that
9//! the raw [`CommitPolicy`] type cannot express on its own:
10//!
11//! > Durable transactional, queue, audit, config, and vault collections must not
12//! > *silently* use local-only acknowledgement once HA intent is declared.
13//! > Only collections explicitly declared ephemeral/cache-like may opt into
14//! > `local` commit, and they do so with documented failover semantics.
15//!
16//! ## Why a resolver rather than a field on the collection
17//!
18//! The effective policy is a function of three independent inputs — the cluster
19//! default, the per-collection override, and whether the deployment has declared
20//! HA intent — and the guardrail couples all three. Resolving them ad hoc at each
21//! call site (write admission *and* failover eligibility both need the answer)
22//! would let the two paths drift, so a misconfigured durable collection could be
23//! admitted with `local` on the write path while failover still believed it was
24//! quorum-durable. A single pure resolver keeps both paths reading the same
25//! decision and makes the guardrail testable in isolation.
26//!
27//! ## Resolution
28//!
29//! 1. The effective policy is the collection override if present, otherwise the
30//! cluster default ([`ResolutionSource`] records which won).
31//! 2. If the effective policy is local-only acknowledgement (`Local`, or the
32//! degenerate `AckN(0)` which [the policy docs](super::super::replication::commit_policy)
33//! define as equivalent to `Local`) **and** HA intent is declared:
34//! - a **durable** model ([`CollectionDataModel::is_durable`]) is rejected with
35//! [`CommitPolicyViolation::DurableLocalUnderHa`] — fail closed, the caller
36//! must not admit writes under a silently-degraded policy.
37//! - an **ephemeral/cache-like** model is allowed, tagged
38//! [`GuardrailDisposition::EphemeralLocalAllowed`] so the decision is
39//! explicit in the audit trail.
//! 3. Otherwise the resolution succeeds; the guardrail is
//!    [`GuardrailDisposition::Satisfied`] whenever HA intent is declared and the
//!    effective policy is genuinely durable (this holds for every model, including
//!    an ephemeral/cache-like one that resolves to a durable policy), or
//!    [`GuardrailDisposition::NotApplicable`] when HA intent is not declared.
44//!
45//! The resolved policy also reports its **failover eligibility**
46//! ([`CommitPolicyResolution::failover_eligibility`]): a durable policy means a
47//! candidate may be promoted only if its log covers the range commit watermark,
48//! while a local-ack policy carries an explicit data-loss window — the documented
49//! failover semantics ephemeral/cache collections accept in exchange for `local`.
50
51use crate::replication::CommitPolicy;
52
/// Durability class a collection declares for its own data.
///
/// `Transactional`, `Queue`, `Audit`, `Config` and `Vault` are **durable**: their
/// data must survive the loss of a single node. `Ephemeral` and `Cache` are
/// explicitly **local-eligible**: dropping their most recent unreplicated writes
/// on failover is an accepted trade for lower write latency.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CollectionDataModel {
    /// Durable transactional records; the default model for user data.
    Transactional,
    /// Durable work queue (at-least-once delivery semantics).
    Queue,
    /// Append-only audit log.
    Audit,
    /// Cluster or application configuration.
    Config,
    /// Secret and credential material.
    Vault,
    /// Explicitly ephemeral data that carries no durability expectation.
    Ephemeral,
    /// Cache-like data that can be rebuilt from a source of truth.
    Cache,
}

impl CollectionDataModel {
    /// Returns `true` when the model's data must survive a single-node loss, which
    /// means it may never silently acknowledge a write locally under declared HA
    /// intent. This is exactly the complement of
    /// [`allows_ephemeral_local`](Self::allows_ephemeral_local).
    pub fn is_durable(self) -> bool {
        !self.allows_ephemeral_local()
    }

    /// Returns `true` for the explicitly local-eligible models (`Ephemeral`,
    /// `Cache`), which may opt into local commit even under declared HA intent.
    pub fn allows_ephemeral_local(self) -> bool {
        // Exhaustive on purpose: a newly added model must be classified here
        // before the crate compiles again.
        match self {
            Self::Ephemeral | Self::Cache => true,
            Self::Transactional | Self::Queue | Self::Audit | Self::Config | Self::Vault => false,
        }
    }

    /// Lowercase name of the model, as embedded in
    /// [`CommitPolicyViolation::message`] and in test diagnostics.
    pub fn label(self) -> &'static str {
        match self {
            Self::Cache => "cache",
            Self::Ephemeral => "ephemeral",
            Self::Vault => "vault",
            Self::Config => "config",
            Self::Audit => "audit",
            Self::Queue => "queue",
            Self::Transactional => "transactional",
        }
    }
}
103
/// Deployment-level flag recording whether HA intent has been declared.
///
/// The local-acknowledgement guardrail only bites once intent is
/// [`Declared`](Self::Declared); a single-writer / non-HA deployment resolves
/// policies without restriction.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum HaIntent {
    /// Multi-writer HA mode: durable models may not silently use `local`.
    Declared,
    /// No HA intent declared, so the guardrail does not apply. This is the
    /// `Default` value.
    #[default]
    None,
}

impl HaIntent {
    /// Returns `true` only for [`Declared`](Self::Declared).
    pub fn is_declared(self) -> bool {
        self == Self::Declared
    }

    /// Reads the `RED_CLUSTER_HA_INTENT` environment variable and interprets it
    /// with [`parse`](Self::parse).
    ///
    /// A variable that cannot be read (unset, or any other `std::env::var` error)
    /// yields [`None`](Self::None), so the guardrail stays off unless opted into.
    pub fn from_env() -> Self {
        // `parse` trims the value itself, so the raw string is handed over as-is.
        std::env::var("RED_CLUSTER_HA_INTENT")
            .map(|value| Self::parse(&value))
            .unwrap_or(Self::None)
    }

    /// Interprets a textual flag.
    ///
    /// After trimming surrounding whitespace, `true`, `1`, `yes` and `declared`
    /// (compared ASCII-case-insensitively) map to [`Declared`](Self::Declared);
    /// every other input, including the empty string, maps to
    /// [`None`](Self::None).
    pub fn parse(raw: &str) -> Self {
        const TRUTHY: [&str; 4] = ["true", "1", "yes", "declared"];

        let token = raw.trim();
        if TRUTHY.iter().any(|accepted| token.eq_ignore_ascii_case(accepted)) {
            Self::Declared
        } else {
            Self::None
        }
    }
}
144
/// Identifies which of the two policy inputs produced the effective policy.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ResolutionSource {
    /// The collection declared no override, so the cluster-wide default applied.
    ClusterDefault,
    /// The collection's own override took precedence over the cluster default.
    CollectionOverride,
}

impl ResolutionSource {
    /// Snake-case name of the source, as embedded in
    /// [`CommitPolicyViolation::message`].
    pub fn label(self) -> &'static str {
        match self {
            Self::CollectionOverride => "collection_override",
            Self::ClusterDefault => "cluster_default",
        }
    }
}
162
/// Outcome of the ephemeral-local guardrail for a resolution that succeeded.
///
/// A rejected resolution never carries a disposition; it is reported as a
/// [`CommitPolicyViolation`] instead.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum GuardrailDisposition {
    /// HA intent was not declared, so the guardrail did not run.
    NotApplicable,
    /// HA intent was declared and the effective policy is genuinely durable (not
    /// local-only acknowledgement): the guardrail ran and was satisfied.
    /// [`resolve_commit_policy`] assigns this for any data model in that state.
    Satisfied,
    /// An ephemeral/cache-like model was explicitly permitted to use local commit
    /// under declared HA intent (documented failover semantics apply).
    EphemeralLocalAllowed,
}
175
/// What a resolved commit policy implies for failover.
///
/// Failover eligibility consumes this value: a durable policy gates promotion on
/// watermark coverage, whereas a local-ack policy admits an explicit data-loss
/// window on the promoted node.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum FailoverEligibility {
    /// Durable effective policy: a candidate may be promoted only when its applied
    /// log covers the range commit watermark.
    RequiresWatermarkCoverage,
    /// Local-only effective policy: the promoted candidate may lack the failed
    /// owner's most recent local-only writes. This is the accepted, documented
    /// loss window for ephemeral/cache-like data.
    LocalAckDataLossWindow,
}
189
190/// The deterministic outcome of resolving a cluster default + collection
191/// override + HA intent against a collection's data model.
192#[derive(Debug, Clone, Copy, PartialEq, Eq)]
193pub struct CommitPolicyResolution {
194 /// The policy the write actually commits under.
195 pub effective: CommitPolicy,
196 /// Which input supplied [`effective`](Self::effective).
197 pub source: ResolutionSource,
198 /// How the guardrail dispositioned this resolution.
199 pub guardrail: GuardrailDisposition,
200}
201
202impl CommitPolicyResolution {
203 /// `true` when the effective policy requires durability beyond the local WAL,
204 /// i.e. failover must gate promotion on range-commit-watermark coverage.
205 pub fn requires_durable_watermark(&self) -> bool {
206 !is_local_ack(self.effective)
207 }
208
209 /// Failover implication of the resolved policy. See [`FailoverEligibility`].
210 pub fn failover_eligibility(&self) -> FailoverEligibility {
211 if self.requires_durable_watermark() {
212 FailoverEligibility::RequiresWatermarkCoverage
213 } else {
214 FailoverEligibility::LocalAckDataLossWindow
215 }
216 }
217}
218
219/// Rejection raised when resolution would silently degrade a durable model to
220/// local-only acknowledgement under declared HA intent. The caller must fail
221/// closed rather than admit writes under the degraded policy.
222#[derive(Debug, Clone, Copy, PartialEq, Eq)]
223pub enum CommitPolicyViolation {
224 /// A durable model resolved to local-only acknowledgement under declared HA
225 /// intent. `source` records whether the offending policy came from the
226 /// cluster default or the collection's own override.
227 DurableLocalUnderHa {
228 model: CollectionDataModel,
229 source: ResolutionSource,
230 },
231}
232
233impl CommitPolicyViolation {
234 pub fn message(&self) -> String {
235 match self {
236 Self::DurableLocalUnderHa { model, source } => format!(
237 "durable collection model '{}' may not use local-only commit acknowledgement \
238 under declared HA intent (policy source: {})",
239 model.label(),
240 source.label()
241 ),
242 }
243 }
244}
245
246impl std::fmt::Display for CommitPolicyViolation {
247 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
248 f.write_str(&self.message())
249 }
250}
251
252impl std::error::Error for CommitPolicyViolation {}
253
254/// `true` when `policy` acknowledges a commit on local WAL durability alone:
255/// `Local`, or the degenerate `AckN(0)` the policy docs define as equivalent.
256pub fn is_local_ack(policy: CommitPolicy) -> bool {
257 matches!(policy, CommitPolicy::Local | CommitPolicy::AckN(0))
258}
259
260/// Deterministically resolve the effective commit policy for one collection.
261///
262/// `cluster_default` is the global default; `collection_override` is the
263/// collection's declared override (if any); `model` is the collection's
264/// durability model; `ha_intent` is whether the deployment declared HA intent.
265///
266/// Returns the resolved policy, or [`CommitPolicyViolation`] when the guardrail
267/// rejects a durable model degraded to local-only acknowledgement under HA
268/// intent. The function is pure and side-effect free.
269pub fn resolve_commit_policy(
270 cluster_default: CommitPolicy,
271 collection_override: Option<CommitPolicy>,
272 model: CollectionDataModel,
273 ha_intent: HaIntent,
274) -> Result<CommitPolicyResolution, CommitPolicyViolation> {
275 let (effective, source) = match collection_override {
276 Some(p) => (p, ResolutionSource::CollectionOverride),
277 None => (cluster_default, ResolutionSource::ClusterDefault),
278 };
279
280 let guardrail = if !ha_intent.is_declared() {
281 // No HA intent: the guardrail does not constrain the resolution.
282 GuardrailDisposition::NotApplicable
283 } else if is_local_ack(effective) {
284 if model.is_durable() {
285 return Err(CommitPolicyViolation::DurableLocalUnderHa { model, source });
286 }
287 // Ephemeral/cache-like: explicitly permitted to opt into local commit.
288 GuardrailDisposition::EphemeralLocalAllowed
289 } else {
290 // Durable model under declared HA intent with a genuinely durable policy.
291 GuardrailDisposition::Satisfied
292 };
293
294 Ok(CommitPolicyResolution {
295 effective,
296 source,
297 guardrail,
298 })
299}
300
/// Unit tests for the commit policy resolver: the acceptance criteria (default
/// quorum, collection override, ephemeral-local allowance, durable-local
/// rejection, failover implications) plus the guardrail's bookkeeping paths and
/// the textual helpers (`HaIntent::parse`, labels, violation message).
#[cfg(test)]
mod tests {
    use super::*;

    /// Every model that must never silently acknowledge locally under HA intent.
    const DURABLE: [CollectionDataModel; 5] = [
        CollectionDataModel::Transactional,
        CollectionDataModel::Queue,
        CollectionDataModel::Audit,
        CollectionDataModel::Config,
        CollectionDataModel::Vault,
    ];
    /// Every model that may explicitly opt into local commit.
    const LOCAL_ELIGIBLE: [CollectionDataModel; 2] =
        [CollectionDataModel::Ephemeral, CollectionDataModel::Cache];

    #[test]
    fn data_model_durability_partition() {
        for m in DURABLE {
            assert!(m.is_durable(), "{} should be durable", m.label());
            assert!(!m.allows_ephemeral_local());
        }
        for m in LOCAL_ELIGIBLE {
            assert!(!m.is_durable(), "{} should not be durable", m.label());
            assert!(m.allows_ephemeral_local());
        }
    }

    #[test]
    fn is_local_ack_treats_ack0_as_local() {
        assert!(is_local_ack(CommitPolicy::Local));
        assert!(is_local_ack(CommitPolicy::AckN(0)));
        assert!(!is_local_ack(CommitPolicy::AckN(1)));
        assert!(!is_local_ack(CommitPolicy::Quorum));
        assert!(!is_local_ack(CommitPolicy::RemoteWal));
    }

    // AC: default quorum behavior — cluster default applies with no override.
    #[test]
    fn cluster_default_quorum_applies_without_override() {
        let r = resolve_commit_policy(
            CommitPolicy::Quorum,
            None,
            CollectionDataModel::Transactional,
            HaIntent::Declared,
        )
        .expect("quorum default is durable under HA");
        assert_eq!(r.effective, CommitPolicy::Quorum);
        assert_eq!(r.source, ResolutionSource::ClusterDefault);
        assert_eq!(r.guardrail, GuardrailDisposition::Satisfied);
        assert_eq!(
            r.failover_eligibility(),
            FailoverEligibility::RequiresWatermarkCoverage
        );
    }

    // AC: collection override — a stricter/looser override beats the default.
    #[test]
    fn collection_override_beats_cluster_default() {
        let r = resolve_commit_policy(
            CommitPolicy::AckN(1),
            Some(CommitPolicy::Quorum),
            CollectionDataModel::Audit,
            HaIntent::Declared,
        )
        .expect("override quorum is durable");
        assert_eq!(r.effective, CommitPolicy::Quorum);
        assert_eq!(r.source, ResolutionSource::CollectionOverride);
        assert_eq!(r.guardrail, GuardrailDisposition::Satisfied);
    }

    // AC: local commit allowed for ephemeral/cache-like data under HA intent.
    #[test]
    fn local_commit_allowed_for_ephemeral_cache_under_ha() {
        for m in LOCAL_ELIGIBLE {
            // via cluster default
            let r = resolve_commit_policy(CommitPolicy::Local, None, m, HaIntent::Declared)
                .unwrap_or_else(|e| panic!("{} local should be allowed: {e}", m.label()));
            assert_eq!(r.effective, CommitPolicy::Local);
            assert_eq!(r.guardrail, GuardrailDisposition::EphemeralLocalAllowed);
            assert_eq!(
                r.failover_eligibility(),
                FailoverEligibility::LocalAckDataLossWindow
            );
            assert!(!r.requires_durable_watermark());

            // via explicit override, and the AckN(0) degenerate form
            let r = resolve_commit_policy(
                CommitPolicy::Quorum,
                Some(CommitPolicy::AckN(0)),
                m,
                HaIntent::Declared,
            )
            .expect("ack_n=0 is local-eligible for ephemeral/cache");
            assert_eq!(r.guardrail, GuardrailDisposition::EphemeralLocalAllowed);
        }
    }

    // A local-eligible model that resolves to a durable policy is not an
    // "ephemeral local" case: the guardrail reports Satisfied and failover still
    // gates on watermark coverage.
    #[test]
    fn durable_policy_is_satisfied_for_local_eligible_models_under_ha() {
        for m in LOCAL_ELIGIBLE {
            let r = resolve_commit_policy(CommitPolicy::Quorum, None, m, HaIntent::Declared)
                .unwrap_or_else(|e| panic!("{} quorum should resolve: {e}", m.label()));
            assert_eq!(r.effective, CommitPolicy::Quorum);
            assert_eq!(r.source, ResolutionSource::ClusterDefault);
            assert_eq!(r.guardrail, GuardrailDisposition::Satisfied);
            assert!(r.requires_durable_watermark());
            assert_eq!(
                r.failover_eligibility(),
                FailoverEligibility::RequiresWatermarkCoverage
            );
        }
    }

    // AC: local commit rejected for durable models under HA intent.
    #[test]
    fn local_commit_rejected_for_durable_models_under_ha() {
        for m in DURABLE {
            // via cluster default
            let err = resolve_commit_policy(CommitPolicy::Local, None, m, HaIntent::Declared)
                .expect_err("durable local must be rejected under HA");
            assert_eq!(
                err,
                CommitPolicyViolation::DurableLocalUnderHa {
                    model: m,
                    source: ResolutionSource::ClusterDefault,
                }
            );
            assert!(err.message().contains(m.label()));

            // via override, including the AckN(0) degenerate form
            let err = resolve_commit_policy(
                CommitPolicy::Quorum,
                Some(CommitPolicy::AckN(0)),
                m,
                HaIntent::Declared,
            )
            .expect_err("durable ack_n=0 override must be rejected under HA");
            assert_eq!(
                err,
                CommitPolicyViolation::DurableLocalUnderHa {
                    model: m,
                    source: ResolutionSource::CollectionOverride,
                }
            );
        }
    }

    // Guardrail only bites under declared HA intent: a non-HA deployment may use
    // local commit for any model.
    #[test]
    fn local_commit_allowed_for_durable_when_ha_not_declared() {
        for m in DURABLE {
            let r = resolve_commit_policy(CommitPolicy::Local, None, m, HaIntent::None)
                .expect("guardrail off without HA intent");
            assert_eq!(r.effective, CommitPolicy::Local);
            assert_eq!(r.guardrail, GuardrailDisposition::NotApplicable);
        }
    }

    // Without HA intent the disposition is NotApplicable even when the policy is
    // durable; the failover implication still follows the effective policy.
    #[test]
    fn durable_policy_without_ha_is_not_applicable() {
        for m in DURABLE.into_iter().chain(LOCAL_ELIGIBLE) {
            let r = resolve_commit_policy(CommitPolicy::Quorum, None, m, HaIntent::None)
                .expect("guardrail off without HA intent");
            assert_eq!(r.effective, CommitPolicy::Quorum);
            assert_eq!(r.guardrail, GuardrailDisposition::NotApplicable);
            assert!(r.requires_durable_watermark());
        }
    }

    // AC: failover watermark implications follow the resolved policy.
    #[test]
    fn failover_watermark_implications_track_resolved_policy() {
        // Durable resolved policy → promotion gated on watermark coverage.
        let durable = resolve_commit_policy(
            CommitPolicy::AckN(2),
            None,
            CollectionDataModel::Queue,
            HaIntent::Declared,
        )
        .unwrap();
        assert!(durable.requires_durable_watermark());
        assert_eq!(
            durable.failover_eligibility(),
            FailoverEligibility::RequiresWatermarkCoverage
        );

        // Local resolved policy (ephemeral) → explicit data-loss window.
        let local = resolve_commit_policy(
            CommitPolicy::Local,
            None,
            CollectionDataModel::Cache,
            HaIntent::Declared,
        )
        .unwrap();
        assert!(!local.requires_durable_watermark());
        assert_eq!(
            local.failover_eligibility(),
            FailoverEligibility::LocalAckDataLossWindow
        );
    }

    #[test]
    fn resolution_is_deterministic() {
        let inputs = (
            CommitPolicy::AckN(1),
            Some(CommitPolicy::Quorum),
            CollectionDataModel::Vault,
            HaIntent::Declared,
        );
        let a = resolve_commit_policy(inputs.0, inputs.1, inputs.2, inputs.3);
        let b = resolve_commit_policy(inputs.0, inputs.1, inputs.2, inputs.3);
        assert_eq!(a, b);
    }

    #[test]
    fn ha_intent_parse() {
        assert_eq!(HaIntent::parse("true"), HaIntent::Declared);
        assert_eq!(HaIntent::parse("1"), HaIntent::Declared);
        assert_eq!(HaIntent::parse("YES"), HaIntent::Declared);
        assert_eq!(HaIntent::parse("declared"), HaIntent::Declared);
        assert_eq!(HaIntent::parse("false"), HaIntent::None);
        assert_eq!(HaIntent::parse(""), HaIntent::None);
        assert_eq!(HaIntent::parse("nonsense"), HaIntent::None);
        assert_eq!(HaIntent::default(), HaIntent::None);
    }

    // Values read from the environment often carry stray whitespace or mixed
    // case; both must be tolerated, while near-misses stay off.
    #[test]
    fn ha_intent_parse_trims_and_ignores_ascii_case() {
        assert_eq!(HaIntent::parse("  TRUE  "), HaIntent::Declared);
        assert_eq!(HaIntent::parse("Declared\n"), HaIntent::Declared);
        assert_eq!(HaIntent::parse("\t1 "), HaIntent::Declared);
        assert_eq!(HaIntent::parse("   "), HaIntent::None);
        assert_eq!(HaIntent::parse("0"), HaIntent::None);
        assert_eq!(HaIntent::parse("no"), HaIntent::None);
        assert_eq!(HaIntent::parse("11"), HaIntent::None);
        assert!(HaIntent::Declared.is_declared());
        assert!(!HaIntent::None.is_declared());
    }

    #[test]
    fn source_and_model_labels() {
        assert_eq!(ResolutionSource::ClusterDefault.label(), "cluster_default");
        assert_eq!(
            ResolutionSource::CollectionOverride.label(),
            "collection_override"
        );

        let expected = [
            (CollectionDataModel::Transactional, "transactional"),
            (CollectionDataModel::Queue, "queue"),
            (CollectionDataModel::Audit, "audit"),
            (CollectionDataModel::Config, "config"),
            (CollectionDataModel::Vault, "vault"),
            (CollectionDataModel::Ephemeral, "ephemeral"),
            (CollectionDataModel::Cache, "cache"),
        ];
        for (model, label) in expected {
            assert_eq!(model.label(), label);
        }
    }

    // The error's Display rendering and `message()` must never drift apart, and
    // the text must name both the model and the policy source.
    #[test]
    fn violation_display_matches_message() {
        let err = CommitPolicyViolation::DurableLocalUnderHa {
            model: CollectionDataModel::Vault,
            source: ResolutionSource::CollectionOverride,
        };
        assert_eq!(err.to_string(), err.message());
        assert!(err.message().contains("'vault'"));
        assert!(err.message().contains("collection_override"));
    }
}