use std::ops::RangeInclusive;
use std::sync::Mutex;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
use nodedb_types::Surrogate;
use super::persist::SurrogateHwmPersist;
pub const FLUSH_OPS_THRESHOLD: u64 = 1024;
pub const FLUSH_ELAPSED_THRESHOLD: Duration = Duration::from_millis(200);
#[derive(Debug, thiserror::Error)]
pub enum SurrogateAllocError {
#[error("surrogate space exhausted (u32::MAX reached)")]
Exhausted,
#[error("surrogate batch size 0 is not allowed")]
EmptyBatch,
#[error("surrogate flush failed: {detail}")]
FlushFailed { detail: String },
}
pub struct SurrogateRegistry {
counter: AtomicU64,
allocs_since_flush: AtomicU64,
last_flush_at: Mutex<Instant>,
}
impl SurrogateRegistry {
pub fn new() -> Self {
Self::from_persisted_hwm(0)
}
pub fn from_persisted_hwm(hwm: u32) -> Self {
Self {
counter: AtomicU64::new(u64::from(hwm) + 1),
allocs_since_flush: AtomicU64::new(0),
last_flush_at: Mutex::new(Instant::now()),
}
}
pub fn alloc_one(&self) -> Result<Surrogate, SurrogateAllocError> {
let prev = self.counter.fetch_add(1, Ordering::AcqRel);
if prev > u64::from(u32::MAX) {
self.counter
.store(u64::from(u32::MAX) + 1, Ordering::Release);
return Err(SurrogateAllocError::Exhausted);
}
self.allocs_since_flush.fetch_add(1, Ordering::AcqRel);
Ok(Surrogate::new(prev as u32))
}
pub fn alloc(&self, n: u32) -> Result<RangeInclusive<Surrogate>, SurrogateAllocError> {
if n == 0 {
return Err(SurrogateAllocError::EmptyBatch);
}
let prev = self.counter.fetch_add(u64::from(n), Ordering::AcqRel);
let last = prev + u64::from(n) - 1;
if last > u64::from(u32::MAX) {
self.counter
.store(u64::from(u32::MAX) + 1, Ordering::Release);
return Err(SurrogateAllocError::Exhausted);
}
self.allocs_since_flush
.fetch_add(u64::from(n), Ordering::AcqRel);
Ok(Surrogate::new(prev as u32)..=Surrogate::new(last as u32))
}
pub fn current_hwm(&self) -> u32 {
let next = self.counter.load(Ordering::Acquire);
next.saturating_sub(1).min(u64::from(u32::MAX)) as u32
}
pub fn should_flush(&self) -> bool {
if self.allocs_since_flush.load(Ordering::Acquire) >= FLUSH_OPS_THRESHOLD {
return true;
}
if let Ok(last) = self.last_flush_at.lock() {
return last.elapsed() >= FLUSH_ELAPSED_THRESHOLD;
}
false
}
pub fn restore_hwm(&self, new_hwm: u32) -> Result<(), SurrogateAllocError> {
let target = u64::from(new_hwm) + 1;
let mut current = self.counter.load(Ordering::Acquire);
loop {
if target <= current {
return Ok(());
}
match self.counter.compare_exchange_weak(
current,
target,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => return Ok(()),
Err(actual) => current = actual,
}
}
}
pub fn flush(&self, persist: &dyn SurrogateHwmPersist) -> Result<(), SurrogateAllocError> {
let hwm = self.current_hwm();
persist
.checkpoint(hwm)
.map_err(|e| SurrogateAllocError::FlushFailed {
detail: e.to_string(),
})?;
self.allocs_since_flush.store(0, Ordering::Release);
if let Ok(mut guard) = self.last_flush_at.lock() {
*guard = Instant::now();
}
Ok(())
}
#[cfg(test)]
fn rewind_flush_clock(&self, by: Duration) {
if let Ok(mut guard) = self.last_flush_at.lock()
&& let Some(earlier) = guard.checked_sub(by)
{
*guard = earlier;
}
}
}
impl Default for SurrogateRegistry {
fn default() -> Self {
Self::new()
}
}
impl From<SurrogateAllocError> for crate::Error {
fn from(e: SurrogateAllocError) -> Self {
match e {
SurrogateAllocError::Exhausted => crate::Error::Internal {
detail: "surrogate space exhausted (u32::MAX reached)".into(),
},
SurrogateAllocError::EmptyBatch => crate::Error::BadRequest {
detail: "surrogate batch size 0 is not allowed".into(),
},
SurrogateAllocError::FlushFailed { detail } => crate::Error::Storage {
engine: "surrogate".into(),
detail,
},
}
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use std::sync::atomic::AtomicU32;
use super::*;
struct MemPersist {
last: std::sync::Mutex<Option<u32>>,
calls: AtomicU32,
}
impl MemPersist {
fn new() -> Self {
Self {
last: std::sync::Mutex::new(None),
calls: AtomicU32::new(0),
}
}
fn last(&self) -> Option<u32> {
*self.last.lock().unwrap()
}
fn calls(&self) -> u32 {
self.calls.load(Ordering::Acquire)
}
}
impl SurrogateHwmPersist for MemPersist {
fn checkpoint(&self, hwm: u32) -> crate::Result<()> {
*self.last.lock().unwrap() = Some(hwm);
self.calls.fetch_add(1, Ordering::AcqRel);
Ok(())
}
fn load(&self) -> crate::Result<u32> {
Ok(self.last().unwrap_or(0))
}
}
#[test]
fn monotonic_10k() {
let reg = SurrogateRegistry::new();
let mut prev = 0u32;
for _ in 0..10_000 {
let s = reg.alloc_one().unwrap().as_u32();
assert!(s > prev, "expected monotonic, got {prev} then {s}");
prev = s;
}
assert_eq!(reg.current_hwm(), 10_000);
}
#[test]
fn batch_alloc_returns_range_then_advances() {
let reg = SurrogateRegistry::new();
let range = reg.alloc(100).unwrap();
assert_eq!(*range.start(), Surrogate::new(1));
assert_eq!(*range.end(), Surrogate::new(100));
let count = (range.end().as_u32() - range.start().as_u32() + 1) as usize;
assert_eq!(count, 100);
let next = reg.alloc_one().unwrap();
assert_eq!(next, Surrogate::new(101));
}
#[test]
fn batch_alloc_zero_rejected() {
let reg = SurrogateRegistry::new();
assert!(matches!(reg.alloc(0), Err(SurrogateAllocError::EmptyBatch)));
}
#[test]
fn restart_survives_hwm() {
let reg = SurrogateRegistry::from_persisted_hwm(5000);
let s = reg.alloc_one().unwrap();
assert_eq!(s, Surrogate::new(5001));
assert_eq!(reg.current_hwm(), 5001);
}
#[test]
fn concurrent_16x1000_unique() {
let reg = Arc::new(SurrogateRegistry::new());
let mut handles = Vec::with_capacity(16);
for _ in 0..16 {
let r = reg.clone();
handles.push(std::thread::spawn(move || {
let mut local = Vec::with_capacity(1000);
for _ in 0..1000 {
local.push(r.alloc_one().unwrap());
}
local
}));
}
let mut all = Vec::with_capacity(16_000);
for h in handles {
all.extend(h.join().unwrap());
}
all.sort();
all.dedup();
assert_eq!(all.len(), 16_000, "expected 16000 unique surrogates");
assert!(reg.current_hwm() >= 16_000);
}
#[test]
fn overflow_surfaces_typed_error() {
let reg = SurrogateRegistry::from_persisted_hwm(u32::MAX - 1);
let last = reg.alloc_one().unwrap();
assert_eq!(last, Surrogate::new(u32::MAX));
let err = reg.alloc_one().unwrap_err();
assert!(matches!(err, SurrogateAllocError::Exhausted));
assert!(matches!(
reg.alloc_one().unwrap_err(),
SurrogateAllocError::Exhausted
));
}
#[test]
fn batch_overflow_surfaces_typed_error() {
let reg = SurrogateRegistry::from_persisted_hwm(u32::MAX - 5);
let err = reg.alloc(100).unwrap_err();
assert!(matches!(err, SurrogateAllocError::Exhausted));
}
#[test]
fn flush_threshold_ops() {
let reg = SurrogateRegistry::new();
assert!(!reg.should_flush(), "fresh registry should not flush yet");
for _ in 0..(FLUSH_OPS_THRESHOLD - 1) {
let _ = reg.alloc_one().unwrap();
}
assert!(!reg.should_flush(), "below ops threshold should not flush");
let _ = reg.alloc_one().unwrap();
assert!(reg.should_flush(), "at ops threshold should flush");
let persist = MemPersist::new();
reg.flush(&persist).unwrap();
assert_eq!(persist.calls(), 1);
assert_eq!(persist.last(), Some(FLUSH_OPS_THRESHOLD as u32));
assert!(!reg.should_flush(), "post-flush should clear ops");
}
#[test]
fn flush_threshold_elapsed() {
let reg = SurrogateRegistry::new();
let _ = reg.alloc_one().unwrap();
assert!(!reg.should_flush());
reg.rewind_flush_clock(FLUSH_ELAPSED_THRESHOLD * 2);
assert!(reg.should_flush(), "rewound clock should fire elapsed");
let persist = MemPersist::new();
reg.flush(&persist).unwrap();
assert!(!reg.should_flush(), "post-flush should reset clock");
}
#[test]
fn flush_idempotent_on_empty_registry() {
let reg = SurrogateRegistry::new();
let persist = MemPersist::new();
reg.flush(&persist).unwrap();
reg.flush(&persist).unwrap();
assert_eq!(persist.calls(), 2);
assert_eq!(persist.last(), Some(0));
}
#[test]
fn current_hwm_tracks_allocs() {
let reg = SurrogateRegistry::new();
assert_eq!(reg.current_hwm(), 0);
let _ = reg.alloc_one().unwrap();
assert_eq!(reg.current_hwm(), 1);
let _ = reg.alloc(10).unwrap();
assert_eq!(reg.current_hwm(), 11);
}
}