1use crate::memory_pool::{
19 MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation, human_readable_size,
20};
21use datafusion_common::HashMap;
22use datafusion_common::{DataFusionError, Result, resources_datafusion_err};
23use log::debug;
24use parking_lot::Mutex;
25use std::{
26 num::NonZeroUsize,
27 sync::atomic::{AtomicUsize, Ordering},
28};
29
/// A [`MemoryPool`] that enforces no limit.
///
/// The pool only keeps a running total of the memory handed out, so every
/// reservation request made against it is granted.
#[derive(Debug, Default)]
pub struct UnboundedMemoryPool {
    /// Running total of memory currently reserved across all reservations.
    used: AtomicUsize,
}
35
36impl MemoryPool for UnboundedMemoryPool {
37 fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
38 self.used.fetch_add(additional, Ordering::Relaxed);
39 }
40
41 fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
42 self.used.fetch_sub(shrink, Ordering::Relaxed);
43 }
44
45 fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
46 self.grow(reservation, additional);
47 Ok(())
48 }
49
50 fn reserved(&self) -> usize {
51 self.used.load(Ordering::Relaxed)
52 }
53
54 fn memory_limit(&self) -> MemoryLimit {
55 MemoryLimit::Infinite
56 }
57}
58
/// A [`MemoryPool`] with a fixed upper limit.
///
/// Fallible requests (`try_grow`) are granted as long as the total reserved
/// memory stays within `pool_size`; the pool does not distinguish between
/// consumers when deciding.
#[derive(Debug)]
pub struct GreedyMemoryPool {
    /// Maximum total that `try_grow` will allow to be reserved.
    pool_size: usize,
    /// Running total of memory currently reserved across all reservations.
    used: AtomicUsize,
}
69
70impl GreedyMemoryPool {
71 pub fn new(pool_size: usize) -> Self {
73 debug!("Created new GreedyMemoryPool(pool_size={pool_size})");
74 Self {
75 pool_size,
76 used: AtomicUsize::new(0),
77 }
78 }
79}
80
81impl MemoryPool for GreedyMemoryPool {
82 fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
83 self.used.fetch_add(additional, Ordering::Relaxed);
84 }
85
86 fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
87 self.used.fetch_sub(shrink, Ordering::Relaxed);
88 }
89
90 fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
91 self.used
92 .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |used| {
93 let new_used = used + additional;
94 (new_used <= self.pool_size).then_some(new_used)
95 })
96 .map_err(|used| {
97 insufficient_capacity_err(
98 reservation,
99 additional,
100 self.pool_size.saturating_sub(used),
101 )
102 })?;
103 Ok(())
104 }
105
106 fn reserved(&self) -> usize {
107 self.used.load(Ordering::Relaxed)
108 }
109
110 fn memory_limit(&self) -> MemoryLimit {
111 MemoryLimit::Finite(self.pool_size)
112 }
113}
114
/// A [`MemoryPool`] with a fixed size that treats spillable and unspillable
/// consumers differently.
///
/// For fallible requests, an unspillable consumer may take whatever is still
/// free in the pool. A spillable reservation is limited to an equal share of
/// the memory not held by unspillable consumers, i.e.
/// `(pool_size - unspillable) / num_spill`.
#[derive(Debug)]
pub struct FairSpillPool {
    /// Total size of the pool.
    pool_size: usize,

    /// Mutable bookkeeping, guarded by a mutex.
    state: Mutex<FairSpillPoolState>,
}
144
/// Bookkeeping for [`FairSpillPool`], kept behind the pool's mutex.
#[derive(Debug)]
struct FairSpillPoolState {
    /// Number of currently registered consumers that can spill.
    num_spill: usize,

    /// Total memory reserved by consumers that can spill.
    spillable: usize,

