Skip to main content

nodedb_mem/
reservation_token.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! RAII reservation token for the four-level memory hierarchy.
4//!
//! A [`ReservationToken`] is produced by
//! [`MemoryGovernor::try_reserve`](crate::governor::MemoryGovernor::try_reserve)
//! and holds handles to all four budget layers:
//! the global counter, an optional per-database counter, an optional
//! per-tenant counter, and an optional per-engine counter. It also carries
//! the database, tenant, and engine identifiers for `Debug` output and metrics.
10//!
//! Dropping the token releases all four layers; each layer is released with
//! its own atomic decrement rather than as one combined atomic step.
12//!
13//! # Panic safety
14//!
15//! `Drop` uses atomic operations only and never panics.
16//!
17//! # `mem::forget`
18//!
19//! Calling `mem::forget` on a token prevents release. This is intentional:
20//! the token represents live allocations that must not be double-freed.
21
22use std::sync::Arc;
23use std::sync::atomic::{AtomicUsize, Ordering};
24
25use nodedb_types::{DatabaseId, TenantId};
26
27use crate::engine::EngineId;
28use crate::governor::GlobalCounter;
29
30/// Holds a memory reservation across the four budget layers.
31///
32/// Releasing happens in reverse order (engine → tenant → database → global)
33/// on drop.
34#[must_use = "dropping a ReservationToken immediately releases the reservation; bind it to a variable"]
35pub struct ReservationToken {
36    /// Shared global-ceiling atomic. Drop decrements this.
37    pub(crate) global_counter: Arc<GlobalCounter>,
38    /// Per-database allocated counter. `None` if no database budget.
39    pub(crate) database_counter: Option<Arc<AtomicUsize>>,
40    /// Per-tenant allocated counter. `None` if no tenant budget.
41    pub(crate) tenant_counter: Option<Arc<AtomicUsize>>,
42    /// Per-engine allocated counter. `None` if no engine budget (unusual —
43    /// `try_reserve` always requires a registered engine).
44    pub(crate) engine_counter: Option<Arc<AtomicUsize>>,
45    /// Bytes reserved at every layer.
46    pub(crate) size: usize,
47    /// Identity carried for `Debug` and metrics.
48    db: DatabaseId,
49    tenant: TenantId,
50    engine: EngineId,
51}
52
53/// Parameters for constructing a [`ReservationToken`].
54///
55/// Used by [`MemoryGovernor::try_reserve`] to avoid a too-many-arguments
56/// constructor.
57pub(crate) struct ReservationParams {
58    pub global_counter: Arc<GlobalCounter>,
59    pub database_counter: Option<Arc<AtomicUsize>>,
60    pub tenant_counter: Option<Arc<AtomicUsize>>,
61    pub engine_counter: Option<Arc<AtomicUsize>>,
62    pub size: usize,
63    pub db: DatabaseId,
64    pub tenant: TenantId,
65    pub engine: EngineId,
66}
67
68impl ReservationToken {
69    /// Construct a new token. Called only by [`MemoryGovernor::try_reserve`].
70    pub(crate) fn new(params: ReservationParams) -> Self {
71        Self {
72            global_counter: params.global_counter,
73            database_counter: params.database_counter,
74            tenant_counter: params.tenant_counter,
75            engine_counter: params.engine_counter,
76            size: params.size,
77            db: params.db,
78            tenant: params.tenant,
79            engine: params.engine,
80        }
81    }
82
83    /// Number of bytes reserved by this token.
84    pub fn size(&self) -> usize {
85        self.size
86    }
87
88    /// The database this reservation is scoped to.
89    pub fn database_id(&self) -> DatabaseId {
90        self.db
91    }
92
93    /// The tenant this reservation is scoped to.
94    pub fn tenant_id(&self) -> TenantId {
95        self.tenant
96    }
97
98    /// The engine this reservation is scoped to.
99    pub fn engine(&self) -> EngineId {
100        self.engine
101    }
102}
103
104impl Drop for ReservationToken {
105    fn drop(&mut self) {
106        let size = self.size;
107        if size == 0 {
108            return;
109        }
110
111        // Release in reverse order: engine → tenant → database → global.
112        if let Some(ref counter) = self.engine_counter {
113            counter.fetch_sub(size, Ordering::Relaxed);
114        }
115        if let Some(ref counter) = self.tenant_counter {
116            counter.fetch_sub(size, Ordering::Relaxed);
117        }
118        if let Some(ref counter) = self.database_counter {
119            counter.fetch_sub(size, Ordering::Relaxed);
120        }
121        self.global_counter
122            .allocated
123            .fetch_sub(size, Ordering::Relaxed);
124    }
125}
126
127impl std::fmt::Debug for ReservationToken {
128    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
129        f.debug_struct("ReservationToken")
130            .field("size", &self.size)
131            .field("db", &self.db)
132            .field("tenant", &self.tenant)
133            .field("engine", &self.engine)
134            .finish()
135    }
136}
137
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    use nodedb_types::{DatabaseId, TenantId};

    use super::{ReservationParams, ReservationToken};
    use crate::engine::EngineId;
    use crate::governor::GlobalCounter;

    /// Scoped (database/tenant/engine) counter pre-loaded with `initial` bytes.
    fn make_counter(initial: usize) -> Arc<AtomicUsize> {
        Arc::new(AtomicUsize::new(initial))
    }

    /// Global counter pre-loaded with `initial` bytes and a 1 MiB ceiling.
    fn make_global(initial: usize) -> Arc<GlobalCounter> {
        let counter = GlobalCounter {
            allocated: AtomicUsize::new(initial),
            ceiling: 1024 * 1024,
        };
        Arc::new(counter)
    }

    /// Current value of the global allocation counter.
    fn global_allocated(global: &GlobalCounter) -> usize {
        global.allocated.load(Ordering::Relaxed)
    }

    /// Token of `size` bytes that references only the global layer.
    fn global_only_token(global: &Arc<GlobalCounter>, size: usize) -> ReservationToken {
        ReservationToken::new(ReservationParams {
            global_counter: Arc::clone(global),
            database_counter: None,
            tenant_counter: None,
            engine_counter: None,
            size,
            db: DatabaseId::DEFAULT,
            tenant: TenantId::new(1),
            engine: EngineId::Query,
        })
    }

    #[test]
    fn drop_releases_all_four_levels() {
        let global = make_global(100);
        let db_ctr = make_counter(100);
        let tenant_ctr = make_counter(100);
        let engine_ctr = make_counter(100);

        let token = ReservationToken::new(ReservationParams {
            global_counter: Arc::clone(&global),
            database_counter: Some(Arc::clone(&db_ctr)),
            tenant_counter: Some(Arc::clone(&tenant_ctr)),
            engine_counter: Some(Arc::clone(&engine_ctr)),
            size: 100,
            db: DatabaseId::DEFAULT,
            tenant: TenantId::new(1),
            engine: EngineId::Vector,
        });

        // Constructing the token must not change the counter.
        assert_eq!(global_allocated(&global), 100);

        drop(token);

        assert_eq!(global_allocated(&global), 0);
        assert_eq!(db_ctr.load(Ordering::Relaxed), 0);
        assert_eq!(tenant_ctr.load(Ordering::Relaxed), 0);
        assert_eq!(engine_ctr.load(Ordering::Relaxed), 0);
    }

    #[test]
    fn drop_with_no_scoped_counters_releases_global() {
        let global = make_global(200);
        let token = global_only_token(&global, 200);
        drop(token);
        assert_eq!(global_allocated(&global), 0);
    }

    #[test]
    fn zero_size_drop_is_noop() {
        let global = make_global(0);
        let token = global_only_token(&global, 0);
        drop(token);
        assert_eq!(global_allocated(&global), 0);
    }
}