1use crate::memory_pool::{
19 human_readable_size, MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation,
20};
21use datafusion_common::HashMap;
22use datafusion_common::{resources_datafusion_err, DataFusionError, Result};
23use log::debug;
24use parking_lot::Mutex;
25use std::{
26 num::NonZeroUsize,
27 sync::atomic::{AtomicUsize, Ordering},
28};
29
/// A [`MemoryPool`] that enforces no limit.
///
/// Requests are never rejected; the pool only keeps a running total of the
/// memory currently reserved from it.
#[derive(Debug, Default)]
pub struct UnboundedMemoryPool {
    /// Running total of memory currently reserved from this pool
    used: AtomicUsize,
}
35
36impl MemoryPool for UnboundedMemoryPool {
37 fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
38 self.used.fetch_add(additional, Ordering::Relaxed);
39 }
40
41 fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
42 self.used.fetch_sub(shrink, Ordering::Relaxed);
43 }
44
45 fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
46 self.grow(reservation, additional);
47 Ok(())
48 }
49
50 fn reserved(&self) -> usize {
51 self.used.load(Ordering::Relaxed)
52 }
53
54 fn memory_limit(&self) -> MemoryLimit {
55 MemoryLimit::Infinite
56 }
57}
58
/// A [`MemoryPool`] with a single, shared limit.
///
/// `try_grow` succeeds for any reservation as long as the pool-wide total
/// stays within `pool_size`; no per-consumer share is computed.
#[derive(Debug)]
pub struct GreedyMemoryPool {
    /// Upper bound that `try_grow` enforces on `used`
    pool_size: usize,
    /// Running total of memory currently reserved from this pool
    used: AtomicUsize,
}
69
70impl GreedyMemoryPool {
71 pub fn new(pool_size: usize) -> Self {
73 debug!("Created new GreedyMemoryPool(pool_size={pool_size})");
74 Self {
75 pool_size,
76 used: AtomicUsize::new(0),
77 }
78 }
79}
80
81impl MemoryPool for GreedyMemoryPool {
82 fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
83 self.used.fetch_add(additional, Ordering::Relaxed);
84 }
85
86 fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
87 self.used.fetch_sub(shrink, Ordering::Relaxed);
88 }
89
90 fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
91 self.used
92 .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |used| {
93 let new_used = used + additional;
94 (new_used <= self.pool_size).then_some(new_used)
95 })
96 .map_err(|used| {
97 insufficient_capacity_err(
98 reservation,
99 additional,
100 self.pool_size.saturating_sub(used),
101 )
102 })?;
103 Ok(())
104 }
105
106 fn reserved(&self) -> usize {
107 self.used.load(Ordering::Relaxed)
108 }
109
110 fn memory_limit(&self) -> MemoryLimit {
111 MemoryLimit::Finite(self.pool_size)
112 }
113}
114
/// A [`MemoryPool`] that splits memory evenly between consumers that can spill.
///
/// As implemented by `try_grow`:
///
/// * a reservation whose consumer can spill may hold at most
///   `(pool_size - unspillable) / num_spill`, where `unspillable` is the memory
///   held by consumers that cannot spill and `num_spill` is the number of
///   registered spillable consumers (the whole remainder is available when
///   `num_spill` is zero);
/// * a reservation whose consumer cannot spill may take whatever is left of
///   `pool_size` after all current reservations.
#[derive(Debug)]
pub struct FairSpillPool {
    /// Total limit shared by all consumers
    pool_size: usize,

    /// Mutable accounting, guarded by a mutex
    state: Mutex<FairSpillPoolState>,
}
144
/// Accounting data kept behind the [`FairSpillPool`] mutex.
#[derive(Debug)]
struct FairSpillPoolState {
    /// Number of currently registered consumers with `can_spill` set
    num_spill: usize,

    /// Total memory reserved by reservations whose consumer can spill
    spillable: usize,

