Skip to main content

sochdb_storage/
memory.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Memory pressure handling and resource limits
19//!
20//! Prevents OOM crashes by implementing memory budgets and backpressure.
21//! Addresses Task 7 from task.md: Missing Memory Pressure Handling.
22//!
23//! ## WriteBufferManager (jj.md Task 2)
24//!
25//! Coordinates memory usage across active and immutable memtables to prevent OOM:
26//! - Tracks total buffer memory (active + immutable memtables)
27//! - Enforces soft/hard limits with backpressure
28//! - Triggers flush when memory pressure is detected
29//! - Blocks writes when hard limit is exceeded
30
31use parking_lot::Condvar;
32use parking_lot::Mutex;
33use std::sync::Arc;
34use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
35
/// Memory budget configuration for the storage engine.
///
/// All `*_budget` fields are sizes in bytes. `soft_limit` and `hard_limit`
/// are fractions of `total_budget` in the range `0.0..=1.0` (for example
/// `0.80` means 80%), not values on a 0–100 scale.
#[derive(Debug, Clone, PartialEq)]
pub struct MemoryBudget {
    /// Maximum memory for all components combined (bytes).
    pub total_budget: u64,
    /// Maximum memory for the active memtable (bytes).
    pub memtable_budget: u64,
    /// Maximum memory for immutable memtables (bytes).
    pub immutable_memtables_budget: u64,
    /// Maximum memory for the block cache (bytes).
    pub block_cache_budget: u64,
    /// Fraction of `total_budget` at which an early flush is triggered (0.0-1.0).
    pub soft_limit: f64,
    /// Fraction of `total_budget` at which writes are blocked (0.0-1.0).
    pub hard_limit: f64,
}
52
53impl Default for MemoryBudget {
54    fn default() -> Self {
55        Self {
56            total_budget: 512 * 1024 * 1024,               // 512 MB default
57            memtable_budget: 32 * 1024 * 1024,             // 32 MB per memtable
58            immutable_memtables_budget: 128 * 1024 * 1024, // 128 MB for immutable
59            block_cache_budget: 256 * 1024 * 1024,         // 256 MB for cache
60            soft_limit: 0.80,                              // 80% - trigger early flush
61            hard_limit: 0.95,                              // 95% - block writes
62        }
63    }
64}
65
66impl MemoryBudget {
67    /// Create budget from available system memory percentage
68    ///
69    /// Example: `from_system_memory_percent(0.25)` uses 25% of available RAM
70    pub fn from_system_memory_percent(percent: f64) -> Self {
71        let available_bytes = Self::get_available_memory();
72        let total_budget = (available_bytes as f64 * percent) as u64;
73
74        Self {
75            total_budget,
76            memtable_budget: total_budget / 16,
77            immutable_memtables_budget: total_budget / 4,
78            block_cache_budget: total_budget / 2,
79            soft_limit: 0.80,
80            hard_limit: 0.95,
81        }
82    }
83
84    /// Get available system memory in bytes
85    ///
86    /// Platform-specific implementations:
87    /// - Linux: Reads /proc/meminfo
88    /// - macOS/BSD: Uses sysctl hw.memsize
89    /// - Windows: Uses GlobalMemoryStatusEx
90    /// - Fallback: Returns conservative 1GB estimate
91    fn get_available_memory() -> u64 {
92        #[cfg(target_os = "linux")]
93        {
94            Self::linux_available_memory().unwrap_or(1024 * 1024 * 1024)
95        }
96
97        #[cfg(target_os = "macos")]
98        {
99            Self::macos_available_memory().unwrap_or(1024 * 1024 * 1024)
100        }
101
102        #[cfg(target_os = "windows")]
103        {
104            Self::windows_available_memory().unwrap_or(1024 * 1024 * 1024)
105        }
106
107        #[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
108        {
109            // Conservative fallback: 1GB
110            1024 * 1024 * 1024
111        }
112    }
113
114    #[cfg(target_os = "linux")]
115    fn linux_available_memory() -> Option<u64> {
116        use std::fs::read_to_string;
117
118        let meminfo = read_to_string("/proc/meminfo").ok()?;
119
120        // Prefer MemAvailable (includes reclaimable memory)
121        // Fall back to MemFree if not available
122        let mut mem_available = None;
123        let mut mem_free = None;
124
125        for line in meminfo.lines() {
126            if line.starts_with("MemAvailable:") {
127                let parts: Vec<&str> = line.split_whitespace().collect();
128                if parts.len() >= 2 {
129                    mem_available = parts[1].parse::<u64>().ok();
130                }
131            } else if line.starts_with("MemFree:") {
132                let parts: Vec<&str> = line.split_whitespace().collect();
133                if parts.len() >= 2 {
134                    mem_free = parts[1].parse::<u64>().ok();
135                }
136            }
137        }
138
139        // Convert KB to bytes
140        mem_available.or(mem_free).map(|kb| kb * 1024)
141    }
142
143    #[cfg(target_os = "macos")]
144    fn macos_available_memory() -> Option<u64> {
145        use std::process::Command;
146
147        // Get total physical memory via sysctl
148        let output = Command::new("sysctl")
149            .args(["-n", "hw.memsize"])
150            .output()
151            .ok()?;
152
153        let mem_bytes: u64 = String::from_utf8_lossy(&output.stdout)
154            .trim()
155            .parse()
156            .ok()?;
157
158        // hw.memsize returns total RAM - use 90% as "available" approximation
159        // This is more conservative than caching would be but safer
160        Some((mem_bytes as f64 * 0.9) as u64)
161    }
162
163    #[cfg(target_os = "windows")]
164    fn windows_available_memory() -> Option<u64> {
165        // For Windows, we could use GlobalMemoryStatusEx via winapi
166        // For now, return None to use fallback (1GB)
167        // TODO: Add winapi dependency and implement proper detection
168        None
169    }
170}
171
172/// Memory usage tracker with pressure detection
173pub struct MemoryTracker {
174    /// Current memory usage estimate
175    current_usage: Arc<AtomicU64>,
176    /// Memory budget configuration
177    budget: MemoryBudget,
178    /// Whether system is under memory pressure
179    under_pressure: Arc<AtomicBool>,
180}
181
182impl MemoryTracker {
183    /// Create new memory tracker with given budget
184    pub fn new(budget: MemoryBudget) -> Self {
185        Self {
186            current_usage: Arc::new(AtomicU64::new(0)),
187            budget,
188            under_pressure: Arc::new(AtomicBool::new(false)),
189        }
190    }
191
192    /// Record memory allocation
193    pub fn allocate(&self, bytes: u64) {
194        let new_usage = self.current_usage.fetch_add(bytes, Ordering::Relaxed) + bytes;
195        self.check_pressure(new_usage);
196    }
197
198    /// Record memory deallocation
199    pub fn deallocate(&self, bytes: u64) {
200        let prev_usage = self.current_usage.fetch_sub(bytes, Ordering::Relaxed);
201        let new_usage = prev_usage.saturating_sub(bytes);
202        self.check_pressure(new_usage);
203    }
204
205    /// Check if under memory pressure and update flag
206    fn check_pressure(&self, current: u64) {
207        let pressure = current as f64 >= (self.budget.total_budget as f64 * self.budget.soft_limit);
208        self.under_pressure.store(pressure, Ordering::Relaxed);
209    }
210
211    /// Check if writes should be blocked (hard limit exceeded)
212    pub fn should_block_writes(&self) -> bool {
213        let current = self.current_usage.load(Ordering::Relaxed);
214        current as f64 >= (self.budget.total_budget as f64 * self.budget.hard_limit)
215    }
216
217    /// Check if early flush should be triggered (soft limit exceeded)
218    pub fn should_trigger_flush(&self) -> bool {
219        self.under_pressure.load(Ordering::Relaxed)
220    }
221
222    /// Get current memory usage
223    pub fn current_usage(&self) -> u64 {
224        self.current_usage.load(Ordering::Relaxed)
225    }
226
227    /// Get memory usage as percentage of budget
228    pub fn usage_percent(&self) -> f64 {
229        let current = self.current_usage.load(Ordering::Relaxed);
230        (current as f64 / self.budget.total_budget as f64) * 100.0
231    }
232
233    /// Reset memory usage counter
234    pub fn reset(&self) {
235        self.current_usage.store(0, Ordering::Relaxed);
236        self.under_pressure.store(false, Ordering::Relaxed);
237    }
238}
239
240/// Write Buffer Manager for coordinating memory across memtables
241///
242/// Implements jj.md Task 2: Write Buffer Manager with Global Memory Coordination
243///
244/// ## Algorithm
245/// ```text
246/// total_buffer_memory = active_memtable.size + Σ(immutable_memtables[i].size)
247///
248/// on_write(bytes):
249///     if total_buffer_memory + bytes > hard_limit:
250///         block_until(total_buffer_memory < soft_limit)
251///     if total_buffer_memory > soft_limit:
252///         trigger_flush(largest_immutable_memtable)
253///     total_buffer_memory += bytes
254///
255/// on_flush_complete(memtable):
256///     total_buffer_memory -= memtable.size
257///     signal_blocked_writers()
258/// ```
259pub struct WriteBufferManager {
260    /// Total memory used by active + immutable memtables
261    total_buffer_memory: AtomicU64,
262    /// Memory budget for all write buffers
263    buffer_limit: u64,
264    /// Soft limit percentage (0.0-1.0) - trigger flush
265    soft_limit_ratio: f64,
266    /// Hard limit percentage (0.0-1.0) - block writes
267    hard_limit_ratio: f64,
268    /// Whether writers are currently blocked
269    writers_blocked: AtomicBool,
270    /// Condition variable for blocked writers
271    write_cv: Condvar,
272    /// Mutex for condition variable
273    write_mutex: Mutex<()>,
274    /// Statistics
275    stats: WriteBufferStats,
276}
277
/// Counters exposed for monitoring the write buffer.
///
/// Every field is an atomic so it can be updated through a shared reference.
#[derive(Debug, Default)]
pub struct WriteBufferStats {
    /// How many times a write had to wait for memory.
    pub blocks_count: AtomicU64,
    /// Accumulated time writers spent waiting, in microseconds.
    pub blocked_time_us: AtomicU64,
    /// How many reservations ended above the soft limit and requested a flush.
    pub soft_limit_flushes: AtomicU64,
}
288
289impl WriteBufferManager {
290    /// Create a new write buffer manager
291    ///
292    /// # Arguments
293    /// * `buffer_limit` - Maximum total memory for write buffers (bytes)
294    pub fn new(buffer_limit: u64) -> Self {
295        Self {
296            total_buffer_memory: AtomicU64::new(0),
297            buffer_limit,
298            soft_limit_ratio: 0.8,
299            hard_limit_ratio: 0.95,
300            writers_blocked: AtomicBool::new(false),
301            write_cv: Condvar::new(),
302            write_mutex: Mutex::new(()),
303            stats: WriteBufferStats::default(),
304        }
305    }
306
307    /// Create with custom soft/hard limits
308    pub fn with_limits(buffer_limit: u64, soft_limit_ratio: f64, hard_limit_ratio: f64) -> Self {
309        Self {
310            total_buffer_memory: AtomicU64::new(0),
311            buffer_limit,
312            soft_limit_ratio,
313            hard_limit_ratio,
314            writers_blocked: AtomicBool::new(false),
315            write_cv: Condvar::new(),
316            write_mutex: Mutex::new(()),
317            stats: WriteBufferStats::default(),
318        }
319    }
320
321    /// Reserve memory for a write operation
322    ///
323    /// May block if hard limit is exceeded, waiting for flushes to complete.
324    /// Returns true if flush should be triggered (soft limit exceeded).
325    pub fn reserve_memory(&self, bytes: u64) -> bool {
326        let soft_limit = (self.buffer_limit as f64 * self.soft_limit_ratio) as u64;
327        let hard_limit = (self.buffer_limit as f64 * self.hard_limit_ratio) as u64;
328
329        loop {
330            let current = self.total_buffer_memory.load(Ordering::Acquire);
331            let new_total = current + bytes;
332
333            if new_total > hard_limit {
334                // Block until memory is freed
335                self.writers_blocked.store(true, Ordering::Release);
336                self.stats.blocks_count.fetch_add(1, Ordering::Relaxed);
337
338                let start = std::time::Instant::now();
339                {
340                    let mut guard = self.write_mutex.lock();
341                    // Wait for flush to complete
342                    self.write_cv
343                        .wait_for(&mut guard, std::time::Duration::from_millis(100));
344                }
345                self.stats
346                    .blocked_time_us
347                    .fetch_add(start.elapsed().as_micros() as u64, Ordering::Relaxed);
348
349                // Retry after waiting
350                continue;
351            }
352
353            // Try to reserve the memory
354            if self
355                .total_buffer_memory
356                .compare_exchange_weak(current, new_total, Ordering::AcqRel, Ordering::Acquire)
357                .is_ok()
358            {
359                // Check if we should trigger flush (soft limit)
360                let should_flush = new_total > soft_limit;
361                if should_flush {
362                    self.stats
363                        .soft_limit_flushes
364                        .fetch_add(1, Ordering::Relaxed);
365                }
366                return should_flush;
367            }
368            // CAS failed, retry
369        }
370    }
371
372    /// Release memory after flush completes
373    ///
374    /// Signals any blocked writers to retry.
375    pub fn release_memory(&self, bytes: u64) {
376        self.total_buffer_memory.fetch_sub(bytes, Ordering::AcqRel);
377
378        // Signal blocked writers
379        if self.writers_blocked.swap(false, Ordering::AcqRel) {
380            self.write_cv.notify_all();
381        }
382    }
383
384    /// Get current buffer memory usage
385    pub fn memory_usage(&self) -> u64 {
386        self.total_buffer_memory.load(Ordering::Acquire)
387    }
388
389    /// Get usage as percentage of limit
390    pub fn usage_percent(&self) -> f64 {
391        let current = self.total_buffer_memory.load(Ordering::Acquire);
392        (current as f64 / self.buffer_limit as f64) * 100.0
393    }
394
395    /// Check if under soft limit pressure
396    pub fn is_under_pressure(&self) -> bool {
397        let current = self.total_buffer_memory.load(Ordering::Acquire);
398        let soft_limit = (self.buffer_limit as f64 * self.soft_limit_ratio) as u64;
399        current > soft_limit
400    }
401
402    /// Get statistics
403    pub fn stats(&self) -> &WriteBufferStats {
404        &self.stats
405    }
406}
407
408// =============================================================================
409// Task 10 Enhancement: Async Write Buffer Spillover
410// =============================================================================
411
412/// Async spillover manager for write buffers
413///
414/// ## Problem Addressed
415/// Current WriteBufferManager blocks writes when hard limit is exceeded.
416/// This causes latency spikes for foreground operations.
417///
418/// ## Solution
419/// Add async spillover to secondary storage (disk) before blocking:
420/// 1. When soft limit exceeded → trigger async flush to SSTable
421/// 2. When 90% limit exceeded → trigger spillover to temp file
422/// 3. Only block at 100% when spillover buffer is also full
423///
424/// ## Architecture
425/// ```text
426/// Memtable (hot data)
427///     │
428///     ├── Soft limit (80%) → Async SSTable flush
429///     │
430///     ├── Spillover limit (90%) → Async temp file write
431///     │
432///     └── Hard limit (100%) → Block (last resort)
433///
434/// Spillover files are replayed into new SSTables during quiet periods.
435/// ```
436#[allow(dead_code)]
437pub struct SpilloverManager {
438    /// Write buffer manager reference
439    write_buffer: Arc<WriteBufferManager>,
440    /// Spillover buffer capacity
441    spillover_capacity: u64,
442    /// Current spillover usage
443    spillover_used: AtomicU64,
444    /// Spillover limit ratio (e.g., 0.9 for 90%)
445    spillover_limit_ratio: f64,
446    /// Number of active spillover files
447    spillover_file_count: AtomicU64,
448    /// Whether spillover is currently active
449    spillover_active: AtomicBool,
450    /// Channel for spillover requests
451    spillover_tx: crossbeam_channel::Sender<SpilloverRequest>,
452    /// Statistics
453    stats: SpilloverStats,
454}
455
/// A batch of entries handed off to secondary storage.
#[derive(Debug)]
pub struct SpilloverRequest {
    /// The key/value pairs to spill, as raw bytes.
    pub data: Vec<(Vec<u8>, Vec<u8>)>,
    /// Timestamp of the oldest entry in the batch.
    pub min_timestamp: u64,
    /// Timestamp of the newest entry in the batch.
    pub max_timestamp: u64,
    /// Size of the batch in bytes.
    pub size_bytes: u64,
}
468
/// Counters exposed for monitoring spillover activity.
///
/// Every field is an atomic so it can be updated through a shared reference.
#[derive(Debug, Default)]
pub struct SpilloverStats {
    /// How many spillover operations were queued.
    pub spillover_count: AtomicU64,
    /// Total number of bytes spilled to disk.
    pub bytes_spilled: AtomicU64,
    /// Total number of bytes recovered from spillover.
    pub bytes_recovered: AtomicU64,
    /// Average spillover latency, in microseconds.
    pub avg_latency_us: AtomicU64,
    /// How many times spilling avoided a blocking reservation.
    pub blocks_avoided: AtomicU64,
}
483
484impl SpilloverManager {
485    /// Create a new spillover manager
486    pub fn new(
487        write_buffer: Arc<WriteBufferManager>,
488        spillover_capacity: u64,
489    ) -> (Self, crossbeam_channel::Receiver<SpilloverRequest>) {
490        let (tx, rx) = crossbeam_channel::bounded(16);
491
492        let manager = Self {
493            write_buffer,
494            spillover_capacity,
495            spillover_used: AtomicU64::new(0),
496            spillover_limit_ratio: 0.9,
497            spillover_file_count: AtomicU64::new(0),
498            spillover_active: AtomicBool::new(false),
499            spillover_tx: tx,
500            stats: SpilloverStats::default(),
501        };
502
503        (manager, rx)
504    }
505
506    /// Check if spillover should be triggered
507    pub fn should_spillover(&self) -> bool {
508        let usage = self.write_buffer.memory_usage();
509        let spillover_limit =
510            (self.write_buffer.buffer_limit as f64 * self.spillover_limit_ratio) as u64;
511        usage > spillover_limit && !self.is_spillover_full()
512    }
513
514    /// Check if spillover buffer is full
515    pub fn is_spillover_full(&self) -> bool {
516        self.spillover_used.load(Ordering::Relaxed) >= self.spillover_capacity
517    }
518
519    /// Reserve memory with spillover fallback
520    ///
521    /// Returns:
522    /// - `Ok(false)` - Memory reserved, no action needed
523    /// - `Ok(true)` - Memory reserved, flush should be triggered
524    /// - `Err(SpilloverRequest)` - Caller should spill this data before proceeding
525    pub fn reserve_memory(
526        &self,
527        bytes: u64,
528        data: Vec<(Vec<u8>, Vec<u8>)>,
529    ) -> Result<bool, SpilloverRequest> {
530        // First try normal reservation
531        if !self.write_buffer.is_under_pressure() {
532            let should_flush = self.write_buffer.reserve_memory(bytes);
533            return Ok(should_flush);
534        }
535
536        // Check if we should spillover
537        if self.should_spillover() && !data.is_empty() {
538            let request = SpilloverRequest {
539                data,
540                min_timestamp: 0,
541                max_timestamp: u64::MAX,
542                size_bytes: bytes,
543            };
544
545            // Try to send spillover request
546            if self.spillover_tx.try_send(request.clone()).is_ok() {
547                self.spillover_used.fetch_add(bytes, Ordering::Relaxed);
548                self.stats.spillover_count.fetch_add(1, Ordering::Relaxed);
549                self.stats.bytes_spilled.fetch_add(bytes, Ordering::Relaxed);
550                self.stats.blocks_avoided.fetch_add(1, Ordering::Relaxed);
551                self.spillover_active.store(true, Ordering::Release);
552
553                // Don't block - data will be spilled
554                return Ok(true);
555            } else {
556                // Spillover queue full, return request to caller
557                return Err(request);
558            }
559        }
560
561        // Fall back to blocking reservation
562        let should_flush = self.write_buffer.reserve_memory(bytes);
563        Ok(should_flush)
564    }
565
566    /// Release spillover capacity after recovery
567    pub fn release_spillover(&self, bytes: u64) {
568        self.spillover_used.fetch_sub(bytes, Ordering::Relaxed);
569        self.stats
570            .bytes_recovered
571            .fetch_add(bytes, Ordering::Relaxed);
572
573        if self.spillover_used.load(Ordering::Relaxed) == 0 {
574            self.spillover_active.store(false, Ordering::Release);
575        }
576    }
577
578    /// Check if spillover is active
579    pub fn is_spillover_active(&self) -> bool {
580        self.spillover_active.load(Ordering::Acquire)
581    }
582
583    /// Get spillover usage
584    pub fn spillover_usage(&self) -> u64 {
585        self.spillover_used.load(Ordering::Relaxed)
586    }
587
588    /// Get spillover capacity
589    pub fn spillover_capacity(&self) -> u64 {
590        self.spillover_capacity
591    }
592
593    /// Get statistics
594    pub fn stats(&self) -> &SpilloverStats {
595        &self.stats
596    }
597}
598
599impl Clone for SpilloverRequest {
600    fn clone(&self) -> Self {
601        Self {
602            data: self.data.clone(),
603            min_timestamp: self.min_timestamp,
604            max_timestamp: self.max_timestamp,
605            size_bytes: self.size_bytes,
606        }
607    }
608}
609
#[cfg(test)]
mod tests {
    use super::*;

