1use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
5use noxu_tree::{KeyComparatorFn, Tree};
6use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
7use std::sync::{Arc, RwLock};
8
9use crate::dup_key_data;
10use crate::throughput_stats::ThroughputStats;
11
12use crate::{DatabaseConfig, DatabaseId, DbType};
13
/// Deletion life-cycle of a database.
///
/// `DatabaseImpl::is_deleting` reports every state other than `NotDeleted`;
/// `DatabaseImpl::is_deleted` reports only `Deleted`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DeleteState {
    /// The database is live.
    NotDeleted,
    /// State entered by `DatabaseImpl::start_delete`.
    /// NOTE(review): presumably cleanup is pending on the in-memory list —
    /// confirm against the code that performs the harvest.
    DeletedCleanupInListHarvest,
    /// Not assigned anywhere in the visible code.
    /// NOTE(review): presumably cleanup is pending on the log — confirm.
    DeletedCleanupLogHarvest,
    /// Final state, entered by `DatabaseImpl::finish_delete`.
    Deleted,
}
22
// Bit positions inside the single `flags` byte that is written to and read
// back from the log record.

/// Set when the database is configured with sorted duplicates.
const DUPS_ENABLED: u8 = 0x01;
/// Set when the database is configured as temporary.
const TEMPORARY_BIT: u8 = 0x02;
/// Tested by `DatabaseImpl::is_replicated`.
const IS_REPLICATED_BIT: u8 = 0x04;
/// Not referenced in the visible code.
/// NOTE(review): presumably marks a database as explicitly non-replicated —
/// confirm before relying on it.
const NOT_REPLICATED_BIT: u8 = 0x08;
/// Set when key prefixing is enabled for the database.
const PREFIXING_ENABLED: u8 = 0x10;
29
/// In-memory state of a single database: identity, persisted flags,
/// life-cycle state, counters and the backing tree.
pub struct DatabaseImpl {
    /// Identifier of the database; first field of the log record.
    id: DatabaseId,
    /// Database name; persisted as a length-prefixed UTF-8 string.
    name: String,
    /// Kind of database. When read from the log it is derived from the name
    /// rather than stored.
    db_type: DbType,
    /// Bit set built from the `*_ENABLED` / `*_BIT` constants; persisted.
    flags: u8,
    /// Current position in the deletion life-cycle.
    delete_state: DeleteState,
    /// Flag toggled by `set_dirty` / `clear_dirty`.
    dirty: AtomicBool,
    /// Copied from `DatabaseConfig::node_max_entries`; persisted.
    max_tree_entries_per_node: i32,
    /// Counter adjusted by `increment_reference_count` /
    /// `decrement_reference_count`.
    reference_count: AtomicI64,
    /// Holder of the root LSN that is written to the log record.
    tree: Option<DatabaseTree>,
    /// The tree itself, shared behind a reader/writer lock.
    real_tree: Option<Arc<RwLock<Tree>>>,
    /// Copied from `DatabaseConfig::deferred_write`; not part of the log
    /// record (`read_from_log` always yields `false`).
    deferred_write: bool,
    /// Entry counter; reset from the tree by `set_recovered_tree`.
    entry_count: Arc<AtomicU64>,
    /// Throughput statistics for this database.
    /// NOTE(review): what is recorded here is not visible in this file.
    pub throughput: Arc<ThroughputStats>,
    /// Identity string of the configured btree comparator, if any; not part
    /// of the log record.
    btree_comparator_id: Option<String>,
    /// Identity string of the configured duplicate comparator, if any; not
    /// part of the log record.
    duplicate_comparator_id: Option<String>,
    /// Triggers cloned from the configuration at construction time.
    triggers: Vec<Arc<dyn crate::trigger::Trigger>>,
}
105
/// Persisted tree metadata of a database: currently only the root LSN.
#[derive(Debug)]
pub struct DatabaseTree {
    /// LSN of the tree root, stored as a raw `u64`; starts out as
    /// `noxu_util::NULL_LSN`.
    root_lsn: u64,
}
118
impl Default for DatabaseTree {
    /// Equivalent to [`DatabaseTree::new`].
    fn default() -> Self {
        Self::new()
    }
}
124
impl DatabaseTree {
    /// Creates tree metadata whose root LSN is `noxu_util::NULL_LSN`.
    pub fn new() -> Self {
        DatabaseTree { root_lsn: noxu_util::NULL_LSN.as_u64() }
    }
    /// Returns the root LSN as a raw `u64`.
    pub fn get_root_lsn(&self) -> u64 {
        self.root_lsn
    }
    /// Replaces the root LSN.
    pub fn set_root_lsn(&mut self, lsn: u64) {
        self.root_lsn = lsn;
    }
}
136
137impl DatabaseImpl {
138 pub fn new(
140 id: DatabaseId,
141 name: String,
142 db_type: DbType,
143 config: &DatabaseConfig,
144 ) -> Self {
145 let mut flags = 0u8;
146 if config.sorted_duplicates {
147 flags |= DUPS_ENABLED;
148 }
149 if config.temporary {
150 flags |= TEMPORARY_BIT;
151 }
152 if config.key_prefixing {
153 flags |= PREFIXING_ENABLED;
154 }
155
156 let max_entries = config.node_max_entries as usize;
157 let btree_comparator_id =
158 config.btree_comparator.as_ref().map(|c| c.identity.clone());
159 let duplicate_comparator_id =
160 config.duplicate_comparator.as_ref().map(|c| c.identity.clone());
161 let real_tree = Self::build_tree(id, max_entries, config);
162 let mut real_tree = real_tree;
168 real_tree.set_key_prefixing(config.key_prefixing);
169 DatabaseImpl {
170 id,
171 name,
172 db_type,
173 flags,
174 delete_state: DeleteState::NotDeleted,
175 dirty: AtomicBool::new(false),
176 max_tree_entries_per_node: config.node_max_entries,
177 reference_count: AtomicI64::new(0),
178 tree: Some(DatabaseTree::new()),
179 real_tree: Some(Arc::new(RwLock::new(real_tree))),
180 deferred_write: config.deferred_write,
181 entry_count: Arc::new(AtomicU64::new(0)),
182 throughput: ThroughputStats::new(),
183 btree_comparator_id,
184 duplicate_comparator_id,
185 triggers: config.triggers.clone(),
186 }
187 }
188
189 fn build_tree(
206 id: DatabaseId,
207 max_entries: usize,
208 config: &DatabaseConfig,
209 ) -> Tree {
210 let btree_fn: Option<KeyComparatorFn> =
211 config.btree_comparator.as_ref().map(|c| c.func.clone());
212 if config.sorted_duplicates {
213 let dup_fn: Option<KeyComparatorFn> =
214 config.duplicate_comparator.as_ref().map(|c| c.func.clone());
215 let dup_cmp: KeyComparatorFn =
216 Arc::new(move |a: &[u8], b: &[u8]| {
217 dup_key_data::cmp_two_part_keys(
218 a,
219 b,
220 |x, y| match &btree_fn {
221 Some(f) => f(x, y),
222 None => x.cmp(y),
223 },
224 |x, y| match &dup_fn {
225 Some(f) => f(x, y),
226 None => x.cmp(y),
227 },
228 )
229 });
230 Tree::new_with_comparator(id.id() as u64, max_entries, dup_cmp)
231 } else if let Some(btree_fn) = btree_fn {
232 Tree::new_with_comparator(id.id() as u64, max_entries, btree_fn)
233 } else {
234 Tree::new(id.id() as u64, max_entries)
235 }
236 }
237
238 pub fn get_id(&self) -> DatabaseId {
240 self.id
241 }
242 pub fn get_name(&self) -> &str {
243 &self.name
244 }
245 pub fn get_db_type(&self) -> DbType {
246 self.db_type
247 }
248
249 pub fn is_deferred_write(&self) -> bool {
253 self.deferred_write
254 }
255
256 pub fn get_sorted_duplicates(&self) -> bool {
258 self.flags & DUPS_ENABLED != 0
259 }
260
261 pub fn btree_comparator_id(&self) -> Option<&str> {
265 self.btree_comparator_id.as_deref()
266 }
267
268 pub fn duplicate_comparator_id(&self) -> Option<&str> {
272 self.duplicate_comparator_id.as_deref()
273 }
274
275 pub fn triggers(&self) -> &[Arc<dyn crate::trigger::Trigger>] {
279 &self.triggers
280 }
281
282 pub fn has_user_triggers(&self) -> bool {
287 !self.triggers.is_empty()
288 }
289
290 pub fn is_ln_immediately_obsolete(&self) -> bool {
300 self.get_sorted_duplicates()
301 }
304 pub fn is_temporary(&self) -> bool {
305 self.flags & TEMPORARY_BIT != 0
306 }
307 pub fn get_key_prefixing(&self) -> bool {
308 self.flags & PREFIXING_ENABLED != 0
309 }
310 pub fn is_replicated(&self) -> bool {
311 self.flags & IS_REPLICATED_BIT != 0
312 }
313
314 pub fn is_deleted(&self) -> bool {
316 self.delete_state == DeleteState::Deleted
317 }
318 pub fn is_deleting(&self) -> bool {
319 self.delete_state != DeleteState::NotDeleted
320 }
321 pub fn start_delete(&mut self) {
322 self.delete_state = DeleteState::DeletedCleanupInListHarvest;
323 }
324 pub fn finish_delete(&mut self) {
325 self.delete_state = DeleteState::Deleted;
326 }
327
328 pub fn is_dirty(&self) -> bool {
330 self.dirty.load(Ordering::Relaxed)
331 }
332 pub fn set_dirty(&self) {
333 self.dirty.store(true, Ordering::Relaxed);
334 }
335 pub fn clear_dirty(&self) {
336 self.dirty.store(false, Ordering::Relaxed);
337 }
338
339 pub fn increment_reference_count(&self) {
341 self.reference_count.fetch_add(1, Ordering::Relaxed);
342 }
343 pub fn decrement_reference_count(&self) {
344 self.reference_count.fetch_sub(1, Ordering::Relaxed);
345 }
346 pub fn reference_count(&self) -> i64 {
347 self.reference_count.load(Ordering::Relaxed)
348 }
349
350 pub fn entry_count(&self) -> u64 {
355 self.entry_count.load(Ordering::Relaxed)
356 }
357
358 pub fn increment_entry_count(&self) {
360 self.entry_count.fetch_add(1, Ordering::Relaxed);
361 }
362
363 pub fn decrement_entry_count(&self) {
365 loop {
367 let cur = self.entry_count.load(Ordering::Relaxed);
368 if cur == 0 {
369 break;
370 }
371 if self
372 .entry_count
373 .compare_exchange_weak(
374 cur,
375 cur - 1,
376 Ordering::Relaxed,
377 Ordering::Relaxed,
378 )
379 .is_ok()
380 {
381 break;
382 }
383 }
384 }
385
386 pub fn get_tree(&self) -> Option<&DatabaseTree> {
388 self.tree.as_ref()
389 }
390 pub fn get_tree_mut(&mut self) -> Option<&mut DatabaseTree> {
391 self.tree.as_mut()
392 }
393
394 pub fn get_real_tree(
408 &self,
409 ) -> Option<std::sync::RwLockReadGuard<'_, Tree>> {
410 self.real_tree.as_ref()?.read().ok()
411 }
412
413 pub fn get_real_tree_arc(&self) -> Option<Arc<RwLock<Tree>>> {
416 self.real_tree.clone()
417 }
418
419 pub fn update_key_expiration(
425 &self,
426 key: &[u8],
427 expiration_hours: u32,
428 ) -> bool {
429 self.real_tree
430 .as_ref()
431 .and_then(|arc| arc.read().ok())
432 .map(|t| t.update_key_expiration(key, expiration_hours))
433 .unwrap_or(false)
434 }
435
436 pub fn collect_btree_stats(&self) -> Option<noxu_tree::TreeStats> {
444 self.real_tree
445 .as_ref()
446 .and_then(|arc| arc.read().ok())
447 .map(|t| t.collect_stats())
448 }
449
450 pub fn set_recovered_tree(&mut self, mut tree: Tree) {
455 let count = tree.count_entries();
458 self.entry_count.store(count, std::sync::atomic::Ordering::Relaxed);
459 let mut had_comparator = false;
462 if let Some(ref current_arc) = self.real_tree
463 && let Ok(mut current) = current_arc.write()
464 && let Some(cmp) = current.take_comparator()
465 {
466 tree.set_comparator(cmp);
467 had_comparator = true;
468 }
469 tree.set_key_prefixing(self.flags & PREFIXING_ENABLED != 0);
476 if had_comparator {
483 tree.resort_under_comparator();
484 }
485 self.real_tree = Some(Arc::new(RwLock::new(tree)));
486 }
487
488 pub fn set_memory_counter(
495 &mut self,
496 counter: std::sync::Arc<std::sync::atomic::AtomicI64>,
497 ) {
498 if let Some(tree_arc) = self.real_tree.as_ref()
499 && let Ok(mut tree) = tree_arc.write()
500 {
501 tree.set_memory_counter(counter);
502 }
503 }
504
505 pub fn set_tree_compact_max_key_length(&mut self, len: i32) {
509 if let Some(tree_arc) = self.real_tree.as_ref()
510 && let Ok(mut tree) = tree_arc.write()
511 {
512 tree.set_compact_max_key_length(len);
513 }
514 }
515
516 pub fn max_tree_entries_per_node(&self) -> i32 {
518 self.max_tree_entries_per_node
519 }
520
521 pub fn log_size(&self) -> usize {
524 8 + 4 + self.name.len() + 1 + 4 + 8 }
530
531 pub fn write_to_log(&self, buf: &mut Vec<u8>) -> std::io::Result<()> {
532 buf.write_i64::<BigEndian>(self.id.id())?;
533 buf.write_u32::<BigEndian>(self.name.len() as u32)?;
534 buf.extend_from_slice(self.name.as_bytes());
535 buf.write_u8(self.flags)?;
536 buf.write_i32::<BigEndian>(self.max_tree_entries_per_node)?;
537 let root_lsn = self
538 .tree
539 .as_ref()
540 .map_or(noxu_util::NULL_LSN.as_u64(), |t| t.root_lsn);
541 buf.write_u64::<BigEndian>(root_lsn)?;
542 Ok(())
543 }
544
545 pub fn read_from_log(buf: &[u8]) -> std::io::Result<Self> {
546 fn type_for_db_name(name: &str) -> DbType {
548 match name {
549 "_jeIdMap" | "_noxuIdMap" => DbType::Id,
550 "_jeNameMap" | "_noxuNameMap" => DbType::Name,
551 "_jeUtilization" | "_noxuUtilization" => DbType::Utilization,
552 _ => DbType::User,
553 }
554 }
555 use std::io::Cursor;
556
557 let mut cursor = Cursor::new(buf);
558 let id = cursor.read_i64::<BigEndian>()?;
559 let name_len = cursor.read_u32::<BigEndian>()? as usize;
560
561 let name_start = cursor.position() as usize;
563 let name_end = name_start + name_len;
564 if name_end > buf.len() {
565 return Err(std::io::Error::new(
566 std::io::ErrorKind::UnexpectedEof,
567 "Buffer too short for name",
568 ));
569 }
570 let name = String::from_utf8(buf[name_start..name_end].to_vec())
571 .map_err(|e| {
572 std::io::Error::new(std::io::ErrorKind::InvalidData, e)
573 })?;
574 cursor.set_position(name_end as u64);
575
576 let flags = cursor.read_u8()?;
577 let max_entries = cursor.read_i32::<BigEndian>()?;
578 let root_lsn = cursor.read_u64::<BigEndian>()?;
579
580 let db_type = type_for_db_name(&name);
581
582 let mut tree = DatabaseTree::new();
583 tree.root_lsn = root_lsn;
584
585 let real_tree = Tree::new(id as u64, max_entries as usize);
586 Ok(DatabaseImpl {
587 id: DatabaseId::new(id),
588 name,
589 db_type,
590 flags,
591 delete_state: DeleteState::NotDeleted,
592 dirty: AtomicBool::new(false),
593 max_tree_entries_per_node: max_entries,
594 reference_count: AtomicI64::new(0),
595 tree: Some(tree),
596 real_tree: Some(Arc::new(RwLock::new(real_tree))),
597 deferred_write: false, entry_count: Arc::new(AtomicU64::new(0)),
599 throughput: ThroughputStats::new(),
600 btree_comparator_id: None,
601 duplicate_comparator_id: None,
602 triggers: Vec::new(),
606 })
607 }
608}
609
610impl std::fmt::Debug for DatabaseImpl {
611 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
612 f.debug_struct("DatabaseImpl")
613 .field("id", &self.id)
614 .field("name", &self.name)
615 .field("db_type", &self.db_type)
616 .field("flags", &self.flags)
617 .field("delete_state", &self.delete_state)
618 .finish()
619 }
620}
621
#[cfg(test)]
#[expect(clippy::field_reassign_with_default)]
mod tests {
    use super::*;

