pub trait MemoryPool: Send + Sync + Debug {
    // Required methods
    fn grow(&self, reservation: &MemoryReservation, additional: usize);
    fn shrink(&self, reservation: &MemoryReservation, shrink: usize);
    fn try_grow(
        &self,
        reservation: &MemoryReservation,
        additional: usize,
    ) -> Result<()>;
    fn reserved(&self) -> usize;

    // Provided methods
    fn register(&self, _consumer: &MemoryConsumer) { ... }
    fn unregister(&self, _consumer: &MemoryConsumer) { ... }
    fn memory_limit(&self) -> MemoryLimit { ... }
}
Tracks and potentially limits memory use across operators during execution.
§Memory Management Overview
DataFusion is a streaming query engine, processing most queries without buffering the entire input. Most operators require a fixed amount of memory based on the schema and target batch size. However, certain operations, such as sorting and grouping/joining, require buffering intermediate results and can need memory proportional to the number of input rows.
Rather than tracking all allocations, DataFusion takes a pragmatic approach: intermediate memory used as data streams through the system is not accounted for (it is assumed to be “small”), but the large consumers of memory must register and constrain their use. This design trades off the additional code complexity of memory tracking against the ability to limit resource usage.
When limiting memory with a MemoryPool you should typically reserve some
overhead (e.g. 10%) for the “small” memory allocations that are not tracked.
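For instance, here is a minimal sketch of enforcing such a limit, assuming a recent DataFusion release where GreedyMemoryPool, RuntimeEnvBuilder, and SessionContext::new_with_config_rt are available (builder names have shifted between versions) and a tokio runtime; roughly 10% of the budget is left unreserved for the untracked “small” allocations:

use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::memory_pool::GreedyMemoryPool;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // Total memory budget for the process: 1 GiB.
    let budget: usize = 1024 * 1024 * 1024;
    // Let tracked consumers reserve at most ~90% of it, keeping ~10%
    // headroom for the small, untracked allocations.
    let pool = Arc::new(GreedyMemoryPool::new(budget / 10 * 9));

    let runtime = RuntimeEnvBuilder::new()
        .with_memory_pool(pool)
        .build_arc()?;
    let ctx = SessionContext::new_with_config_rt(SessionConfig::new(), runtime);

    // Queries run through `ctx` now share the limited pool.
    ctx.sql("SELECT 1").await?.show().await?;
    Ok(())
}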
§Memory Management Design
As explained above, DataFusion’s design ONLY limits operators that require
“large” amounts of memory (proportional to number of input rows), such as
GroupByHashExec. It does NOT track and limit memory used internally by
other operators such as DataSourceExec or the RecordBatches that flow
between operators. Furthermore, operators should not reserve memory for the
batches they produce. Instead, if a consumer operator needs to hold batches
from its producers in memory for an extended period, it is the consumer
operator’s responsibility to reserve the necessary memory for those batches.
In order to avoid allocating memory until the OS or the container system
kills the process, DataFusion ExecutionPlans (operators) that consume
large amounts of memory must first request their desired allocation from a
MemoryPool before allocating more. The request is typically managed via
a MemoryReservation and MemoryConsumer.
If the allocation is successful, the operator should proceed and allocate the desired memory. If the allocation fails, the operator must either first free memory (e.g. by spilling to local disk) and try again, or error.
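A sketch of this request-before-allocate pattern, for a hypothetical buffering operator (the BufferedBatches type below is illustrative, not part of DataFusion):

use std::sync::Arc;

use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};

/// Hypothetical operator state that holds batches in memory and therefore
/// must ask the pool before each allocation.
struct BufferedBatches {
    batches: Vec<RecordBatch>,
    reservation: MemoryReservation,
}

impl BufferedBatches {
    fn new(pool: &Arc<dyn MemoryPool>, partition: usize) -> Self {
        // One named MemoryConsumer per partition of the operator.
        let reservation =
            MemoryConsumer::new(format!("BufferedBatches[{partition}]")).register(pool);
        Self { batches: Vec::new(), reservation }
    }

    fn push(&mut self, batch: RecordBatch) -> Result<()> {
        // Request memory first; if the pool refuses, this operator cannot
        // spill, so it surfaces the error instead of allocating anyway.
        self.reservation.try_grow(batch.get_array_memory_size())?;
        self.batches.push(batch);
        Ok(())
    }
}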
Note that a MemoryPool can be shared by concurrently executing plans,
which can be used to control memory usage in a multi-tenant system.
§How MemoryPool works by example
Scenario 1:
For the Filter operator, RecordBatches stream through it, so it does not have to track memory usage through the MemoryPool.
Scenario 2:
For the CrossJoin operator, the intermediate state grows with the input size, so the CrossJoin operator uses the MemoryPool to limit its memory usage.
2.1 The CrossJoin operator reads a new batch and asks the memory pool for
additional memory. The memory pool updates its usage and returns success.
2.2 CrossJoin reads another batch and tries to reserve more memory again,
but the memory pool does not have enough. Since the CrossJoin operator does
not implement spilling, it stops execution and returns an error.
Scenario 3:
The Aggregate operator's intermediate state also accumulates as the input
grows, but it supports spilling. When it tries to reserve more memory and the
memory pool has already reached its limit, the pool returns an error. The
Aggregate operator then spills its intermediate buffers to disk, releases the
corresponding memory back to the pool, and retries the reservation.
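A sketch of that spill-and-retry loop, where reserve_or_spill and the spill_to_disk callback are hypothetical helpers standing in for the operator's real spilling logic:

use datafusion::error::Result;
use datafusion::execution::memory_pool::MemoryReservation;

/// Try to grow the reservation; on failure, spill buffered state to disk,
/// return the freed memory to the pool, and retry once.
fn reserve_or_spill(
    reservation: &mut MemoryReservation,
    additional: usize,
    spill_to_disk: &mut dyn FnMut() -> Result<usize>, // returns bytes freed
) -> Result<()> {
    match reservation.try_grow(additional) {
        Ok(()) => Ok(()),
        Err(_) => {
            // Move the in-memory intermediate buffers to disk ...
            let freed = spill_to_disk()?;
            // ... hand that memory back to the pool ...
            reservation.shrink(freed);
            // ... and retry. If the pool still refuses, propagate the error.
            reservation.try_grow(additional)
        }
    }
}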
§Related Structs
To better understand memory management in DataFusion, here are the key structs and their relationships:
- MemoryConsumer: A named allocation traced by a particular operator. If an execution is parallelized and there are multiple partitions of the same operator, each partition will have a separate MemoryConsumer.
- SharedRegistration: A registration of a MemoryConsumer with a MemoryPool. SharedRegistration and MemoryPool have a many-to-one relationship. The MemoryPool implementation can decide how to allocate memory based on the registered consumers (e.g. FairSpillPool will try to share available memory evenly among all registered consumers).
- MemoryReservation: Each MemoryConsumer/operator can have multiple MemoryReservations for different internal data structures. The relationship between MemoryConsumer and MemoryReservation is one-to-many. This design enables cleaner operator implementations:
  - Different MemoryReservations can be used for different purposes
  - MemoryReservation follows RAII principles: to release a reservation, simply drop the MemoryReservation object. When all MemoryReservations for a SharedRegistration are dropped, the SharedRegistration is dropped when its reference count reaches zero, automatically unregistering the MemoryConsumer from the MemoryPool.
§Relationship Diagram
┌──────────────────┐ ┌──────────────────┐
│MemoryReservation │ │MemoryReservation │
└───┬──────────────┘ └──────────────────┘ ......
│belongs to │
│ ┌───────────────────────┘ │ │
│ │ │ │
▼ ▼ ▼ ▼
┌────────────────────────┐ ┌────────────────────────┐
│ SharedRegistration │ │ SharedRegistration │
│ ┌────────────────┐ │ │ ┌────────────────┐ │
│ │ │ │ │ │ │ │
│ │ MemoryConsumer │ │ │ │ MemoryConsumer │ │
│ │ │ │ │ │ │ │
│ └────────────────┘ │ │ └────────────────┘ │
└────────────┬───────────┘ └────────────┬───────────┘
│ │
│ register│into
│ │
└─────────────┐ ┌──────────────┘
│ │
▼ ▼
╔═══════════════════════════════════════════════════╗
║ ║
║ MemoryPool ║
║ ║
╚═══════════════════════════════════════════════════╝

For example, there are two parallel partitions of an operator X: each partition
corresponds to a MemoryConsumer in the above diagram. Inside each partition of
operator X, there are typically several MemoryReservations - one for each
internal data structure that needs memory tracking (e.g., 1 reservation for the hash
table, and 1 reservation for buffered input, etc.).
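A sketch of that layout in code, assuming MemoryReservation::new_empty can be used to create an additional reservation under the same registration (and therefore the same MemoryConsumer):

use std::sync::Arc;

use datafusion::error::Result;
use datafusion::execution::memory_pool::{GreedyMemoryPool, MemoryConsumer, MemoryPool};

fn main() -> Result<()> {
    let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(10 * 1024 * 1024));

    // Partition 0 of the hypothetical operator X: one MemoryConsumer,
    // with one reservation per tracked data structure.
    let mut hash_table = MemoryConsumer::new("OperatorX[partition=0]").register(&pool);
    let mut buffered_input = hash_table.new_empty();

    hash_table.try_grow(1024)?;
    buffered_input.try_grow(4096)?;
    assert_eq!(pool.reserved(), 1024 + 4096);

    // RAII: dropping a reservation returns its memory to the pool. Once the
    // last reservation of the registration is gone, the consumer is
    // unregistered automatically.
    drop(buffered_input);
    assert_eq!(pool.reserved(), 1024);
    Ok(())
}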
§Implementing MemoryPool
You can implement a custom allocation policy by implementing the
MemoryPool trait and configuring a SessionContext appropriately.
However, DataFusion comes with the following simple memory pool implementations that
handle many common cases:
- UnboundedMemoryPool: no memory limits (the default)
- GreedyMemoryPool: Limits memory usage to a fixed size using a “first come, first served” policy
- FairSpillPool: Limits memory usage to a fixed size, allocating memory to all spilling operators fairly
- TrackConsumersPool: Wraps another MemoryPool and tracks consumers, providing better error messages on the largest memory users
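For example, a sketch of combining two of these, assuming TrackConsumersPool::new takes the wrapped pool and the number of top consumers to report:

use std::num::NonZeroUsize;
use std::sync::Arc;

use datafusion::execution::memory_pool::{GreedyMemoryPool, MemoryPool, TrackConsumersPool};

fn build_pool() -> Arc<dyn MemoryPool> {
    // Limit tracked memory to 512 MiB; when a reservation fails, report the
    // five largest consumers so the error names the offending operators.
    let inner = GreedyMemoryPool::new(512 * 1024 * 1024);
    Arc::new(TrackConsumersPool::new(inner, NonZeroUsize::new(5).unwrap()))
}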
Required Methods§
fn grow(&self, reservation: &MemoryReservation, additional: usize)
Infallibly grow the provided reservation by additional bytes.
This must always succeed.
fn shrink(&self, reservation: &MemoryReservation, shrink: usize)
Infallibly shrink the provided reservation by shrink bytes.
fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()>
Attempt to grow the provided reservation by additional bytes. On error, the reservation is not increased in size.
fn reserved(&self) -> usize
Return the total amount of memory reserved.
Provided Methods§
fn register(&self, _consumer: &MemoryConsumer)
Registers a new MemoryConsumer.
Note: subsequent calls to Self::grow must be made to reserve memory.
fn unregister(&self, _consumer: &MemoryConsumer)
Records the destruction of a MemoryReservation with a MemoryConsumer.
Note: prior calls to Self::shrink must be made to free any reserved memory.
fn memory_limit(&self) -> MemoryLimit
Return the memory limit of the pool.
The default implementation of MemoryPool::memory_limit returns MemoryLimit::Unknown.
If you are using a custom memory pool and need to know its memory limit, implement this method to return MemoryLimit::Finite(limit).
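As an illustration, here is a minimal sketch of a custom pool with a simplified first-come-first-served policy (GreedyMemoryPool already provides this; the sketch only shows the trait surface, including memory_limit returning MemoryLimit::Finite). The error message format is illustrative:

use std::sync::atomic::{AtomicUsize, Ordering};

use datafusion::error::{DataFusionError, Result};
use datafusion::execution::memory_pool::{MemoryLimit, MemoryPool, MemoryReservation};

#[derive(Debug)]
struct SimplePool {
    limit: usize,
    used: AtomicUsize,
}

impl SimplePool {
    fn new(limit: usize) -> Self {
        Self { limit, used: AtomicUsize::new(0) }
    }
}

impl MemoryPool for SimplePool {
    fn grow(&self, _reservation: &MemoryReservation, additional: usize) {
        // Infallible: record the usage even if it exceeds the limit.
        self.used.fetch_add(additional, Ordering::Relaxed);
    }

    fn shrink(&self, _reservation: &MemoryReservation, shrink: usize) {
        self.used.fetch_sub(shrink, Ordering::Relaxed);
    }

    fn try_grow(&self, _reservation: &MemoryReservation, additional: usize) -> Result<()> {
        let mut current = self.used.load(Ordering::Relaxed);
        loop {
            if current + additional > self.limit {
                return Err(DataFusionError::ResourcesExhausted(format!(
                    "cannot reserve {additional} bytes: {current} of {} bytes already in use",
                    self.limit
                )));
            }
            // Reserve atomically; retry if another consumer raced us.
            match self.used.compare_exchange(
                current,
                current + additional,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => return Ok(()),
                Err(actual) => current = actual,
            }
        }
    }

    fn reserved(&self) -> usize {
        self.used.load(Ordering::Relaxed)
    }

    fn memory_limit(&self) -> MemoryLimit {
        MemoryLimit::Finite(self.limit)
    }
}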