// sochdb_storage/adaptive_memtable.rs
use std::sync::atomic::{AtomicU64, Ordering};
73
/// Default starting memtable size in bytes (4 MiB); used as `base_size` by
/// `AdaptiveMemtableConfig::default()`.
pub const DEFAULT_BASE_SIZE: usize = 4 * 1024 * 1024;

/// Default lower bound for the computed memtable size in bytes (1 MiB).
pub const MIN_MEMTABLE_SIZE: usize = 1024 * 1024;

/// Default upper bound for the computed memtable size in bytes (16 MiB).
pub const MAX_MEMTABLE_SIZE: usize = 16 * 1024 * 1024;

/// Default number of seconds of writes (at the smoothed write rate) that the
/// memtable is sized to hold.
pub const TARGET_BUFFER_SECONDS: f64 = 1.0;

/// Default weight given to the newest sample in the write-rate exponential
/// moving average.
pub const WRITE_RATE_EMA_ALPHA: f64 = 0.1;

/// Memory-pressure level above which the computed memtable size is scaled down.
pub const PRESSURE_THRESHOLD: f64 = 0.7;
92
/// Tuning parameters for adaptive memtable sizing.
#[derive(Clone, Debug)]
pub struct AdaptiveMemtableConfig {
    /// Initial memtable size in bytes, used until the first size update.
    pub base_size: usize,
    /// Smallest size, in bytes, that a size update may produce.
    pub min_size: usize,
    /// Largest size, in bytes, that a size update may produce.
    pub max_size: usize,
    /// Seconds of writes, at the smoothed write rate, the memtable should hold.
    pub target_buffer_seconds: f64,
    /// Weight of the newest sample in the write-rate exponential moving average.
    pub ema_alpha: f64,
    /// When `false`, memory pressure is not sampled and is treated as `0.0`.
    pub enable_memory_pressure: bool,
}
109
110impl Default for AdaptiveMemtableConfig {
111 fn default() -> Self {
112 Self {
113 base_size: DEFAULT_BASE_SIZE,
114 min_size: MIN_MEMTABLE_SIZE,
115 max_size: MAX_MEMTABLE_SIZE,
116 target_buffer_seconds: TARGET_BUFFER_SECONDS,
117 ema_alpha: WRITE_RATE_EMA_ALPHA,
118 enable_memory_pressure: true,
119 }
120 }
121}
122
123pub struct AdaptiveMemtableSizer {
129 config: AdaptiveMemtableConfig,
131 current_size: AtomicU64,
133 write_rate_ema: AtomicU64,
135 last_update_us: AtomicU64,
137 bytes_since_update: AtomicU64,
139 memory_pressure_scaled: AtomicU64,
141}
142
143impl AdaptiveMemtableSizer {
144 pub fn new() -> Self {
146 Self::with_config(AdaptiveMemtableConfig::default())
147 }
148
149 pub fn with_config(config: AdaptiveMemtableConfig) -> Self {
151 let initial_rate = config.base_size as f64 / config.target_buffer_seconds;
152
153 Self {
154 current_size: AtomicU64::new(config.base_size as u64),
155 write_rate_ema: AtomicU64::new((initial_rate * 1000.0) as u64),
156 last_update_us: AtomicU64::new(now_us()),
157 bytes_since_update: AtomicU64::new(0),
158 memory_pressure_scaled: AtomicU64::new(0),
159 config,
160 }
161 }
162
163 #[inline]
165 pub fn record_write(&self, bytes: usize) {
166 self.bytes_since_update.fetch_add(bytes as u64, Ordering::Relaxed);
167 }
168
169 #[inline]
171 pub fn current_size(&self) -> usize {
172 self.current_size.load(Ordering::Relaxed) as usize
173 }
174
175 #[inline]
177 pub fn write_rate(&self) -> f64 {
178 self.write_rate_ema.load(Ordering::Relaxed) as f64 / 1000.0
179 }
180
181 #[inline]
183 pub fn memory_pressure(&self) -> f64 {
184 self.memory_pressure_scaled.load(Ordering::Relaxed) as f64 / 1000.0
185 }
186
187 pub fn update(&self) -> usize {
191 let now = now_us();
192 let last = self.last_update_us.swap(now, Ordering::Relaxed);
193 let delta_us = now.saturating_sub(last);
194
195 if delta_us == 0 {
196 return self.current_size();
197 }
198
199 let bytes = self.bytes_since_update.swap(0, Ordering::Relaxed);
201 let delta_secs = delta_us as f64 / 1_000_000.0;
202 let instant_rate = bytes as f64 / delta_secs;
203
204 let old_rate = self.write_rate_ema.load(Ordering::Relaxed) as f64 / 1000.0;
206 let new_rate = old_rate * (1.0 - self.config.ema_alpha) + instant_rate * self.config.ema_alpha;
207 self.write_rate_ema.store((new_rate * 1000.0) as u64, Ordering::Relaxed);
208
209 let pressure = if self.config.enable_memory_pressure {
211 get_memory_pressure()
212 } else {
213 0.0
214 };
215 self.memory_pressure_scaled.store((pressure * 1000.0) as u64, Ordering::Relaxed);
216
217 let target_size = new_rate * self.config.target_buffer_seconds;
220
221 let pressure_factor = if pressure > PRESSURE_THRESHOLD {
224 1.0 - (pressure - PRESSURE_THRESHOLD).powi(2)
226 } else {
227 1.0
228 };
229
230 let adjusted_size = target_size * pressure_factor;
231
232 let final_size = adjusted_size
234 .max(self.config.min_size as f64)
235 .min(self.config.max_size as f64) as usize;
236
237 self.current_size.store(final_size as u64, Ordering::Relaxed);
238 final_size
239 }
240
241 #[inline]
243 pub fn should_flush(&self, current_memtable_bytes: u64) -> bool {
244 current_memtable_bytes >= self.current_size.load(Ordering::Relaxed)
245 }
246
247 pub fn stats(&self) -> AdaptiveMemtableStats {
249 AdaptiveMemtableStats {
250 current_size: self.current_size(),
251 write_rate_bytes_per_sec: self.write_rate(),
252 memory_pressure: self.memory_pressure(),
253 config: self.config.clone(),
254 }
255 }
256}
257
258impl Default for AdaptiveMemtableSizer {
259 fn default() -> Self {
260 Self::new()
261 }
262}
263
264#[derive(Debug, Clone)]
266pub struct AdaptiveMemtableStats {
267 pub current_size: usize,
269 pub write_rate_bytes_per_sec: f64,
271 pub memory_pressure: f64,
273 pub config: AdaptiveMemtableConfig,
275}
276
277#[cfg(target_os = "linux")]
283fn get_memory_pressure() -> f64 {
284 use std::fs::File;
286 use std::io::{BufRead, BufReader};
287
288 let file = match File::open("/proc/meminfo") {
289 Ok(f) => f,
290 Err(_) => return 0.0,
291 };
292
293 let reader = BufReader::new(file);
294 let mut mem_total: u64 = 0;
295 let mut mem_available: u64 = 0;
296
297 for line in reader.lines().take(10).flatten() {
298 if line.starts_with("MemTotal:") {
299 mem_total = parse_meminfo_value(&line);
300 } else if line.starts_with("MemAvailable:") {
301 mem_available = parse_meminfo_value(&line);
302 }
303 }
304
305 if mem_total == 0 {
306 return 0.0;
307 }
308
309 1.0 - (mem_available as f64 / mem_total as f64)
311}
312
/// Parses the second whitespace-separated field of a `/proc/meminfo` line as
/// a `u64`.
///
/// Returns `0` when the line has no second field or the field is not a valid
/// unsigned integer.
#[cfg(target_os = "linux")]
fn parse_meminfo_value(line: &str) -> u64 {
    let mut fields = line.split_whitespace();
    let _label = fields.next();
    match fields.next() {
        Some(raw) => raw.parse::<u64>().unwrap_or(0),
        None => 0,
    }
}
321
/// Estimates system memory pressure on macOS by running `vm_stat` and
/// reading its page counters.
///
/// Returns `1 - (free + inactive) / (active + inactive + speculative + free + wired)`.
/// Returns `0.0` when the command cannot be run, its output is not valid
/// UTF-8, or the page total is zero.
#[cfg(target_os = "macos")]
fn get_memory_pressure() -> f64 {
    use std::process::Command;

    let report = match Command::new("vm_stat").output() {
        Ok(output) => match String::from_utf8(output.stdout) {
            Ok(text) => text,
            Err(_) => return 0.0,
        },
        Err(_) => return 0.0,
    };

    let mut free_pages: u64 = 0;
    let mut active_pages: u64 = 0;
    let mut inactive_pages: u64 = 0;
    let mut speculative_pages: u64 = 0;
    let mut wired_pages: u64 = 0;

    for line in report.lines() {
        // Pick the counter this line feeds; lines with no known label are skipped.
        let counter = if line.contains("Pages free:") {
            &mut free_pages
        } else if line.contains("Pages active:") {
            &mut active_pages
        } else if line.contains("Pages inactive:") {
            &mut inactive_pages
        } else if line.contains("Pages speculative:") {
            &mut speculative_pages
        } else if line.contains("Pages wired down:") {
            &mut wired_pages
        } else {
            continue;
        };
        *counter = extract_vm_stat_value(line);
    }

    let total = active_pages + inactive_pages + speculative_pages + free_pages + wired_pages;
    if total == 0 {
        return 0.0;
    }

    let available = free_pages + inactive_pages;
    1.0 - (available as f64 / total as f64)
}
369
/// Parses the number that follows the first `:` on a `vm_stat` output line.
///
/// Surrounding whitespace and trailing `.` characters are stripped before
/// parsing. Returns `0` when there is no `:` or the remainder is not a valid
/// unsigned integer.
#[cfg(target_os = "macos")]
fn extract_vm_stat_value(line: &str) -> u64 {
    let mut parts = line.split(':');
    let _label = parts.next();
    match parts.next() {
        Some(raw) => raw
            .trim()
            .trim_end_matches('.')
            .parse::<u64>()
            .unwrap_or(0),
        None => 0,
    }
}
379
/// Fallback for platforms other than Linux and macOS: memory pressure is not
/// measured and is always reported as `0.0`.
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
fn get_memory_pressure() -> f64 {
    const NO_PRESSURE: f64 = 0.0;
    NO_PRESSURE
}
385
/// Returns the current wall-clock time as microseconds since the Unix epoch.
///
/// If the system clock reads earlier than the epoch, the elapsed duration
/// falls back to its default (zero), so the function returns `0`.
#[inline]
fn now_us() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    since_epoch.as_micros() as u64
}
398
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration mirrors the module-level size constants.
    #[test]
    fn test_default_config() {
        let defaults = AdaptiveMemtableConfig::default();

        assert_eq!(defaults.base_size, DEFAULT_BASE_SIZE);
        assert_eq!(defaults.min_size, MIN_MEMTABLE_SIZE);
        assert_eq!(defaults.max_size, MAX_MEMTABLE_SIZE);
    }

    /// A fresh sizer reports the base size before any update.
    #[test]
    fn test_initial_size() {
        let fresh = AdaptiveMemtableSizer::new();

        assert_eq!(fresh.current_size(), DEFAULT_BASE_SIZE);
    }

    /// Recorded writes accumulate in the pending byte counter.
    #[test]
    fn test_record_write() {
        let sizer = AdaptiveMemtableSizer::new();

        for chunk in [1000, 2000] {
            sizer.record_write(chunk);
        }

        let pending = sizer.bytes_since_update.load(Ordering::Relaxed);
        assert_eq!(pending, 3000);
    }

    /// Flushing is requested only once the memtable reaches the target size.
    #[test]
    fn test_should_flush() {
        let sizer = AdaptiveMemtableSizer::new();

        // Below the 4 MiB default target.
        assert!(!sizer.should_flush(1_000_000));
        // Above the 4 MiB default target.
        assert!(sizer.should_flush(5_000_000));
    }

    /// After an update the size stays within the configured bounds.
    #[test]
    fn test_write_rate_update() {
        let sizer = AdaptiveMemtableSizer::new();

        sizer.record_write(1_000_000);
        // Let some wall-clock time pass so the update sees a non-zero interval.
        std::thread::sleep(std::time::Duration::from_millis(100));

        let resized = sizer.update();

        assert!(resized >= MIN_MEMTABLE_SIZE);
        assert!(resized <= MAX_MEMTABLE_SIZE);
    }

    /// A custom configuration's base size becomes the initial size.
    #[test]
    fn test_custom_config() {
        let custom = AdaptiveMemtableConfig {
            base_size: 8 * 1024 * 1024,
            min_size: 2 * 1024 * 1024,
            max_size: 32 * 1024 * 1024,
            target_buffer_seconds: 2.0,
            ema_alpha: 0.2,
            enable_memory_pressure: false,
        };

        let sizer = AdaptiveMemtableSizer::with_config(custom);

        assert_eq!(sizer.current_size(), 8 * 1024 * 1024);
    }

    /// The platform probe reports a fraction between zero and one.
    #[test]
    fn test_memory_pressure() {
        let sampled = get_memory_pressure();

        assert!(sampled >= 0.0);
        assert!(sampled <= 1.0);
    }

    /// The stats snapshot of a fresh sizer reflects its initial state.
    #[test]
    fn test_stats() {
        let sizer = AdaptiveMemtableSizer::new();

        let snapshot = sizer.stats();

        assert_eq!(snapshot.current_size, DEFAULT_BASE_SIZE);
        assert!(snapshot.write_rate_bytes_per_sec > 0.0);
        assert!(snapshot.memory_pressure >= 0.0);
    }
}