1use std::sync::Arc;
6use std::sync::atomic::{AtomicUsize, Ordering};
7
8#[derive(Debug)]
17pub struct Budget {
18 limit: AtomicUsize,
20
21 allocated: Arc<AtomicUsize>,
23
24 peak: AtomicUsize,
26
27 rejection_count: AtomicUsize,
29
30 over_release_count: AtomicUsize,
36}
37
38impl Budget {
39 pub fn new(limit: usize) -> Self {
41 Self {
42 limit: AtomicUsize::new(limit),
43 allocated: Arc::new(AtomicUsize::new(0)),
44 peak: AtomicUsize::new(0),
45 rejection_count: AtomicUsize::new(0),
46 over_release_count: AtomicUsize::new(0),
47 }
48 }
49
50 pub fn try_reserve(&self, size: usize) -> bool {
55 let limit = self.limit.load(Ordering::Relaxed);
56
57 loop {
59 let current = self.allocated.load(Ordering::Relaxed);
60 if current + size > limit {
61 self.rejection_count.fetch_add(1, Ordering::Relaxed);
62 return false;
63 }
64
65 match self.allocated.compare_exchange_weak(
66 current,
67 current + size,
68 Ordering::AcqRel,
69 Ordering::Relaxed,
70 ) {
71 Ok(_) => {
72 let new_allocated = current + size;
74 let mut peak = self.peak.load(Ordering::Relaxed);
75 while new_allocated > peak {
76 match self.peak.compare_exchange_weak(
77 peak,
78 new_allocated,
79 Ordering::Relaxed,
80 Ordering::Relaxed,
81 ) {
82 Ok(_) => break,
83 Err(actual) => peak = actual,
84 }
85 }
86 return true;
87 }
88 Err(_) => continue, }
90 }
91 }
92
93 pub fn try_reserve_arc(&self, size: usize) -> Option<Arc<AtomicUsize>> {
98 if self.try_reserve(size) {
99 Some(Arc::clone(&self.allocated))
100 } else {
101 None
102 }
103 }
104
105 pub fn release(&self, size: usize) {
110 loop {
111 let current = self.allocated.load(Ordering::Acquire);
112 let new_val = current.saturating_sub(size);
113 match self.allocated.compare_exchange_weak(
114 current,
115 new_val,
116 Ordering::Release,
117 Ordering::Relaxed,
118 ) {
119 Ok(_) => {
120 if size > current {
121 self.over_release_count.fetch_add(1, Ordering::Relaxed);
122 tracing::warn!(
123 released = size,
124 allocated = current,
125 "memory release exceeds allocation (WAL replay or accounting drift)"
126 );
127 }
128 return;
129 }
130 Err(_) => continue,
131 }
132 }
133 }
134
135 pub fn allocated(&self) -> usize {
137 self.allocated.load(Ordering::Relaxed)
138 }
139
140 pub fn limit(&self) -> usize {
142 self.limit.load(Ordering::Relaxed)
143 }
144
145 pub fn over_release_count(&self) -> usize {
151 self.over_release_count.load(Ordering::Relaxed)
152 }
153
154 pub fn available(&self) -> usize {
156 let limit = self.limit();
157 let allocated = self.allocated();
158 limit.saturating_sub(allocated)
159 }
160
161 pub fn utilization_percent(&self) -> u8 {
163 let limit = self.limit();
164 if limit == 0 {
165 return 100;
166 }
167 let allocated = self.allocated();
168 ((allocated * 100) / limit).min(100) as u8
169 }
170
171 pub fn peak(&self) -> usize {
173 self.peak.load(Ordering::Relaxed)
174 }
175
176 pub fn rejections(&self) -> usize {
178 self.rejection_count.load(Ordering::Relaxed)
179 }
180
181 pub fn set_limit(&self, new_limit: usize) {
186 let allocated = self.allocated();
187 let effective = new_limit.max(allocated);
188 self.limit.store(effective, Ordering::Release);
189 }
190
191 #[cfg(test)]
193 pub fn reset(&self) {
194 self.allocated.store(0, Ordering::Relaxed);
195 self.peak.store(0, Ordering::Relaxed);
196 self.rejection_count.store(0, Ordering::Relaxed);
197 }
198}
199
200#[cfg(test)]
201mod tests {
202 use super::*;
203
204 #[test]
205 fn reserve_within_limit() {
206 let budget = Budget::new(1024);
207 assert!(budget.try_reserve(512));
208 assert_eq!(budget.allocated(), 512);
209 assert_eq!(budget.available(), 512);
210 assert_eq!(budget.utilization_percent(), 50);
211 }
212
213 #[test]
214 fn reserve_at_limit() {
215 let budget = Budget::new(1024);
216 assert!(budget.try_reserve(1024));
217 assert!(!budget.try_reserve(1));
218 assert_eq!(budget.rejections(), 1);
219 }
220
221 #[test]
222 fn reserve_exceeds_limit() {
223 let budget = Budget::new(100);
224 assert!(!budget.try_reserve(101));
225 assert_eq!(budget.allocated(), 0);
226 assert_eq!(budget.rejections(), 1);
227 }
228
229 #[test]
230 fn release_frees_capacity() {
231 let budget = Budget::new(1024);
232 assert!(budget.try_reserve(512));
233 assert!(budget.try_reserve(512));
234 assert!(!budget.try_reserve(1));
235
236 budget.release(256);
237 assert!(budget.try_reserve(256));
238 }
239
240 #[test]
241 fn peak_tracks_high_water_mark() {
242 let budget = Budget::new(1024);
243 budget.try_reserve(800);
244 budget.release(500);
245 budget.try_reserve(100);
246
247 assert_eq!(budget.peak(), 800);
248 assert_eq!(budget.allocated(), 400);
249 }
250
251 #[test]
252 fn dynamic_limit_adjustment() {
253 let budget = Budget::new(1024);
254 budget.try_reserve(600);
255
256 budget.set_limit(2048);
258 assert_eq!(budget.limit(), 2048);
259 assert!(budget.try_reserve(1000));
260
261 budget.set_limit(100);
263 assert_eq!(budget.limit(), 1600); }
265
266 #[test]
267 fn try_reserve_arc_returns_shared_counter() {
268 let budget = Budget::new(1024);
269 let arc = budget.try_reserve_arc(512).expect("within budget");
270 assert_eq!(arc.load(Ordering::Relaxed), 512);
271 arc.fetch_sub(512, Ordering::Relaxed);
273 assert_eq!(budget.allocated(), 0);
274 }
275
276 #[test]
277 fn concurrent_reserves() {
278 use std::sync::Arc;
279 use std::thread;
280
281 let budget = Arc::new(Budget::new(10_000));
282 let mut handles = Vec::new();
283
284 for _ in 0..10 {
285 let b = Arc::clone(&budget);
286 handles.push(thread::spawn(move || {
287 let mut reserved = 0;
288 for _ in 0..100 {
289 if b.try_reserve(10) {
290 reserved += 10;
291 }
292 }
293 reserved
294 }));
295 }
296
297 let total_reserved: usize = handles.into_iter().map(|h| h.join().unwrap()).sum();
298
299 assert_eq!(total_reserved, 10_000);
301 assert_eq!(budget.allocated(), 10_000);
302 }
303}