use std::collections::BTreeMap;
use std::path::Path;
use std::sync::Arc;
use noxu_log::MAX_ITEM_SIZE;
use noxu_log::entry_header::{MAX_HEADER_SIZE, MIN_HEADER_SIZE};
use noxu_log::file_header::LOG_VERSION as LOG_FILE_VERSION;
use noxu_log::file_header::on_disk_size as file_header_on_disk_size;
use noxu_log::file_manager::FileManager;
use noxu_util::{NULL_VLSN, Vlsn};
use crate::stream::syncup::{SyncupView, VlsnEntry};
use crate::vlsn::vlsn_index::VlsnIndex;
/// Syncup view assembled by scanning log files entry by entry.
///
/// Built by `SyncupLogView::scan` / `SyncupLogView::scan_with_manager`; the
/// field descriptions below reflect how that scan populates them.
pub struct SyncupLogView {
/// Every VLSN-tagged entry found by the scan, keyed by VLSN sequence.
/// If a VLSN is seen more than once, the entry scanned last wins.
entries: BTreeMap<i64, VlsnEntry>,
/// VLSN sequences whose entry type is `TxnCommit` or `TxnAbort`.
txn_end_vlsns: std::collections::BTreeSet<i64>,
/// Highest VLSN whose entry type reports itself as a sync point, or
/// `NULL_VLSN` if the scan found none.
last_sync: Vlsn,
/// Highest VLSN of a `TxnCommit`/`TxnAbort` entry, or `NULL_VLSN` if the
/// scan found none.
last_txn_end: Vlsn,
/// Smallest VLSN present in `entries`, or `NULL_VLSN` when it is empty.
first: Vlsn,
}
impl SyncupLogView {
    /// Opens the environment directory `env_home` and scans all of its log
    /// files into a view.
    ///
    /// Returns `None` when the [`FileManager`] cannot be constructed. Errors
    /// hit while scanning individual files are tolerated (see
    /// [`Self::scan_with_manager`]).
    pub fn scan(env_home: &Path) -> Option<Self> {
        // The manager is only borrowed for the duration of the scan, so it
        // does not need to be reference-counted.
        let fm = FileManager::new(env_home, true, 256 * 1024 * 1024, 32).ok()?;
        Some(Self::scan_with_manager(&fm))
    }

    /// Scans every log file known to `fm` and records each entry that
    /// carries a visible, positive VLSN.
    ///
    /// The scan is best-effort:
    /// * if the file list cannot be obtained, the view is empty;
    /// * a file whose length cannot be read is skipped;
    /// * within a file, scanning stops at the first entry that cannot be
    ///   decoded.
    ///
    /// Entries without a VLSN are stepped over but not recorded.
    pub fn scan_with_manager(fm: &FileManager) -> Self {
        let mut entries: BTreeMap<i64, VlsnEntry> = BTreeMap::new();
        let mut txn_end_vlsns: std::collections::BTreeSet<i64> =
            std::collections::BTreeSet::new();
        let mut last_sync = NULL_VLSN;
        let mut last_txn_end = NULL_VLSN;

        for file_num in fm.list_file_numbers().unwrap_or_default() {
            // Fall back to the current-version header size when the file's
            // own header size cannot be determined.
            let header_size = fm
                .file_header_size_for(file_num)
                .unwrap_or_else(|_| file_header_on_disk_size(LOG_FILE_VERSION))
                as u64;
            let Ok(file_len) = fm.get_file_length(file_num) else {
                continue;
            };

            let mut offset = header_size;
            while offset < file_len {
                let Some((entry_size, vlsn_opt, type_byte, payload)) =
                    read_raw_entry(fm, file_num, offset)
                else {
                    break;
                };

                // Remember where this entry starts, then always advance so
                // entries without a VLSN are still stepped over.
                let entry_offset = offset;
                offset += entry_size as u64;
                let Some(vlsn) = vlsn_opt else { continue };

                // NOTE(review): the offset is narrowed to u32 exactly as
                // before; this presumes log files stay below 4 GiB.
                let lsn =
                    noxu_util::Lsn::new(file_num, entry_offset as u32).as_u64();

                // Decode the entry type once; an unknown type is neither a
                // sync point nor a transaction end.
                let (is_sync, is_txn_end) =
                    noxu_log::LogEntryType::from_type_num(type_byte)
                        .map(|t| {
                            let txn_end = matches!(
                                t,
                                noxu_log::LogEntryType::TxnCommit
                                    | noxu_log::LogEntryType::TxnAbort
                            );
                            (t.is_sync_point(), txn_end)
                        })
                        .unwrap_or((false, false));

                // Fingerprint: CRC32 of the payload XOR the entry type byte.
                let fingerprint =
                    u64::from(crc32fast::hash(&payload)) ^ u64::from(type_byte);

                let seq = vlsn as i64;
                entries.insert(seq, VlsnEntry { lsn, fingerprint, is_sync });

                let v = Vlsn::new(seq);
                if is_sync && v > last_sync {
                    last_sync = v;
                }
                if is_txn_end {
                    txn_end_vlsns.insert(seq);
                    if v > last_txn_end {
                        last_txn_end = v;
                    }
                }
            }
        }

        let first = entries
            .keys()
            .next()
            .map(|&v| Vlsn::new(v))
            .unwrap_or(NULL_VLSN);
        Self { entries, txn_end_vlsns, last_sync, last_txn_end, first }
    }

