use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
use crate::api::{RedDBError, RedDBOptions, RedDBResult};
use crate::replication::ReplicationRole;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WriteKind {
Dml,
Ddl,
IndexBuild,
Maintenance,
Backup,
Serverless,
}
impl WriteKind {
fn label(self) -> &'static str {
match self {
WriteKind::Dml => "DML",
WriteKind::Ddl => "DDL",
WriteKind::IndexBuild => "index build",
WriteKind::Maintenance => "maintenance",
WriteKind::Backup => "backup trigger",
WriteKind::Serverless => "serverless lifecycle",
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum LeaseGateState {
NotRequired = 0,
Held = 1,
NotHeld = 2,
}
impl LeaseGateState {
fn from_u8(raw: u8) -> Self {
match raw {
1 => Self::Held,
2 => Self::NotHeld,
_ => Self::NotRequired,
}
}
pub fn label(self) -> &'static str {
match self {
Self::NotRequired => "not_required",
Self::Held => "held",
Self::NotHeld => "not_held",
}
}
}
#[derive(Debug)]
pub struct WriteGate {
read_only: AtomicBool,
role: ReplicationRole,
lease: AtomicU8,
}
impl WriteGate {
pub fn from_options(options: &RedDBOptions) -> Self {
Self {
read_only: AtomicBool::new(options.read_only),
role: options.replication.role.clone(),
lease: AtomicU8::new(LeaseGateState::NotRequired as u8),
}
}
pub fn check(&self, kind: WriteKind) -> RedDBResult<()> {
self.check_consent(kind).map(|_| ())
}
pub fn check_consent(&self, kind: WriteKind) -> RedDBResult<crate::application::WriteConsent> {
if matches!(self.role, ReplicationRole::Replica { .. }) {
return Err(RedDBError::ReadOnly(format!(
"instance is a replica — {} rejected on public surface",
kind.label()
)));
}
if matches!(self.lease_state(), LeaseGateState::NotHeld) {
return Err(RedDBError::ReadOnly(format!(
"writer lease not held — {} rejected (serverless fence)",
kind.label()
)));
}
if self.read_only.load(Ordering::Acquire) {
return Err(RedDBError::ReadOnly(format!(
"instance is configured read_only — {} rejected",
kind.label()
)));
}
Ok(crate::application::WriteConsent {
kind,
_seal: crate::application::WriteConsentSeal::new(),
})
}
pub fn is_read_only(&self) -> bool {
self.read_only.load(Ordering::Acquire)
|| matches!(self.role, ReplicationRole::Replica { .. })
|| matches!(self.lease_state(), LeaseGateState::NotHeld)
}
pub fn role(&self) -> &ReplicationRole {
&self.role
}
pub fn set_read_only(&self, enabled: bool) -> bool {
self.read_only.swap(enabled, Ordering::AcqRel)
}
pub fn lease_state(&self) -> LeaseGateState {
LeaseGateState::from_u8(self.lease.load(Ordering::Acquire))
}
pub(crate) fn set_lease_state(&self, state: LeaseGateState) -> LeaseGateState {
LeaseGateState::from_u8(self.lease.swap(state as u8, Ordering::AcqRel))
}
}
#[cfg(test)]
mod tests {
use super::*;
fn gate(read_only: bool, role: ReplicationRole) -> WriteGate {
WriteGate {
read_only: AtomicBool::new(read_only),
role,
lease: AtomicU8::new(LeaseGateState::NotRequired as u8),
}
}
#[test]
fn standalone_allows_writes() {
let g = gate(false, ReplicationRole::Standalone);
assert!(g.check(WriteKind::Dml).is_ok());
assert!(g.check(WriteKind::Ddl).is_ok());
assert!(!g.is_read_only());
}
#[test]
fn primary_allows_writes() {
let g = gate(false, ReplicationRole::Primary);
assert!(g.check(WriteKind::Dml).is_ok());
assert!(!g.is_read_only());
}
#[test]
fn replica_rejects_every_kind() {
let g = gate(
true,
ReplicationRole::Replica {
primary_addr: "http://primary:50051".into(),
},
);
for kind in [
WriteKind::Dml,
WriteKind::Ddl,
WriteKind::IndexBuild,
WriteKind::Maintenance,
WriteKind::Backup,
WriteKind::Serverless,
] {
let err = g.check(kind).unwrap_err();
match err {
RedDBError::ReadOnly(msg) => assert!(msg.contains("replica")),
other => panic!("expected ReadOnly, got {other:?}"),
}
}
assert!(g.is_read_only());
}
#[test]
fn read_only_flag_rejects_writes_on_standalone() {
let g = gate(true, ReplicationRole::Standalone);
let err = g.check(WriteKind::Dml).unwrap_err();
match err {
RedDBError::ReadOnly(msg) => assert!(msg.contains("read_only")),
other => panic!("expected ReadOnly, got {other:?}"),
}
}
#[test]
fn lease_not_held_rejects_writes_on_primary() {
let g = gate(false, ReplicationRole::Primary);
g.set_lease_state(LeaseGateState::NotHeld);
let err = g.check(WriteKind::Dml).unwrap_err();
match err {
RedDBError::ReadOnly(msg) => assert!(msg.contains("lease")),
other => panic!("expected ReadOnly, got {other:?}"),
}
assert!(g.is_read_only());
}
#[test]
fn lease_held_allows_writes_on_primary() {
let g = gate(false, ReplicationRole::Primary);
g.set_lease_state(LeaseGateState::Held);
assert!(g.check(WriteKind::Dml).is_ok());
assert!(!g.is_read_only());
}
#[test]
fn lease_state_transitions_return_previous() {
let g = gate(false, ReplicationRole::Primary);
assert_eq!(
g.set_lease_state(LeaseGateState::Held),
LeaseGateState::NotRequired
);
assert_eq!(
g.set_lease_state(LeaseGateState::NotHeld),
LeaseGateState::Held
);
assert_eq!(g.lease_state(), LeaseGateState::NotHeld);
}
#[test]
fn lease_loss_overrides_writable_read_only_flag() {
let g = gate(false, ReplicationRole::Primary);
g.set_lease_state(LeaseGateState::NotHeld);
let err = g.check(WriteKind::Ddl).unwrap_err();
match err {
RedDBError::ReadOnly(msg) => assert!(msg.contains("lease")),
other => panic!("expected ReadOnly, got {other:?}"),
}
}
#[test]
fn replica_role_overrides_missing_read_only_flag() {
let g = gate(
false,
ReplicationRole::Replica {
primary_addr: "http://primary:50051".into(),
},
);
let err = g.check(WriteKind::Dml).unwrap_err();
match err {
RedDBError::ReadOnly(msg) => assert!(msg.contains("replica")),
other => panic!("expected ReadOnly, got {other:?}"),
}
}
}