use crate::LnInfo;
use crate::cleaner_stat::CleanerStats;
use bytes::BytesMut;
use noxu_log::LogManager;
use noxu_txn::{LockManager, LockType, TxnError};
use noxu_util::Lsn;
use std::collections::{BTreeMap, HashMap};
use std::sync::atomic::{AtomicBool, AtomicI64, Ordering};
use std::sync::{Arc, RwLock};
/// Releases a lock taken by the cleaner, logging any failure instead of
/// propagating it.
///
/// * `lock_manager` - manager the lock was acquired from.
/// * `lock_lsn` / `locker_id` - identify the lock to release.
/// * `site` - static label naming the call site; only used in the log line.
///
/// A failed release never aborts the caller: the error is reported through
/// `log::error!` and execution continues.
fn release_cleaner_lock(
    lock_manager: &LockManager,
    lock_lsn: u64,
    locker_id: i64,
    site: &'static str,
) {
    let Err(e) = lock_manager.release(lock_lsn, locker_id) else {
        return;
    };
    log::error!(
        "noxu-cleaner: lock_manager.release(lsn={lock_lsn}, \
         locker={locker_id}) failed at {site}: {e}; cleaner will \
         continue but a small lock leak may occur",
    );
}
/// Serializes a fresh LN log entry for `key`/`data` and appends it to the log.
///
/// * `lm` - log manager the entry is appended to.
/// * `db_id` - database id recorded in the entry.
/// * `key` / `data` - record contents; both are copied into the entry.
/// * `old_lsn` - LSN of the copy being migrated, passed through to
///   `LnLogEntry::new`.
///
/// Returns the LSN assigned by the log manager, or `None` when the append
/// fails. The failure is logged here so that it is not silently lost; callers
/// only see the `None`.
fn write_migration_ln(
    lm: &LogManager,
    db_id: u64,
    key: &[u8],
    data: &[u8],
    old_lsn: Lsn,
) -> Option<Lsn> {
    use noxu_log::{LogEntryType, Provisional, entry::LnLogEntry};
    use noxu_util::vlsn::NULL_VLSN;

    // NOTE(review): the positional flags/ids below are reproduced verbatim;
    // their individual meanings are defined by `LnLogEntry::new` and are not
    // visible from this file.
    let entry = LnLogEntry::new(
        db_id,
        None,
        old_lsn,
        false,
        None,
        None,
        NULL_VLSN,
        0,
        true,
        key.to_vec(),
        Some(data.to_vec()),
        0,
        NULL_VLSN,
    );

    // Size the buffer up front so serialization does not need to regrow it.
    let mut bm = BytesMut::with_capacity(entry.log_size());
    entry.write_to_log(&mut bm);

    match lm.log(LogEntryType::UpdateLN, &bm, Provisional::No, false, false) {
        Ok(lsn) => Some(lsn),
        Err(e) => {
            log::error!(
                "noxu-cleaner: failed to log migrated LN \
                 (db={db_id}, old_lsn={old_lsn:?}): {e:?}",
            );
            None
        }
    }
}
/// Default number of LNs queued between two invocations of the
/// pending-work callback; used as the initial `process_pending_interval`
/// of a `FileProcessor`.
const PROCESS_PENDING_EVERY_N_LNS: usize = 100;
/// Publicly visible alias of `PROCESS_PENDING_EVERY_N_LNS` (same value).
pub const PROCESS_PENDING_EVERY_N_LNS_PUB: usize = PROCESS_PENDING_EVERY_N_LNS;
/// Result of looking up the tree slot for an LN key
/// (see `TreeLookup::lookup_parent_bin`).
#[derive(Debug)]
pub enum BinLookupResult {
/// No slot could be found for the key; the cleaner counts the LN as dead.
NotFound,
/// The slot is known to be deleted; handled by the cleaner exactly like
/// `NotFound`.
KnownDeleted,
/// A slot exists for the key.
Found {
/// LSN currently stored in that slot.
tree_lsn: Lsn,
},
}
/// Outcome reported by `TreeLookup::migrate_ln_slot`.
#[derive(Debug, PartialEq, Eq)]
pub enum MigrationOutcome {
/// The slot was rewritten; the cleaner counts the LN as migrated.
Migrated,
/// The migration could not be carried out right now (for example the lock
/// was not granted); the cleaner records the LN in `locked_lns`.
Locked,
/// The slot no longer refers to this LN; the cleaner counts it as dead.
Obsolete,
}
/// Process-wide source of locker ids for cleaner lock requests.
///
/// Starts at `-1` and counts downwards, so every id handed out is negative.
/// NOTE(review): presumably negative ids keep cleaner lockers apart from
/// application locker ids — confirm against the lock manager's id scheme.
static CLEANER_LOCKER_NEXT: AtomicI64 = AtomicI64::new(-1);

/// Returns the next unused cleaner locker id (`-1`, `-2`, `-3`, ...).
fn next_cleaner_locker_id() -> i64 {
    // Adding -1 is the same wrapping decrement as `fetch_sub(1)`; the value
    // returned is the one held *before* the update.
    CLEANER_LOCKER_NEXT.fetch_add(-1, Ordering::Relaxed)
}
/// `TreeLookup` implementation that operates on one in-memory tree.
pub struct RealTreeLookup {
    /// Tree that lookups and migrations are applied to.
    tree: Arc<RwLock<noxu_tree::Tree>>,
    /// Lock manager used to lock a slot while it is being migrated.
    lock_manager: Arc<LockManager>,
    /// Optional log manager; when absent, migration writes nothing to the log.
    log_manager: Option<Arc<LogManager>>,
}

impl RealTreeLookup {
    /// Creates a lookup over `tree` with no log manager attached.
    pub fn new(
        tree: Arc<RwLock<noxu_tree::Tree>>,
        lock_manager: Arc<LockManager>,
    ) -> Self {
        Self {
            tree,
            lock_manager,
            log_manager: None,
        }
    }

    /// Builder-style setter: returns `self` with `lm` installed as the log
    /// manager used when migrating LNs.
    pub fn with_log_manager(self, lm: Arc<LogManager>) -> Self {
        Self {
            log_manager: Some(lm),
            ..self
        }
    }
}
impl TreeLookup for RealTreeLookup {
/// Finds the slot for `key` and reports the LSN currently stored in it.
///
/// Returns `BinLookupResult::NotFound` when the tree lock is poisoned, when
/// the tree search does not report an exact parent match, or when no slot LSN
/// can be read for `key`. `_db_id` and `_log_lsn` are not consulted.
fn lookup_parent_bin(
    &self,
    _db_id: i64,
    key: &[u8],
    _log_lsn: Lsn,
) -> BinLookupResult {
    let Ok(tree) = self.tree.read() else {
        return BinLookupResult::NotFound;
    };

    // The search result is kept alive until the slot has been read, exactly
    // as the value being matched on was in the previous formulation.
    let search = tree.search(key);
    if !matches!(&search, Some(hit) if hit.exact_parent_found) {
        return BinLookupResult::NotFound;
    }

    Self::get_slot_lsn_from_root(tree.get_root(), key)
        .map_or(BinLookupResult::NotFound, |tree_lsn| {
            BinLookupResult::Found { tree_lsn }
        })
}
/// Migrates the LN stored under `key` if its slot still holds `tree_lsn`.
///
/// Steps, in order:
/// 1. Take a read lock on `tree_lsn` through the lock manager using a fresh
///    cleaner locker id. Any lock error yields `MigrationOutcome::Locked`.
/// 2. Re-read the slot LSN; if it is missing or differs from `tree_lsn` the
///    LN is `Obsolete`.
/// 3. Read the slot data (empty when none is available).
/// 4. When a log manager is configured, write a new LN entry and use its LSN.
///    If that write fails the slot is left untouched and `Locked` is returned
///    so the caller can retry later; installing the old `log_lsn` here would
///    leave the tree pointing into the file that is being cleaned. Without a
///    log manager the slot is re-inserted with `log_lsn`.
/// 5. Re-insert the entry into the tree with the new LSN.
///
/// The cleaner lock is released on every path after step 1. A poisoned tree
/// lock at any step is reported as `Obsolete`.
fn migrate_ln_slot(
    &self,
    db_id: i64,
    key: &[u8],
    log_lsn: Lsn,
    tree_lsn: Lsn,
) -> MigrationOutcome {
    let locker_id = next_cleaner_locker_id();
    let lock_lsn = tree_lsn.as_u64();

    // NOTE(review): the trailing `true, false` flags are passed through as
    // before; their meaning is defined by `LockManager::lock`.
    if self
        .lock_manager
        .lock(lock_lsn, locker_id, LockType::Read, true, false)
        .is_err()
    {
        return MigrationOutcome::Locked;
    }

    // Every exit below this point must give the lock back.
    let release = |site: &'static str| {
        release_cleaner_lock(&self.lock_manager, lock_lsn, locker_id, site);
    };

    let current_lsn = match self.tree.read() {
        Ok(tree) => Self::get_slot_lsn_from_root(tree.get_root(), key),
        Err(_) => {
            release("RealTreeLookup::migrate_ln_slot:tree_poisoned_pre_check");
            return MigrationOutcome::Obsolete;
        }
    };
    if current_lsn != Some(tree_lsn) {
        release("RealTreeLookup::migrate_ln_slot:slot_mismatch");
        return MigrationOutcome::Obsolete;
    }

    let data = match self.tree.read() {
        Ok(tree) => Self::get_slot_data_from_root(tree.get_root(), key)
            .unwrap_or_default(),
        Err(_) => {
            release("RealTreeLookup::migrate_ln_slot:tree_poisoned_data");
            return MigrationOutcome::Obsolete;
        }
    };

    let new_lsn = match &self.log_manager {
        Some(lm) => {
            match write_migration_ln(
                lm,
                db_id.unsigned_abs(),
                key,
                &data,
                log_lsn,
            ) {
                Some(lsn) => lsn,
                None => {
                    // Do not claim success when nothing was logged.
                    release("RealTreeLookup::migrate_ln_slot:wal_write_failed");
                    return MigrationOutcome::Locked;
                }
            }
        }
        None => log_lsn,
    };

    let outcome = match self.tree.read() {
        Ok(tree) => match tree.insert(key.to_vec(), data, new_lsn) {
            Ok(_) => MigrationOutcome::Migrated,
            Err(_) => MigrationOutcome::Obsolete,
        },
        Err(_) => {
            release("RealTreeLookup::migrate_ln_slot:tree_poisoned_insert");
            return MigrationOutcome::Obsolete;
        }
    };

