// nodedb_mem/reservation_token.rs
1// SPDX-License-Identifier: BUSL-1.1
2
3//! RAII reservation token for the four-level memory hierarchy.
4//!
5//! A [`ReservationToken`] is produced by
6//! [`MemoryGovernor::try_reserve`](crate::governor::MemoryGovernor::try_reserve)
7//! and holds references to all four budget layers:
8//! global counter, optional per-database counter, optional per-tenant counter,
9//! and the engine identifier for engine-budget release.
10//!
11//! Dropping the token releases all four layers atomically.
12//!
13//! # Panic safety
14//!
15//! `Drop` uses atomic operations only and never panics.
16//!
17//! # `mem::forget`
18//!
19//! Calling `mem::forget` on a token prevents release. This is intentional:
20//! the token represents live allocations that must not be double-freed.
21
22use std::sync::Arc;
23use std::sync::atomic::AtomicUsize;
24
25use nodedb_types::{DatabaseId, TenantId};
26
27use crate::engine::EngineId;
28use crate::governor::GlobalCounter;
29
/// Holds a memory reservation across the four budget layers.
///
/// Releasing happens in reverse order (engine → tenant → database → global)
/// on drop. Each per-layer decrement saturates at zero (see the `Drop` impl),
/// so an out-of-band release of the same budget cannot wrap a counter.
#[must_use = "dropping a ReservationToken immediately releases the reservation; bind it to a variable"]
pub struct ReservationToken {
    /// Shared global-ceiling atomic. Drop decrements this.
    pub(crate) global_counter: Arc<GlobalCounter>,
    /// Per-database allocated counter. `None` if no database budget.
    pub(crate) database_counter: Option<Arc<AtomicUsize>>,
    /// Per-tenant allocated counter. `None` if no tenant budget.
    pub(crate) tenant_counter: Option<Arc<AtomicUsize>>,
    /// Per-engine allocated counter. `None` if no engine budget (unusual —
    /// `try_reserve` always requires a registered engine).
    pub(crate) engine_counter: Option<Arc<AtomicUsize>>,
    /// Bytes reserved at every layer. A value of zero makes drop a no-op.
    pub(crate) size: usize,
    /// Identity carried for `Debug` and metrics.
    db: DatabaseId,
    tenant: TenantId,
    engine: EngineId,
}
52
/// Parameters for constructing a [`ReservationToken`].
///
/// Used by [`MemoryGovernor::try_reserve`] to avoid a too-many-arguments
/// constructor. Fields map one-to-one onto [`ReservationToken`]'s fields.
pub(crate) struct ReservationParams {
    /// Shared global-ceiling atomic the token will decrement on drop.
    pub global_counter: Arc<GlobalCounter>,
    /// Per-database allocated counter, if a database budget applies.
    pub database_counter: Option<Arc<AtomicUsize>>,
    /// Per-tenant allocated counter, if a tenant budget applies.
    pub tenant_counter: Option<Arc<AtomicUsize>>,
    /// Per-engine allocated counter, if an engine budget applies.
    pub engine_counter: Option<Arc<AtomicUsize>>,
    /// Bytes recorded as reserved at every provided layer.
    pub size: usize,
    /// Database identity carried for `Debug` and metrics.
    pub db: DatabaseId,
    /// Tenant identity carried for `Debug` and metrics.
    pub tenant: TenantId,
    /// Engine identity carried for `Debug` and metrics.
    pub engine: EngineId,
}
67
68impl ReservationToken {
69    /// Construct a new token. Called only by [`MemoryGovernor::try_reserve`].
70    pub(crate) fn new(params: ReservationParams) -> Self {
71        Self {
72            global_counter: params.global_counter,
73            database_counter: params.database_counter,
74            tenant_counter: params.tenant_counter,
75            engine_counter: params.engine_counter,
76            size: params.size,
77            db: params.db,
78            tenant: params.tenant,
79            engine: params.engine,
80        }
81    }
82
83    /// Number of bytes reserved by this token.
84    pub fn size(&self) -> usize {
85        self.size
86    }
87
88    /// The database this reservation is scoped to.
89    pub fn database_id(&self) -> DatabaseId {
90        self.db
91    }
92
93    /// The tenant this reservation is scoped to.
94    pub fn tenant_id(&self) -> TenantId {
95        self.tenant
96    }
97
98    /// The engine this reservation is scoped to.
99    pub fn engine(&self) -> EngineId {
100        self.engine
101    }
102}
103
104impl Drop for ReservationToken {
105    fn drop(&mut self) {
106        let size = self.size;
107        if size == 0 {
108            return;
109        }
110
111        // Release in reverse order: engine → tenant → database → global.
112        //
113        // Each decrement saturates at zero. The legacy `MemoryGovernor::release`
114        // path touches the engine + global counters directly, so a counter can
115        // legitimately be below this token's `size` by the time the token drops
116        // (e.g. a timeseries flush released the memtable footprint while a
117        // per-batch token was still in scope). A plain `fetch_sub` would wrap
118        // such a counter to ~usize::MAX, which every utilization reader treats
119        // as 100 % → permanent Emergency pressure → suspended SPSC reads →
120        // schema-register barrier deadlock. Clamping keeps an over-release a
121        // harmless zero instead.
122        if let Some(ref counter) = self.engine_counter {
123            crate::budget::atomic_saturating_sub(counter, size);
124        }
125        if let Some(ref counter) = self.tenant_counter {
126            crate::budget::atomic_saturating_sub(counter, size);
127        }
128        if let Some(ref counter) = self.database_counter {
129            crate::budget::atomic_saturating_sub(counter, size);
130        }
131        crate::budget::atomic_saturating_sub(&self.global_counter.allocated, size);
132    }
133}
134
135impl std::fmt::Debug for ReservationToken {
136    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
137        f.debug_struct("ReservationToken")
138            .field("size", &self.size)
139            .field("db", &self.db)
140            .field("tenant", &self.tenant)
141            .field("engine", &self.engine)
142            .finish()
143    }
144}
145
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    use nodedb_types::{DatabaseId, TenantId};

    use super::{ReservationParams, ReservationToken};
    use crate::engine::EngineId;
    use crate::governor::GlobalCounter;

    /// Fresh per-layer counter pre-charged with `val` bytes.
    fn make_counter(val: usize) -> Arc<AtomicUsize> {
        Arc::new(AtomicUsize::new(val))
    }

    /// Fresh global counter pre-charged with `val` bytes under a 1 MiB ceiling.
    fn make_global(val: usize) -> Arc<GlobalCounter> {
        Arc::new(GlobalCounter {
            allocated: AtomicUsize::new(val),
            ceiling: 1024 * 1024,
        })
    }

    #[test]
    fn drop_releases_all_four_levels() {
        let global = make_global(100);
        let per_db = make_counter(100);
        let per_tenant = make_counter(100);
        let per_engine = make_counter(100);

        let token = ReservationToken::new(ReservationParams {
            global_counter: Arc::clone(&global),
            database_counter: Some(Arc::clone(&per_db)),
            tenant_counter: Some(Arc::clone(&per_tenant)),
            engine_counter: Some(Arc::clone(&per_engine)),
            size: 100,
            db: DatabaseId::DEFAULT,
            tenant: TenantId::new(1),
            engine: EngineId::Vector,
        });

        // Construction must not release anything yet.
        assert_eq!(global.allocated.load(Ordering::Relaxed), 100);

        drop(token);

        // Every layer is back to zero after the drop.
        assert_eq!(global.allocated.load(Ordering::Relaxed), 0);
        assert_eq!(per_db.load(Ordering::Relaxed), 0);
        assert_eq!(per_tenant.load(Ordering::Relaxed), 0);
        assert_eq!(per_engine.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn drop_with_no_scoped_counters_releases_global() {
        let global = make_global(200);
        let token = ReservationToken::new(ReservationParams {
            global_counter: Arc::clone(&global),
            database_counter: None,
            tenant_counter: None,
            engine_counter: None,
            size: 200,
            db: DatabaseId::DEFAULT,
            tenant: TenantId::new(1),
            engine: EngineId::Query,
        });
        drop(token);
        assert_eq!(global.allocated.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn drop_does_not_underflow_a_counter_released_below_size() {
        // Two release paths exist: this RAII token (four layers) and the
        // legacy `MemoryGovernor::release` (engine + global only). When both
        // touch the same engine budget — e.g. a timeseries flush calls
        // `release(memtable_bytes)` while a live per-batch token still holds
        // a small reservation — the budget can hit zero before the token
        // drops. The token's decrement must NOT wrap that counter into the
        // multi-exabyte range: a wrapped engine or tenant counter reads as
        // 100% utilization (Emergency) forever, suspends the core's SPSC
        // reads, and deadlocks every subsequent DDL on the schema-register
        // barrier — the exact "healthy /healthz, every query fails" failure
        // mode. Drop must saturate at zero.
        let global = make_global(40);
        let engine_budget = make_counter(40);
        let tenant_budget = make_counter(40);

        let token = ReservationToken::new(ReservationParams {
            global_counter: Arc::clone(&global),
            database_counter: None,
            tenant_counter: Some(Arc::clone(&tenant_budget)),
            engine_counter: Some(Arc::clone(&engine_budget)),
            size: 40,
            db: DatabaseId::DEFAULT,
            tenant: TenantId::new(1),
            engine: EngineId::Timeseries,
        });

        // Simulate a concurrent legacy release draining the engine + global
        // counters past what this token reserved (a flush releasing the full
        // memtable footprint while the small per-batch token is alive).
        engine_budget.store(0, Ordering::Relaxed);
        global.allocated.store(0, Ordering::Relaxed);

        drop(token);

        let engine = engine_budget.load(Ordering::Relaxed);
        let glob = global.allocated.load(Ordering::Relaxed);
        let tenant = tenant_budget.load(Ordering::Relaxed);
        assert_eq!(
            engine, 0,
            "engine counter underflowed to {engine} on token drop — a wrapped \
             counter reads as 100% utilization (Emergency) forever"
        );
        assert_eq!(
            glob, 0,
            "global counter underflowed to {glob} on token drop"
        );
        // The legacy release never touched the tenant layer, so that counter
        // returns to zero normally — proving the drop still works where the
        // counter is consistent.
        assert_eq!(tenant, 0, "tenant counter should release normally to 0");
    }

    #[test]
    fn zero_size_drop_is_noop() {
        let global = make_global(0);
        let token = ReservationToken::new(ReservationParams {
            global_counter: Arc::clone(&global),
            database_counter: None,
            tenant_counter: None,
            engine_counter: None,
            size: 0,
            db: DatabaseId::DEFAULT,
            tenant: TenantId::new(1),
            engine: EngineId::Query,
        });
        drop(token);
        assert_eq!(global.allocated.load(Ordering::Relaxed), 0);
    }
}