use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
/// Subtracts `size` from `counter`, clamping the result at zero instead of
/// wrapping around.
///
/// A `size` of zero leaves the counter untouched. Otherwise the subtraction is
/// applied with a compare-and-swap retry loop, so a value changed by another
/// thread between the read and the write is re-read and the subtraction is
/// recomputed from it.
pub(crate) fn atomic_saturating_sub(counter: &AtomicUsize, size: usize) {
    if size != 0 {
        let mut observed = counter.load(Ordering::Acquire);
        // A failed (or spuriously failed) weak CAS reports the value it saw;
        // retry from that value until the swap goes through.
        while let Err(latest) = counter.compare_exchange_weak(
            observed,
            observed.saturating_sub(size),
            Ordering::Release,
            Ordering::Relaxed,
        ) {
            observed = latest;
        }
    }
}
/// Atomics-based tracker of how much capacity has been reserved against an
/// adjustable limit, together with counters describing how the budget was used.
#[derive(Debug)]
pub struct Budget {
/// Ceiling that `try_reserve` compares `allocated + size` against; replaced by `set_limit`.
limit: AtomicUsize,
/// Total currently reserved. Held in an `Arc` because `try_reserve_arc`
/// returns clones of this same counter.
allocated: Arc<AtomicUsize>,
/// Largest `allocated` value reached by a successful `try_reserve`.
peak: AtomicUsize,
/// Number of `try_reserve` calls that were refused.
rejection_count: AtomicUsize,
/// Number of `release` calls that asked for more than was allocated at the time.
over_release_count: AtomicUsize,
}
impl Budget {
    /// Creates a budget that admits at most `limit` units, with nothing
    /// reserved and every statistic at zero.
    pub fn new(limit: usize) -> Self {
        Self {
            limit: AtomicUsize::new(limit),
            allocated: Arc::new(AtomicUsize::new(0)),
            peak: AtomicUsize::new(0),
            rejection_count: AtomicUsize::new(0),
            over_release_count: AtomicUsize::new(0),
        }
    }

    /// Attempts to reserve `size` units.
    ///
    /// Returns `true` and adds `size` to the allocated total when the new
    /// total stays within the limit. Otherwise returns `false`, leaves the
    /// total untouched and increments the rejection counter. A request whose
    /// sum with the current total does not fit in a `usize` is rejected the
    /// same way instead of overflowing.
    ///
    /// The limit is sampled once per call; a concurrent `set_limit` is
    /// observed by the next call.
    pub fn try_reserve(&self, size: usize) -> bool {
        let limit = self.limit.load(Ordering::Relaxed);
        let mut current = self.allocated.load(Ordering::Relaxed);
        loop {
            // `checked_add` keeps an oversized request from panicking (debug)
            // or wrapping to a small total that would slip under the limit
            // and corrupt the counter (release).
            let new_allocated = match current.checked_add(size) {
                Some(total) if total <= limit => total,
                _ => {
                    self.rejection_count.fetch_add(1, Ordering::Relaxed);
                    return false;
                }
            };
            match self.allocated.compare_exchange_weak(
                current,
                new_allocated,
                Ordering::AcqRel,
                Ordering::Relaxed,
            ) {
                Ok(_) => {
                    // Raise the high-water mark if this reservation set a new one.
                    self.peak.fetch_max(new_allocated, Ordering::Relaxed);
                    return true;
                }
                // Lost a race (or spurious failure): re-check against the
                // value the CAS actually saw.
                Err(actual) => current = actual,
            }
        }
    }

    /// Reserves `size` units like [`Budget::try_reserve`] and, on success,
    /// returns a handle to the budget's allocated counter.
    ///
    /// The handle is a clone of the budget's own `Arc`, not a per-reservation
    /// token: it reads the total across all reservations, and writes made
    /// through it are visible via [`Budget::allocated`]. Returns `None` when
    /// the reservation is rejected.
    pub fn try_reserve_arc(&self, size: usize) -> Option<Arc<AtomicUsize>> {
        if self.try_reserve(size) {
            Some(Arc::clone(&self.allocated))
        } else {
            None
        }
    }

    /// Returns `size` units to the budget.
    ///
    /// The allocated total saturates at zero rather than underflowing. When
    /// `size` exceeds the total that was allocated at the moment of the
    /// update, the over-release counter is incremented and a `tracing`
    /// warning is emitted.
    pub fn release(&self, size: usize) {
        // Nothing to give back; same short-circuit as `atomic_saturating_sub`.
        if size == 0 {
            return;
        }
        let mut current = self.allocated.load(Ordering::Acquire);
        loop {
            match self.allocated.compare_exchange_weak(
                current,
                current.saturating_sub(size),
                Ordering::Release,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,
                Err(actual) => current = actual,
            }
        }
        // `current` is the value the successful CAS replaced, so this
        // comparison reflects exactly the state the release was applied to.
        if size > current {
            self.over_release_count.fetch_add(1, Ordering::Relaxed);
            tracing::warn!(
                released = size,
                allocated = current,
                "memory release exceeds allocation (WAL replay or accounting drift)"
            );
        }
    }

    /// Total currently reserved.
    pub fn allocated(&self) -> usize {
        self.allocated.load(Ordering::Relaxed)
    }

    /// Current limit.
    pub fn limit(&self) -> usize {
        self.limit.load(Ordering::Relaxed)
    }

    /// Number of `release` calls that asked for more than was allocated.
    pub fn over_release_count(&self) -> usize {
        self.over_release_count.load(Ordering::Relaxed)
    }

