1use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
5use noxu_tree::{KeyComparatorFn, Tree};
6use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
7use std::sync::{Arc, RwLock};
8
9use crate::dup_key_data;
10use crate::throughput_stats::ThroughputStats;
11
12use crate::{DatabaseConfig, DatabaseId, DbType};
13
/// Deletion lifecycle of a database, stored in `DatabaseImpl::delete_state`.
///
/// `is_deleting()` reports `true` for every variant except `NotDeleted`;
/// `is_deleted()` reports `true` only for `Deleted`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DeleteState {
    /// Initial state assigned by both constructors (`new`, `read_from_log`).
    NotDeleted,
    /// State entered by `start_delete()`.
    // NOTE(review): the name suggests cleanup happens via an in-memory list
    // harvest — confirm against the code that performs the cleanup.
    DeletedCleanupInListHarvest,
    /// Not assigned by any code visible in this section.
    // NOTE(review): presumably the log-based counterpart of the variant
    // above — confirm.
    DeletedCleanupLogHarvest,
    /// State entered by `finish_delete()`.
    Deleted,
}
22
// Bit masks combined into `DatabaseImpl::flags`. The flag byte is written to
// the log by `write_to_log` and read back unchanged by `read_from_log`.

/// Set by `DatabaseImpl::new` when `DatabaseConfig::sorted_duplicates` is true.
const DUPS_ENABLED: u8 = 0x01;
/// Set by `DatabaseImpl::new` when `DatabaseConfig::temporary` is true.
const TEMPORARY_BIT: u8 = 0x02;
/// Tested by `is_replicated()`; no code visible in this section sets it.
const IS_REPLICATED_BIT: u8 = 0x04;
/// Not referenced by any code visible in this section.
// NOTE(review): presumably marks a database explicitly excluded from
// replication — confirm.
const NOT_REPLICATED_BIT: u8 = 0x08;
/// Set by `DatabaseImpl::new` when `DatabaseConfig::key_prefixing` is true.
const PREFIXING_ENABLED: u8 = 0x10;
29
/// In-memory representation of a single database: identity, persisted
/// flags, deletion/dirty bookkeeping, and the backing tree.
pub struct DatabaseImpl {
    /// Database identifier; its numeric value is also passed to the backing
    /// `Tree` on construction and is the first field written to the log.
    id: DatabaseId,
    /// Database name; serialized with a `u32` length prefix.
    name: String,
    /// Kind of database. `read_from_log` derives it from `name` rather than
    /// reading it from the log.
    db_type: DbType,
    /// Bit set built from the `*_ENABLED` / `*_BIT` constants.
    flags: u8,
    /// Current position in the deletion lifecycle.
    delete_state: DeleteState,
    /// Dirty marker toggled by `set_dirty` / `clear_dirty`.
    dirty: AtomicBool,
    /// Copied from `DatabaseConfig::node_max_entries`; serialized as `i32`.
    max_tree_entries_per_node: i32,
    /// Counter adjusted by `increment_reference_count` /
    /// `decrement_reference_count`; starts at zero.
    reference_count: AtomicI64,
    /// Holder of the root LSN that gets serialized; both constructors set it
    /// to `Some`.
    tree: Option<DatabaseTree>,
    /// Shared, lock-protected backing tree; both constructors set it to
    /// `Some`.
    real_tree: Option<Arc<RwLock<Tree>>>,
    /// Copied from `DatabaseConfig::deferred_write`. Not serialized:
    /// `read_from_log` always sets it to `false`.
    deferred_write: bool,
    /// Entry counter; starts at zero and is overwritten by
    /// `set_recovered_tree`.
    entry_count: Arc<AtomicU64>,
    /// Shared throughput statistics, created via `ThroughputStats::new()`.
    pub throughput: Arc<ThroughputStats>,
}
84
/// Lightweight holder for a database's root LSN, the value that
/// `DatabaseImpl::write_to_log` serializes as the last field of an entry.
#[derive(Debug)]
pub struct DatabaseTree {
    /// Root LSN as a raw `u64`; initialised to `noxu_util::NULL_LSN` by `new`.
    root_lsn: u64,
}
97
98impl Default for DatabaseTree {
99 fn default() -> Self {
100 Self::new()
101 }
102}
103
104impl DatabaseTree {
105 pub fn new() -> Self {
106 DatabaseTree { root_lsn: noxu_util::NULL_LSN.as_u64() }
107 }
108 pub fn get_root_lsn(&self) -> u64 {
109 self.root_lsn
110 }
111 pub fn set_root_lsn(&mut self, lsn: u64) {
112 self.root_lsn = lsn;
113 }
114}
115
116impl DatabaseImpl {
117 pub fn new(
119 id: DatabaseId,
120 name: String,
121 db_type: DbType,
122 config: &DatabaseConfig,
123 ) -> Self {
124 let mut flags = 0u8;
125 if config.sorted_duplicates {
126 flags |= DUPS_ENABLED;
127 }
128 if config.temporary {
129 flags |= TEMPORARY_BIT;
130 }
131 if config.key_prefixing {
132 flags |= PREFIXING_ENABLED;
133 }
134
135 let max_entries = config.node_max_entries as usize;
136 let real_tree = if config.sorted_duplicates {
137 let dup_cmp: KeyComparatorFn = Arc::new(|a: &[u8], b: &[u8]| {
141 dup_key_data::cmp_two_part_keys(
142 a,
143 b,
144 |x, y| x.cmp(y),
145 |x, y| x.cmp(y),
146 )
147 });
148 Tree::new_with_comparator(id.id() as u64, max_entries, dup_cmp)
149 } else {
150 Tree::new(id.id() as u64, max_entries)
151 };
152 DatabaseImpl {
153 id,
154 name,
155 db_type,
156 flags,
157 delete_state: DeleteState::NotDeleted,
158 dirty: AtomicBool::new(false),
159 max_tree_entries_per_node: config.node_max_entries,
160 reference_count: AtomicI64::new(0),
161 tree: Some(DatabaseTree::new()),
162 real_tree: Some(Arc::new(RwLock::new(real_tree))),
163 deferred_write: config.deferred_write,
164 entry_count: Arc::new(AtomicU64::new(0)),
165 throughput: ThroughputStats::new(),
166 }
167 }
168
    /// Returns this database's identifier (a copy of the `id` field).
    pub fn get_id(&self) -> DatabaseId {
        self.id
    }
    /// Returns the database name, borrowed from `self`.
    pub fn get_name(&self) -> &str {
        &self.name
    }
    /// Returns the database type (a copy of the `db_type` field).
    pub fn get_db_type(&self) -> DbType {
        self.db_type
    }
179
    /// Returns the deferred-write setting captured from the configuration.
    ///
    /// Always `false` for handles produced by `read_from_log`, because the
    /// setting is not part of the serialized entry.
    pub fn is_deferred_write(&self) -> bool {
        self.deferred_write
    }
186
    /// Returns `true` when the `DUPS_ENABLED` bit is set in the flag byte.
    pub fn get_sorted_duplicates(&self) -> bool {
        self.flags & DUPS_ENABLED != 0
    }
    /// Returns `true` when the `TEMPORARY_BIT` bit is set in the flag byte.
    pub fn is_temporary(&self) -> bool {
        self.flags & TEMPORARY_BIT != 0
    }
    /// Returns `true` when the `PREFIXING_ENABLED` bit is set in the flag byte.
    pub fn get_key_prefixing(&self) -> bool {
        self.flags & PREFIXING_ENABLED != 0
    }
    /// Returns `true` when the `IS_REPLICATED_BIT` bit is set in the flag byte.
    ///
    /// `new` never sets this bit, so it can only be `true` for a handle whose
    /// flag byte was read from a log entry that already carried it.
    pub fn is_replicated(&self) -> bool {
        self.flags & IS_REPLICATED_BIT != 0
    }
200
201 pub fn is_deleted(&self) -> bool {
203 self.delete_state == DeleteState::Deleted
204 }
205 pub fn is_deleting(&self) -> bool {
206 self.delete_state != DeleteState::NotDeleted
207 }
    /// Moves the delete state to `DeletedCleanupInListHarvest`, regardless of
    /// the state it was in before.
    pub fn start_delete(&mut self) {
        self.delete_state = DeleteState::DeletedCleanupInListHarvest;
    }
    /// Moves the delete state to `Deleted`, regardless of the state it was in
    /// before.
    pub fn finish_delete(&mut self) {
        self.delete_state = DeleteState::Deleted;
    }
214
    /// Reads the dirty marker with relaxed ordering.
    pub fn is_dirty(&self) -> bool {
        self.dirty.load(Ordering::Relaxed)
    }
    /// Sets the dirty marker to `true` with relaxed ordering.
    pub fn set_dirty(&self) {
        self.dirty.store(true, Ordering::Relaxed);
    }
    /// Resets the dirty marker to `false` with relaxed ordering.
    pub fn clear_dirty(&self) {
        self.dirty.store(false, Ordering::Relaxed);
    }
225
    /// Atomically adds one to the reference count (relaxed ordering).
    pub fn increment_reference_count(&self) {
        self.reference_count.fetch_add(1, Ordering::Relaxed);
    }
    /// Atomically subtracts one from the reference count (relaxed ordering).
    ///
    /// There is no floor at zero: the counter is signed, so an unbalanced
    /// decrement leaves it negative.
    pub fn decrement_reference_count(&self) {
        self.reference_count.fetch_sub(1, Ordering::Relaxed);
    }
    /// Returns the current reference count (relaxed load).
    pub fn reference_count(&self) -> i64 {
        self.reference_count.load(Ordering::Relaxed)
    }
236
    /// Returns the current value of the entry counter (relaxed load).
    pub fn entry_count(&self) -> u64 {
        self.entry_count.load(Ordering::Relaxed)
    }
244
    /// Atomically adds one to the entry counter (relaxed ordering).
    pub fn increment_entry_count(&self) {
        self.entry_count.fetch_add(1, Ordering::Relaxed);
    }
249
250 pub fn decrement_entry_count(&self) {
252 loop {
254 let cur = self.entry_count.load(Ordering::Relaxed);
255 if cur == 0 {
256 break;
257 }
258 if self
259 .entry_count
260 .compare_exchange_weak(
261 cur,
262 cur - 1,
263 Ordering::Relaxed,
264 Ordering::Relaxed,
265 )
266 .is_ok()
267 {
268 break;
269 }
270 }
271 }
272
    /// Returns a shared reference to the root-LSN holder, or `None` when the
    /// `tree` field is empty.
    pub fn get_tree(&self) -> Option<&DatabaseTree> {
        self.tree.as_ref()
    }
    /// Returns a mutable reference to the root-LSN holder, or `None` when the
    /// `tree` field is empty.
    pub fn get_tree_mut(&mut self) -> Option<&mut DatabaseTree> {
        self.tree.as_mut()
    }
280
281 pub fn get_real_tree(
295 &self,
296 ) -> Option<std::sync::RwLockReadGuard<'_, Tree>> {
297 self.real_tree.as_ref()?.read().ok()
298 }
299
300 pub fn get_real_tree_arc(&self) -> Option<Arc<RwLock<Tree>>> {
303 self.real_tree.clone()
304 }
305
306 pub fn update_key_expiration(
312 &self,
313 key: &[u8],
314 expiration_hours: u32,
315 ) -> bool {
316 self.real_tree
317 .as_ref()
318 .and_then(|arc| arc.read().ok())
319 .map(|t| t.update_key_expiration(key, expiration_hours))
320 .unwrap_or(false)
321 }
322
323 pub fn collect_btree_stats(&self) -> Option<noxu_tree::TreeStats> {
331 self.real_tree
332 .as_ref()
333 .and_then(|arc| arc.read().ok())
334 .map(|t| t.collect_stats())
335 }
336
337 pub fn set_recovered_tree(&mut self, mut tree: Tree) {
342 let count = tree.count_entries();
345 self.entry_count.store(count, std::sync::atomic::Ordering::Relaxed);
346 if let Some(ref current_arc) = self.real_tree
349 && let Ok(mut current) = current_arc.write()
350 && let Some(cmp) = current.take_comparator()
351 {
352 tree.set_comparator(cmp);
353 }
354 self.real_tree = Some(Arc::new(RwLock::new(tree)));
355 }
356
357 pub fn set_memory_counter(
364 &mut self,
365 counter: std::sync::Arc<std::sync::atomic::AtomicI64>,
366 ) {
367 if let Some(tree_arc) = self.real_tree.as_ref()
368 && let Ok(mut tree) = tree_arc.write()
369 {
370 tree.set_memory_counter(counter);
371 }
372 }
373
    /// Returns the per-node entry limit captured from the configuration (or
    /// read back from the log).
    pub fn max_tree_entries_per_node(&self) -> i32 {
        self.max_tree_entries_per_node
    }
