Skip to main content

datasynth_core/
memory_guard.rs

//! Memory management and guardrails for preventing OOM conditions.
//!
//! This module provides memory tracking and limit enforcement across different platforms,
//! with configurable soft and hard limits, up-front memory estimation, and graceful
//! degradation on platforms where memory usage cannot be measured.
5
6use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
7use std::sync::Arc;
8
/// Point-in-time memory usage statistics.
#[derive(Debug, Clone, Default)]
pub struct MemoryStats {
    /// Resident memory at the moment the statistics were taken, in bytes.
    pub resident_bytes: u64,
    /// Highest resident memory observed so far, in bytes.
    pub peak_resident_bytes: u64,
    /// Number of memory limit checks performed.
    pub checks_performed: u64,
    /// Number of soft limit warnings recorded.
    pub soft_limit_warnings: u64,
    /// `true` once the hard limit has been exceeded at least once.
    pub hard_limit_exceeded: bool,
}
23
/// Memory guard configuration.
#[derive(Debug, Clone)]
pub struct MemoryGuardConfig {
    /// Hard memory limit in MB (0 = disabled).
    pub hard_limit_mb: usize,
    /// Soft memory limit in MB for warnings (0 = disabled, typically 80% of hard limit).
    pub soft_limit_mb: usize,
    /// Check interval (every N operations).
    pub check_interval: usize,
    /// Whether to enable aggressive mode (check more frequently).
    pub aggressive_mode: bool,
    /// Maximum allowed growth rate (MB per second) before warning.
    pub max_growth_rate_mb_per_sec: f64,
}

impl Default for MemoryGuardConfig {
    /// Both limits disabled, a check every 500 operations, non-aggressive mode and a
    /// 100 MB/s growth-rate threshold.
    fn default() -> Self {
        Self {
            hard_limit_mb: 0,    // Disabled by default
            soft_limit_mb: 0,    // Disabled by default
            check_interval: 500, // Check every 500 operations
            aggressive_mode: false,
            max_growth_rate_mb_per_sec: 100.0,
        }
    }
}

impl MemoryGuardConfig {
    /// Create a config with the specified hard limit; the soft limit is set to 80% of it
    /// (rounded down) and every other field keeps its default.
    pub fn with_limit_mb(hard_limit_mb: usize) -> Self {
        // 80% is computed from the quotient and remainder of a division by 100 so that
        // `hard_limit_mb * 80` is never formed: that product overflows `usize` for very
        // large limits (e.g. `usize::MAX` used as "effectively unlimited"). The result is
        // identical to `(hard_limit_mb * 80) / 100` whenever that expression fits.
        let soft_limit_mb = (hard_limit_mb / 100) * 80 + (hard_limit_mb % 100) * 80 / 100;

        Self {
            hard_limit_mb,
            soft_limit_mb,
            ..Default::default()
        }
    }

    /// Enable aggressive memory checking.
    ///
    /// Sets `aggressive_mode` and lowers `check_interval` to 100; all other fields are
    /// left as they were.
    pub fn aggressive(mut self) -> Self {
        self.aggressive_mode = true;
        self.check_interval = 100;
        self
    }
}
68
/// Error describing a memory limit that was crossed.
#[derive(Debug, Clone)]
pub struct MemoryLimitExceeded {
    /// Memory usage at the time of the violation, in MB.
    pub current_mb: usize,
    /// The limit that was crossed, in MB.
    pub limit_mb: usize,
    /// Whether the crossed limit was the soft limit rather than the hard one.
    pub is_soft_limit: bool,
    /// Human-readable description; this is exactly what `Display` prints.
    pub message: String,
}

impl std::fmt::Display for MemoryLimitExceeded {
    /// Writes `message` verbatim.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.message)
    }
}

