1use parking_lot::Condvar;
32use parking_lot::Mutex;
33use std::sync::Arc;
34use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
35
/// Memory budget configuration: an overall byte budget, per-component byte
/// budgets, and two threshold fractions.
///
/// [`MemoryTracker`] multiplies `total_budget` by `soft_limit` / `hard_limit`
/// to decide when to signal a flush and when to block writes.
#[derive(Debug, Clone)]
pub struct MemoryBudget {
    /// Overall budget in bytes.
    pub total_budget: u64,
    /// Byte budget for the memtable.
    pub memtable_budget: u64,
    /// Byte budget for immutable memtables.
    pub immutable_memtables_budget: u64,
    /// Byte budget for the block cache.
    pub block_cache_budget: u64,
    /// Fraction of `total_budget` used as the soft limit.
    pub soft_limit: f64,
    /// Fraction of `total_budget` used as the hard limit.
    pub hard_limit: f64,
}

impl Default for MemoryBudget {
    /// 512 MiB in total: 32 MiB memtable, 128 MiB immutable memtables and
    /// 256 MiB block cache, with limits at 80% and 95%.
    fn default() -> Self {
        const MIB: u64 = 1024 * 1024;

        Self {
            total_budget: 512 * MIB,
            memtable_budget: 32 * MIB,
            immutable_memtables_budget: 128 * MIB,
            block_cache_budget: 256 * MIB,
            soft_limit: 0.80,
            hard_limit: 0.95,
        }
    }
}

impl MemoryBudget {
    /// Value assumed for available memory when detection fails or the
    /// platform has no detection routine: 1 GiB.
    const FALLBACK_AVAILABLE_BYTES: u64 = 1024 * 1024 * 1024;

    /// Builds a budget from the memory detected on this machine.
    ///
    /// `percent` is applied as a plain multiplier to the detected byte count
    /// (so `0.25` yields one quarter of it). The memtable, immutable-memtable
    /// and block-cache budgets are 1/16, 1/4 and 1/2 of the resulting total.
    pub fn from_system_memory_percent(percent: f64) -> Self {
        let detected_bytes = Self::get_available_memory();
        let total_budget = (detected_bytes as f64 * percent) as u64;
        let share = |divisor: u64| total_budget / divisor;

        Self {
            total_budget,
            memtable_budget: share(16),
            immutable_memtables_budget: share(4),
            block_cache_budget: share(2),
            soft_limit: 0.80,
            hard_limit: 0.95,
        }
    }

    /// Returns the detected memory in bytes for the current target OS, or
    /// the 1 GiB fallback when nothing could be detected.
    fn get_available_memory() -> u64 {
        #[cfg(target_os = "linux")]
        let detected = Self::linux_available_memory();

        #[cfg(target_os = "macos")]
        let detected = Self::macos_available_memory();

        #[cfg(target_os = "windows")]
        let detected = Self::windows_available_memory();

        #[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
        let detected: Option<u64> = None;

        detected.unwrap_or(Self::FALLBACK_AVAILABLE_BYTES)
    }

    /// Reads `/proc/meminfo` and returns `MemAvailable`, or `MemFree` when
    /// `MemAvailable` is missing or unparsable, converted from kB to bytes.
    #[cfg(target_os = "linux")]
    fn linux_available_memory() -> Option<u64> {
        let meminfo = std::fs::read_to_string("/proc/meminfo").ok()?;

        let mut available_kb: Option<u64> = None;
        let mut free_kb: Option<u64> = None;

        for line in meminfo.lines() {
            // Pick the slot this line feeds; every other line is skipped.
            let slot = if line.starts_with("MemAvailable:") {
                &mut available_kb
            } else if line.starts_with("MemFree:") {
                &mut free_kb
            } else {
                continue;
            };

            // The value is the second whitespace-separated token; a line
            // without one leaves the slot untouched.
            if let Some(value) = line.split_whitespace().nth(1) {
                *slot = value.parse::<u64>().ok();
            }
        }

        available_kb.or(free_kb).map(|kb| kb * 1024)
    }

    /// Runs `sysctl -n hw.memsize` and returns 90% of the value it prints.
    #[cfg(target_os = "macos")]
    fn macos_available_memory() -> Option<u64> {
        let sysctl = std::process::Command::new("sysctl")
            .args(["-n", "hw.memsize"])
            .output()
            .ok()?;

        let printed = String::from_utf8_lossy(&sysctl.stdout);
        let reported_bytes = printed.trim().parse::<u64>().ok()?;

        Some((reported_bytes as f64 * 0.9) as u64)
    }

