noxu_rep/stream/syncup.rs
//! Replica-feeder syncup: diverged-tail matchpoint search and rollback
//! decision.
//!
//! Port of the decision core of `com.sleepycat.je.rep.stream.ReplicaFeederSyncup`
//! (`findMatchpoint` + `verifyRollback`) and of the protocol exchange
//! documented in `FeederReplicaSyncup.java`.
//!
//! When a replica reconnects to a (possibly new) master, the two must agree on
//! the latest COMMON log entry — the *matchpoint* — defined as the highest
//! sync-point VLSN at which both nodes hold a matching log record (see
//! [`find_matchpoint`] for how records are compared). If the replica has
//! applied entries after that matchpoint (a *diverged tail*, e.g. after a
//! failed election), those entries must be ROLLED BACK to the matchpoint
//! before the stream resumes from `matchpoint + 1`.
//!
//! This module implements the *decision* core as pure, testable functions over
//! the VLSN→LSN substrate (the matchpoint search and the `verifyRollback`
//! truth table). The networked wire exchange (`EntryRequest` /
//! `EntryNotFound` / `AlternateMatchpoint`), the backward `ReplicaSyncupReader`,
//! and the live `replay.rollback` log/tree truncation are layered on top by the
//! replica stream driver (see the module-level note in `peer_feeder.rs` and the
//! REP-1 STEP 5 design note). The rollback EXECUTION reuses the durable
//! recovery machinery built in REP-1 STEPS 1-4 (RollbackStart/End entries,
//! RollbackTracker, TxnChain revert, invisible re-marking).
24
25use noxu_util::{NULL_VLSN, Vlsn};
26
/// Identity of the log record one node holds at a given VLSN, used when the
/// replica's log is compared against the feeder's during matchpoint search.
///
/// Besides the LSN, it carries a `fingerprint` (e.g. the entry checksum) so
/// that two nodes can be checked for holding the *same* record at a VLSN,
/// not merely a record with the same VLSN number. JE performs that check with
/// `OutputWireRecord.match(InputWireRecord)`; the fingerprint stands in for it.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct VlsnEntry {
    /// LSN at which this node stores the record for the VLSN.
    pub lsn: u64,
    /// Fingerprint of the record contents (e.g. its checksum), compared to
    /// decide record equality.
    pub fingerprint: u64,
    /// `true` when this VLSN is a sync point and therefore eligible as a
    /// matchpoint. JE only matches at sync points: `range.getLastSync()` and
    /// the earlier sync entries found by `scanMatchpointEntries`.
    pub is_sync: bool,
}
44
45/// A node's view of its replicated log for syncup: the VLSN range plus a way
46/// to look up each VLSN's [`VlsnEntry`]. Models the replica's `VLSNIndex` +
47/// log and the feeder's responses to `EntryRequest`.
48pub trait SyncupView {
49 /// Highest sync-point VLSN (`VLSNRange.getLastSync`) — the first
50 /// matchpoint candidate.
51 fn last_sync(&self) -> Vlsn;
52 /// Highest commit/abort VLSN (`VLSNRange.getLastTxnEnd`) — the rollback
53 /// safety boundary.
54 fn last_txn_end(&self) -> Vlsn;
55 /// First contiguous VLSN available (`VLSNRange.getFirst`). The search may
56 /// not go below this.
57 fn first_vlsn(&self) -> Vlsn;
58 /// Look up the entry at `vlsn`, or `None` if this node does not hold it.
59 fn entry(&self, vlsn: Vlsn) -> Option<VlsnEntry>;
60}
61
62/// Outcome of the matchpoint search (`ReplicaFeederSyncup.findMatchpoint`).
63#[derive(Debug, Clone, PartialEq, Eq)]
64pub enum Matchpoint {
65 /// A common matchpoint was found at this VLSN and LSN.
66 Found { vlsn: Vlsn, lsn: u64 },
67 /// No common matchpoint exists within the replica's contiguous range; the
68 /// replica must fall back to a network restore.
69 None,
70}
71
72/// Search for the highest VLSN that BOTH the replica and the feeder hold with
73/// a matching record at the same LSN, scanning the replica's sync points
74/// backward from its `lastSync`.
75///
76/// Port of `ReplicaFeederSyncup.findMatchpoint`:
77/// - The first candidate is `replica.last_sync()`.
78/// - For each candidate, ask the feeder for the record at that VLSN; if the
79/// feeder holds it and the records match, that is the matchpoint.
80/// - Otherwise scan to the replica's previous sync point and retry.
81/// - If the scan goes below the replica's first contiguous VLSN, there is no
82/// matchpoint (network restore).
83///
84/// Records "match" when the feeder holds the VLSN and its fingerprint equals
85/// the replica's (JE `OutputWireRecord.match`).
86pub fn find_matchpoint(
87 replica: &dyn SyncupView,
88 feeder: &dyn SyncupView,
89) -> Matchpoint {
90 let candidate = replica.last_sync();
91
92 // If the replica has no sync-able entries, the matchpoint is NULL and the
93 // stream starts at VLSN 1 — provided the feeder holds VLSN 1.
94 if candidate.is_null() {
95 // JE getFeederRecord(range, FIRST_VLSN): if the feeder lacks VLSN 1,
96 // it is a network restore.
97 return Matchpoint::None;
98 }
99
100 let first = replica.first_vlsn();
101 let mut candidate = candidate;
102
103 loop {
104 // Look at the replica's record at the candidate VLSN.
105 let replica_entry = match replica.entry(candidate) {
106 Some(e) => e,
107 None => {
108 // Went past the replica's contiguous range → no matchpoint.
109 return Matchpoint::None;
110 }
111 };
112
113 // Ask the feeder for the same VLSN and compare records.
114 if let Some(feeder_entry) = feeder.entry(candidate)
115 && feeder_entry.fingerprint == replica_entry.fingerprint
116 && feeder_entry.lsn == replica_entry.lsn
117 {
118 return Matchpoint::Found {
119 vlsn: candidate,
120 lsn: replica_entry.lsn,
121 };
122 }
123
124 // No match at this candidate; scan to the previous sync point.
125 match prev_sync_candidate(replica, candidate, first) {
126 Some(prev) => candidate = prev,
127 None => return Matchpoint::None,
128 }
129 }
130}
131
132/// Find the replica's next sync-point VLSN strictly below `from`, not going
133/// below `first`. Models `ReplicaFeederSyncup.scanMatchpointEntries`, which
134/// scans the replica's log backward for the previous sync entry.
135fn prev_sync_candidate(
136 replica: &dyn SyncupView,
137 from: Vlsn,
138 first: Vlsn,
139) -> Option<Vlsn> {
140 let mut v = from.prev();
141 while !v.is_null() && v >= first {
142 if let Some(e) = replica.entry(v)
143 && e.is_sync
144 {
145 return Some(v);
146 }
147 v = v.prev();
148 }
149 None
150}
151
152/// The action `verifyRollback` selects once a matchpoint search has completed.
153///
154/// Port of the `ReplicaFeederSyncup.verifyRollback` truth table.
155#[derive(Debug, Clone, PartialEq, Eq)]
156pub enum RollbackDecision {
157 /// Roll the diverged tail back to the matchpoint, then resume streaming
158 /// from `matchpoint + 1`. A "normal" rollback that does not cross a
159 /// committed/aborted transaction end (`lastTxnEnd <= matchpoint`).
160 RollbackToMatchpoint { matchpoint_vlsn: Vlsn, start_vlsn: Vlsn },
161 /// The matchpoint would require rolling back past a transaction end that
162 /// the replica has acknowledged. JE does a *hard recovery* (log truncation
163 /// + restart) when truncation is permissible.
164 HardRecovery { matchpoint_vlsn: Vlsn, last_txn_end: Vlsn },
165 /// No safe matchpoint / truncation not permissible — the replica must do a
166 /// full network restore from the master.
167 NetworkRestore,
168}
169
170/// Decide what to do given the matchpoint search result and the replica's VLSN
171/// range, faithful to `ReplicaFeederSyncup.verifyRollback`'s truth table.
172///
173/// `num_passed_commits` is JE `searchResults.getNumPassedCommits()`: the count
174/// of commit/abort records the backward matchpoint scan stepped over. A
175/// non-zero count means the matchpoint lies before a txn end even when
176/// `lastTxnEnd <= matchpoint` numerically (the txn end was logged at a VLSN
177/// the scan passed), forcing hard recovery rather than a normal rollback.
178pub fn verify_rollback(
179 matchpoint: &Matchpoint,
180 last_txn_end: Vlsn,
181 last_sync: Vlsn,
182 num_passed_commits: u64,
183) -> RollbackDecision {
184 let matchpoint_vlsn = match matchpoint {
185 Matchpoint::Found { vlsn, .. } => *vlsn,
186 Matchpoint::None => NULL_VLSN,
187 };
188
189 // Row group 1: lastTxnEnd is NULL — no acknowledged txn end to protect, so
190 // a normal rollback is always safe (rollback everything / to M).
191 if last_txn_end.is_null() {
192 // (NULL txn end, NULL sync, found M) "can't occur" — but if it does,
193 // treat as no matchpoint → network restore is the conservative path.
194 if last_sync.is_null() && !matchpoint_vlsn.is_null() {
195 return RollbackDecision::NetworkRestore;
196 }
197 return rollback_to(matchpoint_vlsn);
198 }
199
200 // lastTxnEnd is non-null but no matchpoint was found: JE chooses network
201 // restore (could hard-recover, but copying files is assumed cheaper).
202 if matchpoint_vlsn.is_null() {
203 return RollbackDecision::NetworkRestore;
204 }
205
206 // The matchpoint is at or after the last txn end AND the backward scan
207 // passed no commits: a normal rollback (`lastTxnEnd <= matchpointVLSN`).
208 if last_txn_end <= matchpoint_vlsn && num_passed_commits == 0 {
209 return rollback_to(matchpoint_vlsn);
210 }
211
212 // Otherwise we would roll back past a commit/abort → hard recovery (JE
213 // truncates the log and runs hard recovery when truncation is permissible;
214 // the not-permissible / disabled cases degrade to network restore, which
215 // the live driver decides with checkpoint-deleted-file information not
216 // modelled here).
217 RollbackDecision::HardRecovery { matchpoint_vlsn, last_txn_end }
218}
219
220fn rollback_to(matchpoint_vlsn: Vlsn) -> RollbackDecision {
221 RollbackDecision::RollbackToMatchpoint {
222 matchpoint_vlsn,
223 start_vlsn: matchpoint_vlsn.next(),
224 }
225}
226
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Shorthand for a VLSN built from its raw sequence number.
    fn v(seq: i64) -> Vlsn {
        Vlsn::new(seq)
    }

    /// In-memory [`SyncupView`]: fixed range bounds plus records keyed by
    /// VLSN sequence number.
    struct FakeLog {
        first: Vlsn,
        last_sync: Vlsn,
        last_txn_end: Vlsn,
        records: HashMap<i64, VlsnEntry>,
    }

    impl FakeLog {
        /// An empty log with the given range bounds.
        fn new(first: i64, last_sync: i64, last_txn_end: i64) -> Self {
            FakeLog {
                first: v(first),
                last_sync: v(last_sync),
                last_txn_end: v(last_txn_end),
                records: HashMap::new(),
            }
        }

        /// Builder step: store the record held at `vlsn`.
        fn put(
            mut self,
            vlsn: i64,
            lsn: u64,
            fingerprint: u64,
            is_sync: bool,
        ) -> Self {
            let record = VlsnEntry { lsn, fingerprint, is_sync };
            self.records.insert(vlsn, record);
            self
        }
    }

    impl SyncupView for FakeLog {
        fn first_vlsn(&self) -> Vlsn {
            self.first
        }
        fn last_sync(&self) -> Vlsn {
            self.last_sync
        }
        fn last_txn_end(&self) -> Vlsn {
            self.last_txn_end
        }
        fn entry(&self, vlsn: Vlsn) -> Option<VlsnEntry> {
            self.records.get(&vlsn.sequence()).copied()
        }
    }

    /// The feeder already holds the replica's `lastSync` record, so that
    /// VLSN is the matchpoint and nothing beyond it diverged.
    #[test]
    fn test_matchpoint_at_last_sync() {
        let replica = FakeLog::new(1, 5, 5)
            .put(5, 0x500, 0xAA, true)
            .put(4, 0x400, 0xBB, true);
        let feeder = FakeLog::new(1, 7, 7)
            .put(5, 0x500, 0xAA, true)
            .put(6, 0x600, 0xCC, true);

        let expected = Matchpoint::Found { vlsn: v(5), lsn: 0x500 };
        assert_eq!(find_matchpoint(&replica, &feeder), expected);
    }

    /// Diverged tail: the replica's record at VLSN 6 differs from the
    /// feeder's, so the search skips the non-sync VLSN 5 and settles on the
    /// common sync point at VLSN 4.
    #[test]
    fn test_diverged_tail_walks_back() {
        let replica = FakeLog::new(1, 6, 6)
            .put(6, 0x600, 0xDEAD, true) // diverged from the feeder
            .put(5, 0x500, 0x55, false) // not a sync point
            .put(4, 0x400, 0x44, true); // common sync point
        let feeder = FakeLog::new(1, 8, 8)
            .put(6, 0x600, 0xBEEF, true) // different record at VLSN 6
            .put(4, 0x400, 0x44, true); // same record at VLSN 4

        let expected = Matchpoint::Found { vlsn: v(4), lsn: 0x400 };
        assert_eq!(find_matchpoint(&replica, &feeder), expected);
    }

    /// Every record in the replica's contiguous range differs from the
    /// feeder's: there is no matchpoint (network restore).
    #[test]
    fn test_no_matchpoint_needs_restore() {
        let replica = FakeLog::new(4, 6, 6)
            .put(6, 0x600, 0x11, true)
            .put(5, 0x500, 0x22, true)
            .put(4, 0x400, 0x33, true);
        let feeder = FakeLog::new(1, 8, 8)
            .put(6, 0x600, 0x99, true)
            .put(5, 0x500, 0x88, true)
            .put(4, 0x400, 0x77, true);

        assert_eq!(find_matchpoint(&replica, &feeder), Matchpoint::None);
    }

    /// Matchpoint at `lastTxnEnd` with no passed commits: normal rollback,
    /// resuming at matchpoint + 1.
    #[test]
    fn test_verify_normal_rollback() {
        let mp = Matchpoint::Found { vlsn: v(5), lsn: 0x500 };
        let expected = RollbackDecision::RollbackToMatchpoint {
            matchpoint_vlsn: v(5),
            start_vlsn: v(6),
        };
        assert_eq!(verify_rollback(&mp, v(5), v(5), 0), expected);
    }

    /// `lastTxnEnd` (5) lies after the matchpoint (3): rolling back would
    /// undo a commit, so hard recovery is required.
    #[test]
    fn test_verify_hard_recovery_past_commit() {
        let mp = Matchpoint::Found { vlsn: v(3), lsn: 0x300 };
        let expected = RollbackDecision::HardRecovery {
            matchpoint_vlsn: v(3),
            last_txn_end: v(5),
        };
        assert_eq!(verify_rollback(&mp, v(5), v(6), 0), expected);
    }

    /// Even with `lastTxnEnd == matchpoint`, a commit passed by the backward
    /// scan forces hard recovery.
    #[test]
    fn test_verify_passed_commit_forces_hard_recovery() {
        let mp = Matchpoint::Found { vlsn: v(5), lsn: 0x500 };
        let decision = verify_rollback(&mp, v(5), v(5), 1);
        assert!(matches!(decision, RollbackDecision::HardRecovery { .. }));
    }

    /// No acknowledged txn end: normal rollback, passed commits or not.
    #[test]
    fn test_verify_null_txn_end_normal_rollback() {
        let mp = Matchpoint::Found { vlsn: v(4), lsn: 0x400 };
        let expected = RollbackDecision::RollbackToMatchpoint {
            matchpoint_vlsn: v(4),
            start_vlsn: v(5),
        };
        assert_eq!(verify_rollback(&mp, NULL_VLSN, v(4), 3), expected);
    }

    /// A txn end exists but no matchpoint was found: network restore.
    #[test]
    fn test_verify_no_matchpoint_with_txn_end_restore() {
        let decision = verify_rollback(&Matchpoint::None, v(5), v(5), 0);
        assert_eq!(decision, RollbackDecision::NetworkRestore);
    }
}