Skip to main content

noxu_rep/
consistency.rs

//! Consistency policies for replica reads.
//!
//! Port of the JE `ReplicaConsistencyPolicy` hierarchy:
//! `NoConsistencyRequiredPolicy`, `TimeConsistencyPolicy`, and
//! `CommitPointConsistencyPolicy`
//! (`com.sleepycat.je.rep.{NoConsistencyRequiredPolicy,TimeConsistencyPolicy,
//! CommitPointConsistencyPolicy}`).
//!
//! ## What this does (REP-10)
//!
//! A read transaction that begins on a *replica* must not proceed until the
//! replica's applied state satisfies the configured policy.  JE implements
//! this in `ReplicaConsistencyPolicy.ensureConsistency` →
//! `Replica.ConsistencyTracker.awaitVLSN` / `lagAwait`, which BLOCKS the
//! `beginTransaction` call until the replica has replayed far enough, or the
//! policy timeout expires (→ `ReplicaConsistencyException`).
//!
//! [`ConsistencyTracker`] is the Rust equivalent: it reuses the REP-7
//! `last_applied_vlsn` handle (`ReplicaReplay::last_applied_vlsn_handle`) as
//! the wait predicate — NOT a parallel tracker — and blocks the caller until
//! the predicate holds or the timeout elapses (a clean [`RepError`], never a
//! hang).
23
24use std::sync::Arc;
25use std::sync::atomic::{AtomicU64, Ordering};
26use std::time::{Duration, Instant};
27
28use noxu_sync::{Condvar, Mutex};
29
30use crate::error::{RepError, Result};
31
/// The state a replica must be in before a read operation on it may proceed.
///
/// Rust counterpart of the JE `ReplicaConsistencyPolicy` hierarchy.  All
/// fields are plain values, so the policy is cheap to clone and supports
/// full equality (`Eq`).
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum ConsistencyPolicy {
    /// No consistency requirement -- read from any state.
    #[default]
    NoConsistency,

    /// Time-based consistency: the replica must be within `max_lag` of
    /// the master's commit point.
    TimeConsistency {
        /// Maximum permissible lag behind the master.
        max_lag: Duration,
        /// How long to wait for the replica to catch up.
        timeout: Duration,
    },

    /// Commit-point consistency: the replica must have applied up to
    /// a specific VLSN before the read can proceed.
    CommitPointConsistency {
        /// The VLSN sequence that must be applied on the replica.
        vlsn: i64,
        /// How long to wait for the replica to reach the VLSN.
        timeout: Duration,
    },
}
66
67impl ConsistencyPolicy {
68    /// Build a [`ConsistencyPolicy::CommitPointConsistency`] from a
69    /// [`CommitToken`] minted by the master.
70    ///
71    /// Port of `new CommitPointConsistencyPolicy(commitToken, timeout, unit)`:
72    /// a client that did a write on the master passes the returned token to a
73    /// replica read so the read waits until the replica has replayed past it.
74    pub fn commit_point(token: &crate::CommitToken, timeout: Duration) -> Self {
75        ConsistencyPolicy::CommitPointConsistency {
76            vlsn: token.vlsn() as i64,
77            timeout,
78        }
79    }
80
81    /// Checks whether the given replica state satisfies this consistency
82    /// policy.
83    ///
84    /// - `current_vlsn`: The replica's current VLSN sequence.
85    /// - `master_vlsn`: The master's current VLSN sequence.
86    ///
87    /// Returns `Ok(true)` if the consistency requirement is met, or an
88    /// error describing why it is not.
89    pub fn check_consistency(
90        &self,
91        current_vlsn: i64,
92        master_vlsn: i64,
93    ) -> Result<bool> {
94        match self {
95            ConsistencyPolicy::NoConsistency => Ok(true),
96
97            ConsistencyPolicy::TimeConsistency { max_lag, .. } => {
98                // Approximate: each VLSN is roughly 1ms of lag.
99                // In a real implementation this would use timestamps from
100                // heartbeat messages. Here we use VLSN difference as a proxy.
101                let lag_vlsns = master_vlsn.saturating_sub(current_vlsn);
102                if lag_vlsns < 0 {
103                    // Replica is ahead -- shouldn't happen, but treat as ok.
104                    return Ok(true);
105                }
106                let lag_ms = lag_vlsns as u64;
107                let limit_ms = max_lag.as_millis() as u64;
108                if lag_ms <= limit_ms {
109                    Ok(true)
110                } else {
111                    Err(RepError::ReplicaLagExceeded { lag_ms, limit_ms })
112                }
113            }
114
115            ConsistencyPolicy::CommitPointConsistency { vlsn, .. } => {
116                if current_vlsn >= *vlsn {
117                    Ok(true)
118                } else {
119                    Err(RepError::ConsistencyTimeout(
120                        // Report the timeout configured for this policy.
121                        self.timeout().unwrap_or(Duration::ZERO),
122                    ))
123                }
124            }
125        }
126    }
127
128    /// Returns the timeout associated with this policy, if any.
129    pub fn timeout(&self) -> Option<Duration> {
130        match self {
131            ConsistencyPolicy::NoConsistency => None,
132            ConsistencyPolicy::TimeConsistency { timeout, .. } => {
133                Some(*timeout)
134            }
135            ConsistencyPolicy::CommitPointConsistency { timeout, .. } => {
136                Some(*timeout)
137            }
138        }
139    }
140}
141
142impl std::fmt::Display for ConsistencyPolicy {
143    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
144        match self {
145            ConsistencyPolicy::NoConsistency => write!(f, "NoConsistency"),
146            ConsistencyPolicy::TimeConsistency { max_lag, timeout } => {
147                write!(
148                    f,
149                    "TimeConsistency(max_lag={:?}, timeout={:?})",
150                    max_lag, timeout
151                )
152            }
153            ConsistencyPolicy::CommitPointConsistency { vlsn, timeout } => {
154                write!(
155                    f,
156                    "CommitPointConsistency(vlsn={}, timeout={:?})",
157                    vlsn, timeout
158                )
159            }
160        }
161    }
162}
163
164// ---------------------------------------------------------------------------
165// ConsistencyTracker (REP-10 piece A): the blocking consistency-wait.
166// ---------------------------------------------------------------------------
167
168/// Tracks the replica's applied state and blocks a read until the configured
169/// [`ConsistencyPolicy`] is satisfied.
170///
171/// Port of `com.sleepycat.je.rep.impl.node.Replica.ConsistencyTracker`
172/// (`awaitVLSN` / `lagAwait` / `await`).  In JE the tracker holds ordered
173/// `CountDownLatch`es that the replay thread *trips* as VLSNs are applied; a
174/// reader parks on the latch with the policy timeout and gets a
175/// `ReplicaConsistencyException` if it expires.
176///
177/// Here the predicate is the REP-7 `last_applied_vlsn` handle
178/// (`ReplicaReplay::last_applied_vlsn_handle`) — the SAME `Arc<AtomicU64>` the
179/// replay driver advances after each committed apply.  We do NOT add a
180/// parallel tracker; we read the existing hook.  `master_vlsn` is the
181/// master's latest known commit VLSN (the feeder stream / heartbeat
182/// high-water), used by the time policy — JE's `masterTxnEndVLSN`.
183#[derive(Clone)]
184pub struct ConsistencyTracker {
185    /// REP-7 hook: highest VLSN whose effects are visible in the replica's
186    /// live tree.  Advanced by `ReplicaReplay`; read here as the predicate.
187    last_applied_vlsn: Arc<AtomicU64>,
188
189    /// Master's latest known commit VLSN (feeder stream / heartbeat
190    /// high-water).  Port of `ConsistencyTracker.masterTxnEndVLSN`; used by
191    /// [`ConsistencyPolicy::TimeConsistency`] to estimate the lag.
192    master_vlsn: Arc<AtomicU64>,
193
194    /// Parking lot signalled when `last_applied_vlsn` advances, so a waiting
195    /// reader wakes promptly.  Port of the latch trip in
196    /// `ConsistencyTracker.trackVLSN`.
197    signal: Arc<(Mutex<()>, Condvar)>,
198}
199
200impl ConsistencyTracker {
201    /// How often a waiter re-checks the predicate even without an explicit
202    /// wake.  The replay thread advances `last_applied_vlsn` via a plain
203    /// atomic store; this tick bounds the wakeup latency if a `notify`
204    /// is ever missed, so the wait can never hang past the policy timeout.
205    //
206    // ponytail: 5ms re-check tick instead of wiring the latch-trip callback
207    // into the replay thread (JE trips the latch from `trackVLSN`). The tick
208    // bounds wakeup latency and guarantees no hang; wire an explicit notify
209    // into `ReplicaReplay::advance_vlsn` if sub-ms read latency ever matters.
210    const RECHECK_TICK: Duration = Duration::from_millis(5);
211
212    /// Build a tracker over the REP-7 `last_applied_vlsn` handle.
213    pub fn new(last_applied_vlsn: Arc<AtomicU64>) -> Self {
214        Self {
215            last_applied_vlsn,
216            master_vlsn: Arc::new(AtomicU64::new(0)),
217            signal: Arc::new((Mutex::new(()), Condvar::new())),
218        }
219    }
220
221    /// The replica's last-applied VLSN (the wait predicate).
222    pub fn last_applied_vlsn(&self) -> u64 {
223        self.last_applied_vlsn.load(Ordering::Acquire)
224    }
225
226    /// Record the master's latest known commit VLSN (feeder / heartbeat).
227    ///
228    /// Port of `ConsistencyTracker.trackHeartbeat` updating `masterTxnEndVLSN`.
229    /// Monotone.
230    pub fn set_master_vlsn(&self, vlsn: u64) {
231        self.master_vlsn.fetch_max(vlsn, Ordering::AcqRel);
232    }
233
234    /// The master's latest known commit VLSN.
235    pub fn master_vlsn(&self) -> u64 {
236        self.master_vlsn.load(Ordering::Acquire)
237    }
238
239    /// Wake any reader parked in [`Self::await_consistency`].
240    ///
241    /// Called when the replica applies a new entry (the replay thread can
242    /// invoke this after advancing `last_applied_vlsn`).  Equivalent to the
243    /// latch trip in `ConsistencyTracker.trackVLSN`.  Optional: a waiter also
244    /// re-checks every [`Self::RECHECK_TICK`], so a missed notify only delays
245    /// (never hangs) the read.
246    pub fn notify_applied(&self) {
247        let (_lock, cv) = &*self.signal;
248        cv.notify_all();
249    }
250
251    /// Block until the replica's applied state satisfies `policy`, or the
252    /// policy timeout expires.
253    ///
254    /// Port of `ReplicaConsistencyPolicy.ensureConsistency` →
255    /// `ConsistencyTracker.awaitVLSN` / `lagAwait`:
256    ///
257    /// - [`ConsistencyPolicy::NoConsistency`]: returns immediately (JE
258    ///   `NoConsistencyRequiredPolicy.ensureConsistency` is a no-op).
259    /// - [`ConsistencyPolicy::CommitPointConsistency`]: waits until
260    ///   `last_applied_vlsn >= token.vlsn` (JE `awaitVLSN` comparing against
261    ///   `lastReplayedTxnVLSN`).
262    /// - [`ConsistencyPolicy::TimeConsistency`]: waits until the estimated
263    ///   lag behind the master is within `max_lag` (JE `lagAwait`).
264    ///
265    /// On timeout returns a clean [`RepError`] —
266    /// [`RepError::ConsistencyTimeout`] for the commit-point policy and
267    /// [`RepError::ReplicaLagExceeded`] for the time policy — the equivalent
268    /// of JE's `ReplicaConsistencyException`.  NEVER hangs.
269    pub fn await_consistency(&self, policy: &ConsistencyPolicy) -> Result<()> {
270        let target_vlsn = match policy {
271            // NoConsistencyRequiredPolicy.ensureConsistency: no-op.
272            ConsistencyPolicy::NoConsistency => return Ok(()),
273
274            // awaitVLSN(commitToken.getVLSN()).
275            ConsistencyPolicy::CommitPointConsistency { vlsn, .. } => {
276                *vlsn as u64
277            }
278
279            // lagAwait: convert the permissible lag into the VLSN the replica
280            // must reach — master_vlsn back off by `max_lag` (1 VLSN ≈ 1ms, the
281            // same proxy `check_consistency` uses; a real impl would use the
282            // vlsn→time map).
283            ConsistencyPolicy::TimeConsistency { max_lag, .. } => {
284                let master = self.master_vlsn();
285                let slack = max_lag.as_millis() as u64;
286                master.saturating_sub(slack)
287            }
288        };
289
290        // Fast path: already satisfied (JE awaitVLSN returns before parking
291        // when `vlsn <= compareVLSN`).
292        if self.last_applied_vlsn() >= target_vlsn {
293            return Ok(());
294        }
295
296        let timeout = policy.timeout().unwrap_or(Duration::ZERO);
297        let deadline = Instant::now() + timeout;
298        let (lock, cv) = &*self.signal;
299        let mut guard = lock.lock();
300        loop {
301            if self.last_applied_vlsn() >= target_vlsn {
302                return Ok(());
303            }
304            let now = Instant::now();
305            if now >= deadline {
306                // Timed out — clean error, never a hang. JE throws
307                // ReplicaConsistencyException here.
308                return Err(self.timeout_error(policy, target_vlsn));
309            }
310            // Park until the next notify or the recheck tick, whichever is
311            // sooner; bounded by the deadline so the timeout is honoured.
312            let remaining = deadline - now;
313            let wait = remaining.min(Self::RECHECK_TICK);
314            let _ = cv.wait_for(&mut guard, wait);
315        }
316    }
317
318    /// Build the timeout error for `policy`, matching the variant the
319    /// non-blocking [`ConsistencyPolicy::check_consistency`] reports.
320    fn timeout_error(
321        &self,
322        policy: &ConsistencyPolicy,
323        target_vlsn: u64,
324    ) -> RepError {
325        match policy {
326            ConsistencyPolicy::TimeConsistency { max_lag, .. } => {
327                let lag_ms =
328                    self.master_vlsn().saturating_sub(self.last_applied_vlsn());
329                RepError::ReplicaLagExceeded {
330                    lag_ms,
331                    limit_ms: max_lag.as_millis() as u64,
332                }
333            }
334            _ => {
335                let _ = target_vlsn;
336                RepError::ConsistencyTimeout(
337                    policy.timeout().unwrap_or(Duration::ZERO),
338                )
339            }
340        }
341    }
342}
343
#[cfg(test)]
mod tests {
    use super::*;

