use std::sync::Mutex;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
use nodedb_types::DatabaseId;
use super::persist::DatabaseHwmPersist;
/// Number of allocations since the last flush at which
/// `DatabaseRegistry::should_flush` starts returning `true`.
pub const FLUSH_OPS_THRESHOLD: u64 = 64;
/// Time since the last flush (or since the registry was created) after which
/// `DatabaseRegistry::should_flush` returns `true`, even if no allocation
/// happened in between.
pub const FLUSH_ELAPSED_THRESHOLD: Duration = Duration::from_millis(200);
/// Lowest id a freshly constructed registry hands out: the allocation counter
/// is floored at this value by `DatabaseRegistry::from_persisted_hwm`.
// NOTE(review): presumably ids below this are reserved for built-in/system
// databases — confirm against the rest of the crate.
pub const USER_DB_START: u64 = 1024;
/// Errors reported by [`DatabaseRegistry`] operations.
#[derive(Debug, thiserror::Error)]
pub enum DatabaseAllocError {
/// Persisting the high-water mark failed. `detail` carries the underlying
/// persistence error rendered with `to_string()`.
#[error("database hwm flush failed: {detail}")]
FlushFailed { detail: String },
}
/// Allocates database ids from an atomic counter and tracks when the
/// allocation high-water mark should next be persisted.
pub struct DatabaseRegistry {
/// Next id to hand out; the high-water mark is this value minus one.
counter: AtomicU64,
/// Allocations counted since the last successful flush; compared against
/// `FLUSH_OPS_THRESHOLD`.
allocs_since_flush: AtomicU64,
/// Time of the last successful flush, initialised to the construction time.
last_flush_at: Mutex<Instant>,
}
impl DatabaseRegistry {
    /// Creates a registry with no persisted history.
    ///
    /// The first id handed out is [`USER_DB_START`].
    pub fn new() -> Self {
        Self::from_persisted_hwm(0)
    }

    /// Creates a registry that resumes after a previously persisted
    /// high-water mark.
    ///
    /// The next id to hand out is `hwm + 1`, floored at [`USER_DB_START`].
    pub fn from_persisted_hwm(hwm: u64) -> Self {
        // Saturate rather than `hwm + 1`: a mark of `u64::MAX` (e.g. from a
        // corrupt checkpoint) would otherwise panic in debug builds and, in
        // release builds, wrap to 0 and restart allocation at `USER_DB_START`.
        let next = hwm.saturating_add(1).max(USER_DB_START);
        Self {
            counter: AtomicU64::new(next),
            allocs_since_flush: AtomicU64::new(0),
            last_flush_at: Mutex::new(Instant::now()),
        }
    }

    /// Allocates the next database id.
    ///
    /// Uses only atomic increments, so it never blocks and never fails.
    pub fn alloc_one(&self) -> DatabaseId {
        // Bump the id counter *before* the pending count: `flush` relies on
        // every allocation visible in `allocs_since_flush` already being
        // reflected in `counter`.
        let prev = self.counter.fetch_add(1, Ordering::AcqRel);
        self.allocs_since_flush.fetch_add(1, Ordering::AcqRel);
        DatabaseId::new(prev)
    }

    /// Returns the current high-water mark, i.e. the next id minus one.
    ///
    /// On a registry that has not allocated anything yet this is one below
    /// the first id that will be handed out.
    pub fn current_hwm(&self) -> u64 {
        let next = self.counter.load(Ordering::Acquire);
        next.saturating_sub(1)
    }

    /// Raises the high-water mark to at least `new_hwm`.
    ///
    /// The mark is never lowered: a `new_hwm` below the current mark is a
    /// no-op, so ids that were already handed out are not issued again.
    pub fn restore_hwm(&self, new_hwm: u64) {
        // Saturating for the same reason as in `from_persisted_hwm`.
        let target = new_hwm.saturating_add(1);
        // Atomic "store the larger of the two" replaces a hand-written
        // compare-exchange loop.
        self.counter.fetch_max(target, Ordering::AcqRel);
    }

