use std::sync::atomic::{AtomicU64, Ordering};
/// Default starting memtable size in bytes (4 MiB).
pub const DEFAULT_BASE_SIZE: usize = 4 << 20;
/// Default lower bound for the memtable size in bytes (1 MiB).
pub const MIN_MEMTABLE_SIZE: usize = 1 << 20;
/// Default upper bound for the memtable size in bytes (16 MiB).
pub const MAX_MEMTABLE_SIZE: usize = 16 << 20;
/// Default number of seconds of writes the memtable is sized to hold.
pub const TARGET_BUFFER_SECONDS: f64 = 1.0;
/// Default weight given to the newest sample in the write-rate moving average.
pub const WRITE_RATE_EMA_ALPHA: f64 = 0.1;
/// Memory-pressure level above which the computed target size is scaled down.
pub const PRESSURE_THRESHOLD: f64 = 0.7;
/// Tunable parameters for [`AdaptiveMemtableSizer`].
///
/// All sizes are byte counts. The sizer does not validate these values.
#[derive(Debug, Clone)]
pub struct AdaptiveMemtableConfig {
/// Size reported before the first `update`; also used to seed the
/// write-rate average (`base_size / target_buffer_seconds`).
pub base_size: usize,
/// Lower bound applied to the size computed by `update`.
pub min_size: usize,
/// Upper bound applied to the size computed by `update`.
pub max_size: usize,
/// The target size is the smoothed write rate multiplied by this many seconds.
pub target_buffer_seconds: f64,
/// Weight of the newest instantaneous rate in the moving average; the
/// previous average keeps weight `1.0 - ema_alpha`.
pub ema_alpha: f64,
/// When `false`, `update` skips the memory-pressure probe and uses `0.0`.
pub enable_memory_pressure: bool,
}
impl Default for AdaptiveMemtableConfig {
fn default() -> Self {
Self {
base_size: DEFAULT_BASE_SIZE,
min_size: MIN_MEMTABLE_SIZE,
max_size: MAX_MEMTABLE_SIZE,
target_buffer_seconds: TARGET_BUFFER_SECONDS,
ema_alpha: WRITE_RATE_EMA_ALPHA,
enable_memory_pressure: true,
}
}
}
/// Tracks the recent write rate and derives a memtable size from it.
///
/// Every mutable field is an `AtomicU64`, so all methods take `&self`.
/// Fractional quantities are stored as integers scaled by 1000.
pub struct AdaptiveMemtableSizer {
// Parameters supplied at construction; never modified afterwards.
config: AdaptiveMemtableConfig,
// Most recently computed size in bytes (starts at `config.base_size`).
current_size: AtomicU64,
// Smoothed write rate in bytes per second, multiplied by 1000.
write_rate_ema: AtomicU64,
// Value of `now_us()` at construction or at the latest `update` call.
last_update_us: AtomicU64,
// Bytes passed to `record_write` since the counter was last reset by `update`.
bytes_since_update: AtomicU64,
// Memory pressure sampled by the latest `update`, multiplied by 1000.
memory_pressure_scaled: AtomicU64,
}
impl AdaptiveMemtableSizer {
    /// Multiplier used to keep fractional values (write rate, memory pressure)
    /// as integers inside the `AtomicU64` fields.
    const FIXED_POINT_SCALE: f64 = 1000.0;

    /// Encodes `value` as a fixed-point integer. The float-to-integer cast
    /// saturates, so negative and NaN inputs are stored as 0.
    #[inline]
    fn to_fixed(value: f64) -> u64 {
        (value * Self::FIXED_POINT_SCALE) as u64
    }

    /// Decodes a fixed-point integer produced by [`Self::to_fixed`].
    #[inline]
    fn from_fixed(raw: u64) -> f64 {
        raw as f64 / Self::FIXED_POINT_SCALE
    }

    /// Creates a sizer using [`AdaptiveMemtableConfig::default`].
    pub fn new() -> Self {
        Self::with_config(AdaptiveMemtableConfig::default())
    }

    /// Creates a sizer from `config`.
    ///
    /// The reported size starts at `config.base_size`. The write-rate average
    /// is seeded with `base_size / target_buffer_seconds`, i.e. the rate that
    /// would fill the base size in the target time. If that quotient is not
    /// finite (for example `target_buffer_seconds` is `0.0`), the average is
    /// seeded with zero rather than a saturated, meaningless rate.
    pub fn with_config(config: AdaptiveMemtableConfig) -> Self {
        let quotient = config.base_size as f64 / config.target_buffer_seconds;
        let seed_rate = if quotient.is_finite() { quotient } else { 0.0 };
        Self {
            current_size: AtomicU64::new(config.base_size as u64),
            write_rate_ema: AtomicU64::new(Self::to_fixed(seed_rate)),
            last_update_us: AtomicU64::new(now_us()),
            bytes_since_update: AtomicU64::new(0),
            memory_pressure_scaled: AtomicU64::new(0),
            config,
        }
    }