    // -- fixtures ---------------------------------------------------------

    /// A `TimeConsistency` policy with `max_lag` given in milliseconds.
    fn time_policy(max_lag_ms: u64, timeout: Duration) -> ConsistencyPolicy {
        ConsistencyPolicy::TimeConsistency {
            max_lag: Duration::from_millis(max_lag_ms),
            timeout,
        }
    }

    /// The 100 ms lag / 5 s timeout policy most non-blocking tests share.
    fn lag_100ms() -> ConsistencyPolicy {
        time_policy(100, Duration::from_secs(5))
    }

    /// A `CommitPointConsistency` policy.
    fn commit_policy(vlsn: i64, timeout: Duration) -> ConsistencyPolicy {
        ConsistencyPolicy::CommitPointConsistency { vlsn, timeout }
    }

    /// A tracker whose replica starts at `applied`, plus the shared handle
    /// a test can use to advance it.
    fn tracker_at(applied: u64) -> (Arc<AtomicU64>, ConsistencyTracker) {
        let handle = Arc::new(AtomicU64::new(applied));
        let tracker = ConsistencyTracker::new(Arc::clone(&handle));
        (handle, tracker)
    }

    /// Spawn a thread that, after `delay`, moves the replica to `vlsn` and
    /// wakes any parked reader.
    fn advance_after(
        delay: Duration,
        vlsn: u64,
        handle: &Arc<AtomicU64>,
        tracker: &ConsistencyTracker,
    ) -> std::thread::JoinHandle<()> {
        let handle = Arc::clone(handle);
        let tracker = tracker.clone();
        std::thread::spawn(move || {
            std::thread::sleep(delay);
            handle.store(vlsn, Ordering::Release);
            tracker.notify_applied();
        })
    }

