datafusion_execution/memory_pool/mod.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`MemoryPool`] for memory management during query execution, [`proxy`] for
19//! help with allocation accounting.
20
21use datafusion_common::{Result, internal_datafusion_err};
22use std::any::Any;
23use std::fmt::Display;
24use std::hash::{Hash, Hasher};
25use std::{cmp::Ordering, sync::Arc, sync::atomic};
26
27mod pool;
28
29#[cfg(feature = "arrow_buffer_pool")]
30pub mod arrow;
31
/// Helpers for allocation accounting.
///
/// This module only re-exports `HashTableAllocExt` and `VecAllocExt` from
/// `datafusion_common::utils::proxy` so they are reachable from the memory
/// pool module.
pub mod proxy {
    pub use datafusion_common::utils::proxy::{HashTableAllocExt, VecAllocExt};
}
35
36pub use datafusion_common::{
37 human_readable_count, human_readable_duration, human_readable_size, units,
38};
39pub use pool::*;
40
41/// Tracks and potentially limits memory use across operators during execution.
42///
43/// # Memory Management Overview
44///
45/// DataFusion is a streaming query engine, processing most queries without
46/// buffering the entire input. Most operators require a fixed amount of memory
47/// based on the schema and target batch size. However, certain operations such
48/// as sorting and grouping/joining, require buffering intermediate results,
49/// which can require memory proportional to the number of input rows.
50///
51/// Rather than tracking all allocations, DataFusion takes a pragmatic approach:
52/// Intermediate memory used as data streams through the system is not accounted
/// (it is assumed to be "small") but the large consumers of memory must register
54/// and constrain their use. This design trades off the additional code
55/// complexity of memory tracking with limiting resource usage.
56///
57/// When limiting memory with a `MemoryPool` you should typically reserve some
58/// overhead (e.g. 10%) for the "small" memory allocations that are not tracked.
59///
60/// # Memory Management Design
61///
62/// As explained above, DataFusion's design ONLY limits operators that require
63/// "large" amounts of memory (proportional to number of input rows), such as
64/// `GroupByHashExec`. It does NOT track and limit memory used internally by
65/// other operators such as `DataSourceExec` or the `RecordBatch`es that flow
66/// between operators. Furthermore, operators should not reserve memory for the
67/// batches they produce. Instead, if a consumer operator needs to hold batches
68/// from its producers in memory for an extended period, it is the consumer
69/// operator's responsibility to reserve the necessary memory for those batches.
70///
71/// In order to avoid allocating memory until the OS or the container system
72/// kills the process, DataFusion `ExecutionPlan`s (operators) that consume
73/// large amounts of memory must first request their desired allocation from a
74/// [`MemoryPool`] before allocating more. The request is typically managed via
75/// a [`MemoryReservation`] and [`MemoryConsumer`].
76///
77/// If the allocation is successful, the operator should proceed and allocate
78/// the desired memory. If the allocation fails, the operator must either first
79/// free memory (e.g. by spilling to local disk) and try again, or error.
80///
81/// Note that a `MemoryPool` can be shared by concurrently executing plans,
82/// which can be used to control memory usage in a multi-tenant system.
83///
84/// # How MemoryPool works by example
85///
86/// Scenario 1:
87/// For `Filter` operator, `RecordBatch`es will stream through it, so it
/// doesn't have to keep track of memory usage through [`MemoryPool`].
89///
90/// Scenario 2:
91/// For `CrossJoin` operator, if the input size gets larger, the intermediate
92/// state will also grow. So `CrossJoin` operator will use [`MemoryPool`] to
93/// limit the memory usage.
94/// 2.1 `CrossJoin` operator has read a new batch, asked memory pool for
95/// additional memory. Memory pool updates the usage and returns success.
96/// 2.2 `CrossJoin` has read another batch, and tries to reserve more memory
97/// again, memory pool does not have enough memory. Since `CrossJoin` operator
98/// has not implemented spilling, it will stop execution and return an error.
99///
100/// Scenario 3:
101/// For `Aggregate` operator, its intermediate states will also accumulate as
102/// the input size gets larger, but with spilling capability. When it tries to
103/// reserve more memory from the memory pool, and the memory pool has already
104/// reached the memory limit, it will return an error. Then, `Aggregate`
105/// operator will spill the intermediate buffers to disk, and release memory
106/// from the memory pool, and continue to retry memory reservation.
107///
108/// # Related Structs
109///
110/// To better understand memory management in DataFusion, here are the key structs
111/// and their relationships:
112///
113/// - [`MemoryConsumer`]: A named allocation traced by a particular operator. If an
114/// execution is parallelized, and there are multiple partitions of the same
115/// operator, each partition will have a separate `MemoryConsumer`.
116/// - `SharedRegistration`: A registration of a `MemoryConsumer` with a `MemoryPool`.
117/// `SharedRegistration` and `MemoryPool` have a many-to-one relationship. `MemoryPool`
118/// implementation can decide how to allocate memory based on the registered consumers.
119/// (e.g. `FairSpillPool` will try to share available memory evenly among all registered
120/// consumers)
121/// - [`MemoryReservation`]: Each `MemoryConsumer`/operator can have multiple
122/// `MemoryReservation`s for different internal data structures. The relationship
123/// between `MemoryConsumer` and `MemoryReservation` is one-to-many. This design
124/// enables cleaner operator implementations:
125/// - Different `MemoryReservation`s can be used for different purposes
126/// - `MemoryReservation` follows RAII principles - to release a reservation,
127/// simply drop the `MemoryReservation` object. When all `MemoryReservation`s
128/// for a `SharedRegistration` are dropped, the `SharedRegistration` is dropped
129/// when its reference count reaches zero, automatically unregistering the
130/// `MemoryConsumer` from the `MemoryPool`.
131///
132/// ## Relationship Diagram
133///
134/// ```text
135/// ┌──────────────────┐ ┌──────────────────┐
136/// │MemoryReservation │ │MemoryReservation │
137/// └───┬──────────────┘ └──────────────────┘ ......
138/// │belongs to │
139/// │ ┌───────────────────────┘ │ │
140/// │ │ │ │
141/// ▼ ▼ ▼ ▼
142/// ┌────────────────────────┐ ┌────────────────────────┐
143/// │ SharedRegistration │ │ SharedRegistration │
144/// │ ┌────────────────┐ │ │ ┌────────────────┐ │
145/// │ │ │ │ │ │ │ │
146/// │ │ MemoryConsumer │ │ │ │ MemoryConsumer │ │
147/// │ │ │ │ │ │ │ │
148/// │ └────────────────┘ │ │ └────────────────┘ │
149/// └────────────┬───────────┘ └────────────┬───────────┘
150/// │ │
151/// │ register│into
152/// │ │
153/// └─────────────┐ ┌──────────────┘
154/// │ │
155/// ▼ ▼
156/// ╔═══════════════════════════════════════════════════╗
157/// ║ ║
158/// ║ MemoryPool ║
159/// ║ ║
160/// ╚═══════════════════════════════════════════════════╝
161/// ```
162///
163/// For example, there are two parallel partitions of an operator X: each partition
164/// corresponds to a `MemoryConsumer` in the above diagram. Inside each partition of
165/// operator X, there are typically several `MemoryReservation`s - one for each
166/// internal data structure that needs memory tracking (e.g., 1 reservation for the hash
167/// table, and 1 reservation for buffered input, etc.).
168///
169/// # Implementing `MemoryPool`
170///
171/// You can implement a custom allocation policy by implementing the
172/// [`MemoryPool`] trait and configuring a `SessionContext` appropriately.
173/// However, DataFusion comes with the following simple memory pool implementations that
174/// handle many common cases:
175///
176/// * [`UnboundedMemoryPool`]: no memory limits (the default)
177///
178/// * [`GreedyMemoryPool`]: Limits memory usage to a fixed size using a "first
179/// come first served" policy
180///
181/// * [`FairSpillPool`]: Limits memory usage to a fixed size, allocating memory
182/// to all spilling operators fairly
183///
184/// * [`TrackConsumersPool`]: Wraps another [`MemoryPool`] and tracks consumers,
185/// providing better error messages on the largest memory users.
pub trait MemoryPool: Any + Send + Sync + std::fmt::Debug + Display {
    /// Return pool name
    fn name(&self) -> &str;

