// sochdb_storage/adaptive_memtable.rs

1// Copyright 2025 Sushanth (https://github.com/sushanthpy)
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Adaptive Memtable Sizing with Memory Pressure Feedback
16//!
17//! This module implements Task 10 from mm.md: dynamic memtable sizing that
18//! responds to system memory pressure and write rate.
19//!
20//! ## Problem: Fixed Memtable Size is Suboptimal
21//!
22//! The current memtable uses a fixed 4MB flush threshold (`memtable_flush_size`).
23//! This one-size-fits-all approach is suboptimal because:
24//!
25//! - **Too small:** Frequent flushes cause I/O overhead and write amplification
26//! - **Too large:** Memory pressure, long recovery times, increased GC pause
27//!
28//! The optimal size depends on write rate, available memory, and durability
29//! requirements—all of which vary at runtime.
30//!
31//! ## Solution: Adaptive Sizing with Feedback Control
32//!
33//! ```text
34//! ┌─────────────────────────────────────────────────────────────────┐
35//! │                  Adaptive Memtable Sizer                         │
36//! ├─────────────────────────────────────────────────────────────────┤
37//! │                                                                  │
38//! │  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐       │
39//! │  │ Write Rate   │    │ Memory       │    │ Target Size  │       │
40//! │  │ Estimator    │───▶│ Pressure     │───▶│ Calculator   │       │
41//! │  │ (EMA)        │    │ Monitor      │    │              │       │
42//! │  └──────────────┘    └──────────────┘    └──────────────┘       │
43//! │                                                 │                │
44//! │                                                 ▼                │
45//! │                      ┌──────────────────────────────────────────┐│
46//! │                      │ target = write_rate × 1.0s               ││
//! │                      │ adjusted = target × pressure_factor      ││
48//! │                      │ final = clamp(adjusted, base/4, base×4)  ││
49//! │                      └──────────────────────────────────────────┘│
50//! └─────────────────────────────────────────────────────────────────┘
51//!
52//! Goals:
53//! - Optimal memory utilization across varying workloads
//! - Reduced memory footprint during low-memory conditions
55//! - Faster recovery (smaller memtable = less WAL replay)
56//! ```
57//!
58//! ## Feedback Controller
59//!
60//! ```text
61//! target = write_rate_bytes_per_sec × 1.0  // 1 second buffer
//! adjusted = target × pressure_factor  // 1.0 up to PRESSURE_THRESHOLD, quadratic reduction above it
63//! final = clamp(adjusted, base/4, base×4)  // 1MB to 16MB
64//! ```
65//!
//! Memory pressure signal: read from `/proc/meminfo` (Linux) or from the output
//! of the `vm_stat` command (macOS); other platforms always report no pressure.
//! Pressure = 1 - (available / total).
68
69use std::sync::atomic::{AtomicU64, Ordering};
70
/// Default base memtable size: 4 MiB (4 * 1024 * 1024 bytes).
pub const DEFAULT_BASE_SIZE: usize = 4 << 20;

/// Default lower bound for the adaptive memtable size: 1 MiB.
pub const MIN_MEMTABLE_SIZE: usize = 1 << 20;

/// Default upper bound for the adaptive memtable size: 16 MiB.
pub const MAX_MEMTABLE_SIZE: usize = 16 << 20;

/// Default target buffer duration, in seconds.
///
/// The memtable is sized to hold roughly this much time worth of write
/// throughput.
pub const TARGET_BUFFER_SECONDS: f64 = 1.0;

/// Default smoothing factor (alpha) of the write-rate EMA.
///
/// A larger alpha gives more weight to the most recent sample, making the
/// estimate more responsive.
pub const WRITE_RATE_EMA_ALPHA: f64 = 0.1;