    /// Total memory reserved by reservations whose consumer cannot spill
    unspillable: usize,
}
156
157impl FairSpillPool {
158 pub fn new(pool_size: usize) -> Self {
160 debug!("Created new FairSpillPool(pool_size={pool_size})");
161 Self {
162 pool_size,
163 state: Mutex::new(FairSpillPoolState {
164 num_spill: 0,
165 spillable: 0,
166 unspillable: 0,
167 }),
168 }
169 }
170}
171
172impl MemoryPool for FairSpillPool {
173 fn register(&self, consumer: &MemoryConsumer) {
174 if consumer.can_spill {
175 self.state.lock().num_spill += 1;
176 }
177 }
178
179 fn unregister(&self, consumer: &MemoryConsumer) {
180 if consumer.can_spill {
181 let mut state = self.state.lock();
182 state.num_spill = state.num_spill.checked_sub(1).unwrap();
183 }
184 }
185
186 fn grow(&self, reservation: &MemoryReservation, additional: usize) {
187 let mut state = self.state.lock();
188 match reservation.registration.consumer.can_spill {
189 true => state.spillable += additional,
190 false => state.unspillable += additional,
191 }
192 }
193
194 fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
195 let mut state = self.state.lock();
196 match reservation.registration.consumer.can_spill {
197 true => state.spillable -= shrink,
198 false => state.unspillable -= shrink,
199 }
200 }
201
202 fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
203 let mut state = self.state.lock();
204
205 match reservation.registration.consumer.can_spill {
206 true => {
207 let spill_available = self.pool_size.saturating_sub(state.unspillable);
209
210 let available = spill_available
212 .checked_div(state.num_spill)
213 .unwrap_or(spill_available);
214
215 if reservation.size + additional > available {
216 return Err(insufficient_capacity_err(
217 reservation,
218 additional,
219 available,
220 ));
221 }
222 state.spillable += additional;
223 }
224 false => {
225 let available = self
226 .pool_size
227 .saturating_sub(state.unspillable + state.spillable);
228
229 if available < additional {
230 return Err(insufficient_capacity_err(
231 reservation,
232 additional,
233 available,
234 ));
235 }
236 state.unspillable += additional;
237 }
238 }
239 Ok(())
240 }
241
242 fn reserved(&self) -> usize {
243 let state = self.state.lock();
244 state.spillable + state.unspillable
245 }
246
247 fn memory_limit(&self) -> MemoryLimit {
248 MemoryLimit::Finite(self.pool_size)
249 }
250}
251
252#[inline(always)]
258fn insufficient_capacity_err(
259 reservation: &MemoryReservation,
260 additional: usize,
261 available: usize,
262) -> DataFusionError {
263 resources_datafusion_err!("Failed to allocate additional {} for {} with {} already allocated for this reservation - {} remain available for the total pool",
264 human_readable_size(additional), reservation.registration.consumer.name, human_readable_size(reservation.size), human_readable_size(available))
265}
266
/// Per-consumer statistics kept by [`TrackConsumersPool`].
#[derive(Debug)]
struct TrackedConsumer {
    /// Name of the consumer, captured at registration
    name: String,
    /// Whether the consumer can spill, captured at registration
    can_spill: bool,
    /// Memory currently reserved by the consumer
    reserved: AtomicUsize,
    /// Largest value `reserved` has reached through `grow`
    peak: AtomicUsize,
}

impl TrackedConsumer {
    /// Returns the memory currently reserved.
    fn reserved(&self) -> usize {
        self.reserved.load(Ordering::Relaxed)
    }

    /// Returns the largest reserved amount recorded so far.
    fn peak(&self) -> usize {
        self.peak.load(Ordering::Relaxed)
    }

    /// Adds `additional` to the reserved amount and raises the peak if the
    /// new amount exceeds it.
    fn grow(&self, additional: usize) {
        // Derive the new total from the value `fetch_add` returns rather than
        // re-loading `reserved`: a separate load could observe a concurrent
        // `shrink` and miss the peak this call actually produced.
        let previous = self.reserved.fetch_add(additional, Ordering::Relaxed);
        // `fetch_add` wraps on overflow, so mirror that here
        let current = previous.wrapping_add(additional);
        self.peak.fetch_max(current, Ordering::Relaxed);
    }

