1use std::collections::HashMap;
10use std::fmt;
11
12use crate::actor::ActorId;
13
/// Coarse classification of memory pressure.
///
/// Variants are declared from least to most severe, so the derived ordering
/// gives `Normal < Warning < Critical < Emergency`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum PressureLevel {
    /// Returned by [`PressureLevel::from_usage`] for fractions below `0.60`.
    Normal,
    /// Returned by [`PressureLevel::from_usage`] for fractions in `0.60..0.80`.
    Warning,
    /// Returned by [`PressureLevel::from_usage`] for fractions in `0.80..0.95`.
    Critical,
    /// Returned by [`PressureLevel::from_usage`] for fractions of `0.95` and up.
    Emergency,
}

impl PressureLevel {
    /// Maps a usage fraction (used / capacity) to a pressure level.
    ///
    /// Each threshold is an inclusive lower bound: `>= 0.95` is `Emergency`,
    /// `>= 0.80` is `Critical`, `>= 0.60` is `Warning`. Any value that meets
    /// none of them — including negative numbers and `NaN`, for which every
    /// comparison is false — yields `Normal`.
    pub fn from_usage(fraction: f64) -> Self {
        match fraction {
            f if f >= 0.95 => Self::Emergency,
            f if f >= 0.80 => Self::Critical,
            f if f >= 0.60 => Self::Warning,
            _ => Self::Normal,
        }
    }
}

impl fmt::Display for PressureLevel {
    /// Writes the lowercase name of the level (`"normal"`, `"warning"`, ...).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Normal => "normal",
            Self::Warning => "warning",
            Self::Critical => "critical",
            Self::Emergency => "emergency",
        };
        f.write_str(label)
    }
}
52
/// Tag naming the mitigation associated with a memory budget.
///
/// In the code visible here the value is only stored (`MemoryBudget::new`
/// initialises it to `ReduceQueues`); how each strategy is carried out is
/// decided by whoever reads the field.
// NOTE(review): per-variant semantics below are inferred from the names only —
// confirm against the component that consumes this enum.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MitigationStrategy {
    /// Presumably: shrink queue capacity to relieve pressure.
    ReduceQueues,
    /// Presumably: move data from device memory to host memory.
    SpillToHost,
    /// Presumably: deactivate actors to reclaim their memory.
    DeactivateActors,
    /// Presumably: ask for a garbage-collection pass.
    RequestGc,
    /// No mitigation. Not to be confused with `Option::None`; always refer to
    /// it as `MitigationStrategy::None`.
    None,
}
67
68#[derive(Debug, Clone)]
70pub struct MemoryBudget {
71 pub soft_limit_bytes: u64,
73 pub hard_limit_bytes: u64,
75 pub current_bytes: u64,
77 pub peak_bytes: u64,
79 pub mitigation: MitigationStrategy,
81}
82
83impl MemoryBudget {
84 pub fn new(soft_limit: u64, hard_limit: u64) -> Self {
86 Self {
87 soft_limit_bytes: soft_limit,
88 hard_limit_bytes: hard_limit,
89 current_bytes: 0,
90 peak_bytes: 0,
91 mitigation: MitigationStrategy::ReduceQueues,
92 }
93 }
94
95 pub fn alloc(&mut self, bytes: u64) -> AllocationResult {
97 let new_total = self.current_bytes + bytes;
98
99 if new_total > self.hard_limit_bytes {
100 return AllocationResult::Denied {
101 requested: bytes,
102 available: self.hard_limit_bytes.saturating_sub(self.current_bytes),
103 };
104 }
105
106 self.current_bytes = new_total;
107 if new_total > self.peak_bytes {
108 self.peak_bytes = new_total;
109 }
110
111 if new_total > self.soft_limit_bytes {
112 AllocationResult::GrantedWithWarning {
113 usage_fraction: new_total as f64 / self.hard_limit_bytes as f64,
114 }
115 } else {
116 AllocationResult::Granted
117 }
118 }
119
120 pub fn dealloc(&mut self, bytes: u64) {
122 self.current_bytes = self.current_bytes.saturating_sub(bytes);
123 }
124
125 pub fn usage_fraction(&self) -> f64 {
127 if self.hard_limit_bytes == 0 {
128 return 0.0;
129 }
130 self.current_bytes as f64 / self.hard_limit_bytes as f64
131 }
132
133 pub fn pressure_level(&self) -> PressureLevel {
135 PressureLevel::from_usage(self.usage_fraction())
136 }
137}
138
/// Outcome of an allocation request, as returned by `MemoryBudget::alloc`.
#[derive(Debug, Clone, PartialEq)]
pub enum AllocationResult {
    /// The request was charged and usage is within the soft limit.
    Granted,
    /// The request was charged, but usage is now above the soft limit.
    GrantedWithWarning {
        /// Usage after the allocation, as a fraction of the hard limit.
        usage_fraction: f64,
    },
    /// The request was refused; nothing was charged.
    Denied {
        /// Size of the refused request, in bytes.
        requested: u64,
        /// Headroom that was left under the hard limit, in bytes.
        available: u64,
    },
}
157
158pub struct MemoryPressureMonitor {
160 budgets: HashMap<ActorId, MemoryBudget>,
162 total_gpu_memory: u64,
164 total_allocated: u64,
166 default_budget: MemoryBudget,
168}
169
170impl MemoryPressureMonitor {
171 pub fn new(total_gpu_memory: u64) -> Self {
173 let per_actor_default = total_gpu_memory / 8; Self {
175 budgets: HashMap::new(),
176 total_gpu_memory,
177 total_allocated: 0,
178 default_budget: MemoryBudget::new(
179 (per_actor_default as f64 * 0.8) as u64,
180 per_actor_default,
181 ),
182 }
183 }
184
185 pub fn set_budget(&mut self, actor: ActorId, budget: MemoryBudget) {
187 self.budgets.insert(actor, budget);
188 }
189
190 pub fn request_alloc(&mut self, actor: ActorId, bytes: u64) -> AllocationResult {
192 let budget = self
193 .budgets
194 .entry(actor)
195 .or_insert_with(|| self.default_budget.clone());
196
197 let result = budget.alloc(bytes);
198 if matches!(
199 result,
200 AllocationResult::Granted | AllocationResult::GrantedWithWarning { .. }
201 ) {
202 self.total_allocated += bytes;
203 }
204 result
205 }
206
207 pub fn record_dealloc(&mut self, actor: ActorId, bytes: u64) {
209 if let Some(budget) = self.budgets.get_mut(&actor) {
210 budget.dealloc(bytes);
211 }
212 self.total_allocated = self.total_allocated.saturating_sub(bytes);
213 }
214
215 pub fn global_pressure(&self) -> PressureLevel {
217 if self.total_gpu_memory == 0 {
218 return PressureLevel::Normal;
219 }
220 PressureLevel::from_usage(self.total_allocated as f64 / self.total_gpu_memory as f64)
221 }
222
223 pub fn actors_over_budget(&self) -> Vec<(ActorId, f64)> {
225 self.budgets
226 .iter()
227 .filter(|(_, b)| b.current_bytes > b.soft_limit_bytes)
228 .map(|(&id, b)| (id, b.usage_fraction()))
229 .collect()
230 }
231
232 pub fn actors_by_usage(&self) -> Vec<(ActorId, u64)> {
234 let mut actors: Vec<_> = self
235 .budgets
236 .iter()
237 .map(|(&id, b)| (id, b.current_bytes))
238 .collect();
239 actors.sort_by_key(|a| std::cmp::Reverse(a.1));
240 actors
241 }
242
243 pub fn get_budget(&self, actor: ActorId) -> Option<&MemoryBudget> {
245 self.budgets.get(&actor)
246 }
247
248 pub fn total_allocated(&self) -> u64 {
250 self.total_allocated
251 }
252
253 pub fn total_gpu_memory(&self) -> u64 {
255 self.total_gpu_memory
256 }
257}
258
#[cfg(test)]
mod tests {
    use super::*;

