datafusion_execution/memory_pool/mod.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`MemoryPool`] for memory management during query execution, [`proxy`] for
19//! help with allocation accounting.
20
21use datafusion_common::{internal_err, Result};
22use std::hash::{Hash, Hasher};
23use std::{cmp::Ordering, sync::atomic, sync::Arc};
24
25mod pool;
26pub mod proxy {
27 pub use datafusion_common::utils::proxy::{
28 HashTableAllocExt, RawTableAllocExt, VecAllocExt,
29 };
30}
31
32pub use pool::*;
33
34/// Tracks and potentially limits memory use across operators during execution.
35///
36/// # Memory Management Overview
37///
38/// DataFusion is a streaming query engine, processing most queries without
39/// buffering the entire input. Most operators require a fixed amount of memory
40/// based on the schema and target batch size. However, certain operations such
41/// as sorting and grouping/joining, require buffering intermediate results,
42/// which can require memory proportional to the number of input rows.
43///
44/// Rather than tracking all allocations, DataFusion takes a pragmatic approach:
45/// Intermediate memory used as data streams through the system is not accounted
46/// (it assumed to be "small") but the large consumers of memory must register
47/// and constrain their use. This design trades off the additional code
48/// complexity of memory tracking with limiting resource usage.
49///
50/// When limiting memory with a `MemoryPool` you should typically reserve some
51/// overhead (e.g. 10%) for the "small" memory allocations that are not tracked.
52///
53/// # Memory Management Design
54///
55/// As explained above, DataFusion's design ONLY limits operators that require
56/// "large" amounts of memory (proportional to number of input rows), such as
57/// `GroupByHashExec`. It does NOT track and limit memory used internally by
58/// other operators such as `DataSourceExec` or the `RecordBatch`es that flow
59/// between operators. Furthermore, operators should not reserve memory for the
60/// batches they produce. Instead, if a parent operator needs to hold batches
61/// from its children in memory for an extended period, it is the parent
62/// operator's responsibility to reserve the necessary memory for those batches.
63///
64/// In order to avoid allocating memory until the OS or the container system
65/// kills the process, DataFusion `ExecutionPlan`s (operators) that consume
66/// large amounts of memory must first request their desired allocation from a
67/// [`MemoryPool`] before allocating more. The request is typically managed via
68/// a [`MemoryReservation`] and [`MemoryConsumer`].
69///
70/// If the allocation is successful, the operator should proceed and allocate
71/// the desired memory. If the allocation fails, the operator must either first
72/// free memory (e.g. by spilling to local disk) and try again, or error.
73///
74/// Note that a `MemoryPool` can be shared by concurrently executing plans,
75/// which can be used to control memory usage in a multi-tenant system.
76///
77/// # How MemoryPool works by example
78///
79/// Scenario 1:
80/// For `Filter` operator, `RecordBatch`es will stream through it, so it
81/// don't have to keep track of memory usage through [`MemoryPool`].
82///
83/// Scenario 2:
84/// For `CrossJoin` operator, if the input size gets larger, the intermediate
85/// state will also grow. So `CrossJoin` operator will use [`MemoryPool`] to
86/// limit the memory usage.
87/// 2.1 `CrossJoin` operator has read a new batch, asked memory pool for
88/// additional memory. Memory pool updates the usage and returns success.
89/// 2.2 `CrossJoin` has read another batch, and tries to reserve more memory
90/// again, memory pool does not have enough memory. Since `CrossJoin` operator
91/// has not implemented spilling, it will stop execution and return an error.
92///
93/// Scenario 3:
94/// For `Aggregate` operator, its intermediate states will also accumulate as
95/// the input size gets larger, but with spilling capability. When it tries to
96/// reserve more memory from the memory pool, and the memory pool has already
97/// reached the memory limit, it will return an error. Then, `Aggregate`
98/// operator will spill the intermediate buffers to disk, and release memory
99/// from the memory pool, and continue to retry memory reservation.
100///
101/// # Implementing `MemoryPool`
102///
103/// You can implement a custom allocation policy by implementing the
104/// [`MemoryPool`] trait and configuring a `SessionContext` appropriately.
105/// However, DataFusion comes with the following simple memory pool implementations that
106/// handle many common cases:
107///
108/// * [`UnboundedMemoryPool`]: no memory limits (the default)
109///
110/// * [`GreedyMemoryPool`]: Limits memory usage to a fixed size using a "first
111/// come first served" policy
112///
113/// * [`FairSpillPool`]: Limits memory usage to a fixed size, allocating memory
114/// to all spilling operators fairly
115///
116/// * [`TrackConsumersPool`]: Wraps another [`MemoryPool`] and tracks consumers,
117/// providing better error messages on the largest memory users.
118pub trait MemoryPool: Send + Sync + std::fmt::Debug {
119 /// Registers a new [`MemoryConsumer`]
120 ///
121 /// Note: Subsequent calls to [`Self::grow`] must be made to reserve memory
122 fn register(&self, _consumer: &MemoryConsumer) {}
123
124 /// Records the destruction of a [`MemoryReservation`] with [`MemoryConsumer`]
125 ///
126 /// Note: Prior calls to [`Self::shrink`] must be made to free any reserved memory
127 fn unregister(&self, _consumer: &MemoryConsumer) {}
128
129 /// Infallibly grow the provided `reservation` by `additional` bytes
130 ///
131 /// This must always succeed
132 fn grow(&self, reservation: &MemoryReservation, additional: usize);
133
134 /// Infallibly shrink the provided `reservation` by `shrink` bytes
135 fn shrink(&self, reservation: &MemoryReservation, shrink: usize);
136
137 /// Attempt to grow the provided `reservation` by `additional` bytes
138 ///
139 /// On error the `allocation` will not be increased in size
140 fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()>;
141
142 /// Return the total amount of memory reserved
143 fn reserved(&self) -> usize;
144
145 /// Return the memory limit of the pool
146 ///
147 /// The default implementation of `MemoryPool::memory_limit`
148 /// will return `MemoryLimit::Unknown`.
149 /// If you are using your custom memory pool, but have the requirement to
150 /// know the memory usage limit of the pool, please implement this method
151 /// to return it(`Memory::Finite(limit)`).
152 fn memory_limit(&self) -> MemoryLimit {
153 MemoryLimit::Unknown
154 }
155}
156
/// Memory limit of `MemoryPool`
///
/// Returned by [`MemoryPool::memory_limit`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MemoryLimit {
    /// The pool places no limit on memory use.
    Infinite,
    /// Bounded memory limit in bytes.
    Finite(usize),
    /// The pool's limit is not known. This is what the default
    /// implementation of [`MemoryPool::memory_limit`] returns.
    Unknown,
}
164
/// A memory consumer is a named allocation traced by a particular
/// [`MemoryReservation`] in a [`MemoryPool`]. All allocations are registered to
/// a particular `MemoryConsumer`.
///
/// Each `MemoryConsumer` is identified by a process-unique id and is therefore
/// not cloneable. If you need a copy of a `MemoryConsumer`, see
/// [`MemoryConsumer::clone_with_new_id`], but note that the copy may be treated
/// as a separate entity depending on the pool in use, and is only guaranteed to
/// share the name and inner properties.
///
/// For help with allocation accounting, see the [`proxy`] module.
///
/// [proxy]: datafusion_common::utils::proxy
#[derive(Debug)]
pub struct MemoryConsumer {
    /// Name associated with this consumer.
    name: String,
    /// Whether allocations of this consumer can be spilled to disk.
    can_spill: bool,
    /// Process-unique id; the only field used for equality.
    id: usize,
}
183
184impl PartialEq for MemoryConsumer {
185 fn eq(&self, other: &Self) -> bool {
186 let is_same_id = self.id == other.id;
187
188 #[cfg(debug_assertions)]
189 if is_same_id {
190 assert_eq!(self.name, other.name);
191 assert_eq!(self.can_spill, other.can_spill);
192 }
193
194 is_same_id
195 }
196}
197
198impl Eq for MemoryConsumer {}
199
200impl Hash for MemoryConsumer {
201 fn hash<H: Hasher>(&self, state: &mut H) {
202 self.id.hash(state);
203 self.name.hash(state);
204 self.can_spill.hash(state);
205 }
206}
207
208impl MemoryConsumer {
209 fn new_unique_id() -> usize {
210 static ID: atomic::AtomicUsize = atomic::AtomicUsize::new(0);
211 ID.fetch_add(1, atomic::Ordering::Relaxed)
212 }
213
214 /// Create a new empty [`MemoryConsumer`] that can be grown using [`MemoryReservation`]
215 pub fn new(name: impl Into<String>) -> Self {
216 Self {
217 name: name.into(),
218 can_spill: false,
219 id: Self::new_unique_id(),
220 }
221 }
222
223 /// Returns a clone of this [`MemoryConsumer`] with a new unique id,
224 /// which can be registered with a [`MemoryPool`],
225 /// This new consumer is separate from the original.
226 pub fn clone_with_new_id(&self) -> Self {
227 Self {
228 name: self.name.clone(),
229 can_spill: self.can_spill,
230 id: Self::new_unique_id(),
231 }
232 }
233
234 /// Return the unique id of this [`MemoryConsumer`]
235 pub fn id(&self) -> usize {
236 self.id
237 }
238
239 /// Set whether this allocation can be spilled to disk
240 pub fn with_can_spill(self, can_spill: bool) -> Self {
241 Self { can_spill, ..self }
242 }
243
244 /// Returns true if this allocation can spill to disk
245 pub fn can_spill(&self) -> bool {
246 self.can_spill
247 }
248
249 /// Returns the name associated with this allocation
250 pub fn name(&self) -> &str {
251 &self.name
252 }
253
254 /// Registers this [`MemoryConsumer`] with the provided [`MemoryPool`] returning
255 /// a [`MemoryReservation`] that can be used to grow or shrink the memory reservation
256 pub fn register(self, pool: &Arc<dyn MemoryPool>) -> MemoryReservation {
257 pool.register(&self);
258 MemoryReservation {
259 registration: Arc::new(SharedRegistration {
260 pool: Arc::clone(pool),
261 consumer: self,
262 }),
263 size: 0,
264 }
265 }
266}
267
268/// A registration of a [`MemoryConsumer`] with a [`MemoryPool`].
269///
270/// Calls [`MemoryPool::unregister`] on drop to return any memory to
271/// the underlying pool.
272#[derive(Debug)]
273struct SharedRegistration {
274 pool: Arc<dyn MemoryPool>,
275 consumer: MemoryConsumer,
276}
277
278impl Drop for SharedRegistration {
279 fn drop(&mut self) {
280 self.pool.unregister(&self.consumer);
281 }
282}
283
/// A [`MemoryReservation`] tracks an individual reservation of a
/// number of bytes of memory in a [`MemoryPool`] that is freed back
/// to the pool on drop.
///
/// The reservation can be grown or shrunk over time. Reservations created
/// from this one with `split`, `new_empty` or `take` share the same
/// [`MemoryConsumer`] registration.
#[derive(Debug)]
pub struct MemoryReservation {
    /// Registration of the consumer with its pool, shared by all reservations
    /// derived from the same `register` call.
    registration: Arc<SharedRegistration>,
    /// Number of bytes currently attributed to this reservation.
    size: usize,
}
294
295impl MemoryReservation {
296 /// Returns the size of this reservation in bytes
297 pub fn size(&self) -> usize {
298 self.size
299 }
300
301 /// Returns [MemoryConsumer] for this [MemoryReservation]
302 pub fn consumer(&self) -> &MemoryConsumer {
303 &self.registration.consumer
304 }
305
306 /// Frees all bytes from this reservation back to the underlying
307 /// pool, returning the number of bytes freed.
308 pub fn free(&mut self) -> usize {
309 let size = self.size;
310 if size != 0 {
311 self.shrink(size)
312 }
313 size
314 }
315
316 /// Frees `capacity` bytes from this reservation
317 ///
318 /// # Panics
319 ///
320 /// Panics if `capacity` exceeds [`Self::size`]
321 pub fn shrink(&mut self, capacity: usize) {
322 let new_size = self.size.checked_sub(capacity).unwrap();
323 self.registration.pool.shrink(self, capacity);
324 self.size = new_size
325 }
326
327 /// Tries to free `capacity` bytes from this reservation
328 /// if `capacity` does not exceed [`Self::size`]
329 /// Returns new reservation size
330 /// or error if shrinking capacity is more than allocated size
331 pub fn try_shrink(&mut self, capacity: usize) -> Result<usize> {
332 if let Some(new_size) = self.size.checked_sub(capacity) {
333 self.registration.pool.shrink(self, capacity);
334 self.size = new_size;
335 Ok(new_size)
336 } else {
337 internal_err!(
338 "Cannot free the capacity {capacity} out of allocated size {}",
339 self.size
340 )
341 }
342 }
343
344 /// Sets the size of this reservation to `capacity`
345 pub fn resize(&mut self, capacity: usize) {
346 match capacity.cmp(&self.size) {
347 Ordering::Greater => self.grow(capacity - self.size),
348 Ordering::Less => self.shrink(self.size - capacity),
349 _ => {}
350 }
351 }
352
353 /// Try to set the size of this reservation to `capacity`
354 pub fn try_resize(&mut self, capacity: usize) -> Result<()> {
355 match capacity.cmp(&self.size) {
356 Ordering::Greater => self.try_grow(capacity - self.size)?,
357 Ordering::Less => self.shrink(self.size - capacity),
358 _ => {}
359 };
360 Ok(())
361 }
362
363 /// Increase the size of this reservation by `capacity` bytes
364 pub fn grow(&mut self, capacity: usize) {
365 self.registration.pool.grow(self, capacity);
366 self.size += capacity;
367 }
368
369 /// Try to increase the size of this reservation by `capacity`
370 /// bytes, returning error if there is insufficient capacity left
371 /// in the pool.
372 pub fn try_grow(&mut self, capacity: usize) -> Result<()> {
373 self.registration.pool.try_grow(self, capacity)?;
374 self.size += capacity;
375 Ok(())
376 }
377
378 /// Splits off `capacity` bytes from this [`MemoryReservation`]
379 /// into a new [`MemoryReservation`] with the same
380 /// [`MemoryConsumer`].
381 ///
382 /// This can be useful to free part of this reservation with RAAI
383 /// style dropping
384 ///
385 /// # Panics
386 ///
387 /// Panics if `capacity` exceeds [`Self::size`]
388 pub fn split(&mut self, capacity: usize) -> MemoryReservation {
389 self.size = self.size.checked_sub(capacity).unwrap();
390 Self {
391 size: capacity,
392 registration: Arc::clone(&self.registration),
393 }
394 }
395
396 /// Returns a new empty [`MemoryReservation`] with the same [`MemoryConsumer`]
397 pub fn new_empty(&self) -> Self {
398 Self {
399 size: 0,
400 registration: Arc::clone(&self.registration),
401 }
402 }
403
404 /// Splits off all the bytes from this [`MemoryReservation`] into
405 /// a new [`MemoryReservation`] with the same [`MemoryConsumer`]
406 pub fn take(&mut self) -> MemoryReservation {
407 self.split(self.size)
408 }
409}
410
411impl Drop for MemoryReservation {
412 fn drop(&mut self) {
413 self.free();
414 }
415}
416
/// Binary size units (powers of 1024) used when formatting byte counts.
pub mod units {
    /// 2^10 bytes.
    pub const KB: u64 = 1024;
    /// 2^20 bytes.
    pub const MB: u64 = 1024 * KB;
    /// 2^30 bytes.
    pub const GB: u64 = 1024 * MB;
    /// 2^40 bytes.
    pub const TB: u64 = 1024 * GB;
}

