Skip to main content

nodedb_mem/
governor.rs

// SPDX-License-Identifier: BUSL-1.1

//! Central memory governor.
//!
//! The governor owns every engine's budget and enforces the global ceiling.
//! Every subsystem that wants to allocate a significant amount of memory must
//! go through the governor.
8
9use std::collections::HashMap;
10
11use crate::budget::Budget;
12use crate::engine::EngineId;
13use crate::error::{MemError, Result};
14use crate::pressure::{PressureLevel, PressureThresholds};
15
16/// Configuration for the memory governor.
17#[derive(Debug, Clone)]
18pub struct GovernorConfig {
19    /// Global memory ceiling in bytes. The sum of all engine budgets
20    /// must not exceed this.
21    pub global_ceiling: usize,
22
23    /// Per-engine budget limits.
24    pub engine_limits: HashMap<EngineId, usize>,
25}
26
27impl GovernorConfig {
28    /// Validate that the sum of engine limits does not exceed the global ceiling.
29    pub fn validate(&self) -> Result<()> {
30        let total: usize = self.engine_limits.values().sum();
31        if total > self.global_ceiling {
32            return Err(MemError::GlobalCeilingExceeded {
33                allocated: total,
34                ceiling: self.global_ceiling,
35                requested: 0,
36            });
37        }
38        Ok(())
39    }
40}
41
42/// The central memory governor.
43///
44/// Thread-safe: all budget operations use atomics internally.
45#[derive(Debug)]
46pub struct MemoryGovernor {
47    /// Per-engine budgets.
48    budgets: HashMap<EngineId, Budget>,
49
50    /// Global ceiling in bytes.
51    global_ceiling: usize,
52
53    /// Pressure thresholds for graduated backpressure.
54    thresholds: PressureThresholds,
55}
56
57impl MemoryGovernor {
58    /// Create a new governor with the given configuration.
59    pub fn new(config: GovernorConfig) -> Result<Self> {
60        config.validate()?;
61
62        let mut budgets = HashMap::new();
63        for (engine, limit) in &config.engine_limits {
64            budgets.insert(*engine, Budget::new(*limit));
65        }
66
67        Ok(Self {
68            budgets,
69            global_ceiling: config.global_ceiling,
70            thresholds: PressureThresholds::default(),
71        })
72    }
73
74    /// Try to reserve `size` bytes for the given engine.
75    ///
76    /// Returns `Ok(())` if the reservation succeeded, or an error describing
77    /// why it was rejected.
78    pub fn try_reserve(&self, engine: EngineId, size: usize) -> Result<()> {
79        let budget = self
80            .budgets
81            .get(&engine)
82            .ok_or(MemError::UnknownEngine(engine))?;
83
84        // Check global ceiling first.
85        let total_allocated: usize = self.budgets.values().map(|b| b.allocated()).sum();
86        if total_allocated + size > self.global_ceiling {
87            return Err(MemError::GlobalCeilingExceeded {
88                allocated: total_allocated,
89                ceiling: self.global_ceiling,
90                requested: size,
91            });
92        }
93
94        // Check per-engine budget.
95        if !budget.try_reserve(size) {
96            return Err(MemError::BudgetExhausted {
97                engine,
98                requested: size,
99                available: budget.available(),
100                limit: budget.limit(),
101            });
102        }
103
104        Ok(())
105    }
106
107    /// Release `size` bytes back to the given engine's budget.
108    pub fn release(&self, engine: EngineId, size: usize) {
109        if let Some(budget) = self.budgets.get(&engine) {
110            budget.release(size);
111        }
112    }
113
114    /// Get the budget for a specific engine.
115    pub fn budget(&self, engine: EngineId) -> Option<&Budget> {
116        self.budgets.get(&engine)
117    }
118
119    /// Get the global ceiling.
120    pub fn global_ceiling(&self) -> usize {
121        self.global_ceiling
122    }
123
124    /// Total memory allocated across all engines.
125    pub fn total_allocated(&self) -> usize {
126        self.budgets.values().map(|b| b.allocated()).sum()
127    }
128
129    /// Global utilization as a percentage (0-100).
130    pub fn global_utilization_percent(&self) -> u8 {
131        if self.global_ceiling == 0 {
132            return 100;
133        }
134        ((self.total_allocated() * 100) / self.global_ceiling).min(100) as u8
135    }
136
137    /// Current pressure level for a specific engine.
138    pub fn engine_pressure(&self, engine: EngineId) -> PressureLevel {
139        self.budgets
140            .get(&engine)
141            .map(|b| self.thresholds.level_for(b.utilization_percent()))
142            .unwrap_or(PressureLevel::Emergency)
143    }
144
145    /// Current global pressure level.
146    pub fn global_pressure(&self) -> PressureLevel {
147        self.thresholds.level_for(self.global_utilization_percent())
148    }
149
150    /// Set custom pressure thresholds.
151    pub fn set_thresholds(&mut self, thresholds: PressureThresholds) {
152        self.thresholds = thresholds;
153    }
154
155    /// Snapshot of all engine budget states (for metrics/debugging).
156    pub fn snapshot(&self) -> Vec<EngineSnapshot> {
157        self.budgets
158            .iter()
159            .map(|(engine, budget)| EngineSnapshot {
160                engine: *engine,
161                allocated: budget.allocated(),
162                limit: budget.limit(),
163                peak: budget.peak(),
164                rejections: budget.rejections(),
165                utilization_percent: budget.utilization_percent(),
166            })
167            .collect()
168    }
169}
170
171/// Point-in-time snapshot of an engine's memory state.
172#[derive(Debug, Clone)]
173pub struct EngineSnapshot {
174    pub engine: EngineId,
175    pub allocated: usize,
176    pub limit: usize,
177    pub peak: usize,
178    pub rejections: usize,
179    pub utilization_percent: u8,
180}
181
#[cfg(test)]
mod tests {
    use super::*;

