Skip to main content

ringkernel_core/
memory_pressure.rs

1//! GPU Memory Pressure Handling — FR-005
2//!
3//! Automatic memory pressure response for GPU actor systems:
4//! - Per-actor memory budgets with soft/hard limits
5//! - Pressure signals broadcast to all actors
6//! - Configurable mitigation strategies
7//! - OOM recovery via supervisor restart with reduced allocation
8
9use std::collections::HashMap;
10use std::fmt;
11
12use crate::actor::ActorId;
13
/// Severity of memory pressure, from least to most severe.
///
/// The derived ordering follows declaration order, so
/// `Normal < Warning < Critical < Emergency`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum PressureLevel {
    /// Usage below 60%: normal operation.
    Normal,
    /// Usage from 60% up to (but not including) 80%: non-essential
    /// allocations should be avoided.
    Warning,
    /// Usage from 80% up to (but not including) 95%: active mitigation
    /// is required.
    Critical,
    /// Usage at or above 95%: OOM is imminent, emergency measures apply.
    Emergency,
}
26
27impl PressureLevel {
28    /// Compute pressure level from usage fraction (0.0 - 1.0).
29    pub fn from_usage(fraction: f64) -> Self {
30        if fraction >= 0.95 {
31            Self::Emergency
32        } else if fraction >= 0.80 {
33            Self::Critical
34        } else if fraction >= 0.60 {
35            Self::Warning
36        } else {
37            Self::Normal
38        }
39    }
40}
41
42impl fmt::Display for PressureLevel {
43    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
44        match self {
45            Self::Normal => write!(f, "normal"),
46            Self::Warning => write!(f, "warning"),
47            Self::Critical => write!(f, "critical"),
48            Self::Emergency => write!(f, "emergency"),
49        }
50    }
51}
52
/// Action to take when an actor's memory budget comes under pressure.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MitigationStrategy {
    /// Shrink queue depths, discarding the oldest buffered messages to the DLQ.
    ReduceQueues,
    /// Move cold data out to host memory (GPU → CPU fallback).
    SpillToHost,
    /// Checkpoint the lowest-priority actors and deactivate them.
    DeactivateActors,
    /// Ask actors to run garbage collection (custom handler).
    RequestGc,
    /// Take no action and let the OOM happen naturally.
    None,
}
67
68/// Per-actor memory budget.
69#[derive(Debug, Clone)]
70pub struct MemoryBudget {
71    /// Soft limit: warning emitted when exceeded.
72    pub soft_limit_bytes: u64,
73    /// Hard limit: enforced maximum.
74    pub hard_limit_bytes: u64,
75    /// Current allocation.
76    pub current_bytes: u64,
77    /// Peak allocation observed.
78    pub peak_bytes: u64,
79    /// Mitigation strategy when budget exceeded.
80    pub mitigation: MitigationStrategy,
81}
82
83impl MemoryBudget {
84    /// Create a budget with soft and hard limits.
85    pub fn new(soft_limit: u64, hard_limit: u64) -> Self {
86        Self {
87            soft_limit_bytes: soft_limit,
88            hard_limit_bytes: hard_limit,
89            current_bytes: 0,
90            peak_bytes: 0,
91            mitigation: MitigationStrategy::ReduceQueues,
92        }
93    }
94
95    /// Record an allocation.
96    pub fn alloc(&mut self, bytes: u64) -> AllocationResult {
97        let new_total = self.current_bytes + bytes;
98
99        if new_total > self.hard_limit_bytes {
100            return AllocationResult::Denied {
101                requested: bytes,
102                available: self.hard_limit_bytes.saturating_sub(self.current_bytes),
103            };
104        }
105
106        self.current_bytes = new_total;
107        if new_total > self.peak_bytes {
108            self.peak_bytes = new_total;
109        }
110
111        if new_total > self.soft_limit_bytes {
112            AllocationResult::GrantedWithWarning {
113                usage_fraction: new_total as f64 / self.hard_limit_bytes as f64,
114            }
115        } else {
116            AllocationResult::Granted
117        }
118    }
119
120    /// Record a deallocation.
121    pub fn dealloc(&mut self, bytes: u64) {
122        self.current_bytes = self.current_bytes.saturating_sub(bytes);
123    }
124
125    /// Current usage as fraction (0.0 - 1.0).
126    pub fn usage_fraction(&self) -> f64 {
127        if self.hard_limit_bytes == 0 {
128            return 0.0;
129        }
130        self.current_bytes as f64 / self.hard_limit_bytes as f64
131    }
132
133    /// Current pressure level.
134    pub fn pressure_level(&self) -> PressureLevel {
135        PressureLevel::from_usage(self.usage_fraction())
136    }
137}
138
/// Outcome of asking a budget for more memory.
#[derive(Debug, Clone, PartialEq)]
pub enum AllocationResult {
    /// The allocation was recorded and usage is within the soft limit.
    Granted,
    /// The allocation was recorded, but usage is now above the soft limit.
    GrantedWithWarning {
        /// Usage after the allocation, as a fraction of the hard limit.
        usage_fraction: f64,
    },
    /// The allocation was refused because it would exceed the hard limit.
    Denied {
        /// Bytes that were requested.
        requested: u64,
        /// Bytes still available below the hard limit.
        available: u64,
    },
}
157
158/// Global memory pressure monitor for all actors.
159pub struct MemoryPressureMonitor {
160    /// Per-actor budgets.
161    budgets: HashMap<ActorId, MemoryBudget>,
162    /// Total GPU memory available.
163    total_gpu_memory: u64,
164    /// Total currently allocated across all actors.
165    total_allocated: u64,
166    /// Default budget for actors without explicit config.
167    default_budget: MemoryBudget,
168}
169
170impl MemoryPressureMonitor {
171    /// Create a monitor for a GPU with the given total memory.
172    pub fn new(total_gpu_memory: u64) -> Self {
173        let per_actor_default = total_gpu_memory / 8; // Default: 1/8 of GPU memory
174        Self {
175            budgets: HashMap::new(),
176            total_gpu_memory,
177            total_allocated: 0,
178            default_budget: MemoryBudget::new(
179                (per_actor_default as f64 * 0.8) as u64,
180                per_actor_default,
181            ),
182        }
183    }
184
185    /// Set a budget for a specific actor.
186    pub fn set_budget(&mut self, actor: ActorId, budget: MemoryBudget) {
187        self.budgets.insert(actor, budget);
188    }
189
190    /// Request allocation from an actor's budget.
191    pub fn request_alloc(&mut self, actor: ActorId, bytes: u64) -> AllocationResult {
192        let budget = self
193            .budgets
194            .entry(actor)
195            .or_insert_with(|| self.default_budget.clone());
196
197        let result = budget.alloc(bytes);
198        if matches!(
199            result,
200            AllocationResult::Granted | AllocationResult::GrantedWithWarning { .. }
201        ) {
202            self.total_allocated += bytes;
203        }
204        result
205    }
206
207    /// Record a deallocation.
208    pub fn record_dealloc(&mut self, actor: ActorId, bytes: u64) {
209        if let Some(budget) = self.budgets.get_mut(&actor) {
210            budget.dealloc(bytes);
211        }
212        self.total_allocated = self.total_allocated.saturating_sub(bytes);
213    }
214
215    /// Get the global pressure level across all actors.
216    pub fn global_pressure(&self) -> PressureLevel {
217        if self.total_gpu_memory == 0 {
218            return PressureLevel::Normal;
219        }
220        PressureLevel::from_usage(self.total_allocated as f64 / self.total_gpu_memory as f64)
221    }
222
223    /// Get actors that are above their soft limit.
224    pub fn actors_over_budget(&self) -> Vec<(ActorId, f64)> {
225        self.budgets
226            .iter()
227            .filter(|(_, b)| b.current_bytes > b.soft_limit_bytes)
228            .map(|(&id, b)| (id, b.usage_fraction()))
229            .collect()
230    }
231
232    /// Get actors sorted by memory usage (descending).
233    pub fn actors_by_usage(&self) -> Vec<(ActorId, u64)> {
234        let mut actors: Vec<_> = self
235            .budgets
236            .iter()
237            .map(|(&id, b)| (id, b.current_bytes))
238            .collect();
239        actors.sort_by_key(|a| std::cmp::Reverse(a.1));
240        actors
241    }
242
243    /// Get an actor's budget.
244    pub fn get_budget(&self, actor: ActorId) -> Option<&MemoryBudget> {
245        self.budgets.get(&actor)
246    }
247
248    /// Get total allocated across all actors.
249    pub fn total_allocated(&self) -> u64 {
250        self.total_allocated
251    }
252
253    /// Get total GPU memory.
254    pub fn total_gpu_memory(&self) -> u64 {
255        self.total_gpu_memory
256    }
257}
258
#[cfg(test)]
mod tests {
    use super::*;

