// nodedb_mem/budget.rs
// SPDX-License-Identifier: BUSL-1.1

//! Per-engine memory budget tracking.

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
/// Decrement `counter` by `size`, never dropping below zero.
///
/// A bare `fetch_sub` wraps on underflow: if some other path (e.g. a legacy
/// `MemoryGovernor::release` draining a budget while a `ReservationToken` is
/// still alive) has already released the counter past zero, the subtraction
/// would land near `usize::MAX`, and every utilization reader would then see
/// ~100 % → permanent Emergency pressure. Routing all release sites through
/// this helper turns an over-release into a saturated zero instead of a
/// wrapped maximum.
pub(crate) fn atomic_saturating_sub(counter: &AtomicUsize, size: usize) {
    if size == 0 {
        return;
    }
    // `fetch_update` runs the load/compare-exchange retry loop internally,
    // re-invoking the closure with the freshly observed value whenever the
    // CAS loses a race. Returning `Some` unconditionally means the clamped
    // update always lands, so the returned previous value is irrelevant.
    let _ = counter.fetch_update(Ordering::Release, Ordering::Acquire, |current| {
        Some(current.saturating_sub(size))
    });
}

/// A memory budget for a single engine.
///
/// Tracks current allocation against a configurable limit using atomic
/// counters (safe to read from any thread — metrics exporter, governor, etc.).
///
/// The `allocated` counter is stored behind an `Arc` so that
/// [`ReservationToken`](crate::reservation_token::ReservationToken) can hold a
/// reference to it without requiring a back-reference to the governor.
#[derive(Debug)]
pub struct Budget {
    /// Hard limit in bytes. Allocations beyond this are rejected.
    /// Atomic (not plain `usize`) so `set_limit` can rebalance at runtime.
    limit: AtomicUsize,

    /// Current allocated bytes. Arc-wrapped so tokens can release on drop.
    allocated: Arc<AtomicUsize>,

    /// Peak allocated bytes (high-water mark). Only ever moves upward.
    peak: AtomicUsize,

    /// Number of times an allocation was rejected due to budget exhaustion.
    rejection_count: AtomicUsize,

    /// Number of `release(size)` calls where `size > current` — the
    /// accounting-drift symptom. Saturating the per-engine counter
    /// to zero hides the drift from `allocated()`, so this counter
    /// is the only observable signal that the call-site is
    /// over-releasing.
    over_release_count: AtomicUsize,
}