378
379 pub fn log_size(&self) -> usize {
382 8 + 4 + self.name.len() + 1 + 4 + 8 }
388
389 pub fn write_to_log(&self, buf: &mut Vec<u8>) -> std::io::Result<()> {
390 buf.write_i64::<BigEndian>(self.id.id())?;
391 buf.write_u32::<BigEndian>(self.name.len() as u32)?;
392 buf.extend_from_slice(self.name.as_bytes());
393 buf.write_u8(self.flags)?;
394 buf.write_i32::<BigEndian>(self.max_tree_entries_per_node)?;
395 let root_lsn = self
396 .tree
397 .as_ref()
398 .map_or(noxu_util::NULL_LSN.as_u64(), |t| t.root_lsn);
399 buf.write_u64::<BigEndian>(root_lsn)?;
400 Ok(())
401 }
402
403 pub fn read_from_log(buf: &[u8]) -> std::io::Result<Self> {
404 fn type_for_db_name(name: &str) -> DbType {
406 match name {
407 "_jeIdMap" | "_noxuIdMap" => DbType::Id,
408 "_jeNameMap" | "_noxuNameMap" => DbType::Name,
409 "_jeUtilization" | "_noxuUtilization" => DbType::Utilization,
410 _ => DbType::User,
411 }
412 }
413 use std::io::Cursor;
414
415 let mut cursor = Cursor::new(buf);
416 let id = cursor.read_i64::<BigEndian>()?;
417 let name_len = cursor.read_u32::<BigEndian>()? as usize;
418
419 let name_start = cursor.position() as usize;
421 let name_end = name_start + name_len;
422 if name_end > buf.len() {
423 return Err(std::io::Error::new(
424 std::io::ErrorKind::UnexpectedEof,
425 "Buffer too short for name",
426 ));
427 }
428 let name = String::from_utf8(buf[name_start..name_end].to_vec())
429 .map_err(|e| {
430 std::io::Error::new(std::io::ErrorKind::InvalidData, e)
431 })?;
432 cursor.set_position(name_end as u64);
433
434 let flags = cursor.read_u8()?;
435 let max_entries = cursor.read_i32::<BigEndian>()?;
436 let root_lsn = cursor.read_u64::<BigEndian>()?;
437
438 let db_type = type_for_db_name(&name);
439
440 let mut tree = DatabaseTree::new();
441 tree.root_lsn = root_lsn;
442
443 let real_tree = Tree::new(id as u64, max_entries as usize);
444 Ok(DatabaseImpl {
445 id: DatabaseId::new(id),
446 name,
447 db_type,
448 flags,
449 delete_state: DeleteState::NotDeleted,
450 dirty: AtomicBool::new(false),
451 max_tree_entries_per_node: max_entries,
452 reference_count: AtomicI64::new(0),
453 tree: Some(tree),
454 real_tree: Some(Arc::new(RwLock::new(real_tree))),
455 deferred_write: false, entry_count: Arc::new(AtomicU64::new(0)),
457 throughput: ThroughputStats::new(),
458 })
459 }
460}
461
462impl std::fmt::Debug for DatabaseImpl {
463 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
464 f.debug_struct("DatabaseImpl")
465 .field("id", &self.id)
466 .field("name", &self.name)
467 .field("db_type", &self.db_type)
468 .field("flags", &self.flags)
469 .field("delete_state", &self.delete_state)
470 .finish()
471 }
472}
473
474#[cfg(test)]
475#[expect(clippy::field_reassign_with_default)]
476mod tests {
477 use super::*;
478
    /// Returns the default `DatabaseConfig` used by tests that do not care
    /// about specific settings.
    fn make_config() -> DatabaseConfig {
        DatabaseConfig::default()
    }
482
    /// A freshly constructed database reports the identity it was given and
    /// starts not deleted with a zero reference count.
    #[test]
    fn test_new_database() {
        let config = make_config();
        let db = DatabaseImpl::new(
            DatabaseId::new(100),
            "test_db".to_string(),
            DbType::User,
            &config,
        );

        // Identity is passed through unchanged.
        assert_eq!(db.get_id(), DatabaseId::new(100));
        assert_eq!(db.get_name(), "test_db");
        assert_eq!(db.get_db_type(), DbType::User);
        // Initial bookkeeping state.
        assert!(!db.is_deleted());
        assert!(!db.is_deleting());
        assert_eq!(db.reference_count(), 0);
    }
500
501 #[test]
502 fn test_sorted_duplicates_flag() {
503 let mut config = DatabaseConfig::default();
504 config.sorted_duplicates = false;
505 let db1 = DatabaseImpl::new(
506 DatabaseId::new(1),
507 "db1".to_string(),
508 DbType::User,
509 &config,
510 );
511 assert!(!db1.get_sorted_duplicates());
512
513 config.sorted_duplicates = true;
514 let db2 = DatabaseImpl::new(
515 DatabaseId::new(2),
516 "db2".to_string(),
517 DbType::User,
518 &config,
519 );
520 assert!(db2.get_sorted_duplicates());
521 }
522
523 #[test]
524 fn test_temporary_flag() {
525 let mut config = DatabaseConfig::default();
526 config.temporary = false;
527 let db1 = DatabaseImpl::new(
528 DatabaseId::new(1),
529 "db1".to_string(),
530 DbType::User,
531 &config,
532 );
533 assert!(!db1.is_temporary());
534
535 config.temporary = true;
536 let db2 = DatabaseImpl::new(
537 DatabaseId::new(2),
538 "db2".to_string(),
539 DbType::User,
540 &config,
541 );
542 assert!(db2.is_temporary());
543 }
544
545 #[test]
546 fn test_key_prefixing_flag() {
547 let mut config = DatabaseConfig::default();
548 config.key_prefixing = false;
549 let db1 = DatabaseImpl::new(
550 DatabaseId::new(1),
551 "db1".to_string(),
552 DbType::User,
553 &config,
554 );
555 assert!(!db1.get_key_prefixing());
556
557 config.key_prefixing = true;
558 let db2 = DatabaseImpl::new(
559 DatabaseId::new(2),
560 "db2".to_string(),
561 DbType::User,
562 &config,
563 );
564 assert!(db2.get_key_prefixing());
565 }
566
    /// Walks the delete lifecycle: not deleted -> deleting -> deleted.
    #[test]
    fn test_delete_state_transitions() {
        let config = make_config();
        let mut db = DatabaseImpl::new(
            DatabaseId::new(1),
            "db".to_string(),
            DbType::User,
            &config,
        );

        // Fresh database: neither flag is raised.
        assert!(!db.is_deleted());
        assert!(!db.is_deleting());

        // Deletion in progress: deleting, but not yet deleted.
        db.start_delete();
        assert!(!db.is_deleted());
        assert!(db.is_deleting());

        // Deletion finished: `is_deleting` stays true alongside `is_deleted`.
        db.finish_delete();
        assert!(db.is_deleted());
        assert!(db.is_deleting());
    }
588
    /// The dirty marker starts cleared and follows `set_dirty` / `clear_dirty`.
    #[test]
    fn test_dirty_tracking() {
        let config = make_config();
        let db = DatabaseImpl::new(
            DatabaseId::new(1),
            "db".to_string(),
            DbType::User,
            &config,
        );

        // Starts clean.
        assert!(!db.is_dirty());

        db.set_dirty();
        assert!(db.is_dirty());

        db.clear_dirty();
        assert!(!db.is_dirty());
    }
607
608 #[test]
609 fn test_reference_counting() {
610 let config = make_config();
611 let db = DatabaseImpl::new(
612 DatabaseId::new(1),
613 "db".to_string(),
614 DbType::User,
615 &config,
616 );
617
618 assert_eq!(db.reference_count(), 0);
619
620 db.increment_reference_count();
621 assert_eq!(db.reference_count(), 1);
622
623 db.increment_reference_count();
624 assert_eq!(db.reference_count(), 2);
625
626 db.decrement_reference_count();
627 assert_eq!(db.reference_count(), 1);
628
629 db.decrement_reference_count();
630 assert_eq!(db.reference_count(), 0);
631 }
632
633 #[test]
634 fn test_serialization_round_trip() {
635 let mut config = DatabaseConfig::default();
636 config.sorted_duplicates = true;
637 config.key_prefixing = true;
638 config.node_max_entries = 256;
639
640 let db = DatabaseImpl::new(
641 DatabaseId::new(42),
642 "my_database".to_string(),
643 DbType::User,
644 &config,
645 );
646
647 let mut buf = Vec::new();
648 db.write_to_log(&mut buf).unwrap();
649
650 let db2 = DatabaseImpl::read_from_log(&buf).unwrap();
651
652 assert_eq!(db2.get_id(), DatabaseId::new(42));
653 assert_eq!(db2.get_name(), "my_database");
654 assert!(db2.get_sorted_duplicates());
655 assert!(db2.get_key_prefixing());
656 assert_eq!(db2.max_tree_entries_per_node(), 256);
657 }
658
659 #[test]
660 fn test_tree_access() {
661 let config = make_config();
662 let mut db = DatabaseImpl::new(
663 DatabaseId::new(1),
664 "db".to_string(),
665 DbType::User,
666 &config,
667 );
668
669 {
671 let tree = db.get_tree().unwrap();
672 assert_eq!(tree.get_root_lsn(), noxu_util::NULL_LSN.as_u64());
673 }
674
675 {
677 let tree = db.get_tree_mut().unwrap();
678 tree.set_root_lsn(12345);
679 }
680
681 {
683 let tree = db.get_tree().unwrap();
684 assert_eq!(tree.get_root_lsn(), 12345);
685 }
686 }
687
688 #[test]
689 fn test_log_size() {
690 let config = make_config();
691 let db = DatabaseImpl::new(
692 DatabaseId::new(1),
693 "test".to_string(),
694 DbType::User,
695 &config,
696 );
697
698 let expected_size = 8 + 4 + 4 + 1 + 4 + 8; assert_eq!(db.log_size(), expected_size);
700 }
701}