use nodedb_wal::record::RecordType;
use nodedb_wal::{CalvinAppliedPayload, WalRecord};
use tracing::warn;
use crate::wal::manager::WalManager;
pub const NOT_YET_APPLIED_EPOCH: u64 = u64::MAX;
pub fn read_last_applied_epoch(wal: &WalManager, vshard_id: u32) -> crate::Result<u64> {
let records = wal.replay()?;
let mut last_epoch = NOT_YET_APPLIED_EPOCH;
for record in &records {
if !is_calvin_applied_record(record) {
continue;
}
match CalvinAppliedPayload::from_bytes(&record.payload) {
Ok(p) if p.vshard_id == vshard_id => {
if last_epoch == NOT_YET_APPLIED_EPOCH || p.epoch > last_epoch {
last_epoch = p.epoch;
}
}
Ok(_) => {
}
Err(e) => {
warn!(
lsn = record.header.lsn,
error = %e,
"calvin recovery: failed to decode CalvinApplied payload; skipping"
);
}
}
}
Ok(last_epoch)
}
fn is_calvin_applied_record(record: &WalRecord) -> bool {
let raw_type = record.header.record_type & !nodedb_wal::record::ENCRYPTED_FLAG;
matches!(
RecordType::from_raw(raw_type),
Some(RecordType::CalvinApplied)
)
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
use crate::wal::manager::WalManager;
fn open_wal(dir: &TempDir) -> WalManager {
WalManager::open(dir.path(), false).expect("open wal")
}
#[test]
fn read_last_applied_epoch_zero_for_greenfield() {
let dir = TempDir::new().unwrap();
let wal = open_wal(&dir);
let epoch = read_last_applied_epoch(&wal, 1).unwrap();
assert_eq!(
epoch, NOT_YET_APPLIED_EPOCH,
"greenfield WAL should return the not-yet-applied sentinel"
);
}
#[test]
fn read_last_applied_epoch_returns_max_calvin_applied_for_vshard() {
let dir = TempDir::new().unwrap();
let wal = open_wal(&dir);
use crate::types::VShardId;
wal.append_calvin_applied(VShardId::new(1), 2, 0).unwrap();
wal.append_calvin_applied(VShardId::new(1), 5, 0).unwrap();
wal.append_calvin_applied(VShardId::new(1), 3, 0).unwrap();
wal.append_calvin_applied(VShardId::new(2), 99, 0).unwrap();
wal.sync().unwrap();
let epoch = read_last_applied_epoch(&wal, 1).unwrap();
assert_eq!(epoch, 5, "should return the highest epoch for vshard 1");
let epoch_other = read_last_applied_epoch(&wal, 2).unwrap();
assert_eq!(epoch_other, 99, "vshard 2 should see its own epoch");
}
}