    /// Default configuration shared by most tests.
    fn make_config() -> DatabaseConfig {
        DatabaseConfig::default()
    }

    /// Builds a user database with the given numeric id and name.
    fn make_db(id: i64, name: &str, config: &DatabaseConfig) -> DatabaseImpl {
        DatabaseImpl::new(
            DatabaseId::new(id),
            name.to_string(),
            DbType::User,
            config,
        )
    }

    /// Serializes `db` into a fresh buffer.
    fn to_log(db: &DatabaseImpl) -> Vec<u8> {
        let mut buf = Vec::new();
        db.write_to_log(&mut buf).unwrap();
        buf
    }

    #[test]
    fn test_new_database() {
        let db = make_db(100, "test_db", &make_config());

        assert_eq!(db.get_id(), DatabaseId::new(100));
        assert_eq!(db.get_name(), "test_db");
        assert_eq!(db.get_db_type(), DbType::User);
        assert!(!db.is_deleted());
        assert!(!db.is_deleting());
        assert_eq!(db.reference_count(), 0);
        assert_eq!(db.entry_count(), 0);
    }

    #[test]
    fn test_sorted_duplicates_flag() {
        let mut config = DatabaseConfig::default();
        config.sorted_duplicates = false;
        let db1 = make_db(1, "db1", &config);
        assert!(!db1.get_sorted_duplicates());

        config.sorted_duplicates = true;
        let db2 = make_db(2, "db2", &config);
        assert!(db2.get_sorted_duplicates());
    }

