Skip to main content

datafusion_common/
heap_size.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::stats::Precision;
19use crate::{ColumnStatistics, ScalarValue, Statistics, TableReference};
20use arrow::array::{
21    Array, FixedSizeListArray, LargeListArray, LargeListViewArray, ListArray,
22    ListViewArray, MapArray, StructArray,
23};
24use arrow::datatypes::{
25    DataType, Field, Fields, IntervalDayTime, IntervalMonthDayNano, IntervalUnit,
26    TimeUnit, UnionFields, UnionMode, i256,
27};
28use chrono::{DateTime, Utc};
29use half::f16;
30use hashbrown::HashSet;
31use std::collections::HashMap;
32use std::fmt::Debug;
33use std::sync::Arc;
34
35/// This is a temporary solution until <https://github.com/apache/datafusion/pull/19599> and
36/// <https://github.com/apache/arrow-rs/pull/9138> are resolved.
37/// Trait for calculating the size of various containers
38pub trait DFHeapSize {
39    /// Return the size of any bytes allocated on the heap by this object,
40    /// including heap memory in those structures
41    ///
42    /// Note that the size of the type itself is not included in the result --
43    /// instead, that size is added by the caller (e.g. container).
44    fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize;
45}
46
47#[derive(Default)]
48pub struct DFHeapSizeCtx {
49    seen: HashSet<usize>,
50}
51
52impl DFHeapSize for Statistics {
53    fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
54        self.num_rows.heap_size(ctx)
55            + self.total_byte_size.heap_size(ctx)
56            + self.column_statistics.heap_size(ctx)
57    }
58}
59
60impl DFHeapSize for TableReference {
61    fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
62        match self {
63            TableReference::Bare { table } => table.heap_size(ctx),
64            TableReference::Partial { schema, table } => {
65                schema.heap_size(ctx) + table.heap_size(ctx)
66            }
67            TableReference::Full {
68                catalog,
69                schema,
70                table,
71            } => catalog.heap_size(ctx) + schema.heap_size(ctx) + table.heap_size(ctx),
72        }
73    }
74}
75
76impl<T: Debug + Clone + PartialEq + Eq + PartialOrd + DFHeapSize> DFHeapSize
77    for Precision<T>
78{
79    fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
80        self.get_value().map_or_else(|| 0, |v| v.heap_size(ctx))
81    }
82}
83
84impl DFHeapSize for ColumnStatistics {
85    fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
86        self.null_count.heap_size(ctx)
87            + self.max_value.heap_size(ctx)
88            + self.min_value.heap_size(ctx)
89            + self.sum_value.heap_size(ctx)
90            + self.distinct_count.heap_size(ctx)
91            + self.byte_size.heap_size(ctx)
92    }
93}
94
95impl DFHeapSize for ScalarValue {
96    fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
97        use crate::scalar::ScalarValue::*;
98        match self {
99            Null => 0,
100            Boolean(b) => b.heap_size(ctx),
101            Float16(f) => f.heap_size(ctx),
102            Float32(f) => f.heap_size(ctx),
103            Float64(f) => f.heap_size(ctx),
104            Decimal32(a, b, c) => a.heap_size(ctx) + b.heap_size(ctx) + c.heap_size(ctx),
105            Decimal64(a, b, c) => a.heap_size(ctx) + b.heap_size(ctx) + c.heap_size(ctx),
106            Decimal128(a, b, c) => a.heap_size(ctx) + b.heap_size(ctx) + c.heap_size(ctx),
107            Decimal256(a, b, c) => a.heap_size(ctx) + b.heap_size(ctx) + c.heap_size(ctx),
108            Int8(i) => i.heap_size(ctx),
109            Int16(i) => i.heap_size(ctx),
110            Int32(i) => i.heap_size(ctx),
111            Int64(i) => i.heap_size(ctx),
112            UInt8(u) => u.heap_size(ctx),
113            UInt16(u) => u.heap_size(ctx),
114            UInt32(u) => u.heap_size(ctx),
115            UInt64(u) => u.heap_size(ctx),
116            Utf8(u) => u.heap_size(ctx),
117            Utf8View(u) => u.heap_size(ctx),
118            LargeUtf8(l) => l.heap_size(ctx),
119            Binary(b) => b.heap_size(ctx),
120            BinaryView(b) => b.heap_size(ctx),
121            FixedSizeBinary(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
122            LargeBinary(l) => l.heap_size(ctx),
123            FixedSizeList(f) => f.heap_size(ctx),
124            List(l) => l.heap_size(ctx),
125            LargeList(l) => l.heap_size(ctx),
126            Struct(s) => s.heap_size(ctx),
127            Map(m) => m.heap_size(ctx),
128            Date32(d) => d.heap_size(ctx),
129            Date64(d) => d.heap_size(ctx),
130            Time32Second(t) => t.heap_size(ctx),
131            Time32Millisecond(t) => t.heap_size(ctx),
132            Time64Microsecond(t) => t.heap_size(ctx),
133            Time64Nanosecond(t) => t.heap_size(ctx),
134            TimestampSecond(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
135            TimestampMillisecond(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
136            TimestampMicrosecond(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
137            TimestampNanosecond(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
138            IntervalYearMonth(i) => i.heap_size(ctx),
139            IntervalDayTime(i) => i.heap_size(ctx),
140            IntervalMonthDayNano(i) => i.heap_size(ctx),
141            DurationSecond(d) => d.heap_size(ctx),
142            DurationMillisecond(d) => d.heap_size(ctx),
143            DurationMicrosecond(d) => d.heap_size(ctx),
144            DurationNanosecond(d) => d.heap_size(ctx),
145            Union(a, b, c) => a.heap_size(ctx) + b.heap_size(ctx) + c.heap_size(ctx),
146            Dictionary(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
147            RunEndEncoded(a, b, c) => {
148                a.heap_size(ctx) + b.heap_size(ctx) + c.heap_size(ctx)
149            }
150            ListView(a) => a.heap_size(ctx),
151            LargeListView(a) => a.heap_size(ctx),
152        }
153    }
154}
155
156impl DFHeapSize for DataType {
157    fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
158        use DataType::*;
159        match self {
160            Null => 0,
161            Boolean => 0,
162            Int8 => 0,
163            Int16 => 0,
164            Int32 => 0,
165            Int64 => 0,
166            UInt8 => 0,
167            UInt16 => 0,
168            UInt32 => 0,
169            UInt64 => 0,
170            Float16 => 0,
171            Float32 => 0,
172            Float64 => 0,
173            Timestamp(t, s) => t.heap_size(ctx) + s.heap_size(ctx),
174            Date32 => 0,
175            Date64 => 0,
176            Time32(t) => t.heap_size(ctx),
177            Time64(t) => t.heap_size(ctx),
178            Duration(t) => t.heap_size(ctx),
179            Interval(i) => i.heap_size(ctx),
180            Binary => 0,
181            FixedSizeBinary(i) => i.heap_size(ctx),
182            LargeBinary => 0,
183            BinaryView => 0,
184            Utf8 => 0,
185            LargeUtf8 => 0,
186            Utf8View => 0,
187            List(v) => v.heap_size(ctx),
188            ListView(v) => v.heap_size(ctx),
189            FixedSizeList(f, i) => f.heap_size(ctx) + i.heap_size(ctx),
190            LargeList(l) => l.heap_size(ctx),
191            LargeListView(l) => l.heap_size(ctx),
192            Struct(s) => s.heap_size(ctx),
193            Union(u, m) => u.heap_size(ctx) + m.heap_size(ctx),
194            Dictionary(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
195            Decimal32(p, s) => p.heap_size(ctx) + s.heap_size(ctx),
196            Decimal64(p, s) => p.heap_size(ctx) + s.heap_size(ctx),
197            Decimal128(p, s) => p.heap_size(ctx) + s.heap_size(ctx),
198            Decimal256(p, s) => p.heap_size(ctx) + s.heap_size(ctx),
199            Map(m, b) => m.heap_size(ctx) + b.heap_size(ctx),
200            RunEndEncoded(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
201        }
202    }
203}
204
205impl<T: DFHeapSize> DFHeapSize for Vec<T> {
206    fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
207        let item_size = size_of::<T>();
208        // account for the contents of the Vec
209        (self.capacity() * item_size) +
210            // add any heap allocations by contents
211            self.iter().map(|t| t.heap_size(ctx)).sum::<usize>()
212    }
213}
214
215impl<K: DFHeapSize, V: DFHeapSize> DFHeapSize for HashMap<K, V> {
216    fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
217        let capacity = self.capacity();
218        if capacity == 0 {
219            return 0;
220        }
221
222        // HashMap doesn't provide a way to get its heap size, so this is an approximation based on
223        // the behavior of hashbrown::HashMap as at version 0.16.0, and may become inaccurate
224        // if the implementation changes.
225        let key_val_size = size_of::<(K, V)>();
226        // Overhead for the control tags group, which may be smaller depending on architecture
227        let group_size = 16;
228        // 1 byte of metadata stored per bucket.
229        let metadata_size = 1;
230
231        // Compute the number of buckets for the capacity. Based on hashbrown's capacity_to_buckets
232        let buckets = if capacity < 15 {
233            let min_cap = match key_val_size {
234                0..=1 => 14,
235                2..=3 => 7,
236                _ => 3,
237            };
238            let cap = min_cap.max(capacity);
239            if cap < 4 {
240                4
241            } else if cap < 8 {
242                8
243            } else {
244                16
245            }
246        } else {
247            (capacity.saturating_mul(8) / 7).next_power_of_two()
248        };
249
250        group_size
251            + (buckets * (key_val_size + metadata_size))
252            + self.keys().map(|k| k.heap_size(ctx)).sum::<usize>()
253            + self.values().map(|v| v.heap_size(ctx)).sum::<usize>()
254    }
255}
256
257impl<T: DFHeapSize> DFHeapSize for Arc<T> {
258    fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
259        let ptr = Arc::as_ptr(self) as usize;
260
261        if !ctx.seen.insert(ptr) {
262            return 0;
263        }
264
265        // Arc stores weak and strong counts on the heap alongside an instance of T
266        2 * size_of::<usize>() + size_of::<T>() + self.as_ref().heap_size(ctx)
267    }
268}
269
270impl DFHeapSize for Arc<str> {
271    fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
272        let ptr = Arc::as_ptr(self) as *const i32 as usize;
273
274        if !ctx.seen.insert(ptr) {
275            return 0;
276        }
277
278        // Arc stores weak and strong counts on the heap alongside an instance of T
279        2 * size_of::<usize>() + self.as_ref().heap_size(ctx)
280    }
281}
282
283impl DFHeapSize for Arc<dyn DFHeapSize> {
284    fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
285        let ptr = Arc::as_ptr(self) as *const i32 as usize;
286
287        if !ctx.seen.insert(ptr) {
288            return 0;
289        }
290
291        // Arc stores weak and strong counts on the heap alongside an instance of T
292        2 * size_of::<usize>() + size_of_val(self.as_ref()) + self.as_ref().heap_size(ctx)
293    }
294}
295
296impl DFHeapSize for Fields {
297    fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
298        self.into_iter().map(|f| f.heap_size(ctx)).sum::<usize>()
299    }
300}
301
302impl DFHeapSize for StructArray {
303    fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
304        self.get_array_memory_size()
305    }
306}
307
308impl DFHeapSize for LargeListArray {
309    fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
310        self.get_array_memory_size()
311    }
312}
313
314impl DFHeapSize for LargeListViewArray {
315    fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
316        self.get_array_memory_size()
317    }
318}
319
320impl DFHeapSize for ListArray {
321    fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
322        self.get_array_memory_size()
323    }
324}
325
326impl DFHeapSize for ListViewArray {
327    fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
328        self.get_array_memory_size()
329    }
330}
331
332impl DFHeapSize for FixedSizeListArray {
333    fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
334        self.get_array_memory_size()
335    }
336}
337impl DFHeapSize for MapArray {
338    fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
339        self.get_array_memory_size()
340    }
341}
342
343impl<T: DFHeapSize> DFHeapSize for Box<T> {
344    fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
345        size_of::<T>() + self.as_ref().heap_size(ctx)
346    }
347}
348
349impl<T: DFHeapSize> DFHeapSize for Option<T> {
350    fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
351        self.as_ref().map(|inner| inner.heap_size(ctx)).unwrap_or(0)
352    }
353}
354
355impl<A, B> DFHeapSize for (A, B)
356where
357    A: DFHeapSize,
358    B: DFHeapSize,
359{
360    fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
361        self.0.heap_size(ctx) + self.1.heap_size(ctx)
362    }
363}
364
365impl DFHeapSize for String {
366    fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
367        self.capacity()
368    }
369}
370
371impl DFHeapSize for str {
372    fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
373        self.len()
374    }
375}
376
377impl DFHeapSize for UnionFields {
378    fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
379        self.iter()
380            .map(|f| f.0.heap_size(ctx) + f.1.heap_size(ctx))
381            .sum()
382    }
383}
384
385impl DFHeapSize for UnionMode {
386    fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
387        0 // no heap allocations
388    }
389}
390
391impl DFHeapSize for TimeUnit {
392    fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
393        0 // no heap allocations
394    }
395}
396
397impl DFHeapSize for IntervalUnit {
398    fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
399        0 // no heap allocations
400    }
401}
402
403impl DFHeapSize for Field {
404    fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
405        self.name().heap_size(ctx)
406            + self.data_type().heap_size(ctx)
407            + self.is_nullable().heap_size(ctx)
408            + self.dict_is_ordered().heap_size(ctx)
409            + self.metadata().heap_size(ctx)
410    }
411}
412
413impl DFHeapSize for IntervalMonthDayNano {
414    fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
415        self.days.heap_size(ctx)
416            + self.months.heap_size(ctx)
417            + self.nanoseconds.heap_size(ctx)
418    }
419}
420
421impl DFHeapSize for IntervalDayTime {
422    fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
423        self.days.heap_size(ctx) + self.milliseconds.heap_size(ctx)
424    }
425}
426
427impl DFHeapSize for DateTime<Utc> {
428    fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
429        0 // no heap allocations
430    }
431}
432
433impl DFHeapSize for bool {
434    fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
435        0 // no heap allocations
436    }
437}
438impl DFHeapSize for u8 {
439    fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
440        0 // no heap allocations
441    }
442}
443
444impl DFHeapSize for u16 {
445    fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
446        0 // no heap allocations
447    }
448}
449
450impl DFHeapSize for u32 {
451    fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
452        0 // no heap allocations
453    }
454}
455
456impl DFHeapSize for u64 {
457    fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
458        0 // no heap allocations
459    }
460}
461
462impl DFHeapSize for i8 {
463    fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
464        0 // no heap allocations
465    }
466}
467
468impl DFHeapSize for i16 {
469    fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
470        0 // no heap allocations
471    }
472}
473
474impl DFHeapSize for i32 {
475    fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
476        0 // no heap allocations
477    }
478}
479impl DFHeapSize for i64 {
480    fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
481        0 // no heap allocations
482    }
483}
484
485impl DFHeapSize for i128 {
486    fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
487        0 // no heap allocations
488    }
489}
490
491impl DFHeapSize for i256 {
492    fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
493        0 // no heap allocations
494    }
495}
496
497impl DFHeapSize for f16 {
498    fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
499        0 // no heap allocations
500    }
501}
502
503impl DFHeapSize for f32 {
504    fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
505        0 // no heap allocations
506    }
507}
508impl DFHeapSize for f64 {
509    fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
510        0 // no heap allocations
511    }
512}
513
514impl DFHeapSize for usize {
515    fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
516        0 // no heap allocations
517    }
518}
519
#[cfg(test)]
mod tests {
    use super::*;