    /// Budget with a 1000-byte total and the default 80% / 95% limits.
    fn small_budget() -> MemoryBudget {
        MemoryBudget {
            total_budget: 1000,
            memtable_budget: 100,
            immutable_memtables_budget: 300,
            block_cache_budget: 500,
            soft_limit: 0.80,
            hard_limit: 0.95,
        }
    }

    #[test]
    fn test_memory_budget_default() {
        let budget = MemoryBudget::default();
        assert_eq!(budget.total_budget, 512 * 1024 * 1024);
        assert_eq!(budget.soft_limit, 0.80);
        assert_eq!(budget.hard_limit, 0.95);
    }

    #[test]
    fn test_memory_tracker_pressure() {
        let tracker = MemoryTracker::new(small_budget());

        // Below soft limit - no pressure
        tracker.allocate(700);
        assert!(!tracker.should_trigger_flush());
        assert!(!tracker.should_block_writes());

        // At soft limit - trigger flush
        tracker.allocate(100);
        assert_eq!(tracker.current_usage(), 800);
        assert!(tracker.should_trigger_flush());
        assert!(!tracker.should_block_writes());

        // Above hard limit - block writes
        tracker.allocate(200);
        assert_eq!(tracker.current_usage(), 1000);
        assert!(tracker.should_trigger_flush());
        assert!(tracker.should_block_writes());

        // Deallocate - pressure relieved
        tracker.deallocate(300);
        assert_eq!(tracker.current_usage(), 700);
        assert!(!tracker.should_trigger_flush());
        assert!(!tracker.should_block_writes());
    }