    /// Counts the commit/abort entries whose VLSN is strictly greater than
    /// `matchpoint`.
    pub fn num_passed_commits(&self, matchpoint: Vlsn) -> u64 {
        use std::ops::Bound::{Excluded, Unbounded};

        // An exclusive lower bound avoids computing `sequence + 1`, which
        // would overflow for a matchpoint of `i64::MAX`.
        self.txn_end_vlsns
            .range((Excluded(matchpoint.sequence()), Unbounded))
            .count() as u64
    }

    /// Iterates over all recorded entries in ascending VLSN order.
    pub fn entries(&self) -> impl Iterator<Item = (Vlsn, &VlsnEntry)> {
        self.entries.iter().map(|(&v, e)| (Vlsn::new(v), e))
    }
}
/// [`SyncupView`] backed by the values collected during the log scan.
impl SyncupView for SyncupLogView {
    /// Highest sync-point VLSN recorded in this view.
    fn last_sync(&self) -> Vlsn {
        self.last_sync
    }

    /// Highest commit/abort VLSN recorded in this view.
    fn last_txn_end(&self) -> Vlsn {
        self.last_txn_end
    }

    /// Lowest VLSN recorded in this view.
    fn first_vlsn(&self) -> Vlsn {
        self.first
    }

    /// Returns a copy of the entry stored for `vlsn`, if there is one.
    fn entry(&self, vlsn: Vlsn) -> Option<VlsnEntry> {
        let key = vlsn.sequence();
        self.entries.get(&key).copied()
    }
}
/// Syncup view that answers lookups from a shared `VlsnIndex` instead of
/// scanning log files.
///
/// The three range values are read once in `VlsnIndexView::from_index` and
/// stored; they are not refreshed afterwards by anything in this type.
pub struct VlsnIndexView {
/// Shared handle to the index used for VLSN -> LSN lookups.
index: Arc<VlsnIndex>,
/// First VLSN of the index range at construction time (`NULL_VLSN` if the
/// range reported 0).
first: Vlsn,
/// Last sync VLSN of the index range at construction time (`NULL_VLSN` if
/// the range reported 0).
last_sync: Vlsn,
/// Last transaction-end VLSN of the index range at construction time
/// (`NULL_VLSN` if the range reported 0).
last_txn_end: Vlsn,
}
impl VlsnIndexView {
pub fn from_index(index: &Arc<VlsnIndex>) -> Self {
let range = index.get_range();
let to_vlsn =
|v: u64| if v == 0 { NULL_VLSN } else { Vlsn::new(v as i64) };
Self {
index: Arc::clone(index),
first: to_vlsn(range.get_first()),
last_sync: to_vlsn(range.get_last_sync()),
last_txn_end: to_vlsn(range.get_last_txn_end()),
}
}
fn lsn_fingerprint(&self, vlsn: i64) -> Option<(u64, u64, bool)> {
if vlsn <= 0 {
return None;
}
let (file, offset) = self.index.get_lsn(vlsn as u64)?;
let lsn = noxu_util::Lsn::new(file, offset).as_u64();
let is_sync = Vlsn::new(vlsn) <= self.last_sync;
Some((lsn, lsn, is_sync))
}
}
/// [`SyncupView`] backed by the VLSN index and the range values captured at
/// construction.
impl SyncupView for VlsnIndexView {
    /// Last sync VLSN captured when the view was built.
    fn last_sync(&self) -> Vlsn {
        self.last_sync
    }

    /// Last transaction-end VLSN captured when the view was built.
    fn last_txn_end(&self) -> Vlsn {
        self.last_txn_end
    }

    /// First VLSN captured when the view was built.
    fn first_vlsn(&self) -> Vlsn {
        self.first
    }

