datafusion_execution/memory_pool/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`MemoryPool`] for memory management during query execution, [`proxy`] for
//! help with allocation accounting.

use datafusion_common::{internal_err, Result};
use std::hash::{Hash, Hasher};
use std::{cmp::Ordering, sync::atomic, sync::Arc};

mod pool;
pub mod proxy {
    pub use datafusion_common::utils::proxy::{
        HashTableAllocExt, RawTableAllocExt, VecAllocExt,
    };
}

pub use pool::*;

/// Tracks and potentially limits memory use across operators during execution.
///
/// # Memory Management Overview
///
/// DataFusion is a streaming query engine, processing most queries without
/// buffering the entire input. Most operators require a fixed amount of memory
/// based on the schema and target batch size. However, certain operations, such
/// as sorting and grouping/joining, require buffering intermediate results,
/// which can require memory proportional to the number of input rows.
///
/// Rather than tracking all allocations, DataFusion takes a pragmatic approach:
/// intermediate memory used as data streams through the system is not accounted
/// for (it is assumed to be "small"), but the large consumers of memory must
/// register and constrain their use. This design trades the additional code
/// complexity of memory tracking for the ability to limit resource usage.
///
/// When limiting memory with a `MemoryPool`, you should typically reserve some
/// overhead (e.g. 10%) for the "small" memory allocations that are not tracked.
///
/// # Memory Management Design
///
/// As explained above, DataFusion's design ONLY limits operators that require
/// "large" amounts of memory (proportional to the number of input rows), such
/// as `GroupByHashExec`. It does NOT track and limit memory used internally by
/// other operators such as `DataSourceExec` or the `RecordBatch`es that flow
/// between operators. Furthermore, operators should not reserve memory for the
/// batches they produce. Instead, if a parent operator needs to hold batches
/// from its children in memory for an extended period, it is the parent
/// operator's responsibility to reserve the necessary memory for those batches.
///
/// In order to avoid allocating memory until the OS or the container system
/// kills the process, DataFusion `ExecutionPlan`s (operators) that consume
/// large amounts of memory must first request their desired allocation from a
/// [`MemoryPool`] before allocating more. The request is typically managed via
/// a [`MemoryReservation`] and [`MemoryConsumer`].
///
/// If the allocation is successful, the operator should proceed and allocate
/// the desired memory. If the allocation fails, the operator must either first
/// free memory (e.g. by spilling to local disk) and try again, or return an
/// error.
///
/// Note that a `MemoryPool` can be shared by concurrently executing plans,
/// which can be used to control memory usage in a multi-tenant system.
///
/// # How MemoryPool works by example
///
/// Scenario 1:
/// The `Filter` operator simply streams `RecordBatch`es through, so it does
/// not have to track its memory usage with the [`MemoryPool`].
///
/// Scenario 2:
/// The `CrossJoin` operator buffers intermediate state that grows with the
/// input size, so it uses the [`MemoryPool`] to limit its memory usage.
/// 2.1 `CrossJoin` reads a new batch and asks the memory pool for additional
/// memory. The memory pool updates its usage and returns success.
/// 2.2 `CrossJoin` reads another batch and tries to reserve more memory, but
/// the memory pool does not have enough available. Since `CrossJoin` does not
/// implement spilling, it stops execution and returns an error.
///
/// Scenario 3:
/// The `Aggregate` operator also accumulates intermediate state as the input
/// grows, but it can spill. When it tries to reserve more memory and the
/// memory pool has already reached its limit, the pool returns an error. The
/// `Aggregate` operator then spills its intermediate buffers to disk, releases
/// the corresponding memory back to the pool, and retries the reservation.
///
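/// # Example
///
/// A minimal sketch of the request/free cycle described above, using the
/// built-in [`GreedyMemoryPool`] with a small limit (the consumer name
/// `"MyOperator"` is illustrative only):
///
/// ```
/// # use std::sync::Arc;
/// # use datafusion_execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};
/// let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1024));
/// // operators that buffer data register a named consumer with the pool
/// let mut reservation = MemoryConsumer::new("MyOperator").register(&pool);
/// // request memory *before* buffering data
/// reservation.try_grow(512).unwrap();
/// assert_eq!(pool.reserved(), 512);
/// // requests beyond the limit fail; the operator must free memory or error
/// assert!(reservation.try_grow(1024).is_err());
/// // the reserved memory is returned to the pool when the reservation drops
/// drop(reservation);
/// assert_eq!(pool.reserved(), 0);
/// ```
///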
/// # Implementing `MemoryPool`
///
/// You can implement a custom allocation policy by implementing the
/// [`MemoryPool`] trait and configuring a `SessionContext` appropriately
/// (see the sketch after the list below). However, DataFusion comes with the
/// following simple memory pool implementations that handle many common cases:
///
/// * [`UnboundedMemoryPool`]: no memory limits (the default)
///
/// * [`GreedyMemoryPool`]: Limits memory usage to a fixed size using a "first
///   come first served" policy
///
/// * [`FairSpillPool`]: Limits memory usage to a fixed size, allocating memory
///   to all spilling operators fairly
///
/// * [`TrackConsumersPool`]: Wraps another [`MemoryPool`] and tracks consumers,
///   providing better error messages on the largest memory users.
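///
/// The following is a minimal sketch of such a custom pool. The name
/// `CappedPool` and its cap-checking policy are illustrative only; a real
/// implementation would also make `try_grow` atomic with respect to
/// concurrent callers (e.g. via `compare_exchange` or a lock):
///
/// ```
/// # use std::sync::atomic::{AtomicUsize, Ordering};
/// # use datafusion_common::{DataFusionError, Result};
/// # use datafusion_execution::memory_pool::{MemoryPool, MemoryReservation};
/// /// Hypothetical pool that caps total usage at a fixed number of bytes
/// #[derive(Debug)]
/// struct CappedPool {
///     limit: usize,
///     used: AtomicUsize,
/// }
///
/// impl MemoryPool for CappedPool {
///     fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
///         self.used.fetch_add(additional, Ordering::Relaxed);
///     }
///
///     fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
///         self.used.fetch_sub(shrink, Ordering::Relaxed);
///     }
///
///     fn try_grow(&self, _reservation: &MemoryReservation, additional: usize) -> Result<()> {
///         // simplistic check-then-add; not safe under concurrent try_grow calls
///         if self.used.load(Ordering::Relaxed) + additional > self.limit {
///             return Err(DataFusionError::ResourcesExhausted(
///                 "CappedPool limit exceeded".to_string(),
///             ));
///         }
///         self.used.fetch_add(additional, Ordering::Relaxed);
///         Ok(())
///     }
///
///     fn reserved(&self) -> usize {
///         self.used.load(Ordering::Relaxed)
///     }
/// }
/// ```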
pub trait MemoryPool: Send + Sync + std::fmt::Debug {
    /// Registers a new [`MemoryConsumer`]
    ///
    /// Note: Subsequent calls to [`Self::grow`] must be made to reserve memory
    fn register(&self, _consumer: &MemoryConsumer) {}

    /// Records the destruction of a [`MemoryReservation`] with [`MemoryConsumer`]
    ///
    /// Note: Prior calls to [`Self::shrink`] must be made to free any reserved memory
    fn unregister(&self, _consumer: &MemoryConsumer) {}

    /// Infallibly grow the provided `reservation` by `additional` bytes
    ///
    /// This must always succeed
    fn grow(&self, reservation: &MemoryReservation, additional: usize);

    /// Infallibly shrink the provided `reservation` by `shrink` bytes
    fn shrink(&self, reservation: &MemoryReservation, shrink: usize);

    /// Attempt to grow the provided `reservation` by `additional` bytes
    ///
    /// On error the `reservation` will not be increased in size
    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()>;

    /// Return the total amount of memory reserved
    fn reserved(&self) -> usize;
}

/// A memory consumer is a named allocation traced by a particular
/// [`MemoryReservation`] in a [`MemoryPool`]. All allocations are registered to
/// a particular `MemoryConsumer`.
///
/// Each `MemoryConsumer` is identified by a process-unique id and is therefore
/// not cloneable. If you need a copy of a `MemoryConsumer`, use
/// [`MemoryConsumer::clone_with_new_id`], but note that, depending on the pool,
/// the copy may be treated as a separate entity; it is only guaranteed to share
/// the name and other properties.
///
/// For help with allocation accounting, see the [`proxy`] module.
///
/// [proxy]: datafusion_common::utils::proxy
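///
/// # Example
///
/// A small sketch of creating and cloning a consumer (the name `"Sort"` is
/// illustrative only):
///
/// ```
/// # use datafusion_execution::memory_pool::MemoryConsumer;
/// let c1 = MemoryConsumer::new("Sort").with_can_spill(true);
/// let c2 = c1.clone_with_new_id();
/// // the clone shares the name and spill setting, but has a distinct id
/// assert_eq!(c1.name(), c2.name());
/// assert_eq!(c1.can_spill(), c2.can_spill());
/// assert_ne!(c1.id(), c2.id());
/// ```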
#[derive(Debug)]
pub struct MemoryConsumer {
    name: String,
    can_spill: bool,
    id: usize,
}

impl PartialEq for MemoryConsumer {
    fn eq(&self, other: &Self) -> bool {
        let is_same_id = self.id == other.id;

        #[cfg(debug_assertions)]
        if is_same_id {
            assert_eq!(self.name, other.name);
            assert_eq!(self.can_spill, other.can_spill);
        }

        is_same_id
    }
}

impl Eq for MemoryConsumer {}

impl Hash for MemoryConsumer {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.id.hash(state);
        self.name.hash(state);
        self.can_spill.hash(state);
    }
}

impl MemoryConsumer {
    fn new_unique_id() -> usize {
        static ID: atomic::AtomicUsize = atomic::AtomicUsize::new(0);
        ID.fetch_add(1, atomic::Ordering::Relaxed)
    }

    /// Create a new empty [`MemoryConsumer`] that can be grown using [`MemoryReservation`]
    pub fn new(name: impl Into<String>) -> Self {
        Self {
            name: name.into(),
            can_spill: false,
            id: Self::new_unique_id(),
        }
    }

    /// Returns a clone of this [`MemoryConsumer`] with a new unique id that
    /// can be registered with a [`MemoryPool`].
    /// The new consumer is separate from the original.
    pub fn clone_with_new_id(&self) -> Self {
        Self {
            name: self.name.clone(),
            can_spill: self.can_spill,
            id: Self::new_unique_id(),
        }
    }

    /// Return the unique id of this [`MemoryConsumer`]
    pub fn id(&self) -> usize {
        self.id
    }

    /// Set whether this allocation can be spilled to disk
    pub fn with_can_spill(self, can_spill: bool) -> Self {
        Self { can_spill, ..self }
    }

    /// Returns true if this allocation can spill to disk
    pub fn can_spill(&self) -> bool {
        self.can_spill
    }

    /// Returns the name associated with this allocation
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Registers this [`MemoryConsumer`] with the provided [`MemoryPool`],
    /// returning a [`MemoryReservation`] that can be used to grow or shrink
    /// the memory reservation
    pub fn register(self, pool: &Arc<dyn MemoryPool>) -> MemoryReservation {
        pool.register(&self);
        MemoryReservation {
            registration: Arc::new(SharedRegistration {
                pool: Arc::clone(pool),
                consumer: self,
            }),
            size: 0,
        }
    }
}

/// A registration of a [`MemoryConsumer`] with a [`MemoryPool`].
///
/// Calls [`MemoryPool::unregister`] on drop to return any memory to
/// the underlying pool.
#[derive(Debug)]
struct SharedRegistration {
    pool: Arc<dyn MemoryPool>,
    consumer: MemoryConsumer,
}

impl Drop for SharedRegistration {
    fn drop(&mut self) {
        self.pool.unregister(&self.consumer);
    }
}

/// A [`MemoryReservation`] tracks an individual reservation of a
/// number of bytes of memory in a [`MemoryPool`] that is freed back
/// to the pool on drop.
///
/// The reservation can be grown or shrunk over time.
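///
/// # Example
///
/// A minimal sketch of growing, shrinking, and dropping a reservation, using
/// the built-in [`GreedyMemoryPool`] (the consumer name is illustrative):
///
/// ```
/// # use std::sync::Arc;
/// # use datafusion_execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};
/// let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(100));
/// let mut reservation = MemoryConsumer::new("ExampleConsumer").register(&pool);
/// reservation.try_grow(40).unwrap();
/// reservation.shrink(10);
/// assert_eq!(reservation.size(), 30);
/// assert_eq!(pool.reserved(), 30);
/// // the remaining 30 bytes are freed back to the pool on drop
/// drop(reservation);
/// assert_eq!(pool.reserved(), 0);
/// ```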
#[derive(Debug)]
pub struct MemoryReservation {
    registration: Arc<SharedRegistration>,
    size: usize,
}

impl MemoryReservation {
    /// Returns the size of this reservation in bytes
    pub fn size(&self) -> usize {
        self.size
    }

    /// Returns the [`MemoryConsumer`] for this [`MemoryReservation`]
    pub fn consumer(&self) -> &MemoryConsumer {
        &self.registration.consumer
    }

    /// Frees all bytes from this reservation back to the underlying
    /// pool, returning the number of bytes freed.
    pub fn free(&mut self) -> usize {
        let size = self.size;
        if size != 0 {
            self.shrink(size)
        }
        size
    }

    /// Frees `capacity` bytes from this reservation
    ///
    /// # Panics
    ///
    /// Panics if `capacity` exceeds [`Self::size`]
    pub fn shrink(&mut self, capacity: usize) {
        let new_size = self.size.checked_sub(capacity).unwrap();
        self.registration.pool.shrink(self, capacity);
        self.size = new_size
    }

    /// Tries to free `capacity` bytes from this reservation
    /// if `capacity` does not exceed [`Self::size`].
    ///
    /// Returns the new reservation size, or an error if `capacity`
    /// exceeds the allocated size.
    pub fn try_shrink(&mut self, capacity: usize) -> Result<usize> {
        if let Some(new_size) = self.size.checked_sub(capacity) {
            self.registration.pool.shrink(self, capacity);
            self.size = new_size;
            Ok(new_size)
        } else {
            internal_err!(
                "Cannot free the capacity {capacity} out of allocated size {}",
                self.size
            )
        }
    }

    /// Sets the size of this reservation to `capacity`
    pub fn resize(&mut self, capacity: usize) {
        match capacity.cmp(&self.size) {
            Ordering::Greater => self.grow(capacity - self.size),
            Ordering::Less => self.shrink(self.size - capacity),
            _ => {}
        }
    }

    /// Try to set the size of this reservation to `capacity`
    pub fn try_resize(&mut self, capacity: usize) -> Result<()> {
        match capacity.cmp(&self.size) {
            Ordering::Greater => self.try_grow(capacity - self.size)?,
            Ordering::Less => self.shrink(self.size - capacity),
            _ => {}
        };
        Ok(())
    }

    /// Increase the size of this reservation by `capacity` bytes
    pub fn grow(&mut self, capacity: usize) {
        self.registration.pool.grow(self, capacity);
        self.size += capacity;
    }

    /// Try to increase the size of this reservation by `capacity`
    /// bytes, returning an error if there is insufficient capacity
    /// left in the pool.
    pub fn try_grow(&mut self, capacity: usize) -> Result<()> {
        self.registration.pool.try_grow(self, capacity)?;
        self.size += capacity;
        Ok(())
    }

    /// Splits off `capacity` bytes from this [`MemoryReservation`]
    /// into a new [`MemoryReservation`] with the same
    /// [`MemoryConsumer`].
    ///
    /// This can be useful to free part of this reservation with RAII
    /// style dropping.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` exceeds [`Self::size`]
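    ///
    /// # Example
    ///
    /// A small sketch of splitting off part of a reservation (names are
    /// illustrative):
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use datafusion_execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};
    /// let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(100));
    /// let mut r1 = MemoryConsumer::new("Example").register(&pool);
    /// r1.try_grow(20).unwrap();
    /// // move 5 bytes into a separate reservation
    /// let r2 = r1.split(5);
    /// assert_eq!(r1.size(), 15);
    /// assert_eq!(r2.size(), 5);
    /// // dropping r2 frees only its 5 bytes
    /// drop(r2);
    /// assert_eq!(pool.reserved(), 15);
    /// ```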
    pub fn split(&mut self, capacity: usize) -> MemoryReservation {
        self.size = self.size.checked_sub(capacity).unwrap();
        Self {
            size: capacity,
            registration: Arc::clone(&self.registration),
        }
    }

    /// Returns a new empty [`MemoryReservation`] with the same [`MemoryConsumer`]
    pub fn new_empty(&self) -> Self {
        Self {
            size: 0,
            registration: Arc::clone(&self.registration),
        }
    }

    /// Splits off all the bytes from this [`MemoryReservation`] into
    /// a new [`MemoryReservation`] with the same [`MemoryConsumer`]
    pub fn take(&mut self) -> MemoryReservation {
        self.split(self.size)
    }
}

impl Drop for MemoryReservation {
    fn drop(&mut self) {
        self.free();
    }
}

pub mod units {
    pub const TB: u64 = 1 << 40;
    pub const GB: u64 = 1 << 30;
    pub const MB: u64 = 1 << 20;
    pub const KB: u64 = 1 << 10;
}

/// Present `size` in a human-readable form
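///
/// # Example
///
/// A few representative values; sizes below twice a unit's boundary are
/// reported in the next smaller unit:
///
/// ```
/// # use datafusion_execution::memory_pool::human_readable_size;
/// assert_eq!(human_readable_size(512), "512.0 B");
/// assert_eq!(human_readable_size(4096), "4.0 KB");
/// assert_eq!(human_readable_size(3 * 1024 * 1024), "3.0 MB");
/// ```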
pub fn human_readable_size(size: usize) -> String {
    use units::*;

    let size = size as u64;
    let (value, unit) = {
        if size >= 2 * TB {
            (size as f64 / TB as f64, "TB")
        } else if size >= 2 * GB {
            (size as f64 / GB as f64, "GB")
        } else if size >= 2 * MB {
            (size as f64 / MB as f64, "MB")
        } else if size >= 2 * KB {
            (size as f64 / KB as f64, "KB")
        } else {
            (size as f64, "B")
        }
    };
    format!("{value:.1} {unit}")
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_id_uniqueness() {
        let mut ids = std::collections::HashSet::new();
        for _ in 0..100 {
            let consumer = MemoryConsumer::new("test");
            assert!(ids.insert(consumer.id())); // Ensures unique insertion
        }
    }

    #[test]
    fn test_memory_pool_underflow() {
        let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
        let mut a1 = MemoryConsumer::new("a1").register(&pool);
        assert_eq!(pool.reserved(), 0);

        a1.grow(100);
        assert_eq!(pool.reserved(), 100);

        assert_eq!(a1.free(), 100);
        assert_eq!(pool.reserved(), 0);

        a1.try_grow(100).unwrap_err();
        assert_eq!(pool.reserved(), 0);

        a1.try_grow(30).unwrap();
        assert_eq!(pool.reserved(), 30);

        let mut a2 = MemoryConsumer::new("a2").register(&pool);
        a2.try_grow(25).unwrap_err();
        assert_eq!(pool.reserved(), 30);

        drop(a1);
        assert_eq!(pool.reserved(), 0);

        a2.try_grow(25).unwrap();
        assert_eq!(pool.reserved(), 25);
    }

    #[test]
    fn test_split() {
        let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
        let mut r1 = MemoryConsumer::new("r1").register(&pool);

        r1.try_grow(20).unwrap();
        assert_eq!(r1.size(), 20);
        assert_eq!(pool.reserved(), 20);

        // take 5 from r1, should still have same reservation split
        let r2 = r1.split(5);
        assert_eq!(r1.size(), 15);
        assert_eq!(r2.size(), 5);
        assert_eq!(pool.reserved(), 20);

        // dropping r1 frees 15 but retains 5 as they have the same consumer
        drop(r1);
        assert_eq!(r2.size(), 5);
        assert_eq!(pool.reserved(), 5);
    }

    #[test]
    fn test_new_empty() {
        let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
        let mut r1 = MemoryConsumer::new("r1").register(&pool);

        r1.try_grow(20).unwrap();
        let mut r2 = r1.new_empty();
        r2.try_grow(5).unwrap();

        assert_eq!(r1.size(), 20);
        assert_eq!(r2.size(), 5);
        assert_eq!(pool.reserved(), 25);
    }

    #[test]
    fn test_take() {
        let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
        let mut r1 = MemoryConsumer::new("r1").register(&pool);

        r1.try_grow(20).unwrap();
        let mut r2 = r1.take();
        r2.try_grow(5).unwrap();

        assert_eq!(r1.size(), 0);
        assert_eq!(r2.size(), 25);
        assert_eq!(pool.reserved(), 25);

        // r1 can still grow again
        r1.try_grow(3).unwrap();
        assert_eq!(r1.size(), 3);
        assert_eq!(r2.size(), 25);
        assert_eq!(pool.reserved(), 28);
    }
}