    #[test]
    fn test_memory_tracker_usage_percent() {
        let tracker = MemoryTracker::new(small_budget());

        tracker.allocate(500);
        assert_eq!(tracker.usage_percent(), 50.0);

        tracker.allocate(250);
        assert_eq!(tracker.usage_percent(), 75.0);
    }

    #[test]
    fn test_memory_tracker_reset() {
        let tracker = MemoryTracker::new(small_budget());

        tracker.allocate(900);
        assert!(tracker.should_trigger_flush());

        tracker.reset();
        assert_eq!(tracker.current_usage(), 0);
        assert!(!tracker.should_trigger_flush());
        assert!(!tracker.should_block_writes());
    }

    #[test]
    fn test_from_system_memory_percent() {
        let budget = MemoryBudget::from_system_memory_percent(0.25);

        // Should have reasonable values
        assert!(budget.total_budget > 0);
        assert!(budget.memtable_budget > 0);
        assert!(budget.memtable_budget < budget.total_budget);
        assert_eq!(budget.soft_limit, 0.80);
        assert_eq!(budget.hard_limit, 0.95);
    }

    #[test]
    fn test_system_memory_detection() {
        // How much memory is available depends on the host (containers and CI
        // runners can have well under 1 GiB free), so only assert properties
        // that hold for any detected or fallback value.
        let budget = MemoryBudget::from_system_memory_percent(1.0);

        assert!(
            budget.total_budget > 0,
            "memory detection and the 1 GiB fallback both produced zero"
        );
        assert_eq!(budget.memtable_budget, budget.total_budget / 16);
        assert_eq!(budget.immutable_memtables_budget, budget.total_budget / 4);
        assert_eq!(budget.block_cache_budget, budget.total_budget / 2);
    }