    /// Reports whether the high-water mark is due to be persisted.
    ///
    /// Returns `true` once [`FLUSH_OPS_THRESHOLD`] allocations are pending or
    /// [`FLUSH_ELAPSED_THRESHOLD`] has passed since the last flush.
    pub fn should_flush(&self) -> bool {
        if self.allocs_since_flush.load(Ordering::Acquire) >= FLUSH_OPS_THRESHOLD {
            return true;
        }
        self.flush_clock().elapsed() >= FLUSH_ELAPSED_THRESHOLD
    }

    /// Persists the current high-water mark through `persist`.
    ///
    /// On success the pending-allocation count is reduced by the number of
    /// allocations the checkpoint covered and the flush clock is reset. On
    /// failure neither is touched, so `should_flush` keeps reporting `true`.
    ///
    /// # Errors
    ///
    /// Returns [`DatabaseAllocError::FlushFailed`] when `persist.checkpoint`
    /// fails; `detail` is that error's `to_string()`.
    pub fn flush(&self, persist: &dyn DatabaseHwmPersist) -> Result<(), DatabaseAllocError> {
        // Snapshot the pending count before reading the mark, so every
        // allocation in the snapshot is covered by the mark that gets written
        // (see the increment order in `alloc_one`).
        let covered = self.allocs_since_flush.load(Ordering::Acquire);
        let hwm = self.current_hwm();
        persist
            .checkpoint(hwm)
            .map_err(|e| DatabaseAllocError::FlushFailed {
                detail: e.to_string(),
            })?;
        // Subtract only what was covered instead of storing 0: allocations
        // that landed while the checkpoint was in progress stay pending.
        // Saturating, so overlapping flushes cannot wrap the count.
        let _ = self.allocs_since_flush.fetch_update(
            Ordering::AcqRel,
            Ordering::Acquire,
            |pending| Some(pending.saturating_sub(covered)),
        );
        *self.flush_clock() = Instant::now();
        Ok(())
    }

    /// Locks the flush clock, recovering the guard if the mutex is poisoned.
    ///
    /// The protected value is a plain `Instant`, so a panic in another lock
    /// holder cannot leave it inconsistent; ignoring the poison keeps the
    /// elapsed-time flush trigger working.
    fn flush_clock(&self) -> std::sync::MutexGuard<'_, Instant> {
        self.last_flush_at
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Test helper: moves the flush clock back by `by`, or leaves it alone if
    /// the earlier instant cannot be represented.
    #[cfg(test)]
    fn rewind_flush_clock(&self, by: Duration) {
        let mut last = self.flush_clock();
        if let Some(earlier) = last.checked_sub(by) {
            *last = earlier;
        }
    }
}
impl Default for DatabaseRegistry {
fn default() -> Self {
Self::new()
}
}
impl From<DatabaseAllocError> for crate::Error {
fn from(e: DatabaseAllocError) -> Self {
match e {
DatabaseAllocError::FlushFailed { detail } => crate::Error::Storage {
engine: "database_registry".into(),
detail,
},
}
}
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::AtomicU32;

    use super::*;

    /// In-memory `DatabaseHwmPersist` that remembers the last checkpointed
    /// value and counts how many checkpoints were requested.
    struct MemPersist {
        checkpointed: std::sync::Mutex<Option<u64>>,
        checkpoint_calls: AtomicU32,
    }

    impl MemPersist {
        fn new() -> Self {
            let checkpointed = std::sync::Mutex::new(None);
            let checkpoint_calls = AtomicU32::new(0);
            Self {
                checkpointed,
                checkpoint_calls,
            }
        }

        /// Last value passed to `checkpoint`, if any.
        fn last(&self) -> Option<u64> {
            let slot = self.checkpointed.lock().unwrap();
            *slot
        }

        /// Number of `checkpoint` calls so far.
        fn calls(&self) -> u32 {
            self.checkpoint_calls.load(Ordering::Acquire)
        }
    }

