datafusion_common/utils/
proxy.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! [`VecAllocExt`], [`RawTableAllocExt`] and [`HashTableAllocExt`] to help track memory allocations
19
20use hashbrown::{
21    hash_table::HashTable,
22    raw::{Bucket, RawTable},
23};
24use std::mem::size_of;
25
/// Extension trait for [`Vec`] to account for allocations.
pub trait VecAllocExt {
    /// Item type.
    type T;

    /// [Push](Vec::push) new element to vector and increase
    /// `accounting` by any newly allocated bytes.
    ///
    /// Note that allocation counts capacity, not size.
    ///
    /// # Panics
    ///
    /// The implementation for [`Vec`] panics if adding the newly allocated
    /// bytes to `accounting` overflows `usize`.
    ///
    /// # Example:
    /// ```
    /// # use datafusion_common::utils::proxy::VecAllocExt;
    /// // use allocated to incrementally track how much memory is allocated in the vec
    /// let mut allocated = 0;
    /// let mut vec = Vec::new();
    /// // Push data into the vec and the accounting will be updated to reflect
    /// // memory allocation
    /// vec.push_accounted(1, &mut allocated);
    /// assert_eq!(allocated, 16); // space for 4 elements (4 bytes each)
    /// vec.push_accounted(1, &mut allocated);
    /// assert_eq!(allocated, 16); // no new allocation needed
    ///
    /// // push more data into the vec
    /// for _ in 0..10 {
    ///     vec.push_accounted(1, &mut allocated);
    /// }
    /// assert_eq!(allocated, 64); // underlying vec now has space for 16 elements
    /// assert_eq!(vec.allocated_size(), 64);
    /// ```
    /// # Example with other allocations:
    /// ```
    /// # use datafusion_common::utils::proxy::VecAllocExt;
    /// // You can use the same allocated size to track memory allocated by
    /// // another source. For example
    /// let mut allocated = 27;
    /// let mut vec = Vec::new();
    /// vec.push_accounted(1, &mut allocated); // allocates 16 bytes for vec
    /// assert_eq!(allocated, 43); // 16 bytes for vec, 27 bytes for other
    /// ```
    fn push_accounted(&mut self, x: Self::T, accounting: &mut usize);

    /// Return the amount of memory allocated by this Vec to store elements
    /// (`size_of::<T>() * capacity`).
    ///
    /// Note this calculation is not recursive, and does not include any heap
    /// allocations contained within the Vec's elements. Does not include the
    /// size of `self`.
    ///
    /// # Example:
    /// ```
    /// # use datafusion_common::utils::proxy::VecAllocExt;
    /// let mut vec = Vec::new();
    /// // Push data into the vec; the allocated size follows the capacity,
    /// // not the number of elements
    /// vec.push(1);
    /// assert_eq!(vec.allocated_size(), 16); // space for 4 elements (4 bytes each)
    /// vec.push(1);
    /// assert_eq!(vec.allocated_size(), 16); // no new allocation needed
    ///
    /// // push more data into the vec
    /// for _ in 0..10 {
    ///     vec.push(1);
    /// }
    /// assert_eq!(vec.allocated_size(), 64); // space for 16 elements (64 bytes) now
    /// ```
    fn allocated_size(&self) -> usize;
}
94
95impl<T> VecAllocExt for Vec<T> {
96    type T = T;
97
98    fn push_accounted(&mut self, x: Self::T, accounting: &mut usize) {
99        let prev_capacity = self.capacity();
100        self.push(x);
101        let new_capacity = self.capacity();
102        if new_capacity > prev_capacity {
103            // capacity changed, so we allocated more
104            let bump_size = (new_capacity - prev_capacity) * size_of::<T>();
105            // Note multiplication should never overflow because `push` would
106            // have panic'd first, but the checked_add could potentially
107            // overflow since accounting could be tracking additional values, and
108            // could be greater than what is stored in the Vec
109            *accounting = (*accounting).checked_add(bump_size).expect("overflow");
110        }
111    }
112    fn allocated_size(&self) -> usize {
113        size_of::<T>() * self.capacity()
114    }
115}
116
/// Extension trait for hashbrown's [`RawTable`] to account for allocations.
pub trait RawTableAllocExt {
    /// Item type.
    type T;