    #[test]
    fn test_write_buffer_manager_reserve_release() {
        let wbm = WriteBufferManager::new(1000);

        // Reserve some memory
        let should_flush = wbm.reserve_memory(400);
        assert!(!should_flush); // Below 80% soft limit
        assert_eq!(wbm.memory_usage(), 400);

        // Reserve more - should trigger soft limit
        let should_flush = wbm.reserve_memory(500);
        assert!(should_flush); // Above 80% soft limit (900/1000)
        assert_eq!(wbm.memory_usage(), 900);
        assert_eq!(wbm.stats().soft_limit_flushes.load(Ordering::Relaxed), 1);

        // Release memory
        wbm.release_memory(600);
        assert_eq!(wbm.memory_usage(), 300);
        assert!(!wbm.is_under_pressure());
    }

    #[test]
    fn test_write_buffer_manager_pressure() {
        let wbm = WriteBufferManager::with_limits(1000, 0.5, 0.9);

        // Below soft limit
        wbm.reserve_memory(400);
        assert!(!wbm.is_under_pressure());
        assert_eq!(wbm.usage_percent(), 40.0);

        // Above soft limit
        wbm.reserve_memory(200);
        assert!(wbm.is_under_pressure());
        assert_eq!(wbm.usage_percent(), 60.0);
    }

