datafusion_common/utils/proxy.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`VecAllocExt`] to help tracking of memory allocations
19
20use hashbrown::hash_table::HashTable;
21use std::mem::size_of;
22
23/// Extension trait for [`Vec`] to account for allocations.
24pub trait VecAllocExt {
25 /// Item type.
26 type T;
27
28 /// [Push](Vec::push) new element to vector and increase
29 /// `accounting` by any newly allocated bytes.
30 ///
31 /// Note that allocation counts capacity, not size
32 ///
33 /// # Example:
34 /// ```
35 /// # use datafusion_common::utils::proxy::VecAllocExt;
36 /// // use allocated to incrementally track how much memory is allocated in the vec
37 /// let mut allocated = 0;
38 /// let mut vec = Vec::new();
39 /// // Push data into the vec and the accounting will be updated to reflect
40 /// // memory allocation
41 /// vec.push_accounted(1, &mut allocated);
42 /// assert_eq!(allocated, 16); // space for 4 u32s
43 /// vec.push_accounted(1, &mut allocated);
44 /// assert_eq!(allocated, 16); // no new allocation needed
45 ///
46 /// // push more data into the vec
47 /// for _ in 0..10 {
48 /// vec.push_accounted(1, &mut allocated);
49 /// }
50 /// assert_eq!(allocated, 64); // underlying vec has space for 10 u32s
51 /// assert_eq!(vec.allocated_size(), 64);
52 /// ```
53 /// # Example with other allocations:
54 /// ```
55 /// # use datafusion_common::utils::proxy::VecAllocExt;
56 /// // You can use the same allocated size to track memory allocated by
57 /// // another source. For example
58 /// let mut allocated = 27;
59 /// let mut vec = Vec::new();
60 /// vec.push_accounted(1, &mut allocated); // allocates 16 bytes for vec
61 /// assert_eq!(allocated, 43); // 16 bytes for vec, 27 bytes for other
62 /// ```
63 fn push_accounted(&mut self, x: Self::T, accounting: &mut usize);
64
65 /// Return the amount of memory allocated by this Vec to store elements
66 /// (`size_of<T> * capacity`).
67 ///
68 /// Note this calculation is not recursive, and does not include any heap
69 /// allocations contained within the Vec's elements. Does not include the
70 /// size of `self`
71 ///
72 /// # Example:
73 /// ```
74 /// # use datafusion_common::utils::proxy::VecAllocExt;
75 /// let mut vec = Vec::new();
76 /// // Push data into the vec and the accounting will be updated to reflect
77 /// // memory allocation
78 /// vec.push(1);
79 /// assert_eq!(vec.allocated_size(), 16); // space for 4 u32s
80 /// vec.push(1);
81 /// assert_eq!(vec.allocated_size(), 16); // no new allocation needed
82 ///
83 /// // push more data into the vec
84 /// for _ in 0..10 {
85 /// vec.push(1);
86 /// }
87 /// assert_eq!(vec.allocated_size(), 64); // space for 64 now
88 /// ```
89 fn allocated_size(&self) -> usize;
90}
91
92impl<T> VecAllocExt for Vec<T> {
93 type T = T;
94
95 fn push_accounted(&mut self, x: Self::T, accounting: &mut usize) {
96 let prev_capacity = self.capacity();
97 self.push(x);
98 let new_capacity = self.capacity();
99 if new_capacity > prev_capacity {
100 // capacity changed, so we allocated more
101 let bump_size = (new_capacity - prev_capacity) * size_of::<T>();
102 // Note multiplication should never overflow because `push` would
103 // have panic'd first, but the checked_add could potentially
104 // overflow since accounting could be tracking additional values, and
105 // could be greater than what is stored in the Vec
106 *accounting = (*accounting).checked_add(bump_size).expect("overflow");
107 }
108 }
109 fn allocated_size(&self) -> usize {
110 size_of::<T>() * self.capacity()
111 }
112}
113
114/// Extension trait for hash browns [`HashTable`] to account for allocations.
115pub trait HashTableAllocExt {
116 /// Item type.
117 type T;
118
119 /// Insert new element into table and increase
120 /// `accounting` by any newly allocated bytes.
121 ///
122 /// Returns the bucket where the element was inserted.
123 /// Note that allocation counts capacity, not size.
124 ///
125 /// # Example:
126 /// ```
127 /// # use datafusion_common::utils::proxy::HashTableAllocExt;
128 /// # use hashbrown::hash_table::HashTable;
129 /// let mut table = HashTable::new();
130 /// let mut allocated = 0;
131 /// let hash_fn = |x: &u32| (*x as u64) % 1000;
132 /// // pretend 0x3117 is the hash value for 1
133 /// table.insert_accounted(1, hash_fn, &mut allocated);
134 /// assert_eq!(allocated, 64);
135 ///
136 /// // insert more values
137 /// for i in 0..100 {
138 /// table.insert_accounted(i, hash_fn, &mut allocated);
139 /// }
140 /// assert_eq!(allocated, 400);
141 /// ```
142 fn insert_accounted(
143 &mut self,
144 x: Self::T,
145 hasher: impl Fn(&Self::T) -> u64,
146 accounting: &mut usize,
147 );
148}
149
150impl<T> HashTableAllocExt for HashTable<T>
151where
152 T: Eq,
153{
154 type T = T;
155
156 fn insert_accounted(
157 &mut self,
158 x: Self::T,
159 hasher: impl Fn(&Self::T) -> u64,
160 accounting: &mut usize,
161 ) {
162 let hash = hasher(&x);
163
164 // NOTE: `find_entry` does NOT grow!
165 match self.find_entry(hash, |y| y == &x) {
166 Ok(_occupied) => {}
167 Err(_absent) => {
168 if self.len() == self.capacity() {
169 // need to request more memory
170 let bump_elements = self.capacity().max(16);
171 let bump_size = bump_elements * size_of::<T>();
172 *accounting = (*accounting).checked_add(bump_size).expect("overflow");
173
174 self.reserve(bump_elements, &hasher);
175 }
176
177 // still need to insert the element since first try failed
178 self.entry(hash, |y| y == &x, hasher).insert(x);
179 }
180 }
181 }
182}