    /// Subtracts `shrink` from the reserved amount; the peak is unchanged.
    fn shrink(&self, shrink: usize) {
        self.reserved.fetch_sub(shrink, Ordering::Relaxed);
    }
}
299
/// A [`MemoryPool`] wrapper that tracks per-consumer usage.
///
/// Every call is forwarded to the wrapped pool. In addition, the reserved and
/// peak amounts of each registered consumer are recorded, and when the wrapped
/// pool's `try_grow` fails with `DataFusionError::ResourcesExhausted` the error
/// message is extended with a report of the largest consumers.
#[derive(Debug)]
pub struct TrackConsumersPool<I> {
    /// The wrapped pool that performs the actual accounting and limit checks
    inner: I,
    /// Number of consumers listed in the report added to error messages
    top: NonZeroUsize,
    /// Usage statistics for each registered consumer, keyed by consumer id
    tracked_consumers: Mutex<HashMap<usize, TrackedConsumer>>,
}
333
334impl<I: MemoryPool> TrackConsumersPool<I> {
335 pub fn new(inner: I, top: NonZeroUsize) -> Self {
370 Self {
371 inner,
372 top,
373 tracked_consumers: Default::default(),
374 }
375 }
376
377 pub fn report_top(&self, top: usize) -> String {
379 let mut consumers = self
380 .tracked_consumers
381 .lock()
382 .iter()
383 .map(|(consumer_id, tracked_consumer)| {
384 (
385 (
386 *consumer_id,
387 tracked_consumer.name.to_owned(),
388 tracked_consumer.can_spill,
389 tracked_consumer.peak(),
390 ),
391 tracked_consumer.reserved(),
392 )
393 })
394 .collect::<Vec<_>>();
395 consumers.sort_by(|a, b| b.1.cmp(&a.1)); consumers[0..std::cmp::min(top, consumers.len())]
398 .iter()
399 .map(|((id, name, can_spill, peak), size)| {
400 format!(
401 " {name}#{id}(can spill: {can_spill}) consumed {}, peak {}",
402 human_readable_size(*size),
403 human_readable_size(*peak),
404 )
405 })
406 .collect::<Vec<_>>()
407 .join(",\n")
408 + "."
409 }
410}
411
412impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
413 fn register(&self, consumer: &MemoryConsumer) {
414 self.inner.register(consumer);
415
416 let mut guard = self.tracked_consumers.lock();
417 let existing = guard.insert(
418 consumer.id(),
419 TrackedConsumer {
420 name: consumer.name().to_string(),
421 can_spill: consumer.can_spill(),
422 reserved: Default::default(),
423 peak: Default::default(),
424 },
425 );
426
427 debug_assert!(
428 existing.is_none(),
429 "Registered was called twice on the same consumer"
430 );
431 }
432
433 fn unregister(&self, consumer: &MemoryConsumer) {
434 self.inner.unregister(consumer);
435 self.tracked_consumers.lock().remove(&consumer.id());
436 }
437
438 fn grow(&self, reservation: &MemoryReservation, additional: usize) {
439 self.inner.grow(reservation, additional);
440 self.tracked_consumers
441 .lock()
442 .entry(reservation.consumer().id())
443 .and_modify(|tracked_consumer| {
444 tracked_consumer.grow(additional);
445 });
446 }
447
448 fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
449 self.inner.shrink(reservation, shrink);
450 self.tracked_consumers
451 .lock()
452 .entry(reservation.consumer().id())
453 .and_modify(|tracked_consumer| {
454 tracked_consumer.shrink(shrink);
455 });
456 }
457
458 fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
459 self.inner
460 .try_grow(reservation, additional)
461 .map_err(|e| match e {
462 DataFusionError::ResourcesExhausted(e) => {
463 DataFusionError::ResourcesExhausted(
465 provide_top_memory_consumers_to_error_msg(
466 e,
467 self.report_top(self.top.into()),
468 ),
469 )
470 }
471 _ => e,
472 })?;
473
474 self.tracked_consumers
475 .lock()
476 .entry(reservation.consumer().id())
477 .and_modify(|tracked_consumer| {
478 tracked_consumer.grow(additional);
479 });
480 Ok(())
481 }
482
483 fn reserved(&self) -> usize {
484 self.inner.reserved()
485 }
486
487 fn memory_limit(&self) -> MemoryLimit {
488 self.inner.memory_limit()
489 }
490}
491
/// Prefixes `error_msg` with a header and the `top_consumers` report.
///
/// The result has three parts separated by newlines: a fixed header line, the
/// report as given, and `Error: ` followed by the original message.
fn provide_top_memory_consumers_to_error_msg(
    error_msg: String,
    top_consumers: String,
) -> String {
    const HEADER: &str =
        "Additional allocation failed with top memory consumers (across reservations) as:\n";
    const ERROR_LABEL: &str = "\nError: ";

    let mut message = String::with_capacity(
        HEADER.len() + top_consumers.len() + ERROR_LABEL.len() + error_msg.len(),
    );
    message.push_str(HEADER);
    message.push_str(&top_consumers);
    message.push_str(ERROR_LABEL);
    message.push_str(&error_msg);
    message
}
498
499#[cfg(test)]
500mod tests {
501 use super::*;
502 use insta::{allow_duplicates, assert_snapshot, Settings};
503 use std::sync::Arc;
504
505 fn make_settings() -> Settings {
506 let mut settings = Settings::clone_current();
507 settings.add_filter(
508 r"([^\s]+)\#\d+\(can spill: (true|false)\)",
509 "$1#[ID](can spill: $2)",
510 );
511 settings
512 }
513
    /// Walks a `FairSpillPool` of size 100 through a mix of spillable and
    /// unspillable reservations and checks the totals and error messages.
    #[test]
    fn test_fair() {
        let pool = Arc::new(FairSpillPool::new(100)) as _;

        let mut r1 = MemoryConsumer::new("unspillable").register(&pool);
        // `grow` performs no limit check, so it can exceed the pool size
        r1.grow(2000);
        assert_eq!(pool.reserved(), 2000);

        let mut r2 = MemoryConsumer::new("r2")
            .with_can_spill(true)
            .register(&pool);
        // Same for a spillable consumer
        r2.grow(2000);

        assert_eq!(pool.reserved(), 4000);

        // The unspillable total already exceeds the pool size, so nothing is
        // available to the spiller
        let err = r2.try_grow(1).unwrap_err().strip_backtrace();
        assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 1.0 B for r2 with 2000.0 B already allocated for this reservation - 0.0 B remain available for the total pool");

        // A failed attempt changes nothing: retrying gives the same error
        let err = r2.try_grow(1).unwrap_err().strip_backtrace();
        assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 1.0 B for r2 with 2000.0 B already allocated for this reservation - 0.0 B remain available for the total pool");

        r1.shrink(1990);
        r2.shrink(2000);

        assert_eq!(pool.reserved(), 10);

        // Unspillable consumer: 90 B are left, so 10 more fit
        r1.try_grow(10).unwrap();
        assert_eq!(pool.reserved(), 20);

        // The only spiller may use everything not held by unspillable consumers
        r2.try_grow(80).unwrap();
        assert_eq!(pool.reserved(), 100);

        r2.shrink(70);

        assert_eq!(r1.size(), 20);
        assert_eq!(r2.size(), 10);
        assert_eq!(pool.reserved(), 30);

        let mut r3 = MemoryConsumer::new("r3")
            .with_can_spill(true)
            .register(&pool);

        // Two spillers now share the 80 B not held by r1: 40 B each
        let err = r3.try_grow(70).unwrap_err().strip_backtrace();
        assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 70.0 B for r3 with 0.0 B already allocated for this reservation - 40.0 B remain available for the total pool");

        // Freeing r2's memory does not change r3's share
        // (presumably r2 stays registered until it is dropped)
        r2.free();
        let err = r3.try_grow(70).unwrap_err().strip_backtrace();
        assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 70.0 B for r3 with 0.0 B already allocated for this reservation - 40.0 B remain available for the total pool");

        // After r2 is dropped, r3 can take the full 80 B
        drop(r2);
        assert_eq!(pool.reserved(), 20);
        r3.try_grow(80).unwrap();

        assert_eq!(pool.reserved(), 100);
        r1.free();
        assert_eq!(pool.reserved(), 80);

        // An unspillable consumer only gets what is left after all reservations
        let mut r4 = MemoryConsumer::new("s4").register(&pool);
        let err = r4.try_grow(30).unwrap_err().strip_backtrace();
        assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 30.0 B for s4 with 0.0 B already allocated for this reservation - 20.0 B remain available for the total pool");
    }
580
    /// Checks that a failed allocation on a `TrackConsumersPool` lists the top
    /// three consumers with their current and peak reservations.
    #[test]
    fn test_tracked_consumers_pool() {
        let setting = make_settings();
        let _bound = setting.bind_to_scope();
        let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
            GreedyMemoryPool::new(100),
            NonZeroUsize::new(3).unwrap(),
        ));

        // r1 ends at 50 B reserved after peaking at 70 B
        let mut r1 = MemoryConsumer::new("r1").register(&pool);
        r1.grow(50);
        r1.grow(20);
        r1.shrink(20);

        // r2 reserves 15 B through the fallible path
        let mut r2 = MemoryConsumer::new("r2").register(&pool);
        r2.try_grow(15)
            .expect("should succeed in memory allotment for r2");

        // r3 ends at 20 B reserved after peaking at 25 B
        let mut r3 = MemoryConsumer::new("r3").register(&pool);
        r3.try_resize(25)
            .expect("should succeed in memory allotment for r3");
        r3.try_resize(20)
            .expect("should succeed in memory allotment for r3");

        // r4 reserves 10 B, too little to appear in the top three
        let mut r4 = MemoryConsumer::new("r4").register(&pool);
        r4.grow(10);

        // 95 B are reserved in total, so asking for 150 B must fail
        let mut r5 = MemoryConsumer::new("r5").register(&pool);
        let res = r5.try_grow(150);
        assert!(res.is_err());
        let error = res.unwrap_err().strip_backtrace();
        assert_snapshot!(error, @r"
        Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
         r1#[ID](can spill: false) consumed 50.0 B, peak 70.0 B,
         r3#[ID](can spill: false) consumed 20.0 B, peak 25.0 B,
         r2#[ID](can spill: false) consumed 15.0 B, peak 15.0 B.
        Error: Failed to allocate additional 150.0 B for r5 with 0.0 B already allocated for this reservation - 5.0 B remain available for the total pool
        ");
    }
