datafusion_execution/memory_pool/
pool.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::memory_pool::{
19    human_readable_size, MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation,
20};
21use datafusion_common::HashMap;
22use datafusion_common::{resources_datafusion_err, DataFusionError, Result};
23use log::debug;
24use parking_lot::Mutex;
25use std::{
26    num::NonZeroUsize,
27    sync::atomic::{AtomicUsize, Ordering},
28};
29
/// A [`MemoryPool`] that enforces no limit
#[derive(Debug, Default)]
pub struct UnboundedMemoryPool {
    /// Running total of bytes currently reserved from this pool
    used: AtomicUsize,
}
35
36impl MemoryPool for UnboundedMemoryPool {
37    fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
38        self.used.fetch_add(additional, Ordering::Relaxed);
39    }
40
41    fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
42        self.used.fetch_sub(shrink, Ordering::Relaxed);
43    }
44
45    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
46        self.grow(reservation, additional);
47        Ok(())
48    }
49
50    fn reserved(&self) -> usize {
51        self.used.load(Ordering::Relaxed)
52    }
53
54    fn memory_limit(&self) -> MemoryLimit {
55        MemoryLimit::Infinite
56    }
57}
58
/// A [`MemoryPool`] that implements a greedy first-come first-serve limit.
///
/// This pool works well for queries that do not need to spill or have
/// a single spillable operator. See [`FairSpillPool`] if there are
/// multiple spillable operators that all will spill.
#[derive(Debug)]
pub struct GreedyMemoryPool {
    /// Upper bound, in bytes, that `try_grow` checks new totals against
    pool_size: usize,
    /// Running total of bytes currently reserved from this pool
    used: AtomicUsize,
}
69
70impl GreedyMemoryPool {
71    /// Create a new pool that can allocate up to `pool_size` bytes
72    pub fn new(pool_size: usize) -> Self {
73        debug!("Created new GreedyMemoryPool(pool_size={pool_size})");
74        Self {
75            pool_size,
76            used: AtomicUsize::new(0),
77        }
78    }
79}
80
81impl MemoryPool for GreedyMemoryPool {
82    fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
83        self.used.fetch_add(additional, Ordering::Relaxed);
84    }
85
86    fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
87        self.used.fetch_sub(shrink, Ordering::Relaxed);
88    }
89
90    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
91        self.used
92            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |used| {
93                let new_used = used + additional;
94                (new_used <= self.pool_size).then_some(new_used)
95            })
96            .map_err(|used| {
97                insufficient_capacity_err(
98                    reservation,
99                    additional,
100                    self.pool_size.saturating_sub(used),
101                )
102            })?;
103        Ok(())
104    }
105
106    fn reserved(&self) -> usize {
107        self.used.load(Ordering::Relaxed)
108    }
109
110    fn memory_limit(&self) -> MemoryLimit {
111        MemoryLimit::Finite(self.pool_size)
112    }
113}
114
/// A [`MemoryPool`] that prevents spillable reservations from using more than
/// an even fraction of the available memory sans any unspillable reservations
/// (i.e. `(pool_size - unspillable_memory) / num_spillable_reservations`)
///
/// This pool works best when you know beforehand the query has
/// multiple spillable operators that will likely all need to
/// spill. Sometimes it will cause spills even when there was
/// sufficient memory (reserved for other operators) to avoid doing
/// so.
///
/// ```text
///    ┌───────────────────────z──────────────────────z───────────────┐
///    │                       z                      z               │
///    │                       z                      z               │
///    │       Spillable       z       Unspillable    z     Free      │
///    │        Memory         z        Memory        z    Memory     │
///    │                       z                      z               │
///    │                       z                      z               │
///    └───────────────────────z──────────────────────z───────────────┘
/// ```
///
/// Unspillable memory is allocated in a first-come, first-serve fashion
#[derive(Debug)]
pub struct FairSpillPool {
    /// The total memory limit
    pool_size: usize,

    /// Spillable / unspillable accounting, guarded by a mutex
    state: Mutex<FairSpillPoolState>,
}
144
/// The accounting counters of a [`FairSpillPool`], held behind its mutex
#[derive(Debug)]
struct FairSpillPoolState {
    /// The number of consumers that can spill
    num_spill: usize,

