sochdb_storage/
adaptive_memtable.rs1use std::sync::atomic::{AtomicU64, Ordering};
73
74pub const DEFAULT_BASE_SIZE: usize = 4 * 1024 * 1024;
76
77pub const MIN_MEMTABLE_SIZE: usize = 1 * 1024 * 1024;
79
80pub const MAX_MEMTABLE_SIZE: usize = 16 * 1024 * 1024;
82
83pub const TARGET_BUFFER_SECONDS: f64 = 1.0;
86
87pub const WRITE_RATE_EMA_ALPHA: f64 = 0.1;
89
90pub const PRESSURE_THRESHOLD: f64 = 0.7;
92
93#[derive(Debug, Clone)]
95pub struct AdaptiveMemtableConfig {
96 pub base_size: usize,
98 pub min_size: usize,
100 pub max_size: usize,
102 pub target_buffer_seconds: f64,
104 pub ema_alpha: f64,
106 pub enable_memory_pressure: bool,
108}
109
110impl Default for AdaptiveMemtableConfig {
111 fn default() -> Self {
112 Self {
113 base_size: DEFAULT_BASE_SIZE,
114 min_size: MIN_MEMTABLE_SIZE,
115 max_size: MAX_MEMTABLE_SIZE,
116 target_buffer_seconds: TARGET_BUFFER_SECONDS,
117 ema_alpha: WRITE_RATE_EMA_ALPHA,
118 enable_memory_pressure: true,
119 }
120 }
121}
122
123pub struct AdaptiveMemtableSizer {
129 config: AdaptiveMemtableConfig,
131 current_size: AtomicU64,
133 write_rate_ema: AtomicU64,
135 last_update_us: AtomicU64,
137 bytes_since_update: AtomicU64,
139 memory_pressure_scaled: AtomicU64,
141}
142
143impl AdaptiveMemtableSizer {
144 pub fn new() -> Self {
146 Self::with_config(AdaptiveMemtableConfig::default())
147 }
148
149 pub fn with_config(config: AdaptiveMemtableConfig) -> Self {
151 let initial_rate = config.base_size as f64 / config.target_buffer_seconds;
152
153 Self {
154 current_size: AtomicU64::new(config.base_size as u64),
155 write_rate_ema: AtomicU64::new((initial_rate * 1000.0) as u64),
156 last_update_us: AtomicU64::new(now_us()),
157 bytes_since_update: AtomicU64::new(0),
158 memory_pressure_scaled: AtomicU64::new(0),
159 config,
160 }
161 }
162
163 #[inline]
165 pub fn record_write(&self, bytes: usize) {
166 self.bytes_since_update
167 .fetch_add(bytes as u64, Ordering::Relaxed);
168 }
169
170 #[inline]
172 pub fn current_size(&self) -> usize {
173 self.current_size.load(Ordering::Relaxed) as usize
174 }
175
176 #[inline]
178 pub fn write_rate(&self) -> f64 {
179 self.write_rate_ema.load(Ordering::Relaxed) as f64 / 1000.0
180 }
181
182 #[inline]
184 pub fn memory_pressure(&self) -> f64 {
185 self.memory_pressure_scaled.load(Ordering::Relaxed) as f64 / 1000.0
186 }
187
188 pub fn update(&self) -> usize {
192 let now = now_us();
193 let last = self.last_update_us.swap(now, Ordering::Relaxed);
194 let delta_us = now.saturating_sub(last);
195
196 if delta_us == 0 {
197 return self.current_size();
198 }
199
200 let bytes = self.bytes_since_update.swap(0, Ordering::Relaxed);
202 let delta_secs = delta_us as f64 / 1_000_000.0;
203 let instant_rate = bytes as f64 / delta_secs;
204
205 let old_rate = self.write_rate_ema.load(Ordering::Relaxed) as f64 / 1000.0;
207 let new_rate =
208 old_rate * (1.0 - self.config.ema_alpha) + instant_rate * self.config.ema_alpha;
209 self.write_rate_ema
210 .store((new_rate * 1000.0) as u64, Ordering::Relaxed);
211
212 let pressure = if self.config.enable_memory_pressure {
214 get_memory_pressure()
215 } else {
216 0.0
217 };
218 self.memory_pressure_scaled
219 .store((pressure * 1000.0) as u64, Ordering::Relaxed);
220
221 let target_size = new_rate * self.config.target_buffer_seconds;
224
225 let pressure_factor = if pressure > PRESSURE_THRESHOLD {
228 1.0 - (pressure - PRESSURE_THRESHOLD).powi(2)
230 } else {
231 1.0
232 };
233
234 let adjusted_size = target_size * pressure_factor;
235
236 let final_size = adjusted_size
238 .max(self.config.min_size as f64)
239 .min(self.config.max_size as f64) as usize;
240
241 self.current_size
242 .store(final_size as u64, Ordering::Relaxed);
243 final_size
244 }
245
246 #[inline]
248 pub fn should_flush(&self, current_memtable_bytes: u64) -> bool {
249 current_memtable_bytes >= self.current_size.load(Ordering::Relaxed)
250 }
251
252 pub fn stats(&self) -> AdaptiveMemtableStats {
254 AdaptiveMemtableStats {
255 current_size: self.current_size(),
256 write_rate_bytes_per_sec: self.write_rate(),
257 memory_pressure: self.memory_pressure(),
258 config: self.config.clone(),
259 }
260 }
261}
262
263impl Default for AdaptiveMemtableSizer {
264 fn default() -> Self {
265 Self::new()
266 }
267}
268
269#[derive(Debug, Clone)]
271pub struct AdaptiveMemtableStats {
272 pub current_size: usize,
274 pub write_rate_bytes_per_sec: f64,
276 pub memory_pressure: f64,
278 pub config: AdaptiveMemtableConfig,
280}
281
282#[cfg(target_os = "linux")]
288fn get_memory_pressure() -> f64 {
289 use std::fs::File;
291 use std::io::{BufRead, BufReader};
292
293 let file = match File::open("/proc/meminfo") {
294 Ok(f) => f,
295 Err(_) => return 0.0,
296 };
297
298 let reader = BufReader::new(file);
299 let mut mem_total: u64 = 0;
300 let mut mem_available: u64 = 0;
301
302 for line in reader.lines().take(10).flatten() {
303 if line.starts_with("MemTotal:") {
304 mem_total = parse_meminfo_value(&line);
305 } else if line.starts_with("MemAvailable:") {
306 mem_available = parse_meminfo_value(&line);
307 }
308 }
309
310 if mem_total == 0 {
311 return 0.0;
312 }
313
314 1.0 - (mem_available as f64 / mem_total as f64)
316}
317
318#[cfg(target_os = "linux")]
319fn parse_meminfo_value(line: &str) -> u64 {
320 line.split_whitespace()
322 .nth(1)
323 .and_then(|s| s.parse::<u64>().ok())
324 .unwrap_or(0)
325}
326
327#[cfg(target_os = "macos")]
328fn get_memory_pressure() -> f64 {
329 use std::process::Command;
332
333 let output = match Command::new("vm_stat").output() {
334 Ok(o) => o,
335 Err(_) => return 0.0,
336 };
337
338 let stdout = match String::from_utf8(output.stdout) {
339 Ok(s) => s,
340 Err(_) => return 0.0,
341 };
342
343 let mut free_pages: u64 = 0;
344 let mut active_pages: u64 = 0;
345 let mut inactive_pages: u64 = 0;
346 let mut speculative_pages: u64 = 0;
347 let mut wired_pages: u64 = 0;
348
349 for line in stdout.lines() {
350 if line.contains("Pages free:") {
351 free_pages = extract_vm_stat_value(line);
352 } else if line.contains("Pages active:") {
353 active_pages = extract_vm_stat_value(line);
354 } else if line.contains("Pages inactive:") {
355 inactive_pages = extract_vm_stat_value(line);
356 } else if line.contains("Pages speculative:") {
357 speculative_pages = extract_vm_stat_value(line);
358 } else if line.contains("Pages wired down:") {
359 wired_pages = extract_vm_stat_value(line);
360 }
361 }
362
363 let total = active_pages + inactive_pages + speculative_pages + free_pages + wired_pages;
366 let available = free_pages + inactive_pages;
367
368 if total == 0 {
369 return 0.0;
370 }
371
372 1.0 - (available as f64 / total as f64)
373}
374
375#[cfg(target_os = "macos")]
376fn extract_vm_stat_value(line: &str) -> u64 {
377 line.split(':')
379 .nth(1)
380 .map(|s| s.trim().trim_end_matches('.'))
381 .and_then(|s| s.parse::<u64>().ok())
382 .unwrap_or(0)
383}
384
385#[cfg(not(any(target_os = "linux", target_os = "macos")))]
386fn get_memory_pressure() -> f64 {
387 0.0
389}
390
391#[inline]
397fn now_us() -> u64 {
398 std::time::SystemTime::now()
399 .duration_since(std::time::UNIX_EPOCH)
400 .unwrap_or_default()
401 .as_micros() as u64
402}
403
404#[cfg(test)]
409mod tests {
410 use super::*;
411
412 #[test]
413 fn test_default_config() {
414 let config = AdaptiveMemtableConfig::default();
415 assert_eq!(config.base_size, DEFAULT_BASE_SIZE);
416 assert_eq!(config.min_size, MIN_MEMTABLE_SIZE);
417 assert_eq!(config.max_size, MAX_MEMTABLE_SIZE);
418 }
419
420 #[test]
421 fn test_initial_size() {
422 let sizer = AdaptiveMemtableSizer::new();
423 assert_eq!(sizer.current_size(), DEFAULT_BASE_SIZE);
424 }
425
426 #[test]
427 fn test_record_write() {
428 let sizer = AdaptiveMemtableSizer::new();
429
430 sizer.record_write(1000);
431 sizer.record_write(2000);
432
433 assert_eq!(sizer.bytes_since_update.load(Ordering::Relaxed), 3000);
435 }
436
437 #[test]
438 fn test_should_flush() {
439 let sizer = AdaptiveMemtableSizer::new();
440
441 assert!(!sizer.should_flush(1_000_000)); assert!(sizer.should_flush(5_000_000)); }
445
446 #[test]
447 fn test_write_rate_update() {
448 let sizer = AdaptiveMemtableSizer::new();
449
450 sizer.record_write(1_000_000);
452 std::thread::sleep(std::time::Duration::from_millis(100));
453
454 let new_size = sizer.update();
455
456 assert!(new_size >= MIN_MEMTABLE_SIZE);
458 assert!(new_size <= MAX_MEMTABLE_SIZE);
459 }
460
461 #[test]
462 fn test_custom_config() {
463 let config = AdaptiveMemtableConfig {
464 base_size: 8 * 1024 * 1024,
465 min_size: 2 * 1024 * 1024,
466 max_size: 32 * 1024 * 1024,
467 target_buffer_seconds: 2.0,
468 ema_alpha: 0.2,
469 enable_memory_pressure: false,
470 };
471
472 let sizer = AdaptiveMemtableSizer::with_config(config);
473 assert_eq!(sizer.current_size(), 8 * 1024 * 1024);
474 }
475
476 #[test]
477 fn test_memory_pressure() {
478 let pressure = get_memory_pressure();
480 assert!(pressure >= 0.0);
481 assert!(pressure <= 1.0);
482 }
483
484 #[test]
485 fn test_stats() {
486 let sizer = AdaptiveMemtableSizer::new();
487 let stats = sizer.stats();
488
489 assert_eq!(stats.current_size, DEFAULT_BASE_SIZE);
490 assert!(stats.write_rate_bytes_per_sec > 0.0);
491 assert!(stats.memory_pressure >= 0.0);
492 }
493}