nodedb_mem/
reservation_token.rs1use std::sync::Arc;
23use std::sync::atomic::AtomicUsize;
24
25use nodedb_types::{DatabaseId, TenantId};
26
27use crate::engine::EngineId;
28use crate::governor::GlobalCounter;
29
30#[must_use = "dropping a ReservationToken immediately releases the reservation; bind it to a variable"]
35pub struct ReservationToken {
36 pub(crate) global_counter: Arc<GlobalCounter>,
38 pub(crate) database_counter: Option<Arc<AtomicUsize>>,
40 pub(crate) tenant_counter: Option<Arc<AtomicUsize>>,
42 pub(crate) engine_counter: Option<Arc<AtomicUsize>>,
45 pub(crate) size: usize,
47 db: DatabaseId,
49 tenant: TenantId,
50 engine: EngineId,
51}
52
53pub(crate) struct ReservationParams {
58 pub global_counter: Arc<GlobalCounter>,
59 pub database_counter: Option<Arc<AtomicUsize>>,
60 pub tenant_counter: Option<Arc<AtomicUsize>>,
61 pub engine_counter: Option<Arc<AtomicUsize>>,
62 pub size: usize,
63 pub db: DatabaseId,
64 pub tenant: TenantId,
65 pub engine: EngineId,
66}
67
68impl ReservationToken {
69 pub(crate) fn new(params: ReservationParams) -> Self {
71 Self {
72 global_counter: params.global_counter,
73 database_counter: params.database_counter,
74 tenant_counter: params.tenant_counter,
75 engine_counter: params.engine_counter,
76 size: params.size,
77 db: params.db,
78 tenant: params.tenant,
79 engine: params.engine,
80 }
81 }
82
83 pub fn size(&self) -> usize {
85 self.size
86 }
87
88 pub fn database_id(&self) -> DatabaseId {
90 self.db
91 }
92
93 pub fn tenant_id(&self) -> TenantId {
95 self.tenant
96 }
97
98 pub fn engine(&self) -> EngineId {
100 self.engine
101 }
102}
103
104impl Drop for ReservationToken {
105 fn drop(&mut self) {
106 let size = self.size;
107 if size == 0 {
108 return;
109 }
110
111 if let Some(ref counter) = self.engine_counter {
123 crate::budget::atomic_saturating_sub(counter, size);
124 }
125 if let Some(ref counter) = self.tenant_counter {
126 crate::budget::atomic_saturating_sub(counter, size);
127 }
128 if let Some(ref counter) = self.database_counter {
129 crate::budget::atomic_saturating_sub(counter, size);
130 }
131 crate::budget::atomic_saturating_sub(&self.global_counter.allocated, size);
132 }
133}
134
135impl std::fmt::Debug for ReservationToken {
136 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
137 f.debug_struct("ReservationToken")
138 .field("size", &self.size)
139 .field("db", &self.db)
140 .field("tenant", &self.tenant)
141 .field("engine", &self.engine)
142 .finish()
143 }
144}
145
146#[cfg(test)]
147mod tests {
148 use std::sync::Arc;
149 use std::sync::atomic::AtomicUsize;
150
151 use nodedb_types::{DatabaseId, TenantId};
152
153 use super::{ReservationParams, ReservationToken};
154 use crate::engine::EngineId;
155 use crate::governor::GlobalCounter;
156
157 fn make_counter(val: usize) -> Arc<AtomicUsize> {
158 Arc::new(AtomicUsize::new(val))
159 }
160
161 fn make_global(val: usize) -> Arc<GlobalCounter> {
162 Arc::new(GlobalCounter {
163 allocated: AtomicUsize::new(val),
164 ceiling: 1024 * 1024,
165 })
166 }
167
168 #[test]
169 fn drop_releases_all_four_levels() {
170 let global = make_global(100);
171 let db_ctr = make_counter(100);
172 let tenant_ctr = make_counter(100);
173 let engine_ctr = make_counter(100);
174
175 let token = ReservationToken::new(ReservationParams {
176 global_counter: Arc::clone(&global),
177 database_counter: Some(Arc::clone(&db_ctr)),
178 tenant_counter: Some(Arc::clone(&tenant_ctr)),
179 engine_counter: Some(Arc::clone(&engine_ctr)),
180 size: 100,
181 db: DatabaseId::DEFAULT,
182 tenant: TenantId::new(1),
183 engine: EngineId::Vector,
184 });
185
186 assert_eq!(
187 global.allocated.load(std::sync::atomic::Ordering::Relaxed),
188 100
189 );
190
191 drop(token);
192
193 assert_eq!(
194 global.allocated.load(std::sync::atomic::Ordering::Relaxed),
195 0
196 );
197 assert_eq!(db_ctr.load(std::sync::atomic::Ordering::Relaxed), 0);
198 assert_eq!(tenant_ctr.load(std::sync::atomic::Ordering::Relaxed), 0);
199 assert_eq!(engine_ctr.load(std::sync::atomic::Ordering::Relaxed), 0);
200 }
201
202 #[test]
203 fn drop_with_no_scoped_counters_releases_global() {
204 let global = make_global(200);
205 let token = ReservationToken::new(ReservationParams {
206 global_counter: Arc::clone(&global),
207 database_counter: None,
208 tenant_counter: None,
209 engine_counter: None,
210 size: 200,
211 db: DatabaseId::DEFAULT,
212 tenant: TenantId::new(1),
213 engine: EngineId::Query,
214 });
215 drop(token);
216 assert_eq!(
217 global.allocated.load(std::sync::atomic::Ordering::Relaxed),
218 0
219 );
220 }
221
222 #[test]
223 fn drop_does_not_underflow_a_counter_released_below_size() {
224 let global = make_global(40);
237 let engine_ctr = make_counter(40);
238 let tenant_ctr = make_counter(40);
239
240 let token = ReservationToken::new(ReservationParams {
241 global_counter: Arc::clone(&global),
242 database_counter: None,
243 tenant_counter: Some(Arc::clone(&tenant_ctr)),
244 engine_counter: Some(Arc::clone(&engine_ctr)),
245 size: 40,
246 db: DatabaseId::DEFAULT,
247 tenant: TenantId::new(1),
248 engine: EngineId::Timeseries,
249 });
250
251 engine_ctr.store(0, std::sync::atomic::Ordering::Relaxed);
255 global
256 .allocated
257 .store(0, std::sync::atomic::Ordering::Relaxed);
258
259 drop(token);
260
261 let engine = engine_ctr.load(std::sync::atomic::Ordering::Relaxed);
262 let glob = global.allocated.load(std::sync::atomic::Ordering::Relaxed);
263 let tenant = tenant_ctr.load(std::sync::atomic::Ordering::Relaxed);
264 assert_eq!(
265 engine, 0,
266 "engine counter underflowed to {engine} on token drop — a wrapped \
267 counter reads as 100% utilization (Emergency) forever"
268 );
269 assert_eq!(
270 glob, 0,
271 "global counter underflowed to {glob} on token drop"
272 );
273 assert_eq!(tenant, 0, "tenant counter should release normally to 0");
277 }
278
279 #[test]
280 fn zero_size_drop_is_noop() {
281 let global = make_global(0);
282 let token = ReservationToken::new(ReservationParams {
283 global_counter: Arc::clone(&global),
284 database_counter: None,
285 tenant_counter: None,
286 engine_counter: None,
287 size: 0,
288 db: DatabaseId::DEFAULT,
289 tenant: TenantId::new(1),
290 engine: EngineId::Query,
291 });
292 drop(token);
293 assert_eq!(
294 global.allocated.load(std::sync::atomic::Ordering::Relaxed),
295 0
296 );
297 }
298}