Skip to main content

sochdb_storage/
adaptive_memtable.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Adaptive Memtable Sizing with Memory Pressure Feedback
19//!
20//! This module implements Task 10 from mm.md: dynamic memtable sizing that
21//! responds to system memory pressure and write rate.
22//!
23//! ## Problem: Fixed Memtable Size is Suboptimal
24//!
25//! The current memtable uses a fixed 4MB flush threshold (`memtable_flush_size`).
26//! This one-size-fits-all approach is suboptimal because:
27//!
28//! - **Too small:** Frequent flushes cause I/O overhead and write amplification
29//! - **Too large:** Memory pressure, long recovery times, increased GC pause
30//!
31//! The optimal size depends on write rate, available memory, and durability
32//! requirements—all of which vary at runtime.
33//!
34//! ## Solution: Adaptive Sizing with Feedback Control
35//!
36//! ```text
37//! ┌─────────────────────────────────────────────────────────────────┐
38//! │                  Adaptive Memtable Sizer                         │
39//! ├─────────────────────────────────────────────────────────────────┤
40//! │                                                                  │
41//! │  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐       │
42//! │  │ Write Rate   │    │ Memory       │    │ Target Size  │       │
43//! │  │ Estimator    │───▶│ Pressure     │───▶│ Calculator   │       │
44//! │  │ (EMA)        │    │ Monitor      │    │              │       │
45//! │  └──────────────┘    └──────────────┘    └──────────────┘       │
46//! │                                                 │                │
47//! │                                                 ▼                │
48//! │                      ┌──────────────────────────────────────────┐│
49//! │                      │ target = write_rate × 1.0s               ││
//! │                      │ adjusted = target × (1 - excess²)        ││
51//! │                      │ final = clamp(adjusted, base/4, base×4)  ││
52//! │                      └──────────────────────────────────────────┘│
53//! └─────────────────────────────────────────────────────────────────┘
54//!
55//! Goals:
56//! - Optimal memory utilization across varying workloads
//! - Reduced memory footprint (smaller memtables) during low-memory conditions
58//! - Faster recovery (smaller memtable = less WAL replay)
59//! ```
60//!
61//! ## Feedback Controller
62//!
63//! ```text
64//! target = write_rate_bytes_per_sec × 1.0  // 1 second buffer
//! adjusted = target × (1 - excess²)  // excess: how far pressure exceeds PRESSURE_THRESHOLD (0 at or below it)
66//! final = clamp(adjusted, base/4, base×4)  // 1MB to 16MB
67//! ```
68//!
//! Memory pressure signal: Read from /proc/meminfo (Linux) or the output of
//! the `vm_stat` command (macOS). Pressure = 1 - (available / total).
//! Other platforms always report zero pressure.
71
72use std::sync::atomic::{AtomicU64, Ordering};
73
/// Default base memtable size: 4 MiB.
///
/// This is the starting point of the adaptive sizer; the default minimum and
/// maximum below are derived from it.
pub const DEFAULT_BASE_SIZE: usize = 4 * 1024 * 1024;

/// Minimum memtable size: 1 MiB (`DEFAULT_BASE_SIZE / 4`).
pub const MIN_MEMTABLE_SIZE: usize = DEFAULT_BASE_SIZE / 4;

/// Maximum memtable size: 16 MiB (`DEFAULT_BASE_SIZE * 4`).
pub const MAX_MEMTABLE_SIZE: usize = DEFAULT_BASE_SIZE * 4;

/// Target buffer duration in seconds.
///
/// The memtable should hold approximately this much time of write throughput.
pub const TARGET_BUFFER_SECONDS: f64 = 1.0;

/// EMA alpha for write rate estimation (higher = more responsive).
pub const WRITE_RATE_EMA_ALPHA: f64 = 0.1;

