// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`MemoryPool`] for memory management during query execution, [`proxy`] for
//! help with allocation accounting.

use datafusion_common::{Result, internal_datafusion_err};
use std::any::Any;
use std::fmt::Display;
use std::hash::{Hash, Hasher};
use std::{cmp::Ordering, sync::Arc, sync::atomic};

mod pool;

#[cfg(feature = "arrow_buffer_pool")]
pub mod arrow;

pub mod proxy {
    pub use datafusion_common::utils::proxy::{HashTableAllocExt, VecAllocExt};
}

pub use datafusion_common::{
    human_readable_count, human_readable_duration, human_readable_size, units,
};
pub use pool::*;

/// Tracks and potentially limits memory use across operators during execution.
///
/// # Memory Management Overview
///
/// DataFusion is a streaming query engine, processing most queries without
/// buffering the entire input. Most operators require a fixed amount of memory
/// based on the schema and target batch size. However, certain operations, such
/// as sorting and grouping/joining, require buffering intermediate results,
/// which can require memory proportional to the number of input rows.
///
/// Rather than tracking all allocations, DataFusion takes a pragmatic approach:
/// Intermediate memory used as data streams through the system is not accounted
/// for (it is assumed to be "small"), but the large consumers of memory must
/// register and constrain their use. This design trades off the additional code
/// complexity of memory tracking against limiting resource usage.
///
/// When limiting memory with a `MemoryPool` you should typically reserve some
/// overhead (e.g. 10%) for the "small" memory allocations that are not tracked.
///
/// # Memory Management Design
///
/// As explained above, DataFusion's design ONLY limits operators that require
/// "large" amounts of memory (proportional to number of input rows), such as
/// `GroupByHashExec`. It does NOT track and limit memory used internally by
/// other operators such as `DataSourceExec` or the `RecordBatch`es that flow
/// between operators. Furthermore, operators should not reserve memory for the
/// batches they produce. Instead, if a consumer operator needs to hold batches
/// from its producers in memory for an extended period, it is the consumer
/// operator's responsibility to reserve the necessary memory for those batches.
///
/// In order to avoid allocating memory until the OS or the container system
/// kills the process, DataFusion `ExecutionPlan`s (operators) that consume
/// large amounts of memory must first request their desired allocation from a
/// [`MemoryPool`] before allocating more. The request is typically managed via
/// a [`MemoryReservation`] and [`MemoryConsumer`].
///
/// If the request succeeds, the operator should proceed and allocate
/// the desired memory. If the request fails, the operator must either first
/// free memory (e.g. by spilling to local disk) and try again, or error.
///
/// Note that a `MemoryPool` can be shared by concurrently executing plans,
/// which can be used to control memory usage in a multi-tenant system.
///
/// # How MemoryPool works by example
///
/// Scenario 1:
/// For a `Filter` operator, `RecordBatch`es stream through it, so it
/// does not need to track its memory usage through a [`MemoryPool`].
///
/// Scenario 2:
/// For a `CrossJoin` operator, the intermediate state grows with the input
/// size, so the operator uses a [`MemoryPool`] to limit its memory usage.
/// 2.1 The `CrossJoin` operator reads a new batch and asks the memory pool for
/// additional memory. The memory pool updates its usage and returns success.
/// 2.2 The `CrossJoin` operator reads another batch and tries to reserve more
/// memory, but the memory pool does not have enough memory. Since the
/// `CrossJoin` operator has not implemented spilling, it stops execution and
/// returns an error.
///
/// Scenario 3:
/// For an `Aggregate` operator, the intermediate state also grows with the
/// input size, but the operator is able to spill. When it tries to reserve more
/// memory and the memory pool has already reached its limit, the pool returns
/// an error. The `Aggregate` operator then spills its intermediate buffers to
/// disk, releases the corresponding memory back to the memory pool, and retries
/// the reservation.
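///
/// The reserve, spill, and retry flow of Scenarios 2 and 3 can be sketched as
/// follows. This is only an illustration: `spill_to_disk` and
/// `reserve_or_spill` are hypothetical helpers standing in for an operator's
/// own logic, and are not part of DataFusion.
///
/// ```rust
/// use std::sync::Arc;
/// use datafusion_common::Result;
/// use datafusion_execution::memory_pool::{
///     GreedyMemoryPool, MemoryConsumer, MemoryPool, MemoryReservation,
/// };
///
/// // Hypothetical spill: a real operator would first write its buffered state
/// // to disk; here we only release the memory that was tracking it.
/// fn spill_to_disk(reservation: &MemoryReservation) -> usize {
///     reservation.free()
/// }
///
/// // Try to reserve `bytes`; on failure spill once and retry.
/// fn reserve_or_spill(reservation: &MemoryReservation, bytes: usize) -> Result<()> {
///     if reservation.try_grow(bytes).is_err() {
///         spill_to_disk(reservation);
///         // A second failure is returned to the caller.
///         reservation.try_grow(bytes)?;
///     }
///     Ok(())
/// }
///
/// let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(100));
/// let reservation = MemoryConsumer::new("Aggregate")
///     .with_can_spill(true)
///     .register(&pool);
///
/// // Fits within the 100 byte limit.
/// reserve_or_spill(&reservation, 80).unwrap();
/// // Does not fit on top of the first 80 bytes: spill, then retry.
/// reserve_or_spill(&reservation, 80).unwrap();
/// assert_eq!(pool.reserved(), 80);
///
/// // A request larger than the whole pool fails even after spilling.
/// assert!(reserve_or_spill(&reservation, 200).is_err());
/// ```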
///
/// # Related Structs
///
/// To better understand memory management in DataFusion, here are the key structs
/// and their relationships:
///
/// - [`MemoryConsumer`]: A named allocation tracked by a particular operator. If an
///   execution is parallelized, and there are multiple partitions of the same
///   operator, each partition will have a separate `MemoryConsumer`.
/// - `SharedRegistration`: A registration of a `MemoryConsumer` with a `MemoryPool`.
///   `SharedRegistration` and `MemoryPool` have a many-to-one relationship. A `MemoryPool`
///   implementation can decide how to allocate memory based on the registered consumers
///   (e.g. `FairSpillPool` will try to share available memory evenly among all registered
///   consumers).
/// - [`MemoryReservation`]: Each `MemoryConsumer`/operator can have multiple
///   `MemoryReservation`s for different internal data structures. The relationship
///   between `MemoryConsumer` and `MemoryReservation` is one-to-many. This design
///   enables cleaner operator implementations:
///   - Different `MemoryReservation`s can be used for different purposes
///   - `MemoryReservation` follows RAII principles - to release a reservation,
///     simply drop the `MemoryReservation` object. When all `MemoryReservation`s
///     for a `SharedRegistration` have been dropped, its reference count reaches
///     zero and the `SharedRegistration` is dropped too, automatically
///     unregistering the `MemoryConsumer` from the `MemoryPool`.
///
/// ## Relationship Diagram
///
/// ```text
/// ┌──────────────────┐     ┌──────────────────┐
/// │MemoryReservation │     │MemoryReservation │
/// └───┬──────────────┘     └──────────────────┘ ......
///     │belongs to                    │
///     │      ┌───────────────────────┘           │  │
///     │      │                                   │  │
///     ▼      ▼                                   ▼  ▼
/// ┌────────────────────────┐       ┌────────────────────────┐
/// │   SharedRegistration   │       │   SharedRegistration   │
/// │   ┌────────────────┐   │       │   ┌────────────────┐   │
/// │   │                │   │       │   │                │   │
/// │   │ MemoryConsumer │   │       │   │ MemoryConsumer │   │
/// │   │                │   │       │   │                │   │
/// │   └────────────────┘   │       │   └────────────────┘   │
/// └────────────┬───────────┘       └────────────┬───────────┘
///              │                                │
///              │                        register│into
///              │                                │
///              └─────────────┐   ┌──────────────┘
///                            │   │
///                            ▼   ▼
///    ╔═══════════════════════════════════════════════════╗
///    ║                                                   ║
///    ║                    MemoryPool                     ║
///    ║                                                   ║
///    ╚═══════════════════════════════════════════════════╝
/// ```
///
/// For example, suppose there are two parallel partitions of an operator X: each partition
/// corresponds to a `MemoryConsumer` in the above diagram. Inside each partition of
/// operator X, there are typically several `MemoryReservation`s - one for each
/// internal data structure that needs memory tracking (e.g., one reservation for the
/// hash table and one for buffered input).
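///
/// A minimal sketch of that layout for a single partition, using only the
/// reservation methods defined in this module (the consumer name and byte
/// counts are made up for illustration):
///
/// ```rust
/// use std::sync::Arc;
/// use datafusion_execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};
///
/// let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(100));
///
/// // One consumer for this partition of operator X ...
/// let hash_table = MemoryConsumer::new("X[0]").register(&pool);
/// // ... and a second reservation that shares the same consumer.
/// let buffered_input = hash_table.new_empty();
///
/// hash_table.try_grow(40).unwrap();
/// buffered_input.try_grow(10).unwrap();
/// assert_eq!(hash_table.consumer().id(), buffered_input.consumer().id());
/// assert_eq!(pool.reserved(), 50);
///
/// // Dropping one reservation returns only its own bytes to the pool.
/// drop(buffered_input);
/// assert_eq!(pool.reserved(), 40);
/// ```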
///
/// # Implementing `MemoryPool`
///
/// You can implement a custom allocation policy by implementing the
/// [`MemoryPool`] trait and configuring a `SessionContext` appropriately.
/// However, DataFusion comes with the following simple memory pool implementations that
/// handle many common cases:
///
/// * [`UnboundedMemoryPool`]: no memory limits (the default)
///
/// * [`GreedyMemoryPool`]: Limits memory usage to a fixed size using a "first
///   come first served" policy
///
/// * [`FairSpillPool`]: Limits memory usage to a fixed size, allocating memory
///   to all spilling operators fairly
///
/// * [`TrackConsumersPool`]: Wraps another [`MemoryPool`] and tracks consumers,
///   providing better error messages on the largest memory users.
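///
/// As a rough sketch of what a custom implementation might look like, the
/// hypothetical `CappedPool` below enforces a fixed byte limit with a single
/// atomic counter. The type, its fields, and its error message are invented
/// for illustration; it is not one of the pools shipped with DataFusion and
/// it ignores which consumer is asking.
///
/// ```rust
/// use std::fmt;
/// use std::sync::Arc;
/// use std::sync::atomic::{AtomicUsize, Ordering};
/// use datafusion_common::{DataFusionError, Result};
/// use datafusion_execution::memory_pool::{
///     MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation,
/// };
///
/// // Hypothetical pool: `limit` is the cap, `used` the bytes reserved so far.
/// #[derive(Debug)]
/// struct CappedPool {
///     limit: usize,
///     used: AtomicUsize,
/// }
///
/// impl fmt::Display for CappedPool {
///     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
///         write!(f, "CappedPool(limit={})", self.limit)
///     }
/// }
///
/// impl MemoryPool for CappedPool {
///     fn name(&self) -> &str {
///         "CappedPool"
///     }
///
///     // Infallible: record the bytes even if this exceeds the limit.
///     fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
///         self.used.fetch_add(additional, Ordering::Relaxed);
///     }
///
///     fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
///         self.used.fetch_sub(shrink, Ordering::Relaxed);
///     }
///
///     // Fallible: only update the counter if the new total fits the limit.
///     fn try_grow(&self, _reservation: &MemoryReservation, additional: usize) -> Result<()> {
///         self.used
///             .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |used| {
///                 let new_used = used.checked_add(additional)?;
///                 (new_used <= self.limit).then_some(new_used)
///             })
///             .map(|_| ())
///             .map_err(|used| {
///                 DataFusionError::ResourcesExhausted(format!(
///                     "cannot reserve {additional} bytes: {used} of {} bytes already reserved",
///                     self.limit
///                 ))
///             })
///     }
///
///     fn reserved(&self) -> usize {
///         self.used.load(Ordering::Relaxed)
///     }
///
///     fn memory_limit(&self) -> MemoryLimit {
///         MemoryLimit::Finite(self.limit)
///     }
/// }
///
/// let pool: Arc<dyn MemoryPool> = Arc::new(CappedPool {
///     limit: 10,
///     used: AtomicUsize::new(0),
/// });
/// let reservation = MemoryConsumer::new("example").register(&pool);
///
/// reservation.try_grow(8).unwrap();
/// assert!(reservation.try_grow(8).is_err());
/// assert_eq!(pool.reserved(), 8);
///
/// // Dropping the reservation returns its bytes through `shrink`.
/// drop(reservation);
/// assert_eq!(pool.reserved(), 0);
/// ```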
pub trait MemoryPool: Any + Send + Sync + std::fmt::Debug + Display {
    /// Return pool name
    fn name(&self) -> &str;

    /// Registers a new [`MemoryConsumer`]
    ///
    /// Note: Subsequent calls to [`Self::grow`] must be made to reserve memory
    fn register(&self, _consumer: &MemoryConsumer) {}

    /// Records the destruction of a [`MemoryReservation`] with [`MemoryConsumer`]
    ///
    /// Note: Prior calls to [`Self::shrink`] must be made to free any reserved memory
    fn unregister(&self, _consumer: &MemoryConsumer) {}

    /// Infallibly grow the provided `reservation` by `additional` bytes
    ///
    /// This must always succeed
    fn grow(&self, reservation: &MemoryReservation, additional: usize);

    /// Infallibly shrink the provided `reservation` by `shrink` bytes
    fn shrink(&self, reservation: &MemoryReservation, shrink: usize);

    /// Attempt to grow the provided `reservation` by `additional` bytes
    ///
    /// On error the `reservation` will not be increased in size
    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()>;

    /// Return the total amount of memory reserved
    fn reserved(&self) -> usize;

    /// Return the memory limit of the pool
    ///
    /// The default implementation of `MemoryPool::memory_limit`
    /// returns `MemoryLimit::Unknown`.
    /// If you are using a custom memory pool and need to know its memory
    /// usage limit, implement this method to return it
    /// (`MemoryLimit::Finite(limit)`).
    fn memory_limit(&self) -> MemoryLimit {
        MemoryLimit::Unknown
    }
}

impl dyn MemoryPool {
    /// Returns `true` if this pool is of type `T`.
    pub fn is<T: MemoryPool>(&self) -> bool {
        (self as &dyn Any).is::<T>()
    }

    /// Attempts to downcast this pool to a concrete type `T`.
    pub fn downcast_ref<T: MemoryPool>(&self) -> Option<&T> {
        (self as &dyn Any).downcast_ref()
    }
}

/// Memory limit of `MemoryPool`
pub enum MemoryLimit {
    /// The pool has no memory limit.
    Infinite,
    /// Bounded memory limit in bytes.
    Finite(usize),
    /// The limit is not known; this is what the default
    /// `MemoryPool::memory_limit` implementation returns.
    Unknown,
}

/// A memory consumer is a named allocation tracked by a particular
/// [`MemoryReservation`] in a [`MemoryPool`]. All allocations are registered to
/// a particular `MemoryConsumer`.
///
/// Each `MemoryConsumer` is identifiable by a process-unique id, and is therefore not cloneable.
/// If you want a clone of a `MemoryConsumer`, you should look into [`MemoryConsumer::clone_with_new_id`],
/// but note that the new `MemoryConsumer` may be treated as a separate entity depending on the pool used,
/// and is only guaranteed to share the name and inner properties.
///
/// For help with allocation accounting, see the [`proxy`] module.
///
/// [proxy]: datafusion_common::utils::proxy
#[derive(Debug)]
pub struct MemoryConsumer {
    name: String,
    can_spill: bool,
    id: usize,
}

impl PartialEq for MemoryConsumer {
    fn eq(&self, other: &Self) -> bool {
        let is_same_id = self.id == other.id;

        #[cfg(debug_assertions)]
        if is_same_id {
            assert_eq!(self.name, other.name);
            assert_eq!(self.can_spill, other.can_spill);
        }

        is_same_id
    }
}

impl Eq for MemoryConsumer {}

impl Hash for MemoryConsumer {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.id.hash(state);
        self.name.hash(state);
        self.can_spill.hash(state);
    }
}

impl MemoryConsumer {
    fn new_unique_id() -> usize {
        static ID: atomic::AtomicUsize = atomic::AtomicUsize::new(0);
        ID.fetch_add(1, atomic::Ordering::Relaxed)
    }

    /// Create a new empty [`MemoryConsumer`] that can be grown using [`MemoryReservation`]
    pub fn new(name: impl Into<String>) -> Self {
        Self {
            name: name.into(),
            can_spill: false,
            id: Self::new_unique_id(),
        }
    }

    /// Returns a clone of this [`MemoryConsumer`] with a new unique id,
    /// which can be registered with a [`MemoryPool`].
    /// This new consumer is separate from the original.
    pub fn clone_with_new_id(&self) -> Self {
        Self {
            name: self.name.clone(),
            can_spill: self.can_spill,
            id: Self::new_unique_id(),
        }
    }

    /// Return the unique id of this [`MemoryConsumer`]
    pub fn id(&self) -> usize {
        self.id
    }

    /// Set whether this allocation can be spilled to disk
    pub fn with_can_spill(self, can_spill: bool) -> Self {
        Self { can_spill, ..self }
    }

    /// Returns true if this allocation can spill to disk
    pub fn can_spill(&self) -> bool {
        self.can_spill
    }

    /// Returns the name associated with this allocation
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Registers this [`MemoryConsumer`] with the provided [`MemoryPool`] returning
    /// a [`MemoryReservation`] that can be used to grow or shrink the memory reservation
    pub fn register(self, pool: &Arc<dyn MemoryPool>) -> MemoryReservation {
        pool.register(&self);
        MemoryReservation {
            registration: Arc::new(SharedRegistration {
                pool: Arc::clone(pool),
                consumer: self,
            }),
            size: atomic::AtomicUsize::new(0),
        }
    }
}

/// A registration of a [`MemoryConsumer`] with a [`MemoryPool`].
///
/// Calls [`MemoryPool::unregister`] on drop to return any memory to
/// the underlying pool.
#[derive(Debug)]
struct SharedRegistration {
    pool: Arc<dyn MemoryPool>,
    consumer: MemoryConsumer,
}

impl Drop for SharedRegistration {
    fn drop(&mut self) {
        self.pool.unregister(&self.consumer);
    }
}

/// A [`MemoryReservation`] tracks an individual reservation of a
/// number of bytes of memory in a [`MemoryPool`] that is freed back
/// to the pool on drop.
///
/// The reservation can be grown or shrunk over time.
#[derive(Debug)]
pub struct MemoryReservation {
    registration: Arc<SharedRegistration>,
    size: atomic::AtomicUsize,
}

impl MemoryReservation {
    /// Returns the size of this reservation in bytes
    pub fn size(&self) -> usize {
        self.size.load(atomic::Ordering::Relaxed)
    }

    /// Returns the [`MemoryConsumer`] for this [`MemoryReservation`]
    pub fn consumer(&self) -> &MemoryConsumer {
        &self.registration.consumer
    }

    /// Frees all bytes from this reservation back to the underlying
    /// pool, returning the number of bytes freed.
    pub fn free(&self) -> usize {
        let size = self.size.swap(0, atomic::Ordering::Relaxed);
        if size != 0 {
            self.registration.pool.shrink(self, size);
        }
        size
    }

    /// Frees `capacity` bytes from this reservation
    ///
    /// # Panics
    ///
    /// Panics if `capacity` exceeds [`Self::size`]
    pub fn shrink(&self, capacity: usize) {
        self.size
            .fetch_update(
                atomic::Ordering::Relaxed,
                atomic::Ordering::Relaxed,
                |prev| prev.checked_sub(capacity),
            )
            .unwrap_or_else(|prev| {
                panic!("Cannot free the capacity {capacity} out of allocated size {prev}")
            });
        self.registration.pool.shrink(self, capacity);
    }

    /// Tries to free `capacity` bytes from this reservation
    /// if `capacity` does not exceed [`Self::size`].
    /// Returns new reservation size,
    /// or error if shrinking capacity is more than allocated size.
    pub fn try_shrink(&self, capacity: usize) -> Result<usize> {
        let prev = self
            .size
            .fetch_update(
                atomic::Ordering::Relaxed,
                atomic::Ordering::Relaxed,
                |prev| prev.checked_sub(capacity),
            )
            .map_err(|prev| {
                internal_datafusion_err!(
                    "Cannot free the capacity {capacity} out of allocated size {prev}"
                )
            })?;

        self.registration.pool.shrink(self, capacity);
        Ok(prev - capacity)
    }

    /// Sets the size of this reservation to `capacity`
    pub fn resize(&self, capacity: usize) {
        let size = self.size.load(atomic::Ordering::Relaxed);
        match capacity.cmp(&size) {
            Ordering::Greater => self.grow(capacity - size),
            Ordering::Less => self.shrink(size - capacity),
            _ => {}
        }
    }

    /// Try to set the size of this reservation to `capacity`
    pub fn try_resize(&self, capacity: usize) -> Result<()> {
        let size = self.size.load(atomic::Ordering::Relaxed);
        match capacity.cmp(&size) {
            Ordering::Greater => self.try_grow(capacity - size)?,
            Ordering::Less => {
                self.try_shrink(size - capacity)?;
            }
            _ => {}
        };
        Ok(())
    }

    /// Increase the size of this reservation by `capacity` bytes
    pub fn grow(&self, capacity: usize) {
        self.registration.pool.grow(self, capacity);
        self.size.fetch_add(capacity, atomic::Ordering::Relaxed);
    }

    /// Try to increase the size of this reservation by `capacity`
    /// bytes, returning error if there is insufficient capacity left
    /// in the pool.
    pub fn try_grow(&self, capacity: usize) -> Result<()> {
        self.registration.pool.try_grow(self, capacity)?;
        self.size.fetch_add(capacity, atomic::Ordering::Relaxed);
        Ok(())
    }

    /// Splits off `capacity` bytes from this [`MemoryReservation`]
    /// into a new [`MemoryReservation`] with the same
    /// [`MemoryConsumer`].
    ///
    /// This can be useful to free part of this reservation with RAII
    /// style dropping
    ///
    /// # Panics
    ///
    /// Panics if `capacity` exceeds [`Self::size`]
    pub fn split(&self, capacity: usize) -> MemoryReservation {
        self.size
            .fetch_update(
                atomic::Ordering::Relaxed,
                atomic::Ordering::Relaxed,
                |prev| prev.checked_sub(capacity),
            )
            .unwrap();
        Self {
            size: atomic::AtomicUsize::new(capacity),
            registration: Arc::clone(&self.registration),
        }
    }

    /// Returns a new empty [`MemoryReservation`] with the same [`MemoryConsumer`]
    pub fn new_empty(&self) -> Self {
        Self {
            size: atomic::AtomicUsize::new(0),
            registration: Arc::clone(&self.registration),
        }
    }

    /// Splits off all the bytes from this [`MemoryReservation`] into
    /// a new [`MemoryReservation`] with the same [`MemoryConsumer`]
    pub fn take(&mut self) -> MemoryReservation {
        self.split(self.size.load(atomic::Ordering::Relaxed))
    }
}

impl Drop for MemoryReservation {
    fn drop(&mut self) {
        self.free();
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_id_uniqueness() {
        let mut ids = std::collections::HashSet::new();
        for _ in 0..100 {
            let consumer = MemoryConsumer::new("test");
            assert!(ids.insert(consumer.id())); // Ensures unique insertion
        }
    }

    #[test]
    fn test_memory_pool_underflow() {
        let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
        let a1 = MemoryConsumer::new("a1").register(&pool);
        assert_eq!(pool.reserved(), 0);

        a1.grow(100);
        assert_eq!(pool.reserved(), 100);

        assert_eq!(a1.free(), 100);
        assert_eq!(pool.reserved(), 0);

        a1.try_grow(100).unwrap_err();
        assert_eq!(pool.reserved(), 0);

        a1.try_grow(30).unwrap();
        assert_eq!(pool.reserved(), 30);

        let a2 = MemoryConsumer::new("a2").register(&pool);
        a2.try_grow(25).unwrap_err();
        assert_eq!(pool.reserved(), 30);

        drop(a1);
        assert_eq!(pool.reserved(), 0);

        a2.try_grow(25).unwrap();
        assert_eq!(pool.reserved(), 25);
    }

    #[test]
    fn test_split() {
        let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
        let r1 = MemoryConsumer::new("r1").register(&pool);

        r1.try_grow(20).unwrap();
        assert_eq!(r1.size(), 20);
        assert_eq!(pool.reserved(), 20);

        // split 5 off r1; the total reserved in the pool is unchanged
        let r2 = r1.split(5);
        assert_eq!(r1.size(), 15);
        assert_eq!(r2.size(), 5);
        assert_eq!(pool.reserved(), 20);

        // dropping r1 frees its 15 bytes; r2 keeps its 5 under the same consumer
        drop(r1);
        assert_eq!(r2.size(), 5);
        assert_eq!(pool.reserved(), 5);
    }

    #[test]
    fn test_new_empty() {
        let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
        let r1 = MemoryConsumer::new("r1").register(&pool);

        r1.try_grow(20).unwrap();
        let r2 = r1.new_empty();
        r2.try_grow(5).unwrap();

        assert_eq!(r1.size(), 20);
        assert_eq!(r2.size(), 5);
        assert_eq!(pool.reserved(), 25);
    }

    #[test]
    fn test_take() {
        let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
        let mut r1 = MemoryConsumer::new("r1").register(&pool);

        r1.try_grow(20).unwrap();
        let r2 = r1.take();
        r2.try_grow(5).unwrap();

        assert_eq!(r1.size(), 0);
        assert_eq!(r2.size(), 25);
        assert_eq!(pool.reserved(), 25);

        // r1 can still grow again
        r1.try_grow(3).unwrap();
        assert_eq!(r1.size(), 3);
        assert_eq!(r2.size(), 25);
        assert_eq!(pool.reserved(), 28);
    }

    #[test]
    fn test_downcast() {
        let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(50));

        assert!(pool.is::<GreedyMemoryPool>());
        assert!(!pool.is::<UnboundedMemoryPool>());

        let greedy: &GreedyMemoryPool = pool.downcast_ref().unwrap();
        assert_eq!(greedy.reserved(), 0);
        assert!(pool.downcast_ref::<UnboundedMemoryPool>().is_none());
    }

    #[test]
    fn test_try_shrink() {
        let pool = Arc::new(GreedyMemoryPool::new(100)) as _;
        let r1 = MemoryConsumer::new("r1").register(&pool);

        r1.try_grow(50).unwrap();
        assert_eq!(r1.size(), 50);
        assert_eq!(pool.reserved(), 50);

        // Successful shrink returns new size and frees pool memory
        let new_size = r1.try_shrink(30).unwrap();
        assert_eq!(new_size, 20);
        assert_eq!(r1.size(), 20);
        assert_eq!(pool.reserved(), 20);

        // Freed pool memory is now available to other consumers
        let r2 = MemoryConsumer::new("r2").register(&pool);
        r2.try_grow(80).unwrap();
        assert_eq!(pool.reserved(), 100);

        // Shrinking more than allocated fails without changing state
        let err = r1.try_shrink(25);
        assert!(err.is_err());
        assert_eq!(r1.size(), 20);
        assert_eq!(pool.reserved(), 100);

        // Shrink to exactly zero
        let new_size = r1.try_shrink(20).unwrap();
        assert_eq!(new_size, 0);
        assert_eq!(r1.size(), 0);
        assert_eq!(pool.reserved(), 80);
    }
}