Skip to main content

noxu_dbi/
database_impl.rs

1//! Internal database implementation.
2//!
3
4use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
5use noxu_tree::{KeyComparatorFn, Tree};
6use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
7use std::sync::{Arc, RwLock};
8
9use crate::dup_key_data;
10use crate::throughput_stats::ThroughputStats;
11
12use crate::{DatabaseConfig, DatabaseId, DbType};
13
/// Position of a database in its removal life cycle.
///
/// `NotDeleted` is the state every database starts in; any other variant
/// means removal has at least begun, and `Deleted` is the final state.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum DeleteState {
    /// The database is live.
    NotDeleted,
    /// Removal has started (assigned by `DatabaseImpl::start_delete`).
    DeletedCleanupInListHarvest,
    /// Later cleanup phase; nothing in this file assigns this variant.
    DeletedCleanupLogHarvest,
    /// Removal has completed (assigned by `DatabaseImpl::finish_delete`).
    Deleted,
}
22
// Bit masks for the persistent `flags` byte of a database record.  The byte
// is written verbatim to the log, so these values must never change.

/// Sorted duplicates are enabled for the database.
const DUPS_ENABLED: u8 = 1 << 0;
/// The database is temporary.
const TEMPORARY_BIT: u8 = 1 << 1;
/// The database is replicated.
const IS_REPLICATED_BIT: u8 = 1 << 2;
/// The database is explicitly not replicated (not consulted in this file).
const NOT_REPLICATED_BIT: u8 = 1 << 3;
/// Key prefixing is enabled for the database.
const PREFIXING_ENABLED: u8 = 1 << 4;
29
30/// The underlying object for a given database.
31///
32///
33pub struct DatabaseImpl {
34    /// Unique database ID.
35    id: DatabaseId,
36    /// Database name (user databases) or internal type name.
37    name: String,
38    /// Database type.
39    db_type: DbType,
40    /// Persistent flag bits.
41    flags: u8,
42    /// Delete processing state.
43    delete_state: DeleteState,
44    /// Whether this database is dirty (needs to be written to log).
45    dirty: AtomicBool,
46    /// Maximum number of entries in a B-tree node.
47    max_tree_entries_per_node: i32,
48    /// Number of open database handles (user handles referencing this db).
49    reference_count: AtomicI64,
50    /// Persistent B-tree root metadata (root LSN, serialized with the database
51    /// record in the ID database).  Populated from the log during recovery.
52    tree: Option<DatabaseTree>,
53    /// The in-memory B+tree backing cursor traversal (search, insert, delete).
54    ///
55    /// `None` only for read-only or freshly created databases before the first
56    /// write; otherwise always `Some`.  Populated either from recovery via
57    /// `set_recovered_tree()` or lazily on first write.
58    /// Wrapped in `Arc<RwLock<Tree>>` so the cleaner can share the same tree
59    /// instance for secondary-database LN liveness checks (X-7 fix).  All
60    /// cursor operations take a read guard; only setup calls need a write guard.
61    real_tree: Option<Arc<RwLock<Tree>>>,
62    /// Whether writes are deferred (not WAL-logged immediately).
63    ///
64    ///
65    /// When true, `log_ln_write()` skips WAL logging and returns NULL_LSN;
66    /// data is flushed to disk only at eviction or checkpoint.
67    deferred_write: bool,
68    /// Per-database entry count.
69    ///
70    /// Incremented on every new insert, decremented on every delete.
71    /// Shared (Arc) so that CursorImpl can update it without holding the
72    /// `DatabaseImpl` write lock — reads and writes are both O(1) atomics.
73    ///
74    /// `DatabaseImpl.count` (AtomicLong, updated in
75    /// `BIN.insertEntry` / `BIN.deleteEntry`).
76    entry_count: Arc<AtomicU64>,
77    /// Per-database operation throughput counters.
78    ///
79    /// Shared with every CursorImpl opened on this database so that insert,
80    /// search, update, delete and position operations can be counted on the
81    /// hot path without acquiring any mutex.
82    pub throughput: Arc<ThroughputStats>,
83}
84
/// Root metadata of the B-tree that is persisted with the database record.
///
/// It carries only the root LSN, which is what `DatabaseImpl` writes to and
/// reads from the log.  The tree that is actually traversed in memory is
/// kept separately in `DatabaseImpl::real_tree`.
#[derive(Debug)]
pub struct DatabaseTree {
    /// LSN of the tree root.
    root_lsn: u64,
}
97
98impl Default for DatabaseTree {
99    fn default() -> Self {
100        Self::new()
101    }
102}
103
104impl DatabaseTree {
105    pub fn new() -> Self {
106        DatabaseTree { root_lsn: noxu_util::NULL_LSN.as_u64() }
107    }
108    pub fn get_root_lsn(&self) -> u64 {
109        self.root_lsn
110    }
111    pub fn set_root_lsn(&mut self, lsn: u64) {
112        self.root_lsn = lsn;
113    }
114}
115
116impl DatabaseImpl {
117    /// Creates a new DatabaseImpl.
118    pub fn new(
119        id: DatabaseId,
120        name: String,
121        db_type: DbType,
122        config: &DatabaseConfig,
123    ) -> Self {
124        let mut flags = 0u8;
125        if config.sorted_duplicates {
126            flags |= DUPS_ENABLED;
127        }
128        if config.temporary {
129            flags |= TEMPORARY_BIT;
130        }
131        if config.key_prefixing {
132            flags |= PREFIXING_ENABLED;
133        }
134
135        let max_entries = config.node_max_entries as usize;
136        let real_tree = if config.sorted_duplicates {
137            // Sorted-dup databases store (key, data) as two-part composite keys.
138            // A custom comparator is required: pure lexicographic ordering fails
139            // when a shorter primary key is a byte-prefix of a longer key's data.
140            let dup_cmp: KeyComparatorFn = Arc::new(|a: &[u8], b: &[u8]| {
141                dup_key_data::cmp_two_part_keys(
142                    a,
143                    b,
144                    |x, y| x.cmp(y),
145                    |x, y| x.cmp(y),
146                )
147            });
148            Tree::new_with_comparator(id.id() as u64, max_entries, dup_cmp)
149        } else {
150            Tree::new(id.id() as u64, max_entries)
151        };
152        DatabaseImpl {
153            id,
154            name,
155            db_type,
156            flags,
157            delete_state: DeleteState::NotDeleted,
158            dirty: AtomicBool::new(false),
159            max_tree_entries_per_node: config.node_max_entries,
160            reference_count: AtomicI64::new(0),
161            tree: Some(DatabaseTree::new()),
162            real_tree: Some(Arc::new(RwLock::new(real_tree))),
163            deferred_write: config.deferred_write,
164            entry_count: Arc::new(AtomicU64::new(0)),
165            throughput: ThroughputStats::new(),
166        }
167    }
168
169    // Getters
170    pub fn get_id(&self) -> DatabaseId {
171        self.id
172    }
173    pub fn get_name(&self) -> &str {
174        &self.name
175    }
176    pub fn get_db_type(&self) -> DbType {
177        self.db_type
178    }
179
180    /// Returns true if this database uses deferred write mode.
181    ///
182    ///
183    pub fn is_deferred_write(&self) -> bool {
184        self.deferred_write
185    }
186
187    // Flag methods
188    pub fn get_sorted_duplicates(&self) -> bool {
189        self.flags & DUPS_ENABLED != 0
190    }
191    pub fn is_temporary(&self) -> bool {
192        self.flags & TEMPORARY_BIT != 0
193    }
194    pub fn get_key_prefixing(&self) -> bool {
195        self.flags & PREFIXING_ENABLED != 0
196    }
197    pub fn is_replicated(&self) -> bool {
198        self.flags & IS_REPLICATED_BIT != 0
199    }
200
201    // Delete state
202    pub fn is_deleted(&self) -> bool {
203        self.delete_state == DeleteState::Deleted
204    }
205    pub fn is_deleting(&self) -> bool {
206        self.delete_state != DeleteState::NotDeleted
207    }
208    pub fn start_delete(&mut self) {
209        self.delete_state = DeleteState::DeletedCleanupInListHarvest;
210    }
211    pub fn finish_delete(&mut self) {
212        self.delete_state = DeleteState::Deleted;
213    }
214
215    // Dirty tracking
216    pub fn is_dirty(&self) -> bool {
217        self.dirty.load(Ordering::Relaxed)
218    }
219    pub fn set_dirty(&self) {
220        self.dirty.store(true, Ordering::Relaxed);
221    }
222    pub fn clear_dirty(&self) {
223        self.dirty.store(false, Ordering::Relaxed);
224    }
225
226    // Reference counting (for open handles)
227    pub fn increment_reference_count(&self) {
228        self.reference_count.fetch_add(1, Ordering::Relaxed);
229    }
230    pub fn decrement_reference_count(&self) {
231        self.reference_count.fetch_sub(1, Ordering::Relaxed);
232    }
233    pub fn reference_count(&self) -> i64 {
234        self.reference_count.load(Ordering::Relaxed)
235    }
236
237    // Entry count (O(1) atomic counter)
238    /// Returns the current entry count.
239    ///
240    /// In — reads an AtomicLong.
241    pub fn entry_count(&self) -> u64 {
242        self.entry_count.load(Ordering::Relaxed)
243    }
244
245    /// Increments the entry count by 1 (on new insert).
246    pub fn increment_entry_count(&self) {
247        self.entry_count.fetch_add(1, Ordering::Relaxed);
248    }
249
250    /// Decrements the entry count by 1 (on delete), saturating at zero.
251    pub fn decrement_entry_count(&self) {
252        // Use a compare-and-swap loop to avoid underflow.
253        loop {
254            let cur = self.entry_count.load(Ordering::Relaxed);
255            if cur == 0 {
256                break;
257            }
258            if self
259                .entry_count
260                .compare_exchange_weak(
261                    cur,
262                    cur - 1,
263                    Ordering::Relaxed,
264                    Ordering::Relaxed,
265                )
266                .is_ok()
267            {
268                break;
269            }
270        }
271    }
272
273    // Tree access (stub for LSN tracking)
274    pub fn get_tree(&self) -> Option<&DatabaseTree> {
275        self.tree.as_ref()
276    }
277    pub fn get_tree_mut(&mut self) -> Option<&mut DatabaseTree> {
278        self.tree.as_mut()
279    }
280
281    // Real B+tree access for cursor traversal and data operations.
282    /// Returns a read guard over the real B+tree.
283    ///
284    /// Returns `Option<RwLockReadGuard<'_, Tree>>` — the guard `Deref`s to
285    /// `&Tree`, so all existing cursor-code patterns (`tree.search(key)`,
286    /// `Self::get_data_from_tree(tree, key)`, etc.) continue to work without
287    /// modification through auto-deref coercion.
288    ///
289    /// Returns `None` if no tree is present or if the lock is poisoned.
290    ///
291    /// # X-7 fix
292    /// Use `get_real_tree_arc()` (below) to obtain the `Arc<RwLock<Tree>>`
293    /// for sharing with the cleaner's db-tree registry.
294    pub fn get_real_tree(
295        &self,
296    ) -> Option<std::sync::RwLockReadGuard<'_, Tree>> {
297        self.real_tree.as_ref()?.read().ok()
298    }
299
300    /// Returns a clone of the `Arc<RwLock<Tree>>` for sharing with the
301    /// cleaner's per-database tree registry (X-7 fix).
302    pub fn get_real_tree_arc(&self) -> Option<Arc<RwLock<Tree>>> {
303        self.real_tree.clone()
304    }
305
306    /// Sets the expiration time (absolute hours since Unix epoch) for the
307    /// BIN slot holding `key`.
308    ///
309    /// Returns `true` if the key was found and updated.
310    /// Delegates to `Tree::update_key_expiration()`.
311    pub fn update_key_expiration(
312        &self,
313        key: &[u8],
314        expiration_hours: u32,
315    ) -> bool {
316        self.real_tree
317            .as_ref()
318            .and_then(|arc| arc.read().ok())
319            .map(|t| t.update_key_expiration(key, expiration_hours))
320            .unwrap_or(false)
321    }
322
323    /// Collects structural B-tree statistics.
324    ///
325    /// Walks the full tree (O(n) in node count) and returns node counts
326    /// and maximum depth.  Implements `DatabaseImpl.getDbStats(fast=false)`.
327    ///
328    /// Returns `None` if this DatabaseImpl has no real tree (e.g. internal
329    /// metadata databases).
330    pub fn collect_btree_stats(&self) -> Option<noxu_tree::TreeStats> {
331        self.real_tree
332            .as_ref()
333            .and_then(|arc| arc.read().ok())
334            .map(|t| t.collect_stats())
335    }
336
337    /// Replace the real B+tree with a tree recovered from the log.
338    ///
339    /// Called by `EnvironmentImpl::open_database()` when a matching
340    /// `recovered_trees` entry exists (Approach B of P1b wiring).
341    pub fn set_recovered_tree(&mut self, mut tree: Tree) {
342        // Synchronise the in-memory entry_count counter from the recovered
343        // tree so that Database::count() returns the correct value after reopen.
344        let count = tree.count_entries();
345        self.entry_count.store(count, std::sync::atomic::Ordering::Relaxed);
346        // Transfer the key comparator from the current tree (if any) to the
347        // recovered tree — RecoveryManager builds trees without db-level config.
348        if let Some(ref current_arc) = self.real_tree
349            && let Ok(mut current) = current_arc.write()
350            && let Some(cmp) = current.take_comparator()
351        {
352            tree.set_comparator(cmp);
353        }
354        self.real_tree = Some(Arc::new(RwLock::new(tree)));
355    }
356
357    /// Wires the environment's shared memory-usage counter into this database's
358    /// tree so that BIN insertions/deletions update the Arbiter's budget.
359    ///
360    /// Must be called after `new()` in `EnvironmentImpl::open_database()`.
361    /// Also forwards the counter to the recovered tree (if any) so that
362    /// databases opened after recovery also track memory.
363    pub fn set_memory_counter(
364        &mut self,
365        counter: std::sync::Arc<std::sync::atomic::AtomicI64>,
366    ) {
367        if let Some(tree_arc) = self.real_tree.as_ref()
368            && let Ok(mut tree) = tree_arc.write()
369        {
370            tree.set_memory_counter(counter);
371        }
372    }
373
374    // Configuration
375    pub fn max_tree_entries_per_node(&self) -> i32 {
376        self.max_tree_entries_per_node
377    }
378
379    /// Serialization.
380    ///
381    pub fn log_size(&self) -> usize {
382        8 + // id
383        4 + self.name.len() + // name (length-prefixed)
384        1 + // flags
385        4 + // max entries
386        8 // root LSN
387    }
388
389    pub fn write_to_log(&self, buf: &mut Vec<u8>) -> std::io::Result<()> {
390        buf.write_i64::<BigEndian>(self.id.id())?;
391        buf.write_u32::<BigEndian>(self.name.len() as u32)?;
392        buf.extend_from_slice(self.name.as_bytes());
393        buf.write_u8(self.flags)?;
394        buf.write_i32::<BigEndian>(self.max_tree_entries_per_node)?;
395        let root_lsn = self
396            .tree
397            .as_ref()
398            .map_or(noxu_util::NULL_LSN.as_u64(), |t| t.root_lsn);
399        buf.write_u64::<BigEndian>(root_lsn)?;
400        Ok(())
401    }
402
403    pub fn read_from_log(buf: &[u8]) -> std::io::Result<Self> {
404        // Helper:
405        fn type_for_db_name(name: &str) -> DbType {
406            match name {
407                "_jeIdMap" | "_noxuIdMap" => DbType::Id,
408                "_jeNameMap" | "_noxuNameMap" => DbType::Name,
409                "_jeUtilization" | "_noxuUtilization" => DbType::Utilization,
410                _ => DbType::User,
411            }
412        }
413        use std::io::Cursor;
414
415        let mut cursor = Cursor::new(buf);
416        let id = cursor.read_i64::<BigEndian>()?;
417        let name_len = cursor.read_u32::<BigEndian>()? as usize;
418
419        // Read name bytes
420        let name_start = cursor.position() as usize;
421        let name_end = name_start + name_len;
422        if name_end > buf.len() {
423            return Err(std::io::Error::new(
424                std::io::ErrorKind::UnexpectedEof,
425                "Buffer too short for name",
426            ));
427        }
428        let name = String::from_utf8(buf[name_start..name_end].to_vec())
429            .map_err(|e| {
430                std::io::Error::new(std::io::ErrorKind::InvalidData, e)
431            })?;
432        cursor.set_position(name_end as u64);
433
434        let flags = cursor.read_u8()?;
435        let max_entries = cursor.read_i32::<BigEndian>()?;
436        let root_lsn = cursor.read_u64::<BigEndian>()?;
437
438        let db_type = type_for_db_name(&name);
439
440        let mut tree = DatabaseTree::new();
441        tree.root_lsn = root_lsn;
442
443        let real_tree = Tree::new(id as u64, max_entries as usize);
444        Ok(DatabaseImpl {
445            id: DatabaseId::new(id),
446            name,
447            db_type,
448            flags,
449            delete_state: DeleteState::NotDeleted,
450            dirty: AtomicBool::new(false),
451            max_tree_entries_per_node: max_entries,
452            reference_count: AtomicI64::new(0),
453            tree: Some(tree),
454            real_tree: Some(Arc::new(RwLock::new(real_tree))),
455            deferred_write: false, // not persisted in log record; set after open if needed
456            entry_count: Arc::new(AtomicU64::new(0)),
457            throughput: ThroughputStats::new(),
458        })
459    }
460}
461
462impl std::fmt::Debug for DatabaseImpl {
463    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
464        f.debug_struct("DatabaseImpl")
465            .field("id", &self.id)
466            .field("name", &self.name)
467            .field("db_type", &self.db_type)
468            .field("flags", &self.flags)
469            .field("delete_state", &self.delete_state)
470            .finish()
471    }
472}
473
#[cfg(test)]
#[expect(clippy::field_reassign_with_default)]
mod tests {
    use super::*;

