Skip to main content

datafusion_execution/memory_pool/
pool.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::memory_pool::{
19    MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation, human_readable_size,
20};
21use datafusion_common::HashMap;
22use datafusion_common::{DataFusionError, Result, resources_datafusion_err};
23use log::debug;
24use parking_lot::Mutex;
25use std::fmt::{Display, Formatter};
26use std::{
27    num::NonZeroUsize,
28    sync::atomic::{AtomicUsize, Ordering},
29};
30
/// A [`MemoryPool`] that enforces no limit
///
/// Every reservation request succeeds; the pool only keeps a running total of
/// the bytes currently reserved so that it can be reported.
#[derive(Default, Debug)]
pub struct UnboundedMemoryPool {
    /// Running total of reserved bytes across all reservations
    used: AtomicUsize,
}
36
37impl MemoryPool for UnboundedMemoryPool {
38    fn name(&self) -> &str {
39        "unbounded"
40    }
41
42    fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
43        self.used.fetch_add(additional, Ordering::Relaxed);
44    }
45
46    fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
47        self.used.fetch_sub(shrink, Ordering::Relaxed);
48    }
49
50    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
51        self.grow(reservation, additional);
52        Ok(())
53    }
54
55    fn reserved(&self) -> usize {
56        self.used.load(Ordering::Relaxed)
57    }
58
59    fn memory_limit(&self) -> MemoryLimit {
60        MemoryLimit::Infinite
61    }
62}
63
64impl Display for UnboundedMemoryPool {
65    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
66        let used = self.used.load(Ordering::Relaxed);
67        write!(f, "{}(used: {})", &self.name(), human_readable_size(used))
68    }
69}
70
/// A [`MemoryPool`] that implements a greedy first-come first-serve limit.
///
/// Requests are granted in arrival order for as long as the total stays within
/// `pool_size`. This works well for queries that do not need to spill, or that
/// have a single spillable operator. See [`FairSpillPool`] if there are
/// multiple spillable operators that all will spill.
#[derive(Debug)]
pub struct GreedyMemoryPool {
    /// Upper bound, in bytes, enforced by `try_grow`
    pool_size: usize,
    /// Bytes currently reserved from this pool
    used: AtomicUsize,
}
81
82impl GreedyMemoryPool {
83    /// Create a new pool that can allocate up to `pool_size` bytes
84    pub fn new(pool_size: usize) -> Self {
85        debug!("Created new GreedyMemoryPool(pool_size={pool_size})");
86        Self {
87            pool_size,
88            used: AtomicUsize::new(0),
89        }
90    }
91}
92
93impl MemoryPool for GreedyMemoryPool {
94    fn name(&self) -> &str {
95        "greedy"
96    }
97
98    fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
99        self.used.fetch_add(additional, Ordering::Relaxed);
100    }
101
102    fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
103        self.used.fetch_sub(shrink, Ordering::Relaxed);
104    }
105
106    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
107        self.used
108            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |used| {
109                let new_used = used + additional;
110                (new_used <= self.pool_size).then_some(new_used)
111            })
112            .map_err(|used| {
113                insufficient_capacity_err(
114                    reservation,
115                    additional,
116                    self.pool_size.saturating_sub(used),
117                    self,
118                )
119            })?;
120        Ok(())
121    }
122
123    fn reserved(&self) -> usize {
124        self.used.load(Ordering::Relaxed)
125    }
126
127    fn memory_limit(&self) -> MemoryLimit {
128        MemoryLimit::Finite(self.pool_size)
129    }
130}
131
132impl Display for GreedyMemoryPool {
133    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
134        let used = self.used.load(Ordering::Relaxed);
135        write!(
136            f,
137            "{}(used: {}, pool_size: {})",
138            &self.name(),
139            human_readable_size(used),
140            human_readable_size(self.pool_size)
141        )
142    }
143}
144
145/// A [`MemoryPool`] that prevents spillable reservations from using more than
146/// an even fraction of the available memory sans any unspillable reservations
147/// (i.e. `(pool_size - unspillable_memory) / num_spillable_reservations`)
148///
149/// This pool works best when you know beforehand the query has
150/// multiple spillable operators that will likely all need to
151/// spill. Sometimes it will cause spills even when there was
152/// sufficient memory (reserved for other operators) to avoid doing
153/// so.
154///
155/// ```text
156///    ┌───────────────────────z──────────────────────z───────────────┐
157///    │                       z                      z               │
158///    │                       z                      z               │
159///    │       Spillable       z       Unspillable    z     Free      │
160///    │        Memory         z        Memory        z    Memory     │
161///    │                       z                      z               │
162///    │                       z                      z               │
163///    └───────────────────────z──────────────────────z───────────────┘
164/// ```
165///
166/// Unspillable memory is allocated in a first-come, first-serve fashion
167#[derive(Debug)]
168pub struct FairSpillPool {
169    /// The total memory limit
170    pool_size: usize,
171
172    state: Mutex<FairSpillPoolState>,
173}
174
/// Mutable bookkeeping for [`FairSpillPool`].
#[derive(Debug)]
struct FairSpillPoolState {
    /// How many registered consumers are able to spill
    num_spill: usize,

