Skip to main content

nodedb_mem/
budget.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Per-engine memory budget tracking.
4
5use std::sync::Arc;
6use std::sync::atomic::{AtomicUsize, Ordering};
7
/// A memory budget for a single engine.
///
/// Tracks the engine's current allocation against a configurable limit.
/// Every counter is an atomic, so all of them can be read and updated
/// through a shared reference from any thread (metrics exporter, governor,
/// etc.).
///
/// The `allocated` counter is stored behind an `Arc` so that
/// [`ReservationToken`](crate::reservation_token::ReservationToken) can hold a
/// reference to it without requiring a back-reference to the governor.
#[derive(Debug)]
pub struct Budget {
    /// Hard limit in bytes. Reservations that would exceed it are rejected.
    limit: AtomicUsize,

    /// Currently allocated bytes. `Arc`-wrapped so the counter itself can be
    /// handed out and decremented by whoever holds the reservation.
    allocated: Arc<AtomicUsize>,

    /// Peak allocated bytes (high-water mark).
    peak: AtomicUsize,

    /// Number of times a reservation was rejected due to budget exhaustion.
    rejection_count: AtomicUsize,

    /// Number of `release(size)` calls where `size` exceeded the bytes
    /// allocated at that moment — the accounting-drift symptom. Because
    /// `release` saturates the allocation to zero, the drift is invisible
    /// through `allocated()`; this counter is the only observable signal
    /// that a call-site is over-releasing.
    over_release_count: AtomicUsize,
}
37
38impl Budget {
39    /// Create a new budget with the given limit.
40    pub fn new(limit: usize) -> Self {
41        Self {
42            limit: AtomicUsize::new(limit),
43            allocated: Arc::new(AtomicUsize::new(0)),
44            peak: AtomicUsize::new(0),
45            rejection_count: AtomicUsize::new(0),
46            over_release_count: AtomicUsize::new(0),
47        }
48    }
49
50    /// Try to reserve `size` bytes from this budget.
51    ///
52    /// Returns `true` if the reservation succeeded, `false` if it would
53    /// exceed the limit.
54    pub fn try_reserve(&self, size: usize) -> bool {
55        let limit = self.limit.load(Ordering::Relaxed);
56
57        // CAS loop to atomically check and increment.
58        loop {
59            let current = self.allocated.load(Ordering::Relaxed);
60            if current + size > limit {
61                self.rejection_count.fetch_add(1, Ordering::Relaxed);
62                return false;
63            }
64
65            match self.allocated.compare_exchange_weak(
66                current,
67                current + size,
68                Ordering::AcqRel,
69                Ordering::Relaxed,
70            ) {
71                Ok(_) => {
72                    // Update peak if necessary.
73                    let new_allocated = current + size;
74                    let mut peak = self.peak.load(Ordering::Relaxed);
75                    while new_allocated > peak {
76                        match self.peak.compare_exchange_weak(
77                            peak,
78                            new_allocated,
79                            Ordering::Relaxed,
80                            Ordering::Relaxed,
81                        ) {
82                            Ok(_) => break,
83                            Err(actual) => peak = actual,
84                        }
85                    }
86                    return true;
87                }
88                Err(_) => continue, // Retry CAS.
89            }
90        }
91    }
92
93    /// Try to reserve `size` bytes and return a shared `Arc` to the allocated
94    /// counter so the caller can decrement it on drop.
95    ///
96    /// Returns `Some(arc)` on success, `None` on budget exhaustion.
97    pub fn try_reserve_arc(&self, size: usize) -> Option<Arc<AtomicUsize>> {
98        if self.try_reserve(size) {
99            Some(Arc::clone(&self.allocated))
100        } else {
101            None
102        }
103    }
104
105    /// Release `size` bytes back to the budget.
106    ///
107    /// Saturates to zero if `size` exceeds the current allocation (which can
108    /// happen when data is replayed from WAL without a matching reservation).
109    pub fn release(&self, size: usize) {
110        loop {
111            let current = self.allocated.load(Ordering::Acquire);
112            let new_val = current.saturating_sub(size);
113            match self.allocated.compare_exchange_weak(
114                current,
115                new_val,
116                Ordering::Release,
117                Ordering::Relaxed,
118            ) {
119                Ok(_) => {
120                    if size > current {
121                        self.over_release_count.fetch_add(1, Ordering::Relaxed);
122                        tracing::warn!(
123                            released = size,
124                            allocated = current,
125                            "memory release exceeds allocation (WAL replay or accounting drift)"
126                        );
127                    }
128                    return;
129                }
130                Err(_) => continue,
131            }
132        }
133    }
134
135    /// Current allocated bytes.
136    pub fn allocated(&self) -> usize {
137        self.allocated.load(Ordering::Relaxed)
138    }
139
140    /// Hard limit in bytes.
141    pub fn limit(&self) -> usize {
142        self.limit.load(Ordering::Relaxed)
143    }
144
145    /// Number of over-release events observed on this budget. A
146    /// non-zero value is the smoking-gun signal that some call-site
147    /// is releasing more bytes than it reserved. Per-engine
148    /// `allocated()` saturates to zero on over-release, so this
149    /// counter is the only post-hoc observable.
150    pub fn over_release_count(&self) -> usize {
151        self.over_release_count.load(Ordering::Relaxed)
152    }
153
154    /// Remaining bytes available.
155    pub fn available(&self) -> usize {
156        let limit = self.limit();
157        let allocated = self.allocated();
158        limit.saturating_sub(allocated)
159    }
160
161    /// Utilization as a percentage (0-100).
162    pub fn utilization_percent(&self) -> u8 {
163        let limit = self.limit();
164        if limit == 0 {
165            return 100;
166        }
167        let allocated = self.allocated();
168        ((allocated * 100) / limit).min(100) as u8
169    }
170
171    /// Peak allocation (high-water mark).
172    pub fn peak(&self) -> usize {
173        self.peak.load(Ordering::Relaxed)
174    }
175
176    /// Number of rejected allocation attempts.
177    pub fn rejections(&self) -> usize {
178        self.rejection_count.load(Ordering::Relaxed)
179    }
180
181    /// Update the limit dynamically (for rebalancing).
182    ///
183    /// The new limit must be >= current allocation. If it's less, the limit
184    /// is set to the current allocation (no immediate eviction).
185    pub fn set_limit(&self, new_limit: usize) {
186        let allocated = self.allocated();
187        let effective = new_limit.max(allocated);
188        self.limit.store(effective, Ordering::Release);
189    }
190
191    /// Reset all counters (for testing).
192    #[cfg(test)]
193    pub fn reset(&self) {
194        self.allocated.store(0, Ordering::Relaxed);
195        self.peak.store(0, Ordering::Relaxed);
196        self.rejection_count.store(0, Ordering::Relaxed);
197    }
198}
199
#[cfg(test)]
mod tests {
    use super::*;

