1use crate::memory_pool::{
19 MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation, human_readable_size,
20};
21use datafusion_common::HashMap;
22use datafusion_common::{DataFusionError, Result, resources_datafusion_err};
23use log::debug;
24use parking_lot::Mutex;
25use std::fmt::{Display, Formatter};
26use std::{
27 num::NonZeroUsize,
28 sync::atomic::{AtomicUsize, Ordering},
29};
30
/// A [`MemoryPool`] that enforces no limit.
///
/// Every request to grow succeeds; the pool only keeps a running total of
/// the bytes that are currently reserved.
#[derive(Debug, Default)]
pub struct UnboundedMemoryPool {
    /// Number of bytes currently reserved from this pool.
    used: AtomicUsize,
}
36
37impl MemoryPool for UnboundedMemoryPool {
38 fn name(&self) -> &str {
39 "unbounded"
40 }
41
42 fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
43 self.used.fetch_add(additional, Ordering::Relaxed);
44 }
45
46 fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
47 self.used.fetch_sub(shrink, Ordering::Relaxed);
48 }
49
50 fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
51 self.grow(reservation, additional);
52 Ok(())
53 }
54
55 fn reserved(&self) -> usize {
56 self.used.load(Ordering::Relaxed)
57 }
58
59 fn memory_limit(&self) -> MemoryLimit {
60 MemoryLimit::Infinite
61 }
62}
63
64impl Display for UnboundedMemoryPool {
65 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
66 let used = self.used.load(Ordering::Relaxed);
67 write!(f, "{}(used: {})", &self.name(), human_readable_size(used))
68 }
69}
70
/// A [`MemoryPool`] with a fixed byte limit shared by all consumers.
///
/// Fallible requests (`try_grow`) are granted in arrival order for as long
/// as the total stays within `pool_size`; no share is set aside for any
/// particular consumer.
#[derive(Debug)]
pub struct GreedyMemoryPool {
    /// Maximum number of bytes that `try_grow` will hand out in total.
    pool_size: usize,
    /// Number of bytes currently reserved from this pool.
    used: AtomicUsize,
}
81
82impl GreedyMemoryPool {
83 pub fn new(pool_size: usize) -> Self {
85 debug!("Created new GreedyMemoryPool(pool_size={pool_size})");
86 Self {
87 pool_size,
88 used: AtomicUsize::new(0),
89 }
90 }
91}
92
93impl MemoryPool for GreedyMemoryPool {
94 fn name(&self) -> &str {
95 "greedy"
96 }
97
98 fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
99 self.used.fetch_add(additional, Ordering::Relaxed);
100 }
101
102 fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
103 self.used.fetch_sub(shrink, Ordering::Relaxed);
104 }
105
106 fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
107 self.used
108 .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |used| {
109 let new_used = used + additional;
110 (new_used <= self.pool_size).then_some(new_used)
111 })
112 .map_err(|used| {
113 insufficient_capacity_err(
114 reservation,
115 additional,
116 self.pool_size.saturating_sub(used),
117 self,
118 )
119 })?;
120 Ok(())
121 }
122
123 fn reserved(&self) -> usize {
124 self.used.load(Ordering::Relaxed)
125 }
126
127 fn memory_limit(&self) -> MemoryLimit {
128 MemoryLimit::Finite(self.pool_size)
129 }
130}
131
132impl Display for GreedyMemoryPool {
133 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
134 let used = self.used.load(Ordering::Relaxed);
135 write!(
136 f,
137 "{}(used: {}, pool_size: {})",
138 &self.name(),
139 human_readable_size(used),
140 human_readable_size(self.pool_size)
141 )
142 }
143}
144
/// A [`MemoryPool`] with a fixed byte limit that treats spillable and
/// unspillable consumers differently.
///
/// For fallible requests (`try_grow`):
///
/// * an unspillable consumer may take whatever is left of `pool_size` after
///   all current reservations;
/// * each spillable reservation is capped at an equal share of the memory
///   not held by unspillable consumers, i.e.
///   `(pool_size - unspillable) / num_spill`.
#[derive(Debug)]
pub struct FairSpillPool {
    /// Maximum number of bytes that `try_grow` will hand out in total.
    pool_size: usize,