    /// One sample from each usage band maps to the expected level.
    #[test]
    fn test_pressure_levels() {
        let samples = [
            (0.3, PressureLevel::Normal),
            (0.7, PressureLevel::Warning),
            (0.9, PressureLevel::Critical),
            (0.96, PressureLevel::Emergency),
        ];
        for &(usage, expected) in samples.iter() {
            assert_eq!(PressureLevel::from_usage(usage), expected);
        }
    }

    /// Walks a single budget through grant, warning, denial and release.
    #[test]
    fn test_memory_budget() {
        let mut b = MemoryBudget::new(800, 1000);

        // Comfortably inside the soft limit.
        let first = b.alloc(500);
        assert_eq!(first, AllocationResult::Granted);
        assert_eq!(b.current_bytes, 500);

        // Crosses the soft limit but stays under the hard limit.
        let second = b.alloc(400);
        assert!(matches!(
            second,
            AllocationResult::GrantedWithWarning { .. }
        ));
        assert_eq!(b.current_bytes, 900);

        // Would cross the hard limit: refused, usage untouched.
        let third = b.alloc(200);
        assert!(matches!(third, AllocationResult::Denied { .. }));
        assert_eq!(b.current_bytes, 900);

        // Releasing memory lowers usage but never the recorded peak.
        b.dealloc(500);
        assert_eq!(b.current_bytes, 400);
        assert_eq!(b.peak_bytes, 900);
    }

