use crate::stats::Precision;
use crate::{ColumnStatistics, ScalarValue, Statistics, TableReference};
use arrow::array::{
Array, FixedSizeListArray, LargeListArray, LargeListViewArray, ListArray,
ListViewArray, MapArray, StructArray,
};
use arrow::datatypes::{
DataType, Field, Fields, IntervalDayTime, IntervalMonthDayNano, IntervalUnit,
TimeUnit, UnionFields, UnionMode, i256,
};
use chrono::{DateTime, Utc};
use half::f16;
use hashbrown::HashSet;
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;
/// Estimates how many bytes a value owns on the heap.
///
/// Container impls in this file count the memory they reach through
/// pointers: e.g. `Vec<T>` reports `capacity * size_of::<T>()` plus what its
/// elements own, while `Box<T>` and `Arc<T>` add `size_of::<T>()` because the
/// pointee lives in a separate allocation.
pub trait DFHeapSize {
    /// Returns the estimated number of heap bytes owned by `self`.
    ///
    /// `ctx` records shared allocations (the `Arc` impls consult it) that have
    /// already been counted. Passing the same `ctx` to several calls therefore
    /// counts each shared allocation at most once across all of them.
    fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize;
}
/// Traversal state threaded through [`DFHeapSize::heap_size`] calls.
///
/// Remembers the addresses of shared (`Arc`) allocations that have already
/// been counted. Reusing one context across several calls counts each shared
/// allocation once. Start from a fresh context (e.g. `Default::default()`) to
/// begin an independent measurement.
#[derive(Debug, Default)]
pub struct DFHeapSizeCtx {
    /// Addresses of `Arc` payloads that have already been accounted for.
    seen: HashSet<usize>,
}
impl DFHeapSize for Statistics {
    /// Heap bytes owned by the row count, the total byte size and every
    /// per-column statistics entry.
    fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
        let rows = self.num_rows.heap_size(ctx);
        let bytes = self.total_byte_size.heap_size(ctx);
        let columns = self.column_statistics.heap_size(ctx);
        rows + bytes + columns
    }
}
impl DFHeapSize for TableReference {
    /// Sums the heap footprint of every name component present in the
    /// reference: `table`, plus `schema` and `catalog` when they are given.
    fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
        match self {
            TableReference::Bare { table } => table.heap_size(ctx),
            TableReference::Partial { schema, table } => [schema, table]
                .into_iter()
                .map(|part| part.heap_size(ctx))
                .sum::<usize>(),
            TableReference::Full {
                catalog,
                schema,
                table,
            } => [catalog, schema, table]
                .into_iter()
                .map(|part| part.heap_size(ctx))
                .sum::<usize>(),
        }
    }
}
impl<T: Debug + Clone + PartialEq + Eq + PartialOrd + DFHeapSize> DFHeapSize
    for Precision<T>
{
    /// Delegates to the wrapped value when `get_value` yields one;
    /// otherwise nothing is owned and 0 is reported.
    fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
        match self.get_value() {
            Some(value) => value.heap_size(ctx),
            None => 0,
        }
    }
}
impl DFHeapSize for ColumnStatistics {
    /// Adds up the heap memory of each statistic tracked for a column.
    fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
        let nulls = self.null_count.heap_size(ctx);
        let bounds = self.max_value.heap_size(ctx) + self.min_value.heap_size(ctx);
        let sum = self.sum_value.heap_size(ctx);
        let distinct = self.distinct_count.heap_size(ctx);
        let bytes = self.byte_size.heap_size(ctx);
        nulls + bounds + sum + distinct + bytes
    }
}
impl DFHeapSize for ScalarValue {
fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
use crate::scalar::ScalarValue::*;
match self {
Null => 0,
Boolean(b) => b.heap_size(ctx),
Float16(f) => f.heap_size(ctx),
Float32(f) => f.heap_size(ctx),
Float64(f) => f.heap_size(ctx),
Decimal32(a, b, c) => a.heap_size(ctx) + b.heap_size(ctx) + c.heap_size(ctx),
Decimal64(a, b, c) => a.heap_size(ctx) + b.heap_size(ctx) + c.heap_size(ctx),
Decimal128(a, b, c) => a.heap_size(ctx) + b.heap_size(ctx) + c.heap_size(ctx),
Decimal256(a, b, c) => a.heap_size(ctx) + b.heap_size(ctx) + c.heap_size(ctx),
Int8(i) => i.heap_size(ctx),
Int16(i) => i.heap_size(ctx),
Int32(i) => i.heap_size(ctx),
Int64(i) => i.heap_size(ctx),
UInt8(u) => u.heap_size(ctx),
UInt16(u) => u.heap_size(ctx),
UInt32(u) => u.heap_size(ctx),
UInt64(u) => u.heap_size(ctx),
Utf8(u) => u.heap_size(ctx),
Utf8View(u) => u.heap_size(ctx),
LargeUtf8(l) => l.heap_size(ctx),
Binary(b) => b.heap_size(ctx),
BinaryView(b) => b.heap_size(ctx),
FixedSizeBinary(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
LargeBinary(l) => l.heap_size(ctx),
FixedSizeList(f) => f.heap_size(ctx),
List(l) => l.heap_size(ctx),
LargeList(l) => l.heap_size(ctx),
Struct(s) => s.heap_size(ctx),
Map(m) => m.heap_size(ctx),
Date32(d) => d.heap_size(ctx),
Date64(d) => d.heap_size(ctx),
Time32Second(t) => t.heap_size(ctx),
Time32Millisecond(t) => t.heap_size(ctx),
Time64Microsecond(t) => t.heap_size(ctx),
Time64Nanosecond(t) => t.heap_size(ctx),
TimestampSecond(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
TimestampMillisecond(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
TimestampMicrosecond(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
TimestampNanosecond(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
IntervalYearMonth(i) => i.heap_size(ctx),
IntervalDayTime(i) => i.heap_size(ctx),
IntervalMonthDayNano(i) => i.heap_size(ctx),
DurationSecond(d) => d.heap_size(ctx),
DurationMillisecond(d) => d.heap_size(ctx),
DurationMicrosecond(d) => d.heap_size(ctx),
DurationNanosecond(d) => d.heap_size(ctx),
Union(a, b, c) => a.heap_size(ctx) + b.heap_size(ctx) + c.heap_size(ctx),
Dictionary(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
RunEndEncoded(a, b, c) => {
a.heap_size(ctx) + b.heap_size(ctx) + c.heap_size(ctx)
}
ListView(a) => a.heap_size(ctx),
LargeListView(a) => a.heap_size(ctx),
}
}
}
impl DFHeapSize for DataType {
fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
use DataType::*;
match self {
Null => 0,
Boolean => 0,
Int8 => 0,
Int16 => 0,
Int32 => 0,
Int64 => 0,
UInt8 => 0,
UInt16 => 0,
UInt32 => 0,
UInt64 => 0,
Float16 => 0,
Float32 => 0,
Float64 => 0,
Timestamp(t, s) => t.heap_size(ctx) + s.heap_size(ctx),
Date32 => 0,
Date64 => 0,
Time32(t) => t.heap_size(ctx),
Time64(t) => t.heap_size(ctx),
Duration(t) => t.heap_size(ctx),
Interval(i) => i.heap_size(ctx),
Binary => 0,
FixedSizeBinary(i) => i.heap_size(ctx),
LargeBinary => 0,
BinaryView => 0,
Utf8 => 0,
LargeUtf8 => 0,
Utf8View => 0,
List(v) => v.heap_size(ctx),
ListView(v) => v.heap_size(ctx),
FixedSizeList(f, i) => f.heap_size(ctx) + i.heap_size(ctx),
LargeList(l) => l.heap_size(ctx),
LargeListView(l) => l.heap_size(ctx),
Struct(s) => s.heap_size(ctx),
Union(u, m) => u.heap_size(ctx) + m.heap_size(ctx),
Dictionary(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
Decimal32(p, s) => p.heap_size(ctx) + s.heap_size(ctx),
Decimal64(p, s) => p.heap_size(ctx) + s.heap_size(ctx),
Decimal128(p, s) => p.heap_size(ctx) + s.heap_size(ctx),
Decimal256(p, s) => p.heap_size(ctx) + s.heap_size(ctx),
Map(m, b) => m.heap_size(ctx) + b.heap_size(ctx),
RunEndEncoded(a, b) => a.heap_size(ctx) + b.heap_size(ctx),
}
}
}
impl<T: DFHeapSize> DFHeapSize for Vec<T> {
    /// The vector's whole backing buffer is counted: every slot up to
    /// `capacity()`, not just the initialised `len()`. The heap memory owned
    /// by each stored element is added on top.
    fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
        let buffer_bytes = self.capacity() * size_of::<T>();
        self.iter()
            .fold(buffer_bytes, |total, element| total + element.heap_size(ctx))
    }
}
impl<K: DFHeapSize, V: DFHeapSize> DFHeapSize for HashMap<K, V> {
    /// Estimates a hash map's table allocation from its reported capacity.
    ///
    /// The estimate is a bucket array of `(K, V)` slots, one control byte per
    /// bucket and one trailing 16-byte control group. It then adds whatever
    /// the keys and values own on the heap. An unallocated map reports 0.
    ///
    /// NOTE(review): the bucket reconstruction mirrors hashbrown's
    /// SwissTable sizing (the implementation behind std's `HashMap`) with a
    /// 16-byte group; confirm when upgrading toolchains.
    fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
        // Trailing control-byte group appended to every allocated table.
        const GROUP_WIDTH: usize = 16;
        // Control (metadata) bytes stored per bucket.
        const CTRL_BYTES_PER_BUCKET: usize = 1;

        // Turns a reported capacity back into a bucket count.
        fn buckets_for(capacity: usize, slot_size: usize) -> usize {
            if capacity >= 15 {
                // Large tables run at a 7/8 load factor with power-of-two buckets.
                return (capacity.saturating_mul(8) / 7).next_power_of_two();
            }
            // Small tables have a minimum capacity that depends on slot size.
            let min_capacity = match slot_size {
                0..=1 => 14,
                2..=3 => 7,
                _ => 3,
            };
            match min_capacity.max(capacity) {
                0..=3 => 4,
                4..=7 => 8,
                _ => 16,
            }
        }

        let capacity = self.capacity();
        if capacity == 0 {
            return 0;
        }

        let slot_size = size_of::<(K, V)>();
        let table_bytes = GROUP_WIDTH
            + buckets_for(capacity, slot_size) * (slot_size + CTRL_BYTES_PER_BUCKET);
        let key_bytes: usize = self.keys().map(|k| k.heap_size(ctx)).sum();
        let value_bytes: usize = self.values().map(|v| v.heap_size(ctx)).sum();
        table_bytes + key_bytes + value_bytes
    }
}
impl<T: DFHeapSize> DFHeapSize for Arc<T> {
    /// Counts the shared allocation the first time it is seen in `ctx`. That
    /// is the two reference-count words, the inline `T`, and whatever `T`
    /// owns on the heap. Every later sighting of the same allocation,
    /// through this or any clone, reports 0.
    fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
        let address = Arc::as_ptr(self) as usize;
        let first_visit = ctx.seen.insert(address);
        if first_visit {
            let ref_counts = 2 * size_of::<usize>();
            ref_counts + size_of::<T>() + (**self).heap_size(ctx)
        } else {
            0
        }
    }
}
impl DFHeapSize for Arc<str> {
    /// Counts a shared string allocation once per `ctx`. That is the two
    /// reference-count words plus the string bytes; repeat visits report 0.
    fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
        // Only the data address identifies the allocation, so the fat
        // pointer's length metadata is dropped before converting to usize.
        let address = Arc::as_ptr(self).cast::<u8>() as usize;
        if ctx.seen.insert(address) {
            2 * size_of::<usize>() + (**self).heap_size(ctx)
        } else {
            0
        }
    }
}
impl DFHeapSize for Arc<dyn DFHeapSize> {
    /// Counts a type-erased shared allocation once per `ctx`. That is the two
    /// reference-count words, the dynamic size of the pointee, and the heap
    /// memory the pointee reports for itself. Repeat visits report 0.
    fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
        // Strip the vtable half of the fat pointer; the data address alone
        // identifies the allocation.
        let address = Arc::as_ptr(self).cast::<()>() as usize;
        if !ctx.seen.insert(address) {
            return 0;
        }
        let pointee: &dyn DFHeapSize = &**self;
        2 * size_of::<usize>() + size_of_val(pointee) + pointee.heap_size(ctx)
    }
}
impl DFHeapSize for Fields {
    /// Sums the heap footprint of every field reference in the list. Shared
    /// field references are de-duplicated through `ctx`.
    fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
        let mut total = 0;
        for field in self {
            total += field.heap_size(ctx);
        }
        total
    }
}
impl DFHeapSize for StructArray {
fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
self.get_array_memory_size()
}
}
impl DFHeapSize for LargeListArray {
fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
self.get_array_memory_size()
}
}
impl DFHeapSize for LargeListViewArray {
fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
self.get_array_memory_size()
}
}
impl DFHeapSize for ListArray {
fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
self.get_array_memory_size()
}
}
impl DFHeapSize for ListViewArray {
fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
self.get_array_memory_size()
}
}
impl DFHeapSize for FixedSizeListArray {
fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
self.get_array_memory_size()
}
}
impl DFHeapSize for MapArray {
fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
self.get_array_memory_size()
}
}
impl<T: DFHeapSize> DFHeapSize for Box<T> {
    /// A box owns one heap slot holding a `T`, plus anything that `T` owns.
    fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
        let pointee: &T = self;
        size_of::<T>() + pointee.heap_size(ctx)
    }
}