    /// The total amount of memory reserved that can be spilled
    spillable: usize,

    /// The total amount of memory reserved by consumers that cannot spill
    unspillable: usize,
}
156
157impl FairSpillPool {
158    /// Allocate up to `limit` bytes
159    pub fn new(pool_size: usize) -> Self {
160        debug!("Created new FairSpillPool(pool_size={pool_size})");
161        Self {
162            pool_size,
163            state: Mutex::new(FairSpillPoolState {
164                num_spill: 0,
165                spillable: 0,
166                unspillable: 0,
167            }),
168        }
169    }
170}
171
172impl MemoryPool for FairSpillPool {
173    fn register(&self, consumer: &MemoryConsumer) {
174        if consumer.can_spill {
175            self.state.lock().num_spill += 1;
176        }
177    }
178
179    fn unregister(&self, consumer: &MemoryConsumer) {
180        if consumer.can_spill {
181            let mut state = self.state.lock();
182            state.num_spill = state.num_spill.checked_sub(1).unwrap();
183        }
184    }
185
186    fn grow(&self, reservation: &MemoryReservation, additional: usize) {
187        let mut state = self.state.lock();
188        match reservation.registration.consumer.can_spill {
189            true => state.spillable += additional,
190            false => state.unspillable += additional,
191        }
192    }
193
194    fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
195        let mut state = self.state.lock();
196        match reservation.registration.consumer.can_spill {
197            true => state.spillable -= shrink,
198            false => state.unspillable -= shrink,
199        }
200    }
201
202    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
203        let mut state = self.state.lock();
204
205        match reservation.registration.consumer.can_spill {
206            true => {
207                // The total amount of memory available to spilling consumers
208                let spill_available = self.pool_size.saturating_sub(state.unspillable);
209
210                // No spiller may use more than their fraction of the memory available
211                let available = spill_available
212                    .checked_div(state.num_spill)
213                    .unwrap_or(spill_available);
214
215                if reservation.size + additional > available {
216                    return Err(insufficient_capacity_err(
217                        reservation,
218                        additional,
219                        available,
220                    ));
221                }
222                state.spillable += additional;
223            }
224            false => {
225                let available = self
226                    .pool_size
227                    .saturating_sub(state.unspillable + state.spillable);
228
229                if available < additional {
230                    return Err(insufficient_capacity_err(
231                        reservation,
232                        additional,
233                        available,
234                    ));
235                }
236                state.unspillable += additional;
237            }
238        }
239        Ok(())
240    }
241
242    fn reserved(&self) -> usize {
243        let state = self.state.lock();
244        state.spillable + state.unspillable
245    }
246
247    fn memory_limit(&self) -> MemoryLimit {
248        MemoryLimit::Finite(self.pool_size)
249    }
250}
251
252/// Constructs a resources error based upon the individual [`MemoryReservation`].
253///
254/// The error references the `bytes already allocated` for the reservation,
255/// and not the total within the collective [`MemoryPool`],
256/// nor the total across multiple reservations with the same [`MemoryConsumer`].
257#[inline(always)]
258fn insufficient_capacity_err(
259    reservation: &MemoryReservation,
260    additional: usize,
261    available: usize,
262) -> DataFusionError {
263    resources_datafusion_err!("Failed to allocate additional {} for {} with {} already allocated for this reservation - {} remain available for the total pool", 
264    human_readable_size(additional), reservation.registration.consumer.name, human_readable_size(reservation.size), human_readable_size(available))
265}
266
/// Per-consumer bookkeeping: identifying details plus the current and peak
/// number of reserved bytes.
#[derive(Debug)]
struct TrackedConsumer {
    /// Name of the consumer
    name: String,
    /// Whether the consumer is able to spill
    can_spill: bool,
    /// Bytes currently reserved by the consumer
    reserved: AtomicUsize,
    /// Largest value `reserved` has reached through `grow`
    peak: AtomicUsize,
}

impl TrackedConsumer {
    /// Shorthand to return the currently reserved value
    fn reserved(&self) -> usize {
        self.reserved.load(Ordering::Relaxed)
    }

    /// Return the peak value
    fn peak(&self) -> usize {
        self.peak.load(Ordering::Relaxed)
    }

