Skip to main content

noxu_rep/stream/
syncup_reader.rs

1//! REP-1 STEP 5 (A): the backward `ReplicaSyncupReader`.
2//!
3//! Port of `com.sleepycat.je.rep.stream.ReplicaSyncupReader` (and the feeder's
4//! `FeederSyncupReader`, which is the same backward log walk on the feeder
5//! side). Both scan the log BACKWARD from the last VLSN, yielding, per VLSN:
6//! the LSN, a record fingerprint (checksum, JE `OutputWireRecord.match`), and a
7//! sync-point flag (JE `LogEntryType.isSyncPoint`). The reader also counts the
8//! commits/aborts it steps over after the candidate matchpoint
9//! (`MatchpointSearchResults.getNumPassedCommits`), which
10//! [`crate::stream::syncup::verify_rollback`] needs for its HardRecovery
11//! decision.
12//!
13//! The VLSN index alone records only VLSN→LSN; it does NOT keep the per-VLSN
14//! sync-flag, the record checksum, or the commit count. JE therefore RE-READS
15//! the log rather than trusting the index (see the class comment in
16//! `ReplicaSyncupReader.java`: "The reader must track whether it has passed a
17//! checkpoint, and therefore can not use the vlsn index to skip over
18//! entries."). This reader re-reads too, reusing the same raw `FileManager`
19//! byte reads and VLSN-tagged header parsing the feeder's
20//! `EnvironmentLogScanner` already uses.
21
22use std::collections::BTreeMap;
23use std::path::Path;
24use std::sync::Arc;
25
26use noxu_log::MAX_ITEM_SIZE;
27use noxu_log::entry_header::{MAX_HEADER_SIZE, MIN_HEADER_SIZE};
28use noxu_log::file_header::LOG_VERSION as LOG_FILE_VERSION;
29use noxu_log::file_header::on_disk_size as file_header_on_disk_size;
30use noxu_log::file_manager::FileManager;
31use noxu_util::{NULL_VLSN, Vlsn};
32
33use crate::stream::syncup::{SyncupView, VlsnEntry};
34use crate::vlsn::vlsn_index::VlsnIndex;
35
36/// A scanned snapshot of one node's replicated log, indexed by VLSN.
37///
38/// Built by walking the log and recording, per VLSN, its [`VlsnEntry`]
39/// (LSN, fingerprint, sync-flag). Implements [`SyncupView`] so the pure
40/// matchpoint search (`find_matchpoint`) and `verify_rollback` truth table can
41/// run against a real environment's log.
42///
43/// Port of the data the JE `ReplicaSyncupReader` exposes (`scanBackwards`,
44/// `findPrevSyncEntry`, plus the `MatchpointSearchResults` counters). JE walks
45/// strictly backward for efficiency; this snapshot collects the same per-VLSN
46/// facts in one pass (the log is the source of truth either way) and answers
47/// backward queries from the in-memory map. The O(n) one-pass scan is marked
48/// below; a streaming backward reader is the upgrade path if syncup ever runs
49/// on logs too large to snapshot.
50pub struct SyncupLogView {
51    /// VLSN → entry, in ascending VLSN order.
52    entries: BTreeMap<i64, VlsnEntry>,
53    /// VLSNs that are transaction ends (commit/abort). Used to count
54    /// `numPassedCommits` above a candidate matchpoint. Stored separately
55    /// because [`VlsnEntry`]'s public shape (fixed by the decision core)
56    /// carries only the sync-flag, not the narrower txn-end flag.
57    txn_end_vlsns: std::collections::BTreeSet<i64>,
58    /// Highest sync-point VLSN seen (JE `VLSNRange.getLastSync`).
59    last_sync: Vlsn,
60    /// Highest commit/abort VLSN seen (JE `VLSNRange.getLastTxnEnd`).
61    last_txn_end: Vlsn,
62    /// First (lowest) VLSN available (JE `VLSNRange.getFirst`).
63    first: Vlsn,
64}
65
66impl SyncupLogView {
67    /// Build a view by scanning the log under `env_home`.
68    ///
69    /// Reads every entry once (forward over files for simplicity), recording
70    /// the per-VLSN fingerprint/sync-flag the matchpoint search needs. Returns
71    /// `None` only if a `FileManager` cannot be opened for `env_home`.
72    pub fn scan(env_home: &Path) -> Option<Self> {
73        // Read-only FileManager over the env's log, same construction the
74        // feeder's EnvironmentLogScanner uses.
75        let fm = Arc::new(
76            FileManager::new(env_home, true, 256 * 1024 * 1024, 32).ok()?,
77        );
78        Some(Self::scan_with_manager(&fm))
79    }
80
81    /// Build a view from an already-open [`FileManager`] (used by the live
82    /// syncup driver, which already holds one, and by tests).
83    pub fn scan_with_manager(fm: &FileManager) -> Self {
84        let mut entries: BTreeMap<i64, VlsnEntry> = BTreeMap::new();
85        let mut txn_end_vlsns: std::collections::BTreeSet<i64> =
86            std::collections::BTreeSet::new();
87        let mut last_sync = NULL_VLSN;
88        let mut last_txn_end = NULL_VLSN;
89
90        // ponytail: one forward O(n) pass over the log collects every
91        // per-VLSN fact (lsn, fingerprint, sync-flag). JE scans backward and
92        // stops early at the matchpoint; a streaming backward reader is the
93        // upgrade path if syncup must run on logs too large to snapshot.
94        let file_nums = fm.list_file_numbers().unwrap_or_default();
95        for file_num in file_nums {
96            let header_size = fm
97                .file_header_size_for(file_num)
98                .unwrap_or_else(|_| file_header_on_disk_size(LOG_FILE_VERSION))
99                as u64;
100            let file_len = match fm.get_file_length(file_num) {
101                Ok(len) => len,
102                Err(_) => continue,
103            };
104            let mut offset = header_size;
105            while offset < file_len {
106                match read_raw_entry(fm, file_num, offset) {
107                    None => break, // end of written data in this file
108                    Some((entry_size, vlsn_opt, type_byte, payload)) => {
109                        offset += entry_size as u64;
110                        let Some(vlsn) = vlsn_opt else { continue };
111                        let lsn = noxu_util::Lsn::new(file_num, {
112                            // offset before this entry (we just advanced)
113                            (offset - entry_size as u64) as u32
114                        })
115                        .as_u64();
116                        let is_sync =
117                            noxu_log::LogEntryType::from_type_num(type_byte)
118                                .map(|t| t.is_sync_point())
119                                .unwrap_or(false);
120                        let is_txn_end =
121                            noxu_log::LogEntryType::from_type_num(type_byte)
122                                .map(|t| {
123                                    matches!(
124                                        t,
125                                        noxu_log::LogEntryType::TxnCommit
126                                            | noxu_log::LogEntryType::TxnAbort
127                                    )
128                                })
129                                .unwrap_or(false);
130                        // Fingerprint = checksum of the record payload, the
131                        // stand-in for JE OutputWireRecord.match (record
132                        // equality at the same VLSN).
133                        let fingerprint = crc32fast::hash(&payload) as u64
134                            ^ (type_byte as u64);
135                        entries.insert(
136                            vlsn as i64,
137                            VlsnEntry { lsn, fingerprint, is_sync },
138                        );
139                        let v = Vlsn::new(vlsn as i64);
140                        if is_sync && v > last_sync {
141                            last_sync = v;
142                        }
143                        if is_txn_end {
144                            txn_end_vlsns.insert(vlsn as i64);
145                            if v > last_txn_end {
146                                last_txn_end = v;
147                            }
148                        }
149                    }
150                }
151            }
152        }
153
154        let first =
155            entries.keys().next().map(|&v| Vlsn::new(v)).unwrap_or(NULL_VLSN);
156
157        Self { entries, txn_end_vlsns, last_sync, last_txn_end, first }
158    }
159
160    /// Count the commit/abort records strictly above `matchpoint` (JE
161    /// `MatchpointSearchResults.getNumPassedCommits`). `verify_rollback` uses
162    /// this to force HardRecovery when the backward scan stepped over a txn
163    /// end even if `lastTxnEnd <= matchpoint` numerically.
164    pub fn num_passed_commits(&self, matchpoint: Vlsn) -> u64 {
165        let floor = matchpoint.sequence();
166        self.txn_end_vlsns.range((floor + 1)..).count() as u64
167    }
168
169    /// All VLSN→[`VlsnEntry`] pairs, ascending. Used by the feeder side of the
170    /// syncup protocol to answer `EntryRequest`.
171    pub fn entries(&self) -> impl Iterator<Item = (Vlsn, &VlsnEntry)> {
172        self.entries.iter().map(|(&v, e)| (Vlsn::new(v), e))
173    }
174}
175
176impl SyncupView for SyncupLogView {
177    fn last_sync(&self) -> Vlsn {
178        self.last_sync
179    }
180    fn last_txn_end(&self) -> Vlsn {
181        self.last_txn_end
182    }
183    fn first_vlsn(&self) -> Vlsn {
184        self.first
185    }
186    fn entry(&self, vlsn: Vlsn) -> Option<VlsnEntry> {
187        self.entries.get(&vlsn.sequence()).copied()
188    }
189}
190
191// ---------------------------------------------------------------------------
192// VlsnIndexView — a SyncupView over an in-memory VlsnIndex
193// ---------------------------------------------------------------------------
194
195/// A [`SyncupView`] backed by a live [`VlsnIndex`] (VLSN → LSN) plus the
196/// index's range (`getFirst`/`getLastSync`/`getLastTxnEnd`).
197///
198/// The per-VLSN *fingerprint* is the LSN itself: two nodes hold the "same
199/// record" at a VLSN iff they assigned it the same LSN. This is the in-memory
200/// equivalent of JE `OutputWireRecord.match` for the syncup driver that works
201/// from the VLSN index without re-reading raw log bytes (used by the live
202/// `become_replica` path and the multi-node test harness, which track
203/// replication at the VLSN-index granularity). The `SyncupLogView` above is
204/// the full re-read used when raw per-record checksums are required.
205///
206/// A VLSN is treated as a *sync point* iff it is `<= lastSync` and held in the
207/// index — matching JE, where every sync point is a txn end and `lastSync`
208/// bounds the highest matchpoint candidate.
209pub struct VlsnIndexView {
210    index: Arc<VlsnIndex>,
211    first: Vlsn,
212    last_sync: Vlsn,
213    last_txn_end: Vlsn,
214}
215
216impl VlsnIndexView {
217    /// Build a view over `index`.
218    pub fn from_index(index: &Arc<VlsnIndex>) -> Self {
219        let range = index.get_range();
220        let to_vlsn =
221            |v: u64| if v == 0 { NULL_VLSN } else { Vlsn::new(v as i64) };
222        Self {
223            index: Arc::clone(index),
224            first: to_vlsn(range.get_first()),
225            last_sync: to_vlsn(range.get_last_sync()),
226            last_txn_end: to_vlsn(range.get_last_txn_end()),
227        }
228    }
229
230    fn lsn_fingerprint(&self, vlsn: i64) -> Option<(u64, u64, bool)> {
231        if vlsn <= 0 {
232            return None;
233        }
234        // Proper per-VLSN lookup (NOT the sparse snapshot): get_lsn answers
235        // for every VLSN the index holds, matching JE VLSNIndex.getLsn.
236        let (file, offset) = self.index.get_lsn(vlsn as u64)?;
237        let lsn = noxu_util::Lsn::new(file, offset).as_u64();
238        // Fingerprint == LSN: same LSN at a VLSN means the same record.
239        let is_sync = Vlsn::new(vlsn) <= self.last_sync;
240        Some((lsn, lsn, is_sync))
241    }
242}
243
244impl SyncupView for VlsnIndexView {
245    fn last_sync(&self) -> Vlsn {
246        self.last_sync
247    }
248    fn last_txn_end(&self) -> Vlsn {
249        self.last_txn_end
250    }
251    fn first_vlsn(&self) -> Vlsn {
252        self.first
253    }
254    fn entry(&self, vlsn: Vlsn) -> Option<VlsnEntry> {
255        let (lsn, fingerprint, is_sync) =
256            self.lsn_fingerprint(vlsn.sequence())?;
257        Some(VlsnEntry { lsn, fingerprint, is_sync })
258    }
259}
260
261/// Read the raw header+payload at `(file_num, offset)`.
262///
263/// Returns `(entry_size_bytes, vlsn_opt, entry_type_byte, payload)` or `None`
264/// at end-of-data / corruption. Same VLSN-tagged header parse as
265/// `EnvironmentLogScanner::read_raw_entry` (feeder.rs); kept as a free
266/// function here so the backward view does not depend on the feeder type.
267fn read_raw_entry(
268    fm: &FileManager,
269    file_num: u32,
270    offset: u64,
271) -> Option<(usize, Option<u64>, u8, Vec<u8>)> {
272    let mut hdr = [0u8; MIN_HEADER_SIZE];
273    let n = fm.read_from_file(file_num, offset, &mut hdr).ok()?;
274    if n < MIN_HEADER_SIZE {
275        return None;
276    }
277    if hdr[4] == 0 {
278        return None; // zero-fill past last entry
279    }
280    // Skip entries whose invisible bit (flags mask 0x10) is set: a rolled-back
281    // entry made invisible by Replay.rollback (STEP 4) is not a valid
282    // matchpoint candidate (JE ReplicaSyncupReader.isTargetEntry: "Skip
283    // invisible entries"). We still advance past it by returning its size.
284    let invisible = (hdr[5] & 0x10) != 0;
285    let entry_type_byte = hdr[4];
286    let flags = hdr[5];
287    let item_size =
288        u32::from_le_bytes([hdr[10], hdr[11], hdr[12], hdr[13]]) as usize;
289    let vlsn_present = (flags & 0x08) != 0 || (flags & 0x20) != 0;
290    let header_size =
291        if vlsn_present { MAX_HEADER_SIZE } else { MIN_HEADER_SIZE };
292    if item_size > MAX_ITEM_SIZE {
293        return None;
294    }
295    let entry_size = header_size + item_size;
296    let mut full = vec![0u8; entry_size];
297    let n = fm.read_from_file(file_num, offset, &mut full).ok()?;
298    if n < entry_size {
299        return None;
300    }
301    let vlsn_opt = if vlsn_present && full.len() >= MAX_HEADER_SIZE {
302        let raw = i64::from_le_bytes(
303            full[MIN_HEADER_SIZE..MAX_HEADER_SIZE].try_into().ok()?,
304        );
305        if raw > 0 { Some(raw as u64) } else { None }
306    } else {
307        None
308    };
309    let payload = full[header_size..].to_vec();
310    // Invisible entries advance the cursor but are not yielded as VLSN
311    // entries (their VLSN is suppressed so the matchpoint search ignores
312    // them). Returning a None VLSN keeps the scan moving past them.
313    let vlsn_opt = if invisible { None } else { vlsn_opt };
314    Some((entry_size, vlsn_opt, entry_type_byte, payload))
315}
316
#[cfg(test)]
mod tests {
    use super::*;
    use crate::stream::syncup::{Matchpoint, find_matchpoint};
    use std::collections::HashMap;

