datafusion_execution/memory_pool/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! [`MemoryPool`] for memory management during query execution, and [`proxy`]
//! for help with allocation accounting.
20
21use datafusion_common::{internal_err, Result};
22use std::{cmp::Ordering, sync::Arc};
23
24mod pool;
25pub mod proxy {
26    pub use datafusion_common::utils::proxy::{
27        HashTableAllocExt, RawTableAllocExt, VecAllocExt,
28    };
29}
30
31pub use pool::*;
32
33/// Tracks and potentially limits memory use across operators during execution.
34///
35/// # Memory Management Overview
36///
37/// DataFusion is a streaming query engine, processing most queries without
38/// buffering the entire input. Most operators require a fixed amount of memory
/// based on the schema and target batch size. However, certain operations, such
/// as sorting and grouping/joining, require buffering intermediate results,
41/// which can require memory proportional to the number of input rows.
42///
43/// Rather than tracking all allocations, DataFusion takes a pragmatic approach:
/// Intermediate memory used as data streams through the system is not accounted
/// for (it is assumed to be "small"), but the large consumers of memory must
/// register and constrain their use. This design trades off the additional code
/// complexity of memory tracking against limiting resource usage.
48///
49/// When limiting memory with a `MemoryPool` you should typically reserve some
50/// overhead (e.g. 10%) for the "small" memory allocations that are not tracked.
51///
52/// # Memory Management Design
53///
54/// As explained above, DataFusion's design ONLY limits operators that require
55/// "large" amounts of memory (proportional to number of input rows), such as
56/// `GroupByHashExec`. It does NOT track and limit memory used internally by
57/// other operators such as `DataSourceExec` or the `RecordBatch`es that flow
58/// between operators. Furthermore, operators should not reserve memory for the
59/// batches they produce. Instead, if a parent operator needs to hold batches
60/// from its children in memory for an extended period, it is the parent
61/// operator's responsibility to reserve the necessary memory for those batches.
62///
63/// In order to avoid allocating memory until the OS or the container system
64/// kills the process, DataFusion `ExecutionPlan`s (operators) that consume
65/// large amounts of memory must first request their desired allocation from a
/// [`MemoryPool`] before allocating more. The request is typically managed via
/// a [`MemoryReservation`] and [`MemoryConsumer`].
68///
69/// If the allocation is successful, the operator should proceed and allocate
70/// the desired memory. If the allocation fails, the operator must either first
71/// free memory (e.g. by spilling to local disk) and try again, or error.
72///
73/// Note that a `MemoryPool` can be shared by concurrently executing plans,
74/// which can be used to control memory usage in a multi-tenant system.
75///
76/// # How MemoryPool works by example
77///
78/// Scenario 1:
/// For the `Filter` operator, `RecordBatch`es stream through it, so it
/// doesn't have to track memory usage through [`MemoryPool`].
81///
82/// Scenario 2:
83/// For `CrossJoin` operator, if the input size gets larger, the intermediate
84/// state will also grow. So `CrossJoin` operator will use [`MemoryPool`] to
85/// limit the memory usage.
/// 2.1 `CrossJoin` operator has read a new batch and asks the memory pool for
/// additional memory. The memory pool updates the usage and returns success.
/// 2.2 `CrossJoin` has read another batch and tries to reserve more memory
/// again, but the memory pool does not have enough memory. Since the `CrossJoin`
/// operator has not implemented spilling, it will stop execution and return an error.
91///
92/// Scenario 3:
93/// For `Aggregate` operator, its intermediate states will also accumulate as
94/// the input size gets larger, but with spilling capability. When it tries to
95/// reserve more memory from the memory pool, and the memory pool has already
96/// reached the memory limit, it will return an error. Then, `Aggregate`
97/// operator will spill the intermediate buffers to disk, and release memory
98/// from the memory pool, and continue to retry memory reservation.
99///
100/// # Implementing `MemoryPool`
101///
102/// You can implement a custom allocation policy by implementing the
103/// [`MemoryPool`] trait and configuring a `SessionContext` appropriately.
104/// However, DataFusion comes with the following simple memory pool implementations that
105/// handle many common cases:
106///
107/// * [`UnboundedMemoryPool`]: no memory limits (the default)
108///
109/// * [`GreedyMemoryPool`]: Limits memory usage to a fixed size using a "first
110///   come first served" policy
111///
112/// * [`FairSpillPool`]: Limits memory usage to a fixed size, allocating memory
113///   to all spilling operators fairly
114///
115/// * [`TrackConsumersPool`]: Wraps another [`MemoryPool`] and tracks consumers,
116///   providing better error messages on the largest memory users.
117pub trait MemoryPool: Send + Sync + std::fmt::Debug {
118    /// Registers a new [`MemoryConsumer`]
119    ///
120    /// Note: Subsequent calls to [`Self::grow`] must be made to reserve memory
121    fn register(&self, _consumer: &MemoryConsumer) {}
122
123    /// Records the destruction of a [`MemoryReservation`] with [`MemoryConsumer`]
124    ///
125    /// Note: Prior calls to [`Self::shrink`] must be made to free any reserved memory
126    fn unregister(&self, _consumer: &MemoryConsumer) {}
127
128    /// Infallibly grow the provided `reservation` by `additional` bytes
129    ///
130    /// This must always succeed
131    fn grow(&self, reservation: &MemoryReservation, additional: usize);
132
133    /// Infallibly shrink the provided `reservation` by `shrink` bytes
134    fn shrink(&self, reservation: &MemoryReservation, shrink: usize);
135
136    /// Attempt to grow the provided `reservation` by `additional` bytes
137    ///
138    /// On error the `allocation` will not be increased in size
139    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()>;
140
141    /// Return the total amount of memory reserved
142    fn reserved(&self) -> usize;
143}
144
145/// A memory consumer is a named allocation traced by a particular
146/// [`MemoryReservation`] in a [`MemoryPool`]. All allocations are registered to
147/// a particular `MemoryConsumer`;
148///
149/// For help with allocation accounting, see the [`proxy`] module.
150///
151/// [proxy]: datafusion_common::utils::proxy
152#[derive(Debug, PartialEq, Eq, Hash, Clone)]
153pub struct MemoryConsumer {
154    name: String,
155    can_spill: bool,
156}
157
158impl MemoryConsumer {
159    /// Create a new empty [`MemoryConsumer`] that can be grown using [`MemoryReservation`]
160    pub fn new(name: impl Into<String>) -> Self {
161        Self {
162            name: name.into(),
163            can_spill: false,
164        }
165    }
166
167    /// Set whether this allocation can be spilled to disk
168    pub fn with_can_spill(self, can_spill: bool) -> Self {
169        Self { can_spill, ..self }
170    }
171
172    /// Returns true if this allocation can spill to disk
173    pub fn can_spill(&self) -> bool {
174        self.can_spill
175    }
176
177    /// Returns the name associated with this allocation
178    pub fn name(&self) -> &str {
179        &self.name
180    }
181
182    /// Registers this [`MemoryConsumer`] with the provided [`MemoryPool`] returning
183    /// a [`MemoryReservation`] that can be used to grow or shrink the memory reservation
184    pub fn register(self, pool: &Arc<dyn MemoryPool>) -> MemoryReservation {
185        pool.register(&self);
186        MemoryReservation {
187            registration: Arc::new(SharedRegistration {
188                pool: Arc::clone(pool),
189                consumer: self,
190            }),
191            size: 0,
192        }
193    }
194}
195
196/// A registration of a [`MemoryConsumer`] with a [`MemoryPool`].
197///
198/// Calls [`MemoryPool::unregister`] on drop to return any memory to
199/// the underlying pool.
200#[derive(Debug)]
201struct SharedRegistration {
202    pool: Arc<dyn MemoryPool>,
203    consumer: MemoryConsumer,
204}
205
206impl Drop for SharedRegistration {
207    fn drop(&mut self) {
208        self.pool.unregister(&self.consumer);
209    }
210}
211
212/// A [`MemoryReservation`] tracks an individual reservation of a
213/// number of bytes of memory in a [`MemoryPool`] that is freed back
214/// to the pool on drop.
215///
216/// The reservation can be grown or shrunk over time.
217#[derive(Debug)]
218pub struct MemoryReservation {
219    registration: Arc<SharedRegistration>,
220    size: usize,
221}
222
223impl MemoryReservation {
224    /// Returns the size of this reservation in bytes
225    pub fn size(&self) -> usize {
226        self.size
227    }
228
229    /// Returns [MemoryConsumer] for this [MemoryReservation]
230    pub fn consumer(&self) -> &MemoryConsumer {
231        &self.registration.consumer
232    }
233
234    /// Frees all bytes from this reservation back to the underlying
235    /// pool, returning the number of bytes freed.
236    pub fn free(&mut self) -> usize {
237        let size = self.size;
238        if size != 0 {
239            self.shrink(size)
240        }
241        size
242    }
243
244    /// Frees `capacity` bytes from this reservation
245    ///
246    /// # Panics
247    ///
248    /// Panics if `capacity` exceeds [`Self::size`]
249    pub fn shrink(&mut self, capacity: usize) {
250        let new_size = self.size.checked_sub(capacity).unwrap();
251        self.registration.pool.shrink(self, capacity);
252        self.size = new_size
253    }
254
255    /// Tries to free `capacity` bytes from this reservation
256    /// if `capacity` does not exceed [`Self::size`]
257    /// Returns new reservation size
258    /// or error if shrinking capacity is more than allocated size
259    pub fn try_shrink(&mut self, capacity: usize) -> Result<usize> {
260        if let Some(new_size) = self.size.checked_sub(capacity) {
261            self.registration.pool.shrink(self, capacity);
262            self.size = new_size;
263            Ok(new_size)
264        } else {
265            internal_err!(
266                "Cannot free the capacity {capacity} out of allocated size {}",
267                self.size
268            )
269        }
270    }
271
272    /// Sets the size of this reservation to `capacity`
273    pub fn resize(&mut self, capacity: usize) {
274        match capacity.cmp(&self.size) {
275            Ordering::Greater => self.grow(capacity - self.size),
276            Ordering::Less => self.shrink(self.size - capacity),
277            _ => {}
278        }
279    }
280
281    /// Try to set the size of this reservation to `capacity`
282    pub fn try_resize(&mut self, capacity: usize) -> Result<()> {
283        match capacity.cmp(&self.size) {
284            Ordering::Greater => self.try_grow(capacity - self.size)?,
285            Ordering::Less => self.shrink(self.size - capacity),
286            _ => {}
287        };
288        Ok(())
289    }
290
291    /// Increase the size of this reservation by `capacity` bytes
292    pub fn grow(&mut self, capacity: usize) {
293        self.registration.pool.grow(self, capacity);
294        self.size += capacity;
295    }
296
297    /// Try to increase the size of this reservation by `capacity`
298    /// bytes, returning error if there is insufficient capacity left
299    /// in the pool.
300    pub fn try_grow(&mut self, capacity: usize) -> Result<()> {
301        self.registration.pool.try_grow(self, capacity)?;
302        self.size += capacity;
303        Ok(())
304    }
305
306    /// Splits off `capacity` bytes from this [`MemoryReservation`]
307    /// into a new [`MemoryReservation`] with the same
308    /// [`MemoryConsumer`].
309    ///
310    /// This can be useful to free part of this reservation with RAAI
311    /// style dropping
312    ///
313    /// # Panics
314    ///
315    /// Panics if `capacity` exceeds [`Self::size`]
316    pub fn split(&mut self, capacity: usize) -> MemoryReservation {
317        self.size = self.size.checked_sub(capacity).unwrap();
318        Self {
319            size: capacity,
320            registration: Arc::clone(&self.registration),
321        }
322    }
323
324    /// Returns a new empty [`MemoryReservation`] with the same [`MemoryConsumer`]
325    pub fn new_empty(&self) -> Self {
326        Self {
327            size: 0,
328            registration: Arc::clone(&self.registration),
329        }
330    }
331
332    /// Splits off all the bytes from this [`MemoryReservation`] into
333    /// a new [`MemoryReservation`] with the same [`MemoryConsumer`]
334    pub fn take(&mut self) -> MemoryReservation {
335        self.split(self.size)
336    }
337}
338
339impl Drop for MemoryReservation {
340    fn drop(&mut self) {
341        self.free();
342    }
343}
344
/// Binary (power-of-1024) byte-size units.
pub mod units {
    /// 2^10 bytes
    pub const KB: u64 = 1024;
    /// 2^20 bytes
    pub const MB: u64 = KB * 1024;
    /// 2^30 bytes
    pub const GB: u64 = MB * 1024;
    /// 2^40 bytes
    pub const TB: u64 = GB * 1024;
}
351
352/// Present size in human readable form
353pub fn human_readable_size(size: usize) -> String {
354    use units::*;
355
356    let size = size as u64;
357    let (value, unit) = {
358        if size >= 2 * TB {
359            (size as f64 / TB as f64, "TB")
360        } else if size >= 2 * GB {
361            (size as f64 / GB as f64, "GB")
362        } else if size >= 2 * MB {
363            (size as f64 / MB as f64, "MB")
364        } else if size >= 2 * KB {
365            (size as f64 / KB as f64, "KB")
366        } else {
367            (size as f64, "B")
368        }
369    };
370    format!("{value:.1} {unit}")
371}
372
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_memory_pool_underflow() {
        let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
        let mut a1 = MemoryConsumer::new("a1").register(&pool);
        assert_eq!(pool.reserved(), 0);

        a1.grow(100);
        assert_eq!(pool.reserved(), 100);

        assert_eq!(a1.free(), 100);
        assert_eq!(pool.reserved(), 0);

        a1.try_grow(100).unwrap_err();
        assert_eq!(pool.reserved(), 0);

        a1.try_grow(30).unwrap();
        assert_eq!(pool.reserved(), 30);

        let mut a2 = MemoryConsumer::new("a2").register(&pool);
        a2.try_grow(25).unwrap_err();
        assert_eq!(pool.reserved(), 30);

        drop(a1);
        assert_eq!(pool.reserved(), 0);

        a2.try_grow(25).unwrap();
        assert_eq!(pool.reserved(), 25);
    }

    #[test]
    fn test_split() {
        let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
        let mut r1 = MemoryConsumer::new("r1").register(&pool);

        r1.try_grow(20).unwrap();
        assert_eq!(r1.size(), 20);
        assert_eq!(pool.reserved(), 20);

        // take 5 from r1, should still have same reservation split
        let r2 = r1.split(5);
        assert_eq!(r1.size(), 15);
        assert_eq!(r2.size(), 5);
        assert_eq!(pool.reserved(), 20);

        // dropping r1 frees 15 but retains 5 as they have the same consumer
        drop(r1);
        assert_eq!(r2.size(), 5);
        assert_eq!(pool.reserved(), 5);
    }

    #[test]
    fn test_new_empty() {
        let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
        let mut r1 = MemoryConsumer::new("r1").register(&pool);

        r1.try_grow(20).unwrap();
        let mut r2 = r1.new_empty();
        r2.try_grow(5).unwrap();

        assert_eq!(r1.size(), 20);
        assert_eq!(r2.size(), 5);
        assert_eq!(pool.reserved(), 25);
    }

    #[test]
    fn test_take() {
        let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
        let mut r1 = MemoryConsumer::new("r1").register(&pool);

        r1.try_grow(20).unwrap();
        let mut r2 = r1.take();
        r2.try_grow(5).unwrap();

        assert_eq!(r1.size(), 0);
        assert_eq!(r2.size(), 25);
        assert_eq!(pool.reserved(), 25);

        // r1 can still grow again
        r1.try_grow(3).unwrap();
        assert_eq!(r1.size(), 3);
        assert_eq!(r2.size(), 25);
        assert_eq!(pool.reserved(), 28);
    }

    #[test]
    fn test_try_shrink() {
        let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
        let mut r1 = MemoryConsumer::new("r1").register(&pool);

        r1.try_grow(20).unwrap();
        assert_eq!(r1.try_shrink(15).unwrap(), 5);
        assert_eq!(r1.size(), 5);
        assert_eq!(pool.reserved(), 5);

        // shrinking by more than is reserved fails and changes nothing
        r1.try_shrink(6).unwrap_err();
        assert_eq!(r1.size(), 5);
        assert_eq!(pool.reserved(), 5);
    }

    #[test]
    fn test_resize() {
        let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
        let mut r1 = MemoryConsumer::new("r1").register(&pool);

        r1.resize(30);
        assert_eq!(r1.size(), 30);
        assert_eq!(pool.reserved(), 30);

        r1.resize(10);
        assert_eq!(r1.size(), 10);
        assert_eq!(pool.reserved(), 10);

        // resizing to the current size is a no-op
        r1.resize(10);
        assert_eq!(r1.size(), 10);
        assert_eq!(pool.reserved(), 10);

        // growing past the pool limit fails and changes nothing
        r1.try_resize(60).unwrap_err();
        assert_eq!(r1.size(), 10);
        assert_eq!(pool.reserved(), 10);

        r1.try_resize(40).unwrap();
        assert_eq!(r1.size(), 40);
        assert_eq!(pool.reserved(), 40);

        r1.try_resize(5).unwrap();
        assert_eq!(r1.size(), 5);
        assert_eq!(pool.reserved(), 5);
    }

    #[test]
    fn test_human_readable_size() {
        assert_eq!(human_readable_size(0), "0.0 B");
        assert_eq!(human_readable_size(2047), "2047.0 B");
        assert_eq!(human_readable_size(2048), "2.0 KB");
        assert_eq!(human_readable_size(3 * 1024 * 1024), "3.0 MB");
    }
}