1use crate::memory_pool::{
19 human_readable_size, MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation,
20};
21use datafusion_common::HashMap;
22use datafusion_common::{resources_datafusion_err, DataFusionError, Result};
23use log::debug;
24use parking_lot::Mutex;
25use std::{
26 num::NonZeroUsize,
27 sync::atomic::{AtomicUsize, Ordering},
28};
29
/// A [`MemoryPool`] with no upper bound.
///
/// Requests are never refused; the pool only keeps a running total of the
/// bytes currently reserved so that it can be reported.
#[derive(Default, Debug)]
pub struct UnboundedMemoryPool {
    /// Total bytes currently reserved across all reservations.
    used: AtomicUsize,
}
35
36impl MemoryPool for UnboundedMemoryPool {
37 fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
38 self.used.fetch_add(additional, Ordering::Relaxed);
39 }
40
41 fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
42 self.used.fetch_sub(shrink, Ordering::Relaxed);
43 }
44
45 fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
46 self.grow(reservation, additional);
47 Ok(())
48 }
49
50 fn reserved(&self) -> usize {
51 self.used.load(Ordering::Relaxed)
52 }
53
54 fn memory_limit(&self) -> MemoryLimit {
55 MemoryLimit::Infinite
56 }
57}
58
/// A [`MemoryPool`] that serves requests first-come, first-served.
///
/// `try_grow` refuses any request that would push total usage past
/// `pool_size`.
#[derive(Debug)]
pub struct GreedyMemoryPool {
    /// Capacity, in bytes, that `try_grow` will not exceed.
    pool_size: usize,
    /// Bytes currently reserved.
    used: AtomicUsize,
}
69
70impl GreedyMemoryPool {
71 pub fn new(pool_size: usize) -> Self {
73 debug!("Created new GreedyMemoryPool(pool_size={pool_size})");
74 Self {
75 pool_size,
76 used: AtomicUsize::new(0),
77 }
78 }
79}
80
81impl MemoryPool for GreedyMemoryPool {
82 fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
83 self.used.fetch_add(additional, Ordering::Relaxed);
84 }
85
86 fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
87 self.used.fetch_sub(shrink, Ordering::Relaxed);
88 }
89
90 fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
91 self.used
92 .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |used| {
93 let new_used = used + additional;
94 (new_used <= self.pool_size).then_some(new_used)
95 })
96 .map_err(|used| {
97 insufficient_capacity_err(
98 reservation,
99 additional,
100 self.pool_size.saturating_sub(used),
101 )
102 })?;
103 Ok(())
104 }
105
106 fn reserved(&self) -> usize {
107 self.used.load(Ordering::Relaxed)
108 }
109
110 fn memory_limit(&self) -> MemoryLimit {
111 MemoryLimit::Finite(self.pool_size)
112 }
113}
114
115#[derive(Debug)]
138pub struct FairSpillPool {
139 pool_size: usize,
141
142 state: Mutex<FairSpillPoolState>,
143}
144
/// Mutable accounting for a [`FairSpillPool`].
#[derive(Debug)]
struct FairSpillPoolState {
    /// Number of registered consumers that can spill.
    num_spill: usize,

    /// Bytes currently reserved by consumers that can spill.
    spillable: usize,

