1use std::sync::atomic::{AtomicBool, AtomicU64, AtomicU8, Ordering};
25
26use crate::api::{RedDBError, RedDBOptions, RedDBResult};
27use crate::replication::flow_control::{Admission, FlowController};
28use crate::replication::ReplicationRole;
29
/// Name of the environment variable that supplies the replication
/// flow-control soft target handed to `FlowController::new`.
///
/// `WriteGate::from_options` trims and parses the value as a `u64`. A missing
/// or unparsable value falls back to `0`.
pub const FLOW_CONTROL_SOFT_TARGET_ENV: &str = "RED_REPLICATION_FLOW_CONTROL_SOFT_TARGET_LSN";
33
/// Category of mutation a caller asks the write gate to admit.
///
/// The kind is embedded in rejection messages and recorded in the consent
/// token returned on success.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WriteKind {
    /// Data-manipulation statements (label `DML`).
    Dml,
    /// Data-definition / schema statements (label `DDL`).
    Ddl,
    /// Index construction.
    IndexBuild,
    /// Maintenance operations.
    Maintenance,
    /// Backup triggers.
    Backup,
    /// Serverless lifecycle operations.
    Serverless,
}

impl WriteKind {
    /// Human-readable name of this kind, as used in rejection messages.
    fn label(self) -> &'static str {
        match self {
            Self::Dml => "DML",
            Self::Ddl => "DDL",
            Self::IndexBuild => "index build",
            Self::Maintenance => "maintenance",
            Self::Backup => "backup trigger",
            Self::Serverless => "serverless lifecycle",
        }
    }
}
65
/// Writer-lease status consulted by the write gate.
///
/// Only `NotHeld` causes writes to be rejected. The value is stored in the
/// gate as its `u8` discriminant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum LeaseGateState {
    /// Lease gating is inactive; the lease check never rejects.
    NotRequired = 0,
    /// A lease is in effect and currently held; writes may proceed.
    Held = 1,
    /// A lease is in effect but not held; writes are rejected.
    NotHeld = 2,
}

impl LeaseGateState {
    /// Decodes a raw discriminant back into a state.
    ///
    /// Any byte that is not a known discriminant decodes to `NotRequired`. The
    /// gate only ever stores values produced by `state as u8`, so this fallback
    /// is purely defensive.
    fn from_u8(raw: u8) -> Self {
        if raw == Self::Held as u8 {
            Self::Held
        } else if raw == Self::NotHeld as u8 {
            Self::NotHeld
        } else {
            Self::NotRequired
        }
    }