    /// Total memory reserved by consumers that cannot spill.
    unspillable: usize,
}
156
157impl FairSpillPool {
158 pub fn new(pool_size: usize) -> Self {
160 debug!("Created new FairSpillPool(pool_size={pool_size})");
161 Self {
162 pool_size,
163 state: Mutex::new(FairSpillPoolState {
164 num_spill: 0,
165 spillable: 0,
166 unspillable: 0,
167 }),
168 }
169 }
170}
171
172impl MemoryPool for FairSpillPool {
173 fn register(&self, consumer: &MemoryConsumer) {
174 if consumer.can_spill {
175 self.state.lock().num_spill += 1;
176 }
177 }
178
179 fn unregister(&self, consumer: &MemoryConsumer) {
180 if consumer.can_spill {
181 let mut state = self.state.lock();
182 state.num_spill = state.num_spill.checked_sub(1).unwrap();
183 }
184 }
185
186 fn grow(&self, reservation: &MemoryReservation, additional: usize) {
187 let mut state = self.state.lock();
188 match reservation.registration.consumer.can_spill {
189 true => state.spillable += additional,
190 false => state.unspillable += additional,
191 }
192 }
193
194 fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
195 let mut state = self.state.lock();
196 match reservation.registration.consumer.can_spill {
197 true => state.spillable -= shrink,
198 false => state.unspillable -= shrink,
199 }
200 }
201
202 fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
203 let mut state = self.state.lock();
204
205 match reservation.registration.consumer.can_spill {
206 true => {
207 let spill_available = self.pool_size.saturating_sub(state.unspillable);
209
210 let available = spill_available
212 .checked_div(state.num_spill)
213 .unwrap_or(spill_available);
214
215 if reservation.size + additional > available {
216 return Err(insufficient_capacity_err(
217 reservation,
218 additional,
219 available,
220 ));
221 }
222 state.spillable += additional;
223 }
224 false => {
225 let available = self
226 .pool_size
227 .saturating_sub(state.unspillable + state.spillable);
228
229 if available < additional {
230 return Err(insufficient_capacity_err(
231 reservation,
232 additional,
233 available,
234 ));
235 }
236 state.unspillable += additional;
237 }
238 }
239 Ok(())
240 }
241
242 fn reserved(&self) -> usize {
243 let state = self.state.lock();
244 state.spillable + state.unspillable
245 }
246
247 fn memory_limit(&self) -> MemoryLimit {
248 MemoryLimit::Finite(self.pool_size)
249 }
250}
251
252#[inline(always)]
258fn insufficient_capacity_err(
259 reservation: &MemoryReservation,
260 additional: usize,
261 available: usize,
262) -> DataFusionError {
263 resources_datafusion_err!(
264 "Failed to allocate additional {} for {} with {} already allocated for this reservation - {} remain available for the total pool",
265 human_readable_size(additional),
266 reservation.registration.consumer.name,
267 human_readable_size(reservation.size),
268 human_readable_size(available)
269 )
270}
271
/// Per-consumer bookkeeping: the consumer's identity plus its current and
/// peak reserved memory.
#[derive(Debug)]
struct TrackedConsumer {
    /// Name of the consumer.
    name: String,
    /// Whether the consumer can spill.
    can_spill: bool,
    /// Memory currently reserved by the consumer.
    reserved: AtomicUsize,
    /// Largest value `reserved` has reached after a `grow`.
    peak: AtomicUsize,
}

impl TrackedConsumer {
    /// Returns the currently reserved size.
    fn reserved(&self) -> usize {
        self.reserved.load(Ordering::Relaxed)
    }

    /// Returns the peak reserved size recorded so far.
    fn peak(&self) -> usize {
        self.peak.load(Ordering::Relaxed)
    }

    /// Adds `additional` to the reserved size and raises the peak if the new
    /// size exceeds it.
    fn grow(&self, additional: usize) {
        // Derive the new size from the value `fetch_add` returns rather than
        // from a second load, so the peak always reflects the size produced
        // by this very call. `wrapping_add` mirrors `fetch_add`'s wrapping.
        let previous = self.reserved.fetch_add(additional, Ordering::Relaxed);
        self.peak
            .fetch_max(previous.wrapping_add(additional), Ordering::Relaxed);
    }

