sochdb_storage/
memory.rs

1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Memory pressure handling and resource limits
16//!
17//! Prevents OOM crashes by implementing memory budgets and backpressure.
18//! Addresses Task 7 from task.md: Missing Memory Pressure Handling.
19//!
20//! ## WriteBufferManager (jj.md Task 2)
21//!
22//! Coordinates memory usage across active and immutable memtables to prevent OOM:
23//! - Tracks total buffer memory (active + immutable memtables)
24//! - Enforces soft/hard limits with backpressure
25//! - Triggers flush when memory pressure is detected
26//! - Blocks writes when hard limit is exceeded
27
28use parking_lot::Condvar;
29use parking_lot::Mutex;
30use std::sync::Arc;
31use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
32
/// Memory budget configuration for the storage engine.
///
/// All `*_budget` fields are byte counts. `soft_limit` and `hard_limit` are
/// fractions of `total_budget`.
#[derive(Debug, Clone)]
pub struct MemoryBudget {
    /// Maximum memory for all components (bytes)
    pub total_budget: u64,
    /// Maximum memory for active memtable
    pub memtable_budget: u64,
    /// Maximum memory for immutable memtables
    pub immutable_memtables_budget: u64,
    /// Maximum memory for block cache
    pub block_cache_budget: u64,
    /// Fraction of budget at which to trigger early flush (0.0-1.0)
    pub soft_limit: f64,
    /// Fraction of budget at which to block writes (0.0-1.0)
    pub hard_limit: f64,
}

impl Default for MemoryBudget {
    /// 512 MiB total: 32 MiB memtable, 128 MiB immutable memtables,
    /// 256 MiB block cache, soft limit 80%, hard limit 95%.
    fn default() -> Self {
        const MIB: u64 = 1024 * 1024;

        Self {
            total_budget: 512 * MIB,
            memtable_budget: 32 * MIB,
            immutable_memtables_budget: 128 * MIB,
            block_cache_budget: 256 * MIB,
            soft_limit: 0.80, // trigger early flush
            hard_limit: 0.95, // block writes
        }
    }
}

impl MemoryBudget {
    /// Amount of memory assumed to be available (1 GiB) when detection fails
    /// or is not implemented for the current platform.
    const FALLBACK_AVAILABLE_BYTES: u64 = 1024 * 1024 * 1024;

    /// Create a budget from a fraction of the available system memory.
    ///
    /// `percent` is a fraction, not a value out of 100:
    /// `from_system_memory_percent(0.25)` uses 25% of available RAM.
    ///
    /// The total is split into 1/16 for the memtable, 1/4 for immutable
    /// memtables and 1/2 for the block cache. A negative or NaN `percent`
    /// yields a zero budget because float-to-integer casts saturate.
    pub fn from_system_memory_percent(percent: f64) -> Self {
        let available_bytes = Self::get_available_memory();
        let total_budget = (available_bytes as f64 * percent) as u64;

        Self {
            total_budget,
            memtable_budget: total_budget / 16,
            immutable_memtables_budget: total_budget / 4,
            block_cache_budget: total_budget / 2,
            soft_limit: 0.80,
            hard_limit: 0.95,
        }
    }

    /// Get available system memory in bytes.
    ///
    /// Platform-specific behavior:
    /// - Linux: reads `/proc/meminfo` (`MemAvailable`, else `MemFree`)
    /// - macOS: 90% of the total RAM reported by `sysctl -n hw.memsize`
    /// - Windows: detection is not implemented yet, so the fallback is used
    /// - Anything else, or any detection failure: conservative 1 GiB estimate
    fn get_available_memory() -> u64 {
        #[cfg(target_os = "linux")]
        {
            Self::linux_available_memory().unwrap_or(Self::FALLBACK_AVAILABLE_BYTES)
        }

        #[cfg(target_os = "macos")]
        {
            Self::macos_available_memory().unwrap_or(Self::FALLBACK_AVAILABLE_BYTES)
        }

        #[cfg(target_os = "windows")]
        {
            Self::windows_available_memory().unwrap_or(Self::FALLBACK_AVAILABLE_BYTES)
        }

        #[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
        {
            Self::FALLBACK_AVAILABLE_BYTES
        }
    }

