1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
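//! Internal database implementation.
//!
//! Defines `DatabaseImpl`, the per-database state object (identity, persistent
//! flag bits, delete/dirty state, in-memory B+tree and counters), and
//! `DatabaseTree`, the root-LSN metadata serialized with the database record.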
use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
use noxu_tree::{KeyComparatorFn, Tree};
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, Ordering};
use std::sync::{Arc, RwLock};
use crate::dup_key_data;
use crate::throughput_stats::ThroughputStats;
use crate::{DatabaseConfig, DatabaseId, DbType};
/// Deletion processing states.
///
/// Tracks how far a database has progressed through removal. Within this
/// file, `DatabaseImpl::start_delete` moves a database to
/// `DeletedCleanupInListHarvest` and `DatabaseImpl::finish_delete` moves it
/// to `Deleted`; nothing in this file assigns `DeletedCleanupLogHarvest`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DeleteState {
/// Initial state assigned by both constructors; `is_deleting()` is false.
NotDeleted,
/// Deletion has started (assigned by `start_delete`).
/// NOTE(review): the name suggests in-memory node-list cleanup is still
/// pending — confirm against the deletion driver.
DeletedCleanupInListHarvest,
/// NOTE(review): presumably a later cleanup phase that works from the log;
/// never assigned in this file — confirm against callers.
DeletedCleanupLogHarvest,
/// Deletion is complete (assigned by `finish_delete`); `is_deleted()` is true.
Deleted,
}
// Flag bits for persistent database properties. Each constant occupies one
// bit of the `flags` byte that is serialized with the database record.

/// Set when the database was configured with sorted duplicates.
const DUPS_ENABLED: u8 = 0b0000_0001;
/// Set when the database was configured as temporary.
const TEMPORARY_BIT: u8 = 0b0000_0010;
/// Tested by `is_replicated()`.
const IS_REPLICATED_BIT: u8 = 0b0000_0100;
/// Reserved bit; not read or written anywhere in this file.
const NOT_REPLICATED_BIT: u8 = 0b0000_1000;
/// Set when the database was configured with key prefixing.
const PREFIXING_ENABLED: u8 = 0b0001_0000;
/// The underlying object for a given database.
///
/// Combines the persistent properties written by `write_to_log` (id, name,
/// flag bits, node capacity and root LSN) with runtime-only state: delete
/// state, dirty flag, open-handle count, the in-memory B+tree and the
/// per-database counters.
pub struct DatabaseImpl {
/// Unique database ID.
id: DatabaseId,
/// Database name (user databases) or internal type name.
name: String,
/// Database type.
db_type: DbType,
/// Persistent flag bits (`DUPS_ENABLED`, `TEMPORARY_BIT`, ...), serialized
/// as a single byte.
flags: u8,
/// Delete processing state. Not serialized; both constructors start at
/// `DeleteState::NotDeleted`.
delete_state: DeleteState,
/// Whether this database is dirty (needs to be written to log).
dirty: AtomicBool,
/// Maximum number of entries in a B-tree node.
max_tree_entries_per_node: i32,
/// Number of open database handles (user handles referencing this db).
reference_count: AtomicI64,
/// Persistent B-tree root metadata (root LSN, serialized with the database
/// record in the ID database). Populated from the log during recovery.
tree: Option<DatabaseTree>,
/// The in-memory B+tree backing cursor traversal (search, insert, delete).
///
/// Both constructors in this file (`new` and `read_from_log`) install
/// `Some`, and `set_recovered_tree()` replaces it with a tree rebuilt by
/// recovery; the accessors nevertheless tolerate `None`.
/// Wrapped in `Arc<RwLock<Tree>>` so the cleaner can share the same tree
/// instance for secondary-database LN liveness checks (X-7 fix). Within
/// this file only `set_recovered_tree` and `set_memory_counter` take the
/// write guard; every other accessor takes a read guard.
real_tree: Option<Arc<RwLock<Tree>>>,
/// Whether writes are deferred (not WAL-logged immediately).
///
/// When true, `log_ln_write()` skips WAL logging and returns NULL_LSN;
/// data is flushed to disk only at eviction or checkpoint.
/// This field is not part of the log record: `read_from_log` always
/// restores it as `false`.
deferred_write: bool,
/// Per-database entry count.
///
/// Incremented on every new insert, decremented on every delete.
/// Shared (Arc) so that CursorImpl can update it without holding the
/// `DatabaseImpl` write lock — reads and writes are both O(1) atomics.
///
/// NOTE(review): the original comment referenced `DatabaseImpl.count`
/// (an AtomicLong updated in `BIN.insertEntry` / `BIN.deleteEntry`),
/// apparently the counterpart in the implementation this code was ported
/// from — the reference was truncated, confirm before relying on it.
entry_count: Arc<AtomicU64>,
/// Per-database operation throughput counters.
///
/// Shared with every CursorImpl opened on this database so that insert,
/// search, update, delete and position operations can be counted on the
/// hot path without acquiring any mutex.
pub throughput: Arc<ThroughputStats>,
}
/// Persistent B-tree root metadata stored alongside the database record.
///
/// Holds the root LSN so that recovery can locate the tree root on disk.
/// The live in-memory tree is `DatabaseImpl::real_tree`.
///
/// This is the persistent tree descriptor stored as part of the database
/// record: `DatabaseImpl::write_to_log` serializes `root_lsn` as the last
/// field of the record and `DatabaseImpl::read_from_log` restores it.
#[derive(Debug)]
pub struct DatabaseTree {
/// Root LSN of the tree, kept as a raw `u64`; initialised to
/// `noxu_util::NULL_LSN` by `DatabaseTree::new`.
root_lsn: u64,
}
impl Default for DatabaseTree {
fn default() -> Self {
Self::new()
}
}
impl DatabaseTree {
    /// Creates a descriptor whose root LSN is `noxu_util::NULL_LSN`, i.e. no
    /// root has been recorded yet.
    pub fn new() -> Self {
        let null_lsn = noxu_util::NULL_LSN.as_u64();
        Self { root_lsn: null_lsn }
    }

    /// Returns the recorded root LSN as a raw `u64`.
    pub fn get_root_lsn(&self) -> u64 {
        let Self { root_lsn } = self;
        *root_lsn
    }

    /// Overwrites the recorded root LSN with `lsn`.
    pub fn set_root_lsn(&mut self, lsn: u64) {
        *self = Self { root_lsn: lsn };
    }
}
impl DatabaseImpl {
    /// Creates a new `DatabaseImpl` from a database configuration.
    ///
    /// The persistent flag byte is derived from `config.sorted_duplicates`,
    /// `config.temporary` and `config.key_prefixing`. The database starts
    /// not deleted, clean, with no open handles, a root LSN of `NULL_LSN`,
    /// a freshly constructed in-memory tree and an entry count of zero.
    pub fn new(
        id: DatabaseId,
        name: String,
        db_type: DbType,
        config: &DatabaseConfig,
    ) -> Self {
        let mut flags = 0u8;
        if config.sorted_duplicates {
            flags |= DUPS_ENABLED;
        }
        if config.temporary {
            flags |= TEMPORARY_BIT;
        }
        if config.key_prefixing {
            flags |= PREFIXING_ENABLED;
        }
        let real_tree = Self::build_real_tree(
            id.id() as u64,
            config.node_max_entries as usize,
            config.sorted_duplicates,
        );
        DatabaseImpl {
            id,
            name,
            db_type,
            flags,
            delete_state: DeleteState::NotDeleted,
            dirty: AtomicBool::new(false),
            max_tree_entries_per_node: config.node_max_entries,
            reference_count: AtomicI64::new(0),
            tree: Some(DatabaseTree::new()),
            real_tree: Some(Arc::new(RwLock::new(real_tree))),
            deferred_write: config.deferred_write,
            entry_count: Arc::new(AtomicU64::new(0)),
            throughput: ThroughputStats::new(),
        }
    }

    /// Builds the in-memory B+tree for a database.
    ///
    /// Shared by `new` and `read_from_log` so that both construction paths
    /// agree on the key ordering of sorted-duplicates databases.
    fn build_real_tree(
        tree_id: u64,
        max_entries: usize,
        sorted_duplicates: bool,
    ) -> Tree {
        if !sorted_duplicates {
            return Tree::new(tree_id, max_entries);
        }
        // Sorted-dup databases store (key, data) as two-part composite keys.
        // A custom comparator is required: pure lexicographic ordering fails
        // when a shorter primary key is a byte-prefix of a longer key's data.
        let dup_cmp: KeyComparatorFn = Arc::new(|a: &[u8], b: &[u8]| {
            dup_key_data::cmp_two_part_keys(
                a,
                b,
                |x, y| x.cmp(y),
                |x, y| x.cmp(y),
            )
        });
        Tree::new_with_comparator(tree_id, max_entries, dup_cmp)
    }

    // Getters

    /// Returns the unique database ID.
    pub fn get_id(&self) -> DatabaseId {
        self.id
    }

    /// Returns the database name.
    pub fn get_name(&self) -> &str {
        &self.name
    }

    /// Returns the database type.
    pub fn get_db_type(&self) -> DbType {
        self.db_type
    }

    /// Returns true if this database uses deferred write mode.
    pub fn is_deferred_write(&self) -> bool {
        self.deferred_write
    }

    // Flag methods

    /// Returns true if the `DUPS_ENABLED` flag bit is set.
    pub fn get_sorted_duplicates(&self) -> bool {
        self.flags & DUPS_ENABLED != 0
    }

    /// Returns true if the `TEMPORARY_BIT` flag bit is set.
    pub fn is_temporary(&self) -> bool {
        self.flags & TEMPORARY_BIT != 0
    }

    /// Returns true if the `PREFIXING_ENABLED` flag bit is set.
    pub fn get_key_prefixing(&self) -> bool {
        self.flags & PREFIXING_ENABLED != 0
    }

    /// Returns true if the `IS_REPLICATED_BIT` flag bit is set.
    pub fn is_replicated(&self) -> bool {
        self.flags & IS_REPLICATED_BIT != 0
    }

    // Delete state

    /// Returns true once deletion has completed (`finish_delete` was called).
    pub fn is_deleted(&self) -> bool {
        self.delete_state == DeleteState::Deleted
    }

    /// Returns true if deletion has started or completed, i.e. the state is
    /// anything other than `NotDeleted`.
    pub fn is_deleting(&self) -> bool {
        self.delete_state != DeleteState::NotDeleted
    }

    /// Marks the database as being deleted.
    pub fn start_delete(&mut self) {
        self.delete_state = DeleteState::DeletedCleanupInListHarvest;
    }

    /// Marks the database as fully deleted.
    pub fn finish_delete(&mut self) {
        self.delete_state = DeleteState::Deleted;
    }

    // Dirty tracking

    /// Returns true if the database has been marked dirty.
    pub fn is_dirty(&self) -> bool {
        self.dirty.load(Ordering::Relaxed)
    }

    /// Marks the database as dirty.
    pub fn set_dirty(&self) {
        self.dirty.store(true, Ordering::Relaxed);
    }

    /// Clears the dirty mark.
    pub fn clear_dirty(&self) {
        self.dirty.store(false, Ordering::Relaxed);
    }

    // Reference counting (for open handles)

    /// Adds one to the open-handle count.
    pub fn increment_reference_count(&self) {
        self.reference_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Subtracts one from the open-handle count. The counter is signed and
    /// is not clamped, so unbalanced calls make it negative.
    pub fn decrement_reference_count(&self) {
        self.reference_count.fetch_sub(1, Ordering::Relaxed);
    }

    /// Returns the current open-handle count.
    pub fn reference_count(&self) -> i64 {
        self.reference_count.load(Ordering::Relaxed)
    }

    // Entry count (O(1) atomic counter)

    /// Returns the current entry count with a single atomic load.
    pub fn entry_count(&self) -> u64 {
        self.entry_count.load(Ordering::Relaxed)
    }

    /// Increments the entry count by 1 (on new insert).
    pub fn increment_entry_count(&self) {
        self.entry_count.fetch_add(1, Ordering::Relaxed);
    }

    /// Decrements the entry count by 1 (on delete), saturating at zero.
    pub fn decrement_entry_count(&self) {
        // `checked_sub` yields `None` at zero, which makes `fetch_update`
        // leave the counter untouched instead of wrapping around. The
        // returned `Result` only reports whether a decrement happened.
        let _ = self.entry_count.fetch_update(
            Ordering::Relaxed,
            Ordering::Relaxed,
            |cur| cur.checked_sub(1),
        );
    }

    // Tree access (stub for LSN tracking)

    /// Returns the persistent root metadata, if present.
    pub fn get_tree(&self) -> Option<&DatabaseTree> {
        self.tree.as_ref()
    }

    /// Returns the persistent root metadata for mutation, if present.
    pub fn get_tree_mut(&mut self) -> Option<&mut DatabaseTree> {
        self.tree.as_mut()
    }

    // Real B+tree access for cursor traversal and data operations.

    /// Returns a read guard over the real B+tree.
    ///
    /// The guard `Deref`s to `&Tree`, so callers can use it wherever a
    /// `&Tree` is expected (`tree.search(key)` and similar).
    ///
    /// Returns `None` if no tree is present or if the lock is poisoned.
    ///
    /// # X-7 fix
    /// Use `get_real_tree_arc()` to obtain the `Arc<RwLock<Tree>>` for
    /// sharing with the cleaner's db-tree registry.
    pub fn get_real_tree(
        &self,
    ) -> Option<std::sync::RwLockReadGuard<'_, Tree>> {
        self.real_tree.as_ref()?.read().ok()
    }

    /// Returns a clone of the `Arc<RwLock<Tree>>` for sharing with the
    /// cleaner's per-database tree registry (X-7 fix).
    pub fn get_real_tree_arc(&self) -> Option<Arc<RwLock<Tree>>> {
        self.real_tree.as_ref().map(Arc::clone)
    }

    /// Sets the expiration time (absolute hours since Unix epoch) for the
    /// BIN slot holding `key`.
    ///
    /// Delegates to `Tree::update_key_expiration()` under a read guard and
    /// returns its result. Returns `false` when there is no tree or the lock
    /// is poisoned.
    pub fn update_key_expiration(
        &self,
        key: &[u8],
        expiration_hours: u32,
    ) -> bool {
        self.get_real_tree()
            .is_some_and(|t| t.update_key_expiration(key, expiration_hours))
    }

    /// Collects structural B-tree statistics by delegating to
    /// `Tree::collect_stats()` under a read guard.
    ///
    /// Returns `None` if this `DatabaseImpl` has no real tree or the tree
    /// lock is poisoned.
    pub fn collect_btree_stats(&self) -> Option<noxu_tree::TreeStats> {
        self.get_real_tree().map(|t| t.collect_stats())
    }

    /// Replace the real B+tree with a tree recovered from the log.
    ///
    /// Called by `EnvironmentImpl::open_database()` when a matching
    /// `recovered_trees` entry exists (Approach B of P1b wiring).
    ///
    /// The entry counter is reset to the recovered tree's entry count, and
    /// the key comparator of the tree being replaced (if any) is moved onto
    /// the recovered tree. If the old tree's lock is poisoned the comparator
    /// is not transferred.
    pub fn set_recovered_tree(&mut self, mut tree: Tree) {
        // Synchronise the in-memory entry counter from the recovered tree so
        // that Database::count() returns the correct value after reopen.
        self.entry_count.store(tree.count_entries(), Ordering::Relaxed);
        // RecoveryManager builds trees without db-level config, so the
        // comparator has to come from the tree this database already owns.
        if let Some(current_arc) = self.real_tree.as_ref()
            && let Ok(mut current) = current_arc.write()
            && let Some(cmp) = current.take_comparator()
        {
            tree.set_comparator(cmp);
        }
        self.real_tree = Some(Arc::new(RwLock::new(tree)));
    }

    /// Wires the environment's shared memory-usage counter into this
    /// database's tree so that BIN insertions/deletions update the Arbiter's
    /// budget.
    ///
    /// Must be called after `new()` in `EnvironmentImpl::open_database()`.
    /// Only the tree installed at the time of the call receives the counter;
    /// nothing happens if there is no tree or its lock is poisoned.
    ///
    /// NOTE(review): `set_recovered_tree` installs a new tree without
    /// copying a counter across, so this presumably has to be called after
    /// it — confirm the call order in `open_database()`.
    pub fn set_memory_counter(&mut self, counter: Arc<AtomicI64>) {
        if let Some(tree_arc) = self.real_tree.as_ref()
            && let Ok(mut tree) = tree_arc.write()
        {
            tree.set_memory_counter(counter);
        }
    }

    // Configuration

    /// Returns the configured maximum number of entries per B-tree node.
    pub fn max_tree_entries_per_node(&self) -> i32 {
        self.max_tree_entries_per_node
    }

    // Serialization

    /// Returns the number of bytes `write_to_log` appends for this database.
    pub fn log_size(&self) -> usize {
        const ID_BYTES: usize = 8;
        const NAME_LEN_BYTES: usize = 4;
        const FLAGS_BYTES: usize = 1;
        const MAX_ENTRIES_BYTES: usize = 4;
        const ROOT_LSN_BYTES: usize = 8;
        ID_BYTES
            + NAME_LEN_BYTES
            + self.name.len()
            + FLAGS_BYTES
            + MAX_ENTRIES_BYTES
            + ROOT_LSN_BYTES
    }

    /// Appends the big-endian log record for this database to `buf`.
    ///
    /// Layout: id (`i64`), name length (`u32`), name bytes, flags (`u8`),
    /// max entries per node (`i32`), root LSN (`u64`, `NULL_LSN` when no
    /// root metadata is present).
    ///
    /// # Errors
    /// Returns `InvalidInput` if the name is longer than `u32::MAX` bytes and
    /// therefore cannot be length-prefixed; `buf` is left untouched in that
    /// case.
    pub fn write_to_log(&self, buf: &mut Vec<u8>) -> std::io::Result<()> {
        // Validate before writing anything so a failure cannot leave a
        // partial record behind.
        let name_len = u32::try_from(self.name.len()).map_err(|_| {
            std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "Database name too long for log record",
            )
        })?;
        buf.reserve(self.log_size());
        buf.write_i64::<BigEndian>(self.id.id())?;
        buf.write_u32::<BigEndian>(name_len)?;
        buf.extend_from_slice(self.name.as_bytes());
        buf.write_u8(self.flags)?;
        buf.write_i32::<BigEndian>(self.max_tree_entries_per_node)?;
        let root_lsn = self
            .tree
            .as_ref()
            .map_or(noxu_util::NULL_LSN.as_u64(), |t| t.root_lsn);
        buf.write_u64::<BigEndian>(root_lsn)?;
        Ok(())
    }

    /// Reconstructs a `DatabaseImpl` from a record produced by
    /// `write_to_log`.
    ///
    /// The database type is inferred from the name (well-known internal
    /// names map to `Id`, `Name` and `Utilization`; everything else is
    /// `User`). Runtime-only state is reset: not deleted, clean, zero open
    /// handles, zero entry count, `deferred_write == false`. The in-memory
    /// tree is built the same way as in `new`, including the two-part key
    /// comparator when the `DUPS_ENABLED` flag is set.
    ///
    /// # Errors
    /// Returns `UnexpectedEof` if `buf` is too short, and `InvalidData` if
    /// the name is not valid UTF-8 or the stored node capacity is negative.
    pub fn read_from_log(buf: &[u8]) -> std::io::Result<Self> {
        use std::io::{Cursor, Error, ErrorKind};

        // Maps the reserved internal database names to their type.
        fn type_for_db_name(name: &str) -> DbType {
            match name {
                "_jeIdMap" | "_noxuIdMap" => DbType::Id,
                "_jeNameMap" | "_noxuNameMap" => DbType::Name,
                "_jeUtilization" | "_noxuUtilization" => DbType::Utilization,
                _ => DbType::User,
            }
        }

        let mut cursor = Cursor::new(buf);
        let id = cursor.read_i64::<BigEndian>()?;
        let name_len = cursor.read_u32::<BigEndian>()? as usize;

        // The length prefix comes from the log, so it is bounds-checked
        // (including against `usize` overflow) before slicing.
        let name_start = cursor.position() as usize;
        let name_end = name_start.checked_add(name_len);
        let name_bytes = name_end
            .and_then(|end| buf.get(name_start..end))
            .ok_or_else(|| {
                Error::new(
                    ErrorKind::UnexpectedEof,
                    "Buffer too short for name",
                )
            })?;
        let name = String::from_utf8(name_bytes.to_vec())
            .map_err(|e| Error::new(ErrorKind::InvalidData, e))?;
        cursor.set_position((name_start + name_bytes.len()) as u64);

        let flags = cursor.read_u8()?;
        let max_entries = cursor.read_i32::<BigEndian>()?;
        let root_lsn = cursor.read_u64::<BigEndian>()?;

        // A negative capacity would wrap to an enormous `usize`.
        let node_capacity = usize::try_from(max_entries).map_err(|_| {
            Error::new(
                ErrorKind::InvalidData,
                "Negative max tree entries per node",
            )
        })?;

        let db_type = type_for_db_name(&name);
        let real_tree = Self::build_real_tree(
            id as u64,
            node_capacity,
            flags & DUPS_ENABLED != 0,
        );
        Ok(DatabaseImpl {
            id: DatabaseId::new(id),
            name,
            db_type,
            flags,
            delete_state: DeleteState::NotDeleted,
            dirty: AtomicBool::new(false),
            max_tree_entries_per_node: max_entries,
            reference_count: AtomicI64::new(0),
            tree: Some(DatabaseTree { root_lsn }),
            real_tree: Some(Arc::new(RwLock::new(real_tree))),
            // Not persisted in the log record; set after open if needed.
            deferred_write: false,
            entry_count: Arc::new(AtomicU64::new(0)),
            throughput: ThroughputStats::new(),
        })
    }
}
impl std::fmt::Debug for DatabaseImpl {
    /// Formats the identifying fields only (id, name, type, flags and delete
    /// state); the trees, counters and atomics are omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("DatabaseImpl");
        out.field("id", &self.id);
        out.field("name", &self.name);
        out.field("db_type", &self.db_type);
        out.field("flags", &self.flags);
        out.field("delete_state", &self.delete_state);
        out.finish()
    }
}
#[cfg(test)]
#[expect(clippy::field_reassign_with_default)]
mod tests {
    use super::*;

    /// Default configuration used by tests that do not care about flags.
    fn make_config() -> DatabaseConfig {
        DatabaseConfig::default()
    }

    /// Builds a user database with the given id, name and configuration.
    fn make_db(id: i64, name: &str, config: &DatabaseConfig) -> DatabaseImpl {
        DatabaseImpl::new(
            DatabaseId::new(id),
            name.to_string(),
            DbType::User,
            config,
        )
    }

    #[test]
    fn test_new_database() {
        let db = make_db(100, "test_db", &make_config());
        assert_eq!(db.get_id(), DatabaseId::new(100));
        assert_eq!(db.get_name(), "test_db");
        assert_eq!(db.get_db_type(), DbType::User);
        assert!(!db.is_deleted());
        assert!(!db.is_deleting());
        assert!(!db.is_dirty());
        assert_eq!(db.reference_count(), 0);
        assert_eq!(db.entry_count(), 0);
    }

    #[test]
    fn test_sorted_duplicates_flag() {
        let mut config = DatabaseConfig::default();
        config.sorted_duplicates = false;
        assert!(!make_db(1, "db1", &config).get_sorted_duplicates());
        config.sorted_duplicates = true;
        assert!(make_db(2, "db2", &config).get_sorted_duplicates());
    }

    #[test]
    fn test_temporary_flag() {
        let mut config = DatabaseConfig::default();
        config.temporary = false;
        assert!(!make_db(1, "db1", &config).is_temporary());
        config.temporary = true;
        assert!(make_db(2, "db2", &config).is_temporary());
    }

    #[test]
    fn test_key_prefixing_flag() {
        let mut config = DatabaseConfig::default();
        config.key_prefixing = false;
        assert!(!make_db(1, "db1", &config).get_key_prefixing());
        config.key_prefixing = true;
        assert!(make_db(2, "db2", &config).get_key_prefixing());
    }

    #[test]
    fn test_delete_state_transitions() {
        let mut db = make_db(1, "db", &make_config());
        assert!(!db.is_deleted());
        assert!(!db.is_deleting());
        db.start_delete();
        assert!(!db.is_deleted());
        assert!(db.is_deleting());
        db.finish_delete();
        assert!(db.is_deleted());
        assert!(db.is_deleting());
    }

    #[test]
    fn test_dirty_tracking() {
        let db = make_db(1, "db", &make_config());
        assert!(!db.is_dirty());
        db.set_dirty();
        assert!(db.is_dirty());
        db.clear_dirty();
        assert!(!db.is_dirty());
    }

    #[test]
    fn test_reference_counting() {
        let db = make_db(1, "db", &make_config());
        assert_eq!(db.reference_count(), 0);
        db.increment_reference_count();
        assert_eq!(db.reference_count(), 1);
        db.increment_reference_count();
        assert_eq!(db.reference_count(), 2);
        db.decrement_reference_count();
        assert_eq!(db.reference_count(), 1);
        db.decrement_reference_count();
        assert_eq!(db.reference_count(), 0);
    }

    #[test]
    fn test_entry_count_saturates_at_zero() {
        let db = make_db(1, "db", &make_config());
        // Decrementing an empty database must not wrap around.
        db.decrement_entry_count();
        assert_eq!(db.entry_count(), 0);
        db.increment_entry_count();
        db.increment_entry_count();
        assert_eq!(db.entry_count(), 2);
        db.decrement_entry_count();
        assert_eq!(db.entry_count(), 1);
        db.decrement_entry_count();
        db.decrement_entry_count();
        assert_eq!(db.entry_count(), 0);
    }

    #[test]
    fn test_serialization_round_trip() {
        let mut config = DatabaseConfig::default();
        config.sorted_duplicates = true;
        config.key_prefixing = true;
        config.node_max_entries = 256;
        let mut db = make_db(42, "my_database", &config);
        db.get_tree_mut().unwrap().set_root_lsn(0x1234_5678);

        let mut buf = Vec::new();
        db.write_to_log(&mut buf).unwrap();
        // The advertised size must match what was actually written.
        assert_eq!(buf.len(), db.log_size());

        let db2 = DatabaseImpl::read_from_log(&buf).unwrap();
        assert_eq!(db2.get_id(), DatabaseId::new(42));
        assert_eq!(db2.get_name(), "my_database");
        assert_eq!(db2.get_db_type(), DbType::User);
        assert!(db2.get_sorted_duplicates());
        assert!(db2.get_key_prefixing());
        assert!(!db2.is_temporary());
        assert_eq!(db2.max_tree_entries_per_node(), 256);
        assert_eq!(db2.get_tree().unwrap().get_root_lsn(), 0x1234_5678);
    }

    #[test]
    fn test_read_from_log_rejects_truncated_input() {
        let db = make_db(7, "truncated", &make_config());
        let mut buf = Vec::new();
        db.write_to_log(&mut buf).unwrap();
        // Every strict prefix of a valid record is incomplete.
        for len in 0..buf.len() {
            assert!(
                DatabaseImpl::read_from_log(&buf[..len]).is_err(),
                "prefix of {len} bytes must be rejected"
            );
        }
    }

    #[test]
    fn test_internal_db_type_is_inferred_from_name() {
        let db = DatabaseImpl::new(
            DatabaseId::new(1),
            "_noxuIdMap".to_string(),
            DbType::Id,
            &make_config(),
        );
        let mut buf = Vec::new();
        db.write_to_log(&mut buf).unwrap();
        let db2 = DatabaseImpl::read_from_log(&buf).unwrap();
        assert_eq!(db2.get_db_type(), DbType::Id);
    }

    #[test]
    fn test_tree_access() {
        let mut db = make_db(1, "db", &make_config());
        // Default tree has NULL_LSN
        assert_eq!(
            db.get_tree().unwrap().get_root_lsn(),
            noxu_util::NULL_LSN.as_u64()
        );
        // Set root LSN
        db.get_tree_mut().unwrap().set_root_lsn(12345);
        // Verify it was set
        assert_eq!(db.get_tree().unwrap().get_root_lsn(), 12345);
    }

    #[test]
    fn test_log_size() {
        let db = make_db(1, "test", &make_config());
        // id + name_len + "test" + flags + max_entries + root_lsn
        let expected_size = 8 + 4 + 4 + 1 + 4 + 8;
        assert_eq!(db.log_size(), expected_size);
    }
}