datafusion_execution/memory_pool/
arrow.rs1use crate::memory_pool::{MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation};
21use std::fmt::Debug;
22use std::sync::Arc;
23
24#[derive(Debug)]
36pub struct ArrowMemoryPool {
37 inner: Arc<dyn MemoryPool>,
38 consumer: MemoryConsumer,
39}
40
41impl ArrowMemoryPool {
42 pub fn new(inner: Arc<dyn MemoryPool>, consumer: MemoryConsumer) -> Self {
45 Self { inner, consumer }
46 }
47}
48
49impl arrow_buffer::MemoryReservation for MemoryReservation {
50 fn size(&self) -> usize {
51 MemoryReservation::size(self)
52 }
53
54 fn resize(&mut self, new_size: usize) {
55 MemoryReservation::resize(self, new_size)
56 }
57}
58
59impl arrow_buffer::MemoryPool for ArrowMemoryPool {
60 fn reserve(&self, size: usize) -> Box<dyn arrow_buffer::MemoryReservation> {
61 let consumer = self.consumer.clone_with_new_id();
62 let mut reservation = consumer.register(&self.inner);
63 reservation.grow(size);
64
65 Box::new(reservation)
66 }
67
68 fn available(&self) -> isize {
69 (self.capacity() as i128 - self.used() as i128)
71 .try_into()
72 .unwrap_or(isize::MIN)
73 }
74
75 fn used(&self) -> usize {
76 self.inner.reserved()
77 }
78
79 fn capacity(&self) -> usize {
80 match self.inner.memory_limit() {
81 MemoryLimit::Infinite | MemoryLimit::Unknown => usize::MAX,
82 MemoryLimit::Finite(capacity) => capacity,
83 }
84 }
85}
86
#[cfg(test)]
mod tests {
    use super::*;
    use crate::memory_pool::{GreedyMemoryPool, UnboundedMemoryPool};
    use arrow::array::{Array, Int32Array};
    use arrow_buffer::MemoryPool;

    /// Claims every buffer of `array`'s data against `pool`.
    fn claim_array(array: &dyn Array, pool: &dyn MemoryPool) {
        let data = array.to_data();
        for buffer in data.buffers() {
            buffer.claim(pool);
        }
    }

    /// Claiming an array against an unbounded pool reports the array's
    /// buffer size as used, and claiming a slice of it leaves that unchanged.
    #[test]
    pub fn can_claim_array() {
        let inner = Arc::new(UnboundedMemoryPool::default());
        let arrow_pool = ArrowMemoryPool::new(inner, MemoryConsumer::new("arrow"));

        let array = Int32Array::from(vec![1, 2, 3, 4, 5]);
        claim_array(&array, &arrow_pool);
        assert_eq!(arrow_pool.used(), array.get_buffer_memory_size());

        // Claim a slice of the same array; the expected usage is still the
        // full array's buffer size.
        let head = array.slice(0, 2);
        claim_array(&head, &arrow_pool);
        assert_eq!(arrow_pool.used(), array.get_buffer_memory_size());
    }

    /// With a finite inner pool, capacity and availability follow the
    /// configured limit and the amount claimed.
    #[test]
    pub fn can_claim_array_with_finite_limit() {
        let pool_capacity = 1024;
        let inner = Arc::new(GreedyMemoryPool::new(pool_capacity));
        let arrow_pool = ArrowMemoryPool::new(inner, MemoryConsumer::new("arrow"));

        // Nothing claimed yet: the whole capacity is available.
        assert_eq!(arrow_pool.capacity(), pool_capacity);
        assert_eq!(arrow_pool.available(), pool_capacity as isize);

        let array = Int32Array::from(vec![1, 2, 3, 4, 5]);
        claim_array(&array, &arrow_pool);

        let claimed = array.get_buffer_memory_size();
        assert_eq!(arrow_pool.used(), claimed);
        assert_eq!(arrow_pool.available(), (pool_capacity - claimed) as isize);
    }
}