    release("RealTreeLookup::migrate_ln_slot:done");
    outcome
}
/// Decides whether the logged node `node_id` at `log_lsn` is still the
/// version referenced by the tree and, if so, marks it dirty.
///
/// * When the tree reports a parent for `node_id`, the child in that slot is
///   inspected: a bottom node is compared by its `last_full_lsn`, an internal
///   node by the LSN stored in the parent slot.
/// * Otherwise the root is checked: it matches only if it is a bottom node
///   with the requested id whose `last_full_lsn` equals `log_lsn`.
///
/// Returns `InLookupResult::Found` after setting the node's dirty flag, and
/// `InLookupResult::Obsolete` in every other case (including a poisoned tree
/// lock, a null LSN, or an LSN mismatch). `_db_id` is not consulted.
///
/// The parent (and the root) are each read under a single guard so that the
/// values compared come from one consistent state of the node.
fn lookup_in(
    &self,
    _db_id: i64,
    node_id: i64,
    log_lsn: Lsn,
) -> InLookupResult {
    use noxu_tree::TreeNode;

    // NOTE(review): a negative `node_id` wraps to a very large u64 here.
    let node_id_u64 = node_id as u64;
    let Ok(tree_guard) = self.tree.read() else {
        return InLookupResult::Obsolete;
    };

    if let Some((parent_arc, slot_idx)) =
        tree_guard.get_parent_in_for_child_in(node_id_u64)
    {
        // Snapshot the slot LSN and the child pointer together.
        let (slot_lsn, child_arc) = {
            let parent_guard = parent_arc.read();
            match &*parent_guard {
                TreeNode::Internal(n) => (
                    n.entries.get(slot_idx).map(|e| e.lsn),
                    n.get_child(slot_idx),
                ),
                _ => (None, None),
            }
        };
        let Some(child_arc) = child_arc else {
            return InLookupResult::Obsolete;
        };

        let node_lsn = {
            let child_guard = child_arc.read();
            match &*child_guard {
                TreeNode::Bottom(b) => Some(b.last_full_lsn),
                TreeNode::Internal(_) => slot_lsn,
            }
        };
        let is_current = node_lsn.is_some_and(|lsn| {
            lsn != noxu_util::NULL_LSN && lsn == log_lsn
        });
        if !is_current {
            return InLookupResult::Obsolete;
        }

        // Release the tree lock before taking the node's write lock.
        drop(tree_guard);
        {
            let mut child_write = child_arc.write();
            child_write.set_dirty(true);
        }
        return InLookupResult::Found;
    }

    let root_arc_opt = tree_guard.get_root();
    drop(tree_guard);

    if let Some(root) = root_arc_opt {
        // NOTE(review): an internal root is always reported as obsolete by
        // this code; no LSN is available for it here.
        let (root_node_id, root_lsn) = {
            let g = root.read();
            match &*g {
                TreeNode::Bottom(b) => (b.node_id, Some(b.last_full_lsn)),
                TreeNode::Internal(n) => (n.node_id, None),
            }
        };
        if root_node_id == node_id_u64 {
            let is_current = root_lsn.is_some_and(|lsn| {
                lsn != noxu_util::NULL_LSN && lsn == log_lsn
            });
            if !is_current {
                return InLookupResult::Obsolete;
            }
            {
                let mut w = root.write();
                w.set_dirty(true);
            }
            return InLookupResult::Found;
        }
    }

    InLookupResult::Obsolete
}
}
impl RealTreeLookup {
    /// Returns the LSN stored in the bottom-node slot whose key equals `key`,
    /// starting the descent at `root`. `None` when there is no root, no child
    /// on the path, or no exactly matching slot.
    pub(crate) fn get_slot_lsn_from_root(
        root: Option<
            std::sync::Arc<noxu_tree::NodeRwLock<noxu_tree::TreeNode>>,
        >,
        key: &[u8],
    ) -> Option<Lsn> {
        root.and_then(|node| Self::find_bin_entry_lsn(&node, key))
    }

    /// Returns a copy of the data stored in the bottom-node slot whose key
    /// equals `key`, starting the descent at `root`. `None` when the slot is
    /// missing or holds no data.
    pub(crate) fn get_slot_data_from_root(
        root: Option<
            std::sync::Arc<noxu_tree::NodeRwLock<noxu_tree::TreeNode>>,
        >,
        key: &[u8],
    ) -> Option<Vec<u8>> {
        root.and_then(|node| Self::find_bin_entry_data(&node, key))
    }

    /// Recursive descent that yields the slot LSN for `key`.
    ///
    /// In an internal node the first entry's key is ignored; the child chosen
    /// is the last one of the leading run of entries (after the first) whose
    /// key is `<= key`, or child 0 when that run is empty.
    fn find_bin_entry_lsn(
        node_arc: &std::sync::Arc<noxu_tree::NodeRwLock<noxu_tree::TreeNode>>,
        key: &[u8],
    ) -> Option<Lsn> {
        use noxu_tree::TreeNode;

        // The node's read guard lives only inside this block, so it is
        // released before descending into the child.
        let child = {
            let guard = node_arc.read();
            match &*guard {
                TreeNode::Bottom(bin) => {
                    return bin
                        .entries
                        .binary_search_by(|e| e.key.as_slice().cmp(key))
                        .ok()
                        .map(|slot| bin.entries[slot].lsn);
                }
                TreeNode::Internal(n) => {
                    // Entries 1.. are indexed 1, 2, ...; the length of the
                    // leading run with key <= search key is therefore the
                    // index of its last member (0 when the run is empty).
                    let idx = n
                        .entries
                        .iter()
                        .skip(1)
                        .take_while(|e| e.key.as_slice() <= key)
                        .count();
                    n.get_child(idx)?
                }
            }
        };
        Self::find_bin_entry_lsn(&child, key)
    }

    /// Recursive descent that yields a clone of the slot data for `key`.
    /// Child selection is identical to `find_bin_entry_lsn`.
    fn find_bin_entry_data(
        node_arc: &std::sync::Arc<noxu_tree::NodeRwLock<noxu_tree::TreeNode>>,
        key: &[u8],
    ) -> Option<Vec<u8>> {
        use noxu_tree::TreeNode;

        let child = {
            let guard = node_arc.read();
            match &*guard {
                TreeNode::Bottom(bin) => {
                    return bin
                        .entries
                        .binary_search_by(|e| e.key.as_slice().cmp(key))
                        .ok()
                        .and_then(|slot| bin.entries[slot].data.clone());
                }
                TreeNode::Internal(n) => {
                    let idx = n
                        .entries
                        .iter()
                        .skip(1)
                        .take_while(|e| e.key.as_slice() <= key)
                        .count();
                    n.get_child(idx)?
                }
            }
        };
        Self::find_bin_entry_data(&child, key)
    }
}
/// `TreeLookup` implementation that always logs migrated LNs and can route
/// requests to per-database trees.
pub struct SharedTreeLookup {
    /// Default tree, used for any database id without an entry in
    /// `extra_trees`.
    tree: Arc<RwLock<noxu_tree::Tree>>,
    /// Log manager that migrated LNs are written to.
    log_manager: Arc<LogManager>,
    /// Lock manager used to lock a slot while it is being migrated.
    lock_manager: Arc<LockManager>,
    /// Additional trees keyed by database id.
    extra_trees: HashMap<i64, Arc<RwLock<noxu_tree::Tree>>>,
}

impl SharedTreeLookup {
    /// Creates a lookup with a freshly constructed, private `LockManager`
    /// and no extra trees.
    pub fn new(
        tree: Arc<RwLock<noxu_tree::Tree>>,
        log_manager: Arc<LogManager>,
    ) -> Self {
        Self::with_lock_manager(tree, log_manager, Arc::new(LockManager::new()))
    }

    /// Creates a lookup that uses the supplied `lock_manager` and has no
    /// extra trees.
    pub fn with_lock_manager(
        tree: Arc<RwLock<noxu_tree::Tree>>,
        log_manager: Arc<LogManager>,
        lock_manager: Arc<LockManager>,
    ) -> Self {
        Self {
            tree,
            log_manager,
            lock_manager,
            extra_trees: HashMap::new(),
        }
    }

    /// Builder-style setter that replaces the whole per-database tree map.
    pub fn with_extra_trees(
        self,
        extra_trees: HashMap<i64, Arc<RwLock<noxu_tree::Tree>>>,
    ) -> Self {
        Self { extra_trees, ..self }
    }

    /// Returns the tree registered for `db_id`, or the default tree when
    /// none is registered.
    fn resolve_tree(&self, db_id: i64) -> &Arc<RwLock<noxu_tree::Tree>> {
        match self.extra_trees.get(&db_id) {
            Some(tree) => tree,
            None => &self.tree,
        }
    }
}
impl TreeLookup for SharedTreeLookup {
/// Finds the slot for `key` in the tree selected by `db_id` and reports the
/// LSN currently stored in it.
///
/// Returns `BinLookupResult::NotFound` when the tree lock is poisoned, when
/// the tree search does not report an exact parent match, or when no slot LSN
/// can be read for `key`. `_log_lsn` is not consulted.
fn lookup_parent_bin(
    &self,
    db_id: i64,
    key: &[u8],
    _log_lsn: Lsn,
) -> BinLookupResult {
    let Ok(tree) = self.resolve_tree(db_id).read() else {
        return BinLookupResult::NotFound;
    };

    // The search result stays alive until the slot has been read, matching
    // the lifetime it had when it was the value being matched on.
    let search = tree.search(key);
    if !matches!(&search, Some(hit) if hit.exact_parent_found) {
        return BinLookupResult::NotFound;
    }

    RealTreeLookup::get_slot_lsn_from_root(tree.get_root(), key)
        .map_or(BinLookupResult::NotFound, |tree_lsn| {
            BinLookupResult::Found { tree_lsn }
        })
}
/// Migrates the LN stored under `key` in the tree selected by `db_id`, if
/// its slot still holds `tree_lsn`.
///
/// Steps, in order: take a read lock on `tree_lsn` (any lock error yields
/// `Locked`); re-check the slot LSN (`Obsolete` on mismatch); read the slot
/// data (empty when none); write a new LN to the log (`Locked` when that
/// fails); re-insert the entry with the new LSN (`Migrated` on success,
/// `Obsolete` otherwise). The cleaner lock is released on every path after it
/// was granted, and a poisoned tree lock is reported as `Obsolete`.
fn migrate_ln_slot(
    &self,
    db_id: i64,
    key: &[u8],
    log_lsn: Lsn,
    tree_lsn: Lsn,
) -> MigrationOutcome {
    let tree_arc = Arc::clone(self.resolve_tree(db_id));
    let locker_id = next_cleaner_locker_id();
    let lock_lsn = tree_lsn.as_u64();

    if self
        .lock_manager
        .lock(lock_lsn, locker_id, LockType::Read, true, false)
        .is_err()
    {
        return MigrationOutcome::Locked;
    }

    // Shorthand for giving the lock back from any of the exits below.
    let unlock = |site: &'static str| {
        release_cleaner_lock(&self.lock_manager, lock_lsn, locker_id, site);
    };

    let slot_lsn = match tree_arc.read() {
        Ok(tree) => RealTreeLookup::get_slot_lsn_from_root(tree.get_root(), key),
        Err(_) => {
            unlock("SharedTreeLookup::migrate_ln_slot:tree_poisoned_pre_check");
            return MigrationOutcome::Obsolete;
        }
    };
    if slot_lsn != Some(tree_lsn) {
        unlock("SharedTreeLookup::migrate_ln_slot:slot_mismatch");
        return MigrationOutcome::Obsolete;
    }

    let data = match tree_arc.read() {
        Ok(tree) => {
            RealTreeLookup::get_slot_data_from_root(tree.get_root(), key)
                .unwrap_or_default()
        }
        Err(_) => {
            unlock("SharedTreeLookup::migrate_ln_slot:tree_poisoned_data");
            return MigrationOutcome::Obsolete;
        }
    };

    let Some(new_lsn) = write_migration_ln(
        &self.log_manager,
        db_id.unsigned_abs(),
        key,
        &data,
        log_lsn,
    ) else {
        unlock("SharedTreeLookup::migrate_ln_slot:wal_write_failed");
        return MigrationOutcome::Locked;
    };

    // Insert first, then unlock, then translate the result.
    let inserted = tree_arc
        .read()
        .map(|tree| tree.insert(key.to_vec(), data, new_lsn));
    unlock("SharedTreeLookup::migrate_ln_slot:done");

