datafusion_execution/memory_pool/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`MemoryPool`] for memory management during query execution, [`proxy`] for
//! help with allocation accounting.

use datafusion_common::{Result, internal_datafusion_err};
use std::hash::{Hash, Hasher};
use std::{cmp::Ordering, sync::Arc, sync::atomic};

mod pool;

#[cfg(feature = "arrow_buffer_pool")]
pub mod arrow;

pub mod proxy {
    pub use datafusion_common::utils::proxy::{HashTableAllocExt, VecAllocExt};
}

pub use datafusion_common::{
    human_readable_count, human_readable_duration, human_readable_size, units,
};
pub use pool::*;

/// Tracks and potentially limits memory use across operators during execution.
///
/// # Memory Management Overview
///
/// DataFusion is a streaming query engine that processes most queries without
/// buffering the entire input. Most operators require a fixed amount of memory
/// based on the schema and target batch size. However, certain operations,
/// such as sorting and grouping/joining, must buffer intermediate results,
/// which can require memory proportional to the number of input rows.
///
/// Rather than tracking all allocations, DataFusion takes a pragmatic approach:
/// intermediate memory used as data streams through the system is not accounted
/// for (it is assumed to be "small"), but the large consumers of memory must
/// register and constrain their use. This design trades the additional code
/// complexity of memory tracking for the ability to limit resource usage.
///
/// When limiting memory with a `MemoryPool` you should typically reserve some
/// overhead (e.g. 10%) for the "small" memory allocations that are not tracked.
///
/// # Memory Management Design
///
/// As explained above, DataFusion's design ONLY limits operators that require
/// "large" amounts of memory (proportional to the number of input rows), such
/// as `GroupByHashExec`. It does NOT track and limit memory used internally by
/// other operators such as `DataSourceExec` or the `RecordBatch`es that flow
/// between operators. Furthermore, operators should not reserve memory for the
/// batches they produce. Instead, if a consumer operator needs to hold batches
/// from its producers in memory for an extended period, it is the consumer
/// operator's responsibility to reserve the necessary memory for those batches.
///
/// To avoid allocating memory until the OS or the container system kills the
/// process, DataFusion `ExecutionPlan`s (operators) that consume large amounts
/// of memory must first request their desired allocation from a [`MemoryPool`]
/// before allocating more. The request is typically managed via a
/// [`MemoryReservation`] and [`MemoryConsumer`].
///
/// If the allocation is successful, the operator should proceed and allocate
/// the desired memory. If the allocation fails, the operator must either first
/// free memory (e.g. by spilling to local disk) and try again, or return an
/// error.
///
/// Note that a `MemoryPool` can be shared by concurrently executing plans,
/// which can be used to control memory usage in a multi-tenant system.
///
/// # How MemoryPool works by example
///
/// Scenario 1:
/// `RecordBatch`es stream through the `Filter` operator, so it does not need
/// to track its memory usage with the [`MemoryPool`].
///
/// Scenario 2:
/// For the `CrossJoin` operator, the intermediate state grows with the input
/// size, so the operator uses the [`MemoryPool`] to limit its memory usage.
/// 2.1 `CrossJoin` reads a new batch and asks the memory pool for additional
/// memory. The memory pool updates its usage and returns success.
/// 2.2 `CrossJoin` reads another batch and tries to reserve more memory, but
/// the memory pool does not have enough. Since the `CrossJoin` operator does
/// not implement spilling, it stops execution and returns an error.
///
/// Scenario 3:
/// The `Aggregate` operator's intermediate state also accumulates as the input
/// grows, but it has spilling capability. When it tries to reserve more memory
/// and the memory pool has already reached its limit, the pool returns an
/// error. The `Aggregate` operator then spills its intermediate buffers to
/// disk, releases the memory back to the pool, and retries the reservation.
///
/// # Related Structs
///
/// To better understand memory management in DataFusion, here are the key structs
/// and their relationships:
///
/// - [`MemoryConsumer`]: A named allocation traced by a particular operator. If an
///   execution is parallelized, and there are multiple partitions of the same
///   operator, each partition will have a separate `MemoryConsumer`.
/// - `SharedRegistration`: A registration of a `MemoryConsumer` with a `MemoryPool`.
///   `SharedRegistration` and `MemoryPool` have a many-to-one relationship. A
///   `MemoryPool` implementation can decide how to allocate memory based on the
///   registered consumers (e.g. `FairSpillPool` will try to share available memory
///   evenly among all registered consumers).
/// - [`MemoryReservation`]: Each `MemoryConsumer`/operator can have multiple
///   `MemoryReservation`s for different internal data structures. The relationship
///   between `MemoryConsumer` and `MemoryReservation` is one-to-many. This design
///   enables cleaner operator implementations:
///   - Different `MemoryReservation`s can be used for different purposes
///   - `MemoryReservation` follows RAII principles: to release a reservation,
///     simply drop the `MemoryReservation` object. When all `MemoryReservation`s
///     for a `SharedRegistration` are dropped, the `SharedRegistration` is dropped
///     when its reference count reaches zero, automatically unregistering the
///     `MemoryConsumer` from the `MemoryPool`.
///
/// ## Relationship Diagram
///
/// ```text
/// ┌──────────────────┐     ┌──────────────────┐
/// │MemoryReservation │     │MemoryReservation │
/// └───┬──────────────┘     └──────────────────┘ ......
///     │belongs to                    │
///     │      ┌───────────────────────┘           │  │
///     │      │                                   │  │
///     ▼      ▼                                   ▼  ▼
/// ┌────────────────────────┐       ┌────────────────────────┐
/// │   SharedRegistration   │       │   SharedRegistration   │
/// │   ┌────────────────┐   │       │   ┌────────────────┐   │
/// │   │                │   │       │   │                │   │
/// │   │ MemoryConsumer │   │       │   │ MemoryConsumer │   │
/// │   │                │   │       │   │                │   │
/// │   └────────────────┘   │       │   └────────────────┘   │
/// └────────────┬───────────┘       └────────────┬───────────┘
///              │                                │
///              │                        register│into
///              │                                │
///              └─────────────┐   ┌──────────────┘
///                            │   │
///                            ▼   ▼
///    ╔═══════════════════════════════════════════════════╗
///    ║                                                   ║
///    ║                    MemoryPool                     ║
///    ║                                                   ║
///    ╚═══════════════════════════════════════════════════╝
/// ```
///
/// For example, there are two parallel partitions of an operator X: each partition
/// corresponds to a `MemoryConsumer` in the above diagram. Inside each partition of
/// operator X, there are typically several `MemoryReservation`s - one for each
/// internal data structure that needs memory tracking (e.g., 1 reservation for the
/// hash table, and 1 reservation for buffered input, etc.).
///
/// # Implementing `MemoryPool`
///
/// You can implement a custom allocation policy by implementing the
/// [`MemoryPool`] trait and configuring a `SessionContext` appropriately.
/// However, DataFusion comes with the following simple memory pool implementations
/// that handle many common cases:
///
/// * [`UnboundedMemoryPool`]: no memory limits (the default)
///
/// * [`GreedyMemoryPool`]: Limits memory usage to a fixed size using a "first
///   come first served" policy
///
/// * [`FairSpillPool`]: Limits memory usage to a fixed size, allocating memory
///   to all spilling operators fairly
///
/// * [`TrackConsumersPool`]: Wraps another [`MemoryPool`] and tracks consumers,
///   providing better error messages on the largest memory users.
pub trait MemoryPool: Send + Sync + std::fmt::Debug {
    /// Registers a new [`MemoryConsumer`]
    ///
    /// Note: Subsequent calls to [`Self::grow`] must be made to reserve memory
    fn register(&self, _consumer: &MemoryConsumer) {}

    /// Records the destruction of a [`MemoryReservation`] with [`MemoryConsumer`]
    ///
    /// Note: Prior calls to [`Self::shrink`] must be made to free any reserved memory
    fn unregister(&self, _consumer: &MemoryConsumer) {}

    /// Infallibly grow the provided `reservation` by `additional` bytes
    ///
    /// This must always succeed
    fn grow(&self, reservation: &MemoryReservation, additional: usize);

    /// Infallibly shrink the provided `reservation` by `shrink` bytes
    fn shrink(&self, reservation: &MemoryReservation, shrink: usize);

    /// Attempt to grow the provided `reservation` by `additional` bytes
    ///
    /// On error, the `reservation` is not increased in size
    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()>;

    /// Return the total amount of memory reserved
    fn reserved(&self) -> usize;

    /// Return the memory limit of the pool
    ///
    /// The default implementation returns [`MemoryLimit::Unknown`]. If your
    /// custom memory pool needs to report its memory usage limit, implement
    /// this method to return `MemoryLimit::Finite(limit)`.
    fn memory_limit(&self) -> MemoryLimit {
        MemoryLimit::Unknown
    }
}
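To illustrate the contract this trait describes, here is a minimal, stdlib-only sketch of a "greedy" admission policy in the spirit of `GreedyMemoryPool`. `ToyPool` is a hypothetical stand-in, not the real DataFusion type, and it tracks raw byte counts rather than `MemoryReservation`s, but it honors the same rules: `try_grow` either admits the full request or fails without changing state, and `shrink` infallibly returns bytes to the pool.

```rust
// Hypothetical greedy pool sketch (NOT the DataFusion implementation).
use std::sync::atomic::{AtomicUsize, Ordering};

#[derive(Debug)]
pub struct ToyPool {
    limit: usize,
    used: AtomicUsize,
}

impl ToyPool {
    pub fn new(limit: usize) -> Self {
        Self {
            limit,
            used: AtomicUsize::new(0),
        }
    }

    /// Mirrors `MemoryPool::try_grow`: first come, first served up to `limit`.
    pub fn try_grow(&self, additional: usize) -> Result<(), String> {
        self.used
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |used| {
                // Admit the request only if it fits under the limit; returning
                // `None` leaves `used` untouched and yields an error.
                used.checked_add(additional).filter(|new| *new <= self.limit)
            })
            .map(|_| ())
            .map_err(|used| {
                format!(
                    "cannot reserve {additional} bytes: {used}/{} already in use",
                    self.limit
                )
            })
    }

    /// Mirrors `MemoryPool::shrink`: infallibly return bytes to the pool.
    pub fn shrink(&self, bytes: usize) {
        self.used.fetch_sub(bytes, Ordering::Relaxed);
    }

    /// Mirrors `MemoryPool::reserved`.
    pub fn reserved(&self) -> usize {
        self.used.load(Ordering::Relaxed)
    }
}

fn main() {
    let pool = ToyPool::new(100);
    assert!(pool.try_grow(60).is_ok());
    assert!(pool.try_grow(50).is_err()); // would exceed the 100-byte limit
    assert_eq!(pool.reserved(), 60); // the failed request left state unchanged
    pool.shrink(10);
    assert_eq!(pool.reserved(), 50);
}
```

The single `fetch_update` keeps the check-then-reserve step atomic, so concurrent `try_grow` calls cannot jointly exceed the limit.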

/// Memory limit of a `MemoryPool`
pub enum MemoryLimit {
    /// No memory limit.
    Infinite,
    /// Bounded memory limit in bytes.
    Finite(usize),
    /// The pool does not report a memory limit.
    Unknown,
}

/// A memory consumer is a named allocation traced by a particular
/// [`MemoryReservation`] in a [`MemoryPool`]. All allocations are registered to
/// a particular `MemoryConsumer`.
///
/// Each `MemoryConsumer` is identified by a process-unique id and is therefore
/// not cloneable. If you need a copy of a `MemoryConsumer`, see
/// [`MemoryConsumer::clone_with_new_id`], but note that the copy may be treated
/// as a separate entity depending on the pool in use, and is only guaranteed to
/// share the name and inner properties.
///
/// For help with allocation accounting, see the [`proxy`] module.
///
/// [proxy]: datafusion_common::utils::proxy
#[derive(Debug)]
pub struct MemoryConsumer {
    name: String,
    can_spill: bool,
    id: usize,
}

impl PartialEq for MemoryConsumer {
    fn eq(&self, other: &Self) -> bool {
        let is_same_id = self.id == other.id;

        #[cfg(debug_assertions)]
        if is_same_id {
            assert_eq!(self.name, other.name);
            assert_eq!(self.can_spill, other.can_spill);
        }

        is_same_id
    }
}

impl Eq for MemoryConsumer {}

impl Hash for MemoryConsumer {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.id.hash(state);
        self.name.hash(state);
        self.can_spill.hash(state);
    }
}

impl MemoryConsumer {
    fn new_unique_id() -> usize {
        static ID: atomic::AtomicUsize = atomic::AtomicUsize::new(0);
        ID.fetch_add(1, atomic::Ordering::Relaxed)
    }

    /// Create a new empty [`MemoryConsumer`] that can be grown using [`MemoryReservation`]
    pub fn new(name: impl Into<String>) -> Self {
        Self {
            name: name.into(),
            can_spill: false,
            id: Self::new_unique_id(),
        }
    }

    /// Returns a clone of this [`MemoryConsumer`] with a new unique id, which
    /// can be registered with a [`MemoryPool`]. The new consumer is separate
    /// from the original.
    pub fn clone_with_new_id(&self) -> Self {
        Self {
            name: self.name.clone(),
            can_spill: self.can_spill,
            id: Self::new_unique_id(),
        }
    }

    /// Return the unique id of this [`MemoryConsumer`]
    pub fn id(&self) -> usize {
        self.id
    }

    /// Set whether this allocation can be spilled to disk
    pub fn with_can_spill(self, can_spill: bool) -> Self {
        Self { can_spill, ..self }
    }

    /// Returns true if this allocation can spill to disk
    pub fn can_spill(&self) -> bool {
        self.can_spill
    }

    /// Returns the name associated with this allocation
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Registers this [`MemoryConsumer`] with the provided [`MemoryPool`] returning
    /// a [`MemoryReservation`] that can be used to grow or shrink the memory reservation
    pub fn register(self, pool: &Arc<dyn MemoryPool>) -> MemoryReservation {
        pool.register(&self);
        MemoryReservation {
            registration: Arc::new(SharedRegistration {
                pool: Arc::clone(pool),
                consumer: self,
            }),
            size: atomic::AtomicUsize::new(0),
        }
    }
}

/// A registration of a [`MemoryConsumer`] with a [`MemoryPool`].
///
/// Calls [`MemoryPool::unregister`] on drop to return any memory to
/// the underlying pool.
#[derive(Debug)]
struct SharedRegistration {
    pool: Arc<dyn MemoryPool>,
    consumer: MemoryConsumer,
}

impl Drop for SharedRegistration {
    fn drop(&mut self) {
        self.pool.unregister(&self.consumer);
    }
}
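The drop-based unregistration above is plain RAII. A stdlib-only sketch of the same pattern (using hypothetical `ToyPool`/`ToyReservation` types, not the DataFusion ones) shows how dropping a guard automatically returns its bytes to a shared counter:

```rust
// RAII sketch: a reservation guard frees its bytes back to the shared pool
// counter when dropped, mirroring `MemoryReservation`/`SharedRegistration`.
// `ToyPool` and `ToyReservation` are hypothetical stand-ins.
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};

#[derive(Debug, Default)]
pub struct ToyPool {
    pub used: AtomicUsize,
}

#[derive(Debug)]
pub struct ToyReservation {
    pool: Arc<ToyPool>,
    size: usize,
}

impl ToyReservation {
    /// Reserve `size` bytes from the pool for the lifetime of the guard.
    pub fn new(pool: &Arc<ToyPool>, size: usize) -> Self {
        pool.used.fetch_add(size, Ordering::Relaxed);
        Self {
            pool: Arc::clone(pool),
            size,
        }
    }
}

impl Drop for ToyReservation {
    // Dropping the guard infallibly returns its bytes to the pool.
    fn drop(&mut self) {
        self.pool.used.fetch_sub(self.size, Ordering::Relaxed);
    }
}

fn main() {
    let pool = Arc::new(ToyPool::default());
    let r1 = ToyReservation::new(&pool, 30);
    let r2 = ToyReservation::new(&pool, 20);
    assert_eq!(pool.used.load(Ordering::Relaxed), 50);
    drop(r1); // RAII: 30 bytes released without an explicit free call
    assert_eq!(pool.used.load(Ordering::Relaxed), 20);
    drop(r2);
    assert_eq!(pool.used.load(Ordering::Relaxed), 0);
}
```

Because the pool handle lives inside the guard via `Arc`, the accounting state outlives any individual reservation, just as `SharedRegistration` keeps its `MemoryPool` alive here.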

/// A [`MemoryReservation`] tracks an individual reservation of a
/// number of bytes of memory in a [`MemoryPool`] that is freed back
/// to the pool on drop.
///
/// The reservation can be grown or shrunk over time.
#[derive(Debug)]
pub struct MemoryReservation {
    registration: Arc<SharedRegistration>,
    size: atomic::AtomicUsize,
}

impl MemoryReservation {
    /// Returns the size of this reservation in bytes
    pub fn size(&self) -> usize {
        self.size.load(atomic::Ordering::Relaxed)
    }

    /// Returns the [`MemoryConsumer`] for this [`MemoryReservation`]
    pub fn consumer(&self) -> &MemoryConsumer {
        &self.registration.consumer
    }

    /// Frees all bytes from this reservation back to the underlying
    /// pool, returning the number of bytes freed.
    pub fn free(&self) -> usize {
        let size = self.size.swap(0, atomic::Ordering::Relaxed);
        if size != 0 {
            self.registration.pool.shrink(self, size);
        }
        size
    }

    /// Frees `capacity` bytes from this reservation
    ///
    /// # Panics
    ///
    /// Panics if `capacity` exceeds [`Self::size`]
    pub fn shrink(&self, capacity: usize) {
        self.size
            .fetch_update(
                atomic::Ordering::Relaxed,
                atomic::Ordering::Relaxed,
                |prev| prev.checked_sub(capacity),
            )
            .unwrap_or_else(|prev| {
                panic!("Cannot free the capacity {capacity} out of allocated size {prev}")
            });
        self.registration.pool.shrink(self, capacity);
    }

    /// Tries to free `capacity` bytes from this reservation if `capacity`
    /// does not exceed [`Self::size`]. Returns the new reservation size, or
    /// an error if the capacity to shrink exceeds the allocated size.
    pub fn try_shrink(&self, capacity: usize) -> Result<usize> {
        let prev = self
            .size
            .fetch_update(
                atomic::Ordering::Relaxed,
                atomic::Ordering::Relaxed,
                |prev| prev.checked_sub(capacity),
            )
            .map_err(|prev| {
                internal_datafusion_err!(
                    "Cannot free the capacity {capacity} out of allocated size {prev}"
                )
            })?;

        self.registration.pool.shrink(self, capacity);
        Ok(prev - capacity)
    }

    /// Sets the size of this reservation to `capacity`
    pub fn resize(&self, capacity: usize) {
        let size = self.size.load(atomic::Ordering::Relaxed);
        match capacity.cmp(&size) {
            Ordering::Greater => self.grow(capacity - size),
            Ordering::Less => self.shrink(size - capacity),
            _ => {}
        }
    }

    /// Try to set the size of this reservation to `capacity`
    pub fn try_resize(&self, capacity: usize) -> Result<()> {
        let size = self.size.load(atomic::Ordering::Relaxed);
        match capacity.cmp(&size) {
            Ordering::Greater => self.try_grow(capacity - size)?,
            Ordering::Less => {
                self.try_shrink(size - capacity)?;
            }
            _ => {}
        };
        Ok(())
    }

    /// Increase the size of this reservation by `capacity` bytes
    pub fn grow(&self, capacity: usize) {
        self.registration.pool.grow(self, capacity);
        self.size.fetch_add(capacity, atomic::Ordering::Relaxed);
    }

    /// Try to increase the size of this reservation by `capacity`
    /// bytes, returning error if there is insufficient capacity left
    /// in the pool.
    pub fn try_grow(&self, capacity: usize) -> Result<()> {
        self.registration.pool.try_grow(self, capacity)?;
        self.size.fetch_add(capacity, atomic::Ordering::Relaxed);
        Ok(())
    }

    /// Splits off `capacity` bytes from this [`MemoryReservation`]
    /// into a new [`MemoryReservation`] with the same
    /// [`MemoryConsumer`].
    ///
    /// This can be useful to free part of this reservation with RAII
    /// style dropping
    ///
    /// # Panics
    ///
    /// Panics if `capacity` exceeds [`Self::size`]
    pub fn split(&self, capacity: usize) -> MemoryReservation {
        self.size
            .fetch_update(
                atomic::Ordering::Relaxed,
                atomic::Ordering::Relaxed,
                |prev| prev.checked_sub(capacity),
            )
            .unwrap();
        Self {
            size: atomic::AtomicUsize::new(capacity),
            registration: Arc::clone(&self.registration),
        }
    }

    /// Returns a new empty [`MemoryReservation`] with the same [`MemoryConsumer`]
    pub fn new_empty(&self) -> Self {
        Self {
            size: atomic::AtomicUsize::new(0),
            registration: Arc::clone(&self.registration),
        }
    }

    /// Splits off all the bytes from this [`MemoryReservation`] into
    /// a new [`MemoryReservation`] with the same [`MemoryConsumer`]
    pub fn take(&mut self) -> MemoryReservation {
        self.split(self.size.load(atomic::Ordering::Relaxed))
    }
}
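The `try_grow`/`shrink` API above is what enables the spill-on-failure loop described in "Scenario 3". A stdlib-only sketch of that control flow (with a hypothetical `Operator` type, a fixed `limit` standing in for a real pool, and a `spilled_bytes` counter standing in for disk I/O):

```rust
// Hypothetical spilling operator sketch, not a DataFusion API: on a failed
// "reservation" it spills its buffer to free memory and retries once.
struct Operator {
    in_memory_bytes: usize, // bytes currently buffered in memory
    spilled_bytes: usize,   // bytes written out to "disk"
}

impl Operator {
    /// Try to buffer `additional` bytes under `limit`. If the reservation
    /// would fail, spill the current buffer, release that memory, and retry,
    /// as a spilling operator such as `Aggregate` would.
    fn buffer(&mut self, additional: usize, limit: usize) -> Result<(), String> {
        if self.in_memory_bytes + additional <= limit {
            self.in_memory_bytes += additional;
            return Ok(());
        }
        // Reservation failed: spill buffered data to free memory, then retry.
        self.spilled_bytes += self.in_memory_bytes;
        self.in_memory_bytes = 0;
        if additional <= limit {
            self.in_memory_bytes = additional;
            Ok(())
        } else {
            Err(format!(
                "{additional} bytes exceeds limit {limit} even after spilling"
            ))
        }
    }
}

fn main() {
    let mut op = Operator {
        in_memory_bytes: 0,
        spilled_bytes: 0,
    };
    op.buffer(60, 100).unwrap(); // fits within the limit
    op.buffer(70, 100).unwrap(); // forces a spill of the 60 buffered bytes
    assert_eq!(op.spilled_bytes, 60);
    assert_eq!(op.in_memory_bytes, 70);
    assert!(op.buffer(200, 100).is_err()); // too large even after spilling
}
```

A real operator would hold a `MemoryReservation` and call `try_grow` where this sketch compares against `limit`, and `shrink`/`free` where it zeroes `in_memory_bytes`.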

impl Drop for MemoryReservation {
    fn drop(&mut self) {
        self.free();
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_id_uniqueness() {
        let mut ids = std::collections::HashSet::new();
        for _ in 0..100 {
            let consumer = MemoryConsumer::new("test");
            assert!(ids.insert(consumer.id())); // Ensures unique insertion
        }
    }

    #[test]
    fn test_memory_pool_underflow() {
        let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
        let a1 = MemoryConsumer::new("a1").register(&pool);
        assert_eq!(pool.reserved(), 0);

        a1.grow(100);
        assert_eq!(pool.reserved(), 100);

        assert_eq!(a1.free(), 100);
        assert_eq!(pool.reserved(), 0);

        a1.try_grow(100).unwrap_err();
        assert_eq!(pool.reserved(), 0);

        a1.try_grow(30).unwrap();
        assert_eq!(pool.reserved(), 30);

        let a2 = MemoryConsumer::new("a2").register(&pool);
        a2.try_grow(25).unwrap_err();
        assert_eq!(pool.reserved(), 30);

        drop(a1);
        assert_eq!(pool.reserved(), 0);

        a2.try_grow(25).unwrap();
        assert_eq!(pool.reserved(), 25);
    }

    #[test]
    fn test_split() {
        let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
        let r1 = MemoryConsumer::new("r1").register(&pool);

        r1.try_grow(20).unwrap();
        assert_eq!(r1.size(), 20);
        assert_eq!(pool.reserved(), 20);

        // take 5 from r1, should still have same reservation split
        let r2 = r1.split(5);
        assert_eq!(r1.size(), 15);
        assert_eq!(r2.size(), 5);
        assert_eq!(pool.reserved(), 20);

        // dropping r1 frees 15 but retains 5 as they have the same consumer
        drop(r1);
        assert_eq!(r2.size(), 5);
        assert_eq!(pool.reserved(), 5);
    }

    #[test]
    fn test_new_empty() {
        let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
        let r1 = MemoryConsumer::new("r1").register(&pool);

        r1.try_grow(20).unwrap();
        let r2 = r1.new_empty();
        r2.try_grow(5).unwrap();

        assert_eq!(r1.size(), 20);
        assert_eq!(r2.size(), 5);
        assert_eq!(pool.reserved(), 25);
    }

    #[test]
    fn test_take() {
        let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
        let mut r1 = MemoryConsumer::new("r1").register(&pool);

        r1.try_grow(20).unwrap();
        let r2 = r1.take();
        r2.try_grow(5).unwrap();

        assert_eq!(r1.size(), 0);
        assert_eq!(r2.size(), 25);
        assert_eq!(pool.reserved(), 25);

        // r1 can still grow again
        r1.try_grow(3).unwrap();
        assert_eq!(r1.size(), 3);
        assert_eq!(r2.size(), 25);
        assert_eq!(pool.reserved(), 28);
    }

    #[test]
    fn test_try_shrink() {
        let pool = Arc::new(GreedyMemoryPool::new(100)) as _;
        let r1 = MemoryConsumer::new("r1").register(&pool);

        r1.try_grow(50).unwrap();
        assert_eq!(r1.size(), 50);
        assert_eq!(pool.reserved(), 50);

        // Successful shrink returns new size and frees pool memory
        let new_size = r1.try_shrink(30).unwrap();
        assert_eq!(new_size, 20);
        assert_eq!(r1.size(), 20);
        assert_eq!(pool.reserved(), 20);

        // Freed pool memory is now available to other consumers
        let r2 = MemoryConsumer::new("r2").register(&pool);
        r2.try_grow(80).unwrap();
        assert_eq!(pool.reserved(), 100);

        // Shrinking more than allocated fails without changing state
        let err = r1.try_shrink(25);
        assert!(err.is_err());
        assert_eq!(r1.size(), 20);
        assert_eq!(pool.reserved(), 100);

        // Shrink to exactly zero
        let new_size = r1.try_shrink(20).unwrap();
        assert_eq!(new_size, 0);
        assert_eq!(r1.size(), 0);
        assert_eq!(pool.reserved(), 80);
    }
}