    /// Read `/proc/meminfo` and extract the available memory in bytes.
    ///
    /// Returns `None` when the file cannot be read or holds no usable value.
    #[cfg(target_os = "linux")]
    fn linux_available_memory() -> Option<u64> {
        let meminfo = std::fs::read_to_string("/proc/meminfo").ok()?;
        Self::parse_meminfo(&meminfo)
    }

    /// Extract the available memory in bytes from `/proc/meminfo`-style text.
    ///
    /// Prefers `MemAvailable` (which includes reclaimable memory) and falls
    /// back to `MemFree` when `MemAvailable` is absent or unparsable. The
    /// values are in KiB; the conversion saturates instead of overflowing on
    /// malformed input.
    // Compiled for tests on every platform so the parser can be exercised
    // without a real /proc; outside Linux nothing else calls it.
    #[cfg(any(target_os = "linux", test))]
    #[cfg_attr(not(target_os = "linux"), allow(dead_code))]
    fn parse_meminfo(meminfo: &str) -> Option<u64> {
        let field_kb = |label: &str| -> Option<u64> {
            meminfo.lines().find_map(|line| {
                line.strip_prefix(label)?
                    .split_whitespace()
                    .next()?
                    .parse::<u64>()
                    .ok()
            })
        };

        field_kb("MemAvailable:")
            .or_else(|| field_kb("MemFree:"))
            .map(|kb| kb.saturating_mul(1024))
    }

    /// Approximate available memory on macOS as 90% of the physical RAM.
    ///
    /// Returns `None` when `sysctl` cannot be run, exits with a failure
    /// status, or prints something that is not an unsigned integer.
    #[cfg(target_os = "macos")]
    fn macos_available_memory() -> Option<u64> {
        let output = std::process::Command::new("sysctl")
            .args(["-n", "hw.memsize"])
            .output()
            .ok()?;

        if !output.status.success() {
            return None;
        }

        let total_bytes: u64 = String::from_utf8_lossy(&output.stdout)
            .trim()
            .parse()
            .ok()?;

        // hw.memsize is the total RAM, not the free amount; take 90% of it as
        // the "available" approximation.
        Some((total_bytes as f64 * 0.9) as u64)
    }