    // Spillover Manager Tests

    #[test]
    fn test_spillover_manager_creation() {
        let wbm = Arc::new(WriteBufferManager::new(1000));
        let (spillover, _rx) = SpilloverManager::new(wbm, 500);

        assert_eq!(spillover.spillover_capacity(), 500);
        assert_eq!(spillover.spillover_usage(), 0);
        assert!(!spillover.is_spillover_active());
    }

    #[test]
    fn test_spillover_manager_reserve_below_limit() {
        let wbm = Arc::new(WriteBufferManager::new(1000));
        let (spillover, _rx) = SpilloverManager::new(wbm, 500);

        // Below any limits - should succeed without spillover
        let result = spillover.reserve_memory(100, vec![]);
        assert!(result.is_ok());
        assert!(!result.unwrap()); // No flush needed
    }

    #[test]
    fn test_spillover_manager_stats() {
        let wbm = Arc::new(WriteBufferManager::new(1000));
        let (spillover, _rx) = SpilloverManager::new(wbm.clone(), 500);

        // Above the 80% soft limit but below the 90% spillover limit
        wbm.reserve_memory(850);

        let data = vec![(b"key".to_vec(), b"value".to_vec())];

        // Under pressure but not spilling: falls back to a normal reservation
        // (850 + 100 = 950, which does not exceed the 95% hard limit).
        let result = spillover.reserve_memory(100, data);
        assert!(result.unwrap()); // Flush requested
        assert_eq!(wbm.memory_usage(), 950);

        let stats = spillover.stats();
        assert_eq!(stats.spillover_count.load(Ordering::Relaxed), 0);
        assert_eq!(stats.bytes_spilled.load(Ordering::Relaxed), 0);
        assert!(!spillover.is_spillover_active());
    }