    /// Mutable bookkeeping, guarded by a mutex.
    state: Mutex<FairSpillPoolState>,
}
174
/// Bookkeeping for [`FairSpillPool`], always accessed under its mutex.
#[derive(Debug)]
struct FairSpillPoolState {
    /// Number of currently registered consumers that can spill.
    num_spill: usize,

    /// Bytes currently reserved by consumers that can spill.
    spillable: usize,

    /// Bytes currently reserved by consumers that cannot spill.
    unspillable: usize,
}
186
187impl FairSpillPool {
188 pub fn new(pool_size: usize) -> Self {
190 debug!("Created new FairSpillPool(pool_size={pool_size})");
191 Self {
192 pool_size,
193 state: Mutex::new(FairSpillPoolState {
194 num_spill: 0,
195 spillable: 0,
196 unspillable: 0,
197 }),
198 }
199 }
200}
201
202impl MemoryPool for FairSpillPool {
203 fn name(&self) -> &str {
204 "fair"
205 }
206
207 fn register(&self, consumer: &MemoryConsumer) {
208 if consumer.can_spill {
209 self.state.lock().num_spill += 1;
210 }
211 }
212
213 fn unregister(&self, consumer: &MemoryConsumer) {
214 if consumer.can_spill {
215 let mut state = self.state.lock();
216 state.num_spill = state.num_spill.checked_sub(1).unwrap();
217 }
218 }
219
220 fn grow(&self, reservation: &MemoryReservation, additional: usize) {
221 let mut state = self.state.lock();
222 match reservation.registration.consumer.can_spill {
223 true => state.spillable += additional,
224 false => state.unspillable += additional,
225 }
226 }
227
228 fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
229 let mut state = self.state.lock();
230 match reservation.registration.consumer.can_spill {
231 true => state.spillable -= shrink,
232 false => state.unspillable -= shrink,
233 }
234 }
235
236 fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
237 let mut state = self.state.lock();
238
239 match reservation.registration.consumer.can_spill {
240 true => {
241 let spill_available = self.pool_size.saturating_sub(state.unspillable);
243
244 let available = spill_available
246 .checked_div(state.num_spill)
247 .unwrap_or(spill_available);
248
249 if reservation.size() + additional > available {
250 return Err(insufficient_capacity_err(
251 reservation,
252 additional,
253 available,
254 self,
255 ));
256 }
257 state.spillable += additional;
258 }
259 false => {
260 let available = self
261 .pool_size
262 .saturating_sub(state.unspillable + state.spillable);
263
264 if available < additional {
265 return Err(insufficient_capacity_err(
266 reservation,
267 additional,
268 available,
269 self,
270 ));
271 }
272 state.unspillable += additional;
273 }
274 }
275 Ok(())
276 }
277
278 fn reserved(&self) -> usize {
279 let state = self.state.lock();
280 state.spillable + state.unspillable
281 }
282
283 fn memory_limit(&self) -> MemoryLimit {
284 MemoryLimit::Finite(self.pool_size)
285 }
286}
287
288impl Display for FairSpillPool {
289 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
290 write!(
291 f,
292 "{}(pool_size: {})",
293 &self.name(),
294 human_readable_size(self.pool_size),
295 )
296 }
297}
298
299#[inline(always)]
305fn insufficient_capacity_err(
306 reservation: &MemoryReservation,
307 additional: usize,
308 available: usize,
309 pool: &impl MemoryPool,
310) -> DataFusionError {
311 resources_datafusion_err!(
312 "Failed to allocate additional {} for {} with {} already allocated for this reservation - {} remain available for the total memory pool: {}",
313 human_readable_size(additional),
314 reservation.registration.consumer.name,
315 human_readable_size(reservation.size()),
316 human_readable_size(available),
317 pool
318 )
319}
320
/// Per-consumer usage record kept by [`TrackConsumersPool`].
#[derive(Debug)]
struct TrackedConsumer {
    /// Name of the consumer, copied at registration time.
    name: String,
    /// Whether the consumer declared that it can spill.
    can_spill: bool,
    /// Bytes currently reserved by the consumer.
    reserved: AtomicUsize,
    /// Largest value `reserved` has reached so far.
    peak: AtomicUsize,
}

impl TrackedConsumer {
    /// Returns the bytes currently reserved by the consumer.
    fn reserved(&self) -> usize {
        self.reserved.load(Ordering::Relaxed)
    }

