reddb_server/runtime/write_gate.rs
1//! Public-mutation gate.
2//!
3//! Centralises the check that decides whether a write coming through any
4//! public surface (SQL, HTTP, gRPC, PostgreSQL wire, native wire, admin
5//! mutating endpoints) is allowed for this instance.
6//!
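//! Inputs:
//! * `RedDBOptions::read_only` — set explicitly by operators.
//! * `ReplicationConfig::role` — `Replica { .. }` is always read-only on
//!   public surfaces; internal logical-WAL apply (`LogicalChangeApplier`)
//!   reaches into the store directly and never crosses this gate.
//! * Serverless writer-lease state — see below.
//! * Archive-lag auto-pause (issue #519) — engine-managed read-only that
//!   engages when WAL archive lag exceeds `REDDB_BACKUP_PAUSE_ON_LAG_SECS`.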
12//!
13//! All public mutation paths consult `WriteGate::check` before dispatching
14//! to storage. The replica internal apply path is the privileged surface
15//! and bypasses the gate by construction.
16//!
17//! Serverless writer-lease state (`PLAN.md` Phase 5 / W6) is wired
18//! through `LeaseGateState` — runtime flips it to `Held` after a
19//! successful acquire/refresh and back to `NotHeld` when the lease is
20//! lost, released, or has expired. Standalone / replica / lease-not-
21//! configured deployments stay on `NotRequired` so the check is a
22//! single atomic load of zero.
23
24use std::sync::atomic::{AtomicBool, AtomicU64, AtomicU8, Ordering};
25
26use crate::api::{RedDBError, RedDBOptions, RedDBResult};
27use crate::replication::ReplicationRole;
28
/// Categorises the write so the rejection error can name a sensible
/// surface in operator-facing logs without leaking internal call sites.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WriteKind {
    /// INSERT / UPDATE / DELETE on a user-visible collection.
    Dml,
    /// CREATE / DROP / ALTER TABLE, CREATE / DROP INDEX, etc.
    Ddl,
    /// Index build / rebuild outside a DDL statement (e.g. background reindex).
    IndexBuild,
    /// Reclaim / repair / retention sweeps that mutate state.
    Maintenance,
    /// Operator-triggered backup that mutates remote state.
    Backup,
    /// Serverless lifecycle endpoints that mutate state (attach / warmup
    /// / reclaim).
    Serverless,
}

impl WriteKind {
    /// Short human-readable name embedded in `ReadOnly` rejection messages.
    fn label(self) -> &'static str {
        match self {
            Self::Dml => "DML",
            Self::Ddl => "DDL",
            Self::IndexBuild => "index build",
            Self::Maintenance => "maintenance",
            Self::Backup => "backup trigger",
            Self::Serverless => "serverless lifecycle",
        }
    }
}
60
/// Serverless writer-lease state wired through the gate.
///
/// `NotRequired` is the default — standalone, replica, and
/// lease-disabled serverless deployments all share it. `Held` /
/// `NotHeld` only matter for instances that opted into lease-fenced
/// writes; the lease loop flips the value as it acquires / refreshes /
/// loses the slot.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum LeaseGateState {
    /// No lease fencing applies to this instance.
    NotRequired = 0,
    /// Lease fencing is enabled and this instance holds the writer lease.
    Held = 1,
    /// Lease fencing is enabled but the lease is not held; the gate
    /// rejects public writes in this state.
    NotHeld = 2,
}

impl LeaseGateState {
    /// Decodes a discriminant previously produced by `state as u8`.
    ///
    /// Unrecognised values decode to [`Self::NotHeld`]. The lease fence
    /// fails closed: an unexpected encoding rejects writes rather than
    /// silently lifting the fence, which mapping it to `NotRequired` would do.
    fn from_u8(raw: u8) -> Self {
        match raw {
            0 => Self::NotRequired,
            1 => Self::Held,
            // `2` and anything unrecognised: fail closed.
            _ => Self::NotHeld,
        }
    }

    /// Stable snake_case name for this state.
    pub fn label(self) -> &'static str {
        match self {
            Self::NotRequired => "not_required",
            Self::Held => "held",
            Self::NotHeld => "not_held",
        }
    }
}
93
94/// Live policy for public-mutation surfaces.
95///
96/// `read_only` was originally a `bool` snapshot taken at runtime
97/// construction. PLAN.md Phase 4.3 promotes it to an `AtomicBool` so
98/// `POST /admin/readonly` can flip the policy without a restart. The
99/// `ReplicationRole` stays immutable — flipping a replica into a
100/// primary mid-process would need a full handshake (Phase 3 work in
101/// the data-safety plan), and shouldn't be a single-flag decision.
102#[derive(Debug)]
103pub struct WriteGate {
104 /// Operator-set read-only flag. Mutated by `POST /admin/readonly`
105 /// and by boot-time resolution (CLI/env/persisted state). Sticky:
106 /// the archive-lag auto-pause path (#519) never touches this — only
107 /// the operator can clear an operator-set pin.
108 read_only: AtomicBool,
109 role: ReplicationRole,
110 lease: AtomicU8,
111 /// Issue #519 — engine-managed graceful read-only triggered by
112 /// `REDDB_BACKUP_PAUSE_ON_LAG_SECS` when WAL archive lag exceeds
113 /// the threshold. Independent of `read_only` so the two precedences
114 /// (manual sticky, auto auto-resumes) cannot stomp each other.
115 auto_paused: AtomicBool,
116 /// Unix-ms timestamp of the last *successful* remote archive. `0`
117 /// means "never observed since boot"; the lag evaluator treats
118 /// that as "lag since boot" once it has been initialised by the
119 /// caller (typical pattern: stamp `now` at construction so the
120 /// instance gets a `threshold_secs` grace window).
121 last_archive_at_ms: AtomicU64,
122 /// Threshold from `REDDB_BACKUP_PAUSE_ON_LAG_SECS`. `0` = feature
123 /// disabled — `evaluate_archive_lag` short-circuits without
124 /// touching `auto_paused`.
125 pause_threshold_secs: AtomicU64,
126}
127
128impl WriteGate {
129 pub fn from_options(options: &RedDBOptions) -> Self {
130 Self {
131 read_only: AtomicBool::new(options.read_only),
132 role: options.replication.role.clone(),
133 lease: AtomicU8::new(LeaseGateState::NotRequired as u8),
134 auto_paused: AtomicBool::new(false),
135 last_archive_at_ms: AtomicU64::new(0),
136 pause_threshold_secs: AtomicU64::new(0),
137 }
138 }
139
140 /// Returns `Ok(())` if the public surface is allowed to perform `kind`.
141 /// Returns `RedDBError::ReadOnly` otherwise.
142 ///
143 /// Reasoning order is intentional:
144 /// 1. Replica role — a replica booted with `read_only = false`
145 /// must still reject; this is a structural property.
146 /// 2. Lease lost — the strongest serverless correctness signal.
147 /// A writer that lost its lease must stop *immediately*; running
148 /// while another holder has been promoted causes split-brain.
149 /// 3. Operator read-only flag — explicit /admin/readonly toggle
150 /// or boot-time pin; lower priority than lease loss because the
151 /// operator can revoke it without external coordination.
152 pub fn check(&self, kind: WriteKind) -> RedDBResult<()> {
153 self.check_consent(kind).map(|_| ())
154 }
155
156 /// Same as `check` but on success returns a sealed
157 /// `WriteConsent` token. Mutating port methods that take
158 /// `&OperationContext` demand `ctx.write_consent.is_some()`;
159 /// the only way to mint such a token is to call this method,
160 /// so forgetting to consult the gate becomes a structural
161 /// property — not a discipline question.
162 pub fn check_consent(&self, kind: WriteKind) -> RedDBResult<crate::application::WriteConsent> {
163 if matches!(self.role, ReplicationRole::Replica { .. }) {
164 return Err(RedDBError::ReadOnly(format!(
165 "instance is a replica — {} rejected on public surface",
166 kind.label()
167 )));
168 }
169 if matches!(self.lease_state(), LeaseGateState::NotHeld) {
170 return Err(RedDBError::ReadOnly(format!(
171 "writer lease not held — {} rejected (serverless fence)",
172 kind.label()
173 )));
174 }
175 if self.read_only.load(Ordering::Acquire) {
176 return Err(RedDBError::ReadOnly(format!(
177 "instance is configured read_only — {} rejected",
178 kind.label()
179 )));
180 }
181 if self.auto_paused.load(Ordering::Acquire) {
182 return Err(RedDBError::ReadOnly(format!(
183 "instance is paused — WAL archive lag exceeded threshold — {} rejected",
184 kind.label()
185 )));
186 }
187 Ok(crate::application::WriteConsent {
188 kind,
189 _seal: crate::application::WriteConsentSeal::new(),
190 })
191 }
192
193 pub fn is_read_only(&self) -> bool {
194 self.read_only.load(Ordering::Acquire)
195 || self.auto_paused.load(Ordering::Acquire)
196 || matches!(self.role, ReplicationRole::Replica { .. })
197 || matches!(self.lease_state(), LeaseGateState::NotHeld)
198 }
199
200 /// Whether the operator explicitly pinned this instance read-only
201 /// (via boot config or `POST /admin/readonly`). Distinct from
202 /// [`is_read_only`] which also returns `true` for structural
203 /// reasons (replica role, lease lost, archive-lag pause).
204 pub fn is_manual_read_only(&self) -> bool {
205 self.read_only.load(Ordering::Acquire)
206 }
207
208 /// Whether the engine-managed archive-lag pause (#519) is
209 /// currently active. Mutually independent of [`is_manual_read_only`]
210 /// so callers like `/backup/status` can report both.
211 pub fn is_auto_paused(&self) -> bool {
212 self.auto_paused.load(Ordering::Acquire)
213 }
214
215 pub fn role(&self) -> &ReplicationRole {
216 &self.role
217 }
218
219 /// PLAN.md Phase 4.3 — dynamic read-only toggle. Flipping a
220 /// replica back to writable here is a no-op for `check()` because
221 /// the role check fires first; the operator must change the
222 /// replication role through a separate, audited path.
223 ///
224 /// Returns the previous read_only value so callers can detect
225 /// idempotent calls (toggle to the same value = no work to do).
226 pub fn set_read_only(&self, enabled: bool) -> bool {
227 self.read_only.swap(enabled, Ordering::AcqRel)
228 }
229
230 /// Current writer-lease gate state. `NotRequired` for standalone,
231 /// replica, and lease-disabled serverless instances.
232 pub fn lease_state(&self) -> LeaseGateState {
233 LeaseGateState::from_u8(self.lease.load(Ordering::Acquire))
234 }
235
236 /// Issue #519 — install the archive-lag pause threshold and the
237 /// baseline "last archive observed at" stamp. Threshold `0`
238 /// disables auto-pause; subsequent `record_archive_success` /
239 /// `evaluate_archive_lag` calls then become no-ops.
240 ///
241 /// Idempotent: callers should invoke once during startup after
242 /// parsing `REDDB_BACKUP_PAUSE_ON_LAG_SECS`. Stamping `last_archive_at_ms`
243 /// to "now" at construction grants a `threshold_secs` grace
244 /// window before the first auto-pause can fire — without it, a
245 /// freshly-booted instance with a never-archived WAL would flip
246 /// to read-only on the first poll.
247 pub fn configure_archive_lag_pause(&self, threshold_secs: u64, baseline_ms: u64) {
248 self.pause_threshold_secs
249 .store(threshold_secs, Ordering::Release);
250 self.last_archive_at_ms
251 .store(baseline_ms, Ordering::Release);
252 }
253
254 /// Stamp `last_archive_at_ms` after a successful remote archive.
255 /// Called by the WAL-archive task wrapper in `service_cli` after
256 /// `runtime.trigger_backup()` returns `Ok`.
257 pub fn record_archive_success(&self, now_ms: u64) {
258 self.last_archive_at_ms.store(now_ms, Ordering::Release);
259 }
260
261 /// Current archive-lag threshold in seconds. `0` means the
262 /// feature is disabled.
263 pub fn archive_pause_threshold_secs(&self) -> u64 {
264 self.pause_threshold_secs.load(Ordering::Acquire)
265 }
266
267 /// Last archive observation timestamp (unix ms).
268 pub fn last_archive_at_ms(&self) -> u64 {
269 self.last_archive_at_ms.load(Ordering::Acquire)
270 }
271
272 /// Re-evaluate the archive-lag state. Returns the resulting
273 /// `auto_paused` value.
274 ///
275 /// Semantics (issue #519):
276 /// * Threshold `0` → feature disabled, returns current state
277 /// without writing.
278 /// * Manual read-only is **sticky** — when [`is_manual_read_only`]
279 /// is true, this method never modifies `auto_paused`. The
280 /// operator must clear the manual pin first; only then does the
281 /// auto-path take over again on the next tick.
282 /// * Lag > threshold and manual=false → set `auto_paused = true`.
283 /// * Lag <= threshold and `auto_paused = true` → clear it
284 /// (auto-resume). If `auto_paused` was already false, no-op.
285 pub fn evaluate_archive_lag(&self, now_ms: u64) -> bool {
286 let threshold = self.pause_threshold_secs.load(Ordering::Acquire);
287 if threshold == 0 {
288 return self.auto_paused.load(Ordering::Acquire);
289 }
290 if self.read_only.load(Ordering::Acquire) {
291 return self.auto_paused.load(Ordering::Acquire);
292 }
293 let last_ms = self.last_archive_at_ms.load(Ordering::Acquire);
294 let lag_secs = now_ms.saturating_sub(last_ms) / 1000;
295 let should_pause = lag_secs > threshold;
296 self.auto_paused.store(should_pause, Ordering::Release);
297 should_pause
298 }
299
300 /// Flip the lease gate state. Only `LeaseLifecycle` should call
301 /// this — other callers must go through the lifecycle so the
302 /// gate flip and the corresponding `lease/*` audit record
303 /// stay paired.
304 ///
305 /// Returns the previous state so the caller can detect idempotent
306 /// transitions and avoid spamming audit / metrics.
307 pub(crate) fn set_lease_state(&self, state: LeaseGateState) -> LeaseGateState {
308 LeaseGateState::from_u8(self.lease.swap(state as u8, Ordering::AcqRel))
309 }
310}
311
#[cfg(test)]
mod tests {
    use super::*;

    /// Baseline "last archive" stamp shared by the archive-lag tests.
    const BASE_MS: u64 = 1_000_000;

    /// Assembles a gate field-by-field, bypassing `RedDBOptions`. The
    /// lease is `NotRequired` and the archive-lag pause is disabled and
    /// not engaged.
    fn gate(read_only: bool, role: ReplicationRole) -> WriteGate {
        WriteGate {
            read_only: AtomicBool::new(read_only),
            role,
            lease: AtomicU8::new(LeaseGateState::NotRequired as u8),
            auto_paused: AtomicBool::new(false),
            last_archive_at_ms: AtomicU64::new(0),
            pause_threshold_secs: AtomicU64::new(0),
        }
    }

    /// Replica role pointing at a fixed fake primary.
    fn replica_role() -> ReplicationRole {
        ReplicationRole::Replica {
            primary_addr: "http://primary:50051".into(),
        }
    }

    /// Extracts the message from a `ReadOnly` rejection, failing the test
    /// on any other error variant.
    fn read_only_msg(err: RedDBError) -> String {
        match err {
            RedDBError::ReadOnly(msg) => msg,
            other => panic!("expected ReadOnly, got {other:?}"),
        }
    }

    /// Runs `kind` through the gate, requires a `ReadOnly` rejection and
    /// hands back its message.
    fn rejection(g: &WriteGate, kind: WriteKind) -> String {
        read_only_msg(g.check(kind).unwrap_err())
    }

    #[test]
    fn standalone_allows_writes() {
        let g = gate(false, ReplicationRole::Standalone);
        for kind in [WriteKind::Dml, WriteKind::Ddl] {
            assert!(g.check(kind).is_ok());
        }
        assert!(!g.is_read_only());
    }

    #[test]
    fn primary_allows_writes() {
        let g = gate(false, ReplicationRole::Primary);
        assert!(g.check(WriteKind::Dml).is_ok());
        assert!(!g.is_read_only());
    }

    #[test]
    fn replica_rejects_every_kind() {
        let g = gate(true, replica_role());
        let every_kind = [
            WriteKind::Dml,
            WriteKind::Ddl,
            WriteKind::IndexBuild,
            WriteKind::Maintenance,
            WriteKind::Backup,
            WriteKind::Serverless,
        ];
        for kind in every_kind {
            let msg = rejection(&g, kind);
            assert!(msg.contains("replica"), "{msg}");
        }
        assert!(g.is_read_only());
    }

    #[test]
    fn read_only_flag_rejects_writes_on_standalone() {
        let g = gate(true, ReplicationRole::Standalone);
        let msg = rejection(&g, WriteKind::Dml);
        assert!(msg.contains("read_only"), "{msg}");
    }

    #[test]
    fn lease_not_held_rejects_writes_on_primary() {
        let g = gate(false, ReplicationRole::Primary);
        g.set_lease_state(LeaseGateState::NotHeld);
        let msg = rejection(&g, WriteKind::Dml);
        assert!(msg.contains("lease"), "{msg}");
        assert!(g.is_read_only());
    }

    #[test]
    fn lease_held_allows_writes_on_primary() {
        let g = gate(false, ReplicationRole::Primary);
        g.set_lease_state(LeaseGateState::Held);
        assert!(g.check(WriteKind::Dml).is_ok());
        assert!(!g.is_read_only());
    }

    #[test]
    fn lease_state_transitions_return_previous() {
        let g = gate(false, ReplicationRole::Primary);
        let before_held = g.set_lease_state(LeaseGateState::Held);
        assert_eq!(before_held, LeaseGateState::NotRequired);
        let before_not_held = g.set_lease_state(LeaseGateState::NotHeld);
        assert_eq!(before_not_held, LeaseGateState::Held);
        assert_eq!(g.lease_state(), LeaseGateState::NotHeld);
    }

    #[test]
    fn lease_loss_overrides_writable_read_only_flag() {
        // A writable operator flag does not help: a lost lease still rejects.
        let g = gate(false, ReplicationRole::Primary);
        g.set_lease_state(LeaseGateState::NotHeld);
        let msg = rejection(&g, WriteKind::Ddl);
        assert!(msg.contains("lease"), "{msg}");
    }

    // ---------------------------------------------------------------
    // Issue #519 — graceful read-only mode when WAL archive lag
    // exceeds REDDB_BACKUP_PAUSE_ON_LAG_SECS.
    // ---------------------------------------------------------------

    #[test]
    fn archive_lag_disabled_threshold_is_noop() {
        let g = gate(false, ReplicationRole::Standalone);
        g.configure_archive_lag_pause(0, 1_000);
        // Threshold 0: even an ancient stamp must not pause.
        assert!(!g.evaluate_archive_lag(10_000_000_000));
        assert!(!g.is_auto_paused());
        assert!(g.check(WriteKind::Dml).is_ok());
    }

    #[test]
    fn archive_lag_triggers_auto_pause_past_threshold() {
        let g = gate(false, ReplicationRole::Standalone);
        g.configure_archive_lag_pause(60, BASE_MS);

        // +30s is inside the 60s threshold.
        assert!(!g.evaluate_archive_lag(BASE_MS + 30_000));
        assert!(g.check(WriteKind::Dml).is_ok());

        // +120s is past it, so the gate must auto-pause.
        assert!(g.evaluate_archive_lag(BASE_MS + 120_000));
        assert!(g.is_auto_paused());
        let msg = rejection(&g, WriteKind::Dml);
        assert!(msg.contains("WAL archive lag"), "{msg}");
        assert!(g.is_read_only());
    }

    #[test]
    fn archive_lag_auto_resume_after_recovery() {
        let g = gate(false, ReplicationRole::Standalone);
        g.configure_archive_lag_pause(60, BASE_MS);

        assert!(g.evaluate_archive_lag(BASE_MS + 120_000));
        assert!(g.is_auto_paused());

        // The archiver catches up; the next evaluation lifts the pause.
        let caught_up_ms = BASE_MS + 130_000;
        g.record_archive_success(caught_up_ms);
        assert!(!g.evaluate_archive_lag(caught_up_ms));
        assert!(!g.is_auto_paused());
        assert!(g.check(WriteKind::Dml).is_ok());
    }

    #[test]
    fn manual_read_only_blocks_auto_pause_writes_and_is_sticky() {
        // The operator pins read-only before the lag condition appears.
        // While the pin is set the auto path is a no-op, and archive
        // recovery must not clear the operator's pin.
        let g = gate(true, ReplicationRole::Standalone);
        g.configure_archive_lag_pause(60, BASE_MS);

        // Lag is past the threshold, but the manual pin keeps auto off.
        assert!(!g.evaluate_archive_lag(BASE_MS + 120_000));
        assert!(!g.is_auto_paused());
        assert!(g.is_manual_read_only());

        // Writes are still rejected, and for the manual reason.
        let msg = rejection(&g, WriteKind::Dml);
        assert!(msg.contains("read_only"), "{msg}");
        assert!(!msg.contains("WAL archive lag"), "{msg}");

        // After recovery the manual pin survives, auto stays off, and the
        // instance remains read-only by operator intent.
        let caught_up_ms = BASE_MS + 130_000;
        g.record_archive_success(caught_up_ms);
        assert!(!g.evaluate_archive_lag(caught_up_ms));
        assert!(g.is_manual_read_only(), "manual must stay set");
        assert!(!g.is_auto_paused());
        assert!(g.check(WriteKind::Dml).is_err());
    }

    #[test]
    fn manual_clearing_resumes_auto_evaluation() {
        // The pin is set while lag is bad; once the operator clears it,
        // the very next evaluation must auto-pause.
        let g = gate(true, ReplicationRole::Standalone);
        g.configure_archive_lag_pause(60, BASE_MS);
        let lagging_ms = BASE_MS + 120_000;

        assert!(!g.evaluate_archive_lag(lagging_ms));
        g.set_read_only(false);
        assert!(g.evaluate_archive_lag(lagging_ms));
        assert!(g.is_auto_paused());
    }

    #[test]
    fn archive_lag_pause_state_independent_from_manual_flag() {
        let g = gate(false, ReplicationRole::Standalone);
        g.configure_archive_lag_pause(60, BASE_MS);
        assert!(g.evaluate_archive_lag(BASE_MS + 120_000));

        // Pinning manual on top leaves both flags set.
        assert!(!g.set_read_only(true));
        assert!(g.is_manual_read_only());
        assert!(g.is_auto_paused());

        // Clearing manual leaves the auto pause in force.
        g.set_read_only(false);
        assert!(g.is_auto_paused());
        assert!(g.check(WriteKind::Dml).is_err());
    }

    #[test]
    fn replica_role_overrides_missing_read_only_flag() {
        let g = gate(false, replica_role());
        let msg = rejection(&g, WriteKind::Dml);
        assert!(msg.contains("replica"), "{msg}");
    }
}
548}