Skip to main content

sochdb_storage/
adaptive_memtable.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// SochDB - LLM-Optimized Embedded Database
3// Copyright (C) 2026 Sushanth Reddy Vanagala (https://github.com/sushanthpy)
4//
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU Affero General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9//
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU Affero General Public License for more details.
14//
15// You should have received a copy of the GNU Affero General Public License
16// along with this program. If not, see <https://www.gnu.org/licenses/>.
17
18//! Adaptive Memtable Sizing with Memory Pressure Feedback
19//!
20//! This module implements Task 10 from mm.md: dynamic memtable sizing that
21//! responds to system memory pressure and write rate.
22//!
23//! ## Problem: Fixed Memtable Size is Suboptimal
24//!
25//! The current memtable uses a fixed 4MB flush threshold (`memtable_flush_size`).
26//! This one-size-fits-all approach is suboptimal because:
27//!
28//! - **Too small:** Frequent flushes cause I/O overhead and write amplification
29//! - **Too large:** Memory pressure, long recovery times, increased GC pause
30//!
31//! The optimal size depends on write rate, available memory, and durability
32//! requirements—all of which vary at runtime.
33//!
34//! ## Solution: Adaptive Sizing with Feedback Control
35//!
36//! ```text
37//! ┌─────────────────────────────────────────────────────────────────┐
38//! │                  Adaptive Memtable Sizer                         │
39//! ├─────────────────────────────────────────────────────────────────┤
40//! │                                                                  │
41//! │  ┌──────────────┐    ┌──────────────┐    ┌──────────────┐       │
42//! │  │ Write Rate   │    │ Memory       │    │ Target Size  │       │
43//! │  │ Estimator    │───▶│ Pressure     │───▶│ Calculator   │       │
44//! │  │ (EMA)        │    │ Monitor      │    │              │       │
45//! │  └──────────────┘    └──────────────┘    └──────────────┘       │
46//! │                                                 │                │
47//! │                                                 ▼                │
//! │                      ┌──────────────────────────────────────────┐│
//! │                      │ target   = write_rate × buffer_secs      ││
//! │                      │ adjusted = target × pressure_factor      ││
//! │                      │ final    = clamp(adjusted, min, max)     ││
//! │                      └──────────────────────────────────────────┘│
//! └─────────────────────────────────────────────────────────────────┘
//! ```
//!
//! Goals:
//! - Optimal memory utilization across varying workloads
//! - Smaller memtables (and therefore less memory held) under memory pressure
//! - Fewer flushes when the write rate is high and memory is plentiful
//! - Faster recovery (smaller memtable = less WAL replay)
60//!
//! ## Feedback Controller
//!
//! ```text
//! target   = write_rate_bytes_per_sec × target_buffer_seconds  // default: 1 second
//! factor   = 1.0                     if pressure ≤ PRESSURE_THRESHOLD (0.7)
//!          = quadratically reduced   if pressure > PRESSURE_THRESHOLD
//! adjusted = target × factor
//! final    = clamp(adjusted, min_size, max_size)  // default: 1MB to 16MB
//! ```
//!
//! Memory pressure signal: parsed from `/proc/meminfo` on Linux and from the
//! output of the `vm_stat` command on macOS; other platforms report no
//! pressure. Pressure = 1 - (available / total).
71
72use std::sync::atomic::{AtomicU64, Ordering};
73
/// Default base memtable size (4 MiB).
pub const DEFAULT_BASE_SIZE: usize = 4 << 20;

/// Lower bound on the adaptive memtable size (1 MiB).
pub const MIN_MEMTABLE_SIZE: usize = 1 << 20;

/// Upper bound on the adaptive memtable size (16 MiB).
pub const MAX_MEMTABLE_SIZE: usize = 16 << 20;

/// Seconds of write throughput the memtable should be able to absorb
/// before a flush is recommended.
pub const TARGET_BUFFER_SECONDS: f64 = 1.0;

