datafusion_execution/memory_pool/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`MemoryPool`] for memory management during query execution, [`proxy`] for
19//! help with allocation accounting.
20
21use datafusion_common::{internal_err, Result};
22use std::hash::{Hash, Hasher};
23use std::{cmp::Ordering, sync::atomic, sync::Arc};
24
25mod pool;
/// Re-exports the allocation-accounting helpers (`HashTableAllocExt`,
/// `RawTableAllocExt`, `VecAllocExt`) from `datafusion_common::utils::proxy`
/// so they can be reached through this module.
pub mod proxy {
    pub use datafusion_common::utils::proxy::{
        HashTableAllocExt, RawTableAllocExt, VecAllocExt,
    };
}
31
32pub use pool::*;
33
34/// Tracks and potentially limits memory use across operators during execution.
35///
36/// # Memory Management Overview
37///
38/// DataFusion is a streaming query engine, processing most queries without
39/// buffering the entire input. Most operators require a fixed amount of memory
40/// based on the schema and target batch size. However, certain operations such
41/// as sorting and grouping/joining, require buffering intermediate results,
42/// which can require memory proportional to the number of input rows.
43///
44/// Rather than tracking all allocations, DataFusion takes a pragmatic approach:
45/// Intermediate memory used as data streams through the system is not accounted
/// (it is assumed to be "small") but the large consumers of memory must register
47/// and constrain their use. This design trades off the additional code
48/// complexity of memory tracking with limiting resource usage.
49///
50/// When limiting memory with a `MemoryPool` you should typically reserve some
51/// overhead (e.g. 10%) for the "small" memory allocations that are not tracked.
52///
53/// # Memory Management Design
54///
55/// As explained above, DataFusion's design ONLY limits operators that require
56/// "large" amounts of memory (proportional to number of input rows), such as
57/// `GroupByHashExec`. It does NOT track and limit memory used internally by
58/// other operators such as `DataSourceExec` or the `RecordBatch`es that flow
59/// between operators. Furthermore, operators should not reserve memory for the
60/// batches they produce. Instead, if a consumer operator needs to hold batches
61/// from its producers in memory for an extended period, it is the consumer
62/// operator's responsibility to reserve the necessary memory for those batches.
63///
64/// In order to avoid allocating memory until the OS or the container system
65/// kills the process, DataFusion `ExecutionPlan`s (operators) that consume
66/// large amounts of memory must first request their desired allocation from a
67/// [`MemoryPool`] before allocating more.  The request is typically managed via
68/// a  [`MemoryReservation`] and [`MemoryConsumer`].
69///
70/// If the allocation is successful, the operator should proceed and allocate
71/// the desired memory. If the allocation fails, the operator must either first
72/// free memory (e.g. by spilling to local disk) and try again, or error.
73///
74/// Note that a `MemoryPool` can be shared by concurrently executing plans,
75/// which can be used to control memory usage in a multi-tenant system.
76///
77/// # How MemoryPool works by example
78///
79/// Scenario 1:
/// For `Filter` operator, `RecordBatch`es will stream through it, so it
/// doesn't have to keep track of memory usage through [`MemoryPool`].
82///
83/// Scenario 2:
84/// For `CrossJoin` operator, if the input size gets larger, the intermediate
85/// state will also grow. So `CrossJoin` operator will use [`MemoryPool`] to
86/// limit the memory usage.
87/// 2.1 `CrossJoin` operator has read a new batch, asked memory pool for
88/// additional memory. Memory pool updates the usage and returns success.
89/// 2.2 `CrossJoin` has read another batch, and tries to reserve more memory
90/// again, memory pool does not have enough memory. Since `CrossJoin` operator
91/// has not implemented spilling, it will stop execution and return an error.
92///
93/// Scenario 3:
94/// For `Aggregate` operator, its intermediate states will also accumulate as
95/// the input size gets larger, but with spilling capability. When it tries to
96/// reserve more memory from the memory pool, and the memory pool has already
97/// reached the memory limit, it will return an error. Then, `Aggregate`
98/// operator will spill the intermediate buffers to disk, and release memory
99/// from the memory pool, and continue to retry memory reservation.
100///
101/// # Related Structs
102///
103/// To better understand memory management in DataFusion, here are the key structs
104/// and their relationships:
105///
106/// - [`MemoryConsumer`]: A named allocation traced by a particular operator. If an
107///   execution is parallelized, and there are multiple partitions of the same
108///   operator, each partition will have a separate `MemoryConsumer`.
109/// - `SharedRegistration`: A registration of a `MemoryConsumer` with a `MemoryPool`.
110///   `SharedRegistration` and `MemoryPool` have a many-to-one relationship. `MemoryPool`
111///   implementation can decide how to allocate memory based on the registered consumers.
112///   (e.g. `FairSpillPool` will try to share available memory evenly among all registered
113///   consumers)
114/// - [`MemoryReservation`]: Each `MemoryConsumer`/operator can have multiple
115///   `MemoryReservation`s for different internal data structures. The relationship
116///   between `MemoryConsumer` and `MemoryReservation` is one-to-many. This design
117///   enables cleaner operator implementations:
118///   - Different `MemoryReservation`s can be used for different purposes
119///   - `MemoryReservation` follows RAII principles - to release a reservation,
120///     simply drop the `MemoryReservation` object. When all `MemoryReservation`s
121///     for a `SharedRegistration` are dropped, the `SharedRegistration` is dropped
122///     when its reference count reaches zero, automatically unregistering the
123///     `MemoryConsumer` from the `MemoryPool`.
124///
125/// ## Relationship Diagram
126///
127/// ```text
128/// ┌──────────────────┐     ┌──────────────────┐
129/// │MemoryReservation │     │MemoryReservation │
130/// └───┬──────────────┘     └──────────────────┘ ......
131///     │belongs to                    │
132///     │      ┌───────────────────────┘           │  │
133///     │      │                                   │  │
134///     ▼      ▼                                   ▼  ▼
135/// ┌────────────────────────┐       ┌────────────────────────┐
136/// │   SharedRegistration   │       │   SharedRegistration   │
137/// │   ┌────────────────┐   │       │   ┌────────────────┐   │
138/// │   │                │   │       │   │                │   │
139/// │   │ MemoryConsumer │   │       │   │ MemoryConsumer │   │
140/// │   │                │   │       │   │                │   │
141/// │   └────────────────┘   │       │   └────────────────┘   │
142/// └────────────┬───────────┘       └────────────┬───────────┘
143///              │                                │
144///              │                        register│into
145///              │                                │
146///              └─────────────┐   ┌──────────────┘
147///                            │   │
148///                            ▼   ▼
149///    ╔═══════════════════════════════════════════════════╗
150///    ║                                                   ║
151///    ║                    MemoryPool                     ║
152///    ║                                                   ║
153///    ╚═══════════════════════════════════════════════════╝
154/// ```
155///
156/// For example, there are two parallel partitions of an operator X: each partition
157/// corresponds to a `MemoryConsumer` in the above diagram. Inside each partition of
158/// operator X, there are typically several `MemoryReservation`s - one for each
159/// internal data structure that needs memory tracking (e.g., 1 reservation for the hash
160/// table, and 1 reservation for buffered input, etc.).
161///
162/// # Implementing `MemoryPool`
163///
164/// You can implement a custom allocation policy by implementing the
165/// [`MemoryPool`] trait and configuring a `SessionContext` appropriately.
166/// However, DataFusion comes with the following simple memory pool implementations that
167/// handle many common cases:
168///
169/// * [`UnboundedMemoryPool`]: no memory limits (the default)
170///
171/// * [`GreedyMemoryPool`]: Limits memory usage to a fixed size using a "first
172///   come first served" policy
173///
174/// * [`FairSpillPool`]: Limits memory usage to a fixed size, allocating memory
175///   to all spilling operators fairly
176///
177/// * [`TrackConsumersPool`]: Wraps another [`MemoryPool`] and tracks consumers,
178///   providing better error messages on the largest memory users.
pub trait MemoryPool: Send + Sync + std::fmt::Debug {
    /// Registers a new [`MemoryConsumer`]
    ///
    /// The default implementation does nothing.
    ///
    /// Note: Subsequent calls to [`Self::grow`] must be made to reserve memory
    fn register(&self, _consumer: &MemoryConsumer) {}

