use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Instant;
use nodedb_types::DatabaseId;
#[derive(Clone, Default)]
struct Bucket {
consumed_secs: f64,
}
struct DbWindow {
buckets: [Bucket; 60],
last_bucket_secs: u64,
}
impl DbWindow {
fn new() -> Self {
Self {
buckets: std::array::from_fn(|_| Bucket::default()),
last_bucket_secs: 0,
}
}
fn advance(&mut self, now_secs: u64) {
if now_secs <= self.last_bucket_secs {
return;
}
let elapsed = (now_secs - self.last_bucket_secs).min(60);
for i in 1..=elapsed {
let idx = ((self.last_bucket_secs + i) % 60) as usize;
self.buckets[idx] = Bucket::default();
}
self.last_bucket_secs = now_secs;
}
fn window_total(&self) -> f64 {
self.buckets.iter().map(|b| b.consumed_secs).sum()
}
fn record(&mut self, now_secs: u64, secs: f64) {
self.advance(now_secs);
let idx = (now_secs % 60) as usize;
self.buckets[idx].consumed_secs += secs;
}
}
pub struct MaintenanceBudgetTracker {
inner: Mutex<TrackerInner>,
}
struct TrackerInner {
windows: HashMap<DatabaseId, DbWindow>,
caps: HashMap<DatabaseId, f64>,
}
impl TrackerInner {
fn new() -> Self {
Self {
windows: HashMap::new(),
caps: HashMap::new(),
}
}
fn cap_for(&self, db: DatabaseId) -> f64 {
self.caps.get(&db).copied().unwrap_or(f64::INFINITY)
}
fn window_for_mut(&mut self, db: DatabaseId) -> &mut DbWindow {
self.windows.entry(db).or_insert_with(DbWindow::new)
}
}
impl std::fmt::Debug for MaintenanceBudgetTracker {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("MaintenanceBudgetTracker { .. }")
}
}
impl MaintenanceBudgetTracker {
pub fn new() -> Self {
Self {
inner: Mutex::new(TrackerInner::new()),
}
}
pub fn set_cap(&self, db: DatabaseId, maintenance_cpu_pct: u8) {
let cap = if maintenance_cpu_pct == 0 {
f64::INFINITY
} else {
(maintenance_cpu_pct as f64 / 100.0) * 60.0
};
let mut inner = self.inner.lock().unwrap_or_else(|p| p.into_inner());
inner.caps.insert(db, cap);
}
pub fn try_acquire(
self: &Arc<Self>,
db: DatabaseId,
estimated_secs: f64,
) -> Option<MaintenanceLease> {
let now_secs = current_secs();
let mut inner = self.inner.lock().unwrap_or_else(|p| p.into_inner());
let cap = inner.cap_for(db);
let window = inner.window_for_mut(db);
window.advance(now_secs);
let consumed = window.window_total();
if consumed + estimated_secs <= cap {
Some(MaintenanceLease {
tracker: Arc::clone(self),
db,
start: Instant::now(),
})
} else {
None
}
}
}
impl Default for MaintenanceBudgetTracker {
fn default() -> Self {
Self::new()
}
}
pub struct MaintenanceLease {
tracker: Arc<MaintenanceBudgetTracker>,
db: DatabaseId,
start: Instant,
}
impl Drop for MaintenanceLease {
fn drop(&mut self) {
let elapsed = self.start.elapsed().as_secs_f64();
let now_secs = current_secs();
let mut inner = self.tracker.inner.lock().unwrap_or_else(|p| p.into_inner());
inner.window_for_mut(self.db).record(now_secs, elapsed);
}
}
impl std::fmt::Debug for MaintenanceLease {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("MaintenanceLease")
.field("db", &self.db)
.finish()
}
}
fn current_secs() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0)
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
fn tracker() -> Arc<MaintenanceBudgetTracker> {
Arc::new(MaintenanceBudgetTracker::new())
}
#[test]
fn over_cap_defers() {
let t = tracker();
let db = DatabaseId::new(1);
t.set_cap(db, 10);
{
let now = current_secs();
let mut inner = t.inner.lock().unwrap();
inner.window_for_mut(db).record(now, 6.0);
}
assert!(t.try_acquire(db, 0.1).is_none());
}
#[test]
fn acquire_within_cap() {
let t = tracker();
let db = DatabaseId::new(2);
t.set_cap(db, 50); assert!(t.try_acquire(db, 5.0).is_some());
}
#[test]
fn no_cap_is_infinite() {
let t = tracker();
let db = DatabaseId::new(3);
t.set_cap(db, 0);
assert!(t.try_acquire(db, 1_000_000.0).is_some());
}
#[test]
fn lease_drop_records_actual() {
let t = tracker();
let db = DatabaseId::new(4);
t.set_cap(db, 100);
{
let _lease = t.try_acquire(db, 5.0).unwrap();
std::thread::sleep(std::time::Duration::from_millis(10));
}
{
let now = current_secs();
let mut inner = t.inner.lock().unwrap();
inner.window_for_mut(db).advance(now);
let total = inner.window_for_mut(db).window_total();
assert!(total > 0.0, "lease drop should have recorded elapsed time");
assert!(total < 1.0, "elapsed should be under 1s");
}
}
#[test]
fn window_resets_after_sixty_seconds() {
let t = tracker();
let db = DatabaseId::new(5);
t.set_cap(db, 10);
{
let now = current_secs();
let past = now.saturating_sub(61);
let mut inner = t.inner.lock().unwrap();
inner.window_for_mut(db).record(past, 6.0);
}
let lease = t.try_acquire(db, 5.9); assert!(
lease.is_some(),
"old consumption should have expired out of the window"
);
}
}