    /// Three engines whose limits sum to 7168 bytes under an 8192-byte ceiling.
    fn test_config() -> GovernorConfig {
        let mut engine_limits = HashMap::new();
        engine_limits.insert(EngineId::Vector, 4096);
        engine_limits.insert(EngineId::Query, 2048);
        engine_limits.insert(EngineId::Timeseries, 1024);

        GovernorConfig {
            global_ceiling: 8192,
            engine_limits,
        }
    }

    #[test]
    fn reserve_within_budget() {
        let gov = MemoryGovernor::new(test_config()).unwrap();
        gov.try_reserve(EngineId::Vector, 1000).unwrap();
        assert_eq!(gov.budget(EngineId::Vector).unwrap().allocated(), 1000);
    }

    #[test]
    fn reserve_exceeds_engine_budget() {
        let gov = MemoryGovernor::new(test_config()).unwrap();
        let err = gov.try_reserve(EngineId::Query, 3000).unwrap_err();
        assert!(matches!(err, MemError::BudgetExhausted { .. }));
    }

    #[test]
    fn reserve_exceeds_global_ceiling() {
        let gov = MemoryGovernor::new(test_config()).unwrap();
        gov.try_reserve(EngineId::Vector, 4096).unwrap();
        gov.try_reserve(EngineId::Query, 2048).unwrap();
        gov.try_reserve(EngineId::Timeseries, 1024).unwrap();

        // Every engine is full and the global total is 7168. Another 2000
        // bytes would bring it to 9168 > 8192. `try_reserve` checks the
        // global ceiling before the per-engine budget, so the ceiling error
        // is the one reported.
        let err = gov.try_reserve(EngineId::Timeseries, 2000).unwrap_err();
        assert!(matches!(
            err,
            MemError::GlobalCeilingExceeded {
                allocated: 7168,
                ceiling: 8192,
                requested: 2000,
            }
        ));
    }

    #[test]
    fn release_frees_capacity() {
        let gov = MemoryGovernor::new(test_config()).unwrap();
        gov.try_reserve(EngineId::Vector, 4096).unwrap();

        assert!(gov.try_reserve(EngineId::Vector, 1).is_err());

        gov.release(EngineId::Vector, 1000);
        gov.try_reserve(EngineId::Vector, 500).unwrap();
    }

    #[test]
    fn unknown_engine_rejected() {
        let gov = MemoryGovernor::new(test_config()).unwrap();
        let err = gov.try_reserve(EngineId::Crdt, 100).unwrap_err();
        assert!(matches!(err, MemError::UnknownEngine(EngineId::Crdt)));
    }

    #[test]
    fn snapshot_reports_all_engines() {
        let gov = MemoryGovernor::new(test_config()).unwrap();
        gov.try_reserve(EngineId::Vector, 2048).unwrap();

        let snap = gov.snapshot();
        assert_eq!(snap.len(), 3);

        let vector_snap = snap.iter().find(|s| s.engine == EngineId::Vector).unwrap();
        assert_eq!(vector_snap.allocated, 2048);
        assert_eq!(vector_snap.limit, 4096);
        assert_eq!(vector_snap.utilization_percent, 50);
    }

    #[test]
    fn engine_pressure_levels() {
        let gov = MemoryGovernor::new(test_config()).unwrap();

        // Vector budget is 4096. At 0% → Normal.
        assert_eq!(gov.engine_pressure(EngineId::Vector), PressureLevel::Normal);

        // 2868 / 4096 ≈ 70% → Warning.
        gov.try_reserve(EngineId::Vector, 2868).unwrap();
        assert_eq!(
            gov.engine_pressure(EngineId::Vector),
            PressureLevel::Warning
        );

        // (2868 + 700) / 4096 ≈ 87% → Critical.
        gov.try_reserve(EngineId::Vector, 700).unwrap();
        assert_eq!(
            gov.engine_pressure(EngineId::Vector),
            PressureLevel::Critical
        );
    }

    #[test]
    fn global_pressure_tracks_total() {
        let gov = MemoryGovernor::new(test_config()).unwrap();
        assert_eq!(gov.global_pressure(), PressureLevel::Normal);

        // 4096 + 1700 = 5796 of 8192 ≈ 70% of the global ceiling.
        gov.try_reserve(EngineId::Vector, 4096).unwrap();
        gov.try_reserve(EngineId::Query, 1700).unwrap();
        assert!(gov.global_pressure() >= PressureLevel::Warning);
    }

    #[test]
    fn global_utilization_percent_reports_share_of_ceiling() {
        let gov = MemoryGovernor::new(test_config()).unwrap();
        assert_eq!(gov.global_utilization_percent(), 0);

        gov.try_reserve(EngineId::Vector, 4096).unwrap();
        assert_eq!(gov.total_allocated(), 4096);
        assert_eq!(gov.global_utilization_percent(), 50);
    }

    #[test]
    fn zero_ceiling_reports_full_utilization() {
        let gov = MemoryGovernor::new(GovernorConfig {
            global_ceiling: 0,
            engine_limits: HashMap::new(),
        })
        .unwrap();
        assert_eq!(gov.global_utilization_percent(), 100);
    }

    #[test]
    fn invalid_config_rejected() {
        let mut config = test_config();
        config.global_ceiling = 100; // Way too small for the engine limits.
        assert!(MemoryGovernor::new(config).is_err());
    }
}