    // -- ConsistencyPolicy (non-blocking check) ---------------------------

    #[test]
    fn test_no_consistency_always_passes() {
        let policy = ConsistencyPolicy::NoConsistency;
        for (current, master) in [(0, 1000), (1000, 1000), (1000, 0)] {
            assert!(policy.check_consistency(current, master).unwrap());
        }
    }

    #[test]
    fn test_no_consistency_timeout_is_none() {
        assert!(ConsistencyPolicy::NoConsistency.timeout().is_none());
    }

    #[test]
    fn test_time_consistency_within_lag() {
        // Replica is 50 VLSNs behind, limit is 100ms.
        assert!(lag_100ms().check_consistency(950, 1000).unwrap());
    }

    #[test]
    fn test_time_consistency_at_limit() {
        // Exactly at limit.
        assert!(lag_100ms().check_consistency(900, 1000).unwrap());
    }

    #[test]
    fn test_time_consistency_exceeds_lag() {
        let err = lag_100ms().check_consistency(800, 1000).unwrap_err();
        assert!(
            matches!(
                err,
                RepError::ReplicaLagExceeded { lag_ms: 200, limit_ms: 100 }
            ),
            "unexpected error: {:?}",
            err
        );
    }

    #[test]
    fn test_time_consistency_replica_ahead() {
        // Replica ahead of master -- should pass.
        assert!(lag_100ms().check_consistency(1000, 500).unwrap());
    }