    /// One sample from each band maps to the expected level.
    #[test]
    fn test_pressure_levels() {
        let samples = [
            (0.3, PressureLevel::Normal),
            (0.7, PressureLevel::Warning),
            (0.9, PressureLevel::Critical),
            (0.96, PressureLevel::Emergency),
        ];
        for &(fraction, expected) in samples.iter() {
            assert_eq!(PressureLevel::from_usage(fraction), expected);
        }
    }

    /// Walks a single budget through grant, warning, denial and release.
    #[test]
    fn test_memory_budget() {
        let mut budget = MemoryBudget::new(800, 1000);

        // Within the soft limit.
        let first = budget.alloc(500);
        assert_eq!(first, AllocationResult::Granted);
        assert_eq!(budget.current_bytes, 500);

        // Above the soft limit but within the hard limit.
        let second = budget.alloc(400);
        assert!(matches!(
            second,
            AllocationResult::GrantedWithWarning { .. }
        ));
        assert_eq!(budget.current_bytes, 900);

        // Would exceed the hard limit: refused, usage unchanged.
        let third = budget.alloc(200);
        assert!(matches!(third, AllocationResult::Denied { .. }));
        assert_eq!(budget.current_bytes, 900);

        // Releasing lowers current usage but not the recorded peak.
        budget.dealloc(500);
        assert_eq!(budget.current_bytes, 400);
        assert_eq!(budget.peak_bytes, 900);
    }

    /// Two actors at 300 bytes each put a 1000-byte monitor at 60%.
    #[test]
    fn test_monitor_global_pressure() {
        let mut monitor = MemoryPressureMonitor::new(1000);

        for id in [1, 2].iter() {
            monitor.set_budget(ActorId(*id), MemoryBudget::new(400, 500));
        }

        monitor.request_alloc(ActorId(1), 300);
        monitor.request_alloc(ActorId(2), 300);

        assert_eq!(monitor.total_allocated(), 600);
        assert_eq!(monitor.global_pressure(), PressureLevel::Warning);
    }

    /// Only the actor above its soft limit is reported.
    #[test]
    fn test_actors_over_budget() {
        let mut monitor = MemoryPressureMonitor::new(10000);
        monitor.set_budget(ActorId(1), MemoryBudget::new(100, 200));
        monitor.set_budget(ActorId(2), MemoryBudget::new(100, 200));

        // Actor 1 goes past its soft limit of 100; actor 2 stays below it.
        monitor.request_alloc(ActorId(1), 150);
        monitor.request_alloc(ActorId(2), 50);

        let offenders = monitor.actors_over_budget();
        assert_eq!(offenders.len(), 1);
        assert_eq!(offenders[0].0, ActorId(1));
    }

    /// Actors come back ordered by usage, largest first.
    #[test]
    fn test_actors_by_usage() {
        let mut monitor = MemoryPressureMonitor::new(10000);
        monitor.set_budget(ActorId(1), MemoryBudget::new(500, 1000));
        monitor.set_budget(ActorId(2), MemoryBudget::new(500, 1000));

        monitor.request_alloc(ActorId(1), 100);
        monitor.request_alloc(ActorId(2), 300);

        let ranked = monitor.actors_by_usage();
        assert_eq!(ranked[0].0, ActorId(2));
        assert_eq!(ranked[1].0, ActorId(1));
    }
}