Skip to main content

reddb_server/runtime/
write_gate.rs

//! Public-mutation gate.
//!
//! Centralises the check that decides whether a write coming through any
//! public surface (SQL, HTTP, gRPC, PostgreSQL wire, native wire, admin
//! mutating endpoints) is allowed for this instance.
//!
//! Two configuration inputs (runtime signals such as the writer lease,
//! the archive-lag pause, and flow control are layered on top):
//! * `RedDBOptions::read_only` — set explicitly by operators.
//! * `ReplicationConfig::role`  — `Replica { .. }` is always read-only on
//!   public surfaces; internal logical-WAL apply (`LogicalChangeApplier`)
//!   reaches into the store directly and never crosses this gate.
//!
//! All public mutation paths consult `WriteGate::check` before dispatching
//! to storage. The replica internal apply path is the privileged surface
//! and bypasses the gate by construction.
//!
//! Serverless writer-lease state (`PLAN.md` Phase 5 / W6) is wired
//! through `LeaseGateState` — runtime flips it to `Held` after a
//! successful acquire/refresh and back to `NotHeld` when the lease is
//! lost, released, or has expired. Standalone / replica / lease-not-
//! configured deployments stay on `NotRequired`, so the lease part of the
//! check costs a single atomic load that reads zero.
23
24use std::sync::atomic::{AtomicBool, AtomicU64, AtomicU8, Ordering};
25
26use crate::api::{RedDBError, RedDBOptions, RedDBResult};
27use crate::replication::flow_control::{Admission, FlowController};
28use crate::replication::ReplicationRole;
29
/// Name of the environment variable holding the flow-control soft target,
/// expressed as a number of LSN records of in-quorum replica lag.
/// Leaving it unset (or setting it to `0`) keeps write-admission
/// throttling switched off, which is the default.
pub const FLOW_CONTROL_SOFT_TARGET_ENV: &str =
    "RED_REPLICATION_FLOW_CONTROL_SOFT_TARGET_LSN";
33
/// Coarse category of a public-surface write.
///
/// Used only to phrase rejection errors: the category names the kind of
/// mutation that was refused, without exposing the internal call site
/// that attempted it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WriteKind {
    /// Row-level INSERT / UPDATE / DELETE against a user-visible collection.
    Dml,
    /// Schema changes: CREATE / DROP / ALTER TABLE, CREATE / DROP INDEX, etc.
    Ddl,
    /// Index (re)builds that happen outside a DDL statement, such as a
    /// background reindex.
    IndexBuild,
    /// State-mutating reclaim, repair, or retention sweeps.
    Maintenance,
    /// Operator-triggered backup that mutates remote state.
    Backup,
    /// Serverless lifecycle endpoints that mutate state (attach, warmup,
    /// reclaim).
    Serverless,
}

impl WriteKind {
    /// Human-readable name of the category, embedded in rejection messages.
    fn label(self) -> &'static str {
        let text = match self {
            Self::Dml => "DML",
            Self::Ddl => "DDL",
            Self::IndexBuild => "index build",
            Self::Maintenance => "maintenance",
            Self::Backup => "backup trigger",
            Self::Serverless => "serverless lifecycle",
        };
        text
    }
}
65
/// Writer-lease state as seen by the write gate.
///
/// Every instance starts at `NotRequired`; standalone, replica, and
/// serverless deployments without lease fencing never leave it. Only
/// instances that opted into lease-fenced writes move between `Held` and
/// `NotHeld`, driven by the lease loop as it acquires, refreshes, or loses
/// the slot. The explicit `u8` discriminants are what the gate stores in
/// its atomic.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum LeaseGateState {
    NotRequired = 0,
    Held = 1,
    NotHeld = 2,
}

impl LeaseGateState {
    /// Decode a stored discriminant. Any byte other than the `Held` or
    /// `NotHeld` discriminant decodes as `NotRequired`.
    fn from_u8(raw: u8) -> Self {
        match raw {
            r if r == Self::Held as u8 => Self::Held,
            r if r == Self::NotHeld as u8 => Self::NotHeld,
            _ => Self::NotRequired,
        }
    }