    #[test]
    fn test_time_consistency_timeout() {
        assert_eq!(lag_100ms().timeout(), Some(Duration::from_secs(5)));
    }

    #[test]
    fn test_commit_point_satisfied() {
        let policy = commit_policy(500, Duration::from_secs(10));
        for current in [500, 600] {
            assert!(policy.check_consistency(current, 1000).unwrap());
        }
    }

    #[test]
    fn test_commit_point_not_satisfied() {
        let policy = commit_policy(500, Duration::from_secs(10));
        let err = policy.check_consistency(400, 1000).unwrap_err();
        assert!(
            matches!(
                err,
                RepError::ConsistencyTimeout(d)
                    if d == Duration::from_secs(10)
            ),
            "unexpected error: {:?}",
            err
        );
    }

    #[test]
    fn test_commit_point_timeout() {
        let policy = commit_policy(100, Duration::from_secs(10));
        assert_eq!(policy.timeout(), Some(Duration::from_secs(10)));
    }

    #[test]
    fn test_default_is_no_consistency() {
        let default_policy = ConsistencyPolicy::default();
        assert_eq!(default_policy, ConsistencyPolicy::NoConsistency);
    }

    #[test]
    fn test_display_no_consistency() {
        let rendered = ConsistencyPolicy::NoConsistency.to_string();
        assert_eq!(rendered, "NoConsistency");
    }

