1use crate::memory_pool::{
19 human_readable_size, MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation,
20};
21use datafusion_common::HashMap;
22use datafusion_common::{resources_datafusion_err, DataFusionError, Result};
23use log::debug;
24use parking_lot::Mutex;
25use std::{
26 num::NonZeroUsize,
27 sync::atomic::{AtomicUsize, Ordering},
28};
29
30#[derive(Debug, Default)]
32pub struct UnboundedMemoryPool {
33 used: AtomicUsize,
34}
35
36impl MemoryPool for UnboundedMemoryPool {
37 fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
38 self.used.fetch_add(additional, Ordering::Relaxed);
39 }
40
41 fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
42 self.used.fetch_sub(shrink, Ordering::Relaxed);
43 }
44
45 fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
46 self.grow(reservation, additional);
47 Ok(())
48 }
49
50 fn reserved(&self) -> usize {
51 self.used.load(Ordering::Relaxed)
52 }
53
54 fn memory_limit(&self) -> MemoryLimit {
55 MemoryLimit::Infinite
56 }
57}
58
59#[derive(Debug)]
65pub struct GreedyMemoryPool {
66 pool_size: usize,
67 used: AtomicUsize,
68}
69
70impl GreedyMemoryPool {
71 pub fn new(pool_size: usize) -> Self {
73 debug!("Created new GreedyMemoryPool(pool_size={pool_size})");
74 Self {
75 pool_size,
76 used: AtomicUsize::new(0),
77 }
78 }
79}
80
81impl MemoryPool for GreedyMemoryPool {
82 fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
83 self.used.fetch_add(additional, Ordering::Relaxed);
84 }
85
86 fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
87 self.used.fetch_sub(shrink, Ordering::Relaxed);
88 }
89
90 fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
91 self.used
92 .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |used| {
93 let new_used = used + additional;
94 (new_used <= self.pool_size).then_some(new_used)
95 })
96 .map_err(|used| {
97 insufficient_capacity_err(
98 reservation,
99 additional,
100 self.pool_size.saturating_sub(used),
101 )
102 })?;
103 Ok(())
104 }
105
106 fn reserved(&self) -> usize {
107 self.used.load(Ordering::Relaxed)
108 }
109
110 fn memory_limit(&self) -> MemoryLimit {
111 MemoryLimit::Finite(self.pool_size)
112 }
113}
114
115#[derive(Debug)]
138pub struct FairSpillPool {
139 pool_size: usize,
141
142 state: Mutex<FairSpillPoolState>,
143}
144
145#[derive(Debug)]
146struct FairSpillPoolState {
147 num_spill: usize,
149
150 spillable: usize,
152
153 unspillable: usize,
155}
156
157impl FairSpillPool {
158 pub fn new(pool_size: usize) -> Self {
160 debug!("Created new FairSpillPool(pool_size={pool_size})");
161 Self {
162 pool_size,
163 state: Mutex::new(FairSpillPoolState {
164 num_spill: 0,
165 spillable: 0,
166 unspillable: 0,
167 }),
168 }
169 }
170}
171
172impl MemoryPool for FairSpillPool {
173 fn register(&self, consumer: &MemoryConsumer) {
174 if consumer.can_spill {
175 self.state.lock().num_spill += 1;
176 }
177 }
178
179 fn unregister(&self, consumer: &MemoryConsumer) {
180 if consumer.can_spill {
181 let mut state = self.state.lock();
182 state.num_spill = state.num_spill.checked_sub(1).unwrap();
183 }
184 }
185
186 fn grow(&self, reservation: &MemoryReservation, additional: usize) {
187 let mut state = self.state.lock();
188 match reservation.registration.consumer.can_spill {
189 true => state.spillable += additional,
190 false => state.unspillable += additional,
191 }
192 }
193
194 fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
195 let mut state = self.state.lock();
196 match reservation.registration.consumer.can_spill {
197 true => state.spillable -= shrink,
198 false => state.unspillable -= shrink,
199 }
200 }
201
202 fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
203 let mut state = self.state.lock();
204
205 match reservation.registration.consumer.can_spill {
206 true => {
207 let spill_available = self.pool_size.saturating_sub(state.unspillable);
209
210 let available = spill_available
212 .checked_div(state.num_spill)
213 .unwrap_or(spill_available);
214
215 if reservation.size + additional > available {
216 return Err(insufficient_capacity_err(
217 reservation,
218 additional,
219 available,
220 ));
221 }
222 state.spillable += additional;
223 }
224 false => {
225 let available = self
226 .pool_size
227 .saturating_sub(state.unspillable + state.spillable);
228
229 if available < additional {
230 return Err(insufficient_capacity_err(
231 reservation,
232 additional,
233 available,
234 ));
235 }
236 state.unspillable += additional;
237 }
238 }
239 Ok(())
240 }
241
242 fn reserved(&self) -> usize {
243 let state = self.state.lock();
244 state.spillable + state.unspillable
245 }
246
247 fn memory_limit(&self) -> MemoryLimit {
248 MemoryLimit::Finite(self.pool_size)
249 }
250}
251
252#[inline(always)]
258fn insufficient_capacity_err(
259 reservation: &MemoryReservation,
260 additional: usize,
261 available: usize,
262) -> DataFusionError {
263 resources_datafusion_err!("Failed to allocate additional {} for {} with {} already allocated for this reservation - {} remain available for the total pool",
264 human_readable_size(additional), reservation.registration.consumer.name, human_readable_size(reservation.size), human_readable_size(available))
265}
266
/// What [`TrackConsumersPool`] remembers about one registered consumer.
#[derive(Debug)]
struct TrackedConsumer {
    /// Name of the consumer, as shown in reports.
    name: String,
    /// Whether the consumer can spill.
    can_spill: bool,
    /// Bytes currently attributed to the consumer.
    reserved: AtomicUsize,
}

