1use std::collections::HashMap;
8
9use crate::budget::Budget;
10use crate::engine::EngineId;
11use crate::error::{MemError, Result};
12use crate::pressure::{PressureLevel, PressureThresholds};
13
14#[derive(Debug, Clone)]
16pub struct GovernorConfig {
17 pub global_ceiling: usize,
20
21 pub engine_limits: HashMap<EngineId, usize>,
23}
24
25impl GovernorConfig {
26 pub fn validate(&self) -> Result<()> {
28 let total: usize = self.engine_limits.values().sum();
29 if total > self.global_ceiling {
30 return Err(MemError::GlobalCeilingExceeded {
31 allocated: total,
32 ceiling: self.global_ceiling,
33 requested: 0,
34 });
35 }
36 Ok(())
37 }
38}
39
40#[derive(Debug)]
44pub struct MemoryGovernor {
45 budgets: HashMap<EngineId, Budget>,
47
48 global_ceiling: usize,
50
51 thresholds: PressureThresholds,
53}
54
55impl MemoryGovernor {
56 pub fn new(config: GovernorConfig) -> Result<Self> {
58 config.validate()?;
59
60 let mut budgets = HashMap::new();
61 for (engine, limit) in &config.engine_limits {
62 budgets.insert(*engine, Budget::new(*limit));
63 }
64
65 Ok(Self {
66 budgets,
67 global_ceiling: config.global_ceiling,
68 thresholds: PressureThresholds::default(),
69 })
70 }
71
72 pub fn try_reserve(&self, engine: EngineId, size: usize) -> Result<()> {
77 let budget = self
78 .budgets
79 .get(&engine)
80 .ok_or(MemError::UnknownEngine(engine))?;
81
82 let total_allocated: usize = self.budgets.values().map(|b| b.allocated()).sum();
84 if total_allocated + size > self.global_ceiling {
85 return Err(MemError::GlobalCeilingExceeded {
86 allocated: total_allocated,
87 ceiling: self.global_ceiling,
88 requested: size,
89 });
90 }
91
92 if !budget.try_reserve(size) {
94 return Err(MemError::BudgetExhausted {
95 engine,
96 requested: size,
97 available: budget.available(),
98 limit: budget.limit(),
99 });
100 }
101
102 Ok(())
103 }
104
105 pub fn release(&self, engine: EngineId, size: usize) {
107 if let Some(budget) = self.budgets.get(&engine) {
108 budget.release(size);
109 }
110 }
111
112 pub fn budget(&self, engine: EngineId) -> Option<&Budget> {
114 self.budgets.get(&engine)
115 }
116
117 pub fn global_ceiling(&self) -> usize {
119 self.global_ceiling
120 }
121
122 pub fn total_allocated(&self) -> usize {
124 self.budgets.values().map(|b| b.allocated()).sum()
125 }
126
127 pub fn global_utilization_percent(&self) -> u8 {
129 if self.global_ceiling == 0 {
130 return 100;
131 }
132 ((self.total_allocated() * 100) / self.global_ceiling).min(100) as u8
133 }
134
135 pub fn engine_pressure(&self, engine: EngineId) -> PressureLevel {
137 self.budgets
138 .get(&engine)
139 .map(|b| self.thresholds.level_for(b.utilization_percent()))
140 .unwrap_or(PressureLevel::Emergency)
141 }
142
143 pub fn global_pressure(&self) -> PressureLevel {
145 self.thresholds.level_for(self.global_utilization_percent())
146 }
147
148 pub fn set_thresholds(&mut self, thresholds: PressureThresholds) {
150 self.thresholds = thresholds;
151 }
152
153 pub fn snapshot(&self) -> Vec<EngineSnapshot> {
155 self.budgets
156 .iter()
157 .map(|(engine, budget)| EngineSnapshot {
158 engine: *engine,
159 allocated: budget.allocated(),
160 limit: budget.limit(),
161 peak: budget.peak(),
162 rejections: budget.rejections(),
163 utilization_percent: budget.utilization_percent(),
164 })
165 .collect()
166 }
167}
168
169#[derive(Debug, Clone)]
171pub struct EngineSnapshot {
172 pub engine: EngineId,
173 pub allocated: usize,
174 pub limit: usize,
175 pub peak: usize,
176 pub rejections: usize,
177 pub utilization_percent: u8,
178}
179
180#[cfg(test)]
181mod tests {
182 use super::*;
183
184 fn test_config() -> GovernorConfig {
185 let mut engine_limits = HashMap::new();
186 engine_limits.insert(EngineId::Vector, 4096);
187 engine_limits.insert(EngineId::Query, 2048);
188 engine_limits.insert(EngineId::Timeseries, 1024);
189
190 GovernorConfig {
191 global_ceiling: 8192,
192 engine_limits,
193 }
194 }
195
196 #[test]
197 fn reserve_within_budget() {
198 let gov = MemoryGovernor::new(test_config()).unwrap();
199 gov.try_reserve(EngineId::Vector, 1000).unwrap();
200 assert_eq!(gov.budget(EngineId::Vector).unwrap().allocated(), 1000);
201 }
202
203 #[test]
204 fn reserve_exceeds_engine_budget() {
205 let gov = MemoryGovernor::new(test_config()).unwrap();
206 let err = gov.try_reserve(EngineId::Query, 3000).unwrap_err();
207 assert!(matches!(err, MemError::BudgetExhausted { .. }));
208 }
209
210 #[test]
211 fn reserve_exceeds_global_ceiling() {
212 let gov = MemoryGovernor::new(test_config()).unwrap();
213 gov.try_reserve(EngineId::Vector, 4096).unwrap();
214 gov.try_reserve(EngineId::Query, 2048).unwrap();
215 gov.try_reserve(EngineId::Timeseries, 1024).unwrap();
216
217 let err = gov.try_reserve(EngineId::Timeseries, 2000).unwrap_err();
221 assert!(matches!(
223 err,
224 MemError::BudgetExhausted { .. } | MemError::GlobalCeilingExceeded { .. }
225 ));
226 }
227
228 #[test]
229 fn release_frees_capacity() {
230 let gov = MemoryGovernor::new(test_config()).unwrap();
231 gov.try_reserve(EngineId::Vector, 4096).unwrap();
232
233 assert!(gov.try_reserve(EngineId::Vector, 1).is_err());
234
235 gov.release(EngineId::Vector, 1000);
236 gov.try_reserve(EngineId::Vector, 500).unwrap();
237 }
238
239 #[test]
240 fn unknown_engine_rejected() {
241 let gov = MemoryGovernor::new(test_config()).unwrap();
242 let err = gov.try_reserve(EngineId::Crdt, 100).unwrap_err();
243 assert!(matches!(err, MemError::UnknownEngine(EngineId::Crdt)));
244 }
245
246 #[test]
247 fn snapshot_reports_all_engines() {
248 let gov = MemoryGovernor::new(test_config()).unwrap();
249 gov.try_reserve(EngineId::Vector, 2048).unwrap();
250
251 let snap = gov.snapshot();
252 assert_eq!(snap.len(), 3);
253
254 let vector_snap = snap.iter().find(|s| s.engine == EngineId::Vector).unwrap();
255 assert_eq!(vector_snap.allocated, 2048);
256 assert_eq!(vector_snap.limit, 4096);
257 assert_eq!(vector_snap.utilization_percent, 50);
258 }
259
260 #[test]
261 fn engine_pressure_levels() {
262 let gov = MemoryGovernor::new(test_config()).unwrap();
263
264 assert_eq!(gov.engine_pressure(EngineId::Vector), PressureLevel::Normal);
266
267 gov.try_reserve(EngineId::Vector, 2868).unwrap(); assert_eq!(
270 gov.engine_pressure(EngineId::Vector),
271 PressureLevel::Warning
272 );
273
274 gov.try_reserve(EngineId::Vector, 700).unwrap(); assert_eq!(
277 gov.engine_pressure(EngineId::Vector),
278 PressureLevel::Critical
279 );
280 }
281
282 #[test]
283 fn global_pressure_tracks_total() {
284 let gov = MemoryGovernor::new(test_config()).unwrap();
285 assert_eq!(gov.global_pressure(), PressureLevel::Normal);
286
287 gov.try_reserve(EngineId::Vector, 4096).unwrap();
289 gov.try_reserve(EngineId::Query, 1700).unwrap();
290 assert!(gov.global_pressure() >= PressureLevel::Warning);
292 }
293
294 #[test]
295 fn invalid_config_rejected() {
296 let mut config = test_config();
297 config.global_ceiling = 100; assert!(MemoryGovernor::new(config).is_err());
299 }
300}