    /// Detection is not implemented on Windows; always `None`, so the caller
    /// falls back to 1 GiB.
    #[cfg(target_os = "windows")]
    fn windows_available_memory() -> Option<u64> {
        None
    }
}
171
172pub struct MemoryTracker {
174 current_usage: Arc<AtomicU64>,
176 budget: MemoryBudget,
178 under_pressure: Arc<AtomicBool>,
180}
181
182impl MemoryTracker {
183 pub fn new(budget: MemoryBudget) -> Self {
185 Self {
186 current_usage: Arc::new(AtomicU64::new(0)),
187 budget,
188 under_pressure: Arc::new(AtomicBool::new(false)),
189 }
190 }
191
192 pub fn allocate(&self, bytes: u64) {
194 let new_usage = self.current_usage.fetch_add(bytes, Ordering::Relaxed) + bytes;
195 self.check_pressure(new_usage);
196 }
197
198 pub fn deallocate(&self, bytes: u64) {
200 let prev_usage = self.current_usage.fetch_sub(bytes, Ordering::Relaxed);
201 let new_usage = prev_usage.saturating_sub(bytes);
202 self.check_pressure(new_usage);
203 }
204
205 fn check_pressure(&self, current: u64) {
207 let pressure = current as f64 >= (self.budget.total_budget as f64 * self.budget.soft_limit);
208 self.under_pressure.store(pressure, Ordering::Relaxed);
209 }
210
211 pub fn should_block_writes(&self) -> bool {
213 let current = self.current_usage.load(Ordering::Relaxed);
214 current as f64 >= (self.budget.total_budget as f64 * self.budget.hard_limit)
215 }
216
217 pub fn should_trigger_flush(&self) -> bool {
219 self.under_pressure.load(Ordering::Relaxed)
220 }
221
222 pub fn current_usage(&self) -> u64 {
224 self.current_usage.load(Ordering::Relaxed)
225 }
226
227 pub fn usage_percent(&self) -> f64 {
229 let current = self.current_usage.load(Ordering::Relaxed);
230 (current as f64 / self.budget.total_budget as f64) * 100.0
231 }
232
233 pub fn reset(&self) {
235 self.current_usage.store(0, Ordering::Relaxed);
236 self.under_pressure.store(false, Ordering::Relaxed);
237 }
238}
239
240pub struct WriteBufferManager {
260 total_buffer_memory: AtomicU64,
262 buffer_limit: u64,
264 soft_limit_ratio: f64,
266 hard_limit_ratio: f64,
268 writers_blocked: AtomicBool,
270 write_cv: Condvar,
272 write_mutex: Mutex<()>,
274 stats: WriteBufferStats,
276}
277
278#[derive(Debug, Default)]
280pub struct WriteBufferStats {
281 pub blocks_count: AtomicU64,
283 pub blocked_time_us: AtomicU64,
285 pub soft_limit_flushes: AtomicU64,
287}
288
289impl WriteBufferManager {
290 pub fn new(buffer_limit: u64) -> Self {
295 Self {
296 total_buffer_memory: AtomicU64::new(0),
297 buffer_limit,
298 soft_limit_ratio: 0.8,
299 hard_limit_ratio: 0.95,
300 writers_blocked: AtomicBool::new(false),
301 write_cv: Condvar::new(),
302 write_mutex: Mutex::new(()),
303 stats: WriteBufferStats::default(),
304 }
305 }
306
307 pub fn with_limits(buffer_limit: u64, soft_limit_ratio: f64, hard_limit_ratio: f64) -> Self {
309 Self {
310 total_buffer_memory: AtomicU64::new(0),
311 buffer_limit,
312 soft_limit_ratio,
313 hard_limit_ratio,
314 writers_blocked: AtomicBool::new(false),
315 write_cv: Condvar::new(),
316 write_mutex: Mutex::new(()),
317 stats: WriteBufferStats::default(),
318 }
319 }
320
321 pub fn reserve_memory(&self, bytes: u64) -> bool {
326 let soft_limit = (self.buffer_limit as f64 * self.soft_limit_ratio) as u64;
327 let hard_limit = (self.buffer_limit as f64 * self.hard_limit_ratio) as u64;
328
329 loop {
330 let current = self.total_buffer_memory.load(Ordering::Acquire);
331 let new_total = current + bytes;
332
333 if new_total > hard_limit {
334 self.writers_blocked.store(true, Ordering::Release);
336 self.stats.blocks_count.fetch_add(1, Ordering::Relaxed);
337
338 let start = std::time::Instant::now();
339 {
340 let mut guard = self.write_mutex.lock();
341 self.write_cv
343 .wait_for(&mut guard, std::time::Duration::from_millis(100));
344 }
345 self.stats
346 .blocked_time_us
347 .fetch_add(start.elapsed().as_micros() as u64, Ordering::Relaxed);
348
349 continue;
351 }
352
353 if self
355 .total_buffer_memory
356 .compare_exchange_weak(current, new_total, Ordering::AcqRel, Ordering::Acquire)
357 .is_ok()
358 {
359 let should_flush = new_total > soft_limit;
361 if should_flush {
362 self.stats
363 .soft_limit_flushes
364 .fetch_add(1, Ordering::Relaxed);
365 }
366 return should_flush;
367 }
368 }
370 }
371
372 pub fn release_memory(&self, bytes: u64) {
376 self.total_buffer_memory.fetch_sub(bytes, Ordering::AcqRel);
377
378 if self.writers_blocked.swap(false, Ordering::AcqRel) {
380 self.write_cv.notify_all();
381 }
382 }
383
384 pub fn memory_usage(&self) -> u64 {
386 self.total_buffer_memory.load(Ordering::Acquire)
387 }
388
389 pub fn usage_percent(&self) -> f64 {
391 let current = self.total_buffer_memory.load(Ordering::Acquire);
392 (current as f64 / self.buffer_limit as f64) * 100.0
393 }
394
395 pub fn is_under_pressure(&self) -> bool {
397 let current = self.total_buffer_memory.load(Ordering::Acquire);
398 let soft_limit = (self.buffer_limit as f64 * self.soft_limit_ratio) as u64;
399 current > soft_limit
400 }
401
402 pub fn stats(&self) -> &WriteBufferStats {
404 &self.stats
405 }
406}
407
408#[allow(dead_code)]
437pub struct SpilloverManager {
438 write_buffer: Arc<WriteBufferManager>,
440 spillover_capacity: u64,
442 spillover_used: AtomicU64,
444 spillover_limit_ratio: f64,
446 spillover_file_count: AtomicU64,
448 spillover_active: AtomicBool,
450 spillover_tx: crossbeam_channel::Sender<SpilloverRequest>,
452 stats: SpilloverStats,
454}
455
456#[derive(Debug)]
458pub struct SpilloverRequest {
459 pub data: Vec<(Vec<u8>, Vec<u8>)>,
461 pub min_timestamp: u64,
463 pub max_timestamp: u64,
465 pub size_bytes: u64,
467}
468
469#[derive(Debug, Default)]
471pub struct SpilloverStats {
472 pub spillover_count: AtomicU64,
474 pub bytes_spilled: AtomicU64,
476 pub bytes_recovered: AtomicU64,
478 pub avg_latency_us: AtomicU64,
480 pub blocks_avoided: AtomicU64,
482}
483
484impl SpilloverManager {
485 pub fn new(
487 write_buffer: Arc<WriteBufferManager>,
488 spillover_capacity: u64,
489 ) -> (Self, crossbeam_channel::Receiver<SpilloverRequest>) {
490 let (tx, rx) = crossbeam_channel::bounded(16);
491
492 let manager = Self {
493 write_buffer,
494 spillover_capacity,
495 spillover_used: AtomicU64::new(0),
496 spillover_limit_ratio: 0.9,
497 spillover_file_count: AtomicU64::new(0),
498 spillover_active: AtomicBool::new(false),
499 spillover_tx: tx,
500 stats: SpilloverStats::default(),
501 };
502
503 (manager, rx)
504 }
505
506 pub fn should_spillover(&self) -> bool {
508 let usage = self.write_buffer.memory_usage();
509 let spillover_limit =
510 (self.write_buffer.buffer_limit as f64 * self.spillover_limit_ratio) as u64;
511 usage > spillover_limit && !self.is_spillover_full()
512 }
513
514 pub fn is_spillover_full(&self) -> bool {
516 self.spillover_used.load(Ordering::Relaxed) >= self.spillover_capacity
517 }
518
519 pub fn reserve_memory(
526 &self,
527 bytes: u64,
528 data: Vec<(Vec<u8>, Vec<u8>)>,
529 ) -> Result<bool, SpilloverRequest> {
530 if !self.write_buffer.is_under_pressure() {
532 let should_flush = self.write_buffer.reserve_memory(bytes);
533 return Ok(should_flush);
534 }
535
536 if self.should_spillover() && !data.is_empty() {
538 let request = SpilloverRequest {
539 data,
540 min_timestamp: 0,
541 max_timestamp: u64::MAX,
542 size_bytes: bytes,
543 };
544
545 if self.spillover_tx.try_send(request.clone()).is_ok() {
547 self.spillover_used.fetch_add(bytes, Ordering::Relaxed);
548 self.stats.spillover_count.fetch_add(1, Ordering::Relaxed);
549 self.stats.bytes_spilled.fetch_add(bytes, Ordering::Relaxed);
550 self.stats.blocks_avoided.fetch_add(1, Ordering::Relaxed);
551 self.spillover_active.store(true, Ordering::Release);
552
553 return Ok(true);
555 } else {
556 return Err(request);
558 }
559 }
560
561 let should_flush = self.write_buffer.reserve_memory(bytes);
563 Ok(should_flush)
564 }
565
566 pub fn release_spillover(&self, bytes: u64) {
568 self.spillover_used.fetch_sub(bytes, Ordering::Relaxed);
569 self.stats
570 .bytes_recovered
571 .fetch_add(bytes, Ordering::Relaxed);
572
573 if self.spillover_used.load(Ordering::Relaxed) == 0 {
574 self.spillover_active.store(false, Ordering::Release);
575 }
576 }
577
578 pub fn is_spillover_active(&self) -> bool {
580 self.spillover_active.load(Ordering::Acquire)
581 }
582
583 pub fn spillover_usage(&self) -> u64 {
585 self.spillover_used.load(Ordering::Relaxed)
586 }
587
588 pub fn spillover_capacity(&self) -> u64 {
590 self.spillover_capacity
591 }
592
593 pub fn stats(&self) -> &SpilloverStats {
595 &self.stats
596 }
597}
598
599impl Clone for SpilloverRequest {
600 fn clone(&self) -> Self {
601 Self {
602 data: self.data.clone(),
603 min_timestamp: self.min_timestamp,
604 max_timestamp: self.max_timestamp,
605 size_bytes: self.size_bytes,
606 }
607 }
608}
609
#[cfg(test)]
mod tests {
    use super::*;

