1use noxu_dbi::DatabaseImpl;
6use noxu_tree::NodeRwLock as RwLock;
7use noxu_tree::Tree;
8use noxu_tree::tree::{BinStub, InNodeStub, TreeNode};
9use noxu_util::NULL_LSN;
10use std::fmt;
11use std::sync::Arc;
12
13#[derive(Debug, Clone, PartialEq, Eq)]
15pub struct VerifyResult {
16 pub errors: Vec<VerifyError>,
18 pub warnings: Vec<String>,
20 pub databases_verified: u32,
22 pub records_verified: u64,
24 pub passed: bool,
26}
27
28impl VerifyResult {
29 pub fn new() -> Self {
31 Self {
32 errors: Vec::new(),
33 warnings: Vec::new(),
34 databases_verified: 0,
35 records_verified: 0,
36 passed: true,
37 }
38 }
39
40 pub fn with_errors(errors: Vec<VerifyError>) -> Self {
42 Self {
43 passed: errors.is_empty(),
44 errors,
45 warnings: Vec::new(),
46 databases_verified: 0,
47 records_verified: 0,
48 }
49 }
50
51 pub fn add_error(&mut self, error: VerifyError) {
53 self.errors.push(error);
54 self.passed = false;
55 }
56
57 pub fn add_warning(&mut self, warning: String) {
59 self.warnings.push(warning);
60 }
61
62 pub fn is_passed(&self) -> bool {
64 self.passed
65 }
66
67 pub fn error_count(&self) -> usize {
69 self.errors.len()
70 }
71
72 pub fn warning_count(&self) -> usize {
74 self.warnings.len()
75 }
76}
77
78impl Default for VerifyResult {
79 fn default() -> Self {
80 Self::new()
81 }
82}
83
84impl fmt::Display for VerifyResult {
85 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
86 writeln!(f, "Verification Result")?;
87 writeln!(f, "===================")?;
88 writeln!(
89 f,
90 "Status: {}",
91 if self.passed { "PASSED" } else { "FAILED" }
92 )?;
93 writeln!(f, "Databases verified: {}", self.databases_verified)?;
94 writeln!(f, "Records verified: {}", self.records_verified)?;
95 writeln!(f)?;
96
97 if !self.errors.is_empty() {
98 writeln!(f, "Errors ({}):", self.errors.len())?;
99 for error in &self.errors {
100 writeln!(f, " - {}", error)?;
101 }
102 writeln!(f)?;
103 }
104
105 if !self.warnings.is_empty() {
106 writeln!(f, "Warnings ({}):", self.warnings.len())?;
107 for warning in &self.warnings {
108 writeln!(f, " - {}", warning)?;
109 }
110 writeln!(f)?;
111 }
112
113 if self.errors.is_empty() && self.warnings.is_empty() {
114 writeln!(f, "No errors or warnings found.")?;
115 }
116
117 Ok(())
118 }
119}
120
121#[derive(Debug, Clone, PartialEq, Eq)]
123pub enum VerifyError {
124 BtreeError { db_name: String, description: String },
126 LogError { file_number: u32, description: String },
128 DataInconsistency { description: String },
130 ChecksumError { location: String, description: String },
132 InvalidNodeReference { node_id: u64, description: String },
134 MetadataError { db_name: String, description: String },
136}
137
138impl fmt::Display for VerifyError {
139 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
140 match self {
141 VerifyError::BtreeError { db_name, description } => {
142 write!(f, "B-tree error in '{}': {}", db_name, description)
143 }
144 VerifyError::LogError { file_number, description } => {
145 write!(
146 f,
147 "Log file {:08x}.ndb error: {}",
148 file_number, description
149 )
150 }
151 VerifyError::DataInconsistency { description } => {
152 write!(f, "Data inconsistency: {}", description)
153 }
154 VerifyError::ChecksumError { location, description } => {
155 write!(f, "Checksum error at {}: {}", location, description)
156 }
157 VerifyError::InvalidNodeReference { node_id, description } => {
158 write!(
159 f,
160 "Invalid node reference (ID {}): {}",
161 node_id, description
162 )
163 }
164 VerifyError::MetadataError { db_name, description } => {
165 write!(f, "Metadata error in '{}': {}", db_name, description)
166 }
167 }
168 }
169}
170
171#[derive(Debug, Clone, PartialEq, Eq)]
173pub struct VerifyConfig {
174 pub verify_btree: bool,
176 pub verify_log: bool,
178 pub verify_data_checksums: bool,
180 pub repair: bool,
182 pub max_errors: u32,
184 pub verbose: bool,
186 pub database_name: Option<String>,
188}
189
190impl VerifyConfig {
191 pub fn new() -> Self {
193 Self::default()
194 }
195
196 pub fn with_btree_verification(mut self, enabled: bool) -> Self {
198 self.verify_btree = enabled;
199 self
200 }
201
202 pub fn with_log_verification(mut self, enabled: bool) -> Self {
204 self.verify_log = enabled;
205 self
206 }
207
208 pub fn with_checksum_verification(mut self, enabled: bool) -> Self {
210 self.verify_data_checksums = enabled;
211 self
212 }
213
214 pub fn with_repair(mut self, enabled: bool) -> Self {
216 self.repair = enabled;
217 self
218 }
219
220 pub fn with_max_errors(mut self, max: u32) -> Self {
222 self.max_errors = max;
223 self
224 }
225
226 pub fn with_verbose(mut self, enabled: bool) -> Self {
228 self.verbose = enabled;
229 self
230 }
231
232 pub fn for_database(mut self, name: String) -> Self {
234 self.database_name = Some(name);
235 self
236 }
237}
238
239impl Default for VerifyConfig {
240 fn default() -> Self {
241 VerifyConfig {
242 verify_btree: true,
243 verify_log: true,
244 verify_data_checksums: true,
245 repair: false,
246 max_errors: 100,
247 verbose: false,
248 database_name: None,
249 }
250 }
251}
252
253pub fn verify_tree(
271 tree: &Tree,
272 db_name: &str,
273 config: &VerifyConfig,
274) -> VerifyResult {
275 let mut result = VerifyResult::new();
276
277 if !config.verify_btree {
278 return result;
279 }
280
281 let root = match tree.get_root() {
282 Some(r) => r,
283 None => {
284 result.databases_verified = 1;
286 return result;
287 }
288 };
289
290 let mut records: u64 = 0;
291 verify_node(&root, None, db_name, config, &mut result, &mut records);
292 result.records_verified = records;
293 result.databases_verified = 1;
294 result
295}
296
297fn verify_node(
299 node_arc: &Arc<RwLock<TreeNode>>,
300 parent_key: Option<&[u8]>,
301 db_name: &str,
302 config: &VerifyConfig,
303 result: &mut VerifyResult,
304 records: &mut u64,
305) {
306 let guard = node_arc.read();
307
308 match &*guard {
309 TreeNode::Internal(in_node) => {
310 verify_internal_node(
311 in_node, parent_key, db_name, config, result, records,
312 );
313 }
314 TreeNode::Bottom(bin_stub) => {
315 verify_bin_stub(
316 bin_stub, parent_key, db_name, config, result, records,
317 );
318 }
319 }
320}
321
322fn verify_internal_node(
327 in_node: &InNodeStub,
328 _parent_key: Option<&[u8]>,
329 db_name: &str,
330 config: &VerifyConfig,
331 result: &mut VerifyResult,
332 records: &mut u64,
333) {
334 if in_node.entries.is_empty() {
335 return;
338 }
339
340 for (i, entry) in in_node.entries.iter().enumerate() {
342 let child_arc = match &entry.child {
343 Some(c) => c,
344 None => {
345 result.add_error(VerifyError::BtreeError {
346 db_name: db_name.to_string(),
347 description: format!(
348 "IN node (id={}) entry {} has null child reference",
349 in_node.node_id, i
350 ),
351 });
352 if result.error_count() >= config.max_errors as usize {
353 return;
354 }
355 continue;
356 }
357 };
358
359 let expected_parent_key: Option<&[u8]> =
363 if i == 0 { None } else { Some(entry.key.as_slice()) };
364
365 verify_node(
366 child_arc,
367 expected_parent_key,
368 db_name,
369 config,
370 result,
371 records,
372 );
373
374 if result.error_count() >= config.max_errors as usize {
375 return;
376 }
377 }
378}
379
380fn verify_bin_stub(
386 bin: &BinStub,
387 parent_key: Option<&[u8]>,
388 db_name: &str,
389 config: &VerifyConfig,
390 result: &mut VerifyResult,
391 records: &mut u64,
392) {
393 if let Some(pk) = parent_key
395 && !bin.entries.is_empty()
396 {
397 let first_full = bin.get_full_key(0);
398 if let Some(ref first_key) = first_full
399 && first_key.as_slice() < pk
400 {
401 result.add_error(VerifyError::BtreeError {
402 db_name: db_name.to_string(),
403 description: format!(
404 "BIN (id={}) first key {:?} is less than parent routing key {:?}",
405 bin.node_id, first_key, pk
406 ),
407 });
408 }
409 }
410
411 for (i, entry) in bin.entries.iter().enumerate() {
413 if !entry.known_deleted && entry.lsn == NULL_LSN {
415 result.add_error(VerifyError::BtreeError {
416 db_name: db_name.to_string(),
417 description: format!(
418 "BIN (id={}) slot {} has NULL LSN but is not known-deleted",
419 bin.node_id, i
420 ),
421 });
422 if result.error_count() >= config.max_errors as usize {
423 return;
424 }
425 }
426
427 if !entry.known_deleted {
428 *records += 1;
429 }
430 }
431}
432
433pub fn verify_database_impl(
464 db_impl: &DatabaseImpl,
465 config: &VerifyConfig,
466) -> VerifyResult {
467 let db_name = db_impl.get_name();
468 match db_impl.get_real_tree() {
469 Some(tree) => verify_tree(&tree, db_name, config),
470 None => {
471 VerifyResult {
473 errors: Vec::new(),
474 warnings: Vec::new(),
475 databases_verified: 1,
476 records_verified: 0,
477 passed: true,
478 }
479 }
480 }
481}
482
483#[cfg(test)]
484mod tests {
485 use super::*;
486
487 #[test]
488 fn test_verify_result_new() {
489 let result = VerifyResult::new();
490 assert!(result.passed);
491 assert_eq!(result.errors.len(), 0);
492 assert_eq!(result.warnings.len(), 0);
493 assert_eq!(result.databases_verified, 0);
494 assert_eq!(result.records_verified, 0);
495 }
496
497 #[test]
498 fn test_verify_result_default() {
499 let result = VerifyResult::default();
500 assert!(result.passed);
501 assert!(result.errors.is_empty());
502 }
503
504 #[test]
505 fn test_verify_result_with_errors() {
506 let errors = vec![VerifyError::BtreeError {
507 db_name: "test".to_string(),
508 description: "Invalid node".to_string(),
509 }];
510 let result = VerifyResult::with_errors(errors);
511 assert!(!result.passed);
512 assert_eq!(result.errors.len(), 1);
513 }
514
515 #[test]
516 fn test_verify_result_with_no_errors() {
517 let errors = vec![];
518 let result = VerifyResult::with_errors(errors);
519 assert!(result.passed);
520 assert_eq!(result.errors.len(), 0);
521 }
522
523 #[test]
524 fn test_add_error() {
525 let mut result = VerifyResult::new();
526 assert!(result.passed);
527
528 result.add_error(VerifyError::DataInconsistency {
529 description: "Test error".to_string(),
530 });
531
532 assert!(!result.passed);
533 assert_eq!(result.errors.len(), 1);
534 }
535
536 #[test]
537 fn test_add_warning() {
538 let mut result = VerifyResult::new();
539 result.add_warning("Test warning".to_string());
540
541 assert!(result.passed); assert_eq!(result.warnings.len(), 1);
543 }
544
545 #[test]
546 fn test_error_count() {
547 let mut result = VerifyResult::new();
548 assert_eq!(result.error_count(), 0);
549
550 result.add_error(VerifyError::DataInconsistency {
551 description: "Error 1".to_string(),
552 });
553 result.add_error(VerifyError::DataInconsistency {
554 description: "Error 2".to_string(),
555 });
556
557 assert_eq!(result.error_count(), 2);
558 }
559
560 #[test]
561 fn test_warning_count() {
562 let mut result = VerifyResult::new();
563 assert_eq!(result.warning_count(), 0);
564
565 result.add_warning("Warning 1".to_string());
566 result.add_warning("Warning 2".to_string());
567
568 assert_eq!(result.warning_count(), 2);
569 }
570
571 #[test]
572 fn test_is_passed() {
573 let result = VerifyResult::new();
574 assert!(result.is_passed());
575
576 let mut failed_result = VerifyResult::new();
577 failed_result.add_error(VerifyError::DataInconsistency {
578 description: "Error".to_string(),
579 });
580 assert!(!failed_result.is_passed());
581 }
582
583 #[test]
584 fn test_verify_error_btree() {
585 let error = VerifyError::BtreeError {
586 db_name: "mydb".to_string(),
587 description: "Invalid child reference".to_string(),
588 };
589 let s = format!("{}", error);
590 assert!(s.contains("B-tree error"));
591 assert!(s.contains("mydb"));
592 assert!(s.contains("Invalid child reference"));
593 }
594
595 #[test]
596 fn test_verify_error_log() {
597 let error = VerifyError::LogError {
598 file_number: 42,
599 description: "Corrupted entry".to_string(),
600 };
601 let s = format!("{}", error);
602 assert!(s.contains("Log file"));
603 assert!(s.contains("0000002a.ndb"));
604 assert!(s.contains("Corrupted entry"));
605 }
606
607 #[test]
608 fn test_verify_error_data_inconsistency() {
609 let error = VerifyError::DataInconsistency {
610 description: "Mismatched LSN".to_string(),
611 };
612 let s = format!("{}", error);
613 assert!(s.contains("Data inconsistency"));
614 assert!(s.contains("Mismatched LSN"));
615 }
616
617 #[test]
618 fn test_verify_error_checksum() {
619 let error = VerifyError::ChecksumError {
620 location: "file 10, offset 1024".to_string(),
621 description: "CRC mismatch".to_string(),
622 };
623 let s = format!("{}", error);
624 assert!(s.contains("Checksum error"));
625 assert!(s.contains("file 10, offset 1024"));
626 assert!(s.contains("CRC mismatch"));
627 }
628
629 #[test]
630 fn test_verify_error_invalid_node_reference() {
631 let error = VerifyError::InvalidNodeReference {
632 node_id: 12345,
633 description: "Node not found".to_string(),
634 };
635 let s = format!("{}", error);
636 assert!(s.contains("Invalid node reference"));
637 assert!(s.contains("12345"));
638 assert!(s.contains("Node not found"));
639 }
640
641 #[test]
642 fn test_verify_error_metadata() {
643 let error = VerifyError::MetadataError {
644 db_name: "testdb".to_string(),
645 description: "Invalid format version".to_string(),
646 };
647 let s = format!("{}", error);
648 assert!(s.contains("Metadata error"));
649 assert!(s.contains("testdb"));
650 assert!(s.contains("Invalid format version"));
651 }
652
653 #[test]
654 fn test_verify_config_default() {
655 let config = VerifyConfig::default();
656 assert!(config.verify_btree);
657 assert!(config.verify_log);
658 assert!(config.verify_data_checksums);
659 assert!(!config.repair);
660 assert_eq!(config.max_errors, 100);
661 assert!(!config.verbose);
662 assert!(config.database_name.is_none());
663 }
664
665 #[test]
666 fn test_verify_config_new() {
667 let config = VerifyConfig::new();
668 assert_eq!(config, VerifyConfig::default());
669 }
670
671 #[test]
672 fn test_verify_config_builder() {
673 let config = VerifyConfig::new()
674 .with_btree_verification(false)
675 .with_log_verification(true)
676 .with_checksum_verification(false)
677 .with_repair(true)
678 .with_max_errors(50)
679 .with_verbose(true)
680 .for_database("mydb".to_string());
681
682 assert!(!config.verify_btree);
683 assert!(config.verify_log);
684 assert!(!config.verify_data_checksums);
685 assert!(config.repair);
686 assert_eq!(config.max_errors, 50);
687 assert!(config.verbose);
688 assert_eq!(config.database_name, Some("mydb".to_string()));
689 }
690
691 #[test]
692 fn test_verify_result_display_passed() {
693 let result = VerifyResult {
694 errors: Vec::new(),
695 warnings: Vec::new(),
696 databases_verified: 5,
697 records_verified: 1000,
698 passed: true,
699 };
700
701 let output = format!("{}", result);
702 assert!(output.contains("PASSED"));
703 assert!(output.contains("Databases verified: 5"));
704 assert!(output.contains("Records verified: 1000"));
705 assert!(output.contains("No errors or warnings"));
706 }
707
708 #[test]
709 fn test_verify_result_display_with_errors() {
710 let mut result = VerifyResult::new();
711 result.add_error(VerifyError::BtreeError {
712 db_name: "test".to_string(),
713 description: "Bad node".to_string(),
714 });
715 result.databases_verified = 2;
716 result.records_verified = 500;
717
718 let output = format!("{}", result);
719 assert!(output.contains("FAILED"));
720 assert!(output.contains("Errors (1)"));
721 assert!(output.contains("B-tree error"));
722 }
723
724 #[test]
725 fn test_verify_result_display_with_warnings() {
726 let mut result = VerifyResult::new();
727 result.add_warning("Low cache utilization".to_string());
728 result.databases_verified = 3;
729
730 let output = format!("{}", result);
731 assert!(output.contains("PASSED"));
732 assert!(output.contains("Warnings (1)"));
733 assert!(output.contains("Low cache utilization"));
734 }
735
736 #[test]
737 fn test_verify_result_clone() {
738 let mut result = VerifyResult::new();
739 result.add_error(VerifyError::DataInconsistency {
740 description: "Test".to_string(),
741 });
742
743 let cloned = result.clone();
744 assert_eq!(cloned.errors.len(), result.errors.len());
745 assert_eq!(cloned.passed, result.passed);
746 }
747
748 #[test]
749 fn test_verify_error_equality() {
750 let error1 = VerifyError::BtreeError {
751 db_name: "db1".to_string(),
752 description: "error".to_string(),
753 };
754 let error2 = VerifyError::BtreeError {
755 db_name: "db1".to_string(),
756 description: "error".to_string(),
757 };
758 let error3 = VerifyError::BtreeError {
759 db_name: "db2".to_string(),
760 description: "error".to_string(),
761 };
762
763 assert_eq!(error1, error2);
764 assert_ne!(error1, error3);
765 }
766
767 #[test]
768 fn test_verify_config_equality() {
769 let config1 = VerifyConfig::default();
770 let config2 = VerifyConfig::default();
771 let config3 = VerifyConfig::new().with_repair(true);
772
773 assert_eq!(config1, config2);
774 assert_ne!(config1, config3);
775 }
776
777 #[test]
781 fn test_verify_tree_empty() {
782 use noxu_dbi::{DatabaseConfig, DatabaseId, DatabaseImpl, DbType};
783 use noxu_sync::RwLock;
784 use std::sync::Arc;
785
786 let db_id = DatabaseId::new(1);
787 let config = DatabaseConfig::default();
788 let db_impl = DatabaseImpl::new(
789 db_id,
790 "verify_test".to_string(),
791 DbType::User,
792 &config,
793 );
794 let db = Arc::new(RwLock::new(db_impl));
795 let guard = db.read();
796 let cfg = VerifyConfig::default();
797
798 if let Some(t) = guard.get_real_tree() {
799 let result = verify_tree(&t, "verify_test", &cfg);
800 assert!(
801 result.passed,
802 "empty tree should pass: {:?}",
803 result.errors
804 );
805 assert_eq!(result.databases_verified, 1);
806 }
807 }
809
810 #[test]
815 fn test_verify_tree_populated() {
816 use noxu_dbi::{
817 CursorImpl, DatabaseConfig, DatabaseId, DatabaseImpl, DbType,
818 PutMode,
819 };
820 use noxu_log::{FileManager, LogManager};
821 use noxu_sync::RwLock;
822 use std::sync::Arc;
823 use tempfile::TempDir;
824
825 let dir = TempDir::new().unwrap();
826 let fm = Arc::new(
827 FileManager::new(dir.path(), false, 64 * 1024 * 1024, 100).unwrap(),
828 );
829 let lm =
830 Arc::new(LogManager::new(Arc::clone(&fm), 3, 1024 * 1024, 65536));
831
832 let db_id = DatabaseId::new(2);
833 let config = DatabaseConfig::default();
834 let db_impl = DatabaseImpl::new(
835 db_id,
836 "pop_test".to_string(),
837 DbType::User,
838 &config,
839 );
840 let db = Arc::new(RwLock::new(db_impl));
841
842 {
843 let mut cursor = CursorImpl::with_log_manager(
844 Arc::clone(&db),
845 1,
846 Arc::clone(&lm),
847 );
848 cursor.put(b"alpha", b"1", PutMode::Overwrite).unwrap();
849 cursor.put(b"beta", b"2", PutMode::Overwrite).unwrap();
850 cursor.put(b"gamma", b"3", PutMode::Overwrite).unwrap();
851 }
852
853 let guard = db.read();
854 let cfg = VerifyConfig::default();
855
856 if let Some(t) = guard.get_real_tree() {
857 let result = verify_tree(&t, "pop_test", &cfg);
858 assert!(
859 result.passed,
860 "populated tree should pass: {:?}",
861 result.errors
862 );
863 assert_eq!(result.databases_verified, 1);
864 }
865 }
866
867 #[test]
875 fn test_verify_tree_detects_null_lsn() {
876 use noxu_dbi::{
877 CursorImpl, DatabaseConfig, DatabaseId, DatabaseImpl, DbType,
878 PutMode,
879 };
880 use noxu_log::{FileManager, LogManager};
881 use noxu_sync::RwLock;
882 use noxu_tree::tree::TreeNode;
883 use std::sync::Arc;
884 use tempfile::TempDir;
885
886 let dir = TempDir::new().unwrap();
887 let fm = Arc::new(
888 FileManager::new(dir.path(), false, 64 * 1024 * 1024, 100).unwrap(),
889 );
890 let lm =
891 Arc::new(LogManager::new(Arc::clone(&fm), 3, 1024 * 1024, 65536));
892
893 let db_id = DatabaseId::new(3);
894 let config = DatabaseConfig::default();
895 let db_impl = DatabaseImpl::new(
896 db_id,
897 "corrupt_test".to_string(),
898 DbType::User,
899 &config,
900 );
901 let db = Arc::new(RwLock::new(db_impl));
902
903 {
904 let mut cursor = CursorImpl::with_log_manager(
905 Arc::clone(&db),
906 1,
907 Arc::clone(&lm),
908 );
909 cursor.put(b"alpha", b"1", PutMode::Overwrite).unwrap();
910 cursor.put(b"beta", b"2", PutMode::Overwrite).unwrap();
911 cursor.put(b"gamma", b"3", PutMode::Overwrite).unwrap();
912 }
913
914 let guard = db.read();
915 let t = guard
916 .get_real_tree()
917 .expect("invariant: populated db has a real tree");
918
919 let corrupted = corrupt_first_bin_slot(&t, NULL_LSN);
921 assert!(corrupted, "test setup: expected at least one BIN slot");
922
923 let cfg = VerifyConfig::default();
924 let result = verify_tree(&t, "corrupt_test", &cfg);
925 assert!(
926 !result.passed,
927 "verifier must detect the NULL-LSN corruption, got passed=true"
928 );
929 assert!(
930 result.errors.iter().any(|e| matches!(
931 e,
932 VerifyError::BtreeError { description, .. }
933 if description.contains("NULL LSN")
934 )),
935 "expected a NULL-LSN BtreeError, got: {:?}",
936 result.errors
937 );
938
939 fn corrupt_first_bin_slot(
941 tree: &noxu_tree::Tree,
942 null_lsn: noxu_util::Lsn,
943 ) -> bool {
944 fn recurse(
945 node: &Arc<noxu_tree::NodeRwLock<TreeNode>>,
946 null_lsn: noxu_util::Lsn,
947 ) -> bool {
948 let mut guard = node.write();
949 match &mut *guard {
950 TreeNode::Bottom(bin) => {
951 if let Some(entry) = bin.entries.first_mut() {
952 entry.known_deleted = false;
953 entry.lsn = null_lsn;
954 return true;
955 }
956 false
957 }
958 TreeNode::Internal(in_node) => {
959 for e in &in_node.entries {
960 if let Some(child) = &e.child
961 && recurse(child, null_lsn)
962 {
963 return true;
964 }
965 }
966 false
967 }
968 }
969 }
970 match tree.get_root() {
971 Some(root) => recurse(&root, null_lsn),
972 None => false,
973 }
974 }
975 }
976}