    /// Bytes currently reserved by consumers that can spill
    spillable: usize,

    /// Bytes currently reserved by consumers that cannot spill
    unspillable: usize,
}
186
187impl FairSpillPool {
188    /// Allocate up to `limit` bytes
189    pub fn new(pool_size: usize) -> Self {
190        debug!("Created new FairSpillPool(pool_size={pool_size})");
191        Self {
192            pool_size,
193            state: Mutex::new(FairSpillPoolState {
194                num_spill: 0,
195                spillable: 0,
196                unspillable: 0,
197            }),
198        }
199    }
200}
201
202impl MemoryPool for FairSpillPool {
203    fn name(&self) -> &str {
204        "fair"
205    }
206
207    fn register(&self, consumer: &MemoryConsumer) {
208        if consumer.can_spill {
209            self.state.lock().num_spill += 1;
210        }
211    }
212
213    fn unregister(&self, consumer: &MemoryConsumer) {
214        if consumer.can_spill {
215            let mut state = self.state.lock();
216            state.num_spill = state.num_spill.checked_sub(1).unwrap();
217        }
218    }
219
220    fn grow(&self, reservation: &MemoryReservation, additional: usize) {
221        let mut state = self.state.lock();
222        match reservation.registration.consumer.can_spill {
223            true => state.spillable += additional,
224            false => state.unspillable += additional,
225        }
226    }
227
228    fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
229        let mut state = self.state.lock();
230        match reservation.registration.consumer.can_spill {
231            true => state.spillable -= shrink,
232            false => state.unspillable -= shrink,
233        }
234    }
235
236    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
237        let mut state = self.state.lock();
238
239        match reservation.registration.consumer.can_spill {
240            true => {
241                // The total amount of memory available to spilling consumers
242                let spill_available = self.pool_size.saturating_sub(state.unspillable);
243
244                // No spiller may use more than their fraction of the memory available
245                let available = spill_available
246                    .checked_div(state.num_spill)
247                    .unwrap_or(spill_available);
248
249                if reservation.size() + additional > available {
250                    return Err(insufficient_capacity_err(
251                        reservation,
252                        additional,
253                        available,
254                        self,
255                    ));
256                }
257                state.spillable += additional;
258            }
259            false => {
260                let available = self
261                    .pool_size
262                    .saturating_sub(state.unspillable + state.spillable);
263
264                if available < additional {
265                    return Err(insufficient_capacity_err(
266                        reservation,
267                        additional,
268                        available,
269                        self,
270                    ));
271                }
272                state.unspillable += additional;
273            }
274        }
275        Ok(())
276    }
277
278    fn reserved(&self) -> usize {
279        let state = self.state.lock();
280        state.spillable + state.unspillable
281    }
282
283    fn memory_limit(&self) -> MemoryLimit {
284        MemoryLimit::Finite(self.pool_size)
285    }
286}
287
288impl Display for FairSpillPool {
289    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
290        write!(
291            f,
292            "{}(pool_size: {})",
293            &self.name(),
294            human_readable_size(self.pool_size),
295        )
296    }
297}
298
299/// Constructs a resources error based upon the individual [`MemoryReservation`].
300///
301/// The error references the `bytes already allocated` for the reservation,
302/// and not the total within the collective [`MemoryPool`],
303/// nor the total across multiple reservations with the same [`MemoryConsumer`].
304#[inline(always)]
305fn insufficient_capacity_err(
306    reservation: &MemoryReservation,
307    additional: usize,
308    available: usize,
309    pool: &impl MemoryPool,
310) -> DataFusionError {
311    resources_datafusion_err!(
312        "Failed to allocate additional {} for {} with {} already allocated for this reservation - {} remain available for the total memory pool: {}",
313        human_readable_size(additional),
314        reservation.registration.consumer.name,
315        human_readable_size(reservation.size()),
316        human_readable_size(available),
317        pool
318    )
319}
320
/// Per-consumer bookkeeping kept by [`TrackConsumersPool`].
#[derive(Debug)]
struct TrackedConsumer {
    /// Name of the consumer, copied at registration
    name: String,
    /// Whether the consumer can spill
    can_spill: bool,
    /// Bytes currently reserved across all of this consumer's reservations
    reserved: AtomicUsize,
    /// Highest value `reserved` has reached
    peak: AtomicUsize,
}

impl TrackedConsumer {
    /// Shorthand to return the currently reserved value
    fn reserved(&self) -> usize {
        self.reserved.load(Ordering::Relaxed)
    }

    /// Return the peak value
    fn peak(&self) -> usize {
        self.peak.load(Ordering::Relaxed)
    }