    #[test]
    fn test_spillover_triggered_above_spillover_limit() {
        let wbm = Arc::new(WriteBufferManager::new(1000));
        let (spillover, rx) = SpilloverManager::new(wbm.clone(), 500);

        // Above the 90% spillover limit
        wbm.reserve_memory(920);
        assert!(spillover.should_spillover());

        let data = vec![(b"key".to_vec(), b"value".to_vec())];
        let result = spillover.reserve_memory(10, data.clone());
        assert!(result.unwrap());

        // The data went to the spillover queue, not the write buffer
        assert_eq!(wbm.memory_usage(), 920);
        assert_eq!(spillover.spillover_usage(), 10);
        assert!(spillover.is_spillover_active());

        let request = rx.try_recv().expect("spillover request should be queued");
        assert_eq!(request.data, data);
        assert_eq!(request.size_bytes, 10);

        let stats = spillover.stats();
        assert_eq!(stats.spillover_count.load(Ordering::Relaxed), 1);
        assert_eq!(stats.bytes_spilled.load(Ordering::Relaxed), 10);
        assert_eq!(stats.blocks_avoided.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn test_spillover_request_returned_when_queue_rejects() {
        let wbm = Arc::new(WriteBufferManager::new(1000));
        let (spillover, rx) = SpilloverManager::new(wbm.clone(), 500);
        drop(rx); // Every send now fails

        wbm.reserve_memory(920);

        let data = vec![(b"key".to_vec(), b"value".to_vec())];
        let returned = spillover
            .reserve_memory(10, data.clone())
            .expect_err("send must fail without a receiver");

        assert_eq!(returned.data, data);
        assert_eq!(returned.size_bytes, 10);
        assert_eq!(spillover.spillover_usage(), 0);
        assert!(!spillover.is_spillover_active());
        assert_eq!(
            spillover.stats().spillover_count.load(Ordering::Relaxed),
            0
        );
    }

    #[test]
    fn test_spillover_release() {
        let wbm = Arc::new(WriteBufferManager::new(1000));
        let (spillover, _rx) = SpilloverManager::new(wbm, 500);

        // Simulate spillover used
        spillover.spillover_used.store(200, Ordering::Relaxed);
        spillover.spillover_active.store(true, Ordering::Release);

        assert!(spillover.is_spillover_active());
        assert_eq!(spillover.spillover_usage(), 200);

        // Release spillover
        spillover.release_spillover(200);

        assert!(!spillover.is_spillover_active());
        assert_eq!(spillover.spillover_usage(), 0);
        assert_eq!(
            spillover.stats().bytes_recovered.load(Ordering::Relaxed),
            200
        );
    }
}