impl TrackedConsumer {
    /// Returns the number of bytes currently attributed to this consumer.
    fn reserved(&self) -> usize {
        self.reserved.load(Ordering::Relaxed)
    }

    /// Attributes `additional` more bytes to this consumer.
    fn grow(&self, additional: usize) {
        let _previous = self.reserved.fetch_add(additional, Ordering::Relaxed);
    }

    /// Attributes `shrink` fewer bytes to this consumer.
    fn shrink(&self, shrink: usize) {
        let _previous = self.reserved.fetch_sub(shrink, Ordering::Relaxed);
    }
}
292
293#[derive(Debug)]
302pub struct TrackConsumersPool<I> {
303 inner: I,
305 top: NonZeroUsize,
307 tracked_consumers: Mutex<HashMap<usize, TrackedConsumer>>,
309}
310
311impl<I: MemoryPool> TrackConsumersPool<I> {
312 pub fn new(inner: I, top: NonZeroUsize) -> Self {
317 Self {
318 inner,
319 top,
320 tracked_consumers: Default::default(),
321 }
322 }
323
324 pub fn report_top(&self, top: usize) -> String {
326 let mut consumers = self
327 .tracked_consumers
328 .lock()
329 .iter()
330 .map(|(consumer_id, tracked_consumer)| {
331 (
332 (
333 *consumer_id,
334 tracked_consumer.name.to_owned(),
335 tracked_consumer.can_spill,
336 ),
337 tracked_consumer.reserved(),
338 )
339 })
340 .collect::<Vec<_>>();
341 consumers.sort_by(|a, b| b.1.cmp(&a.1)); consumers[0..std::cmp::min(top, consumers.len())]
344 .iter()
345 .map(|((id, name, can_spill), size)| {
346 format!(
347 " {name}#{id}(can spill: {can_spill}) consumed {}",
348 human_readable_size(*size)
349 )
350 })
351 .collect::<Vec<_>>()
352 .join(",\n")
353 + "."
354 }
355}
356
357impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
358 fn register(&self, consumer: &MemoryConsumer) {
359 self.inner.register(consumer);
360
361 let mut guard = self.tracked_consumers.lock();
362 let existing = guard.insert(
363 consumer.id(),
364 TrackedConsumer {
365 name: consumer.name().to_string(),
366 can_spill: consumer.can_spill(),
367 reserved: Default::default(),
368 },
369 );
370
371 debug_assert!(
372 existing.is_none(),
373 "Registered was called twice on the same consumer"
374 );
375 }
376
377 fn unregister(&self, consumer: &MemoryConsumer) {
378 self.inner.unregister(consumer);
379 self.tracked_consumers.lock().remove(&consumer.id());
380 }
381
382 fn grow(&self, reservation: &MemoryReservation, additional: usize) {
383 self.inner.grow(reservation, additional);
384 self.tracked_consumers
385 .lock()
386 .entry(reservation.consumer().id())
387 .and_modify(|tracked_consumer| {
388 tracked_consumer.grow(additional);
389 });
390 }
391
392 fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
393 self.inner.shrink(reservation, shrink);
394 self.tracked_consumers
395 .lock()
396 .entry(reservation.consumer().id())
397 .and_modify(|tracked_consumer| {
398 tracked_consumer.shrink(shrink);
399 });
400 }
401
402 fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
403 self.inner
404 .try_grow(reservation, additional)
405 .map_err(|e| match e {
406 DataFusionError::ResourcesExhausted(e) => {
407 DataFusionError::ResourcesExhausted(
409 provide_top_memory_consumers_to_error_msg(
410 e,
411 self.report_top(self.top.into()),
412 ),
413 )
414 }
415 _ => e,
416 })?;
417
418 self.tracked_consumers
419 .lock()
420 .entry(reservation.consumer().id())
421 .and_modify(|tracked_consumer| {
422 tracked_consumer.grow(additional);
423 });
424 Ok(())
425 }
426
427 fn reserved(&self) -> usize {
428 self.inner.reserved()
429 }
430
431 fn memory_limit(&self) -> MemoryLimit {
432 self.inner.memory_limit()
433 }
434}
435
/// Prefixes `error_msg` with a header and the `top_consumers` report.
///
/// The result has three parts separated by newlines: a fixed header line,
/// the report as given, and `Error: ` followed by the original message.
fn provide_top_memory_consumers_to_error_msg(
    error_msg: String,
    top_consumers: String,
) -> String {
    let mut message = String::from(
        "Additional allocation failed with top memory consumers (across reservations) as:\n",
    );
    message.push_str(&top_consumers);
    message.push_str("\nError: ");
    message.push_str(&error_msg);
    message
}
442
#[cfg(test)]
mod tests {
    use super::*;
    use insta::{allow_duplicates, assert_snapshot, Settings};
    use std::sync::Arc;