    /// Stable snake_case identifier for this state.
    pub fn label(self) -> &'static str {
        match self {
            Self::Held => "held",
            Self::NotHeld => "not_held",
            Self::NotRequired => "not_required",
        }
    }
}
98
99#[derive(Debug)]
108pub struct WriteGate {
109 read_only: AtomicBool,
114 role: ReplicationRole,
115 lease: AtomicU8,
116 auto_paused: AtomicBool,
121 last_archive_at_ms: AtomicU64,
127 pause_threshold_secs: AtomicU64,
131 flow: FlowController,
137}
138
139impl WriteGate {
140 pub fn from_options(options: &RedDBOptions) -> Self {
141 let soft_target = std::env::var(FLOW_CONTROL_SOFT_TARGET_ENV)
142 .ok()
143 .and_then(|raw| raw.trim().parse::<u64>().ok())
144 .unwrap_or(0);
145 Self {
146 read_only: AtomicBool::new(options.read_only),
147 role: options.replication.role.clone(),
148 lease: AtomicU8::new(LeaseGateState::NotRequired as u8),
149 auto_paused: AtomicBool::new(false),
150 last_archive_at_ms: AtomicU64::new(0),
151 pause_threshold_secs: AtomicU64::new(0),
152 flow: FlowController::new(soft_target, options.replication.quorum.clone()),
153 }
154 }
155
156 pub fn check(&self, kind: WriteKind) -> RedDBResult<()> {
169 self.check_consent(kind).map(|_| ())
170 }
171
172 pub fn check_consent(&self, kind: WriteKind) -> RedDBResult<crate::application::WriteConsent> {
179 if matches!(self.role, ReplicationRole::Replica { .. }) {
180 return Err(RedDBError::ReadOnly(format!(
181 "instance is a replica — {} rejected on public surface",
182 kind.label()
183 )));
184 }
185 if matches!(self.lease_state(), LeaseGateState::NotHeld) {
186 return Err(RedDBError::ReadOnly(format!(
187 "writer lease not held — {} rejected (serverless fence)",
188 kind.label()
189 )));
190 }
191 if self.read_only.load(Ordering::Acquire) {
192 return Err(RedDBError::ReadOnly(format!(
193 "instance is configured read_only — {} rejected",
194 kind.label()
195 )));
196 }
197 if self.auto_paused.load(Ordering::Acquire) {
198 return Err(RedDBError::ReadOnly(format!(
199 "instance is paused — WAL archive lag exceeded threshold — {} rejected",
200 kind.label()
201 )));
202 }
203 if matches!(self.flow.try_admit(), Admission::Throttled) {
208 return Err(RedDBError::ReadOnly(format!(
209 "write admission throttled — in-quorum replica lag exceeded soft target ({} records) — {} rejected",
210 self.flow.soft_target_lsn(),
211 kind.label()
212 )));
213 }
214 Ok(crate::application::WriteConsent {
215 kind,
216 _seal: crate::application::WriteConsentSeal::new(),
217 })
218 }
219
220 pub fn is_read_only(&self) -> bool {
221 self.read_only.load(Ordering::Acquire)
222 || self.auto_paused.load(Ordering::Acquire)
223 || matches!(self.role, ReplicationRole::Replica { .. })
224 || matches!(self.lease_state(), LeaseGateState::NotHeld)
225 }
226
227 pub fn is_manual_read_only(&self) -> bool {
232 self.read_only.load(Ordering::Acquire)
233 }
234
235 pub fn is_auto_paused(&self) -> bool {
239 self.auto_paused.load(Ordering::Acquire)
240 }
241
242 pub fn role(&self) -> &ReplicationRole {
243 &self.role
244 }
245
246 pub fn flow_control(&self) -> &FlowController {
250 &self.flow
251 }
252
253 pub fn is_flow_throttled(&self) -> bool {
257 self.flow.is_throttled()
258 }
259
260 pub fn set_read_only(&self, enabled: bool) -> bool {
268 self.read_only.swap(enabled, Ordering::AcqRel)
269 }
270
271 pub fn lease_state(&self) -> LeaseGateState {
274 LeaseGateState::from_u8(self.lease.load(Ordering::Acquire))
275 }
276
277 pub fn configure_archive_lag_pause(&self, threshold_secs: u64, baseline_ms: u64) {
289 self.pause_threshold_secs
290 .store(threshold_secs, Ordering::Release);
291 self.last_archive_at_ms
292 .store(baseline_ms, Ordering::Release);
293 }
294
295 pub fn record_archive_success(&self, now_ms: u64) {
299 self.last_archive_at_ms.store(now_ms, Ordering::Release);
300 }
301
302 pub fn archive_pause_threshold_secs(&self) -> u64 {
305 self.pause_threshold_secs.load(Ordering::Acquire)
306 }
307
308 pub fn last_archive_at_ms(&self) -> u64 {
310 self.last_archive_at_ms.load(Ordering::Acquire)
311 }
312
313 pub fn evaluate_archive_lag(&self, now_ms: u64) -> bool {
327 let threshold = self.pause_threshold_secs.load(Ordering::Acquire);
328 if threshold == 0 {
329 return self.auto_paused.load(Ordering::Acquire);
330 }
331 if self.read_only.load(Ordering::Acquire) {
332 return self.auto_paused.load(Ordering::Acquire);
333 }
334 let last_ms = self.last_archive_at_ms.load(Ordering::Acquire);
335 let lag_secs = now_ms.saturating_sub(last_ms) / 1000;
336 let should_pause = lag_secs > threshold;
337 self.auto_paused.store(should_pause, Ordering::Release);
338 should_pause
339 }
340
341 pub(crate) fn set_lease_state(&self, state: LeaseGateState) -> LeaseGateState {
349 LeaseGateState::from_u8(self.lease.swap(state as u8, Ordering::AcqRel))
350 }
351}
352
#[cfg(test)]
mod tests {
    use super::*;