    /// Default configuration for tests that do not tweak any option.
    fn make_config() -> DatabaseConfig {
        DatabaseConfig::default()
    }

    /// Creates a user database with the given id, name and configuration.
    fn user_db(id: i64, name: &str, config: &DatabaseConfig) -> DatabaseImpl {
        DatabaseImpl::new(
            DatabaseId::new(id),
            name.to_string(),
            DbType::User,
            config,
        )
    }

    #[test]
    fn test_new_database() {
        let db = user_db(100, "test_db", &make_config());

        assert_eq!(db.get_id(), DatabaseId::new(100));
        assert_eq!(db.get_name(), "test_db");
        assert_eq!(db.get_db_type(), DbType::User);
        assert!(!db.is_deleted());
        assert!(!db.is_deleting());
        assert_eq!(db.reference_count(), 0);
    }

    #[test]
    fn test_sorted_duplicates_flag() {
        let mut config = DatabaseConfig::default();
        config.sorted_duplicates = false;
        assert!(!user_db(1, "db1", &config).get_sorted_duplicates());

        config.sorted_duplicates = true;
        assert!(user_db(2, "db2", &config).get_sorted_duplicates());
    }

    #[test]
    fn test_temporary_flag() {
        let mut config = DatabaseConfig::default();
        config.temporary = false;
        assert!(!user_db(1, "db1", &config).is_temporary());

        config.temporary = true;
        assert!(user_db(2, "db2", &config).is_temporary());
    }

