1use crate::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
19use datafusion_common::HashMap;
20use datafusion_common::{resources_datafusion_err, DataFusionError, Result};
21use log::debug;
22use parking_lot::Mutex;
23use std::{
24 num::NonZeroUsize,
25 sync::atomic::{AtomicU64, AtomicUsize, Ordering},
26};
27
/// A [`MemoryPool`] that places no limit on reservations.
///
/// It only keeps a running total of the bytes handed out so that
/// `reserved()` can report current usage.
#[derive(Debug, Default)]
pub struct UnboundedMemoryPool {
    /// Total number of bytes currently reserved from this pool.
    used: AtomicUsize,
}
33
34impl MemoryPool for UnboundedMemoryPool {
35 fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
36 self.used.fetch_add(additional, Ordering::Relaxed);
37 }
38
39 fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
40 self.used.fetch_sub(shrink, Ordering::Relaxed);
41 }
42
43 fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
44 self.grow(reservation, additional);
45 Ok(())
46 }
47
48 fn reserved(&self) -> usize {
49 self.used.load(Ordering::Relaxed)
50 }
51}
52
/// A [`MemoryPool`] with a fixed byte limit shared by all reservations.
///
/// `try_grow` succeeds for any reservation as long as the total stays within
/// `pool_size`; there is no per-consumer fairness.
#[derive(Debug)]
pub struct GreedyMemoryPool {
    /// Upper bound, in bytes, enforced by `try_grow`.
    pool_size: usize,
    /// Total number of bytes currently reserved from this pool.
    used: AtomicUsize,
}
63
64impl GreedyMemoryPool {
65 pub fn new(pool_size: usize) -> Self {
67 debug!("Created new GreedyMemoryPool(pool_size={pool_size})");
68 Self {
69 pool_size,
70 used: AtomicUsize::new(0),
71 }
72 }
73}
74
75impl MemoryPool for GreedyMemoryPool {
76 fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
77 self.used.fetch_add(additional, Ordering::Relaxed);
78 }
79
80 fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
81 self.used.fetch_sub(shrink, Ordering::Relaxed);
82 }
83
84 fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
85 self.used
86 .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |used| {
87 let new_used = used + additional;
88 (new_used <= self.pool_size).then_some(new_used)
89 })
90 .map_err(|used| {
91 insufficient_capacity_err(
92 reservation,
93 additional,
94 self.pool_size.saturating_sub(used),
95 )
96 })?;
97 Ok(())
98 }
99
100 fn reserved(&self) -> usize {
101 self.used.load(Ordering::Relaxed)
102 }
103}
104
105#[derive(Debug)]
128pub struct FairSpillPool {
129 pool_size: usize,
131
132 state: Mutex<FairSpillPoolState>,
133}
134
/// Accounting state of a `FairSpillPool`.
#[derive(Debug)]
struct FairSpillPoolState {
    /// Number of currently registered consumers that can spill.
    num_spill: usize,

    /// Bytes reserved by consumers that can spill.
    spillable: usize,

