use noxu_dbi::DatabaseImpl;
use noxu_tree::NodeRwLock as RwLock;
use noxu_tree::Tree;
use noxu_tree::tree::{BinStub, InNodeStub, TreeNode};
use noxu_util::{Lsn, NULL_LSN};
use std::collections::HashSet;
use std::fmt;
use std::sync::Arc;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct VerifyResult {
pub errors: Vec<VerifyError>,
pub warnings: Vec<String>,
pub databases_verified: u32,
pub records_verified: u64,
pub passed: bool,
}
impl VerifyResult {
pub fn new() -> Self {
Self {
errors: Vec::new(),
warnings: Vec::new(),
databases_verified: 0,
records_verified: 0,
passed: true,
}
}
pub fn with_errors(errors: Vec<VerifyError>) -> Self {
Self {
passed: errors.is_empty(),
errors,
warnings: Vec::new(),
databases_verified: 0,
records_verified: 0,
}
}
pub fn add_error(&mut self, error: VerifyError) {
self.errors.push(error);
self.passed = false;
}
pub fn add_warning(&mut self, warning: String) {
self.warnings.push(warning);
}
pub fn is_passed(&self) -> bool {
self.passed
}
pub fn error_count(&self) -> usize {
self.errors.len()
}
pub fn warning_count(&self) -> usize {
self.warnings.len()
}
}
impl Default for VerifyResult {
fn default() -> Self {
Self::new()
}
}
impl fmt::Display for VerifyResult {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
writeln!(f, "Verification Result")?;
writeln!(f, "===================")?;
writeln!(
f,
"Status: {}",
if self.passed { "PASSED" } else { "FAILED" }
)?;
writeln!(f, "Databases verified: {}", self.databases_verified)?;
writeln!(f, "Records verified: {}", self.records_verified)?;
writeln!(f)?;
if !self.errors.is_empty() {
writeln!(f, "Errors ({}):", self.errors.len())?;
for error in &self.errors {
writeln!(f, " - {}", error)?;
}
writeln!(f)?;
}
if !self.warnings.is_empty() {
writeln!(f, "Warnings ({}):", self.warnings.len())?;
for warning in &self.warnings {
writeln!(f, " - {}", warning)?;
}
writeln!(f)?;
}
if self.errors.is_empty() && self.warnings.is_empty() {
writeln!(f, "No errors or warnings found.")?;
}
Ok(())
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum VerifyError {
BtreeError { db_name: String, description: String },
LogError { file_number: u32, description: String },
DataInconsistency { description: String },
ChecksumError { location: String, description: String },
InvalidNodeReference { node_id: u64, description: String },
MetadataError { db_name: String, description: String },
}
impl fmt::Display for VerifyError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
VerifyError::BtreeError { db_name, description } => {
write!(f, "B-tree error in '{}': {}", db_name, description)
}
VerifyError::LogError { file_number, description } => {
write!(
f,
"Log file {:08x}.ndb error: {}",
file_number, description
)
}
VerifyError::DataInconsistency { description } => {
write!(f, "Data inconsistency: {}", description)
}
VerifyError::ChecksumError { location, description } => {
write!(f, "Checksum error at {}: {}", location, description)
}
VerifyError::InvalidNodeReference { node_id, description } => {
write!(
f,
"Invalid node reference (ID {}): {}",
node_id, description
)
}
VerifyError::MetadataError { db_name, description } => {
write!(f, "Metadata error in '{}': {}", db_name, description)
}
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct VerifyConfig {
pub verify_btree: bool,
pub verify_log: bool,
pub verify_data_checksums: bool,
pub repair: bool,
pub max_errors: u32,
pub verbose: bool,
pub database_name: Option<String>,
}
impl VerifyConfig {
pub fn new() -> Self {
Self::default()
}
pub fn with_btree_verification(mut self, enabled: bool) -> Self {
self.verify_btree = enabled;
self
}
pub fn with_log_verification(mut self, enabled: bool) -> Self {
self.verify_log = enabled;
self
}
pub fn with_checksum_verification(mut self, enabled: bool) -> Self {
self.verify_data_checksums = enabled;
self
}
pub fn with_repair(mut self, enabled: bool) -> Self {
self.repair = enabled;
self
}
pub fn with_max_errors(mut self, max: u32) -> Self {
self.max_errors = max;
self
}
pub fn with_verbose(mut self, enabled: bool) -> Self {
self.verbose = enabled;
self
}
pub fn for_database(mut self, name: String) -> Self {
self.database_name = Some(name);
self
}
}
impl Default for VerifyConfig {
fn default() -> Self {
VerifyConfig {
verify_btree: true,
verify_log: true,
verify_data_checksums: true,
repair: false,
max_errors: 100,
verbose: false,
database_name: None,
}
}
}
pub fn verify_tree(
tree: &Tree,
db_name: &str,
config: &VerifyConfig,
) -> VerifyResult {
let mut result = VerifyResult::new();
if !config.verify_btree {
return result;
}
let root = match tree.get_root() {
Some(r) => r,
None => {
result.databases_verified = 1;
return result;
}
};
let mut records: u64 = 0;
verify_node(&root, None, db_name, config, &mut result, &mut records);
result.records_verified = records;
result.databases_verified = 1;
result
}
fn verify_node(
node_arc: &Arc<RwLock<TreeNode>>,
parent_key: Option<&[u8]>,
db_name: &str,
config: &VerifyConfig,
result: &mut VerifyResult,
records: &mut u64,
) {
let guard = node_arc.read();
match &*guard {
TreeNode::Internal(in_node) => {
verify_internal_node(
in_node, parent_key, db_name, config, result, records,
);
}
TreeNode::Bottom(bin_stub) => {
verify_bin_stub(
bin_stub, parent_key, db_name, config, result, records,
);
}
}
}
fn verify_internal_node(
in_node: &InNodeStub,
_parent_key: Option<&[u8]>,
db_name: &str,
config: &VerifyConfig,
result: &mut VerifyResult,
records: &mut u64,
) {
if in_node.entries.is_empty() {
return;
}
for (i, entry) in in_node.entries.iter().enumerate() {
let child_owned = in_node.get_child(i);
let child_arc = match &child_owned {
Some(c) => c,
None => {
result.add_error(VerifyError::BtreeError {
db_name: db_name.to_string(),
description: format!(
"IN node (id={}) entry {} has null child reference",
in_node.node_id, i
),
});
if result.error_count() >= config.max_errors as usize {
return;
}
continue;
}
};
let expected_parent_key: Option<&[u8]> =
if i == 0 { None } else { Some(entry.key.as_slice()) };
verify_node(
child_arc,
expected_parent_key,
db_name,
config,
result,
records,
);
if result.error_count() >= config.max_errors as usize {
return;
}
}
}
fn verify_bin_stub(
bin: &BinStub,
parent_key: Option<&[u8]>,
db_name: &str,
config: &VerifyConfig,
result: &mut VerifyResult,
records: &mut u64,
) {
if let Some(pk) = parent_key
&& !bin.entries.is_empty()
{
let first_full = bin.get_full_key(0);
if let Some(ref first_key) = first_full
&& first_key.as_slice() < pk
{
result.add_error(VerifyError::BtreeError {
db_name: db_name.to_string(),
description: format!(
"BIN (id={}) first key {:?} is less than parent routing key {:?}",
bin.node_id, first_key, pk
),
});
}
}
for (i, entry) in bin.entries.iter().enumerate() {
if !entry.known_deleted && bin.get_lsn(i) == NULL_LSN {
result.add_error(VerifyError::BtreeError {
db_name: db_name.to_string(),
description: format!(
"BIN (id={}) slot {} has NULL LSN but is not known-deleted",
bin.node_id, i
),
});
if result.error_count() >= config.max_errors as usize {
return;
}
}
if !entry.known_deleted {
*records += 1;
}
}
}
pub fn verify_database_impl(
db_impl: &DatabaseImpl,
config: &VerifyConfig,
) -> VerifyResult {
let db_name = db_impl.get_name();
match db_impl.get_real_tree() {
Some(tree) => verify_tree(&tree, db_name, config),
None => {
VerifyResult {
errors: Vec::new(),
warnings: Vec::new(),
databases_verified: 1,
records_verified: 0,
passed: true,
}
}
}
}
pub fn gather_tree_lsns(tree: &Tree) -> HashSet<Lsn> {
let mut lsns = HashSet::new();
if let Some(root) = tree.get_root() {
gather_node_lsns(&root, &mut lsns);
}
lsns
}
fn gather_node_lsns(node_arc: &Arc<RwLock<TreeNode>>, lsns: &mut HashSet<Lsn>) {
let guard = node_arc.read();
match &*guard {
TreeNode::Internal(in_node) => {
for child in in_node.resident_children() {
gather_node_lsns(&child, lsns);
}
}
TreeNode::Bottom(bin) => {
for (i, entry) in bin.entries.iter().enumerate() {
let lsn = bin.get_lsn(i);
if !entry.known_deleted && lsn != NULL_LSN {
lsns.insert(lsn);
}
}
}
}
}
pub fn check_lsns_against_tracker(
db_impl: &DatabaseImpl,
tracker: &noxu_cleaner::UtilizationTracker,
result: &mut VerifyResult,
) {
let tree = match db_impl.get_real_tree() {
Some(t) => t,
None => return,
};
let live = gather_tree_lsns(&tree);
let check = noxu_cleaner::check_lsns(live, tracker);
for lsn in check.obsolete_contains_live {
result.add_error(VerifyError::DataInconsistency {
description: format!(
"Obsolete LSN set contains valid LSN {} in database '{}' \
(VerifyUtils.checkLsns: live tree LSN recorded obsolete)",
lsn,
db_impl.get_name()
),
});
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_verify_result_new() {
let result = VerifyResult::new();
assert!(result.passed);
assert_eq!(result.errors.len(), 0);
assert_eq!(result.warnings.len(), 0);
assert_eq!(result.databases_verified, 0);
assert_eq!(result.records_verified, 0);
}
#[test]
fn test_verify_result_default() {
let result = VerifyResult::default();
assert!(result.passed);
assert!(result.errors.is_empty());
}
#[test]
fn test_verify_result_with_errors() {
let errors = vec![VerifyError::BtreeError {
db_name: "test".to_string(),
description: "Invalid node".to_string(),
}];
let result = VerifyResult::with_errors(errors);
assert!(!result.passed);
assert_eq!(result.errors.len(), 1);
}
#[test]
fn test_verify_result_with_no_errors() {
let errors = vec![];
let result = VerifyResult::with_errors(errors);
assert!(result.passed);
assert_eq!(result.errors.len(), 0);
}
#[test]
fn test_add_error() {
let mut result = VerifyResult::new();
assert!(result.passed);
result.add_error(VerifyError::DataInconsistency {
description: "Test error".to_string(),
});
assert!(!result.passed);
assert_eq!(result.errors.len(), 1);
}
#[test]
fn test_add_warning() {
let mut result = VerifyResult::new();
result.add_warning("Test warning".to_string());
assert!(result.passed); assert_eq!(result.warnings.len(), 1);
}
#[test]
fn test_error_count() {
let mut result = VerifyResult::new();
assert_eq!(result.error_count(), 0);
result.add_error(VerifyError::DataInconsistency {
description: "Error 1".to_string(),
});
result.add_error(VerifyError::DataInconsistency {
description: "Error 2".to_string(),
});
assert_eq!(result.error_count(), 2);
}
#[test]
fn test_warning_count() {
let mut result = VerifyResult::new();
assert_eq!(result.warning_count(), 0);
result.add_warning("Warning 1".to_string());
result.add_warning("Warning 2".to_string());
assert_eq!(result.warning_count(), 2);
}
#[test]
fn test_is_passed() {
let result = VerifyResult::new();
assert!(result.is_passed());
let mut failed_result = VerifyResult::new();
failed_result.add_error(VerifyError::DataInconsistency {
description: "Error".to_string(),
});
assert!(!failed_result.is_passed());
}
#[test]
fn test_verify_error_btree() {
let error = VerifyError::BtreeError {
db_name: "mydb".to_string(),
description: "Invalid child reference".to_string(),
};
let s = format!("{}", error);
assert!(s.contains("B-tree error"));
assert!(s.contains("mydb"));
assert!(s.contains("Invalid child reference"));
}
#[test]
fn test_verify_error_log() {
let error = VerifyError::LogError {
file_number: 42,
description: "Corrupted entry".to_string(),
};
let s = format!("{}", error);
assert!(s.contains("Log file"));
assert!(s.contains("0000002a.ndb"));
assert!(s.contains("Corrupted entry"));
}
#[test]
fn test_verify_error_data_inconsistency() {
let error = VerifyError::DataInconsistency {
description: "Mismatched LSN".to_string(),
};
let s = format!("{}", error);
assert!(s.contains("Data inconsistency"));
assert!(s.contains("Mismatched LSN"));
}
#[test]
fn test_verify_error_checksum() {
let error = VerifyError::ChecksumError {
location: "file 10, offset 1024".to_string(),
description: "CRC mismatch".to_string(),
};
let s = format!("{}", error);
assert!(s.contains("Checksum error"));
assert!(s.contains("file 10, offset 1024"));
assert!(s.contains("CRC mismatch"));
}
#[test]
fn test_verify_error_invalid_node_reference() {
let error = VerifyError::InvalidNodeReference {
node_id: 12345,
description: "Node not found".to_string(),
};
let s = format!("{}", error);
assert!(s.contains("Invalid node reference"));
assert!(s.contains("12345"));
assert!(s.contains("Node not found"));
}
#[test]
fn test_verify_error_metadata() {
let error = VerifyError::MetadataError {
db_name: "testdb".to_string(),
description: "Invalid format version".to_string(),
};
let s = format!("{}", error);
assert!(s.contains("Metadata error"));
assert!(s.contains("testdb"));
assert!(s.contains("Invalid format version"));
}
#[test]
fn test_verify_config_default() {
let config = VerifyConfig::default();
assert!(config.verify_btree);
assert!(config.verify_log);
assert!(config.verify_data_checksums);
assert!(!config.repair);
assert_eq!(config.max_errors, 100);
assert!(!config.verbose);
assert!(config.database_name.is_none());
}
#[test]
fn test_verify_config_new() {
let config = VerifyConfig::new();
assert_eq!(config, VerifyConfig::default());
}
#[test]
fn test_verify_config_builder() {
let config = VerifyConfig::new()
.with_btree_verification(false)
.with_log_verification(true)
.with_checksum_verification(false)
.with_repair(true)
.with_max_errors(50)
.with_verbose(true)
.for_database("mydb".to_string());
assert!(!config.verify_btree);
assert!(config.verify_log);
assert!(!config.verify_data_checksums);
assert!(config.repair);
assert_eq!(config.max_errors, 50);
assert!(config.verbose);
assert_eq!(config.database_name, Some("mydb".to_string()));
}
#[test]
fn test_verify_result_display_passed() {
let result = VerifyResult {
errors: Vec::new(),
warnings: Vec::new(),
databases_verified: 5,
records_verified: 1000,
passed: true,
};
let output = format!("{}", result);
assert!(output.contains("PASSED"));
assert!(output.contains("Databases verified: 5"));
assert!(output.contains("Records verified: 1000"));
assert!(output.contains("No errors or warnings"));
}
#[test]
fn test_verify_result_display_with_errors() {
let mut result = VerifyResult::new();
result.add_error(VerifyError::BtreeError {
db_name: "test".to_string(),
description: "Bad node".to_string(),
});
result.databases_verified = 2;
result.records_verified = 500;
let output = format!("{}", result);
assert!(output.contains("FAILED"));
assert!(output.contains("Errors (1)"));
assert!(output.contains("B-tree error"));
}
#[test]
fn test_verify_result_display_with_warnings() {
let mut result = VerifyResult::new();
result.add_warning("Low cache utilization".to_string());
result.databases_verified = 3;
let output = format!("{}", result);
assert!(output.contains("PASSED"));
assert!(output.contains("Warnings (1)"));
assert!(output.contains("Low cache utilization"));
}
#[test]
fn test_verify_result_clone() {
let mut result = VerifyResult::new();
result.add_error(VerifyError::DataInconsistency {
description: "Test".to_string(),
});
let cloned = result.clone();
assert_eq!(cloned.errors.len(), result.errors.len());
assert_eq!(cloned.passed, result.passed);
}
#[test]
fn test_verify_error_equality() {
let error1 = VerifyError::BtreeError {
db_name: "db1".to_string(),
description: "error".to_string(),
};
let error2 = VerifyError::BtreeError {
db_name: "db1".to_string(),
description: "error".to_string(),
};
let error3 = VerifyError::BtreeError {
db_name: "db2".to_string(),
description: "error".to_string(),
};
assert_eq!(error1, error2);
assert_ne!(error1, error3);
}
#[test]
fn test_verify_config_equality() {
let config1 = VerifyConfig::default();
let config2 = VerifyConfig::default();
let config3 = VerifyConfig::new().with_repair(true);
assert_eq!(config1, config2);
assert_ne!(config1, config3);
}
#[test]
fn test_verify_tree_empty() {
use noxu_dbi::{DatabaseConfig, DatabaseId, DatabaseImpl, DbType};
use noxu_sync::RwLock;
use std::sync::Arc;
let db_id = DatabaseId::new(1);
let config = DatabaseConfig::default();
let db_impl = DatabaseImpl::new(
db_id,
"verify_test".to_string(),
DbType::User,
&config,
);
let db = Arc::new(RwLock::new(db_impl));
let guard = db.read();
let cfg = VerifyConfig::default();
if let Some(t) = guard.get_real_tree() {
let result = verify_tree(&t, "verify_test", &cfg);
assert!(
result.passed,
"empty tree should pass: {:?}",
result.errors
);
assert_eq!(result.databases_verified, 1);
}
}
#[test]
fn test_verify_tree_populated() {
use noxu_dbi::{
CursorImpl, DatabaseConfig, DatabaseId, DatabaseImpl, DbType,
PutMode,
};
use noxu_log::{FileManager, LogManager};
use noxu_sync::RwLock;
use std::sync::Arc;
use tempfile::TempDir;
let dir = TempDir::new().unwrap();
let fm = Arc::new(
FileManager::new(dir.path(), false, 64 * 1024 * 1024, 100).unwrap(),
);
let lm =
Arc::new(LogManager::new(Arc::clone(&fm), 3, 1024 * 1024, 65536));
let db_id = DatabaseId::new(2);
let config = DatabaseConfig::default();
let db_impl = DatabaseImpl::new(
db_id,
"pop_test".to_string(),
DbType::User,
&config,
);
let db = Arc::new(RwLock::new(db_impl));
{
let mut cursor = CursorImpl::with_log_manager(
Arc::clone(&db),
1,
Arc::clone(&lm),
);
cursor.put(b"alpha", b"1", PutMode::Overwrite).unwrap();
cursor.put(b"beta", b"2", PutMode::Overwrite).unwrap();
cursor.put(b"gamma", b"3", PutMode::Overwrite).unwrap();
}
let guard = db.read();
let cfg = VerifyConfig::default();
if let Some(t) = guard.get_real_tree() {
let result = verify_tree(&t, "pop_test", &cfg);
assert!(
result.passed,
"populated tree should pass: {:?}",
result.errors
);
assert_eq!(result.databases_verified, 1);
}
}
#[test]
fn test_verify_tree_detects_null_lsn() {
use noxu_dbi::{
CursorImpl, DatabaseConfig, DatabaseId, DatabaseImpl, DbType,
PutMode,
};
use noxu_log::{FileManager, LogManager};
use noxu_sync::RwLock;
use noxu_tree::tree::TreeNode;
use std::sync::Arc;
use tempfile::TempDir;
let dir = TempDir::new().unwrap();
let fm = Arc::new(
FileManager::new(dir.path(), false, 64 * 1024 * 1024, 100).unwrap(),
);
let lm =
Arc::new(LogManager::new(Arc::clone(&fm), 3, 1024 * 1024, 65536));
let db_id = DatabaseId::new(3);
let config = DatabaseConfig::default();
let db_impl = DatabaseImpl::new(
db_id,
"corrupt_test".to_string(),
DbType::User,
&config,
);
let db = Arc::new(RwLock::new(db_impl));
{
let mut cursor = CursorImpl::with_log_manager(
Arc::clone(&db),
1,
Arc::clone(&lm),
);
cursor.put(b"alpha", b"1", PutMode::Overwrite).unwrap();
cursor.put(b"beta", b"2", PutMode::Overwrite).unwrap();
cursor.put(b"gamma", b"3", PutMode::Overwrite).unwrap();
}
let guard = db.read();
let t = guard
.get_real_tree()
.expect("invariant: populated db has a real tree");
let corrupted = corrupt_first_bin_slot(&t, NULL_LSN);
assert!(corrupted, "test setup: expected at least one BIN slot");
let cfg = VerifyConfig::default();
let result = verify_tree(&t, "corrupt_test", &cfg);
assert!(
!result.passed,
"verifier must detect the NULL-LSN corruption, got passed=true"
);
assert!(
result.errors.iter().any(|e| matches!(
e,
VerifyError::BtreeError { description, .. }
if description.contains("NULL LSN")
)),
"expected a NULL-LSN BtreeError, got: {:?}",
result.errors
);
fn corrupt_first_bin_slot(
tree: &noxu_tree::Tree,
null_lsn: noxu_util::Lsn,
) -> bool {
fn recurse(
node: &Arc<noxu_tree::NodeRwLock<TreeNode>>,
null_lsn: noxu_util::Lsn,
) -> bool {
let mut guard = node.write();
match &mut *guard {
TreeNode::Bottom(bin) => {
if !bin.entries.is_empty() {
bin.entries[0].known_deleted = false;
bin.set_lsn(0, null_lsn);
return true;
}
false
}
TreeNode::Internal(in_node) => {
for child in in_node.resident_children() {
if recurse(&child, null_lsn) {
return true;
}
}
false
}
}
}
match tree.get_root() {
Some(root) => recurse(&root, null_lsn),
None => false,
}
}
}
}