62impl Budget {
63 /// Create a new budget with the given limit.
64 pub fn new(limit: usize) -> Self {
65 Self {
66 limit: AtomicUsize::new(limit),
67 allocated: Arc::new(AtomicUsize::new(0)),
68 peak: AtomicUsize::new(0),
69 rejection_count: AtomicUsize::new(0),
70 over_release_count: AtomicUsize::new(0),
71 }
72 }
73
74 /// Try to reserve `size` bytes from this budget.
75 ///
76 /// Returns `true` if the reservation succeeded, `false` if it would
77 /// exceed the limit.
78 pub fn try_reserve(&self, size: usize) -> bool {
79 let limit = self.limit.load(Ordering::Relaxed);
80
81 // CAS loop to atomically check and increment.
82 loop {
83 let current = self.allocated.load(Ordering::Relaxed);
84 if current + size > limit {
85 self.rejection_count.fetch_add(1, Ordering::Relaxed);
86 return false;
87 }
88
89 match self.allocated.compare_exchange_weak(
90 current,
91 current + size,
92 Ordering::AcqRel,
93 Ordering::Relaxed,
94 ) {
95 Ok(_) => {
96 // Update peak if necessary.
97 let new_allocated = current + size;
98 let mut peak = self.peak.load(Ordering::Relaxed);
99 while new_allocated > peak {
100 match self.peak.compare_exchange_weak(
101 peak,
102 new_allocated,
103 Ordering::Relaxed,
104 Ordering::Relaxed,
105 ) {
106 Ok(_) => break,
107 Err(actual) => peak = actual,
108 }
109 }
110 return true;
111 }
112 Err(_) => continue, // Retry CAS.
113 }
114 }
115 }
116
117 /// Try to reserve `size` bytes and return a shared `Arc` to the allocated
118 /// counter so the caller can decrement it on drop.
119 ///
120 /// Returns `Some(arc)` on success, `None` on budget exhaustion.
121 pub fn try_reserve_arc(&self, size: usize) -> Option<Arc<AtomicUsize>> {
122 if self.try_reserve(size) {
123 Some(Arc::clone(&self.allocated))
124 } else {
125 None
126 }
127 }
128
129 /// Release `size` bytes back to the budget.
130 ///
131 /// Saturates to zero if `size` exceeds the current allocation (which can
132 /// happen when data is replayed from WAL without a matching reservation).
133 pub fn release(&self, size: usize) {
134 loop {
135 let current = self.allocated.load(Ordering::Acquire);
136 let new_val = current.saturating_sub(size);
137 match self.allocated.compare_exchange_weak(
138 current,
139 new_val,
140 Ordering::Release,
141 Ordering::Relaxed,
142 ) {
143 Ok(_) => {
144 if size > current {
145 self.over_release_count.fetch_add(1, Ordering::Relaxed);
146 tracing::warn!(
147 released = size,
148 allocated = current,
149 "memory release exceeds allocation (WAL replay or accounting drift)"
150 );
151 }
152 return;
153 }
154 Err(_) => continue,
155 }
156 }
157 }
158
159 /// Current allocated bytes.
160 pub fn allocated(&self) -> usize {
161 self.allocated.load(Ordering::Relaxed)
162 }
163
164 /// Hard limit in bytes.
165 pub fn limit(&self) -> usize {
166 self.limit.load(Ordering::Relaxed)
167 }
168
169 /// Number of over-release events observed on this budget. A
170 /// non-zero value is the smoking-gun signal that some call-site
171 /// is releasing more bytes than it reserved. Per-engine
172 /// `allocated()` saturates to zero on over-release, so this
173 /// counter is the only post-hoc observable.
174 pub fn over_release_count(&self) -> usize {
175 self.over_release_count.load(Ordering::Relaxed)
176 }
177
178 /// Remaining bytes available.
179 pub fn available(&self) -> usize {
180 let limit = self.limit();
181 let allocated = self.allocated();
182 limit.saturating_sub(allocated)
183 }
184
185 /// Utilization as a percentage (0-100).
186 ///
187 /// Computed in `u128` so a corrupted (e.g. underflow-wrapped near
188 /// `usize::MAX`) `allocated` clamps to 100 % rather than panicking on
189 /// `allocated * 100` overflow — a panic here is taken inside the Data
190 /// Plane core loop (`apply_spsc_pressure` → `snapshot`) and escalates
191 /// to a DEGRADED core.
192 pub fn utilization_percent(&self) -> u8 {
193 let limit = self.limit();
194 if limit == 0 {
195 return 100;
196 }
197 let allocated = self.allocated() as u128;
198 ((allocated * 100) / limit as u128).min(100) as u8
199 }
200
201 /// Peak allocation (high-water mark).
202 pub fn peak(&self) -> usize {
203 self.peak.load(Ordering::Relaxed)
204 }
205
206 /// Number of rejected allocation attempts.
207 pub fn rejections(&self) -> usize {
208 self.rejection_count.load(Ordering::Relaxed)
209 }
210
211 /// Update the limit dynamically (for rebalancing).
212 ///
213 /// The new limit must be >= current allocation. If it's less, the limit
214 /// is set to the current allocation (no immediate eviction).
215 pub fn set_limit(&self, new_limit: usize) {
216 let allocated = self.allocated();
217 let effective = new_limit.max(allocated);
218 self.limit.store(effective, Ordering::Release);
219 }
220
221 /// Reset all counters (for testing).
222 #[cfg(test)]
223 pub fn reset(&self) {
224 self.allocated.store(0, Ordering::Relaxed);
225 self.peak.store(0, Ordering::Relaxed);
226 self.rejection_count.store(0, Ordering::Relaxed);
227 }
228}
229
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn utilization_and_available_never_panic_on_corrupted_allocated() {
        // `allocated` is the sum of `try_reserve`/`release` deltas. An
        // unbalanced call site — or a `ReservationToken` drop that
        // `fetch_sub`s past zero — leaves it wrapped near `usize::MAX`.
        // `utilization_percent` must not panic computing `allocated * 100`
        // (it does today in debug builds: `attempt to multiply with
        // overflow`, taken inside the Data Plane core loop via
        // `apply_spsc_pressure → snapshot`, which the panic watchdog then
        // escalates to a DEGRADED core). In release the same multiply
        // wraps and reports a garbage level. A budget reader must be
        // robust to a corrupted counter: report a clamped 100 % (so the
        // pressure detector at least errs toward Emergency, not toward
        // "idle") and never crash.
        let budget = Budget::new(1024);
        // Simulate the wrapped counter directly; no public API produces it.
        budget.allocated.store(usize::MAX, Ordering::Relaxed);

        assert_eq!(
            budget.utilization_percent(),
            100,
            "a wrapped/over-large allocated counter must clamp to 100%, not \
             panic on `allocated * 100` and not wrap to a small percentage"
        );
        assert_eq!(
            budget.available(),
            0,
            "no capacity is available when allocated has run past the limit"
        );
    }

    #[test]
    fn reserve_within_limit() {
        let budget = Budget::new(1024);
        assert!(budget.try_reserve(512));
        assert_eq!(budget.allocated(), 512);
        assert_eq!(budget.available(), 512);
        assert_eq!(budget.utilization_percent(), 50);
    }

    #[test]
    fn reserve_at_limit() {
        // Filling the budget exactly is allowed; one more byte is not.
        let budget = Budget::new(1024);
        assert!(budget.try_reserve(1024));
        assert!(!budget.try_reserve(1));
        assert_eq!(budget.rejections(), 1);
    }

    #[test]
    fn reserve_exceeds_limit() {
        // A rejected reservation must leave the counter untouched.
        let budget = Budget::new(100);
        assert!(!budget.try_reserve(101));
        assert_eq!(budget.allocated(), 0);
        assert_eq!(budget.rejections(), 1);
    }

    #[test]
    fn release_frees_capacity() {
        let budget = Budget::new(1024);
        assert!(budget.try_reserve(512));
        assert!(budget.try_reserve(512));
        assert!(!budget.try_reserve(1));

        budget.release(256);
        assert!(budget.try_reserve(256));
    }

    #[test]
    fn peak_tracks_high_water_mark() {
        // Peak is sticky: releasing must not lower it.
        let budget = Budget::new(1024);
        budget.try_reserve(800);
        budget.release(500);
        budget.try_reserve(100);

        assert_eq!(budget.peak(), 800);
        assert_eq!(budget.allocated(), 400);
    }

    #[test]
    fn dynamic_limit_adjustment() {
        let budget = Budget::new(1024);
        budget.try_reserve(600);

        // Increase limit.
        budget.set_limit(2048);
        assert_eq!(budget.limit(), 2048);
        assert!(budget.try_reserve(1000));

        // Decrease limit — but not below current allocation.
        budget.set_limit(100);
        assert_eq!(budget.limit(), 1600); // max(100, 1600 allocated)
    }

    #[test]
    fn try_reserve_arc_returns_shared_counter() {
        // The returned Arc aliases the budget's own counter, so a
        // decrement through it is visible via `allocated()`.
        let budget = Budget::new(1024);
        let arc = budget.try_reserve_arc(512).expect("within budget");
        assert_eq!(arc.load(Ordering::Relaxed), 512);
        // Release via the arc.
        arc.fetch_sub(512, Ordering::Relaxed);
        assert_eq!(budget.allocated(), 0);
    }

    #[test]
    fn concurrent_reserves() {
        use std::sync::Arc;
        use std::thread;

        let budget = Arc::new(Budget::new(10_000));
        let mut handles = Vec::new();

        for _ in 0..10 {
            let b = Arc::clone(&budget);
            handles.push(thread::spawn(move || {
                let mut reserved = 0;
                for _ in 0..100 {
                    if b.try_reserve(10) {
                        reserved += 10;
                    }
                }
                reserved
            }));
        }

        let total_reserved: usize = handles.into_iter().map(|h| h.join().unwrap()).sum();

        // All 1000 reservations of 10 bytes should succeed (10 * 100 * 10 = 10000).
        assert_eq!(total_reserved, 10_000);
        assert_eq!(budget.allocated(), 10_000);
    }
}
363}