    if matches!(inserted, Ok(Ok(_))) {
        MigrationOutcome::Migrated
    } else {
        MigrationOutcome::Obsolete
    }
}
/// Checks whether the logged node `node_id` at `log_lsn` is still current in
/// the tree that belongs to `db_id`.
///
/// The work is delegated to `RealTreeLookup::lookup_in` on a temporary lookup
/// built over the resolved tree. The tree is selected with `resolve_tree`, the
/// same way `lookup_parent_bin` and `migrate_ln_slot` select theirs, so nodes
/// of a database registered in `extra_trees` are looked up in their own tree
/// rather than in the default one.
fn lookup_in(
    &self,
    db_id: i64,
    node_id: i64,
    log_lsn: Lsn,
) -> InLookupResult {
    let delegate = RealTreeLookup::new(
        Arc::clone(self.resolve_tree(db_id)),
        Arc::clone(&self.lock_manager),
    );
    delegate.lookup_in(db_id, node_id, log_lsn)
}
}
/// Tree operations the file processor needs in order to decide whether a
/// logged entry is still live and to migrate it.
pub trait TreeLookup {
/// Looks up the slot for `key` in database `db_id`.
///
/// `log_lsn` is the LSN of the log entry being examined. The result tells
/// the caller whether a slot exists and, if so, which LSN it holds.
fn lookup_parent_bin(
&self,
db_id: i64,
key: &[u8],
log_lsn: Lsn,
) -> BinLookupResult;
/// Attempts to migrate the LN stored under `key`.
///
/// `log_lsn` is the LSN of the log entry being cleaned and `tree_lsn` the
/// LSN previously reported for the slot by `lookup_parent_bin`.
fn migrate_ln_slot(
&self,
db_id: i64,
key: &[u8],
log_lsn: Lsn,
tree_lsn: Lsn,
) -> MigrationOutcome;
/// Reports whether the internal/bottom node `node_id` logged at `log_lsn`
/// is still current (`Found`) or can be discarded (`Obsolete`).
fn lookup_in(
&self,
db_id: i64,
node_id: i64,
log_lsn: Lsn,
) -> InLookupResult;
}
/// Result of `TreeLookup::lookup_in`.
#[derive(Debug, PartialEq, Eq)]
pub enum InLookupResult {
/// The logged node is still current; the cleaner counts it as migrated.
Found,
/// The logged node is no longer current; the cleaner counts it as dead.
Obsolete,
}
/// Classification of a log entry as seen by the file processor.
///
/// Note that this is the cleaner's own enum, distinct from
/// `noxu_log::LogEntryType`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LogEntryType {
/// A leaf (LN) record.
Ln {
/// Database the record belongs to.
db_id: i64,
/// Record key.
key: Vec<u8>,
/// When `true` the entry is counted as obsolete without a tree lookup.
deleted: bool,
/// Expiration value forwarded to `LnInfo::new`.
expiration_time: u64,
/// Entry size forwarded to `LnInfo::new`.
entry_size: i32,
},
/// A logged tree node, handled by `FileProcessor::process_in`.
In {
/// Database the node belongs to.
db_id: i64,
/// Id of the logged node.
node_id: i64,
},
/// A logged BIN delta, handled by `FileProcessor::process_bin_delta`.
BinDelta {
/// Database the node belongs to.
db_id: i64,
/// Id of the node the delta applies to.
node_id: i64,
},
/// Any other entry; counted as read and otherwise ignored.
Other,
}
/// One log entry handed to `FileProcessor::process_file`.
#[derive(Debug, Clone)]
pub struct LogEntry {
/// LSN at which the entry was logged.
pub lsn: Lsn,
/// Kind of entry together with its kind-specific fields.
pub entry_type: LogEntryType,
}
/// Memory-bounded buffer of LNs waiting to be processed, ordered by their
/// offset within the log file.
pub struct LookAheadCache {
    /// Buffered LNs keyed by file offset; iteration order is ascending offset.
    map: BTreeMap<u32, LnInfo>,
    /// Estimated memory currently charged to the cache, in bytes.
    used_mem: usize,
    /// Budget at or above which `is_full` reports `true`.
    max_mem: usize,
}

impl LookAheadCache {
    /// Fixed charge for the (empty) map itself.
    const MAP_OVERHEAD: usize = 64;
    /// Fixed charge added for every buffered entry on top of its own size.
    const ENTRY_OVERHEAD: usize = 48;

    /// Creates an empty cache with a budget of `max_mem` bytes.
    pub fn new(max_mem: usize) -> Self {
        Self {
            map: BTreeMap::new(),
            used_mem: Self::MAP_OVERHEAD,
            max_mem,
        }
    }

    /// Returns `true` when no LN is buffered.
    pub fn is_empty(&self) -> bool {
        self.map.is_empty()
    }

    /// Returns `true` once the charged memory has reached the budget.
    pub fn is_full(&self) -> bool {
        self.used_mem >= self.max_mem
    }

    /// Buffers `info` under `lsn_offset`.
    ///
    /// If an entry already exists for that offset it is replaced, and the
    /// memory charged for the replaced entry is credited back so the
    /// accounting keeps matching the map's contents.
    pub fn add(&mut self, lsn_offset: u32, info: LnInfo) {
        let charged = info.memory_size() + Self::ENTRY_OVERHEAD;
        if let Some(replaced) = self.map.insert(lsn_offset, info) {
            let credited = replaced.memory_size() + Self::ENTRY_OVERHEAD;
            self.used_mem = self.used_mem.saturating_sub(credited);
        }
        self.used_mem += charged;
    }

    /// Returns the smallest buffered offset, if any.
    pub fn next_offset(&self) -> Option<u32> {
        self.map.keys().next().copied()
    }

    /// Removes and returns the entry stored under `offset`, crediting its
    /// memory back. Returns `None` (and changes nothing) when absent.
    pub fn remove(&mut self, offset: u32) -> Option<LnInfo> {
        let info = self.map.remove(&offset)?;
        self.used_mem = self
            .used_mem
            .saturating_sub(info.memory_size() + Self::ENTRY_OVERHEAD);
        Some(info)
    }