    #[test]
    fn test_temporary_flag() {
        let mut config = DatabaseConfig::default();
        config.temporary = false;
        let db1 = make_db(1, "db1", &config);
        assert!(!db1.is_temporary());

        config.temporary = true;
        let db2 = make_db(2, "db2", &config);
        assert!(db2.is_temporary());
    }

    #[test]
    fn test_key_prefixing_flag() {
        let mut config = DatabaseConfig::default();
        config.key_prefixing = false;
        let db1 = make_db(1, "db1", &config);
        assert!(!db1.get_key_prefixing());

        config.key_prefixing = true;
        let db2 = make_db(2, "db2", &config);
        assert!(db2.get_key_prefixing());
    }

    #[test]
    fn test_set_recovered_tree_preserves_key_prefixing() {
        let mut config = DatabaseConfig::default();
        config.key_prefixing = true;
        let mut db = make_db(7, "kp_recover", &config);
        let recovered = Tree::new(7, 256);
        assert!(!recovered.key_prefixing, "recovered tree starts false");
        db.set_recovered_tree(recovered);
        let t = db.get_real_tree_arc().expect("real tree");
        assert!(
            t.read().unwrap().key_prefixing,
            "GAP-5: set_recovered_tree must preserve key_prefixing=true"
        );
    }

