datafusion_execution/memory_pool/
pool.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::memory_pool::{
19    MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation, human_readable_size,
20};
21use datafusion_common::HashMap;
22use datafusion_common::{DataFusionError, Result, resources_datafusion_err};
23use log::debug;
24use parking_lot::Mutex;
25use std::{
26    num::NonZeroUsize,
27    sync::atomic::{AtomicUsize, Ordering},
28};
29
30/// A [`MemoryPool`] that enforces no limit
31#[derive(Debug, Default)]
32pub struct UnboundedMemoryPool {
33    used: AtomicUsize,
34}
35
36impl MemoryPool for UnboundedMemoryPool {
37    fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
38        self.used.fetch_add(additional, Ordering::Relaxed);
39    }
40
41    fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
42        self.used.fetch_sub(shrink, Ordering::Relaxed);
43    }
44
45    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
46        self.grow(reservation, additional);
47        Ok(())
48    }
49
50    fn reserved(&self) -> usize {
51        self.used.load(Ordering::Relaxed)
52    }
53
54    fn memory_limit(&self) -> MemoryLimit {
55        MemoryLimit::Infinite
56    }
57}
58
59/// A [`MemoryPool`] that implements a greedy first-come first-serve limit.
60///
61/// This pool works well for queries that do not need to spill or have
62/// a single spillable operator. See [`FairSpillPool`] if there are
63/// multiple spillable operators that all will spill.
64#[derive(Debug)]
65pub struct GreedyMemoryPool {
66    pool_size: usize,
67    used: AtomicUsize,
68}
69
70impl GreedyMemoryPool {
71    /// Create a new pool that can allocate up to `pool_size` bytes
72    pub fn new(pool_size: usize) -> Self {
73        debug!("Created new GreedyMemoryPool(pool_size={pool_size})");
74        Self {
75            pool_size,
76            used: AtomicUsize::new(0),
77        }
78    }
79}
80
81impl MemoryPool for GreedyMemoryPool {
82    fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
83        self.used.fetch_add(additional, Ordering::Relaxed);
84    }
85
86    fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
87        self.used.fetch_sub(shrink, Ordering::Relaxed);
88    }
89
90    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
91        self.used
92            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |used| {
93                let new_used = used + additional;
94                (new_used <= self.pool_size).then_some(new_used)
95            })
96            .map_err(|used| {
97                insufficient_capacity_err(
98                    reservation,
99                    additional,
100                    self.pool_size.saturating_sub(used),
101                )
102            })?;
103        Ok(())
104    }
105
106    fn reserved(&self) -> usize {
107        self.used.load(Ordering::Relaxed)
108    }
109
110    fn memory_limit(&self) -> MemoryLimit {
111        MemoryLimit::Finite(self.pool_size)
112    }
113}
114
115/// A [`MemoryPool`] that prevents spillable reservations from using more than
116/// an even fraction of the available memory sans any unspillable reservations
117/// (i.e. `(pool_size - unspillable_memory) / num_spillable_reservations`)
118///
119/// This pool works best when you know beforehand the query has
120/// multiple spillable operators that will likely all need to
121/// spill. Sometimes it will cause spills even when there was
122/// sufficient memory (reserved for other operators) to avoid doing
123/// so.
124///
125/// ```text
126///    ┌───────────────────────z──────────────────────z───────────────┐
127///    │                       z                      z               │
128///    │                       z                      z               │
129///    │       Spillable       z       Unspillable    z     Free      │
130///    │        Memory         z        Memory        z    Memory     │
131///    │                       z                      z               │
132///    │                       z                      z               │
133///    └───────────────────────z──────────────────────z───────────────┘
134/// ```
135///
136/// Unspillable memory is allocated in a first-come, first-serve fashion
137#[derive(Debug)]
138pub struct FairSpillPool {
139    /// The total memory limit
140    pool_size: usize,
141
142    state: Mutex<FairSpillPoolState>,
143}
144
145#[derive(Debug)]
146struct FairSpillPoolState {
147    /// The number of consumers that can spill
148    num_spill: usize,
149
150    /// The total amount of memory reserved that can be spilled
151    spillable: usize,
152
153    /// The total amount of memory reserved by consumers that cannot spill
154    unspillable: usize,
155}
156
157impl FairSpillPool {
158    /// Allocate up to `limit` bytes
159    pub fn new(pool_size: usize) -> Self {
160        debug!("Created new FairSpillPool(pool_size={pool_size})");
161        Self {
162            pool_size,
163            state: Mutex::new(FairSpillPoolState {
164                num_spill: 0,
165                spillable: 0,
166                unspillable: 0,
167            }),
168        }
169    }
170}
171
172impl MemoryPool for FairSpillPool {
173    fn register(&self, consumer: &MemoryConsumer) {
174        if consumer.can_spill {
175            self.state.lock().num_spill += 1;
176        }
177    }
178
179    fn unregister(&self, consumer: &MemoryConsumer) {
180        if consumer.can_spill {
181            let mut state = self.state.lock();
182            state.num_spill = state.num_spill.checked_sub(1).unwrap();
183        }
184    }
185
186    fn grow(&self, reservation: &MemoryReservation, additional: usize) {
187        let mut state = self.state.lock();
188        match reservation.registration.consumer.can_spill {
189            true => state.spillable += additional,
190            false => state.unspillable += additional,
191        }
192    }
193
194    fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
195        let mut state = self.state.lock();
196        match reservation.registration.consumer.can_spill {
197            true => state.spillable -= shrink,
198            false => state.unspillable -= shrink,
199        }
200    }
201
202    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
203        let mut state = self.state.lock();
204
205        match reservation.registration.consumer.can_spill {
206            true => {
207                // The total amount of memory available to spilling consumers
208                let spill_available = self.pool_size.saturating_sub(state.unspillable);
209
210                // No spiller may use more than their fraction of the memory available
211                let available = spill_available
212                    .checked_div(state.num_spill)
213                    .unwrap_or(spill_available);
214
215                if reservation.size + additional > available {
216                    return Err(insufficient_capacity_err(
217                        reservation,
218                        additional,
219                        available,
220                    ));
221                }
222                state.spillable += additional;
223            }
224            false => {
225                let available = self
226                    .pool_size
227                    .saturating_sub(state.unspillable + state.spillable);
228
229                if available < additional {
230                    return Err(insufficient_capacity_err(
231                        reservation,
232                        additional,
233                        available,
234                    ));
235                }
236                state.unspillable += additional;
237            }
238        }
239        Ok(())
240    }
241
242    fn reserved(&self) -> usize {
243        let state = self.state.lock();
244        state.spillable + state.unspillable
245    }
246
247    fn memory_limit(&self) -> MemoryLimit {
248        MemoryLimit::Finite(self.pool_size)
249    }
250}
251
252/// Constructs a resources error based upon the individual [`MemoryReservation`].
253///
254/// The error references the `bytes already allocated` for the reservation,
255/// and not the total within the collective [`MemoryPool`],
256/// nor the total across multiple reservations with the same [`MemoryConsumer`].
257#[inline(always)]
258fn insufficient_capacity_err(
259    reservation: &MemoryReservation,
260    additional: usize,
261    available: usize,
262) -> DataFusionError {
263    resources_datafusion_err!(
264        "Failed to allocate additional {} for {} with {} already allocated for this reservation - {} remain available for the total pool",
265        human_readable_size(additional),
266        reservation.registration.consumer.name,
267        human_readable_size(reservation.size),
268        human_readable_size(available)
269    )
270}
271
/// Bookkeeping for a single tracked memory consumer: its name, whether it
/// can spill, and its current and peak reserved bytes.
#[derive(Debug)]
struct TrackedConsumer {
    /// Name of the consumer, used when reporting
    name: String,
    /// Whether the consumer is able to spill
    can_spill: bool,
    /// Bytes currently reserved by the consumer
    reserved: AtomicUsize,
    /// Largest value `reserved` has reached through `grow`
    peak: AtomicUsize,
}