    const GIB: u64 = 1024 * 1024 * 1024;

    /// 1000-byte budget with 80% / 95% limits, shared by the tracker tests.
    fn thousand_byte_budget() -> MemoryBudget {
        MemoryBudget {
            total_budget: 1000,
            memtable_budget: 100,
            immutable_memtables_budget: 300,
            block_cache_budget: 500,
            soft_limit: 0.80,
            hard_limit: 0.95,
        }
    }

    #[test]
    fn test_memory_budget_default() {
        let defaults = MemoryBudget::default();

        assert_eq!(defaults.total_budget, 512 * 1024 * 1024);
        assert_eq!(defaults.soft_limit, 0.80);
        assert_eq!(defaults.hard_limit, 0.95);
    }

    #[test]
    fn test_memory_tracker_pressure() {
        let tracker = MemoryTracker::new(thousand_byte_budget());

        // 700 of 1000: below both limits.
        tracker.allocate(700);
        assert!(!tracker.should_trigger_flush());
        assert!(!tracker.should_block_writes());

        // 800 of 1000: exactly at the soft limit.
        tracker.allocate(100);
        assert_eq!(tracker.current_usage(), 800);
        assert!(tracker.should_trigger_flush());
        assert!(!tracker.should_block_writes());

        // 1000 of 1000: past the hard limit.
        tracker.allocate(200);
        assert_eq!(tracker.current_usage(), 1000);
        assert!(tracker.should_trigger_flush());
        assert!(tracker.should_block_writes());

        // Back to 700: both signals clear again.
        tracker.deallocate(300);
        assert_eq!(tracker.current_usage(), 700);
        assert!(!tracker.should_trigger_flush());
        assert!(!tracker.should_block_writes());
    }