    /// Base timestamp (ms) shared by the archive-lag scenarios.
    const T0_MS: u64 = 1_000_000;

    /// Gate with flow control disabled and no lease or archive-lag state.
    fn gate(read_only: bool, role: ReplicationRole) -> WriteGate {
        WriteGate {
            read_only: AtomicBool::new(read_only),
            role,
            lease: AtomicU8::new(LeaseGateState::NotRequired as u8),
            auto_paused: AtomicBool::new(false),
            last_archive_at_ms: AtomicU64::new(0),
            pause_threshold_secs: AtomicU64::new(0),
            flow: FlowController::disabled(),
        }
    }

    /// Replica role pointing at a fixed, fake primary address.
    fn replica_role() -> ReplicationRole {
        ReplicationRole::Replica {
            primary_addr: "http://primary:50051".into(),
        }
    }

    /// Unwraps the message of a `ReadOnly` rejection; panics on any other error.
    fn read_only_msg(err: RedDBError) -> String {
        match err {
            RedDBError::ReadOnly(msg) => msg,
            other => panic!("expected ReadOnly, got {other:?}"),
        }
    }

    #[test]
    fn standalone_allows_writes() {
        let wg = gate(false, ReplicationRole::Standalone);
        for kind in [WriteKind::Dml, WriteKind::Ddl] {
            assert!(wg.check(kind).is_ok());
        }
        assert!(!wg.is_read_only());
    }

    #[test]
    fn primary_allows_writes() {
        let wg = gate(false, ReplicationRole::Primary);
        assert!(wg.check(WriteKind::Dml).is_ok());
        assert!(!wg.is_read_only());
    }

    #[test]
    fn replica_rejects_every_kind() {
        let wg = gate(true, replica_role());
        let every_kind = [
            WriteKind::Dml,
            WriteKind::Ddl,
            WriteKind::IndexBuild,
            WriteKind::Maintenance,
            WriteKind::Backup,
            WriteKind::Serverless,
        ];
        for kind in every_kind {
            let msg = read_only_msg(wg.check(kind).unwrap_err());
            assert!(msg.contains("replica"));
        }
        assert!(wg.is_read_only());
    }

    #[test]
    fn read_only_flag_rejects_writes_on_standalone() {
        let wg = gate(true, ReplicationRole::Standalone);
        let msg = read_only_msg(wg.check(WriteKind::Dml).unwrap_err());
        assert!(msg.contains("read_only"));
    }

    #[test]
    fn lease_not_held_rejects_writes_on_primary() {
        let wg = gate(false, ReplicationRole::Primary);
        wg.set_lease_state(LeaseGateState::NotHeld);
        let msg = read_only_msg(wg.check(WriteKind::Dml).unwrap_err());
        assert!(msg.contains("lease"));
        assert!(wg.is_read_only());
    }

    #[test]
    fn lease_held_allows_writes_on_primary() {
        let wg = gate(false, ReplicationRole::Primary);
        wg.set_lease_state(LeaseGateState::Held);
        assert!(wg.check(WriteKind::Dml).is_ok());
        assert!(!wg.is_read_only());
    }

    #[test]
    fn lease_state_transitions_return_previous() {
        let wg = gate(false, ReplicationRole::Primary);
        let before_held = wg.set_lease_state(LeaseGateState::Held);
        assert_eq!(before_held, LeaseGateState::NotRequired);
        let before_not_held = wg.set_lease_state(LeaseGateState::NotHeld);
        assert_eq!(before_not_held, LeaseGateState::Held);
        assert_eq!(wg.lease_state(), LeaseGateState::NotHeld);
    }