impl std::error::Error for MemoryLimitExceeded {}
85
86/// Thread-safe memory guard for monitoring and enforcing memory limits.
87#[derive(Debug)]
88pub struct MemoryGuard {
89    config: MemoryGuardConfig,
90    operation_counter: AtomicU64,
91    peak_memory_mb: AtomicUsize,
92    soft_warnings_count: AtomicU64,
93    hard_limit_exceeded: AtomicBool,
94    last_check_time_ns: AtomicU64,
95    last_check_memory_mb: AtomicUsize,
96}
97
98impl MemoryGuard {
99    /// Create a new memory guard with the given configuration.
100    pub fn new(config: MemoryGuardConfig) -> Self {
101        Self {
102            config,
103            operation_counter: AtomicU64::new(0),
104            peak_memory_mb: AtomicUsize::new(0),
105            soft_warnings_count: AtomicU64::new(0),
106            hard_limit_exceeded: AtomicBool::new(false),
107            last_check_time_ns: AtomicU64::new(0),
108            last_check_memory_mb: AtomicUsize::new(0),
109        }
110    }
111
112    /// Create a memory guard with default configuration.
113    pub fn default_guard() -> Self {
114        Self::new(MemoryGuardConfig::default())
115    }
116
117    /// Create a memory guard with specified limit.
118    pub fn with_limit(limit_mb: usize) -> Self {
119        Self::new(MemoryGuardConfig::with_limit_mb(limit_mb))
120    }
121
122    /// Create an Arc-wrapped memory guard for sharing across threads.
123    pub fn shared(config: MemoryGuardConfig) -> Arc<Self> {
124        Arc::new(Self::new(config))
125    }
126
127    /// Check memory limit (returns error if hard limit exceeded).
128    ///
129    /// This should be called periodically during generation.
130    /// It's designed to be efficient - actual memory checks only happen
131    /// at the configured interval.
132    pub fn check(&self) -> Result<(), MemoryLimitExceeded> {
133        // Disabled if no limits set
134        if self.config.hard_limit_mb == 0 {
135            return Ok(());
136        }
137
138        let count = self.operation_counter.fetch_add(1, Ordering::Relaxed);
139
140        // Only check at intervals to minimize overhead
141        let interval = if self.config.aggressive_mode {
142            self.config.check_interval / 5
143        } else {
144            self.config.check_interval
145        };
146
147        if count % interval as u64 != 0 {
148            return Ok(());
149        }
150
151        self.check_now()
152    }
153
154    /// Force an immediate memory check (bypasses interval).
155    pub fn check_now(&self) -> Result<(), MemoryLimitExceeded> {
156        if self.config.hard_limit_mb == 0 {
157            return Ok(());
158        }
159
160        let current_mb = get_memory_usage_mb().unwrap_or(0);
161
162        // Update peak
163        let mut peak = self.peak_memory_mb.load(Ordering::Relaxed);
164        while current_mb > peak {
165            match self.peak_memory_mb.compare_exchange_weak(
166                peak,
167                current_mb,
168                Ordering::Relaxed,
169                Ordering::Relaxed,
170            ) {
171                Ok(_) => break,
172                Err(p) => peak = p,
173            }
174        }
175
176        // Check growth rate
177        let now_ns = std::time::SystemTime::now()
178            .duration_since(std::time::UNIX_EPOCH)
179            .map(|d| d.as_nanos() as u64)
180            .unwrap_or(0);
181
182        let last_time = self.last_check_time_ns.swap(now_ns, Ordering::Relaxed);
183        let last_mem = self
184            .last_check_memory_mb
185            .swap(current_mb, Ordering::Relaxed);
186
187        if last_time > 0 && now_ns > last_time {
188            let elapsed_sec = (now_ns - last_time) as f64 / 1_000_000_000.0;
189            if elapsed_sec > 0.0 && current_mb > last_mem {
190                let growth_rate = (current_mb - last_mem) as f64 / elapsed_sec;
191                if growth_rate > self.config.max_growth_rate_mb_per_sec {
192                    // High memory growth rate detected - consumer should check stats
193                    // Note: Growth rate warning is logged by the caller
194                    let _ = growth_rate; // Silence unused variable warning
195                }
196            }
197        }
198
199        // Check hard limit
200        if current_mb > self.config.hard_limit_mb {
201            self.hard_limit_exceeded.store(true, Ordering::Relaxed);
202            return Err(MemoryLimitExceeded {
203                current_mb,
204                limit_mb: self.config.hard_limit_mb,
205                is_soft_limit: false,
206                message: format!(
207                    "Memory limit exceeded: using {} MB, hard limit is {} MB. \
208                     Reduce transaction volume or increase memory_limit_mb in config.",
209                    current_mb, self.config.hard_limit_mb
210                ),
211            });
212        }
213
214        // Check soft limit (warning only)
215        if self.config.soft_limit_mb > 0 && current_mb > self.config.soft_limit_mb {
216            self.soft_warnings_count.fetch_add(1, Ordering::Relaxed);
217            // Soft limit exceeded - consumer should check stats for warning count
218        }
219
220        Ok(())
221    }
222
223    /// Get current memory statistics.
224    pub fn stats(&self) -> MemoryStats {
225        let current = get_memory_usage_mb().unwrap_or(0);
226        MemoryStats {
227            resident_bytes: (current as u64) * 1024 * 1024,
228            peak_resident_bytes: (self.peak_memory_mb.load(Ordering::Relaxed) as u64) * 1024 * 1024,
229            checks_performed: self.operation_counter.load(Ordering::Relaxed),
230            soft_limit_warnings: self.soft_warnings_count.load(Ordering::Relaxed),
231            hard_limit_exceeded: self.hard_limit_exceeded.load(Ordering::Relaxed),
232        }
233    }
234
235    /// Get current memory usage in MB.
236    pub fn current_usage_mb(&self) -> usize {
237        get_memory_usage_mb().unwrap_or(0)
238    }
239
240    /// Get peak memory usage in MB.
241    pub fn peak_usage_mb(&self) -> usize {
242        self.peak_memory_mb.load(Ordering::Relaxed)
243    }
244
245    /// Check if memory tracking is available on this platform.
246    pub fn is_available() -> bool {
247        get_memory_usage_mb().is_some()
248    }
249
250    /// Reset statistics (for testing).
251    pub fn reset_stats(&self) {
252        self.operation_counter.store(0, Ordering::Relaxed);
253        self.soft_warnings_count.store(0, Ordering::Relaxed);
254        self.hard_limit_exceeded.store(false, Ordering::Relaxed);
255    }
256}
257
258impl Default for MemoryGuard {
259    fn default() -> Self {
260        Self::default_guard()
261    }
262}
263
/// Get current process memory usage in MB (Linux implementation).
///
/// Reads the resident set size from procfs and returns it rounded down to whole MB,
/// or `None` when neither procfs file can be read and parsed.
#[cfg(target_os = "linux")]
pub fn get_memory_usage_mb() -> Option<usize> {
    use std::fs;

    // Preferred source: the `VmRSS:` line of /proc/self/status. It is reported in kB,
    // so it stays correct on kernels whose page size is not 4 KiB (e.g. 16 KiB or
    // 64 KiB pages on some arm64/ppc64 systems).
    let rss_kb = fs::read_to_string("/proc/self/status")
        .ok()
        .and_then(|status| {
            status
                .lines()
                .find_map(|line| line.strip_prefix("VmRSS:"))
                .and_then(|rest| rest.split_whitespace().next())
                .and_then(|kb| kb.parse::<usize>().ok())
        });
    if let Some(kb) = rss_kb {
        return Some(kb / 1024);
    }

    // Fallback: /proc/self/statm, whose second field is the resident page count.
    // The page size is not available from std, so the common 4 KiB is assumed here.
    const ASSUMED_PAGE_SIZE_KB: usize = 4;
    let statm = fs::read_to_string("/proc/self/statm").ok()?;
    let pages: usize = statm.split_whitespace().nth(1)?.parse().ok()?;
    Some(pages.saturating_mul(ASSUMED_PAGE_SIZE_KB) / 1024)
}
297
/// Get current process memory usage in MB (macOS implementation).
///
/// Spawns `ps` for the current process id and parses the RSS value it prints.
/// Returns `None` if `ps` cannot be run or its output is not a number.
#[cfg(target_os = "macos")]
pub fn get_memory_usage_mb() -> Option<usize> {
    let pid = std::process::id().to_string();

    // `rss=` selects the resident set size column with an empty header
    let ps_output = std::process::Command::new("ps")
        .args(["-o", "rss=", "-p", pid.as_str()])
        .output()
        .ok()?;

    let stdout = String::from_utf8_lossy(&ps_output.stdout);
    let rss_kb = stdout.trim().parse::<usize>().ok()?;

    Some(rss_kb / 1024)
}
316
/// Get current process memory usage in MB (Windows implementation).
///
/// Not implemented yet: always returns `None`, which callers treat as
/// "memory tracking unavailable".
#[cfg(target_os = "windows")]
pub fn get_memory_usage_mb() -> Option<usize> {
    // A real implementation would query GetProcessMemoryInfo; until then report
    // the measurement as unavailable.
    None
}
324
/// Get current process memory usage in MB (fallback for other platforms).
///
/// There is no measurement on these targets, so this always returns `None`.
#[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
pub fn get_memory_usage_mb() -> Option<usize> {
    None
}
330
/// Estimate memory needed for generating N journal entries.
///
/// Returns estimated memory in MB (rounded up) based on typical entry size, including
/// 50% headroom for temporary allocations. All arithmetic saturates, so absurdly large
/// inputs yield a very large estimate instead of overflowing.
pub fn estimate_memory_mb(num_entries: usize, avg_lines_per_entry: usize) -> usize {
    // Rough estimates based on struct sizes:
    // - JournalEntry header: ~500 bytes
    // - JournalEntryLine: ~300 bytes each
    // - Overhead (strings, vecs): ~200 bytes per entry
    const HEADER_BYTES: usize = 500;
    const LINE_BYTES: usize = 300;
    const ENTRY_OVERHEAD_BYTES: usize = 200;
    const BYTES_PER_MB: usize = 1024 * 1024;

    let bytes_per_entry = avg_lines_per_entry
        .saturating_mul(LINE_BYTES)
        .saturating_add(HEADER_BYTES + ENTRY_OVERHEAD_BYTES);
    let total_bytes = num_entries.saturating_mul(bytes_per_entry);

    // Add 50% overhead for temporary allocations during generation. Integer
    // `total + total / 2` equals floor(total * 1.5) without an f64 round-trip.
    let with_overhead = total_bytes.saturating_add(total_bytes / 2);

    // Convert to MB, round up
    with_overhead.div_ceil(BYTES_PER_MB)
}
348
349/// Check if there's enough memory for the planned generation.
350pub fn check_sufficient_memory(
351    planned_entries: usize,
352    avg_lines: usize,
353    available_limit_mb: usize,
354) -> Result<(), String> {
355    let estimated = estimate_memory_mb(planned_entries, avg_lines);
356
357    if available_limit_mb > 0 && estimated > available_limit_mb {
358        Err(format!(
359            "Estimated memory requirement ({} MB) exceeds limit ({} MB). \
360             Reduce transaction count from {} to approximately {}",
361            estimated,
362            available_limit_mb,
363            planned_entries,
364            (planned_entries * available_limit_mb) / estimated
365        ))
366    } else {
367        Ok(())
368    }
369}
370
#[cfg(test)]
mod tests {
    use super::*;