    #[test]
    fn test_memory_tracker_usage_percent() {
        let tracker = MemoryTracker::new(thousand_byte_budget());

        tracker.allocate(500);
        assert_eq!(tracker.usage_percent(), 50.0);

        tracker.allocate(250);
        assert_eq!(tracker.usage_percent(), 75.0);
    }

    #[test]
    fn test_from_system_memory_percent() {
        let quarter = MemoryBudget::from_system_memory_percent(0.25);

        assert!(quarter.total_budget > 0);
        assert!(quarter.memtable_budget > 0);
        assert!(quarter.memtable_budget < quarter.total_budget);
        assert_eq!(quarter.soft_limit, 0.80);
        assert_eq!(quarter.hard_limit, 0.95);
    }

    #[test]
    fn test_system_memory_detection() {
        let everything = MemoryBudget::from_system_memory_percent(1.0);

        // On platforms with real detection, more than 2 GiB is expected; the
        // 1 GiB fallback would indicate that detection failed.
        #[cfg(any(target_os = "linux", target_os = "macos"))]
        {
            assert!(
                everything.total_budget > 2 * GIB,
                "Expected >2GB detected, got {} bytes. Memory detection may have failed.",
                everything.total_budget
            );
        }

        assert!(everything.total_budget >= GIB);
    }