629
    /// Checks that consumers sharing a name are tracked as separate entries
    /// in the top-consumers report.
    #[test]
    fn test_tracked_consumers_pool_register() {
        let setting = make_settings();
        let _bound = setting.bind_to_scope();
        let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
            GreedyMemoryPool::new(100),
            NonZeroUsize::new(3).unwrap(),
        ));

        let same_name = "foo";

        // A single registered consumer is reported even with nothing reserved
        let mut r0 = MemoryConsumer::new(same_name).register(&pool);
        let res = r0.try_grow(150);
        assert!(res.is_err());
        let error = res.unwrap_err().strip_backtrace();
        assert_snapshot!(error, @r"
        Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
         foo#[ID](can spill: false) consumed 0.0 B, peak 0.0 B.
        Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 100.0 B remain available for the total pool
        ");

        // A second consumer with the same name gets its own entry
        r0.grow(10);
        let new_consumer_same_name = MemoryConsumer::new(same_name);
        let mut r1 = new_consumer_same_name.register(&pool);
        let res = r1.try_grow(150);
        assert!(res.is_err());
        let error = res.unwrap_err().strip_backtrace();
        assert_snapshot!(error, @r"
        Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
         foo#[ID](can spill: false) consumed 10.0 B, peak 10.0 B,
         foo#[ID](can spill: false) consumed 0.0 B, peak 0.0 B.
        Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 90.0 B remain available for the total pool
        ");

        // Growth is attributed to the second consumer only
        r1.grow(20);

        let res = r1.try_grow(150);
        assert!(res.is_err());
        let error = res.unwrap_err().strip_backtrace();
        assert_snapshot!(error, @r"
        Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
         foo#[ID](can spill: false) consumed 20.0 B, peak 20.0 B,
         foo#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
        Error: Failed to allocate additional 150.0 B for foo with 20.0 B already allocated for this reservation - 70.0 B remain available for the total pool
        ");

        // Same name, but this consumer can spill: reported as a third entry
        let consumer_with_same_name_but_different_hash =
            MemoryConsumer::new(same_name).with_can_spill(true);
        let mut r2 = consumer_with_same_name_but_different_hash.register(&pool);
        let res = r2.try_grow(150);
        assert!(res.is_err());
        let error = res.unwrap_err().strip_backtrace();
        assert_snapshot!(error, @r"
        Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
         foo#[ID](can spill: false) consumed 20.0 B, peak 20.0 B,
         foo#[ID](can spill: false) consumed 10.0 B, peak 10.0 B,
         foo#[ID](can spill: true) consumed 0.0 B, peak 0.0 B.
        Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 70.0 B remain available for the total pool
        ");
    }
699
    /// Checks that a dropped reservation no longer appears in the
    /// top-consumers report, for both wrapped pool types.
    #[test]
    fn test_tracked_consumers_pool_deregister() {
        // Runs the same scenario against whichever tracked pool is passed in
        fn test_per_pool_type(pool: Arc<dyn MemoryPool>) {
            let setting = make_settings();
            let _bound = setting.bind_to_scope();
            let mut r0 = MemoryConsumer::new("r0").register(&pool);
            r0.grow(10);
            let r1_consumer = MemoryConsumer::new("r1");
            let mut r1 = r1_consumer.register(&pool);
            r1.grow(20);

            // Both consumers are listed while both reservations are alive
            let res = r0.try_grow(150);
            assert!(res.is_err());
            let error = res.unwrap_err().strip_backtrace();
            allow_duplicates!(assert_snapshot!(error, @r"
            Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
             r1#[ID](can spill: false) consumed 20.0 B, peak 20.0 B,
             r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
            Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 70.0 B remain available for the total pool
            "));

            // After r1 is dropped only r0 is listed and r1's 20 B are available again
            drop(r1);
            let res = r0.try_grow(150);
            assert!(res.is_err());
            let error = res.unwrap_err().strip_backtrace();
            allow_duplicates!(assert_snapshot!(error, @r"
            Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
             r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
            Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool
            "));

            // Repeating the failed request yields the same report
            let res = r0.try_grow(150);
            assert!(res.is_err());
            let error = res.unwrap_err().strip_backtrace();
            allow_duplicates!(assert_snapshot!(error, @r"
            Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
             r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
            Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool
            "));

            // ... and once more
            let res = r0.try_grow(150);
            assert!(res.is_err());
            let error = res.unwrap_err().strip_backtrace();
            allow_duplicates!(assert_snapshot!(error, @r"
            Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
             r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
            Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool
            "));
        }

        let tracked_spill_pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
            FairSpillPool::new(100),
            NonZeroUsize::new(3).unwrap(),
        ));
        test_per_pool_type(tracked_spill_pool);

        let tracked_greedy_pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
            GreedyMemoryPool::new(100),
            NonZeroUsize::new(3).unwrap(),
        ));
        test_per_pool_type(tracked_greedy_pool);
    }
769
    /// Checks that `report_top` can be called directly on the concrete pool
    /// type, outside of an allocation error.
    #[test]
    fn test_tracked_consumers_pool_use_beyond_errors() {
        let setting = make_settings();
        let _bound = setting.bind_to_scope();
        // Hold the pool as `dyn Any` so it can be downcast to the concrete type
        let upcasted: Arc<dyn std::any::Any + Send + Sync> =
            Arc::new(TrackConsumersPool::new(
                GreedyMemoryPool::new(100),
                NonZeroUsize::new(3).unwrap(),
            ));
        // The same allocation, viewed as a `dyn MemoryPool` for the consumers
        let pool: Arc<dyn MemoryPool> = Arc::clone(&upcasted)
            .downcast::<TrackConsumersPool<GreedyMemoryPool>>()
            .unwrap();
        let mut r1 = MemoryConsumer::new("r1").register(&pool);
        r1.grow(20);
        let mut r2 = MemoryConsumer::new("r2").register(&pool);
        r2.grow(15);
        let mut r3 = MemoryConsumer::new("r3").register(&pool);
        r3.grow(45);

        let downcasted = upcasted
            .downcast::<TrackConsumersPool<GreedyMemoryPool>>()
            .unwrap();

        // Only the two largest consumers are requested, so r2 is left out
        let res = downcasted.report_top(2);
        assert_snapshot!(res, @r"
        r3#[ID](can spill: false) consumed 45.0 B, peak 45.0 B,
        r1#[ID](can spill: false) consumed 20.0 B, peak 20.0 B.
        ");
    }
803}