    /// Records the destruction of a [`MemoryReservation`] with [`MemoryConsumer`]
    ///
    /// The default implementation does nothing.
    ///
    /// Note: Prior calls to [`Self::shrink`] must be made to free any reserved memory
    fn unregister(&self, _consumer: &MemoryConsumer) {}

    /// Infallibly grow the provided `reservation` by `additional` bytes
    ///
    /// This must always succeed
    fn grow(&self, reservation: &MemoryReservation, additional: usize);

    /// Infallibly shrink the provided `reservation` by `shrink` bytes
    fn shrink(&self, reservation: &MemoryReservation, shrink: usize);

    /// Attempt to grow the provided `reservation` by `additional` bytes
    ///
    /// On error the `reservation` will not be increased in size
    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()>;

    /// Return the total amount of memory reserved
    fn reserved(&self) -> usize;

    /// Return the memory limit of the pool
    ///
    /// The default implementation of `MemoryPool::memory_limit`
    /// will return `MemoryLimit::Unknown`.
    /// If you are using your custom memory pool, but have the requirement to
    /// know the memory usage limit of the pool, please implement this method
    /// to return it (`MemoryLimit::Finite(limit)`).
    fn memory_limit(&self) -> MemoryLimit {
        MemoryLimit::Unknown
    }
}
217
/// Memory limit of `MemoryPool`
///
/// Returned by `MemoryPool::memory_limit`. The type is a small plain value,
/// so it derives `Debug`, `Clone`, `Copy`, `PartialEq` and `Eq` to allow
/// callers to log, copy and compare limits directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MemoryLimit {
    /// The pool reports that it has no finite limit.
    Infinite,
    /// Bounded memory limit in bytes.
    Finite(usize),
    /// The pool does not report its limit; this is what the default
    /// implementation of `MemoryPool::memory_limit` returns.
    Unknown,
}
225
/// A memory consumer is a named allocation traced by a particular
/// [`MemoryReservation`] in a [`MemoryPool`]. All allocations are registered to
/// a particular `MemoryConsumer`.
///
/// Each `MemoryConsumer` is identifiable by a process-unique id, and is therefore not cloneable.
/// If you want a clone of a `MemoryConsumer`, you should look into [`MemoryConsumer::clone_with_new_id`],
/// but note that this `MemoryConsumer` may be treated as a separate entity based on the used pool,
/// and is only guaranteed to share the name and inner properties.
///
/// For help with allocation accounting, see the [`proxy`] module.
///
/// [proxy]: datafusion_common::utils::proxy
#[derive(Debug)]
pub struct MemoryConsumer {
    /// Name of this consumer, exposed through [`MemoryConsumer::name`]
    name: String,
    /// Whether this consumer can spill to disk; `false` for a consumer
    /// created by [`MemoryConsumer::new`] until changed with
    /// [`MemoryConsumer::with_can_spill`]
    can_spill: bool,
    /// Process-unique identifier; equality between consumers is decided by
    /// this field alone
    id: usize,
}
244
245impl PartialEq for MemoryConsumer {
246    fn eq(&self, other: &Self) -> bool {
247        let is_same_id = self.id == other.id;
248
249        #[cfg(debug_assertions)]
250        if is_same_id {
251            assert_eq!(self.name, other.name);
252            assert_eq!(self.can_spill, other.can_spill);
253        }
254
255        is_same_id
256    }
257}
258
259impl Eq for MemoryConsumer {}
260
261impl Hash for MemoryConsumer {
262    fn hash<H: Hasher>(&self, state: &mut H) {
263        self.id.hash(state);
264        self.name.hash(state);
265        self.can_spill.hash(state);
266    }
267}
268
269impl MemoryConsumer {
270    fn new_unique_id() -> usize {
271        static ID: atomic::AtomicUsize = atomic::AtomicUsize::new(0);
272        ID.fetch_add(1, atomic::Ordering::Relaxed)
273    }
274
275    /// Create a new empty [`MemoryConsumer`] that can be grown using [`MemoryReservation`]
276    pub fn new(name: impl Into<String>) -> Self {
277        Self {
278            name: name.into(),
279            can_spill: false,
280            id: Self::new_unique_id(),
281        }
282    }
283
284    /// Returns a clone of this [`MemoryConsumer`] with a new unique id,
285    /// which can be registered with a [`MemoryPool`],
286    /// This new consumer is separate from the original.
287    pub fn clone_with_new_id(&self) -> Self {
288        Self {
289            name: self.name.clone(),
290            can_spill: self.can_spill,
291            id: Self::new_unique_id(),
292        }
293    }
294
295    /// Return the unique id of this [`MemoryConsumer`]
296    pub fn id(&self) -> usize {
297        self.id
298    }
299
300    /// Set whether this allocation can be spilled to disk
301    pub fn with_can_spill(self, can_spill: bool) -> Self {
302        Self { can_spill, ..self }
303    }
304
305    /// Returns true if this allocation can spill to disk
306    pub fn can_spill(&self) -> bool {
307        self.can_spill
308    }
309
310    /// Returns the name associated with this allocation
311    pub fn name(&self) -> &str {
312        &self.name
313    }
314
315    /// Registers this [`MemoryConsumer`] with the provided [`MemoryPool`] returning
316    /// a [`MemoryReservation`] that can be used to grow or shrink the memory reservation
317    pub fn register(self, pool: &Arc<dyn MemoryPool>) -> MemoryReservation {
318        pool.register(&self);
319        MemoryReservation {
320            registration: Arc::new(SharedRegistration {
321                pool: Arc::clone(pool),
322                consumer: self,
323            }),
324            size: 0,
325        }
326    }
327}
328
/// A registration of a [`MemoryConsumer`] with a [`MemoryPool`].
///
/// Calls [`MemoryPool::unregister`] on drop to return any memory to
/// the underlying pool.
///
/// Held behind an `Arc` by every [`MemoryReservation`] created for the same
/// consumer, so the drop runs once the last such reservation is gone.
#[derive(Debug)]
struct SharedRegistration {
    /// The pool the consumer was registered with
    pool: Arc<dyn MemoryPool>,
    /// The consumer that was registered
    consumer: MemoryConsumer,
}
338
339impl Drop for SharedRegistration {
340    fn drop(&mut self) {
341        self.pool.unregister(&self.consumer);
342    }
343}
344
/// A [`MemoryReservation`] tracks an individual reservation of a
/// number of bytes of memory in a [`MemoryPool`] that is freed back
/// to the pool on drop.
///
/// The reservation can be grown or shrunk over time.
#[derive(Debug)]
pub struct MemoryReservation {
    /// Shared handle to the consumer/pool pairing this reservation belongs to;
    /// reservations produced by `split`, `take` and `new_empty` share it
    registration: Arc<SharedRegistration>,
    /// Number of bytes currently accounted to this reservation
    size: usize,
}
355
356impl MemoryReservation {
357    /// Returns the size of this reservation in bytes
358    pub fn size(&self) -> usize {
359        self.size
360    }
361
362    /// Returns [MemoryConsumer] for this [MemoryReservation]
363    pub fn consumer(&self) -> &MemoryConsumer {
364        &self.registration.consumer
365    }
366
367    /// Frees all bytes from this reservation back to the underlying
368    /// pool, returning the number of bytes freed.
369    pub fn free(&mut self) -> usize {
370        let size = self.size;
371        if size != 0 {
372            self.shrink(size)
373        }
374        size
375    }
376
377    /// Frees `capacity` bytes from this reservation
378    ///
379    /// # Panics
380    ///
381    /// Panics if `capacity` exceeds [`Self::size`]
382    pub fn shrink(&mut self, capacity: usize) {
383        let new_size = self.size.checked_sub(capacity).unwrap();
384        self.registration.pool.shrink(self, capacity);
385        self.size = new_size
386    }
387
388    /// Tries to free `capacity` bytes from this reservation
389    /// if `capacity` does not exceed [`Self::size`]
390    /// Returns new reservation size
391    /// or error if shrinking capacity is more than allocated size
392    pub fn try_shrink(&mut self, capacity: usize) -> Result<usize> {
393        if let Some(new_size) = self.size.checked_sub(capacity) {
394            self.registration.pool.shrink(self, capacity);
395            self.size = new_size;
396            Ok(new_size)
397        } else {
398            internal_err!(
399                "Cannot free the capacity {capacity} out of allocated size {}",
400                self.size
401            )
402        }
403    }
404
405    /// Sets the size of this reservation to `capacity`
406    pub fn resize(&mut self, capacity: usize) {
407        match capacity.cmp(&self.size) {
408            Ordering::Greater => self.grow(capacity - self.size),
409            Ordering::Less => self.shrink(self.size - capacity),
410            _ => {}
411        }
412    }
413
414    /// Try to set the size of this reservation to `capacity`
415    pub fn try_resize(&mut self, capacity: usize) -> Result<()> {
416        match capacity.cmp(&self.size) {
417            Ordering::Greater => self.try_grow(capacity - self.size)?,
418            Ordering::Less => self.shrink(self.size - capacity),
419            _ => {}
420        };
421        Ok(())
422    }
423
424    /// Increase the size of this reservation by `capacity` bytes
425    pub fn grow(&mut self, capacity: usize) {
426        self.registration.pool.grow(self, capacity);
427        self.size += capacity;
428    }
429
430    /// Try to increase the size of this reservation by `capacity`
431    /// bytes, returning error if there is insufficient capacity left
432    /// in the pool.
433    pub fn try_grow(&mut self, capacity: usize) -> Result<()> {
434        self.registration.pool.try_grow(self, capacity)?;
435        self.size += capacity;
436        Ok(())
437    }
438
439    /// Splits off `capacity` bytes from this [`MemoryReservation`]
440    /// into a new [`MemoryReservation`] with the same
441    /// [`MemoryConsumer`].
442    ///
443    /// This can be useful to free part of this reservation with RAAI
444    /// style dropping
445    ///
446    /// # Panics
447    ///
448    /// Panics if `capacity` exceeds [`Self::size`]
449    pub fn split(&mut self, capacity: usize) -> MemoryReservation {
450        self.size = self.size.checked_sub(capacity).unwrap();
451        Self {
452            size: capacity,
453            registration: Arc::clone(&self.registration),
454        }
455    }
456
457    /// Returns a new empty [`MemoryReservation`] with the same [`MemoryConsumer`]
458    pub fn new_empty(&self) -> Self {
459        Self {
460            size: 0,
461            registration: Arc::clone(&self.registration),
462        }
463    }
464
465    /// Splits off all the bytes from this [`MemoryReservation`] into
466    /// a new [`MemoryReservation`] with the same [`MemoryConsumer`]
467    pub fn take(&mut self) -> MemoryReservation {
468        self.split(self.size)
469    }
470}
471
472impl Drop for MemoryReservation {
473    fn drop(&mut self) {
474        self.free();
475    }
476}
477
/// Byte-size units as powers of 1024.
pub mod units {
    /// 2^10 bytes
    pub const KB: u64 = 1024;
    /// 2^20 bytes
    pub const MB: u64 = 1024 * KB;
    /// 2^30 bytes
    pub const GB: u64 = 1024 * MB;
    /// 2^40 bytes
    pub const TB: u64 = 1024 * GB;
}
484
485/// Present size in human-readable form
486pub fn human_readable_size(size: usize) -> String {
487    use units::*;
488
489    let size = size as u64;
490    let (value, unit) = {
491        if size >= 2 * TB {
492            (size as f64 / TB as f64, "TB")
493        } else if size >= 2 * GB {
494            (size as f64 / GB as f64, "GB")
495        } else if size >= 2 * MB {
496            (size as f64 / MB as f64, "MB")
497        } else if size >= 2 * KB {
498            (size as f64 / KB as f64, "KB")
499        } else {
500            (size as f64, "B")
501        }
502    };
503    format!("{value:.1} {unit}")
504}
505
// Unit tests for consumer ids and for reservation accounting against a
// `GreedyMemoryPool` constructed with `new(50)`.
#[cfg(test)]
mod tests {
    use super::*;

