1use std::sync::atomic::{AtomicUsize, Ordering};
4
/// Lock-free accounting state for a bounded resource budget.
///
/// Every field is an independent [`AtomicUsize`], so the counters can be
/// read and updated through a shared reference.
#[derive(Debug)]
pub struct Budget {
    /// Upper bound that `allocated` is checked against when reserving.
    limit: AtomicUsize,

    /// Amount currently reserved.
    allocated: AtomicUsize,

    /// Largest `allocated` value recorded after a successful reservation.
    peak: AtomicUsize,

    /// Number of reservation attempts that were refused.
    rejection_count: AtomicUsize,
}
23
24impl Budget {
25 pub fn new(limit: usize) -> Self {
27 Self {
28 limit: AtomicUsize::new(limit),
29 allocated: AtomicUsize::new(0),
30 peak: AtomicUsize::new(0),
31 rejection_count: AtomicUsize::new(0),
32 }
33 }
34
35 pub fn try_reserve(&self, size: usize) -> bool {
40 let limit = self.limit.load(Ordering::Relaxed);
41
42 loop {
44 let current = self.allocated.load(Ordering::Relaxed);
45 if current + size > limit {
46 self.rejection_count.fetch_add(1, Ordering::Relaxed);
47 return false;
48 }
49
50 match self.allocated.compare_exchange_weak(
51 current,
52 current + size,
53 Ordering::AcqRel,
54 Ordering::Relaxed,
55 ) {
56 Ok(_) => {
57 let new_allocated = current + size;
59 let mut peak = self.peak.load(Ordering::Relaxed);
60 while new_allocated > peak {
61 match self.peak.compare_exchange_weak(
62 peak,
63 new_allocated,
64 Ordering::Relaxed,
65 Ordering::Relaxed,
66 ) {
67 Ok(_) => break,
68 Err(actual) => peak = actual,
69 }
70 }
71 return true;
72 }
73 Err(_) => continue, }
75 }
76 }
77
78 pub fn release(&self, size: usize) {
83 loop {
84 let current = self.allocated.load(Ordering::Acquire);
85 let new_val = current.saturating_sub(size);
86 match self.allocated.compare_exchange_weak(
87 current,
88 new_val,
89 Ordering::Release,
90 Ordering::Relaxed,
91 ) {
92 Ok(_) => {
93 if size > current {
94 tracing::warn!(
95 released = size,
96 allocated = current,
97 "memory release exceeds allocation (WAL replay or accounting drift)"
98 );
99 }
100 return;
101 }
102 Err(_) => continue,
103 }
104 }
105 }
106
107 pub fn allocated(&self) -> usize {
109 self.allocated.load(Ordering::Relaxed)
110 }
111
112 pub fn limit(&self) -> usize {
114 self.limit.load(Ordering::Relaxed)
115 }
116
117 pub fn available(&self) -> usize {
119 let limit = self.limit();
120 let allocated = self.allocated();
121 limit.saturating_sub(allocated)
122 }
123
124 pub fn utilization_percent(&self) -> u8 {
126 let limit = self.limit();
127 if limit == 0 {
128 return 100;
129 }
130 let allocated = self.allocated();
131 ((allocated * 100) / limit).min(100) as u8
132 }
133
134 pub fn peak(&self) -> usize {
136 self.peak.load(Ordering::Relaxed)
137 }
138
139 pub fn rejections(&self) -> usize {
141 self.rejection_count.load(Ordering::Relaxed)
142 }
143
144 pub fn set_limit(&self, new_limit: usize) {
149 let allocated = self.allocated();
150 let effective = new_limit.max(allocated);
151 self.limit.store(effective, Ordering::Release);
152 }
153
154 #[cfg(test)]
156 pub fn reset(&self) {
157 self.allocated.store(0, Ordering::Relaxed);
158 self.peak.store(0, Ordering::Relaxed);
159 self.rejection_count.store(0, Ordering::Relaxed);
160 }
161}
162
#[cfg(test)]
mod tests {
    use super::*;

    /// A reservation below the limit succeeds and is reflected in the getters.
    #[test]
    fn reserve_within_limit() {
        let b = Budget::new(1024);
        let granted = b.try_reserve(512);
        assert!(granted);
        assert_eq!(b.allocated(), 512);
        assert_eq!(b.available(), 512);
        assert_eq!(b.utilization_percent(), 50);
    }

    /// Filling the budget exactly succeeds; one more unit is rejected.
    #[test]
    fn reserve_at_limit() {
        let b = Budget::new(1024);
        assert!(b.try_reserve(1024));
        assert!(!b.try_reserve(1));
        assert_eq!(b.rejections(), 1);
    }

    /// A request larger than the limit is rejected and changes nothing.
    #[test]
    fn reserve_exceeds_limit() {
        let b = Budget::new(100);
        let granted = b.try_reserve(101);
        assert!(!granted);
        assert_eq!(b.allocated(), 0);
        assert_eq!(b.rejections(), 1);
    }

    /// Releasing part of a full budget makes room for a new reservation.
    #[test]
    fn release_frees_capacity() {
        let b = Budget::new(1024);
        assert!(b.try_reserve(512));
        assert!(b.try_reserve(512));
        assert!(!b.try_reserve(1));

        b.release(256);
        assert!(b.try_reserve(256));
    }

    /// The peak stays at the highest allocation even after releases.
    #[test]
    fn peak_tracks_high_water_mark() {
        let b = Budget::new(1024);
        b.try_reserve(800);
        b.release(500);
        b.try_reserve(100);

        assert_eq!(b.peak(), 800);
        assert_eq!(b.allocated(), 400);
    }

    /// Raising the limit takes effect; lowering it is floored at the
    /// currently allocated amount.
    #[test]
    fn dynamic_limit_adjustment() {
        let b = Budget::new(1024);
        b.try_reserve(600);

        b.set_limit(2048);
        assert_eq!(b.limit(), 2048);
        assert!(b.try_reserve(1000));

        b.set_limit(100);
        assert_eq!(b.limit(), 1600);
    }

    /// Ten threads reserving concurrently fill the budget exactly.
    #[test]
    fn concurrent_reserves() {
        use std::sync::Arc;
        use std::thread;

        let shared = Arc::new(Budget::new(10_000));

        // Spawn every worker before joining any, so they really run in parallel.
        let workers: Vec<_> = (0..10)
            .map(|_| {
                let b = Arc::clone(&shared);
                thread::spawn(move || (0..100).filter(|_| b.try_reserve(10)).count() * 10)
            })
            .collect();

        let mut total_reserved: usize = 0;
        for worker in workers {
            total_reserved += worker.join().unwrap();
        }

        assert_eq!(total_reserved, 10_000);
        assert_eq!(shared.allocated(), 10_000);
    }
}