datafusion_execution/memory_pool/
pool.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::memory_pool::{
19    human_readable_size, MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation,
20};
21use datafusion_common::HashMap;
22use datafusion_common::{resources_datafusion_err, DataFusionError, Result};
23use log::debug;
24use parking_lot::Mutex;
25use std::{
26    num::NonZeroUsize,
27    sync::atomic::{AtomicUsize, Ordering},
28};
29
/// A [`MemoryPool`] that enforces no limit.
///
/// The pool only keeps a running total of the bytes that have been
/// reserved from it; it never rejects a request.
#[derive(Debug, Default)]
pub struct UnboundedMemoryPool {
    /// Running total of bytes currently reserved from this pool
    used: AtomicUsize,
}
35
36impl MemoryPool for UnboundedMemoryPool {
37    fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
38        self.used.fetch_add(additional, Ordering::Relaxed);
39    }
40
41    fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
42        self.used.fetch_sub(shrink, Ordering::Relaxed);
43    }
44
45    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
46        self.grow(reservation, additional);
47        Ok(())
48    }
49
50    fn reserved(&self) -> usize {
51        self.used.load(Ordering::Relaxed)
52    }
53
54    fn memory_limit(&self) -> MemoryLimit {
55        MemoryLimit::Infinite
56    }
57}
58
/// A [`MemoryPool`] that implements a greedy first-come first-serve limit.
///
/// This pool works well for queries that do not need to spill or have
/// a single spillable operator. See [`FairSpillPool`] if there are
/// multiple spillable operators that all will spill.
#[derive(Debug)]
pub struct GreedyMemoryPool {
    /// The total memory limit, in bytes, enforced by `try_grow`
    pool_size: usize,
    /// Running total of bytes currently reserved from this pool
    used: AtomicUsize,
}
69
70impl GreedyMemoryPool {
71    /// Create a new pool that can allocate up to `pool_size` bytes
72    pub fn new(pool_size: usize) -> Self {
73        debug!("Created new GreedyMemoryPool(pool_size={pool_size})");
74        Self {
75            pool_size,
76            used: AtomicUsize::new(0),
77        }
78    }
79}
80
81impl MemoryPool for GreedyMemoryPool {
82    fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
83        self.used.fetch_add(additional, Ordering::Relaxed);
84    }
85
86    fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
87        self.used.fetch_sub(shrink, Ordering::Relaxed);
88    }
89
90    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
91        self.used
92            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |used| {
93                let new_used = used + additional;
94                (new_used <= self.pool_size).then_some(new_used)
95            })
96            .map_err(|used| {
97                insufficient_capacity_err(
98                    reservation,
99                    additional,
100                    self.pool_size.saturating_sub(used),
101                )
102            })?;
103        Ok(())
104    }
105
106    fn reserved(&self) -> usize {
107        self.used.load(Ordering::Relaxed)
108    }
109
110    fn memory_limit(&self) -> MemoryLimit {
111        MemoryLimit::Finite(self.pool_size)
112    }
113}
114
/// A [`MemoryPool`] that prevents spillable reservations from using more than
/// an even fraction of the available memory sans any unspillable reservations
/// (i.e. `(pool_size - unspillable_memory) / num_spillable_reservations`)
///
/// This pool works best when you know beforehand the query has
/// multiple spillable operators that will likely all need to
/// spill. Sometimes it will cause spills even when there was
/// sufficient memory (reserved for other operators) to avoid doing
/// so.
///
/// ```text
///    ┌───────────────────────z──────────────────────z───────────────┐
///    │                       z                      z               │
///    │                       z                      z               │
///    │       Spillable       z       Unspillable    z     Free      │
///    │        Memory         z        Memory        z    Memory     │
///    │                       z                      z               │
///    │                       z                      z               │
///    └───────────────────────z──────────────────────z───────────────┘
/// ```
///
/// Unspillable memory is allocated in a first-come, first-serve fashion
#[derive(Debug)]
pub struct FairSpillPool {
    /// The total memory limit
    pool_size: usize,