    /// Adds `bytes` to the running count of bytes written since the last
    /// [`update`](Self::update).
    #[inline]
    pub fn record_write(&self, bytes: usize) {
        self.bytes_since_update
            .fetch_add(bytes as u64, Ordering::Relaxed);
    }

    /// Returns the most recently computed memtable size in bytes.
    #[inline]
    pub fn current_size(&self) -> usize {
        self.current_size.load(Ordering::Relaxed) as usize
    }

    /// Returns the smoothed write rate in bytes per second.
    #[inline]
    pub fn write_rate(&self) -> f64 {
        Self::from_fixed(self.write_rate_ema.load(Ordering::Relaxed))
    }

    /// Returns the memory pressure sampled by the latest
    /// [`update`](Self::update) (`0.0` before the first one).
    #[inline]
    pub fn memory_pressure(&self) -> f64 {
        Self::from_fixed(self.memory_pressure_scaled.load(Ordering::Relaxed))
    }

    /// Recomputes the memtable size from the bytes recorded since the previous
    /// call and returns it.
    ///
    /// Steps:
    /// 1. Measure the time since the previous update via `now_us()`; if no
    ///    time has passed, return the current size unchanged.
    /// 2. Turn the recorded bytes into an instantaneous rate and blend it into
    ///    the moving average using `config.ema_alpha`.
    /// 3. Sample memory pressure (or use `0.0` when disabled) and store it.
    /// 4. Multiply the smoothed rate by `config.target_buffer_seconds`, scale
    ///    it down when pressure exceeds [`PRESSURE_THRESHOLD`], and bound the
    ///    result by `config.min_size` / `config.max_size`.
    ///
    /// Each step is a separate relaxed atomic operation; nothing here
    /// serialises concurrent calls to `update`.
    pub fn update(&self) -> usize {
        let now = now_us();
        let previous = self.last_update_us.swap(now, Ordering::Relaxed);
        let elapsed_us = now.saturating_sub(previous);
        if elapsed_us == 0 {
            // Nothing to divide by; keep the existing estimate.
            return self.current_size();
        }

        let bytes = self.bytes_since_update.swap(0, Ordering::Relaxed);
        let elapsed_secs = elapsed_us as f64 / 1_000_000.0;
        let instant_rate = bytes as f64 / elapsed_secs;

        let old_rate = self.write_rate();
        let new_rate =
            old_rate * (1.0 - self.config.ema_alpha) + instant_rate * self.config.ema_alpha;
        self.write_rate_ema
            .store(Self::to_fixed(new_rate), Ordering::Relaxed);

        let pressure = if self.config.enable_memory_pressure {
            get_memory_pressure()
        } else {
            0.0
        };
        self.memory_pressure_scaled
            .store(Self::to_fixed(pressure), Ordering::Relaxed);

        let target_size = new_rate * self.config.target_buffer_seconds;
        // Quadratic reduction above the threshold. With the threshold at 0.7
        // and pressure no larger than 1.0 this removes at most 9% of the target.
        let pressure_factor = if pressure > PRESSURE_THRESHOLD {
            1.0 - (pressure - PRESSURE_THRESHOLD).powi(2)
        } else {
            1.0
        };
        let adjusted_size = target_size * pressure_factor;

        // `max` then `min` (rather than `clamp`) so an inverted min/max pair
        // in the config resolves to `max_size` instead of panicking.
        let final_size = adjusted_size
            .max(self.config.min_size as f64)
            .min(self.config.max_size as f64) as usize;
        self.current_size
            .store(final_size as u64, Ordering::Relaxed);
        final_size
    }

    /// Returns `true` when `current_memtable_bytes` has reached the current
    /// target size.
    #[inline]
    pub fn should_flush(&self, current_memtable_bytes: u64) -> bool {
        current_memtable_bytes >= self.current_size.load(Ordering::Relaxed)
    }