    /// Windows memory detection placeholder.
    ///
    /// Always returns `None`, so callers use the 1 GiB fallback.
    #[cfg(target_os = "windows")]
    fn windows_available_memory() -> Option<u64> {
        // TODO: Add winapi dependency and implement detection via
        // GlobalMemoryStatusEx.
        None
    }
}
168
169/// Memory usage tracker with pressure detection
170pub struct MemoryTracker {
171    /// Current memory usage estimate
172    current_usage: Arc<AtomicU64>,
173    /// Memory budget configuration
174    budget: MemoryBudget,
175    /// Whether system is under memory pressure
176    under_pressure: Arc<AtomicBool>,
177}
178
179impl MemoryTracker {
180    /// Create new memory tracker with given budget
181    pub fn new(budget: MemoryBudget) -> Self {
182        Self {
183            current_usage: Arc::new(AtomicU64::new(0)),
184            budget,
185            under_pressure: Arc::new(AtomicBool::new(false)),
186        }
187    }
188
189    /// Record memory allocation
190    pub fn allocate(&self, bytes: u64) {
191        let new_usage = self.current_usage.fetch_add(bytes, Ordering::Relaxed) + bytes;
192        self.check_pressure(new_usage);
193    }
194
195    /// Record memory deallocation
196    pub fn deallocate(&self, bytes: u64) {
197        let prev_usage = self.current_usage.fetch_sub(bytes, Ordering::Relaxed);
198        let new_usage = prev_usage.saturating_sub(bytes);
199        self.check_pressure(new_usage);
200    }
201
202    /// Check if under memory pressure and update flag
203    fn check_pressure(&self, current: u64) {
204        let pressure = current as f64 >= (self.budget.total_budget as f64 * self.budget.soft_limit);
205        self.under_pressure.store(pressure, Ordering::Relaxed);
206    }
207
208    /// Check if writes should be blocked (hard limit exceeded)
209    pub fn should_block_writes(&self) -> bool {
210        let current = self.current_usage.load(Ordering::Relaxed);
211        current as f64 >= (self.budget.total_budget as f64 * self.budget.hard_limit)
212    }
213
214    /// Check if early flush should be triggered (soft limit exceeded)
215    pub fn should_trigger_flush(&self) -> bool {
216        self.under_pressure.load(Ordering::Relaxed)
217    }
218
219    /// Get current memory usage
220    pub fn current_usage(&self) -> u64 {
221        self.current_usage.load(Ordering::Relaxed)
222    }
223
224    /// Get memory usage as percentage of budget
225    pub fn usage_percent(&self) -> f64 {
226        let current = self.current_usage.load(Ordering::Relaxed);
227        (current as f64 / self.budget.total_budget as f64) * 100.0
228    }
229
230    /// Reset memory usage counter
231    pub fn reset(&self) {
232        self.current_usage.store(0, Ordering::Relaxed);
233        self.under_pressure.store(false, Ordering::Relaxed);
234    }
235}
236
237/// Write Buffer Manager for coordinating memory across memtables
238///
239/// Implements jj.md Task 2: Write Buffer Manager with Global Memory Coordination
240///
241/// ## Algorithm
242/// ```text
243/// total_buffer_memory = active_memtable.size + Σ(immutable_memtables[i].size)
244///
245/// on_write(bytes):
246///     if total_buffer_memory + bytes > hard_limit:
247///         block_until(total_buffer_memory < soft_limit)
248///     if total_buffer_memory > soft_limit:
249///         trigger_flush(largest_immutable_memtable)
250///     total_buffer_memory += bytes
251///
252/// on_flush_complete(memtable):
253///     total_buffer_memory -= memtable.size
254///     signal_blocked_writers()
255/// ```
256pub struct WriteBufferManager {
257    /// Total memory used by active + immutable memtables
258    total_buffer_memory: AtomicU64,
259    /// Memory budget for all write buffers
260    buffer_limit: u64,
261    /// Soft limit percentage (0.0-1.0) - trigger flush
262    soft_limit_ratio: f64,
263    /// Hard limit percentage (0.0-1.0) - block writes
264    hard_limit_ratio: f64,
265    /// Whether writers are currently blocked
266    writers_blocked: AtomicBool,
267    /// Condition variable for blocked writers
268    write_cv: Condvar,
269    /// Mutex for condition variable
270    write_mutex: Mutex<()>,
271    /// Statistics
272    stats: WriteBufferStats,
273}
274
275/// Statistics for write buffer monitoring
276#[derive(Debug, Default)]
277pub struct WriteBufferStats {
278    /// Number of times writes were blocked
279    pub blocks_count: AtomicU64,
280    /// Total microseconds spent blocked
281    pub blocked_time_us: AtomicU64,
282    /// Number of flushes triggered by soft limit
283    pub soft_limit_flushes: AtomicU64,
284}
285
286impl WriteBufferManager {
287    /// Create a new write buffer manager
288    ///
289    /// # Arguments
290    /// * `buffer_limit` - Maximum total memory for write buffers (bytes)
291    pub fn new(buffer_limit: u64) -> Self {
292        Self {
293            total_buffer_memory: AtomicU64::new(0),
294            buffer_limit,
295            soft_limit_ratio: 0.8,
296            hard_limit_ratio: 0.95,
297            writers_blocked: AtomicBool::new(false),
298            write_cv: Condvar::new(),
299            write_mutex: Mutex::new(()),
300            stats: WriteBufferStats::default(),
301        }
302    }
303
304    /// Create with custom soft/hard limits
305    pub fn with_limits(buffer_limit: u64, soft_limit_ratio: f64, hard_limit_ratio: f64) -> Self {
306        Self {
307            total_buffer_memory: AtomicU64::new(0),
308            buffer_limit,
309            soft_limit_ratio,
310            hard_limit_ratio,
311            writers_blocked: AtomicBool::new(false),
312            write_cv: Condvar::new(),
313            write_mutex: Mutex::new(()),
314            stats: WriteBufferStats::default(),
315        }
316    }
317
318    /// Reserve memory for a write operation
319    ///
320    /// May block if hard limit is exceeded, waiting for flushes to complete.
321    /// Returns true if flush should be triggered (soft limit exceeded).
322    pub fn reserve_memory(&self, bytes: u64) -> bool {
323        let soft_limit = (self.buffer_limit as f64 * self.soft_limit_ratio) as u64;
324        let hard_limit = (self.buffer_limit as f64 * self.hard_limit_ratio) as u64;
325
326        loop {
327            let current = self.total_buffer_memory.load(Ordering::Acquire);
328            let new_total = current + bytes;
329
330            if new_total > hard_limit {
331                // Block until memory is freed
332                self.writers_blocked.store(true, Ordering::Release);
333                self.stats.blocks_count.fetch_add(1, Ordering::Relaxed);
334
335                let start = std::time::Instant::now();
336                {
337                    let mut guard = self.write_mutex.lock();
338                    // Wait for flush to complete
339                    self.write_cv
340                        .wait_for(&mut guard, std::time::Duration::from_millis(100));
341                }
342                self.stats
343                    .blocked_time_us
344                    .fetch_add(start.elapsed().as_micros() as u64, Ordering::Relaxed);
345
346                // Retry after waiting
347                continue;
348            }
349
350            // Try to reserve the memory
351            if self
352                .total_buffer_memory
353                .compare_exchange_weak(current, new_total, Ordering::AcqRel, Ordering::Acquire)
354                .is_ok()
355            {
356                // Check if we should trigger flush (soft limit)
357                let should_flush = new_total > soft_limit;
358                if should_flush {
359                    self.stats
360                        .soft_limit_flushes
361                        .fetch_add(1, Ordering::Relaxed);
362                }
363                return should_flush;
364            }
365            // CAS failed, retry
366        }
367    }
368
369    /// Release memory after flush completes
370    ///
371    /// Signals any blocked writers to retry.
372    pub fn release_memory(&self, bytes: u64) {
373        self.total_buffer_memory.fetch_sub(bytes, Ordering::AcqRel);
374
375        // Signal blocked writers
376        if self.writers_blocked.swap(false, Ordering::AcqRel) {
377            self.write_cv.notify_all();
378        }
379    }
380
381    /// Get current buffer memory usage
382    pub fn memory_usage(&self) -> u64 {
383        self.total_buffer_memory.load(Ordering::Acquire)
384    }
385
386    /// Get usage as percentage of limit
387    pub fn usage_percent(&self) -> f64 {
388        let current = self.total_buffer_memory.load(Ordering::Acquire);
389        (current as f64 / self.buffer_limit as f64) * 100.0
390    }
391
392    /// Check if under soft limit pressure
393    pub fn is_under_pressure(&self) -> bool {
394        let current = self.total_buffer_memory.load(Ordering::Acquire);
395        let soft_limit = (self.buffer_limit as f64 * self.soft_limit_ratio) as u64;
396        current > soft_limit
397    }
398
399    /// Get statistics
400    pub fn stats(&self) -> &WriteBufferStats {
401        &self.stats
402    }
403}
404
405// =============================================================================
406// Task 10 Enhancement: Async Write Buffer Spillover
407// =============================================================================
408
409/// Async spillover manager for write buffers
410///
411/// ## Problem Addressed
412/// Current WriteBufferManager blocks writes when hard limit is exceeded.
413/// This causes latency spikes for foreground operations.
414///
415/// ## Solution
416/// Add async spillover to secondary storage (disk) before blocking:
417/// 1. When soft limit exceeded → trigger async flush to SSTable
418/// 2. When 90% limit exceeded → trigger spillover to temp file
419/// 3. Only block at 100% when spillover buffer is also full
420///
421/// ## Architecture
422/// ```text
423/// Memtable (hot data)
424///     │
425///     ├── Soft limit (80%) → Async SSTable flush
426///     │
427///     ├── Spillover limit (90%) → Async temp file write
428///     │
429///     └── Hard limit (100%) → Block (last resort)
430///
431/// Spillover files are replayed into new SSTables during quiet periods.
432/// ```
433#[allow(dead_code)]
434pub struct SpilloverManager {
435    /// Write buffer manager reference
436    write_buffer: Arc<WriteBufferManager>,
437    /// Spillover buffer capacity
438    spillover_capacity: u64,
439    /// Current spillover usage
440    spillover_used: AtomicU64,
441    /// Spillover limit ratio (e.g., 0.9 for 90%)
442    spillover_limit_ratio: f64,
443    /// Number of active spillover files
444    spillover_file_count: AtomicU64,
445    /// Whether spillover is currently active
446    spillover_active: AtomicBool,
447    /// Channel for spillover requests
448    spillover_tx: crossbeam_channel::Sender<SpilloverRequest>,
449    /// Statistics
450    stats: SpilloverStats,
451}
452
453/// Request to spill data to secondary storage
454#[derive(Debug)]
455pub struct SpilloverRequest {
456    /// Key-value data to spill
457    pub data: Vec<(Vec<u8>, Vec<u8>)>,
458    /// Timestamp of oldest entry
459    pub min_timestamp: u64,
460    /// Timestamp of newest entry  
461    pub max_timestamp: u64,
462    /// Size in bytes
463    pub size_bytes: u64,
464}
465
466/// Spillover statistics
467#[derive(Debug, Default)]
468pub struct SpilloverStats {
469    /// Number of spillover operations
470    pub spillover_count: AtomicU64,
471    /// Total bytes spilled to disk
472    pub bytes_spilled: AtomicU64,
473    /// Total bytes recovered from spillover
474    pub bytes_recovered: AtomicU64,
475    /// Average spillover latency (microseconds)
476    pub avg_latency_us: AtomicU64,
477    /// Number of times blocking was avoided by spillover
478    pub blocks_avoided: AtomicU64,
479}
480
481impl SpilloverManager {
482    /// Create a new spillover manager
483    pub fn new(
484        write_buffer: Arc<WriteBufferManager>,
485        spillover_capacity: u64,
486    ) -> (Self, crossbeam_channel::Receiver<SpilloverRequest>) {
487        let (tx, rx) = crossbeam_channel::bounded(16);
488
489        let manager = Self {
490            write_buffer,
491            spillover_capacity,
492            spillover_used: AtomicU64::new(0),
493            spillover_limit_ratio: 0.9,
494            spillover_file_count: AtomicU64::new(0),
495            spillover_active: AtomicBool::new(false),
496            spillover_tx: tx,
497            stats: SpilloverStats::default(),
498        };
499
500        (manager, rx)
501    }
502
503    /// Check if spillover should be triggered
504    pub fn should_spillover(&self) -> bool {
505        let usage = self.write_buffer.memory_usage();
506        let spillover_limit =
507            (self.write_buffer.buffer_limit as f64 * self.spillover_limit_ratio) as u64;
508        usage > spillover_limit && !self.is_spillover_full()
509    }
510
511    /// Check if spillover buffer is full
512    pub fn is_spillover_full(&self) -> bool {
513        self.spillover_used.load(Ordering::Relaxed) >= self.spillover_capacity
514    }
515
516    /// Reserve memory with spillover fallback
517    ///
518    /// Returns:
519    /// - `Ok(false)` - Memory reserved, no action needed
520    /// - `Ok(true)` - Memory reserved, flush should be triggered
521    /// - `Err(SpilloverRequest)` - Caller should spill this data before proceeding
522    pub fn reserve_memory(
523        &self,
524        bytes: u64,
525        data: Vec<(Vec<u8>, Vec<u8>)>,
526    ) -> Result<bool, SpilloverRequest> {
527        // First try normal reservation
528        if !self.write_buffer.is_under_pressure() {
529            let should_flush = self.write_buffer.reserve_memory(bytes);
530            return Ok(should_flush);
531        }
532
533        // Check if we should spillover
534        if self.should_spillover() && !data.is_empty() {
535            let request = SpilloverRequest {
536                data,
537                min_timestamp: 0,
538                max_timestamp: u64::MAX,
539                size_bytes: bytes,
540            };
541
542            // Try to send spillover request
543            if self.spillover_tx.try_send(request.clone()).is_ok() {
544                self.spillover_used.fetch_add(bytes, Ordering::Relaxed);
545                self.stats.spillover_count.fetch_add(1, Ordering::Relaxed);
546                self.stats.bytes_spilled.fetch_add(bytes, Ordering::Relaxed);
547                self.stats.blocks_avoided.fetch_add(1, Ordering::Relaxed);
548                self.spillover_active.store(true, Ordering::Release);
549
550                // Don't block - data will be spilled
551                return Ok(true);
552            } else {
553                // Spillover queue full, return request to caller
554                return Err(request);
555            }
556        }
557
558        // Fall back to blocking reservation
559        let should_flush = self.write_buffer.reserve_memory(bytes);
560        Ok(should_flush)
561    }
562
563    /// Release spillover capacity after recovery
564    pub fn release_spillover(&self, bytes: u64) {
565        self.spillover_used.fetch_sub(bytes, Ordering::Relaxed);
566        self.stats
567            .bytes_recovered
568            .fetch_add(bytes, Ordering::Relaxed);
569
570        if self.spillover_used.load(Ordering::Relaxed) == 0 {
571            self.spillover_active.store(false, Ordering::Release);
572        }
573    }
574
575    /// Check if spillover is active
576    pub fn is_spillover_active(&self) -> bool {
577        self.spillover_active.load(Ordering::Acquire)
578    }
579
580    /// Get spillover usage
581    pub fn spillover_usage(&self) -> u64 {
582        self.spillover_used.load(Ordering::Relaxed)
583    }
584
585    /// Get spillover capacity
586    pub fn spillover_capacity(&self) -> u64 {
587        self.spillover_capacity
588    }
589
590    /// Get statistics
591    pub fn stats(&self) -> &SpilloverStats {
592        &self.stats
593    }
594}
595
596impl Clone for SpilloverRequest {
597    fn clone(&self) -> Self {
598        Self {
599            data: self.data.clone(),
600            min_timestamp: self.min_timestamp,
601            max_timestamp: self.max_timestamp,
602            size_bytes: self.size_bytes,
603        }
604    }
605}
606
#[cfg(test)]
mod tests {
    use super::*;