    /// Units still available: `limit - allocated`, or zero when the allocated
    /// total is at or above the limit.
    pub fn available(&self) -> usize {
        self.limit().saturating_sub(self.allocated())
    }

    /// Allocated total as a whole percentage of the limit, rounded down and
    /// capped at 100. A limit of zero reports 100.
    pub fn utilization_percent(&self) -> u8 {
        let limit = self.limit();
        if limit == 0 {
            return 100;
        }
        // Widen to u128 so `allocated * 100` cannot overflow for any usize.
        let allocated = self.allocated() as u128;
        ((allocated * 100) / limit as u128).min(100) as u8
    }

    /// Highest allocated total reached by a successful reservation.
    pub fn peak(&self) -> usize {
        self.peak.load(Ordering::Relaxed)
    }

    /// Number of reservations that were rejected.
    pub fn rejections(&self) -> usize {
        self.rejection_count.load(Ordering::Relaxed)
    }

    /// Replaces the limit with `new_limit`, but never with less than the
    /// allocated total observed during the call: a smaller value is raised to
    /// that total.
    ///
    /// The read of the allocated total and the store of the limit are two
    /// separate atomic operations.
    pub fn set_limit(&self, new_limit: usize) {
        let effective = new_limit.max(self.allocated());
        self.limit.store(effective, Ordering::Release);
    }

    /// Zeroes the allocated total, the peak, and both event counters. The
    /// limit is left as is. Test-only.
    #[cfg(test)]
    pub fn reset(&self) {
        self.allocated.store(0, Ordering::Relaxed);
        self.peak.store(0, Ordering::Relaxed);
        self.rejection_count.store(0, Ordering::Relaxed);
        self.over_release_count.store(0, Ordering::Relaxed);
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Derived metrics must clamp, not panic or wrap, when `allocated` holds
    /// an absurdly large value.
    #[test]
    fn utilization_and_available_never_panic_on_corrupted_allocated() {
        let corrupted = Budget::new(1024);
        corrupted.allocated.store(usize::MAX, Ordering::Relaxed);

        let percent = corrupted.utilization_percent();
        assert_eq!(
            percent, 100,
            "a wrapped/over-large allocated counter must clamp to 100%, not \
             panic on `allocated * 100` and not wrap to a small percentage"
        );

        let headroom = corrupted.available();
        assert_eq!(
            headroom, 0,
            "no capacity is available when allocated has run past the limit"
        );
    }

    /// A reservation below the limit is granted and reflected in every gauge.
    #[test]
    fn reserve_within_limit() {
        let b = Budget::new(1024);
        let granted = b.try_reserve(512);
        assert!(granted);
        assert_eq!(
            (b.allocated(), b.available(), b.utilization_percent()),
            (512, 512, 50)
        );
    }

    /// Filling the budget exactly succeeds; one more unit is refused and counted.
    #[test]
    fn reserve_at_limit() {
        let b = Budget::new(1024);
        let exact_fit = b.try_reserve(1024);
        assert!(exact_fit);
        let one_more = b.try_reserve(1);
        assert!(!one_more);
        assert_eq!(b.rejections(), 1);
    }

    /// A single request above the limit is refused without touching the total.
    #[test]
    fn reserve_exceeds_limit() {
        let b = Budget::new(100);
        let granted = b.try_reserve(101);
        assert!(!granted);
        assert_eq!(b.allocated(), 0);
        assert_eq!(b.rejections(), 1);
    }

    /// Released units can be reserved again.
    #[test]
    fn release_frees_capacity() {
        let b = Budget::new(1024);
        for _ in 0..2 {
            assert!(b.try_reserve(512));
        }
        assert!(!b.try_reserve(1));

        b.release(256);
        assert!(b.try_reserve(256));
    }

    /// The peak keeps the highest total even after the total drops.
    #[test]
    fn peak_tracks_high_water_mark() {
        let b = Budget::new(1024);
        b.try_reserve(800);
        b.release(500);
        b.try_reserve(100);

        let (high_water, now) = (b.peak(), b.allocated());
        assert_eq!(high_water, 800);
        assert_eq!(now, 400);
    }

    /// Raising the limit admits more; lowering it below the total clamps to the total.
    #[test]
    fn dynamic_limit_adjustment() {
        let b = Budget::new(1024);
        b.try_reserve(600);

        b.set_limit(2048);
        assert_eq!(b.limit(), 2048);
        assert!(b.try_reserve(1000));

        b.set_limit(100);
        assert_eq!(b.limit(), 1600);
    }

    /// The handle from `try_reserve_arc` is the budget's own counter.
    #[test]
    fn try_reserve_arc_returns_shared_counter() {
        let b = Budget::new(1024);
        let handle = b.try_reserve_arc(512).expect("within budget");
        assert_eq!(handle.load(Ordering::Relaxed), 512);

        handle.fetch_sub(512, Ordering::Relaxed);
        assert_eq!(b.allocated(), 0);
    }

    /// Ten threads making 100 reservations of 10 units each exactly fill a
    /// 10_000-unit budget, so every attempt must succeed.
    #[test]
    fn concurrent_reserves() {
        use std::sync::Arc;
        use std::thread;

        let shared = Arc::new(Budget::new(10_000));
        let workers: Vec<_> = (0..10)
            .map(|_| {
                let local = Arc::clone(&shared);
                thread::spawn(move || (0..100).filter(|_| local.try_reserve(10)).count() * 10)
            })
            .collect();

        let mut total_reserved: usize = 0;
        for worker in workers {
            total_reserved += worker.join().unwrap();
        }
        assert_eq!(total_reserved, 10_000);
        assert_eq!(shared.allocated(), 10_000);
    }
}