Skip to main content

nodedb_mem/
budget.rs

1//! Per-engine memory budget tracking.
2
3use std::sync::atomic::{AtomicUsize, Ordering};
4
5/// A memory budget for a single engine.
6///
7/// Tracks current allocation against a configurable limit using atomic
8/// counters (safe to read from any thread — metrics exporter, governor, etc.).
9#[derive(Debug)]
10pub struct Budget {
11    /// Hard limit in bytes. Allocations beyond this are rejected.
12    limit: AtomicUsize,
13
14    /// Current allocated bytes.
15    allocated: AtomicUsize,
16
17    /// Peak allocated bytes (high-water mark).
18    peak: AtomicUsize,
19
20    /// Number of times an allocation was rejected due to budget exhaustion.
21    rejection_count: AtomicUsize,
22}
23
24impl Budget {
25    /// Create a new budget with the given limit.
26    pub fn new(limit: usize) -> Self {
27        Self {
28            limit: AtomicUsize::new(limit),
29            allocated: AtomicUsize::new(0),
30            peak: AtomicUsize::new(0),
31            rejection_count: AtomicUsize::new(0),
32        }
33    }
34
35    /// Try to reserve `size` bytes from this budget.
36    ///
37    /// Returns `true` if the reservation succeeded, `false` if it would
38    /// exceed the limit.
39    pub fn try_reserve(&self, size: usize) -> bool {
40        let limit = self.limit.load(Ordering::Relaxed);
41
42        // CAS loop to atomically check and increment.
43        loop {
44            let current = self.allocated.load(Ordering::Relaxed);
45            if current + size > limit {
46                self.rejection_count.fetch_add(1, Ordering::Relaxed);
47                return false;
48            }
49
50            match self.allocated.compare_exchange_weak(
51                current,
52                current + size,
53                Ordering::AcqRel,
54                Ordering::Relaxed,
55            ) {
56                Ok(_) => {
57                    // Update peak if necessary.
58                    let new_allocated = current + size;
59                    let mut peak = self.peak.load(Ordering::Relaxed);
60                    while new_allocated > peak {
61                        match self.peak.compare_exchange_weak(
62                            peak,
63                            new_allocated,
64                            Ordering::Relaxed,
65                            Ordering::Relaxed,
66                        ) {
67                            Ok(_) => break,
68                            Err(actual) => peak = actual,
69                        }
70                    }
71                    return true;
72                }
73                Err(_) => continue, // Retry CAS.
74            }
75        }
76    }
77
78    /// Release `size` bytes back to the budget.
79    ///
80    /// Saturates to zero if `size` exceeds the current allocation (which can
81    /// happen when data is replayed from WAL without a matching reservation).
82    pub fn release(&self, size: usize) {
83        loop {
84            let current = self.allocated.load(Ordering::Acquire);
85            let new_val = current.saturating_sub(size);
86            match self.allocated.compare_exchange_weak(
87                current,
88                new_val,
89                Ordering::Release,
90                Ordering::Relaxed,
91            ) {
92                Ok(_) => {
93                    if size > current {
94                        tracing::warn!(
95                            released = size,
96                            allocated = current,
97                            "memory release exceeds allocation (WAL replay or accounting drift)"
98                        );
99                    }
100                    return;
101                }
102                Err(_) => continue,
103            }
104        }
105    }
106
107    /// Current allocated bytes.
108    pub fn allocated(&self) -> usize {
109        self.allocated.load(Ordering::Relaxed)
110    }
111
112    /// Hard limit in bytes.
113    pub fn limit(&self) -> usize {
114        self.limit.load(Ordering::Relaxed)
115    }
116
117    /// Remaining bytes available.
118    pub fn available(&self) -> usize {
119        let limit = self.limit();
120        let allocated = self.allocated();
121        limit.saturating_sub(allocated)
122    }
123
124    /// Utilization as a percentage (0-100).
125    pub fn utilization_percent(&self) -> u8 {
126        let limit = self.limit();
127        if limit == 0 {
128            return 100;
129        }
130        let allocated = self.allocated();
131        ((allocated * 100) / limit).min(100) as u8
132    }
133
134    /// Peak allocation (high-water mark).
135    pub fn peak(&self) -> usize {
136        self.peak.load(Ordering::Relaxed)
137    }
138
139    /// Number of rejected allocation attempts.
140    pub fn rejections(&self) -> usize {
141        self.rejection_count.load(Ordering::Relaxed)
142    }
143
144    /// Update the limit dynamically (for rebalancing).
145    ///
146    /// The new limit must be >= current allocation. If it's less, the limit
147    /// is set to the current allocation (no immediate eviction).
148    pub fn set_limit(&self, new_limit: usize) {
149        let allocated = self.allocated();
150        let effective = new_limit.max(allocated);
151        self.limit.store(effective, Ordering::Release);
152    }
153
154    /// Reset all counters (for testing).
155    #[cfg(test)]
156    pub fn reset(&self) {
157        self.allocated.store(0, Ordering::Relaxed);
158        self.peak.store(0, Ordering::Relaxed);
159        self.rejection_count.store(0, Ordering::Relaxed);
160    }
161}
162
/// Unit tests for [`Budget`].
///
/// Every reservation that a test relies on as setup is asserted, so a failed
/// setup step is reported where it happens rather than as a confusing
/// mismatch in a later assertion.
#[cfg(test)]
mod tests {
    use super::*;

