use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use bytes::Bytes;
use noxu_recovery::{LnRecord, LogEntry, apply_redo_ln};
use noxu_util::Lsn;
use crate::environment_impl::EnvironmentImpl;
use crate::file_manager_scanner::FileManagerLogScanner;
/// LNs buffered for one transaction, each paired with the LSN that was
/// supplied alongside it, kept in the order they were received.
type PendingLns = Vec<(LnRecord, Lsn)>;

/// Replays replicated log entries against the replica's database trees.
///
/// Transactional LNs are held back per transaction id until the matching
/// commit (which applies them) or abort (which discards them) arrives;
/// non-transactional LNs are applied as soon as they are received.
pub struct ReplicaReplay {
    /// Environment used to resolve a database id to its tree.
    env: Arc<EnvironmentImpl>,
    /// Not-yet-resolved transactions, keyed by transaction id.
    active_txns: HashMap<u64, PendingLns>,
    /// Highest VLSN recorded as applied; held in an `Arc` so a handle can be
    /// shared with other observers.
    last_applied_vlsn: Arc<AtomicU64>,
}
impl ReplicaReplay {
    /// Creates a replayer bound to `env` with no buffered transactions and a
    /// last-applied VLSN of 0.
    pub fn new(env: Arc<EnvironmentImpl>) -> Self {
        Self {
            env,
            active_txns: HashMap::new(),
            last_applied_vlsn: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Returns a shared handle to the last-applied-VLSN counter.
    ///
    /// The handle refers to the same atomic that [`ReplicaReplay::apply_entry`]
    /// advances, so it can be read without borrowing the replayer.
    pub fn last_applied_vlsn_handle(&self) -> Arc<AtomicU64> {
        Arc::clone(&self.last_applied_vlsn)
    }

    /// Returns the highest VLSN recorded as applied so far (0 if none).
    pub fn last_applied_vlsn(&self) -> u64 {
        self.last_applied_vlsn.load(Ordering::Acquire)
    }

    /// Replays a single replication-stream entry.
    ///
    /// The payload is decoded with `FileManagerLogScanner::parse_payload` and
    /// handled as follows:
    ///
    /// * transactional LN — buffered under its transaction id; nothing is
    ///   applied and the VLSN is **not** advanced;
    /// * `TxnCommit` — every buffered LN of that transaction is applied in
    ///   arrival order, then the VLSN is advanced;
    /// * `TxnAbort` — the transaction's buffered LNs are discarded, then the
    ///   VLSN is advanced;
    /// * non-transactional LN — applied immediately, then the VLSN is advanced;
    /// * any other decoded entry — only the VLSN is advanced;
    /// * a payload the parser does not decode — logged at debug level and
    ///   skipped, and the VLSN is still advanced.
    ///
    /// A `vlsn` of 0 means "no VLSN": it is passed to the parser as `None` and
    /// never moves the last-applied counter.
    pub fn apply_entry(&mut self, vlsn: u64, entry_type: u8, payload: &[u8], lsn: Lsn) {
        let decoded = FileManagerLogScanner::parse_payload(
            entry_type,
            Bytes::copy_from_slice(payload),
            (vlsn > 0).then_some(vlsn),
            0,
        );
        match decoded {
            Some(LogEntry::TxnCommit(rec)) => {
                self.commit_txn(rec.txn_id);
                self.advance_vlsn(vlsn);
            }
            Some(LogEntry::TxnAbort(rec)) => {
                self.abort_txn(rec.txn_id);
                self.advance_vlsn(vlsn);
            }
            Some(LogEntry::Ln(rec)) => match rec.txn_id {
                Some(txn_id) => {
                    // Nothing became visible, so the VLSN stays where it is
                    // until the owning transaction resolves.
                    self.active_txns
                        .entry(txn_id)
                        .or_default()
                        .push((rec, lsn));
                }
                None => {
                    self.apply_ln(&rec, lsn);
                    self.advance_vlsn(vlsn);
                }
            },
            Some(_) => self.advance_vlsn(vlsn),
            None => {
                // Previously skipped without any trace. Whether `None` means
                // "type not handled" or "malformed payload" depends on the
                // parser, so it is surfaced at debug level rather than treated
                // as fatal.
                log::debug!(
                    "replica-replay: entry type {} (vlsn={}) not decoded; skipping",
                    entry_type,
                    vlsn,
                );
                self.advance_vlsn(vlsn);
            }
        }
    }

    /// Applies, in arrival order, every LN buffered for `txn_id`; does nothing
    /// if none were buffered.
    fn commit_txn(&mut self, txn_id: u64) {
        let Some(ops) = self.active_txns.remove(&txn_id) else {
            return;
        };
        for (rec, lsn) in ops {
            self.apply_ln(&rec, lsn);
        }
    }

    /// Drops every LN buffered for `txn_id` without applying it.
    fn abort_txn(&mut self, txn_id: u64) {
        self.active_txns.remove(&txn_id);
    }

    /// Redoes a single LN against its database's tree.
    ///
    /// If the replica has no tree for `rec.db_id`, the LN is skipped with a
    /// trace log. A poisoned tree lock is recovered rather than propagated, so
    /// replay continues after a panic in another lock holder.
    fn apply_ln(&self, rec: &LnRecord, lsn: Lsn) {
        let Some(tree_arc) = self.env.replica_tree_for_db(rec.db_id) else {
            log::trace!(
                "replica-replay: db {} not open on replica; LN buffered in \
                 WAL only (vlsn={:?})",
                rec.db_id,
                rec.vlsn,
            );
            return;
        };
        let mut tree = tree_arc
            .write()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        apply_redo_ln(&mut tree, rec, lsn);
    }

    /// Raises the last-applied counter to `vlsn` if it is larger.
    ///
    /// 0 is ignored. `fetch_max` keeps the counter monotonic even if an older
    /// VLSN is reported after a newer one.
    fn advance_vlsn(&self, vlsn: u64) {
        if vlsn == 0 {
            return;
        }
        self.last_applied_vlsn.fetch_max(vlsn, Ordering::AcqRel);
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use noxu_log::entry::LnLogEntry;
    use noxu_util::{NULL_LSN, NULL_VLSN};

    /// Serializes an LN log entry for `db_id` (transactional when `txn_id` is
    /// `Some`) carrying `key` and optional `data`.
    fn ln_payload(
        db_id: u64,
        txn_id: Option<i64>,
        key: &[u8],
        data: Option<&[u8]>,
    ) -> Vec<u8> {
        use bytes::BytesMut;
        let entry = LnLogEntry::new(
            db_id,
            txn_id,
            NULL_LSN,
            false,
            None,
            None,
            NULL_VLSN,
            0,
            false,
            key.to_vec(),
            data.map(|d| d.to_vec()),
            0,
            NULL_VLSN,
        );
        let mut buf = BytesMut::new();
        entry.write_to_log(&mut buf);
        buf.to_vec()
    }

    /// Serializes a commit record for `txn_id`.
    fn txn_commit_payload(txn_id: i64) -> Vec<u8> {
        use bytes::BytesMut;
        use noxu_log::entry::TxnEndEntry;
        let e = TxnEndEntry::new_commit(txn_id, NULL_LSN, 0, 0, NULL_VLSN);
        let mut buf = BytesMut::new();
        e.write_to_log(&mut buf);
        buf.to_vec()
    }

    /// Serializes an abort record for `txn_id`.
    fn txn_abort_payload(txn_id: i64) -> Vec<u8> {
        use bytes::BytesMut;
        use noxu_log::entry::TxnEndEntry;
        let e = TxnEndEntry::new_abort(txn_id, NULL_LSN, 0, 0, NULL_VLSN);
        let mut buf = BytesMut::new();
        e.write_to_log(&mut buf);
        buf.to_vec()
    }

    /// Opens a fresh environment with one transactional database.
    ///
    /// The `TempDir` is returned (first, so it is dropped last) and must be
    /// kept alive for the whole test: dropping it deletes the directory the
    /// environment lives in. Bind it as `_dir`, never `_`.
    fn open_env_with_db() -> (
        tempfile::TempDir,
        Arc<EnvironmentImpl>,
        u64,
        Arc<std::sync::RwLock<noxu_tree::Tree>>,
    ) {
        use crate::database_config::DatabaseConfig;
        let dir = tempfile::TempDir::new().unwrap();
        let env = Arc::new(EnvironmentImpl::new(dir.path(), false, true).unwrap());
        let mut cfg = DatabaseConfig::new();
        cfg.set_allow_create(true).set_transactional(true);
        let db = env.open_database("repl_db", &cfg).unwrap();
        let db_id = db.read().get_id().id() as u64;
        let tree = env.replica_tree_for_db(db_id).unwrap();
        (dir, env, db_id, tree)
    }

    fn insert_ln_txn() -> u8 {
        noxu_log::LogEntryType::InsertLNTxn.type_num()
    }

    fn insert_ln() -> u8 {
        noxu_log::LogEntryType::InsertLN.type_num()
    }

    fn txn_commit_type() -> u8 {
        noxu_log::LogEntryType::TxnCommit.type_num()
    }

    fn txn_abort_type() -> u8 {
        noxu_log::LogEntryType::TxnAbort.type_num()
    }

    #[test]
    fn test_txn_ln_invisible_until_commit() {
        let (_dir, env, db_id, tree) = open_env_with_db();
        let mut replay = ReplicaReplay::new(Arc::clone(&env));
        let p = ln_payload(db_id, Some(7), b"k1", Some(b"v1"));
        replay.apply_entry(1, insert_ln_txn(), &p, Lsn::new(0, 100));
        assert!(
            tree.read().unwrap().search(b"k1").is_none(),
            "uncommitted txn LN must NOT be visible before commit"
        );
        assert_eq!(replay.last_applied_vlsn(), 0, "no commit yet");
        let c = txn_commit_payload(7);
        replay.apply_entry(2, txn_commit_type(), &c, Lsn::new(0, 200));
        assert!(
            tree.read().unwrap().search(b"k1").is_some(),
            "committed txn LN must be visible after commit"
        );
        assert_eq!(replay.last_applied_vlsn(), 2);
    }

    #[test]
    fn test_txn_abort_discards_lns() {
        let (_dir, env, db_id, tree) = open_env_with_db();
        let mut replay = ReplicaReplay::new(Arc::clone(&env));
        let p = ln_payload(db_id, Some(9), b"gone", Some(b"x"));
        replay.apply_entry(1, insert_ln_txn(), &p, Lsn::new(0, 100));
        let a = txn_abort_payload(9);
        replay.apply_entry(2, txn_abort_type(), &a, Lsn::new(0, 200));
        assert!(
            tree.read().unwrap().search(b"gone").is_none(),
            "aborted txn LN must never be visible"
        );
        assert_eq!(replay.last_applied_vlsn(), 2, "abort advances the VLSN");
    }

    #[test]
    fn test_non_txn_ln_applied_immediately() {
        let (_dir, env, db_id, tree) = open_env_with_db();
        let mut replay = ReplicaReplay::new(Arc::clone(&env));
        let p = ln_payload(db_id, None, b"now", Some(b"here"));
        replay.apply_entry(1, insert_ln(), &p, Lsn::new(0, 100));
        assert!(
            tree.read().unwrap().search(b"now").is_some(),
            "non-txn LN must be visible immediately"
        );
        assert_eq!(replay.last_applied_vlsn(), 1);
    }

    #[test]
    fn test_multi_ln_txn_commit() {
        let (_dir, env, db_id, tree) = open_env_with_db();
        let mut replay = ReplicaReplay::new(Arc::clone(&env));
        for (i, k) in [b"a", b"b", b"c"].iter().enumerate() {
            let p = ln_payload(db_id, Some(3), *k, Some(b"v"));
            replay.apply_entry(
                (i + 1) as u64,
                insert_ln_txn(),
                &p,
                Lsn::new(0, 100 + i as u32),
            );
            assert!(tree.read().unwrap().search(*k).is_none());
            assert_eq!(replay.last_applied_vlsn(), 0, "buffered LNs do not advance");
        }
        let c = txn_commit_payload(3);
        replay.apply_entry(4, txn_commit_type(), &c, Lsn::new(0, 200));
        for k in [b"a", b"b", b"c"].iter() {
            assert!(
                tree.read().unwrap().search(*k).is_some(),
                "all committed LNs visible after commit"
            );
        }
        assert_eq!(replay.last_applied_vlsn(), 4);
    }

    #[test]
    fn test_interleaved_txns_resolve_independently() {
        let (_dir, env, db_id, tree) = open_env_with_db();
        let mut replay = ReplicaReplay::new(Arc::clone(&env));
        let p1 = ln_payload(db_id, Some(11), b"t11", Some(b"a"));
        replay.apply_entry(1, insert_ln_txn(), &p1, Lsn::new(0, 100));
        let p2 = ln_payload(db_id, Some(12), b"t12", Some(b"b"));
        replay.apply_entry(2, insert_ln_txn(), &p2, Lsn::new(0, 110));

        let c = txn_commit_payload(12);
        replay.apply_entry(3, txn_commit_type(), &c, Lsn::new(0, 200));
        assert!(
            tree.read().unwrap().search(b"t12").is_some(),
            "committed txn 12 must be visible"
        );
        assert!(
            tree.read().unwrap().search(b"t11").is_none(),
            "still-open txn 11 must stay invisible"
        );
        assert_eq!(replay.last_applied_vlsn(), 3);

        let a = txn_abort_payload(11);
        replay.apply_entry(4, txn_abort_type(), &a, Lsn::new(0, 210));
        assert!(
            tree.read().unwrap().search(b"t11").is_none(),
            "aborted txn 11 must never be visible"
        );
        assert_eq!(replay.last_applied_vlsn(), 4);
    }
}