    #[test]
    fn test_display_time_consistency() {
        let rendered = time_policy(500, Duration::from_secs(10)).to_string();
        assert!(rendered.contains("TimeConsistency"));
        assert!(rendered.contains("500ms"));
    }

    #[test]
    fn test_display_commit_point() {
        let rendered = commit_policy(42, Duration::from_secs(5)).to_string();
        assert!(rendered.contains("CommitPointConsistency"));
        assert!(rendered.contains("42"));
    }

    #[test]
    fn test_clone_and_eq() {
        let policy = lag_100ms();
        assert_eq!(policy, policy.clone());
    }

    // -- ConsistencyTracker (blocking wait) ------------------------------

    #[test]
    fn test_tracker_no_consistency_never_blocks() {
        let (_, tracker) = tracker_at(0);
        // master far ahead; NoConsistency returns immediately.
        tracker.set_master_vlsn(10_000);
        tracker
            .await_consistency(&ConsistencyPolicy::NoConsistency)
            .unwrap();
    }

    #[test]
    fn test_tracker_commit_point_already_satisfied() {
        let (_, tracker) = tracker_at(500);
        // Fast path: no wait.
        tracker
            .await_consistency(&commit_policy(500, Duration::from_secs(5)))
            .unwrap();
    }

    /// Headline behaviour: a commit-point read BLOCKS until the replica
    /// applies the target VLSN, then returns Ok (blocks-then-sees-it).
    #[test]
    fn test_tracker_commit_point_blocks_then_satisfied() {
        let (applied, tracker) = tracker_at(0);
        let policy = commit_policy(7, Duration::from_secs(5));

        // Advance the replica from another thread after a short delay.
        let bg =
            advance_after(Duration::from_millis(50), 7, &applied, &tracker);

        let start = Instant::now();
        tracker.await_consistency(&policy).unwrap();
        // It actually blocked (did not return on the fast path).
        assert!(start.elapsed() >= Duration::from_millis(40));
        assert!(applied.load(Ordering::Acquire) >= 7);
        bg.join().unwrap();
    }

