datafusion_execution/memory_pool/
pool.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
19use datafusion_common::HashMap;
20use datafusion_common::{resources_datafusion_err, DataFusionError, Result};
21use log::debug;
22use parking_lot::Mutex;
23use std::{
24    num::NonZeroUsize,
25    sync::atomic::{AtomicU64, AtomicUsize, Ordering},
26};
27
28/// A [`MemoryPool`] that enforces no limit
29#[derive(Debug, Default)]
30pub struct UnboundedMemoryPool {
31    used: AtomicUsize,
32}
33
34impl MemoryPool for UnboundedMemoryPool {
35    fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
36        self.used.fetch_add(additional, Ordering::Relaxed);
37    }
38
39    fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
40        self.used.fetch_sub(shrink, Ordering::Relaxed);
41    }
42
43    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
44        self.grow(reservation, additional);
45        Ok(())
46    }
47
48    fn reserved(&self) -> usize {
49        self.used.load(Ordering::Relaxed)
50    }
51}
52
53/// A [`MemoryPool`] that implements a greedy first-come first-serve limit.
54///
55/// This pool works well for queries that do not need to spill or have
56/// a single spillable operator. See [`FairSpillPool`] if there are
57/// multiple spillable operators that all will spill.
58#[derive(Debug)]
59pub struct GreedyMemoryPool {
60    pool_size: usize,
61    used: AtomicUsize,
62}
63
64impl GreedyMemoryPool {
65    /// Create a new pool that can allocate up to `pool_size` bytes
66    pub fn new(pool_size: usize) -> Self {
67        debug!("Created new GreedyMemoryPool(pool_size={pool_size})");
68        Self {
69            pool_size,
70            used: AtomicUsize::new(0),
71        }
72    }
73}
74
75impl MemoryPool for GreedyMemoryPool {
76    fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
77        self.used.fetch_add(additional, Ordering::Relaxed);
78    }
79
80    fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
81        self.used.fetch_sub(shrink, Ordering::Relaxed);
82    }
83
84    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
85        self.used
86            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |used| {
87                let new_used = used + additional;
88                (new_used <= self.pool_size).then_some(new_used)
89            })
90            .map_err(|used| {
91                insufficient_capacity_err(
92                    reservation,
93                    additional,
94                    self.pool_size.saturating_sub(used),
95                )
96            })?;
97        Ok(())
98    }
99
100    fn reserved(&self) -> usize {
101        self.used.load(Ordering::Relaxed)
102    }
103}
104
105/// A [`MemoryPool`] that prevents spillable reservations from using more than
106/// an even fraction of the available memory sans any unspillable reservations
107/// (i.e. `(pool_size - unspillable_memory) / num_spillable_reservations`)
108///
109/// This pool works best when you know beforehand the query has
110/// multiple spillable operators that will likely all need to
111/// spill. Sometimes it will cause spills even when there was
112/// sufficient memory (reserved for other operators) to avoid doing
113/// so.
114///
115/// ```text
116///    ┌───────────────────────z──────────────────────z───────────────┐
117///    │                       z                      z               │
118///    │                       z                      z               │
119///    │       Spillable       z       Unspillable    z     Free      │
120///    │        Memory         z        Memory        z    Memory     │
121///    │                       z                      z               │
122///    │                       z                      z               │
123///    └───────────────────────z──────────────────────z───────────────┘
124/// ```
125///
126/// Unspillable memory is allocated in a first-come, first-serve fashion
127#[derive(Debug)]
128pub struct FairSpillPool {
129    /// The total memory limit
130    pool_size: usize,
131
132    state: Mutex<FairSpillPoolState>,
133}
134
135#[derive(Debug)]
136struct FairSpillPoolState {
137    /// The number of consumers that can spill
138    num_spill: usize,
139
140    /// The total amount of memory reserved that can be spilled
141    spillable: usize,
142
143    /// The total amount of memory reserved by consumers that cannot spill
144    unspillable: usize,
145}
146
147impl FairSpillPool {
148    /// Allocate up to `limit` bytes
149    pub fn new(pool_size: usize) -> Self {
150        debug!("Created new FairSpillPool(pool_size={pool_size})");
151        Self {
152            pool_size,
153            state: Mutex::new(FairSpillPoolState {
154                num_spill: 0,
155                spillable: 0,
156                unspillable: 0,
157            }),
158        }
159    }
160}
161
162impl MemoryPool for FairSpillPool {
163    fn register(&self, consumer: &MemoryConsumer) {
164        if consumer.can_spill {
165            self.state.lock().num_spill += 1;
166        }
167    }
168
169    fn unregister(&self, consumer: &MemoryConsumer) {
170        if consumer.can_spill {
171            let mut state = self.state.lock();
172            state.num_spill = state.num_spill.checked_sub(1).unwrap();
173        }
174    }
175
176    fn grow(&self, reservation: &MemoryReservation, additional: usize) {
177        let mut state = self.state.lock();
178        match reservation.registration.consumer.can_spill {
179            true => state.spillable += additional,
180            false => state.unspillable += additional,
181        }
182    }
183
184    fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
185        let mut state = self.state.lock();
186        match reservation.registration.consumer.can_spill {
187            true => state.spillable -= shrink,
188            false => state.unspillable -= shrink,
189        }
190    }
191
192    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
193        let mut state = self.state.lock();
194
195        match reservation.registration.consumer.can_spill {
196            true => {
197                // The total amount of memory available to spilling consumers
198                let spill_available = self.pool_size.saturating_sub(state.unspillable);
199
200                // No spiller may use more than their fraction of the memory available
201                let available = spill_available
202                    .checked_div(state.num_spill)
203                    .unwrap_or(spill_available);
204
205                if reservation.size + additional > available {
206                    return Err(insufficient_capacity_err(
207                        reservation,
208                        additional,
209                        available,
210                    ));
211                }
212                state.spillable += additional;
213            }
214            false => {
215                let available = self
216                    .pool_size
217                    .saturating_sub(state.unspillable + state.spillable);
218
219                if available < additional {
220                    return Err(insufficient_capacity_err(
221                        reservation,
222                        additional,
223                        available,
224                    ));
225                }
226                state.unspillable += additional;
227            }
228        }
229        Ok(())
230    }
231
232    fn reserved(&self) -> usize {
233        let state = self.state.lock();
234        state.spillable + state.unspillable
235    }
236}
237
238/// Constructs a resources error based upon the individual [`MemoryReservation`].
239///
240/// The error references the `bytes already allocated` for the reservation,
241/// and not the total within the collective [`MemoryPool`],
242/// nor the total across multiple reservations with the same [`MemoryConsumer`].
243#[inline(always)]
244fn insufficient_capacity_err(
245    reservation: &MemoryReservation,
246    additional: usize,
247    available: usize,
248) -> DataFusionError {
249    resources_datafusion_err!("Failed to allocate additional {} bytes for {} with {} bytes already allocated for this reservation - {} bytes remain available for the total pool", additional, reservation.registration.consumer.name, reservation.size, available)
250}
251
252/// A [`MemoryPool`] that tracks the consumers that have
253/// reserved memory within the inner memory pool.
254///
255/// By tracking memory reservations more carefully this pool
256/// can provide better error messages on the largest memory users
257///
258/// Tracking is per hashed [`MemoryConsumer`], not per [`MemoryReservation`].
259/// The same consumer can have multiple reservations.
260#[derive(Debug)]
261pub struct TrackConsumersPool<I> {
262    inner: I,
263    top: NonZeroUsize,
264    tracked_consumers: Mutex<HashMap<MemoryConsumer, AtomicU64>>,
265}
266
267impl<I: MemoryPool> TrackConsumersPool<I> {
268    /// Creates a new [`TrackConsumersPool`].
269    ///
270    /// The `top` determines how many Top K [`MemoryConsumer`]s to include
271    /// in the reported [`DataFusionError::ResourcesExhausted`].
272    pub fn new(inner: I, top: NonZeroUsize) -> Self {
273        Self {
274            inner,
275            top,
276            tracked_consumers: Default::default(),
277        }
278    }
279
280    /// Determine if there are multiple [`MemoryConsumer`]s registered
281    /// which have the same name.
282    ///
283    /// This is very tied to the implementation of the memory consumer.
284    fn has_multiple_consumers(&self, name: &String) -> bool {
285        let consumer = MemoryConsumer::new(name);
286        let consumer_with_spill = consumer.clone().with_can_spill(true);
287        let guard = self.tracked_consumers.lock();
288        guard.contains_key(&consumer) && guard.contains_key(&consumer_with_spill)
289    }
290
291    /// The top consumers in a report string.
292    pub fn report_top(&self, top: usize) -> String {
293        let mut consumers = self
294            .tracked_consumers
295            .lock()
296            .iter()
297            .map(|(consumer, reserved)| {
298                (
299                    (consumer.name().to_owned(), consumer.can_spill()),
300                    reserved.load(Ordering::Acquire),
301                )
302            })
303            .collect::<Vec<_>>();
304        consumers.sort_by(|a, b| b.1.cmp(&a.1)); // inverse ordering
305
306        consumers[0..std::cmp::min(top, consumers.len())]
307            .iter()
308            .map(|((name, can_spill), size)| {
309                if self.has_multiple_consumers(name) {
310                    format!("{name}(can_spill={}) consumed {:?} bytes", can_spill, size)
311                } else {
312                    format!("{name} consumed {:?} bytes", size)
313                }
314            })
315            .collect::<Vec<_>>()
316            .join(", ")
317    }
318}
319
320impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
321    fn register(&self, consumer: &MemoryConsumer) {
322        self.inner.register(consumer);
323
324        let mut guard = self.tracked_consumers.lock();
325        if let Some(already_reserved) = guard.insert(consumer.clone(), Default::default())
326        {
327            guard.entry_ref(consumer).and_modify(|bytes| {
328                bytes.fetch_add(
329                    already_reserved.load(Ordering::Acquire),
330                    Ordering::AcqRel,
331                );
332            });
333        }
334    }
335
336    fn unregister(&self, consumer: &MemoryConsumer) {
337        self.inner.unregister(consumer);
338        self.tracked_consumers.lock().remove(consumer);
339    }
340
341    fn grow(&self, reservation: &MemoryReservation, additional: usize) {
342        self.inner.grow(reservation, additional);
343        self.tracked_consumers
344            .lock()
345            .entry_ref(reservation.consumer())
346            .and_modify(|bytes| {
347                bytes.fetch_add(additional as u64, Ordering::AcqRel);
348            });
349    }
350
351    fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
352        self.inner.shrink(reservation, shrink);
353        self.tracked_consumers
354            .lock()
355            .entry_ref(reservation.consumer())
356            .and_modify(|bytes| {
357                bytes.fetch_sub(shrink as u64, Ordering::AcqRel);
358            });
359    }
360
361    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
362        self.inner
363            .try_grow(reservation, additional)
364            .map_err(|e| match e {
365                DataFusionError::ResourcesExhausted(e) => {
366                    // wrap OOM message in top consumers
367                    DataFusionError::ResourcesExhausted(
368                        provide_top_memory_consumers_to_error_msg(
369                            e,
370                            self.report_top(self.top.into()),
371                        ),
372                    )
373                }
374                _ => e,
375            })?;
376
377        self.tracked_consumers
378            .lock()
379            .entry_ref(reservation.consumer())
380            .and_modify(|bytes| {
381                bytes.fetch_add(additional as u64, Ordering::AcqRel);
382            });
383        Ok(())
384    }
385
386    fn reserved(&self) -> usize {
387        self.inner.reserved()
388    }
389}
390
391fn provide_top_memory_consumers_to_error_msg(
392    error_msg: String,
393    top_consumers: String,
394) -> String {
395    format!("Additional allocation failed with top memory consumers (across reservations) as: {}. Error: {}", top_consumers, error_msg)
396}
397
398#[cfg(test)]
399mod tests {
400    use super::*;
401    use std::sync::Arc;
402
403    #[test]
404    fn test_fair() {
405        let pool = Arc::new(FairSpillPool::new(100)) as _;
406
407        let mut r1 = MemoryConsumer::new("unspillable").register(&pool);
408        // Can grow beyond capacity of pool
409        r1.grow(2000);
410        assert_eq!(pool.reserved(), 2000);
411
412        let mut r2 = MemoryConsumer::new("r2")
413            .with_can_spill(true)
414            .register(&pool);
415        // Can grow beyond capacity of pool
416        r2.grow(2000);
417
418        assert_eq!(pool.reserved(), 4000);
419
420        let err = r2.try_grow(1).unwrap_err().strip_backtrace();
421        assert_eq!(err, "Resources exhausted: Failed to allocate additional 1 bytes for r2 with 2000 bytes already allocated for this reservation - 0 bytes remain available for the total pool");
422
423        let err = r2.try_grow(1).unwrap_err().strip_backtrace();
424        assert_eq!(err, "Resources exhausted: Failed to allocate additional 1 bytes for r2 with 2000 bytes already allocated for this reservation - 0 bytes remain available for the total pool");
425
426        r1.shrink(1990);
427        r2.shrink(2000);
428
429        assert_eq!(pool.reserved(), 10);
430
431        r1.try_grow(10).unwrap();
432        assert_eq!(pool.reserved(), 20);
433
434        // Can grow r2 to 80 as only spilling consumer
435        r2.try_grow(80).unwrap();
436        assert_eq!(pool.reserved(), 100);
437
438        r2.shrink(70);
439
440        assert_eq!(r1.size(), 20);
441        assert_eq!(r2.size(), 10);
442        assert_eq!(pool.reserved(), 30);
443
444        let mut r3 = MemoryConsumer::new("r3")
445            .with_can_spill(true)
446            .register(&pool);
447
448        let err = r3.try_grow(70).unwrap_err().strip_backtrace();
449        assert_eq!(err, "Resources exhausted: Failed to allocate additional 70 bytes for r3 with 0 bytes already allocated for this reservation - 40 bytes remain available for the total pool");
450
451        //Shrinking r2 to zero doesn't allow a3 to allocate more than 45
452        r2.free();
453        let err = r3.try_grow(70).unwrap_err().strip_backtrace();
454        assert_eq!(err, "Resources exhausted: Failed to allocate additional 70 bytes for r3 with 0 bytes already allocated for this reservation - 40 bytes remain available for the total pool");
455
456        // But dropping r2 does
457        drop(r2);
458        assert_eq!(pool.reserved(), 20);
459        r3.try_grow(80).unwrap();
460
461        assert_eq!(pool.reserved(), 100);
462        r1.free();
463        assert_eq!(pool.reserved(), 80);
464
465        let mut r4 = MemoryConsumer::new("s4").register(&pool);
466        let err = r4.try_grow(30).unwrap_err().strip_backtrace();
467        assert_eq!(err, "Resources exhausted: Failed to allocate additional 30 bytes for s4 with 0 bytes already allocated for this reservation - 20 bytes remain available for the total pool");
468    }
469
470    #[test]
471    fn test_tracked_consumers_pool() {
472        let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
473            GreedyMemoryPool::new(100),
474            NonZeroUsize::new(3).unwrap(),
475        ));
476
477        // Test: use all the different interfaces to change reservation size
478
479        // set r1=50, using grow and shrink
480        let mut r1 = MemoryConsumer::new("r1").register(&pool);
481        r1.grow(70);
482        r1.shrink(20);
483
484        // set r2=15 using try_grow
485        let mut r2 = MemoryConsumer::new("r2").register(&pool);
486        r2.try_grow(15)
487            .expect("should succeed in memory allotment for r2");
488
489        // set r3=20 using try_resize
490        let mut r3 = MemoryConsumer::new("r3").register(&pool);
491        r3.try_resize(25)
492            .expect("should succeed in memory allotment for r3");
493        r3.try_resize(20)
494            .expect("should succeed in memory allotment for r3");
495
496        // set r4=10
497        // this should not be reported in top 3
498        let mut r4 = MemoryConsumer::new("r4").register(&pool);
499        r4.grow(10);
500
501        // Test: reports if new reservation causes error
502        // using the previously set sizes for other consumers
503        let mut r5 = MemoryConsumer::new("r5").register(&pool);
504        let expected = "Additional allocation failed with top memory consumers (across reservations) as: r1 consumed 50 bytes, r3 consumed 20 bytes, r2 consumed 15 bytes. Error: Failed to allocate additional 150 bytes for r5 with 0 bytes already allocated for this reservation - 5 bytes remain available for the total pool";
505        let res = r5.try_grow(150);
506        assert!(
507            matches!(
508                &res,
509                Err(DataFusionError::ResourcesExhausted(ref e)) if e.to_string().contains(expected)
510            ),
511            "should provide list of top memory consumers, instead found {:?}",
512            res
513        );
514    }
515
516    #[test]
517    fn test_tracked_consumers_pool_register() {
518        let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
519            GreedyMemoryPool::new(100),
520            NonZeroUsize::new(3).unwrap(),
521        ));
522
523        let same_name = "foo";
524
525        // Test: see error message when no consumers recorded yet
526        let mut r0 = MemoryConsumer::new(same_name).register(&pool);
527        let expected = "Additional allocation failed with top memory consumers (across reservations) as: foo consumed 0 bytes. Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated for this reservation - 100 bytes remain available for the total pool";
528        let res = r0.try_grow(150);
529        assert!(
530            matches!(
531                &res,
532                Err(DataFusionError::ResourcesExhausted(ref e)) if e.to_string().contains(expected)
533            ),
534            "should provide proper error when no reservations have been made yet, instead found {:?}", res
535        );
536
537        // API: multiple registrations using the same hashed consumer,
538        // will be recognized as the same in the TrackConsumersPool.
539
540        // Test: will be the same per Top Consumers reported.
541        r0.grow(10); // make r0=10, pool available=90
542        let new_consumer_same_name = MemoryConsumer::new(same_name);
543        let mut r1 = new_consumer_same_name.register(&pool);
544        // TODO: the insufficient_capacity_err() message is per reservation, not per consumer.
545        // a followup PR will clarify this message "0 bytes already allocated for this reservation"
546        let expected = "Additional allocation failed with top memory consumers (across reservations) as: foo consumed 10 bytes. Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated for this reservation - 90 bytes remain available for the total pool";
547        let res = r1.try_grow(150);
548        assert!(
549            matches!(
550                &res,
551                Err(DataFusionError::ResourcesExhausted(ref e)) if e.to_string().contains(expected)
552            ),
553            "should provide proper error with same hashed consumer (a single foo=10 bytes, available=90), instead found {:?}", res
554        );
555
556        // Test: will accumulate size changes per consumer, not per reservation
557        r1.grow(20);
558        let expected = "Additional allocation failed with top memory consumers (across reservations) as: foo consumed 30 bytes. Error: Failed to allocate additional 150 bytes for foo with 20 bytes already allocated for this reservation - 70 bytes remain available for the total pool";
559        let res = r1.try_grow(150);
560        assert!(
561            matches!(
562                &res,
563                Err(DataFusionError::ResourcesExhausted(ref e)) if e.to_string().contains(expected)
564            ),
565            "should provide proper error with same hashed consumer (a single foo=30 bytes, available=70), instead found {:?}", res
566        );
567
568        // Test: different hashed consumer, (even with the same name),
569        // will be recognized as different in the TrackConsumersPool
570        let consumer_with_same_name_but_different_hash =
571            MemoryConsumer::new(same_name).with_can_spill(true);
572        let mut r2 = consumer_with_same_name_but_different_hash.register(&pool);
573        let expected = "Additional allocation failed with top memory consumers (across reservations) as: foo(can_spill=false) consumed 30 bytes, foo(can_spill=true) consumed 0 bytes. Error: Failed to allocate additional 150 bytes for foo with 0 bytes already allocated for this reservation - 70 bytes remain available for the total pool";
574        let res = r2.try_grow(150);
575        assert!(
576            matches!(
577                &res,
578                Err(DataFusionError::ResourcesExhausted(ref e)) if e.to_string().contains(expected)
579            ),
580            "should provide proper error with different hashed consumer (foo(can_spill=false)=30 bytes and foo(can_spill=true)=0 bytes, available=70), instead found {:?}", res
581        );
582    }
583
584    #[test]
585    fn test_tracked_consumers_pool_deregister() {
586        fn test_per_pool_type(pool: Arc<dyn MemoryPool>) {
587            // Baseline: see the 2 memory consumers
588            let mut r0 = MemoryConsumer::new("r0").register(&pool);
589            r0.grow(10);
590            let r1_consumer = MemoryConsumer::new("r1");
591            let mut r1 = r1_consumer.clone().register(&pool);
592            r1.grow(20);
593            let expected = "Additional allocation failed with top memory consumers (across reservations) as: r1 consumed 20 bytes, r0 consumed 10 bytes. Error: Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated for this reservation - 70 bytes remain available for the total pool";
594            let res = r0.try_grow(150);
595            assert!(
596                matches!(
597                    &res,
598                    Err(DataFusionError::ResourcesExhausted(ref e)) if e.to_string().contains(expected)
599                ),
600                "should provide proper error with both consumers, instead found {:?}",
601                res
602            );
603
604            // Test: unregister one
605            // only the remaining one should be listed
606            pool.unregister(&r1_consumer);
607            let expected_consumers = "Additional allocation failed with top memory consumers (across reservations) as: r0 consumed 10 bytes";
608            let res = r0.try_grow(150);
609            assert!(
610                matches!(
611                    &res,
612                    Err(DataFusionError::ResourcesExhausted(ref e)) if e.to_string().contains(expected_consumers)
613                ),
614                "should provide proper error with only 1 consumer left registered, instead found {:?}", res
615            );
616
617            // Test: actual message we see is the `available is 70`. When it should be `available is 90`.
618            // This is because the pool.shrink() does not automatically occur within the inner_pool.deregister().
619            let expected_70_available = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated for this reservation - 70 bytes remain available for the total pool";
620            let res = r0.try_grow(150);
621            assert!(
622                matches!(
623                    &res,
624                    Err(DataFusionError::ResourcesExhausted(ref e)) if e.to_string().contains(expected_70_available)
625                ),
626                "should find that the inner pool will still count all bytes for the deregistered consumer until the reservation is dropped, instead found {:?}", res
627            );
628
629            // Test: the registration needs to free itself (or be dropped),
630            // for the proper error message
631            r1.free();
632            let expected_90_available = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated for this reservation - 90 bytes remain available for the total pool";
633            let res = r0.try_grow(150);
634            assert!(
635                matches!(
636                    &res,
637                    Err(DataFusionError::ResourcesExhausted(ref e)) if e.to_string().contains(expected_90_available)
638                ),
639                "should correctly account the total bytes after reservation is free, instead found {:?}", res
640            );
641        }
642
643        let tracked_spill_pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
644            FairSpillPool::new(100),
645            NonZeroUsize::new(3).unwrap(),
646        ));
647        test_per_pool_type(tracked_spill_pool);
648
649        let tracked_greedy_pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
650            GreedyMemoryPool::new(100),
651            NonZeroUsize::new(3).unwrap(),
652        ));
653        test_per_pool_type(tracked_greedy_pool);
654    }
655
656    #[test]
657    fn test_tracked_consumers_pool_use_beyond_errors() {
658        let upcasted: Arc<dyn std::any::Any + Send + Sync> =
659            Arc::new(TrackConsumersPool::new(
660                GreedyMemoryPool::new(100),
661                NonZeroUsize::new(3).unwrap(),
662            ));
663        let pool: Arc<dyn MemoryPool> = Arc::clone(&upcasted)
664            .downcast::<TrackConsumersPool<GreedyMemoryPool>>()
665            .unwrap();
666        // set r1=20
667        let mut r1 = MemoryConsumer::new("r1").register(&pool);
668        r1.grow(20);
669        // set r2=15
670        let mut r2 = MemoryConsumer::new("r2").register(&pool);
671        r2.grow(15);
672        // set r3=45
673        let mut r3 = MemoryConsumer::new("r3").register(&pool);
674        r3.grow(45);
675
676        let downcasted = upcasted
677            .downcast::<TrackConsumersPool<GreedyMemoryPool>>()
678            .unwrap();
679
680        // Test: can get runtime metrics, even without an error thrown
681        let expected = "r3 consumed 45 bytes, r1 consumed 20 bytes";
682        let res = downcasted.report_top(2);
683        assert_eq!(
684            res, expected,
685            "should provide list of top memory consumers, instead found {:?}",
686            res
687        );
688    }
689}