// sochdb_storage/adaptive_memtable.rs

use std::sync::atomic::{AtomicU64, Ordering};

/// Default starting memtable size in bytes (4 MiB).
pub const DEFAULT_BASE_SIZE: usize = 4 * 1024 * 1024;

/// Default lower bound for the memtable size in bytes (1 MiB).
pub const MIN_MEMTABLE_SIZE: usize = 1024 * 1024;

/// Default upper bound for the memtable size in bytes (16 MiB).
pub const MAX_MEMTABLE_SIZE: usize = 16 * 1024 * 1024;

/// Default number of seconds of writes the memtable is sized to hold.
pub const TARGET_BUFFER_SECONDS: f64 = 1.0;

/// Default weight given to the newest sample in the write-rate moving average.
pub const WRITE_RATE_EMA_ALPHA: f64 = 0.1;

/// Memory-pressure level above which the target memtable size is scaled down.
pub const PRESSURE_THRESHOLD: f64 = 0.7;

/// Tuning parameters for the adaptive memtable sizer.
#[derive(Debug, Clone)]
pub struct AdaptiveMemtableConfig {
    /// Initial memtable size in bytes, used until the first size update.
    pub base_size: usize,
    /// Smallest size in bytes that a size update will choose.
    pub min_size: usize,
    /// Largest size in bytes that a size update will choose.
    pub max_size: usize,
    /// Seconds of writes (at the estimated write rate) the memtable should hold.
    pub target_buffer_seconds: f64,
    /// Weight of the newest sample in the write-rate exponential moving average.
    pub ema_alpha: f64,
    /// Whether size updates sample system memory pressure and shrink the
    /// target size when it is high.
    pub enable_memory_pressure: bool,
}

