// nodedb_mem/budget.rs
// SPDX-License-Identifier: BUSL-1.1

//! Per-engine memory budget tracking.

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

/// Decrement `counter` by `size`, clamping at zero.
///
/// Plain `fetch_sub` wraps on underflow — a counter that has been
/// released past zero by another path (e.g. a legacy `MemoryGovernor::release`
/// draining a budget while a `ReservationToken` is still alive) would
/// otherwise jump to ~`usize::MAX`, which every utilization reader then
/// interprets as 100 % → permanent Emergency pressure. All release sites
/// must go through this so an over-release is at worst a saturated zero,
/// never a wrapped maximum.
pub(crate) fn atomic_saturating_sub(counter: &AtomicUsize, size: usize) {
    // Nothing to do for a zero-sized release; skip the RMW entirely.
    if size == 0 {
        return;
    }
    // `fetch_update` runs the CAS retry loop for us (Acquire on each load,
    // Release on the successful store — matching the hand-rolled loop this
    // replaces). The closure always returns `Some`, so the `Err` arm of the
    // result is unreachable and the result can be ignored.
    let _ = counter.fetch_update(Ordering::Release, Ordering::Acquire, |current| {
        Some(current.saturating_sub(size))
    });
}
31
/// A memory budget for a single engine.
///
/// Tracks current allocation against a configurable limit using atomic
/// counters (safe to read from any thread — metrics exporter, governor, etc.).
///
/// The `allocated` counter is stored behind an `Arc` so that
/// [`ReservationToken`](crate::reservation_token::ReservationToken) can hold a
/// reference to it without requiring a back-reference to the governor.
#[derive(Debug)]
pub struct Budget {
    /// Hard limit in bytes. Allocations beyond this are rejected.
    /// Atomic (rather than plain `usize`) so `set_limit` can rebalance
    /// concurrently with readers.
    limit: AtomicUsize,

    /// Current allocated bytes. Arc-wrapped so tokens can release on drop.
    allocated: Arc<AtomicUsize>,

    /// Peak allocated bytes (high-water mark). Only ever raised by
    /// successful reservations; `release` never lowers it.
    peak: AtomicUsize,

    /// Number of times an allocation was rejected due to budget exhaustion.
    rejection_count: AtomicUsize,