    /// Snapshot settings that replace the numeric consumer id in reports with
    /// `[ID]`, so snapshots do not depend on the ids assigned at runtime.
    fn make_settings() -> Settings {
        let mut filtered = Settings::clone_current();
        filtered.add_filter(
            r"([^\s]+)\#\d+\(can spill: (true|false)\)",
            "$1#[ID](can spill: $2)",
        );
        filtered
    }

    /// Wraps `inner` in a [`TrackConsumersPool`] that reports the top 3 consumers.
    fn tracked_pool<I: MemoryPool + 'static>(inner: I) -> Arc<dyn MemoryPool> {
        Arc::new(TrackConsumersPool::new(
            inner,
            NonZeroUsize::new(3).unwrap(),
        ))
    }

    /// Asserts that `result` is an error and returns its message without the
    /// backtrace.
    fn failure_message(result: Result<()>) -> String {
        assert!(result.is_err());
        result.unwrap_err().strip_backtrace()
    }

    #[test]
    fn test_fair() {
        let pool: Arc<dyn MemoryPool> = Arc::new(FairSpillPool::new(100));

        // `grow` is unchecked, so a reservation can exceed the pool capacity.
        let mut unspillable = MemoryConsumer::new("unspillable").register(&pool);
        unspillable.grow(2000);
        assert_eq!(pool.reserved(), 2000);

        let mut spill_a = MemoryConsumer::new("r2")
            .with_can_spill(true)
            .register(&pool);
        // The same holds for a spillable consumer.
        spill_a.grow(2000);

        assert_eq!(pool.reserved(), 4000);

        // The pool is over-committed, so even a single byte is refused.
        let first_failure = spill_a.try_grow(1).unwrap_err().strip_backtrace();
        assert_snapshot!(first_failure, @"Resources exhausted: Failed to allocate additional 1.0 B for r2 with 2000.0 B already allocated for this reservation - 0.0 B remain available for the total pool");

        // Retrying yields the same error.
        let second_failure = spill_a.try_grow(1).unwrap_err().strip_backtrace();
        assert_snapshot!(second_failure, @"Resources exhausted: Failed to allocate additional 1.0 B for r2 with 2000.0 B already allocated for this reservation - 0.0 B remain available for the total pool");

        unspillable.shrink(1990);
        spill_a.shrink(2000);

        assert_eq!(pool.reserved(), 10);

        unspillable.try_grow(10).unwrap();
        assert_eq!(pool.reserved(), 20);

        // As the only spillable consumer, it may take all of the remaining 80 bytes.
        spill_a.try_grow(80).unwrap();
        assert_eq!(pool.reserved(), 100);

        spill_a.shrink(70);

        assert_eq!(unspillable.size(), 20);
        assert_eq!(spill_a.size(), 10);
        assert_eq!(pool.reserved(), 30);

        let mut spill_b = MemoryConsumer::new("r3")
            .with_can_spill(true)
            .register(&pool);

        // With two spillable consumers registered, each is limited to 40 bytes.
        let shared_failure = spill_b.try_grow(70).unwrap_err().strip_backtrace();
        assert_snapshot!(shared_failure, @"Resources exhausted: Failed to allocate additional 70.0 B for r3 with 0.0 B already allocated for this reservation - 40.0 B remain available for the total pool");

        // Freeing the other reservation does not raise the limit...
        spill_a.free();
        let still_shared_failure = spill_b.try_grow(70).unwrap_err().strip_backtrace();
        assert_snapshot!(still_shared_failure, @"Resources exhausted: Failed to allocate additional 70.0 B for r3 with 0.0 B already allocated for this reservation - 40.0 B remain available for the total pool");

        // ...but dropping it does.
        drop(spill_a);
        assert_eq!(pool.reserved(), 20);
        spill_b.try_grow(80).unwrap();

        assert_eq!(pool.reserved(), 100);
        unspillable.free();
        assert_eq!(pool.reserved(), 80);

        // An unspillable consumer only gets what is left after all reservations.
        let mut late_unspillable = MemoryConsumer::new("s4").register(&pool);
        let late_failure = late_unspillable
            .try_grow(30)
            .unwrap_err()
            .strip_backtrace();
        assert_snapshot!(late_failure, @"Resources exhausted: Failed to allocate additional 30.0 B for s4 with 0.0 B already allocated for this reservation - 20.0 B remain available for the total pool");
    }

    #[test]
    fn test_tracked_consumers_pool() {
        let settings = make_settings();
        let _settings_guard = settings.bind_to_scope();
        let pool = tracked_pool(GreedyMemoryPool::new(100));

        // r1 ends at 50 bytes, via grow and shrink.
        let mut first = MemoryConsumer::new("r1").register(&pool);
        first.grow(70);
        first.shrink(20);

        // r2 ends at 15 bytes, via try_grow.
        let mut second = MemoryConsumer::new("r2").register(&pool);
        second
            .try_grow(15)
            .expect("should succeed in memory allotment for r2");

        // r3 ends at 20 bytes, via try_resize.
        let mut third = MemoryConsumer::new("r3").register(&pool);
        third
            .try_resize(25)
            .expect("should succeed in memory allotment for r3");
        third
            .try_resize(20)
            .expect("should succeed in memory allotment for r3");

        // r4 holds 10 bytes, too little to appear in the top 3.
        let mut fourth = MemoryConsumer::new("r4").register(&pool);
        fourth.grow(10);

        // A failing request reports the sizes set above.
        let mut fifth = MemoryConsumer::new("r5").register(&pool);
        let error = failure_message(fifth.try_grow(150));
        assert_snapshot!(error, @r"
        Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
        r1#[ID](can spill: false) consumed 50.0 B,
        r3#[ID](can spill: false) consumed 20.0 B,
        r2#[ID](can spill: false) consumed 15.0 B.
        Error: Failed to allocate additional 150.0 B for r5 with 0.0 B already allocated for this reservation - 5.0 B remain available for the total pool
        ");
    }

    #[test]
    fn test_tracked_consumers_pool_register() {
        let settings = make_settings();
        let _settings_guard = settings.bind_to_scope();
        let pool = tracked_pool(GreedyMemoryPool::new(100));

        let same_name = "foo";

        // A freshly registered consumer is reported with zero bytes.
        let mut first = MemoryConsumer::new(same_name).register(&pool);
        let error = failure_message(first.try_grow(150));
        assert_snapshot!(error, @r"
        Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
        foo#[ID](can spill: false) consumed 0.0 B.
        Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 100.0 B remain available for the total pool
        ");

        // A second consumer with the same name is reported as a separate entry.
        first.grow(10);
        let new_consumer_same_name = MemoryConsumer::new(same_name);
        let mut second = new_consumer_same_name.register(&pool);
        let error = failure_message(second.try_grow(150));
        assert_snapshot!(error, @r"
        Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
        foo#[ID](can spill: false) consumed 10.0 B,
        foo#[ID](can spill: false) consumed 0.0 B.
        Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 90.0 B remain available for the total pool
        ");

        // Growing the second consumer moves it to the top of the report.
        second.grow(20);

        let error = failure_message(second.try_grow(150));
        assert_snapshot!(error, @r"
        Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
        foo#[ID](can spill: false) consumed 20.0 B,
        foo#[ID](can spill: false) consumed 10.0 B.
        Error: Failed to allocate additional 150.0 B for foo with 20.0 B already allocated for this reservation - 70.0 B remain available for the total pool
        ");

        // Same name again, this time able to spill.
        let consumer_with_same_name_but_different_hash =
            MemoryConsumer::new(same_name).with_can_spill(true);
        let mut third = consumer_with_same_name_but_different_hash.register(&pool);
        let error = failure_message(third.try_grow(150));
        assert_snapshot!(error, @r"
        Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
        foo#[ID](can spill: false) consumed 20.0 B,
        foo#[ID](can spill: false) consumed 10.0 B,
        foo#[ID](can spill: true) consumed 0.0 B.
        Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 70.0 B remain available for the total pool
        ");
    }

    #[test]
    fn test_tracked_consumers_pool_deregister() {
        /// Runs the deregistration scenario against `pool`.
        fn test_per_pool_type(pool: Arc<dyn MemoryPool>) {
            let settings = make_settings();
            let _settings_guard = settings.bind_to_scope();

            let mut kept = MemoryConsumer::new("r0").register(&pool);
            kept.grow(10);
            let r1_consumer = MemoryConsumer::new("r1");
            let mut dropped = r1_consumer.register(&pool);
            dropped.grow(20);

            // Both consumers are listed while both are registered.
            let error = failure_message(kept.try_grow(150));
            allow_duplicates!(assert_snapshot!(error, @r"
            Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
            r1#[ID](can spill: false) consumed 20.0 B,
            r0#[ID](can spill: false) consumed 10.0 B.
            Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 70.0 B remain available for the total pool
            "));

            // After dropping r1, only r0 is listed and r1's bytes are available again.
            drop(dropped);
            let error = failure_message(kept.try_grow(150));
            allow_duplicates!(assert_snapshot!(error, @r"
            Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
            r0#[ID](can spill: false) consumed 10.0 B.
            Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool
            "));

            // Further attempts report the same state.
            let error = failure_message(kept.try_grow(150));
            allow_duplicates!(assert_snapshot!(error, @r"
            Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
            r0#[ID](can spill: false) consumed 10.0 B.
            Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool
            "));

            let error = failure_message(kept.try_grow(150));
            allow_duplicates!(assert_snapshot!(error, @r"
            Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
            r0#[ID](can spill: false) consumed 10.0 B.
            Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool
            "));
        }

        let tracked_spill_pool = tracked_pool(FairSpillPool::new(100));
        test_per_pool_type(tracked_spill_pool);

        let tracked_greedy_pool = tracked_pool(GreedyMemoryPool::new(100));
        test_per_pool_type(tracked_greedy_pool);
    }

    #[test]
    fn test_tracked_consumers_pool_use_beyond_errors() {
        let settings = make_settings();
        let _settings_guard = settings.bind_to_scope();

        // Keep the pool behind `dyn Any` so it can be recovered as its concrete type.
        let upcasted: Arc<dyn std::any::Any + Send + Sync> =
            Arc::new(TrackConsumersPool::new(
                GreedyMemoryPool::new(100),
                NonZeroUsize::new(3).unwrap(),
            ));
        let pool: Arc<dyn MemoryPool> = Arc::clone(&upcasted)
            .downcast::<TrackConsumersPool<GreedyMemoryPool>>()
            .unwrap();

        let mut first = MemoryConsumer::new("r1").register(&pool);
        first.grow(20);
        let mut second = MemoryConsumer::new("r2").register(&pool);
        second.grow(15);
        let mut third = MemoryConsumer::new("r3").register(&pool);
        third.grow(45);

        let downcasted = upcasted
            .downcast::<TrackConsumersPool<GreedyMemoryPool>>()
            .unwrap();

        // `report_top` can be called directly, without an allocation failure.
        let report = downcasted.report_top(2);
        assert_snapshot!(report, @r"
        r3#[ID](can spill: false) consumed 45.0 B,
        r1#[ID](can spill: false) consumed 20.0 B.
        ");
    }
}