107impl Default for AdaptiveMemtableConfig {
108 fn default() -> Self {
109 Self {
110 base_size: DEFAULT_BASE_SIZE,
111 min_size: MIN_MEMTABLE_SIZE,
112 max_size: MAX_MEMTABLE_SIZE,
113 target_buffer_seconds: TARGET_BUFFER_SECONDS,
114 ema_alpha: WRITE_RATE_EMA_ALPHA,
115 enable_memory_pressure: true,
116 }
117 }
118}
119
120pub struct AdaptiveMemtableSizer {
126 config: AdaptiveMemtableConfig,
128 current_size: AtomicU64,
130 write_rate_ema: AtomicU64,
132 last_update_us: AtomicU64,
134 bytes_since_update: AtomicU64,
136 memory_pressure_scaled: AtomicU64,
138}
139
140impl AdaptiveMemtableSizer {
141 pub fn new() -> Self {
143 Self::with_config(AdaptiveMemtableConfig::default())
144 }
145
146 pub fn with_config(config: AdaptiveMemtableConfig) -> Self {
148 let initial_rate = config.base_size as f64 / config.target_buffer_seconds;
149
150 Self {
151 current_size: AtomicU64::new(config.base_size as u64),
152 write_rate_ema: AtomicU64::new((initial_rate * 1000.0) as u64),
153 last_update_us: AtomicU64::new(now_us()),
154 bytes_since_update: AtomicU64::new(0),
155 memory_pressure_scaled: AtomicU64::new(0),
156 config,
157 }
158 }
159
160 #[inline]
162 pub fn record_write(&self, bytes: usize) {
163 self.bytes_since_update.fetch_add(bytes as u64, Ordering::Relaxed);
164 }
165
166 #[inline]
168 pub fn current_size(&self) -> usize {
169 self.current_size.load(Ordering::Relaxed) as usize
170 }
171
172 #[inline]
174 pub fn write_rate(&self) -> f64 {
175 self.write_rate_ema.load(Ordering::Relaxed) as f64 / 1000.0
176 }
177
178 #[inline]
180 pub fn memory_pressure(&self) -> f64 {
181 self.memory_pressure_scaled.load(Ordering::Relaxed) as f64 / 1000.0
182 }
183
184 pub fn update(&self) -> usize {
188 let now = now_us();
189 let last = self.last_update_us.swap(now, Ordering::Relaxed);
190 let delta_us = now.saturating_sub(last);
191
192 if delta_us == 0 {
193 return self.current_size();
194 }
195
196 let bytes = self.bytes_since_update.swap(0, Ordering::Relaxed);
198 let delta_secs = delta_us as f64 / 1_000_000.0;
199 let instant_rate = bytes as f64 / delta_secs;
200
201 let old_rate = self.write_rate_ema.load(Ordering::Relaxed) as f64 / 1000.0;
203 let new_rate = old_rate * (1.0 - self.config.ema_alpha) + instant_rate * self.config.ema_alpha;
204 self.write_rate_ema.store((new_rate * 1000.0) as u64, Ordering::Relaxed);
205
206 let pressure = if self.config.enable_memory_pressure {
208 get_memory_pressure()
209 } else {
210 0.0
211 };
212 self.memory_pressure_scaled.store((pressure * 1000.0) as u64, Ordering::Relaxed);
213
214 let target_size = new_rate * self.config.target_buffer_seconds;
217
218 let pressure_factor = if pressure > PRESSURE_THRESHOLD {
221 1.0 - (pressure - PRESSURE_THRESHOLD).powi(2)
223 } else {
224 1.0
225 };
226
227 let adjusted_size = target_size * pressure_factor;
228
229 let final_size = adjusted_size
231 .max(self.config.min_size as f64)
232 .min(self.config.max_size as f64) as usize;
233
234 self.current_size.store(final_size as u64, Ordering::Relaxed);
235 final_size
236 }
237
238 #[inline]
240 pub fn should_flush(&self, current_memtable_bytes: u64) -> bool {
241 current_memtable_bytes >= self.current_size.load(Ordering::Relaxed)
242 }
243
244 pub fn stats(&self) -> AdaptiveMemtableStats {
246 AdaptiveMemtableStats {
247 current_size: self.current_size(),
248 write_rate_bytes_per_sec: self.write_rate(),
249 memory_pressure: self.memory_pressure(),
250 config: self.config.clone(),
251 }
252 }
253}
254
255impl Default for AdaptiveMemtableSizer {
256 fn default() -> Self {
257 Self::new()
258 }
259}
260
261#[derive(Debug, Clone)]
263pub struct AdaptiveMemtableStats {
264 pub current_size: usize,
266 pub write_rate_bytes_per_sec: f64,
268 pub memory_pressure: f64,
270 pub config: AdaptiveMemtableConfig,
272}
273
274#[cfg(target_os = "linux")]
280fn get_memory_pressure() -> f64 {
281 use std::fs::File;
283 use std::io::{BufRead, BufReader};
284
285 let file = match File::open("/proc/meminfo") {
286 Ok(f) => f,
287 Err(_) => return 0.0,
288 };
289
290 let reader = BufReader::new(file);
291 let mut mem_total: u64 = 0;
292 let mut mem_available: u64 = 0;
293
294 for line in reader.lines().take(10).flatten() {
295 if line.starts_with("MemTotal:") {
296 mem_total = parse_meminfo_value(&line);
297 } else if line.starts_with("MemAvailable:") {
298 mem_available = parse_meminfo_value(&line);
299 }
300 }
301
302 if mem_total == 0 {
303 return 0.0;
304 }
305
306 1.0 - (mem_available as f64 / mem_total as f64)
308}
309
/// Extracts the numeric field from a `/proc/meminfo`-style line such as
/// `MemTotal:       16384 kB`.
///
/// Returns the second whitespace-separated token parsed as `u64`, or `0` when
/// that token is missing or is not a number. Any trailing unit token is
/// ignored.
#[cfg(target_os = "linux")]
fn parse_meminfo_value(line: &str) -> u64 {
    let mut fields = line.split_whitespace();
    let _label = fields.next();

    match fields.next().map(str::parse::<u64>) {
        Some(Ok(value)) => value,
        _ => 0,
    }
}

/// Estimates system memory pressure on macOS from the output of `vm_stat`.
///
/// Runs the `vm_stat` command and reads its free, active, inactive,
/// speculative and wired page counts. Pressure is
/// `1 - (free + inactive) / (active + inactive + speculative + free + wired)`.
///
/// Returns `0.0` when the command cannot be run, its output is not valid
/// UTF-8, or all counters are zero.
#[cfg(target_os = "macos")]
fn get_memory_pressure() -> f64 {
    use std::process::Command;

    let output = match Command::new("vm_stat").output() {
        Ok(output) => output,
        Err(_) => return 0.0,
    };
    let report = match String::from_utf8(output.stdout) {
        Ok(text) => text,
        Err(_) => return 0.0,
    };

    let mut free = 0u64;
    let mut active = 0u64;
    let mut inactive = 0u64;
    let mut speculative = 0u64;
    let mut wired = 0u64;

    for line in report.lines() {
        // Pick the counter this line describes; skip unrelated lines.
        let counter = if line.contains("Pages free:") {
            &mut free
        } else if line.contains("Pages active:") {
            &mut active
        } else if line.contains("Pages inactive:") {
            &mut inactive
        } else if line.contains("Pages speculative:") {
            &mut speculative
        } else if line.contains("Pages wired down:") {
            &mut wired
        } else {
            continue;
        };
        *counter = extract_vm_stat_value(line);
    }

    let total = active + inactive + speculative + free + wired;
    if total == 0 {
        return 0.0;
    }

    let available = free + inactive;
    1.0 - (available as f64 / total as f64)
}

