vibesql_executor/memory/
controller.rs

1//! Memory controller for bounded query execution
2//!
3//! This module provides memory budget management for operators that need to
4//! track and limit memory usage, supporting disk spilling when memory is exhausted.
5//!
6//! # Architecture
7//!
8//! ```text
9//! ┌─────────────────────────────────────────────────────────────┐
10//! │                    MemoryController                          │
11//! │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐         │
12//! │  │ Budget Pool │  │  Tracking   │  │ Spill Mgr   │         │
13//! │  │ (configurable)│ │ (per-operator)│ │ (temp files)│        │
14//! │  └─────────────┘  └─────────────┘  └─────────────┘         │
15//! └─────────────────────────────────────────────────────────────┘
16//!            │                │                │
17//!            ▼                ▼                ▼
18//! ┌──────────────┐  ┌──────────────┐  ┌──────────────┐
19//! │ External     │  │ External     │  │ External     │
20//! │ Sort         │  │ Aggregate    │  │ Hash Join    │
21//! └──────────────┘  └──────────────┘  └──────────────┘
22//! ```
23//!
24//! # Design Decisions
25//!
26//! 1. **Thread-safe**: Uses atomic operations for concurrent access
//! 2. **Non-blocking reservations**: `MemoryReservation::try_grow` (backed by the internal
//!    `try_reserve`) never blocks; operators decide whether to spill based on its return value
29//! 3. **Configurable via environment**: Memory limits can be overridden
30//! 4. **Platform-aware**: Different defaults for native vs WASM
31
32use std::{
33    env,
34    path::PathBuf,
35    sync::{
36        atomic::{AtomicUsize, Ordering},
37        Arc,
38    },
39};
40
/// Default memory budget: 1GB (`1 << 30` bytes).
///
/// A conservative default that works on most systems.
pub const DEFAULT_MEMORY_BUDGET: usize = 1 << 30;

/// Default spill threshold: 80%.
///
/// Spilling starts once 80% of the budget is in use.
pub const DEFAULT_SPILL_THRESHOLD: f64 = 0.8;

/// Default target partition size for external operators: 64MB (`64 << 20` bytes).
///
/// Tuned for good I/O efficiency while limiting memory per partition.
pub const DEFAULT_TARGET_PARTITION_BYTES: usize = 64 << 20;

