use crate::replication::CommitPolicy;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CollectionDataModel {
Transactional,
Queue,
Audit,
Config,
Vault,
Ephemeral,
Cache,
}
impl CollectionDataModel {
pub fn is_durable(self) -> bool {
match self {
Self::Transactional | Self::Queue | Self::Audit | Self::Config | Self::Vault => true,
Self::Ephemeral | Self::Cache => false,
}
}
pub fn allows_ephemeral_local(self) -> bool {
!self.is_durable()
}
pub fn label(self) -> &'static str {
match self {
Self::Transactional => "transactional",
Self::Queue => "queue",
Self::Audit => "audit",
Self::Config => "config",
Self::Vault => "vault",
Self::Ephemeral => "ephemeral",
Self::Cache => "cache",
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum HaIntent {
Declared,
#[default]
None,
}
impl HaIntent {
pub fn is_declared(self) -> bool {
matches!(self, Self::Declared)
}
pub fn from_env() -> Self {
match std::env::var("RED_CLUSTER_HA_INTENT") {
Ok(raw) => Self::parse(raw.trim()),
Err(_) => Self::None,
}
}
pub fn parse(raw: &str) -> Self {
let t = raw.trim();
if t.eq_ignore_ascii_case("true")
|| t == "1"
|| t.eq_ignore_ascii_case("yes")
|| t.eq_ignore_ascii_case("declared")
{
Self::Declared
} else {
Self::None
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ResolutionSource {
ClusterDefault,
CollectionOverride,
}
impl ResolutionSource {
pub fn label(self) -> &'static str {
match self {
Self::ClusterDefault => "cluster_default",
Self::CollectionOverride => "collection_override",
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GuardrailDisposition {
NotApplicable,
Satisfied,
EphemeralLocalAllowed,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FailoverEligibility {
RequiresWatermarkCoverage,
LocalAckDataLossWindow,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CommitPolicyResolution {
pub effective: CommitPolicy,
pub source: ResolutionSource,
pub guardrail: GuardrailDisposition,
}
impl CommitPolicyResolution {
pub fn requires_durable_watermark(&self) -> bool {
!is_local_ack(self.effective)
}
pub fn failover_eligibility(&self) -> FailoverEligibility {
if self.requires_durable_watermark() {
FailoverEligibility::RequiresWatermarkCoverage
} else {
FailoverEligibility::LocalAckDataLossWindow
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CommitPolicyViolation {
DurableLocalUnderHa {
model: CollectionDataModel,
source: ResolutionSource,
},
}
impl CommitPolicyViolation {
pub fn message(&self) -> String {
match self {
Self::DurableLocalUnderHa { model, source } => format!(
"durable collection model '{}' may not use local-only commit acknowledgement \
under declared HA intent (policy source: {})",
model.label(),
source.label()
),
}
}
}
impl std::fmt::Display for CommitPolicyViolation {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.message())
}
}
impl std::error::Error for CommitPolicyViolation {}
pub fn is_local_ack(policy: CommitPolicy) -> bool {
matches!(policy, CommitPolicy::Local | CommitPolicy::AckN(0))
}
pub fn resolve_commit_policy(
cluster_default: CommitPolicy,
collection_override: Option<CommitPolicy>,
model: CollectionDataModel,
ha_intent: HaIntent,
) -> Result<CommitPolicyResolution, CommitPolicyViolation> {
let (effective, source) = match collection_override {
Some(p) => (p, ResolutionSource::CollectionOverride),
None => (cluster_default, ResolutionSource::ClusterDefault),
};
let guardrail = if !ha_intent.is_declared() {
GuardrailDisposition::NotApplicable
} else if is_local_ack(effective) {
if model.is_durable() {
return Err(CommitPolicyViolation::DurableLocalUnderHa { model, source });
}
GuardrailDisposition::EphemeralLocalAllowed
} else {
GuardrailDisposition::Satisfied
};
Ok(CommitPolicyResolution {
effective,
source,
guardrail,
})
}
#[cfg(test)]
mod tests {
use super::*;
const DURABLE: [CollectionDataModel; 5] = [
CollectionDataModel::Transactional,
CollectionDataModel::Queue,
CollectionDataModel::Audit,
CollectionDataModel::Config,
CollectionDataModel::Vault,
];
const LOCAL_ELIGIBLE: [CollectionDataModel; 2] =
[CollectionDataModel::Ephemeral, CollectionDataModel::Cache];
#[test]
fn data_model_durability_partition() {
for m in DURABLE {
assert!(m.is_durable(), "{} should be durable", m.label());
assert!(!m.allows_ephemeral_local());
}
for m in LOCAL_ELIGIBLE {
assert!(!m.is_durable(), "{} should not be durable", m.label());
assert!(m.allows_ephemeral_local());
}
}
#[test]
fn is_local_ack_treats_ack0_as_local() {
assert!(is_local_ack(CommitPolicy::Local));
assert!(is_local_ack(CommitPolicy::AckN(0)));
assert!(!is_local_ack(CommitPolicy::AckN(1)));
assert!(!is_local_ack(CommitPolicy::Quorum));
assert!(!is_local_ack(CommitPolicy::RemoteWal));
}
#[test]
fn cluster_default_quorum_applies_without_override() {
let r = resolve_commit_policy(
CommitPolicy::Quorum,
None,
CollectionDataModel::Transactional,
HaIntent::Declared,
)
.expect("quorum default is durable under HA");
assert_eq!(r.effective, CommitPolicy::Quorum);
assert_eq!(r.source, ResolutionSource::ClusterDefault);
assert_eq!(r.guardrail, GuardrailDisposition::Satisfied);
assert_eq!(
r.failover_eligibility(),
FailoverEligibility::RequiresWatermarkCoverage
);
}
#[test]
fn collection_override_beats_cluster_default() {
let r = resolve_commit_policy(
CommitPolicy::AckN(1),
Some(CommitPolicy::Quorum),
CollectionDataModel::Audit,
HaIntent::Declared,
)
.expect("override quorum is durable");
assert_eq!(r.effective, CommitPolicy::Quorum);
assert_eq!(r.source, ResolutionSource::CollectionOverride);
assert_eq!(r.guardrail, GuardrailDisposition::Satisfied);
}
#[test]
fn local_commit_allowed_for_ephemeral_cache_under_ha() {
for m in LOCAL_ELIGIBLE {
let r = resolve_commit_policy(CommitPolicy::Local, None, m, HaIntent::Declared)
.unwrap_or_else(|e| panic!("{} local should be allowed: {e}", m.label()));
assert_eq!(r.effective, CommitPolicy::Local);
assert_eq!(r.guardrail, GuardrailDisposition::EphemeralLocalAllowed);
assert_eq!(
r.failover_eligibility(),
FailoverEligibility::LocalAckDataLossWindow
);
assert!(!r.requires_durable_watermark());
let r = resolve_commit_policy(
CommitPolicy::Quorum,
Some(CommitPolicy::AckN(0)),
m,
HaIntent::Declared,
)
.expect("ack_n=0 is local-eligible for ephemeral/cache");
assert_eq!(r.guardrail, GuardrailDisposition::EphemeralLocalAllowed);
}
}
#[test]
fn local_commit_rejected_for_durable_models_under_ha() {
for m in DURABLE {
let err = resolve_commit_policy(CommitPolicy::Local, None, m, HaIntent::Declared)
.expect_err("durable local must be rejected under HA");
assert_eq!(
err,
CommitPolicyViolation::DurableLocalUnderHa {
model: m,
source: ResolutionSource::ClusterDefault,
}
);
assert!(err.message().contains(m.label()));
let err = resolve_commit_policy(
CommitPolicy::Quorum,
Some(CommitPolicy::AckN(0)),
m,
HaIntent::Declared,
)
.expect_err("durable ack_n=0 override must be rejected under HA");
assert_eq!(
err,
CommitPolicyViolation::DurableLocalUnderHa {
model: m,
source: ResolutionSource::CollectionOverride,
}
);
}
}
#[test]
fn local_commit_allowed_for_durable_when_ha_not_declared() {
for m in DURABLE {
let r = resolve_commit_policy(CommitPolicy::Local, None, m, HaIntent::None)
.expect("guardrail off without HA intent");
assert_eq!(r.effective, CommitPolicy::Local);
assert_eq!(r.guardrail, GuardrailDisposition::NotApplicable);
}
}
#[test]
fn failover_watermark_implications_track_resolved_policy() {
let durable = resolve_commit_policy(
CommitPolicy::AckN(2),
None,
CollectionDataModel::Queue,
HaIntent::Declared,
)
.unwrap();
assert!(durable.requires_durable_watermark());
assert_eq!(
durable.failover_eligibility(),
FailoverEligibility::RequiresWatermarkCoverage
);
let local = resolve_commit_policy(
CommitPolicy::Local,
None,
CollectionDataModel::Cache,
HaIntent::Declared,
)
.unwrap();
assert!(!local.requires_durable_watermark());
assert_eq!(
local.failover_eligibility(),
FailoverEligibility::LocalAckDataLossWindow
);
}
#[test]
fn resolution_is_deterministic() {
let inputs = (
CommitPolicy::AckN(1),
Some(CommitPolicy::Quorum),
CollectionDataModel::Vault,
HaIntent::Declared,
);
let a = resolve_commit_policy(inputs.0, inputs.1, inputs.2, inputs.3);
let b = resolve_commit_policy(inputs.0, inputs.1, inputs.2, inputs.3);
assert_eq!(a, b);
}
#[test]
fn ha_intent_parse() {
assert_eq!(HaIntent::parse("true"), HaIntent::Declared);
assert_eq!(HaIntent::parse("1"), HaIntent::Declared);
assert_eq!(HaIntent::parse("YES"), HaIntent::Declared);
assert_eq!(HaIntent::parse("declared"), HaIntent::Declared);
assert_eq!(HaIntent::parse("false"), HaIntent::None);
assert_eq!(HaIntent::parse(""), HaIntent::None);
assert_eq!(HaIntent::parse("nonsense"), HaIntent::None);
assert_eq!(HaIntent::default(), HaIntent::None);
}
#[test]
fn source_and_disposition_labels() {
assert_eq!(ResolutionSource::ClusterDefault.label(), "cluster_default");
assert_eq!(
ResolutionSource::CollectionOverride.label(),
"collection_override"
);
}
}