    /// Returns the highest reserved value recorded so far.
    fn peak(&self) -> usize {
        self.peak.load(Ordering::Relaxed)
    }

    /// Adds `additional` bytes to the reserved total and raises the peak if
    /// the new total exceeds it.
    fn grow(&self, additional: usize) {
        // `fetch_add` returns the previous value, so the total produced by
        // this very call is known without a second load that a concurrent
        // update could change. `wrapping_add` mirrors the wrapping
        // behaviour of the atomic addition itself.
        let previous = self.reserved.fetch_add(additional, Ordering::Relaxed);
        self.peak
            .fetch_max(previous.wrapping_add(additional), Ordering::Relaxed);
    }

    /// Subtracts `shrink` bytes from the reserved total; the peak is left
    /// untouched.
    fn shrink(&self, shrink: usize) {
        self.reserved.fetch_sub(shrink, Ordering::Relaxed);
    }
}
353
/// Point-in-time usage figures for one consumer tracked by a
/// [`TrackConsumersPool`], as returned by [`TrackConsumersPool::metrics`].
#[derive(Debug, Clone)]
pub struct MemoryConsumerMetrics {
    /// Name of the consumer.
    pub name: String,
    /// Whether the consumer declared that it can spill.
    pub can_spill: bool,
    /// Bytes reserved by the consumer when the metrics were taken.
    pub reserved: usize,
    /// Highest reserved value recorded for the consumer up to that point.
    pub peak: usize,
}
368
369impl From<&TrackedConsumer> for MemoryConsumerMetrics {
370 fn from(tracked: &TrackedConsumer) -> Self {
371 Self {
372 name: tracked.name.clone(),
373 can_spill: tracked.can_spill,
374 reserved: tracked.reserved(),
375 peak: tracked.peak(),
376 }
377 }
378}
379
/// A [`MemoryPool`] wrapper that records per-consumer usage on top of an
/// inner pool.
///
/// All pool operations are forwarded to `inner`. In addition, the reserved
/// and peak bytes of every registered consumer are tracked, and when the
/// inner pool rejects a `try_grow` with a resources-exhausted error, the
/// error message is extended with a report of the `top` largest consumers.
#[derive(Debug)]
pub struct TrackConsumersPool<I> {
    /// The wrapped pool that makes the actual allocation decisions.
    inner: I,
    /// How many of the largest consumers to list in error messages.
    top: NonZeroUsize,
    /// Usage records of the registered consumers, keyed by consumer id.
    tracked_consumers: Mutex<HashMap<usize, TrackedConsumer>>,
}
413
414impl<I: MemoryPool> Display for TrackConsumersPool<I> {
415 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
416 write!(
417 f,
418 "{}(inner_pool: {}, num_of_top_consumers: {})",
419 &self.name(),
420 &self.inner,
421 &self.top,
422 )
423 }
424}
425
426impl<I: MemoryPool> TrackConsumersPool<I> {
427 pub fn new(inner: I, top: NonZeroUsize) -> Self {
464 Self {
465 inner,
466 top,
467 tracked_consumers: Default::default(),
468 }
469 }
470
471 pub fn inner(&self) -> &I {
473 &self.inner
474 }
475
476 pub fn metrics(&self) -> Vec<MemoryConsumerMetrics> {
478 self.tracked_consumers
479 .lock()
480 .values()
481 .map(Into::into)
482 .collect()
483 }
484
485 pub fn report_top(&self, top: usize) -> String {
487 let mut consumers = self
488 .tracked_consumers
489 .lock()
490 .iter()
491 .map(|(consumer_id, tracked_consumer)| {
492 (
493 (
494 *consumer_id,
495 tracked_consumer.name.to_owned(),
496 tracked_consumer.can_spill,
497 tracked_consumer.peak(),
498 ),
499 tracked_consumer.reserved(),
500 )
501 })
502 .collect::<Vec<_>>();
503 consumers.sort_by_key(|consumer| std::cmp::Reverse(consumer.1));
504
505 consumers[0..std::cmp::min(top, consumers.len())]
506 .iter()
507 .map(|((id, name, can_spill, peak), size)| {
508 format!(
509 " {name}#{id}(can spill: {can_spill}) consumed {}, peak {}",
510 human_readable_size(*size),
511 human_readable_size(*peak),
512 )
513 })
514 .collect::<Vec<_>>()
515 .join(",\n")
516 + "."
517 }
518}
519
520impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
521 fn name(&self) -> &str {
522 "track_consumers"
523 }
524
525 fn register(&self, consumer: &MemoryConsumer) {
526 self.inner.register(consumer);
527
528 let mut guard = self.tracked_consumers.lock();
529 let existing = guard.insert(
530 consumer.id(),
531 TrackedConsumer {
532 name: consumer.name().to_string(),
533 can_spill: consumer.can_spill(),
534 reserved: Default::default(),
535 peak: Default::default(),
536 },
537 );
538
539 debug_assert!(
540 existing.is_none(),
541 "Registered was called twice on the same consumer"
542 );
543 }
544
545 fn unregister(&self, consumer: &MemoryConsumer) {
546 self.inner.unregister(consumer);
547 self.tracked_consumers.lock().remove(&consumer.id());
548 }
549
550 fn grow(&self, reservation: &MemoryReservation, additional: usize) {
551 self.inner.grow(reservation, additional);
552 self.tracked_consumers
553 .lock()
554 .entry(reservation.consumer().id())
555 .and_modify(|tracked_consumer| {
556 tracked_consumer.grow(additional);
557 });
558 }
559
560 fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
561 self.inner.shrink(reservation, shrink);
562 self.tracked_consumers
563 .lock()
564 .entry(reservation.consumer().id())
565 .and_modify(|tracked_consumer| {
566 tracked_consumer.shrink(shrink);
567 });
568 }
569
570 fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
571 self.inner
572 .try_grow(reservation, additional)
573 .map_err(|e| match e {
574 DataFusionError::ResourcesExhausted(e) => {
575 DataFusionError::ResourcesExhausted(
577 provide_top_memory_consumers_to_error_msg(
578 &reservation.consumer().name,
579 &e,
580 &self.report_top(self.top.into()),
581 ),
582 )
583 }
584 _ => e,
585 })?;
586
587 self.tracked_consumers
588 .lock()
589 .entry(reservation.consumer().id())
590 .and_modify(|tracked_consumer| {
591 tracked_consumer.grow(additional);
592 });
593 Ok(())
594 }
595
596 fn reserved(&self) -> usize {
597 self.inner.reserved()
598 }
599
600 fn memory_limit(&self) -> MemoryLimit {
601 self.inner.memory_limit()
602 }
603}
604
/// Builds the extended message for a failed allocation.
///
/// The result names the consumer whose request failed (`consumer_name`),
/// then lists `top_consumers` on the following line(s), and ends with the
/// original `error_msg` on a line prefixed by `Error: `.
fn provide_top_memory_consumers_to_error_msg(
    consumer_name: &str,
    error_msg: &str,
    top_consumers: &str,
) -> String {
    format!(
        "Additional allocation failed for {consumer_name} with top memory consumers (across reservations) as:\n{top_consumers}\nError: {error_msg}"
    )
}
614
#[cfg(test)]
mod tests {
    use super::*;
    use insta::{Settings, allow_duplicates, assert_snapshot, with_settings};
    use std::sync::Arc;

