use noxu_util::{NULL_VLSN, Vlsn};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct VlsnEntry {
pub lsn: u64,
pub fingerprint: u64,
pub is_sync: bool,
}
pub trait SyncupView {
fn last_sync(&self) -> Vlsn;
fn last_txn_end(&self) -> Vlsn;
fn first_vlsn(&self) -> Vlsn;
fn entry(&self, vlsn: Vlsn) -> Option<VlsnEntry>;
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Matchpoint {
Found { vlsn: Vlsn, lsn: u64 },
None,
}
pub fn find_matchpoint(
replica: &dyn SyncupView,
feeder: &dyn SyncupView,
) -> Matchpoint {
let candidate = replica.last_sync();
if candidate.is_null() {
return Matchpoint::None;
}
let first = replica.first_vlsn();
let mut candidate = candidate;
loop {
let replica_entry = match replica.entry(candidate) {
Some(e) => e,
None => {
return Matchpoint::None;
}
};
if let Some(feeder_entry) = feeder.entry(candidate)
&& feeder_entry.fingerprint == replica_entry.fingerprint
&& feeder_entry.lsn == replica_entry.lsn
{
return Matchpoint::Found {
vlsn: candidate,
lsn: replica_entry.lsn,
};
}
match prev_sync_candidate(replica, candidate, first) {
Some(prev) => candidate = prev,
None => return Matchpoint::None,
}
}
}
fn prev_sync_candidate(
replica: &dyn SyncupView,
from: Vlsn,
first: Vlsn,
) -> Option<Vlsn> {
let mut v = from.prev();
while !v.is_null() && v >= first {
if let Some(e) = replica.entry(v)
&& e.is_sync
{
return Some(v);
}
v = v.prev();
}
None
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RollbackDecision {
RollbackToMatchpoint { matchpoint_vlsn: Vlsn, start_vlsn: Vlsn },
HardRecovery { matchpoint_vlsn: Vlsn, last_txn_end: Vlsn },
NetworkRestore,
}
pub fn verify_rollback(
matchpoint: &Matchpoint,
last_txn_end: Vlsn,
last_sync: Vlsn,
num_passed_commits: u64,
) -> RollbackDecision {
let matchpoint_vlsn = match matchpoint {
Matchpoint::Found { vlsn, .. } => *vlsn,
Matchpoint::None => NULL_VLSN,
};
if last_txn_end.is_null() {
if last_sync.is_null() && !matchpoint_vlsn.is_null() {
return RollbackDecision::NetworkRestore;
}
return rollback_to(matchpoint_vlsn);
}
if matchpoint_vlsn.is_null() {
return RollbackDecision::NetworkRestore;
}
if last_txn_end <= matchpoint_vlsn && num_passed_commits == 0 {
return rollback_to(matchpoint_vlsn);
}
RollbackDecision::HardRecovery { matchpoint_vlsn, last_txn_end }
}
fn rollback_to(matchpoint_vlsn: Vlsn) -> RollbackDecision {
RollbackDecision::RollbackToMatchpoint {
matchpoint_vlsn,
start_vlsn: matchpoint_vlsn.next(),
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashMap;
struct MapView {
last_sync: Vlsn,
last_txn_end: Vlsn,
first: Vlsn,
entries: HashMap<i64, VlsnEntry>,
}
impl MapView {
fn new(first: i64, last_sync: i64, last_txn_end: i64) -> Self {
Self {
last_sync: Vlsn::new(last_sync),
last_txn_end: Vlsn::new(last_txn_end),
first: Vlsn::new(first),
entries: HashMap::new(),
}
}
fn put(
mut self,
vlsn: i64,
lsn: u64,
fingerprint: u64,
is_sync: bool,
) -> Self {
self.entries.insert(vlsn, VlsnEntry { lsn, fingerprint, is_sync });
self
}
}
impl SyncupView for MapView {
fn last_sync(&self) -> Vlsn {
self.last_sync
}
fn last_txn_end(&self) -> Vlsn {
self.last_txn_end
}
fn first_vlsn(&self) -> Vlsn {
self.first
}
fn entry(&self, vlsn: Vlsn) -> Option<VlsnEntry> {
self.entries.get(&vlsn.sequence()).copied()
}
}
#[test]
fn test_matchpoint_at_last_sync() {
let replica = MapView::new(1, 5, 5)
.put(5, 0x500, 0xAA, true)
.put(4, 0x400, 0xBB, true);
let feeder = MapView::new(1, 7, 7)
.put(5, 0x500, 0xAA, true)
.put(6, 0x600, 0xCC, true);
let mp = find_matchpoint(&replica, &feeder);
assert_eq!(mp, Matchpoint::Found { vlsn: Vlsn::new(5), lsn: 0x500 });
}
#[test]
fn test_diverged_tail_walks_back() {
let replica = MapView::new(1, 6, 6)
.put(6, 0x600, 0xDEAD, true) .put(5, 0x500, 0x55, false) .put(4, 0x400, 0x44, true); let feeder = MapView::new(1, 8, 8)
.put(6, 0x600, 0xBEEF, true) .put(4, 0x400, 0x44, true);
let mp = find_matchpoint(&replica, &feeder);
assert_eq!(mp, Matchpoint::Found { vlsn: Vlsn::new(4), lsn: 0x400 });
}
#[test]
fn test_no_matchpoint_needs_restore() {
let replica = MapView::new(4, 6, 6)
.put(6, 0x600, 0x11, true)
.put(5, 0x500, 0x22, true)
.put(4, 0x400, 0x33, true);
let feeder = MapView::new(1, 8, 8)
.put(6, 0x600, 0x99, true)
.put(5, 0x500, 0x88, true)
.put(4, 0x400, 0x77, true);
assert_eq!(find_matchpoint(&replica, &feeder), Matchpoint::None);
}
#[test]
fn test_verify_normal_rollback() {
let mp = Matchpoint::Found { vlsn: Vlsn::new(5), lsn: 0x500 };
let d = verify_rollback(&mp, Vlsn::new(5), Vlsn::new(5), 0);
assert_eq!(
d,
RollbackDecision::RollbackToMatchpoint {
matchpoint_vlsn: Vlsn::new(5),
start_vlsn: Vlsn::new(6),
}
);
}
#[test]
fn test_verify_hard_recovery_past_commit() {
let mp = Matchpoint::Found { vlsn: Vlsn::new(3), lsn: 0x300 };
let d = verify_rollback(&mp, Vlsn::new(5), Vlsn::new(6), 0);
assert_eq!(
d,
RollbackDecision::HardRecovery {
matchpoint_vlsn: Vlsn::new(3),
last_txn_end: Vlsn::new(5),
}
);
}
#[test]
fn test_verify_passed_commit_forces_hard_recovery() {
let mp = Matchpoint::Found { vlsn: Vlsn::new(5), lsn: 0x500 };
let d = verify_rollback(&mp, Vlsn::new(5), Vlsn::new(5), 1);
assert!(matches!(d, RollbackDecision::HardRecovery { .. }));
}
#[test]
fn test_verify_null_txn_end_normal_rollback() {
let mp = Matchpoint::Found { vlsn: Vlsn::new(4), lsn: 0x400 };
let d = verify_rollback(&mp, NULL_VLSN, Vlsn::new(4), 3);
assert_eq!(
d,
RollbackDecision::RollbackToMatchpoint {
matchpoint_vlsn: Vlsn::new(4),
start_vlsn: Vlsn::new(5),
}
);
}
#[test]
fn test_verify_no_matchpoint_with_txn_end_restore() {
let d =
verify_rollback(&Matchpoint::None, Vlsn::new(5), Vlsn::new(5), 0);
assert_eq!(d, RollbackDecision::NetworkRestore);
}
}