/// Memory pressure threshold above which we start reducing memtable size.
pub const PRESSURE_THRESHOLD: f64 = 0.7;
92
/// Configuration for adaptive memtable sizing.
///
/// All sizes are expressed in bytes. The struct is plain data: it can be
/// cloned, printed and compared for equality.
#[derive(Debug, Clone, PartialEq)]
pub struct AdaptiveMemtableConfig {
    /// Base size in bytes (default: 4MB). Used as the initial recommended size.
    pub base_size: usize,
    /// Minimum allowed size (default: 1MB). Lower clamp of the computed size.
    pub min_size: usize,
    /// Maximum allowed size (default: 16MB). Upper clamp of the computed size.
    pub max_size: usize,
    /// Target buffer duration in seconds (default: 1.0). The computed size is
    /// the estimated write rate multiplied by this duration.
    pub target_buffer_seconds: f64,
    /// EMA alpha for write rate (default: 0.1). Weight given to the newest
    /// rate sample when the estimate is updated.
    pub ema_alpha: f64,
    /// Whether to enable memory pressure feedback. When `false`, pressure is
    /// treated as 0.0 and the size depends on the write rate only.
    pub enable_memory_pressure: bool,
}
109
110impl Default for AdaptiveMemtableConfig {
111    fn default() -> Self {
112        Self {
113            base_size: DEFAULT_BASE_SIZE,
114            min_size: MIN_MEMTABLE_SIZE,
115            max_size: MAX_MEMTABLE_SIZE,
116            target_buffer_seconds: TARGET_BUFFER_SECONDS,
117            ema_alpha: WRITE_RATE_EMA_ALPHA,
118            enable_memory_pressure: true,
119        }
120    }
121}
122
123/// Adaptive memtable sizer with memory pressure feedback
124///
125/// Dynamically adjusts memtable flush threshold based on:
126/// 1. Write rate (to maintain target buffer duration)
127/// 2. Memory pressure (to avoid OOM)
128pub struct AdaptiveMemtableSizer {
129    /// Configuration
130    config: AdaptiveMemtableConfig,
131    /// Current adaptive size (bytes)
132    current_size: AtomicU64,
133    /// Estimated write rate (bytes per second × 1000 for precision)
134    write_rate_ema: AtomicU64,
135    /// Last update timestamp (microseconds since epoch)
136    last_update_us: AtomicU64,
137    /// Total bytes written since last update
138    bytes_since_update: AtomicU64,
139    /// Last memory pressure reading (0-1000 scaled)
140    memory_pressure_scaled: AtomicU64,
141}
142
143impl AdaptiveMemtableSizer {
144    /// Create a new adaptive sizer with default configuration
145    pub fn new() -> Self {
146        Self::with_config(AdaptiveMemtableConfig::default())
147    }
148
149    /// Create with custom configuration
150    pub fn with_config(config: AdaptiveMemtableConfig) -> Self {
151        let initial_rate = config.base_size as f64 / config.target_buffer_seconds;
152        
153        Self {
154            current_size: AtomicU64::new(config.base_size as u64),
155            write_rate_ema: AtomicU64::new((initial_rate * 1000.0) as u64),
156            last_update_us: AtomicU64::new(now_us()),
157            bytes_since_update: AtomicU64::new(0),
158            memory_pressure_scaled: AtomicU64::new(0),
159            config,
160        }
161    }
162
163    /// Record bytes written (call on every write)
164    #[inline]
165    pub fn record_write(&self, bytes: usize) {
166        self.bytes_since_update.fetch_add(bytes as u64, Ordering::Relaxed);
167    }
168
169    /// Get current recommended memtable size
170    #[inline]
171    pub fn current_size(&self) -> usize {
172        self.current_size.load(Ordering::Relaxed) as usize
173    }
174
175    /// Get estimated write rate in bytes per second
176    #[inline]
177    pub fn write_rate(&self) -> f64 {
178        self.write_rate_ema.load(Ordering::Relaxed) as f64 / 1000.0
179    }
180
181    /// Get current memory pressure (0.0 - 1.0)
182    #[inline]
183    pub fn memory_pressure(&self) -> f64 {
184        self.memory_pressure_scaled.load(Ordering::Relaxed) as f64 / 1000.0
185    }
186
187    /// Update the adaptive size (call periodically, e.g., every second)
188    ///
189    /// Returns the new recommended memtable size
190    pub fn update(&self) -> usize {
191        let now = now_us();
192        let last = self.last_update_us.swap(now, Ordering::Relaxed);
193        let delta_us = now.saturating_sub(last);
194
195        if delta_us == 0 {
196            return self.current_size();
197        }
198
199        // Calculate instantaneous write rate
200        let bytes = self.bytes_since_update.swap(0, Ordering::Relaxed);
201        let delta_secs = delta_us as f64 / 1_000_000.0;
202        let instant_rate = bytes as f64 / delta_secs;
203
204        // Update EMA of write rate
205        let old_rate = self.write_rate_ema.load(Ordering::Relaxed) as f64 / 1000.0;
206        let new_rate = old_rate * (1.0 - self.config.ema_alpha) + instant_rate * self.config.ema_alpha;
207        self.write_rate_ema.store((new_rate * 1000.0) as u64, Ordering::Relaxed);
208
209        // Update memory pressure if enabled
210        let pressure = if self.config.enable_memory_pressure {
211            get_memory_pressure()
212        } else {
213            0.0
214        };
215        self.memory_pressure_scaled.store((pressure * 1000.0) as u64, Ordering::Relaxed);
216
217        // Calculate target size based on write rate
218        // Target = write_rate × buffer_duration
219        let target_size = new_rate * self.config.target_buffer_seconds;
220
221        // Adjust for memory pressure (quadratic dampening)
222        // When pressure is high, we reduce memtable size more aggressively
223        let pressure_factor = if pressure > PRESSURE_THRESHOLD {
224            // Above threshold, apply quadratic reduction
225            1.0 - (pressure - PRESSURE_THRESHOLD).powi(2)
226        } else {
227            1.0
228        };
229
230        let adjusted_size = target_size * pressure_factor;
231
232        // Clamp to configured bounds
233        let final_size = adjusted_size
234            .max(self.config.min_size as f64)
235            .min(self.config.max_size as f64) as usize;
236
237        self.current_size.store(final_size as u64, Ordering::Relaxed);
238        final_size
239    }
240
241    /// Check if the memtable should be flushed based on current size
242    #[inline]
243    pub fn should_flush(&self, current_memtable_bytes: u64) -> bool {
244        current_memtable_bytes >= self.current_size.load(Ordering::Relaxed)
245    }
246
247    /// Get statistics for monitoring
248    pub fn stats(&self) -> AdaptiveMemtableStats {
249        AdaptiveMemtableStats {
250            current_size: self.current_size(),
251            write_rate_bytes_per_sec: self.write_rate(),
252            memory_pressure: self.memory_pressure(),
253            config: self.config.clone(),
254        }
255    }
256}
257
258impl Default for AdaptiveMemtableSizer {
259    fn default() -> Self {
260        Self::new()
261    }
262}
263
264/// Statistics for adaptive memtable sizing
265#[derive(Debug, Clone)]
266pub struct AdaptiveMemtableStats {
267    /// Current recommended memtable size
268    pub current_size: usize,
269    /// Estimated write rate in bytes per second
270    pub write_rate_bytes_per_sec: f64,
271    /// Current memory pressure (0.0 - 1.0)
272    pub memory_pressure: f64,
273    /// Current configuration
274    pub config: AdaptiveMemtableConfig,
275}
276
277// ============================================================================
278// Platform-specific memory pressure detection
279// ============================================================================
280
281/// Get current memory pressure (0.0 = no pressure, 1.0 = critical)
282#[cfg(target_os = "linux")]
283fn get_memory_pressure() -> f64 {
284    // Read from /proc/meminfo
285    use std::fs::File;
286    use std::io::{BufRead, BufReader};
287
288    let file = match File::open("/proc/meminfo") {
289        Ok(f) => f,
290        Err(_) => return 0.0,
291    };
292
293    let reader = BufReader::new(file);
294    let mut mem_total: u64 = 0;
295    let mut mem_available: u64 = 0;
296
297    for line in reader.lines().take(10).flatten() {
298        if line.starts_with("MemTotal:") {
299            mem_total = parse_meminfo_value(&line);
300        } else if line.starts_with("MemAvailable:") {
301            mem_available = parse_meminfo_value(&line);
302        }
303    }
304
305    if mem_total == 0 {
306        return 0.0;
307    }
308
309    // Pressure = 1 - (available / total)
310    1.0 - (mem_available as f64 / mem_total as f64)
311}
312
/// Extract the numeric field of a `/proc/meminfo` line.
///
/// The value is the second whitespace-separated token, e.g. `16384000` in
/// `"MemTotal:       16384000 kB"`. Returns 0 when that token is missing or
/// is not an unsigned integer.
#[cfg(target_os = "linux")]
fn parse_meminfo_value(line: &str) -> u64 {
    let mut tokens = line.split_whitespace();
    let _label = tokens.next();

    match tokens.next().map(str::parse::<u64>) {
        Some(Ok(value)) => value,
        _ => 0,
    }
}
321
/// Get current memory pressure (0.0 = no pressure, 1.0 = critical)
///
/// macOS implementation: runs the `vm_stat` command and derives the value
/// from its page counters. Returns 0.0 when the command cannot be run, its
/// output is not valid UTF-8, or no counters were found.
#[cfg(target_os = "macos")]
fn get_memory_pressure() -> f64 {
    use std::process::Command;

    // Counters of interest, in the order they are unpacked below.
    const LABELS: [&str; 5] = [
        "Pages free:",
        "Pages active:",
        "Pages inactive:",
        "Pages speculative:",
        "Pages wired down:",
    ];

    let Ok(output) = Command::new("vm_stat").output() else {
        return 0.0;
    };
    let Ok(report) = String::from_utf8(output.stdout) else {
        return 0.0;
    };

    let mut pages = [0u64; 5];
    for line in report.lines() {
        // The first label contained in the line decides which counter it is.
        if let Some(slot) = LABELS.iter().position(|&label| line.contains(label)) {
            pages[slot] = extract_vm_stat_value(line);
        }
    }
    let [free, active, inactive, speculative, wired] = pages;

    // Total = active + inactive + speculative + free + wired
    // Available = free + inactive (approximately)
    let total = active + inactive + speculative + free + wired;
    let available = free + inactive;

    if total == 0 {
        return 0.0;
    }

    1.0 - (available as f64 / total as f64)
}
369
/// Extract the page count from one line of `vm_stat` output.
///
/// The count is the text between the first and second `:` with surrounding
/// whitespace and trailing `.` characters removed, e.g. `3142` in
/// `"Pages free:                                3142."`. Returns 0 when the
/// line has no `:` or the text is not an unsigned integer.
#[cfg(target_os = "macos")]
fn extract_vm_stat_value(line: &str) -> u64 {
    let mut fields = line.split(':');
    let _label = fields.next();

    let Some(raw) = fields.next() else {
        return 0;
    };

    raw.trim().trim_end_matches('.').parse::<u64>().unwrap_or(0)
}
379
/// Get current memory pressure (0.0 = no pressure, 1.0 = critical)
///
/// Fallback for platforms other than Linux and macOS: no measurement is
/// attempted and the result is always 0.0.
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
fn get_memory_pressure() -> f64 {
    // Unsupported platform: report "no pressure".
    const NO_PRESSURE: f64 = 0.0;
    NO_PRESSURE
}
385
386// ============================================================================
387// Utility functions
388// ============================================================================
389
/// Get a monotonic timestamp in microseconds
///
/// The value counts from a process-wide anchor captured on the first call,
/// so only the difference between two readings is meaningful. A monotonic
/// clock is used instead of wall-clock time because callers measure elapsed
/// intervals: wall-clock adjustments (NTP steps, manual changes) would
/// otherwise show up as zero-length or hugely inflated intervals.
#[inline]
fn now_us() -> u64 {
    use std::sync::OnceLock;
    use std::time::Instant;

    static ANCHOR: OnceLock<Instant> = OnceLock::new();

    let elapsed_us = ANCHOR.get_or_init(Instant::now).elapsed().as_micros();
    // u128 -> u64 cannot overflow for any realistic uptime; saturate anyway.
    u64::try_from(elapsed_us).unwrap_or(u64::MAX)
}
398
399// ============================================================================
400// Tests
401// ============================================================================
402
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration mirrors the module-level constants.
    #[test]
    fn test_default_config() {
        let config = AdaptiveMemtableConfig::default();
        assert_eq!(config.base_size, DEFAULT_BASE_SIZE);
        assert_eq!(config.min_size, MIN_MEMTABLE_SIZE);
        assert_eq!(config.max_size, MAX_MEMTABLE_SIZE);
    }

    /// A fresh sizer recommends the base size.
    #[test]
    fn test_initial_size() {
        let sizer = AdaptiveMemtableSizer::new();
        assert_eq!(sizer.current_size(), DEFAULT_BASE_SIZE);
    }

    /// Recorded writes accumulate in the byte counter.
    #[test]
    fn test_record_write() {
        let sizer = AdaptiveMemtableSizer::new();

        sizer.record_write(1000);
        sizer.record_write(2000);

        // Bytes accumulate until update
        assert_eq!(sizer.bytes_since_update.load(Ordering::Relaxed), 3000);
    }

    /// The flush decision compares against the current recommended size.
    #[test]
    fn test_should_flush() {
        let sizer = AdaptiveMemtableSizer::new();

        // Initially at base size (4MB)
        assert!(!sizer.should_flush(1_000_000)); // 1MB < 4MB
        assert!(sizer.should_flush(5_000_000)); // 5MB >= 4MB
    }

    /// An update consumes the recorded bytes and yields a size within bounds.
    #[test]
    fn test_write_rate_update() {
        let sizer = AdaptiveMemtableSizer::new();

        // Simulate writing 1MB over ~100ms
        sizer.record_write(1_000_000);
        std::thread::sleep(std::time::Duration::from_millis(100));

        let new_size = sizer.update();

        // Size should adjust based on write rate
        assert!(new_size >= MIN_MEMTABLE_SIZE);
        assert!(new_size <= MAX_MEMTABLE_SIZE);
        assert_eq!(sizer.current_size(), new_size);

        // The byte counter is reset once the sample has been consumed
        assert_eq!(sizer.bytes_since_update.load(Ordering::Relaxed), 0);
    }

    /// A custom configuration determines the initial size.
    #[test]
    fn test_custom_config() {
        let config = AdaptiveMemtableConfig {
            base_size: 8 * 1024 * 1024,
            min_size: 2 * 1024 * 1024,
            max_size: 32 * 1024 * 1024,
            target_buffer_seconds: 2.0,
            ema_alpha: 0.2,
            enable_memory_pressure: false,
        };

        let sizer = AdaptiveMemtableSizer::with_config(config);
        assert_eq!(sizer.current_size(), 8 * 1024 * 1024);
    }

    /// With feedback disabled no pressure is ever reported.
    #[test]
    fn test_memory_pressure_disabled() {
        let config = AdaptiveMemtableConfig {
            enable_memory_pressure: false,
            ..AdaptiveMemtableConfig::default()
        };
        let sizer = AdaptiveMemtableSizer::with_config(config);

        let size = sizer.update();

        assert_eq!(sizer.memory_pressure(), 0.0);
        assert!(size >= MIN_MEMTABLE_SIZE);
        assert!(size <= MAX_MEMTABLE_SIZE);
    }

    /// The platform probe returns a value in the documented range.
    #[test]
    fn test_memory_pressure() {
        // Just ensure it doesn't crash
        let pressure = get_memory_pressure();
        assert!(pressure >= 0.0);
        assert!(pressure <= 1.0);
    }

    /// Statistics of a fresh sizer reflect its initial state.
    #[test]
    fn test_stats() {
        let sizer = AdaptiveMemtableSizer::new();
        let stats = sizer.stats();

        assert_eq!(stats.current_size, DEFAULT_BASE_SIZE);
        assert!(stats.write_rate_bytes_per_sec > 0.0);
        assert!(stats.memory_pressure >= 0.0);
    }
}