Skip to main content

reddb_server/runtime/
write_gate.rs

//! Public-mutation gate.
//!
//! Centralises the check that decides whether a write coming through any
//! public surface (SQL, HTTP, gRPC, PostgreSQL wire, native wire, admin
//! mutating endpoints) is allowed for this instance.
//!
//! Configuration inputs:
//! * `RedDBOptions::read_only` — set explicitly by operators (at boot, or
//!   at runtime through `POST /admin/readonly`).
//! * `ReplicationConfig::role` — `Replica { .. }` is always read-only on
//!   public surfaces; internal logical-WAL apply (`LogicalChangeApplier`)
//!   reaches into the store directly and never crosses this gate.
//!
//! All public mutation paths consult `WriteGate::check` before dispatching
//! to storage. The replica internal apply path is the privileged surface
//! and bypasses the gate by construction.
//!
//! Serverless writer-lease state (`PLAN.md` Phase 5 / W6) is wired
//! through `LeaseGateState` — the runtime flips it to `Held` after a
//! successful acquire/refresh and back to `NotHeld` when the lease is
//! lost, released, or has expired. Standalone / replica / lease-not-
//! configured deployments stay on `NotRequired`, so the lease part of the
//! check is a single atomic load that reads zero.
//!
//! Writes are also rejected while the engine-managed archive-lag pause
//! (issue #519) is active. When `REDDB_BACKUP_PAUSE_ON_LAG_SECS` is set and
//! WAL archive lag exceeds it, the gate goes read-only until archiving
//! catches up.

24use std::sync::atomic::{AtomicBool, AtomicU64, AtomicU8, Ordering};
25
26use crate::api::{RedDBError, RedDBOptions, RedDBResult};
27use crate::replication::ReplicationRole;
28
/// The category of public mutation being attempted.
///
/// The gate splices a short label for it into `RedDBError::ReadOnly`
/// messages, so operator-facing logs say *what* was refused without
/// exposing internal call sites. It is also recorded on the issued
/// `WriteConsent`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WriteKind {
    /// Row-level writes: INSERT / UPDATE / DELETE on a user-visible collection.
    Dml,
    /// Schema changes: CREATE / DROP / ALTER TABLE, CREATE / DROP INDEX, etc.
    Ddl,
    /// An index build or rebuild that happens outside a DDL statement
    /// (for example a background reindex).
    IndexBuild,
    /// Reclaim, repair, or retention sweeps that mutate state.
    Maintenance,
    /// An operator-triggered backup that mutates remote state.
    Backup,
    /// Serverless lifecycle endpoints that mutate state (attach, warmup,
    /// reclaim).
    Serverless,
}

48impl WriteKind {
49    fn label(self) -> &'static str {
50        match self {
51            WriteKind::Dml => "DML",
52            WriteKind::Ddl => "DDL",
53            WriteKind::IndexBuild => "index build",
54            WriteKind::Maintenance => "maintenance",
55            WriteKind::Backup => "backup trigger",
56            WriteKind::Serverless => "serverless lifecycle",
57        }
58    }
59}
60
/// Serverless writer-lease state as seen by the gate.
///
/// `WriteGate` stores this as a raw `u8`, hence `#[repr(u8)]` and the
/// explicit discriminants. `NotRequired` is the default and is shared by
/// standalone, replica, and lease-disabled serverless deployments. Only
/// instances that opted into lease-fenced writes move between `Held` and
/// `NotHeld`; the lease loop flips the value as it acquires, refreshes, or
/// loses the slot.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum LeaseGateState {
    /// No lease fencing applies to this instance.
    NotRequired = 0,
    /// This instance currently holds the writer lease.
    Held = 1,
    /// The lease was lost, released, or expired; public writes are rejected.
    NotHeld = 2,
}

