datafusion_common/utils/proxy.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! [`VecAllocExt`], [`RawTableAllocExt`] and [`HashTableAllocExt`] to help track memory allocations
19
20use hashbrown::{
21 hash_table::HashTable,
22 raw::{Bucket, RawTable},
23};
24use std::mem::size_of;
25
/// Allocation-accounting helpers for [`Vec`].
///
/// Sizes reported by this trait are based on the vector's capacity (the
/// buffer it has reserved), not on the number of elements it holds.
pub trait VecAllocExt {
    /// Type of the elements stored in the vector.
    type T;

    /// Appends `x` via [`Vec::push`] and adds to `accounting` the number of
    /// bytes by which the vector's buffer grew as a result.
    ///
    /// Growth is the capacity increase multiplied by `size_of::<T>()`;
    /// `accounting` is left untouched when the push fits in the existing
    /// capacity.
    ///
    /// # Panics
    ///
    /// Panics with the message `overflow` if adding the newly allocated
    /// bytes to `accounting` overflows `usize`.
    ///
    /// # Example:
    /// ```
    /// # use datafusion_common::utils::proxy::VecAllocExt;
    /// // `allocated` incrementally tracks how many bytes the vec has reserved
    /// let mut allocated = 0;
    /// let mut vec = Vec::new();
    /// // The first push allocates, and the accounting reflects it
    /// vec.push_accounted(1, &mut allocated);
    /// assert_eq!(allocated, 16); // capacity of 4 four-byte integers
    /// vec.push_accounted(1, &mut allocated);
    /// assert_eq!(allocated, 16); // fits in the existing capacity
    ///
    /// // push more data into the vec
    /// for _ in 0..10 {
    ///     vec.push_accounted(1, &mut allocated);
    /// }
    /// assert_eq!(allocated, 64); // 12 elements stored, capacity grew to 16
    /// assert_eq!(vec.allocated_size(), 64);
    /// ```
    /// # Example with other allocations:
    /// ```
    /// # use datafusion_common::utils::proxy::VecAllocExt;
    /// // The same counter can also carry bytes allocated by other sources.
    /// let mut allocated = 27;
    /// let mut vec = Vec::new();
    /// vec.push_accounted(1, &mut allocated); // the vec allocates 16 bytes
    /// assert_eq!(allocated, 43); // 16 bytes for the vec + 27 bytes from elsewhere
    /// ```
    fn push_accounted(&mut self, x: Self::T, accounting: &mut usize);

    /// Returns the number of bytes this vector has reserved for its
    /// elements (`size_of::<T>() * capacity`).
    ///
    /// The figure is shallow: heap memory owned by the elements themselves
    /// is not counted, and neither is the size of `self`.
    ///
    /// # Example:
    /// ```
    /// # use datafusion_common::utils::proxy::VecAllocExt;
    /// let mut vec = Vec::new();
    /// // Plain `push` is enough here: the size is derived from the capacity
    /// vec.push(1);
    /// assert_eq!(vec.allocated_size(), 16); // capacity of 4 four-byte integers
    /// vec.push(1);
    /// assert_eq!(vec.allocated_size(), 16); // fits in the existing capacity
    ///
    /// // push more data into the vec
    /// for _ in 0..10 {
    ///     vec.push(1);
    /// }
    /// assert_eq!(vec.allocated_size(), 64); // capacity grew to 16 elements
    /// ```
    fn allocated_size(&self) -> usize;
}

impl<T> VecAllocExt for Vec<T> {
    type T = T;

    fn push_accounted(&mut self, x: Self::T, accounting: &mut usize) {
        let capacity_before = self.capacity();
        self.push(x);
        let capacity_after = self.capacity();

        // The push fit into the existing buffer: nothing new to account for.
        if capacity_after <= capacity_before {
            return;
        }

        // The multiplication cannot overflow: `push` would have panicked
        // before handing out a buffer that large. The addition is checked
        // because `accounting` may already include bytes tracked on behalf
        // of other allocations and so can exceed what this Vec holds.
        let grown_bytes = size_of::<T>() * (capacity_after - capacity_before);
        *accounting = accounting.checked_add(grown_bytes).expect("overflow");
    }