    /// Bytes currently reserved by consumers that cannot spill.
    unspillable: usize,
}
156
157impl FairSpillPool {
158 pub fn new(pool_size: usize) -> Self {
160 debug!("Created new FairSpillPool(pool_size={pool_size})");
161 Self {
162 pool_size,
163 state: Mutex::new(FairSpillPoolState {
164 num_spill: 0,
165 spillable: 0,
166 unspillable: 0,
167 }),
168 }
169 }
170}
171
172impl MemoryPool for FairSpillPool {
173 fn register(&self, consumer: &MemoryConsumer) {
174 if consumer.can_spill {
175 self.state.lock().num_spill += 1;
176 }
177 }
178
179 fn unregister(&self, consumer: &MemoryConsumer) {
180 if consumer.can_spill {
181 let mut state = self.state.lock();
182 state.num_spill = state.num_spill.checked_sub(1).unwrap();
183 }
184 }
185
186 fn grow(&self, reservation: &MemoryReservation, additional: usize) {
187 let mut state = self.state.lock();
188 match reservation.registration.consumer.can_spill {
189 true => state.spillable += additional,
190 false => state.unspillable += additional,
191 }
192 }
193
194 fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
195 let mut state = self.state.lock();
196 match reservation.registration.consumer.can_spill {
197 true => state.spillable -= shrink,
198 false => state.unspillable -= shrink,
199 }
200 }
201
202 fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
203 let mut state = self.state.lock();
204
205 match reservation.registration.consumer.can_spill {
206 true => {
207 let spill_available = self.pool_size.saturating_sub(state.unspillable);
209
210 let available = spill_available
212 .checked_div(state.num_spill)
213 .unwrap_or(spill_available);
214
215 if reservation.size + additional > available {
216 return Err(insufficient_capacity_err(
217 reservation,
218 additional,
219 available,
220 ));
221 }
222 state.spillable += additional;
223 }
224 false => {
225 let available = self
226 .pool_size
227 .saturating_sub(state.unspillable + state.spillable);
228
229 if available < additional {
230 return Err(insufficient_capacity_err(
231 reservation,
232 additional,
233 available,
234 ));
235 }
236 state.unspillable += additional;
237 }
238 }
239 Ok(())
240 }
241
242 fn reserved(&self) -> usize {
243 let state = self.state.lock();
244 state.spillable + state.unspillable
245 }
246
247 fn memory_limit(&self) -> MemoryLimit {
248 MemoryLimit::Finite(self.pool_size)
249 }
250}
251
252#[inline(always)]
258fn insufficient_capacity_err(
259 reservation: &MemoryReservation,
260 additional: usize,
261 available: usize,
262) -> DataFusionError {
263 resources_datafusion_err!("Failed to allocate additional {} for {} with {} already allocated for this reservation - {} remain available for the total pool",
264 human_readable_size(additional), reservation.registration.consumer.name, human_readable_size(reservation.size), human_readable_size(available))
265}
266
/// Per-consumer usage statistics kept by [`TrackConsumersPool`].
#[derive(Debug)]
struct TrackedConsumer {
    /// Consumer name, copied at registration time.
    name: String,
    /// Whether the consumer was registered as able to spill.
    can_spill: bool,
    /// Bytes currently reserved by this consumer.
    reserved: AtomicUsize,
    /// Highest reservation observed for this consumer.
    peak: AtomicUsize,
}
274
275impl TrackedConsumer {
276 fn reserved(&self) -> usize {
278 self.reserved.load(Ordering::Relaxed)
279 }
280
281 fn peak(&self) -> usize {
283 self.peak.load(Ordering::Relaxed)
284 }
285
286 fn grow(&self, additional: usize) {
289 self.reserved.fetch_add(additional, Ordering::Relaxed);
290 self.peak.fetch_max(self.reserved(), Ordering::Relaxed);
291 }
292
293 fn shrink(&self, shrink: usize) {
296 self.reserved.fetch_sub(shrink, Ordering::Relaxed);
297 }
298}
299
300#[derive(Debug)]
325pub struct TrackConsumersPool<I> {
326 inner: I,
328 top: NonZeroUsize,
330 tracked_consumers: Mutex<HashMap<usize, TrackedConsumer>>,
332}
333
334impl<I: MemoryPool> TrackConsumersPool<I> {
335 pub fn new(inner: I, top: NonZeroUsize) -> Self {
372 Self {
373 inner,
374 top,
375 tracked_consumers: Default::default(),
376 }
377 }
378
379 pub fn report_top(&self, top: usize) -> String {
381 let mut consumers = self
382 .tracked_consumers
383 .lock()
384 .iter()
385 .map(|(consumer_id, tracked_consumer)| {
386 (
387 (
388 *consumer_id,
389 tracked_consumer.name.to_owned(),
390 tracked_consumer.can_spill,
391 tracked_consumer.peak(),
392 ),
393 tracked_consumer.reserved(),
394 )
395 })
396 .collect::<Vec<_>>();
397 consumers.sort_by(|a, b| b.1.cmp(&a.1)); consumers[0..std::cmp::min(top, consumers.len())]
400 .iter()
401 .map(|((id, name, can_spill, peak), size)| {
402 format!(
403 " {name}#{id}(can spill: {can_spill}) consumed {}, peak {}",
404 human_readable_size(*size),
405 human_readable_size(*peak),
406 )
407 })
408 .collect::<Vec<_>>()
409 .join(",\n")
410 + "."
411 }
412}
413
414impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
415 fn register(&self, consumer: &MemoryConsumer) {
416 self.inner.register(consumer);
417
418 let mut guard = self.tracked_consumers.lock();
419 let existing = guard.insert(
420 consumer.id(),
421 TrackedConsumer {
422 name: consumer.name().to_string(),
423 can_spill: consumer.can_spill(),
424 reserved: Default::default(),
425 peak: Default::default(),
426 },
427 );
428
429 debug_assert!(
430 existing.is_none(),
431 "Registered was called twice on the same consumer"
432 );
433 }
434
435 fn unregister(&self, consumer: &MemoryConsumer) {
436 self.inner.unregister(consumer);
437 self.tracked_consumers.lock().remove(&consumer.id());
438 }
439
440 fn grow(&self, reservation: &MemoryReservation, additional: usize) {
441 self.inner.grow(reservation, additional);
442 self.tracked_consumers
443 .lock()
444 .entry(reservation.consumer().id())
445 .and_modify(|tracked_consumer| {
446 tracked_consumer.grow(additional);
447 });
448 }
449
450 fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
451 self.inner.shrink(reservation, shrink);
452 self.tracked_consumers
453 .lock()
454 .entry(reservation.consumer().id())
455 .and_modify(|tracked_consumer| {
456 tracked_consumer.shrink(shrink);
457 });
458 }
459
460 fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
461 self.inner
462 .try_grow(reservation, additional)
463 .map_err(|e| match e {
464 DataFusionError::ResourcesExhausted(e) => {
465 DataFusionError::ResourcesExhausted(
467 provide_top_memory_consumers_to_error_msg(
468 &reservation.consumer().name,
469 e,
470 self.report_top(self.top.into()),
471 ),
472 )
473 }
474 _ => e,
475 })?;
476
477 self.tracked_consumers
478 .lock()
479 .entry(reservation.consumer().id())
480 .and_modify(|tracked_consumer| {
481 tracked_consumer.grow(additional);
482 });
483 Ok(())
484 }
485
486 fn reserved(&self) -> usize {
487 self.inner.reserved()
488 }
489
490 fn memory_limit(&self) -> MemoryLimit {
491 self.inner.memory_limit()
492 }
493}
494
/// Formats the message used when a [`TrackConsumersPool`] allocation fails.
///
/// The message names `consumer_name`, then lists `top_consumers` starting on
/// the next line, and ends with the inner pool's `error_msg`.
fn provide_top_memory_consumers_to_error_msg(
    consumer_name: &str,
    error_msg: String,
    top_consumers: String,
) -> String {
    let message = format!("Additional allocation failed for {consumer_name} with top memory consumers (across reservations) as:\n{top_consumers}\nError: {error_msg}");
    message
}
502
#[cfg(test)]
mod tests {
    use super::*;
    use insta::{allow_duplicates, assert_snapshot, Settings};
    use std::sync::Arc;