    /// A reservation inside the limit updates allocated, available and utilization.
    #[test]
    fn reserve_within_limit() {
        let budget = Budget::new(1024);
        assert!(budget.try_reserve(512));
        assert_eq!(budget.allocated(), 512);
        assert_eq!(budget.available(), 512);
        assert_eq!(budget.utilization_percent(), 50);
    }

    /// Filling the budget exactly succeeds; one more byte is rejected and counted.
    #[test]
    fn reserve_at_limit() {
        let budget = Budget::new(1024);
        assert!(budget.try_reserve(1024));
        assert!(!budget.try_reserve(1));
        assert_eq!(budget.rejections(), 1);
    }

    /// A single request larger than the limit is rejected and leaves no allocation.
    #[test]
    fn reserve_exceeds_limit() {
        let budget = Budget::new(100);
        assert!(!budget.try_reserve(101));
        assert_eq!(budget.allocated(), 0);
        assert_eq!(budget.rejections(), 1);
    }

    /// Released bytes become reservable again.
    #[test]
    fn release_frees_capacity() {
        let budget = Budget::new(1024);
        assert!(budget.try_reserve(512));
        assert!(budget.try_reserve(512));
        assert!(!budget.try_reserve(1));

        budget.release(256);
        assert!(budget.try_reserve(256));
    }

    /// The peak keeps the highest total ever reached, even after releases.
    #[test]
    fn peak_tracks_high_water_mark() {
        let budget = Budget::new(1024);
        assert!(budget.try_reserve(800));
        budget.release(500);
        assert!(budget.try_reserve(100));

        assert_eq!(budget.peak(), 800);
        assert_eq!(budget.allocated(), 400);
    }

    /// The limit can grow freely but is clamped to the current allocation when shrunk.
    #[test]
    fn dynamic_limit_adjustment() {
        let budget = Budget::new(1024);
        assert!(budget.try_reserve(600));

        // Increase limit.
        budget.set_limit(2048);
        assert_eq!(budget.limit(), 2048);
        assert!(budget.try_reserve(1000));

        // Decrease limit — but not below current allocation.
        budget.set_limit(100);
        assert_eq!(budget.limit(), 1600); // max(100, 1600 allocated)
    }

    /// Ten threads each make one hundred 10-byte reservations against a budget
    /// that fits all of them exactly; none may be lost or rejected.
    #[test]
    fn concurrent_reserves() {
        use std::sync::Arc;
        use std::thread;

        let budget = Arc::new(Budget::new(10_000));

        let handles: Vec<_> = (0..10)
            .map(|_| {
                let b = Arc::clone(&budget);
                thread::spawn(move || {
                    let mut reserved = 0;
                    for _ in 0..100 {
                        if b.try_reserve(10) {
                            reserved += 10;
                        }
                    }
                    reserved
                })
            })
            .collect();

        let total_reserved: usize = handles.into_iter().map(|h| h.join().unwrap()).sum();

        // All 1000 reservations of 10 bytes should succeed (10 * 100 * 10 = 10000).
        assert_eq!(total_reserved, 10_000);
        assert_eq!(budget.allocated(), 10_000);
    }
}