    /// Returns a snapshot of the current size, write rate, memory pressure and
    /// a clone of the configuration.
    pub fn stats(&self) -> AdaptiveMemtableStats {
        AdaptiveMemtableStats {
            current_size: self.current_size(),
            write_rate_bytes_per_sec: self.write_rate(),
            memory_pressure: self.memory_pressure(),
            config: self.config.clone(),
        }
    }
}
impl Default for AdaptiveMemtableSizer {
fn default() -> Self {
Self::new()
}
}
/// Point-in-time snapshot returned by `AdaptiveMemtableSizer::stats`.
#[derive(Debug, Clone)]
pub struct AdaptiveMemtableStats {
/// Memtable size in bytes at the time of the snapshot.
pub current_size: usize,
/// Smoothed write rate in bytes per second.
pub write_rate_bytes_per_sec: f64,
/// Memory pressure stored by the most recent `update` (0.0 before the first).
pub memory_pressure: f64,
/// Copy of the configuration the sizer was built with.
pub config: AdaptiveMemtableConfig,
}
/// Estimates memory pressure on Linux as the fraction of RAM that is not
/// available, from the `MemTotal` and `MemAvailable` fields of `/proc/meminfo`.
///
/// Returns a value in `[0.0, 1.0]`. Whenever a measurement cannot be made
/// (the file cannot be opened, or either field is missing or unparsable)
/// the result is `0.0`, i.e. "no pressure".
#[cfg(target_os = "linux")]
fn get_memory_pressure() -> f64 {
    use std::fs::File;
    use std::io::{BufRead, BufReader};

    let file = match File::open("/proc/meminfo") {
        Ok(f) => f,
        Err(_) => return 0.0,
    };
    // Only the first ten lines are scanned; this assumes both fields appear
    // there (TODO confirm for the kernels this runs on).
    let head = BufReader::new(file).lines().take(10).flatten();
    memory_pressure_from_meminfo(head)
}

/// Computes the pressure ratio from `/proc/meminfo`-style lines.
///
/// A missing or unparsable `MemAvailable` is treated as "unknown" and yields
/// `0.0`, matching the other failure paths, instead of being read as zero
/// available memory (which would report full pressure).
#[cfg(target_os = "linux")]
fn memory_pressure_from_meminfo<I, S>(lines: I) -> f64
where
    I: IntoIterator<Item = S>,
    S: AsRef<str>,
{
    let mut mem_total: u64 = 0;
    let mut mem_available: Option<u64> = None;
    for line in lines {
        let line = line.as_ref();
        if line.starts_with("MemTotal:") {
            mem_total = parse_meminfo_value(line);
        } else if line.starts_with("MemAvailable:") {
            mem_available = parse_meminfo_field(line);
        }
    }
    match mem_available {
        Some(available) if mem_total > 0 => {
            // Cap at the total so the ratio can never push the result below zero.
            1.0 - (available.min(mem_total) as f64 / mem_total as f64)
        }
        _ => 0.0,
    }
}

/// Parses the second whitespace-separated token of a meminfo line as an
/// integer, returning `None` when it is absent or not a number.
#[cfg(target_os = "linux")]
fn parse_meminfo_field(line: &str) -> Option<u64> {
    line.split_whitespace()
        .nth(1)
        .and_then(|token| token.parse::<u64>().ok())
}

