1use parking_lot::Condvar;
29use parking_lot::Mutex;
30use std::sync::Arc;
31use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
32
/// Byte budgets for the storage engine's memory consumers, plus the two
/// threshold ratios that `MemoryTracker` applies to `total_budget`.
#[derive(Debug, Clone)]
pub struct MemoryBudget {
    /// Overall budget in bytes; `soft_limit` and `hard_limit` are fractions of it.
    pub total_budget: u64,
    /// Share of the budget for the memtable, in bytes.
    pub memtable_budget: u64,
    /// Share of the budget for immutable memtables, in bytes.
    pub immutable_memtables_budget: u64,
    /// Share of the budget for the block cache, in bytes.
    pub block_cache_budget: u64,
    /// Fraction of `total_budget` at or above which `MemoryTracker` reports pressure.
    pub soft_limit: f64,
    /// Fraction of `total_budget` at or above which `MemoryTracker` asks to block writes.
    pub hard_limit: f64,
}
49
50impl Default for MemoryBudget {
51 fn default() -> Self {
52 Self {
53 total_budget: 512 * 1024 * 1024, memtable_budget: 32 * 1024 * 1024, immutable_memtables_budget: 128 * 1024 * 1024, block_cache_budget: 256 * 1024 * 1024, soft_limit: 0.80, hard_limit: 0.95, }
60 }
61}
62
63impl MemoryBudget {
64 pub fn from_system_memory_percent(percent: f64) -> Self {
68 let available_bytes = Self::get_available_memory();
69 let total_budget = (available_bytes as f64 * percent) as u64;
70
71 Self {
72 total_budget,
73 memtable_budget: total_budget / 16,
74 immutable_memtables_budget: total_budget / 4,
75 block_cache_budget: total_budget / 2,
76 soft_limit: 0.80,
77 hard_limit: 0.95,
78 }
79 }
80
81 fn get_available_memory() -> u64 {
89 #[cfg(target_os = "linux")]
90 {
91 Self::linux_available_memory().unwrap_or(1024 * 1024 * 1024)
92 }
93
94 #[cfg(target_os = "macos")]
95 {
96 Self::macos_available_memory().unwrap_or(1024 * 1024 * 1024)
97 }
98
99 #[cfg(target_os = "windows")]
100 {
101 Self::windows_available_memory().unwrap_or(1024 * 1024 * 1024)
102 }
103
104 #[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
105 {
106 1024 * 1024 * 1024
108 }
109 }
110
111 #[cfg(target_os = "linux")]
112 fn linux_available_memory() -> Option<u64> {
113 use std::fs::read_to_string;
114
115 let meminfo = read_to_string("/proc/meminfo").ok()?;
116
117 let mut mem_available = None;
120 let mut mem_free = None;
121
122 for line in meminfo.lines() {
123 if line.starts_with("MemAvailable:") {
124 let parts: Vec<&str> = line.split_whitespace().collect();
125 if parts.len() >= 2 {
126 mem_available = parts[1].parse::<u64>().ok();
127 }
128 } else if line.starts_with("MemFree:") {
129 let parts: Vec<&str> = line.split_whitespace().collect();
130 if parts.len() >= 2 {
131 mem_free = parts[1].parse::<u64>().ok();
132 }
133 }
134 }
135
136 mem_available.or(mem_free).map(|kb| kb * 1024)
138 }
139
140 #[cfg(target_os = "macos")]
141 fn macos_available_memory() -> Option<u64> {
142 use std::process::Command;
143
144 let output = Command::new("sysctl")
146 .args(["-n", "hw.memsize"])
147 .output()
148 .ok()?;
149
150 let mem_bytes: u64 = String::from_utf8_lossy(&output.stdout)
151 .trim()
152 .parse()
153 .ok()?;
154
155 Some((mem_bytes as f64 * 0.9) as u64)
158 }
159
160 #[cfg(target_os = "windows")]
161 fn windows_available_memory() -> Option<u64> {
162 None
166 }
167}
168
169pub struct MemoryTracker {
171 current_usage: Arc<AtomicU64>,
173 budget: MemoryBudget,
175 under_pressure: Arc<AtomicBool>,
177}
178
179impl MemoryTracker {
180 pub fn new(budget: MemoryBudget) -> Self {
182 Self {
183 current_usage: Arc::new(AtomicU64::new(0)),
184 budget,
185 under_pressure: Arc::new(AtomicBool::new(false)),
186 }
187 }
188
189 pub fn allocate(&self, bytes: u64) {
191 let new_usage = self.current_usage.fetch_add(bytes, Ordering::Relaxed) + bytes;
192 self.check_pressure(new_usage);
193 }
194
195 pub fn deallocate(&self, bytes: u64) {
197 let prev_usage = self.current_usage.fetch_sub(bytes, Ordering::Relaxed);
198 let new_usage = prev_usage.saturating_sub(bytes);
199 self.check_pressure(new_usage);
200 }
201
202 fn check_pressure(&self, current: u64) {
204 let pressure = current as f64 >= (self.budget.total_budget as f64 * self.budget.soft_limit);
205 self.under_pressure.store(pressure, Ordering::Relaxed);
206 }
207
208 pub fn should_block_writes(&self) -> bool {
210 let current = self.current_usage.load(Ordering::Relaxed);
211 current as f64 >= (self.budget.total_budget as f64 * self.budget.hard_limit)
212 }
213
214 pub fn should_trigger_flush(&self) -> bool {
216 self.under_pressure.load(Ordering::Relaxed)
217 }
218
219 pub fn current_usage(&self) -> u64 {
221 self.current_usage.load(Ordering::Relaxed)
222 }
223
224 pub fn usage_percent(&self) -> f64 {
226 let current = self.current_usage.load(Ordering::Relaxed);
227 (current as f64 / self.budget.total_budget as f64) * 100.0
228 }
229
230 pub fn reset(&self) {
232 self.current_usage.store(0, Ordering::Relaxed);
233 self.under_pressure.store(false, Ordering::Relaxed);
234 }
235}
236
237pub struct WriteBufferManager {
257 total_buffer_memory: AtomicU64,
259 buffer_limit: u64,
261 soft_limit_ratio: f64,
263 hard_limit_ratio: f64,
265 writers_blocked: AtomicBool,
267 write_cv: Condvar,
269 write_mutex: Mutex<()>,
271 stats: WriteBufferStats,
273}
274
/// Counters maintained by `WriteBufferManager`, all starting at zero.
#[derive(Debug, Default)]
pub struct WriteBufferStats {
    /// Number of times a reservation hit the hard limit and went to wait.
    pub blocks_count: AtomicU64,
    /// Total time spent in those waits, in microseconds.
    pub blocked_time_us: AtomicU64,
    /// Number of successful reservations that left usage above the soft limit.
    pub soft_limit_flushes: AtomicU64,
}
285
286impl WriteBufferManager {
287 pub fn new(buffer_limit: u64) -> Self {
292 Self {
293 total_buffer_memory: AtomicU64::new(0),
294 buffer_limit,
295 soft_limit_ratio: 0.8,
296 hard_limit_ratio: 0.95,
297 writers_blocked: AtomicBool::new(false),
298 write_cv: Condvar::new(),
299 write_mutex: Mutex::new(()),
300 stats: WriteBufferStats::default(),
301 }
302 }
303
304 pub fn with_limits(buffer_limit: u64, soft_limit_ratio: f64, hard_limit_ratio: f64) -> Self {
306 Self {
307 total_buffer_memory: AtomicU64::new(0),
308 buffer_limit,
309 soft_limit_ratio,
310 hard_limit_ratio,
311 writers_blocked: AtomicBool::new(false),
312 write_cv: Condvar::new(),
313 write_mutex: Mutex::new(()),
314 stats: WriteBufferStats::default(),
315 }
316 }
317
318 pub fn reserve_memory(&self, bytes: u64) -> bool {
323 let soft_limit = (self.buffer_limit as f64 * self.soft_limit_ratio) as u64;
324 let hard_limit = (self.buffer_limit as f64 * self.hard_limit_ratio) as u64;
325
326 loop {
327 let current = self.total_buffer_memory.load(Ordering::Acquire);
328 let new_total = current + bytes;
329
330 if new_total > hard_limit {
331 self.writers_blocked.store(true, Ordering::Release);
333 self.stats.blocks_count.fetch_add(1, Ordering::Relaxed);
334
335 let start = std::time::Instant::now();
336 {
337 let mut guard = self.write_mutex.lock();
338 self.write_cv
340 .wait_for(&mut guard, std::time::Duration::from_millis(100));
341 }
342 self.stats
343 .blocked_time_us
344 .fetch_add(start.elapsed().as_micros() as u64, Ordering::Relaxed);
345
346 continue;
348 }
349
350 if self
352 .total_buffer_memory
353 .compare_exchange_weak(current, new_total, Ordering::AcqRel, Ordering::Acquire)
354 .is_ok()
355 {
356 let should_flush = new_total > soft_limit;
358 if should_flush {
359 self.stats
360 .soft_limit_flushes
361 .fetch_add(1, Ordering::Relaxed);
362 }
363 return should_flush;
364 }
365 }
367 }
368
369 pub fn release_memory(&self, bytes: u64) {
373 self.total_buffer_memory.fetch_sub(bytes, Ordering::AcqRel);
374
375 if self.writers_blocked.swap(false, Ordering::AcqRel) {
377 self.write_cv.notify_all();
378 }
379 }
380
381 pub fn memory_usage(&self) -> u64 {
383 self.total_buffer_memory.load(Ordering::Acquire)
384 }
385
386 pub fn usage_percent(&self) -> f64 {
388 let current = self.total_buffer_memory.load(Ordering::Acquire);
389 (current as f64 / self.buffer_limit as f64) * 100.0
390 }
391
392 pub fn is_under_pressure(&self) -> bool {
394 let current = self.total_buffer_memory.load(Ordering::Acquire);
395 let soft_limit = (self.buffer_limit as f64 * self.soft_limit_ratio) as u64;
396 current > soft_limit
397 }
398
399 pub fn stats(&self) -> &WriteBufferStats {
401 &self.stats
402 }
403}
404
405#[allow(dead_code)]
434pub struct SpilloverManager {
435 write_buffer: Arc<WriteBufferManager>,
437 spillover_capacity: u64,
439 spillover_used: AtomicU64,
441 spillover_limit_ratio: f64,
443 spillover_file_count: AtomicU64,
445 spillover_active: AtomicBool,
447 spillover_tx: crossbeam_channel::Sender<SpilloverRequest>,
449 stats: SpilloverStats,
451}
452
/// A batch of entries handed to the spill channel instead of being reserved
/// in the write buffer.
#[derive(Debug)]
pub struct SpilloverRequest {
    /// Byte-string pairs to spill (presumably key/value entries; confirm with the consumer).
    pub data: Vec<(Vec<u8>, Vec<u8>)>,
    /// Lower timestamp bound; `SpilloverManager` currently always sets 0.
    pub min_timestamp: u64,
    /// Upper timestamp bound; `SpilloverManager` currently always sets `u64::MAX`.
    pub max_timestamp: u64,
    /// Byte count that was charged to spill capacity instead of the write buffer.
    pub size_bytes: u64,
}
465
/// Counters maintained by `SpilloverManager`, all starting at zero.
#[derive(Debug, Default)]
pub struct SpilloverStats {
    /// Number of requests successfully queued on the spill channel.
    pub spillover_count: AtomicU64,
    /// Total bytes charged to spill capacity.
    pub bytes_spilled: AtomicU64,
    /// Total bytes passed to `release_spillover`.
    pub bytes_recovered: AtomicU64,
    /// Average spill latency in microseconds (not updated by any method in this file).
    pub avg_latency_us: AtomicU64,
    /// Number of writes that spilled instead of reserving write-buffer memory.
    pub blocks_avoided: AtomicU64,
}
480
481impl SpilloverManager {
482 pub fn new(
484 write_buffer: Arc<WriteBufferManager>,
485 spillover_capacity: u64,
486 ) -> (Self, crossbeam_channel::Receiver<SpilloverRequest>) {
487 let (tx, rx) = crossbeam_channel::bounded(16);
488
489 let manager = Self {
490 write_buffer,
491 spillover_capacity,
492 spillover_used: AtomicU64::new(0),
493 spillover_limit_ratio: 0.9,
494 spillover_file_count: AtomicU64::new(0),
495 spillover_active: AtomicBool::new(false),
496 spillover_tx: tx,
497 stats: SpilloverStats::default(),
498 };
499
500 (manager, rx)
501 }
502
503 pub fn should_spillover(&self) -> bool {
505 let usage = self.write_buffer.memory_usage();
506 let spillover_limit =
507 (self.write_buffer.buffer_limit as f64 * self.spillover_limit_ratio) as u64;
508 usage > spillover_limit && !self.is_spillover_full()
509 }
510
511 pub fn is_spillover_full(&self) -> bool {
513 self.spillover_used.load(Ordering::Relaxed) >= self.spillover_capacity
514 }
515
516 pub fn reserve_memory(
523 &self,
524 bytes: u64,
525 data: Vec<(Vec<u8>, Vec<u8>)>,
526 ) -> Result<bool, SpilloverRequest> {
527 if !self.write_buffer.is_under_pressure() {
529 let should_flush = self.write_buffer.reserve_memory(bytes);
530 return Ok(should_flush);
531 }
532
533 if self.should_spillover() && !data.is_empty() {
535 let request = SpilloverRequest {
536 data,
537 min_timestamp: 0,
538 max_timestamp: u64::MAX,
539 size_bytes: bytes,
540 };
541
542 if self.spillover_tx.try_send(request.clone()).is_ok() {
544 self.spillover_used.fetch_add(bytes, Ordering::Relaxed);
545 self.stats.spillover_count.fetch_add(1, Ordering::Relaxed);
546 self.stats.bytes_spilled.fetch_add(bytes, Ordering::Relaxed);
547 self.stats.blocks_avoided.fetch_add(1, Ordering::Relaxed);
548 self.spillover_active.store(true, Ordering::Release);
549
550 return Ok(true);
552 } else {
553 return Err(request);
555 }
556 }
557
558 let should_flush = self.write_buffer.reserve_memory(bytes);
560 Ok(should_flush)
561 }
562
563 pub fn release_spillover(&self, bytes: u64) {
565 self.spillover_used.fetch_sub(bytes, Ordering::Relaxed);
566 self.stats
567 .bytes_recovered
568 .fetch_add(bytes, Ordering::Relaxed);
569
570 if self.spillover_used.load(Ordering::Relaxed) == 0 {
571 self.spillover_active.store(false, Ordering::Release);
572 }
573 }
574
575 pub fn is_spillover_active(&self) -> bool {
577 self.spillover_active.load(Ordering::Acquire)
578 }
579
580 pub fn spillover_usage(&self) -> u64 {
582 self.spillover_used.load(Ordering::Relaxed)
583 }
584
585 pub fn spillover_capacity(&self) -> u64 {
587 self.spillover_capacity
588 }
589
590 pub fn stats(&self) -> &SpilloverStats {
592 &self.stats
593 }
594}
595
596impl Clone for SpilloverRequest {
597 fn clone(&self) -> Self {
598 Self {
599 data: self.data.clone(),
600 min_timestamp: self.min_timestamp,
601 max_timestamp: self.max_timestamp,
602 size_bytes: self.size_bytes,
603 }
604 }
605}
606
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_memory_budget_default() {
        let budget = MemoryBudget::default();
        assert_eq!(budget.total_budget, 512 * 1024 * 1024);
        assert_eq!(budget.soft_limit, 0.80);
        assert_eq!(budget.hard_limit, 0.95);
    }

    #[test]
    fn test_memory_tracker_pressure() {
        let budget = MemoryBudget {
            total_budget: 1000,
            memtable_budget: 100,
            immutable_memtables_budget: 300,
            block_cache_budget: 500,
            soft_limit: 0.80,
            hard_limit: 0.95,
        };

        let tracker = MemoryTracker::new(budget);

        // 70 %: below both limits.
        tracker.allocate(700);
        assert!(!tracker.should_trigger_flush());
        assert!(!tracker.should_block_writes());

        // 80 %: soft limit reached, hard limit not.
        tracker.allocate(100);
        assert_eq!(tracker.current_usage(), 800);
        assert!(tracker.should_trigger_flush());
        assert!(!tracker.should_block_writes());

        // 100 %: both limits reached.
        tracker.allocate(200);
        assert_eq!(tracker.current_usage(), 1000);
        assert!(tracker.should_trigger_flush());
        assert!(tracker.should_block_writes());

        // Back to 70 %.
        tracker.deallocate(300);
        assert_eq!(tracker.current_usage(), 700);
        assert!(!tracker.should_trigger_flush());
        assert!(!tracker.should_block_writes());
    }

    #[test]
    fn test_memory_tracker_usage_percent() {
        let budget = MemoryBudget {
            total_budget: 1000,
            memtable_budget: 100,
            immutable_memtables_budget: 300,
            block_cache_budget: 500,
            soft_limit: 0.80,
            hard_limit: 0.95,
        };

        let tracker = MemoryTracker::new(budget);

        tracker.allocate(500);
        assert_eq!(tracker.usage_percent(), 50.0);

        tracker.allocate(250);
        assert_eq!(tracker.usage_percent(), 75.0);
    }

    #[test]
    fn test_from_system_memory_percent() {
        let budget = MemoryBudget::from_system_memory_percent(0.25);

        assert!(budget.total_budget > 0);
        assert!(budget.memtable_budget > 0);
        assert!(budget.memtable_budget < budget.total_budget);
        assert_eq!(budget.soft_limit, 0.80);
        assert_eq!(budget.hard_limit, 0.95);
    }

    #[test]
    fn test_system_memory_detection() {
        // Check that the platform probe itself succeeds instead of asserting
        // a minimum amount of free memory. Free memory depends on the host
        // (small or busy CI machines can have well under 1-2 GiB available),
        // whereas a failed probe is what this test is meant to catch.
        #[cfg(target_os = "linux")]
        {
            assert!(
                MemoryBudget::linux_available_memory().is_some(),
                "could not read MemAvailable/MemFree from /proc/meminfo"
            );
        }

        #[cfg(target_os = "macos")]
        {
            assert!(
                MemoryBudget::macos_available_memory().is_some(),
                "could not read hw.memsize via sysctl"
            );
        }

        let budget = MemoryBudget::from_system_memory_percent(1.0);
        assert!(budget.total_budget > 0);
    }

    #[test]
    fn test_write_buffer_manager_reserve_release() {
        let wbm = WriteBufferManager::new(1000);

        // 400 / 1000: below the 80 % soft limit.
        let should_flush = wbm.reserve_memory(400);
        assert!(!should_flush);
        assert_eq!(wbm.memory_usage(), 400);

        // 900 / 1000: above soft, below the 95 % hard limit.
        let should_flush = wbm.reserve_memory(500);
        assert!(should_flush);
        assert_eq!(wbm.memory_usage(), 900);

        wbm.release_memory(600);
        assert_eq!(wbm.memory_usage(), 300);
        assert!(!wbm.is_under_pressure());
    }

    #[test]
    fn test_write_buffer_manager_pressure() {
        let wbm = WriteBufferManager::with_limits(1000, 0.5, 0.9);

        wbm.reserve_memory(400);
        assert!(!wbm.is_under_pressure());
        assert_eq!(wbm.usage_percent(), 40.0);

        wbm.reserve_memory(200);
        assert!(wbm.is_under_pressure());
        assert_eq!(wbm.usage_percent(), 60.0);
    }

    #[test]
    fn test_spillover_manager_creation() {
        let wbm = Arc::new(WriteBufferManager::new(1000));
        let (spillover, _rx) = SpilloverManager::new(wbm, 500);

        assert_eq!(spillover.spillover_capacity(), 500);
        assert_eq!(spillover.spillover_usage(), 0);
        assert!(!spillover.is_spillover_active());
    }

    #[test]
    fn test_spillover_manager_reserve_below_limit() {
        let wbm = Arc::new(WriteBufferManager::new(1000));
        let (spillover, _rx) = SpilloverManager::new(wbm, 500);

        let result = spillover.reserve_memory(100, vec![]);
        assert!(result.is_ok());
        assert!(!result.unwrap());
    }

    #[test]
    fn test_spillover_manager_stats() {
        let wbm = Arc::new(WriteBufferManager::new(1000));
        let (spillover, _rx) = SpilloverManager::new(wbm.clone(), 500);

        wbm.reserve_memory(850);
        let data = vec![(b"key".to_vec(), b"value".to_vec())];

        let result = spillover.reserve_memory(100, data);
        assert!(result.is_ok());

        let stats = spillover.stats();
        assert!(stats.spillover_count.load(Ordering::Relaxed) <= 1);
    }

    #[test]
    fn test_spillover_release() {
        let wbm = Arc::new(WriteBufferManager::new(1000));
        let (spillover, _rx) = SpilloverManager::new(wbm, 500);

        spillover.spillover_used.store(200, Ordering::Relaxed);
        spillover.spillover_active.store(true, Ordering::Release);

        assert!(spillover.is_spillover_active());
        assert_eq!(spillover.spillover_usage(), 200);

        spillover.release_spillover(200);

        assert!(!spillover.is_spillover_active());
        assert_eq!(spillover.spillover_usage(), 0);
    }
}