impl<T: DFHeapSize> DFHeapSize for Option<T> {
    /// `None` owns nothing; `Some` owns whatever its payload owns.
    fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
        match self {
            Some(inner) => inner.heap_size(ctx),
            None => 0,
        }
    }
}

impl<A: DFHeapSize, B: DFHeapSize> DFHeapSize for (A, B) {
    /// A pair owns the heap memory of both components, first then second.
    fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
        let (first, second) = self;
        first.heap_size(ctx) + second.heap_size(ctx)
    }
}
impl DFHeapSize for String {
    /// An owned string owns its whole allocated buffer, spare capacity included.
    fn heap_size(&self, _ctx: &mut DFHeapSizeCtx) -> usize {
        String::capacity(self)
    }
}

impl DFHeapSize for str {
    /// A string slice is accounted as exactly its byte length. Containers
    /// such as `Arc<str>` rely on this for the bytes they hold.
    fn heap_size(&self, _ctx: &mut DFHeapSizeCtx) -> usize {
        str::len(self)
    }
}
impl DFHeapSize for UnionFields {
    /// Sums the heap footprint of every `(type id, field)` entry of the union.
    fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
        let mut total = 0;
        for (type_id, field) in self.iter() {
            total += type_id.heap_size(ctx) + field.heap_size(ctx);
        }
        total
    }
}
// The following unit-like Arrow enums are accounted as owning no heap memory.
impl DFHeapSize for UnionMode {
    fn heap_size(&self, _ctx: &mut DFHeapSizeCtx) -> usize {
        0
    }
}