    /// Headline behaviour: a commit-point read that never catches up returns
    /// a clean ConsistencyTimeout — NOT a hang.
    #[test]
    fn test_tracker_commit_point_times_out() {
        let (_, tracker) = tracker_at(0);
        let policy = commit_policy(100, Duration::from_millis(80));

        let start = Instant::now();
        let err = tracker.await_consistency(&policy).unwrap_err();
        // Returned (no hang) and within a sane bound of the timeout.
        assert!(start.elapsed() < Duration::from_secs(2));
        assert!(matches!(err, RepError::ConsistencyTimeout(_)));
    }

    /// Headline behaviour: a lagging replica blocks a time-consistency read
    /// until it catches up within the lag.
    #[test]
    fn test_tracker_time_blocks_then_catches_up() {
        let (applied, tracker) = tracker_at(0);
        // master at 1000, permissible lag 100ms (=100 VLSN proxy) -> must
        // reach >= 900.
        tracker.set_master_vlsn(1000);
        let policy = lag_100ms();

        let bg =
            advance_after(Duration::from_millis(40), 920, &applied, &tracker);

        let start = Instant::now();
        tracker.await_consistency(&policy).unwrap();
        assert!(start.elapsed() >= Duration::from_millis(30));
        bg.join().unwrap();
    }

    /// A time-consistency read that never catches up returns
    /// ReplicaLagExceeded — not a hang.
    #[test]
    fn test_tracker_time_times_out() {
        let (_, tracker) = tracker_at(0);
        tracker.set_master_vlsn(1000);
        let policy = time_policy(10, Duration::from_millis(80));

        let start = Instant::now();
        let err = tracker.await_consistency(&policy).unwrap_err();
        assert!(start.elapsed() < Duration::from_secs(2));
        assert!(matches!(err, RepError::ReplicaLagExceeded { .. }));
    }
}
613}