    /// Builds a `VlsnEntry` for `vlsn` from the index lookup, or `None` when
    /// the lookup yields nothing.
    fn entry(&self, vlsn: Vlsn) -> Option<VlsnEntry> {
        self.lsn_fingerprint(vlsn.sequence())
            .map(|(lsn, fingerprint, is_sync)| VlsnEntry {
                lsn,
                fingerprint,
                is_sync,
            })
    }
}
/// Reads one raw log entry starting at `offset` in file `file_num`.
///
/// On success returns `(entry_size, vlsn, type_byte, payload)`:
/// * `entry_size` is the header size plus the item size, i.e. how far to
///   advance to reach the next entry;
/// * `vlsn` is `Some` only when the header carries a VLSN field, its value
///   is positive, and the entry is not flagged invisible;
/// * `type_byte` is the raw entry type (header byte 4);
/// * `payload` is a copy of the bytes following the header.
///
/// Returns `None` when a read fails or comes up short, when the type byte is
/// 0, or when the declared item size exceeds `MAX_ITEM_SIZE`.
fn read_raw_entry(
    fm: &FileManager,
    file_num: u32,
    offset: u64,
) -> Option<(usize, Option<u64>, u8, Vec<u8>)> {
    // First pass: just the fixed-size part of the header.
    let mut fixed = [0u8; MIN_HEADER_SIZE];
    let got = fm.read_from_file(file_num, offset, &mut fixed).ok()?;
    if got < MIN_HEADER_SIZE {
        return None;
    }

    let type_byte = fixed[4];
    let flags = fixed[5];
    if type_byte == 0 {
        return None;
    }

    // Item size is a little-endian u32 at header bytes 10..14.
    let size_bytes = [fixed[10], fixed[11], fixed[12], fixed[13]];
    let item_size = u32::from_le_bytes(size_bytes) as usize;

    // Flag bit 0x10 marks the entry invisible; either of 0x08 / 0x20 means
    // the header includes the VLSN field.
    let is_invisible = flags & 0x10 != 0;
    let has_vlsn = flags & (0x08 | 0x20) != 0;
    let header_len = if has_vlsn { MAX_HEADER_SIZE } else { MIN_HEADER_SIZE };

    // Reject oversized items before allocating a buffer for them.
    if item_size > MAX_ITEM_SIZE {
        return None;
    }

    // Second pass: the whole entry, header included.
    let total_len = header_len + item_size;
    let mut raw = vec![0u8; total_len];
    let got = fm.read_from_file(file_num, offset, &mut raw).ok()?;
    if got < total_len {
        return None;
    }

    let mut vlsn = None;
    if has_vlsn {
        // The VLSN is a little-endian i64 occupying the extended header.
        let field: [u8; 8] =
            raw[MIN_HEADER_SIZE..MAX_HEADER_SIZE].try_into().ok()?;
        let seq = i64::from_le_bytes(field);
        if seq > 0 && !is_invisible {
            vlsn = Some(seq as u64);
        }
    }

    let payload = raw.split_off(header_len);
    Some((total_len, vlsn, type_byte, payload))
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::stream::syncup::{Matchpoint, find_matchpoint};
    use std::collections::{BTreeSet, HashMap};

    /// In-memory `SyncupView` with hand-picked entries and range values.
    struct FakeView {
        entries: HashMap<i64, VlsnEntry>,
        last_sync: Vlsn,
        last_txn_end: Vlsn,
        first: Vlsn,
    }

    impl SyncupView for FakeView {
        fn last_sync(&self) -> Vlsn {
            self.last_sync
        }

        fn last_txn_end(&self) -> Vlsn {
            self.last_txn_end
        }

        fn first_vlsn(&self) -> Vlsn {
            self.first
        }

        fn entry(&self, vlsn: Vlsn) -> Option<VlsnEntry> {
            let key = vlsn.sequence();
            self.entries.get(&key).copied()
        }
    }

    /// Only transaction ends strictly above the matchpoint are counted.
    #[test]
    fn test_num_passed_commits_counts_above_matchpoint() {
        let entries: BTreeMap<i64, VlsnEntry> = (1..=5i64)
            .map(|v| {
                let entry = VlsnEntry {
                    lsn: v as u64,
                    fingerprint: v as u64,
                    is_sync: true,
                };
                (v, entry)
            })
            .collect();

        let view = SyncupLogView {
            entries,
            txn_end_vlsns: BTreeSet::from([4, 5]),
            last_sync: Vlsn::new(5),
            last_txn_end: Vlsn::new(5),
            first: Vlsn::new(1),
        };

        assert_eq!(view.num_passed_commits(Vlsn::new(3)), 2);
        assert_eq!(view.num_passed_commits(Vlsn::new(5)), 0);
    }

    /// Replica and feeder disagree on VLSN 6 but agree on VLSN 4, so the
    /// matchpoint is expected at VLSN 4.
    #[test]
    fn test_view_drives_find_matchpoint() {
        /// Builds a `(vlsn, entry)` pair whose LSN is `vlsn * 0x100`.
        fn keyed(v: i64, fp: u64, sync: bool) -> (i64, VlsnEntry) {
            let entry = VlsnEntry {
                lsn: (v as u64) * 0x100,
                fingerprint: fp,
                is_sync: sync,
            };
            (v, entry)
        }

        let replica = FakeView {
            entries: HashMap::from([
                keyed(6, 0xDEAD, true),
                keyed(5, 0x55, false),
                keyed(4, 0x44, true),
            ]),
            last_sync: Vlsn::new(6),
            last_txn_end: Vlsn::new(6),
            first: Vlsn::new(1),
        };
        let feeder = FakeView {
            entries: HashMap::from([
                keyed(6, 0xBEEF, true),
                keyed(4, 0x44, true),
            ]),
            last_sync: Vlsn::new(8),
            last_txn_end: Vlsn::new(8),
            first: Vlsn::new(1),
        };

        let expected = Matchpoint::Found { vlsn: Vlsn::new(4), lsn: 0x400 };
        assert_eq!(find_matchpoint(&replica, &feeder), expected);
    }
}