    /// Number of `release(size)` calls where `size > current` — the
    /// accounting-drift symptom. Saturating the per-engine counter
    /// to zero hides the drift from `allocated()`, so this counter
    /// is the only observable signal that the call-site is
    /// over-releasing.
    over_release_count: AtomicUsize,
}
61
62impl Budget {
63    /// Create a new budget with the given limit.
64    pub fn new(limit: usize) -> Self {
65        Self {
66            limit: AtomicUsize::new(limit),
67            allocated: Arc::new(AtomicUsize::new(0)),
68            peak: AtomicUsize::new(0),
69            rejection_count: AtomicUsize::new(0),
70            over_release_count: AtomicUsize::new(0),
71        }
72    }
73
74    /// Try to reserve `size` bytes from this budget.
75    ///
76    /// Returns `true` if the reservation succeeded, `false` if it would
77    /// exceed the limit.
78    pub fn try_reserve(&self, size: usize) -> bool {
79        let limit = self.limit.load(Ordering::Relaxed);
80
81        // CAS loop to atomically check and increment.
82        loop {
83            let current = self.allocated.load(Ordering::Relaxed);
84            if current + size > limit {
85                self.rejection_count.fetch_add(1, Ordering::Relaxed);
86                return false;
87            }
88
89            match self.allocated.compare_exchange_weak(
90                current,
91                current + size,
92                Ordering::AcqRel,
93                Ordering::Relaxed,
94            ) {
95                Ok(_) => {
96                    // Update peak if necessary.
97                    let new_allocated = current + size;
98                    let mut peak = self.peak.load(Ordering::Relaxed);
99                    while new_allocated > peak {
100                        match self.peak.compare_exchange_weak(
101                            peak,
102                            new_allocated,
103                            Ordering::Relaxed,
104                            Ordering::Relaxed,
105                        ) {
106                            Ok(_) => break,
107                            Err(actual) => peak = actual,
108                        }
109                    }
110                    return true;
111                }
112                Err(_) => continue, // Retry CAS.
113            }
114        }
115    }
116
117    /// Try to reserve `size` bytes and return a shared `Arc` to the allocated
118    /// counter so the caller can decrement it on drop.
119    ///
120    /// Returns `Some(arc)` on success, `None` on budget exhaustion.
121    pub fn try_reserve_arc(&self, size: usize) -> Option<Arc<AtomicUsize>> {
122        if self.try_reserve(size) {
123            Some(Arc::clone(&self.allocated))
124        } else {
125            None
126        }
127    }
128
129    /// Release `size` bytes back to the budget.
130    ///
131    /// Saturates to zero if `size` exceeds the current allocation (which can
132    /// happen when data is replayed from WAL without a matching reservation).
133    pub fn release(&self, size: usize) {
134        loop {
135            let current = self.allocated.load(Ordering::Acquire);
136            let new_val = current.saturating_sub(size);
137            match self.allocated.compare_exchange_weak(
138                current,
139                new_val,
140                Ordering::Release,
141                Ordering::Relaxed,
142            ) {
143                Ok(_) => {
144                    if size > current {
145                        self.over_release_count.fetch_add(1, Ordering::Relaxed);
146                        tracing::warn!(
147                            released = size,
148                            allocated = current,
149                            "memory release exceeds allocation (WAL replay or accounting drift)"
150                        );
151                    }
152                    return;
153                }
154                Err(_) => continue,
155            }
156        }
157    }
158
159    /// Current allocated bytes.
160    pub fn allocated(&self) -> usize {
161        self.allocated.load(Ordering::Relaxed)
162    }
163
164    /// Hard limit in bytes.
165    pub fn limit(&self) -> usize {
166        self.limit.load(Ordering::Relaxed)
167    }
168
169    /// Number of over-release events observed on this budget. A
170    /// non-zero value is the smoking-gun signal that some call-site
171    /// is releasing more bytes than it reserved. Per-engine
172    /// `allocated()` saturates to zero on over-release, so this
173    /// counter is the only post-hoc observable.
174    pub fn over_release_count(&self) -> usize {
175        self.over_release_count.load(Ordering::Relaxed)
176    }
177
178    /// Remaining bytes available.
179    pub fn available(&self) -> usize {
180        let limit = self.limit();
181        let allocated = self.allocated();
182        limit.saturating_sub(allocated)
183    }
184
185    /// Utilization as a percentage (0-100).
186    ///
187    /// Computed in `u128` so a corrupted (e.g. underflow-wrapped near
188    /// `usize::MAX`) `allocated` clamps to 100 % rather than panicking on
189    /// `allocated * 100` overflow — a panic here is taken inside the Data
190    /// Plane core loop (`apply_spsc_pressure` → `snapshot`) and escalates
191    /// to a DEGRADED core.
192    pub fn utilization_percent(&self) -> u8 {
193        let limit = self.limit();
194        if limit == 0 {
195            return 100;
196        }
197        let allocated = self.allocated() as u128;
198        ((allocated * 100) / limit as u128).min(100) as u8
199    }
200
201    /// Peak allocation (high-water mark).
202    pub fn peak(&self) -> usize {
203        self.peak.load(Ordering::Relaxed)
204    }
205
206    /// Number of rejected allocation attempts.
207    pub fn rejections(&self) -> usize {
208        self.rejection_count.load(Ordering::Relaxed)
209    }
210
211    /// Update the limit dynamically (for rebalancing).
212    ///
213    /// The new limit must be >= current allocation. If it's less, the limit
214    /// is set to the current allocation (no immediate eviction).
215    pub fn set_limit(&self, new_limit: usize) {
216        let allocated = self.allocated();
217        let effective = new_limit.max(allocated);
218        self.limit.store(effective, Ordering::Release);
219    }
220
221    /// Reset all counters (for testing).
222    #[cfg(test)]
223    pub fn reset(&self) {
224        self.allocated.store(0, Ordering::Relaxed);
225        self.peak.store(0, Ordering::Relaxed);
226        self.rejection_count.store(0, Ordering::Relaxed);
227    }
228}
229
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn utilization_and_available_never_panic_on_corrupted_allocated() {
        // `allocated` is only ever the sum of `try_reserve`/`release`
        // deltas, so an unbalanced call site — or a `ReservationToken`
        // drop that `fetch_sub`s past zero — can leave it wrapped near
        // `usize::MAX`. Readers must be robust to such a corrupted
        // counter: `utilization_percent` must not panic computing
        // `allocated * 100` (in debug builds the naive multiply traps
        // with `attempt to multiply with overflow`, taken inside the
        // Data Plane core loop via `apply_spsc_pressure → snapshot` and
        // escalated by the panic watchdog to a DEGRADED core; in release
        // it wraps to a garbage level). The correct degraded behavior is
        // a clamped 100 % — the pressure detector then errs toward
        // Emergency rather than toward "idle" — and never a crash.
        let b = Budget::new(1024);
        b.allocated.store(usize::MAX, Ordering::Relaxed);

        assert_eq!(
            b.utilization_percent(),
            100,
            "a wrapped/over-large allocated counter must clamp to 100%, not \
             panic on `allocated * 100` and not wrap to a small percentage"
        );
        assert_eq!(
            b.available(),
            0,
            "no capacity is available when allocated has run past the limit"
        );
    }

    #[test]
    fn reserve_within_limit() {
        let b = Budget::new(1024);
        assert!(b.try_reserve(512));
        assert_eq!(b.allocated(), 512);
        assert_eq!(b.available(), 512);
        assert_eq!(b.utilization_percent(), 50);
    }

    #[test]
    fn reserve_at_limit() {
        let b = Budget::new(1024);
        // Filling the budget exactly is allowed; one more byte is not.
        assert!(b.try_reserve(1024));
        assert!(!b.try_reserve(1));
        assert_eq!(b.rejections(), 1);
    }

    #[test]
    fn reserve_exceeds_limit() {
        let b = Budget::new(100);
        assert!(!b.try_reserve(101));
        assert_eq!(b.allocated(), 0);
        assert_eq!(b.rejections(), 1);
    }

    #[test]
    fn release_frees_capacity() {
        let b = Budget::new(1024);
        assert!(b.try_reserve(512));
        assert!(b.try_reserve(512));
        assert!(!b.try_reserve(1));

        // Freeing part of the budget makes the same amount reservable again.
        b.release(256);
        assert!(b.try_reserve(256));
    }

    #[test]
    fn peak_tracks_high_water_mark() {
        let b = Budget::new(1024);
        b.try_reserve(800);
        b.release(500);
        b.try_reserve(100);

        // Peak stays at the historical maximum even after releases.
        assert_eq!(b.peak(), 800);
        assert_eq!(b.allocated(), 400);
    }

    #[test]
    fn dynamic_limit_adjustment() {
        let b = Budget::new(1024);
        b.try_reserve(600);

        // Growing the limit opens new headroom immediately.
        b.set_limit(2048);
        assert_eq!(b.limit(), 2048);
        assert!(b.try_reserve(1000));

        // Shrinking clamps at the live allocation: max(100, 1600) = 1600.
        b.set_limit(100);
        assert_eq!(b.limit(), 1600);
    }

    #[test]
    fn try_reserve_arc_returns_shared_counter() {
        let b = Budget::new(1024);
        let counter = b.try_reserve_arc(512).expect("within budget");
        assert_eq!(counter.load(Ordering::Relaxed), 512);
        // Releasing through the shared handle is visible to the budget.
        counter.fetch_sub(512, Ordering::Relaxed);
        assert_eq!(b.allocated(), 0);
    }

    #[test]
    fn concurrent_reserves() {
        use std::sync::Arc;
        use std::thread;

        let budget = Arc::new(Budget::new(10_000));

        let workers: Vec<_> = (0..10)
            .map(|_| {
                let b = Arc::clone(&budget);
                thread::spawn(move || {
                    // 100 attempts of 10 bytes each; count the successes.
                    (0..100).filter(|_| b.try_reserve(10)).count() * 10
                })
            })
            .collect();

        let total_reserved: usize = workers.into_iter().map(|w| w.join().unwrap()).sum();

        // Capacity is exactly 10 threads * 100 attempts * 10 bytes = 10_000,
        // so every reservation fits and none are rejected.
        assert_eq!(total_reserved, 10_000);
        assert_eq!(budget.allocated(), 10_000);
    }
}