Skip to main content

datafusion_execution/memory_pool/
arrow.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Adapter for integrating DataFusion's [`MemoryPool`] with Arrow's memory tracking APIs.
19
20use crate::memory_pool::{MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation};
21use std::fmt::Debug;
22use std::sync::Arc;
23
/// An adapter that implements Arrow's [`arrow_buffer::MemoryPool`] trait
/// by wrapping a DataFusion [`MemoryPool`].
///
/// This lets DataFusion's memory management system back Arrow's memory
/// tracking APIs. Every reservation made through this adapter is registered
/// with the wrapped pool under a consumer derived from the configured
/// [`MemoryConsumer`] (a clone carrying a fresh id). This lets DataFusion
/// monitor and limit memory used by Arrow operations.
///
/// This is useful when Arrow operations (such as array builders or compute
/// kernels) should participate in DataFusion's memory management and respect
/// the same memory limits as DataFusion operators.
#[derive(Debug)]
pub struct ArrowMemoryPool {
    /// The DataFusion pool that every reservation is accounted against.
    inner: Arc<dyn MemoryPool>,
    /// Template consumer; each reservation registers a copy of it with a new id.
    consumer: MemoryConsumer,
}
40
41impl ArrowMemoryPool {
42    /// Creates a new [`ArrowMemoryPool`] that wraps the given DataFusion [`MemoryPool`]
43    /// and tracks allocations under the specified [`MemoryConsumer`].
44    pub fn new(inner: Arc<dyn MemoryPool>, consumer: MemoryConsumer) -> Self {
45        Self { inner, consumer }
46    }
47}
48
49impl arrow_buffer::MemoryReservation for MemoryReservation {
50    fn size(&self) -> usize {
51        MemoryReservation::size(self)
52    }
53
54    fn resize(&mut self, new_size: usize) {
55        MemoryReservation::resize(self, new_size)
56    }
57}
58
59impl arrow_buffer::MemoryPool for ArrowMemoryPool {
60    fn reserve(&self, size: usize) -> Box<dyn arrow_buffer::MemoryReservation> {
61        let consumer = self.consumer.clone_with_new_id();
62        let mut reservation = consumer.register(&self.inner);
63        reservation.grow(size);
64
65        Box::new(reservation)
66    }
67
68    fn available(&self) -> isize {
69        // The pool may be overfilled, so this method might return a negative value.
70        (self.capacity() as i128 - self.used() as i128)
71            .try_into()
72            .unwrap_or(isize::MIN)
73    }
74
75    fn used(&self) -> usize {
76        self.inner.reserved()
77    }
78
79    fn capacity(&self) -> usize {
80        match self.inner.memory_limit() {
81            MemoryLimit::Infinite | MemoryLimit::Unknown => usize::MAX,
82            MemoryLimit::Finite(capacity) => capacity,
83        }
84    }
85}
86
#[cfg(test)]
mod tests {
    use super::*;
    use crate::memory_pool::{GreedyMemoryPool, UnboundedMemoryPool};
    use arrow::array::{Array, Int32Array};
    use arrow_buffer::MemoryPool;

    /// Claims each buffer of `array`'s `ArrayData` (as returned by
    /// `buffers()`) against `pool`.
    ///
    /// Until https://github.com/apache/arrow-rs/pull/8918 lands, the buffers
    /// have to be claimed one by one. Switch to an array-level claim once that
    /// PR is released.
    fn claim_array(array: &dyn Array, pool: &dyn MemoryPool) {
        let data = array.to_data();
        data.buffers().iter().for_each(|buffer| buffer.claim(pool));
    }

    #[test]
    fn can_claim_array() {
        let arrow_pool = ArrowMemoryPool::new(
            Arc::new(UnboundedMemoryPool::default()),
            MemoryConsumer::new("arrow"),
        );

        let values = Int32Array::from(vec![1, 2, 3, 4, 5]);
        claim_array(&values, &arrow_pool);
        let claimed = values.get_buffer_memory_size();
        assert_eq!(arrow_pool.used(), claimed);

        // Claiming a slice of an already-claimed array should be a no-op.
        let head = values.slice(0, 2);
        claim_array(&head, &arrow_pool);
        assert_eq!(arrow_pool.used(), claimed);
    }

    #[test]
    fn can_claim_array_with_finite_limit() {
        const POOL_CAPACITY: usize = 1024;

        let arrow_pool = ArrowMemoryPool::new(
            Arc::new(GreedyMemoryPool::new(POOL_CAPACITY)),
            MemoryConsumer::new("arrow"),
        );

        assert_eq!(arrow_pool.capacity(), POOL_CAPACITY);
        assert_eq!(arrow_pool.available(), POOL_CAPACITY as isize);

        let values = Int32Array::from(vec![1, 2, 3, 4, 5]);
        claim_array(&values, &arrow_pool);
        let claimed = values.get_buffer_memory_size();

        assert_eq!(arrow_pool.used(), claimed);
        assert_eq!(arrow_pool.available(), (POOL_CAPACITY - claimed) as isize);
    }
}