    /// Grows the tracked consumer's reserved size,
    /// should be called after the pool has successfully performed the grow().
    fn grow(&self, additional: usize) {
        // Derive the new total from the value `fetch_add` returns rather than
        // re-loading `reserved`: a separate load could observe a later shrink
        // and make `peak` miss the value this call actually reached.
        let previous = self.reserved.fetch_add(additional, Ordering::Relaxed);
        // `fetch_add` wraps on overflow; mirror that so `current` equals the
        // value now stored in `reserved`.
        let current = previous.wrapping_add(additional);
        self.peak.fetch_max(current, Ordering::Relaxed);
    }

    /// Reduce the tracked consumer's reserved size,
    /// should be called after the pool has successfully performed the shrink().
    fn shrink(&self, shrink: usize) {
        self.reserved.fetch_sub(shrink, Ordering::Relaxed);
    }
}
299
/// A [`MemoryPool`] that tracks the consumers that have
/// reserved memory within the inner memory pool.
///
/// By tracking memory reservations more carefully this pool
/// can provide better error messages on the largest memory users
/// when memory allocation fails.
///
/// Tracking is per hashed [`MemoryConsumer`], not per [`MemoryReservation`].
/// The same consumer can have multiple reservations.
///
/// # Automatic Usage via [`RuntimeEnvBuilder`]
///
/// The easiest way to use `TrackConsumersPool` is via
/// [`RuntimeEnvBuilder::with_memory_limit()`].
///
/// [`RuntimeEnvBuilder`]: crate::runtime_env::RuntimeEnvBuilder
/// [`RuntimeEnvBuilder::with_memory_limit()`]: crate::runtime_env::RuntimeEnvBuilder::with_memory_limit
///
/// # Usage Examples
///
/// For more examples of using `TrackConsumersPool`, see the
/// [memory_pool_tracking.rs] and [memory_pool_execution_plan.rs] examples.
///
/// [memory_pool_tracking.rs]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/memory_pool_tracking.rs
/// [memory_pool_execution_plan.rs]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/memory_pool_execution_plan.rs
#[derive(Debug)]
pub struct TrackConsumersPool<I> {
    /// The wrapped memory pool that actually handles reservation logic
    inner: I,
    /// The number of consumers to report (ordered top to bottom by reservation size)
    top: NonZeroUsize,
    /// Maps consumer_id --> TrackedConsumer
    tracked_consumers: Mutex<HashMap<usize, TrackedConsumer>>,
}
333
334impl<I: MemoryPool> TrackConsumersPool<I> {
335    /// Creates a new [`TrackConsumersPool`].
336    ///
337    /// # Arguments
338    /// * `inner` - The underlying memory pool that handles actual memory allocation
339    /// * `top` - The number of top memory consumers to include in error messages
340    ///
341    /// # Note
342    /// In most cases, you should use [`RuntimeEnvBuilder::with_memory_limit()`](crate::runtime_env::RuntimeEnvBuilder::with_memory_limit)
343    /// instead of creating this pool manually, as it automatically sets up tracking with
344    /// sensible defaults (top 5 consumers).
345    ///
346    /// # Example
347    ///
348    /// ```rust
349    /// use std::num::NonZeroUsize;
350    /// use datafusion_execution::memory_pool::{TrackConsumersPool, GreedyMemoryPool, FairSpillPool};
351    ///
352    /// // Create with a greedy pool backend, reporting top 3 consumers in error messages
353    /// let tracked_greedy = TrackConsumersPool::new(
354    ///     GreedyMemoryPool::new(1024 * 1024), // 1MB limit
355    ///     NonZeroUsize::new(3).unwrap(),
356    /// );
357    ///
358    /// // Create with a fair spill pool backend, reporting top 5 consumers in error messages
359    /// let tracked_fair = TrackConsumersPool::new(
360    ///     FairSpillPool::new(2 * 1024 * 1024), // 2MB limit
361    ///     NonZeroUsize::new(5).unwrap(),
362    /// );
363    /// ```
364    ///
365    /// # Impact on Error Messages
366    ///
367    /// The `top` determines how many Top K [`MemoryConsumer`]s to include
368    /// in the reported [`DataFusionError::ResourcesExhausted`].
369    pub fn new(inner: I, top: NonZeroUsize) -> Self {
370        Self {
371            inner,
372            top,
373            tracked_consumers: Default::default(),
374        }
375    }
376
377    /// Returns a formatted string with the top memory consumers.
378    pub fn report_top(&self, top: usize) -> String {
379        let mut consumers = self
380            .tracked_consumers
381            .lock()
382            .iter()
383            .map(|(consumer_id, tracked_consumer)| {
384                (
385                    (
386                        *consumer_id,
387                        tracked_consumer.name.to_owned(),
388                        tracked_consumer.can_spill,
389                        tracked_consumer.peak(),
390                    ),
391                    tracked_consumer.reserved(),
392                )
393            })
394            .collect::<Vec<_>>();
395        consumers.sort_by(|a, b| b.1.cmp(&a.1)); // inverse ordering
396
397        consumers[0..std::cmp::min(top, consumers.len())]
398            .iter()
399            .map(|((id, name, can_spill, peak), size)| {
400                format!(
401                    "  {name}#{id}(can spill: {can_spill}) consumed {}, peak {}",
402                    human_readable_size(*size),
403                    human_readable_size(*peak),
404                )
405            })
406            .collect::<Vec<_>>()
407            .join(",\n")
408            + "."
409    }
410}
411
412impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
413    fn register(&self, consumer: &MemoryConsumer) {
414        self.inner.register(consumer);
415
416        let mut guard = self.tracked_consumers.lock();
417        let existing = guard.insert(
418            consumer.id(),
419            TrackedConsumer {
420                name: consumer.name().to_string(),
421                can_spill: consumer.can_spill(),
422                reserved: Default::default(),
423                peak: Default::default(),
424            },
425        );
426
427        debug_assert!(
428            existing.is_none(),
429            "Registered was called twice on the same consumer"
430        );
431    }
432
433    fn unregister(&self, consumer: &MemoryConsumer) {
434        self.inner.unregister(consumer);
435        self.tracked_consumers.lock().remove(&consumer.id());
436    }
437
438    fn grow(&self, reservation: &MemoryReservation, additional: usize) {
439        self.inner.grow(reservation, additional);
440        self.tracked_consumers
441            .lock()
442            .entry(reservation.consumer().id())
443            .and_modify(|tracked_consumer| {
444                tracked_consumer.grow(additional);
445            });
446    }
447
448    fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
449        self.inner.shrink(reservation, shrink);
450        self.tracked_consumers
451            .lock()
452            .entry(reservation.consumer().id())
453            .and_modify(|tracked_consumer| {
454                tracked_consumer.shrink(shrink);
455            });
456    }
457
458    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
459        self.inner
460            .try_grow(reservation, additional)
461            .map_err(|e| match e {
462                DataFusionError::ResourcesExhausted(e) => {
463                    // wrap OOM message in top consumers
464                    DataFusionError::ResourcesExhausted(
465                        provide_top_memory_consumers_to_error_msg(
466                            e,
467                            self.report_top(self.top.into()),
468                        ),
469                    )
470                }
471                _ => e,
472            })?;
473
474        self.tracked_consumers
475            .lock()
476            .entry(reservation.consumer().id())
477            .and_modify(|tracked_consumer| {
478                tracked_consumer.grow(additional);
479            });
480        Ok(())
481    }
482
483    fn reserved(&self) -> usize {
484        self.inner.reserved()
485    }
486
487    fn memory_limit(&self) -> MemoryLimit {
488        self.inner.memory_limit()
489    }
490}
491
/// Prefixes an allocation error message with the report of top memory
/// consumers.
///
/// The result is a fixed header line, then `top_consumers`, then a line
/// starting with `Error: ` followed by `error_msg`.
fn provide_top_memory_consumers_to_error_msg(
    error_msg: String,
    top_consumers: String,
) -> String {
    const HEADER: &str =
        "Additional allocation failed with top memory consumers (across reservations) as:\n";
    const ERROR_PREFIX: &str = "\nError: ";

    let mut message = String::with_capacity(
        HEADER.len() + top_consumers.len() + ERROR_PREFIX.len() + error_msg.len(),
    );
    message.push_str(HEADER);
    message.push_str(&top_consumers);
    message.push_str(ERROR_PREFIX);
    message.push_str(&error_msg);
    message
}
498
499#[cfg(test)]
500mod tests {
501    use super::*;
502    use insta::{allow_duplicates, assert_snapshot, Settings};
503    use std::sync::Arc;
504
    /// Builds insta settings with a filter that rewrites the numeric id in
    /// `<name>#<digits>(can spill: <bool>)` to the literal `[ID]` before
    /// snapshot comparison.
    fn make_settings() -> Settings {
        let mut settings = Settings::clone_current();
        settings.add_filter(
            r"([^\s]+)\#\d+\(can spill: (true|false)\)",
            "$1#[ID](can spill: $2)",
        );
        settings
    }
513
    #[test]
    fn test_fair() {
        let pool = Arc::new(FairSpillPool::new(100)) as _;

        let mut r1 = MemoryConsumer::new("unspillable").register(&pool);
        // Can grow beyond capacity of pool
        r1.grow(2000);
        assert_eq!(pool.reserved(), 2000);

        let mut r2 = MemoryConsumer::new("r2")
            .with_can_spill(true)
            .register(&pool);
        // Can grow beyond capacity of pool
        r2.grow(2000);

        assert_eq!(pool.reserved(), 4000);

        // Unspillable memory (2000) exceeds the pool size (100), so nothing
        // is left for spillable consumers
        let err = r2.try_grow(1).unwrap_err().strip_backtrace();
        assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 1.0 B for r2 with 2000.0 B already allocated for this reservation - 0.0 B remain available for the total pool");

        // A failed attempt changes nothing: retrying gives the same error
        let err = r2.try_grow(1).unwrap_err().strip_backtrace();
        assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 1.0 B for r2 with 2000.0 B already allocated for this reservation - 0.0 B remain available for the total pool");

        r1.shrink(1990);
        r2.shrink(2000);

        assert_eq!(pool.reserved(), 10);

        r1.try_grow(10).unwrap();
        assert_eq!(pool.reserved(), 20);

        // Can grow r2 to 80 as only spilling consumer
        r2.try_grow(80).unwrap();
        assert_eq!(pool.reserved(), 100);

        r2.shrink(70);

        assert_eq!(r1.size(), 20);
        assert_eq!(r2.size(), 10);
        assert_eq!(pool.reserved(), 30);

        let mut r3 = MemoryConsumer::new("r3")
            .with_can_spill(true)
            .register(&pool);

        // Two spillable consumers are registered now, so each may use at most
        // (100 - 20) / 2 = 40
        let err = r3.try_grow(70).unwrap_err().strip_backtrace();
        assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 70.0 B for r3 with 0.0 B already allocated for this reservation - 40.0 B remain available for the total pool");

        // Shrinking r2 to zero doesn't allow r3 to allocate more than 40,
        // because r2 is still registered as a spillable consumer
        r2.free();
        let err = r3.try_grow(70).unwrap_err().strip_backtrace();
        assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 70.0 B for r3 with 0.0 B already allocated for this reservation - 40.0 B remain available for the total pool");

        // But dropping r2 does
        drop(r2);
        assert_eq!(pool.reserved(), 20);
        r3.try_grow(80).unwrap();

        assert_eq!(pool.reserved(), 100);
        r1.free();
        assert_eq!(pool.reserved(), 80);

        // An unspillable consumer can only take what is left of the pool: 100 - 80 = 20
        let mut r4 = MemoryConsumer::new("s4").register(&pool);
        let err = r4.try_grow(30).unwrap_err().strip_backtrace();
        assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 30.0 B for s4 with 0.0 B already allocated for this reservation - 20.0 B remain available for the total pool");
    }
580
    #[test]
    fn test_tracked_consumers_pool() {
        let setting = make_settings();
        let _bound = setting.bind_to_scope();
        let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
            GreedyMemoryPool::new(100),
            NonZeroUsize::new(3).unwrap(),
        ));

        // Test: use all the different interfaces to change reservation size

        // set r1=50, using grow and shrink
        // (it reaches 70 on the way, which is the peak the snapshot expects)
        let mut r1 = MemoryConsumer::new("r1").register(&pool);
        r1.grow(50);
        r1.grow(20);
        r1.shrink(20);

        // set r2=15 using try_grow
        let mut r2 = MemoryConsumer::new("r2").register(&pool);
        r2.try_grow(15)
            .expect("should succeed in memory allotment for r2");

        // set r3=20 using try_resize
        // (resized to 25 first, which is the peak the snapshot expects)
        let mut r3 = MemoryConsumer::new("r3").register(&pool);
        r3.try_resize(25)
            .expect("should succeed in memory allotment for r3");
        r3.try_resize(20)
            .expect("should succeed in memory allotment for r3");

        // set r4=10
        // this should not be reported in top 3
        let mut r4 = MemoryConsumer::new("r4").register(&pool);
        r4.grow(10);

        // Test: reports if new reservation causes error
        // using the previously set sizes for other consumers
        // (50 + 15 + 20 + 10 = 95 reserved, so 5 of 100 remain)
        let mut r5 = MemoryConsumer::new("r5").register(&pool);
        let res = r5.try_grow(150);
        assert!(res.is_err());
        let error = res.unwrap_err().strip_backtrace();
        assert_snapshot!(error, @r"
        Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
          r1#[ID](can spill: false) consumed 50.0 B, peak 70.0 B,
          r3#[ID](can spill: false) consumed 20.0 B, peak 25.0 B,
          r2#[ID](can spill: false) consumed 15.0 B, peak 15.0 B.
        Error: Failed to allocate additional 150.0 B for r5 with 0.0 B already allocated for this reservation - 5.0 B remain available for the total pool
        ");
    }