    /// Bytes reserved by consumers that cannot spill.
    unspillable: usize,
}
146
147impl FairSpillPool {
148 pub fn new(pool_size: usize) -> Self {
150 debug!("Created new FairSpillPool(pool_size={pool_size})");
151 Self {
152 pool_size,
153 state: Mutex::new(FairSpillPoolState {
154 num_spill: 0,
155 spillable: 0,
156 unspillable: 0,
157 }),
158 }
159 }
160}
161
162impl MemoryPool for FairSpillPool {
163 fn register(&self, consumer: &MemoryConsumer) {
164 if consumer.can_spill {
165 self.state.lock().num_spill += 1;
166 }
167 }
168
169 fn unregister(&self, consumer: &MemoryConsumer) {
170 if consumer.can_spill {
171 let mut state = self.state.lock();
172 state.num_spill = state.num_spill.checked_sub(1).unwrap();
173 }
174 }
175
176 fn grow(&self, reservation: &MemoryReservation, additional: usize) {
177 let mut state = self.state.lock();
178 match reservation.registration.consumer.can_spill {
179 true => state.spillable += additional,
180 false => state.unspillable += additional,
181 }
182 }
183
184 fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
185 let mut state = self.state.lock();
186 match reservation.registration.consumer.can_spill {
187 true => state.spillable -= shrink,
188 false => state.unspillable -= shrink,
189 }
190 }
191
192 fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
193 let mut state = self.state.lock();
194
195 match reservation.registration.consumer.can_spill {
196 true => {
197 let spill_available = self.pool_size.saturating_sub(state.unspillable);
199
200 let available = spill_available
202 .checked_div(state.num_spill)
203 .unwrap_or(spill_available);
204
205 if reservation.size + additional > available {
206 return Err(insufficient_capacity_err(
207 reservation,
208 additional,
209 available,
210 ));
211 }
212 state.spillable += additional;
213 }
214 false => {
215 let available = self
216 .pool_size
217 .saturating_sub(state.unspillable + state.spillable);
218
219 if available < additional {
220 return Err(insufficient_capacity_err(
221 reservation,
222 additional,
223 available,
224 ));
225 }
226 state.unspillable += additional;
227 }
228 }
229 Ok(())
230 }
231
232 fn reserved(&self) -> usize {
233 let state = self.state.lock();
234 state.spillable + state.unspillable
235 }
236}
237
238#[inline(always)]
244fn insufficient_capacity_err(
245 reservation: &MemoryReservation,
246 additional: usize,
247 available: usize,
248) -> DataFusionError {
249 resources_datafusion_err!("Failed to allocate additional {} bytes for {} with {} bytes already allocated for this reservation - {} bytes remain available for the total pool", additional, reservation.registration.consumer.name, reservation.size, available)
250}
251
252#[derive(Debug)]
261pub struct TrackConsumersPool<I> {
262 inner: I,
263 top: NonZeroUsize,
264 tracked_consumers: Mutex<HashMap<MemoryConsumer, AtomicU64>>,
265}
266
267impl<I: MemoryPool> TrackConsumersPool<I> {
268 pub fn new(inner: I, top: NonZeroUsize) -> Self {
273 Self {
274 inner,
275 top,
276 tracked_consumers: Default::default(),
277 }
278 }
279
280 fn has_multiple_consumers(&self, name: &String) -> bool {
285 let consumer = MemoryConsumer::new(name);
286 let consumer_with_spill = consumer.clone().with_can_spill(true);
287 let guard = self.tracked_consumers.lock();
288 guard.contains_key(&consumer) && guard.contains_key(&consumer_with_spill)
289 }
290
291 pub fn report_top(&self, top: usize) -> String {
293 let mut consumers = self
294 .tracked_consumers
295 .lock()
296 .iter()
297 .map(|(consumer, reserved)| {
298 (
299 (consumer.name().to_owned(), consumer.can_spill()),
300 reserved.load(Ordering::Acquire),
301 )
302 })
303 .collect::<Vec<_>>();
304 consumers.sort_by(|a, b| b.1.cmp(&a.1)); consumers[0..std::cmp::min(top, consumers.len())]
307 .iter()
308 .map(|((name, can_spill), size)| {
309 if self.has_multiple_consumers(name) {
310 format!("{name}(can_spill={}) consumed {:?} bytes", can_spill, size)
311 } else {
312 format!("{name} consumed {:?} bytes", size)
313 }
314 })
315 .collect::<Vec<_>>()
316 .join(", ")
317 }
318}
319
320impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
321 fn register(&self, consumer: &MemoryConsumer) {
322 self.inner.register(consumer);
323
324 let mut guard = self.tracked_consumers.lock();
325 if let Some(already_reserved) = guard.insert(consumer.clone(), Default::default())
326 {
327 guard.entry_ref(consumer).and_modify(|bytes| {
328 bytes.fetch_add(
329 already_reserved.load(Ordering::Acquire),
330 Ordering::AcqRel,
331 );
332 });
333 }
334 }
335
336 fn unregister(&self, consumer: &MemoryConsumer) {
337 self.inner.unregister(consumer);
338 self.tracked_consumers.lock().remove(consumer);
339 }
340
341 fn grow(&self, reservation: &MemoryReservation, additional: usize) {
342 self.inner.grow(reservation, additional);
343 self.tracked_consumers
344 .lock()
345 .entry_ref(reservation.consumer())
346 .and_modify(|bytes| {
347 bytes.fetch_add(additional as u64, Ordering::AcqRel);
348 });
349 }
350
351 fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
352 self.inner.shrink(reservation, shrink);
353 self.tracked_consumers
354 .lock()
355 .entry_ref(reservation.consumer())
356 .and_modify(|bytes| {
357 bytes.fetch_sub(shrink as u64, Ordering::AcqRel);
358 });
359 }
360
361 fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
362 self.inner
363 .try_grow(reservation, additional)
364 .map_err(|e| match e {
365 DataFusionError::ResourcesExhausted(e) => {
366 DataFusionError::ResourcesExhausted(
368 provide_top_memory_consumers_to_error_msg(
369 e,
370 self.report_top(self.top.into()),
371 ),
372 )
373 }
374 _ => e,
375 })?;
376
377 self.tracked_consumers
378 .lock()
379 .entry_ref(reservation.consumer())
380 .and_modify(|bytes| {
381 bytes.fetch_add(additional as u64, Ordering::AcqRel);
382 });
383 Ok(())
384 }
385
386 fn reserved(&self) -> usize {
387 self.inner.reserved()
388 }
389}
390
/// Prefixes `error_msg` with a sentence listing `top_consumers`.
///
/// Both arguments are inserted verbatim into the resulting message.
fn provide_top_memory_consumers_to_error_msg(
    error_msg: String,
    top_consumers: String,
) -> String {
    let consumers = top_consumers.as_str();
    let cause = error_msg.as_str();
    format!("Additional allocation failed with top memory consumers (across reservations) as: {}. Error: {}", consumers, cause)
}
397
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    /// Asserts that `res` is a resources-exhausted error whose message
    /// contains `expected`; `context` describes the expectation on failure.
    #[track_caller]
    fn assert_resources_exhausted(res: &Result<()>, expected: &str, context: &str) {
        assert!(
            matches!(
                res,
                Err(DataFusionError::ResourcesExhausted(e)) if e.contains(expected)
            ),
            "{}, instead found {:?}",
            context,
            res
        );
    }

    /// Builds a tracking pool reporting the top three consumers around `inner`.
    fn tracked<I: MemoryPool>(inner: I) -> TrackConsumersPool<I> {
        TrackConsumersPool::new(inner, NonZeroUsize::new(3).unwrap())
    }

    #[test]
    fn test_fair() {
        let pool: Arc<dyn MemoryPool> = Arc::new(FairSpillPool::new(100));

        // The infallible `grow` may exceed the pool capacity.
        let mut r1 = MemoryConsumer::new("unspillable").register(&pool);
        r1.grow(2000);
        assert_eq!(pool.reserved(), 2000);

        // Same for a spillable consumer.
        let mut r2 = MemoryConsumer::new("r2")
            .with_can_spill(true)
            .register(&pool);
        r2.grow(2000);

        assert_eq!(pool.reserved(), 4000);

        // The fallible path is checked twice and must fail both times.
        assert_eq!(
            r2.try_grow(1).unwrap_err().strip_backtrace(),
            "Resources exhausted: Failed to allocate additional 1 bytes for r2 with 2000 bytes already allocated for this reservation - 0 bytes remain available for the total pool"
        );

        assert_eq!(
            r2.try_grow(1).unwrap_err().strip_backtrace(),
            "Resources exhausted: Failed to allocate additional 1 bytes for r2 with 2000 bytes already allocated for this reservation - 0 bytes remain available for the total pool"
        );

        r1.shrink(1990);
        r2.shrink(2000);

        assert_eq!(pool.reserved(), 10);

        r1.try_grow(10).unwrap();
        assert_eq!(pool.reserved(), 20);

        // r2 is the only spillable consumer, so it can take the remaining 80.
        r2.try_grow(80).unwrap();
        assert_eq!(pool.reserved(), 100);

        r2.shrink(70);

        assert_eq!(r1.size(), 20);
        assert_eq!(r2.size(), 10);
        assert_eq!(pool.reserved(), 30);

        let mut r3 = MemoryConsumer::new("r3")
            .with_can_spill(true)
            .register(&pool);

        // Two spillable consumers now share the 80 bytes not held by r1.
        assert_eq!(
            r3.try_grow(70).unwrap_err().strip_backtrace(),
            "Resources exhausted: Failed to allocate additional 70 bytes for r3 with 0 bytes already allocated for this reservation - 40 bytes remain available for the total pool"
        );

        // Freeing r2 keeps it registered, so r3's share does not change.
        r2.free();
        assert_eq!(
            r3.try_grow(70).unwrap_err().strip_backtrace(),
            "Resources exhausted: Failed to allocate additional 70 bytes for r3 with 0 bytes already allocated for this reservation - 40 bytes remain available for the total pool"
        );

        // Dropping r2 unregisters it, leaving r3 as the only spillable consumer.
        drop(r2);
        assert_eq!(pool.reserved(), 20);
        r3.try_grow(80).unwrap();

        assert_eq!(pool.reserved(), 100);
        r1.free();
        assert_eq!(pool.reserved(), 80);

        let mut r4 = MemoryConsumer::new("s4").register(&pool);
        assert_eq!(
            r4.try_grow(30).unwrap_err().strip_backtrace(),
            "Resources exhausted: Failed to allocate additional 30 bytes for s4 with 0 bytes already allocated for this reservation - 20 bytes remain available for the total pool"
        );
    }

    #[test]
    fn test_tracked_consumers_pool() {
        let pool: Arc<dyn MemoryPool> = Arc::new(tracked(GreedyMemoryPool::new(100)));

        // r1 ends up holding 50 bytes via grow + shrink.
        let mut r1 = MemoryConsumer::new("r1").register(&pool);
        r1.grow(70);
        r1.shrink(20);

        // r2 holds 15 bytes via try_grow.
        let mut r2 = MemoryConsumer::new("r2").register(&pool);
        r2.try_grow(15)
            .expect("should succeed in memory allotment for r2");

        // r3 holds 20 bytes via try_resize up and then down.
        let mut r3 = MemoryConsumer::new("r3").register(&pool);
        r3.try_resize(25)
            .expect("should succeed in memory allotment for r3");
        r3.try_resize(20)
            .expect("should succeed in memory allotment for r3");

        // r4 holds 10 bytes and is too small to make the top three.
        let mut r4 = MemoryConsumer::new("r4").register(&pool);
        r4.grow(10);

        // r5 asks for more than the pool can give.
        let mut r5 = MemoryConsumer::new("r5").register(&pool);
        let outcome = r5.try_grow(150);
        assert_resources_exhausted(
            &outcome,
            "Additional allocation failed with top memory consumers (across reservations) as: r1 consumed 50 bytes, r3 consumed 20 bytes, r2 consumed 15 bytes. Error: Failed to allocate additional 150 bytes for r5 with 0 bytes already allocated for this reservation - 5 bytes remain available for the total pool",
            "should provide list of top memory consumers",
        );
    }

    #[test]
    fn test_tracked_consumers_pool_register() {
        let pool: Arc<dyn MemoryPool> = Arc::new(tracked(GreedyMemoryPool::new(100)));

        let same_name = "foo";

        // A consumer that has not reserved anything yet.
        let mut r0 = MemoryConsumer::new(same_name).register(&pool);
        let outcome = r0.try_grow(150);
        assert_resources_exhausted(
            &outcome,
            "Additional allocation failed with top memory consumers (across reservations) as: foo consumed 0 bytes. Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated for this reservation - 100 bytes remain available for the total pool",
            "should provide proper error when no reservations have been made yet",
        );

        // A second consumer that hashes the same is merged into one entry.
        r0.grow(10);
        let new_consumer_same_name = MemoryConsumer::new(same_name);
        let mut r1 = new_consumer_same_name.register(&pool);
        let outcome = r1.try_grow(150);
        assert_resources_exhausted(
            &outcome,
            "Additional allocation failed with top memory consumers (across reservations) as: foo consumed 10 bytes. Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated for this reservation - 90 bytes remain available for the total pool",
            "should provide proper error with same hashed consumer (a single foo=10 bytes, available=90)",
        );

        // Bytes from both reservations accumulate on that single entry.
        r1.grow(20);
        let outcome = r1.try_grow(150);
        assert_resources_exhausted(
            &outcome,
            "Additional allocation failed with top memory consumers (across reservations) as: foo consumed 30 bytes. Error: Failed to allocate additional 150 bytes for foo with 20 bytes already allocated for this reservation - 70 bytes remain available for the total pool",
            "should provide proper error with same hashed consumer (a single foo=30 bytes, available=70)",
        );

        // Same name but a different spill flag is tracked separately.
        let consumer_with_same_name_but_different_hash =
            MemoryConsumer::new(same_name).with_can_spill(true);
        let mut r2 = consumer_with_same_name_but_different_hash.register(&pool);
        let outcome = r2.try_grow(150);
        assert_resources_exhausted(
            &outcome,
            "Additional allocation failed with top memory consumers (across reservations) as: foo(can_spill=false) consumed 30 bytes, foo(can_spill=true) consumed 0 bytes. Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated for this reservation - 70 bytes remain available for the total pool",
            "should provide proper error with different hashed consumer (foo(can_spill=false)=30 bytes and foo(can_spill=true)=0 bytes, available=70)",
        );
    }

    #[test]
    fn test_tracked_consumers_pool_deregister() {
        fn test_per_pool_type(pool: Arc<dyn MemoryPool>) {
            // Two consumers holding 10 and 20 bytes.
            let mut r0 = MemoryConsumer::new("r0").register(&pool);
            r0.grow(10);
            let r1_consumer = MemoryConsumer::new("r1");
            let mut r1 = r1_consumer.clone().register(&pool);
            r1.grow(20);

            let outcome = r0.try_grow(150);
            assert_resources_exhausted(
                &outcome,
                "Additional allocation failed with top memory consumers (across reservations) as: r1 consumed 20 bytes, r0 consumed 10 bytes. Error: Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated for this reservation - 70 bytes remain available for the total pool",
                "should provide proper error with both consumers",
            );

            // Unregistering r1 removes it from the report ...
            pool.unregister(&r1_consumer);
            let outcome = r0.try_grow(150);
            assert_resources_exhausted(
                &outcome,
                "Additional allocation failed with top memory consumers (across reservations) as: r0 consumed 10 bytes",
                "should provide proper error with only 1 consumer left registered",
            );

            // ... but its bytes are still counted by the inner pool.
            let outcome = r0.try_grow(150);
            assert_resources_exhausted(
                &outcome,
                "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated for this reservation - 70 bytes remain available for the total pool",
                "should find that the inner pool will still count all bytes for the deregistered consumer until the reservation is dropped",
            );

            // Freeing the reservation finally releases those bytes.
            r1.free();
            let outcome = r0.try_grow(150);
            assert_resources_exhausted(
                &outcome,
                "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated for this reservation - 90 bytes remain available for the total pool",
                "should correctly account the total bytes after reservation is free",
            );
        }

        let tracked_spill_pool: Arc<dyn MemoryPool> =
            Arc::new(tracked(FairSpillPool::new(100)));
        test_per_pool_type(tracked_spill_pool);

        let tracked_greedy_pool: Arc<dyn MemoryPool> =
            Arc::new(tracked(GreedyMemoryPool::new(100)));
        test_per_pool_type(tracked_greedy_pool);
    }

    #[test]
    fn test_tracked_consumers_pool_use_beyond_errors() {
        // Keep a type-erased handle so the concrete pool can be recovered later.
        let upcasted: Arc<dyn std::any::Any + Send + Sync> =
            Arc::new(tracked(GreedyMemoryPool::new(100)));
        let pool: Arc<dyn MemoryPool> = Arc::clone(&upcasted)
            .downcast::<TrackConsumersPool<GreedyMemoryPool>>()
            .unwrap();

        let mut r1 = MemoryConsumer::new("r1").register(&pool);
        r1.grow(20);
        let mut r2 = MemoryConsumer::new("r2").register(&pool);
        r2.grow(15);
        let mut r3 = MemoryConsumer::new("r3").register(&pool);
        r3.grow(45);

        let downcasted = upcasted
            .downcast::<TrackConsumersPool<GreedyMemoryPool>>()
            .unwrap();

        // The report can be requested directly, outside of any error path.
        let report = downcasted.report_top(2);
        assert_eq!(
            report, "r3 consumed 45 bytes, r1 consumed 20 bytes",
            "should provide list of top memory consumers, instead found {:?}",
            report
        );
    }
}