    /// Mutable accounting (consumer count and reserved byte totals),
    /// guarded by a mutex so it can be updated through `&self`
    state: Mutex<FairSpillPoolState>,
}
144
/// The mutable bookkeeping of a [`FairSpillPool`], kept behind its mutex.
#[derive(Debug)]
struct FairSpillPoolState {
    /// The number of consumers that can spill
    /// (incremented on register, decremented on unregister)
    num_spill: usize,

    /// The total amount of memory reserved that can be spilled
    spillable: usize,

    /// The total amount of memory reserved by consumers that cannot spill
    unspillable: usize,
}
156
157impl FairSpillPool {
158    /// Allocate up to `limit` bytes
159    pub fn new(pool_size: usize) -> Self {
160        debug!("Created new FairSpillPool(pool_size={pool_size})");
161        Self {
162            pool_size,
163            state: Mutex::new(FairSpillPoolState {
164                num_spill: 0,
165                spillable: 0,
166                unspillable: 0,
167            }),
168        }
169    }
170}
171
172impl MemoryPool for FairSpillPool {
173    fn register(&self, consumer: &MemoryConsumer) {
174        if consumer.can_spill {
175            self.state.lock().num_spill += 1;
176        }
177    }
178
179    fn unregister(&self, consumer: &MemoryConsumer) {
180        if consumer.can_spill {
181            let mut state = self.state.lock();
182            state.num_spill = state.num_spill.checked_sub(1).unwrap();
183        }
184    }
185
186    fn grow(&self, reservation: &MemoryReservation, additional: usize) {
187        let mut state = self.state.lock();
188        match reservation.registration.consumer.can_spill {
189            true => state.spillable += additional,
190            false => state.unspillable += additional,
191        }
192    }
193
194    fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
195        let mut state = self.state.lock();
196        match reservation.registration.consumer.can_spill {
197            true => state.spillable -= shrink,
198            false => state.unspillable -= shrink,
199        }
200    }
201
202    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
203        let mut state = self.state.lock();
204
205        match reservation.registration.consumer.can_spill {
206            true => {
207                // The total amount of memory available to spilling consumers
208                let spill_available = self.pool_size.saturating_sub(state.unspillable);
209
210                // No spiller may use more than their fraction of the memory available
211                let available = spill_available
212                    .checked_div(state.num_spill)
213                    .unwrap_or(spill_available);
214
215                if reservation.size + additional > available {
216                    return Err(insufficient_capacity_err(
217                        reservation,
218                        additional,
219                        available,
220                    ));
221                }
222                state.spillable += additional;
223            }
224            false => {
225                let available = self
226                    .pool_size
227                    .saturating_sub(state.unspillable + state.spillable);
228
229                if available < additional {
230                    return Err(insufficient_capacity_err(
231                        reservation,
232                        additional,
233                        available,
234                    ));
235                }
236                state.unspillable += additional;
237            }
238        }
239        Ok(())
240    }
241
242    fn reserved(&self) -> usize {
243        let state = self.state.lock();
244        state.spillable + state.unspillable
245    }
246
247    fn memory_limit(&self) -> MemoryLimit {
248        MemoryLimit::Finite(self.pool_size)
249    }
250}
251
252/// Constructs a resources error based upon the individual [`MemoryReservation`].
253///
254/// The error references the `bytes already allocated` for the reservation,
255/// and not the total within the collective [`MemoryPool`],
256/// nor the total across multiple reservations with the same [`MemoryConsumer`].
257#[inline(always)]
258fn insufficient_capacity_err(
259    reservation: &MemoryReservation,
260    additional: usize,
261    available: usize,
262) -> DataFusionError {
263    resources_datafusion_err!("Failed to allocate additional {} for {} with {} already allocated for this reservation - {} remain available for the total pool", 
264    human_readable_size(additional), reservation.registration.consumer.name, human_readable_size(reservation.size), human_readable_size(available))
265}
266
/// The per-consumer record kept by [`TrackConsumersPool`].
#[derive(Debug)]
struct TrackedConsumer {
    /// Consumer name, copied at registration time and shown in reports
    name: String,
    /// Whether the consumer declared itself able to spill
    can_spill: bool,
    /// Bytes currently attributed to this consumer
    reserved: AtomicUsize,
}
273
274impl TrackedConsumer {
275    /// Shorthand to return the currently reserved value
276    fn reserved(&self) -> usize {
277        self.reserved.load(Ordering::Relaxed)
278    }
279
280    /// Grows the tracked consumer's reserved size,
281    /// should be called after the pool has successfully performed the grow().
282    fn grow(&self, additional: usize) {
283        self.reserved.fetch_add(additional, Ordering::Relaxed);
284    }
285
286    /// Reduce the tracked consumer's reserved size,
287    /// should be called after the pool has successfully performed the shrink().
288    fn shrink(&self, shrink: usize) {
289        self.reserved.fetch_sub(shrink, Ordering::Relaxed);
290    }
291}
292
/// A [`MemoryPool`] that tracks the consumers that have
/// reserved memory within the inner memory pool.
///
/// By tracking memory reservations more carefully this pool
/// can provide better error messages on the largest memory users.
///
/// Tracking is per hashed [`MemoryConsumer`], not per [`MemoryReservation`].
/// The same consumer can have multiple reservations.
#[derive(Debug)]
pub struct TrackConsumersPool<I> {
    /// The wrapped memory pool that actually handles reservation logic
    inner: I,
    /// The number of consumers to report (ordered top to bottom by reservation size)
    top: NonZeroUsize,
    /// Maps consumer_id --> TrackedConsumer
    tracked_consumers: Mutex<HashMap<usize, TrackedConsumer>>,
}
310
311impl<I: MemoryPool> TrackConsumersPool<I> {
312    /// Creates a new [`TrackConsumersPool`].
313    ///
314    /// The `top` determines how many Top K [`MemoryConsumer`]s to include
315    /// in the reported [`DataFusionError::ResourcesExhausted`].
316    pub fn new(inner: I, top: NonZeroUsize) -> Self {
317        Self {
318            inner,
319            top,
320            tracked_consumers: Default::default(),
321        }
322    }
323
324    /// The top consumers in a report string.
325    pub fn report_top(&self, top: usize) -> String {
326        let mut consumers = self
327            .tracked_consumers
328            .lock()
329            .iter()
330            .map(|(consumer_id, tracked_consumer)| {
331                (
332                    (
333                        *consumer_id,
334                        tracked_consumer.name.to_owned(),
335                        tracked_consumer.can_spill,
336                    ),
337                    tracked_consumer.reserved(),
338                )
339            })
340            .collect::<Vec<_>>();
341        consumers.sort_by(|a, b| b.1.cmp(&a.1)); // inverse ordering
342
343        consumers[0..std::cmp::min(top, consumers.len())]
344            .iter()
345            .map(|((id, name, can_spill), size)| {
346                format!(
347                    "  {name}#{id}(can spill: {can_spill}) consumed {}",
348                    human_readable_size(*size)
349                )
350            })
351            .collect::<Vec<_>>()
352            .join(",\n")
353            + "."
354    }
355}
356
357impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
358    fn register(&self, consumer: &MemoryConsumer) {
359        self.inner.register(consumer);
360
361        let mut guard = self.tracked_consumers.lock();
362        let existing = guard.insert(
363            consumer.id(),
364            TrackedConsumer {
365                name: consumer.name().to_string(),
366                can_spill: consumer.can_spill(),
367                reserved: Default::default(),
368            },
369        );
370
371        debug_assert!(
372            existing.is_none(),
373            "Registered was called twice on the same consumer"
374        );
375    }
376
377    fn unregister(&self, consumer: &MemoryConsumer) {
378        self.inner.unregister(consumer);
379        self.tracked_consumers.lock().remove(&consumer.id());
380    }
381
382    fn grow(&self, reservation: &MemoryReservation, additional: usize) {
383        self.inner.grow(reservation, additional);
384        self.tracked_consumers
385            .lock()
386            .entry(reservation.consumer().id())
387            .and_modify(|tracked_consumer| {
388                tracked_consumer.grow(additional);
389            });
390    }
391
392    fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
393        self.inner.shrink(reservation, shrink);
394        self.tracked_consumers
395            .lock()
396            .entry(reservation.consumer().id())
397            .and_modify(|tracked_consumer| {
398                tracked_consumer.shrink(shrink);
399            });
400    }
401
402    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
403        self.inner
404            .try_grow(reservation, additional)
405            .map_err(|e| match e {
406                DataFusionError::ResourcesExhausted(e) => {
407                    // wrap OOM message in top consumers
408                    DataFusionError::ResourcesExhausted(
409                        provide_top_memory_consumers_to_error_msg(
410                            e,
411                            self.report_top(self.top.into()),
412                        ),
413                    )
414                }
415                _ => e,
416            })?;
417
418        self.tracked_consumers
419            .lock()
420            .entry(reservation.consumer().id())
421            .and_modify(|tracked_consumer| {
422                tracked_consumer.grow(additional);
423            });
424        Ok(())
425    }
426
427    fn reserved(&self) -> usize {
428        self.inner.reserved()
429    }
430
431    fn memory_limit(&self) -> MemoryLimit {
432        self.inner.memory_limit()
433    }
434}
435
/// Prefixes `error_msg` with a header and the `top_consumers` report.
///
/// The result is the header line, the report, and then the original
/// message on a final line introduced by `Error: `.
fn provide_top_memory_consumers_to_error_msg(
    error_msg: String,
    top_consumers: String,
) -> String {
    let header =
        "Additional allocation failed with top memory consumers (across reservations) as:\n";
    let separator = "\nError: ";

    let mut message = String::with_capacity(
        header.len() + top_consumers.len() + separator.len() + error_msg.len(),
    );
    message.push_str(header);
    message.push_str(&top_consumers);
    message.push_str(separator);
    message.push_str(&error_msg);
    message
}
442
443#[cfg(test)]
444mod tests {
445    use super::*;
446    use insta::{allow_duplicates, assert_snapshot, Settings};
447    use std::sync::Arc;
448
449    fn make_settings() -> Settings {
450        let mut settings = Settings::clone_current();
451        settings.add_filter(
452            r"([^\s]+)\#\d+\(can spill: (true|false)\)",
453            "$1#[ID](can spill: $2)",
454        );
455        settings
456    }
457
    /// Walks a `FairSpillPool` of 100 bytes through unchecked growth,
    /// per-spiller fair-share limits and the unspillable first-come limit.
    #[test]
    fn test_fair() {
        let pool = Arc::new(FairSpillPool::new(100)) as _;

        let mut r1 = MemoryConsumer::new("unspillable").register(&pool);
        // Can grow beyond capacity of pool
        r1.grow(2000);
        assert_eq!(pool.reserved(), 2000);

        let mut r2 = MemoryConsumer::new("r2")
            .with_can_spill(true)
            .register(&pool);
        // Can grow beyond capacity of pool
        r2.grow(2000);

        assert_eq!(pool.reserved(), 4000);

        // The unspillable 2000 bytes already exceed the pool size,
        // so nothing is left for spillable consumers
        let err = r2.try_grow(1).unwrap_err().strip_backtrace();
        assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 1.0 B for r2 with 2000.0 B already allocated for this reservation - 0.0 B remain available for the total pool");

        // A failed try_grow leaves the state unchanged: same error again
        let err = r2.try_grow(1).unwrap_err().strip_backtrace();
        assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 1.0 B for r2 with 2000.0 B already allocated for this reservation - 0.0 B remain available for the total pool");

        r1.shrink(1990);
        r2.shrink(2000);

        assert_eq!(pool.reserved(), 10);

        r1.try_grow(10).unwrap();
        assert_eq!(pool.reserved(), 20);

        // Can grow r2 to 80 as only spilling consumer
        r2.try_grow(80).unwrap();
        assert_eq!(pool.reserved(), 100);

        r2.shrink(70);

        assert_eq!(r1.size(), 20);
        assert_eq!(r2.size(), 10);
        assert_eq!(pool.reserved(), 30);

        let mut r3 = MemoryConsumer::new("r3")
            .with_can_spill(true)
            .register(&pool);

        // Two spillers now share the 80 bytes not held by r1: 40 each
        let err = r3.try_grow(70).unwrap_err().strip_backtrace();
        assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 70.0 B for r3 with 0.0 B already allocated for this reservation - 40.0 B remain available for the total pool");

        // Shrinking r2 to zero doesn't allow r3 to allocate more than 40,
        // because r2 is still registered as a spilling consumer
        r2.free();
        let err = r3.try_grow(70).unwrap_err().strip_backtrace();
        assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 70.0 B for r3 with 0.0 B already allocated for this reservation - 40.0 B remain available for the total pool");

        // But dropping r2 does
        drop(r2);
        assert_eq!(pool.reserved(), 20);
        r3.try_grow(80).unwrap();

        assert_eq!(pool.reserved(), 100);
        r1.free();
        assert_eq!(pool.reserved(), 80);

        // An unspillable consumer can only take what is left after
        // all current reservations: 100 - 80 = 20
        let mut r4 = MemoryConsumer::new("s4").register(&pool);
        let err = r4.try_grow(30).unwrap_err().strip_backtrace();
        assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 30.0 B for s4 with 0.0 B already allocated for this reservation - 20.0 B remain available for the total pool");
    }
524
    /// Checks that a failed `try_grow` on a `TrackConsumersPool` reports
    /// the three largest consumers, whichever API changed their sizes.
    #[test]
    fn test_tracked_consumers_pool() {
        let setting = make_settings();
        let _bound = setting.bind_to_scope();
        let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
            GreedyMemoryPool::new(100),
            NonZeroUsize::new(3).unwrap(),
        ));

        // Test: use all the different interfaces to change reservation size

        // set r1=50, using grow and shrink
        let mut r1 = MemoryConsumer::new("r1").register(&pool);
        r1.grow(70);
        r1.shrink(20);

        // set r2=15 using try_grow
        let mut r2 = MemoryConsumer::new("r2").register(&pool);
        r2.try_grow(15)
            .expect("should succeed in memory allotment for r2");

        // set r3=20 using try_resize
        let mut r3 = MemoryConsumer::new("r3").register(&pool);
        r3.try_resize(25)
            .expect("should succeed in memory allotment for r3");
        r3.try_resize(20)
            .expect("should succeed in memory allotment for r3");

        // set r4=10
        // this should not be reported in top 3
        let mut r4 = MemoryConsumer::new("r4").register(&pool);
        r4.grow(10);

        // Test: reports if new reservation causes error
        // using the previously set sizes for other consumers
        // (50 + 15 + 20 + 10 = 95 reserved, so 5 bytes remain)
        let mut r5 = MemoryConsumer::new("r5").register(&pool);
        let res = r5.try_grow(150);
        assert!(res.is_err());
        let error = res.unwrap_err().strip_backtrace();
        assert_snapshot!(error, @r"
        Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
          r1#[ID](can spill: false) consumed 50.0 B,
          r3#[ID](can spill: false) consumed 20.0 B,
          r2#[ID](can spill: false) consumed 15.0 B.
        Error: Failed to allocate additional 150.0 B for r5 with 0.0 B already allocated for this reservation - 5.0 B remain available for the total pool
        ");
    }