/// Smoothing factor for the write-rate exponential moving average.
/// Larger values weight recent samples more heavily (more responsive, noisier).
pub const WRITE_RATE_EMA_ALPHA: f64 = 0.1;

/// Memory pressure (0.0-1.0) above which the target memtable size is reduced.
pub const PRESSURE_THRESHOLD: f64 = 0.7;
92
/// Tunables for the adaptive memtable sizer.
///
/// All sizes are in bytes. `Default` yields a 4 MiB base clamped to the
/// 1 MiB..=16 MiB range, a one-second buffer, an EMA alpha of 0.1, and
/// memory-pressure feedback switched on.
#[derive(Debug, Clone)]
pub struct AdaptiveMemtableConfig {
    /// Starting flush threshold; also used to seed the write-rate estimate.
    pub base_size: usize,
    /// Smallest flush threshold the sizer will recommend.
    pub min_size: usize,
    /// Largest flush threshold the sizer will recommend.
    pub max_size: usize,
    /// Seconds of write throughput the memtable should buffer.
    pub target_buffer_seconds: f64,
    /// Smoothing factor for the write-rate EMA (weight given to the newest sample).
    pub ema_alpha: f64,
    /// When `false`, memory pressure is never sampled and is treated as zero.
    pub enable_memory_pressure: bool,
}
109
110impl Default for AdaptiveMemtableConfig {
111    fn default() -> Self {
112        Self {
113            base_size: DEFAULT_BASE_SIZE,
114            min_size: MIN_MEMTABLE_SIZE,
115            max_size: MAX_MEMTABLE_SIZE,
116            target_buffer_seconds: TARGET_BUFFER_SECONDS,
117            ema_alpha: WRITE_RATE_EMA_ALPHA,
118            enable_memory_pressure: true,
119        }
120    }
121}
122
123/// Adaptive memtable sizer with memory pressure feedback
124///
125/// Dynamically adjusts memtable flush threshold based on:
126/// 1. Write rate (to maintain target buffer duration)
127/// 2. Memory pressure (to avoid OOM)
128pub struct AdaptiveMemtableSizer {
129    /// Configuration
130    config: AdaptiveMemtableConfig,
131    /// Current adaptive size (bytes)
132    current_size: AtomicU64,
133    /// Estimated write rate (bytes per second × 1000 for precision)
134    write_rate_ema: AtomicU64,
135    /// Last update timestamp (microseconds since epoch)
136    last_update_us: AtomicU64,
137    /// Total bytes written since last update
138    bytes_since_update: AtomicU64,
139    /// Last memory pressure reading (0-1000 scaled)
140    memory_pressure_scaled: AtomicU64,
141}
142
143impl AdaptiveMemtableSizer {
144    /// Create a new adaptive sizer with default configuration
145    pub fn new() -> Self {
146        Self::with_config(AdaptiveMemtableConfig::default())
147    }
148
149    /// Create with custom configuration
150    pub fn with_config(config: AdaptiveMemtableConfig) -> Self {
151        let initial_rate = config.base_size as f64 / config.target_buffer_seconds;
152
153        Self {
154            current_size: AtomicU64::new(config.base_size as u64),
155            write_rate_ema: AtomicU64::new((initial_rate * 1000.0) as u64),
156            last_update_us: AtomicU64::new(now_us()),
157            bytes_since_update: AtomicU64::new(0),
158            memory_pressure_scaled: AtomicU64::new(0),
159            config,
160        }
161    }
162
163    /// Record bytes written (call on every write)
164    #[inline]
165    pub fn record_write(&self, bytes: usize) {
166        self.bytes_since_update
167            .fetch_add(bytes as u64, Ordering::Relaxed);
168    }
169
170    /// Get current recommended memtable size
171    #[inline]
172    pub fn current_size(&self) -> usize {
173        self.current_size.load(Ordering::Relaxed) as usize
174    }
175
176    /// Get estimated write rate in bytes per second
177    #[inline]
178    pub fn write_rate(&self) -> f64 {
179        self.write_rate_ema.load(Ordering::Relaxed) as f64 / 1000.0
180    }
181
182    /// Get current memory pressure (0.0 - 1.0)
183    #[inline]
184    pub fn memory_pressure(&self) -> f64 {
185        self.memory_pressure_scaled.load(Ordering::Relaxed) as f64 / 1000.0
186    }
187
188    /// Update the adaptive size (call periodically, e.g., every second)
189    ///
190    /// Returns the new recommended memtable size
191    pub fn update(&self) -> usize {
192        let now = now_us();
193        let last = self.last_update_us.swap(now, Ordering::Relaxed);
194        let delta_us = now.saturating_sub(last);
195
196        if delta_us == 0 {
197            return self.current_size();
198        }
199
200        // Calculate instantaneous write rate
201        let bytes = self.bytes_since_update.swap(0, Ordering::Relaxed);
202        let delta_secs = delta_us as f64 / 1_000_000.0;
203        let instant_rate = bytes as f64 / delta_secs;
204
205        // Update EMA of write rate
206        let old_rate = self.write_rate_ema.load(Ordering::Relaxed) as f64 / 1000.0;
207        let new_rate =
208            old_rate * (1.0 - self.config.ema_alpha) + instant_rate * self.config.ema_alpha;
209        self.write_rate_ema
210            .store((new_rate * 1000.0) as u64, Ordering::Relaxed);
211
212        // Update memory pressure if enabled
213        let pressure = if self.config.enable_memory_pressure {
214            get_memory_pressure()
215        } else {
216            0.0
217        };
218        self.memory_pressure_scaled
219            .store((pressure * 1000.0) as u64, Ordering::Relaxed);
220
221        // Calculate target size based on write rate
222        // Target = write_rate × buffer_duration
223        let target_size = new_rate * self.config.target_buffer_seconds;
224
225        // Adjust for memory pressure (quadratic dampening)
226        // When pressure is high, we reduce memtable size more aggressively
227        let pressure_factor = if pressure > PRESSURE_THRESHOLD {
228            // Above threshold, apply quadratic reduction
229            1.0 - (pressure - PRESSURE_THRESHOLD).powi(2)
230        } else {
231            1.0
232        };
233
234        let adjusted_size = target_size * pressure_factor;
235
236        // Clamp to configured bounds
237        let final_size = adjusted_size
238            .max(self.config.min_size as f64)
239            .min(self.config.max_size as f64) as usize;
240
241        self.current_size
242            .store(final_size as u64, Ordering::Relaxed);
243        final_size
244    }
245
246    /// Check if the memtable should be flushed based on current size
247    #[inline]
248    pub fn should_flush(&self, current_memtable_bytes: u64) -> bool {
249        current_memtable_bytes >= self.current_size.load(Ordering::Relaxed)
250    }
251
252    /// Get statistics for monitoring
253    pub fn stats(&self) -> AdaptiveMemtableStats {
254        AdaptiveMemtableStats {
255            current_size: self.current_size(),
256            write_rate_bytes_per_sec: self.write_rate(),
257            memory_pressure: self.memory_pressure(),
258            config: self.config.clone(),
259        }
260    }
261}
262
263impl Default for AdaptiveMemtableSizer {
264    fn default() -> Self {
265        Self::new()
266    }
267}
268
269/// Statistics for adaptive memtable sizing
270#[derive(Debug, Clone)]
271pub struct AdaptiveMemtableStats {
272    /// Current recommended memtable size
273    pub current_size: usize,
274    /// Estimated write rate in bytes per second
275    pub write_rate_bytes_per_sec: f64,
276    /// Current memory pressure (0.0 - 1.0)
277    pub memory_pressure: f64,
278    /// Current configuration
279    pub config: AdaptiveMemtableConfig,
280}
281
282// ============================================================================
283// Platform-specific memory pressure detection
284// ============================================================================
285
/// Get current memory pressure (0.0 = no pressure, 1.0 = critical).
///
/// Reads `/proc/meminfo`. Returns 0.0 when the file cannot be read, so sizing
/// falls back to write rate alone rather than failing.
#[cfg(target_os = "linux")]
fn get_memory_pressure() -> f64 {
    match std::fs::read_to_string("/proc/meminfo") {
        Ok(meminfo) => pressure_from_meminfo(&meminfo),
        Err(_) => 0.0,
    }
}