    /// Measures every handle, in order, against one freshly created context
    /// and returns the combined result.
    fn total_with_fresh_ctx<T: DFHeapSize>(handles: &[&T]) -> usize {
        let mut ctx = DFHeapSizeCtx::default();
        let mut total = 0usize;
        for handle in handles {
            total += T::heap_size(handle, &mut ctx);
        }
        total
    }

    #[test]
    fn test_heap_size_arc_avoid_double_accounting() {
        let original = Arc::new(vec![1_i32, 2, 3]);
        let single = total_with_fresh_ctx(&[&original]);

        let second = Arc::clone(&original);
        let third = Arc::clone(&original);
        let fourth = Arc::clone(&third);

        // Clones share one allocation, so measuring all of them within the
        // same context must not exceed measuring a single handle.
        let combined = total_with_fresh_ctx(&[&original, &second, &third, &fourth]);

        assert_eq!(single, combined);
    }

    #[test]
    fn test_heap_size_arc_str_avoid_double_accounting() {
        let original: Arc<str> = Arc::from("Hello");
        let single = total_with_fresh_ctx(&[&original]);

        let second = Arc::clone(&original);
        let third = Arc::clone(&original);
        let fourth = Arc::clone(&third);

        // Clones share one allocation, so measuring all of them within the
        // same context must not exceed measuring a single handle.
        let combined = total_with_fresh_ctx(&[&original, &second, &third, &fourth]);

        assert_eq!(single, combined);
    }
}