/// Memory pressure level above which the memtable size starts being reduced.
pub const PRESSURE_THRESHOLD: f64 = 0.7;
89
/// Tunable parameters for adaptive memtable sizing.
///
/// All sizes are expressed in bytes. `Default` fills every field from the
/// module-level constants.
#[derive(Clone, Debug)]
pub struct AdaptiveMemtableConfig {
    /// Size the sizer starts from before any update has run (default: 4MB).
    pub base_size: usize,
    /// Smallest size the sizer will ever recommend (default: 1MB).
    pub min_size: usize,
    /// Largest size the sizer will ever recommend (default: 16MB).
    pub max_size: usize,
    /// Seconds of write throughput the memtable should buffer (default: 1.0).
    pub target_buffer_seconds: f64,
    /// Smoothing factor of the write-rate EMA (default: 0.1).
    pub ema_alpha: f64,
    /// When `false`, memory pressure is not sampled and is treated as zero.
    pub enable_memory_pressure: bool,
}
106
107impl Default for AdaptiveMemtableConfig {
108    fn default() -> Self {
109        Self {
110            base_size: DEFAULT_BASE_SIZE,
111            min_size: MIN_MEMTABLE_SIZE,
112            max_size: MAX_MEMTABLE_SIZE,
113            target_buffer_seconds: TARGET_BUFFER_SECONDS,
114            ema_alpha: WRITE_RATE_EMA_ALPHA,
115            enable_memory_pressure: true,
116        }
117    }
118}
119
120/// Adaptive memtable sizer with memory pressure feedback
121///
122/// Dynamically adjusts memtable flush threshold based on:
123/// 1. Write rate (to maintain target buffer duration)
124/// 2. Memory pressure (to avoid OOM)
125pub struct AdaptiveMemtableSizer {
126    /// Configuration
127    config: AdaptiveMemtableConfig,
128    /// Current adaptive size (bytes)
129    current_size: AtomicU64,
130    /// Estimated write rate (bytes per second × 1000 for precision)
131    write_rate_ema: AtomicU64,
132    /// Last update timestamp (microseconds since epoch)
133    last_update_us: AtomicU64,
134    /// Total bytes written since last update
135    bytes_since_update: AtomicU64,
136    /// Last memory pressure reading (0-1000 scaled)
137    memory_pressure_scaled: AtomicU64,
138}
139
140impl AdaptiveMemtableSizer {
141    /// Create a new adaptive sizer with default configuration
142    pub fn new() -> Self {
143        Self::with_config(AdaptiveMemtableConfig::default())
144    }
145
146    /// Create with custom configuration
147    pub fn with_config(config: AdaptiveMemtableConfig) -> Self {
148        let initial_rate = config.base_size as f64 / config.target_buffer_seconds;
149        
150        Self {
151            current_size: AtomicU64::new(config.base_size as u64),
152            write_rate_ema: AtomicU64::new((initial_rate * 1000.0) as u64),
153            last_update_us: AtomicU64::new(now_us()),
154            bytes_since_update: AtomicU64::new(0),
155            memory_pressure_scaled: AtomicU64::new(0),
156            config,
157        }
158    }
159
160    /// Record bytes written (call on every write)
161    #[inline]
162    pub fn record_write(&self, bytes: usize) {
163        self.bytes_since_update.fetch_add(bytes as u64, Ordering::Relaxed);
164    }
165
166    /// Get current recommended memtable size
167    #[inline]
168    pub fn current_size(&self) -> usize {
169        self.current_size.load(Ordering::Relaxed) as usize
170    }
171
172    /// Get estimated write rate in bytes per second
173    #[inline]
174    pub fn write_rate(&self) -> f64 {
175        self.write_rate_ema.load(Ordering::Relaxed) as f64 / 1000.0
176    }
177
178    /// Get current memory pressure (0.0 - 1.0)
179    #[inline]
180    pub fn memory_pressure(&self) -> f64 {
181        self.memory_pressure_scaled.load(Ordering::Relaxed) as f64 / 1000.0
182    }
183
184    /// Update the adaptive size (call periodically, e.g., every second)
185    ///
186    /// Returns the new recommended memtable size
187    pub fn update(&self) -> usize {
188        let now = now_us();
189        let last = self.last_update_us.swap(now, Ordering::Relaxed);
190        let delta_us = now.saturating_sub(last);
191
192        if delta_us == 0 {
193            return self.current_size();
194        }
195
196        // Calculate instantaneous write rate
197        let bytes = self.bytes_since_update.swap(0, Ordering::Relaxed);
198        let delta_secs = delta_us as f64 / 1_000_000.0;
199        let instant_rate = bytes as f64 / delta_secs;
200
201        // Update EMA of write rate
202        let old_rate = self.write_rate_ema.load(Ordering::Relaxed) as f64 / 1000.0;
203        let new_rate = old_rate * (1.0 - self.config.ema_alpha) + instant_rate * self.config.ema_alpha;
204        self.write_rate_ema.store((new_rate * 1000.0) as u64, Ordering::Relaxed);
205
206        // Update memory pressure if enabled
207        let pressure = if self.config.enable_memory_pressure {
208            get_memory_pressure()
209        } else {
210            0.0
211        };
212        self.memory_pressure_scaled.store((pressure * 1000.0) as u64, Ordering::Relaxed);
213
214        // Calculate target size based on write rate
215        // Target = write_rate × buffer_duration
216        let target_size = new_rate * self.config.target_buffer_seconds;
217
218        // Adjust for memory pressure (quadratic dampening)
219        // When pressure is high, we reduce memtable size more aggressively
220        let pressure_factor = if pressure > PRESSURE_THRESHOLD {
221            // Above threshold, apply quadratic reduction
222            1.0 - (pressure - PRESSURE_THRESHOLD).powi(2)
223        } else {
224            1.0
225        };
226
227        let adjusted_size = target_size * pressure_factor;
228
229        // Clamp to configured bounds
230        let final_size = adjusted_size
231            .max(self.config.min_size as f64)
232            .min(self.config.max_size as f64) as usize;
233
234        self.current_size.store(final_size as u64, Ordering::Relaxed);
235        final_size
236    }
237
238    /// Check if the memtable should be flushed based on current size
239    #[inline]
240    pub fn should_flush(&self, current_memtable_bytes: u64) -> bool {
241        current_memtable_bytes >= self.current_size.load(Ordering::Relaxed)
242    }
243
244    /// Get statistics for monitoring
245    pub fn stats(&self) -> AdaptiveMemtableStats {
246        AdaptiveMemtableStats {
247            current_size: self.current_size(),
248            write_rate_bytes_per_sec: self.write_rate(),
249            memory_pressure: self.memory_pressure(),
250            config: self.config.clone(),
251        }
252    }
253}
254
255impl Default for AdaptiveMemtableSizer {
256    fn default() -> Self {
257        Self::new()
258    }
259}
260
261/// Statistics for adaptive memtable sizing
262#[derive(Debug, Clone)]
263pub struct AdaptiveMemtableStats {
264    /// Current recommended memtable size
265    pub current_size: usize,
266    /// Estimated write rate in bytes per second
267    pub write_rate_bytes_per_sec: f64,
268    /// Current memory pressure (0.0 - 1.0)
269    pub memory_pressure: f64,
270    /// Current configuration
271    pub config: AdaptiveMemtableConfig,
272}
273
274// ============================================================================
275// Platform-specific memory pressure detection
276// ============================================================================
277
278/// Get current memory pressure (0.0 = no pressure, 1.0 = critical)
279#[cfg(target_os = "linux")]
280fn get_memory_pressure() -> f64 {
281    // Read from /proc/meminfo
282    use std::fs::File;
283    use std::io::{BufRead, BufReader};
284
285    let file = match File::open("/proc/meminfo") {
286        Ok(f) => f,
287        Err(_) => return 0.0,
288    };
289
290    let reader = BufReader::new(file);
291    let mut mem_total: u64 = 0;
292    let mut mem_available: u64 = 0;
293
294    for line in reader.lines().take(10).flatten() {
295        if line.starts_with("MemTotal:") {
296            mem_total = parse_meminfo_value(&line);
297        } else if line.starts_with("MemAvailable:") {
298            mem_available = parse_meminfo_value(&line);
299        }
300    }
301
302    if mem_total == 0 {
303        return 0.0;
304    }
305
306    // Pressure = 1 - (available / total)
307    1.0 - (mem_available as f64 / mem_total as f64)
308}
309
/// Extract the numeric value from a `/proc/meminfo` line.
///
/// The value is the second whitespace-separated field, e.g. `16384000` in
/// `"MemTotal:       16384000 kB"`. Returns 0 when that field is absent or
/// is not a valid `u64`.
#[cfg(target_os = "linux")]
fn parse_meminfo_value(line: &str) -> u64 {
    let mut fields = line.split_whitespace();
    // First field is the label ("MemTotal:"); the value follows it.
    let _label = fields.next();
    match fields.next().map(str::parse::<u64>) {
        Some(Ok(value)) => value,
        _ => 0,
    }
}
318
/// Get current memory pressure (0.0 = no pressure, 1.0 = critical).
///
/// macOS implementation: runs the `vm_stat` command and parses its page
/// counters. Total is approximated as free + active + inactive + speculative
/// + wired pages, and available as free + inactive pages. Returns 0.0 when
/// the command cannot be run, its output is not valid UTF-8, or no pages
/// were counted.
#[cfg(target_os = "macos")]
fn get_memory_pressure() -> f64 {
    use std::process::Command;

    // Counters of interest; `pages[i]` holds the value for `LABELS[i]`.
    const LABELS: [&str; 5] = [
        "Pages free:",
        "Pages active:",
        "Pages inactive:",
        "Pages speculative:",
        "Pages wired down:",
    ];
    const FREE: usize = 0;
    const INACTIVE: usize = 2;

    let report = match Command::new("vm_stat")
        .output()
        .map(|out| String::from_utf8(out.stdout))
    {
        Ok(Ok(text)) => text,
        _ => return 0.0,
    };

    let mut pages = [0u64; 5];
    for row in report.lines() {
        // First matching label wins, in the order listed in `LABELS`.
        if let Some(slot) = LABELS.iter().position(|label| row.contains(*label)) {
            pages[slot] = extract_vm_stat_value(row);
        }
    }

    let total: u64 = pages.iter().sum();
    if total == 0 {
        return 0.0;
    }

    let available = pages[FREE] + pages[INACTIVE];
    1.0 - (available as f64 / total as f64)
}
366
/// Extract the page count from a `vm_stat` output line.
///
/// Takes the text between the first and second `:`, trims surrounding
/// whitespace and any trailing `.` characters, and parses it as `u64`;
/// e.g. `"Pages free:      3142."` yields 3142. Returns 0 when there is no
/// `:` in the line or the remainder is not a valid number.
#[cfg(target_os = "macos")]
fn extract_vm_stat_value(line: &str) -> u64 {
    match line.split(':').nth(1) {
        Some(raw) => raw.trim().trim_end_matches('.').parse::<u64>().unwrap_or(0),
        None => 0,
    }
}
376
/// Get current memory pressure on platforms without a dedicated probe.
///
/// Always reports 0.0 (no pressure).
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
fn get_memory_pressure() -> f64 {
    // Nothing is measured on this platform, so assume memory is plentiful.
    const NO_PRESSURE: f64 = 0.0;
    NO_PRESSURE
}
382
383// ============================================================================
384// Utility functions
385// ============================================================================
386
/// Current wall-clock time as whole microseconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch.
#[inline]
fn now_us() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_micros() as u64,
        Err(_) => 0,
    }
}
395
396// ============================================================================
397// Tests
398// ============================================================================
399
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration mirrors the module-level size constants.
    #[test]
    fn test_default_config() {
        let AdaptiveMemtableConfig {
            base_size,
            min_size,
            max_size,
            ..
        } = AdaptiveMemtableConfig::default();

        assert_eq!(base_size, DEFAULT_BASE_SIZE);
        assert_eq!(min_size, MIN_MEMTABLE_SIZE);
        assert_eq!(max_size, MAX_MEMTABLE_SIZE);
    }

    /// A freshly built sizer recommends the base size.
    #[test]
    fn test_initial_size() {
        assert_eq!(AdaptiveMemtableSizer::new().current_size(), DEFAULT_BASE_SIZE);
    }

    /// Recorded bytes accumulate until the next update.
    #[test]
    fn test_record_write() {
        let sizer = AdaptiveMemtableSizer::new();

        for chunk in [1000usize, 2000] {
            sizer.record_write(chunk);
        }

        let pending = sizer.bytes_since_update.load(Ordering::Relaxed);
        assert_eq!(pending, 3000);
    }

    /// The flush decision compares against the current (initially 4MB) size.
    #[test]
    fn test_should_flush() {
        let sizer = AdaptiveMemtableSizer::new();

        let below_threshold = sizer.should_flush(1_000_000); // 1MB < 4MB
        let above_threshold = sizer.should_flush(5_000_000); // 5MB >= 4MB

        assert!(!below_threshold);
        assert!(above_threshold);
    }

    /// An update after some writes yields a size inside the default bounds.
    #[test]
    fn test_write_rate_update() {
        let sizer = AdaptiveMemtableSizer::new();

        // Record 1MB of writes, then let roughly 100ms elapse before updating
        sizer.record_write(1_000_000);
        std::thread::sleep(std::time::Duration::from_millis(100));

        let updated = sizer.update();

        assert!(updated >= MIN_MEMTABLE_SIZE);
        assert!(updated <= MAX_MEMTABLE_SIZE);
    }

    /// A custom base size is used as the initial recommendation.
    #[test]
    fn test_custom_config() {
        const MIB: usize = 1024 * 1024;

        let sizer = AdaptiveMemtableSizer::with_config(AdaptiveMemtableConfig {
            base_size: 8 * MIB,
            min_size: 2 * MIB,
            max_size: 32 * MIB,
            target_buffer_seconds: 2.0,
            ema_alpha: 0.2,
            enable_memory_pressure: false,
        });

        assert_eq!(sizer.current_size(), 8 * MIB);
    }

    /// The platform probe runs without crashing and stays within [0, 1].
    #[test]
    fn test_memory_pressure() {
        let reading = get_memory_pressure();
        assert!(reading >= 0.0);
        assert!(reading <= 1.0);
    }

    /// A stats snapshot of a fresh sizer reflects its initial state.
    #[test]
    fn test_stats() {
        let snapshot = AdaptiveMemtableSizer::new().stats();

        assert_eq!(snapshot.current_size, DEFAULT_BASE_SIZE);
        assert!(snapshot.write_rate_bytes_per_sec > 0.0);
        assert!(snapshot.memory_pressure >= 0.0);
    }
}