1use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
5use noxu_tree::{KeyComparatorFn, Tree};
6use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
7use std::sync::{Arc, RwLock};
8
9use crate::dup_key_data;
10use crate::throughput_stats::ThroughputStats;
11
12use crate::{DatabaseConfig, DatabaseId, DbType};
13
/// Stage of a database within its deletion lifecycle.
///
/// In the code visible in this file a database starts as `NotDeleted`,
/// moves to `DeletedCleanupInListHarvest` via `start_delete` and ends as
/// `Deleted` via `finish_delete`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DeleteState {
    /// The database is live; no deletion has been requested.
    NotDeleted,
    /// Deletion has started (state assigned by `start_delete`).
    DeletedCleanupInListHarvest,
    /// Further cleanup stage; not assigned anywhere in the visible code.
    DeletedCleanupLogHarvest,
    /// Deletion is complete (state assigned by `finish_delete`).
    Deleted,
}
22
// Bit positions inside `DatabaseImpl::flags`. The flag byte is written to
// and read from the log verbatim, so these values are part of the record
// layout.

/// Set when the database was configured with sorted duplicates.
const DUPS_ENABLED: u8 = 1 << 0;
/// Set when the database was configured as temporary.
const TEMPORARY_BIT: u8 = 1 << 1;
/// Tested by `DatabaseImpl::is_replicated`.
const IS_REPLICATED_BIT: u8 = 1 << 2;
/// Not referenced by the code visible in this file.
const NOT_REPLICATED_BIT: u8 = 1 << 3;
/// Set when the database was configured with key prefixing.
const PREFIXING_ENABLED: u8 = 1 << 4;
29
30pub struct DatabaseImpl {
34 id: DatabaseId,
36 name: String,
38 db_type: DbType,
40 flags: u8,
42 delete_state: DeleteState,
44 dirty: AtomicBool,
46 max_tree_entries_per_node: i32,
48 reference_count: AtomicI64,
50 tree: Option<DatabaseTree>,
53 real_tree: Option<Arc<RwLock<Tree>>>,
62 deferred_write: bool,
68 entry_count: Arc<AtomicU64>,
77 pub throughput: Arc<ThroughputStats>,
83 btree_comparator_id: Option<String>,
91 duplicate_comparator_id: Option<String>,
95}
96
/// Lightweight record of where a database's tree root lives in the log.
#[derive(Debug)]
pub struct DatabaseTree {
    /// LSN of the tree root, stored as a raw `u64`.
    root_lsn: u64,
}
109
110impl Default for DatabaseTree {
111 fn default() -> Self {
112 Self::new()
113 }
114}
115
116impl DatabaseTree {
117 pub fn new() -> Self {
118 DatabaseTree { root_lsn: noxu_util::NULL_LSN.as_u64() }
119 }
120 pub fn get_root_lsn(&self) -> u64 {
121 self.root_lsn
122 }
123 pub fn set_root_lsn(&mut self, lsn: u64) {
124 self.root_lsn = lsn;
125 }
126}
127
128impl DatabaseImpl {
129 pub fn new(
131 id: DatabaseId,
132 name: String,
133 db_type: DbType,
134 config: &DatabaseConfig,
135 ) -> Self {
136 let mut flags = 0u8;
137 if config.sorted_duplicates {
138 flags |= DUPS_ENABLED;
139 }
140 if config.temporary {
141 flags |= TEMPORARY_BIT;
142 }
143 if config.key_prefixing {
144 flags |= PREFIXING_ENABLED;
145 }
146
147 let max_entries = config.node_max_entries as usize;
148 let btree_comparator_id =
149 config.btree_comparator.as_ref().map(|c| c.identity.clone());
150 let duplicate_comparator_id =
151 config.duplicate_comparator.as_ref().map(|c| c.identity.clone());
152 let real_tree = Self::build_tree(id, max_entries, config);
153 let mut real_tree = real_tree;
159 real_tree.set_key_prefixing(config.key_prefixing);
160 DatabaseImpl {
161 id,
162 name,
163 db_type,
164 flags,
165 delete_state: DeleteState::NotDeleted,
166 dirty: AtomicBool::new(false),
167 max_tree_entries_per_node: config.node_max_entries,
168 reference_count: AtomicI64::new(0),
169 tree: Some(DatabaseTree::new()),
170 real_tree: Some(Arc::new(RwLock::new(real_tree))),
171 deferred_write: config.deferred_write,
172 entry_count: Arc::new(AtomicU64::new(0)),
173 throughput: ThroughputStats::new(),
174 btree_comparator_id,
175 duplicate_comparator_id,
176 }
177 }
178
179 fn build_tree(
196 id: DatabaseId,
197 max_entries: usize,
198 config: &DatabaseConfig,
199 ) -> Tree {
200 let btree_fn: Option<KeyComparatorFn> =
201 config.btree_comparator.as_ref().map(|c| c.func.clone());
202 if config.sorted_duplicates {
203 let dup_fn: Option<KeyComparatorFn> =
204 config.duplicate_comparator.as_ref().map(|c| c.func.clone());
205 let dup_cmp: KeyComparatorFn =
206 Arc::new(move |a: &[u8], b: &[u8]| {
207 dup_key_data::cmp_two_part_keys(
208 a,
209 b,
210 |x, y| match &btree_fn {
211 Some(f) => f(x, y),
212 None => x.cmp(y),
213 },
214 |x, y| match &dup_fn {
215 Some(f) => f(x, y),
216 None => x.cmp(y),
217 },
218 )
219 });
220 Tree::new_with_comparator(id.id() as u64, max_entries, dup_cmp)
221 } else if let Some(btree_fn) = btree_fn {
222 Tree::new_with_comparator(id.id() as u64, max_entries, btree_fn)
223 } else {
224 Tree::new(id.id() as u64, max_entries)
225 }
226 }
227
228 pub fn get_id(&self) -> DatabaseId {
230 self.id
231 }
232 pub fn get_name(&self) -> &str {
233 &self.name
234 }
235 pub fn get_db_type(&self) -> DbType {
236 self.db_type
237 }
238
239 pub fn is_deferred_write(&self) -> bool {
243 self.deferred_write
244 }
245
246 pub fn get_sorted_duplicates(&self) -> bool {
248 self.flags & DUPS_ENABLED != 0
249 }
250
251 pub fn btree_comparator_id(&self) -> Option<&str> {
255 self.btree_comparator_id.as_deref()
256 }
257
258 pub fn duplicate_comparator_id(&self) -> Option<&str> {
262 self.duplicate_comparator_id.as_deref()
263 }
264
265 pub fn is_ln_immediately_obsolete(&self) -> bool {
275 self.get_sorted_duplicates()
276 }
279 pub fn is_temporary(&self) -> bool {
280 self.flags & TEMPORARY_BIT != 0
281 }
282 pub fn get_key_prefixing(&self) -> bool {
283 self.flags & PREFIXING_ENABLED != 0
284 }
285 pub fn is_replicated(&self) -> bool {
286 self.flags & IS_REPLICATED_BIT != 0
287 }
288
289 pub fn is_deleted(&self) -> bool {
291 self.delete_state == DeleteState::Deleted
292 }
293 pub fn is_deleting(&self) -> bool {
294 self.delete_state != DeleteState::NotDeleted
295 }
296 pub fn start_delete(&mut self) {
297 self.delete_state = DeleteState::DeletedCleanupInListHarvest;
298 }
299 pub fn finish_delete(&mut self) {
300 self.delete_state = DeleteState::Deleted;
301 }
302
303 pub fn is_dirty(&self) -> bool {
305 self.dirty.load(Ordering::Relaxed)
306 }
307 pub fn set_dirty(&self) {
308 self.dirty.store(true, Ordering::Relaxed);
309 }
310 pub fn clear_dirty(&self) {
311 self.dirty.store(false, Ordering::Relaxed);
312 }
313
314 pub fn increment_reference_count(&self) {
316 self.reference_count.fetch_add(1, Ordering::Relaxed);
317 }
318 pub fn decrement_reference_count(&self) {
319 self.reference_count.fetch_sub(1, Ordering::Relaxed);
320 }
321 pub fn reference_count(&self) -> i64 {
322 self.reference_count.load(Ordering::Relaxed)
323 }
324
325 pub fn entry_count(&self) -> u64 {
330 self.entry_count.load(Ordering::Relaxed)
331 }
332
333 pub fn increment_entry_count(&self) {
335 self.entry_count.fetch_add(1, Ordering::Relaxed);
336 }
337
338 pub fn decrement_entry_count(&self) {
340 loop {
342 let cur = self.entry_count.load(Ordering::Relaxed);
343 if cur == 0 {
344 break;
345 }
346 if self
347 .entry_count
348 .compare_exchange_weak(
349 cur,
350 cur - 1,
351 Ordering::Relaxed,
352 Ordering::Relaxed,
353 )
354 .is_ok()
355 {
356 break;
357 }
358 }
359 }
360
361 pub fn get_tree(&self) -> Option<&DatabaseTree> {
363 self.tree.as_ref()
364 }
365 pub fn get_tree_mut(&mut self) -> Option<&mut DatabaseTree> {
366 self.tree.as_mut()
367 }
368
369 pub fn get_real_tree(
383 &self,
384 ) -> Option<std::sync::RwLockReadGuard<'_, Tree>> {
385 self.real_tree.as_ref()?.read().ok()
386 }
387
388 pub fn get_real_tree_arc(&self) -> Option<Arc<RwLock<Tree>>> {
391 self.real_tree.clone()
392 }
393
394 pub fn update_key_expiration(
400 &self,
401 key: &[u8],
402 expiration_hours: u32,
403 ) -> bool {
404 self.real_tree
405 .as_ref()
406 .and_then(|arc| arc.read().ok())
407 .map(|t| t.update_key_expiration(key, expiration_hours))
408 .unwrap_or(false)
409 }
410
411 pub fn collect_btree_stats(&self) -> Option<noxu_tree::TreeStats> {
419 self.real_tree
420 .as_ref()
421 .and_then(|arc| arc.read().ok())
422 .map(|t| t.collect_stats())
423 }
424
425 pub fn set_recovered_tree(&mut self, mut tree: Tree) {
430 let count = tree.count_entries();
433 self.entry_count.store(count, std::sync::atomic::Ordering::Relaxed);
434 let mut had_comparator = false;
437 if let Some(ref current_arc) = self.real_tree
438 && let Ok(mut current) = current_arc.write()
439 && let Some(cmp) = current.take_comparator()
440 {
441 tree.set_comparator(cmp);
442 had_comparator = true;
443 }
444 tree.set_key_prefixing(self.flags & PREFIXING_ENABLED != 0);
451 if had_comparator {
458 tree.resort_under_comparator();
459 }
460 self.real_tree = Some(Arc::new(RwLock::new(tree)));
461 }
462
463 pub fn set_memory_counter(
470 &mut self,
471 counter: std::sync::Arc<std::sync::atomic::AtomicI64>,
472 ) {
473 if let Some(tree_arc) = self.real_tree.as_ref()
474 && let Ok(mut tree) = tree_arc.write()
475 {
476 tree.set_memory_counter(counter);
477 }
478 }
479
480 pub fn set_tree_compact_max_key_length(&mut self, len: i32) {
484 if let Some(tree_arc) = self.real_tree.as_ref()
485 && let Ok(mut tree) = tree_arc.write()
486 {
487 tree.set_compact_max_key_length(len);
488 }
489 }
490
491 pub fn max_tree_entries_per_node(&self) -> i32 {
493 self.max_tree_entries_per_node
494 }
495
496 pub fn log_size(&self) -> usize {
499 8 + 4 + self.name.len() + 1 + 4 + 8 }
505
506 pub fn write_to_log(&self, buf: &mut Vec<u8>) -> std::io::Result<()> {
507 buf.write_i64::<BigEndian>(self.id.id())?;
508 buf.write_u32::<BigEndian>(self.name.len() as u32)?;
509 buf.extend_from_slice(self.name.as_bytes());
510 buf.write_u8(self.flags)?;
511 buf.write_i32::<BigEndian>(self.max_tree_entries_per_node)?;
512 let root_lsn = self
513 .tree
514 .as_ref()
515 .map_or(noxu_util::NULL_LSN.as_u64(), |t| t.root_lsn);
516 buf.write_u64::<BigEndian>(root_lsn)?;
517 Ok(())
518 }
519
520 pub fn read_from_log(buf: &[u8]) -> std::io::Result<Self> {
521 fn type_for_db_name(name: &str) -> DbType {
523 match name {
524 "_jeIdMap" | "_noxuIdMap" => DbType::Id,
525 "_jeNameMap" | "_noxuNameMap" => DbType::Name,
526 "_jeUtilization" | "_noxuUtilization" => DbType::Utilization,
527 _ => DbType::User,
528 }
529 }
530 use std::io::Cursor;
531
532 let mut cursor = Cursor::new(buf);
533 let id = cursor.read_i64::<BigEndian>()?;
534 let name_len = cursor.read_u32::<BigEndian>()? as usize;
535
536 let name_start = cursor.position() as usize;
538 let name_end = name_start + name_len;
539 if name_end > buf.len() {
540 return Err(std::io::Error::new(
541 std::io::ErrorKind::UnexpectedEof,
542 "Buffer too short for name",
543 ));
544 }
545 let name = String::from_utf8(buf[name_start..name_end].to_vec())
546 .map_err(|e| {
547 std::io::Error::new(std::io::ErrorKind::InvalidData, e)
548 })?;
549 cursor.set_position(name_end as u64);
550
551 let flags = cursor.read_u8()?;
552 let max_entries = cursor.read_i32::<BigEndian>()?;
553 let root_lsn = cursor.read_u64::<BigEndian>()?;
554
555 let db_type = type_for_db_name(&name);
556
557 let mut tree = DatabaseTree::new();
558 tree.root_lsn = root_lsn;
559
560 let real_tree = Tree::new(id as u64, max_entries as usize);
561 Ok(DatabaseImpl {
562 id: DatabaseId::new(id),
563 name,
564 db_type,
565 flags,
566 delete_state: DeleteState::NotDeleted,
567 dirty: AtomicBool::new(false),
568 max_tree_entries_per_node: max_entries,
569 reference_count: AtomicI64::new(0),
570 tree: Some(tree),
571 real_tree: Some(Arc::new(RwLock::new(real_tree))),
572 deferred_write: false, entry_count: Arc::new(AtomicU64::new(0)),
574 throughput: ThroughputStats::new(),
575 btree_comparator_id: None,
576 duplicate_comparator_id: None,
577 })
578 }
579}
580
581impl std::fmt::Debug for DatabaseImpl {
582 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
583 f.debug_struct("DatabaseImpl")
584 .field("id", &self.id)
585 .field("name", &self.name)
586 .field("db_type", &self.db_type)
587 .field("flags", &self.flags)
588 .field("delete_state", &self.delete_state)
589 .finish()
590 }
591}
592
#[cfg(test)]
#[expect(clippy::field_reassign_with_default)]
mod tests {
    use super::*;

