1use std::{
33 env,
34 path::PathBuf,
35 sync::{
36 atomic::{AtomicUsize, Ordering},
37 Arc,
38 },
39};
40
/// Fallback memory budget in bytes (1 GiB).
///
/// `MemoryConfig::default` uses this when `VIBESQL_MEMORY_LIMIT` is unset or
/// cannot be parsed as a size.
pub const DEFAULT_MEMORY_BUDGET: usize = 1 << 30;

/// Fallback spill threshold, expressed as a fraction of the budget.
///
/// `MemoryConfig::default` uses this when `VIBESQL_SPILL_THRESHOLD` is unset,
/// is not a number, or lies outside `0.0..=1.0`.
pub const DEFAULT_SPILL_THRESHOLD: f64 = 0.8;
48
/// Fallback target partition size in bytes (64 MiB).
///
/// `MemoryConfig::default` uses this when `VIBESQL_PARTITION_SIZE` is unset or
/// cannot be parsed as a size.
pub const DEFAULT_TARGET_PARTITION_BYTES: usize = 64 << 20;

/// 4 MiB, in bytes.
///
/// NOTE(review): nothing in this file reads this constant; presumably it is the
/// smallest amount of memory an individual operator should be granted —
/// confirm against the callers.
pub const MIN_OPERATOR_MEMORY: usize = 4 << 20;

/// Settings that a `MemoryController` is built from.
#[derive(Clone, Debug)]
pub struct MemoryConfig {
    /// Upper bound, in bytes, on the sum of all outstanding reservations.
    pub budget_bytes: usize,

    /// Directory handed out by the `temp_directory()` accessors.
    ///
    /// NOTE(review): presumably where spill files are written; this file never
    /// touches the filesystem itself.
    pub temp_directory: PathBuf,

    /// Fraction of `budget_bytes` at which the controller starts reporting
    /// that callers should spill.
    pub spill_threshold: f64,

    /// Partition size, in bytes, handed out by the `target_partition_bytes()`
    /// accessors.
    pub target_partition_bytes: usize,
}
72
73impl Default for MemoryConfig {
74 fn default() -> Self {
75 let budget_bytes = env::var("VIBESQL_MEMORY_LIMIT")
77 .ok()
78 .and_then(|s| parse_memory_size(&s))
79 .unwrap_or(DEFAULT_MEMORY_BUDGET);
80
81 let temp_directory = env::var("VIBESQL_TEMP_DIR")
82 .ok()
83 .map(PathBuf::from)
84 .unwrap_or_else(|| env::temp_dir().join("vibesql"));
85
86 let spill_threshold = env::var("VIBESQL_SPILL_THRESHOLD")
87 .ok()
88 .and_then(|s| s.parse::<f64>().ok())
89 .filter(|&t| (0.0..=1.0).contains(&t))
90 .unwrap_or(DEFAULT_SPILL_THRESHOLD);
91
92 let target_partition_bytes = env::var("VIBESQL_PARTITION_SIZE")
93 .ok()
94 .and_then(|s| parse_memory_size(&s))
95 .unwrap_or(DEFAULT_TARGET_PARTITION_BYTES);
96
97 Self { budget_bytes, temp_directory, spill_threshold, target_partition_bytes }
98 }
99}
100
101impl MemoryConfig {
102 pub fn with_budget(budget_bytes: usize) -> Self {
104 Self { budget_bytes, ..Default::default() }
105 }
106
107 pub fn with_temp_dir(mut self, path: PathBuf) -> Self {
109 self.temp_directory = path;
110 self
111 }
112
113 pub fn with_spill_threshold(mut self, threshold: f64) -> Self {
115 self.spill_threshold = threshold.clamp(0.0, 1.0);
116 self
117 }
118}
119
/// Point-in-time copy of a controller's counters and limits.
#[derive(Clone, Debug)]
pub struct MemoryStats {
    /// Configured budget in bytes.
    pub budget_bytes: usize,
    /// Bytes reserved when the snapshot was taken.
    pub reserved_bytes: usize,
    /// Highest reserved total recorded so far.
    pub peak_bytes: usize,
    /// Sum of all byte counts reported through `record_spill`.
    pub bytes_spilled: usize,
    /// Number of `record_spill` calls.
    pub spill_count: usize,
    /// Reservations created and not yet dropped.
    pub active_reservations: usize,
    /// Configured spill threshold as a fraction of the budget.
    pub spill_threshold: f64,
}

