noxu_rep/consistency.rs
1//! Consistency policies for replica reads.
2//!
3//! Port of the JE `ReplicaConsistencyPolicy` hierarchy:
4//! `NoConsistencyRequiredPolicy`, `TimeConsistencyPolicy`, and
5//! `CommitPointConsistencyPolicy`
6//! (`com.sleepycat.je.rep.{NoConsistencyRequiredPolicy,TimeConsistencyPolicy,
7//! CommitPointConsistencyPolicy}`).
8//!
9//! ## What this does (REP-10)
10//!
11//! A read transaction that begins on a *replica* must not proceed until the
12//! replica's applied state satisfies the configured policy. JE implements
13//! this in `ReplicaConsistencyPolicy.ensureConsistency` →
14//! `Replica.ConsistencyTracker.awaitVLSN` / `lagAwait`, which BLOCKS the
15//! `beginTransaction` call until the replica has replayed far enough, or the
16//! policy timeout expires (→ `ReplicaConsistencyException`).
17//!
18//! [`ConsistencyTracker`] is the Rust equivalent: it reuses the REP-7
19//! `last_applied_vlsn` handle (`ReplicaReplay::last_applied_vlsn_handle`) as
20//! the wait predicate — NOT a parallel tracker — and blocks the caller until
21//! the predicate holds or the timeout elapses (a clean [`RepError`], never a
22//! hang).
23
24use std::sync::Arc;
25use std::sync::atomic::{AtomicU64, Ordering};
26use std::time::{Duration, Instant};
27
28use noxu_sync::{Condvar, Mutex};
29
30use crate::error::{RepError, Result};
31
/// A consistency policy that determines what state a replica must be in
/// before a read operation can proceed.
///
/// Rust counterpart of JE's `ReplicaConsistencyPolicy` hierarchy: each
/// variant corresponds to one JE policy class. The default is
/// [`ConsistencyPolicy::NoConsistency`].
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum ConsistencyPolicy {
    /// No consistency requirement -- read from any state.
    ///
    /// Port of JE `NoConsistencyRequiredPolicy`.
    #[default]
    NoConsistency,

    /// Time-based consistency: the replica must be within `max_lag` of
    /// the master's commit point.
    ///
    /// Port of JE `TimeConsistencyPolicy`.
    TimeConsistency {
        /// Maximum permissible lag behind the master.
        max_lag: Duration,
        /// How long to wait for the replica to catch up.
        timeout: Duration,
    },

    /// Commit-point consistency: the replica must have applied up to
    /// a specific VLSN before the read can proceed.
    ///
    /// Port of JE `CommitPointConsistencyPolicy`.
    CommitPointConsistency {
        /// The VLSN sequence that must be applied on the replica.
        vlsn: i64,
        /// How long to wait for the replica to reach the VLSN.
        timeout: Duration,
    },
}
66
67impl ConsistencyPolicy {
68 /// Build a [`ConsistencyPolicy::CommitPointConsistency`] from a
69 /// [`CommitToken`] minted by the master.
70 ///
71 /// Port of `new CommitPointConsistencyPolicy(commitToken, timeout, unit)`:
72 /// a client that did a write on the master passes the returned token to a
73 /// replica read so the read waits until the replica has replayed past it.
74 pub fn commit_point(token: &crate::CommitToken, timeout: Duration) -> Self {
75 ConsistencyPolicy::CommitPointConsistency {
76 vlsn: token.vlsn() as i64,
77 timeout,
78 }
79 }
80
81 /// Checks whether the given replica state satisfies this consistency
82 /// policy.
83 ///
84 /// - `current_vlsn`: The replica's current VLSN sequence.
85 /// - `master_vlsn`: The master's current VLSN sequence.
86 ///
87 /// Returns `Ok(true)` if the consistency requirement is met, or an
88 /// error describing why it is not.
89 pub fn check_consistency(
90 &self,
91 current_vlsn: i64,
92 master_vlsn: i64,
93 ) -> Result<bool> {
94 match self {
95 ConsistencyPolicy::NoConsistency => Ok(true),
96
97 ConsistencyPolicy::TimeConsistency { max_lag, .. } => {
98 // Approximate: each VLSN is roughly 1ms of lag.
99 // In a real implementation this would use timestamps from
100 // heartbeat messages. Here we use VLSN difference as a proxy.
101 let lag_vlsns = master_vlsn.saturating_sub(current_vlsn);
102 if lag_vlsns < 0 {
103 // Replica is ahead -- shouldn't happen, but treat as ok.
104 return Ok(true);
105 }
106 let lag_ms = lag_vlsns as u64;
107 let limit_ms = max_lag.as_millis() as u64;
108 if lag_ms <= limit_ms {
109 Ok(true)
110 } else {
111 Err(RepError::ReplicaLagExceeded { lag_ms, limit_ms })
112 }
113 }
114
115 ConsistencyPolicy::CommitPointConsistency { vlsn, .. } => {
116 if current_vlsn >= *vlsn {
117 Ok(true)
118 } else {
119 Err(RepError::ConsistencyTimeout(
120 // Report the timeout configured for this policy.
121 self.timeout().unwrap_or(Duration::ZERO),
122 ))
123 }
124 }
125 }
126 }
127
128 /// Returns the timeout associated with this policy, if any.
129 pub fn timeout(&self) -> Option<Duration> {
130 match self {
131 ConsistencyPolicy::NoConsistency => None,
132 ConsistencyPolicy::TimeConsistency { timeout, .. } => {
133 Some(*timeout)
134 }
135 ConsistencyPolicy::CommitPointConsistency { timeout, .. } => {
136 Some(*timeout)
137 }
138 }
139 }
140}
141
142impl std::fmt::Display for ConsistencyPolicy {
143 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
144 match self {
145 ConsistencyPolicy::NoConsistency => write!(f, "NoConsistency"),
146 ConsistencyPolicy::TimeConsistency { max_lag, timeout } => {
147 write!(
148 f,
149 "TimeConsistency(max_lag={:?}, timeout={:?})",
150 max_lag, timeout
151 )
152 }
153 ConsistencyPolicy::CommitPointConsistency { vlsn, timeout } => {
154 write!(
155 f,
156 "CommitPointConsistency(vlsn={}, timeout={:?})",
157 vlsn, timeout
158 )
159 }
160 }
161 }
162}
163
164// ---------------------------------------------------------------------------
165// ConsistencyTracker (REP-10 piece A): the blocking consistency-wait.
166// ---------------------------------------------------------------------------
167
168/// Tracks the replica's applied state and blocks a read until the configured
169/// [`ConsistencyPolicy`] is satisfied.
170///
171/// Port of `com.sleepycat.je.rep.impl.node.Replica.ConsistencyTracker`
172/// (`awaitVLSN` / `lagAwait` / `await`). In JE the tracker holds ordered
173/// `CountDownLatch`es that the replay thread *trips* as VLSNs are applied; a
174/// reader parks on the latch with the policy timeout and gets a
175/// `ReplicaConsistencyException` if it expires.
176///
177/// Here the predicate is the REP-7 `last_applied_vlsn` handle
178/// (`ReplicaReplay::last_applied_vlsn_handle`) — the SAME `Arc<AtomicU64>` the
179/// replay driver advances after each committed apply. We do NOT add a
180/// parallel tracker; we read the existing hook. `master_vlsn` is the
181/// master's latest known commit VLSN (the feeder stream / heartbeat
182/// high-water), used by the time policy — JE's `masterTxnEndVLSN`.
183#[derive(Clone)]
184pub struct ConsistencyTracker {
185 /// REP-7 hook: highest VLSN whose effects are visible in the replica's
186 /// live tree. Advanced by `ReplicaReplay`; read here as the predicate.
187 last_applied_vlsn: Arc<AtomicU64>,
188
189 /// Master's latest known commit VLSN (feeder stream / heartbeat
190 /// high-water). Port of `ConsistencyTracker.masterTxnEndVLSN`; used by
191 /// [`ConsistencyPolicy::TimeConsistency`] to estimate the lag.
192 master_vlsn: Arc<AtomicU64>,
193
194 /// Parking lot signalled when `last_applied_vlsn` advances, so a waiting
195 /// reader wakes promptly. Port of the latch trip in
196 /// `ConsistencyTracker.trackVLSN`.
197 signal: Arc<(Mutex<()>, Condvar)>,
198}
199
200impl ConsistencyTracker {
201 /// How often a waiter re-checks the predicate even without an explicit
202 /// wake. The replay thread advances `last_applied_vlsn` via a plain
203 /// atomic store; this tick bounds the wakeup latency if a `notify`
204 /// is ever missed, so the wait can never hang past the policy timeout.
205 //
206 // ponytail: 5ms re-check tick instead of wiring the latch-trip callback
207 // into the replay thread (JE trips the latch from `trackVLSN`). The tick
208 // bounds wakeup latency and guarantees no hang; wire an explicit notify
209 // into `ReplicaReplay::advance_vlsn` if sub-ms read latency ever matters.
210 const RECHECK_TICK: Duration = Duration::from_millis(5);
211
212 /// Build a tracker over the REP-7 `last_applied_vlsn` handle.
213 pub fn new(last_applied_vlsn: Arc<AtomicU64>) -> Self {
214 Self {
215 last_applied_vlsn,
216 master_vlsn: Arc::new(AtomicU64::new(0)),
217 signal: Arc::new((Mutex::new(()), Condvar::new())),
218 }
219 }
220
221 /// The replica's last-applied VLSN (the wait predicate).
222 pub fn last_applied_vlsn(&self) -> u64 {
223 self.last_applied_vlsn.load(Ordering::Acquire)
224 }
225
226 /// Record the master's latest known commit VLSN (feeder / heartbeat).
227 ///
228 /// Port of `ConsistencyTracker.trackHeartbeat` updating `masterTxnEndVLSN`.
229 /// Monotone.
230 pub fn set_master_vlsn(&self, vlsn: u64) {
231 self.master_vlsn.fetch_max(vlsn, Ordering::AcqRel);
232 }
233
234 /// The master's latest known commit VLSN.
235 pub fn master_vlsn(&self) -> u64 {
236 self.master_vlsn.load(Ordering::Acquire)
237 }
238
239 /// Wake any reader parked in [`Self::await_consistency`].
240 ///
241 /// Called when the replica applies a new entry (the replay thread can
242 /// invoke this after advancing `last_applied_vlsn`). Equivalent to the
243 /// latch trip in `ConsistencyTracker.trackVLSN`. Optional: a waiter also
244 /// re-checks every [`Self::RECHECK_TICK`], so a missed notify only delays
245 /// (never hangs) the read.
246 pub fn notify_applied(&self) {
247 let (_lock, cv) = &*self.signal;
248 cv.notify_all();
249 }
250
251 /// Block until the replica's applied state satisfies `policy`, or the
252 /// policy timeout expires.
253 ///
254 /// Port of `ReplicaConsistencyPolicy.ensureConsistency` →
255 /// `ConsistencyTracker.awaitVLSN` / `lagAwait`:
256 ///
257 /// - [`ConsistencyPolicy::NoConsistency`]: returns immediately (JE
258 /// `NoConsistencyRequiredPolicy.ensureConsistency` is a no-op).
259 /// - [`ConsistencyPolicy::CommitPointConsistency`]: waits until
260 /// `last_applied_vlsn >= token.vlsn` (JE `awaitVLSN` comparing against
261 /// `lastReplayedTxnVLSN`).
262 /// - [`ConsistencyPolicy::TimeConsistency`]: waits until the estimated
263 /// lag behind the master is within `max_lag` (JE `lagAwait`).
264 ///
265 /// On timeout returns a clean [`RepError`] —
266 /// [`RepError::ConsistencyTimeout`] for the commit-point policy and
267 /// [`RepError::ReplicaLagExceeded`] for the time policy — the equivalent
268 /// of JE's `ReplicaConsistencyException`. NEVER hangs.
269 pub fn await_consistency(&self, policy: &ConsistencyPolicy) -> Result<()> {
270 let target_vlsn = match policy {
271 // NoConsistencyRequiredPolicy.ensureConsistency: no-op.
272 ConsistencyPolicy::NoConsistency => return Ok(()),
273
274 // awaitVLSN(commitToken.getVLSN()).
275 ConsistencyPolicy::CommitPointConsistency { vlsn, .. } => {
276 *vlsn as u64
277 }
278
279 // lagAwait: convert the permissible lag into the VLSN the replica
280 // must reach — master_vlsn back off by `max_lag` (1 VLSN ≈ 1ms, the
281 // same proxy `check_consistency` uses; a real impl would use the
282 // vlsn→time map).
283 ConsistencyPolicy::TimeConsistency { max_lag, .. } => {
284 let master = self.master_vlsn();
285 let slack = max_lag.as_millis() as u64;
286 master.saturating_sub(slack)
287 }
288 };
289
290 // Fast path: already satisfied (JE awaitVLSN returns before parking
291 // when `vlsn <= compareVLSN`).
292 if self.last_applied_vlsn() >= target_vlsn {
293 return Ok(());
294 }
295
296 let timeout = policy.timeout().unwrap_or(Duration::ZERO);
297 let deadline = Instant::now() + timeout;
298 let (lock, cv) = &*self.signal;
299 let mut guard = lock.lock();
300 loop {
301 if self.last_applied_vlsn() >= target_vlsn {
302 return Ok(());
303 }
304 let now = Instant::now();
305 if now >= deadline {
306 // Timed out — clean error, never a hang. JE throws
307 // ReplicaConsistencyException here.
308 return Err(self.timeout_error(policy, target_vlsn));
309 }
310 // Park until the next notify or the recheck tick, whichever is
311 // sooner; bounded by the deadline so the timeout is honoured.
312 let remaining = deadline - now;
313 let wait = remaining.min(Self::RECHECK_TICK);
314 let _ = cv.wait_for(&mut guard, wait);
315 }
316 }
317
318 /// Build the timeout error for `policy`, matching the variant the
319 /// non-blocking [`ConsistencyPolicy::check_consistency`] reports.
320 fn timeout_error(
321 &self,
322 policy: &ConsistencyPolicy,
323 target_vlsn: u64,
324 ) -> RepError {
325 match policy {
326 ConsistencyPolicy::TimeConsistency { max_lag, .. } => {
327 let lag_ms =
328 self.master_vlsn().saturating_sub(self.last_applied_vlsn());
329 RepError::ReplicaLagExceeded {
330 lag_ms,
331 limit_ms: max_lag.as_millis() as u64,
332 }
333 }
334 _ => {
335 let _ = target_vlsn;
336 RepError::ConsistencyTimeout(
337 policy.timeout().unwrap_or(Duration::ZERO),
338 )
339 }
340 }
341 }
342}
343
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for a time-consistency policy with `max_lag` in milliseconds.
    fn time_policy(max_lag_ms: u64, timeout: Duration) -> ConsistencyPolicy {
        ConsistencyPolicy::TimeConsistency {
            max_lag: Duration::from_millis(max_lag_ms),
            timeout,
        }
    }

    /// Shorthand for a commit-point policy.
    fn commit_policy(vlsn: i64, timeout: Duration) -> ConsistencyPolicy {
        ConsistencyPolicy::CommitPointConsistency { vlsn, timeout }
    }

    /// Starts a background "replay" thread that sleeps for `delay`, publishes
    /// `vlsn` as applied, and wakes any parked reader.
    fn apply_later(
        tracker: &ConsistencyTracker,
        applied: &Arc<AtomicU64>,
        delay: Duration,
        vlsn: u64,
    ) -> std::thread::JoinHandle<()> {
        let (tracker, applied) = (tracker.clone(), Arc::clone(applied));
        std::thread::spawn(move || {
            std::thread::sleep(delay);
            applied.store(vlsn, Ordering::Release);
            tracker.notify_applied();
        })
    }

    #[test]
    fn test_no_consistency_always_passes() {
        let policy = ConsistencyPolicy::NoConsistency;
        for (current, master) in [(0, 1000), (1000, 1000), (1000, 0)] {
            assert!(policy.check_consistency(current, master).unwrap());
        }
    }

    #[test]
    fn test_no_consistency_timeout_is_none() {
        assert_eq!(ConsistencyPolicy::NoConsistency.timeout(), None);
    }

    #[test]
    fn test_time_consistency_within_lag() {
        // 50 VLSNs behind against a 100ms limit.
        let policy = time_policy(100, Duration::from_secs(5));
        assert!(policy.check_consistency(950, 1000).unwrap());
    }

    #[test]
    fn test_time_consistency_at_limit() {
        // Exactly 100 behind: the limit is inclusive.
        let policy = time_policy(100, Duration::from_secs(5));
        assert!(policy.check_consistency(900, 1000).unwrap());
    }

    #[test]
    fn test_time_consistency_exceeds_lag() {
        let policy = time_policy(100, Duration::from_secs(5));
        match policy.check_consistency(800, 1000).unwrap_err() {
            RepError::ReplicaLagExceeded { lag_ms, limit_ms } => {
                assert_eq!((lag_ms, limit_ms), (200, 100));
            }
            other => panic!("unexpected error: {:?}", other),
        }
    }

    #[test]
    fn test_time_consistency_replica_ahead() {
        // Replica ahead of master -- should pass.
        let policy = time_policy(100, Duration::from_secs(5));
        assert!(policy.check_consistency(1000, 500).unwrap());
    }

    #[test]
    fn test_time_consistency_timeout() {
        let policy = time_policy(100, Duration::from_secs(5));
        assert_eq!(policy.timeout(), Some(Duration::from_secs(5)));
    }

    #[test]
    fn test_commit_point_satisfied() {
        let policy = commit_policy(500, Duration::from_secs(10));
        for current in [500, 600] {
            assert!(policy.check_consistency(current, 1000).unwrap());
        }
    }

    #[test]
    fn test_commit_point_not_satisfied() {
        let policy = commit_policy(500, Duration::from_secs(10));
        match policy.check_consistency(400, 1000).unwrap_err() {
            RepError::ConsistencyTimeout(d) => {
                assert_eq!(d, Duration::from_secs(10));
            }
            other => panic!("unexpected error: {:?}", other),
        }
    }

    #[test]
    fn test_commit_point_timeout() {
        let policy = commit_policy(100, Duration::from_secs(10));
        assert_eq!(policy.timeout(), Some(Duration::from_secs(10)));
    }

    #[test]
    fn test_default_is_no_consistency() {
        assert_eq!(
            ConsistencyPolicy::default(),
            ConsistencyPolicy::NoConsistency
        );
    }

    #[test]
    fn test_display_no_consistency() {
        assert_eq!(
            format!("{}", ConsistencyPolicy::NoConsistency),
            "NoConsistency"
        );
    }

    #[test]
    fn test_display_time_consistency() {
        let rendered = time_policy(500, Duration::from_secs(10)).to_string();
        assert!(rendered.contains("TimeConsistency"));
        assert!(rendered.contains("500ms"));
    }

    #[test]
    fn test_display_commit_point() {
        let rendered = commit_policy(42, Duration::from_secs(5)).to_string();
        assert!(rendered.contains("CommitPointConsistency"));
        assert!(rendered.contains("42"));
    }

    #[test]
    fn test_clone_and_eq() {
        let original = time_policy(100, Duration::from_secs(5));
        assert_eq!(original.clone(), original);
    }

    // -- ConsistencyTracker (blocking wait) ------------------------------

    #[test]
    fn test_tracker_no_consistency_never_blocks() {
        let tracker = ConsistencyTracker::new(Arc::new(AtomicU64::new(0)));
        // master far ahead; NoConsistency returns immediately.
        tracker.set_master_vlsn(10_000);
        tracker
            .await_consistency(&ConsistencyPolicy::NoConsistency)
            .unwrap();
    }

    #[test]
    fn test_tracker_commit_point_already_satisfied() {
        let tracker = ConsistencyTracker::new(Arc::new(AtomicU64::new(500)));
        // Fast path: no wait.
        tracker
            .await_consistency(&commit_policy(500, Duration::from_secs(5)))
            .unwrap();
    }

    /// Headline behaviour: a commit-point read BLOCKS until the replica
    /// applies the target VLSN, then returns Ok (blocks-then-sees-it).
    #[test]
    fn test_tracker_commit_point_blocks_then_satisfied() {
        let applied = Arc::new(AtomicU64::new(0));
        let tracker = ConsistencyTracker::new(Arc::clone(&applied));
        let policy = commit_policy(7, Duration::from_secs(5));
        let replay =
            apply_later(&tracker, &applied, Duration::from_millis(50), 7);

        let start = Instant::now();
        tracker.await_consistency(&policy).unwrap();
        // It actually blocked (did not return on the fast path).
        assert!(start.elapsed() >= Duration::from_millis(40));
        assert!(applied.load(Ordering::Acquire) >= 7);
        replay.join().unwrap();
    }

    /// Headline behaviour: a commit-point read that never catches up returns
    /// a clean ConsistencyTimeout — NOT a hang.
    #[test]
    fn test_tracker_commit_point_times_out() {
        let tracker = ConsistencyTracker::new(Arc::new(AtomicU64::new(0)));
        let policy = commit_policy(100, Duration::from_millis(80));
        let start = Instant::now();
        let err = tracker.await_consistency(&policy).unwrap_err();
        // Returned (no hang) and within a sane bound of the timeout.
        assert!(start.elapsed() < Duration::from_secs(2));
        assert!(matches!(err, RepError::ConsistencyTimeout(_)));
    }

    /// Headline behaviour: a lagging replica blocks a time-consistency read
    /// until it catches up within the lag.
    #[test]
    fn test_tracker_time_blocks_then_catches_up() {
        let applied = Arc::new(AtomicU64::new(0));
        let tracker = ConsistencyTracker::new(Arc::clone(&applied));
        // master at 1000, permissible lag 100ms (=100 VLSN proxy) -> must
        // reach >= 900.
        tracker.set_master_vlsn(1000);
        let policy = time_policy(100, Duration::from_secs(5));
        let replay =
            apply_later(&tracker, &applied, Duration::from_millis(40), 920);

        let start = Instant::now();
        tracker.await_consistency(&policy).unwrap();
        assert!(start.elapsed() >= Duration::from_millis(30));
        replay.join().unwrap();
    }

    /// A time-consistency read that never catches up returns
    /// ReplicaLagExceeded — not a hang.
    #[test]
    fn test_tracker_time_times_out() {
        let tracker = ConsistencyTracker::new(Arc::new(AtomicU64::new(0)));
        tracker.set_master_vlsn(1000);
        let policy = time_policy(10, Duration::from_millis(80));
        let start = Instant::now();
        let err = tracker.await_consistency(&policy).unwrap_err();
        assert!(start.elapsed() < Duration::from_secs(2));
        assert!(matches!(err, RepError::ReplicaLagExceeded { .. }));
    }
}