    // Every consumer created through `MemoryConsumer::new` gets a distinct id.
    #[test]
    fn test_id_uniqueness() {
        let mut ids = std::collections::HashSet::new();
        for _ in 0..100 {
            let consumer = MemoryConsumer::new("test");
            assert!(ids.insert(consumer.id())); // Ensures unique insertion
        }
    }

    // Walks one pool through grow / free / try_grow / drop and checks the
    // pool's `reserved()` total after each step.
    #[test]
    fn test_memory_pool_underflow() {
        let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
        let mut a1 = MemoryConsumer::new("a1").register(&pool);
        assert_eq!(pool.reserved(), 0);

        // Infallible `grow` is accounted even though 100 exceeds the 50 passed to `new`
        a1.grow(100);
        assert_eq!(pool.reserved(), 100);

        assert_eq!(a1.free(), 100);
        assert_eq!(pool.reserved(), 0);

        // A failed `try_grow` must leave the pool untouched
        a1.try_grow(100).unwrap_err();
        assert_eq!(pool.reserved(), 0);

        a1.try_grow(30).unwrap();
        assert_eq!(pool.reserved(), 30);

        // A second consumer is refused while a1 still holds 30
        let mut a2 = MemoryConsumer::new("a2").register(&pool);
        a2.try_grow(25).unwrap_err();
        assert_eq!(pool.reserved(), 30);

        // Dropping a1 releases its bytes back to the pool
        drop(a1);
        assert_eq!(pool.reserved(), 0);

        a2.try_grow(25).unwrap();
        assert_eq!(pool.reserved(), 25);
    }