76impl LeaseGateState {
77    fn from_u8(raw: u8) -> Self {
78        match raw {
79            1 => Self::Held,
80            2 => Self::NotHeld,
81            _ => Self::NotRequired,
82        }
83    }
84
85    pub fn label(self) -> &'static str {
86        match self {
87            Self::NotRequired => "not_required",
88            Self::Held => "held",
89            Self::NotHeld => "not_held",
90        }
91    }
92}
93
94/// Live policy for public-mutation surfaces.
95///
96/// `read_only` was originally a `bool` snapshot taken at runtime
97/// construction. PLAN.md Phase 4.3 promotes it to an `AtomicBool` so
98/// `POST /admin/readonly` can flip the policy without a restart. The
99/// `ReplicationRole` stays immutable — flipping a replica into a
100/// primary mid-process would need a full handshake (Phase 3 work in
101/// the data-safety plan), and shouldn't be a single-flag decision.
102#[derive(Debug)]
103pub struct WriteGate {
104    /// Operator-set read-only flag. Mutated by `POST /admin/readonly`
105    /// and by boot-time resolution (CLI/env/persisted state). Sticky:
106    /// the archive-lag auto-pause path (#519) never touches this — only
107    /// the operator can clear an operator-set pin.
108    read_only: AtomicBool,
109    role: ReplicationRole,
110    lease: AtomicU8,
111    /// Issue #519 — engine-managed graceful read-only triggered by
112    /// `REDDB_BACKUP_PAUSE_ON_LAG_SECS` when WAL archive lag exceeds
113    /// the threshold. Independent of `read_only` so the two precedences
114    /// (manual sticky, auto auto-resumes) cannot stomp each other.
115    auto_paused: AtomicBool,
116    /// Unix-ms timestamp of the last *successful* remote archive. `0`
117    /// means "never observed since boot"; the lag evaluator treats
118    /// that as "lag since boot" once it has been initialised by the
119    /// caller (typical pattern: stamp `now` at construction so the
120    /// instance gets a `threshold_secs` grace window).
121    last_archive_at_ms: AtomicU64,
122    /// Threshold from `REDDB_BACKUP_PAUSE_ON_LAG_SECS`. `0` = feature
123    /// disabled — `evaluate_archive_lag` short-circuits without
124    /// touching `auto_paused`.
125    pause_threshold_secs: AtomicU64,
126}
127
128impl WriteGate {
129    pub fn from_options(options: &RedDBOptions) -> Self {
130        Self {
131            read_only: AtomicBool::new(options.read_only),
132            role: options.replication.role.clone(),
133            lease: AtomicU8::new(LeaseGateState::NotRequired as u8),
134            auto_paused: AtomicBool::new(false),
135            last_archive_at_ms: AtomicU64::new(0),
136            pause_threshold_secs: AtomicU64::new(0),
137        }
138    }
139
140    /// Returns `Ok(())` if the public surface is allowed to perform `kind`.
141    /// Returns `RedDBError::ReadOnly` otherwise.
142    ///
143    /// Reasoning order is intentional:
144    /// 1. Replica role — a replica booted with `read_only = false`
145    ///    must still reject; this is a structural property.
146    /// 2. Lease lost — the strongest serverless correctness signal.
147    ///    A writer that lost its lease must stop *immediately*; running
148    ///    while another holder has been promoted causes split-brain.
149    /// 3. Operator read-only flag — explicit /admin/readonly toggle
150    ///    or boot-time pin; lower priority than lease loss because the
151    ///    operator can revoke it without external coordination.
152    pub fn check(&self, kind: WriteKind) -> RedDBResult<()> {
153        self.check_consent(kind).map(|_| ())
154    }
155
156    /// Same as `check` but on success returns a sealed
157    /// `WriteConsent` token. Mutating port methods that take
158    /// `&OperationContext` demand `ctx.write_consent.is_some()`;
159    /// the only way to mint such a token is to call this method,
160    /// so forgetting to consult the gate becomes a structural
161    /// property — not a discipline question.
162    pub fn check_consent(&self, kind: WriteKind) -> RedDBResult<crate::application::WriteConsent> {
163        if matches!(self.role, ReplicationRole::Replica { .. }) {
164            return Err(RedDBError::ReadOnly(format!(
165                "instance is a replica — {} rejected on public surface",
166                kind.label()
167            )));
168        }
169        if matches!(self.lease_state(), LeaseGateState::NotHeld) {
170            return Err(RedDBError::ReadOnly(format!(
171                "writer lease not held — {} rejected (serverless fence)",
172                kind.label()
173            )));
174        }
175        if self.read_only.load(Ordering::Acquire) {
176            return Err(RedDBError::ReadOnly(format!(
177                "instance is configured read_only — {} rejected",
178                kind.label()
179            )));
180        }
181        if self.auto_paused.load(Ordering::Acquire) {
182            return Err(RedDBError::ReadOnly(format!(
183                "instance is paused — WAL archive lag exceeded threshold — {} rejected",
184                kind.label()
185            )));
186        }
187        Ok(crate::application::WriteConsent {
188            kind,
189            _seal: crate::application::WriteConsentSeal::new(),
190        })
191    }
192
193    pub fn is_read_only(&self) -> bool {
194        self.read_only.load(Ordering::Acquire)
195            || self.auto_paused.load(Ordering::Acquire)
196            || matches!(self.role, ReplicationRole::Replica { .. })
197            || matches!(self.lease_state(), LeaseGateState::NotHeld)
198    }
199
200    /// Whether the operator explicitly pinned this instance read-only
201    /// (via boot config or `POST /admin/readonly`). Distinct from
202    /// [`is_read_only`] which also returns `true` for structural
203    /// reasons (replica role, lease lost, archive-lag pause).
204    pub fn is_manual_read_only(&self) -> bool {
205        self.read_only.load(Ordering::Acquire)
206    }
207
208    /// Whether the engine-managed archive-lag pause (#519) is
209    /// currently active. Mutually independent of [`is_manual_read_only`]
210    /// so callers like `/backup/status` can report both.
211    pub fn is_auto_paused(&self) -> bool {
212        self.auto_paused.load(Ordering::Acquire)
213    }
214
215    pub fn role(&self) -> &ReplicationRole {
216        &self.role
217    }
218
219    /// PLAN.md Phase 4.3 — dynamic read-only toggle. Flipping a
220    /// replica back to writable here is a no-op for `check()` because
221    /// the role check fires first; the operator must change the
222    /// replication role through a separate, audited path.
223    ///
224    /// Returns the previous read_only value so callers can detect
225    /// idempotent calls (toggle to the same value = no work to do).
226    pub fn set_read_only(&self, enabled: bool) -> bool {
227        self.read_only.swap(enabled, Ordering::AcqRel)
228    }
229
230    /// Current writer-lease gate state. `NotRequired` for standalone,
231    /// replica, and lease-disabled serverless instances.
232    pub fn lease_state(&self) -> LeaseGateState {
233        LeaseGateState::from_u8(self.lease.load(Ordering::Acquire))
234    }
235
236    /// Issue #519 — install the archive-lag pause threshold and the
237    /// baseline "last archive observed at" stamp. Threshold `0`
238    /// disables auto-pause; subsequent `record_archive_success` /
239    /// `evaluate_archive_lag` calls then become no-ops.
240    ///
241    /// Idempotent: callers should invoke once during startup after
242    /// parsing `REDDB_BACKUP_PAUSE_ON_LAG_SECS`. Stamping `last_archive_at_ms`
243    /// to "now" at construction grants a `threshold_secs` grace
244    /// window before the first auto-pause can fire — without it, a
245    /// freshly-booted instance with a never-archived WAL would flip
246    /// to read-only on the first poll.
247    pub fn configure_archive_lag_pause(&self, threshold_secs: u64, baseline_ms: u64) {
248        self.pause_threshold_secs
249            .store(threshold_secs, Ordering::Release);
250        self.last_archive_at_ms
251            .store(baseline_ms, Ordering::Release);
252    }
253
254    /// Stamp `last_archive_at_ms` after a successful remote archive.
255    /// Called by the WAL-archive task wrapper in `service_cli` after
256    /// `runtime.trigger_backup()` returns `Ok`.
257    pub fn record_archive_success(&self, now_ms: u64) {
258        self.last_archive_at_ms.store(now_ms, Ordering::Release);
259    }
260
261    /// Current archive-lag threshold in seconds. `0` means the
262    /// feature is disabled.
263    pub fn archive_pause_threshold_secs(&self) -> u64 {
264        self.pause_threshold_secs.load(Ordering::Acquire)
265    }
266
267    /// Last archive observation timestamp (unix ms).
268    pub fn last_archive_at_ms(&self) -> u64 {
269        self.last_archive_at_ms.load(Ordering::Acquire)
270    }
271
272    /// Re-evaluate the archive-lag state. Returns the resulting
273    /// `auto_paused` value.
274    ///
275    /// Semantics (issue #519):
276    /// * Threshold `0` → feature disabled, returns current state
277    ///   without writing.
278    /// * Manual read-only is **sticky** — when [`is_manual_read_only`]
279    ///   is true, this method never modifies `auto_paused`. The
280    ///   operator must clear the manual pin first; only then does the
281    ///   auto-path take over again on the next tick.
282    /// * Lag > threshold and manual=false → set `auto_paused = true`.
283    /// * Lag <= threshold and `auto_paused = true` → clear it
284    ///   (auto-resume). If `auto_paused` was already false, no-op.
285    pub fn evaluate_archive_lag(&self, now_ms: u64) -> bool {
286        let threshold = self.pause_threshold_secs.load(Ordering::Acquire);
287        if threshold == 0 {
288            return self.auto_paused.load(Ordering::Acquire);
289        }
290        if self.read_only.load(Ordering::Acquire) {
291            return self.auto_paused.load(Ordering::Acquire);
292        }
293        let last_ms = self.last_archive_at_ms.load(Ordering::Acquire);
294        let lag_secs = now_ms.saturating_sub(last_ms) / 1000;
295        let should_pause = lag_secs > threshold;
296        self.auto_paused.store(should_pause, Ordering::Release);
297        should_pause
298    }
299
300    /// Flip the lease gate state. Only `LeaseLifecycle` should call
301    /// this — other callers must go through the lifecycle so the
302    /// gate flip and the corresponding `lease/*` audit record
303    /// stay paired.
304    ///
305    /// Returns the previous state so the caller can detect idempotent
306    /// transitions and avoid spamming audit / metrics.
307    pub(crate) fn set_lease_state(&self, state: LeaseGateState) -> LeaseGateState {
308        LeaseGateState::from_u8(self.lease.swap(state as u8, Ordering::AcqRel))
309    }
310}
311
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a gate field-by-field so tests don't need a full
    /// `RedDBOptions`. The lease starts `NotRequired` and archive-lag
    /// auto-pause starts disabled.
    fn gate(read_only: bool, role: ReplicationRole) -> WriteGate {
        WriteGate {
            read_only: AtomicBool::new(read_only),
            role,
            lease: AtomicU8::new(LeaseGateState::NotRequired as u8),
            auto_paused: AtomicBool::new(false),
            last_archive_at_ms: AtomicU64::new(0),
            pause_threshold_secs: AtomicU64::new(0),
        }
    }

    /// A replica role pointing at a dummy primary address.
    fn replica() -> ReplicationRole {
        ReplicationRole::Replica {
            primary_addr: "http://primary:50051".into(),
        }
    }

    /// Asserts that `result` is a `RedDBError::ReadOnly` rejection and
    /// returns its message. Any other outcome (including `Ok`) fails the
    /// test.
    fn rejection(result: RedDBResult<()>) -> String {
        match result {
            Err(RedDBError::ReadOnly(msg)) => msg,
            other => panic!("expected ReadOnly, got {other:?}"),
        }
    }

    #[test]
    fn standalone_allows_writes() {
        let g = gate(false, ReplicationRole::Standalone);
        assert!(g.check(WriteKind::Dml).is_ok());
        assert!(g.check(WriteKind::Ddl).is_ok());
        assert!(!g.is_read_only());
    }

    #[test]
    fn primary_allows_writes() {
        let g = gate(false, ReplicationRole::Primary);
        assert!(g.check(WriteKind::Dml).is_ok());
        assert!(!g.is_read_only());
    }

    #[test]
    fn replica_rejects_every_kind() {
        let g = gate(true, replica());
        for kind in [
            WriteKind::Dml,
            WriteKind::Ddl,
            WriteKind::IndexBuild,
            WriteKind::Maintenance,
            WriteKind::Backup,
            WriteKind::Serverless,
        ] {
            let msg = rejection(g.check(kind));
            assert!(msg.contains("replica"), "{msg}");
        }
        assert!(g.is_read_only());
    }

    #[test]
    fn read_only_flag_rejects_writes_on_standalone() {
        let g = gate(true, ReplicationRole::Standalone);
        let msg = rejection(g.check(WriteKind::Dml));
        assert!(msg.contains("read_only"), "{msg}");
    }

    #[test]
    fn lease_not_held_rejects_writes_on_primary() {
        let g = gate(false, ReplicationRole::Primary);
        g.set_lease_state(LeaseGateState::NotHeld);
        let msg = rejection(g.check(WriteKind::Dml));
        assert!(msg.contains("lease"), "{msg}");
        assert!(g.is_read_only());
    }

    #[test]
    fn lease_held_allows_writes_on_primary() {
        let g = gate(false, ReplicationRole::Primary);
        g.set_lease_state(LeaseGateState::Held);
        assert!(g.check(WriteKind::Dml).is_ok());
        assert!(!g.is_read_only());
    }

    #[test]
    fn lease_state_transitions_return_previous() {
        let g = gate(false, ReplicationRole::Primary);
        assert_eq!(
            g.set_lease_state(LeaseGateState::Held),
            LeaseGateState::NotRequired
        );
        assert_eq!(
            g.set_lease_state(LeaseGateState::NotHeld),
            LeaseGateState::Held
        );
        assert_eq!(g.lease_state(), LeaseGateState::NotHeld);
    }

    #[test]
    fn lease_loss_overrides_writable_read_only_flag() {
        // Even with read_only=false, losing the lease must reject.
        let g = gate(false, ReplicationRole::Primary);
        g.set_lease_state(LeaseGateState::NotHeld);
        let msg = rejection(g.check(WriteKind::Ddl));
        assert!(msg.contains("lease"), "{msg}");
    }

    #[test]
    fn lease_loss_takes_precedence_over_read_only_flag() {
        // Both reasons apply; the documented order names the lease first.
        let g = gate(true, ReplicationRole::Primary);
        g.set_lease_state(LeaseGateState::NotHeld);
        let msg = rejection(g.check(WriteKind::Dml));
        assert!(msg.contains("lease"), "{msg}");
    }

    #[test]
    fn lease_state_labels_are_stable() {
        assert_eq!(LeaseGateState::NotRequired.label(), "not_required");
        assert_eq!(LeaseGateState::Held.label(), "held");
        assert_eq!(LeaseGateState::NotHeld.label(), "not_held");
    }

    #[test]
    fn consent_token_records_write_kind() {
        let g = gate(false, ReplicationRole::Standalone);
        let consent = g
            .check_consent(WriteKind::Backup)
            .expect("standalone must grant consent");
        assert_eq!(consent.kind, WriteKind::Backup);
    }

    // ---------------------------------------------------------------
    // Issue #519 — graceful read-only mode when WAL archive lag
    // exceeds REDDB_BACKUP_PAUSE_ON_LAG_SECS.
    // ---------------------------------------------------------------

    #[test]
    fn archive_lag_disabled_threshold_is_noop() {
        let g = gate(false, ReplicationRole::Standalone);
        g.configure_archive_lag_pause(0, 1_000);
        // Even with an ancient timestamp, threshold=0 must not pause.
        assert!(!g.evaluate_archive_lag(10_000_000_000));
        assert!(!g.is_auto_paused());
        assert!(g.check(WriteKind::Dml).is_ok());
    }

    #[test]
    fn archive_lag_triggers_auto_pause_past_threshold() {
        let g = gate(false, ReplicationRole::Standalone);
        // Last archive at t=1_000_000ms; threshold = 60s.
        g.configure_archive_lag_pause(60, 1_000_000);
        // 30s later — still under threshold.
        assert!(!g.evaluate_archive_lag(1_000_000 + 30_000));
        assert!(g.check(WriteKind::Dml).is_ok());

        // 120s later — over threshold; must auto-pause.
        assert!(g.evaluate_archive_lag(1_000_000 + 120_000));
        assert!(g.is_auto_paused());
        let msg = rejection(g.check(WriteKind::Dml));
        assert!(msg.contains("WAL archive lag"), "{msg}");
        assert!(g.is_read_only());
    }

    #[test]
    fn archive_lag_auto_resume_after_recovery() {
        let g = gate(false, ReplicationRole::Standalone);
        g.configure_archive_lag_pause(60, 1_000_000);
        // Trip the auto-pause.
        assert!(g.evaluate_archive_lag(1_000_000 + 120_000));
        assert!(g.is_auto_paused());
        // Archiver catches up — stamp success and re-evaluate.
        g.record_archive_success(1_000_000 + 130_000);
        assert!(!g.evaluate_archive_lag(1_000_000 + 130_000));
        assert!(!g.is_auto_paused());
        assert!(g.check(WriteKind::Dml).is_ok());
    }

    #[test]
    fn manual_read_only_blocks_auto_pause_writes_and_is_sticky() {
        // Operator pinned read-only *before* lag condition. The
        // auto-pause path must be a no-op while manual is set, and
        // archive recovery must NOT auto-clear the manual pin.
        let g = gate(true, ReplicationRole::Standalone);
        g.configure_archive_lag_pause(60, 1_000_000);

        // Lag past threshold; but manual is set so auto stays false.
        assert!(!g.evaluate_archive_lag(1_000_000 + 120_000));
        assert!(!g.is_auto_paused());
        assert!(g.is_manual_read_only());
        // Writes still rejected — for the manual reason.
        let msg = rejection(g.check(WriteKind::Dml));
        assert!(msg.contains("read_only"), "{msg}");
        assert!(!msg.contains("WAL archive lag"), "{msg}");

        // Archiver recovers; re-evaluate. Manual still set ⇒ auto stays false,
        // manual stays true ⇒ instance stays read-only by operator intent.
        g.record_archive_success(1_000_000 + 130_000);
        assert!(!g.evaluate_archive_lag(1_000_000 + 130_000));
        assert!(g.is_manual_read_only(), "manual must stay set");
        assert!(!g.is_auto_paused());
        assert!(g.check(WriteKind::Dml).is_err());
    }

    #[test]
    fn manual_clearing_resumes_auto_evaluation() {
        // Manual was set; operator clears it; lag is still bad.
        // Next evaluation must auto-pause.
        let g = gate(true, ReplicationRole::Standalone);
        g.configure_archive_lag_pause(60, 1_000_000);
        // No-op while manual.
        assert!(!g.evaluate_archive_lag(1_000_000 + 120_000));
        // Operator unsets manual.
        g.set_read_only(false);
        // Now the lag condition must fire.
        assert!(g.evaluate_archive_lag(1_000_000 + 120_000));
        assert!(g.is_auto_paused());
    }

    #[test]
    fn archive_lag_pause_state_independent_from_manual_flag() {
        let g = gate(false, ReplicationRole::Standalone);
        g.configure_archive_lag_pause(60, 1_000_000);
        assert!(g.evaluate_archive_lag(1_000_000 + 120_000));
        // Operator separately pins manual on top; still both true.
        let prev = g.set_read_only(true);
        assert!(!prev);
        assert!(g.is_manual_read_only());
        assert!(g.is_auto_paused());
        // Operator clears manual; auto pause survives.
        g.set_read_only(false);
        assert!(g.is_auto_paused());
        assert!(g.check(WriteKind::Dml).is_err());
    }

    #[test]
    fn read_only_flag_takes_precedence_over_auto_pause() {
        // Auto-pause trips first, then the operator pins manual on top:
        // the rejection must name the operator flag, per the check order.
        let g = gate(false, ReplicationRole::Standalone);
        g.configure_archive_lag_pause(60, 1_000_000);
        assert!(g.evaluate_archive_lag(1_000_000 + 120_000));
        g.set_read_only(true);
        let msg = rejection(g.check(WriteKind::Dml));
        assert!(msg.contains("read_only"), "{msg}");
        assert!(!msg.contains("WAL archive lag"), "{msg}");
    }

    #[test]
    fn replica_role_overrides_missing_read_only_flag() {
        let g = gate(false, replica());
        let msg = rejection(g.check(WriteKind::Dml));
        assert!(msg.contains("replica"), "{msg}");
    }
}