impl MemoryStats {
    /// Reserved bytes as a fraction of the budget; `0.0` when the budget is zero.
    pub fn utilization(&self) -> f64 {
        match self.budget_bytes {
            // Avoid dividing by zero for an empty budget.
            0 => 0.0,
            budget => self.reserved_bytes as f64 / budget as f64,
        }
    }

    /// `true` once utilization has reached the spill threshold.
    pub fn is_under_pressure(&self) -> bool {
        let used_fraction = self.utilization();
        used_fraction >= self.spill_threshold
    }

    /// Bytes still unreserved, never going below zero.
    pub fn available_bytes(&self) -> usize {
        let MemoryStats { budget_bytes, reserved_bytes, .. } = *self;
        budget_bytes.saturating_sub(reserved_bytes)
    }
}
162
163impl std::fmt::Display for MemoryStats {
164 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
165 write!(
166 f,
167 "Memory: {}/{} ({:.1}%), peak: {}, spilled: {} ({} ops)",
168 format_bytes(self.reserved_bytes),
169 format_bytes(self.budget_bytes),
170 self.utilization() * 100.0,
171 format_bytes(self.peak_bytes),
172 format_bytes(self.bytes_spilled),
173 self.spill_count,
174 )
175 }
176}
177
/// Renders a byte count with the largest binary unit that fits (`GB`, `MB`,
/// `KB`), using two decimals; values below 1024 are printed as whole bytes.
fn format_bytes(bytes: usize) -> String {
    const KIB: usize = 1 << 10;
    const MIB: usize = 1 << 20;
    const GIB: usize = 1 << 30;

    // Expresses `bytes` in multiples of `unit`.
    let scaled = |unit: usize| bytes as f64 / unit as f64;

    match bytes {
        b if b >= GIB => format!("{:.2}GB", scaled(GIB)),
        b if b >= MIB => format!("{:.2}MB", scaled(MIB)),
        b if b >= KIB => format!("{:.2}KB", scaled(KIB)),
        _ => format!("{}B", bytes),
    }
}
190
/// Parses a human-readable memory size into a byte count.
///
/// Accepts a bare integer (`"1024"`) or an integer followed by one of the
/// binary suffixes `K`/`KB`, `M`/`MB`, `G`/`GB` (case-insensitive, powers of
/// 1024). Surrounding whitespace and whitespace between the number and the
/// suffix are ignored.
///
/// Returns `None` when the text is not in that form or when the resulting
/// byte count does not fit in a `usize`.
fn parse_memory_size(s: &str) -> Option<usize> {
    const KIB: usize = 1024;
    const MIB: usize = 1024 * KIB;
    const GIB: usize = 1024 * MIB;
    const SUFFIXES: [(&str, usize); 6] =
        [("GB", GIB), ("G", GIB), ("MB", MIB), ("M", MIB), ("KB", KIB), ("K", KIB)];

    // Only ASCII letters can form a valid suffix, so ASCII upper-casing suffices.
    let normalized = s.trim().to_ascii_uppercase();

    // Plain number: already a byte count.
    if let Ok(bytes) = normalized.parse::<usize>() {
        return Some(bytes);
    }

    let (digits, multiplier) = SUFFIXES.iter().find_map(|&(suffix, multiplier)| {
        normalized.strip_suffix(suffix).map(|digits| (digits, multiplier))
    })?;

    // `checked_mul` turns an out-of-range size into `None` instead of a debug
    // panic or a silently wrapped value in release builds.
    digits.trim().parse::<usize>().ok()?.checked_mul(multiplier)
}
219
220pub struct MemoryController {
241 config: MemoryConfig,
243
244 reserved: AtomicUsize,
246
247 active_reservations: AtomicUsize,
249
250 bytes_spilled: AtomicUsize,
252
253 spill_count: AtomicUsize,
255
256 peak_memory: AtomicUsize,
258}
259
260impl MemoryController {
261 pub fn new(config: MemoryConfig) -> Self {
263 Self {
264 config,
265 reserved: AtomicUsize::new(0),
266 active_reservations: AtomicUsize::new(0),
267 bytes_spilled: AtomicUsize::new(0),
268 spill_count: AtomicUsize::new(0),
269 peak_memory: AtomicUsize::new(0),
270 }
271 }
272
273 pub fn with_defaults() -> Self {
275 Self::new(MemoryConfig::default())
276 }
277
278 pub fn with_budget(budget_bytes: usize) -> Self {
280 Self::new(MemoryConfig::with_budget(budget_bytes))
281 }
282
283 pub fn budget(&self) -> usize {
285 self.config.budget_bytes
286 }
287
288 pub fn reserved(&self) -> usize {
290 self.reserved.load(Ordering::Relaxed)
291 }
292
293 pub fn available(&self) -> usize {
295 let budget = self.config.budget_bytes;
296 let reserved = self.reserved.load(Ordering::Relaxed);
297 budget.saturating_sub(reserved)
298 }
299
300 pub fn spill_threshold_bytes(&self) -> usize {
302 (self.config.budget_bytes as f64 * self.config.spill_threshold) as usize
303 }
304
305 pub fn should_spill(&self) -> bool {
307 self.reserved() >= self.spill_threshold_bytes()
308 }
309
310 pub fn temp_directory(&self) -> &PathBuf {
312 &self.config.temp_directory
313 }
314
315 pub fn target_partition_bytes(&self) -> usize {
317 self.config.target_partition_bytes
318 }
319
320 pub fn create_reservation(self: &Arc<Self>) -> MemoryReservation {
325 self.active_reservations.fetch_add(1, Ordering::Relaxed);
326 MemoryReservation { controller: Arc::clone(self), reserved: 0 }
327 }
328
329 pub fn record_spill(&self, bytes: usize) {
331 self.bytes_spilled.fetch_add(bytes, Ordering::Relaxed);
332 self.spill_count.fetch_add(1, Ordering::Relaxed);
333 }
334
335 pub fn bytes_spilled(&self) -> usize {
337 self.bytes_spilled.load(Ordering::Relaxed)
338 }
339
340 pub fn spill_count(&self) -> usize {
342 self.spill_count.load(Ordering::Relaxed)
343 }
344
345 pub fn peak_memory(&self) -> usize {
347 self.peak_memory.load(Ordering::Relaxed)
348 }
349
350 pub fn active_reservations(&self) -> usize {
352 self.active_reservations.load(Ordering::Relaxed)
353 }
354
355 pub fn stats(&self) -> MemoryStats {
357 MemoryStats {
358 budget_bytes: self.config.budget_bytes,
359 reserved_bytes: self.reserved.load(Ordering::Relaxed),
360 peak_bytes: self.peak_memory.load(Ordering::Relaxed),
361 bytes_spilled: self.bytes_spilled.load(Ordering::Relaxed),
362 spill_count: self.spill_count.load(Ordering::Relaxed),
363 active_reservations: self.active_reservations.load(Ordering::Relaxed),
364 spill_threshold: self.config.spill_threshold,
365 }
366 }
367
368 fn try_reserve(&self, bytes: usize) -> bool {
372 let budget = self.config.budget_bytes;
373
374 loop {
375 let current = self.reserved.load(Ordering::Relaxed);
376 let new_reserved = current.saturating_add(bytes);
377
378 if new_reserved > budget {
379 return false;
380 }
381
382 match self.reserved.compare_exchange_weak(
384 current,
385 new_reserved,
386 Ordering::SeqCst,
387 Ordering::Relaxed,
388 ) {
389 Ok(_) => {
390 self.update_peak_memory(new_reserved);
392 return true;
393 }
394 Err(_) => continue, }
396 }
397 }
398
399 fn update_peak_memory(&self, current: usize) {
401 let mut peak = self.peak_memory.load(Ordering::Relaxed);
402 while current > peak {
403 match self.peak_memory.compare_exchange_weak(
404 peak,
405 current,
406 Ordering::Relaxed,
407 Ordering::Relaxed,
408 ) {
409 Ok(_) => break,
410 Err(actual) => peak = actual,
411 }
412 }
413 }
414
415 fn release(&self, bytes: usize) {
419 self.reserved.fetch_sub(bytes, Ordering::Relaxed);
420 }
421
422 fn release_reservation(&self) {
424 self.active_reservations.fetch_sub(1, Ordering::Relaxed);
425 }
426}
427
428pub struct MemoryReservation {
453 controller: Arc<MemoryController>,
455
456 reserved: usize,
458}
459
460impl MemoryReservation {
461 pub fn reserved(&self) -> usize {
463 self.reserved
464 }
465
466 pub fn try_grow(&mut self, additional: usize) -> bool {
472 if self.controller.try_reserve(additional) {
473 self.reserved = self.reserved.saturating_add(additional);
474 true
475 } else {
476 false
477 }
478 }
479
480 pub fn shrink(&mut self, bytes: usize) {
485 let to_release = bytes.min(self.reserved);
486 self.controller.release(to_release);
487 self.reserved = self.reserved.saturating_sub(to_release);
488 }
489
490 pub fn release_all(&mut self) {
494 if self.reserved > 0 {
495 self.controller.release(self.reserved);
496 self.reserved = 0;
497 }
498 }
499
500 pub fn should_spill(&self) -> bool {
502 self.controller.should_spill()
503 }
504
505 pub fn would_exceed_budget(&self, additional: usize) -> bool {
510 let current_total = self.controller.reserved();
511 current_total.saturating_add(additional) > self.controller.budget()
512 }
513
514 pub fn budget(&self) -> usize {
516 self.controller.budget()
517 }
518
519 pub fn temp_directory(&self) -> &PathBuf {
521 self.controller.temp_directory()
522 }
523
524 pub fn target_partition_bytes(&self) -> usize {
526 self.controller.target_partition_bytes()
527 }
528
529 pub fn record_spill(&self, bytes: usize) {
531 self.controller.record_spill(bytes);
532 }
533}
534
535impl Drop for MemoryReservation {
536 fn drop(&mut self) {
537 if self.reserved > 0 {
539 self.controller.release(self.reserved);
540 }
541 self.controller.release_reservation();
542 }
543}
544
545unsafe impl Send for MemoryController {}
547unsafe impl Sync for MemoryController {}
548
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;

    /// Shared controller limited to `budget` bytes.
    fn shared_controller(budget: usize) -> Arc<MemoryController> {
        Arc::new(MemoryController::with_budget(budget))
    }

    /// Shared controller with a 1000-byte budget and an 80% spill threshold.
    fn thousand_byte_controller() -> Arc<MemoryController> {
        let config = MemoryConfig {
            budget_bytes: 1000,
            spill_threshold: 0.8,
            temp_directory: std::env::temp_dir(),
            target_partition_bytes: DEFAULT_TARGET_PARTITION_BYTES,
        };
        Arc::new(MemoryController::new(config))
    }

    #[test]
    fn test_memory_size_parsing() {
        let cases: [(&str, Option<usize>); 11] = [
            ("1024", Some(1024)),
            ("1KB", Some(1024)),
            ("1K", Some(1024)),
            ("1MB", Some(1024 * 1024)),
            ("1M", Some(1024 * 1024)),
            ("1GB", Some(1024 * 1024 * 1024)),
            ("1G", Some(1024 * 1024 * 1024)),
            ("4GB", Some(4 * 1024 * 1024 * 1024)),
            ("512mb", Some(512 * 1024 * 1024)),
            (" 100 ", Some(100)),
            ("invalid", None),
        ];

        for &(input, expected) in cases.iter() {
            assert_eq!(parse_memory_size(input), expected);
        }
    }

    #[test]
    fn test_controller_creation() {
        let one_mib = 1024 * 1024;
        let controller = shared_controller(one_mib);

        assert_eq!(controller.budget(), one_mib);
        assert_eq!(controller.reserved(), 0);
        assert_eq!(controller.available(), one_mib);
    }

    #[test]
    fn test_reservation_try_grow() {
        let controller = shared_controller(1024);
        let mut grant = controller.create_reservation();

        assert!(grant.try_grow(512));
        assert_eq!(grant.reserved(), 512);
        assert_eq!(controller.reserved(), 512);

        assert!(grant.try_grow(256));
        assert_eq!(grant.reserved(), 768);
        assert_eq!(controller.reserved(), 768);

        // 768 + 512 is over budget: the request is refused and nothing moves.
        assert!(!grant.try_grow(512));
        assert_eq!(grant.reserved(), 768);
        assert_eq!(controller.reserved(), 768);

        // Exactly filling the budget is still allowed.
        assert!(grant.try_grow(256));
        assert_eq!(grant.reserved(), 1024);
    }

    #[test]
    fn test_reservation_shrink() {
        let controller = shared_controller(1024);
        let mut grant = controller.create_reservation();

        grant.try_grow(1024);
        assert_eq!(grant.reserved(), 1024);

        grant.shrink(512);
        assert_eq!(grant.reserved(), 512);
        assert_eq!(controller.reserved(), 512);

        // Shrinking by more than is held releases only what is held.
        grant.shrink(1024);
        assert_eq!(grant.reserved(), 0);
        assert_eq!(controller.reserved(), 0);
    }

    #[test]
    fn test_reservation_drop_releases_memory() {
        let controller = shared_controller(1024);

        {
            let mut scoped = controller.create_reservation();
            scoped.try_grow(512);
            assert_eq!(controller.reserved(), 512);
        }

        // Leaving the scope dropped the reservation and returned its bytes.
        assert_eq!(controller.reserved(), 0);
    }

    #[test]
    fn test_multiple_reservations() {
        let controller = shared_controller(1024);

        let mut first = controller.create_reservation();
        let mut second = controller.create_reservation();

        assert!(first.try_grow(300));
        assert!(second.try_grow(300));
        assert_eq!(controller.reserved(), 600);

        assert!(first.try_grow(200));
        assert!(second.try_grow(200));
        assert_eq!(controller.reserved(), 1000);

        // Only 24 bytes are left, so neither can take another 100.
        assert!(!first.try_grow(100));
        assert!(!second.try_grow(100));

        // Dropping the first reservation frees its 500 bytes.
        drop(first);
        assert_eq!(controller.reserved(), 500);

        assert!(second.try_grow(400));
        assert_eq!(controller.reserved(), 900);
    }

    #[test]
    fn test_should_spill() {
        let controller = thousand_byte_controller();
        let mut grant = controller.create_reservation();

        // 700 of 1000 is below the 80% threshold.
        grant.try_grow(700);
        assert!(!grant.should_spill());

        // 800 of 1000 reaches it.
        grant.try_grow(100);
        assert!(grant.should_spill());
    }

    #[test]
    fn test_spill_tracking() {
        let controller = shared_controller(1024);
        assert_eq!(controller.bytes_spilled(), 0);

        controller.record_spill(100);
        assert_eq!(controller.bytes_spilled(), 100);

        controller.record_spill(200);
        assert_eq!(controller.bytes_spilled(), 300);
    }

    #[test]
    fn test_active_reservation_count() {
        let controller = shared_controller(1024);
        assert_eq!(controller.active_reservations(), 0);

        let first = controller.create_reservation();
        assert_eq!(controller.active_reservations(), 1);

        let second = controller.create_reservation();
        assert_eq!(controller.active_reservations(), 2);

        drop(first);
        assert_eq!(controller.active_reservations(), 1);

        drop(second);
        assert_eq!(controller.active_reservations(), 0);
    }

    #[test]
    fn test_concurrent_reservations() {
        use std::thread;

        let controller = shared_controller(10_000);

        // Collecting eagerly starts all ten workers before any is joined.
        let workers: Vec<_> = (0..10)
            .map(|_| {
                let controller = Arc::clone(&controller);
                thread::spawn(move || {
                    let mut grant = controller.create_reservation();
                    grant.try_grow(500);
                    thread::sleep(std::time::Duration::from_millis(10));
                    grant.reserved()
                })
            })
            .collect();

        let mut total_reserved: usize = 0;
        for worker in workers {
            total_reserved += worker.join().unwrap();
        }

        // Ten workers at 500 bytes each fit comfortably within the budget.
        assert_eq!(total_reserved, 5000);

        // Every reservation was dropped when its thread finished.
        assert_eq!(controller.reserved(), 0);
    }

    #[test]
    fn test_memory_stats() {
        let controller = thousand_byte_controller();

        // A fresh controller reports its limits and all-zero counters.
        let initial = controller.stats();
        assert_eq!(initial.budget_bytes, 1000);
        assert_eq!(initial.reserved_bytes, 0);
        assert_eq!(initial.peak_bytes, 0);
        assert_eq!(initial.bytes_spilled, 0);
        assert_eq!(initial.spill_count, 0);
        assert_eq!(initial.active_reservations, 0);
        assert_eq!(initial.spill_threshold, 0.8);

        let mut grant = controller.create_reservation();
        grant.try_grow(500);

        let halfway = controller.stats();
        assert_eq!(halfway.reserved_bytes, 500);
        assert_eq!(halfway.peak_bytes, 500);
        assert_eq!(halfway.active_reservations, 1);

        assert!((halfway.utilization() - 0.5).abs() < 0.001);
        assert_eq!(halfway.available_bytes(), 500);
        assert!(!halfway.is_under_pressure());

        // 900 of 1000 is past the 80% threshold.
        grant.try_grow(400);
        let loaded = controller.stats();
        assert!(loaded.is_under_pressure());
    }

    #[test]
    fn test_peak_memory_tracking() {
        let controller = shared_controller(1000);

        {
            let mut grant = controller.create_reservation();
            grant.try_grow(800);
            assert_eq!(controller.peak_memory(), 800);
        }

        // The peak survives the release of the memory that caused it.
        assert_eq!(controller.reserved(), 0);
        assert_eq!(controller.peak_memory(), 800);

        {
            let mut grant = controller.create_reservation();
            grant.try_grow(900);
            assert_eq!(controller.peak_memory(), 900);
        }

        {
            // A smaller reservation must not lower the peak.
            let mut grant = controller.create_reservation();
            grant.try_grow(100);
            assert_eq!(controller.peak_memory(), 900);
        }
    }

    #[test]
    fn test_memory_stats_display() {
        let stats = MemoryStats {
            budget_bytes: 1024 * 1024 * 1024,
            reserved_bytes: 512 * 1024 * 1024,
            peak_bytes: 950 * 1024 * 1024,
            bytes_spilled: 2 * 1024 * 1024 * 1024,
            spill_count: 3,
            active_reservations: 2,
            spill_threshold: 0.8,
        };

        let rendered = stats.to_string();
        let expected_fragments = ["512.00MB", "1.00GB", "50.0%", "950.00MB", "2.00GB", "3 ops"];
        for fragment in expected_fragments.iter() {
            assert!(rendered.contains(fragment));
        }
    }

    #[test]
    fn test_spill_count_tracking() {
        let controller = shared_controller(1024);

        assert_eq!(controller.spill_count(), 0);
        assert_eq!(controller.bytes_spilled(), 0);

        // (bytes recorded, expected spill count, expected running total)
        let steps: [(usize, usize, usize); 3] = [(100, 1, 100), (200, 2, 300), (50, 3, 350)];
        for &(bytes, expected_count, expected_total) in steps.iter() {
            controller.record_spill(bytes);
            assert_eq!(controller.spill_count(), expected_count);
            assert_eq!(controller.bytes_spilled(), expected_total);
        }
    }

    #[test]
    fn test_format_bytes_helper() {
        // `format_bytes` is exercised through the `Display` output of a
        // snapshot whose budget, reserved and peak all equal `bytes`.
        let render = |bytes: usize| {
            let stats = MemoryStats {
                budget_bytes: bytes,
                reserved_bytes: bytes,
                peak_bytes: bytes,
                bytes_spilled: 0,
                spill_count: 0,
                active_reservations: 0,
                spill_threshold: 0.8,
            };
            format!("{}", stats)
        };

        let cases: [(usize, &str); 4] = [
            (500, "500B"),
            (2048, "2.00KB"),
            (5 * 1024 * 1024, "5.00MB"),
            (3 * 1024 * 1024 * 1024, "3.00GB"),
        ];
        for &(bytes, expected) in cases.iter() {
            assert!(render(bytes).contains(expected));
        }
    }
}