    /// Default configuration used by tests that do not care about options.
    fn make_config() -> DatabaseConfig {
        DatabaseConfig::default()
    }

    /// Builds a `DbType::User` database with the given id, name and config.
    fn user_db(id: i64, name: &str, config: &DatabaseConfig) -> DatabaseImpl {
        DatabaseImpl::new(
            DatabaseId::new(id),
            name.to_string(),
            DbType::User,
            config,
        )
    }

    #[test]
    fn test_new_database() {
        let db = user_db(100, "test_db", &make_config());

        assert_eq!(db.get_id(), DatabaseId::new(100));
        assert_eq!(db.get_name(), "test_db");
        assert_eq!(db.get_db_type(), DbType::User);
        assert!(!db.is_deleted());
        assert!(!db.is_deleting());
        assert_eq!(db.reference_count(), 0);
    }

    #[test]
    fn test_sorted_duplicates_flag() {
        let mut config = DatabaseConfig::default();
        config.sorted_duplicates = false;
        assert!(!user_db(1, "db1", &config).get_sorted_duplicates());

        config.sorted_duplicates = true;
        assert!(user_db(2, "db2", &config).get_sorted_duplicates());
    }

    #[test]
    fn test_temporary_flag() {
        let mut config = DatabaseConfig::default();
        config.temporary = false;
        assert!(!user_db(1, "db1", &config).is_temporary());

        config.temporary = true;
        assert!(user_db(2, "db2", &config).is_temporary());
    }