impl TrackedConsumer {
    /// Shorthand to return the currently reserved value
    fn reserved(&self) -> usize {
        self.reserved.load(Ordering::Relaxed)
    }

    /// Return the peak value
    fn peak(&self) -> usize {
        self.peak.load(Ordering::Relaxed)
    }

    /// Grows the tracked consumer's reserved size,
    /// should be called after the pool has successfully performed the grow().
    fn grow(&self, additional: usize) {
        // `fetch_add` returns the previous value, so the value after this grow
        // is known without a second load that could observe a later change.
        // `wrapping_add` mirrors the wrapping behaviour of `fetch_add`.
        let new_reserved = self
            .reserved
            .fetch_add(additional, Ordering::Relaxed)
            .wrapping_add(additional);
        self.peak.fetch_max(new_reserved, Ordering::Relaxed);
    }

    /// Reduce the tracked consumer's reserved size,
    /// should be called after the pool has successfully performed the shrink().
    fn shrink(&self, shrink: usize) {
        self.reserved.fetch_sub(shrink, Ordering::Relaxed);
    }
}
304
305/// A [`MemoryPool`] that tracks the consumers that have
306/// reserved memory within the inner memory pool.
307///
308/// By tracking memory reservations more carefully this pool
309/// can provide better error messages on the largest memory users
310/// when memory allocation fails.
311///
312/// Tracking is per hashed [`MemoryConsumer`], not per [`MemoryReservation`].
313/// The same consumer can have multiple reservations.
314///
315/// # Automatic Usage via [`RuntimeEnvBuilder`]
316///
317/// The easiest way to use `TrackConsumersPool` is via
318/// [`RuntimeEnvBuilder::with_memory_limit()`].
319///
320/// [`RuntimeEnvBuilder`]: crate::runtime_env::RuntimeEnvBuilder
321/// [`RuntimeEnvBuilder::with_memory_limit()`]: crate::runtime_env::RuntimeEnvBuilder::with_memory_limit
322///
323/// # Usage Examples
324///
325/// For more examples of using `TrackConsumersPool`, see the [memory_pool_tracking.rs] example
326///
327/// [memory_pool_tracking.rs]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/execution_monitoring/memory_pool_tracking.rs
328/// [memory_pool_execution_plan.rs]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs
329#[derive(Debug)]
330pub struct TrackConsumersPool<I> {
331    /// The wrapped memory pool that actually handles reservation logic
332    inner: I,
333    /// The amount of consumers to report(ordered top to bottom by reservation size)
334    top: NonZeroUsize,
335    /// Maps consumer_id --> TrackedConsumer
336    tracked_consumers: Mutex<HashMap<usize, TrackedConsumer>>,
337}
338
339impl<I: MemoryPool> TrackConsumersPool<I> {
340    /// Creates a new [`TrackConsumersPool`].
341    ///
342    /// # Arguments
343    /// * `inner` - The underlying memory pool that handles actual memory allocation
344    /// * `top` - The number of top memory consumers to include in error messages
345    ///
346    /// # Note
347    /// In most cases, you should use [`RuntimeEnvBuilder::with_memory_limit()`](crate::runtime_env::RuntimeEnvBuilder::with_memory_limit)
348    /// instead of creating this pool manually, as it automatically sets up tracking with
349    /// sensible defaults (top 5 consumers).
350    ///
351    /// # Example
352    ///
353    /// ```rust
354    /// use datafusion_execution::memory_pool::{
355    ///     FairSpillPool, GreedyMemoryPool, TrackConsumersPool,
356    /// };
357    /// use std::num::NonZeroUsize;
358    ///
359    /// // Create with a greedy pool backend, reporting top 3 consumers in error messages
360    /// let tracked_greedy = TrackConsumersPool::new(
361    ///     GreedyMemoryPool::new(1024 * 1024), // 1MB limit
362    ///     NonZeroUsize::new(3).unwrap(),
363    /// );
364    ///
365    /// // Create with a fair spill pool backend, reporting top 5 consumers in error messages
366    /// let tracked_fair = TrackConsumersPool::new(
367    ///     FairSpillPool::new(2 * 1024 * 1024), // 2MB limit
368    ///     NonZeroUsize::new(5).unwrap(),
369    /// );
370    /// ```
371    ///
372    /// # Impact on Error Messages
373    ///
374    /// The `top` determines how many Top K [`MemoryConsumer`]s to include
375    /// in the reported [`DataFusionError::ResourcesExhausted`].
376    pub fn new(inner: I, top: NonZeroUsize) -> Self {
377        Self {
378            inner,
379            top,
380            tracked_consumers: Default::default(),
381        }
382    }
383
384    /// Returns a formatted string with the top memory consumers.
385    pub fn report_top(&self, top: usize) -> String {
386        let mut consumers = self
387            .tracked_consumers
388            .lock()
389            .iter()
390            .map(|(consumer_id, tracked_consumer)| {
391                (
392                    (
393                        *consumer_id,
394                        tracked_consumer.name.to_owned(),
395                        tracked_consumer.can_spill,
396                        tracked_consumer.peak(),
397                    ),
398                    tracked_consumer.reserved(),
399                )
400            })
401            .collect::<Vec<_>>();
402        consumers.sort_by(|a, b| b.1.cmp(&a.1)); // inverse ordering
403
404        consumers[0..std::cmp::min(top, consumers.len())]
405            .iter()
406            .map(|((id, name, can_spill, peak), size)| {
407                format!(
408                    "  {name}#{id}(can spill: {can_spill}) consumed {}, peak {}",
409                    human_readable_size(*size),
410                    human_readable_size(*peak),
411                )
412            })
413            .collect::<Vec<_>>()
414            .join(",\n")
415            + "."
416    }
417}
418
419impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
420    fn register(&self, consumer: &MemoryConsumer) {
421        self.inner.register(consumer);
422
423        let mut guard = self.tracked_consumers.lock();
424        let existing = guard.insert(
425            consumer.id(),
426            TrackedConsumer {
427                name: consumer.name().to_string(),
428                can_spill: consumer.can_spill(),
429                reserved: Default::default(),
430                peak: Default::default(),
431            },
432        );
433
434        debug_assert!(
435            existing.is_none(),
436            "Registered was called twice on the same consumer"
437        );
438    }
439
440    fn unregister(&self, consumer: &MemoryConsumer) {
441        self.inner.unregister(consumer);
442        self.tracked_consumers.lock().remove(&consumer.id());
443    }
444
445    fn grow(&self, reservation: &MemoryReservation, additional: usize) {
446        self.inner.grow(reservation, additional);
447        self.tracked_consumers
448            .lock()
449            .entry(reservation.consumer().id())
450            .and_modify(|tracked_consumer| {
451                tracked_consumer.grow(additional);
452            });
453    }
454
455    fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
456        self.inner.shrink(reservation, shrink);
457        self.tracked_consumers
458            .lock()
459            .entry(reservation.consumer().id())
460            .and_modify(|tracked_consumer| {
461                tracked_consumer.shrink(shrink);
462            });
463    }
464
465    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
466        self.inner
467            .try_grow(reservation, additional)
468            .map_err(|e| match e {
469                DataFusionError::ResourcesExhausted(e) => {
470                    // wrap OOM message in top consumers
471                    DataFusionError::ResourcesExhausted(
472                        provide_top_memory_consumers_to_error_msg(
473                            &reservation.consumer().name,
474                            &e,
475                            &self.report_top(self.top.into()),
476                        ),
477                    )
478                }
479                _ => e,
480            })?;
481
482        self.tracked_consumers
483            .lock()
484            .entry(reservation.consumer().id())
485            .and_modify(|tracked_consumer| {
486                tracked_consumer.grow(additional);
487            });
488        Ok(())
489    }
490
491    fn reserved(&self) -> usize {
492        self.inner.reserved()
493    }
494
495    fn memory_limit(&self) -> MemoryLimit {
496        self.inner.memory_limit()
497    }
498}
499
/// Prefixes an allocation error with the list of top memory consumers.
///
/// The result names the failing consumer (`consumer_name`), then lists
/// `top_consumers` on the following line(s), and finally appends the
/// original `error_msg` after an `Error: ` label on its own line.
fn provide_top_memory_consumers_to_error_msg(
    consumer_name: &str,
    error_msg: &str,
    top_consumers: &str,
) -> String {
    format!(
        "Additional allocation failed for {consumer_name} with top memory consumers (across reservations) as:\n{top_consumers}\nError: {error_msg}"
    )
}
509
510#[cfg(test)]
511mod tests {
512    use super::*;
513    use insta::{Settings, allow_duplicates, assert_snapshot};
514    use std::sync::Arc;
515
516    fn make_settings() -> Settings {
517        let mut settings = Settings::clone_current();
518        settings.add_filter(
519            r"([^\s]+)\#\d+\(can spill: (true|false)\)",
520            "$1#[ID](can spill: $2)",
521        );
522        settings
523    }
524
525    #[test]
526    fn test_fair() {
527        let pool = Arc::new(FairSpillPool::new(100)) as _;
528
529        let mut r1 = MemoryConsumer::new("unspillable").register(&pool);
530        // Can grow beyond capacity of pool
531        r1.grow(2000);
532        assert_eq!(pool.reserved(), 2000);
533
534        let mut r2 = MemoryConsumer::new("r2")
535            .with_can_spill(true)
536            .register(&pool);
537        // Can grow beyond capacity of pool
538        r2.grow(2000);
539
540        assert_eq!(pool.reserved(), 4000);
541
542        let err = r2.try_grow(1).unwrap_err().strip_backtrace();
543        assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 1.0 B for r2 with 2000.0 B already allocated for this reservation - 0.0 B remain available for the total pool");
544
545        let err = r2.try_grow(1).unwrap_err().strip_backtrace();
546        assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 1.0 B for r2 with 2000.0 B already allocated for this reservation - 0.0 B remain available for the total pool");
547
548        r1.shrink(1990);
549        r2.shrink(2000);
550
551        assert_eq!(pool.reserved(), 10);
552
553        r1.try_grow(10).unwrap();
554        assert_eq!(pool.reserved(), 20);
555
556        // Can grow r2 to 80 as only spilling consumer
557        r2.try_grow(80).unwrap();
558        assert_eq!(pool.reserved(), 100);
559
560        r2.shrink(70);
561
562        assert_eq!(r1.size(), 20);
563        assert_eq!(r2.size(), 10);
564        assert_eq!(pool.reserved(), 30);
565
566        let mut r3 = MemoryConsumer::new("r3")
567            .with_can_spill(true)
568            .register(&pool);
569
570        let err = r3.try_grow(70).unwrap_err().strip_backtrace();
571        assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 70.0 B for r3 with 0.0 B already allocated for this reservation - 40.0 B remain available for the total pool");
572
573        //Shrinking r2 to zero doesn't allow a3 to allocate more than 45
574        r2.free();
575        let err = r3.try_grow(70).unwrap_err().strip_backtrace();
576        assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 70.0 B for r3 with 0.0 B already allocated for this reservation - 40.0 B remain available for the total pool");
577
578        // But dropping r2 does
579        drop(r2);
580        assert_eq!(pool.reserved(), 20);
581        r3.try_grow(80).unwrap();
582
583        assert_eq!(pool.reserved(), 100);
584        r1.free();
585        assert_eq!(pool.reserved(), 80);
586
587        let mut r4 = MemoryConsumer::new("s4").register(&pool);
588        let err = r4.try_grow(30).unwrap_err().strip_backtrace();
589        assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 30.0 B for s4 with 0.0 B already allocated for this reservation - 20.0 B remain available for the total pool");
590    }
591
592    #[test]
593    fn test_tracked_consumers_pool() {
594        let setting = make_settings();
595        let _bound = setting.bind_to_scope();
596        let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
597            GreedyMemoryPool::new(100),
598            NonZeroUsize::new(3).unwrap(),
599        ));
600
601        // Test: use all the different interfaces to change reservation size
602
603        // set r1=50, using grow and shrink
604        let mut r1 = MemoryConsumer::new("r1").register(&pool);
605        r1.grow(50);
606        r1.grow(20);
607        r1.shrink(20);
608
609        // set r2=15 using try_grow
610        let mut r2 = MemoryConsumer::new("r2").register(&pool);
611        r2.try_grow(15)
612            .expect("should succeed in memory allotment for r2");
613
614        // set r3=20 using try_resize
615        let mut r3 = MemoryConsumer::new("r3").register(&pool);
616        r3.try_resize(25)
617            .expect("should succeed in memory allotment for r3");
618        r3.try_resize(20)
619            .expect("should succeed in memory allotment for r3");
620
621        // set r4=10
622        // this should not be reported in top 3
623        let mut r4 = MemoryConsumer::new("r4").register(&pool);
624        r4.grow(10);
625
626        // Test: reports if new reservation causes error
627        // using the previously set sizes for other consumers
628        let mut r5 = MemoryConsumer::new("r5").register(&pool);
629        let res = r5.try_grow(150);
630        assert!(res.is_err());
631        let error = res.unwrap_err().strip_backtrace();
632        assert_snapshot!(error, @r"
633        Resources exhausted: Additional allocation failed for r5 with top memory consumers (across reservations) as:
634          r1#[ID](can spill: false) consumed 50.0 B, peak 70.0 B,
635          r3#[ID](can spill: false) consumed 20.0 B, peak 25.0 B,
636          r2#[ID](can spill: false) consumed 15.0 B, peak 15.0 B.
637        Error: Failed to allocate additional 150.0 B for r5 with 0.0 B already allocated for this reservation - 5.0 B remain available for the total pool
638        ");
639    }
640
641    #[test]
642    fn test_tracked_consumers_pool_register() {
643        let setting = make_settings();
644        let _bound = setting.bind_to_scope();
645        let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
646            GreedyMemoryPool::new(100),
647            NonZeroUsize::new(3).unwrap(),
648        ));
649
650        let same_name = "foo";
651
652        // Test: see error message when no consumers recorded yet
653        let mut r0 = MemoryConsumer::new(same_name).register(&pool);
654        let res = r0.try_grow(150);
655        assert!(res.is_err());
656        let error = res.unwrap_err().strip_backtrace();
657        assert_snapshot!(error, @r"
658        Resources exhausted: Additional allocation failed for foo with top memory consumers (across reservations) as:
659          foo#[ID](can spill: false) consumed 0.0 B, peak 0.0 B.
660        Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 100.0 B remain available for the total pool
661        ");
662
663        // API: multiple registrations using the same hashed consumer,
664        // will be recognized *differently* in the TrackConsumersPool.
665
666        r0.grow(10); // make r0=10, pool available=90
667        let new_consumer_same_name = MemoryConsumer::new(same_name);
668        let mut r1 = new_consumer_same_name.register(&pool);
669        // TODO: the insufficient_capacity_err() message is per reservation, not per consumer.
670        // a followup PR will clarify this message "0 bytes already allocated for this reservation"
671        let res = r1.try_grow(150);
672        assert!(res.is_err());
673        let error = res.unwrap_err().strip_backtrace();
674        assert_snapshot!(error, @r"
675        Resources exhausted: Additional allocation failed for foo with top memory consumers (across reservations) as:
676          foo#[ID](can spill: false) consumed 10.0 B, peak 10.0 B,
677          foo#[ID](can spill: false) consumed 0.0 B, peak 0.0 B.
678        Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 90.0 B remain available for the total pool
679        ");
680
681        // Test: will accumulate size changes per consumer, not per reservation
682        r1.grow(20);
683
684        let res = r1.try_grow(150);
685        assert!(res.is_err());
686        let error = res.unwrap_err().strip_backtrace();
687        assert_snapshot!(error, @r"
688        Resources exhausted: Additional allocation failed for foo with top memory consumers (across reservations) as:
689          foo#[ID](can spill: false) consumed 20.0 B, peak 20.0 B,
690          foo#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
691        Error: Failed to allocate additional 150.0 B for foo with 20.0 B already allocated for this reservation - 70.0 B remain available for the total pool
692        ");
693
694        // Test: different hashed consumer, (even with the same name),
695        // will be recognized as different in the TrackConsumersPool
696        let consumer_with_same_name_but_different_hash =
697            MemoryConsumer::new(same_name).with_can_spill(true);
698        let mut r2 = consumer_with_same_name_but_different_hash.register(&pool);
699        let res = r2.try_grow(150);
700        assert!(res.is_err());
701        let error = res.unwrap_err().strip_backtrace();
702        assert_snapshot!(error, @r"
703        Resources exhausted: Additional allocation failed for foo with top memory consumers (across reservations) as:
704          foo#[ID](can spill: false) consumed 20.0 B, peak 20.0 B,
705          foo#[ID](can spill: false) consumed 10.0 B, peak 10.0 B,
706          foo#[ID](can spill: true) consumed 0.0 B, peak 0.0 B.
707        Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 70.0 B remain available for the total pool
708        ");
709    }
710
711    #[test]
712    fn test_tracked_consumers_pool_deregister() {
713        fn test_per_pool_type(pool: Arc<dyn MemoryPool>) {
714            // Baseline: see the 2 memory consumers
715            let setting = make_settings();
716            let _bound = setting.bind_to_scope();
717            let mut r0 = MemoryConsumer::new("r0").register(&pool);
718            r0.grow(10);
719            let r1_consumer = MemoryConsumer::new("r1");
720            let mut r1 = r1_consumer.register(&pool);
721            r1.grow(20);
722
723            let res = r0.try_grow(150);
724            assert!(res.is_err());
725            let error = res.unwrap_err().strip_backtrace();
726            allow_duplicates!(assert_snapshot!(error, @r"
727            Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as:
728              r1#[ID](can spill: false) consumed 20.0 B, peak 20.0 B,
729              r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
730            Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 70.0 B remain available for the total pool
731            "));
732
733            // Test: unregister one
734            // only the remaining one should be listed
735            drop(r1);
736            let res = r0.try_grow(150);
737            assert!(res.is_err());
738            let error = res.unwrap_err().strip_backtrace();
739            allow_duplicates!(assert_snapshot!(error, @r"
740            Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as:
741              r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
742            Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool
743            "));
744
745            // Test: actual message we see is the `available is 70`. When it should be `available is 90`.
746            // This is because the pool.shrink() does not automatically occur within the inner_pool.deregister().
747            let res = r0.try_grow(150);
748            assert!(res.is_err());
749            let error = res.unwrap_err().strip_backtrace();
750            allow_duplicates!(assert_snapshot!(error, @r"
751            Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as:
752              r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
753            Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool
754            "));
755
756            // Test: the registration needs to free itself (or be dropped),
757            // for the proper error message
758            let res = r0.try_grow(150);
759            assert!(res.is_err());
760            let error = res.unwrap_err().strip_backtrace();
761            allow_duplicates!(assert_snapshot!(error, @r"
762            Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as:
763              r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
764            Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool
765            "));
766        }
767
768        let tracked_spill_pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
769            FairSpillPool::new(100),
770            NonZeroUsize::new(3).unwrap(),
771        ));
772        test_per_pool_type(tracked_spill_pool);
773
774        let tracked_greedy_pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
775            GreedyMemoryPool::new(100),
776            NonZeroUsize::new(3).unwrap(),
777        ));
778        test_per_pool_type(tracked_greedy_pool);
779    }
780
781    #[test]
782    fn test_tracked_consumers_pool_use_beyond_errors() {
783        let setting = make_settings();
784        let _bound = setting.bind_to_scope();
785        let upcasted: Arc<dyn std::any::Any + Send + Sync> =
786            Arc::new(TrackConsumersPool::new(
787                GreedyMemoryPool::new(100),
788                NonZeroUsize::new(3).unwrap(),
789            ));
790        let pool: Arc<dyn MemoryPool> = Arc::clone(&upcasted)
791            .downcast::<TrackConsumersPool<GreedyMemoryPool>>()
792            .unwrap();
793        // set r1=20
794        let mut r1 = MemoryConsumer::new("r1").register(&pool);
795        r1.grow(20);
796        // set r2=15
797        let mut r2 = MemoryConsumer::new("r2").register(&pool);
798        r2.grow(15);
799        // set r3=45
800        let mut r3 = MemoryConsumer::new("r3").register(&pool);
801        r3.grow(45);
802
803        let downcasted = upcasted
804            .downcast::<TrackConsumersPool<GreedyMemoryPool>>()
805            .unwrap();
806
807        // Test: can get runtime metrics, even without an error thrown
808        let res = downcasted.report_top(2);
809        assert_snapshot!(res, @r"
810        r3#[ID](can spill: false) consumed 45.0 B, peak 45.0 B,
811        r1#[ID](can spill: false) consumed 20.0 B, peak 20.0 B.
812        ");
813    }
814}