    /// Stable snake_case name for this state.
    pub fn label(self) -> &'static str {
        let name = match self {
            Self::Held => "held",
            Self::NotHeld => "not_held",
            Self::NotRequired => "not_required",
        };
        name
    }
}
98
99/// Live policy for public-mutation surfaces.
100///
101/// `read_only` was originally a `bool` snapshot taken at runtime
102/// construction. PLAN.md Phase 4.3 promotes it to an `AtomicBool` so
103/// `POST /admin/readonly` can flip the policy without a restart. The
104/// `ReplicationRole` stays immutable — flipping a replica into a
105/// primary mid-process would need a full handshake (Phase 3 work in
106/// the data-safety plan), and shouldn't be a single-flag decision.
107#[derive(Debug)]
108pub struct WriteGate {
109    /// Operator-set read-only flag. Mutated by `POST /admin/readonly`
110    /// and by boot-time resolution (CLI/env/persisted state). Sticky:
111    /// the archive-lag auto-pause path (#519) never touches this — only
112    /// the operator can clear an operator-set pin.
113    read_only: AtomicBool,
114    role: ReplicationRole,
115    lease: AtomicU8,
116    /// Issue #519 — engine-managed graceful read-only triggered by
117    /// `REDDB_BACKUP_PAUSE_ON_LAG_SECS` when WAL archive lag exceeds
118    /// the threshold. Independent of `read_only` so the two precedences
119    /// (manual sticky, auto auto-resumes) cannot stomp each other.
120    auto_paused: AtomicBool,
121    /// Unix-ms timestamp of the last *successful* remote archive. `0`
122    /// means "never observed since boot"; the lag evaluator treats
123    /// that as "lag since boot" once it has been initialised by the
124    /// caller (typical pattern: stamp `now` at construction so the
125    /// instance gets a `threshold_secs` grace window).
126    last_archive_at_ms: AtomicU64,
127    /// Threshold from `REDDB_BACKUP_PAUSE_ON_LAG_SECS`. `0` = feature
128    /// disabled — `evaluate_archive_lag` short-circuits without
129    /// touching `auto_paused`.
130    pause_threshold_secs: AtomicU64,
131    /// Issue #826 — write-admission flow control keyed on in-quorum
132    /// replica lag. Independent of `read_only` / `auto_paused`: the
133    /// throttle engages and releases automatically as a quorum member
134    /// lags and recovers, and an operator read-only pin never clears it.
135    /// Disabled by default (soft target `0`).
136    flow: FlowController,
137}
138
139impl WriteGate {
140    pub fn from_options(options: &RedDBOptions) -> Self {
141        let soft_target = std::env::var(FLOW_CONTROL_SOFT_TARGET_ENV)
142            .ok()
143            .and_then(|raw| raw.trim().parse::<u64>().ok())
144            .unwrap_or(0);
145        Self {
146            read_only: AtomicBool::new(options.read_only),
147            role: options.replication.role.clone(),
148            lease: AtomicU8::new(LeaseGateState::NotRequired as u8),
149            auto_paused: AtomicBool::new(false),
150            last_archive_at_ms: AtomicU64::new(0),
151            pause_threshold_secs: AtomicU64::new(0),
152            flow: FlowController::new(soft_target, options.replication.quorum.clone()),
153        }
154    }
155
156    /// Returns `Ok(())` if the public surface is allowed to perform `kind`.
157    /// Returns `RedDBError::ReadOnly` otherwise.
158    ///
159    /// Reasoning order is intentional:
160    /// 1. Replica role — a replica booted with `read_only = false`
161    ///    must still reject; this is a structural property.
162    /// 2. Lease lost — the strongest serverless correctness signal.
163    ///    A writer that lost its lease must stop *immediately*; running
164    ///    while another holder has been promoted causes split-brain.
165    /// 3. Operator read-only flag — explicit /admin/readonly toggle
166    ///    or boot-time pin; lower priority than lease loss because the
167    ///    operator can revoke it without external coordination.
168    pub fn check(&self, kind: WriteKind) -> RedDBResult<()> {
169        self.check_consent(kind).map(|_| ())
170    }
171
172    /// Same as `check` but on success returns a sealed
173    /// `WriteConsent` token. Mutating port methods that take
174    /// `&OperationContext` demand `ctx.write_consent.is_some()`;
175    /// the only way to mint such a token is to call this method,
176    /// so forgetting to consult the gate becomes a structural
177    /// property — not a discipline question.
178    pub fn check_consent(&self, kind: WriteKind) -> RedDBResult<crate::application::WriteConsent> {
179        if matches!(self.role, ReplicationRole::Replica { .. }) {
180            return Err(RedDBError::ReadOnly(format!(
181                "instance is a replica — {} rejected on public surface",
182                kind.label()
183            )));
184        }
185        if matches!(self.lease_state(), LeaseGateState::NotHeld) {
186            return Err(RedDBError::ReadOnly(format!(
187                "writer lease not held — {} rejected (serverless fence)",
188                kind.label()
189            )));
190        }
191        if self.read_only.load(Ordering::Acquire) {
192            return Err(RedDBError::ReadOnly(format!(
193                "instance is configured read_only — {} rejected",
194                kind.label()
195            )));
196        }
197        if self.auto_paused.load(Ordering::Acquire) {
198            return Err(RedDBError::ReadOnly(format!(
199                "instance is paused — WAL archive lag exceeded threshold — {} rejected",
200                kind.label()
201            )));
202        }
203        // Issue #826 — write-admission flow control. Lowest-precedence
204        // gate: only consulted once the structural / operator / archive
205        // checks pass. Throttles when an in-quorum replica's lag exceeds
206        // the soft target; releases automatically as it catches up.
207        if matches!(self.flow.try_admit(), Admission::Throttled) {
208            return Err(RedDBError::ReadOnly(format!(
209                "write admission throttled — in-quorum replica lag exceeded soft target ({} records) — {} rejected",
210                self.flow.soft_target_lsn(),
211                kind.label()
212            )));
213        }
214        Ok(crate::application::WriteConsent {
215            kind,
216            _seal: crate::application::WriteConsentSeal::new(),
217        })
218    }
219
220    pub fn is_read_only(&self) -> bool {
221        self.read_only.load(Ordering::Acquire)
222            || self.auto_paused.load(Ordering::Acquire)
223            || matches!(self.role, ReplicationRole::Replica { .. })
224            || matches!(self.lease_state(), LeaseGateState::NotHeld)
225    }
226
227    /// Whether the operator explicitly pinned this instance read-only
228    /// (via boot config or `POST /admin/readonly`). Distinct from
229    /// [`is_read_only`] which also returns `true` for structural
230    /// reasons (replica role, lease lost, archive-lag pause).
231    pub fn is_manual_read_only(&self) -> bool {
232        self.read_only.load(Ordering::Acquire)
233    }
234
235    /// Whether the engine-managed archive-lag pause (#519) is
236    /// currently active. Mutually independent of [`is_manual_read_only`]
237    /// so callers like `/backup/status` can report both.
238    pub fn is_auto_paused(&self) -> bool {
239        self.auto_paused.load(Ordering::Acquire)
240    }
241
242    pub fn role(&self) -> &ReplicationRole {
243        &self.role
244    }
245
246    /// Issue #826 — borrow the write-admission flow controller. Callers
247    /// drive `observe()` from the primary's replica registry and read
248    /// throttle state for `/metrics`.
249    pub fn flow_control(&self) -> &FlowController {
250        &self.flow
251    }
252
253    /// Whether write admission is currently throttled by in-quorum
254    /// replica lag (issue #826). Distinct from [`is_read_only`] — a
255    /// throttle is transient back-pressure, not a read-only posture.
256    pub fn is_flow_throttled(&self) -> bool {
257        self.flow.is_throttled()
258    }
259
260    /// PLAN.md Phase 4.3 — dynamic read-only toggle. Flipping a
261    /// replica back to writable here is a no-op for `check()` because
262    /// the role check fires first; the operator must change the
263    /// replication role through a separate, audited path.
264    ///
265    /// Returns the previous read_only value so callers can detect
266    /// idempotent calls (toggle to the same value = no work to do).
267    pub fn set_read_only(&self, enabled: bool) -> bool {
268        self.read_only.swap(enabled, Ordering::AcqRel)
269    }
270
271    /// Current writer-lease gate state. `NotRequired` for standalone,
272    /// replica, and lease-disabled serverless instances.
273    pub fn lease_state(&self) -> LeaseGateState {
274        LeaseGateState::from_u8(self.lease.load(Ordering::Acquire))
275    }
276
277    /// Issue #519 — install the archive-lag pause threshold and the
278    /// baseline "last archive observed at" stamp. Threshold `0`
279    /// disables auto-pause; subsequent `record_archive_success` /
280    /// `evaluate_archive_lag` calls then become no-ops.
281    ///
282    /// Idempotent: callers should invoke once during startup after
283    /// parsing `REDDB_BACKUP_PAUSE_ON_LAG_SECS`. Stamping `last_archive_at_ms`
284    /// to "now" at construction grants a `threshold_secs` grace
285    /// window before the first auto-pause can fire — without it, a
286    /// freshly-booted instance with a never-archived WAL would flip
287    /// to read-only on the first poll.
288    pub fn configure_archive_lag_pause(&self, threshold_secs: u64, baseline_ms: u64) {
289        self.pause_threshold_secs
290            .store(threshold_secs, Ordering::Release);
291        self.last_archive_at_ms
292            .store(baseline_ms, Ordering::Release);
293    }
294
295    /// Stamp `last_archive_at_ms` after a successful remote archive.
296    /// Called by the WAL-archive task wrapper in `service_cli` after
297    /// `runtime.trigger_backup()` returns `Ok`.
298    pub fn record_archive_success(&self, now_ms: u64) {
299        self.last_archive_at_ms.store(now_ms, Ordering::Release);
300    }
301
302    /// Current archive-lag threshold in seconds. `0` means the
303    /// feature is disabled.
304    pub fn archive_pause_threshold_secs(&self) -> u64 {
305        self.pause_threshold_secs.load(Ordering::Acquire)
306    }
307
308    /// Last archive observation timestamp (unix ms).
309    pub fn last_archive_at_ms(&self) -> u64 {
310        self.last_archive_at_ms.load(Ordering::Acquire)
311    }
312
313    /// Re-evaluate the archive-lag state. Returns the resulting
314    /// `auto_paused` value.
315    ///
316    /// Semantics (issue #519):
317    /// * Threshold `0` → feature disabled, returns current state
318    ///   without writing.
319    /// * Manual read-only is **sticky** — when [`is_manual_read_only`]
320    ///   is true, this method never modifies `auto_paused`. The
321    ///   operator must clear the manual pin first; only then does the
322    ///   auto-path take over again on the next tick.
323    /// * Lag > threshold and manual=false → set `auto_paused = true`.
324    /// * Lag <= threshold and `auto_paused = true` → clear it
325    ///   (auto-resume). If `auto_paused` was already false, no-op.
326    pub fn evaluate_archive_lag(&self, now_ms: u64) -> bool {
327        let threshold = self.pause_threshold_secs.load(Ordering::Acquire);
328        if threshold == 0 {
329            return self.auto_paused.load(Ordering::Acquire);
330        }
331        if self.read_only.load(Ordering::Acquire) {
332            return self.auto_paused.load(Ordering::Acquire);
333        }
334        let last_ms = self.last_archive_at_ms.load(Ordering::Acquire);
335        let lag_secs = now_ms.saturating_sub(last_ms) / 1000;
336        let should_pause = lag_secs > threshold;
337        self.auto_paused.store(should_pause, Ordering::Release);
338        should_pause
339    }
340
341    /// Flip the lease gate state. Only `LeaseLifecycle` should call
342    /// this — other callers must go through the lifecycle so the
343    /// gate flip and the corresponding `lease/*` audit record
344    /// stay paired.
345    ///
346    /// Returns the previous state so the caller can detect idempotent
347    /// transitions and avoid spamming audit / metrics.
348    pub(crate) fn set_lease_state(&self, state: LeaseGateState) -> LeaseGateState {
349        LeaseGateState::from_u8(self.lease.swap(state as u8, Ordering::AcqRel))
350    }
351}
352
353#[cfg(test)]
354mod tests {
355    use super::*;
356
357    fn gate(read_only: bool, role: ReplicationRole) -> WriteGate {
358        WriteGate {
359            read_only: AtomicBool::new(read_only),
360            role,
361            lease: AtomicU8::new(LeaseGateState::NotRequired as u8),
362            auto_paused: AtomicBool::new(false),
363            last_archive_at_ms: AtomicU64::new(0),
364            pause_threshold_secs: AtomicU64::new(0),
365            flow: FlowController::disabled(),
366        }
367    }
368
369    #[test]
370    fn standalone_allows_writes() {
371        let g = gate(false, ReplicationRole::Standalone);
372        assert!(g.check(WriteKind::Dml).is_ok());
373        assert!(g.check(WriteKind::Ddl).is_ok());
374        assert!(!g.is_read_only());
375    }
376
377    #[test]
378    fn primary_allows_writes() {
379        let g = gate(false, ReplicationRole::Primary);
380        assert!(g.check(WriteKind::Dml).is_ok());
381        assert!(!g.is_read_only());
382    }
383
384    #[test]
385    fn replica_rejects_every_kind() {
386        let g = gate(
387            true,
388            ReplicationRole::Replica {
389                primary_addr: "http://primary:50051".into(),
390            },
391        );
392        for kind in [
393            WriteKind::Dml,
394            WriteKind::Ddl,
395            WriteKind::IndexBuild,
396            WriteKind::Maintenance,
397            WriteKind::Backup,
398            WriteKind::Serverless,
399        ] {
400            let err = g.check(kind).unwrap_err();
401            match err {
402                RedDBError::ReadOnly(msg) => assert!(msg.contains("replica")),
403                other => panic!("expected ReadOnly, got {other:?}"),
404            }
405        }
406        assert!(g.is_read_only());
407    }
408
409    #[test]
410    fn read_only_flag_rejects_writes_on_standalone() {
411        let g = gate(true, ReplicationRole::Standalone);
412        let err = g.check(WriteKind::Dml).unwrap_err();
413        match err {
414            RedDBError::ReadOnly(msg) => assert!(msg.contains("read_only")),
415            other => panic!("expected ReadOnly, got {other:?}"),
416        }
417    }
418
419    #[test]
420    fn lease_not_held_rejects_writes_on_primary() {
421        let g = gate(false, ReplicationRole::Primary);
422        g.set_lease_state(LeaseGateState::NotHeld);
423        let err = g.check(WriteKind::Dml).unwrap_err();
424        match err {
425            RedDBError::ReadOnly(msg) => assert!(msg.contains("lease")),
426            other => panic!("expected ReadOnly, got {other:?}"),
427        }
428        assert!(g.is_read_only());
429    }
430
431    #[test]
432    fn lease_held_allows_writes_on_primary() {
433        let g = gate(false, ReplicationRole::Primary);
434        g.set_lease_state(LeaseGateState::Held);
435        assert!(g.check(WriteKind::Dml).is_ok());
436        assert!(!g.is_read_only());
437    }
438
439    #[test]
440    fn lease_state_transitions_return_previous() {
441        let g = gate(false, ReplicationRole::Primary);
442        assert_eq!(
443            g.set_lease_state(LeaseGateState::Held),
444            LeaseGateState::NotRequired
445        );
446        assert_eq!(
447            g.set_lease_state(LeaseGateState::NotHeld),
448            LeaseGateState::Held
449        );
450        assert_eq!(g.lease_state(), LeaseGateState::NotHeld);
451    }
452
453    #[test]
454    fn lease_loss_overrides_writable_read_only_flag() {
455        // Even with read_only=false, losing the lease must reject.
456        let g = gate(false, ReplicationRole::Primary);
457        g.set_lease_state(LeaseGateState::NotHeld);
458        let err = g.check(WriteKind::Ddl).unwrap_err();
459        match err {
460            RedDBError::ReadOnly(msg) => assert!(msg.contains("lease")),
461            other => panic!("expected ReadOnly, got {other:?}"),
462        }
463    }
464
465    // ---------------------------------------------------------------
466    // Issue #519 — graceful read-only mode when WAL archive lag
467    // exceeds REDDB_BACKUP_PAUSE_ON_LAG_SECS.
468    // ---------------------------------------------------------------
469
470    #[test]
471    fn archive_lag_disabled_threshold_is_noop() {
472        let g = gate(false, ReplicationRole::Standalone);
473        g.configure_archive_lag_pause(0, 1_000);
474        // Even with an ancient timestamp, threshold=0 must not pause.
475        assert!(!g.evaluate_archive_lag(10_000_000_000));
476        assert!(!g.is_auto_paused());
477        assert!(g.check(WriteKind::Dml).is_ok());
478    }
479
480    #[test]
481    fn archive_lag_triggers_auto_pause_past_threshold() {
482        let g = gate(false, ReplicationRole::Standalone);
483        // Last archive at t=1_000_000ms; threshold = 60s.
484        g.configure_archive_lag_pause(60, 1_000_000);
485        // 30s later — still under threshold.
486        assert!(!g.evaluate_archive_lag(1_000_000 + 30_000));
487        assert!(g.check(WriteKind::Dml).is_ok());
488
489        // 120s later — over threshold; must auto-pause.
490        assert!(g.evaluate_archive_lag(1_000_000 + 120_000));
491        assert!(g.is_auto_paused());
492        let err = g.check(WriteKind::Dml).unwrap_err();
493        match err {
494            RedDBError::ReadOnly(msg) => assert!(msg.contains("WAL archive lag"), "{msg}"),
495            other => panic!("expected ReadOnly, got {other:?}"),
496        }
497        assert!(g.is_read_only());
498    }
499
500    #[test]
501    fn archive_lag_auto_resume_after_recovery() {
502        let g = gate(false, ReplicationRole::Standalone);
503        g.configure_archive_lag_pause(60, 1_000_000);
504        // Trip the auto-pause.
505        assert!(g.evaluate_archive_lag(1_000_000 + 120_000));
506        assert!(g.is_auto_paused());
507        // Archiver catches up — stamp success and re-evaluate.
508        g.record_archive_success(1_000_000 + 130_000);
509        assert!(!g.evaluate_archive_lag(1_000_000 + 130_000));
510        assert!(!g.is_auto_paused());
511        assert!(g.check(WriteKind::Dml).is_ok());
512    }
513
514    #[test]
515    fn manual_read_only_blocks_auto_pause_writes_and_is_sticky() {
516        // Operator pinned read-only *before* lag condition. The
517        // auto-pause path must be a no-op while manual is set, and
518        // archive recovery must NOT auto-clear the manual pin.
519        let g = gate(true, ReplicationRole::Standalone);
520        g.configure_archive_lag_pause(60, 1_000_000);
521
522        // Lag past threshold; but manual is set so auto stays false.
523        assert!(!g.evaluate_archive_lag(1_000_000 + 120_000));
524        assert!(!g.is_auto_paused());
525        assert!(g.is_manual_read_only());
526        // Writes still rejected — for the manual reason.
527        let err = g.check(WriteKind::Dml).unwrap_err();
528        match err {
529            RedDBError::ReadOnly(msg) => {
530                assert!(msg.contains("read_only"), "{msg}");
531                assert!(!msg.contains("WAL archive lag"), "{msg}");
532            }
533            other => panic!("expected ReadOnly, got {other:?}"),
534        }
535
536        // Archiver recovers; re-evaluate. Manual still set ⇒ auto stays false,
537        // manual stays true ⇒ instance stays read-only by operator intent.
538        g.record_archive_success(1_000_000 + 130_000);
539        assert!(!g.evaluate_archive_lag(1_000_000 + 130_000));
540        assert!(g.is_manual_read_only(), "manual must stay set");
541        assert!(!g.is_auto_paused());
542        assert!(g.check(WriteKind::Dml).is_err());
543    }
544
545    #[test]
546    fn manual_clearing_resumes_auto_evaluation() {
547        // Manual was set; operator clears it; lag is still bad.
548        // Next evaluation must auto-pause.
549        let g = gate(true, ReplicationRole::Standalone);
550        g.configure_archive_lag_pause(60, 1_000_000);
551        // No-op while manual.
552        assert!(!g.evaluate_archive_lag(1_000_000 + 120_000));
553        // Operator unsets manual.
554        g.set_read_only(false);
555        // Now the lag condition must fire.
556        assert!(g.evaluate_archive_lag(1_000_000 + 120_000));
557        assert!(g.is_auto_paused());
558    }
559
560    #[test]
561    fn archive_lag_pause_state_independent_from_manual_flag() {
562        let g = gate(false, ReplicationRole::Standalone);
563        g.configure_archive_lag_pause(60, 1_000_000);
564        assert!(g.evaluate_archive_lag(1_000_000 + 120_000));
565        // Operator separately pins manual on top; still both true.
566        let prev = g.set_read_only(true);
567        assert!(!prev);
568        assert!(g.is_manual_read_only());
569        assert!(g.is_auto_paused());
570        // Operator clears manual; auto pause survives.
571        g.set_read_only(false);
572        assert!(g.is_auto_paused());
573        assert!(g.check(WriteKind::Dml).is_err());
574    }
575
576    // ---------------------------------------------------------------
577    // Issue #826 — write-admission flow control on in-quorum replica lag.
578    // ---------------------------------------------------------------
579
580    fn gate_with_flow(soft_target: u64, quorum: crate::replication::QuorumConfig) -> WriteGate {
581        WriteGate {
582            read_only: AtomicBool::new(false),
583            role: ReplicationRole::Primary,
584            lease: AtomicU8::new(LeaseGateState::NotRequired as u8),
585            auto_paused: AtomicBool::new(false),
586            last_archive_at_ms: AtomicU64::new(0),
587            pause_threshold_secs: AtomicU64::new(0),
588            flow: FlowController::new(soft_target, quorum),
589        }
590    }
591
592    fn flow_replica(
593        id: &str,
594        region: Option<&str>,
595        last_acked_lsn: u64,
596    ) -> crate::replication::primary::ReplicaState {
597        crate::replication::primary::ReplicaState {
598            id: id.to_string(),
599            last_acked_lsn,
600            last_sent_lsn: last_acked_lsn,
601            last_durable_lsn: last_acked_lsn,
602            apply_error_count: 0,
603            divergence_count: 0,
604            connected_at_unix_ms: 0,
605            last_seen_at_unix_ms: 0,
606            region: region.map(String::from),
607            rebootstrapping: false,
608        }
609    }
610
611    #[test]
612    fn flow_throttle_rejects_writes_when_in_quorum_replica_lags_and_releases() {
613        let g = gate_with_flow(100, crate::replication::QuorumConfig::sync(1));
614        // Healthy: no observation yet → writes admitted.
615        assert!(g.check(WriteKind::Dml).is_ok());
616        assert!(!g.is_flow_throttled());
617
618        // In-quorum replica lags 150 > soft target 100 → throttle engages.
619        g.flow_control()
620            .observe(&[flow_replica("r1", Some("us"), 350)], 500);
621        assert!(g.is_flow_throttled());
622        let err = g.check(WriteKind::Dml).unwrap_err();
623        match err {
624            RedDBError::ReadOnly(msg) => assert!(msg.contains("throttled"), "{msg}"),
625            other => panic!("expected ReadOnly throttle, got {other:?}"),
626        }
627        // Throttle is not the read-only posture.
628        assert!(!g.is_read_only());
629
630        // Replica catches up (lag 50 <= 100) → throttle releases.
631        g.flow_control()
632            .observe(&[flow_replica("r1", Some("us"), 450)], 500);
633        assert!(!g.is_flow_throttled());
634        assert!(g.check(WriteKind::Dml).is_ok());
635    }
636
637    #[test]
638    fn flow_throttle_excludes_async_read_replica() {
639        // Regions quorum requires "us"; an async read-replica in "ap"
640        // lags hugely but must never engage throttling.
641        let g = gate_with_flow(100, crate::replication::QuorumConfig::regions(["us"]));
642        g.flow_control().observe(
643            &[
644                flow_replica("in-quorum-us", Some("us"), 500),
645                flow_replica("async-ap", Some("ap"), 0),
646            ],
647            500,
648        );
649        assert!(!g.is_flow_throttled());
650        assert!(g.check(WriteKind::Dml).is_ok());
651    }
652
653    #[test]
654    fn flow_throttle_disabled_by_default() {
655        let g = gate(false, ReplicationRole::Primary);
656        // Even a huge lag observation cannot throttle a disabled controller.
657        g.flow_control()
658            .observe(&[flow_replica("r1", Some("us"), 0)], 1_000_000);
659        assert!(!g.is_flow_throttled());
660        assert!(g.check(WriteKind::Dml).is_ok());
661    }
662
663    #[test]
664    fn replica_role_overrides_missing_read_only_flag() {
665        let g = gate(
666            false,
667            ReplicationRole::Replica {
668                primary_addr: "http://primary:50051".into(),
669            },
670        );
671        let err = g.check(WriteKind::Dml).unwrap_err();
672        match err {
673            RedDBError::ReadOnly(msg) => assert!(msg.contains("replica")),
674            other => panic!("expected ReadOnly, got {other:?}"),
675        }
676    }
677}