    #[test]
    fn test_key_prefixing_flag() {
        let mut config = DatabaseConfig::default();
        config.key_prefixing = false;
        assert!(!user_db(1, "db1", &config).get_key_prefixing());

        config.key_prefixing = true;
        assert!(user_db(2, "db2", &config).get_key_prefixing());
    }

    #[test]
    fn test_delete_state_transitions() {
        let mut db = user_db(1, "db", &make_config());

        // Fresh database: neither deleting nor deleted.
        assert!(!db.is_deleted());
        assert!(!db.is_deleting());

        // Deletion in progress.
        db.start_delete();
        assert!(!db.is_deleted());
        assert!(db.is_deleting());

        // Deletion finished; "deleting" remains true.
        db.finish_delete();
        assert!(db.is_deleted());
        assert!(db.is_deleting());
    }

    #[test]
    fn test_dirty_tracking() {
        let db = user_db(1, "db", &make_config());
        assert!(!db.is_dirty());

        db.set_dirty();
        assert!(db.is_dirty());

        db.clear_dirty();
        assert!(!db.is_dirty());
    }

    #[test]
    fn test_reference_counting() {
        let db = user_db(1, "db", &make_config());
        assert_eq!(db.reference_count(), 0);

        db.increment_reference_count();
        assert_eq!(db.reference_count(), 1);

        db.increment_reference_count();
        assert_eq!(db.reference_count(), 2);

        db.decrement_reference_count();
        assert_eq!(db.reference_count(), 1);

        db.decrement_reference_count();
        assert_eq!(db.reference_count(), 0);
    }