    #[test]
    fn test_delete_state_transitions() {
        let mut db = make_db(1, "db", &make_config());

        assert!(!db.is_deleted());
        assert!(!db.is_deleting());

        db.start_delete();
        assert!(!db.is_deleted());
        assert!(db.is_deleting());

        db.finish_delete();
        assert!(db.is_deleted());
        assert!(db.is_deleting());
    }

    #[test]
    fn test_dirty_tracking() {
        let db = make_db(1, "db", &make_config());

        assert!(!db.is_dirty());

        db.set_dirty();
        assert!(db.is_dirty());

        db.clear_dirty();
        assert!(!db.is_dirty());
    }

    #[test]
    fn test_reference_counting() {
        let db = make_db(1, "db", &make_config());

        assert_eq!(db.reference_count(), 0);

        db.increment_reference_count();
        assert_eq!(db.reference_count(), 1);

        db.increment_reference_count();
        assert_eq!(db.reference_count(), 2);

        db.decrement_reference_count();
        assert_eq!(db.reference_count(), 1);

        db.decrement_reference_count();
        assert_eq!(db.reference_count(), 0);
    }

    #[test]
    fn test_entry_count_stops_at_zero() {
        let db = make_db(1, "db", &make_config());

        db.increment_entry_count();
        assert_eq!(db.entry_count(), 1);

        db.decrement_entry_count();
        assert_eq!(db.entry_count(), 0);

        // A further decrement must not wrap the unsigned counter.
        db.decrement_entry_count();
        assert_eq!(db.entry_count(), 0);
    }