    impl DatabaseHwmPersist for MemPersist {
        fn checkpoint(&self, hwm: u64) -> crate::Result<()> {
            *self.checkpointed.lock().unwrap() = Some(hwm);
            self.checkpoint_calls.fetch_add(1, Ordering::AcqRel);
            Ok(())
        }

        fn load(&self) -> crate::Result<u64> {
            Ok(self.last().unwrap_or(0))
        }
    }

    #[test]
    fn first_alloc_returns_user_db_start() {
        let registry = DatabaseRegistry::new();
        assert_eq!(registry.alloc_one().as_u64(), USER_DB_START);
    }

    #[test]
    fn monotonic_100() {
        let registry = DatabaseRegistry::new();
        let ids: Vec<u64> = (0..100).map(|_| registry.alloc_one().as_u64()).collect();
        // Strictly increasing, starting above zero.
        assert!(ids[0] > 0);
        assert!(ids.windows(2).all(|pair| pair[1] > pair[0]));
    }

    #[test]
    fn restart_respects_hwm() {
        let registry = DatabaseRegistry::from_persisted_hwm(5000);
        let first = registry.alloc_one().as_u64();
        assert_eq!(first, 5001);
        assert_eq!(registry.current_hwm(), 5001);
    }

    #[test]
    fn restart_below_user_start_floored() {
        // Any persisted mark below the user range restarts at USER_DB_START.
        for persisted in [0, 500] {
            let registry = DatabaseRegistry::from_persisted_hwm(persisted);
            assert_eq!(registry.alloc_one().as_u64(), USER_DB_START);
        }
    }

    #[test]
    fn restore_hwm_monotonic() {
        let registry = DatabaseRegistry::new();

        registry.restore_hwm(9000);
        assert_eq!(registry.alloc_one().as_u64(), 9001);

        // A lower mark must not move the counter backwards.
        registry.restore_hwm(100);
        assert_eq!(registry.alloc_one().as_u64(), 9002);
    }

    #[test]
    fn concurrent_32x50_unique() {
        let registry = Arc::new(DatabaseRegistry::new());
        let workers: Vec<_> = (0..32)
            .map(|_| {
                let registry = Arc::clone(&registry);
                std::thread::spawn(move || {
                    (0..50)
                        .map(|_| registry.alloc_one().as_u64())
                        .collect::<Vec<_>>()
                })
            })
            .collect();

        let mut ids: Vec<u64> = workers
            .into_iter()
            .flat_map(|worker| worker.join().unwrap())
            .collect();
        ids.sort_unstable();
        ids.dedup();
        assert_eq!(ids.len(), 1600);
    }

    #[test]
    fn flush_ops_threshold() {
        let registry = DatabaseRegistry::new();
        (1..FLUSH_OPS_THRESHOLD).for_each(|_| {
            registry.alloc_one();
        });
        assert!(!registry.should_flush());

        // The allocation that reaches the threshold flips the flag.
        registry.alloc_one();
        assert!(registry.should_flush());

        let sink = MemPersist::new();
        registry.flush(&sink).unwrap();
        assert_eq!(sink.calls(), 1);
        assert!(!registry.should_flush());
    }

    #[test]
    fn flush_elapsed_threshold() {
        let registry = DatabaseRegistry::new();
        registry.alloc_one();
        assert!(!registry.should_flush());

        // Pretend the last flush happened well beyond the elapsed threshold.
        registry.rewind_flush_clock(FLUSH_ELAPSED_THRESHOLD * 2);
        assert!(registry.should_flush());

        let sink = MemPersist::new();
        registry.flush(&sink).unwrap();
        assert!(!registry.should_flush());
    }

    #[test]
    fn flush_idempotent() {
        let registry = DatabaseRegistry::new();
        let sink = MemPersist::new();
        for _ in 0..2 {
            registry.flush(&sink).unwrap();
        }
        assert_eq!(sink.calls(), 2);
    }
}