datafusion_execution/memory_pool/mod.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`MemoryPool`] for memory management during query execution, [`proxy`] for
//! help with allocation accounting.

use datafusion_common::{Result, internal_err};
use std::hash::{Hash, Hasher};
use std::{cmp::Ordering, sync::Arc, sync::atomic};

mod pool;
pub mod proxy {
    pub use datafusion_common::utils::proxy::{HashTableAllocExt, VecAllocExt};
}

pub use datafusion_common::{
    human_readable_count, human_readable_duration, human_readable_size, units,
};
pub use pool::*;

/// Tracks and potentially limits memory use across operators during execution.
///
/// # Memory Management Overview
///
/// DataFusion is a streaming query engine, processing most queries without
/// buffering the entire input. Most operators require a fixed amount of memory
/// based on the schema and target batch size. However, certain operations, such
/// as sorting and grouping/joining, require buffering intermediate results,
/// which can require memory proportional to the number of input rows.
///
/// Rather than tracking all allocations, DataFusion takes a pragmatic approach:
/// intermediate memory used as data streams through the system is not accounted
/// for (it is assumed to be "small"), but the large consumers of memory must
/// register and constrain their use. This design trades the additional code
/// complexity of memory tracking for the ability to limit resource usage.
///
/// When limiting memory with a `MemoryPool` you should typically reserve some
/// overhead (e.g. 10%) for the "small" memory allocations that are not tracked.
///
/// # Memory Management Design
///
/// As explained above, DataFusion's design ONLY limits operators that require
/// "large" amounts of memory (proportional to the number of input rows), such
/// as `GroupByHashExec`. It does NOT track and limit memory used internally by
/// other operators such as `DataSourceExec` or the `RecordBatch`es that flow
/// between operators. Furthermore, operators should not reserve memory for the
/// batches they produce. Instead, if a consumer operator needs to hold batches
/// from its producers in memory for an extended period, it is the consumer
/// operator's responsibility to reserve the necessary memory for those batches.
///
/// To avoid allocating memory until the OS or the container system kills the
/// process, DataFusion `ExecutionPlan`s (operators) that consume large amounts
/// of memory must first request their desired allocation from a [`MemoryPool`]
/// before allocating more. The request is typically managed via a
/// [`MemoryReservation`] and [`MemoryConsumer`].
///
/// If the allocation is successful, the operator should proceed and allocate
/// the desired memory. If the allocation fails, the operator must either first
/// free memory (e.g. by spilling to local disk) and try again, or error.
///
/// Note that a `MemoryPool` can be shared by concurrently executing plans,
/// which can be used to control memory usage in a multi-tenant system.
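///
/// For illustration, here is a minimal sketch of this request-and-release cycle
/// using the [`GreedyMemoryPool`] from this module (real operators obtain the
/// pool from the runtime environment rather than creating it directly):
///
/// ```
/// use std::sync::Arc;
/// use datafusion_execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};
///
/// // a pool that limits all registered reservations to 10 MB in total
/// let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(10 * 1024 * 1024));
/// // register a named consumer and obtain a reservation that can grow and shrink
/// let mut reservation = MemoryConsumer::new("MyOperator").register(&pool);
/// // request memory *before* buffering data; on error, free memory or abort
/// reservation.try_grow(1024).expect("within the 10 MB limit");
/// // dropping the reservation (or calling `free`) returns the memory to the pool
/// reservation.free();
/// assert_eq!(pool.reserved(), 0);
/// ```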
///
/// # How MemoryPool works by example
///
/// Scenario 1:
/// For the `Filter` operator, `RecordBatch`es stream through it, so it
/// does not need to track its memory usage via the [`MemoryPool`].
///
/// Scenario 2:
/// For the `CrossJoin` operator, the intermediate state grows as the input
/// gets larger, so the operator uses the [`MemoryPool`] to limit its memory
/// usage.
/// 2.1 The `CrossJoin` operator reads a new batch and asks the memory pool for
/// additional memory. The memory pool updates its usage and returns success.
/// 2.2 The `CrossJoin` operator reads another batch and tries to reserve more
/// memory again, but the memory pool does not have enough memory left. Since
/// `CrossJoin` does not implement spilling, it stops execution and returns an
/// error.
///
/// Scenario 3:
/// For the `Aggregate` operator, the intermediate state also accumulates as the
/// input gets larger, but the operator can spill. When it tries to reserve more
/// memory and the memory pool has already reached its limit, the pool returns
/// an error. The `Aggregate` operator then spills its intermediate buffers to
/// disk, releases that memory back to the pool, and retries the reservation.
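///
/// A sketch of the spill-and-retry pattern from Scenario 3, using the
/// [`GreedyMemoryPool`] with a small limit (the spilling step itself is
/// operator specific and elided here):
///
/// ```
/// use std::sync::Arc;
/// use datafusion_execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};
///
/// let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(100));
/// let mut reservation = MemoryConsumer::new("Aggregate")
///     .with_can_spill(true)
///     .register(&pool);
///
/// reservation.try_grow(80).unwrap();
/// // a follow-up request would exceed the 100 byte limit, so it fails ...
/// assert!(reservation.try_grow(50).is_err());
/// // ... the operator then spills its buffers to disk (not shown), releases the
/// // reserved memory, and retries the request
/// reservation.free();
/// reservation.try_grow(50).unwrap();
/// assert_eq!(pool.reserved(), 50);
/// ```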
///
/// # Related Structs
///
/// To better understand memory management in DataFusion, here are the key structs
/// and their relationships:
///
/// - [`MemoryConsumer`]: A named allocation traced by a particular operator. If an
///   execution is parallelized, and there are multiple partitions of the same
///   operator, each partition will have a separate `MemoryConsumer`.
/// - `SharedRegistration`: A registration of a `MemoryConsumer` with a `MemoryPool`.
///   `SharedRegistration` and `MemoryPool` have a many-to-one relationship. The
///   `MemoryPool` implementation can decide how to allocate memory based on the
///   registered consumers (e.g. `FairSpillPool` tries to share the available
///   memory evenly among all registered consumers).
/// - [`MemoryReservation`]: Each `MemoryConsumer`/operator can have multiple
///   `MemoryReservation`s for different internal data structures. The relationship
///   between `MemoryConsumer` and `MemoryReservation` is one-to-many. This design
///   enables cleaner operator implementations:
///   - Different `MemoryReservation`s can be used for different purposes
///   - `MemoryReservation` follows RAII principles - to release a reservation,
///     simply drop the `MemoryReservation` object. When all `MemoryReservation`s
///     for a `SharedRegistration` are dropped, the `SharedRegistration` is dropped
///     when its reference count reaches zero, automatically unregistering the
///     `MemoryConsumer` from the `MemoryPool`.
///
/// ## Relationship Diagram
///
/// ```text
/// ┌──────────────────┐     ┌──────────────────┐
/// │MemoryReservation │     │MemoryReservation │     ......
/// └───┬──────────────┘     └─────────┬────────┘
///     │belongs to                    │
///     │     ┌────────────────────────┘              │     │
///     │     │                                       │     │
///     ▼     ▼                                       ▼     ▼
/// ┌────────────────────────┐              ┌────────────────────────┐
/// │   SharedRegistration   │              │   SharedRegistration   │
/// │  ┌────────────────┐    │              │  ┌────────────────┐    │
/// │  │                │    │              │  │                │    │
/// │  │ MemoryConsumer │    │              │  │ MemoryConsumer │    │
/// │  │                │    │              │  │                │    │
/// │  └────────────────┘    │              │  └────────────────┘    │
/// └────────────┬───────────┘              └────────────┬───────────┘
///              │                                       │
///              │             register into             │
///              │                                       │
///              └────────────┐             ┌────────────┘
///                           │             │
///                           ▼             ▼
///        ╔═══════════════════════════════════════════════════╗
///        ║                                                    ║
///        ║                     MemoryPool                     ║
///        ║                                                    ║
///        ╚═══════════════════════════════════════════════════╝
/// ```
///
/// For example, if there are two parallel partitions of an operator X, each
/// partition corresponds to a `MemoryConsumer` in the diagram above. Inside each
/// partition of operator X there are typically several `MemoryReservation`s - one
/// for each internal data structure that needs memory tracking (e.g., one
/// reservation for the hash table and one for buffered input).
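///
/// A minimal sketch of that layout, using the [`GreedyMemoryPool`] and
/// [`MemoryReservation::new_empty`] to create several reservations for one
/// (hypothetical) "OperatorX" consumer:
///
/// ```
/// use std::sync::Arc;
/// use datafusion_execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};
///
/// let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1024));
/// // one consumer per partition of the operator
/// let mut hash_table = MemoryConsumer::new("OperatorX[partition=0]").register(&pool);
/// // a second reservation for the same consumer, e.g. for buffered input
/// let mut buffered_input = hash_table.new_empty();
///
/// hash_table.try_grow(512).unwrap();
/// buffered_input.try_grow(256).unwrap();
/// assert_eq!(pool.reserved(), 768);
///
/// // dropping one reservation releases only its share of the memory
/// drop(buffered_input);
/// assert_eq!(pool.reserved(), 512);
/// ```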
///
/// # Implementing `MemoryPool`
///
/// You can implement a custom allocation policy by implementing the
/// [`MemoryPool`] trait and configuring a `SessionContext` appropriately.
/// However, DataFusion comes with the following simple memory pool
/// implementations that handle many common cases:
///
/// * [`UnboundedMemoryPool`]: no memory limits (the default)
///
/// * [`GreedyMemoryPool`]: Limits memory usage to a fixed size using a "first
///   come first served" policy
///
/// * [`FairSpillPool`]: Limits memory usage to a fixed size, allocating memory
///   to all spilling operators fairly
///
/// * [`TrackConsumersPool`]: Wraps another [`MemoryPool`] and tracks consumers,
///   providing better error messages on the largest memory users.
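///
/// For illustration, here is a minimal custom pool that only tracks the total
/// number of reserved bytes and never enforces a limit (a real pool would
/// typically reject requests in [`MemoryPool::try_grow`] once its limit is
/// reached):
///
/// ```
/// use std::sync::Arc;
/// use std::sync::atomic::{AtomicUsize, Ordering};
/// use datafusion_common::Result;
/// use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
///
/// #[derive(Debug, Default)]
/// struct TrackingPool {
///     reserved: AtomicUsize,
/// }
///
/// impl MemoryPool for TrackingPool {
///     fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
///         self.reserved.fetch_add(additional, Ordering::Relaxed);
///     }
///
///     fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
///         self.reserved.fetch_sub(shrink, Ordering::Relaxed);
///     }
///
///     fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
///         // this sketch never rejects a request; enforce a limit here to bound memory
///         self.grow(reservation, additional);
///         Ok(())
///     }
///
///     fn reserved(&self) -> usize {
///         self.reserved.load(Ordering::Relaxed)
///     }
/// }
///
/// // the custom pool can be used like any other `MemoryPool`
/// let pool: Arc<dyn MemoryPool> = Arc::new(TrackingPool::default());
/// let mut reservation = MemoryConsumer::new("Example").register(&pool);
/// reservation.try_grow(100).unwrap();
/// assert_eq!(pool.reserved(), 100);
/// ```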
pub trait MemoryPool: Send + Sync + std::fmt::Debug {
    /// Registers a new [`MemoryConsumer`]
    ///
    /// Note: Subsequent calls to [`Self::grow`] must be made to reserve memory
    fn register(&self, _consumer: &MemoryConsumer) {}

    /// Records the destruction of a [`MemoryReservation`] with [`MemoryConsumer`]
    ///
    /// Note: Prior calls to [`Self::shrink`] must be made to free any reserved memory
    fn unregister(&self, _consumer: &MemoryConsumer) {}

    /// Infallibly grow the provided `reservation` by `additional` bytes
    ///
    /// This must always succeed
    fn grow(&self, reservation: &MemoryReservation, additional: usize);

    /// Infallibly shrink the provided `reservation` by `shrink` bytes
    fn shrink(&self, reservation: &MemoryReservation, shrink: usize);

    /// Attempt to grow the provided `reservation` by `additional` bytes
    ///
    /// On error the `reservation` will not be increased in size
    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()>;

    /// Return the total amount of memory reserved
    fn reserved(&self) -> usize;

    /// Return the memory limit of the pool
    ///
    /// The default implementation returns [`MemoryLimit::Unknown`]. If a custom
    /// memory pool needs to report its memory limit, implement this method to
    /// return [`MemoryLimit::Finite`] with the limit in bytes.
    fn memory_limit(&self) -> MemoryLimit {
        MemoryLimit::Unknown
    }
}

/// Memory limit of a `MemoryPool`
pub enum MemoryLimit {
    /// The pool does not limit how much memory can be reserved.
    Infinite,
    /// Bounded memory limit in bytes.
    Finite(usize),
    /// The pool does not know (or does not report) its memory limit.
    Unknown,
}

/// A memory consumer is a named allocation traced by a particular
/// [`MemoryReservation`] in a [`MemoryPool`]. All allocations are registered to
/// a particular `MemoryConsumer`.
///
/// Each `MemoryConsumer` is identifiable by a process-unique id and is therefore
/// not cloneable. If you need a copy of a `MemoryConsumer`, use
/// [`MemoryConsumer::clone_with_new_id`], but note that the copy may be treated
/// as a separate entity depending on the pool in use, and is only guaranteed to
/// share the name and other inner properties.
///
/// For help with allocation accounting, see the [`proxy`] module.
///
/// [proxy]: datafusion_common::utils::proxy
#[derive(Debug)]
pub struct MemoryConsumer {
    name: String,
    can_spill: bool,
    id: usize,
}

impl PartialEq for MemoryConsumer {
    fn eq(&self, other: &Self) -> bool {
        let is_same_id = self.id == other.id;

        #[cfg(debug_assertions)]
        if is_same_id {
            assert_eq!(self.name, other.name);
            assert_eq!(self.can_spill, other.can_spill);
        }

        is_same_id
    }
}

impl Eq for MemoryConsumer {}

impl Hash for MemoryConsumer {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.id.hash(state);
        self.name.hash(state);
        self.can_spill.hash(state);
    }
}

impl MemoryConsumer {
    fn new_unique_id() -> usize {
        static ID: atomic::AtomicUsize = atomic::AtomicUsize::new(0);
        ID.fetch_add(1, atomic::Ordering::Relaxed)
    }

    /// Create a new empty [`MemoryConsumer`] that can be grown using [`MemoryReservation`]
    pub fn new(name: impl Into<String>) -> Self {
        Self {
            name: name.into(),
            can_spill: false,
            id: Self::new_unique_id(),
        }
    }

    /// Returns a clone of this [`MemoryConsumer`] with a new unique id,
    /// which can be registered with a [`MemoryPool`].
    /// This new consumer is separate from the original.
    pub fn clone_with_new_id(&self) -> Self {
        Self {
            name: self.name.clone(),
            can_spill: self.can_spill,
            id: Self::new_unique_id(),
        }
    }

    /// Return the unique id of this [`MemoryConsumer`]
    pub fn id(&self) -> usize {
        self.id
    }

    /// Set whether this allocation can be spilled to disk
    pub fn with_can_spill(self, can_spill: bool) -> Self {
        Self { can_spill, ..self }
    }

    /// Returns true if this allocation can spill to disk
    pub fn can_spill(&self) -> bool {
        self.can_spill
    }

    /// Returns the name associated with this allocation
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Registers this [`MemoryConsumer`] with the provided [`MemoryPool`], returning
    /// a [`MemoryReservation`] that can be used to grow or shrink the memory reservation
    pub fn register(self, pool: &Arc<dyn MemoryPool>) -> MemoryReservation {
        pool.register(&self);
        MemoryReservation {
            registration: Arc::new(SharedRegistration {
                pool: Arc::clone(pool),
                consumer: self,
            }),
            size: 0,
        }
    }
}

/// A registration of a [`MemoryConsumer`] with a [`MemoryPool`].
///
/// Calls [`MemoryPool::unregister`] on drop to return any memory to
/// the underlying pool.
#[derive(Debug)]
struct SharedRegistration {
    pool: Arc<dyn MemoryPool>,
    consumer: MemoryConsumer,
}

impl Drop for SharedRegistration {
    fn drop(&mut self) {
        self.pool.unregister(&self.consumer);
    }
}

/// A [`MemoryReservation`] tracks an individual reservation of a
/// number of bytes of memory in a [`MemoryPool`] that is freed back
/// to the pool on drop.
///
/// The reservation can be grown or shrunk over time.
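///
/// For illustration, a minimal sketch of growing and shrinking a reservation
/// against the [`GreedyMemoryPool`]:
///
/// ```
/// use std::sync::Arc;
/// use datafusion_execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};
///
/// let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(100));
/// let mut reservation = MemoryConsumer::new("Example").register(&pool);
///
/// reservation.resize(40); // grow the reservation to 40 bytes
/// reservation.shrink(10); // release 10 of those bytes back to the pool
/// assert_eq!(reservation.size(), 30);
///
/// // dropping the reservation returns the remaining 30 bytes to the pool
/// drop(reservation);
/// assert_eq!(pool.reserved(), 0);
/// ```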
#[derive(Debug)]
pub struct MemoryReservation {
    registration: Arc<SharedRegistration>,
    size: usize,
}

impl MemoryReservation {
    /// Returns the size of this reservation in bytes
    pub fn size(&self) -> usize {
        self.size
    }

    /// Returns the [`MemoryConsumer`] for this [`MemoryReservation`]
    pub fn consumer(&self) -> &MemoryConsumer {
        &self.registration.consumer
    }

    /// Frees all bytes from this reservation back to the underlying
    /// pool, returning the number of bytes freed.
    pub fn free(&mut self) -> usize {
        let size = self.size;
        if size != 0 {
            self.shrink(size)
        }
        size
    }

    /// Frees `capacity` bytes from this reservation
    ///
    /// # Panics
    ///
    /// Panics if `capacity` exceeds [`Self::size`]
    pub fn shrink(&mut self, capacity: usize) {
        let new_size = self.size.checked_sub(capacity).unwrap();
        self.registration.pool.shrink(self, capacity);
        self.size = new_size
    }

    /// Tries to free `capacity` bytes from this reservation.
    ///
    /// Returns the new reservation size on success, or an error if `capacity`
    /// exceeds the currently allocated size [`Self::size`].
    pub fn try_shrink(&mut self, capacity: usize) -> Result<usize> {
        if let Some(new_size) = self.size.checked_sub(capacity) {
            self.registration.pool.shrink(self, capacity);
            self.size = new_size;
            Ok(new_size)
        } else {
            internal_err!(
                "Cannot free the capacity {capacity} out of allocated size {}",
                self.size
            )
        }
    }

    /// Sets the size of this reservation to `capacity`
    pub fn resize(&mut self, capacity: usize) {
        match capacity.cmp(&self.size) {
            Ordering::Greater => self.grow(capacity - self.size),
            Ordering::Less => self.shrink(self.size - capacity),
            _ => {}
        }
    }

    /// Try to set the size of this reservation to `capacity`
    pub fn try_resize(&mut self, capacity: usize) -> Result<()> {
        match capacity.cmp(&self.size) {
            Ordering::Greater => self.try_grow(capacity - self.size)?,
            Ordering::Less => self.shrink(self.size - capacity),
            _ => {}
        };
        Ok(())
    }

    /// Increase the size of this reservation by `capacity` bytes
    pub fn grow(&mut self, capacity: usize) {
        self.registration.pool.grow(self, capacity);
        self.size += capacity;
    }

    /// Try to increase the size of this reservation by `capacity`
    /// bytes, returning error if there is insufficient capacity left
    /// in the pool.
    pub fn try_grow(&mut self, capacity: usize) -> Result<()> {
        self.registration.pool.try_grow(self, capacity)?;
        self.size += capacity;
        Ok(())
    }

    /// Splits off `capacity` bytes from this [`MemoryReservation`]
    /// into a new [`MemoryReservation`] with the same
    /// [`MemoryConsumer`].
    ///
    /// This can be useful to free part of this reservation with RAII
    /// style dropping
    ///
    /// # Panics
    ///
    /// Panics if `capacity` exceeds [`Self::size`]
    pub fn split(&mut self, capacity: usize) -> MemoryReservation {
        self.size = self.size.checked_sub(capacity).unwrap();
        Self {
            size: capacity,
            registration: Arc::clone(&self.registration),
        }
    }

    /// Returns a new empty [`MemoryReservation`] with the same [`MemoryConsumer`]
    pub fn new_empty(&self) -> Self {
        Self {
            size: 0,
            registration: Arc::clone(&self.registration),
        }
    }

    /// Splits off all the bytes from this [`MemoryReservation`] into
    /// a new [`MemoryReservation`] with the same [`MemoryConsumer`]
    pub fn take(&mut self) -> MemoryReservation {
        self.split(self.size)
    }
}

impl Drop for MemoryReservation {
    fn drop(&mut self) {
        self.free();
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_id_uniqueness() {
        let mut ids = std::collections::HashSet::new();
        for _ in 0..100 {
            let consumer = MemoryConsumer::new("test");
            assert!(ids.insert(consumer.id())); // Ensures unique insertion
        }
    }

    #[test]
    fn test_memory_pool_underflow() {
        let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
        let mut a1 = MemoryConsumer::new("a1").register(&pool);
        assert_eq!(pool.reserved(), 0);

        a1.grow(100);
        assert_eq!(pool.reserved(), 100);

        assert_eq!(a1.free(), 100);
        assert_eq!(pool.reserved(), 0);

        a1.try_grow(100).unwrap_err();
        assert_eq!(pool.reserved(), 0);

        a1.try_grow(30).unwrap();
        assert_eq!(pool.reserved(), 30);

        let mut a2 = MemoryConsumer::new("a2").register(&pool);
        a2.try_grow(25).unwrap_err();
        assert_eq!(pool.reserved(), 30);

        drop(a1);
        assert_eq!(pool.reserved(), 0);

        a2.try_grow(25).unwrap();
        assert_eq!(pool.reserved(), 25);
    }

    #[test]
    fn test_split() {
        let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
        let mut r1 = MemoryConsumer::new("r1").register(&pool);

        r1.try_grow(20).unwrap();
        assert_eq!(r1.size(), 20);
        assert_eq!(pool.reserved(), 20);

        // take 5 from r1, should still have same reservation split
        let r2 = r1.split(5);
        assert_eq!(r1.size(), 15);
        assert_eq!(r2.size(), 5);
        assert_eq!(pool.reserved(), 20);

        // dropping r1 frees 15 but retains 5 as they have the same consumer
        drop(r1);
        assert_eq!(r2.size(), 5);
        assert_eq!(pool.reserved(), 5);
    }

    #[test]
    fn test_new_empty() {
        let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
        let mut r1 = MemoryConsumer::new("r1").register(&pool);

        r1.try_grow(20).unwrap();
        let mut r2 = r1.new_empty();
        r2.try_grow(5).unwrap();

        assert_eq!(r1.size(), 20);
        assert_eq!(r2.size(), 5);
        assert_eq!(pool.reserved(), 25);
    }

    #[test]
    fn test_take() {
        let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
        let mut r1 = MemoryConsumer::new("r1").register(&pool);

        r1.try_grow(20).unwrap();
        let mut r2 = r1.take();
        r2.try_grow(5).unwrap();

        assert_eq!(r1.size(), 0);
        assert_eq!(r2.size(), 25);
        assert_eq!(pool.reserved(), 25);

        // r1 can still grow again
        r1.try_grow(3).unwrap();
        assert_eq!(r1.size(), 3);
        assert_eq!(r2.size(), 25);
        assert_eq!(pool.reserved(), 28);
    }
575}