Skip to main content

nodedb_mem/
governor.rs

1//! Central memory governor.
2//!
3//! The governor owns all engine budgets and enforces the global ceiling.
4//! Every subsystem that wants to allocate significant memory must go through
5//! the governor.
6
7use std::collections::HashMap;
8
9use crate::budget::Budget;
10use crate::engine::EngineId;
11use crate::error::{MemError, Result};
12use crate::pressure::{PressureLevel, PressureThresholds};
13
14/// Configuration for the memory governor.
15#[derive(Debug, Clone)]
16pub struct GovernorConfig {
17    /// Global memory ceiling in bytes. The sum of all engine budgets
18    /// must not exceed this.
19    pub global_ceiling: usize,
20
21    /// Per-engine budget limits.
22    pub engine_limits: HashMap<EngineId, usize>,
23}
24
25impl GovernorConfig {
26    /// Validate that the sum of engine limits does not exceed the global ceiling.
27    pub fn validate(&self) -> Result<()> {
28        let total: usize = self.engine_limits.values().sum();
29        if total > self.global_ceiling {
30            return Err(MemError::GlobalCeilingExceeded {
31                allocated: total,
32                ceiling: self.global_ceiling,
33                requested: 0,
34            });
35        }
36        Ok(())
37    }
38}
39
40/// The central memory governor.
41///
42/// Thread-safe: all budget operations use atomics internally.
43#[derive(Debug)]
44pub struct MemoryGovernor {
45    /// Per-engine budgets.
46    budgets: HashMap<EngineId, Budget>,
47
48    /// Global ceiling in bytes.
49    global_ceiling: usize,
50
51    /// Pressure thresholds for graduated backpressure.
52    thresholds: PressureThresholds,
53}
54
55impl MemoryGovernor {
56    /// Create a new governor with the given configuration.
57    pub fn new(config: GovernorConfig) -> Result<Self> {
58        config.validate()?;
59
60        let mut budgets = HashMap::new();
61        for (engine, limit) in &config.engine_limits {
62            budgets.insert(*engine, Budget::new(*limit));
63        }
64
65        Ok(Self {
66            budgets,
67            global_ceiling: config.global_ceiling,
68            thresholds: PressureThresholds::default(),
69        })
70    }
71
72    /// Try to reserve `size` bytes for the given engine.
73    ///
74    /// Returns `Ok(())` if the reservation succeeded, or an error describing
75    /// why it was rejected.
76    pub fn try_reserve(&self, engine: EngineId, size: usize) -> Result<()> {
77        let budget = self
78            .budgets
79            .get(&engine)
80            .ok_or(MemError::UnknownEngine(engine))?;
81
82        // Check global ceiling first.
83        let total_allocated: usize = self.budgets.values().map(|b| b.allocated()).sum();
84        if total_allocated + size > self.global_ceiling {
85            return Err(MemError::GlobalCeilingExceeded {
86                allocated: total_allocated,
87                ceiling: self.global_ceiling,
88                requested: size,
89            });
90        }
91
92        // Check per-engine budget.
93        if !budget.try_reserve(size) {
94            return Err(MemError::BudgetExhausted {
95                engine,
96                requested: size,
97                available: budget.available(),
98                limit: budget.limit(),
99            });
100        }
101
102        Ok(())
103    }
104
105    /// Release `size` bytes back to the given engine's budget.
106    pub fn release(&self, engine: EngineId, size: usize) {
107        if let Some(budget) = self.budgets.get(&engine) {
108            budget.release(size);
109        }
110    }
111
112    /// Get the budget for a specific engine.
113    pub fn budget(&self, engine: EngineId) -> Option<&Budget> {
114        self.budgets.get(&engine)
115    }
116
117    /// Get the global ceiling.
118    pub fn global_ceiling(&self) -> usize {
119        self.global_ceiling
120    }
121
122    /// Total memory allocated across all engines.
123    pub fn total_allocated(&self) -> usize {
124        self.budgets.values().map(|b| b.allocated()).sum()
125    }
126
127    /// Global utilization as a percentage (0-100).
128    pub fn global_utilization_percent(&self) -> u8 {
129        if self.global_ceiling == 0 {
130            return 100;
131        }
132        ((self.total_allocated() * 100) / self.global_ceiling).min(100) as u8
133    }
134
135    /// Current pressure level for a specific engine.
136    pub fn engine_pressure(&self, engine: EngineId) -> PressureLevel {
137        self.budgets
138            .get(&engine)
139            .map(|b| self.thresholds.level_for(b.utilization_percent()))
140            .unwrap_or(PressureLevel::Emergency)
141    }
142
143    /// Current global pressure level.
144    pub fn global_pressure(&self) -> PressureLevel {
145        self.thresholds.level_for(self.global_utilization_percent())
146    }
147
148    /// Set custom pressure thresholds.
149    pub fn set_thresholds(&mut self, thresholds: PressureThresholds) {
150        self.thresholds = thresholds;
151    }
152
153    /// Snapshot of all engine budget states (for metrics/debugging).
154    pub fn snapshot(&self) -> Vec<EngineSnapshot> {
155        self.budgets
156            .iter()
157            .map(|(engine, budget)| EngineSnapshot {
158                engine: *engine,
159                allocated: budget.allocated(),
160                limit: budget.limit(),
161                peak: budget.peak(),
162                rejections: budget.rejections(),
163                utilization_percent: budget.utilization_percent(),
164            })
165            .collect()
166    }
167}
168
169/// Point-in-time snapshot of an engine's memory state.
170#[derive(Debug, Clone)]
171pub struct EngineSnapshot {
172    pub engine: EngineId,
173    pub allocated: usize,
174    pub limit: usize,
175    pub peak: usize,
176    pub rejections: usize,
177    pub utilization_percent: u8,
178}
179
#[cfg(test)]
mod tests {
    use super::*;