/// Like [`parse_meminfo_field`], but maps a missing or invalid value to `0`.
#[cfg(target_os = "linux")]
fn parse_meminfo_value(line: &str) -> u64 {
    parse_meminfo_field(line).unwrap_or(0)
}
/// Estimates memory pressure on macOS from the page counts printed by the
/// `vm_stat` command.
///
/// Pressure is `1 - (free + inactive) / (active + inactive + speculative +
/// free + wired)`. Returns `0.0` when the command cannot be run, its output
/// is not valid UTF-8, or no page counts were found.
#[cfg(target_os = "macos")]
fn get_memory_pressure() -> f64 {
    use std::process::Command;

    let text = match Command::new("vm_stat").output() {
        Ok(out) => match String::from_utf8(out.stdout) {
            Ok(t) => t,
            Err(_) => return 0.0,
        },
        Err(_) => return 0.0,
    };

    let (mut free, mut active, mut inactive, mut speculative, mut wired) =
        (0u64, 0u64, 0u64, 0u64, 0u64);
    for row in text.lines() {
        // Pick the counter this row feeds; rows that match no label are skipped.
        let counter = if row.contains("Pages free:") {
            &mut free
        } else if row.contains("Pages active:") {
            &mut active
        } else if row.contains("Pages inactive:") {
            &mut inactive
        } else if row.contains("Pages speculative:") {
            &mut speculative
        } else if row.contains("Pages wired down:") {
            &mut wired
        } else {
            continue;
        };
        *counter = extract_vm_stat_value(row);
    }

    let total = active + inactive + speculative + free + wired;
    if total == 0 {
        return 0.0;
    }
    let available = free + inactive;
    1.0 - (available as f64 / total as f64)
}
/// Extracts the number from a `vm_stat` row of the form `Label:   12345.`.
///
/// Takes the text between the first and second `:` (or to the end of the
/// line), trims whitespace and trailing dots, and parses it as an integer.
/// Returns `0` when there is no `:` or the text is not a number.
#[cfg(target_os = "macos")]
fn extract_vm_stat_value(line: &str) -> u64 {
    let mut fields = line.split(':');
    let _label = fields.next();
    match fields.next() {
        Some(raw) => raw
            .trim()
            .trim_end_matches('.')
            .parse::<u64>()
            .unwrap_or(0),
        None => 0,
    }
}
/// Fallback used on targets other than Linux and macOS: no probe is
/// implemented there, so the pressure is always reported as `0.0`.
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
fn get_memory_pressure() -> f64 {
0.0
}
/// Returns the number of microseconds elapsed on a monotonic clock since this
/// function was first called in the current process.
///
/// The value is only meaningful for computing differences between two calls.
/// A monotonic `Instant` is used instead of the wall clock so that system
/// clock adjustments cannot shrink or inflate the measured interval.
#[inline]
fn now_us() -> u64 {
    // Process-local origin, fixed on first use.
    static ORIGIN: std::sync::OnceLock<std::time::Instant> = std::sync::OnceLock::new();
    let elapsed = ORIGIN.get_or_init(std::time::Instant::now).elapsed();
    // u64 microseconds cover several hundred thousand years of uptime, so the
    // narrowing cast cannot truncate in practice.
    elapsed.as_micros() as u64
}
// Unit tests for the adaptive memtable sizer.
#[cfg(test)]
mod tests {
use super::*;
// The default configuration mirrors the module-level size constants.
#[test]
fn test_default_config() {
let config = AdaptiveMemtableConfig::default();
assert_eq!(config.base_size, DEFAULT_BASE_SIZE);
assert_eq!(config.min_size, MIN_MEMTABLE_SIZE);
assert_eq!(config.max_size, MAX_MEMTABLE_SIZE);
}
// A fresh sizer reports the base size before any update.
#[test]
fn test_initial_size() {
let sizer = AdaptiveMemtableSizer::new();
assert_eq!(sizer.current_size(), DEFAULT_BASE_SIZE);
}
// Recorded writes accumulate in the pending byte counter.
#[test]
fn test_record_write() {
let sizer = AdaptiveMemtableSizer::new();
sizer.record_write(1000);
sizer.record_write(2000);
assert_eq!(sizer.bytes_since_update.load(Ordering::Relaxed), 3000);
}
// 1,000,000 bytes is below the default 4 MiB size; 5,000,000 bytes is above it.
#[test]
fn test_should_flush() {
let sizer = AdaptiveMemtableSizer::new();
assert!(!sizer.should_flush(1_000_000)); assert!(sizer.should_flush(5_000_000)); }
// After some writes and a short wait, update() yields a size within the
// default min/max bounds.
#[test]
fn test_write_rate_update() {
let sizer = AdaptiveMemtableSizer::new();
sizer.record_write(1_000_000);
// Let time pass so update() sees a non-zero interval.
std::thread::sleep(std::time::Duration::from_millis(100));
let new_size = sizer.update();
assert!(new_size >= MIN_MEMTABLE_SIZE);
assert!(new_size <= MAX_MEMTABLE_SIZE);
}
// A custom configuration's base_size is used as the initial size.
#[test]
fn test_custom_config() {
let config = AdaptiveMemtableConfig {
base_size: 8 * 1024 * 1024,
min_size: 2 * 1024 * 1024,
max_size: 32 * 1024 * 1024,
target_buffer_seconds: 2.0,
ema_alpha: 0.2,
enable_memory_pressure: false,
};
let sizer = AdaptiveMemtableSizer::with_config(config);
assert_eq!(sizer.current_size(), 8 * 1024 * 1024);
}
// The platform probe is expected to report a ratio between 0 and 1.
#[test]
fn test_memory_pressure() {
let pressure = get_memory_pressure();
assert!(pressure >= 0.0);
assert!(pressure <= 1.0);
}
// Stats of a fresh sizer: base size, a positive seeded write rate, and a
// non-negative pressure value.
#[test]
fn test_stats() {
let sizer = AdaptiveMemtableSizer::new();
let stats = sizer.stats();
assert_eq!(stats.current_size, DEFAULT_BASE_SIZE);
assert!(stats.write_rate_bytes_per_sec > 0.0);
assert!(stats.memory_pressure >= 0.0);
}
}