datafusion_execution/memory_pool/pool.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use crate::memory_pool::{
    human_readable_size, MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation,
};
use datafusion_common::HashMap;
use datafusion_common::{resources_datafusion_err, DataFusionError, Result};
use log::debug;
use parking_lot::Mutex;
use std::{
    num::NonZeroUsize,
    sync::atomic::{AtomicUsize, Ordering},
};

/// A [`MemoryPool`] that enforces no limit
#[derive(Debug, Default)]
pub struct UnboundedMemoryPool {
    used: AtomicUsize,
}

impl MemoryPool for UnboundedMemoryPool {
    fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
        self.used.fetch_add(additional, Ordering::Relaxed);
    }

    fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
        self.used.fetch_sub(shrink, Ordering::Relaxed);
    }

    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
        self.grow(reservation, additional);
        Ok(())
    }

    fn reserved(&self) -> usize {
        self.used.load(Ordering::Relaxed)
    }

    fn memory_limit(&self) -> MemoryLimit {
        MemoryLimit::Infinite
    }
}

/// A [`MemoryPool`] that implements a greedy first-come first-serve limit.
///
/// This pool works well for queries that do not need to spill or have
/// a single spillable operator. See [`FairSpillPool`] if there are
/// multiple spillable operators that all will spill.
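///
/// # Example
///
/// A minimal usage sketch (the 1 KB limit and consumer name below are
/// illustrative, not defaults):
///
/// ```rust
/// use datafusion_execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};
/// use std::sync::Arc;
///
/// // Pool that hands out memory first-come, first-serve up to 1024 bytes
/// let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1024));
/// let mut reservation = MemoryConsumer::new("example").register(&pool);
///
/// // Succeeds: stays within the limit
/// reservation.try_grow(512).unwrap();
/// // Fails: would exceed the 1024 byte limit
/// assert!(reservation.try_grow(1024).is_err());
/// ```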
#[derive(Debug)]
pub struct GreedyMemoryPool {
    pool_size: usize,
    used: AtomicUsize,
}

impl GreedyMemoryPool {
    /// Create a new pool that can allocate up to `pool_size` bytes
    pub fn new(pool_size: usize) -> Self {
        debug!("Created new GreedyMemoryPool(pool_size={pool_size})");
        Self {
            pool_size,
            used: AtomicUsize::new(0),
        }
    }
}

impl MemoryPool for GreedyMemoryPool {
    fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
        self.used.fetch_add(additional, Ordering::Relaxed);
    }

    fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
        self.used.fetch_sub(shrink, Ordering::Relaxed);
    }

    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
        self.used
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |used| {
                let new_used = used + additional;
                (new_used <= self.pool_size).then_some(new_used)
            })
            .map_err(|used| {
                insufficient_capacity_err(
                    reservation,
                    additional,
                    self.pool_size.saturating_sub(used),
                )
            })?;
        Ok(())
    }

    fn reserved(&self) -> usize {
        self.used.load(Ordering::Relaxed)
    }

    fn memory_limit(&self) -> MemoryLimit {
        MemoryLimit::Finite(self.pool_size)
    }
}

/// A [`MemoryPool`] that prevents spillable reservations from using more than
/// an even fraction of the memory remaining after any unspillable reservations
/// (i.e. `(pool_size - unspillable_memory) / num_spillable_reservations`)
///
/// This pool works best when you know beforehand that the query has
/// multiple spillable operators that will likely all need to
/// spill. Sometimes it will cause spills even when there was
/// sufficient memory to avoid doing so, because that memory was
/// reserved for other operators.
///
/// ```text
///    ┌───────────────────────z──────────────────────z───────────────┐
///    │                       z                      z               │
///    │                       z                      z               │
///    │       Spillable       z       Unspillable    z     Free      │
///    │        Memory         z        Memory        z    Memory     │
///    │                       z                      z               │
///    │                       z                      z               │
///    └───────────────────────z──────────────────────z───────────────┘
/// ```
///
/// Unspillable memory is allocated in a first-come, first-serve fashion
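///
/// # Example
///
/// A sketch of how the limit is divided, assuming a 100 byte pool shared by
/// one unspillable and two spillable consumers (all sizes are illustrative):
///
/// ```rust
/// use datafusion_execution::memory_pool::{FairSpillPool, MemoryConsumer, MemoryPool};
/// use std::sync::Arc;
///
/// let pool: Arc<dyn MemoryPool> = Arc::new(FairSpillPool::new(100));
///
/// // An unspillable consumer reserves 20 bytes off the top
/// let mut unspillable = MemoryConsumer::new("unspillable").register(&pool);
/// unspillable.try_grow(20).unwrap();
///
/// // Each of the two spillable consumers may use at most
/// // (100 - 20) / 2 = 40 bytes
/// let mut s1 = MemoryConsumer::new("s1").with_can_spill(true).register(&pool);
/// let mut s2 = MemoryConsumer::new("s2").with_can_spill(true).register(&pool);
/// s1.try_grow(40).unwrap();
/// assert!(s2.try_grow(41).is_err());
/// ```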
#[derive(Debug)]
pub struct FairSpillPool {
    /// The total memory limit
    pool_size: usize,

    state: Mutex<FairSpillPoolState>,
}

#[derive(Debug)]
struct FairSpillPoolState {
    /// The number of consumers that can spill
    num_spill: usize,

    /// The total amount of memory reserved that can be spilled
    spillable: usize,

    /// The total amount of memory reserved by consumers that cannot spill
    unspillable: usize,
}

impl FairSpillPool {
    /// Create a new pool that can allocate up to `pool_size` bytes
    pub fn new(pool_size: usize) -> Self {
        debug!("Created new FairSpillPool(pool_size={pool_size})");
        Self {
            pool_size,
            state: Mutex::new(FairSpillPoolState {
                num_spill: 0,
                spillable: 0,
                unspillable: 0,
            }),
        }
    }
}

impl MemoryPool for FairSpillPool {
    fn register(&self, consumer: &MemoryConsumer) {
        if consumer.can_spill {
            self.state.lock().num_spill += 1;
        }
    }

    fn unregister(&self, consumer: &MemoryConsumer) {
        if consumer.can_spill {
            let mut state = self.state.lock();
            state.num_spill = state.num_spill.checked_sub(1).unwrap();
        }
    }

    fn grow(&self, reservation: &MemoryReservation, additional: usize) {
        let mut state = self.state.lock();
        match reservation.registration.consumer.can_spill {
            true => state.spillable += additional,
            false => state.unspillable += additional,
        }
    }

    fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
        let mut state = self.state.lock();
        match reservation.registration.consumer.can_spill {
            true => state.spillable -= shrink,
            false => state.unspillable -= shrink,
        }
    }

    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
        let mut state = self.state.lock();

        match reservation.registration.consumer.can_spill {
            true => {
                // The total amount of memory available to spilling consumers
                let spill_available = self.pool_size.saturating_sub(state.unspillable);

                // No spiller may use more than their fraction of the memory available
                let available = spill_available
                    .checked_div(state.num_spill)
                    .unwrap_or(spill_available);

                if reservation.size + additional > available {
                    return Err(insufficient_capacity_err(
                        reservation,
                        additional,
                        available,
                    ));
                }
                state.spillable += additional;
            }
            false => {
                let available = self
                    .pool_size
                    .saturating_sub(state.unspillable + state.spillable);

                if available < additional {
                    return Err(insufficient_capacity_err(
                        reservation,
                        additional,
                        available,
                    ));
                }
                state.unspillable += additional;
            }
        }
        Ok(())
    }

    fn reserved(&self) -> usize {
        let state = self.state.lock();
        state.spillable + state.unspillable
    }

    fn memory_limit(&self) -> MemoryLimit {
        MemoryLimit::Finite(self.pool_size)
    }
}

/// Constructs a resources error based upon the individual [`MemoryReservation`].
///
/// The error references the `bytes already allocated` for the reservation,
/// and not the total within the collective [`MemoryPool`],
/// nor the total across multiple reservations with the same [`MemoryConsumer`].
#[inline(always)]
fn insufficient_capacity_err(
    reservation: &MemoryReservation,
    additional: usize,
    available: usize,
) -> DataFusionError {
    resources_datafusion_err!(
        "Failed to allocate additional {} for {} with {} already allocated for this reservation - {} remain available for the total pool",
        human_readable_size(additional),
        reservation.registration.consumer.name,
        human_readable_size(reservation.size),
        human_readable_size(available)
    )
}

#[derive(Debug)]
struct TrackedConsumer {
    name: String,
    can_spill: bool,
    reserved: AtomicUsize,
    peak: AtomicUsize,
}

impl TrackedConsumer {
    /// Shorthand to return the currently reserved value
    fn reserved(&self) -> usize {
        self.reserved.load(Ordering::Relaxed)
    }

    /// Return the peak value
    fn peak(&self) -> usize {
        self.peak.load(Ordering::Relaxed)
    }

    /// Grows the tracked consumer's reserved size; should be called
    /// after the pool has successfully performed the grow().
    fn grow(&self, additional: usize) {
        self.reserved.fetch_add(additional, Ordering::Relaxed);
        self.peak.fetch_max(self.reserved(), Ordering::Relaxed);
    }

    /// Reduces the tracked consumer's reserved size; should be called
    /// after the pool has successfully performed the shrink().
    fn shrink(&self, shrink: usize) {
        self.reserved.fetch_sub(shrink, Ordering::Relaxed);
    }
}

/// A [`MemoryPool`] that tracks the consumers that have
/// reserved memory within the inner memory pool.
///
/// By tracking memory reservations more carefully, this pool
/// can provide better error messages about the largest memory users
/// when memory allocation fails.
///
/// Tracking is per hashed [`MemoryConsumer`], not per [`MemoryReservation`].
/// The same consumer can have multiple reservations.
///
/// # Automatic Usage via [`RuntimeEnvBuilder`]
///
/// The easiest way to use `TrackConsumersPool` is via
/// [`RuntimeEnvBuilder::with_memory_limit()`].
///
/// [`RuntimeEnvBuilder`]: crate::runtime_env::RuntimeEnvBuilder
/// [`RuntimeEnvBuilder::with_memory_limit()`]: crate::runtime_env::RuntimeEnvBuilder::with_memory_limit
///
/// # Usage Examples
///
/// For more examples of using `TrackConsumersPool`, see the
/// [memory_pool_tracking.rs] and [memory_pool_execution_plan.rs] examples.
///
/// [memory_pool_tracking.rs]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/memory_pool_tracking.rs
/// [memory_pool_execution_plan.rs]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/memory_pool_execution_plan.rs
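///
/// # Example of an Enriched Error
///
/// A minimal sketch of how the wrapped error surfaces, assuming a 100 byte
/// [`GreedyMemoryPool`] underneath (consumer names and sizes are illustrative):
///
/// ```rust
/// use datafusion_execution::memory_pool::{
///     GreedyMemoryPool, MemoryConsumer, MemoryPool, TrackConsumersPool,
/// };
/// use std::num::NonZeroUsize;
/// use std::sync::Arc;
///
/// let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
///     GreedyMemoryPool::new(100),
///     NonZeroUsize::new(3).unwrap(),
/// ));
///
/// let mut a = MemoryConsumer::new("a").register(&pool);
/// a.grow(80);
///
/// // The failure from the inner pool is annotated with the top consumers
/// let mut b = MemoryConsumer::new("b").register(&pool);
/// let err = b.try_grow(50).unwrap_err();
/// assert!(err.to_string().contains("top memory consumers"));
/// ```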
#[derive(Debug)]
pub struct TrackConsumersPool<I> {
    /// The wrapped memory pool that actually handles reservation logic
    inner: I,
    /// The number of consumers to report (ordered top to bottom by reservation size)
    top: NonZeroUsize,
    /// Maps consumer_id --> TrackedConsumer
    tracked_consumers: Mutex<HashMap<usize, TrackedConsumer>>,
}

impl<I: MemoryPool> TrackConsumersPool<I> {
    /// Creates a new [`TrackConsumersPool`].
    ///
    /// # Arguments
    /// * `inner` - The underlying memory pool that handles actual memory allocation
    /// * `top` - The number of top memory consumers to include in error messages
    ///
    /// # Note
    /// In most cases, you should use [`RuntimeEnvBuilder::with_memory_limit()`](crate::runtime_env::RuntimeEnvBuilder::with_memory_limit)
    /// instead of creating this pool manually, as it automatically sets up tracking with
    /// sensible defaults (top 5 consumers).
    ///
    /// # Example
    ///
    /// ```rust
    /// use datafusion_execution::memory_pool::{
    ///     FairSpillPool, GreedyMemoryPool, TrackConsumersPool,
    /// };
    /// use std::num::NonZeroUsize;
    ///
    /// // Create with a greedy pool backend, reporting top 3 consumers in error messages
    /// let tracked_greedy = TrackConsumersPool::new(
    ///     GreedyMemoryPool::new(1024 * 1024), // 1MB limit
    ///     NonZeroUsize::new(3).unwrap(),
    /// );
    ///
    /// // Create with a fair spill pool backend, reporting top 5 consumers in error messages
    /// let tracked_fair = TrackConsumersPool::new(
    ///     FairSpillPool::new(2 * 1024 * 1024), // 2MB limit
    ///     NonZeroUsize::new(5).unwrap(),
    /// );
    /// ```
    ///
    /// # Impact on Error Messages
    ///
    /// The `top` argument determines how many of the top [`MemoryConsumer`]s are
    /// included in the reported [`DataFusionError::ResourcesExhausted`].
    pub fn new(inner: I, top: NonZeroUsize) -> Self {
        Self {
            inner,
            top,
            tracked_consumers: Default::default(),
        }
    }

    /// Returns a formatted string with the `top` largest memory consumers.
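    ///
    /// Each line has the form
    /// `  {name}#{id}(can spill: {bool}) consumed {size}, peak {peak}`,
    /// sorted by current reservation size, largest first. For example
    /// (names and sizes illustrative):
    ///
    /// ```text
    ///   r3#3(can spill: false) consumed 45.0 B, peak 45.0 B,
    ///   r1#1(can spill: false) consumed 20.0 B, peak 20.0 B.
    /// ```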
    pub fn report_top(&self, top: usize) -> String {
        let mut consumers = self
            .tracked_consumers
            .lock()
            .iter()
            .map(|(consumer_id, tracked_consumer)| {
                (
                    (
                        *consumer_id,
                        tracked_consumer.name.to_owned(),
                        tracked_consumer.can_spill,
                        tracked_consumer.peak(),
                    ),
                    tracked_consumer.reserved(),
                )
            })
            .collect::<Vec<_>>();
        consumers.sort_by(|a, b| b.1.cmp(&a.1)); // sort descending by reserved size

        consumers[0..std::cmp::min(top, consumers.len())]
            .iter()
            .map(|((id, name, can_spill, peak), size)| {
                format!(
                    "  {name}#{id}(can spill: {can_spill}) consumed {}, peak {}",
                    human_readable_size(*size),
                    human_readable_size(*peak),
                )
            })
            .collect::<Vec<_>>()
            .join(",\n")
            + "."
    }
}

impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
    fn register(&self, consumer: &MemoryConsumer) {
        self.inner.register(consumer);

        let mut guard = self.tracked_consumers.lock();
        let existing = guard.insert(
            consumer.id(),
            TrackedConsumer {
                name: consumer.name().to_string(),
                can_spill: consumer.can_spill(),
                reserved: Default::default(),
                peak: Default::default(),
            },
        );

        debug_assert!(
            existing.is_none(),
            "register() was called twice on the same consumer"
        );
    }

    fn unregister(&self, consumer: &MemoryConsumer) {
        self.inner.unregister(consumer);
        self.tracked_consumers.lock().remove(&consumer.id());
    }

    fn grow(&self, reservation: &MemoryReservation, additional: usize) {
        self.inner.grow(reservation, additional);
        self.tracked_consumers
            .lock()
            .entry(reservation.consumer().id())
            .and_modify(|tracked_consumer| {
                tracked_consumer.grow(additional);
            });
    }

    fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
        self.inner.shrink(reservation, shrink);
        self.tracked_consumers
            .lock()
            .entry(reservation.consumer().id())
            .and_modify(|tracked_consumer| {
                tracked_consumer.shrink(shrink);
            });
    }

    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
        self.inner
            .try_grow(reservation, additional)
            .map_err(|e| match e {
                DataFusionError::ResourcesExhausted(e) => {
                    // wrap OOM message in top consumers
                    DataFusionError::ResourcesExhausted(
                        provide_top_memory_consumers_to_error_msg(
                            &reservation.consumer().name,
                            e,
                            self.report_top(self.top.into()),
                        ),
                    )
                }
                _ => e,
            })?;

        self.tracked_consumers
            .lock()
            .entry(reservation.consumer().id())
            .and_modify(|tracked_consumer| {
                tracked_consumer.grow(additional);
            });
        Ok(())
    }

    fn reserved(&self) -> usize {
        self.inner.reserved()
    }

    fn memory_limit(&self) -> MemoryLimit {
        self.inner.memory_limit()
    }
}

fn provide_top_memory_consumers_to_error_msg(
    consumer_name: &str,
    error_msg: String,
    top_consumers: String,
) -> String {
    format!("Additional allocation failed for {consumer_name} with top memory consumers (across reservations) as:\n{top_consumers}\nError: {error_msg}")
}

#[cfg(test)]
mod tests {
    use super::*;
    use insta::{allow_duplicates, assert_snapshot, Settings};
    use std::sync::Arc;

    fn make_settings() -> Settings {
        let mut settings = Settings::clone_current();
        settings.add_filter(
            r"([^\s]+)\#\d+\(can spill: (true|false)\)",
            "$1#[ID](can spill: $2)",
        );
        settings
    }

    #[test]
    fn test_fair() {
        let pool = Arc::new(FairSpillPool::new(100)) as _;

        let mut r1 = MemoryConsumer::new("unspillable").register(&pool);
        // Can grow beyond capacity of pool
        r1.grow(2000);
        assert_eq!(pool.reserved(), 2000);

        let mut r2 = MemoryConsumer::new("r2")
            .with_can_spill(true)
            .register(&pool);
        // Can grow beyond capacity of pool
        r2.grow(2000);

        assert_eq!(pool.reserved(), 4000);

        let err = r2.try_grow(1).unwrap_err().strip_backtrace();
        assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 1.0 B for r2 with 2000.0 B already allocated for this reservation - 0.0 B remain available for the total pool");

        let err = r2.try_grow(1).unwrap_err().strip_backtrace();
        assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 1.0 B for r2 with 2000.0 B already allocated for this reservation - 0.0 B remain available for the total pool");

        r1.shrink(1990);
        r2.shrink(2000);

        assert_eq!(pool.reserved(), 10);

        r1.try_grow(10).unwrap();
        assert_eq!(pool.reserved(), 20);

        // Can grow r2 to 80 as only spilling consumer
        r2.try_grow(80).unwrap();
        assert_eq!(pool.reserved(), 100);

        r2.shrink(70);

        assert_eq!(r1.size(), 20);
        assert_eq!(r2.size(), 10);
        assert_eq!(pool.reserved(), 30);

        let mut r3 = MemoryConsumer::new("r3")
            .with_can_spill(true)
            .register(&pool);

        let err = r3.try_grow(70).unwrap_err().strip_backtrace();
        assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 70.0 B for r3 with 0.0 B already allocated for this reservation - 40.0 B remain available for the total pool");

        // Shrinking r2 to zero doesn't allow r3 to allocate more than 40
        r2.free();
        let err = r3.try_grow(70).unwrap_err().strip_backtrace();
        assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 70.0 B for r3 with 0.0 B already allocated for this reservation - 40.0 B remain available for the total pool");

        // But dropping r2 does
        drop(r2);
        assert_eq!(pool.reserved(), 20);
        r3.try_grow(80).unwrap();

        assert_eq!(pool.reserved(), 100);
        r1.free();
        assert_eq!(pool.reserved(), 80);

        let mut r4 = MemoryConsumer::new("s4").register(&pool);
        let err = r4.try_grow(30).unwrap_err().strip_backtrace();
        assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 30.0 B for s4 with 0.0 B already allocated for this reservation - 20.0 B remain available for the total pool");
    }

    #[test]
    fn test_tracked_consumers_pool() {
        let setting = make_settings();
        let _bound = setting.bind_to_scope();
        let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
            GreedyMemoryPool::new(100),
            NonZeroUsize::new(3).unwrap(),
        ));

        // Test: use all the different interfaces to change reservation size

        // set r1=50, using grow and shrink
        let mut r1 = MemoryConsumer::new("r1").register(&pool);
        r1.grow(50);
        r1.grow(20);
        r1.shrink(20);

        // set r2=15 using try_grow
        let mut r2 = MemoryConsumer::new("r2").register(&pool);
        r2.try_grow(15)
            .expect("should succeed in memory allotment for r2");

        // set r3=20 using try_resize
        let mut r3 = MemoryConsumer::new("r3").register(&pool);
        r3.try_resize(25)
            .expect("should succeed in memory allotment for r3");
        r3.try_resize(20)
            .expect("should succeed in memory allotment for r3");

        // set r4=10
        // this should not be reported in top 3
        let mut r4 = MemoryConsumer::new("r4").register(&pool);
        r4.grow(10);

        // Test: reports if new reservation causes error
        // using the previously set sizes for other consumers
        let mut r5 = MemoryConsumer::new("r5").register(&pool);
        let res = r5.try_grow(150);
        assert!(res.is_err());
        let error = res.unwrap_err().strip_backtrace();
        assert_snapshot!(error, @r"
        Resources exhausted: Additional allocation failed for r5 with top memory consumers (across reservations) as:
          r1#[ID](can spill: false) consumed 50.0 B, peak 70.0 B,
          r3#[ID](can spill: false) consumed 20.0 B, peak 25.0 B,
          r2#[ID](can spill: false) consumed 15.0 B, peak 15.0 B.
        Error: Failed to allocate additional 150.0 B for r5 with 0.0 B already allocated for this reservation - 5.0 B remain available for the total pool
        ");
    }

    #[test]
    fn test_tracked_consumers_pool_register() {
        let setting = make_settings();
        let _bound = setting.bind_to_scope();
        let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
            GreedyMemoryPool::new(100),
            NonZeroUsize::new(3).unwrap(),
        ));

        let same_name = "foo";

        // Test: see error message when no consumers recorded yet
        let mut r0 = MemoryConsumer::new(same_name).register(&pool);
        let res = r0.try_grow(150);
        assert!(res.is_err());
        let error = res.unwrap_err().strip_backtrace();
        assert_snapshot!(error, @r"
        Resources exhausted: Additional allocation failed for foo with top memory consumers (across reservations) as:
          foo#[ID](can spill: false) consumed 0.0 B, peak 0.0 B.
        Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 100.0 B remain available for the total pool
        ");

        // API: multiple registrations using the same consumer name
        // will be recognized as *different* consumers in the TrackConsumersPool.
658
659        r0.grow(10); // make r0=10, pool available=90
660        let new_consumer_same_name = MemoryConsumer::new(same_name);
661        let mut r1 = new_consumer_same_name.register(&pool);
662        // TODO: the insufficient_capacity_err() message is per reservation, not per consumer.
663        // a followup PR will clarify this message "0 bytes already allocated for this reservation"
664        let res = r1.try_grow(150);
665        assert!(res.is_err());
666        let error = res.unwrap_err().strip_backtrace();
667        assert_snapshot!(error, @r"
668        Resources exhausted: Additional allocation failed for foo with top memory consumers (across reservations) as:
669          foo#[ID](can spill: false) consumed 10.0 B, peak 10.0 B,
670          foo#[ID](can spill: false) consumed 0.0 B, peak 0.0 B.
671        Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 90.0 B remain available for the total pool
672        ");
673
674        // Test: will accumulate size changes per consumer, not per reservation
675        r1.grow(20);
676
677        let res = r1.try_grow(150);
678        assert!(res.is_err());
679        let error = res.unwrap_err().strip_backtrace();
680        assert_snapshot!(error, @r"
681        Resources exhausted: Additional allocation failed for foo with top memory consumers (across reservations) as:
682          foo#[ID](can spill: false) consumed 20.0 B, peak 20.0 B,
683          foo#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
684        Error: Failed to allocate additional 150.0 B for foo with 20.0 B already allocated for this reservation - 70.0 B remain available for the total pool
685        ");
686
        // Test: a different hashed consumer (even with the same name)
        // will be recognized as different in the TrackConsumersPool
        let consumer_with_same_name_but_different_hash =
            MemoryConsumer::new(same_name).with_can_spill(true);
        let mut r2 = consumer_with_same_name_but_different_hash.register(&pool);
        let res = r2.try_grow(150);
        assert!(res.is_err());
        let error = res.unwrap_err().strip_backtrace();
        assert_snapshot!(error, @r"
        Resources exhausted: Additional allocation failed for foo with top memory consumers (across reservations) as:
          foo#[ID](can spill: false) consumed 20.0 B, peak 20.0 B,
          foo#[ID](can spill: false) consumed 10.0 B, peak 10.0 B,
          foo#[ID](can spill: true) consumed 0.0 B, peak 0.0 B.
        Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 70.0 B remain available for the total pool
        ");
    }

    #[test]
    fn test_tracked_consumers_pool_deregister() {
        fn test_per_pool_type(pool: Arc<dyn MemoryPool>) {
            // Baseline: see the 2 memory consumers
            let setting = make_settings();
            let _bound = setting.bind_to_scope();
            let mut r0 = MemoryConsumer::new("r0").register(&pool);
            r0.grow(10);
            let r1_consumer = MemoryConsumer::new("r1");
            let mut r1 = r1_consumer.register(&pool);
            r1.grow(20);

            let res = r0.try_grow(150);
            assert!(res.is_err());
            let error = res.unwrap_err().strip_backtrace();
            allow_duplicates!(assert_snapshot!(error, @r"
                Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as:
                  r1#[ID](can spill: false) consumed 20.0 B, peak 20.0 B,
                  r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
                Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 70.0 B remain available for the total pool
                "));

            // Test: unregister one
            // only the remaining one should be listed
            drop(r1);
            let res = r0.try_grow(150);
            assert!(res.is_err());
            let error = res.unwrap_err().strip_backtrace();
            allow_duplicates!(assert_snapshot!(error, @r"
                Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as:
                  r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
                Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool
                "));

            // Test: dropping the reservation also returned its memory to the
            // inner pool, so a retry reports the full 90 B available (not 70 B)
            let res = r0.try_grow(150);
            assert!(res.is_err());
            let error = res.unwrap_err().strip_backtrace();
            allow_duplicates!(assert_snapshot!(error, @r"
                Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as:
                  r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
                Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool
                "));

            // Test: a further retry continues to report the same remaining capacity
            let res = r0.try_grow(150);
            assert!(res.is_err());
            let error = res.unwrap_err().strip_backtrace();
            allow_duplicates!(assert_snapshot!(error, @r"
                Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as:
                  r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
                Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool
                "));
        }

        let tracked_spill_pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
            FairSpillPool::new(100),
            NonZeroUsize::new(3).unwrap(),
        ));
        test_per_pool_type(tracked_spill_pool);

        let tracked_greedy_pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
            GreedyMemoryPool::new(100),
            NonZeroUsize::new(3).unwrap(),
        ));
        test_per_pool_type(tracked_greedy_pool);
    }

    #[test]
    fn test_tracked_consumers_pool_use_beyond_errors() {
        let setting = make_settings();
        let _bound = setting.bind_to_scope();
        let upcasted: Arc<dyn std::any::Any + Send + Sync> =
            Arc::new(TrackConsumersPool::new(
                GreedyMemoryPool::new(100),
                NonZeroUsize::new(3).unwrap(),
            ));
        let pool: Arc<dyn MemoryPool> = Arc::clone(&upcasted)
            .downcast::<TrackConsumersPool<GreedyMemoryPool>>()
            .unwrap();
        // set r1=20
        let mut r1 = MemoryConsumer::new("r1").register(&pool);
        r1.grow(20);
        // set r2=15
        let mut r2 = MemoryConsumer::new("r2").register(&pool);
        r2.grow(15);
        // set r3=45
        let mut r3 = MemoryConsumer::new("r3").register(&pool);
        r3.grow(45);

        let downcasted = upcasted
            .downcast::<TrackConsumersPool<GreedyMemoryPool>>()
            .unwrap();

        // Test: can get runtime metrics, even without an error thrown
        let res = downcasted.report_top(2);
        assert_snapshot!(res, @r"
        r3#[ID](can spill: false) consumed 45.0 B, peak 45.0 B,
        r1#[ID](can spill: false) consumed 20.0 B, peak 20.0 B.
        ");
    }
}