    /// Subtracts `shrink` from the reserved size; the peak is unaffected.
    fn shrink(&self, shrink: usize) {
        self.reserved.fetch_sub(shrink, Ordering::Relaxed);
    }
}
304
/// A [`MemoryPool`] wrapper that tracks the memory held by each registered
/// consumer.
///
/// All reservation requests are forwarded to the `inner` pool. When the inner
/// pool rejects a `try_grow` with a `ResourcesExhausted` error, the error
/// message is extended with a report of the `top` largest consumers.
#[derive(Debug)]
pub struct TrackConsumersPool<I> {
    /// The wrapped pool that makes the actual allocation decisions.
    inner: I,
    /// Number of consumers to include in the report attached to errors.
    top: NonZeroUsize,
    /// Tracked consumers, keyed by consumer id.
    tracked_consumers: Mutex<HashMap<usize, TrackedConsumer>>,
}
338
339impl<I: MemoryPool> TrackConsumersPool<I> {
340 pub fn new(inner: I, top: NonZeroUsize) -> Self {
377 Self {
378 inner,
379 top,
380 tracked_consumers: Default::default(),
381 }
382 }
383
384 pub fn report_top(&self, top: usize) -> String {
386 let mut consumers = self
387 .tracked_consumers
388 .lock()
389 .iter()
390 .map(|(consumer_id, tracked_consumer)| {
391 (
392 (
393 *consumer_id,
394 tracked_consumer.name.to_owned(),
395 tracked_consumer.can_spill,
396 tracked_consumer.peak(),
397 ),
398 tracked_consumer.reserved(),
399 )
400 })
401 .collect::<Vec<_>>();
402 consumers.sort_by(|a, b| b.1.cmp(&a.1)); consumers[0..std::cmp::min(top, consumers.len())]
405 .iter()
406 .map(|((id, name, can_spill, peak), size)| {
407 format!(
408 " {name}#{id}(can spill: {can_spill}) consumed {}, peak {}",
409 human_readable_size(*size),
410 human_readable_size(*peak),
411 )
412 })
413 .collect::<Vec<_>>()
414 .join(",\n")
415 + "."
416 }
417}
418
419impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
420 fn register(&self, consumer: &MemoryConsumer) {
421 self.inner.register(consumer);
422
423 let mut guard = self.tracked_consumers.lock();
424 let existing = guard.insert(
425 consumer.id(),
426 TrackedConsumer {
427 name: consumer.name().to_string(),
428 can_spill: consumer.can_spill(),
429 reserved: Default::default(),
430 peak: Default::default(),
431 },
432 );
433
434 debug_assert!(
435 existing.is_none(),
436 "Registered was called twice on the same consumer"
437 );
438 }
439
440 fn unregister(&self, consumer: &MemoryConsumer) {
441 self.inner.unregister(consumer);
442 self.tracked_consumers.lock().remove(&consumer.id());
443 }
444
445 fn grow(&self, reservation: &MemoryReservation, additional: usize) {
446 self.inner.grow(reservation, additional);
447 self.tracked_consumers
448 .lock()
449 .entry(reservation.consumer().id())
450 .and_modify(|tracked_consumer| {
451 tracked_consumer.grow(additional);
452 });
453 }
454
455 fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
456 self.inner.shrink(reservation, shrink);
457 self.tracked_consumers
458 .lock()
459 .entry(reservation.consumer().id())
460 .and_modify(|tracked_consumer| {
461 tracked_consumer.shrink(shrink);
462 });
463 }
464
465 fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
466 self.inner
467 .try_grow(reservation, additional)
468 .map_err(|e| match e {
469 DataFusionError::ResourcesExhausted(e) => {
470 DataFusionError::ResourcesExhausted(
472 provide_top_memory_consumers_to_error_msg(
473 &reservation.consumer().name,
474 &e,
475 &self.report_top(self.top.into()),
476 ),
477 )
478 }
479 _ => e,
480 })?;
481
482 self.tracked_consumers
483 .lock()
484 .entry(reservation.consumer().id())
485 .and_modify(|tracked_consumer| {
486 tracked_consumer.grow(additional);
487 });
488 Ok(())
489 }
490
491 fn reserved(&self) -> usize {
492 self.inner.reserved()
493 }
494
495 fn memory_limit(&self) -> MemoryLimit {
496 self.inner.memory_limit()
497 }
498}
499
/// Builds the message for a failed allocation: a headline naming the
/// consumer, then the rendered list of top consumers, then the original
/// error text, each part on its own line.
fn provide_top_memory_consumers_to_error_msg(
    consumer_name: &str,
    error_msg: &str,
    top_consumers: &str,
) -> String {
    let message = format!(
        "Additional allocation failed for {consumer_name} with top memory consumers (across reservations) as:\n{top_consumers}\nError: {error_msg}"
    );
    message
}
509
#[cfg(test)]
mod tests {
    use super::*;
    use insta::{Settings, allow_duplicates, assert_snapshot};
    use std::sync::Arc;