/// Extracts the page count from a `vm_stat` line such as
/// `Pages free:      12345.`.
///
/// Takes the text between the first and second `:`, trims surrounding
/// whitespace and trailing `.` characters, and parses it as `u64`. Returns
/// `0` when the line has no `:` or the value is not a number.
#[cfg(target_os = "macos")]
fn extract_vm_stat_value(line: &str) -> u64 {
    let mut parts = line.split(':');
    let _label = parts.next();

    let digits = match parts.next() {
        Some(rest) => rest.trim().trim_end_matches('.'),
        None => return 0,
    };
    digits.parse::<u64>().unwrap_or(0)
}

/// Fallback for targets other than Linux and macOS: no probe is available,
/// so memory pressure is always reported as `0.0`.
#[cfg(not(any(target_os = "linux", target_os = "macos")))]
fn get_memory_pressure() -> f64 {
    0.0
}

/// Returns the current wall-clock time as microseconds since the Unix epoch.
///
/// Yields `0` if the system clock reads earlier than the epoch, and saturates
/// at `u64::MAX` instead of silently truncating the 128-bit microsecond
/// count. The underlying `SystemTime` is not monotonic, so consecutive
/// readings may go backwards if the clock is adjusted.
#[inline]
fn now_us() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();

    // Cap before narrowing so the cast below can never wrap.
    let micros = since_epoch.as_micros().min(u128::from(u64::MAX));
    micros as u64
}

#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration mirrors the module-level constants.
    #[test]
    fn test_default_config() {
        let config = AdaptiveMemtableConfig::default();
        assert_eq!(config.base_size, DEFAULT_BASE_SIZE);
        assert_eq!(config.min_size, MIN_MEMTABLE_SIZE);
        assert_eq!(config.max_size, MAX_MEMTABLE_SIZE);
    }

    /// A fresh sizer starts at the default base size.
    #[test]
    fn test_initial_size() {
        let sizer = AdaptiveMemtableSizer::new();
        assert_eq!(sizer.current_size(), DEFAULT_BASE_SIZE);
    }

    /// Recorded writes accumulate in the pending byte counter.
    #[test]
    fn test_record_write() {
        let sizer = AdaptiveMemtableSizer::new();

        sizer.record_write(1000);
        sizer.record_write(2000);

        assert_eq!(sizer.bytes_since_update.load(Ordering::Relaxed), 3000);
    }

    /// Flushing is requested only once the memtable reaches the target size.
    #[test]
    fn test_should_flush() {
        let sizer = AdaptiveMemtableSizer::new();

        // 1 MB is below the 4 MiB default target.
        assert!(!sizer.should_flush(1_000_000));
        // 5 MB is above it.
        assert!(sizer.should_flush(5_000_000));
    }

    /// An update after some writes yields a size within the default bounds.
    #[test]
    fn test_write_rate_update() {
        let sizer = AdaptiveMemtableSizer::new();

        sizer.record_write(1_000_000);
        // Let measurable time pass so the update does not take the
        // zero-elapsed early return.
        std::thread::sleep(std::time::Duration::from_millis(100));

        let new_size = sizer.update();

        assert!(new_size >= MIN_MEMTABLE_SIZE);
        assert!(new_size <= MAX_MEMTABLE_SIZE);
    }

    /// A custom configuration controls the initial size.
    #[test]
    fn test_custom_config() {
        let config = AdaptiveMemtableConfig {
            base_size: 8 * 1024 * 1024,
            min_size: 2 * 1024 * 1024,
            max_size: 32 * 1024 * 1024,
            target_buffer_seconds: 2.0,
            ema_alpha: 0.2,
            enable_memory_pressure: false,
        };

        let sizer = AdaptiveMemtableSizer::with_config(config);
        assert_eq!(sizer.current_size(), 8 * 1024 * 1024);
    }

    /// The platform probe reports a fraction between 0 and 1.
    #[test]
    fn test_memory_pressure() {
        let pressure = get_memory_pressure();
        assert!(pressure >= 0.0);
        assert!(pressure <= 1.0);
    }

    /// A snapshot of a fresh sizer reflects its initial state.
    #[test]
    fn test_stats() {
        let sizer = AdaptiveMemtableSizer::new();
        let stats = sizer.stats();

        assert_eq!(stats.current_size, DEFAULT_BASE_SIZE);
        assert!(stats.write_rate_bytes_per_sec > 0.0);
        assert!(stats.memory_pressure >= 0.0);
    }
}