    /// Returns insta settings that replace the numeric consumer id in
    /// `name#<id>(can spill: ...)` with `[ID]`, so snapshots do not contain
    /// concrete consumer ids.
    fn make_settings() -> Settings {
        let mut settings = Settings::clone_current();
        settings.add_filter(
            r"([^\s]+)\#\d+\(can spill: (true|false)\)",
            "$1#[ID](can spill: $2)",
        );
        settings
    }

    /// Exercises the fair-share rules of `FairSpillPool` for spillable and
    /// unspillable consumers.
    #[test]
    fn test_fair() {
        let pool = Arc::new(FairSpillPool::new(100)) as _;

        // `grow` is infallible, so an unspillable consumer can exceed the 100 B pool.
        let mut r1 = MemoryConsumer::new("unspillable").register(&pool);
        r1.grow(2000);
        assert_eq!(pool.reserved(), 2000);

        let mut r2 = MemoryConsumer::new("r2")
            .with_can_spill(true)
            .register(&pool);
        r2.grow(2000);

        assert_eq!(pool.reserved(), 4000);

        // Unspillable usage already exceeds the pool, so the spillable share is 0.
        let err = r2.try_grow(1).unwrap_err().strip_backtrace();
        assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 1.0 B for r2 with 2000.0 B already allocated for this reservation - 0.0 B remain available for the total pool");

        // The failed request above did not change the accounting, so a retry fails identically.
        let err = r2.try_grow(1).unwrap_err().strip_backtrace();
        assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 1.0 B for r2 with 2000.0 B already allocated for this reservation - 0.0 B remain available for the total pool");

        r1.shrink(1990);
        r2.shrink(2000);

        assert_eq!(pool.reserved(), 10);

        r1.try_grow(10).unwrap();
        assert_eq!(pool.reserved(), 20);

        // r2 is the only spillable consumer, so it may use everything not held by r1 (100 - 20).
        r2.try_grow(80).unwrap();
        assert_eq!(pool.reserved(), 100);

        r2.shrink(70);

        assert_eq!(r1.size(), 20);
        assert_eq!(r2.size(), 10);
        assert_eq!(pool.reserved(), 30);

        // With two spillable consumers registered, each may use half of the 80 B spillable budget.
        let mut r3 = MemoryConsumer::new("r3")
            .with_can_spill(true)
            .register(&pool);

        let err = r3.try_grow(70).unwrap_err().strip_backtrace();
        assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 70.0 B for r3 with 0.0 B already allocated for this reservation - 40.0 B remain available for the total pool");

        // Freeing r2's memory keeps it registered, so the per-consumer share is still 40 B.
        r2.free();
        let err = r3.try_grow(70).unwrap_err().strip_backtrace();
        assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 70.0 B for r3 with 0.0 B already allocated for this reservation - 40.0 B remain available for the total pool");

        // After dropping r2, r3 can take the whole 80 B spillable budget.
        drop(r2);
        assert_eq!(pool.reserved(), 20);
        r3.try_grow(80).unwrap();

        assert_eq!(pool.reserved(), 100);
        r1.free();
        assert_eq!(pool.reserved(), 80);

        // Unspillable requests are limited by the pool's remaining capacity (100 - 80).
        let mut r4 = MemoryConsumer::new("s4").register(&pool);
        let err = r4.try_grow(30).unwrap_err().strip_backtrace();
        assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 30.0 B for s4 with 0.0 B already allocated for this reservation - 20.0 B remain available for the total pool");
    }