    const GIB: u64 = 1024 * 1024 * 1024;

    /// 1000-byte budget so the thresholds are round numbers.
    fn tiny_budget() -> MemoryBudget {
        MemoryBudget {
            total_budget: 1000,
            memtable_budget: 100,
            immutable_memtables_budget: 300,
            block_cache_budget: 500,
            soft_limit: 0.80,
            hard_limit: 0.95,
        }
    }

    /// Write buffer with the given limit plus a spillover manager on top of it.
    ///
    /// The receiver is returned so callers can keep the channel connected.
    fn spillover_fixture(
        buffer_limit: u64,
        capacity: u64,
    ) -> (
        Arc<WriteBufferManager>,
        SpilloverManager,
        crossbeam_channel::Receiver<SpilloverRequest>,
    ) {
        let wbm = Arc::new(WriteBufferManager::new(buffer_limit));
        let (spillover, rx) = SpilloverManager::new(Arc::clone(&wbm), capacity);
        (wbm, spillover, rx)
    }

    #[test]
    fn test_memory_budget_default() {
        let MemoryBudget {
            total_budget,
            soft_limit,
            hard_limit,
            ..
        } = MemoryBudget::default();

        assert_eq!(total_budget, 512 * 1024 * 1024);
        assert_eq!(soft_limit, 0.80);
        assert_eq!(hard_limit, 0.95);
    }