/// Compute `1 - available / total` from the text of `/proc/meminfo`, clamped to `[0.0, 1.0]`.
///
/// Uses `MemAvailable` when present. Kernels older than 3.14 do not export it.
/// Treating the missing line as zero would report full pressure, so in that
/// case available memory is approximated as `MemFree + Buffers + Cached`.
/// Returns 0.0 when `MemTotal` is missing or zero.
#[cfg(target_os = "linux")]
fn pressure_from_meminfo(meminfo: &str) -> f64 {
    let mut mem_total: u64 = 0;
    let mut mem_available: Option<u64> = None;
    let mut mem_free: u64 = 0;
    let mut buffers: u64 = 0;
    let mut cached: u64 = 0;

    for line in meminfo.lines() {
        // Prefix matches include the colon so that e.g. "SwapCached:" is not taken for "Cached:".
        if line.starts_with("MemTotal:") {
            mem_total = parse_meminfo_value(line);
        } else if line.starts_with("MemAvailable:") {
            mem_available = Some(parse_meminfo_value(line));
        } else if line.starts_with("MemFree:") {
            mem_free = parse_meminfo_value(line);
        } else if line.starts_with("Buffers:") {
            buffers = parse_meminfo_value(line);
        } else if line.starts_with("Cached:") {
            cached = parse_meminfo_value(line);
        }
    }

    if mem_total == 0 {
        return 0.0;
    }

    let available = mem_available
        .unwrap_or_else(|| mem_free.saturating_add(buffers).saturating_add(cached));

    // Pressure = 1 - (available / total)
    (1.0 - available as f64 / mem_total as f64).clamp(0.0, 1.0)
}

