datafusion_common/utils/
proxy.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! [`VecAllocExt`] and [`HashTableAllocExt`] to help track memory allocations
19
20use hashbrown::hash_table::HashTable;
21use std::mem::size_of;
22
/// Extension trait for [`Vec`] to account for allocations.
pub trait VecAllocExt {
    /// Item type.
    type T;

    /// [Push](Vec::push) new element to vector and increase
    /// `accounting` by any newly allocated bytes.
    ///
    /// Note that the accounting counts capacity, not length: it only
    /// changes when the push makes the vector grow its buffer.
    ///
    /// # Panics
    ///
    /// The implementation for [`Vec`] panics if adding the newly allocated
    /// bytes to `accounting` overflows `usize`.
    ///
    /// # Example:
    /// ```
    /// # use datafusion_common::utils::proxy::VecAllocExt;
    /// // use allocated to incrementally track how much memory is allocated in the vec
    /// let mut allocated = 0;
    /// let mut vec = Vec::new();
    /// // Push data into the vec and the accounting will be updated to reflect
    /// // memory allocation
    /// vec.push_accounted(1, &mut allocated);
    /// assert_eq!(allocated, 16); // space for 4 i32s
    /// vec.push_accounted(1, &mut allocated);
    /// assert_eq!(allocated, 16); // no new allocation needed
    ///
    /// // push more data into the vec
    /// for _ in 0..10 {
    ///     vec.push_accounted(1, &mut allocated);
    /// }
    /// assert_eq!(allocated, 64); // 12 elements stored, capacity grew to 16 i32s
    /// assert_eq!(vec.allocated_size(), 64);
    /// ```
    /// # Example with other allocations:
    /// ```
    /// # use datafusion_common::utils::proxy::VecAllocExt;
    /// // You can use the same allocated size to track memory allocated by
    /// // another source. For example
    /// let mut allocated = 27;
    /// let mut vec = Vec::new();
    /// vec.push_accounted(1, &mut allocated); // allocates 16 bytes for vec
    /// assert_eq!(allocated, 43); // 16 bytes for vec, 27 bytes for other
    /// ```
    fn push_accounted(&mut self, x: Self::T, accounting: &mut usize);

    /// Return the amount of memory allocated by this Vec to store elements
    /// (`size_of::<T>() * capacity`).
    ///
    /// Note this calculation is not recursive, and does not include any heap
    /// allocations contained within the Vec's elements. Does not include the
    /// size of `self`.
    ///
    /// # Example:
    /// ```
    /// # use datafusion_common::utils::proxy::VecAllocExt;
    /// let mut vec = Vec::new();
    /// // Push data into the vec; `allocated_size` reflects the current
    /// // capacity of the underlying buffer
    /// vec.push(1);
    /// assert_eq!(vec.allocated_size(), 16); // space for 4 i32s
    /// vec.push(1);
    /// assert_eq!(vec.allocated_size(), 16); // no new allocation needed
    ///
    /// // push more data into the vec
    /// for _ in 0..10 {
    ///     vec.push(1);
    /// }
    /// assert_eq!(vec.allocated_size(), 64); // space for 16 i32s now
    /// ```
    fn allocated_size(&self) -> usize;
}
91
92impl<T> VecAllocExt for Vec<T> {
93    type T = T;
94
95    fn push_accounted(&mut self, x: Self::T, accounting: &mut usize) {
96        let prev_capacity = self.capacity();
97        self.push(x);
98        let new_capacity = self.capacity();
99        if new_capacity > prev_capacity {
100            // capacity changed, so we allocated more
101            let bump_size = (new_capacity - prev_capacity) * size_of::<T>();
102            // Note multiplication should never overflow because `push` would
103            // have panic'd first, but the checked_add could potentially
104            // overflow since accounting could be tracking additional values, and
105            // could be greater than what is stored in the Vec
106            *accounting = (*accounting).checked_add(bump_size).expect("overflow");
107        }
108    }
109    fn allocated_size(&self) -> usize {
110        size_of::<T>() * self.capacity()
111    }
112}
113
/// Extension trait for hashbrown's [`HashTable`] to account for allocations.
pub trait HashTableAllocExt {
    /// Item type.
    type T;

    /// Insert new element into table and increase
    /// `accounting` by any newly allocated bytes.
    ///
    /// `hasher` computes the hash of an element; it is used both for `x`
    /// and for re-hashing stored elements when the table grows.
    ///
    /// In the implementation for [`HashTable`], if an element equal to `x`
    /// is already stored, the table and `accounting` are left unchanged.
    ///
    /// Note that the accounting is based on reserved capacity, not on the
    /// number of stored elements.
    ///
    /// # Example:
    /// ```
    /// # use datafusion_common::utils::proxy::HashTableAllocExt;
    /// # use hashbrown::hash_table::HashTable;
    /// let mut table = HashTable::new();
    /// let mut allocated = 0;
    /// let hash_fn = |x: &u32| (*x as u64) % 1000;
    /// // the first insert reserves room for 16 u32s
    /// table.insert_accounted(1, hash_fn, &mut allocated);
    /// assert_eq!(allocated, 64);
    ///
    /// // insert more values
    /// for i in 0..100 {
    ///     table.insert_accounted(i, hash_fn, &mut allocated);
    /// }
    /// assert_eq!(allocated, 400);
    /// ```
    fn insert_accounted(
        &mut self,
        x: Self::T,
        hasher: impl Fn(&Self::T) -> u64,
        accounting: &mut usize,
    );
}
149
150impl<T> HashTableAllocExt for HashTable<T>
151where
152    T: Eq,
153{
154    type T = T;
155
156    fn insert_accounted(
157        &mut self,
158        x: Self::T,
159        hasher: impl Fn(&Self::T) -> u64,
160        accounting: &mut usize,
161    ) {
162        let hash = hasher(&x);
163
164        // NOTE: `find_entry` does NOT grow!
165        match self.find_entry(hash, |y| y == &x) {
166            Ok(_occupied) => {}
167            Err(_absent) => {
168                if self.len() == self.capacity() {
169                    // need to request more memory
170                    let bump_elements = self.capacity().max(16);
171                    let bump_size = bump_elements * size_of::<T>();
172                    *accounting = (*accounting).checked_add(bump_size).expect("overflow");
173
174                    self.reserve(bump_elements, &hasher);
175                }
176
177                // still need to insert the element since first try failed
178                self.entry(hash, |y| y == &x, hasher).insert(x);
179            }
180        }
181    }
182}