    #[test]
    fn test_key_prefixing_flag() {
        let mut config = DatabaseConfig::default();
        config.key_prefixing = false;
        assert!(!user_db(1, "db1", &config).get_key_prefixing());

        config.key_prefixing = true;
        assert!(user_db(2, "db2", &config).get_key_prefixing());
    }

    #[test]
    fn test_set_recovered_tree_preserves_key_prefixing() {
        let mut config = DatabaseConfig::default();
        config.key_prefixing = true;
        let mut db = user_db(7, "kp_recover", &config);

        // A freshly built tree has prefixing off; installing it must turn
        // prefixing on because the database flag says so.
        let recovered = Tree::new(7, 256);
        assert!(!recovered.key_prefixing, "recovered tree starts false");
        db.set_recovered_tree(recovered);

        let installed = db.get_real_tree_arc().expect("real tree");
        assert!(
            installed.read().unwrap().key_prefixing,
            "GAP-5: set_recovered_tree must preserve key_prefixing=true"
        );
    }

    #[test]
    fn test_delete_state_transitions() {
        let mut db = user_db(1, "db", &make_config());
        assert!(!db.is_deleted());
        assert!(!db.is_deleting());

        db.start_delete();
        assert!(!db.is_deleted());
        assert!(db.is_deleting());

        db.finish_delete();
        assert!(db.is_deleted());
        assert!(db.is_deleting());
    }