    #[test]
    fn test_serialization_round_trip() {
        let mut config = DatabaseConfig::default();
        config.sorted_duplicates = true;
        config.key_prefixing = true;
        config.node_max_entries = 256;

        let mut db = make_db(42, "my_database", &config);
        db.get_tree_mut().unwrap().set_root_lsn(777);

        let buf = to_log(&db);
        let db2 = DatabaseImpl::read_from_log(&buf).unwrap();

        assert_eq!(db2.get_id(), DatabaseId::new(42));
        assert_eq!(db2.get_name(), "my_database");
        assert_eq!(db2.get_db_type(), DbType::User);
        assert!(db2.get_sorted_duplicates());
        assert!(db2.get_key_prefixing());
        assert_eq!(db2.max_tree_entries_per_node(), 256);
        assert_eq!(db2.get_tree().unwrap().get_root_lsn(), 777);
    }

    #[test]
    fn test_round_trip_derives_type_from_reserved_name() {
        let db = DatabaseImpl::new(
            DatabaseId::new(3),
            "_noxuIdMap".to_string(),
            DbType::Id,
            &make_config(),
        );

        let db2 = DatabaseImpl::read_from_log(&to_log(&db)).unwrap();

        assert_eq!(db2.get_db_type(), DbType::Id);
    }