    /// Grows the tracked consumer's reserved size,
    /// should be called after the pool has successfully performed the grow().
    ///
    /// The peak is updated from the value this call produced, not from a
    /// second load. Otherwise a concurrent update landing between the two
    /// atomic operations could hide the true high-water mark.
    fn grow(&self, additional: usize) {
        // `fetch_add` wraps on overflow; mirror that with `wrapping_add` so
        // debug builds do not panic where the atomic itself would not.
        let new_reserved = self
            .reserved
            .fetch_add(additional, Ordering::Relaxed)
            .wrapping_add(additional);
        self.peak.fetch_max(new_reserved, Ordering::Relaxed);
    }

    /// Reduce the tracked consumer's reserved size,
    /// should be called after the pool has successfully performed the shrink().
    fn shrink(&self, shrink: usize) {
        self.reserved.fetch_sub(shrink, Ordering::Relaxed);
    }
}
353
/// A point-in-time snapshot of a tracked memory consumer's state.
///
/// Instances are produced by [`TrackConsumersPool::metrics()`]; the values do
/// not change after the snapshot is taken.
#[derive(Clone, Debug)]
pub struct MemoryConsumerMetrics {
    /// Name the consumer was created with
    pub name: String,
    /// `true` if the consumer is able to spill to disk
    pub can_spill: bool,
    /// Bytes reserved by the consumer when the snapshot was taken
    pub reserved: usize,
    /// Largest number of bytes the consumer has had reserved
    pub peak: usize,
}
368
369impl From<&TrackedConsumer> for MemoryConsumerMetrics {
370    fn from(tracked: &TrackedConsumer) -> Self {
371        Self {
372            name: tracked.name.clone(),
373            can_spill: tracked.can_spill,
374            reserved: tracked.reserved(),
375            peak: tracked.peak(),
376        }
377    }
378}
379
380/// A [`MemoryPool`] that tracks the consumers that have
381/// reserved memory within the inner memory pool.
382///
383/// By tracking memory reservations more carefully this pool
384/// can provide better error messages on the largest memory users
385/// when memory allocation fails.
386///
387/// Tracking is per hashed [`MemoryConsumer`], not per [`MemoryReservation`].
388/// The same consumer can have multiple reservations.
389///
390/// # Automatic Usage via [`RuntimeEnvBuilder`]
391///
392/// The easiest way to use `TrackConsumersPool` is via
393/// [`RuntimeEnvBuilder::with_memory_limit()`].
394///
395/// [`RuntimeEnvBuilder`]: crate::runtime_env::RuntimeEnvBuilder
396/// [`RuntimeEnvBuilder::with_memory_limit()`]: crate::runtime_env::RuntimeEnvBuilder::with_memory_limit
397///
398/// # Usage Examples
399///
400/// For more examples of using `TrackConsumersPool`, see the [memory_pool_tracking.rs] example
401///
402/// [memory_pool_tracking.rs]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/execution_monitoring/memory_pool_tracking.rs
403/// [memory_pool_execution_plan.rs]: https://github.com/apache/datafusion/blob/main/datafusion-examples/examples/execution_monitoring/memory_pool_execution_plan.rs
404#[derive(Debug)]
405pub struct TrackConsumersPool<I> {
406    /// The wrapped memory pool that actually handles reservation logic
407    inner: I,
408    /// The amount of consumers to report(ordered top to bottom by reservation size)
409    top: NonZeroUsize,
410    /// Maps consumer_id --> TrackedConsumer
411    tracked_consumers: Mutex<HashMap<usize, TrackedConsumer>>,
412}
413
414impl<I: MemoryPool> Display for TrackConsumersPool<I> {
415    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
416        write!(
417            f,
418            "{}(inner_pool: {}, num_of_top_consumers: {})",
419            &self.name(),
420            &self.inner,
421            &self.top,
422        )
423    }
424}
425
426impl<I: MemoryPool> TrackConsumersPool<I> {
427    /// Creates a new [`TrackConsumersPool`].
428    ///
429    /// # Arguments
430    /// * `inner` - The underlying memory pool that handles actual memory allocation
431    /// * `top` - The number of top memory consumers to include in error messages
432    ///
433    /// # Note
434    /// In most cases, you should use [`RuntimeEnvBuilder::with_memory_limit()`](crate::runtime_env::RuntimeEnvBuilder::with_memory_limit)
435    /// instead of creating this pool manually, as it automatically sets up tracking with
436    /// sensible defaults (top 5 consumers).
437    ///
438    /// # Example
439    ///
440    /// ```rust
441    /// use datafusion_execution::memory_pool::{
442    ///     FairSpillPool, GreedyMemoryPool, TrackConsumersPool,
443    /// };
444    /// use std::num::NonZeroUsize;
445    ///
446    /// // Create with a greedy pool backend, reporting top 3 consumers in error messages
447    /// let tracked_greedy = TrackConsumersPool::new(
448    ///     GreedyMemoryPool::new(1024 * 1024), // 1MB limit
449    ///     NonZeroUsize::new(3).unwrap(),
450    /// );
451    ///
452    /// // Create with a fair spill pool backend, reporting top 5 consumers in error messages
453    /// let tracked_fair = TrackConsumersPool::new(
454    ///     FairSpillPool::new(2 * 1024 * 1024), // 2MB limit
455    ///     NonZeroUsize::new(5).unwrap(),
456    /// );
457    /// ```
458    ///
459    /// # Impact on Error Messages
460    ///
461    /// The `top` determines how many Top K [`MemoryConsumer`]s to include
462    /// in the reported [`DataFusionError::ResourcesExhausted`].
463    pub fn new(inner: I, top: NonZeroUsize) -> Self {
464        Self {
465            inner,
466            top,
467            tracked_consumers: Default::default(),
468        }
469    }
470
471    /// Returns a reference to the wrapped inner [`MemoryPool`].
472    pub fn inner(&self) -> &I {
473        &self.inner
474    }
475
476    /// Returns a snapshot of all currently tracked consumers.
477    pub fn metrics(&self) -> Vec<MemoryConsumerMetrics> {
478        self.tracked_consumers
479            .lock()
480            .values()
481            .map(Into::into)
482            .collect()
483    }
484
485    /// Returns a formatted string with the top memory consumers.
486    pub fn report_top(&self, top: usize) -> String {
487        let mut consumers = self
488            .tracked_consumers
489            .lock()
490            .iter()
491            .map(|(consumer_id, tracked_consumer)| {
492                (
493                    (
494                        *consumer_id,
495                        tracked_consumer.name.to_owned(),
496                        tracked_consumer.can_spill,
497                        tracked_consumer.peak(),
498                    ),
499                    tracked_consumer.reserved(),
500                )
501            })
502            .collect::<Vec<_>>();
503        consumers.sort_by_key(|consumer| std::cmp::Reverse(consumer.1));
504
505        consumers[0..std::cmp::min(top, consumers.len())]
506            .iter()
507            .map(|((id, name, can_spill, peak), size)| {
508                format!(
509                    "  {name}#{id}(can spill: {can_spill}) consumed {}, peak {}",
510                    human_readable_size(*size),
511                    human_readable_size(*peak),
512                )
513            })
514            .collect::<Vec<_>>()
515            .join(",\n")
516            + "."
517    }
518}
519
520impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
521    fn name(&self) -> &str {
522        "track_consumers"
523    }
524
525    fn register(&self, consumer: &MemoryConsumer) {
526        self.inner.register(consumer);
527
528        let mut guard = self.tracked_consumers.lock();
529        let existing = guard.insert(
530            consumer.id(),
531            TrackedConsumer {
532                name: consumer.name().to_string(),
533                can_spill: consumer.can_spill(),
534                reserved: Default::default(),
535                peak: Default::default(),
536            },
537        );
538
539        debug_assert!(
540            existing.is_none(),
541            "Registered was called twice on the same consumer"
542        );
543    }
544
545    fn unregister(&self, consumer: &MemoryConsumer) {
546        self.inner.unregister(consumer);
547        self.tracked_consumers.lock().remove(&consumer.id());
548    }
549
550    fn grow(&self, reservation: &MemoryReservation, additional: usize) {
551        self.inner.grow(reservation, additional);
552        self.tracked_consumers
553            .lock()
554            .entry(reservation.consumer().id())
555            .and_modify(|tracked_consumer| {
556                tracked_consumer.grow(additional);
557            });
558    }
559
560    fn shrink(&self, reservation: &MemoryReservation, shrink: usize) {
561        self.inner.shrink(reservation, shrink);
562        self.tracked_consumers
563            .lock()
564            .entry(reservation.consumer().id())
565            .and_modify(|tracked_consumer| {
566                tracked_consumer.shrink(shrink);
567            });
568    }
569
570    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
571        self.inner
572            .try_grow(reservation, additional)
573            .map_err(|e| match e {
574                DataFusionError::ResourcesExhausted(e) => {
575                    // wrap OOM message in top consumers
576                    DataFusionError::ResourcesExhausted(
577                        provide_top_memory_consumers_to_error_msg(
578                            &reservation.consumer().name,
579                            &e,
580                            &self.report_top(self.top.into()),
581                        ),
582                    )
583                }
584                _ => e,
585            })?;
586
587        self.tracked_consumers
588            .lock()
589            .entry(reservation.consumer().id())
590            .and_modify(|tracked_consumer| {
591                tracked_consumer.grow(additional);
592            });
593        Ok(())
594    }
595
596    fn reserved(&self) -> usize {
597        self.inner.reserved()
598    }
599
600    fn memory_limit(&self) -> MemoryLimit {
601        self.inner.memory_limit()
602    }
603}
604
/// Builds the error text reported when an allocation fails in
/// [`TrackConsumersPool`].
///
/// The result has three lines, joined with `\n`:
/// 1. a header naming `consumer_name`,
/// 2. `top_consumers` inserted verbatim,
/// 3. `Error: ` followed by the inner pool's `error_msg`.
fn provide_top_memory_consumers_to_error_msg(
    consumer_name: &str,
    error_msg: &str,
    top_consumers: &str,
) -> String {
    let header = format!(
        "Additional allocation failed for {consumer_name} with top memory consumers (across reservations) as:"
    );
    let footer = format!("Error: {error_msg}");
    [header.as_str(), top_consumers, footer.as_str()].join("\n")
}
614
615#[cfg(test)]
616mod tests {
617    use super::*;
618    use insta::{Settings, allow_duplicates, assert_snapshot, with_settings};
619    use std::sync::Arc;
620
621    fn make_settings() -> Settings {
622        let mut settings = Settings::clone_current();
623        settings.add_filter(
624            r"([^\s]+)\#\d+\(can spill: (true|false)\)",
625            "$1#[ID](can spill: $2)",
626        );
627        settings
628    }
629
630    #[test]
631    fn test_fair() {
632        let pool = Arc::new(FairSpillPool::new(100)) as _;
633
634        let r1 = MemoryConsumer::new("unspillable").register(&pool);
635        // Can grow beyond capacity of pool
636        r1.grow(2000);
637        assert_eq!(pool.reserved(), 2000);
638
639        let r2 = MemoryConsumer::new("r2")
640            .with_can_spill(true)
641            .register(&pool);
642        // Can grow beyond capacity of pool
643        r2.grow(2000);
644
645        assert_eq!(pool.reserved(), 4000);
646
647        let err = r2.try_grow(1).unwrap_err().strip_backtrace();
648        assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 1.0 B for r2 with 2000.0 B already allocated for this reservation - 0.0 B remain available for the total memory pool: fair(pool_size: 100.0 B)");
649
650        let err = r2.try_grow(1).unwrap_err().strip_backtrace();
651        assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 1.0 B for r2 with 2000.0 B already allocated for this reservation - 0.0 B remain available for the total memory pool: fair(pool_size: 100.0 B)");
652
653        r1.shrink(1990);
654        r2.shrink(2000);
655
656        assert_eq!(pool.reserved(), 10);
657
658        r1.try_grow(10).unwrap();
659        assert_eq!(pool.reserved(), 20);
660
661        // Can grow r2 to 80 as only spilling consumer
662        r2.try_grow(80).unwrap();
663        assert_eq!(pool.reserved(), 100);
664
665        r2.shrink(70);
666
667        assert_eq!(r1.size(), 20);
668        assert_eq!(r2.size(), 10);
669        assert_eq!(pool.reserved(), 30);
670
671        let r3 = MemoryConsumer::new("r3")
672            .with_can_spill(true)
673            .register(&pool);
674
675        let err = r3.try_grow(70).unwrap_err().strip_backtrace();
676        assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 70.0 B for r3 with 0.0 B already allocated for this reservation - 40.0 B remain available for the total memory pool: fair(pool_size: 100.0 B)");
677
678        //Shrinking r2 to zero doesn't allow a3 to allocate more than 45
679        r2.free();
680        let err = r3.try_grow(70).unwrap_err().strip_backtrace();
681        assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 70.0 B for r3 with 0.0 B already allocated for this reservation - 40.0 B remain available for the total memory pool: fair(pool_size: 100.0 B)");
682
683        // But dropping r2 does
684        drop(r2);
685        assert_eq!(pool.reserved(), 20);
686        r3.try_grow(80).unwrap();
687
688        assert_eq!(pool.reserved(), 100);
689        r1.free();
690        assert_eq!(pool.reserved(), 80);
691
692        let r4 = MemoryConsumer::new("s4").register(&pool);
693        let err = r4.try_grow(30).unwrap_err().strip_backtrace();
694        assert_snapshot!(err, @"Resources exhausted: Failed to allocate additional 30.0 B for s4 with 0.0 B already allocated for this reservation - 20.0 B remain available for the total memory pool: fair(pool_size: 100.0 B)");
695    }
696
697    #[test]
698    fn test_tracked_consumers_pool() {
699        let setting = make_settings();
700        let _bound = setting.bind_to_scope();
701        let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
702            GreedyMemoryPool::new(100),
703            NonZeroUsize::new(3).unwrap(),
704        ));
705
706        // Test: use all the different interfaces to change reservation size
707
708        // set r1=50, using grow and shrink
709        let r1 = MemoryConsumer::new("r1").register(&pool);
710        r1.grow(50);
711        r1.grow(20);
712        r1.shrink(20);
713
714        // set r2=15 using try_grow
715        let r2 = MemoryConsumer::new("r2").register(&pool);
716        r2.try_grow(15)
717            .expect("should succeed in memory allotment for r2");
718
719        // set r3=20 using try_resize
720        let r3 = MemoryConsumer::new("r3").register(&pool);
721        r3.try_resize(25)
722            .expect("should succeed in memory allotment for r3");
723        r3.try_resize(20)
724            .expect("should succeed in memory allotment for r3");
725
726        // set r4=10
727        // this should not be reported in top 3
728        let r4 = MemoryConsumer::new("r4").register(&pool);
729        r4.grow(10);
730
731        // Test: reports if new reservation causes error
732        // using the previously set sizes for other consumers
733        let r5 = MemoryConsumer::new("r5").register(&pool);
734        let res = r5.try_grow(150);
735        assert!(res.is_err());
736        let error = res.unwrap_err().strip_backtrace();
737        assert_snapshot!(error, @r"
738        Resources exhausted: Additional allocation failed for r5 with top memory consumers (across reservations) as:
739          r1#[ID](can spill: false) consumed 50.0 B, peak 70.0 B,
740          r3#[ID](can spill: false) consumed 20.0 B, peak 25.0 B,
741          r2#[ID](can spill: false) consumed 15.0 B, peak 15.0 B.
742        Error: Failed to allocate additional 150.0 B for r5 with 0.0 B already allocated for this reservation - 5.0 B remain available for the total memory pool: greedy(used: 95.0 B, pool_size: 100.0 B)
743        ");
744    }
745
746    #[test]
747    fn test_tracked_consumers_pool_register() {
748        let setting = make_settings();
749        let _bound = setting.bind_to_scope();
750        let pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
751            GreedyMemoryPool::new(100),
752            NonZeroUsize::new(3).unwrap(),
753        ));
754
755        let same_name = "foo";
756
757        // Test: see error message when no consumers recorded yet
758        let r0 = MemoryConsumer::new(same_name).register(&pool);
759        let res = r0.try_grow(150);
760        assert!(res.is_err());
761        let error = res.unwrap_err().strip_backtrace();
762        assert_snapshot!(error, @r"
763        Resources exhausted: Additional allocation failed for foo with top memory consumers (across reservations) as:
764          foo#[ID](can spill: false) consumed 0.0 B, peak 0.0 B.
765        Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 100.0 B remain available for the total memory pool: greedy(used: 0.0 B, pool_size: 100.0 B)
766        ");
767
768        // API: multiple registrations using the same hashed consumer,
769        // will be recognized *differently* in the TrackConsumersPool.
770
771        r0.grow(10); // make r0=10, pool available=90
772        let new_consumer_same_name = MemoryConsumer::new(same_name);
773        let r1 = new_consumer_same_name.register(&pool);
774        // TODO: the insufficient_capacity_err() message is per reservation, not per consumer.
775        // a followup PR will clarify this message "0 bytes already allocated for this reservation"
776        let res = r1.try_grow(150);
777        assert!(res.is_err());
778        let error = res.unwrap_err().strip_backtrace();
779        assert_snapshot!(error, @r"
780        Resources exhausted: Additional allocation failed for foo with top memory consumers (across reservations) as:
781          foo#[ID](can spill: false) consumed 10.0 B, peak 10.0 B,
782          foo#[ID](can spill: false) consumed 0.0 B, peak 0.0 B.
783        Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 90.0 B remain available for the total memory pool: greedy(used: 10.0 B, pool_size: 100.0 B)
784        ");
785
786        // Test: will accumulate size changes per consumer, not per reservation
787        r1.grow(20);
788
789        let res = r1.try_grow(150);
790        assert!(res.is_err());
791        let error = res.unwrap_err().strip_backtrace();
792        assert_snapshot!(error, @r"
793        Resources exhausted: Additional allocation failed for foo with top memory consumers (across reservations) as:
794          foo#[ID](can spill: false) consumed 20.0 B, peak 20.0 B,
795          foo#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
796        Error: Failed to allocate additional 150.0 B for foo with 20.0 B already allocated for this reservation - 70.0 B remain available for the total memory pool: greedy(used: 30.0 B, pool_size: 100.0 B)
797        ");
798
799        // Test: different hashed consumer, (even with the same name),
800        // will be recognized as different in the TrackConsumersPool
801        let consumer_with_same_name_but_different_hash =
802            MemoryConsumer::new(same_name).with_can_spill(true);
803        let r2 = consumer_with_same_name_but_different_hash.register(&pool);
804        let res = r2.try_grow(150);
805        assert!(res.is_err());
806        let error = res.unwrap_err().strip_backtrace();
807        assert_snapshot!(error, @r"
808        Resources exhausted: Additional allocation failed for foo with top memory consumers (across reservations) as:
809          foo#[ID](can spill: false) consumed 20.0 B, peak 20.0 B,
810          foo#[ID](can spill: false) consumed 10.0 B, peak 10.0 B,
811          foo#[ID](can spill: true) consumed 0.0 B, peak 0.0 B.
812        Error: Failed to allocate additional 150.0 B for foo with 0.0 B already allocated for this reservation - 70.0 B remain available for the total memory pool: greedy(used: 30.0 B, pool_size: 100.0 B)
813        ");
814    }
815
816    #[test]
817    fn test_tracked_consumers_pool_deregister() {
818        fn test_per_pool_type<P: MemoryPool + 'static>(pool: Arc<TrackConsumersPool<P>>) {
819            // `snapshot_suffix` ties each insta snapshot to this pool's inner backend; filters
820            // normalize inner pool `Display` so fair vs greedy share the same `@` reference text.
821            with_settings!({
822                snapshot_suffix => pool.inner().name().to_string(),
823                filters => vec![
824                    (
825                        r"([^\s]+)\#\d+\(can spill: (true|false)\)",
826                        "$1#[ID](can spill: $2)",
827                    ),
828                    (
829                        r"for the total memory pool: [^\n]+",
830                        "for the total memory pool: [INNER_POOL]",
831                    ),
832                ],
833            }, {
834                let memory_pool: Arc<dyn MemoryPool> = Arc::<TrackConsumersPool<P>>::clone(&pool);
835                let r0 = MemoryConsumer::new("r0").register(&memory_pool);
836                r0.grow(10);
837                let r1 = MemoryConsumer::new("r1").register(&memory_pool);
838                r1.grow(20);
839
840                // Baseline: see the 2 memory consumers
841                let error = r0.try_grow(150).unwrap_err().strip_backtrace();
842                assert_snapshot!(error, @r"
843                Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as:
844                  r1#[ID](can spill: false) consumed 20.0 B, peak 20.0 B,
845                  r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
846                Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 70.0 B remain available for the total memory pool: [INNER_POOL]
847                ");
848
849                // Test: unregister one — only the remaining consumer should be listed
850                drop(r1);
851                let error = r0.try_grow(150).unwrap_err().strip_backtrace();
852                assert_snapshot!(error, @r"
853                Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as:
854                  r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
855                Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total memory pool: [INNER_POOL]
856                ");
857
858                // Test: actual message we see is the `available is 70`. When it should be `available is 90`.
859                // This is because the pool.shrink() does not automatically occur within the inner_pool.deregister().
860                let error = r0.try_grow(150).unwrap_err().strip_backtrace();
861                assert_snapshot!(error, @r"
862                Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as:
863                  r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
864                Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total memory pool: [INNER_POOL]
865                ");
866
867                // Test: the registration needs to free itself (or be dropped),
868                // for the proper error message
869                let error = r0.try_grow(150).unwrap_err().strip_backtrace();
870                assert_snapshot!(error, @r"
871                Resources exhausted: Additional allocation failed for r0 with top memory consumers (across reservations) as:
872                  r0#[ID](can spill: false) consumed 10.0 B, peak 10.0 B.
873                Error: Failed to allocate additional 150.0 B for r0 with 10.0 B already allocated for this reservation - 90.0 B remain available for the total memory pool: [INNER_POOL]
874                ");
875                }
876            );
877        }
878
879        allow_duplicates! {
880            let tracked_spill_pool = Arc::new(TrackConsumersPool::new(
881                FairSpillPool::new(100),
882                NonZeroUsize::new(3).unwrap(),
883            ));
884            test_per_pool_type(tracked_spill_pool);
885
886            let tracked_greedy_pool = Arc::new(TrackConsumersPool::new(
887                GreedyMemoryPool::new(100),
888                NonZeroUsize::new(3).unwrap(),
889            ));
890            test_per_pool_type(tracked_greedy_pool);
891        }
892    }
893
894    #[test]
895    fn test_track_consumers_pool_metrics() {
896        let track_consumers_pool = Arc::new(TrackConsumersPool::new(
897            GreedyMemoryPool::new(1000),
898            NonZeroUsize::new(3).unwrap(),
899        ));
900        let memory_pool: Arc<dyn MemoryPool> = Arc::clone(&track_consumers_pool) as _;
901
902        // Empty pool has no metrics
903        assert!(track_consumers_pool.metrics().is_empty());
904
905        // Register consumers with different spill settings
906        let r1 = MemoryConsumer::new("spilling")
907            .with_can_spill(true)
908            .register(&memory_pool);
909        let r2 = MemoryConsumer::new("non-spilling").register(&memory_pool);
910
911        // Grow r1 in two steps to verify peak tracking
912        r1.grow(100);
913        r1.grow(50);
914        r1.shrink(50); // reserved=100, peak=150
915
916        r2.grow(200); // reserved=200, peak=200
917
918        let mut metrics = track_consumers_pool.metrics();
919        metrics.sort_by_key(|m| m.name.clone());
920
921        assert_eq!(metrics.len(), 2);
922
923        let m_non = &metrics[0];
924        assert_eq!(m_non.name, "non-spilling");
925        assert!(!m_non.can_spill);
926        assert_eq!(m_non.reserved, 200);
927        assert_eq!(m_non.peak, 200);
928
929        let m_spill = &metrics[1];
930        assert_eq!(m_spill.name, "spilling");
931        assert!(m_spill.can_spill);
932        assert_eq!(m_spill.reserved, 100);
933        assert_eq!(m_spill.peak, 150);
934
935        // Unregistered consumers are removed from metrics
936        drop(r2);
937        let metrics = track_consumers_pool.metrics();
938        assert_eq!(metrics.len(), 1);
939        assert_eq!(metrics[0].name, "spilling");
940    }
941
942    #[test]
943    fn test_tracked_consumers_pool_use_beyond_errors() {
944        let setting = make_settings();
945        let _bound = setting.bind_to_scope();
946        let upcasted: Arc<dyn std::any::Any + Send + Sync> =
947            Arc::new(TrackConsumersPool::new(
948                GreedyMemoryPool::new(100),
949                NonZeroUsize::new(3).unwrap(),
950            ));
951        let pool: Arc<dyn MemoryPool> = Arc::clone(&upcasted)
952            .downcast::<TrackConsumersPool<GreedyMemoryPool>>()
953            .unwrap();
954        // set r1=20
955        let r1 = MemoryConsumer::new("r1").register(&pool);
956        r1.grow(20);
957        // set r2=15
958        let r2 = MemoryConsumer::new("r2").register(&pool);
959        r2.grow(15);
960        // set r3=45
961        let r3 = MemoryConsumer::new("r3").register(&pool);
962        r3.grow(45);
963
964        let downcasted = upcasted
965            .downcast::<TrackConsumersPool<GreedyMemoryPool>>()
966            .unwrap();
967
968        // Test: can get runtime metrics, even without an error thrown
969        let res = downcasted.report_top(2);
970        assert_snapshot!(res, @r"
971        r3#[ID](can spill: false) consumed 45.0 B, peak 45.0 B,
972        r1#[ID](can spill: false) consumed 20.0 B, peak 20.0 B.
973        ");
974    }
975
976    #[test]
977    fn test_memory_pool_display_fmt() {
978        let top = NonZeroUsize::new(5).unwrap();
979
980        // UnboundedMemoryPool Display with default allocation: 0.0B
981        let unbounded = UnboundedMemoryPool::default();
982        assert_eq!(
983            unbounded.to_string(),
984            "unbounded(used: 0.0 B)",
985            "UnboundedMemoryPool Display"
986        );
987
988        // UnboundedMemoryPool Display with reservations
989        let unbounded_arc: Arc<dyn MemoryPool> = Arc::new(UnboundedMemoryPool::default());
990        let r = MemoryConsumer::new("u").register(&unbounded_arc);
991        r.grow(2048);
992        assert_eq!(
993            unbounded_arc.as_ref().to_string(),
994            "unbounded(used: 2.0 KB)",
995            "UnboundedMemoryPool Display with reservations"
996        );
997
998        // GreedyMemoryPool Display with default allocation: 100.0B
999        let greedy = GreedyMemoryPool::new(100);
1000        assert_eq!(
1001            greedy.to_string(),
1002            "greedy(used: 0.0 B, pool_size: 100.0 B)",
1003            "GreedyMemoryPool Display"
1004        );
1005
1006        // GreedyMemoryPool Display with reservations
1007        let greedy_arc: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(100));
1008        let r = MemoryConsumer::new("g").register(&greedy_arc);
1009        r.grow(50);
1010        assert_eq!(
1011            greedy_arc.as_ref().to_string(),
1012            "greedy(used: 50.0 B, pool_size: 100.0 B)",
1013            "GreedyMemoryPool Display with reservations"
1014        );
1015
1016        // FairSpillPool Display with default allocation: 4.0KB and without reservations
1017        let fair = FairSpillPool::new(4096);
1018        assert_eq!(
1019            fair.to_string(),
1020            "fair(pool_size: 4.0 KB)",
1021            "FairSpillPool Display"
1022        );
1023
1024        // TrackConsumersPool<GreedyMemoryPool> Display with default allocation: 128.0B and without reservations
1025        let tracked_greedy = TrackConsumersPool::new(GreedyMemoryPool::new(128), top);
1026        assert_eq!(
1027            tracked_greedy.to_string(),
1028            "track_consumers(inner_pool: greedy(used: 0.0 B, pool_size: 128.0 B), num_of_top_consumers: 5)",
1029            "TrackConsumersPool<GreedyMemoryPool> Display"
1030        );
1031
1032        // TrackConsumersPool<FairSpillPool> Display with default allocation: 256.0B and without reservations
1033        let tracked_fair = TrackConsumersPool::new(FairSpillPool::new(256), top);
1034        assert_eq!(
1035            tracked_fair.to_string(),
1036            "track_consumers(inner_pool: fair(pool_size: 256.0 B), num_of_top_consumers: 5)",
1037            "TrackConsumersPool<FairSpillPool> Display"
1038        );
1039
1040        // TrackConsumersPool<UnboundedMemoryPool> Display without reservations
1041        let tracked_unbounded =
1042            TrackConsumersPool::new(UnboundedMemoryPool::default(), top);
1043        assert_eq!(
1044            tracked_unbounded.to_string(),
1045            "track_consumers(inner_pool: unbounded(used: 0.0 B), num_of_top_consumers: 5)",
1046            "TrackConsumersPool<UnboundedMemoryPool> Display"
1047        );
1048    }
1049}