    /// Returns snapshot settings with a filter that rewrites the numeric
    /// consumer id in `name#<id>(can spill: ...)` to the literal `[ID]`, so
    /// snapshots do not depend on the ids assigned at runtime.
    fn make_settings() -> Settings {
        let mut settings = Settings::clone_current();
        settings.add_filter(
            r"([^\s]+)\#\d+\(can spill: (true|false)\)",
            "$1#[ID](can spill: $2)",
        );
        settings
    }

    /// Exercises `FairSpillPool` with a mix of spillable and unspillable
    /// consumers.
    #[test]
    fn test_fair() {
        let pool = Arc::new(FairSpillPool::new(100)) as _;

        let mut r1 = MemoryConsumer::new("unspillable").register(&pool);
        // The infallible `grow` is not limited by the pool size.
        r1.grow(2000);
        assert_eq!(pool.reserved(), 2000);

        let mut r2 = MemoryConsumer::new("r2")
            .with_can_spill(true)
            .register(&pool);
        // Same for a spillable consumer.
        r2.grow(2000);

        assert_eq!(pool.reserved(), 4000);

        // Unspillable usage (2000) already exceeds the pool (100), so nothing
        // is available to spillable consumers.
        let err = r2.try_grow(1).unwrap_err().strip_backtrace();
        assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 1.0 B for r2 with 2000.0 B already allocated for this reservation - 0.0 B remain available for the total pool");

        // A failed request changes nothing, so retrying fails the same way.
        let err = r2.try_grow(1).unwrap_err().strip_backtrace();
        assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 1.0 B for r2 with 2000.0 B already allocated for this reservation - 0.0 B remain available for the total pool");

        r1.shrink(1990);
        r2.shrink(2000);

        assert_eq!(pool.reserved(), 10);

        r1.try_grow(10).unwrap();
        assert_eq!(pool.reserved(), 20);

        // r2 is the only spillable consumer, so it may use everything not
        // held by unspillable consumers: 100 - 20 = 80.
        r2.try_grow(80).unwrap();
        assert_eq!(pool.reserved(), 100);

        r2.shrink(70);

        assert_eq!(r1.size(), 20);
        assert_eq!(r2.size(), 10);
        assert_eq!(pool.reserved(), 30);

        let mut r3 = MemoryConsumer::new("r3")
            .with_can_spill(true)
            .register(&pool);

        // With two spillable consumers each share is (100 - 20) / 2 = 40.
        let err = r3.try_grow(70).unwrap_err().strip_backtrace();
        assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 70.0 B for r3 with 0.0 B already allocated for this reservation - 40.0 B remain available for the total pool");

        // Freeing r2's memory does not change r3's share: the limit is still
        // 40 — presumably because r2 remains registered.
        r2.free();
        let err = r3.try_grow(70).unwrap_err().strip_backtrace();
        assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 70.0 B for r3 with 0.0 B already allocated for this reservation - 40.0 B remain available for the total pool");

        // Dropping r2 presumably unregisters it (confirm against
        // `MemoryReservation`); r3 can then take the full 80.
        drop(r2);
        assert_eq!(pool.reserved(), 20);
        r3.try_grow(80).unwrap();

        assert_eq!(pool.reserved(), 100);
        r1.free();
        assert_eq!(pool.reserved(), 80);

        // An unspillable consumer only gets what is actually free:
        // 100 - 80 = 20, which is less than the 30 requested.
        let mut r4 = MemoryConsumer::new("s4").register(&pool);
        let err = r4.try_grow(30).unwrap_err().strip_backtrace();
        assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 30.0 B for s4 with 0.0 B already allocated for this reservation - 20.0 B remain available for the total pool");
    }