    /// The error from a failed allocation lists the `top` (3) largest consumers
    /// together with their current and peak usage.
    #[test]
    fn test_tracked_consumers_pool() {
        let setting = make_settings();
        let _bound = setting.bind_to_scope();
        let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
            GreedyMemoryPool::new(100),
            NonZeroUsize::new(3).unwrap(),
        ));

        // r1 ends at 50 B with a peak of 70 B.
        let mut r1 = MemoryConsumer::new("r1").register(&pool);
        r1.grow(50);
        r1.grow(20);
        r1.shrink(20);

        let mut r2 = MemoryConsumer::new("r2").register(&pool);
        r2.try_grow(15)
            .expect("should succeed in memory allotment for r2");

        // r3 ends at 20 B with a peak of 25 B.
        let mut r3 = MemoryConsumer::new("r3").register(&pool);
        r3.try_resize(25)
            .expect("should succeed in memory allotment for r3");
        r3.try_resize(20)
            .expect("should succeed in memory allotment for r3");

        // r4 (10 B) is the smallest holder and is left out of the top-3 report.
        let mut r4 = MemoryConsumer::new("r4").register(&pool);
        r4.grow(10);

        let mut r5 = MemoryConsumer::new("r5").register(&pool);
        let res = r5.try_grow(150);
        assert!(res.is_err());
        let error = res.unwrap_err().strip_backtrace();
        assert_snapshot!(error, @r"
        Resources exhausted: Additional allocation failed for r5 with top memory consumers (across reservations) as:
         r1#[ID](can spill: false) consumed 50.0 B, peak 70.0 B,
         r3#[ID](can spill: false) consumed 20.0 B, peak 25.0 B,
         r2#[ID](can spill: false) consumed 15.0 B, peak 15.0 B.
        Error: Failed to allocate additional 150.0 B for r5 with 0.0 B already allocated for this reservation - 5.0 B remain available for the total pool
        ");
    }

    /// Consumers that share a name are still tracked as separate entries.
    #[test]
    fn test_tracked_consumers_pool_register() {
        let setting = make_settings();
        let _bound = setting.bind_to_scope();
        let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
            GreedyMemoryPool::new(100),
            NonZeroUsize::new(3).unwrap(),
        ));

        let same_name = "foo";

        // A registered consumer that has reserved nothing still appears in the report.
        let mut r0 = MemoryConsumer::new(same_name).register(&pool);
        let res = r0.try_grow(150);
        assert!(res.is_err());
        let error = res.unwrap_err().strip_backtrace();
        assert_snapshot!(error, @r"
        Resources exhausted: Additional allocation failed for foo with top memory consumers (across reservations) as:
         foo#[ID](can spill: false) consumed 0.0 B, peak 0.0 B.
        Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 100.0 B remain available for the total pool
        ");

        r0.grow(10);
        // A second consumer with the same name gets its own report entry.
        let new_consumer_same_name = MemoryConsumer::new(same_name);
        let mut r1 = new_consumer_same_name.register(&pool);
        let res = r1.try_grow(150);
        assert!(res.is_err());
        let error = res.unwrap_err().strip_backtrace();
        assert_snapshot!(error, @r"
        Resources exhausted: Additional allocation failed for foo with top memory consumers (across reservations) as:
         foo#[ID](can spill: false) consumed 10.0 B, peak 10.0 B,
         foo#[ID](can spill: false) consumed 0.0 B, peak 0.0 B.
        Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 90.0 B remain available for the total pool
        ");

        // Growing r1 past r0 moves it to the top of the report.
        r1.grow(20);

        let res = r1.try_grow(150);
        assert!(res.is_err());
        let error = res.unwrap_err().strip_backtrace();
        assert_snapshot!(error, @r"
        Resources exhausted: Additional allocation failed for foo with top memory consumers (across reservations) as:
         foo#[ID](can spill: false) consumed 20.0 B, peak 20.0 B,
         foo#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
        Error: Failed to allocate additional 150.0 B for foo with 20.0 B already allocated for this reservation - 70.0 B remain available for the total pool
        ");

        // Same name but a different `can_spill` flag: also a separate entry.
        let consumer_with_same_name_but_different_hash =
            MemoryConsumer::new(same_name).with_can_spill(true);
        let mut r2 = consumer_with_same_name_but_different_hash.register(&pool);
        let res = r2.try_grow(150);
        assert!(res.is_err());
        let error = res.unwrap_err().strip_backtrace();
        assert_snapshot!(error, @r"
        Resources exhausted: Additional allocation failed for foo with top memory consumers (across reservations) as:
         foo#[ID](can spill: false) consumed 20.0 B, peak 20.0 B,
         foo#[ID](can spill: false) consumed 10.0 B, peak 10.0 B,
         foo#[ID](can spill: true) consumed 0.0 B, peak 0.0 B.
        Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 70.0 B remain available for the total pool
        ");
    }

    /// Dropping a reservation removes its consumer from the report and returns
    /// its memory to the inner pool. This is checked for both inner pool types.
    #[test]
    fn test_tracked_consumers_pool_deregister() {
        fn test_per_pool_type(pool: Arc<dyn MemoryPool>) {
            let setting = make_settings();
            let _bound = setting.bind_to_scope();
            let mut r0 = MemoryConsumer::new("r0").register(&pool);
            r0.grow(10);
            let r1_consumer = MemoryConsumer::new("r1");
            let mut r1 = r1_consumer.register(&pool);
            r1.grow(20);

            let res = r0.try_grow(150);
            assert!(res.is_err());
            let error = res.unwrap_err().strip_backtrace();
            allow_duplicates!(assert_snapshot!(error, @r"
            Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as:
             r1#[ID](can spill: false) consumed 20.0 B, peak 20.0 B,
             r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
            Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 70.0 B remain available for the total pool
            "));

            // After dropping r1, only r0 is reported and 90 B are available again.
            drop(r1);
            let res = r0.try_grow(150);
            assert!(res.is_err());
            let error = res.unwrap_err().strip_backtrace();
            allow_duplicates!(assert_snapshot!(error, @r"
            Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as:
             r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
            Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool
            "));

            // Repeated failures leave the accounting unchanged.
            let res = r0.try_grow(150);
            assert!(res.is_err());
            let error = res.unwrap_err().strip_backtrace();
            allow_duplicates!(assert_snapshot!(error, @r"
            Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as:
             r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
            Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool
            "));

            let res = r0.try_grow(150);
            assert!(res.is_err());
            let error = res.unwrap_err().strip_backtrace();
            allow_duplicates!(assert_snapshot!(error, @r"
            Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as:
             r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
            Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool
            "));
        }

        let tracked_spill_pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
            FairSpillPool::new(100),
            NonZeroUsize::new(3).unwrap(),
        ));
        test_per_pool_type(tracked_spill_pool);

        let tracked_greedy_pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
            GreedyMemoryPool::new(100),
            NonZeroUsize::new(3).unwrap(),
        ));
        test_per_pool_type(tracked_greedy_pool);
    }

    /// The concrete pool can be recovered from a type-erased `Arc<dyn Any>`
    /// and `report_top` called directly with a custom `top`.
    #[test]
    fn test_tracked_consumers_pool_use_beyond_errors() {
        let setting = make_settings();
        let _bound = setting.bind_to_scope();
        let upcasted: Arc<dyn std::any::Any + Send + Sync> =
            Arc::new(TrackConsumersPool::new(
                GreedyMemoryPool::new(100),
                NonZeroUsize::new(3).unwrap(),
            ));
        let pool: Arc<dyn MemoryPool> = Arc::clone(&upcasted)
            .downcast::<TrackConsumersPool<GreedyMemoryPool>>()
            .unwrap();
        let mut r1 = MemoryConsumer::new("r1").register(&pool);
        r1.grow(20);
        let mut r2 = MemoryConsumer::new("r2").register(&pool);
        r2.grow(15);
        let mut r3 = MemoryConsumer::new("r3").register(&pool);
        r3.grow(45);

        let downcasted = upcasted
            .downcast::<TrackConsumersPool<GreedyMemoryPool>>()
            .unwrap();

        // Only the two largest consumers are included.
        let res = downcasted.report_top(2);
        assert_snapshot!(res, @r"
         r3#[ID](can spill: false) consumed 45.0 B, peak 45.0 B,
         r1#[ID](can spill: false) consumed 20.0 B, peak 20.0 B.
        ");
    }
}