noxu_rep/stream/syncup_reader.rs
1//! REP-1 STEP 5 (A): the backward `ReplicaSyncupReader`.
2//!
3//! Port of `com.sleepycat.je.rep.stream.ReplicaSyncupReader` (and the feeder's
4//! `FeederSyncupReader`, which is the same backward log walk on the feeder
5//! side). Both scan the log BACKWARD from the last VLSN, yielding, per VLSN:
6//! the LSN, a record fingerprint (checksum, JE `OutputWireRecord.match`), and a
7//! sync-point flag (JE `LogEntryType.isSyncPoint`). The reader also counts the
8//! commits/aborts it steps over after the candidate matchpoint
9//! (`MatchpointSearchResults.getNumPassedCommits`), which
10//! [`crate::stream::syncup::verify_rollback`] needs for its HardRecovery
11//! decision.
12//!
13//! The VLSN index alone records only VLSN→LSN; it does NOT keep the per-VLSN
14//! sync-flag, the record checksum, or the commit count. JE therefore RE-READS
15//! the log rather than trusting the index (see the class comment in
16//! `ReplicaSyncupReader.java`: "The reader must track whether it has passed a
17//! checkpoint, and therefore can not use the vlsn index to skip over
18//! entries."). This reader re-reads too, reusing the same raw `FileManager`
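//! byte reads and VLSN-tagged header parsing the feeder's
//! `EnvironmentLogScanner` already uses. Unlike JE, it does not walk the log
//! backward: it gathers the same per-VLSN facts in a single forward pass and
//! answers backward queries from an in-memory snapshot ([`SyncupLogView`]).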
21
22use std::collections::BTreeMap;
23use std::path::Path;
24use std::sync::Arc;
25
26use noxu_log::MAX_ITEM_SIZE;
27use noxu_log::entry_header::{MAX_HEADER_SIZE, MIN_HEADER_SIZE};
28use noxu_log::file_header::LOG_VERSION as LOG_FILE_VERSION;
29use noxu_log::file_header::on_disk_size as file_header_on_disk_size;
30use noxu_log::file_manager::FileManager;
31use noxu_util::{NULL_VLSN, Vlsn};
32
33use crate::stream::syncup::{SyncupView, VlsnEntry};
34use crate::vlsn::vlsn_index::VlsnIndex;
35
36/// A scanned snapshot of one node's replicated log, indexed by VLSN.
37///
38/// Built by walking the log and recording, per VLSN, its [`VlsnEntry`]
39/// (LSN, fingerprint, sync-flag). Implements [`SyncupView`] so the pure
40/// matchpoint search (`find_matchpoint`) and `verify_rollback` truth table can
41/// run against a real environment's log.
42///
43/// Port of the data the JE `ReplicaSyncupReader` exposes (`scanBackwards`,
44/// `findPrevSyncEntry`, plus the `MatchpointSearchResults` counters). JE walks
45/// strictly backward for efficiency; this snapshot collects the same per-VLSN
46/// facts in one pass (the log is the source of truth either way) and answers
47/// backward queries from the in-memory map. The O(n) one-pass scan is marked
48/// below; a streaming backward reader is the upgrade path if syncup ever runs
49/// on logs too large to snapshot.
50pub struct SyncupLogView {
51 /// VLSN → entry, in ascending VLSN order.
52 entries: BTreeMap<i64, VlsnEntry>,
53 /// VLSNs that are transaction ends (commit/abort). Used to count
54 /// `numPassedCommits` above a candidate matchpoint. Stored separately
55 /// because [`VlsnEntry`]'s public shape (fixed by the decision core)
56 /// carries only the sync-flag, not the narrower txn-end flag.
57 txn_end_vlsns: std::collections::BTreeSet<i64>,
58 /// Highest sync-point VLSN seen (JE `VLSNRange.getLastSync`).
59 last_sync: Vlsn,
60 /// Highest commit/abort VLSN seen (JE `VLSNRange.getLastTxnEnd`).
61 last_txn_end: Vlsn,
62 /// First (lowest) VLSN available (JE `VLSNRange.getFirst`).
63 first: Vlsn,
64}
65
66impl SyncupLogView {
67 /// Build a view by scanning the log under `env_home`.
68 ///
69 /// Reads every entry once (forward over files for simplicity), recording
70 /// the per-VLSN fingerprint/sync-flag the matchpoint search needs. Returns
71 /// `None` only if a `FileManager` cannot be opened for `env_home`.
72 pub fn scan(env_home: &Path) -> Option<Self> {
73 // Read-only FileManager over the env's log, same construction the
74 // feeder's EnvironmentLogScanner uses.
75 let fm = Arc::new(
76 FileManager::new(env_home, true, 256 * 1024 * 1024, 32).ok()?,
77 );
78 Some(Self::scan_with_manager(&fm))
79 }
80
81 /// Build a view from an already-open [`FileManager`] (used by the live
82 /// syncup driver, which already holds one, and by tests).
83 pub fn scan_with_manager(fm: &FileManager) -> Self {
84 let mut entries: BTreeMap<i64, VlsnEntry> = BTreeMap::new();
85 let mut txn_end_vlsns: std::collections::BTreeSet<i64> =
86 std::collections::BTreeSet::new();
87 let mut last_sync = NULL_VLSN;
88 let mut last_txn_end = NULL_VLSN;
89
90 // ponytail: one forward O(n) pass over the log collects every
91 // per-VLSN fact (lsn, fingerprint, sync-flag). JE scans backward and
92 // stops early at the matchpoint; a streaming backward reader is the
93 // upgrade path if syncup must run on logs too large to snapshot.
94 let file_nums = fm.list_file_numbers().unwrap_or_default();
95 for file_num in file_nums {
96 let header_size = fm
97 .file_header_size_for(file_num)
98 .unwrap_or_else(|_| file_header_on_disk_size(LOG_FILE_VERSION))
99 as u64;
100 let file_len = match fm.get_file_length(file_num) {
101 Ok(len) => len,
102 Err(_) => continue,
103 };
104 let mut offset = header_size;
105 while offset < file_len {
106 match read_raw_entry(fm, file_num, offset) {
107 None => break, // end of written data in this file
108 Some((entry_size, vlsn_opt, type_byte, payload)) => {
109 offset += entry_size as u64;
110 let Some(vlsn) = vlsn_opt else { continue };
111 let lsn = noxu_util::Lsn::new(file_num, {
112 // offset before this entry (we just advanced)
113 (offset - entry_size as u64) as u32
114 })
115 .as_u64();
116 let is_sync =
117 noxu_log::LogEntryType::from_type_num(type_byte)
118 .map(|t| t.is_sync_point())
119 .unwrap_or(false);
120 let is_txn_end =
121 noxu_log::LogEntryType::from_type_num(type_byte)
122 .map(|t| {
123 matches!(
124 t,
125 noxu_log::LogEntryType::TxnCommit
126 | noxu_log::LogEntryType::TxnAbort
127 )
128 })
129 .unwrap_or(false);
130 // Fingerprint = checksum of the record payload, the
131 // stand-in for JE OutputWireRecord.match (record
132 // equality at the same VLSN).
133 let fingerprint = crc32fast::hash(&payload) as u64
134 ^ (type_byte as u64);
135 entries.insert(
136 vlsn as i64,
137 VlsnEntry { lsn, fingerprint, is_sync },
138 );
139 let v = Vlsn::new(vlsn as i64);
140 if is_sync && v > last_sync {
141 last_sync = v;
142 }
143 if is_txn_end {
144 txn_end_vlsns.insert(vlsn as i64);
145 if v > last_txn_end {
146 last_txn_end = v;
147 }
148 }
149 }
150 }
151 }
152 }
153
154 let first =
155 entries.keys().next().map(|&v| Vlsn::new(v)).unwrap_or(NULL_VLSN);
156
157 Self { entries, txn_end_vlsns, last_sync, last_txn_end, first }
158 }
159
160 /// Count the commit/abort records strictly above `matchpoint` (JE
161 /// `MatchpointSearchResults.getNumPassedCommits`). `verify_rollback` uses
162 /// this to force HardRecovery when the backward scan stepped over a txn
163 /// end even if `lastTxnEnd <= matchpoint` numerically.
164 pub fn num_passed_commits(&self, matchpoint: Vlsn) -> u64 {
165 let floor = matchpoint.sequence();
166 self.txn_end_vlsns.range((floor + 1)..).count() as u64
167 }
168
169 /// All VLSN→[`VlsnEntry`] pairs, ascending. Used by the feeder side of the
170 /// syncup protocol to answer `EntryRequest`.
171 pub fn entries(&self) -> impl Iterator<Item = (Vlsn, &VlsnEntry)> {
172 self.entries.iter().map(|(&v, e)| (Vlsn::new(v), e))
173 }
174}
175
176impl SyncupView for SyncupLogView {
177 fn last_sync(&self) -> Vlsn {
178 self.last_sync
179 }
180 fn last_txn_end(&self) -> Vlsn {
181 self.last_txn_end
182 }
183 fn first_vlsn(&self) -> Vlsn {
184 self.first
185 }
186 fn entry(&self, vlsn: Vlsn) -> Option<VlsnEntry> {
187 self.entries.get(&vlsn.sequence()).copied()
188 }
189}
190
191// ---------------------------------------------------------------------------
192// VlsnIndexView — a SyncupView over an in-memory VlsnIndex
193// ---------------------------------------------------------------------------
194
195/// A [`SyncupView`] backed by a live [`VlsnIndex`] (VLSN → LSN) plus the
196/// index's range (`getFirst`/`getLastSync`/`getLastTxnEnd`).
197///
198/// The per-VLSN *fingerprint* is the LSN itself: two nodes hold the "same
199/// record" at a VLSN iff they assigned it the same LSN. This is the in-memory
200/// equivalent of JE `OutputWireRecord.match` for the syncup driver that works
201/// from the VLSN index without re-reading raw log bytes (used by the live
202/// `become_replica` path and the multi-node test harness, which track
203/// replication at the VLSN-index granularity). The `SyncupLogView` above is
204/// the full re-read used when raw per-record checksums are required.
205///
206/// A VLSN is treated as a *sync point* iff it is `<= lastSync` and held in the
207/// index — matching JE, where every sync point is a txn end and `lastSync`
208/// bounds the highest matchpoint candidate.
209pub struct VlsnIndexView {
210 index: Arc<VlsnIndex>,
211 first: Vlsn,
212 last_sync: Vlsn,
213 last_txn_end: Vlsn,
214}
215
216impl VlsnIndexView {
217 /// Build a view over `index`.
218 pub fn from_index(index: &Arc<VlsnIndex>) -> Self {
219 let range = index.get_range();
220 let to_vlsn =
221 |v: u64| if v == 0 { NULL_VLSN } else { Vlsn::new(v as i64) };
222 Self {
223 index: Arc::clone(index),
224 first: to_vlsn(range.get_first()),
225 last_sync: to_vlsn(range.get_last_sync()),
226 last_txn_end: to_vlsn(range.get_last_txn_end()),
227 }
228 }
229
230 fn lsn_fingerprint(&self, vlsn: i64) -> Option<(u64, u64, bool)> {
231 if vlsn <= 0 {
232 return None;
233 }
234 // Proper per-VLSN lookup (NOT the sparse snapshot): get_lsn answers
235 // for every VLSN the index holds, matching JE VLSNIndex.getLsn.
236 let (file, offset) = self.index.get_lsn(vlsn as u64)?;
237 let lsn = noxu_util::Lsn::new(file, offset).as_u64();
238 // Fingerprint == LSN: same LSN at a VLSN means the same record.
239 let is_sync = Vlsn::new(vlsn) <= self.last_sync;
240 Some((lsn, lsn, is_sync))
241 }
242}
243
244impl SyncupView for VlsnIndexView {
245 fn last_sync(&self) -> Vlsn {
246 self.last_sync
247 }
248 fn last_txn_end(&self) -> Vlsn {
249 self.last_txn_end
250 }
251 fn first_vlsn(&self) -> Vlsn {
252 self.first
253 }
254 fn entry(&self, vlsn: Vlsn) -> Option<VlsnEntry> {
255 let (lsn, fingerprint, is_sync) =
256 self.lsn_fingerprint(vlsn.sequence())?;
257 Some(VlsnEntry { lsn, fingerprint, is_sync })
258 }
259}
260
261/// Read the raw header+payload at `(file_num, offset)`.
262///
263/// Returns `(entry_size_bytes, vlsn_opt, entry_type_byte, payload)` or `None`
264/// at end-of-data / corruption. Same VLSN-tagged header parse as
265/// `EnvironmentLogScanner::read_raw_entry` (feeder.rs); kept as a free
266/// function here so the backward view does not depend on the feeder type.
267fn read_raw_entry(
268 fm: &FileManager,
269 file_num: u32,
270 offset: u64,
271) -> Option<(usize, Option<u64>, u8, Vec<u8>)> {
272 let mut hdr = [0u8; MIN_HEADER_SIZE];
273 let n = fm.read_from_file(file_num, offset, &mut hdr).ok()?;
274 if n < MIN_HEADER_SIZE {
275 return None;
276 }
277 if hdr[4] == 0 {
278 return None; // zero-fill past last entry
279 }
280 // Skip entries whose invisible bit (flags mask 0x10) is set: a rolled-back
281 // entry made invisible by Replay.rollback (STEP 4) is not a valid
282 // matchpoint candidate (JE ReplicaSyncupReader.isTargetEntry: "Skip
283 // invisible entries"). We still advance past it by returning its size.
284 let invisible = (hdr[5] & 0x10) != 0;
285 let entry_type_byte = hdr[4];
286 let flags = hdr[5];
287 let item_size =
288 u32::from_le_bytes([hdr[10], hdr[11], hdr[12], hdr[13]]) as usize;
289 let vlsn_present = (flags & 0x08) != 0 || (flags & 0x20) != 0;
290 let header_size =
291 if vlsn_present { MAX_HEADER_SIZE } else { MIN_HEADER_SIZE };
292 if item_size > MAX_ITEM_SIZE {
293 return None;
294 }
295 let entry_size = header_size + item_size;
296 let mut full = vec![0u8; entry_size];
297 let n = fm.read_from_file(file_num, offset, &mut full).ok()?;
298 if n < entry_size {
299 return None;
300 }
301 let vlsn_opt = if vlsn_present && full.len() >= MAX_HEADER_SIZE {
302 let raw = i64::from_le_bytes(
303 full[MIN_HEADER_SIZE..MAX_HEADER_SIZE].try_into().ok()?,
304 );
305 if raw > 0 { Some(raw as u64) } else { None }
306 } else {
307 None
308 };
309 let payload = full[header_size..].to_vec();
310 // Invisible entries advance the cursor but are not yielded as VLSN
311 // entries (their VLSN is suppressed so the matchpoint search ignores
312 // them). Returning a None VLSN keeps the scan moving past them.
313 let vlsn_opt = if invisible { None } else { vlsn_opt };
314 Some((entry_size, vlsn_opt, entry_type_byte, payload))
315}
316
#[cfg(test)]
mod tests {
    use super::*;
    use crate::stream::syncup::{Matchpoint, find_matchpoint};
    use std::collections::HashMap;

    /// Hand-built [`SyncupView`] used to show that the reader's per-VLSN data
    /// drives the decision core (`find_matchpoint`) the way JE's backward
    /// reader does.
    struct FakeView {
        entries: HashMap<i64, VlsnEntry>,
        last_sync: Vlsn,
        last_txn_end: Vlsn,
        first: Vlsn,
    }

    impl SyncupView for FakeView {
        fn entry(&self, vlsn: Vlsn) -> Option<VlsnEntry> {
            self.entries.get(&vlsn.sequence()).copied()
        }
        fn first_vlsn(&self) -> Vlsn {
            self.first
        }
        fn last_sync(&self) -> Vlsn {
            self.last_sync
        }
        fn last_txn_end(&self) -> Vlsn {
            self.last_txn_end
        }
    }

    /// Keyed entry at `vlsn` whose LSN is `vlsn * 0x100`.
    fn entry_at(vlsn: i64, fingerprint: u64, is_sync: bool) -> (i64, VlsnEntry) {
        let lsn = (vlsn as u64) * 0x100;
        (vlsn, VlsnEntry { lsn, fingerprint, is_sync })
    }

    #[test]
    fn test_num_passed_commits_counts_above_matchpoint() {
        let entries: BTreeMap<i64, VlsnEntry> = (1..=5i64)
            .map(|v| {
                let raw = v as u64;
                (v, VlsnEntry { lsn: raw, fingerprint: raw, is_sync: true })
            })
            .collect();
        // Two txn ends above matchpoint 3 (at VLSN 4 and 5).
        let txn_end_vlsns: std::collections::BTreeSet<i64> = [4, 5].into_iter().collect();
        let view = SyncupLogView {
            entries,
            txn_end_vlsns,
            first: Vlsn::new(1),
            last_sync: Vlsn::new(5),
            last_txn_end: Vlsn::new(5),
        };

        assert_eq!(view.num_passed_commits(Vlsn::new(3)), 2);
        assert_eq!(view.num_passed_commits(Vlsn::new(5)), 0);
    }

    /// Replica and feeder fingerprints agree at VLSN 4 but diverge at 5/6,
    /// so the matchpoint is 4.
    #[test]
    fn test_view_drives_find_matchpoint() {
        let replica = FakeView {
            entries: HashMap::from([
                entry_at(6, 0xDEAD, true),
                entry_at(5, 0x55, false),
                entry_at(4, 0x44, true),
            ]),
            first: Vlsn::new(1),
            last_sync: Vlsn::new(6),
            last_txn_end: Vlsn::new(6),
        };
        let feeder = FakeView {
            entries: HashMap::from([entry_at(6, 0xBEEF, true), entry_at(4, 0x44, true)]),
            first: Vlsn::new(1),
            last_sync: Vlsn::new(8),
            last_txn_end: Vlsn::new(8),
        };

        let expected = Matchpoint::Found { vlsn: Vlsn::new(4), lsn: 0x400 };
        assert_eq!(find_matchpoint(&replica, &feeder), expected);
    }
}