    /// Builds insta settings with a filter that replaces the numeric
    /// consumer id in `name#<id>(can spill: ...)` with `[ID]`, so that the
    /// snapshots do not depend on the ids handed out at runtime.
    fn make_settings() -> Settings {
        let mut settings = Settings::clone_current();
        settings.add_filter(
            r"([^\s]+)\#\d+\(can spill: (true|false)\)",
            "$1#[ID](can spill: $2)",
        );
        settings
    }

    /// Walks a `FairSpillPool` through a mix of spillable and unspillable
    /// reservations and checks the limits applied to each.
    #[test]
    fn test_fair() {
        let pool = Arc::new(FairSpillPool::new(100)) as _;

        let r1 = MemoryConsumer::new("unspillable").register(&pool);
        // `grow` is infallible, so it may exceed the pool size.
        r1.grow(2000);
        assert_eq!(pool.reserved(), 2000);

        let r2 = MemoryConsumer::new("r2")
            .with_can_spill(true)
            .register(&pool);
        // The same holds for a spillable consumer.
        r2.grow(2000);

        assert_eq!(pool.reserved(), 4000);

        // The pool is over-committed, so a fallible request is rejected.
        let err = r2.try_grow(1).unwrap_err().strip_backtrace();
        assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 1.0 B for r2 with 2000.0 B already allocated for this reservation - 0.0 B remain available for the total memory pool: fair(pool_size: 100.0 B)");

        // Repeating the request gives the same rejection.
        let err = r2.try_grow(1).unwrap_err().strip_backtrace();
        assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 1.0 B for r2 with 2000.0 B already allocated for this reservation - 0.0 B remain available for the total memory pool: fair(pool_size: 100.0 B)");

        r1.shrink(1990);
        r2.shrink(2000);

        assert_eq!(pool.reserved(), 10);

        r1.try_grow(10).unwrap();
        assert_eq!(pool.reserved(), 20);

        // With 20 bytes unspillable, the only spillable consumer can take
        // the remaining 80.
        r2.try_grow(80).unwrap();
        assert_eq!(pool.reserved(), 100);

        r2.shrink(70);

        assert_eq!(r1.size(), 20);
        assert_eq!(r2.size(), 10);
        assert_eq!(pool.reserved(), 30);

        let r3 = MemoryConsumer::new("r3")
            .with_can_spill(true)
            .register(&pool);

        // Two spillable consumers are registered now, so each is capped at
        // (100 - 20) / 2 = 40 bytes.
        let err = r3.try_grow(70).unwrap_err().strip_backtrace();
        assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 70.0 B for r3 with 0.0 B already allocated for this reservation - 40.0 B remain available for the total memory pool: fair(pool_size: 100.0 B)");

        // Freeing r2's memory does not unregister it, so the cap is unchanged.
        r2.free();
        let err = r3.try_grow(70).unwrap_err().strip_backtrace();
        assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 70.0 B for r3 with 0.0 B already allocated for this reservation - 40.0 B remain available for the total memory pool: fair(pool_size: 100.0 B)");

        // Dropping r2 unregisters it; r3 is then the only spillable consumer.
        drop(r2);
        assert_eq!(pool.reserved(), 20);
        r3.try_grow(80).unwrap();

        assert_eq!(pool.reserved(), 100);
        r1.free();
        assert_eq!(pool.reserved(), 80);

        // An unspillable consumer only gets what is left: 100 - 80 = 20.
        let r4 = MemoryConsumer::new("s4").register(&pool);
        let err = r4.try_grow(30).unwrap_err().strip_backtrace();
        assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 30.0 B for s4 with 0.0 B already allocated for this reservation - 20.0 B remain available for the total memory pool: fair(pool_size: 100.0 B)");
    }