    #[test]
    fn test_serialization_round_trip() {
        let mut config = DatabaseConfig::default();
        config.sorted_duplicates = true;
        config.key_prefixing = true;
        config.node_max_entries = 256;

        let original = user_db(42, "my_database", &config);

        let mut buf = Vec::new();
        original.write_to_log(&mut buf).unwrap();

        let restored = DatabaseImpl::read_from_log(&buf).unwrap();

        assert_eq!(restored.get_id(), DatabaseId::new(42));
        assert_eq!(restored.get_name(), "my_database");
        assert!(restored.get_sorted_duplicates());
        assert!(restored.get_key_prefixing());
        assert_eq!(restored.max_tree_entries_per_node(), 256);
    }

    #[test]
    fn test_tree_access() {
        let mut db = user_db(1, "db", &make_config());

        // A new database starts with NULL_LSN as its root LSN.
        assert_eq!(
            db.get_tree().unwrap().get_root_lsn(),
            noxu_util::NULL_LSN.as_u64()
        );

        // Changing the root LSN through the mutable accessor is visible
        // through the shared one.
        db.get_tree_mut().unwrap().set_root_lsn(12345);
        assert_eq!(db.get_tree().unwrap().get_root_lsn(), 12345);
    }

    #[test]
    fn test_log_size() {
        let db = user_db(1, "test", &make_config());

        // id + name_len + "test" + flags + max_entries + root_lsn
        let expected_size = 8 + 4 + "test".len() + 1 + 4 + 8;
        assert_eq!(db.log_size(), expected_size);
    }
}