Skip to main content

nodedb_mem/
budget_guard.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! RAII budget guard for the memory governor.
4//!
5//! [`BudgetGuard`] acquires a byte reservation from the [`MemoryGovernor`]
6//! on construction and releases it automatically when dropped.  This prevents
7//! budget leaks if the caller returns early or propagates an error between
8//! reserving and freeing memory.
9//!
10//! # Usage
11//!
12//! ```ignore
13//! let _g = governor.reserve(EngineId::Vector, n * size_of::<f32>())?;
14//! let v: Vec<f32> = Vec::with_capacity(n); // budget already reserved
15//! // _g dropped at end of scope → bytes returned to engine budget
16//! ```
17//!
18//! # `mem::forget` note
19//!
20//! If a `BudgetGuard` is forgotten via [`std::mem::forget`] the reservation
21//! is never released.  This is intentional: the guard owns accounting for
22//! bytes that a live allocation is using.  Callers must not forget guards
23//! that are the sole record of outstanding reservations.
24
25use std::sync::Arc;
26
27use crate::engine::EngineId;
28use crate::error::{MemError, Result};
29use crate::governor::MemoryGovernor;
30
31/// RAII guard that holds a byte reservation from the [`MemoryGovernor`].
32///
33/// Dropping the guard releases the reserved bytes back to the engine budget.
34/// The guard is `!Send` by default because it is normally used on Data-Plane
35/// TPC cores (`!Send` enforced by the executor).  If you genuinely need to
36/// move a guard across threads (e.g. from a background compaction task) you
37/// can wrap it in an explicit `Arc<Mutex<...>>` — but that pattern is rare
38/// and typically wrong on the Data Plane.
39#[must_use = "dropping a BudgetGuard immediately releases the reservation; bind it to a variable"]
40#[derive(Debug)]
41pub struct BudgetGuard {
42    governor: Arc<MemoryGovernor>,
43    engine: EngineId,
44    bytes: usize,
45}
46
47impl BudgetGuard {
48    /// Internal constructor — called only by [`MemoryGovernor::reserve`].
49    pub(crate) fn new(governor: Arc<MemoryGovernor>, engine: EngineId, bytes: usize) -> Self {
50        Self {
51            governor,
52            engine,
53            bytes,
54        }
55    }
56
57    /// The engine this guard is accounting against.
58    pub fn engine(&self) -> EngineId {
59        self.engine
60    }
61
62    /// The number of bytes reserved by this guard.
63    pub fn bytes(&self) -> usize {
64        self.bytes
65    }
66}
67
68impl Drop for BudgetGuard {
69    fn drop(&mut self) {
70        self.governor.release(self.engine, self.bytes);
71    }
72}
73
74impl MemoryGovernor {
75    /// Reserve `bytes` for `engine` and return a [`BudgetGuard`] that releases
76    /// them on drop.
77    ///
78    /// # Errors
79    ///
80    /// Returns [`MemError::BudgetExhausted`] or [`MemError::GlobalCeilingExceeded`]
81    /// if the reservation would exceed any configured limit.  Returns
82    /// [`MemError::UnknownEngine`] if `engine` is not registered.
83    pub fn reserve(self: &Arc<Self>, engine: EngineId, bytes: usize) -> Result<BudgetGuard> {
84        let budget = self.budget(engine).ok_or(MemError::UnknownEngine(engine))?;
85
86        // Global ceiling check.
87        let total_allocated = self.total_allocated();
88        let ceiling = self.global_ceiling();
89        if total_allocated + bytes > ceiling {
90            return Err(MemError::GlobalCeilingExceeded {
91                allocated: total_allocated,
92                ceiling,
93                requested: bytes,
94            });
95        }
96
97        // Per-engine check.
98        if !budget.try_reserve(bytes) {
99            return Err(MemError::BudgetExhausted {
100                engine,
101                requested: bytes,
102                available: budget.available(),
103                limit: budget.limit(),
104            });
105        }
106
107        Ok(BudgetGuard::new(Arc::clone(self), engine, bytes))
108    }
109}
110
111#[cfg(test)]
112mod tests {
113    use std::collections::HashMap;
114    use std::sync::Arc;
115
116    use super::*;
117    use crate::error::MemError;
118    use crate::governor::GovernorConfig;
119
120    fn make_governor(limits: &[(EngineId, usize)], ceiling: usize) -> Arc<MemoryGovernor> {
121        let engine_limits: HashMap<EngineId, usize> = limits.iter().copied().collect();
122        Arc::new(
123            MemoryGovernor::new(GovernorConfig {
124                global_ceiling: ceiling,
125                engine_limits,
126            })
127            .expect("valid config"),
128        )
129    }
130
131    #[test]
132    fn reserve_within_budget_releases_on_drop() {
133        let gov = make_governor(&[(EngineId::Vector, 4096)], 8192);
134
135        {
136            let guard = gov.reserve(EngineId::Vector, 1000).expect("within budget");
137            assert_eq!(gov.budget(EngineId::Vector).unwrap().allocated(), 1000);
138            assert_eq!(guard.bytes(), 1000);
139            assert_eq!(guard.engine(), EngineId::Vector);
140            // guard dropped here
141        }
142
143        assert_eq!(
144            gov.budget(EngineId::Vector).unwrap().allocated(),
145            0,
146            "bytes must be returned on drop"
147        );
148    }
149
150    #[test]
151    fn reserve_over_budget_returns_err() {
152        let gov = make_governor(&[(EngineId::Fts, 512)], 1024);
153
154        let err = gov.reserve(EngineId::Fts, 1000).unwrap_err();
155        assert!(
156            matches!(err, MemError::BudgetExhausted { .. }),
157            "expected BudgetExhausted, got {err:?}"
158        );
159        // No bytes charged.
160        assert_eq!(gov.budget(EngineId::Fts).unwrap().allocated(), 0);
161    }
162
163    #[test]
164    fn multiple_guards_accumulate_and_release_independently() {
165        let gov = make_governor(
166            &[
167                (EngineId::Vector, 4096),
168                (EngineId::Columnar, 4096),
169                (EngineId::Graph, 4096),
170            ],
171            16384,
172        );
173
174        let g1 = gov.reserve(EngineId::Vector, 1000).unwrap();
175        let g2 = gov.reserve(EngineId::Columnar, 2000).unwrap();
176        let g3 = gov.reserve(EngineId::Graph, 3000).unwrap();
177
178        assert_eq!(gov.budget(EngineId::Vector).unwrap().allocated(), 1000);
179        assert_eq!(gov.budget(EngineId::Columnar).unwrap().allocated(), 2000);
180        assert_eq!(gov.budget(EngineId::Graph).unwrap().allocated(), 3000);
181
182        drop(g2); // release only Columnar
183        assert_eq!(gov.budget(EngineId::Vector).unwrap().allocated(), 1000);
184        assert_eq!(gov.budget(EngineId::Columnar).unwrap().allocated(), 0);
185        assert_eq!(gov.budget(EngineId::Graph).unwrap().allocated(), 3000);
186
187        drop(g1);
188        drop(g3);
189        assert_eq!(gov.budget(EngineId::Vector).unwrap().allocated(), 0);
190        assert_eq!(gov.budget(EngineId::Graph).unwrap().allocated(), 0);
191    }
192
193    /// Demonstrates that `mem::forget` prevents the release.
194    /// This is documented behaviour — callers must not forget guards.
195    #[test]
196    fn mem_forget_does_not_release() {
197        let gov = make_governor(&[(EngineId::Kv, 4096)], 8192);
198
199        let guard = gov.reserve(EngineId::Kv, 500).unwrap();
200        assert_eq!(gov.budget(EngineId::Kv).unwrap().allocated(), 500);
201
202        std::mem::forget(guard);
203
204        // Bytes are NOT released — accounting drift matches the allocation.
205        assert_eq!(
206            gov.budget(EngineId::Kv).unwrap().allocated(),
207            500,
208            "mem::forget intentionally skips drop; bytes remain charged"
209        );
210    }
211
212    #[test]
213    fn reserve_zero_bytes_is_allowed() {
214        let gov = make_governor(&[(EngineId::Query, 1024)], 2048);
215        let guard = gov
216            .reserve(EngineId::Query, 0)
217            .expect("zero bytes always fits");
218        assert_eq!(guard.bytes(), 0);
219        drop(guard);
220        assert_eq!(gov.budget(EngineId::Query).unwrap().allocated(), 0);
221    }
222
223    #[test]
224    fn second_reserve_after_drop_succeeds() {
225        let gov = make_governor(&[(EngineId::Timeseries, 1024)], 2048);
226
227        {
228            let _g = gov.reserve(EngineId::Timeseries, 1024).unwrap();
229            // Budget fully consumed — a second reserve must fail.
230            assert!(gov.reserve(EngineId::Timeseries, 1).is_err());
231        } // _g dropped → budget freed
232
233        // Now the same reservation must succeed again.
234        let _g2 = gov
235            .reserve(EngineId::Timeseries, 1024)
236            .expect("budget freed by previous guard drop");
237    }
238}