Skip to main content

noxu_rep/stream/
syncup.rs

//! Replica-feeder syncup: diverged-tail matchpoint search and rollback
//! decision.
//!
//! Port of the decision core of `com.sleepycat.je.rep.stream.ReplicaFeederSyncup`
//! (`findMatchpoint` + `verifyRollback`) and the protocol exchange documented
//! in `FeederReplicaSyncup.java`.
//!
//! When a replica reconnects to a (possibly new) master, the two must agree on
//! the latest COMMON log entry — the *matchpoint* — defined as the highest VLSN
//! that both nodes hold at the *same LSN with a matching log record*. If the
//! replica has applied entries after that matchpoint (a *diverged tail*, e.g.
//! after a failed election), those entries must be ROLLED BACK to the
//! matchpoint before the stream resumes from `matchpoint + 1`.
//!
//! This module implements the *decision* core as pure, testable functions over
//! the VLSN→LSN substrate (the matchpoint search and the `verifyRollback`
//! truth table). The networked wire exchange (`EntryRequest` /
//! `EntryNotFound` / `AlternateMatchpoint`), the backward `ReplicaSyncupReader`,
//! and the live `replay.rollback` log/tree truncation are layered on top by the
//! replica stream driver (see the module-level note in `peer_feeder.rs` and the
//! REP-1 STEP 5 design note). The rollback EXECUTION reuses the durable
//! recovery machinery built in REP-1 STEPS 1-4 (RollbackStart/End entries,
//! RollbackTracker, TxnChain revert, invisible re-marking).
24
25use noxu_util::{NULL_VLSN, Vlsn};
26
/// Per-VLSN log identity that one node contributes to a matchpoint
/// comparison.
///
/// Agreeing on a VLSN number is not enough: both nodes must hold the *same*
/// record there. Alongside the LSN, the entry therefore carries a record
/// fingerprint (a checksum, for example) that stands in for JE's full
/// `OutputWireRecord.match(InputWireRecord)` equality test.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct VlsnEntry {
    /// LSN at which the log entry for this VLSN is stored.
    pub lsn: u64,
    /// Fingerprint of the record (e.g. the entry checksum), compared to
    /// decide record equality.
    pub fingerprint: u64,
    /// `true` when this VLSN is a sync point and thus eligible as a
    /// matchpoint candidate. JE matches only at sync points
    /// (`range.getLastSync()` and the earlier sync entries visited by
    /// `scanMatchpointEntries`).
    pub is_sync: bool,
}
44
45/// A node's view of its replicated log for syncup: the VLSN range plus a way
46/// to look up each VLSN's [`VlsnEntry`]. Models the replica's `VLSNIndex` +
47/// log and the feeder's responses to `EntryRequest`.
48pub trait SyncupView {
49    /// Highest sync-point VLSN (`VLSNRange.getLastSync`) — the first
50    /// matchpoint candidate.
51    fn last_sync(&self) -> Vlsn;
52    /// Highest commit/abort VLSN (`VLSNRange.getLastTxnEnd`) — the rollback
53    /// safety boundary.
54    fn last_txn_end(&self) -> Vlsn;
55    /// First contiguous VLSN available (`VLSNRange.getFirst`). The search may
56    /// not go below this.
57    fn first_vlsn(&self) -> Vlsn;
58    /// Look up the entry at `vlsn`, or `None` if this node does not hold it.
59    fn entry(&self, vlsn: Vlsn) -> Option<VlsnEntry>;
60}
61
62/// Outcome of the matchpoint search (`ReplicaFeederSyncup.findMatchpoint`).
63#[derive(Debug, Clone, PartialEq, Eq)]
64pub enum Matchpoint {
65    /// A common matchpoint was found at this VLSN and LSN.
66    Found { vlsn: Vlsn, lsn: u64 },
67    /// No common matchpoint exists within the replica's contiguous range; the
68    /// replica must fall back to a network restore.
69    None,
70}
71
72/// Search for the highest VLSN that BOTH the replica and the feeder hold with
73/// a matching record at the same LSN, scanning the replica's sync points
74/// backward from its `lastSync`.
75///
76/// Port of `ReplicaFeederSyncup.findMatchpoint`:
77/// - The first candidate is `replica.last_sync()`.
78/// - For each candidate, ask the feeder for the record at that VLSN; if the
79///   feeder holds it and the records match, that is the matchpoint.
80/// - Otherwise scan to the replica's previous sync point and retry.
81/// - If the scan goes below the replica's first contiguous VLSN, there is no
82///   matchpoint (network restore).
83///
84/// Records "match" when the feeder holds the VLSN and its fingerprint equals
85/// the replica's (JE `OutputWireRecord.match`).
86pub fn find_matchpoint(
87    replica: &dyn SyncupView,
88    feeder: &dyn SyncupView,
89) -> Matchpoint {
90    let candidate = replica.last_sync();
91
92    // If the replica has no sync-able entries, the matchpoint is NULL and the
93    // stream starts at VLSN 1 — provided the feeder holds VLSN 1.
94    if candidate.is_null() {
95        // JE getFeederRecord(range, FIRST_VLSN): if the feeder lacks VLSN 1,
96        // it is a network restore.
97        return Matchpoint::None;
98    }
99
100    let first = replica.first_vlsn();
101    let mut candidate = candidate;
102
103    loop {
104        // Look at the replica's record at the candidate VLSN.
105        let replica_entry = match replica.entry(candidate) {
106            Some(e) => e,
107            None => {
108                // Went past the replica's contiguous range → no matchpoint.
109                return Matchpoint::None;
110            }
111        };
112
113        // Ask the feeder for the same VLSN and compare records.
114        if let Some(feeder_entry) = feeder.entry(candidate)
115            && feeder_entry.fingerprint == replica_entry.fingerprint
116            && feeder_entry.lsn == replica_entry.lsn
117        {
118            return Matchpoint::Found {
119                vlsn: candidate,
120                lsn: replica_entry.lsn,
121            };
122        }
123
124        // No match at this candidate; scan to the previous sync point.
125        match prev_sync_candidate(replica, candidate, first) {
126            Some(prev) => candidate = prev,
127            None => return Matchpoint::None,
128        }
129    }
130}
131
132/// Find the replica's next sync-point VLSN strictly below `from`, not going
133/// below `first`. Models `ReplicaFeederSyncup.scanMatchpointEntries`, which
134/// scans the replica's log backward for the previous sync entry.
135fn prev_sync_candidate(
136    replica: &dyn SyncupView,
137    from: Vlsn,
138    first: Vlsn,
139) -> Option<Vlsn> {
140    let mut v = from.prev();
141    while !v.is_null() && v >= first {
142        if let Some(e) = replica.entry(v)
143            && e.is_sync
144        {
145            return Some(v);
146        }
147        v = v.prev();
148    }
149    None
150}
151
152/// The action `verifyRollback` selects once a matchpoint search has completed.
153///
154/// Port of the `ReplicaFeederSyncup.verifyRollback` truth table.
155#[derive(Debug, Clone, PartialEq, Eq)]
156pub enum RollbackDecision {
157    /// Roll the diverged tail back to the matchpoint, then resume streaming
158    /// from `matchpoint + 1`. A "normal" rollback that does not cross a
159    /// committed/aborted transaction end (`lastTxnEnd <= matchpoint`).
160    RollbackToMatchpoint { matchpoint_vlsn: Vlsn, start_vlsn: Vlsn },
161    /// The matchpoint would require rolling back past a transaction end that
162    /// the replica has acknowledged. JE does a *hard recovery* (log truncation
163    /// + restart) when truncation is permissible.
164    HardRecovery { matchpoint_vlsn: Vlsn, last_txn_end: Vlsn },
165    /// No safe matchpoint / truncation not permissible — the replica must do a
166    /// full network restore from the master.
167    NetworkRestore,
168}
169
170/// Decide what to do given the matchpoint search result and the replica's VLSN
171/// range, faithful to `ReplicaFeederSyncup.verifyRollback`'s truth table.
172///
173/// `num_passed_commits` is JE `searchResults.getNumPassedCommits()`: the count
174/// of commit/abort records the backward matchpoint scan stepped over. A
175/// non-zero count means the matchpoint lies before a txn end even when
176/// `lastTxnEnd <= matchpoint` numerically (the txn end was logged at a VLSN
177/// the scan passed), forcing hard recovery rather than a normal rollback.
178pub fn verify_rollback(
179    matchpoint: &Matchpoint,
180    last_txn_end: Vlsn,
181    last_sync: Vlsn,
182    num_passed_commits: u64,
183) -> RollbackDecision {
184    let matchpoint_vlsn = match matchpoint {
185        Matchpoint::Found { vlsn, .. } => *vlsn,
186        Matchpoint::None => NULL_VLSN,
187    };
188
189    // Row group 1: lastTxnEnd is NULL — no acknowledged txn end to protect, so
190    // a normal rollback is always safe (rollback everything / to M).
191    if last_txn_end.is_null() {
192        // (NULL txn end, NULL sync, found M) "can't occur" — but if it does,
193        // treat as no matchpoint → network restore is the conservative path.
194        if last_sync.is_null() && !matchpoint_vlsn.is_null() {
195            return RollbackDecision::NetworkRestore;
196        }
197        return rollback_to(matchpoint_vlsn);
198    }
199
200    // lastTxnEnd is non-null but no matchpoint was found: JE chooses network
201    // restore (could hard-recover, but copying files is assumed cheaper).
202    if matchpoint_vlsn.is_null() {
203        return RollbackDecision::NetworkRestore;
204    }
205
206    // The matchpoint is at or after the last txn end AND the backward scan
207    // passed no commits: a normal rollback (`lastTxnEnd <= matchpointVLSN`).
208    if last_txn_end <= matchpoint_vlsn && num_passed_commits == 0 {
209        return rollback_to(matchpoint_vlsn);
210    }
211
212    // Otherwise we would roll back past a commit/abort → hard recovery (JE
213    // truncates the log and runs hard recovery when truncation is permissible;
214    // the not-permissible / disabled cases degrade to network restore, which
215    // the live driver decides with checkpoint-deleted-file information not
216    // modelled here).
217    RollbackDecision::HardRecovery { matchpoint_vlsn, last_txn_end }
218}
219
220fn rollback_to(matchpoint_vlsn: Vlsn) -> RollbackDecision {
221    RollbackDecision::RollbackToMatchpoint {
222        matchpoint_vlsn,
223        start_vlsn: matchpoint_vlsn.next(),
224    }
225}
226
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// One fixture record: `(vlsn, lsn, fingerprint, is_sync)`.
    type Row = (i64, u64, u64, bool);

    /// In-memory [`SyncupView`] test double keyed by VLSN sequence number.
    struct MapView {
        first: Vlsn,
        last_sync: Vlsn,
        last_txn_end: Vlsn,
        entries: HashMap<i64, VlsnEntry>,
    }

    /// Builds a [`MapView`] with the given range markers and records.
    fn view(
        first: i64,
        last_sync: i64,
        last_txn_end: i64,
        rows: &[Row],
    ) -> MapView {
        let entries = rows
            .iter()
            .map(|&(vlsn, lsn, fingerprint, is_sync)| {
                (vlsn, VlsnEntry { lsn, fingerprint, is_sync })
            })
            .collect();
        MapView {
            first: Vlsn::new(first),
            last_sync: Vlsn::new(last_sync),
            last_txn_end: Vlsn::new(last_txn_end),
            entries,
        }
    }

    /// Shorthand for a found matchpoint at `vlsn` / `lsn`.
    fn found(vlsn: i64, lsn: u64) -> Matchpoint {
        Matchpoint::Found { vlsn: Vlsn::new(vlsn), lsn }
    }

    /// Shorthand for the normal-rollback decision.
    fn normal_rollback(matchpoint: i64, start: i64) -> RollbackDecision {
        RollbackDecision::RollbackToMatchpoint {
            matchpoint_vlsn: Vlsn::new(matchpoint),
            start_vlsn: Vlsn::new(start),
        }
    }

    impl SyncupView for MapView {
        fn last_sync(&self) -> Vlsn {
            self.last_sync
        }

        fn last_txn_end(&self) -> Vlsn {
            self.last_txn_end
        }

        fn first_vlsn(&self) -> Vlsn {
            self.first
        }

        fn entry(&self, vlsn: Vlsn) -> Option<VlsnEntry> {
            self.entries.get(&vlsn.sequence()).copied()
        }
    }

    /// The replica's last sync point agrees with the feeder straight away, so
    /// the matchpoint is `lastSync` itself.
    #[test]
    fn test_matchpoint_at_last_sync() {
        let replica = view(
            1,
            5,
            5,
            &[(5, 0x500, 0xAA, true), (4, 0x400, 0xBB, true)],
        );
        let feeder = view(
            1,
            7,
            7,
            &[(5, 0x500, 0xAA, true), (6, 0x600, 0xCC, true)],
        );

        assert_eq!(find_matchpoint(&replica, &feeder), found(5, 0x500));
    }

    /// Diverged tail: the replica's VLSN 6 differs from the feeder's, VLSN 5
    /// is not a sync point, and VLSN 4 is the highest sync point both sides
    /// share, so the search walks back to 4.
    #[test]
    fn test_diverged_tail_walks_back() {
        let replica = view(
            1,
            6,
            6,
            &[
                (6, 0x600, 0xDEAD, true), // divergent from the feeder
                (5, 0x500, 0x55, false),  // not a sync point
                (4, 0x400, 0x44, true),   // common sync point
            ],
        );
        let feeder = view(
            1,
            8,
            8,
            &[
                (6, 0x600, 0xBEEF, true), // different record at VLSN 6
                (4, 0x400, 0x44, true),   // same record at VLSN 4
            ],
        );

        assert_eq!(find_matchpoint(&replica, &feeder), found(4, 0x400));
    }

    /// Every record in the replica's contiguous range differs from the
    /// feeder's, so there is no matchpoint (network restore).
    #[test]
    fn test_no_matchpoint_needs_restore() {
        let replica = view(
            4,
            6,
            6,
            &[
                (6, 0x600, 0x11, true),
                (5, 0x500, 0x22, true),
                (4, 0x400, 0x33, true),
            ],
        );
        let feeder = view(
            1,
            8,
            8,
            &[
                (6, 0x600, 0x99, true),
                (5, 0x500, 0x88, true),
                (4, 0x400, 0x77, true),
            ],
        );

        assert_eq!(find_matchpoint(&replica, &feeder), Matchpoint::None);
    }

    /// verifyRollback: matchpoint at/after lastTxnEnd with no passed commits
    /// gives a normal rollback that resumes at matchpoint + 1.
    #[test]
    fn test_verify_normal_rollback() {
        let decision =
            verify_rollback(&found(5, 0x500), Vlsn::new(5), Vlsn::new(5), 0);
        assert_eq!(decision, normal_rollback(5, 6));
    }

    /// verifyRollback: lastTxnEnd (5) is above the matchpoint (3), so the
    /// rollback would cross a commit and hard recovery is chosen.
    #[test]
    fn test_verify_hard_recovery_past_commit() {
        let decision =
            verify_rollback(&found(3, 0x300), Vlsn::new(5), Vlsn::new(6), 0);
        let expected = RollbackDecision::HardRecovery {
            matchpoint_vlsn: Vlsn::new(3),
            last_txn_end: Vlsn::new(5),
        };
        assert_eq!(decision, expected);
    }

    /// verifyRollback: matchpoint equals lastTxnEnd numerically, but the scan
    /// passed a commit (numPassedCommits != 0), so it is still hard recovery.
    #[test]
    fn test_verify_passed_commit_forces_hard_recovery() {
        let decision =
            verify_rollback(&found(5, 0x500), Vlsn::new(5), Vlsn::new(5), 1);
        assert!(matches!(decision, RollbackDecision::HardRecovery { .. }));
    }

    /// verifyRollback: with a null txn end the rollback is normal even though
    /// the scan reports passed commits.
    #[test]
    fn test_verify_null_txn_end_normal_rollback() {
        let decision =
            verify_rollback(&found(4, 0x400), NULL_VLSN, Vlsn::new(4), 3);
        assert_eq!(decision, normal_rollback(4, 5));
    }

    /// verifyRollback: a non-null txn end without any matchpoint means
    /// network restore.
    #[test]
    fn test_verify_no_matchpoint_with_txn_end_restore() {
        let decision =
            verify_rollback(&Matchpoint::None, Vlsn::new(5), Vlsn::new(5), 0);
        assert_eq!(decision, RollbackDecision::NetworkRestore);
    }
}