/// Minimum memory for an operator: 4MB (`4 << 20` bytes).
///
/// Below this, operators may not function correctly.
pub const MIN_OPERATOR_MEMORY: usize = 4 << 20;
56
57/// Configuration for memory-bounded execution
58#[derive(Debug, Clone)]
59pub struct MemoryConfig {
60    /// Total memory budget for query execution
61    pub budget_bytes: usize,
62
63    /// Directory for spill files (defaults to system temp)
64    pub temp_directory: PathBuf,
65
66    /// Threshold at which to trigger spilling (0.0 - 1.0)
67    pub spill_threshold: f64,
68
69    /// Target size for each partition in external operators
70    pub target_partition_bytes: usize,
71}
72
73impl Default for MemoryConfig {
74    fn default() -> Self {
75        // Check environment variables for overrides
76        let budget_bytes = env::var("VIBESQL_MEMORY_LIMIT")
77            .ok()
78            .and_then(|s| parse_memory_size(&s))
79            .unwrap_or(DEFAULT_MEMORY_BUDGET);
80
81        let temp_directory = env::var("VIBESQL_TEMP_DIR")
82            .ok()
83            .map(PathBuf::from)
84            .unwrap_or_else(|| env::temp_dir().join("vibesql"));
85
86        let spill_threshold = env::var("VIBESQL_SPILL_THRESHOLD")
87            .ok()
88            .and_then(|s| s.parse::<f64>().ok())
89            .filter(|&t| (0.0..=1.0).contains(&t))
90            .unwrap_or(DEFAULT_SPILL_THRESHOLD);
91
92        let target_partition_bytes = env::var("VIBESQL_PARTITION_SIZE")
93            .ok()
94            .and_then(|s| parse_memory_size(&s))
95            .unwrap_or(DEFAULT_TARGET_PARTITION_BYTES);
96
97        Self { budget_bytes, temp_directory, spill_threshold, target_partition_bytes }
98    }
99}
100
101impl MemoryConfig {
102    /// Create a new configuration with the specified budget
103    pub fn with_budget(budget_bytes: usize) -> Self {
104        Self { budget_bytes, ..Default::default() }
105    }
106
107    /// Create a configuration with a specific temp directory
108    pub fn with_temp_dir(mut self, path: PathBuf) -> Self {
109        self.temp_directory = path;
110        self
111    }
112
113    /// Set the spill threshold (0.0 - 1.0)
114    pub fn with_spill_threshold(mut self, threshold: f64) -> Self {
115        self.spill_threshold = threshold.clamp(0.0, 1.0);
116        self
117    }
118}
119
/// Statistics snapshot from the memory controller
///
/// Captures memory usage and spill counters as plain values at one point in
/// time. Useful for monitoring, debugging, and query profiling.
#[derive(Debug, Clone)]
pub struct MemoryStats {
    /// Total memory budget, in bytes
    pub budget_bytes: usize,
    /// Memory reserved when the snapshot was taken, in bytes
    pub reserved_bytes: usize,
    /// Peak memory usage (high water mark), in bytes
    pub peak_bytes: usize,
    /// Total bytes written to disk during spills
    pub bytes_spilled: usize,
    /// Number of spill operations performed
    pub spill_count: usize,
    /// Number of reservations active when the snapshot was taken
    pub active_reservations: usize,
    /// Spill threshold as a fraction of the budget (0.0 - 1.0)
    pub spill_threshold: f64,
}

impl MemoryStats {
    /// Get memory utilization as a fraction of the budget (0.0 - 1.0)
    ///
    /// A zero budget reports `0.0` instead of dividing by zero.
    pub fn utilization(&self) -> f64 {
        match self.budget_bytes {
            0 => 0.0,
            budget => self.reserved_bytes as f64 / budget as f64,
        }
    }

    /// Check if memory is under pressure (utilization at or above the spill threshold)
    pub fn is_under_pressure(&self) -> bool {
        let used_fraction = self.utilization();
        self.spill_threshold <= used_fraction
    }