    /// Two actors at 300 bytes each put a 1000-byte GPU at warning level.
    #[test]
    fn test_monitor_global_pressure() {
        let mut mon = MemoryPressureMonitor::new(1000);

        for &id in [1, 2].iter() {
            mon.set_budget(ActorId(id), MemoryBudget::new(400, 500));
        }
        for &id in [1, 2].iter() {
            mon.request_alloc(ActorId(id), 300);
        }

        assert_eq!(mon.total_allocated(), 600);
        assert_eq!(mon.global_pressure(), PressureLevel::Warning);
    }

    /// Only the actor above its soft limit is reported.
    #[test]
    fn test_actors_over_budget() {
        let mut mon = MemoryPressureMonitor::new(10000);
        mon.set_budget(ActorId(1), MemoryBudget::new(100, 200));
        mon.set_budget(ActorId(2), MemoryBudget::new(100, 200));

        // Actor 1 ends up above its soft limit, actor 2 below.
        mon.request_alloc(ActorId(1), 150);
        mon.request_alloc(ActorId(2), 50);

        let offenders = mon.actors_over_budget();
        assert_eq!(offenders.len(), 1);
        assert_eq!(offenders[0].0, ActorId(1));
    }

    /// Actors come back ordered by usage, heaviest first.
    #[test]
    fn test_actors_by_usage() {
        let mut mon = MemoryPressureMonitor::new(10000);
        mon.set_budget(ActorId(1), MemoryBudget::new(500, 1000));
        mon.set_budget(ActorId(2), MemoryBudget::new(500, 1000));

        mon.request_alloc(ActorId(1), 100);
        mon.request_alloc(ActorId(2), 300);

        let ranked = mon.actors_by_usage();
        assert_eq!(ranked[0].0, ActorId(2));
        assert_eq!(ranked[1].0, ActorId(1));
    }
}