impl DFHeapSize for TimeUnit {
    fn heap_size(&self, _ctx: &mut DFHeapSizeCtx) -> usize {
        0
    }
}

impl DFHeapSize for IntervalUnit {
    fn heap_size(&self, _ctx: &mut DFHeapSizeCtx) -> usize {
        0
    }
}
impl DFHeapSize for Field {
    /// Heap bytes owned by a field: its name, its data type, its flags and
    /// its metadata map.
    fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
        let name_bytes = self.name().heap_size(ctx);
        let type_bytes = self.data_type().heap_size(ctx);
        let flag_bytes =
            self.is_nullable().heap_size(ctx) + self.dict_is_ordered().heap_size(ctx);
        let metadata_bytes = self.metadata().heap_size(ctx);
        name_bytes + type_bytes + flag_bytes + metadata_bytes
    }
}
impl DFHeapSize for IntervalMonthDayNano {
    /// Sums the heap footprint of the interval's components.
    fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
        let Self {
            months,
            days,
            nanoseconds,
        } = self;
        days.heap_size(ctx) + months.heap_size(ctx) + nanoseconds.heap_size(ctx)
    }
}

impl DFHeapSize for IntervalDayTime {
    /// Sums the heap footprint of the interval's components.
    fn heap_size(&self, ctx: &mut DFHeapSizeCtx) -> usize {
        let Self { days, milliseconds } = self;
        days.heap_size(ctx) + milliseconds.heap_size(ctx)
    }
}
impl DFHeapSize for DateTime<Utc> {
fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
0 }
}
impl DFHeapSize for bool {
fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
0 }
}
impl DFHeapSize for u8 {
fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
0 }
}
impl DFHeapSize for u16 {
fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
0 }
}
impl DFHeapSize for u32 {
fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
0 }
}
impl DFHeapSize for u64 {
fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
0 }
}
impl DFHeapSize for i8 {
fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
0 }
}
impl DFHeapSize for i16 {
fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
0 }
}
impl DFHeapSize for i32 {
fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
0 }
}
impl DFHeapSize for i64 {
fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
0 }
}
impl DFHeapSize for i128 {
fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
0 }
}
impl DFHeapSize for i256 {
fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
0 }
}
impl DFHeapSize for f16 {
fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
0 }
}
impl DFHeapSize for f32 {
fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
0 }
}
impl DFHeapSize for f64 {
fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
0 }
}
impl DFHeapSize for usize {
fn heap_size(&self, _: &mut DFHeapSizeCtx) -> usize {
0 }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Measures every item with one shared context, so that an allocation
    /// shared between several items is counted only the first time.
    fn heap_size_sharing_ctx<H: DFHeapSize>(items: &[H]) -> usize {
        let mut ctx = DFHeapSizeCtx::default();
        items.iter().map(|item| item.heap_size(&mut ctx)).sum()
    }

    #[test]
    fn test_heap_size_arc_avoid_double_accounting() {
        let original: Arc<Vec<i32>> = Arc::new(vec![1, 2, 3]);
        let alone = heap_size_sharing_ctx(std::slice::from_ref(&original));

        let second = Arc::clone(&original);
        let third = Arc::clone(&original);
        let fourth = Arc::clone(&third);
        let together = heap_size_sharing_ctx(&[original, second, third, fourth]);

        assert_eq!(alone, together);
    }

    #[test]
    fn test_heap_size_arc_str_avoid_double_accounting() {
        let original: Arc<str> = Arc::from("Hello");
        let alone = heap_size_sharing_ctx(std::slice::from_ref(&original));

        let second = Arc::clone(&original);
        let third = Arc::clone(&original);
        let fourth = Arc::clone(&third);
        let together = heap_size_sharing_ctx(&[original, second, third, fourth]);

        assert_eq!(alone, together);
    }
}