datafusion_execution/memory_pool/mod.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`MemoryPool`] for memory management during query execution, [`proxy`] for
//! help with allocation accounting.

use datafusion_common::{internal_err, Result};
use std::{cmp::Ordering, sync::Arc};

mod pool;
pub mod proxy {
    pub use datafusion_common::utils::proxy::{
        HashTableAllocExt, RawTableAllocExt, VecAllocExt,
    };
}

pub use pool::*;

/// Tracks and potentially limits memory use across operators during execution.
///
/// # Memory Management Overview
///
/// DataFusion is a streaming query engine, processing most queries without
/// buffering the entire input. Most operators require a fixed amount of memory
/// based on the schema and target batch size. However, certain operations, such
/// as sorting and grouping/joining, require buffering intermediate results,
/// which can require memory proportional to the number of input rows.
///
/// Rather than tracking all allocations, DataFusion takes a pragmatic approach:
/// intermediate memory used as data streams through the system is not accounted
/// for (it is assumed to be "small"), but the large consumers of memory must
/// register and constrain their use. This design trades the additional code
/// complexity of memory tracking for the ability to limit resource usage.
///
/// When limiting memory with a `MemoryPool` you should typically reserve some
/// overhead (e.g. 10%) for the "small" memory allocations that are not tracked.
///
/// # Memory Management Design
///
/// As explained above, DataFusion's design ONLY limits operators that require
/// "large" amounts of memory (proportional to the number of input rows), such
/// as `GroupByHashExec`. It does NOT track and limit memory used internally by
/// other operators such as `DataSourceExec` or the `RecordBatch`es that flow
/// between operators. Furthermore, operators should not reserve memory for the
/// batches they produce. Instead, if a parent operator needs to hold batches
/// from its children in memory for an extended period, it is the parent
/// operator's responsibility to reserve the necessary memory for those batches.
///
/// In order to avoid allocating memory until the OS or the container system
/// kills the process, DataFusion `ExecutionPlan`s (operators) that consume
/// large amounts of memory must first request their desired allocation from a
/// [`MemoryPool`] before allocating more. The request is typically managed via
/// a [`MemoryReservation`] and [`MemoryConsumer`].
///
/// If the allocation is successful, the operator should proceed and allocate
/// the desired memory. If the allocation fails, the operator must either first
/// free memory (e.g. by spilling to local disk) and try again, or error.
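///
/// A minimal sketch of that request/release cycle follows; the operator name
/// and the 1 MiB limit are purely illustrative, not part of DataFusion:
///
/// ```
/// # use std::sync::Arc;
/// # use datafusion_execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};
/// // hypothetical pool with a 1 MiB limit, for illustration only
/// let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1024 * 1024));
///
/// // an operator registers a consumer and reserves memory *before* buffering
/// let mut reservation = MemoryConsumer::new("MyBufferingOperator").register(&pool);
/// if reservation.try_grow(64 * 1024).is_err() {
///     // pool exhausted: spill buffered state to disk (operator specific) and
///     // release the reservation, or return the error to the caller
///     reservation.free();
/// }
/// ```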
///
/// Note that a `MemoryPool` can be shared by concurrently executing plans,
/// which can be used to control memory usage in a multi-tenant system.
///
/// # How MemoryPool works by example
///
/// Scenario 1:
/// `RecordBatch`es stream through the `Filter` operator, so it does not have
/// to track its memory usage with the [`MemoryPool`].
///
/// Scenario 2:
/// The intermediate state of the `CrossJoin` operator grows with the size of
/// its input, so it uses the [`MemoryPool`] to limit its memory usage.
/// 2.1 `CrossJoin` reads a new batch and asks the memory pool for additional
/// memory. The memory pool updates the usage and returns success.
/// 2.2 `CrossJoin` reads another batch and tries to reserve more memory, but
/// the memory pool does not have enough. Since `CrossJoin` does not implement
/// spilling, it stops execution and returns an error.
///
/// Scenario 3:
/// The intermediate state of the `Aggregate` operator also accumulates as the
/// input size grows, but it can spill. When it tries to reserve more memory
/// and the memory pool has already reached its limit, the pool returns an
/// error. The `Aggregate` operator then spills its intermediate buffers to
/// disk, releases the corresponding memory back to the pool, and retries the
/// reservation.
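///
/// A simplified sketch of that spill-and-retry loop; the pool size, byte
/// counts, and the `spill_to_disk` closure are placeholders for operator
/// specific logic:
///
/// ```
/// # use std::sync::Arc;
/// # use datafusion_execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};
/// let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(8192));
/// let mut reservation = MemoryConsumer::new("Aggregate")
///     .with_can_spill(true)
///     .register(&pool);
/// // stand-in for writing buffered intermediate state to a temporary file
/// let spill_to_disk = || { /* operator specific */ };
/// while reservation.try_grow(1024).is_err() {
///     spill_to_disk();
///     // the buffered state is now on disk, so its memory can be released
///     reservation.free();
/// }
/// ```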
///
/// # Implementing `MemoryPool`
///
/// You can implement a custom allocation policy by implementing the
/// [`MemoryPool`] trait and configuring a `SessionContext` appropriately
/// (a minimal sketch follows the list below). However, DataFusion comes with
/// the following simple memory pool implementations that handle many common
/// cases:
///
/// * [`UnboundedMemoryPool`]: no memory limits (the default)
///
/// * [`GreedyMemoryPool`]: Limits memory usage to a fixed size using a "first
///   come first served" policy
///
/// * [`FairSpillPool`]: Limits memory usage to a fixed size, allocating memory
///   to all spilling operators fairly
///
/// * [`TrackConsumersPool`]: Wraps another [`MemoryPool`] and tracks consumers,
///   providing better error messages on the largest memory users.
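///
/// For a custom policy, a minimal sketch of the trait implementation might
/// look like the following. The `TrackingPool` name and its "track but never
/// reject" policy are illustrative assumptions, not part of DataFusion:
///
/// ```
/// # use std::sync::Arc;
/// # use std::sync::atomic::{AtomicUsize, Ordering};
/// # use datafusion_common::Result;
/// # use datafusion_execution::memory_pool::{MemoryPool, MemoryReservation};
/// // Hypothetical pool that records usage but never rejects a request
/// #[derive(Debug, Default)]
/// struct TrackingPool {
///     used: AtomicUsize,
/// }
///
/// impl MemoryPool for TrackingPool {
///     fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
///         self.used.fetch_add(additional, Ordering::Relaxed);
///     }
///
///     fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
///         self.used.fetch_sub(shrink, Ordering::Relaxed);
///     }
///
///     fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()> {
///         // a real pool would enforce its limit here and return an error
///         self.grow(reservation, additional);
///         Ok(())
///     }
///
///     fn reserved(&self) -> usize {
///         self.used.load(Ordering::Relaxed)
///     }
/// }
///
/// let pool: Arc<dyn MemoryPool> = Arc::new(TrackingPool::default());
/// assert_eq!(pool.reserved(), 0);
/// ```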
pub trait MemoryPool: Send + Sync + std::fmt::Debug {
    /// Registers a new [`MemoryConsumer`]
    ///
    /// Note: Subsequent calls to [`Self::grow`] must be made to reserve memory
    fn register(&self, _consumer: &MemoryConsumer) {}

    /// Records the destruction of a [`MemoryReservation`] with [`MemoryConsumer`]
    ///
    /// Note: Prior calls to [`Self::shrink`] must be made to free any reserved memory
    fn unregister(&self, _consumer: &MemoryConsumer) {}

    /// Infallibly grow the provided `reservation` by `additional` bytes
    ///
    /// This must always succeed
    fn grow(&self, reservation: &MemoryReservation, additional: usize);

    /// Infallibly shrink the provided `reservation` by `shrink` bytes
    fn shrink(&self, reservation: &MemoryReservation, shrink: usize);

    /// Attempt to grow the provided `reservation` by `additional` bytes
    ///
    /// On error the `reservation` will not be increased in size
    fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()>;

    /// Return the total amount of memory reserved
    fn reserved(&self) -> usize;
}

/// A memory consumer is a named allocation traced by a particular
/// [`MemoryReservation`] in a [`MemoryPool`]. All allocations are registered to
/// a particular `MemoryConsumer`.
///
/// For help with allocation accounting, see the [`proxy`] module.
///
/// [proxy]: datafusion_common::utils::proxy
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
pub struct MemoryConsumer {
    name: String,
    can_spill: bool,
}

impl MemoryConsumer {
    /// Create a new empty [`MemoryConsumer`] that can be grown using [`MemoryReservation`]
    pub fn new(name: impl Into<String>) -> Self {
        Self {
            name: name.into(),
            can_spill: false,
        }
    }

    /// Set whether this allocation can be spilled to disk
    pub fn with_can_spill(self, can_spill: bool) -> Self {
        Self { can_spill, ..self }
    }

    /// Returns true if this allocation can spill to disk
    pub fn can_spill(&self) -> bool {
        self.can_spill
    }

    /// Returns the name associated with this allocation
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Registers this [`MemoryConsumer`] with the provided [`MemoryPool`] returning
    /// a [`MemoryReservation`] that can be used to grow or shrink the memory reservation
    pub fn register(self, pool: &Arc<dyn MemoryPool>) -> MemoryReservation {
        pool.register(&self);
        MemoryReservation {
            registration: Arc::new(SharedRegistration {
                pool: Arc::clone(pool),
                consumer: self,
            }),
            size: 0,
        }
    }
}

/// A registration of a [`MemoryConsumer`] with a [`MemoryPool`].
///
/// Calls [`MemoryPool::unregister`] on drop to return any memory to
/// the underlying pool.
#[derive(Debug)]
struct SharedRegistration {
    pool: Arc<dyn MemoryPool>,
    consumer: MemoryConsumer,
}

impl Drop for SharedRegistration {
    fn drop(&mut self) {
        self.pool.unregister(&self.consumer);
    }
}

/// A [`MemoryReservation`] tracks an individual reservation of a
/// number of bytes of memory in a [`MemoryPool`] that is freed back
/// to the pool on drop.
///
/// The reservation can be grown or shrunk over time.
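///
/// # Example
///
/// A minimal illustration of growing, shrinking, and (implicitly) freeing a
/// reservation; the pool size and byte counts are arbitrary:
///
/// ```
/// # use std::sync::Arc;
/// # use datafusion_execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};
/// let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(1024));
/// let mut reservation = MemoryConsumer::new("Example").register(&pool);
///
/// reservation.try_grow(512).unwrap();
/// assert_eq!(reservation.size(), 512);
///
/// reservation.shrink(256);
/// assert_eq!(reservation.size(), 256);
/// // the remaining 256 bytes are returned to the pool when `reservation` drops
/// ```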
#[derive(Debug)]
pub struct MemoryReservation {
    registration: Arc<SharedRegistration>,
    size: usize,
}

impl MemoryReservation {
    /// Returns the size of this reservation in bytes
    pub fn size(&self) -> usize {
        self.size
    }

    /// Returns the [`MemoryConsumer`] for this [`MemoryReservation`]
    pub fn consumer(&self) -> &MemoryConsumer {
        &self.registration.consumer
    }

    /// Frees all bytes from this reservation back to the underlying
    /// pool, returning the number of bytes freed.
    pub fn free(&mut self) -> usize {
        let size = self.size;
        if size != 0 {
            self.shrink(size)
        }
        size
    }

    /// Frees `capacity` bytes from this reservation
    ///
    /// # Panics
    ///
    /// Panics if `capacity` exceeds [`Self::size`]
    pub fn shrink(&mut self, capacity: usize) {
        let new_size = self.size.checked_sub(capacity).unwrap();
        self.registration.pool.shrink(self, capacity);
        self.size = new_size;
    }

    /// Tries to free `capacity` bytes from this reservation
    /// if `capacity` does not exceed [`Self::size`].
    ///
    /// Returns the new reservation size, or an error if `capacity`
    /// is more than the allocated size.
    pub fn try_shrink(&mut self, capacity: usize) -> Result<usize> {
        if let Some(new_size) = self.size.checked_sub(capacity) {
            self.registration.pool.shrink(self, capacity);
            self.size = new_size;
            Ok(new_size)
        } else {
            internal_err!(
                "Cannot free the capacity {capacity} out of allocated size {}",
                self.size
            )
        }
    }

    /// Sets the size of this reservation to `capacity`
    pub fn resize(&mut self, capacity: usize) {
        match capacity.cmp(&self.size) {
            Ordering::Greater => self.grow(capacity - self.size),
            Ordering::Less => self.shrink(self.size - capacity),
            _ => {}
        }
    }

    /// Try to set the size of this reservation to `capacity`
    pub fn try_resize(&mut self, capacity: usize) -> Result<()> {
        match capacity.cmp(&self.size) {
            Ordering::Greater => self.try_grow(capacity - self.size)?,
            Ordering::Less => self.shrink(self.size - capacity),
            _ => {}
        };
        Ok(())
    }

    /// Increase the size of this reservation by `capacity` bytes
    pub fn grow(&mut self, capacity: usize) {
        self.registration.pool.grow(self, capacity);
        self.size += capacity;
    }

    /// Try to increase the size of this reservation by `capacity`
    /// bytes, returning error if there is insufficient capacity left
    /// in the pool.
    pub fn try_grow(&mut self, capacity: usize) -> Result<()> {
        self.registration.pool.try_grow(self, capacity)?;
        self.size += capacity;
        Ok(())
    }

    /// Splits off `capacity` bytes from this [`MemoryReservation`]
    /// into a new [`MemoryReservation`] with the same
    /// [`MemoryConsumer`].
    ///
    /// This can be useful to free part of this reservation with RAII
    /// style dropping.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` exceeds [`Self::size`]
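    ///
    /// # Example
    ///
    /// A short sketch of splitting a reservation so that part of it can be
    /// freed by dropping the split-off half (sizes are arbitrary):
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use datafusion_execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};
    /// let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(100));
    /// let mut r1 = MemoryConsumer::new("consumer").register(&pool);
    /// r1.try_grow(20).unwrap();
    ///
    /// let r2 = r1.split(5);
    /// assert_eq!(r1.size(), 15);
    /// assert_eq!(r2.size(), 5);
    ///
    /// // dropping `r2` returns its 5 bytes to the pool; `r1` keeps its 15
    /// drop(r2);
    /// assert_eq!(pool.reserved(), 15);
    /// ```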
    pub fn split(&mut self, capacity: usize) -> MemoryReservation {
        self.size = self.size.checked_sub(capacity).unwrap();
        Self {
            size: capacity,
            registration: Arc::clone(&self.registration),
        }
    }

    /// Returns a new empty [`MemoryReservation`] with the same [`MemoryConsumer`]
    pub fn new_empty(&self) -> Self {
        Self {
            size: 0,
            registration: Arc::clone(&self.registration),
        }
    }

    /// Splits off all the bytes from this [`MemoryReservation`] into
    /// a new [`MemoryReservation`] with the same [`MemoryConsumer`]
    pub fn take(&mut self) -> MemoryReservation {
        self.split(self.size)
    }
}

impl Drop for MemoryReservation {
    fn drop(&mut self) {
        self.free();
    }
}

/// Units of memory measured in bytes (powers of two)
pub mod units {
    /// 1 tebibyte = 2^40 bytes
    pub const TB: u64 = 1 << 40;
    /// 1 gibibyte = 2^30 bytes
    pub const GB: u64 = 1 << 30;
    /// 1 mebibyte = 2^20 bytes
    pub const MB: u64 = 1 << 20;
    /// 1 kibibyte = 2^10 bytes
    pub const KB: u64 = 1 << 10;
}

/// Present `size` in a human-readable form (B, KB, MB, GB, or TB)
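///
/// For example, a value stays in the smaller unit until it reaches twice the
/// next larger unit:
///
/// ```
/// # use datafusion_execution::memory_pool::human_readable_size;
/// assert_eq!(human_readable_size(1024 * 1024), "1024.0 KB");
/// assert_eq!(human_readable_size(4 * 1024 * 1024), "4.0 MB");
/// assert_eq!(human_readable_size(500), "500.0 B");
/// ```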
pub fn human_readable_size(size: usize) -> String {
    use units::*;

    let size = size as u64;
    let (value, unit) = {
        if size >= 2 * TB {
            (size as f64 / TB as f64, "TB")
        } else if size >= 2 * GB {
            (size as f64 / GB as f64, "GB")
        } else if size >= 2 * MB {
            (size as f64 / MB as f64, "MB")
        } else if size >= 2 * KB {
            (size as f64 / KB as f64, "KB")
        } else {
            (size as f64, "B")
        }
    };
    format!("{value:.1} {unit}")
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_memory_pool_underflow() {
        let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
        let mut a1 = MemoryConsumer::new("a1").register(&pool);
        assert_eq!(pool.reserved(), 0);

        a1.grow(100);
        assert_eq!(pool.reserved(), 100);

        assert_eq!(a1.free(), 100);
        assert_eq!(pool.reserved(), 0);

        a1.try_grow(100).unwrap_err();
        assert_eq!(pool.reserved(), 0);

        a1.try_grow(30).unwrap();
        assert_eq!(pool.reserved(), 30);

        let mut a2 = MemoryConsumer::new("a2").register(&pool);
        a2.try_grow(25).unwrap_err();
        assert_eq!(pool.reserved(), 30);

        drop(a1);
        assert_eq!(pool.reserved(), 0);

        a2.try_grow(25).unwrap();
        assert_eq!(pool.reserved(), 25);
    }

    #[test]
    fn test_split() {
        let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
        let mut r1 = MemoryConsumer::new("r1").register(&pool);

        r1.try_grow(20).unwrap();
        assert_eq!(r1.size(), 20);
        assert_eq!(pool.reserved(), 20);

        // take 5 from r1, should still have same reservation split
        let r2 = r1.split(5);
        assert_eq!(r1.size(), 15);
        assert_eq!(r2.size(), 5);
        assert_eq!(pool.reserved(), 20);

        // dropping r1 frees 15 but retains 5 as they have the same consumer
        drop(r1);
        assert_eq!(r2.size(), 5);
        assert_eq!(pool.reserved(), 5);
    }

    #[test]
    fn test_new_empty() {
        let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
        let mut r1 = MemoryConsumer::new("r1").register(&pool);

        r1.try_grow(20).unwrap();
        let mut r2 = r1.new_empty();
        r2.try_grow(5).unwrap();

        assert_eq!(r1.size(), 20);
        assert_eq!(r2.size(), 5);
        assert_eq!(pool.reserved(), 25);
    }

    #[test]
    fn test_take() {
        let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
        let mut r1 = MemoryConsumer::new("r1").register(&pool);

        r1.try_grow(20).unwrap();
        let mut r2 = r1.take();
        r2.try_grow(5).unwrap();

        assert_eq!(r1.size(), 0);
        assert_eq!(r2.size(), 25);
        assert_eq!(pool.reserved(), 25);

        // r1 can still grow again
        r1.try_grow(3).unwrap();
        assert_eq!(r1.size(), 3);
        assert_eq!(r2.size(), 25);
        assert_eq!(pool.reserved(), 28);
    }
}
460}