    #[test]
    fn test_write_buffer_manager_reserve_release() {
        let wbm = WriteBufferManager::new(1000);

        // 400 of 1000 stays below the 80% soft limit.
        assert!(!wbm.reserve_memory(400));
        assert_eq!(wbm.memory_usage(), 400);

        // 900 of 1000 is above the soft limit but below the 95% hard limit.
        assert!(wbm.reserve_memory(500));
        assert_eq!(wbm.memory_usage(), 900);

        wbm.release_memory(600);
        assert_eq!(wbm.memory_usage(), 300);
        assert!(!wbm.is_under_pressure());
    }

    #[test]
    fn test_write_buffer_manager_pressure() {
        let wbm = WriteBufferManager::with_limits(1000, 0.5, 0.9);

        wbm.reserve_memory(400);
        assert!(!wbm.is_under_pressure());
        assert_eq!(wbm.usage_percent(), 40.0);

        wbm.reserve_memory(200);
        assert!(wbm.is_under_pressure());
        assert_eq!(wbm.usage_percent(), 60.0);
    }

    #[test]
    fn test_spillover_manager_creation() {
        let buffer = Arc::new(WriteBufferManager::new(1000));
        let (spillover, _rx) = SpilloverManager::new(buffer, 500);

        assert_eq!(spillover.spillover_capacity(), 500);
        assert_eq!(spillover.spillover_usage(), 0);
        assert!(!spillover.is_spillover_active());
    }

    #[test]
    fn test_spillover_manager_reserve_below_limit() {
        let buffer = Arc::new(WriteBufferManager::new(1000));
        let (spillover, _rx) = SpilloverManager::new(buffer, 500);

        // The buffer is not under pressure, so the request goes straight to
        // it and no flush is requested.
        let outcome = spillover.reserve_memory(100, vec![]);
        assert!(matches!(outcome, Ok(false)));
    }

    #[test]
    fn test_spillover_manager_stats() {
        let buffer = Arc::new(WriteBufferManager::new(1000));
        let (spillover, _rx) = SpilloverManager::new(Arc::clone(&buffer), 500);

        // Push the buffer above its soft limit before asking for more.
        buffer.reserve_memory(850);
        let payload = vec![(b"key".to_vec(), b"value".to_vec())];

        let outcome = spillover.reserve_memory(100, payload);
        assert!(outcome.is_ok());

        let spilled = spillover.stats().spillover_count.load(Ordering::Relaxed);
        assert!(spilled <= 1);
    }

    #[test]
    fn test_spillover_release() {
        let buffer = Arc::new(WriteBufferManager::new(1000));
        let (spillover, _rx) = SpilloverManager::new(buffer, 500);

        // Simulate 200 outstanding spillover bytes.
        spillover.spillover_used.store(200, Ordering::Relaxed);
        spillover.spillover_active.store(true, Ordering::Release);

        assert!(spillover.is_spillover_active());
        assert_eq!(spillover.spillover_usage(), 200);

        spillover.release_spillover(200);

        assert!(!spillover.is_spillover_active());
        assert_eq!(spillover.spillover_usage(), 0);
    }
}