    #[test]
    fn test_memory_tracker_pressure() {
        let tracker = MemoryTracker::new(tiny_budget());

        // Each step: signed change in usage, then the expected
        // (usage, should_trigger_flush, should_block_writes).
        let steps: [(i64, u64, bool, bool); 4] = [
            (700, 700, false, false), // below soft limit
            (100, 800, true, false),  // soft limit reached
            (200, 1000, true, true),  // hard limit reached
            (-300, 700, false, false), // pressure relieved
        ];

        for (delta, usage, flush, block) in steps {
            if delta >= 0 {
                tracker.allocate(delta as u64);
            } else {
                tracker.deallocate(delta.unsigned_abs());
            }
            assert_eq!(tracker.current_usage(), usage);
            assert_eq!(tracker.should_trigger_flush(), flush);
            assert_eq!(tracker.should_block_writes(), block);
        }
    }

    #[test]
    fn test_memory_tracker_usage_percent() {
        let tracker = MemoryTracker::new(tiny_budget());

        for (allocation, expected_percent) in [(500, 50.0), (250, 75.0)] {
            tracker.allocate(allocation);
            assert_eq!(tracker.usage_percent(), expected_percent);
        }
    }

    #[test]
    fn test_from_system_memory_percent() {
        let budget = MemoryBudget::from_system_memory_percent(0.25);

        // Sanity checks only: the absolute numbers depend on the host.
        assert!(budget.total_budget > 0);
        assert!(budget.memtable_budget > 0);
        assert!(budget.memtable_budget < budget.total_budget);
        assert_eq!(budget.soft_limit, 0.80);
        assert_eq!(budget.hard_limit, 0.95);
    }