    // `split` moves bytes between reservations without changing the pool total.
    #[test]
    fn test_split() {
        let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
        let mut r1 = MemoryConsumer::new("r1").register(&pool);

        r1.try_grow(20).unwrap();
        assert_eq!(r1.size(), 20);
        assert_eq!(pool.reserved(), 20);

        // take 5 from r1, should still have same reservation split
        let r2 = r1.split(5);
        assert_eq!(r1.size(), 15);
        assert_eq!(r2.size(), 5);
        assert_eq!(pool.reserved(), 20);

        // dropping r1 frees 15 but retains 5 as they have the same consumer
        drop(r1);
        assert_eq!(r2.size(), 5);
        assert_eq!(pool.reserved(), 5);
    }

    // `new_empty` yields an independent zero-sized reservation for the same consumer.
    #[test]
    fn test_new_empty() {
        let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
        let mut r1 = MemoryConsumer::new("r1").register(&pool);

        r1.try_grow(20).unwrap();
        let mut r2 = r1.new_empty();
        r2.try_grow(5).unwrap();

        assert_eq!(r1.size(), 20);
        assert_eq!(r2.size(), 5);
        assert_eq!(pool.reserved(), 25);
    }

    // `take` moves all bytes to the new reservation and leaves the source usable.
    #[test]
    fn test_take() {
        let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
        let mut r1 = MemoryConsumer::new("r1").register(&pool);

        r1.try_grow(20).unwrap();
        let mut r2 = r1.take();
        r2.try_grow(5).unwrap();

        assert_eq!(r1.size(), 0);
        assert_eq!(r2.size(), 25);
        assert_eq!(pool.reserved(), 25);

        // r1 can still grow again
        r1.try_grow(3).unwrap();
        assert_eq!(r1.size(), 3);
        assert_eq!(r2.size(), 25);
        assert_eq!(pool.reserved(), 28);
    }
}