    /// Checks that a failed allocation reports the top 3 consumers, ordered
    /// by current reservation, together with their peaks.
    #[test]
    fn test_tracked_consumers_pool() {
        let setting = make_settings();
        let _bound = setting.bind_to_scope();
        let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
            GreedyMemoryPool::new(100),
            NonZeroUsize::new(3).unwrap(),
        ));

        // r1: ends at 50 reserved after peaking at 70.
        let mut r1 = MemoryConsumer::new("r1").register(&pool);
        r1.grow(50);
        r1.grow(20);
        r1.shrink(20);

        // r2: 15 reserved via the fallible path.
        let mut r2 = MemoryConsumer::new("r2").register(&pool);
        r2.try_grow(15)
            .expect("should succeed in memory allotment for r2");

        // r3: resized to 25 and then down to 20.
        let mut r3 = MemoryConsumer::new("r3").register(&pool);
        r3.try_resize(25)
            .expect("should succeed in memory allotment for r3");
        r3.try_resize(20)
            .expect("should succeed in memory allotment for r3");

        // r4: 10 reserved; the smallest consumer, so it is left out of the
        // top 3.
        let mut r4 = MemoryConsumer::new("r4").register(&pool);
        r4.grow(10);

        // 95 of 100 are in use, so a request for 150 must fail.
        let mut r5 = MemoryConsumer::new("r5").register(&pool);
        let res = r5.try_grow(150);
        assert!(res.is_err());
        let error = res.unwrap_err().strip_backtrace();
        assert_snapshot!(error, @r"
        Resources exhausted: Additional allocation failed for r5 with top memory consumers (across reservations) as:
        r1#[ID](can spill: false) consumed 50.0 B, peak 70.0 B,
        r3#[ID](can spill: false) consumed 20.0 B, peak 25.0 B,
        r2#[ID](can spill: false) consumed 15.0 B, peak 15.0 B.
        Error: Failed to allocate additional 150.0 B for r5 with 0.0 B already allocated for this reservation - 5.0 B remain available for the total pool
        ");
    }

    /// Checks that consumers sharing a name are tracked as separate entries.
    #[test]
    fn test_tracked_consumers_pool_register() {
        let setting = make_settings();
        let _bound = setting.bind_to_scope();
        let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
            GreedyMemoryPool::new(100),
            NonZeroUsize::new(3).unwrap(),
        ));

        let same_name = "foo";

        // A single registered consumer that holds nothing yet.
        let mut r0 = MemoryConsumer::new(same_name).register(&pool);
        let res = r0.try_grow(150);
        assert!(res.is_err());
        let error = res.unwrap_err().strip_backtrace();
        assert_snapshot!(error, @r"
        Resources exhausted: Additional allocation failed for foo with top memory consumers (across reservations) as:
        foo#[ID](can spill: false) consumed 0.0 B, peak 0.0 B.
        Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 100.0 B remain available for the total pool
        ");

        r0.grow(10);
        // A second consumer with the same name gets its own entry.
        let new_consumer_same_name = MemoryConsumer::new(same_name);
        let mut r1 = new_consumer_same_name.register(&pool);
        let res = r1.try_grow(150);
        assert!(res.is_err());
        let error = res.unwrap_err().strip_backtrace();
        assert_snapshot!(error, @r"
        Resources exhausted: Additional allocation failed for foo with top memory consumers (across reservations) as:
        foo#[ID](can spill: false) consumed 10.0 B, peak 10.0 B,
        foo#[ID](can spill: false) consumed 0.0 B, peak 0.0 B.
        Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 90.0 B remain available for the total pool
        ");

        // Growing r1 moves it ahead of r0 in the report.
        r1.grow(20);

        let res = r1.try_grow(150);
        assert!(res.is_err());
        let error = res.unwrap_err().strip_backtrace();
        assert_snapshot!(error, @r"
        Resources exhausted: Additional allocation failed for foo with top memory consumers (across reservations) as:
        foo#[ID](can spill: false) consumed 20.0 B, peak 20.0 B,
        foo#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
        Error: Failed to allocate additional 150.0 B for foo with 20.0 B already allocated for this reservation - 70.0 B remain available for the total pool
        ");

        // A third consumer with the same name but `can_spill = true` is also
        // listed separately.
        let consumer_with_same_name_but_different_hash =
            MemoryConsumer::new(same_name).with_can_spill(true);
        let mut r2 = consumer_with_same_name_but_different_hash.register(&pool);
        let res = r2.try_grow(150);
        assert!(res.is_err());
        let error = res.unwrap_err().strip_backtrace();
        assert_snapshot!(error, @r"
        Resources exhausted: Additional allocation failed for foo with top memory consumers (across reservations) as:
        foo#[ID](can spill: false) consumed 20.0 B, peak 20.0 B,
        foo#[ID](can spill: false) consumed 10.0 B, peak 10.0 B,
        foo#[ID](can spill: true) consumed 0.0 B, peak 0.0 B.
        Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 70.0 B remain available for the total pool
        ");
    }

    /// Checks that a dropped reservation disappears from the report, for
    /// both a `FairSpillPool` and a `GreedyMemoryPool` inner pool.
    #[test]
    fn test_tracked_consumers_pool_deregister() {
        fn test_per_pool_type(pool: Arc<dyn MemoryPool>) {
            let setting = make_settings();
            let _bound = setting.bind_to_scope();
            let mut r0 = MemoryConsumer::new("r0").register(&pool);
            r0.grow(10);
            let r1_consumer = MemoryConsumer::new("r1");
            let mut r1 = r1_consumer.register(&pool);
            r1.grow(20);

            // Both consumers are tracked and listed.
            let res = r0.try_grow(150);
            assert!(res.is_err());
            let error = res.unwrap_err().strip_backtrace();
            allow_duplicates!(assert_snapshot!(error, @r"
            Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as:
            r1#[ID](can spill: false) consumed 20.0 B, peak 20.0 B,
            r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
            Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 70.0 B remain available for the total pool
            "));

            // After dropping r1 only r0 is listed and r1's 20 B are
            // available again (presumably the drop frees and unregisters the
            // reservation — confirm against `MemoryReservation`).
            drop(r1);
            let res = r0.try_grow(150);
            assert!(res.is_err());
            let error = res.unwrap_err().strip_backtrace();
            allow_duplicates!(assert_snapshot!(error, @r"
            Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as:
            r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
            Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool
            "));

            // A failed `try_grow` records nothing, so repeating the request
            // yields the same report.
            let res = r0.try_grow(150);
            assert!(res.is_err());
            let error = res.unwrap_err().strip_backtrace();
            allow_duplicates!(assert_snapshot!(error, @r"
            Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as:
            r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
            Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool
            "));

            // And once more, with the same expected outcome.
            let res = r0.try_grow(150);
            assert!(res.is_err());
            let error = res.unwrap_err().strip_backtrace();
            allow_duplicates!(assert_snapshot!(error, @r"
            Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as:
            r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
            Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool
            "));
        }

        let tracked_spill_pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
            FairSpillPool::new(100),
            NonZeroUsize::new(3).unwrap(),
        ));
        test_per_pool_type(tracked_spill_pool);

        let tracked_greedy_pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
            GreedyMemoryPool::new(100),
            NonZeroUsize::new(3).unwrap(),
        ));
        test_per_pool_type(tracked_greedy_pool);
    }

    /// Checks that `report_top` can be called directly, outside of the error
    /// path, after downcasting to the concrete pool type.
    #[test]
    fn test_tracked_consumers_pool_use_beyond_errors() {
        let setting = make_settings();
        let _bound = setting.bind_to_scope();
        // Hold the pool as `dyn Any` so it can be downcast to the concrete
        // type later.
        let upcasted: Arc<dyn std::any::Any + Send + Sync> =
            Arc::new(TrackConsumersPool::new(
                GreedyMemoryPool::new(100),
                NonZeroUsize::new(3).unwrap(),
            ));
        // Use the same pool through the `MemoryPool` trait object.
        let pool: Arc<dyn MemoryPool> = Arc::clone(&upcasted)
            .downcast::<TrackConsumersPool<GreedyMemoryPool>>()
            .unwrap();
        let mut r1 = MemoryConsumer::new("r1").register(&pool);
        r1.grow(20);
        let mut r2 = MemoryConsumer::new("r2").register(&pool);
        r2.grow(15);
        let mut r3 = MemoryConsumer::new("r3").register(&pool);
        r3.grow(45);

        let downcasted = upcasted
            .downcast::<TrackConsumersPool<GreedyMemoryPool>>()
            .unwrap();

        // Only the two largest consumers are requested, so r2 is left out.
        let res = downcasted.report_top(2);
        assert_snapshot!(res, @r"
        r3#[ID](can spill: false) consumed 45.0 B, peak 45.0 B,
        r1#[ID](can spill: false) consumed 20.0 B, peak 20.0 B.
        ");
    }
}