572
    /// Checks how a `TrackConsumersPool` lists consumers that share a name:
    /// each registration gets its own entry in the report.
    #[test]
    fn test_tracked_consumers_pool_register() {
        let setting = make_settings();
        let _bound = setting.bind_to_scope();
        let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
            GreedyMemoryPool::new(100),
            NonZeroUsize::new(3).unwrap(),
        ));

        let same_name = "foo";

        // Test: see error message when the only registered consumer
        // has not reserved anything yet
        let mut r0 = MemoryConsumer::new(same_name).register(&pool);
        let res = r0.try_grow(150);
        assert!(res.is_err());
        let error = res.unwrap_err().strip_backtrace();
        assert_snapshot!(error, @r"
        Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
          foo#[ID](can spill: false) consumed 0.0 B.
        Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 100.0 B remain available for the total pool
        ");

        // API: multiple registrations using the same hashed consumer,
        // will be recognized *differently* in the TrackConsumersPool.

        r0.grow(10); // make r0=10, pool available=90
        let new_consumer_same_name = MemoryConsumer::new(same_name);
        let mut r1 = new_consumer_same_name.register(&pool);
        // TODO: the insufficient_capacity_err() message is per reservation, not per consumer.
        // a followup PR will clarify this message "0 bytes already allocated for this reservation"
        let res = r1.try_grow(150);
        assert!(res.is_err());
        let error = res.unwrap_err().strip_backtrace();
        assert_snapshot!(error, @r"
        Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
          foo#[ID](can spill: false) consumed 10.0 B,
          foo#[ID](can spill: false) consumed 0.0 B.
        Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 90.0 B remain available for the total pool
        ");

        // Test: will accumulate size changes per consumer, not per reservation
        r1.grow(20);

        let res = r1.try_grow(150);
        assert!(res.is_err());
        let error = res.unwrap_err().strip_backtrace();
        assert_snapshot!(error, @r"
        Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
          foo#[ID](can spill: false) consumed 20.0 B,
          foo#[ID](can spill: false) consumed 10.0 B.
        Error: Failed to allocate additional 150.0 B for foo with 20.0 B already allocated for this reservation - 70.0 B remain available for the total pool
        ");

        // Test: different hashed consumer, (even with the same name),
        // will be recognized as different in the TrackConsumersPool
        let consumer_with_same_name_but_different_hash =
            MemoryConsumer::new(same_name).with_can_spill(true);
        let mut r2 = consumer_with_same_name_but_different_hash.register(&pool);
        let res = r2.try_grow(150);
        assert!(res.is_err());
        let error = res.unwrap_err().strip_backtrace();
        assert_snapshot!(error, @r"
        Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
          foo#[ID](can spill: false) consumed 20.0 B,
          foo#[ID](can spill: false) consumed 10.0 B,
          foo#[ID](can spill: true) consumed 0.0 B.
        Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 70.0 B remain available for the total pool
        ");
    }
642
    /// Checks, for both a `FairSpillPool` and a `GreedyMemoryPool` inner
    /// pool, that a dropped reservation disappears from the top-consumer
    /// report and that its memory shows up as available again.
    #[test]
    fn test_tracked_consumers_pool_deregister() {
        // Runs the same scenario against whichever tracked pool is passed in.
        // `allow_duplicates!` is needed because every inline snapshot below
        // is asserted once per pool type.
        fn test_per_pool_type(pool: Arc<dyn MemoryPool>) {
            // Baseline: see the 2 memory consumers
            let setting = make_settings();
            let _bound = setting.bind_to_scope();
            let mut r0 = MemoryConsumer::new("r0").register(&pool);
            r0.grow(10);
            let r1_consumer = MemoryConsumer::new("r1");
            let mut r1 = r1_consumer.register(&pool);
            r1.grow(20);

            let res = r0.try_grow(150);
            assert!(res.is_err());
            let error = res.unwrap_err().strip_backtrace();
            allow_duplicates!(assert_snapshot!(error, @r"
                Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
                  r1#[ID](can spill: false) consumed 20.0 B,
                  r0#[ID](can spill: false) consumed 10.0 B.
                Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 70.0 B remain available for the total pool
                "));

            // Test: drop one reservation
            // only the remaining consumer should be listed, and the
            // 20.0 B held by r1 are available again (70 -> 90)
            drop(r1);
            let res = r0.try_grow(150);
            assert!(res.is_err());
            let error = res.unwrap_err().strip_backtrace();
            allow_duplicates!(assert_snapshot!(error, @r"
                Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
                  r0#[ID](can spill: false) consumed 10.0 B.
                Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool
                "));

            // Test: repeating the failed request reports the same state
            // (`available is 90`).
            // NOTE(review): this comment used to describe a stale
            // `available is 70` message; the snapshot below shows 90.0 B.
            let res = r0.try_grow(150);
            assert!(res.is_err());
            let error = res.unwrap_err().strip_backtrace();
            allow_duplicates!(assert_snapshot!(error, @r"
                Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
                  r0#[ID](can spill: false) consumed 10.0 B.
                Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool
                "));

            // Test: a further failed request still leaves the report and
            // the available amount unchanged
            let res = r0.try_grow(150);
            assert!(res.is_err());
            let error = res.unwrap_err().strip_backtrace();
            allow_duplicates!(assert_snapshot!(error, @r"
                Resources exhausted: Additional allocation failed with top memory consumers (across reservations) as:
                  r0#[ID](can spill: false) consumed 10.0 B.
                Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total pool
                "));
        }

        let tracked_spill_pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
            FairSpillPool::new(100),
            NonZeroUsize::new(3).unwrap(),
        ));
        test_per_pool_type(tracked_spill_pool);

        let tracked_greedy_pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
            GreedyMemoryPool::new(100),
            NonZeroUsize::new(3).unwrap(),
        ));
        test_per_pool_type(tracked_greedy_pool);
    }
712
    /// Checks that `report_top` can be called directly on a
    /// `TrackConsumersPool` recovered by downcasting, without any
    /// allocation error having occurred.
    #[test]
    fn test_tracked_consumers_pool_use_beyond_errors() {
        let setting = make_settings();
        let _bound = setting.bind_to_scope();
        // Keep the pool behind `dyn Any` so the concrete type can be
        // recovered by downcasting
        let upcasted: Arc<dyn std::any::Any + Send + Sync> =
            Arc::new(TrackConsumersPool::new(
                GreedyMemoryPool::new(100),
                NonZeroUsize::new(3).unwrap(),
            ));
        // The same pool, viewed through the `MemoryPool` trait object
        // that consumers register against
        let pool: Arc<dyn MemoryPool> = Arc::clone(&upcasted)
            .downcast::<TrackConsumersPool<GreedyMemoryPool>>()
            .unwrap();
        // set r1=20
        let mut r1 = MemoryConsumer::new("r1").register(&pool);
        r1.grow(20);
        // set r2=15
        let mut r2 = MemoryConsumer::new("r2").register(&pool);
        r2.grow(15);
        // set r3=45
        let mut r3 = MemoryConsumer::new("r3").register(&pool);
        r3.grow(45);

        let downcasted = upcasted
            .downcast::<TrackConsumersPool<GreedyMemoryPool>>()
            .unwrap();

        // Test: can get runtime metrics, even without an error thrown
        // (only the top 2 of the 3 consumers are requested)
        let res = downcasted.report_top(2);
        assert_snapshot!(res, @r"
        r3#[ID](can spill: false) consumed 45.0 B,
        r1#[ID](can spill: false) consumed 20.0 B.
        ");
    }
746}