1use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
5use noxu_tree::{KeyComparatorFn, Tree};
6use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
7use std::sync::{Arc, RwLock};
8
9use crate::dup_key_data;
10use crate::throughput_stats::ThroughputStats;
11
12use crate::{DatabaseConfig, DatabaseId, DbType};
13
14#[derive(Debug, Clone, Copy, PartialEq, Eq)]
16enum DeleteState {
17 NotDeleted,
18 DeletedCleanupInListHarvest,
19 DeletedCleanupLogHarvest,
20 Deleted,
21}
22
23const DUPS_ENABLED: u8 = 0x01;
25const TEMPORARY_BIT: u8 = 0x02;
26const IS_REPLICATED_BIT: u8 = 0x04;
27const NOT_REPLICATED_BIT: u8 = 0x08;
28const PREFIXING_ENABLED: u8 = 0x10;
29
30pub struct DatabaseImpl {
34 id: DatabaseId,
36 name: String,
38 db_type: DbType,
40 flags: u8,
42 delete_state: DeleteState,
44 dirty: AtomicBool,
46 max_tree_entries_per_node: i32,
48 reference_count: AtomicI64,
50 tree: Option<DatabaseTree>,
53 real_tree: Option<Arc<RwLock<Tree>>>,
62 deferred_write: bool,
68 entry_count: Arc<AtomicU64>,
77 pub throughput: Arc<ThroughputStats>,
83}
84
85#[derive(Debug)]
93pub struct DatabaseTree {
94 root_lsn: u64,
96}
97
98impl Default for DatabaseTree {
99 fn default() -> Self {
100 Self::new()
101 }
102}
103
104impl DatabaseTree {
105 pub fn new() -> Self {
106 DatabaseTree { root_lsn: noxu_util::NULL_LSN.as_u64() }
107 }
108 pub fn get_root_lsn(&self) -> u64 {
109 self.root_lsn
110 }
111 pub fn set_root_lsn(&mut self, lsn: u64) {
112 self.root_lsn = lsn;
113 }
114}
115
116impl DatabaseImpl {
117 pub fn new(
119 id: DatabaseId,
120 name: String,
121 db_type: DbType,
122 config: &DatabaseConfig,
123 ) -> Self {
124 let mut flags = 0u8;
125 if config.sorted_duplicates {
126 flags |= DUPS_ENABLED;
127 }
128 if config.temporary {
129 flags |= TEMPORARY_BIT;
130 }
131 if config.key_prefixing {
132 flags |= PREFIXING_ENABLED;
133 }
134
135 let max_entries = config.node_max_entries as usize;
136 let real_tree = if config.sorted_duplicates {
137 let dup_cmp: KeyComparatorFn = Arc::new(|a: &[u8], b: &[u8]| {
141 dup_key_data::cmp_two_part_keys(
142 a,
143 b,
144 |x, y| x.cmp(y),
145 |x, y| x.cmp(y),
146 )
147 });
148 Tree::new_with_comparator(id.id() as u64, max_entries, dup_cmp)
149 } else {
150 Tree::new(id.id() as u64, max_entries)
151 };
152 let mut real_tree = real_tree;
158 real_tree.set_key_prefixing(config.key_prefixing);
159 DatabaseImpl {
160 id,
161 name,
162 db_type,
163 flags,
164 delete_state: DeleteState::NotDeleted,
165 dirty: AtomicBool::new(false),
166 max_tree_entries_per_node: config.node_max_entries,
167 reference_count: AtomicI64::new(0),
168 tree: Some(DatabaseTree::new()),
169 real_tree: Some(Arc::new(RwLock::new(real_tree))),
170 deferred_write: config.deferred_write,
171 entry_count: Arc::new(AtomicU64::new(0)),
172 throughput: ThroughputStats::new(),
173 }
174 }
175
176 pub fn get_id(&self) -> DatabaseId {
178 self.id
179 }
180 pub fn get_name(&self) -> &str {
181 &self.name
182 }
183 pub fn get_db_type(&self) -> DbType {
184 self.db_type
185 }
186
187 pub fn is_deferred_write(&self) -> bool {
191 self.deferred_write
192 }
193
194 pub fn get_sorted_duplicates(&self) -> bool {
196 self.flags & DUPS_ENABLED != 0
197 }
198
199 pub fn is_ln_immediately_obsolete(&self) -> bool {
209 self.get_sorted_duplicates()
210 }
213 pub fn is_temporary(&self) -> bool {
214 self.flags & TEMPORARY_BIT != 0
215 }
216 pub fn get_key_prefixing(&self) -> bool {
217 self.flags & PREFIXING_ENABLED != 0
218 }
219 pub fn is_replicated(&self) -> bool {
220 self.flags & IS_REPLICATED_BIT != 0
221 }
222
223 pub fn is_deleted(&self) -> bool {
225 self.delete_state == DeleteState::Deleted
226 }
227 pub fn is_deleting(&self) -> bool {
228 self.delete_state != DeleteState::NotDeleted
229 }
230 pub fn start_delete(&mut self) {
231 self.delete_state = DeleteState::DeletedCleanupInListHarvest;
232 }
233 pub fn finish_delete(&mut self) {
234 self.delete_state = DeleteState::Deleted;
235 }
236
237 pub fn is_dirty(&self) -> bool {
239 self.dirty.load(Ordering::Relaxed)
240 }
241 pub fn set_dirty(&self) {
242 self.dirty.store(true, Ordering::Relaxed);
243 }
244 pub fn clear_dirty(&self) {
245 self.dirty.store(false, Ordering::Relaxed);
246 }
247
248 pub fn increment_reference_count(&self) {
250 self.reference_count.fetch_add(1, Ordering::Relaxed);
251 }
252 pub fn decrement_reference_count(&self) {
253 self.reference_count.fetch_sub(1, Ordering::Relaxed);
254 }
255 pub fn reference_count(&self) -> i64 {
256 self.reference_count.load(Ordering::Relaxed)
257 }
258
259 pub fn entry_count(&self) -> u64 {
264 self.entry_count.load(Ordering::Relaxed)
265 }
266
267 pub fn increment_entry_count(&self) {
269 self.entry_count.fetch_add(1, Ordering::Relaxed);
270 }
271
272 pub fn decrement_entry_count(&self) {
274 loop {
276 let cur = self.entry_count.load(Ordering::Relaxed);
277 if cur == 0 {
278 break;
279 }
280 if self
281 .entry_count
282 .compare_exchange_weak(
283 cur,
284 cur - 1,
285 Ordering::Relaxed,
286 Ordering::Relaxed,
287 )
288 .is_ok()
289 {
290 break;
291 }
292 }
293 }
294
295 pub fn get_tree(&self) -> Option<&DatabaseTree> {
297 self.tree.as_ref()
298 }
299 pub fn get_tree_mut(&mut self) -> Option<&mut DatabaseTree> {
300 self.tree.as_mut()
301 }
302
303 pub fn get_real_tree(
317 &self,
318 ) -> Option<std::sync::RwLockReadGuard<'_, Tree>> {
319 self.real_tree.as_ref()?.read().ok()
320 }
321
322 pub fn get_real_tree_arc(&self) -> Option<Arc<RwLock<Tree>>> {
325 self.real_tree.clone()
326 }
327
328 pub fn update_key_expiration(
334 &self,
335 key: &[u8],
336 expiration_hours: u32,
337 ) -> bool {
338 self.real_tree
339 .as_ref()
340 .and_then(|arc| arc.read().ok())
341 .map(|t| t.update_key_expiration(key, expiration_hours))
342 .unwrap_or(false)
343 }
344
345 pub fn collect_btree_stats(&self) -> Option<noxu_tree::TreeStats> {
353 self.real_tree
354 .as_ref()
355 .and_then(|arc| arc.read().ok())
356 .map(|t| t.collect_stats())
357 }
358
359 pub fn set_recovered_tree(&mut self, mut tree: Tree) {
364 let count = tree.count_entries();
367 self.entry_count.store(count, std::sync::atomic::Ordering::Relaxed);
368 if let Some(ref current_arc) = self.real_tree
371 && let Ok(mut current) = current_arc.write()
372 && let Some(cmp) = current.take_comparator()
373 {
374 tree.set_comparator(cmp);
375 }
376 tree.set_key_prefixing(self.flags & PREFIXING_ENABLED != 0);
383 self.real_tree = Some(Arc::new(RwLock::new(tree)));
384 }
385
386 pub fn set_memory_counter(
393 &mut self,
394 counter: std::sync::Arc<std::sync::atomic::AtomicI64>,
395 ) {
396 if let Some(tree_arc) = self.real_tree.as_ref()
397 && let Ok(mut tree) = tree_arc.write()
398 {
399 tree.set_memory_counter(counter);
400 }
401 }
402
403 pub fn max_tree_entries_per_node(&self) -> i32 {
405 self.max_tree_entries_per_node
406 }
407
408 pub fn log_size(&self) -> usize {
411 8 + 4 + self.name.len() + 1 + 4 + 8 }
417
418 pub fn write_to_log(&self, buf: &mut Vec<u8>) -> std::io::Result<()> {
419 buf.write_i64::<BigEndian>(self.id.id())?;
420 buf.write_u32::<BigEndian>(self.name.len() as u32)?;
421 buf.extend_from_slice(self.name.as_bytes());
422 buf.write_u8(self.flags)?;
423 buf.write_i32::<BigEndian>(self.max_tree_entries_per_node)?;
424 let root_lsn = self
425 .tree
426 .as_ref()
427 .map_or(noxu_util::NULL_LSN.as_u64(), |t| t.root_lsn);
428 buf.write_u64::<BigEndian>(root_lsn)?;
429 Ok(())
430 }
431
432 pub fn read_from_log(buf: &[u8]) -> std::io::Result<Self> {
433 fn type_for_db_name(name: &str) -> DbType {
435 match name {
436 "_jeIdMap" | "_noxuIdMap" => DbType::Id,
437 "_jeNameMap" | "_noxuNameMap" => DbType::Name,
438 "_jeUtilization" | "_noxuUtilization" => DbType::Utilization,
439 _ => DbType::User,
440 }
441 }
442 use std::io::Cursor;
443
444 let mut cursor = Cursor::new(buf);
445 let id = cursor.read_i64::<BigEndian>()?;
446 let name_len = cursor.read_u32::<BigEndian>()? as usize;
447
448 let name_start = cursor.position() as usize;
450 let name_end = name_start + name_len;
451 if name_end > buf.len() {
452 return Err(std::io::Error::new(
453 std::io::ErrorKind::UnexpectedEof,
454 "Buffer too short for name",
455 ));
456 }
457 let name = String::from_utf8(buf[name_start..name_end].to_vec())
458 .map_err(|e| {
459 std::io::Error::new(std::io::ErrorKind::InvalidData, e)
460 })?;
461 cursor.set_position(name_end as u64);
462
463 let flags = cursor.read_u8()?;
464 let max_entries = cursor.read_i32::<BigEndian>()?;
465 let root_lsn = cursor.read_u64::<BigEndian>()?;
466
467 let db_type = type_for_db_name(&name);
468
469 let mut tree = DatabaseTree::new();
470 tree.root_lsn = root_lsn;
471
472 let real_tree = Tree::new(id as u64, max_entries as usize);
473 Ok(DatabaseImpl {
474 id: DatabaseId::new(id),
475 name,
476 db_type,
477 flags,
478 delete_state: DeleteState::NotDeleted,
479 dirty: AtomicBool::new(false),
480 max_tree_entries_per_node: max_entries,
481 reference_count: AtomicI64::new(0),
482 tree: Some(tree),
483 real_tree: Some(Arc::new(RwLock::new(real_tree))),
484 deferred_write: false, entry_count: Arc::new(AtomicU64::new(0)),
486 throughput: ThroughputStats::new(),
487 })
488 }
489}
490
491impl std::fmt::Debug for DatabaseImpl {
492 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
493 f.debug_struct("DatabaseImpl")
494 .field("id", &self.id)
495 .field("name", &self.name)
496 .field("db_type", &self.db_type)
497 .field("flags", &self.flags)
498 .field("delete_state", &self.delete_state)
499 .finish()
500 }
501}
502
503#[cfg(test)]
504#[expect(clippy::field_reassign_with_default)]
505mod tests {
506 use super::*;
507
508 fn make_config() -> DatabaseConfig {
509 DatabaseConfig::default()
510 }
511
512 #[test]
513 fn test_new_database() {
514 let config = make_config();
515 let db = DatabaseImpl::new(
516 DatabaseId::new(100),
517 "test_db".to_string(),
518 DbType::User,
519 &config,
520 );
521
522 assert_eq!(db.get_id(), DatabaseId::new(100));
523 assert_eq!(db.get_name(), "test_db");
524 assert_eq!(db.get_db_type(), DbType::User);
525 assert!(!db.is_deleted());
526 assert!(!db.is_deleting());
527 assert_eq!(db.reference_count(), 0);
528 }
529
530 #[test]
531 fn test_sorted_duplicates_flag() {
532 let mut config = DatabaseConfig::default();
533 config.sorted_duplicates = false;
534 let db1 = DatabaseImpl::new(
535 DatabaseId::new(1),
536 "db1".to_string(),
537 DbType::User,
538 &config,
539 );
540 assert!(!db1.get_sorted_duplicates());
541
542 config.sorted_duplicates = true;
543 let db2 = DatabaseImpl::new(
544 DatabaseId::new(2),
545 "db2".to_string(),
546 DbType::User,
547 &config,
548 );
549 assert!(db2.get_sorted_duplicates());
550 }
551
552 #[test]
553 fn test_temporary_flag() {
554 let mut config = DatabaseConfig::default();
555 config.temporary = false;
556 let db1 = DatabaseImpl::new(
557 DatabaseId::new(1),
558 "db1".to_string(),
559 DbType::User,
560 &config,
561 );
562 assert!(!db1.is_temporary());
563
564 config.temporary = true;
565 let db2 = DatabaseImpl::new(
566 DatabaseId::new(2),
567 "db2".to_string(),
568 DbType::User,
569 &config,
570 );
571 assert!(db2.is_temporary());
572 }
573
574 #[test]
575 fn test_key_prefixing_flag() {
576 let mut config = DatabaseConfig::default();
577 config.key_prefixing = false;
578 let db1 = DatabaseImpl::new(
579 DatabaseId::new(1),
580 "db1".to_string(),
581 DbType::User,
582 &config,
583 );
584 assert!(!db1.get_key_prefixing());
585
586 config.key_prefixing = true;
587 let db2 = DatabaseImpl::new(
588 DatabaseId::new(2),
589 "db2".to_string(),
590 DbType::User,
591 &config,
592 );
593 assert!(db2.get_key_prefixing());
594 }
595
596 #[test]
597 fn test_set_recovered_tree_preserves_key_prefixing() {
598 let mut config = DatabaseConfig::default();
604 config.key_prefixing = true;
605 let mut db = DatabaseImpl::new(
606 DatabaseId::new(7),
607 "kp_recover".to_string(),
608 DbType::User,
609 &config,
610 );
611 let recovered = Tree::new(7, 256);
613 assert!(!recovered.key_prefixing, "recovered tree starts false");
614 db.set_recovered_tree(recovered);
615 let t = db.get_real_tree_arc().expect("real tree");
617 assert!(
618 t.read().unwrap().key_prefixing,
619 "GAP-5: set_recovered_tree must preserve key_prefixing=true"
620 );
621 }
622
623 #[test]
624 fn test_delete_state_transitions() {
625 let config = make_config();
626 let mut db = DatabaseImpl::new(
627 DatabaseId::new(1),
628 "db".to_string(),
629 DbType::User,
630 &config,
631 );
632
633 assert!(!db.is_deleted());
634 assert!(!db.is_deleting());
635
636 db.start_delete();
637 assert!(!db.is_deleted());
638 assert!(db.is_deleting());
639
640 db.finish_delete();
641 assert!(db.is_deleted());
642 assert!(db.is_deleting());
643 }
644
645 #[test]
646 fn test_dirty_tracking() {
647 let config = make_config();
648 let db = DatabaseImpl::new(
649 DatabaseId::new(1),
650 "db".to_string(),
651 DbType::User,
652 &config,
653 );
654
655 assert!(!db.is_dirty());
656
657 db.set_dirty();
658 assert!(db.is_dirty());
659
660 db.clear_dirty();
661 assert!(!db.is_dirty());
662 }
663
664 #[test]
665 fn test_reference_counting() {
666 let config = make_config();
667 let db = DatabaseImpl::new(
668 DatabaseId::new(1),
669 "db".to_string(),
670 DbType::User,
671 &config,
672 );
673
674 assert_eq!(db.reference_count(), 0);
675
676 db.increment_reference_count();
677 assert_eq!(db.reference_count(), 1);
678
679 db.increment_reference_count();
680 assert_eq!(db.reference_count(), 2);
681
682 db.decrement_reference_count();
683 assert_eq!(db.reference_count(), 1);
684
685 db.decrement_reference_count();
686 assert_eq!(db.reference_count(), 0);
687 }
688
689 #[test]
690 fn test_serialization_round_trip() {
691 let mut config = DatabaseConfig::default();
692 config.sorted_duplicates = true;
693 config.key_prefixing = true;
694 config.node_max_entries = 256;
695
696 let db = DatabaseImpl::new(
697 DatabaseId::new(42),
698 "my_database".to_string(),
699 DbType::User,
700 &config,
701 );
702
703 let mut buf = Vec::new();
704 db.write_to_log(&mut buf).unwrap();
705
706 let db2 = DatabaseImpl::read_from_log(&buf).unwrap();
707
708 assert_eq!(db2.get_id(), DatabaseId::new(42));
709 assert_eq!(db2.get_name(), "my_database");
710 assert!(db2.get_sorted_duplicates());
711 assert!(db2.get_key_prefixing());
712 assert_eq!(db2.max_tree_entries_per_node(), 256);
713 }
714
715 #[test]
716 fn test_tree_access() {
717 let config = make_config();
718 let mut db = DatabaseImpl::new(
719 DatabaseId::new(1),
720 "db".to_string(),
721 DbType::User,
722 &config,
723 );
724
725 {
727 let tree = db.get_tree().unwrap();
728 assert_eq!(tree.get_root_lsn(), noxu_util::NULL_LSN.as_u64());
729 }
730
731 {
733 let tree = db.get_tree_mut().unwrap();
734 tree.set_root_lsn(12345);
735 }
736
737 {
739 let tree = db.get_tree().unwrap();
740 assert_eq!(tree.get_root_lsn(), 12345);
741 }
742 }
743
744 #[test]
745 fn test_log_size() {
746 let config = make_config();
747 let db = DatabaseImpl::new(
748 DatabaseId::new(1),
749 "test".to_string(),
750 DbType::User,
751 &config,
752 );
753
754 let expected_size = 8 + 4 + 4 + 1 + 4 + 8; assert_eq!(db.log_size(), expected_size);
756 }
757}