1use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
25
26use crate::api::{RedDBError, RedDBOptions, RedDBResult};
27use crate::replication::ReplicationRole;
28
29#[derive(Debug, Clone, Copy, PartialEq, Eq)]
32pub enum WriteKind {
33 Dml,
35 Ddl,
37 IndexBuild,
39 Maintenance,
41 Backup,
43 Serverless,
46}
47
48impl WriteKind {
49 fn label(self) -> &'static str {
50 match self {
51 WriteKind::Dml => "DML",
52 WriteKind::Ddl => "DDL",
53 WriteKind::IndexBuild => "index build",
54 WriteKind::Maintenance => "maintenance",
55 WriteKind::Backup => "backup trigger",
56 WriteKind::Serverless => "serverless lifecycle",
57 }
58 }
59}
60
61#[derive(Debug, Clone, Copy, PartialEq, Eq)]
69#[repr(u8)]
70pub enum LeaseGateState {
71 NotRequired = 0,
72 Held = 1,
73 NotHeld = 2,
74}
75
76impl LeaseGateState {
77 fn from_u8(raw: u8) -> Self {
78 match raw {
79 1 => Self::Held,
80 2 => Self::NotHeld,
81 _ => Self::NotRequired,
82 }
83 }
84
85 pub fn label(self) -> &'static str {
86 match self {
87 Self::NotRequired => "not_required",
88 Self::Held => "held",
89 Self::NotHeld => "not_held",
90 }
91 }
92}
93
94#[derive(Debug)]
103pub struct WriteGate {
104 read_only: AtomicBool,
105 role: ReplicationRole,
106 lease: AtomicU8,
107}
108
109impl WriteGate {
110 pub fn from_options(options: &RedDBOptions) -> Self {
111 Self {
112 read_only: AtomicBool::new(options.read_only),
113 role: options.replication.role.clone(),
114 lease: AtomicU8::new(LeaseGateState::NotRequired as u8),
115 }
116 }
117
118 pub fn check(&self, kind: WriteKind) -> RedDBResult<()> {
131 self.check_consent(kind).map(|_| ())
132 }
133
134 pub fn check_consent(&self, kind: WriteKind) -> RedDBResult<crate::application::WriteConsent> {
141 if matches!(self.role, ReplicationRole::Replica { .. }) {
142 return Err(RedDBError::ReadOnly(format!(
143 "instance is a replica — {} rejected on public surface",
144 kind.label()
145 )));
146 }
147 if matches!(self.lease_state(), LeaseGateState::NotHeld) {
148 return Err(RedDBError::ReadOnly(format!(
149 "writer lease not held — {} rejected (serverless fence)",
150 kind.label()
151 )));
152 }
153 if self.read_only.load(Ordering::Acquire) {
154 return Err(RedDBError::ReadOnly(format!(
155 "instance is configured read_only — {} rejected",
156 kind.label()
157 )));
158 }
159 Ok(crate::application::WriteConsent {
160 kind,
161 _seal: crate::application::WriteConsentSeal::new(),
162 })
163 }
164
165 pub fn is_read_only(&self) -> bool {
166 self.read_only.load(Ordering::Acquire)
167 || matches!(self.role, ReplicationRole::Replica { .. })
168 || matches!(self.lease_state(), LeaseGateState::NotHeld)
169 }
170
171 pub fn role(&self) -> &ReplicationRole {
172 &self.role
173 }
174
175 pub fn set_read_only(&self, enabled: bool) -> bool {
183 self.read_only.swap(enabled, Ordering::AcqRel)
184 }
185
186 pub fn lease_state(&self) -> LeaseGateState {
189 LeaseGateState::from_u8(self.lease.load(Ordering::Acquire))
190 }
191
192 pub(crate) fn set_lease_state(&self, state: LeaseGateState) -> LeaseGateState {
200 LeaseGateState::from_u8(self.lease.swap(state as u8, Ordering::AcqRel))
201 }
202}
203
204#[cfg(test)]
205mod tests {
206 use super::*;
207
208 fn gate(read_only: bool, role: ReplicationRole) -> WriteGate {
209 WriteGate {
210 read_only: AtomicBool::new(read_only),
211 role,
212 lease: AtomicU8::new(LeaseGateState::NotRequired as u8),
213 }
214 }
215
216 #[test]
217 fn standalone_allows_writes() {
218 let g = gate(false, ReplicationRole::Standalone);
219 assert!(g.check(WriteKind::Dml).is_ok());
220 assert!(g.check(WriteKind::Ddl).is_ok());
221 assert!(!g.is_read_only());
222 }
223
224 #[test]
225 fn primary_allows_writes() {
226 let g = gate(false, ReplicationRole::Primary);
227 assert!(g.check(WriteKind::Dml).is_ok());
228 assert!(!g.is_read_only());
229 }
230
231 #[test]
232 fn replica_rejects_every_kind() {
233 let g = gate(
234 true,
235 ReplicationRole::Replica {
236 primary_addr: "http://primary:50051".into(),
237 },
238 );
239 for kind in [
240 WriteKind::Dml,
241 WriteKind::Ddl,
242 WriteKind::IndexBuild,
243 WriteKind::Maintenance,
244 WriteKind::Backup,
245 WriteKind::Serverless,
246 ] {
247 let err = g.check(kind).unwrap_err();
248 match err {
249 RedDBError::ReadOnly(msg) => assert!(msg.contains("replica")),
250 other => panic!("expected ReadOnly, got {other:?}"),
251 }
252 }
253 assert!(g.is_read_only());
254 }
255
256 #[test]
257 fn read_only_flag_rejects_writes_on_standalone() {
258 let g = gate(true, ReplicationRole::Standalone);
259 let err = g.check(WriteKind::Dml).unwrap_err();
260 match err {
261 RedDBError::ReadOnly(msg) => assert!(msg.contains("read_only")),
262 other => panic!("expected ReadOnly, got {other:?}"),
263 }
264 }
265
266 #[test]
267 fn lease_not_held_rejects_writes_on_primary() {
268 let g = gate(false, ReplicationRole::Primary);
269 g.set_lease_state(LeaseGateState::NotHeld);
270 let err = g.check(WriteKind::Dml).unwrap_err();
271 match err {
272 RedDBError::ReadOnly(msg) => assert!(msg.contains("lease")),
273 other => panic!("expected ReadOnly, got {other:?}"),
274 }
275 assert!(g.is_read_only());
276 }
277
278 #[test]
279 fn lease_held_allows_writes_on_primary() {
280 let g = gate(false, ReplicationRole::Primary);
281 g.set_lease_state(LeaseGateState::Held);
282 assert!(g.check(WriteKind::Dml).is_ok());
283 assert!(!g.is_read_only());
284 }
285
286 #[test]
287 fn lease_state_transitions_return_previous() {
288 let g = gate(false, ReplicationRole::Primary);
289 assert_eq!(
290 g.set_lease_state(LeaseGateState::Held),
291 LeaseGateState::NotRequired
292 );
293 assert_eq!(
294 g.set_lease_state(LeaseGateState::NotHeld),
295 LeaseGateState::Held
296 );
297 assert_eq!(g.lease_state(), LeaseGateState::NotHeld);
298 }
299
300 #[test]
301 fn lease_loss_overrides_writable_read_only_flag() {
302 let g = gate(false, ReplicationRole::Primary);
304 g.set_lease_state(LeaseGateState::NotHeld);
305 let err = g.check(WriteKind::Ddl).unwrap_err();
306 match err {
307 RedDBError::ReadOnly(msg) => assert!(msg.contains("lease")),
308 other => panic!("expected ReadOnly, got {other:?}"),
309 }
310 }
311
312 #[test]
313 fn replica_role_overrides_missing_read_only_flag() {
314 let g = gate(
315 false,
316 ReplicationRole::Replica {
317 primary_addr: "http://primary:50051".into(),
318 },
319 );
320 let err = g.check(WriteKind::Dml).unwrap_err();
321 match err {
322 RedDBError::ReadOnly(msg) => assert!(msg.contains("replica")),
323 other => panic!("expected ReadOnly, got {other:?}"),
324 }
325 }
326}