    /// Get available memory (budget minus reserved, never below zero)
    pub fn available_bytes(&self) -> usize {
        let &Self { budget_bytes, reserved_bytes, .. } = self;
        budget_bytes.saturating_sub(reserved_bytes)
    }
}
162
163impl std::fmt::Display for MemoryStats {
164    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
165        write!(
166            f,
167            "Memory: {}/{} ({:.1}%), peak: {}, spilled: {} ({} ops)",
168            format_bytes(self.reserved_bytes),
169            format_bytes(self.budget_bytes),
170            self.utilization() * 100.0,
171            format_bytes(self.peak_bytes),
172            format_bytes(self.bytes_spilled),
173            self.spill_count,
174        )
175    }
176}
177
/// Format a byte count as a human-readable string
///
/// Uses binary units (1KB = 1024 bytes). Values of at least 1KB are shown with
/// two decimals in the largest unit that fits (`KB`, `MB` or `GB`); smaller
/// values are shown as a whole number of bytes (`B`).
fn format_bytes(bytes: usize) -> String {
    const KB: usize = 1024;
    const MB: usize = KB * 1024;
    const GB: usize = MB * 1024;

    let scaled_to = |unit: usize| bytes as f64 / unit as f64;

    match bytes {
        b if b >= GB => format!("{:.2}GB", scaled_to(GB)),
        b if b >= MB => format!("{:.2}MB", scaled_to(MB)),
        b if b >= KB => format!("{:.2}KB", scaled_to(KB)),
        b => format!("{}B", b),
    }
}
190
/// Parse memory size strings like "4GB", "512MB", "1024K", "1073741824"
///
/// Surrounding whitespace is ignored and suffixes are case-insensitive. A bare
/// number is taken as bytes; the suffixes `K`/`KB`, `M`/`MB` and `G`/`GB` are
/// binary multiples (1024, 1024^2, 1024^3).
///
/// Returns `None` when the text has no recognised form, when the numeric part
/// is not a valid `usize`, or when the resulting byte count does not fit in a
/// `usize`.
fn parse_memory_size(s: &str) -> Option<usize> {
    const KIB: usize = 1024;
    const MIB: usize = 1024 * KIB;
    const GIB: usize = 1024 * MIB;
    // Checked in this order; the first suffix that matches decides the unit.
    const SUFFIXES: [(&str, usize); 6] =
        [("GB", GIB), ("G", GIB), ("MB", MIB), ("M", MIB), ("KB", KIB), ("K", KIB)];

    let normalized = s.trim().to_uppercase();

    // A plain number is already a byte count.
    if let Ok(bytes) = normalized.parse::<usize>() {
        return Some(bytes);
    }

    let (digits, multiplier) = SUFFIXES.iter().find_map(|&(suffix, multiplier)| {
        normalized.strip_suffix(suffix).map(|digits| (digits, multiplier))
    })?;

    // `checked_mul` turns an oversized value into `None` instead of panicking
    // (debug builds) or silently wrapping to a small budget (release builds).
    digits.trim().parse::<usize>().ok()?.checked_mul(multiplier)
}
219
220/// Global memory controller for query execution
221///
222/// Manages a shared memory budget across all operators in a query.
223/// Thread-safe and supports concurrent reservations.
224///
225/// # Example
226///
227/// ```
228/// use std::sync::Arc;
229/// use vibesql_executor::memory::{MemoryController, MemoryConfig};
230///
231/// let controller = Arc::new(MemoryController::new(MemoryConfig::with_budget(1024 * 1024 * 1024)));
232///
233/// // Create a reservation for a sort operator
234/// let mut reservation = controller.create_reservation();
235///
236/// // Try to reserve memory for sorted runs
237/// assert!(reservation.try_grow(1024 * 1024)); // Should succeed with 1GB budget
238/// assert_eq!(reservation.reserved(), 1024 * 1024);
239/// ```
240pub struct MemoryController {
241    /// Configuration
242    config: MemoryConfig,
243
244    /// Total reserved memory across all operators (bytes)
245    reserved: AtomicUsize,
246
247    /// Number of active reservations (for debugging/metrics)
248    active_reservations: AtomicUsize,
249
250    /// Total bytes spilled to disk (for metrics)
251    bytes_spilled: AtomicUsize,
252
253    /// Number of spill operations (for metrics)
254    spill_count: AtomicUsize,
255
256    /// Peak memory usage (high water mark)
257    peak_memory: AtomicUsize,
258}
259
260impl MemoryController {
261    /// Create a new memory controller with the given configuration
262    pub fn new(config: MemoryConfig) -> Self {
263        Self {
264            config,
265            reserved: AtomicUsize::new(0),
266            active_reservations: AtomicUsize::new(0),
267            bytes_spilled: AtomicUsize::new(0),
268            spill_count: AtomicUsize::new(0),
269            peak_memory: AtomicUsize::new(0),
270        }
271    }
272
273    /// Create a memory controller with default configuration
274    pub fn with_defaults() -> Self {
275        Self::new(MemoryConfig::default())
276    }
277
278    /// Create a memory controller with a specific budget
279    pub fn with_budget(budget_bytes: usize) -> Self {
280        Self::new(MemoryConfig::with_budget(budget_bytes))
281    }
282
283    /// Get the total memory budget
284    pub fn budget(&self) -> usize {
285        self.config.budget_bytes
286    }
287
288    /// Get the currently reserved memory
289    pub fn reserved(&self) -> usize {
290        self.reserved.load(Ordering::Relaxed)
291    }
292
293    /// Get available memory (budget - reserved)
294    pub fn available(&self) -> usize {
295        let budget = self.config.budget_bytes;
296        let reserved = self.reserved.load(Ordering::Relaxed);
297        budget.saturating_sub(reserved)
298    }
299
300    /// Get the spill threshold in bytes
301    pub fn spill_threshold_bytes(&self) -> usize {
302        (self.config.budget_bytes as f64 * self.config.spill_threshold) as usize
303    }
304
305    /// Check if memory pressure is high (above spill threshold)
306    pub fn should_spill(&self) -> bool {
307        self.reserved() >= self.spill_threshold_bytes()
308    }
309
310    /// Get the temporary directory for spill files
311    pub fn temp_directory(&self) -> &PathBuf {
312        &self.config.temp_directory
313    }
314
315    /// Get the target partition size for external operators
316    pub fn target_partition_bytes(&self) -> usize {
317        self.config.target_partition_bytes
318    }
319
320    /// Create a new memory reservation
321    ///
322    /// Returns a reservation handle that tracks memory for a single operator.
323    /// When the reservation is dropped, its memory is released.
324    pub fn create_reservation(self: &Arc<Self>) -> MemoryReservation {
325        self.active_reservations.fetch_add(1, Ordering::Relaxed);
326        MemoryReservation { controller: Arc::clone(self), reserved: 0 }
327    }
328
329    /// Record that bytes were spilled to disk (for metrics)
330    pub fn record_spill(&self, bytes: usize) {
331        self.bytes_spilled.fetch_add(bytes, Ordering::Relaxed);
332        self.spill_count.fetch_add(1, Ordering::Relaxed);
333    }
334
335    /// Get total bytes spilled to disk
336    pub fn bytes_spilled(&self) -> usize {
337        self.bytes_spilled.load(Ordering::Relaxed)
338    }
339
340    /// Get number of spill operations
341    pub fn spill_count(&self) -> usize {
342        self.spill_count.load(Ordering::Relaxed)
343    }
344
345    /// Get peak memory usage
346    pub fn peak_memory(&self) -> usize {
347        self.peak_memory.load(Ordering::Relaxed)
348    }
349
350    /// Get number of active reservations
351    pub fn active_reservations(&self) -> usize {
352        self.active_reservations.load(Ordering::Relaxed)
353    }
354
355    /// Get comprehensive statistics snapshot
356    pub fn stats(&self) -> MemoryStats {
357        MemoryStats {
358            budget_bytes: self.config.budget_bytes,
359            reserved_bytes: self.reserved.load(Ordering::Relaxed),
360            peak_bytes: self.peak_memory.load(Ordering::Relaxed),
361            bytes_spilled: self.bytes_spilled.load(Ordering::Relaxed),
362            spill_count: self.spill_count.load(Ordering::Relaxed),
363            active_reservations: self.active_reservations.load(Ordering::Relaxed),
364            spill_threshold: self.config.spill_threshold,
365        }
366    }
367
368    /// Try to reserve memory, returning true if successful
369    ///
370    /// This is the internal method called by MemoryReservation
371    fn try_reserve(&self, bytes: usize) -> bool {
372        let budget = self.config.budget_bytes;
373
374        loop {
375            let current = self.reserved.load(Ordering::Relaxed);
376            let new_reserved = current.saturating_add(bytes);
377
378            if new_reserved > budget {
379                return false;
380            }
381
382            // Try to atomically update
383            match self.reserved.compare_exchange_weak(
384                current,
385                new_reserved,
386                Ordering::SeqCst,
387                Ordering::Relaxed,
388            ) {
389                Ok(_) => {
390                    // Update peak memory if this is a new high
391                    self.update_peak_memory(new_reserved);
392                    return true;
393                }
394                Err(_) => continue, // Retry on contention
395            }
396        }
397    }
398
399    /// Update peak memory tracking
400    fn update_peak_memory(&self, current: usize) {
401        let mut peak = self.peak_memory.load(Ordering::Relaxed);
402        while current > peak {
403            match self.peak_memory.compare_exchange_weak(
404                peak,
405                current,
406                Ordering::Relaxed,
407                Ordering::Relaxed,
408            ) {
409                Ok(_) => break,
410                Err(actual) => peak = actual,
411            }
412        }
413    }
414
415    /// Release reserved memory
416    ///
417    /// This is the internal method called by MemoryReservation
418    fn release(&self, bytes: usize) {
419        self.reserved.fetch_sub(bytes, Ordering::Relaxed);
420    }
421
422    /// Decrement active reservation count
423    fn release_reservation(&self) {
424        self.active_reservations.fetch_sub(1, Ordering::Relaxed);
425    }
426}
427
428/// A memory reservation for a single operator
429///
430/// Tracks memory usage for one operator (sort, aggregate, join, etc.)
431/// and automatically releases memory when dropped.
432///
433/// # Example
434///
435/// ```text
436/// let controller = Arc::new(MemoryController::with_budget(1024 * 1024 * 1024));
437/// let mut reservation = controller.create_reservation();
438///
439/// // Accumulate memory as we process data
440/// for batch in batches {
441///     let batch_size = batch.size_in_bytes();
442///
443///     if !reservation.try_grow(batch_size) {
444///         // Memory exhausted - spill current data to disk
445///         spill_to_disk(&accumulated_data);
446///         reservation.shrink(accumulated_data.size_in_bytes());
447///     }
448///
449///     // Process batch...
450/// }
451/// ```
452pub struct MemoryReservation {
453    /// Reference to the parent controller
454    controller: Arc<MemoryController>,
455
456    /// Memory reserved by this operator (bytes)
457    reserved: usize,
458}
459
460impl MemoryReservation {
461    /// Get the amount of memory reserved by this operator
462    pub fn reserved(&self) -> usize {
463        self.reserved
464    }
465
466    /// Try to grow the reservation by the specified amount
467    ///
468    /// Returns true if the memory was reserved, false if there's
469    /// not enough budget available. The caller should spill to disk
470    /// if this returns false.
471    pub fn try_grow(&mut self, additional: usize) -> bool {
472        if self.controller.try_reserve(additional) {
473            self.reserved = self.reserved.saturating_add(additional);
474            true
475        } else {
476            false
477        }
478    }
479
480    /// Shrink the reservation by the specified amount
481    ///
482    /// Call this after spilling data to disk to free up budget
483    /// for new data.
484    pub fn shrink(&mut self, bytes: usize) {
485        let to_release = bytes.min(self.reserved);
486        self.controller.release(to_release);
487        self.reserved = self.reserved.saturating_sub(to_release);
488    }
489
490    /// Release all reserved memory
491    ///
492    /// Equivalent to `shrink(self.reserved())`
493    pub fn release_all(&mut self) {
494        if self.reserved > 0 {
495            self.controller.release(self.reserved);
496            self.reserved = 0;
497        }
498    }
499
500    /// Check if memory pressure is high and spilling is recommended
501    pub fn should_spill(&self) -> bool {
502        self.controller.should_spill()
503    }
504
505    /// Check if trying to grow by the given amount would exceed budget
506    ///
507    /// This is a non-mutating check useful for deciding whether to
508    /// accumulate more data or trigger a spill first.
509    pub fn would_exceed_budget(&self, additional: usize) -> bool {
510        let current_total = self.controller.reserved();
511        current_total.saturating_add(additional) > self.controller.budget()
512    }
513
514    /// Get the controller's budget
515    pub fn budget(&self) -> usize {
516        self.controller.budget()
517    }
518
519    /// Get the temporary directory for spill files
520    pub fn temp_directory(&self) -> &PathBuf {
521        self.controller.temp_directory()
522    }
523
524    /// Get the target partition size
525    pub fn target_partition_bytes(&self) -> usize {
526        self.controller.target_partition_bytes()
527    }
528
529    /// Record that data was spilled to disk
530    pub fn record_spill(&self, bytes: usize) {
531        self.controller.record_spill(bytes);
532    }
533}
534
535impl Drop for MemoryReservation {
536    fn drop(&mut self) {
537        // Release all reserved memory when the reservation is dropped
538        if self.reserved > 0 {
539            self.controller.release(self.reserved);
540        }
541        self.controller.release_reservation();
542    }
543}
544
545// MemoryController is thread-safe
546unsafe impl Send for MemoryController {}
547unsafe impl Sync for MemoryController {}
548
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;

    const KIB: usize = 1024;
    const MIB: usize = 1024 * KIB;
    const GIB: usize = 1024 * MIB;

    /// Shared controller with the given byte budget.
    fn controller_with_budget(budget: usize) -> Arc<MemoryController> {
        Arc::new(MemoryController::with_budget(budget))
    }

    /// Shared controller with a 1000-byte budget and an 80% spill threshold (800 bytes).
    fn controller_with_threshold() -> Arc<MemoryController> {
        Arc::new(MemoryController::new(MemoryConfig {
            budget_bytes: 1000,
            spill_threshold: 0.8,
            temp_directory: std::env::temp_dir(),
            target_partition_bytes: DEFAULT_TARGET_PARTITION_BYTES,
        }))
    }

    /// Stats snapshot whose budget, reserved and peak values all equal `bytes`.
    fn uniform_stats(bytes: usize) -> MemoryStats {
        MemoryStats {
            budget_bytes: bytes,
            reserved_bytes: bytes,
            peak_bytes: bytes,
            bytes_spilled: 0,
            spill_count: 0,
            active_reservations: 0,
            spill_threshold: 0.8,
        }
    }

    #[test]
    fn test_memory_size_parsing() {
        assert_eq!(parse_memory_size("1024"), Some(1024));
        assert_eq!(parse_memory_size("1KB"), Some(KIB));
        assert_eq!(parse_memory_size("1K"), Some(KIB));
        assert_eq!(parse_memory_size("1MB"), Some(MIB));
        assert_eq!(parse_memory_size("1M"), Some(MIB));
        assert_eq!(parse_memory_size("1GB"), Some(GIB));
        assert_eq!(parse_memory_size("1G"), Some(GIB));
        // 4GB does not fit in a 32-bit `usize`, so only check it where it can
        // be represented.
        #[cfg(target_pointer_width = "64")]
        {
            assert_eq!(parse_memory_size("4GB"), Some(4 * GIB));
        }
        assert_eq!(parse_memory_size("512mb"), Some(512 * MIB));
        assert_eq!(parse_memory_size(" 100 "), Some(100));
        assert_eq!(parse_memory_size("invalid"), None);
    }

    #[test]
    fn test_controller_creation() {
        let controller = controller_with_budget(MIB);
        assert_eq!(controller.budget(), MIB);
        assert_eq!(controller.reserved(), 0);
        assert_eq!(controller.available(), MIB);
    }

    #[test]
    fn test_reservation_try_grow() {
        let controller = controller_with_budget(1024);
        let mut reservation = controller.create_reservation();

        assert!(reservation.try_grow(512));
        assert_eq!(reservation.reserved(), 512);
        assert_eq!(controller.reserved(), 512);

        assert!(reservation.try_grow(256));
        assert_eq!(reservation.reserved(), 768);
        assert_eq!(controller.reserved(), 768);

        // Should fail - would exceed budget, and must leave both totals untouched
        assert!(!reservation.try_grow(512));
        assert_eq!(reservation.reserved(), 768);
        assert_eq!(controller.reserved(), 768);

        // Can still grow within budget
        assert!(reservation.try_grow(256));
        assert_eq!(reservation.reserved(), 1024);
    }

    #[test]
    fn test_reservation_shrink() {
        let controller = controller_with_budget(1024);
        let mut reservation = controller.create_reservation();

        assert!(reservation.try_grow(1024));
        assert_eq!(reservation.reserved(), 1024);

        reservation.shrink(512);
        assert_eq!(reservation.reserved(), 512);
        assert_eq!(controller.reserved(), 512);

        // Shrink more than reserved clamps to reserved
        reservation.shrink(1024);
        assert_eq!(reservation.reserved(), 0);
        assert_eq!(controller.reserved(), 0);
    }

    #[test]
    fn test_reservation_drop_releases_memory() {
        let controller = controller_with_budget(1024);

        {
            let mut reservation = controller.create_reservation();
            assert!(reservation.try_grow(512));
            assert_eq!(controller.reserved(), 512);
        }

        // Memory should be released when reservation is dropped
        assert_eq!(controller.reserved(), 0);
    }

    #[test]
    fn test_multiple_reservations() {
        let controller = controller_with_budget(1024);

        let mut res1 = controller.create_reservation();
        let mut res2 = controller.create_reservation();

        assert!(res1.try_grow(300));
        assert!(res2.try_grow(300));
        assert_eq!(controller.reserved(), 600);

        assert!(res1.try_grow(200));
        assert!(res2.try_grow(200));
        assert_eq!(controller.reserved(), 1000);

        // Both reservations together exceed budget
        assert!(!res1.try_grow(100));
        assert!(!res2.try_grow(100));

        // Drop one reservation
        drop(res1);
        assert_eq!(controller.reserved(), 500);

        // Now res2 can grow
        assert!(res2.try_grow(400));
        assert_eq!(controller.reserved(), 900);
    }

    #[test]
    fn test_should_spill() {
        let controller = controller_with_threshold();
        let mut reservation = controller.create_reservation();

        // 700 bytes is below the 800-byte threshold
        assert!(reservation.try_grow(700));
        assert!(!reservation.should_spill());

        // 800 bytes reaches it
        assert!(reservation.try_grow(100));
        assert!(reservation.should_spill());
    }

    #[test]
    fn test_spill_tracking() {
        let controller = controller_with_budget(1024);
        assert_eq!(controller.bytes_spilled(), 0);

        controller.record_spill(100);
        assert_eq!(controller.bytes_spilled(), 100);

        controller.record_spill(200);
        assert_eq!(controller.bytes_spilled(), 300);
    }

    #[test]
    fn test_active_reservation_count() {
        let controller = controller_with_budget(1024);
        assert_eq!(controller.active_reservations(), 0);

        let res1 = controller.create_reservation();
        assert_eq!(controller.active_reservations(), 1);

        let res2 = controller.create_reservation();
        assert_eq!(controller.active_reservations(), 2);

        drop(res1);
        assert_eq!(controller.active_reservations(), 1);

        drop(res2);
        assert_eq!(controller.active_reservations(), 0);
    }

    #[test]
    fn test_concurrent_reservations() {
        use std::thread;

        let controller = controller_with_budget(10_000);

        // Spawn 10 threads, each reserving 500 bytes (5000 in total, within budget)
        let handles: Vec<_> = (0..10)
            .map(|_| {
                let controller = Arc::clone(&controller);
                thread::spawn(move || {
                    let mut reservation = controller.create_reservation();
                    assert!(reservation.try_grow(500));
                    // Hold reservation for a bit
                    thread::sleep(std::time::Duration::from_millis(10));
                    reservation.reserved()
                })
            })
            .collect();

        // Wait for all threads and collect results
        let reserved: usize = handles.into_iter().map(|h| h.join().unwrap()).sum();

        // All threads should have reserved 500 bytes
        assert_eq!(reserved, 5000);

        // All memory should be released after threads complete
        assert_eq!(controller.reserved(), 0);
    }

    #[test]
    fn test_memory_stats() {
        let controller = controller_with_threshold();

        // Initial stats
        let stats = controller.stats();
        assert_eq!(stats.budget_bytes, 1000);
        assert_eq!(stats.reserved_bytes, 0);
        assert_eq!(stats.peak_bytes, 0);
        assert_eq!(stats.bytes_spilled, 0);
        assert_eq!(stats.spill_count, 0);
        assert_eq!(stats.active_reservations, 0);
        assert_eq!(stats.spill_threshold, 0.8);

        // After reservations
        let mut res = controller.create_reservation();
        assert!(res.try_grow(500));

        let stats = controller.stats();
        assert_eq!(stats.reserved_bytes, 500);
        assert_eq!(stats.peak_bytes, 500);
        assert_eq!(stats.active_reservations, 1);

        // Test utilization
        assert!((stats.utilization() - 0.5).abs() < 0.001);
        assert_eq!(stats.available_bytes(), 500);
        assert!(!stats.is_under_pressure()); // 50% < 80%

        // Go above spill threshold
        assert!(res.try_grow(400));
        let stats = controller.stats();
        assert!(stats.is_under_pressure()); // 90% >= 80%
    }

    #[test]
    fn test_peak_memory_tracking() {
        let controller = controller_with_budget(1000);

        // Reserve and release
        {
            let mut res = controller.create_reservation();
            assert!(res.try_grow(800));
            assert_eq!(controller.peak_memory(), 800);
        }

        // Memory released but peak preserved
        assert_eq!(controller.reserved(), 0);
        assert_eq!(controller.peak_memory(), 800);

        // New peak
        {
            let mut res = controller.create_reservation();
            assert!(res.try_grow(900));
            assert_eq!(controller.peak_memory(), 900);
        }

        // Lower usage doesn't affect peak
        {
            let mut res = controller.create_reservation();
            assert!(res.try_grow(100));
            assert_eq!(controller.peak_memory(), 900);
        }
    }

    #[test]
    fn test_memory_stats_display() {
        let stats = MemoryStats {
            budget_bytes: GIB,
            reserved_bytes: 512 * MIB,
            peak_bytes: 950 * MIB,
            bytes_spilled: 2 * GIB,
            spill_count: 3,
            active_reservations: 2,
            spill_threshold: 0.8,
        };

        let display = stats.to_string();
        for expected in ["512.00MB", "1.00GB", "50.0%", "950.00MB", "2.00GB", "3 ops"] {
            assert!(display.contains(expected), "missing {expected:?} in {display:?}");
        }
    }

    #[test]
    fn test_spill_count_tracking() {
        let controller = controller_with_budget(1024);

        assert_eq!(controller.spill_count(), 0);
        assert_eq!(controller.bytes_spilled(), 0);

        // Each recorded spill bumps the count by one and the byte total by its size
        let mut expected_bytes = 0;
        for (index, spilled) in [100usize, 200, 50].into_iter().enumerate() {
            controller.record_spill(spilled);
            expected_bytes += spilled;
            assert_eq!(controller.spill_count(), index + 1);
            assert_eq!(controller.bytes_spilled(), expected_bytes);
        }
        assert_eq!(controller.bytes_spilled(), 350);
    }

    #[test]
    fn test_format_bytes_helper() {
        // Test the format_bytes function through MemoryStats Display
        let cases = [(500, "500B"), (2048, "2.00KB"), (5 * MIB, "5.00MB"), (3 * GIB, "3.00GB")];

        for (bytes, expected) in cases {
            let rendered = uniform_stats(bytes).to_string();
            assert!(rendered.contains(expected), "missing {expected:?} in {rendered:?}");
        }
    }
}
873        assert!(s.contains("3.00GB"));
874    }
875}