    #[test]
    fn test_read_from_log_rejects_truncated_buffer() {
        let buf = to_log(&make_db(9, "truncated", &make_config()));

        for len in 0..buf.len() {
            assert!(
                DatabaseImpl::read_from_log(&buf[..len]).is_err(),
                "prefix of {len} bytes must not parse"
            );
        }
    }

    #[test]
    fn test_tree_access() {
        let mut db = make_db(1, "db", &make_config());

        {
            let tree = db.get_tree().unwrap();
            assert_eq!(tree.get_root_lsn(), noxu_util::NULL_LSN.as_u64());
        }

        {
            let tree = db.get_tree_mut().unwrap();
            tree.set_root_lsn(12345);
        }

        {
            let tree = db.get_tree().unwrap();
            assert_eq!(tree.get_root_lsn(), 12345);
        }
    }

    #[test]
    fn test_log_size() {
        let db = make_db(1, "test", &make_config());

        // id + name length + "test" + flags + max entries + root LSN
        let expected_size = 8 + 4 + 4 + 1 + 4 + 8;
        assert_eq!(db.log_size(), expected_size);
    }

    #[test]
    fn test_log_size_matches_written_bytes() {
        let db = make_db(1, "sized_database", &make_config());

        assert_eq!(to_log(&db).len(), db.log_size());
    }
}