    #[test]
    fn test_dirty_tracking() {
        let db = user_db(1, "db", &make_config());
        assert!(!db.is_dirty());

        db.set_dirty();
        assert!(db.is_dirty());

        db.clear_dirty();
        assert!(!db.is_dirty());
    }

    #[test]
    fn test_reference_counting() {
        let db = user_db(1, "db", &make_config());
        assert_eq!(db.reference_count(), 0);

        db.increment_reference_count();
        assert_eq!(db.reference_count(), 1);

        db.increment_reference_count();
        assert_eq!(db.reference_count(), 2);

        db.decrement_reference_count();
        assert_eq!(db.reference_count(), 1);

        db.decrement_reference_count();
        assert_eq!(db.reference_count(), 0);
    }

    #[test]
    fn test_serialization_round_trip() {
        let mut config = DatabaseConfig::default();
        config.sorted_duplicates = true;
        config.key_prefixing = true;
        config.node_max_entries = 256;
        let original = user_db(42, "my_database", &config);

        let mut buf = Vec::new();
        original.write_to_log(&mut buf).unwrap();
        let restored = DatabaseImpl::read_from_log(&buf).unwrap();

        assert_eq!(restored.get_id(), DatabaseId::new(42));
        assert_eq!(restored.get_name(), "my_database");
        assert!(restored.get_sorted_duplicates());
        assert!(restored.get_key_prefixing());
        assert_eq!(restored.max_tree_entries_per_node(), 256);
    }

    #[test]
    fn test_tree_access() {
        let mut db = user_db(1, "db", &make_config());

        assert_eq!(
            db.get_tree().unwrap().get_root_lsn(),
            noxu_util::NULL_LSN.as_u64()
        );

        db.get_tree_mut().unwrap().set_root_lsn(12345);

        assert_eq!(db.get_tree().unwrap().get_root_lsn(), 12345);
    }

    #[test]
    fn test_log_size() {
        let db = user_db(1, "test", &make_config());

        // id + name length prefix + "test" + flags + max entries + root LSN
        let expected_size = 8 + 4 + 4 + 1 + 4 + 8;
        assert_eq!(db.log_size(), expected_size);
    }
}