    /// Checks that a failed `try_grow` reports the three largest consumers,
    /// ordered by their current reservation, together with their peaks.
    #[test]
    fn test_tracked_consumers_pool() {
        let setting = make_settings();
        let _bound = setting.bind_to_scope();
        let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
            GreedyMemoryPool::new(100),
            NonZeroUsize::new(3).unwrap(),
        ));

        // r1: grows to 70 and shrinks back to 50, so its peak stays at 70.
        let r1 = MemoryConsumer::new("r1").register(&pool);
        r1.grow(50);
        r1.grow(20);
        r1.shrink(20);

        // r2: a single fallible grow to 15.
        let r2 = MemoryConsumer::new("r2").register(&pool);
        r2.try_grow(15)
            .expect("should succeed in memory allotment for r2");

        // r3: resized to 25 and then down to 20, so its peak is 25.
        let r3 = MemoryConsumer::new("r3").register(&pool);
        r3.try_resize(25)
            .expect("should succeed in memory allotment for r3");
        r3.try_resize(20)
            .expect("should succeed in memory allotment for r3");

        // r4: holds 10 bytes, too little to appear among the top three.
        let r4 = MemoryConsumer::new("r4").register(&pool);
        r4.grow(10);

        // r5: asks for more than the whole pool and must fail.
        let r5 = MemoryConsumer::new("r5").register(&pool);
        let res = r5.try_grow(150);
        assert!(res.is_err());
        let error = res.unwrap_err().strip_backtrace();
        assert_snapshot!(error, @r"
        Resources exhausted: Additional allocation failed for r5 with top memory consumers (across reservations) as:
        r1#[ID](can spill: false) consumed 50.0 B, peak 70.0 B,
        r3#[ID](can spill: false) consumed 20.0 B, peak 25.0 B,
        r2#[ID](can spill: false) consumed 15.0 B, peak 15.0 B.
        Error: Failed to allocate additional 150.0 B for r5 with 0.0 B already allocated for this reservation - 5.0 B remain available for the total memory pool: greedy(used: 95.0 B, pool_size: 100.0 B)
        ");
    }

    /// Checks that consumers sharing a name are tracked as separate entries.
    #[test]
    fn test_tracked_consumers_pool_register() {
        let setting = make_settings();
        let _bound = setting.bind_to_scope();
        let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
            GreedyMemoryPool::new(100),
            NonZeroUsize::new(3).unwrap(),
        ));

        let same_name = "foo";

        // A single consumer that has reserved nothing yet.
        let r0 = MemoryConsumer::new(same_name).register(&pool);
        let res = r0.try_grow(150);
        assert!(res.is_err());
        let error = res.unwrap_err().strip_backtrace();
        assert_snapshot!(error, @r"
        Resources exhausted: Additional allocation failed for foo with top memory consumers (across reservations) as:
        foo#[ID](can spill: false) consumed 0.0 B, peak 0.0 B.
        Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 100.0 B remain available for the total memory pool: greedy(used: 0.0 B, pool_size: 100.0 B)
        ");

        // Give r0 some memory, then register a second consumer with the
        // same name: both must be listed individually.
        r0.grow(10);
        let new_consumer_same_name = MemoryConsumer::new(same_name);
        let r1 = new_consumer_same_name.register(&pool);
        let res = r1.try_grow(150);
        assert!(res.is_err());
        let error = res.unwrap_err().strip_backtrace();
        assert_snapshot!(error, @r"
        Resources exhausted: Additional allocation failed for foo with top memory consumers (across reservations) as:
        foo#[ID](can spill: false) consumed 10.0 B, peak 10.0 B,
        foo#[ID](can spill: false) consumed 0.0 B, peak 0.0 B.
        Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 90.0 B remain available for the total memory pool: greedy(used: 10.0 B, pool_size: 100.0 B)
        ");

        // Growing r1 changes only r1's entry and moves it to the top.
        r1.grow(20);

        let res = r1.try_grow(150);
        assert!(res.is_err());
        let error = res.unwrap_err().strip_backtrace();
        assert_snapshot!(error, @r"
        Resources exhausted: Additional allocation failed for foo with top memory consumers (across reservations) as:
        foo#[ID](can spill: false) consumed 20.0 B, peak 20.0 B,
        foo#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
        Error: Failed to allocate additional 150.0 B for foo with 20.0 B already allocated for this reservation - 70.0 B remain available for the total memory pool: greedy(used: 30.0 B, pool_size: 100.0 B)
        ");

        // A consumer with the same name but a different `can_spill` setting
        // is listed as a third entry.
        let consumer_with_same_name_but_different_hash =
            MemoryConsumer::new(same_name).with_can_spill(true);
        let r2 = consumer_with_same_name_but_different_hash.register(&pool);
        let res = r2.try_grow(150);
        assert!(res.is_err());
        let error = res.unwrap_err().strip_backtrace();
        assert_snapshot!(error, @r"
        Resources exhausted: Additional allocation failed for foo with top memory consumers (across reservations) as:
        foo#[ID](can spill: false) consumed 20.0 B, peak 20.0 B,
        foo#[ID](can spill: false) consumed 10.0 B, peak 10.0 B,
        foo#[ID](can spill: true) consumed 0.0 B, peak 0.0 B.
        Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 70.0 B remain available for the total memory pool: greedy(used: 30.0 B, pool_size: 100.0 B)
        ");
    }

    /// Checks, for both a fair and a greedy inner pool, that dropping a
    /// reservation removes its consumer from the report and returns its
    /// memory to the pool.
    #[test]
    fn test_tracked_consumers_pool_deregister() {
        fn test_per_pool_type<P: MemoryPool + 'static>(pool: Arc<TrackConsumersPool<P>>) {
            // The extra filter hides the inner pool's own description, so
            // the same snapshots apply to every inner pool type.
            with_settings!({
                snapshot_suffix => pool.inner().name().to_string(),
                filters => vec![
                    (
                        r"([^\s]+)\#\d+\(can spill: (true|false)\)",
                        "$1#[ID](can spill: $2)",
                    ),
                    (
                        r"for the total memory pool: [^\n]+",
                        "for the total memory pool: [INNER_POOL]",
                    ),
                ],
            }, {
                    let memory_pool: Arc<dyn MemoryPool> = Arc::<TrackConsumersPool<P>>::clone(&pool);
                    let r0 = MemoryConsumer::new("r0").register(&memory_pool);
                    r0.grow(10);
                    let r1 = MemoryConsumer::new("r1").register(&memory_pool);
                    r1.grow(20);

                    // Both consumers are listed while both are registered.
                    let error = r0.try_grow(150).unwrap_err().strip_backtrace();
                    assert_snapshot!(error, @r"
                    Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as:
                    r1#[ID](can spill: false) consumed 20.0 B, peak 20.0 B,
                    r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
                    Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 70.0 B remain available for the total memory pool: [INNER_POOL]
                    ");

                    // After r1 is dropped it is no longer listed, and its
                    // 20 bytes are available again.
                    drop(r1);
                    let error = r0.try_grow(150).unwrap_err().strip_backtrace();
                    assert_snapshot!(error, @r"
                    Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as:
                    r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
                    Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total memory pool: [INNER_POOL]
                    ");

                    // Repeating the request yields the same report.
                    let error = r0.try_grow(150).unwrap_err().strip_backtrace();
                    assert_snapshot!(error, @r"
                    Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as:
                    r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
                    Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total memory pool: [INNER_POOL]
                    ");

                    // And once more: the report stays stable.
                    let error = r0.try_grow(150).unwrap_err().strip_backtrace();
                    assert_snapshot!(error, @r"
                    Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as:
                    r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
                    Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total memory pool: [INNER_POOL]
                    ");
                }
            );
        }

        // The same inline snapshots are asserted once per inner pool type.
        allow_duplicates! {
            let tracked_spill_pool = Arc::new(TrackConsumersPool::new(
                FairSpillPool::new(100),
                NonZeroUsize::new(3).unwrap(),
            ));
            test_per_pool_type(tracked_spill_pool);

            let tracked_greedy_pool = Arc::new(TrackConsumersPool::new(
                GreedyMemoryPool::new(100),
                NonZeroUsize::new(3).unwrap(),
            ));
            test_per_pool_type(tracked_greedy_pool);
        }
    }

    /// Checks the figures returned by `TrackConsumersPool::metrics`.
    #[test]
    fn test_track_consumers_pool_metrics() {
        let track_consumers_pool = Arc::new(TrackConsumersPool::new(
            GreedyMemoryPool::new(1000),
            NonZeroUsize::new(3).unwrap(),
        ));
        let memory_pool: Arc<dyn MemoryPool> = Arc::clone(&track_consumers_pool) as _;

        // Nothing is registered yet.
        assert!(track_consumers_pool.metrics().is_empty());

        let r1 = MemoryConsumer::new("spilling")
            .with_can_spill(true)
            .register(&memory_pool);
        let r2 = MemoryConsumer::new("non-spilling").register(&memory_pool);

        // r1 peaks at 150 and ends at 100; r2 grows once to 200.
        r1.grow(100);
        r1.grow(50);
        r1.shrink(50);
        r2.grow(200);
        let mut metrics = track_consumers_pool.metrics();
        // `metrics()` has no defined order, so sort by name before asserting.
        metrics.sort_by_key(|m| m.name.clone());

        assert_eq!(metrics.len(), 2);

        let m_non = &metrics[0];
        assert_eq!(m_non.name, "non-spilling");
        assert!(!m_non.can_spill);
        assert_eq!(m_non.reserved, 200);
        assert_eq!(m_non.peak, 200);

        let m_spill = &metrics[1];
        assert_eq!(m_spill.name, "spilling");
        assert!(m_spill.can_spill);
        assert_eq!(m_spill.reserved, 100);
        assert_eq!(m_spill.peak, 150);

        // Dropping a reservation removes its consumer from the metrics.
        drop(r2);
        let metrics = track_consumers_pool.metrics();
        assert_eq!(metrics.len(), 1);
        assert_eq!(metrics[0].name, "spilling");
    }

    /// Checks that `report_top` can be called directly on a pool recovered
    /// through `Any` downcasting, not only through a failed `try_grow`.
    #[test]
    fn test_tracked_consumers_pool_use_beyond_errors() {
        let setting = make_settings();
        let _bound = setting.bind_to_scope();
        let upcasted: Arc<dyn std::any::Any + Send + Sync> =
            Arc::new(TrackConsumersPool::new(
                GreedyMemoryPool::new(100),
                NonZeroUsize::new(3).unwrap(),
            ));
        let pool: Arc<dyn MemoryPool> = Arc::clone(&upcasted)
            .downcast::<TrackConsumersPool<GreedyMemoryPool>>()
            .unwrap();
        let r1 = MemoryConsumer::new("r1").register(&pool);
        r1.grow(20);
        let r2 = MemoryConsumer::new("r2").register(&pool);
        r2.grow(15);
        let r3 = MemoryConsumer::new("r3").register(&pool);
        r3.grow(45);

        let downcasted = upcasted
            .downcast::<TrackConsumersPool<GreedyMemoryPool>>()
            .unwrap();

        // Only the two largest of the three consumers are requested.
        let res = downcasted.report_top(2);
        assert_snapshot!(res, @r"
        r3#[ID](can spill: false) consumed 45.0 B, peak 45.0 B,
        r1#[ID](can spill: false) consumed 20.0 B, peak 20.0 B.
        ");
    }

    /// Checks the `Display` output of every pool type, both standalone and
    /// wrapped in a `TrackConsumersPool`.
    #[test]
    fn test_memory_pool_display_fmt() {
        let top = NonZeroUsize::new(5).unwrap();

        // Unbounded pool with nothing reserved.
        let unbounded = UnboundedMemoryPool::default();
        assert_eq!(
            unbounded.to_string(),
            "unbounded(used: 0.0 B)",
            "UnboundedMemoryPool Display"
        );

        // Unbounded pool with a live reservation.
        let unbounded_arc: Arc<dyn MemoryPool> = Arc::new(UnboundedMemoryPool::default());
        let r = MemoryConsumer::new("u").register(&unbounded_arc);
        r.grow(2048);
        assert_eq!(
            unbounded_arc.as_ref().to_string(),
            "unbounded(used: 2.0 KB)",
            "UnboundedMemoryPool Display with reservations"
        );

        // Greedy pool with nothing reserved.
        let greedy = GreedyMemoryPool::new(100);
        assert_eq!(
            greedy.to_string(),
            "greedy(used: 0.0 B, pool_size: 100.0 B)",
            "GreedyMemoryPool Display"
        );

        // Greedy pool with a live reservation.
        let greedy_arc: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(100));
        let r = MemoryConsumer::new("g").register(&greedy_arc);
        r.grow(50);
        assert_eq!(
            greedy_arc.as_ref().to_string(),
            "greedy(used: 50.0 B, pool_size: 100.0 B)",
            "GreedyMemoryPool Display with reservations"
        );

        // The fair pool prints only its size.
        let fair = FairSpillPool::new(4096);
        assert_eq!(
            fair.to_string(),
            "fair(pool_size: 4.0 KB)",
            "FairSpillPool Display"
        );

        // Tracking wrappers embed the inner pool's output.
        let tracked_greedy = TrackConsumersPool::new(GreedyMemoryPool::new(128), top);
        assert_eq!(
            tracked_greedy.to_string(),
            "track_consumers(inner_pool: greedy(used: 0.0 B, pool_size: 128.0 B), num_of_top_consumers: 5)",
            "TrackConsumersPool<GreedyMemoryPool> Display"
        );

        let tracked_fair = TrackConsumersPool::new(FairSpillPool::new(256), top);
        assert_eq!(
            tracked_fair.to_string(),
            "track_consumers(inner_pool: fair(pool_size: 256.0 B), num_of_top_consumers: 5)",
            "TrackConsumersPool<FairSpillPool> Display"
        );

        let tracked_unbounded =
            TrackConsumersPool::new(UnboundedMemoryPool::default(), top);
        assert_eq!(
            tracked_unbounded.to_string(),
            "track_consumers(inner_pool: unbounded(used: 0.0 B), num_of_top_consumers: 5)",
            "TrackConsumersPool<UnboundedMemoryPool> Display"
        );
    }
}