    /// [Insert](RawTable::insert) new element into table and increase
    /// `accounting` by any newly allocated bytes.
    ///
    /// Returns the bucket where the element was inserted.
    /// Note that allocation counts capacity, not size.
    ///
    /// The implementation for [`RawTable`] accounts an estimate rather than
    /// the exact allocation: when the element does not fit, it reserves room
    /// for `max(capacity, 16)` additional elements and adds that element count
    /// times `size_of::<T>()` to `accounting`.
    ///
    /// # Panics
    ///
    /// The implementation for [`RawTable`] panics if the updated `accounting`
    /// overflows `usize`.
    ///
    /// # Example:
    /// ```
    /// # use datafusion_common::utils::proxy::RawTableAllocExt;
    /// # use hashbrown::raw::RawTable;
    /// let mut table = RawTable::new();
    /// let mut allocated = 0;
    /// // toy hash function, for demonstration only
    /// let hash_fn = |x: &u32| (*x as u64) % 1000;
    /// // the first insert finds an empty table and triggers the first bump
    /// table.insert_accounted(1, hash_fn, &mut allocated);
    /// assert_eq!(allocated, 64); // 16 elements * 4 bytes
    ///
    /// // insert more values
    /// for i in 0..100 {
    ///     table.insert_accounted(i, hash_fn, &mut allocated);
    /// }
    /// assert_eq!(allocated, 400);
    /// ```
    fn insert_accounted(
        &mut self,
        x: Self::T,
        hasher: impl Fn(&Self::T) -> u64,
        accounting: &mut usize,
    ) -> Bucket<Self::T>;
}
152
153impl<T> RawTableAllocExt for RawTable<T> {
154    type T = T;
155
156    fn insert_accounted(
157        &mut self,
158        x: Self::T,
159        hasher: impl Fn(&Self::T) -> u64,
160        accounting: &mut usize,
161    ) -> Bucket<Self::T> {
162        let hash = hasher(&x);
163
164        match self.try_insert_no_grow(hash, x) {
165            Ok(bucket) => bucket,
166            Err(x) => {
167                // need to request more memory
168
169                let bump_elements = self.capacity().max(16);
170                let bump_size = bump_elements * size_of::<T>();
171                *accounting = (*accounting).checked_add(bump_size).expect("overflow");
172
173                self.reserve(bump_elements, hasher);
174
175                // still need to insert the element since first try failed
176                // Note: cannot use `.expect` here because `T` may not implement `Debug`
177                match self.try_insert_no_grow(hash, x) {
178                    Ok(bucket) => bucket,
179                    Err(_) => panic!("just grew the container"),
180                }
181            }
182        }
183    }
184}
185
/// Extension trait for hashbrown's [`HashTable`] to account for allocations.
pub trait HashTableAllocExt {
    /// Item type.
    type T;

    /// Insert new element into table and increase
    /// `accounting` by any newly allocated bytes.
    ///
    /// Nothing is returned. In the implementation for [`HashTable`], if an
    /// element equal to `x` is already present the table and `accounting`
    /// are left unchanged.
    /// Note that allocation counts capacity, not size.
    ///
    /// The implementation for [`HashTable`] accounts an estimate rather than
    /// the exact allocation: when the table is full, it reserves room for
    /// `max(capacity, 16)` additional elements and adds that element count
    /// times `size_of::<T>()` to `accounting`.
    ///
    /// # Panics
    ///
    /// The implementation for [`HashTable`] panics if the updated
    /// `accounting` overflows `usize`.
    ///
    /// # Example:
    /// ```
    /// # use datafusion_common::utils::proxy::HashTableAllocExt;
    /// # use hashbrown::hash_table::HashTable;
    /// let mut table = HashTable::new();
    /// let mut allocated = 0;
    /// // toy hash function, for demonstration only
    /// let hash_fn = |x: &u32| (*x as u64) % 1000;
    /// // the first insert finds an empty table and triggers the first bump
    /// table.insert_accounted(1, hash_fn, &mut allocated);
    /// assert_eq!(allocated, 64); // 16 elements * 4 bytes
    ///
    /// // insert more values
    /// for i in 0..100 {
    ///     table.insert_accounted(i, hash_fn, &mut allocated);
    /// }
    /// assert_eq!(allocated, 400);
    /// ```
    fn insert_accounted(
        &mut self,
        x: Self::T,
        hasher: impl Fn(&Self::T) -> u64,
        accounting: &mut usize,
    );
}
221
222impl<T> HashTableAllocExt for HashTable<T>
223where
224    T: Eq,
225{
226    type T = T;
227
228    fn insert_accounted(
229        &mut self,
230        x: Self::T,
231        hasher: impl Fn(&Self::T) -> u64,
232        accounting: &mut usize,
233    ) {
234        let hash = hasher(&x);
235
236        // NOTE: `find_entry` does NOT grow!
237        match self.find_entry(hash, |y| y == &x) {
238            Ok(_occupied) => {}
239            Err(_absent) => {
240                if self.len() == self.capacity() {
241                    // need to request more memory
242                    let bump_elements = self.capacity().max(16);
243                    let bump_size = bump_elements * size_of::<T>();
244                    *accounting = (*accounting).checked_add(bump_size).expect("overflow");
245
246                    self.reserve(bump_elements, &hasher);
247                }
248
249                // still need to insert the element since first try failed
250                self.entry(hash, |y| y == &x, hasher).insert(x);
251            }
252        }
253    }
254}