    #[test]
    fn test_system_memory_detection() {
        // Exercises the platform-specific detection: a result stuck at the
        // 1GB fallback means detection did not work.
        let budget = MemoryBudget::from_system_memory_percent(1.0);

        // Linux and macOS have real detection; expect more than 2GB on any
        // modern dev machine.
        if cfg!(any(target_os = "linux", target_os = "macos")) {
            assert!(
                budget.total_budget > 2 * GIB,
                "Expected >2GB detected, got {} bytes. Memory detection may have failed.",
                budget.total_budget
            );
        }

        // Every platform yields at least the 1GB fallback.
        assert!(budget.total_budget >= GIB);
    }

    #[test]
    fn test_write_buffer_manager_reserve_release() {
        let wbm = WriteBufferManager::new(1000);

        // 400/1000 stays below the 80% soft limit.
        assert!(!wbm.reserve_memory(400));
        assert_eq!(wbm.memory_usage(), 400);

        // 900/1000 crosses the soft limit.
        assert!(wbm.reserve_memory(500));
        assert_eq!(wbm.memory_usage(), 900);

        // Releasing drops usage back below the soft limit.
        wbm.release_memory(600);
        assert_eq!(wbm.memory_usage(), 300);
        assert!(!wbm.is_under_pressure());
    }