/// Parse the numeric field of a `/proc/meminfo` line, returning 0 if it is missing or malformed.
#[cfg(target_os = "linux")]
fn parse_meminfo_value(line: &str) -> u64 {
    // Format: "MemTotal:       16384000 kB"
    line.split_whitespace()
        .nth(1)
        .and_then(|s| s.parse::<u64>().ok())
        .unwrap_or(0)
}
326
/// Get current memory pressure on macOS (0.0 = no pressure, 1.0 = critical).
///
/// Runs the `vm_stat` command and parses its page counters. Returns 0.0 if the
/// command cannot be run, its output is not UTF-8, or no pages are reported.
#[cfg(target_os = "macos")]
fn get_memory_pressure() -> f64 {
    // Each line is matched with `contains`; the first label in this list that
    // matches wins, and its count lands in the same slot of `counts`.
    const LABELS: [&str; 5] = [
        "Pages free:",
        "Pages active:",
        "Pages inactive:",
        "Pages speculative:",
        "Pages wired down:",
    ];

    let text = match std::process::Command::new("vm_stat")
        .output()
        .ok()
        .and_then(|out| String::from_utf8(out.stdout).ok())
    {
        Some(text) => text,
        None => return 0.0,
    };

    let mut counts = [0u64; 5];
    for line in text.lines() {
        if let Some(slot) = LABELS.iter().position(|label| line.contains(label)) {
            counts[slot] = extract_vm_stat_value(line);
        }
    }
    let [free, active, inactive, speculative, wired] = counts;

    // Total = free + active + inactive + speculative + wired
    let total = free + active + inactive + speculative + wired;
    if total == 0 {
        return 0.0;
    }

    // Free and inactive pages are treated as (approximately) available.
    let available = free + inactive;
    1.0 - (available as f64 / total as f64)
}