    fn allocated_size(&self) -> usize {
        self.capacity() * size_of::<T>()
    }
}
116
117/// Extension trait for hash browns [`RawTable`] to account for allocations.
118pub trait RawTableAllocExt {
119 /// Item type.
120 type T;
121
122 /// [Insert](RawTable::insert) new element into table and increase
123 /// `accounting` by any newly allocated bytes.
124 ///
125 /// Returns the bucket where the element was inserted.
126 /// Note that allocation counts capacity, not size.
127 ///
128 /// # Example:
129 /// ```
130 /// # use datafusion_common::utils::proxy::RawTableAllocExt;
131 /// # use hashbrown::raw::RawTable;
132 /// let mut table = RawTable::new();
133 /// let mut allocated = 0;
134 /// let hash_fn = |x: &u32| (*x as u64) % 1000;
135 /// // pretend 0x3117 is the hash value for 1
136 /// table.insert_accounted(1, hash_fn, &mut allocated);
137 /// assert_eq!(allocated, 64);
138 ///
139 /// // insert more values
140 /// for i in 0..100 {
141 /// table.insert_accounted(i, hash_fn, &mut allocated);
142 /// }
143 /// assert_eq!(allocated, 400);
144 /// ```
145 fn insert_accounted(
146 &mut self,
147 x: Self::T,
148 hasher: impl Fn(&Self::T) -> u64,
149 accounting: &mut usize,
150 ) -> Bucket<Self::T>;
151}
152
153impl<T> RawTableAllocExt for RawTable<T> {
154 type T = T;
155
156 fn insert_accounted(
157 &mut self,
158 x: Self::T,
159 hasher: impl Fn(&Self::T) -> u64,
160 accounting: &mut usize,
161 ) -> Bucket<Self::T> {
162 let hash = hasher(&x);
163
164 match self.try_insert_no_grow(hash, x) {
165 Ok(bucket) => bucket,
166 Err(x) => {
167 // need to request more memory
168
169 let bump_elements = self.capacity().max(16);
170 let bump_size = bump_elements * size_of::<T>();
171 *accounting = (*accounting).checked_add(bump_size).expect("overflow");
172
173 self.reserve(bump_elements, hasher);
174
175 // still need to insert the element since first try failed
176 // Note: cannot use `.expect` here because `T` may not implement `Debug`
177 match self.try_insert_no_grow(hash, x) {
178 Ok(bucket) => bucket,
179 Err(_) => panic!("just grew the container"),
180 }
181 }
182 }
183 }
184}
185
186/// Extension trait for hash browns [`HashTable`] to account for allocations.
187pub trait HashTableAllocExt {
188 /// Item type.
189 type T;
190
191 /// Insert new element into table and increase
192 /// `accounting` by any newly allocated bytes.
193 ///
194 /// Returns the bucket where the element was inserted.
195 /// Note that allocation counts capacity, not size.
196 ///
197 /// # Example:
198 /// ```
199 /// # use datafusion_common::utils::proxy::HashTableAllocExt;
200 /// # use hashbrown::hash_table::HashTable;
201 /// let mut table = HashTable::new();
202 /// let mut allocated = 0;
203 /// let hash_fn = |x: &u32| (*x as u64) % 1000;
204 /// // pretend 0x3117 is the hash value for 1
205 /// table.insert_accounted(1, hash_fn, &mut allocated);
206 /// assert_eq!(allocated, 64);
207 ///
208 /// // insert more values
209 /// for i in 0..100 {
210 /// table.insert_accounted(i, hash_fn, &mut allocated);
211 /// }
212 /// assert_eq!(allocated, 400);
213 /// ```
214 fn insert_accounted(
215 &mut self,
216 x: Self::T,
217 hasher: impl Fn(&Self::T) -> u64,
218 accounting: &mut usize,
219 );
220}
221
222impl<T> HashTableAllocExt for HashTable<T>
223where
224 T: Eq,
225{
226 type T = T;
227
228 fn insert_accounted(
229 &mut self,
230 x: Self::T,
231 hasher: impl Fn(&Self::T) -> u64,
232 accounting: &mut usize,
233 ) {
234 let hash = hasher(&x);
235
236 // NOTE: `find_entry` does NOT grow!
237 match self.find_entry(hash, |y| y == &x) {
238 Ok(_occupied) => {}
239 Err(_absent) => {
240 if self.len() == self.capacity() {
241 // need to request more memory
242 let bump_elements = self.capacity().max(16);
243 let bump_size = bump_elements * size_of::<T>();
244 *accounting = (*accounting).checked_add(bump_size).expect("overflow");
245
246 self.reserve(bump_elements, &hasher);
247 }
248
249 // still need to insert the element since first try failed
250 self.entry(hash, |y| y == &x, hasher).insert(x);
251 }
252 }
253 }
254}