    /// Number of buffered LNs.
    pub fn len(&self) -> usize {
        self.map.len()
    }
}
/// Per-LN verdict produced by `FileProcessor::process_found_ln`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum MigrateLnResult {
/// The LN is no longer referenced (null slot LSN or obsolete migration).
Dead,
/// The LN was migrated.
Migrated,
/// The LN could not be migrated now and is reported back as locked.
Locked,
}
/// Walks the entries of one log file and classifies or migrates each of them.
pub struct FileProcessor {
/// Shared counters updated alongside the per-file result.
stats: Arc<CleanerStats>,
/// Cooperative shutdown flag; checked before and during processing.
shutdown: Arc<AtomicBool>,
/// Number of queued LNs between two calls of `process_pending_fn`.
process_pending_interval: usize,
/// Optional callback invoked every `process_pending_interval` LNs.
process_pending_fn: Option<Arc<dyn Fn() + Send + Sync>>,
}
/// Counters and leftovers produced by processing one log file.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct FileProcessResult {
/// Log entries visited, of any type.
pub entries_read: u64,
/// LNs taken out of the look-ahead cache and examined.
pub lns_cleaned: u64,
/// Examined LNs found to be dead.
pub lns_dead: u64,
/// Examined LNs that were migrated.
pub lns_migrated: u64,
/// LN entries flagged as deleted and skipped without a tree lookup.
pub lns_obsolete: u64,
/// Examined LNs that could not be migrated because they were locked.
pub lns_locked: u64,
/// The locked LNs themselves, with the LSN they were logged at.
pub locked_lns: Vec<(noxu_util::Lsn, crate::LnInfo)>,
/// IN entries examined.
pub ins_cleaned: u64,
/// IN entries found obsolete by the tree lookup.
pub ins_dead: u64,
/// IN entries found current by the tree lookup.
pub ins_migrated: u64,
/// Not updated by the processing code in this file.
pub ins_obsolete: u64,
/// BIN-delta entries examined.
pub bin_deltas_cleaned: u64,
/// BIN-delta entries found obsolete.
pub bin_deltas_dead: u64,
/// BIN-delta entries found current.
pub bin_deltas_migrated: u64,
/// Not updated by the processing code in this file.
pub bin_deltas_obsolete: u64,
/// `true` only when the whole file was processed without a shutdown.
pub completed: bool,
}
impl FileProcessor {
/// Creates a processor that reports into `stats` and stops when `shutdown`
/// becomes `true`.
///
/// The pending-work interval starts at `PROCESS_PENDING_EVERY_N_LNS` and no
/// pending-work callback is installed.
pub fn new(stats: Arc<CleanerStats>, shutdown: Arc<AtomicBool>) -> Self {
    let process_pending_interval = PROCESS_PENDING_EVERY_N_LNS;
    Self {
        stats,
        shutdown,
        process_pending_interval,
        process_pending_fn: None,
    }
}
/// Sets how many LNs are queued between two invocations of the
/// pending-work callback.
pub fn set_process_pending_interval(&mut self, interval: usize) {
self.process_pending_interval = interval;
}
/// Builder-style setter: returns `self` with `f` installed as the callback
/// that `process_file` invokes every `process_pending_interval` LNs.
pub fn with_process_pending_fn(
    self,
    f: Arc<dyn Fn() + Send + Sync>,
) -> Self {
    Self {
        process_pending_fn: Some(f),
        ..self
    }
}
/// Processes every entry of log file `file_number`.
///
/// * LN entries flagged as deleted are counted as obsolete and skipped.
/// * Other LN entries are queued in a look-ahead cache; whenever the cache is
///   full one LN (the lowest offset) is processed. Every
///   `process_pending_interval` queued LNs the pending-work callback, if any,
///   is invoked.
/// * IN and BIN-delta entries are handled immediately.
/// * After the last entry the cache is drained.
///
/// The shutdown flag is checked before starting, before every entry and
/// before every drain step; when it is set the partial result is returned
/// with `completed == false`. `_file_summary` is not consulted. This function
/// currently never returns `Err`.
pub fn process_file<T: TreeLookup>(
    &self,
    file_number: u32,
    _file_summary: &crate::FileSummary,
    entries: &[LogEntry],
    tree: &T,
) -> Result<FileProcessResult, String> {
    /// Memory budget of the look-ahead cache.
    const LOOK_AHEAD_BYTES: usize = 4 * 1024 * 1024;

    let mut result = FileProcessResult::new();
    if self.shutdown.load(Ordering::Relaxed) {
        // A fresh result already has `completed == false`.
        return Ok(result);
    }

    let mut pending = LookAheadCache::new(LOOK_AHEAD_BYTES);
    let mut queued_lns: usize = 0;

    for entry in entries {
        result.entries_read += 1;
        if self.shutdown.load(Ordering::Relaxed) {
            result.completed = false;
            return Ok(result);
        }

        let lsn = entry.lsn;
        let file_offset = lsn.file_offset();

        match &entry.entry_type {
            LogEntryType::Ln { deleted: true, .. } => {
                result.lns_obsolete += 1;
                self.stats.lns_obsolete.fetch_add(1, Ordering::Relaxed);
            }
            LogEntryType::Ln {
                db_id,
                key,
                deleted,
                expiration_time,
                entry_size,
            } => {
                pending.add(
                    file_offset,
                    LnInfo::new(
                        lsn,
                        *db_id,
                        key.clone(),
                        *entry_size,
                        *deleted,
                        *expiration_time,
                    ),
                );
                if pending.is_full() {
                    self.process_ln(file_number, &mut pending, tree, &mut result);
                }

                queued_lns += 1;
                if queued_lns.is_multiple_of(self.process_pending_interval) {
                    if let Some(callback) = self.process_pending_fn.as_deref() {
                        callback();
                    }
                }
            }
            LogEntryType::In { db_id, node_id } => {
                self.process_in(*db_id, *node_id, lsn, tree, &mut result);
            }
            LogEntryType::BinDelta { db_id, node_id } => {
                self.process_bin_delta(*db_id, *node_id, lsn, tree, &mut result);
            }
            LogEntryType::Other => {}
        }
    }

    // Drain whatever is still buffered, lowest offset first.
    while !pending.is_empty() {
        if self.shutdown.load(Ordering::Relaxed) {
            result.completed = false;
            return Ok(result);
        }
        self.process_ln(file_number, &mut pending, tree, &mut result);
    }

    result.completed = true;
    Ok(result)
}
/// Runs `process_file` for `file_number` with an empty entry list.
///
/// A local no-op tree is supplied (it is never consulted because there are
/// no entries), so the result only reflects the shutdown flag:
/// `completed == false` when shutdown was requested, `true` otherwise.
pub fn process_file_no_entries(
    &self,
    file_number: u32,
    file_summary: &crate::FileSummary,
) -> Result<FileProcessResult, String> {
    /// Tree stand-in that reports everything as absent or obsolete.
    struct NoopTree;

    impl TreeLookup for NoopTree {
        fn lookup_parent_bin(
            &self,
            _db_id: i64,
            _key: &[u8],
            _log_lsn: Lsn,
        ) -> BinLookupResult {
            BinLookupResult::NotFound
        }

        fn migrate_ln_slot(
            &self,
            _db_id: i64,
            _key: &[u8],
            _log_lsn: Lsn,
            _tree_lsn: Lsn,
        ) -> MigrationOutcome {
            MigrationOutcome::Obsolete
        }

        fn lookup_in(
            &self,
            _db_id: i64,
            _node_id: i64,
            _log_lsn: Lsn,
        ) -> InLookupResult {
            InLookupResult::Obsolete
        }
    }

    let no_entries: &[LogEntry] = &[];
    self.process_file(file_number, file_summary, no_entries, &NoopTree)
}
/// Takes the lowest-offset LN out of `cache` and processes it.
///
/// Does nothing when the cache is empty. Otherwise `lns_cleaned` is bumped
/// and the LN is classified:
/// * no slot / known-deleted slot -> dead,
/// * slot found -> outcome of `process_found_ln` (dead, migrated or locked;
///   a locked LN is additionally appended to `result.locked_lns`).
///
/// The matching counters in `self.stats` are updated alongside `result`.
pub fn process_ln<T: TreeLookup>(
    &self,
    file_number: u32,
    cache: &mut LookAheadCache,
    tree: &T,
    result: &mut FileProcessResult,
) {
    let Some(offset) = cache.next_offset() else {
        return;
    };
    let Some(info) = cache.remove(offset) else {
        return;
    };

    let log_lsn = Lsn::new(file_number, offset);
    result.lns_cleaned += 1;

    let tree_lsn = match tree.lookup_parent_bin(info.db_id, info.key(), log_lsn) {
        BinLookupResult::Found { tree_lsn } => tree_lsn,
        BinLookupResult::NotFound | BinLookupResult::KnownDeleted => {
            result.lns_dead += 1;
            self.stats.lns_dead.fetch_add(1, Ordering::Relaxed);
            return;
        }
    };

    match self.process_found_ln(&info, log_lsn, tree_lsn, tree) {
        MigrateLnResult::Dead => {
            result.lns_dead += 1;
            self.stats.lns_dead.fetch_add(1, Ordering::Relaxed);
        }
        MigrateLnResult::Migrated => {
            result.lns_migrated += 1;
            self.stats.lns_migrated.fetch_add(1, Ordering::Relaxed);
        }
        MigrateLnResult::Locked => {
            result.lns_locked += 1;
            self.stats.lns_locked.fetch_add(1, Ordering::Relaxed);
            result.locked_lns.push((log_lsn, info));
        }
    }
}
/// Decides the fate of an LN whose slot exists in the tree.
///
/// A slot holding the null LSN means the LN is dead and the tree is not
/// asked to migrate anything. Otherwise the tree's `migrate_ln_slot` outcome
/// is translated: `Migrated` -> `Migrated`, `Locked` -> `Locked`,
/// `Obsolete` -> `Dead`.
pub fn process_found_ln<T: TreeLookup>(
    &self,
    info: &LnInfo,
    log_lsn: Lsn,
    tree_lsn: Lsn,
    tree: &T,
) -> MigrateLnResult {
    if tree_lsn == noxu_util::NULL_LSN {
        return MigrateLnResult::Dead;
    }

    match tree.migrate_ln_slot(info.db_id, info.key(), log_lsn, tree_lsn) {
        MigrationOutcome::Obsolete => MigrateLnResult::Dead,
        MigrationOutcome::Locked => MigrateLnResult::Locked,
        MigrationOutcome::Migrated => MigrateLnResult::Migrated,
    }
}
/// Processes one logged tree node.
///
/// Always bumps `ins_cleaned`; then bumps `ins_migrated` when the tree
/// reports the node as `Found`, or `ins_dead` when it reports `Obsolete`.
/// The same counters in `self.stats` are updated in step.
pub fn process_in<T: TreeLookup>(
    &self,
    db_id: i64,
    node_id: i64,
    log_lsn: Lsn,
    tree: &T,
    result: &mut FileProcessResult,
) {
    result.ins_cleaned += 1;
    self.stats.ins_cleaned.fetch_add(1, Ordering::Relaxed);

    let verdict = tree.lookup_in(db_id, node_id, log_lsn);
    match verdict {
        InLookupResult::Obsolete => {
            result.ins_dead += 1;
            self.stats.ins_dead.fetch_add(1, Ordering::Relaxed);
        }
        InLookupResult::Found => {
            result.ins_migrated += 1;
            self.stats.ins_migrated.fetch_add(1, Ordering::Relaxed);
        }
    }
}
/// Processes one logged BIN delta.
///
/// The liveness check is shared with `process_in`; afterwards the single
/// increment that call made to `ins_migrated` or `ins_dead` is moved to the
/// corresponding `bin_deltas_*` counter of `result`.
///
/// Which counter moved is detected by comparing against the values captured
/// before the call. Testing `> 0` instead would misattribute the delta
/// whenever earlier IN entries had already raised `ins_migrated`.
///
/// NOTE(review): as before, the `ins_cleaned` increment made by `process_in`
/// and its updates to the shared `stats` IN counters are left in place.
pub fn process_bin_delta<T: TreeLookup>(
    &self,
    db_id: i64,
    node_id: i64,
    log_lsn: Lsn,
    tree: &T,
    result: &mut FileProcessResult,
) {
    result.bin_deltas_cleaned += 1;
    self.stats.bin_deltas_cleaned.fetch_add(1, Ordering::Relaxed);

    let migrated_before = result.ins_migrated;
    let dead_before = result.ins_dead;

    self.process_in(db_id, node_id, log_lsn, tree, result);

    if result.ins_migrated > migrated_before {
        result.ins_migrated = migrated_before;
        result.bin_deltas_migrated += 1;
    } else if result.ins_dead > dead_before {
        result.ins_dead = dead_before;
        result.bin_deltas_dead += 1;
    }
}
/// Returns `true` once the shared shutdown flag has been set.
pub fn is_shutdown(&self) -> bool {
self.shutdown.load(Ordering::Relaxed)
}
}
impl FileProcessResult {
pub fn new() -> Self {
Self::default()
}
pub fn merge(&mut self, other: &FileProcessResult) {
self.entries_read += other.entries_read;
self.lns_cleaned += other.lns_cleaned;
self.lns_dead += other.lns_dead;
self.lns_migrated += other.lns_migrated;
self.lns_obsolete += other.lns_obsolete;
self.lns_locked += other.lns_locked;
self.locked_lns.extend_from_slice(&other.locked_lns);
self.ins_cleaned += other.ins_cleaned;
self.ins_dead += other.ins_dead;
self.ins_migrated += other.ins_migrated;
self.ins_obsolete += other.ins_obsolete;
self.bin_deltas_cleaned += other.bin_deltas_cleaned;
self.bin_deltas_dead += other.bin_deltas_dead;
self.bin_deltas_migrated += other.bin_deltas_migrated;
self.bin_deltas_obsolete += other.bin_deltas_obsolete;
self.completed = self.completed && other.completed;
}
}
#[cfg(test)]
mod tests {
use super::*;
fn make_processor() -> FileProcessor {
let stats = Arc::new(CleanerStats::new());
let shutdown = Arc::new(AtomicBool::new(false));
FileProcessor::new(stats, shutdown)
}
fn make_ln_info(file_num: u32, offset: u32, db_id: i64) -> LnInfo {
let lsn = Lsn::new(file_num, offset);
LnInfo::new(lsn, db_id, vec![0x01, 0x02, 0x03], 128, false, 0)
}
struct DeletedTree;
impl TreeLookup for DeletedTree {
fn lookup_parent_bin(
&self,
_db_id: i64,
_key: &[u8],
_log_lsn: Lsn,
) -> BinLookupResult {
BinLookupResult::NotFound
}
fn migrate_ln_slot(
&self,
_db_id: i64,
_key: &[u8],
_log_lsn: Lsn,
_tree_lsn: Lsn,
) -> MigrationOutcome {
MigrationOutcome::Obsolete
}
fn lookup_in(
&self,
_db_id: i64,
_node_id: i64,
_log_lsn: Lsn,
) -> InLookupResult {
InLookupResult::Obsolete
}
}
struct KnownDeletedTree;
impl TreeLookup for KnownDeletedTree {
fn lookup_parent_bin(
&self,
_db_id: i64,
_key: &[u8],
_log_lsn: Lsn,
) -> BinLookupResult {
BinLookupResult::KnownDeleted
}
fn migrate_ln_slot(
&self,
_db_id: i64,
_key: &[u8],
_log_lsn: Lsn,
_tree_lsn: Lsn,
) -> MigrationOutcome {
MigrationOutcome::Obsolete
}
fn lookup_in(
&self,
_db_id: i64,
_node_id: i64,
_log_lsn: Lsn,
) -> InLookupResult {
InLookupResult::Obsolete
}
}
struct MigratingTree;
impl TreeLookup for MigratingTree {
fn lookup_parent_bin(
&self,
_db_id: i64,
_key: &[u8],
log_lsn: Lsn,
) -> BinLookupResult {
BinLookupResult::Found { tree_lsn: log_lsn }
}
fn migrate_ln_slot(
&self,
_db_id: i64,
_key: &[u8],
_log_lsn: Lsn,
_tree_lsn: Lsn,
) -> MigrationOutcome {
MigrationOutcome::Migrated
}
fn lookup_in(
&self,
_db_id: i64,
_node_id: i64,
_log_lsn: Lsn,
) -> InLookupResult {
InLookupResult::Found
}
}
struct ObsoleteTree {
pub current_lsn: Lsn,
}
impl TreeLookup for ObsoleteTree {
fn lookup_parent_bin(
&self,
_db_id: i64,
_key: &[u8],
_log_lsn: Lsn,
) -> BinLookupResult {
BinLookupResult::Found { tree_lsn: self.current_lsn }
}
fn migrate_ln_slot(
&self,
_db_id: i64,
_key: &[u8],
_log_lsn: Lsn,
_tree_lsn: Lsn,
) -> MigrationOutcome {
MigrationOutcome::Obsolete
}
fn lookup_in(
&self,
_db_id: i64,
_node_id: i64,
_log_lsn: Lsn,
) -> InLookupResult {
InLookupResult::Obsolete
}
}
struct LockedTree;
impl TreeLookup for LockedTree {
fn lookup_parent_bin(
&self,
_db_id: i64,
_key: &[u8],
log_lsn: Lsn,
) -> BinLookupResult {
BinLookupResult::Found { tree_lsn: log_lsn }
}
fn migrate_ln_slot(
&self,
_db_id: i64,
_key: &[u8],
_log_lsn: Lsn,
_tree_lsn: Lsn,
) -> MigrationOutcome {
MigrationOutcome::Locked
}
fn lookup_in(
&self,
_db_id: i64,
_node_id: i64,
_log_lsn: Lsn,
) -> InLookupResult {
InLookupResult::Obsolete
}
}
struct NullLsnTree;
impl TreeLookup for NullLsnTree {
fn lookup_parent_bin(
&self,
_db_id: i64,
_key: &[u8],
_log_lsn: Lsn,
) -> BinLookupResult {
BinLookupResult::Found { tree_lsn: noxu_util::NULL_LSN }
}
fn migrate_ln_slot(
&self,
_db_id: i64,
_key: &[u8],
_log_lsn: Lsn,
_tree_lsn: Lsn,
) -> MigrationOutcome {
MigrationOutcome::Obsolete
}
fn lookup_in(
&self,
_db_id: i64,
_node_id: i64,
_log_lsn: Lsn,
) -> InLookupResult {
InLookupResult::Obsolete
}
}
struct ObsoleteInTree;
impl TreeLookup for ObsoleteInTree {
fn lookup_parent_bin(
&self,
_: i64,
_: &[u8],
_: Lsn,
) -> BinLookupResult {
BinLookupResult::NotFound
}
fn migrate_ln_slot(
&self,
_: i64,
_: &[u8],
_: Lsn,
_: Lsn,
) -> MigrationOutcome {
MigrationOutcome::Obsolete
}
fn lookup_in(&self, _: i64, _: i64, _: Lsn) -> InLookupResult {
InLookupResult::Obsolete
}
}
#[test]
fn test_new_processor() {
let stats = Arc::new(CleanerStats::new());
let shutdown = Arc::new(AtomicBool::new(false));
let processor = FileProcessor::new(stats, shutdown);
assert!(!processor.is_shutdown());
assert_eq!(
processor.process_pending_interval,
PROCESS_PENDING_EVERY_N_LNS
);
}
#[test]
fn test_set_pending_interval() {
let stats = Arc::new(CleanerStats::new());
let shutdown = Arc::new(AtomicBool::new(false));
let mut processor = FileProcessor::new(stats, shutdown);
processor.set_process_pending_interval(200);
assert_eq!(processor.process_pending_interval, 200);
}
#[test]
fn test_process_file_empty_entries() {
let stats = Arc::new(CleanerStats::new());
let shutdown = Arc::new(AtomicBool::new(false));
let processor = FileProcessor::new(stats, shutdown);
let summary = crate::FileSummary::new();
let result = processor.process_file_no_entries(1, &summary).unwrap();
assert!(result.completed);
assert_eq!(result.entries_read, 0);
assert_eq!(result.lns_cleaned, 0);
}
#[test]
fn test_process_file_with_shutdown() {
let stats = Arc::new(CleanerStats::new());
let shutdown = Arc::new(AtomicBool::new(true));
let processor = FileProcessor::new(stats, shutdown);
let summary = crate::FileSummary::new();
let result = processor.process_file_no_entries(1, &summary).unwrap();
assert!(!result.completed);
}
#[test]
fn test_shutdown_check() {
let stats = Arc::new(CleanerStats::new());
let shutdown = Arc::new(AtomicBool::new(false));
let processor = FileProcessor::new(stats, shutdown.clone());
assert!(!processor.is_shutdown());
shutdown.store(true, Ordering::Relaxed);
assert!(processor.is_shutdown());
}
#[test]
fn test_result_default() {
let result = FileProcessResult::default();
assert_eq!(result.entries_read, 0);
assert_eq!(result.lns_cleaned, 0);
assert!(!result.completed);
}
#[test]
fn test_result_new() {
let result = FileProcessResult::new();
assert_eq!(result.entries_read, 0);
assert!(!result.completed);
}
#[test]
fn test_result_merge() {
let mut result1 = FileProcessResult {
entries_read: 100,
lns_cleaned: 50,
lns_migrated: 30,
ins_cleaned: 10,
completed: true,
..Default::default()
};
let result2 = FileProcessResult {
entries_read: 200,
lns_cleaned: 75,
lns_migrated: 40,
ins_cleaned: 15,
completed: true,
..Default::default()
};
result1.merge(&result2);
assert_eq!(result1.entries_read, 300);
assert_eq!(result1.lns_cleaned, 125);
assert_eq!(result1.lns_migrated, 70);
assert_eq!(result1.ins_cleaned, 25);
assert!(result1.completed);
}
#[test]
fn test_result_merge_with_incomplete() {
let mut result1 = FileProcessResult {
entries_read: 100,
completed: true,
..Default::default()
};
let result2 = FileProcessResult {
entries_read: 50,
completed: false,
..Default::default()
};
result1.merge(&result2);
assert_eq!(result1.entries_read, 150);
assert!(!result1.completed); }
#[test]
fn test_result_all_counters() {
let result = FileProcessResult {
entries_read: 1,
lns_cleaned: 2,
lns_dead: 3,
lns_migrated: 4,
lns_obsolete: 5,
lns_locked: 6,
ins_cleaned: 7,
ins_dead: 8,
ins_migrated: 9,
ins_obsolete: 10,
bin_deltas_cleaned: 11,
bin_deltas_dead: 12,
bin_deltas_migrated: 13,
bin_deltas_obsolete: 14,
completed: true,
locked_lns: vec![],
};
assert_eq!(result.lns_cleaned, 2);
assert_eq!(result.lns_dead, 3);
assert_eq!(result.lns_migrated, 4);
assert_eq!(result.lns_obsolete, 5);
assert_eq!(result.lns_locked, 6);
assert_eq!(result.ins_cleaned, 7);
assert_eq!(result.ins_dead, 8);
assert_eq!(result.ins_migrated, 9);
assert_eq!(result.ins_obsolete, 10);
assert_eq!(result.bin_deltas_cleaned, 11);
assert_eq!(result.bin_deltas_dead, 12);
assert_eq!(result.bin_deltas_migrated, 13);
assert_eq!(result.bin_deltas_obsolete, 14);
assert!(result.completed);
}
#[test]
fn test_result_clone() {
let result = FileProcessResult {
entries_read: 100,
lns_cleaned: 50,
completed: true,
..Default::default()
};
let cloned = result.clone();
assert_eq!(cloned.entries_read, result.entries_read);
assert_eq!(cloned.lns_cleaned, result.lns_cleaned);
assert_eq!(cloned.completed, result.completed);
}
#[test]
fn test_result_equality() {
let result1 = FileProcessResult {
entries_read: 100,
lns_cleaned: 50,
completed: true,
..Default::default()
};
let result2 = FileProcessResult {
entries_read: 100,
lns_cleaned: 50,
completed: true,
..Default::default()
};
let result3 = FileProcessResult {
entries_read: 100,
lns_cleaned: 51, completed: true,
..Default::default()
};
assert_eq!(result1, result2);
assert_ne!(result1, result3);
}
#[test]
fn test_look_ahead_cache_new() {
let cache = LookAheadCache::new(4096);
assert!(cache.is_empty());
assert_eq!(cache.len(), 0);
}
#[test]
fn test_look_ahead_cache_add_and_remove() {
let mut cache = LookAheadCache::new(4096);
let info = make_ln_info(1, 1000, 42);
cache.add(1000, info);
assert!(!cache.is_empty());
assert_eq!(cache.len(), 1);
let removed = cache.remove(1000);
assert!(removed.is_some());
assert!(cache.is_empty());
}
#[test]
fn test_look_ahead_cache_next_offset_is_smallest() {
let mut cache = LookAheadCache::new(65536);
cache.add(3000, make_ln_info(1, 3000, 1));
cache.add(1000, make_ln_info(1, 1000, 1));
cache.add(2000, make_ln_info(1, 2000, 1));
assert_eq!(cache.next_offset(), Some(1000));
}
#[test]
fn test_look_ahead_cache_is_full() {
let mut cache = LookAheadCache::new(65);
assert!(!cache.is_full());
cache.add(1000, make_ln_info(1, 1000, 42));
assert!(cache.is_full()); }
#[test]
fn test_look_ahead_cache_remove_absent_key() {
let mut cache = LookAheadCache::new(4096);
let result = cache.remove(9999);
assert!(result.is_none());
}
#[test]
fn test_look_ahead_cache_next_offset_empty() {
let cache = LookAheadCache::new(4096);
assert_eq!(cache.next_offset(), None);
}
#[test]
fn test_look_ahead_cache_memory_accounting() {
let mut cache = LookAheadCache::new(65536);
let info = make_ln_info(1, 100, 1);
let mem_before = cache.used_mem;
cache.add(100, info);
let mem_after_add = cache.used_mem;
assert!(mem_after_add > mem_before);
cache.remove(100);
assert_eq!(cache.used_mem, mem_before);
}
#[test]
fn test_process_found_ln_migrates_when_lsns_match() {
let proc = make_processor();
let file_num = 1u32;
let offset = 1000u32;
let log_lsn = Lsn::new(file_num, offset);
let info = make_ln_info(file_num, offset, 42);
let result =
proc.process_found_ln(&info, log_lsn, log_lsn, &MigratingTree);
assert_eq!(result, MigrateLnResult::Migrated);
}
#[test]
fn test_process_found_ln_dead_when_lsns_differ() {
let proc = make_processor();
let file_num = 1u32;
let log_lsn = Lsn::new(file_num, 1000);
let tree_lsn = Lsn::new(file_num, 2000); let info = make_ln_info(file_num, 1000, 42);
let obsolete_tree = ObsoleteTree { current_lsn: tree_lsn };
let result =
proc.process_found_ln(&info, log_lsn, tree_lsn, &obsolete_tree);
assert_eq!(result, MigrateLnResult::Dead);
}
#[test]
fn test_process_found_ln_dead_when_tree_lsn_is_null() {
let proc = make_processor();
let file_num = 1u32;
let log_lsn = Lsn::new(file_num, 1000);
let info = make_ln_info(file_num, 1000, 42);
let result = proc.process_found_ln(
&info,
log_lsn,
noxu_util::NULL_LSN,
&NullLsnTree,
);
assert_eq!(result, MigrateLnResult::Dead);
}
#[test]
fn test_process_found_ln_locked() {
let proc = make_processor();
let file_num = 1u32;
let log_lsn = Lsn::new(file_num, 1000);
let info = make_ln_info(file_num, 1000, 42);
let result =
proc.process_found_ln(&info, log_lsn, log_lsn, &LockedTree);
assert_eq!(result, MigrateLnResult::Locked);
}
#[test]
fn test_process_ln_empty_cache() {
let proc = make_processor();
let mut cache = LookAheadCache::new(65536);
let mut result = FileProcessResult::new();
proc.process_ln(1, &mut cache, &DeletedTree, &mut result);
assert_eq!(result.lns_cleaned, 0);
assert_eq!(result.lns_dead, 0);
assert_eq!(result.lns_migrated, 0);
}
#[test]
fn test_process_ln_not_found_in_tree() {
let proc = make_processor();
let mut cache = LookAheadCache::new(65536);
let mut result = FileProcessResult::new();
cache.add(1000, make_ln_info(1, 1000, 42));
proc.process_ln(1, &mut cache, &DeletedTree, &mut result);
assert_eq!(result.lns_cleaned, 1);
assert_eq!(result.lns_dead, 1);
assert_eq!(result.lns_migrated, 0);
assert!(cache.is_empty());
}
/// One queued LN processed against the `KnownDeletedTree` stub is counted as
/// cleaned and dead, and the cache is left empty.
#[test]
fn test_process_ln_known_deleted() {
    let processor = make_processor();
    let mut tally = FileProcessResult::new();
    let mut pending = LookAheadCache::new(65536);
    pending.add(500, make_ln_info(1, 500, 7));
    processor.process_ln(1, &mut pending, &KnownDeletedTree, &mut tally);
    assert_eq!(tally.lns_cleaned, 1);
    assert_eq!(tally.lns_dead, 1);
    assert!(pending.is_empty());
}
/// One queued LN processed against the `MigratingTree` stub is counted as
/// cleaned and migrated, not dead, and the cache is left empty.
#[test]
fn test_process_ln_migrated() {
    let processor = make_processor();
    let mut tally = FileProcessResult::new();
    let mut pending = LookAheadCache::new(65536);
    pending.add(2000, make_ln_info(2, 2000, 1));
    processor.process_ln(2, &mut pending, &MigratingTree, &mut tally);
    assert_eq!(tally.lns_cleaned, 1);
    assert_eq!(tally.lns_migrated, 1);
    assert_eq!(tally.lns_dead, 0);
    assert!(pending.is_empty());
}
/// One queued LN processed against the `LockedTree` stub is counted as
/// cleaned and locked, not migrated.
#[test]
fn test_process_ln_locked() {
    let processor = make_processor();
    let mut tally = FileProcessResult::new();
    let mut pending = LookAheadCache::new(65536);
    pending.add(3000, make_ln_info(1, 3000, 5));
    processor.process_ln(1, &mut pending, &LockedTree, &mut tally);
    assert_eq!(tally.lns_cleaned, 1);
    assert_eq!(tally.lns_locked, 1);
    assert_eq!(tally.lns_migrated, 0);
}
/// Entries are added out of order (300, 100, 200); after a single
/// `process_ln` call two entries must remain and the next offset to be
/// handed out must be 200, i.e. offset 100 was the one consumed.
#[test]
fn test_process_ln_dequeues_lowest_offset_first() {
    let proc = make_processor();
    let mut cache = LookAheadCache::new(65536);
    let mut result = FileProcessResult::new();
    // Deliberately inserted in non-sorted order.
    cache.add(300, make_ln_info(1, 300, 1));
    cache.add(100, make_ln_info(1, 100, 1));
    cache.add(200, make_ln_info(1, 200, 1));
    proc.process_ln(1, &mut cache, &MigratingTree, &mut result);
    assert_eq!(cache.len(), 2);
    // `assert_eq!` (rather than `assert!(a == b)`) prints the offset that was
    // actually returned when the ordering expectation is violated.
    assert_eq!(cache.next_offset(), Some(200));
}
/// Queue five LNs and call `process_ln` until the cache is empty; all five
/// must be counted as cleaned and migrated, none as dead.
#[test]
fn test_process_ln_drain_cache() {
    let file: u32 = 4;
    let processor = make_processor();
    let mut tally = FileProcessResult::new();
    let mut pending = LookAheadCache::new(65536);
    // Offsets 0, 1000, ..., 4000.
    for offset in (0..5u32).map(|slot| slot * 1000) {
        pending.add(offset, make_ln_info(file, offset, 1));
    }
    while !pending.is_empty() {
        processor.process_ln(file, &mut pending, &MigratingTree, &mut tally);
    }
    assert_eq!(tally.lns_cleaned, 5);
    assert_eq!(tally.lns_migrated, 5);
    assert_eq!(tally.lns_dead, 0);
}
/// A migrated LN must bump the shared `CleanerStats::lns_migrated` counter.
#[test]
fn test_process_ln_updates_stats_migrated() {
    let counters = Arc::new(CleanerStats::new());
    let stop_flag = Arc::new(AtomicBool::new(false));
    // The processor gets its own handle; `counters` is kept for inspection.
    let processor = FileProcessor::new(Arc::clone(&counters), stop_flag);
    let mut tally = FileProcessResult::new();
    let mut pending = LookAheadCache::new(65536);
    pending.add(1000, make_ln_info(1, 1000, 1));
    processor.process_ln(1, &mut pending, &MigratingTree, &mut tally);
    let migrated = counters.lns_migrated.load(Ordering::Relaxed);
    assert_eq!(migrated, 1);
}
/// A dead LN must bump the shared `CleanerStats::lns_dead` counter.
#[test]
fn test_process_ln_updates_stats_dead() {
    let counters = Arc::new(CleanerStats::new());
    let stop_flag = Arc::new(AtomicBool::new(false));
    // The processor gets its own handle; `counters` is kept for inspection.
    let processor = FileProcessor::new(Arc::clone(&counters), stop_flag);
    let mut tally = FileProcessResult::new();
    let mut pending = LookAheadCache::new(65536);
    pending.add(1000, make_ln_info(1, 1000, 1));
    processor.process_ln(1, &mut pending, &DeletedTree, &mut tally);
    let dead = counters.lns_dead.load(Ordering::Relaxed);
    assert_eq!(dead, 1);
}
/// A locked LN must bump the shared `CleanerStats::lns_locked` counter.
#[test]
fn test_process_ln_updates_stats_locked() {
    let counters = Arc::new(CleanerStats::new());
    let stop_flag = Arc::new(AtomicBool::new(false));
    // The processor gets its own handle; `counters` is kept for inspection.
    let processor = FileProcessor::new(Arc::clone(&counters), stop_flag);
    let mut tally = FileProcessResult::new();
    let mut pending = LookAheadCache::new(65536);
    pending.add(1000, make_ln_info(1, 1000, 1));
    processor.process_ln(1, &mut pending, &LockedTree, &mut tally);
    let locked = counters.lns_locked.load(Ordering::Relaxed);
    assert_eq!(locked, 1);
}
/// A single LN processed against the `DeletedTree` stub is tallied as dead.
#[test]
fn test_bin_lookup_result_not_found() {
    let processor = make_processor();
    let mut tally = FileProcessResult::new();
    let mut pending = LookAheadCache::new(65536);
    pending.add(42, make_ln_info(1, 42, 1));
    processor.process_ln(1, &mut pending, &DeletedTree, &mut tally);
    assert_eq!(tally.lns_dead, 1);
}
/// `process_found_ln` must answer `Dead` when the tree LSN argument is
/// `NULL_LSN`.
#[test]
fn test_null_lsn_in_tree_is_dead() {
    let file: u32 = 1;
    let processor = make_processor();
    let lsn_in_log = Lsn::new(file, 100);
    let ln = make_ln_info(file, 100, 99);
    let null_slot = noxu_util::NULL_LSN;
    let verdict =
        processor.process_found_ln(&ln, lsn_in_log, null_slot, &NullLsnTree);
    assert_eq!(
        verdict,
        MigrateLnResult::Dead,
        "NULL_LSN in tree slot must yield Dead (case 4 in the equivalent processFoundLN)"
    );
}
/// The three `MigrateLnResult` variants must be pairwise unequal.
#[test]
fn test_migrate_ln_result_variants() {
    let distinct_pairs = [
        (MigrateLnResult::Dead, MigrateLnResult::Migrated),
        (MigrateLnResult::Dead, MigrateLnResult::Locked),
        (MigrateLnResult::Migrated, MigrateLnResult::Locked),
    ];
    for (lhs, rhs) in distinct_pairs {
        assert_ne!(lhs, rhs);
    }
}
/// The three `MigrationOutcome` variants must be pairwise unequal.
#[test]
fn test_migration_outcome_variants() {
    let distinct_pairs = [
        (MigrationOutcome::Migrated, MigrationOutcome::Locked),
        (MigrationOutcome::Migrated, MigrationOutcome::Obsolete),
        (MigrationOutcome::Locked, MigrationOutcome::Obsolete),
    ];
    for (lhs, rhs) in distinct_pairs {
        assert_ne!(lhs, rhs);
    }
}
/// `process_in` against the `MigratingTree` stub counts the IN as cleaned and
/// migrated, not dead.
#[test]
fn test_process_in_found_marks_migrated() {
    let processor = make_processor();
    let mut tally = FileProcessResult::new();
    let in_lsn = Lsn::new(1, 100);
    processor.process_in(42, 99, in_lsn, &MigratingTree, &mut tally);
    assert_eq!(tally.ins_cleaned, 1);
    assert_eq!(tally.ins_migrated, 1);
    assert_eq!(tally.ins_dead, 0);
}
/// `process_in` against the `ObsoleteInTree` stub counts the IN as cleaned and
/// dead, not migrated.
#[test]
fn test_process_in_obsolete_marks_dead() {
    let processor = make_processor();
    let mut tally = FileProcessResult::new();
    let in_lsn = Lsn::new(1, 100);
    processor.process_in(42, 99, in_lsn, &ObsoleteInTree, &mut tally);
    assert_eq!(tally.ins_cleaned, 1);
    assert_eq!(tally.ins_dead, 1);
    assert_eq!(tally.ins_migrated, 0);
}
/// `process_in` must update the shared `CleanerStats` IN counters: one
/// migrated IN followed by one dead IN yields two cleaned in total.
#[test]
fn test_process_in_updates_stats() {
    let counters = Arc::new(CleanerStats::new());
    let stop_flag = Arc::new(AtomicBool::new(false));
    let processor = FileProcessor::new(Arc::clone(&counters), stop_flag);
    let mut tally = FileProcessResult::new();

    // First IN: processed with the migrating stub.
    let first_lsn = Lsn::new(1, 0);
    processor.process_in(1, 1, first_lsn, &MigratingTree, &mut tally);
    assert_eq!(counters.ins_cleaned.load(Ordering::Relaxed), 1);
    assert_eq!(counters.ins_migrated.load(Ordering::Relaxed), 1);

    // Second IN: processed with the obsolete stub.
    let second_lsn = Lsn::new(1, 100);
    processor.process_in(1, 2, second_lsn, &ObsoleteInTree, &mut tally);
    assert_eq!(counters.ins_cleaned.load(Ordering::Relaxed), 2);
    assert_eq!(counters.ins_dead.load(Ordering::Relaxed), 1);
}
/// Builds a non-deleted LN `LogEntry` at `(file_num, offset)` for `db_id`
/// with the given key, no expiration time and an entry size of 64.
fn make_ln_entry(
    file_num: u32,
    offset: u32,
    db_id: i64,
    key: &[u8],
) -> LogEntry {
    let entry_type = LogEntryType::Ln {
        db_id,
        key: Vec::from(key),
        deleted: false,
        expiration_time: 0,
        entry_size: 64,
    };
    LogEntry { lsn: Lsn::new(file_num, offset), entry_type }
}
/// Builds a deleted LN `LogEntry` at `(file_num, offset)` for `db_id`, using
/// the fixed one-byte key `[1]`, no expiration time and an entry size of 32.
fn make_deleted_ln_entry(
    file_num: u32,
    offset: u32,
    db_id: i64,
) -> LogEntry {
    let entry_type = LogEntryType::Ln {
        db_id,
        key: vec![1],
        deleted: true,
        expiration_time: 0,
        entry_size: 32,
    };
    LogEntry { lsn: Lsn::new(file_num, offset), entry_type }
}
/// Builds an IN `LogEntry` at `(file_num, offset)` carrying `db_id` and
/// `node_id`.
fn make_in_entry(
    file_num: u32,
    offset: u32,
    db_id: i64,
    node_id: i64,
) -> LogEntry {
    let entry_type = LogEntryType::In { db_id, node_id };
    LogEntry { lsn: Lsn::new(file_num, offset), entry_type }
}
/// Builds a `LogEntry` of type `Other` at `(file_num, offset)`.
fn make_other_entry(file_num: u32, offset: u32) -> LogEntry {
    let lsn = Lsn::new(file_num, offset);
    LogEntry { lsn, entry_type: LogEntryType::Other }
}
/// `process_file` with no entries completes and leaves every counter at zero.
#[test]
fn test_process_file_empty() {
    let processor = make_processor();
    let file_summary = crate::FileSummary::new();
    let report = processor
        .process_file(1, &file_summary, &[], &MigratingTree)
        .unwrap();
    assert!(report.completed);
    assert_eq!(report.entries_read, 0);
    assert_eq!(report.lns_cleaned, 0);
    assert_eq!(report.ins_cleaned, 0);
}
/// A file with one live LN, processed with the `MigratingTree` stub, reports
/// one entry read, one LN cleaned and migrated, none dead.
#[test]
fn test_process_file_single_ln_migrated() {
    let processor = make_processor();
    let file_summary = crate::FileSummary::new();
    let log_entries = vec![make_ln_entry(1, 100, 42, &[1, 2, 3])];
    let report = processor
        .process_file(1, &file_summary, &log_entries, &MigratingTree)
        .unwrap();
    assert!(report.completed);
    assert_eq!(report.entries_read, 1);
    assert_eq!(report.lns_cleaned, 1);
    assert_eq!(report.lns_migrated, 1);
    assert_eq!(report.lns_dead, 0);
}
/// An LN entry flagged as deleted is counted under `lns_obsolete` and not
/// under `lns_cleaned`.
#[test]
fn test_process_file_deleted_ln_is_obsolete() {
    let processor = make_processor();
    let file_summary = crate::FileSummary::new();
    let log_entries = vec![make_deleted_ln_entry(1, 100, 42)];
    let report = processor
        .process_file(1, &file_summary, &log_entries, &MigratingTree)
        .unwrap();
    assert!(report.completed);
    assert_eq!(report.entries_read, 1);
    assert_eq!(report.lns_obsolete, 1);
    assert_eq!(report.lns_cleaned, 0);
}
/// A file with one IN entry, processed with the `MigratingTree` stub, reports
/// the IN as cleaned and migrated.
#[test]
fn test_process_file_in_entry_migrated() {
    let processor = make_processor();
    let file_summary = crate::FileSummary::new();
    let log_entries = vec![make_in_entry(1, 200, 1, 77)];
    let report = processor
        .process_file(1, &file_summary, &log_entries, &MigratingTree)
        .unwrap();
    assert!(report.completed);
    assert_eq!(report.entries_read, 1);
    assert_eq!(report.ins_cleaned, 1);
    assert_eq!(report.ins_migrated, 1);
}
/// A file with one IN entry, processed with the `ObsoleteInTree` stub,
/// reports the IN as cleaned and dead.
#[test]
fn test_process_file_in_entry_dead() {
    let processor = make_processor();
    let file_summary = crate::FileSummary::new();
    let log_entries = vec![make_in_entry(1, 200, 1, 77)];
    let report = processor
        .process_file(1, &file_summary, &log_entries, &ObsoleteInTree)
        .unwrap();
    assert!(report.completed);
    assert_eq!(report.ins_cleaned, 1);
    assert_eq!(report.ins_dead, 1);
}
/// An entry of type `Other` is counted as read but contributes to neither the
/// LN nor the IN cleaned counters.
#[test]
fn test_process_file_other_entry_skipped() {
    let processor = make_processor();
    let file_summary = crate::FileSummary::new();
    let log_entries = vec![make_other_entry(1, 300)];
    let report = processor
        .process_file(1, &file_summary, &log_entries, &MigratingTree)
        .unwrap();
    assert!(report.completed);
    assert_eq!(report.entries_read, 1);
    assert_eq!(report.lns_cleaned, 0);
    assert_eq!(report.ins_cleaned, 0);
}
/// A file mixing two live LNs, one deleted LN, one IN and one `Other` entry
/// must report each category under its own counter.
#[test]
fn test_process_file_mixed_entries() {
    let processor = make_processor();
    let file_summary = crate::FileSummary::new();
    let log_entries = vec![
        // Two live LNs.
        make_ln_entry(2, 100, 1, &[1]),
        make_ln_entry(2, 200, 1, &[2]),
        // One LN flagged as deleted.
        make_deleted_ln_entry(2, 300, 1),
        // One IN.
        make_in_entry(2, 400, 1, 10),
        // One entry of type `Other`.
        make_other_entry(2, 500),
    ];
    let report = processor
        .process_file(2, &file_summary, &log_entries, &MigratingTree)
        .unwrap();
    assert!(report.completed);
    assert_eq!(report.entries_read, 5);
    assert_eq!(report.lns_cleaned, 2);
    assert_eq!(report.lns_migrated, 2);
    assert_eq!(report.lns_obsolete, 1);
    assert_eq!(report.ins_cleaned, 1);
    assert_eq!(report.ins_migrated, 1);
}
/// A live LN processed with the `DeletedTree` stub is reported as cleaned and
/// dead, not migrated.
#[test]
fn test_process_file_ln_not_found_in_tree() {
    let processor = make_processor();
    let file_summary = crate::FileSummary::new();
    let log_entries = vec![make_ln_entry(1, 100, 1, &[0xAB])];
    let report = processor
        .process_file(1, &file_summary, &log_entries, &DeletedTree)
        .unwrap();
    assert!(report.completed);
    assert_eq!(report.lns_cleaned, 1);
    assert_eq!(report.lns_dead, 1);
    assert_eq!(report.lns_migrated, 0);
}
/// A live LN processed with the `LockedTree` stub is reported under
/// `lns_locked`, and the file still completes.
#[test]
fn test_process_file_ln_locked() {
    let processor = make_processor();
    let file_summary = crate::FileSummary::new();
    let log_entries = vec![make_ln_entry(1, 100, 1, &[0x01])];
    let report = processor
        .process_file(1, &file_summary, &log_entries, &LockedTree)
        .unwrap();
    assert!(report.completed);
    assert_eq!(report.lns_locked, 1);
}
/// With the shutdown flag already raised, `process_file` over two LNs must
/// return a result whose `completed` flag is false.
#[test]
fn test_process_file_shutdown_mid_file() {
    let counters = Arc::new(CleanerStats::new());
    let stop_flag = Arc::new(AtomicBool::new(false));
    let processor = FileProcessor::new(counters, Arc::clone(&stop_flag));
    let file_summary = crate::FileSummary::new();
    // The flag is set before processing starts.
    stop_flag.store(true, Ordering::Relaxed);
    let log_entries = vec![
        make_ln_entry(1, 100, 1, &[1]),
        make_ln_entry(1, 200, 1, &[2]),
    ];
    let report = processor
        .process_file(1, &file_summary, &log_entries, &MigratingTree)
        .unwrap();
    assert!(!report.completed);
}
/// Five hundred live LNs in one file must all be read, cleaned and migrated.
#[test]
fn test_process_file_many_lns_all_migrated() {
    let processor = make_processor();
    let file_summary = crate::FileSummary::new();
    let mut log_entries: Vec<LogEntry> = Vec::with_capacity(500);
    for idx in 0u32..500 {
        // The key is the low byte of the index, so key bytes repeat after 255.
        let key = [idx as u8];
        log_entries.push(make_ln_entry(3, idx * 100, 1, &key));
    }
    let report = processor
        .process_file(3, &file_summary, &log_entries, &MigratingTree)
        .unwrap();
    assert!(report.completed);
    assert_eq!(report.entries_read, 500);
    assert_eq!(report.lns_cleaned, 500);
    assert_eq!(report.lns_migrated, 500);
}
/// `InLookupResult::Found` and `InLookupResult::Obsolete` must compare unequal.
#[test]
fn test_in_lookup_result_variants() {
    let found = InLookupResult::Found;
    let obsolete = InLookupResult::Obsolete;
    assert_ne!(found, obsolete);
}
/// `make_other_entry` yields an entry whose type is `LogEntryType::Other`.
#[test]
fn test_log_entry_type_other() {
    let built = make_other_entry(1, 0);
    assert_eq!(built.entry_type, LogEntryType::Other);
}
/// `make_ln_entry` yields an entry whose type is the `Ln` variant.
#[test]
fn test_log_entry_type_ln() {
    let built = make_ln_entry(1, 0, 1, &[1, 2]);
    let is_ln = matches!(built.entry_type, LogEntryType::Ln { .. });
    assert!(is_ln);
}
/// `make_in_entry` yields an entry whose type is the `In` variant.
#[test]
fn test_log_entry_type_in() {
    let built = make_in_entry(1, 0, 1, 42);
    let is_in = matches!(built.entry_type, LogEntryType::In { .. });
    assert!(is_in);
}
/// With the shutdown flag raised before the call, `process_file` over a
/// single LN must return a result whose `completed` flag is false.
#[test]
fn test_process_file_shutdown_during_drain() {
    let counters = Arc::new(CleanerStats::new());
    let stop_flag = Arc::new(AtomicBool::new(false));
    let processor = FileProcessor::new(counters, Arc::clone(&stop_flag));
    let file_summary = crate::FileSummary::new();
    let log_entries = vec![make_ln_entry(1, 100, 1, &[0x01])];
    // The flag is set before processing starts.
    stop_flag.store(true, Ordering::Relaxed);
    let report = processor
        .process_file(1, &file_summary, &log_entries, &MigratingTree)
        .unwrap();
    assert!(!report.completed);
}
/// With the pending-processing interval set to 1 and the shutdown flag raised
/// before the call, `process_file` over two LNs must not report completion.
#[test]
fn test_process_file_shutdown_in_drain_loop() {
    let counters = Arc::new(CleanerStats::new());
    let stop_flag = Arc::new(AtomicBool::new(false));
    let mut processor = FileProcessor::new(counters, Arc::clone(&stop_flag));
    processor.set_process_pending_interval(1);
    let file_summary = crate::FileSummary::new();
    let log_entries = vec![
        make_ln_entry(5, 100, 1, &[0xAA]),
        make_ln_entry(5, 200, 1, &[0xBB]),
    ];
    // The flag is set before processing starts.
    stop_flag.store(true, Ordering::Relaxed);
    let report = processor
        .process_file(5, &file_summary, &log_entries, &MigratingTree)
        .unwrap();
    assert!(!report.completed);
}
/// `FileProcessResult::merge` must add every numeric counter of the argument
/// to the receiver. The receiver starts with distinct values per field and the
/// argument has 1 everywhere, so each field must end up exactly one higher.
#[test]
fn test_result_merge_all_fields() {
    let mut total = FileProcessResult {
        entries_read: 10,
        lns_cleaned: 1,
        lns_dead: 2,
        lns_migrated: 3,
        lns_obsolete: 4,
        lns_locked: 5,
        ins_cleaned: 6,
        ins_dead: 7,
        ins_migrated: 8,
        ins_obsolete: 9,
        bin_deltas_cleaned: 10,
        bin_deltas_dead: 11,
        bin_deltas_migrated: 12,
        bin_deltas_obsolete: 13,
        completed: true,
        locked_lns: vec![],
    };
    // Every counter in the increment is 1.
    let increment = FileProcessResult {
        entries_read: 1,
        lns_cleaned: 1,
        lns_dead: 1,
        lns_migrated: 1,
        lns_obsolete: 1,
        lns_locked: 1,
        ins_cleaned: 1,
        ins_dead: 1,
        ins_migrated: 1,
        ins_obsolete: 1,
        bin_deltas_cleaned: 1,
        bin_deltas_dead: 1,
        bin_deltas_migrated: 1,
        bin_deltas_obsolete: 1,
        completed: true,
        locked_lns: vec![],
    };

    total.merge(&increment);

    assert_eq!(total.entries_read, 11);
    // LN counters.
    assert_eq!(total.lns_cleaned, 2);
    assert_eq!(total.lns_dead, 3);
    assert_eq!(total.lns_migrated, 4);
    assert_eq!(total.lns_obsolete, 5);
    assert_eq!(total.lns_locked, 6);
    // IN counters.
    assert_eq!(total.ins_cleaned, 7);
    assert_eq!(total.ins_dead, 8);
    assert_eq!(total.ins_migrated, 9);
    assert_eq!(total.ins_obsolete, 10);
    // BIN-delta counters.
    assert_eq!(total.bin_deltas_cleaned, 11);
    assert_eq!(total.bin_deltas_dead, 12);
    assert_eq!(total.bin_deltas_migrated, 13);
    assert_eq!(total.bin_deltas_obsolete, 14);
    // Both inputs were complete, so the merged result is too.
    assert!(total.completed);
}
/// Merging two incomplete results must leave the receiver incomplete.
#[test]
fn test_result_merge_both_incomplete() {
    let mut total =
        FileProcessResult { completed: false, ..Default::default() };
    let increment =
        FileProcessResult { completed: false, ..Default::default() };
    total.merge(&increment);
    assert!(!total.completed);
}
/// With the pending-processing interval set to 2, a file of ten live LNs must
/// still be fully read and every LN migrated.
#[test]
fn test_process_file_periodic_drain() {
    let counters = Arc::new(CleanerStats::new());
    let stop_flag = Arc::new(AtomicBool::new(false));
    let mut processor = FileProcessor::new(counters, stop_flag);
    processor.set_process_pending_interval(2);
    let file_summary = crate::FileSummary::new();
    let mut log_entries: Vec<LogEntry> = Vec::with_capacity(10);
    for idx in 0u32..10 {
        let key = [idx as u8];
        log_entries.push(make_ln_entry(1, idx * 100, 1, &key));
    }
    let report = processor
        .process_file(1, &file_summary, &log_entries, &MigratingTree)
        .unwrap();
    assert!(report.completed);
    assert_eq!(report.entries_read, 10);
    assert_eq!(report.lns_migrated, 10);
}
/// The `Debug` rendering of each `BinLookupResult` variant must contain the
/// variant's name.
#[test]
fn test_bin_lookup_result_debug() {
    let not_found = format!("{:?}", BinLookupResult::NotFound);
    assert!(not_found.contains("NotFound"));

    let known_deleted = format!("{:?}", BinLookupResult::KnownDeleted);
    assert!(known_deleted.contains("KnownDeleted"));

    let found_variant = BinLookupResult::Found { tree_lsn: Lsn::new(1, 100) };
    let found = format!("{:?}", found_variant);
    assert!(found.contains("Found"));
}
/// A cloned `LogEntryType` must compare equal to its source, both for the
/// field-less `Other` variant and for an `Ln` variant carrying data.
#[test]
fn test_log_entry_type_clone_and_eq() {
    let other = LogEntryType::Other;
    let other_copy = other.clone();
    assert_eq!(other, other_copy);

    let leaf = LogEntryType::Ln {
        db_id: 1,
        key: vec![1],
        deleted: false,
        expiration_time: 0,
        entry_size: 32,
    };
    let leaf_copy = leaf.clone();
    assert_eq!(leaf, leaf_copy);
}
/// A `LookAheadCache` created with a budget of zero reports itself as full.
#[test]
fn test_look_ahead_cache_zero_budget_is_full() {
    let zero_budget = LookAheadCache::new(0);
    let full = zero_budget.is_full();
    assert!(full);
}
/// The `Debug` rendering of each `InLookupResult` variant must contain the
/// variant's name.
#[test]
fn test_in_lookup_result_debug() {
    let found = format!("{:?}", InLookupResult::Found);
    let obsolete = format!("{:?}", InLookupResult::Obsolete);
    assert!(found.contains("Found"));
    assert!(obsolete.contains("Obsolete"));
}
/// The `Debug` rendering of each `MigrateLnResult` variant must contain the
/// variant's name.
#[test]
fn test_migrate_ln_result_debug() {
    let migrated = format!("{:?}", MigrateLnResult::Migrated);
    let dead = format!("{:?}", MigrateLnResult::Dead);
    let locked = format!("{:?}", MigrateLnResult::Locked);
    assert!(migrated.contains("Migrated"));
    assert!(dead.contains("Dead"));
    assert!(locked.contains("Locked"));
}
/// Creates a `noxu_tree::Tree` via `Tree::new(1, 128)` and inserts `key` with
/// the payload `b"value"` at `lsn`.
///
/// Panics if the insert fails.
fn make_tree_with_key(key: &[u8], lsn: Lsn) -> noxu_tree::Tree {
    let seeded = noxu_tree::Tree::new(1, 128);
    let payload = b"value".to_vec();
    seeded
        .insert(key.to_vec(), payload, lsn)
        .expect("insert should succeed");
    seeded
}
/// A `RealTreeLookup` built from a clone of an `Arc`-wrapped tree must find a
/// key that was inserted into that tree.
#[test]
fn test_real_tree_lookup_new_and_shared() {
    let lsn = Lsn::new(1, 100);
    let shared_tree =
        Arc::new(std::sync::RwLock::new(make_tree_with_key(b"hello", lsn)));
    let locks = Arc::new(LockManager::new());
    let lookup = RealTreeLookup::new(Arc::clone(&shared_tree), locks);
    let looked_up = lookup.lookup_parent_bin(1, b"hello", lsn);
    if !matches!(looked_up, BinLookupResult::Found { .. }) {
        panic!("expected Found, got {:?}", looked_up);
    }
}
/// Looking up an inserted key must return `Found` carrying the LSN the key
/// was inserted with.
#[test]
fn test_real_tree_lookup_tree_ref() {
    let lsn = Lsn::new(1, 200);
    let seeded = make_tree_with_key(b"key", lsn);
    let lookup = RealTreeLookup::new(
        Arc::new(std::sync::RwLock::new(seeded)),
        Arc::new(LockManager::new()),
    );
    let looked_up = lookup.lookup_parent_bin(1, b"key", lsn);
    if let BinLookupResult::Found { tree_lsn } = looked_up {
        assert_eq!(tree_lsn, lsn);
    } else {
        panic!("expected Found, got {:?}", looked_up);
    }
}
/// `lookup_parent_bin` on a present key returns `Found` whose `tree_lsn`
/// equals the LSN used at insertion time.
#[test]
fn test_real_tree_lookup_found() {
    let key = b"alpha";
    let lsn = Lsn::new(2, 500);
    let seeded = make_tree_with_key(key, lsn);
    let lookup = RealTreeLookup::new(
        Arc::new(std::sync::RwLock::new(seeded)),
        Arc::new(LockManager::new()),
    );
    let looked_up = lookup.lookup_parent_bin(1, key, lsn);
    if let BinLookupResult::Found { tree_lsn } = looked_up {
        assert_eq!(
            tree_lsn, lsn,
            "slot LSN should match what was inserted"
        );
    } else {
        panic!("expected Found, got {:?}", looked_up);
    }
}
/// Looking up a key that was never inserted must return `NotFound`, even when
/// the tree holds another key.
#[test]
fn test_real_tree_lookup_not_found() {
    let lsn = Lsn::new(1, 100);
    let seeded = make_tree_with_key(b"present", lsn);
    let lookup = RealTreeLookup::new(
        Arc::new(std::sync::RwLock::new(seeded)),
        Arc::new(LockManager::new()),
    );
    let looked_up = lookup.lookup_parent_bin(1, b"absent", lsn);
    assert!(matches!(looked_up, BinLookupResult::NotFound));
}
/// Looking up any key in a freshly created tree must return `NotFound`.
#[test]
fn test_real_tree_lookup_empty_tree() {
    let empty = noxu_tree::Tree::new(1, 128);
    let lookup = RealTreeLookup::new(
        Arc::new(std::sync::RwLock::new(empty)),
        Arc::new(LockManager::new()),
    );
    let looked_up = lookup.lookup_parent_bin(1, b"anything", Lsn::new(1, 50));
    assert!(matches!(looked_up, BinLookupResult::NotFound));
}
/// `migrate_ln_slot` must report `Migrated` when its last argument equals the
/// LSN the key was inserted with.
#[test]
fn test_real_tree_migrate_ln_slot_migrated() {
    let key = b"migrate_me";
    let inserted_at = Lsn::new(3, 300);
    let seeded = make_tree_with_key(key, inserted_at);
    let lookup = RealTreeLookup::new(
        Arc::new(std::sync::RwLock::new(seeded)),
        Arc::new(LockManager::new()),
    );
    let relocated_to = Lsn::new(3, 400);
    let verdict = lookup.migrate_ln_slot(1, key, relocated_to, inserted_at);
    assert_eq!(
        verdict,
        MigrationOutcome::Migrated,
        "slot LSN matches tree_lsn so migration should succeed"
    );
}
/// `migrate_ln_slot` must report `Obsolete` when the key is stored at a
/// different LSN than the one the caller passes in.
#[test]
fn test_real_tree_migrate_ln_slot_obsolete_lsn_mismatch() {
    let key = b"raced";
    let stale_lsn = Lsn::new(1, 100);
    let current_lsn = Lsn::new(1, 200);
    // The tree holds the key at `current_lsn`; the caller only knows `stale_lsn`.
    let seeded = make_tree_with_key(key, current_lsn);
    let lookup = RealTreeLookup::new(
        Arc::new(std::sync::RwLock::new(seeded)),
        Arc::new(LockManager::new()),
    );
    let verdict = lookup.migrate_ln_slot(1, key, stale_lsn, stale_lsn);
    assert_eq!(
        verdict,
        MigrationOutcome::Obsolete,
        "slot has moved on — should be obsolete"
    );
}
/// `migrate_ln_slot` must report `Obsolete` for a key that is not in the tree.
#[test]
fn test_real_tree_migrate_ln_slot_key_absent() {
    let seeded = make_tree_with_key(b"present", Lsn::new(1, 10));
    let lookup = RealTreeLookup::new(
        Arc::new(std::sync::RwLock::new(seeded)),
        Arc::new(LockManager::new()),
    );
    let first_lsn = Lsn::new(1, 20);
    let second_lsn = Lsn::new(1, 20);
    let verdict = lookup.migrate_ln_slot(1, b"absent", first_lsn, second_lsn);
    assert_eq!(
        verdict,
        MigrationOutcome::Obsolete,
        "key not in tree — should be obsolete"
    );
}
/// `RealTreeLookup::lookup_in` on a freshly created tree returns `Obsolete`.
#[test]
fn test_real_tree_lookup_in_always_obsolete() {
    let empty = noxu_tree::Tree::new(1, 128);
    let lookup = RealTreeLookup::new(
        Arc::new(std::sync::RwLock::new(empty)),
        Arc::new(LockManager::new()),
    );
    let in_lsn = Lsn::new(1, 0);
    let verdict = lookup.lookup_in(1, 42, in_lsn);
    assert_eq!(verdict, InLookupResult::Obsolete);
}
/// End-to-end with a real tree: an LN whose key is stored in the tree at the
/// same LSN as the log entry must be reported as cleaned and migrated.
#[test]
fn test_process_file_with_real_tree_migrates_active_ln() {
    let key: &[u8] = &[0x10, 0x20, 0x30];
    let lsn = Lsn::new(5, 100);

    // Seed the tree with the key at the LSN the log entry will carry.
    let seeded = noxu_tree::Tree::new(1, 128);
    seeded.insert(key.to_vec(), b"data".to_vec(), lsn).unwrap();
    let lookup = RealTreeLookup::new(
        Arc::new(std::sync::RwLock::new(seeded)),
        Arc::new(LockManager::new()),
    );

    let live_ln = LogEntry {
        lsn,
        entry_type: LogEntryType::Ln {
            db_id: 1,
            key: key.to_vec(),
            deleted: false,
            expiration_time: 0,
            entry_size: 64,
        },
    };
    let log_entries = vec![live_ln];

    let processor = make_processor();
    let file_summary = crate::FileSummary::new();
    let report = processor
        .process_file(5, &file_summary, &log_entries, &lookup)
        .unwrap();
    assert!(report.completed);
    assert_eq!(report.lns_cleaned, 1);
    assert_eq!(report.lns_migrated, 1);
    assert_eq!(report.lns_dead, 0);
}
/// End-to-end with a real, empty tree: an LN whose key is not in the tree
/// must be reported as cleaned and dead, not migrated.
#[test]
fn test_process_file_with_real_tree_absent_key_is_dead() {
    let empty = noxu_tree::Tree::new(1, 128);
    let lookup = RealTreeLookup::new(
        Arc::new(std::sync::RwLock::new(empty)),
        Arc::new(LockManager::new()),
    );
    let processor = make_processor();
    let file_summary = crate::FileSummary::new();
    let log_entries = vec![make_ln_entry(6, 50, 1, &[0xFF])];
    let report = processor
        .process_file(6, &file_summary, &log_entries, &lookup)
        .unwrap();
    assert!(report.completed);
    assert_eq!(report.lns_cleaned, 1);
    assert_eq!(report.lns_dead, 1);
    assert_eq!(report.lns_migrated, 0);
}
/// R-7 regression: when the log manager's `io_invalid` flag is set,
/// `SharedTreeLookup::migrate_ln_slot` must return `Locked` and the tree slot
/// must still hold the original LSN afterwards.
#[test]
fn test_r7_migration_abort_on_wal_write_failure() {
    use noxu_log::{FileManager, LogManager};
    use std::sync::RwLock;
    use tempfile::TempDir;
    let log_lsn = Lsn::new(1, 100);
    let tree = noxu_tree::Tree::new(1, 128);
    let tree_arc = Arc::new(RwLock::new(tree));
    {
        let t = tree_arc.write().unwrap();
        // Fail loudly if the fixture cannot be seeded; otherwise a failed
        // insert would only show up later as a misleading outcome mismatch.
        t.insert(b"key1".to_vec(), b"val1".to_vec(), log_lsn)
            .expect("seeding key1 into the tree should succeed");
    }
    let dir = TempDir::new().unwrap();
    let fm = Arc::new(
        FileManager::new(dir.path(), false, 10_000_000, 100).unwrap(),
    );
    let lm = Arc::new(LogManager::new(fm, 3, 1024 * 1024, 4096));
    // Mark log I/O as invalid before attempting the migration.
    lm.io_invalid.store(true, std::sync::atomic::Ordering::Release);
    let lookup = SharedTreeLookup::new(Arc::clone(&tree_arc), lm);
    let outcome = lookup.migrate_ln_slot(1, b"key1", log_lsn, log_lsn);
    assert_eq!(
        outcome,
        MigrationOutcome::Locked,
        "R-7: WAL write failure must abort migration (Locked), got {:?}",
        outcome
    );
    // The slot must be untouched by the aborted migration.
    let slot_lsn = {
        let t = tree_arc.read().unwrap();
        RealTreeLookup::get_slot_lsn_from_root(t.get_root(), b"key1")
    };
    assert_eq!(
        slot_lsn,
        Some(log_lsn),
        "R-7: tree slot must retain original log_lsn after aborted migration"
    );
}
}