    /// Three engines whose limits total 7168 bytes under an 8192-byte ceiling.
    fn test_config() -> GovernorConfig {
        let mut engine_limits = HashMap::new();
        engine_limits.insert(EngineId::Vector, 4096);
        engine_limits.insert(EngineId::Query, 2048);
        engine_limits.insert(EngineId::Timeseries, 1024);

        GovernorConfig {
            global_ceiling: 8192,
            engine_limits,
        }
    }

    #[test]
    fn reserve_within_budget() {
        let gov = MemoryGovernor::new(test_config()).unwrap();
        gov.try_reserve(EngineId::Vector, 1000).unwrap();
        assert_eq!(gov.budget(EngineId::Vector).unwrap().allocated(), 1000);
    }

    #[test]
    fn reserve_exceeds_engine_budget() {
        // 3000 bytes fits under the global ceiling but not in Query's 2048.
        let gov = MemoryGovernor::new(test_config()).unwrap();
        let err = gov.try_reserve(EngineId::Query, 3000).unwrap_err();
        assert!(matches!(err, MemError::BudgetExhausted { .. }));
    }

    #[test]
    fn reserve_exceeds_global_ceiling() {
        let gov = MemoryGovernor::new(test_config()).unwrap();
        gov.try_reserve(EngineId::Vector, 4096).unwrap();
        gov.try_reserve(EngineId::Query, 2048).unwrap();
        gov.try_reserve(EngineId::Timeseries, 1024).unwrap();

        // Every engine is now full, so 7168 of the 8192-byte ceiling is in
        // use. A further 2000 bytes would bring the total to 9168. The
        // global ceiling is checked before the engine budget, so the global
        // error is the one reported even though the Timeseries budget is
        // exhausted as well.
        let err = gov.try_reserve(EngineId::Timeseries, 2000).unwrap_err();
        assert!(matches!(
            err,
            MemError::GlobalCeilingExceeded {
                allocated: 7168,
                ceiling: 8192,
                requested: 2000,
            }
        ));
    }

    #[test]
    fn release_frees_capacity() {
        let gov = MemoryGovernor::new(test_config()).unwrap();
        gov.try_reserve(EngineId::Vector, 4096).unwrap();

        // The budget is full, so even a single byte is refused.
        assert!(gov.try_reserve(EngineId::Vector, 1).is_err());

        gov.release(EngineId::Vector, 1000);
        gov.try_reserve(EngineId::Vector, 500).unwrap();
    }

    #[test]
    fn unknown_engine_rejected() {
        let gov = MemoryGovernor::new(test_config()).unwrap();
        let err = gov.try_reserve(EngineId::Crdt, 100).unwrap_err();
        assert!(matches!(err, MemError::UnknownEngine(EngineId::Crdt)));
    }

    #[test]
    fn snapshot_reports_all_engines() {
        let gov = MemoryGovernor::new(test_config()).unwrap();
        gov.try_reserve(EngineId::Vector, 2048).unwrap();

        let snap = gov.snapshot();
        assert_eq!(snap.len(), 3);

        let vector_snap = snap.iter().find(|s| s.engine == EngineId::Vector).unwrap();
        assert_eq!(vector_snap.allocated, 2048);
        assert_eq!(vector_snap.limit, 4096);
        assert_eq!(vector_snap.utilization_percent, 50);
    }

    #[test]
    fn engine_pressure_levels() {
        let gov = MemoryGovernor::new(test_config()).unwrap();

        // Vector budget is 4096. At 0% → Normal.
        assert_eq!(gov.engine_pressure(EngineId::Vector), PressureLevel::Normal);

        // 2868 / 4096 ≈ 70% → Warning.
        gov.try_reserve(EngineId::Vector, 2868).unwrap();
        assert_eq!(
            gov.engine_pressure(EngineId::Vector),
            PressureLevel::Warning
        );

        // 3568 / 4096 ≈ 87% → Critical.
        gov.try_reserve(EngineId::Vector, 700).unwrap();
        assert_eq!(
            gov.engine_pressure(EngineId::Vector),
            PressureLevel::Critical
        );
    }

    #[test]
    fn global_pressure_tracks_total() {
        let gov = MemoryGovernor::new(test_config()).unwrap();
        assert_eq!(gov.global_pressure(), PressureLevel::Normal);

        // 5796 / 8192 ≈ 70% of the global ceiling.
        gov.try_reserve(EngineId::Vector, 4096).unwrap();
        gov.try_reserve(EngineId::Query, 1700).unwrap();
        assert!(gov.global_pressure() >= PressureLevel::Warning);
    }

    #[test]
    fn invalid_config_rejected() {
        let mut config = test_config();
        config.global_ceiling = 100; // Far below the 7168 bytes of engine limits.
        assert!(MemoryGovernor::new(config).is_err());
    }
}