    /// A reservation below the limit is accepted and reflected in every gauge.
    #[test]
    fn reserve_within_limit() {
        let budget = Budget::new(1024);
        assert!(budget.try_reserve(512));
        assert_eq!(budget.allocated(), 512);
        assert_eq!(budget.available(), 512);
        assert_eq!(budget.utilization_percent(), 50);
    }

    /// Filling the budget exactly succeeds; one more byte is rejected.
    #[test]
    fn reserve_at_limit() {
        let budget = Budget::new(1024);
        assert!(budget.try_reserve(1024));
        assert!(!budget.try_reserve(1));
        assert_eq!(budget.rejections(), 1);
    }

    /// A single oversized request is rejected without touching the counter.
    #[test]
    fn reserve_exceeds_limit() {
        let budget = Budget::new(100);
        assert!(!budget.try_reserve(101));
        assert_eq!(budget.allocated(), 0);
        assert_eq!(budget.rejections(), 1);
    }

    /// Released bytes can be reserved again.
    #[test]
    fn release_frees_capacity() {
        let budget = Budget::new(1024);
        assert!(budget.try_reserve(512));
        assert!(budget.try_reserve(512));
        assert!(!budget.try_reserve(1));

        budget.release(256);
        assert!(budget.try_reserve(256));
    }

    /// The peak keeps the highest allocation ever seen, not the current one.
    #[test]
    fn peak_tracks_high_water_mark() {
        let budget = Budget::new(1024);
        assert!(budget.try_reserve(800));
        budget.release(500);
        assert!(budget.try_reserve(100));

        assert_eq!(budget.peak(), 800);
        assert_eq!(budget.allocated(), 400);
    }

    /// The limit can grow freely but never shrinks below the allocation.
    #[test]
    fn dynamic_limit_adjustment() {
        let budget = Budget::new(1024);
        assert!(budget.try_reserve(600));

        // Increase limit.
        budget.set_limit(2048);
        assert_eq!(budget.limit(), 2048);
        assert!(budget.try_reserve(1000));

        // Decrease limit — but not below current allocation.
        budget.set_limit(100);
        assert_eq!(budget.limit(), 1600); // max(100, 1600 allocated)
    }

    /// The returned `Arc` is the budget's own counter, not a copy.
    #[test]
    fn try_reserve_arc_returns_shared_counter() {
        let budget = Budget::new(1024);
        let arc = budget.try_reserve_arc(512).expect("within budget");
        assert_eq!(arc.load(Ordering::Relaxed), 512);
        // Release via the arc.
        arc.fetch_sub(512, Ordering::Relaxed);
        assert_eq!(budget.allocated(), 0);
    }

    /// Releasing more than is allocated saturates to zero and is counted;
    /// an exact release is not an over-release.
    #[test]
    fn over_release_saturates_and_is_counted() {
        let budget = Budget::new(1024);
        assert!(budget.try_reserve(100));

        budget.release(100);
        assert_eq!(budget.allocated(), 0);
        assert_eq!(budget.over_release_count(), 0);

        assert!(budget.try_reserve(100));
        budget.release(150);
        assert_eq!(budget.allocated(), 0);
        assert_eq!(budget.over_release_count(), 1);
        assert_eq!(budget.available(), 1024);
    }

    /// A zero-byte budget reports itself as full and rejects any real request.
    #[test]
    fn zero_limit_reports_full_utilization() {
        let budget = Budget::new(0);
        assert_eq!(budget.utilization_percent(), 100);
        assert_eq!(budget.available(), 0);
        assert!(!budget.try_reserve(1));
        assert_eq!(budget.rejections(), 1);
    }

    /// Racing reservations never lose or double-count bytes.
    #[test]
    fn concurrent_reserves() {
        use std::sync::Arc;
        use std::thread;

        let budget = Arc::new(Budget::new(10_000));
        let mut handles = Vec::new();

        for _ in 0..10 {
            let b = Arc::clone(&budget);
            handles.push(thread::spawn(move || {
                let mut reserved = 0;
                for _ in 0..100 {
                    if b.try_reserve(10) {
                        reserved += 10;
                    }
                }
                reserved
            }));
        }

        let total_reserved: usize = handles.into_iter().map(|h| h.join().unwrap()).sum();

        // All 1000 reservations of 10 bytes should succeed (10 * 100 * 10 = 10000).
        assert_eq!(total_reserved, 10_000);
        assert_eq!(budget.allocated(), 10_000);
    }
}