/// Formats the byte count `size` as a human-readable string with one decimal.
///
/// The largest unit is chosen for which `size` is at least *two* of that unit,
/// so e.g. 1536 bytes is rendered as `"1536.0 B"` rather than `"1.5 KB"`.
pub fn human_readable_size(size: usize) -> String {
    use units::*;

    let bytes = size as u64;
    // Ordered from the largest unit down; the first that fits twice wins.
    let scales = [(TB, "TB"), (GB, "GB"), (MB, "MB"), (KB, "KB")];
    let (value, unit) = scales
        .iter()
        .find(|&&(scale, _)| bytes >= 2 * scale)
        .map(|&(scale, unit)| (bytes as f64 / scale as f64, unit))
        .unwrap_or((bytes as f64, "B"));
    format!("{value:.1} {unit}")
}
444
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_id_uniqueness() {
        let mut ids = std::collections::HashSet::new();
        for _ in 0..100 {
            let consumer = MemoryConsumer::new("test");
            assert!(ids.insert(consumer.id())); // Ensures unique insertion
        }
    }

    #[test]
    fn test_clone_with_new_id() {
        let original = MemoryConsumer::new("c").with_can_spill(true);
        let copy = original.clone_with_new_id();

        // A fresh id makes the copy a distinct consumer ...
        assert_ne!(original.id(), copy.id());
        assert_ne!(original, copy);
        // ... that still shares the name and spill flag.
        assert_eq!(copy.name(), "c");
        assert!(copy.can_spill());
    }

    #[test]
    fn test_with_can_spill_keeps_id() {
        let consumer = MemoryConsumer::new("c");
        let id = consumer.id();
        let consumer = consumer.with_can_spill(true);
        assert_eq!(consumer.id(), id);
        assert!(consumer.can_spill());
    }

    #[test]
    fn test_memory_pool_underflow() {
        let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
        let mut a1 = MemoryConsumer::new("a1").register(&pool);
        assert_eq!(pool.reserved(), 0);

        // `grow` is infallible, so it may exceed the pool limit
        a1.grow(100);
        assert_eq!(pool.reserved(), 100);

        assert_eq!(a1.free(), 100);
        assert_eq!(pool.reserved(), 0);

        a1.try_grow(100).unwrap_err();
        assert_eq!(pool.reserved(), 0);

        a1.try_grow(30).unwrap();
        assert_eq!(pool.reserved(), 30);

        let mut a2 = MemoryConsumer::new("a2").register(&pool);
        a2.try_grow(25).unwrap_err();
        assert_eq!(pool.reserved(), 30);

        drop(a1);
        assert_eq!(pool.reserved(), 0);

        a2.try_grow(25).unwrap();
        assert_eq!(pool.reserved(), 25);
    }

    #[test]
    fn test_try_shrink() {
        let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
        let mut r1 = MemoryConsumer::new("r1").register(&pool);

        r1.try_grow(20).unwrap();
        assert_eq!(r1.try_shrink(5).unwrap(), 15);
        assert_eq!(r1.size(), 15);
        assert_eq!(pool.reserved(), 15);

        // Shrinking by more than the reservation holds is an error and
        // leaves both the reservation and the pool untouched
        r1.try_shrink(16).unwrap_err();
        assert_eq!(r1.size(), 15);
        assert_eq!(pool.reserved(), 15);
    }

    #[test]
    fn test_resize() {
        let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
        let mut r1 = MemoryConsumer::new("r1").register(&pool);

        r1.resize(30);
        assert_eq!(r1.size(), 30);
        assert_eq!(pool.reserved(), 30);

        r1.resize(10);
        assert_eq!(r1.size(), 10);
        assert_eq!(pool.reserved(), 10);

        // Growing past the pool limit fails without changing anything
        r1.try_resize(60).unwrap_err();
        assert_eq!(r1.size(), 10);
        assert_eq!(pool.reserved(), 10);

        r1.try_resize(40).unwrap();
        assert_eq!(r1.size(), 40);
        assert_eq!(pool.reserved(), 40);

        r1.try_resize(0).unwrap();
        assert_eq!(r1.size(), 0);
        assert_eq!(pool.reserved(), 0);
    }

    #[test]
    fn test_split() {
        let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
        let mut r1 = MemoryConsumer::new("r1").register(&pool);

        r1.try_grow(20).unwrap();
        assert_eq!(r1.size(), 20);
        assert_eq!(pool.reserved(), 20);

        // take 5 from r1, should still have same reservation split
        let r2 = r1.split(5);
        assert_eq!(r1.size(), 15);
        assert_eq!(r2.size(), 5);
        assert_eq!(pool.reserved(), 20);

        // dropping r1 frees 15 but retains 5 as they have the same consumer
        drop(r1);
        assert_eq!(r2.size(), 5);
        assert_eq!(pool.reserved(), 5);
    }

    #[test]
    fn test_new_empty() {
        let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
        let mut r1 = MemoryConsumer::new("r1").register(&pool);

        r1.try_grow(20).unwrap();
        let mut r2 = r1.new_empty();
        r2.try_grow(5).unwrap();

        assert_eq!(r1.size(), 20);
        assert_eq!(r2.size(), 5);
        assert_eq!(pool.reserved(), 25);
    }

    #[test]
    fn test_take() {
        let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
        let mut r1 = MemoryConsumer::new("r1").register(&pool);

        r1.try_grow(20).unwrap();
        let mut r2 = r1.take();
        r2.try_grow(5).unwrap();

        assert_eq!(r1.size(), 0);
        assert_eq!(r2.size(), 25);
        assert_eq!(pool.reserved(), 25);

        // r1 can still grow again
        r1.try_grow(3).unwrap();
        assert_eq!(r1.size(), 3);
        assert_eq!(r2.size(), 25);
        assert_eq!(pool.reserved(), 28);
    }

    #[test]
    fn test_human_readable_size() {
        assert_eq!(human_readable_size(0), "0.0 B");
        assert_eq!(human_readable_size(2047), "2047.0 B");
        assert_eq!(human_readable_size(2048), "2.0 KB");
        assert_eq!(human_readable_size(2560), "2.5 KB");
        // A unit is only used once the size reaches two of that unit
        assert_eq!(human_readable_size(1 << 20), "1024.0 KB");
        assert_eq!(human_readable_size(3 * 1024 * 1024), "3.0 MB");
    }
}