    /// Registers a new [`MemoryConsumer`]
    ///
    /// The default implementation does nothing.
    ///
    /// Note: Subsequent calls to [`Self::grow`] must be made to reserve memory
    fn register(&self, _consumer: &MemoryConsumer) {}

    /// Records that the registration of a [`MemoryConsumer`] has ended.
    ///
    /// This is invoked when the registration shared by a consumer's
    /// [`MemoryReservation`]s is dropped. The default implementation does
    /// nothing.
    ///
    /// Note: Prior calls to [`Self::shrink`] must be made to free any reserved memory
    fn unregister(&self, _consumer: &MemoryConsumer) {}

    /// Infallibly grow the provided `reservation` by `additional` bytes
    ///
    /// This must always succeed
    fn grow(&self, reservation: &MemoryReservation, additional: usize);

    /// Infallibly shrink the provided `reservation` by `shrink` bytes
    fn shrink(&self, reservation: &MemoryReservation, shrink: usize);

    /// Attempt to grow the provided `reservation` by `additional` bytes
    ///
    /// On error the `reservation` will not be increased in size
    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()>;

    /// Return the total amount of memory reserved
    fn reserved(&self) -> usize;

    /// Return the memory limit of the pool
    ///
    /// The default implementation of `MemoryPool::memory_limit`
    /// will return `MemoryLimit::Unknown`.
    /// If you are using a custom memory pool and need to know the memory
    /// usage limit of the pool, implement this method to return it
    /// (`MemoryLimit::Finite(limit)`).
    fn memory_limit(&self) -> MemoryLimit {
        MemoryLimit::Unknown
    }
}
227
228impl dyn MemoryPool {
229 /// Returns `true` if this pool is of type `T`.
230 pub fn is<T: MemoryPool>(&self) -> bool {
231 (self as &dyn Any).is::<T>()
232 }
233
234 /// Attempts to downcast this pool to a concrete type `T`.
235 pub fn downcast_ref<T: MemoryPool>(&self) -> Option<&T> {
236 (self as &dyn Any).downcast_ref()
237 }
238}
239
/// Memory limit of `MemoryPool`
///
/// Returned by `MemoryPool::memory_limit`. The type is a small plain value,
/// so it derives the common traits needed to log, copy, and compare it.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MemoryLimit {
    /// No upper bound on memory usage.
    // NOTE(review): presumably reported by pools that never reject a
    // reservation; confirm against the pool implementations.
    Infinite,
    /// Bounded memory limit in bytes.
    Finite(usize),
    /// The pool does not report its limit. This is what the default
    /// implementation of `MemoryPool::memory_limit` returns.
    Unknown,
}
247
/// A memory consumer is a named allocation traced by a particular
/// [`MemoryReservation`] in a [`MemoryPool`]. All allocations are registered to
/// a particular `MemoryConsumer`.
///
/// Each `MemoryConsumer` is identifiable by a process-unique id, and is therefore not cloneable.
/// If you want a clone of a `MemoryConsumer`, you should look into [`MemoryConsumer::clone_with_new_id`],
/// but note that this `MemoryConsumer` may be treated as a separate entity based on the used pool,
/// and is only guaranteed to share the name and inner properties.
///
/// For help with allocation accounting, see the [`proxy`] module.
///
/// [proxy]: datafusion_common::utils::proxy
#[derive(Debug)]
pub struct MemoryConsumer {
    /// Name associated with this allocation
    name: String,
    /// Whether this allocation can be spilled to disk (`false` unless set
    /// through `with_can_spill`)
    can_spill: bool,
    /// Identifier taken from a process-wide atomic counter when the consumer
    /// is created
    id: usize,
}
266
267impl PartialEq for MemoryConsumer {
268 fn eq(&self, other: &Self) -> bool {
269 let is_same_id = self.id == other.id;
270
271 #[cfg(debug_assertions)]
272 if is_same_id {
273 assert_eq!(self.name, other.name);
274 assert_eq!(self.can_spill, other.can_spill);
275 }
276
277 is_same_id
278 }
279}
280
281impl Eq for MemoryConsumer {}
282
283impl Hash for MemoryConsumer {
284 fn hash<H: Hasher>(&self, state: &mut H) {
285 self.id.hash(state);
286 self.name.hash(state);
287 self.can_spill.hash(state);
288 }
289}
290
291impl MemoryConsumer {
292 fn new_unique_id() -> usize {
293 static ID: atomic::AtomicUsize = atomic::AtomicUsize::new(0);
294 ID.fetch_add(1, atomic::Ordering::Relaxed)
295 }
296
297 /// Create a new empty [`MemoryConsumer`] that can be grown using [`MemoryReservation`]
298 pub fn new(name: impl Into<String>) -> Self {
299 Self {
300 name: name.into(),
301 can_spill: false,
302 id: Self::new_unique_id(),
303 }
304 }
305
306 /// Returns a clone of this [`MemoryConsumer`] with a new unique id,
307 /// which can be registered with a [`MemoryPool`],
308 /// This new consumer is separate from the original.
309 pub fn clone_with_new_id(&self) -> Self {
310 Self {
311 name: self.name.clone(),
312 can_spill: self.can_spill,
313 id: Self::new_unique_id(),
314 }
315 }
316
317 /// Return the unique id of this [`MemoryConsumer`]
318 pub fn id(&self) -> usize {
319 self.id
320 }
321
322 /// Set whether this allocation can be spilled to disk
323 pub fn with_can_spill(self, can_spill: bool) -> Self {
324 Self { can_spill, ..self }
325 }
326
327 /// Returns true if this allocation can spill to disk
328 pub fn can_spill(&self) -> bool {
329 self.can_spill
330 }
331
332 /// Returns the name associated with this allocation
333 pub fn name(&self) -> &str {
334 &self.name
335 }
336
337 /// Registers this [`MemoryConsumer`] with the provided [`MemoryPool`] returning
338 /// a [`MemoryReservation`] that can be used to grow or shrink the memory reservation
339 pub fn register(self, pool: &Arc<dyn MemoryPool>) -> MemoryReservation {
340 pool.register(&self);
341 MemoryReservation {
342 registration: Arc::new(SharedRegistration {
343 pool: Arc::clone(pool),
344 consumer: self,
345 }),
346 size: atomic::AtomicUsize::new(0),
347 }
348 }
349}
350
/// A registration of a [`MemoryConsumer`] with a [`MemoryPool`].
///
/// Calls [`MemoryPool::unregister`] on drop to return any memory to
/// the underlying pool.
#[derive(Debug)]
struct SharedRegistration {
    /// Pool the consumer was registered with; `unregister` is called on it on drop
    pool: Arc<dyn MemoryPool>,
    /// The registered consumer
    consumer: MemoryConsumer,
}
360
361impl Drop for SharedRegistration {
362 fn drop(&mut self) {
363 self.pool.unregister(&self.consumer);
364 }
365}
366
/// A [`MemoryReservation`] tracks an individual reservation of a
/// number of bytes of memory in a [`MemoryPool`] that is freed back
/// to the pool on drop.
///
/// The reservation can be grown or shrunk over time.
#[derive(Debug)]
pub struct MemoryReservation {
    /// Pool/consumer registration; reservations created from this one with
    /// `split`, `take` or `new_empty` hold a clone of the same `Arc`
    registration: Arc<SharedRegistration>,
    /// Number of bytes currently held by this reservation
    size: atomic::AtomicUsize,
}
377
378impl MemoryReservation {
379 /// Returns the size of this reservation in bytes
380 pub fn size(&self) -> usize {
381 self.size.load(atomic::Ordering::Relaxed)
382 }
383
384 /// Returns [MemoryConsumer] for this [MemoryReservation]
385 pub fn consumer(&self) -> &MemoryConsumer {
386 &self.registration.consumer
387 }
388
389 /// Frees all bytes from this reservation back to the underlying
390 /// pool, returning the number of bytes freed.
391 pub fn free(&self) -> usize {
392 let size = self.size.swap(0, atomic::Ordering::Relaxed);
393 if size != 0 {
394 self.registration.pool.shrink(self, size);
395 }
396 size
397 }
398
399 /// Frees `capacity` bytes from this reservation
400 ///
401 /// # Panics
402 ///
403 /// Panics if `capacity` exceeds [`Self::size`]
404 pub fn shrink(&self, capacity: usize) {
405 self.size
406 .fetch_update(
407 atomic::Ordering::Relaxed,
408 atomic::Ordering::Relaxed,
409 |prev| prev.checked_sub(capacity),
410 )
411 .unwrap_or_else(|prev| {
412 panic!("Cannot free the capacity {capacity} out of allocated size {prev}")
413 });
414 self.registration.pool.shrink(self, capacity);
415 }
416
417 /// Tries to free `capacity` bytes from this reservation
418 /// if `capacity` does not exceed [`Self::size`].
419 /// Returns new reservation size,
420 /// or error if shrinking capacity is more than allocated size.
421 pub fn try_shrink(&self, capacity: usize) -> Result<usize> {
422 let prev = self
423 .size
424 .fetch_update(
425 atomic::Ordering::Relaxed,
426 atomic::Ordering::Relaxed,
427 |prev| prev.checked_sub(capacity),
428 )
429 .map_err(|prev| {
430 internal_datafusion_err!(
431 "Cannot free the capacity {capacity} out of allocated size {prev}"
432 )
433 })?;
434
435 self.registration.pool.shrink(self, capacity);
436 Ok(prev - capacity)
437 }
438
439 /// Sets the size of this reservation to `capacity`
440 pub fn resize(&self, capacity: usize) {
441 let size = self.size.load(atomic::Ordering::Relaxed);
442 match capacity.cmp(&size) {
443 Ordering::Greater => self.grow(capacity - size),
444 Ordering::Less => self.shrink(size - capacity),
445 _ => {}
446 }
447 }
448
449 /// Try to set the size of this reservation to `capacity`
450 pub fn try_resize(&self, capacity: usize) -> Result<()> {
451 let size = self.size.load(atomic::Ordering::Relaxed);
452 match capacity.cmp(&size) {
453 Ordering::Greater => self.try_grow(capacity - size)?,
454 Ordering::Less => {
455 self.try_shrink(size - capacity)?;
456 }
457 _ => {}
458 };
459 Ok(())
460 }
461
462 /// Increase the size of this reservation by `capacity` bytes
463 pub fn grow(&self, capacity: usize) {
464 self.registration.pool.grow(self, capacity);
465 self.size.fetch_add(capacity, atomic::Ordering::Relaxed);
466 }
467
468 /// Try to increase the size of this reservation by `capacity`
469 /// bytes, returning error if there is insufficient capacity left
470 /// in the pool.
471 pub fn try_grow(&self, capacity: usize) -> Result<()> {
472 self.registration.pool.try_grow(self, capacity)?;
473 self.size.fetch_add(capacity, atomic::Ordering::Relaxed);
474 Ok(())
475 }
476
477 /// Splits off `capacity` bytes from this [`MemoryReservation`]
478 /// into a new [`MemoryReservation`] with the same
479 /// [`MemoryConsumer`].
480 ///
481 /// This can be useful to free part of this reservation with RAAI
482 /// style dropping
483 ///
484 /// # Panics
485 ///
486 /// Panics if `capacity` exceeds [`Self::size`]
487 pub fn split(&self, capacity: usize) -> MemoryReservation {
488 self.size
489 .fetch_update(
490 atomic::Ordering::Relaxed,
491 atomic::Ordering::Relaxed,
492 |prev| prev.checked_sub(capacity),
493 )
494 .unwrap();
495 Self {
496 size: atomic::AtomicUsize::new(capacity),
497 registration: Arc::clone(&self.registration),
498 }
499 }
500
501 /// Returns a new empty [`MemoryReservation`] with the same [`MemoryConsumer`]
502 pub fn new_empty(&self) -> Self {
503 Self {
504 size: atomic::AtomicUsize::new(0),
505 registration: Arc::clone(&self.registration),
506 }
507 }
508
509 /// Splits off all the bytes from this [`MemoryReservation`] into
510 /// a new [`MemoryReservation`] with the same [`MemoryConsumer`]
511 pub fn take(&mut self) -> MemoryReservation {
512 self.split(self.size.load(atomic::Ordering::Relaxed))
513 }
514}
515
516impl Drop for MemoryReservation {
517 fn drop(&mut self) {
518 self.free();
519 }
520}
521
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashSet;

    /// Builds a [`GreedyMemoryPool`] capped at `limit` bytes behind a trait object.
    fn greedy_pool(limit: usize) -> Arc<dyn MemoryPool> {
        Arc::new(GreedyMemoryPool::new(limit))
    }

    #[test]
    fn test_id_uniqueness() {
        let mut seen = HashSet::new();
        for _ in 0..100 {
            let id = MemoryConsumer::new("test").id();
            // `insert` returns false for a value that is already present
            assert!(seen.insert(id));
        }
    }

    #[test]
    fn test_memory_pool_underflow() {
        let pool = greedy_pool(50);
        let first = MemoryConsumer::new("a1").register(&pool);
        assert_eq!(pool.reserved(), 0);

        // infallible growth is allowed to exceed the limit
        first.grow(100);
        assert_eq!(pool.reserved(), 100);

        assert_eq!(first.free(), 100);
        assert_eq!(pool.reserved(), 0);

        // fallible growth beyond the limit is rejected and leaves the pool untouched
        first.try_grow(100).unwrap_err();
        assert_eq!(pool.reserved(), 0);

        first.try_grow(30).unwrap();
        assert_eq!(pool.reserved(), 30);

        let second = MemoryConsumer::new("a2").register(&pool);
        second.try_grow(25).unwrap_err();
        assert_eq!(pool.reserved(), 30);

        // dropping a reservation returns its bytes to the pool
        drop(first);
        assert_eq!(pool.reserved(), 0);

        second.try_grow(25).unwrap();
        assert_eq!(pool.reserved(), 25);
    }

    #[test]
    fn test_split() {
        let pool = greedy_pool(50);
        let original = MemoryConsumer::new("r1").register(&pool);

        original.try_grow(20).unwrap();
        assert_eq!(original.size(), 20);
        assert_eq!(pool.reserved(), 20);

        // moving 5 bytes into a second reservation leaves the pool total unchanged
        let carved = original.split(5);
        assert_eq!(original.size(), 15);
        assert_eq!(carved.size(), 5);
        assert_eq!(pool.reserved(), 20);

        // dropping the original frees its 15 bytes; the split-off 5 stay reserved
        drop(original);
        assert_eq!(carved.size(), 5);
        assert_eq!(pool.reserved(), 5);
    }

    #[test]
    fn test_new_empty() {
        let pool = greedy_pool(50);
        let base = MemoryConsumer::new("r1").register(&pool);
        base.try_grow(20).unwrap();

        let sibling = base.new_empty();
        sibling.try_grow(5).unwrap();

        assert_eq!(base.size(), 20);
        assert_eq!(sibling.size(), 5);
        assert_eq!(pool.reserved(), 25);
    }

    #[test]
    fn test_take() {
        let pool = greedy_pool(50);
        let mut source = MemoryConsumer::new("r1").register(&pool);
        source.try_grow(20).unwrap();

        let taken = source.take();
        taken.try_grow(5).unwrap();

        assert_eq!(source.size(), 0);
        assert_eq!(taken.size(), 25);
        assert_eq!(pool.reserved(), 25);

        // the emptied reservation remains usable
        source.try_grow(3).unwrap();
        assert_eq!(source.size(), 3);
        assert_eq!(taken.size(), 25);
        assert_eq!(pool.reserved(), 28);
    }

    #[test]
    fn test_downcast() {
        let pool = greedy_pool(50);

        assert!(pool.is::<GreedyMemoryPool>());
        assert!(!pool.is::<UnboundedMemoryPool>());

        let greedy = pool
            .downcast_ref::<GreedyMemoryPool>()
            .unwrap();
        assert_eq!(greedy.reserved(), 0);
        assert!(pool.downcast_ref::<UnboundedMemoryPool>().is_none());
    }

    #[test]
    fn test_try_shrink() {
        let pool = greedy_pool(100);
        let first = MemoryConsumer::new("r1").register(&pool);

        first.try_grow(50).unwrap();
        assert_eq!(first.size(), 50);
        assert_eq!(pool.reserved(), 50);

        // a successful shrink reports the new size and releases pool memory
        assert_eq!(first.try_shrink(30).unwrap(), 20);
        assert_eq!(first.size(), 20);
        assert_eq!(pool.reserved(), 20);

        // the released memory can be claimed by another consumer
        let second = MemoryConsumer::new("r2").register(&pool);
        second.try_grow(80).unwrap();
        assert_eq!(pool.reserved(), 100);

        // asking for more than is held fails and changes nothing
        assert!(first.try_shrink(25).is_err());
        assert_eq!(first.size(), 20);
        assert_eq!(pool.reserved(), 100);

        // shrinking by the exact remaining size empties the reservation
        assert_eq!(first.try_shrink(20).unwrap(), 0);
        assert_eq!(first.size(), 0);
        assert_eq!(pool.reserved(), 80);
    }
}
663}