    /// Hand-assembled [`SyncupView`] used to show that the data the reader
    /// produces drives the decision core (`find_matchpoint`) the same way the
    /// JE backward reader does.
    struct FakeView {
        entries: HashMap<i64, VlsnEntry>,
        last_sync: Vlsn,
        last_txn_end: Vlsn,
        first: Vlsn,
    }

    impl SyncupView for FakeView {
        fn last_sync(&self) -> Vlsn {
            self.last_sync
        }

        fn last_txn_end(&self) -> Vlsn {
            self.last_txn_end
        }

        fn first_vlsn(&self) -> Vlsn {
            self.first
        }

        fn entry(&self, vlsn: Vlsn) -> Option<VlsnEntry> {
            let key = vlsn.sequence();
            self.entries.get(&key).copied()
        }
    }

    /// Build a `(vlsn, entry)` pair whose LSN is derived from the VLSN.
    fn entry_at(v: i64, fingerprint: u64, is_sync: bool) -> (i64, VlsnEntry) {
        (v, VlsnEntry { lsn: (v as u64) * 0x100, fingerprint, is_sync })
    }

    #[test]
    fn test_num_passed_commits_counts_above_matchpoint() {
        // VLSNs 1..=5, all sync points, LSN and fingerprint equal to the VLSN.
        let entries: BTreeMap<i64, VlsnEntry> = (1..=5i64)
            .map(|v| {
                let raw = v as u64;
                (v, VlsnEntry { lsn: raw, fingerprint: raw, is_sync: true })
            })
            .collect();
        // Two txn ends above matchpoint 3 (at VLSN 4 and 5).
        let txn_end_vlsns: std::collections::BTreeSet<i64> =
            [4, 5].into_iter().collect();

        let view = SyncupLogView {
            entries,
            txn_end_vlsns,
            last_sync: Vlsn::new(5),
            last_txn_end: Vlsn::new(5),
            first: Vlsn::new(1),
        };

        assert_eq!(view.num_passed_commits(Vlsn::new(3)), 2);
        assert_eq!(view.num_passed_commits(Vlsn::new(5)), 0);
    }

    /// The reader's per-VLSN data feeds find_matchpoint: a replica view whose
    /// fingerprints match the feeder at VLSN 4 but diverge at 5/6 yields
    /// matchpoint 4.
    #[test]
    fn test_view_drives_find_matchpoint() {
        let replica = FakeView {
            entries: HashMap::from([
                entry_at(6, 0xDEAD, true),
                entry_at(5, 0x55, false),
                entry_at(4, 0x44, true),
            ]),
            last_sync: Vlsn::new(6),
            last_txn_end: Vlsn::new(6),
            first: Vlsn::new(1),
        };
        let feeder = FakeView {
            entries: HashMap::from([
                entry_at(6, 0xBEEF, true),
                entry_at(4, 0x44, true),
            ]),
            last_sync: Vlsn::new(8),
            last_txn_end: Vlsn::new(8),
            first: Vlsn::new(1),
        };

        let expected = Matchpoint::Found { vlsn: Vlsn::new(4), lsn: 0x400 };
        assert_eq!(find_matchpoint(&replica, &feeder), expected);
    }
}