629
    #[test]
    fn test_tracked_consumers_pool_register() {
        let setting = make_settings();
        let _bound = setting.bind_to_scope();
        let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
            GreedyMemoryPool::new(100),
            NonZeroUsize::new(3).unwrap(),
        ));

        let same_name = "foo";

        // Test: see error message when the only registered consumer
        // has not reserved anything yet
        let mut r0 = MemoryConsumer::new(same_name).register(&pool);
        let res = r0.try_grow(150);
        assert!(res.is_err());
        let error = res.unwrap_err().strip_backtrace();
        assert_snapshot!(error, @r"
        Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
          foo#[ID](can spill: false) consumed 0.0 B, peak 0.0 B.
        Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 100.0 B remain available for the total pool
        ");

        // API: a second consumer created with the same name is listed as a
        // separate entry in the TrackConsumersPool (see the snapshot below).

        r0.grow(10); // make r0=10, pool available=90
        let new_consumer_same_name = MemoryConsumer::new(same_name);
        let mut r1 = new_consumer_same_name.register(&pool);
        // TODO: the insufficient_capacity_err() message is per reservation, not per consumer.
        // a followup PR will clarify this message "0 bytes already allocated for this reservation"
        let res = r1.try_grow(150);
        assert!(res.is_err());
        let error = res.unwrap_err().strip_backtrace();
        assert_snapshot!(error, @r"
        Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
          foo#[ID](can spill: false) consumed 10.0 B, peak 10.0 B,
          foo#[ID](can spill: false) consumed 0.0 B, peak 0.0 B.
        Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 90.0 B remain available for the total pool
        ");

        // Test: will accumulate size changes per consumer, not per reservation
        r1.grow(20);

        let res = r1.try_grow(150);
        assert!(res.is_err());
        let error = res.unwrap_err().strip_backtrace();
        assert_snapshot!(error, @r"
        Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
          foo#[ID](can spill: false) consumed 20.0 B, peak 20.0 B,
          foo#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
        Error: Failed to allocate additional 150.0 B for foo with 20.0 B already allocated for this reservation - 70.0 B remain available for the total pool
        ");

        // Test: different hashed consumer, (even with the same name),
        // will be recognized as different in the TrackConsumersPool
        let consumer_with_same_name_but_different_hash =
            MemoryConsumer::new(same_name).with_can_spill(true);
        let mut r2 = consumer_with_same_name_but_different_hash.register(&pool);
        let res = r2.try_grow(150);
        assert!(res.is_err());
        let error = res.unwrap_err().strip_backtrace();
        assert_snapshot!(error, @r"
        Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
          foo#[ID](can spill: false) consumed 20.0 B, peak 20.0 B,
          foo#[ID](can spill: false) consumed 10.0 B, peak 10.0 B,
          foo#[ID](can spill: true) consumed 0.0 B, peak 0.0 B.
        Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 70.0 B remain available for the total pool
        ");
    }
699
    #[test]
    fn test_tracked_consumers_pool_deregister() {
        // Runs the same scenario against any pool; called below with a tracked
        // FairSpillPool and a tracked GreedyMemoryPool.
        fn test_per_pool_type(pool: Arc<dyn MemoryPool>) {
            // Baseline: see the 2 memory consumers
            let setting = make_settings();
            let _bound = setting.bind_to_scope();
            let mut r0 = MemoryConsumer::new("r0").register(&pool);
            r0.grow(10);
            let r1_consumer = MemoryConsumer::new("r1");
            let mut r1 = r1_consumer.register(&pool);
            r1.grow(20);

            let res = r0.try_grow(150);
            assert!(res.is_err());
            let error = res.unwrap_err().strip_backtrace();
            allow_duplicates!(assert_snapshot!(error, @r"
                Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
                  r1#[ID](can spill: false) consumed 20.0 B, peak 20.0 B,
                  r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
                Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 70.0 B remain available for the total pool
                "));

            // Test: unregister one
            // only the remaining one should be listed, and the snapshot expects
            // 90.0 B available, i.e. r1's 20 B are no longer counted
            drop(r1);
            let res = r0.try_grow(150);
            assert!(res.is_err());
            let error = res.unwrap_err().strip_backtrace();
            allow_duplicates!(assert_snapshot!(error, @r"
                Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
                  r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
                Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool
                "));

            // Test: repeating the failed request gives the same report.
            // NOTE(review): the comment previously here said the message would
            // show `available is 70` instead of `available is 90`; the snapshot
            // below expects 90.0 B, so that remark looks stale. Confirm, and
            // consider removing this duplicated assertion.
            let res = r0.try_grow(150);
            assert!(res.is_err());
            let error = res.unwrap_err().strip_backtrace();
            allow_duplicates!(assert_snapshot!(error, @r"
                Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
                  r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
                Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool
                "));

            // Test: a third identical attempt still reports 90.0 B available.
            // NOTE(review): nothing is freed or dropped between the previous
            // assertion and this one, so it repeats the same check; presumably
            // a leftover from an earlier version of this test. Confirm.
            let res = r0.try_grow(150);
            assert!(res.is_err());
            let error = res.unwrap_err().strip_backtrace();
            allow_duplicates!(assert_snapshot!(error, @r"
                Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
                  r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
                Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool
                "));
        }

        let tracked_spill_pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
            FairSpillPool::new(100),
            NonZeroUsize::new(3).unwrap(),
        ));
        test_per_pool_type(tracked_spill_pool);

        let tracked_greedy_pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
            GreedyMemoryPool::new(100),
            NonZeroUsize::new(3).unwrap(),
        ));
        test_per_pool_type(tracked_greedy_pool);
    }
769
    #[test]
    fn test_tracked_consumers_pool_use_beyond_errors() {
        let setting = make_settings();
        let _bound = setting.bind_to_scope();
        // Hold the pool as `dyn Any` so it can be downcast to the concrete
        // `TrackConsumersPool<GreedyMemoryPool>` type more than once
        let upcasted: Arc<dyn std::any::Any + Send + Sync> =
            Arc::new(TrackConsumersPool::new(
                GreedyMemoryPool::new(100),
                NonZeroUsize::new(3).unwrap(),
            ));
        // First downcast: used through the `MemoryPool` trait object to register consumers
        let pool: Arc<dyn MemoryPool> = Arc::clone(&upcasted)
            .downcast::<TrackConsumersPool<GreedyMemoryPool>>()
            .unwrap();
        // set r1=20
        let mut r1 = MemoryConsumer::new("r1").register(&pool);
        r1.grow(20);
        // set r2=15
        let mut r2 = MemoryConsumer::new("r2").register(&pool);
        r2.grow(15);
        // set r3=45
        let mut r3 = MemoryConsumer::new("r3").register(&pool);
        r3.grow(45);

        // Second downcast: gives access to the inherent `report_top` method
        let downcasted = upcasted
            .downcast::<TrackConsumersPool<GreedyMemoryPool>>()
            .unwrap();

        // Test: can get runtime metrics, even without an error thrown
        let res = downcasted.report_top(2);
        assert_snapshot!(res, @r"
        r3#[ID](can spill: false) consumed 45.0 B, peak 45.0 B,
        r1#[ID](can spill: false) consumed 20.0 B, peak 20.0 B.
        ");
    }
803}