use arrow::array::{
    Array, BinaryArray, BooleanArray, Date32Array, Date64Array, Float32Array, Float64Array,
    Int8Array, Int16Array, Int32Array, Int64Array, LargeBinaryArray, LargeStringArray, StringArray,
    TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
    TimestampSecondArray, UInt8Array, UInt16Array, UInt32Array, UInt64Array,
};
use arrow::datatypes::{DataType, TimeUnit};
use samkhya_core::sketches::{BloomFilter, CountMinSketch, HllSketch};
use samkhya_core::{Error, Result};
23
/// Downcasts `$array` to `$arr_ty`, then hands the little-endian byte encoding
/// (`to_le_bytes`) of each non-null value to the callable `$f`.
///
/// Panics if the downcast fails. Every expansion sits inside a `DataType`
/// match arm that names the corresponding array type, so a failure indicates a
/// mismatched arm rather than bad input.
macro_rules! le_walk {
    ($array:expr, $arr_ty:ty, $f:expr) => {{
        let typed: &$arr_ty = $array
            .as_any()
            .downcast_ref::<$arr_ty>()
            .expect("downcast guarded by data_type match arm");
        for value in typed.iter().flatten() {
            let encoded = value.to_le_bytes();
            ($f)(&encoded);
        }
    }};
}
41
/// Downcasts `$array` to `$arr_ty` and passes each non-null binary value to
/// the callable `$f` as a byte slice, obtained through `AsRef`.
///
/// Panics if the downcast fails, which the caller's `DataType` match arm is
/// meant to rule out.
macro_rules! bytes_walk {
    ($array:expr, $arr_ty:ty, $f:expr) => {{
        let typed: &$arr_ty = $array
            .as_any()
            .downcast_ref::<$arr_ty>()
            .expect("downcast guarded by data_type match arm");
        typed
            .iter()
            .flatten()
            .for_each(|blob| ($f)(blob.as_ref()));
    }};
}
53
/// Downcasts `$array` to the string array type `$arr_ty` and passes the UTF-8
/// bytes of each non-null string to the callable `$f`.
///
/// Panics if the downcast fails, which the caller's `DataType` match arm is
/// meant to rule out.
macro_rules! str_walk {
    ($array:expr, $arr_ty:ty, $f:expr) => {{
        let typed: &$arr_ty = $array
            .as_any()
            .downcast_ref::<$arr_ty>()
            .expect("downcast guarded by data_type match arm");
        for utf8 in typed.iter().flatten().map(str::as_bytes) {
            ($f)(utf8);
        }
    }};
}
65
66fn for_each_value<F: FnMut(&[u8])>(array: &dyn Array, mut f: F) -> bool {
71 match array.data_type() {
72 DataType::Int8 => le_walk!(array, Int8Array, &mut f),
73 DataType::Int16 => le_walk!(array, Int16Array, &mut f),
74 DataType::Int32 => le_walk!(array, Int32Array, &mut f),
75 DataType::Int64 => le_walk!(array, Int64Array, &mut f),
76 DataType::UInt8 => le_walk!(array, UInt8Array, &mut f),
77 DataType::UInt16 => le_walk!(array, UInt16Array, &mut f),
78 DataType::UInt32 => le_walk!(array, UInt32Array, &mut f),
79 DataType::UInt64 => le_walk!(array, UInt64Array, &mut f),
80 DataType::Float32 => le_walk!(array, Float32Array, &mut f),
81 DataType::Float64 => le_walk!(array, Float64Array, &mut f),
82 DataType::Utf8 => str_walk!(array, StringArray, &mut f),
83 DataType::LargeUtf8 => str_walk!(array, LargeStringArray, &mut f),
84 DataType::Binary => bytes_walk!(array, BinaryArray, &mut f),
85 DataType::LargeBinary => bytes_walk!(array, LargeBinaryArray, &mut f),
86 DataType::Date32 => le_walk!(array, Date32Array, &mut f),
87 DataType::Date64 => le_walk!(array, Date64Array, &mut f),
88 DataType::Timestamp(TimeUnit::Nanosecond, _) => {
89 le_walk!(array, TimestampNanosecondArray, &mut f)
90 }
91 DataType::Boolean => {
92 let arr = array
93 .as_any()
94 .downcast_ref::<BooleanArray>()
95 .expect("downcast guarded by data_type match arm");
96 for v in arr.iter().flatten() {
97 let byte: u8 = u8::from(v);
98 f(&[byte]);
99 }
100 }
101 _ => return false,
102 }
103 true
104}
105
106pub fn ingest_array_into_hll(array: &dyn Array, hll: &mut HllSketch) {
110 let _ = for_each_value(array, |bytes| hll.add(bytes));
111}
112
113pub fn ingest_array_into_bloom(array: &dyn Array, bloom: &mut BloomFilter) {
115 let _ = for_each_value(array, |bytes| bloom.insert(bytes));
116}
117
118pub fn ingest_array_into_cms(array: &dyn Array, cms: &mut CountMinSketch, count_per_value: u32) {
122 let _ = for_each_value(array, |bytes| cms.add(bytes, count_per_value));
123}
124
/// Downcasts `$array` to the primitive array type `$arr_ty`. It then pushes
/// each non-null value onto `$out`, converted with an `as f64` cast.
///
/// Panics if the downcast fails. Every expansion sits inside a `DataType`
/// match arm naming the matching array type.
macro_rules! collect_primitive_as_f64 {
    ($array:expr, $arr_ty:ty, $out:expr) => {{
        let typed: &$arr_ty = $array
            .as_any()
            .downcast_ref::<$arr_ty>()
            .expect("downcast guarded by data_type match arm");
        typed
            .iter()
            .flatten()
            .for_each(|value| $out.push(value as f64));
    }};
}
138
139pub fn ingest_array_into_histogram_values(array: &dyn Array) -> Result<Vec<f64>> {
145 let mut out: Vec<f64> = Vec::with_capacity(array.len());
146 match array.data_type() {
147 DataType::Int8 => collect_primitive_as_f64!(array, Int8Array, out),
148 DataType::Int16 => collect_primitive_as_f64!(array, Int16Array, out),
149 DataType::Int32 => collect_primitive_as_f64!(array, Int32Array, out),
150 DataType::Int64 => collect_primitive_as_f64!(array, Int64Array, out),
151 DataType::UInt8 => collect_primitive_as_f64!(array, UInt8Array, out),
152 DataType::UInt16 => collect_primitive_as_f64!(array, UInt16Array, out),
153 DataType::UInt32 => collect_primitive_as_f64!(array, UInt32Array, out),
154 DataType::UInt64 => collect_primitive_as_f64!(array, UInt64Array, out),
155 DataType::Float32 => collect_primitive_as_f64!(array, Float32Array, out),
156 DataType::Float64 => collect_primitive_as_f64!(array, Float64Array, out),
157 DataType::Date32 => collect_primitive_as_f64!(array, Date32Array, out),
162 DataType::Date64 => collect_primitive_as_f64!(array, Date64Array, out),
163 DataType::Timestamp(TimeUnit::Nanosecond, _) => {
164 collect_primitive_as_f64!(array, TimestampNanosecondArray, out)
165 }
166 other => {
167 return Err(Error::InvalidSketch(format!(
168 "histogram requires a numeric Arrow type, got {other:?}"
169 )));
170 }
171 }
172 Ok(out)
173}