/// Parse the page count from a `vm_stat` line, returning 0 if it is missing or malformed.
#[cfg(target_os = "macos")]
fn extract_vm_stat_value(line: &str) -> u64 {
    // Format: "Pages free:                                3142."
    let field = match line.split(':').nth(1) {
        Some(field) => field,
        None => return 0,
    };
    field.trim().trim_end_matches('.').parse::<u64>().unwrap_or(0)
}
384
/// Memory pressure on platforms without a supported probe.
///
/// Always reports 0.0 (no pressure), so the adaptive size is driven by the
/// write rate alone.
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
fn get_memory_pressure() -> f64 {
    const NO_PRESSURE: f64 = 0.0;
    NO_PRESSURE
}
390
391// ============================================================================
392// Utility functions
393// ============================================================================
394
/// Wall-clock time as whole microseconds since the Unix epoch.
///
/// A system clock set before the epoch yields 0. `SystemTime` is not
/// monotonic, so intervals computed from successive calls may be zero if the
/// clock steps backwards.
#[inline]
fn now_us() -> u64 {
    use std::time::{Duration, SystemTime, UNIX_EPOCH};

    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::default(),
    };
    since_epoch.as_micros() as u64
}
403
404// ============================================================================
405// Tests
406// ============================================================================
407
#[cfg(test)]
mod tests {
    use super::*;

    /// One mebibyte, for readable size literals.
    const MIB: usize = 1024 * 1024;

    #[test]
    fn test_default_config() {
        let cfg = AdaptiveMemtableConfig::default();
        assert_eq!(
            (cfg.base_size, cfg.min_size, cfg.max_size),
            (DEFAULT_BASE_SIZE, MIN_MEMTABLE_SIZE, MAX_MEMTABLE_SIZE)
        );
    }

    #[test]
    fn test_initial_size() {
        assert_eq!(AdaptiveMemtableSizer::new().current_size(), DEFAULT_BASE_SIZE);
    }

    #[test]
    fn test_record_write() {
        let sizer = AdaptiveMemtableSizer::new();
        for chunk in [1000usize, 2000] {
            sizer.record_write(chunk);
        }

        // Recorded bytes stay pending until the next update().
        let pending = sizer.bytes_since_update.load(Ordering::Relaxed);
        assert_eq!(pending, 3000);
    }

    #[test]
    fn test_should_flush() {
        let sizer = AdaptiveMemtableSizer::new();

        // A fresh sizer recommends the 4MB base size.
        assert!(!sizer.should_flush(1_000_000), "1MB is below the 4MB threshold");
        assert!(sizer.should_flush(5_000_000), "5MB is at or above the 4MB threshold");
    }

    #[test]
    fn test_write_rate_update() {
        let sizer = AdaptiveMemtableSizer::new();

        // Record 1MB, then let ~100ms elapse so update() sees a non-zero interval.
        sizer.record_write(1_000_000);
        std::thread::sleep(std::time::Duration::from_millis(100));

        // Whatever rate is measured, the result must respect the configured bounds.
        let resized = sizer.update();
        assert!((MIN_MEMTABLE_SIZE..=MAX_MEMTABLE_SIZE).contains(&resized));
    }

    #[test]
    fn test_custom_config() {
        let sizer = AdaptiveMemtableSizer::with_config(AdaptiveMemtableConfig {
            base_size: 8 * MIB,
            min_size: 2 * MIB,
            max_size: 32 * MIB,
            target_buffer_seconds: 2.0,
            ema_alpha: 0.2,
            enable_memory_pressure: false,
        });
        assert_eq!(sizer.current_size(), 8 * MIB);
    }

    #[test]
    fn test_memory_pressure() {
        // Smoke test: the platform probe must return a value in range.
        let reading = get_memory_pressure();
        assert!((0.0..=1.0).contains(&reading));
    }

    #[test]
    fn test_stats() {
        let snapshot = AdaptiveMemtableSizer::new().stats();

        assert_eq!(snapshot.current_size, DEFAULT_BASE_SIZE);
        assert!(snapshot.write_rate_bytes_per_sec > 0.0);
        assert!(snapshot.memory_pressure >= 0.0);
    }
}