    /// `with_limit` keeps the hard limit and derives the soft limit as 80% of it.
    #[test]
    fn test_memory_guard_creation() {
        let cfg = MemoryGuard::with_limit(1024).config;
        assert_eq!(cfg.hard_limit_mb, 1024);
        assert_eq!(cfg.soft_limit_mb, 819); // 80% of 1024, rounded down
    }

    /// A guard without a hard limit never reports an error.
    #[test]
    fn test_memory_guard_disabled() {
        let disabled = MemoryGuard::default_guard();
        let throttled = disabled.check();
        let immediate = disabled.check_now();
        assert!(throttled.is_ok());
        assert!(immediate.is_ok());
    }

    /// The estimate for a small run is positive and stays in a sane range.
    #[test]
    fn test_memory_estimation() {
        let estimate_mb = estimate_memory_mb(1000, 4);
        assert!(estimate_mb > 0);
        assert!(estimate_mb < 100); // 1000 entries should need well under 100 MB
    }

    /// The pre-flight check accepts a generous limit and rejects a tight one.
    #[test]
    fn test_sufficient_memory_check() {
        let generous = check_sufficient_memory(1000, 4, 1024);
        assert!(generous.is_ok());

        let tight = check_sufficient_memory(1_000_000, 10, 100);
        assert!(tight.is_err());
    }

    /// Calls to `check` show up in the statistics.
    #[test]
    fn test_stats_tracking() {
        // High limit so that no call returns an error
        let guard = MemoryGuard::with_limit(10000);

        (0..1000).for_each(|_| {
            let _ = guard.check();
        });

        assert!(guard.stats().checks_performed > 0);
    }

    /// Memory tracking must be available on Linux; other platforms vary.
    #[test]
    fn test_is_available() {
        if cfg!(target_os = "linux") {
            assert!(MemoryGuard::is_available());
        }
    }
}