    #[test]
    fn lease_loss_overrides_writable_read_only_flag() {
        // The manual flag is clear, yet a lost lease must still block writes.
        let wg = gate(false, ReplicationRole::Primary);
        wg.set_lease_state(LeaseGateState::NotHeld);
        let msg = read_only_msg(wg.check(WriteKind::Ddl).unwrap_err());
        assert!(msg.contains("lease"));
    }

    #[test]
    fn archive_lag_disabled_threshold_is_noop() {
        let wg = gate(false, ReplicationRole::Standalone);
        wg.configure_archive_lag_pause(0, 1_000);
        // An enormous lag must not matter while the threshold is zero.
        assert!(!wg.evaluate_archive_lag(10_000_000_000));
        assert!(!wg.is_auto_paused());
        assert!(wg.check(WriteKind::Dml).is_ok());
    }

    #[test]
    fn archive_lag_triggers_auto_pause_past_threshold() {
        let wg = gate(false, ReplicationRole::Standalone);
        wg.configure_archive_lag_pause(60, T0_MS);

        // 30s behind: still under the 60s threshold.
        assert!(!wg.evaluate_archive_lag(T0_MS + 30_000));
        assert!(wg.check(WriteKind::Dml).is_ok());

        // 120s behind: over the threshold, so the gate pauses.
        assert!(wg.evaluate_archive_lag(T0_MS + 120_000));
        assert!(wg.is_auto_paused());
        let msg = read_only_msg(wg.check(WriteKind::Dml).unwrap_err());
        assert!(msg.contains("WAL archive lag"), "{msg}");
        assert!(wg.is_read_only());
    }

    #[test]
    fn archive_lag_auto_resume_after_recovery() {
        let wg = gate(false, ReplicationRole::Standalone);
        wg.configure_archive_lag_pause(60, T0_MS);
        assert!(wg.evaluate_archive_lag(T0_MS + 120_000));
        assert!(wg.is_auto_paused());

        // A fresh archive success brings the lag back to zero.
        let recovered_at = T0_MS + 130_000;
        wg.record_archive_success(recovered_at);
        assert!(!wg.evaluate_archive_lag(recovered_at));
        assert!(!wg.is_auto_paused());
        assert!(wg.check(WriteKind::Dml).is_ok());
    }

    #[test]
    fn manual_read_only_blocks_auto_pause_writes_and_is_sticky() {
        // Manual read-only takes precedence: auto-evaluation is skipped.
        let wg = gate(true, ReplicationRole::Standalone);
        wg.configure_archive_lag_pause(60, T0_MS);

        assert!(!wg.evaluate_archive_lag(T0_MS + 120_000));
        assert!(!wg.is_auto_paused());
        assert!(wg.is_manual_read_only());
        let msg = read_only_msg(wg.check(WriteKind::Dml).unwrap_err());
        assert!(msg.contains("read_only"), "{msg}");
        assert!(!msg.contains("WAL archive lag"), "{msg}");

        // Archive recovery must not clear the manual flag.
        let recovered_at = T0_MS + 130_000;
        wg.record_archive_success(recovered_at);
        assert!(!wg.evaluate_archive_lag(recovered_at));
        assert!(wg.is_manual_read_only(), "manual must stay set");
        assert!(!wg.is_auto_paused());
        assert!(wg.check(WriteKind::Dml).is_err());
    }

    #[test]
    fn manual_clearing_resumes_auto_evaluation() {
        let wg = gate(true, ReplicationRole::Standalone);
        wg.configure_archive_lag_pause(60, T0_MS);
        let lagging_now = T0_MS + 120_000;
        assert!(!wg.evaluate_archive_lag(lagging_now));

        wg.set_read_only(false);
        assert!(wg.evaluate_archive_lag(lagging_now));
        assert!(wg.is_auto_paused());
    }