    #[test]
    fn test_write_buffer_manager_pressure() {
        let wbm = WriteBufferManager::with_limits(1000, 0.5, 0.9);

        // (reservation, pressure expected afterwards, usage percent afterwards)
        for (bytes, pressured, percent) in [(400, false, 40.0), (200, true, 60.0)] {
            wbm.reserve_memory(bytes);
            assert_eq!(wbm.is_under_pressure(), pressured);
            assert_eq!(wbm.usage_percent(), percent);
        }
    }

    // Spillover Manager Tests

    #[test]
    fn test_spillover_manager_creation() {
        let (_wbm, spillover, _rx) = spillover_fixture(1000, 500);

        assert_eq!(spillover.spillover_capacity(), 500);
        assert_eq!(spillover.spillover_usage(), 0);
        assert!(!spillover.is_spillover_active());
    }

    #[test]
    fn test_spillover_manager_reserve_below_limit() {
        let (_wbm, spillover, _rx) = spillover_fixture(1000, 500);

        // Far below every limit: plain reservation, no flush requested.
        let outcome = spillover.reserve_memory(100, vec![]);
        assert!(outcome.is_ok());
        assert!(!outcome.unwrap());
    }

    #[test]
    fn test_spillover_manager_stats() {
        let (wbm, spillover, _rx) = spillover_fixture(1000, 500);

        // Push the write buffer above its 80% soft limit first.
        wbm.reserve_memory(850);

        let payload = vec![(b"key".to_vec(), b"value".to_vec())];

        // Goes through the pressure-handling path.
        let outcome = spillover.reserve_memory(100, payload);
        assert!(outcome.is_ok());

        // Stats are readable whether or not a spill actually happened.
        let spilled = spillover.stats().spillover_count.load(Ordering::Relaxed);
        assert!(spilled <= 1);
    }

    #[test]
    fn test_spillover_release() {
        let (_wbm, spillover, _rx) = spillover_fixture(1000, 500);

        // Pretend 200 bytes have been spilled.
        spillover.spillover_used.store(200, Ordering::Relaxed);
        spillover.spillover_active.store(true, Ordering::Release);

        assert!(spillover.is_spillover_active());
        assert_eq!(spillover.spillover_usage(), 200);

        // Recovering all of it deactivates spillover.
        spillover.release_spillover(200);

        assert!(!spillover.is_spillover_active());
        assert_eq!(spillover.spillover_usage(), 0);
    }
}