use std::sync::Arc;
use std::sync::atomic::AtomicUsize;
use nodedb_types::{DatabaseId, TenantId};
use crate::engine::EngineId;
use crate::governor::GlobalCounter;
/// Handle that records a reserved amount (`size`) together with the counters
/// that amount must be given back to.
///
/// The token itself never adds to a counter; it only releases. When it is
/// dropped, `size` is subtracted from the global counter and from each
/// optional scoped counter that is present, via
/// `crate::budget::atomic_saturating_sub`.
// NOTE(review): presumably whoever builds the token has already added `size`
// to these counters — that step is not visible in this file; confirm.
#[must_use = "dropping a ReservationToken immediately releases the reservation; bind it to a variable"]
pub struct ReservationToken {
/// Global counter; its `allocated` field is reduced by `size` on drop.
pub(crate) global_counter: Arc<GlobalCounter>,
/// Database-level counter reduced by `size` on drop; `None` skips this level.
pub(crate) database_counter: Option<Arc<AtomicUsize>>,
/// Tenant-level counter reduced by `size` on drop; `None` skips this level.
pub(crate) tenant_counter: Option<Arc<AtomicUsize>>,
/// Engine-level counter reduced by `size` on drop; `None` skips this level.
pub(crate) engine_counter: Option<Arc<AtomicUsize>>,
/// Amount released from every counter on drop; `0` makes the drop a no-op.
pub(crate) size: usize,
/// Database identifier, exposed through `database_id()`.
db: DatabaseId,
/// Tenant identifier, exposed through `tenant_id()`.
tenant: TenantId,
/// Engine identifier, exposed through `engine()`.
engine: EngineId,
}
/// Named arguments for `ReservationToken::new`.
///
/// Every field is moved unchanged into the token field of the same name.
pub(crate) struct ReservationParams {
/// Global counter the token will release from.
pub global_counter: Arc<GlobalCounter>,
/// Optional database-level counter; `None` means the token holds none.
pub database_counter: Option<Arc<AtomicUsize>>,
/// Optional tenant-level counter; `None` means the token holds none.
pub tenant_counter: Option<Arc<AtomicUsize>>,
/// Optional engine-level counter; `None` means the token holds none.
pub engine_counter: Option<Arc<AtomicUsize>>,
/// Amount the token will release when dropped.
pub size: usize,
/// Database identifier stored on the token.
pub db: DatabaseId,
/// Tenant identifier stored on the token.
pub tenant: TenantId,
/// Engine identifier stored on the token.
pub engine: EngineId,
}
impl ReservationToken {
pub(crate) fn new(params: ReservationParams) -> Self {
Self {
global_counter: params.global_counter,
database_counter: params.database_counter,
tenant_counter: params.tenant_counter,
engine_counter: params.engine_counter,
size: params.size,
db: params.db,
tenant: params.tenant,
engine: params.engine,
}
}
pub fn size(&self) -> usize {
self.size
}
pub fn database_id(&self) -> DatabaseId {
self.db
}
pub fn tenant_id(&self) -> TenantId {
self.tenant
}
pub fn engine(&self) -> EngineId {
self.engine
}
}
impl Drop for ReservationToken {
    /// Releases the token's amount from every counter it holds.
    ///
    /// Counters are visited in the order engine, tenant, database, global;
    /// scoped counters that are `None` are skipped. A zero-sized token
    /// touches nothing.
    fn drop(&mut self) {
        let amount = self.size;
        if amount != 0 {
            // `Option::iter` yields zero or one reference, so chaining the
            // three options walks exactly the counters that are present.
            let scoped = self
                .engine_counter
                .iter()
                .chain(self.tenant_counter.iter())
                .chain(self.database_counter.iter());
            for counter in scoped {
                crate::budget::atomic_saturating_sub(counter, amount);
            }

            crate::budget::atomic_saturating_sub(&self.global_counter.allocated, amount);
        }
    }
}
impl std::fmt::Debug for ReservationToken {
    /// Formats the token as `ReservationToken { size, db, tenant, engine }`.
    ///
    /// The counter handles are left out of the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            size,
            db,
            tenant,
            engine,
            ..
        } = self;

        let mut repr = f.debug_struct("ReservationToken");
        repr.field("size", size);
        repr.field("db", db);
        repr.field("tenant", tenant);
        repr.field("engine", engine);
        repr.finish()
    }
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    use nodedb_types::{DatabaseId, TenantId};

    use super::{ReservationParams, ReservationToken};
    use crate::engine::EngineId;
    use crate::governor::GlobalCounter;

    /// Shared scoped counter starting at `val`.
    fn make_counter(val: usize) -> Arc<AtomicUsize> {
        let counter = AtomicUsize::new(val);
        Arc::new(counter)
    }

    /// Shared global counter with `val` already allocated.
    fn make_global(val: usize) -> Arc<GlobalCounter> {
        let allocated = AtomicUsize::new(val);
        Arc::new(GlobalCounter {
            allocated,
            ceiling: 1024 * 1024,
        })
    }

    /// Current value of `counter`, read with relaxed ordering.
    fn read(counter: &AtomicUsize) -> usize {
        counter.load(Ordering::Relaxed)
    }

    /// Dropping a fully scoped token releases its size from all four counters.
    #[test]
    fn drop_releases_all_four_levels() {
        let global = make_global(100);
        let db_ctr = make_counter(100);
        let tenant_ctr = make_counter(100);
        let engine_ctr = make_counter(100);

        let params = ReservationParams {
            global_counter: Arc::clone(&global),
            database_counter: Some(Arc::clone(&db_ctr)),
            tenant_counter: Some(Arc::clone(&tenant_ctr)),
            engine_counter: Some(Arc::clone(&engine_ctr)),
            size: 100,
            db: DatabaseId::DEFAULT,
            tenant: TenantId::new(1),
            engine: EngineId::Vector,
        };
        let token = ReservationToken::new(params);

        // Building the token must leave the counter untouched.
        assert_eq!(read(&global.allocated), 100);

        drop(token);

        assert_eq!(read(&global.allocated), 0);
        assert_eq!(read(&db_ctr), 0);
        assert_eq!(read(&tenant_ctr), 0);
        assert_eq!(read(&engine_ctr), 0);
    }

    /// With no scoped counters, the drop still releases the global counter.
    #[test]
    fn drop_with_no_scoped_counters_releases_global() {
        let global = make_global(200);

        let params = ReservationParams {
            global_counter: Arc::clone(&global),
            database_counter: None,
            tenant_counter: None,
            engine_counter: None,
            size: 200,
            db: DatabaseId::DEFAULT,
            tenant: TenantId::new(1),
            engine: EngineId::Query,
        };
        let token = ReservationToken::new(params);

        drop(token);

        assert_eq!(read(&global.allocated), 0);
    }

    /// Counters already below the token's size must clamp at zero, not wrap.
    #[test]
    fn drop_does_not_underflow_a_counter_released_below_size() {
        let global = make_global(40);
        let engine_ctr = make_counter(40);
        let tenant_ctr = make_counter(40);

        let params = ReservationParams {
            global_counter: Arc::clone(&global),
            database_counter: None,
            tenant_counter: Some(Arc::clone(&tenant_ctr)),
            engine_counter: Some(Arc::clone(&engine_ctr)),
            size: 40,
            db: DatabaseId::DEFAULT,
            tenant: TenantId::new(1),
            engine: EngineId::Timeseries,
        };
        let token = ReservationToken::new(params);

        // Zero two of the counters behind the token's back before it drops.
        engine_ctr.store(0, Ordering::Relaxed);
        global.allocated.store(0, Ordering::Relaxed);

        drop(token);

        let engine = read(&engine_ctr);
        let glob = read(&global.allocated);
        let tenant = read(&tenant_ctr);

        assert_eq!(
            engine, 0,
            "engine counter underflowed to {engine} on token drop — a wrapped \
             counter reads as 100% utilization (Emergency) forever"
        );
        assert_eq!(
            glob, 0,
            "global counter underflowed to {glob} on token drop"
        );
        assert_eq!(tenant, 0, "tenant counter should release normally to 0");
    }

    /// A zero-sized token leaves the global counter at its starting value.
    #[test]
    fn zero_size_drop_is_noop() {
        let global = make_global(0);

        let params = ReservationParams {
            global_counter: Arc::clone(&global),
            database_counter: None,
            tenant_counter: None,
            engine_counter: None,
            size: 0,
            db: DatabaseId::DEFAULT,
            tenant: TenantId::new(1),
            engine: EngineId::Query,
        };
        let token = ReservationToken::new(params);

        drop(token);

        assert_eq!(read(&global.allocated), 0);
    }
}