    #[test]
    fn archive_lag_pause_state_independent_from_manual_flag() {
        let wg = gate(false, ReplicationRole::Standalone);
        wg.configure_archive_lag_pause(60, T0_MS);
        assert!(wg.evaluate_archive_lag(T0_MS + 120_000));

        let was_manual = wg.set_read_only(true);
        assert!(!was_manual);
        assert!(wg.is_manual_read_only());
        assert!(wg.is_auto_paused());

        // Clearing the manual flag leaves the auto-pause engaged.
        wg.set_read_only(false);
        assert!(wg.is_auto_paused());
        assert!(wg.check(WriteKind::Dml).is_err());
    }

    /// Primary gate whose flow controller uses the given soft target and quorum.
    fn gate_with_flow(soft_target: u64, quorum: crate::replication::QuorumConfig) -> WriteGate {
        WriteGate {
            read_only: AtomicBool::new(false),
            role: ReplicationRole::Primary,
            lease: AtomicU8::new(LeaseGateState::NotRequired as u8),
            auto_paused: AtomicBool::new(false),
            last_archive_at_ms: AtomicU64::new(0),
            pause_threshold_secs: AtomicU64::new(0),
            flow: FlowController::new(soft_target, quorum),
        }
    }

    /// Replica snapshot whose acked/sent/durable LSNs all equal `last_acked_lsn`.
    fn flow_replica(
        id: &str,
        region: Option<&str>,
        last_acked_lsn: u64,
    ) -> crate::replication::primary::ReplicaState {
        crate::replication::primary::ReplicaState {
            id: id.to_string(),
            last_acked_lsn,
            last_sent_lsn: last_acked_lsn,
            last_durable_lsn: last_acked_lsn,
            apply_error_count: 0,
            divergence_count: 0,
            connected_at_unix_ms: 0,
            last_seen_at_unix_ms: 0,
            region: region.map(String::from),
            rebootstrapping: false,
        }
    }

    #[test]
    fn flow_throttle_rejects_writes_when_in_quorum_replica_lags_and_releases() {
        let wg = gate_with_flow(100, crate::replication::QuorumConfig::sync(1));
        assert!(wg.check(WriteKind::Dml).is_ok());
        assert!(!wg.is_flow_throttled());

        // 150 records behind with a soft target of 100: throttle.
        wg.flow_control()
            .observe(&[flow_replica("r1", Some("us"), 350)], 500);
        assert!(wg.is_flow_throttled());
        match wg.check(WriteKind::Dml).unwrap_err() {
            RedDBError::ReadOnly(msg) => assert!(msg.contains("throttled"), "{msg}"),
            other => panic!("expected ReadOnly throttle, got {other:?}"),
        }
        // Throttling is transient and is not reported as read-only.
        assert!(!wg.is_read_only());

        // 50 records behind: back under the soft target.
        wg.flow_control()
            .observe(&[flow_replica("r1", Some("us"), 450)], 500);
        assert!(!wg.is_flow_throttled());
        assert!(wg.check(WriteKind::Dml).is_ok());
    }

    #[test]
    fn flow_throttle_excludes_async_read_replica() {
        let wg = gate_with_flow(100, crate::replication::QuorumConfig::regions(["us"]));
        // The out-of-quorum "ap" replica is far behind but must be ignored.
        let replicas = [
            flow_replica("in-quorum-us", Some("us"), 500),
            flow_replica("async-ap", Some("ap"), 0),
        ];
        wg.flow_control().observe(&replicas, 500);
        assert!(!wg.is_flow_throttled());
        assert!(wg.check(WriteKind::Dml).is_ok());
    }

    #[test]
    fn flow_throttle_disabled_by_default() {
        let wg = gate(false, ReplicationRole::Primary);
        wg.flow_control()
            .observe(&[flow_replica("r1", Some("us"), 0)], 1_000_000);
        assert!(!wg.is_flow_throttled());
        assert!(wg.check(WriteKind::Dml).is_ok());
    }

    #[test]
    fn replica_role_overrides_missing_read_only_flag() {
        let wg = gate(false, replica_role());
        let msg = read_only_msg(wg.check(WriteKind::Dml).unwrap_err());
        assert!(msg.contains("replica"));
    }
}