Skip to main content

samkhya_arrow/
ingest.rs

1//! Array-level ingestion helpers: feed an [`arrow::array::Array`] into a
2//! samkhya sketch.
3//!
4//! Each `ingest_array_into_*` function dispatches once on the array's
5//! `DataType`, downcasts to the concrete primitive / byte array, and
6//! walks the values. Nulls are skipped. Unsupported types are silently
7//! ignored (for HLL/Bloom/CMS) so a generalized "build sketches for
8//! every column" caller can fan out without first auditing the schema.
9//!
10//! The histogram helper is the exception: it only makes sense for
11//! numeric columns, so it surfaces an [`Error::InvalidSketch`] for
12//! non-numeric input rather than producing an empty / meaningless
13//! histogram.
14
use arrow::array::{
    Array, BinaryArray, BooleanArray, Date32Array, Date64Array, Float32Array, Float64Array,
    Int8Array, Int16Array, Int32Array, Int64Array, LargeBinaryArray, LargeStringArray, StringArray,
    TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
    TimestampSecondArray, UInt8Array, UInt16Array, UInt32Array, UInt64Array,
};
use arrow::datatypes::{DataType, TimeUnit};
use samkhya_core::sketches::{BloomFilter, CountMinSketch, HllSketch};
use samkhya_core::{Error, Result};
23
/// Hand the little-endian bytes of every non-null value in a primitive
/// Arrow array to the callback `$f`.
///
/// `$array` is downcast to the concrete `$arr_ty`; the enclosing
/// `DataType` match arm is what makes that downcast infallible. A macro
/// is used instead of a generic function because each `PrimitiveArray<T>`
/// is its own type, and expressing the walk generically would need more
/// `ArrowPrimitiveType` plumbing than it is worth.
macro_rules! le_walk {
    ($array:expr, $arr_ty:ty, $f:expr) => {{
        let typed: &$arr_ty = $array
            .as_any()
            .downcast_ref::<$arr_ty>()
            .expect("downcast guarded by data_type match arm");
        typed
            .iter()
            .flatten()
            .for_each(|value| ($f)(&value.to_le_bytes()));
    }};
}
41
/// Hand the raw bytes of every non-null value in a binary Arrow array to
/// the callback `$f`.
///
/// `$array` is downcast to `$arr_ty`; the surrounding `DataType` match
/// arm guarantees the downcast succeeds.
macro_rules! bytes_walk {
    ($array:expr, $arr_ty:ty, $f:expr) => {{
        let typed: &$arr_ty = $array
            .as_any()
            .downcast_ref::<$arr_ty>()
            .expect("downcast guarded by data_type match arm");
        for raw in typed.iter().flatten() {
            let slice: &[u8] = raw.as_ref();
            ($f)(slice);
        }
    }};
}
53
/// Hand the UTF-8 bytes of every non-null value in a string Arrow array
/// to the callback `$f`.
///
/// `$array` is downcast to `$arr_ty`; the surrounding `DataType` match
/// arm guarantees the downcast succeeds.
macro_rules! str_walk {
    ($array:expr, $arr_ty:ty, $f:expr) => {{
        let typed: &$arr_ty = $array
            .as_any()
            .downcast_ref::<$arr_ty>()
            .expect("downcast guarded by data_type match arm");
        for utf8 in typed.iter().flatten().map(str::as_bytes) {
            ($f)(utf8);
        }
    }};
}
65
66/// Drive a per-value byte-slice callback over every supported Arrow
67/// array type. Returns `true` if the array's `DataType` was recognized,
68/// `false` otherwise — callers that need to flag unsupported types
69/// (e.g. the histogram path) check the return value.
70fn for_each_value<F: FnMut(&[u8])>(array: &dyn Array, mut f: F) -> bool {
71    match array.data_type() {
72        DataType::Int8 => le_walk!(array, Int8Array, &mut f),
73        DataType::Int16 => le_walk!(array, Int16Array, &mut f),
74        DataType::Int32 => le_walk!(array, Int32Array, &mut f),
75        DataType::Int64 => le_walk!(array, Int64Array, &mut f),
76        DataType::UInt8 => le_walk!(array, UInt8Array, &mut f),
77        DataType::UInt16 => le_walk!(array, UInt16Array, &mut f),
78        DataType::UInt32 => le_walk!(array, UInt32Array, &mut f),
79        DataType::UInt64 => le_walk!(array, UInt64Array, &mut f),
80        DataType::Float32 => le_walk!(array, Float32Array, &mut f),
81        DataType::Float64 => le_walk!(array, Float64Array, &mut f),
82        DataType::Utf8 => str_walk!(array, StringArray, &mut f),
83        DataType::LargeUtf8 => str_walk!(array, LargeStringArray, &mut f),
84        DataType::Binary => bytes_walk!(array, BinaryArray, &mut f),
85        DataType::LargeBinary => bytes_walk!(array, LargeBinaryArray, &mut f),
86        DataType::Date32 => le_walk!(array, Date32Array, &mut f),
87        DataType::Date64 => le_walk!(array, Date64Array, &mut f),
88        DataType::Timestamp(TimeUnit::Nanosecond, _) => {
89            le_walk!(array, TimestampNanosecondArray, &mut f)
90        }
91        DataType::Boolean => {
92            let arr = array
93                .as_any()
94                .downcast_ref::<BooleanArray>()
95                .expect("downcast guarded by data_type match arm");
96            for v in arr.iter().flatten() {
97                let byte: u8 = u8::from(v);
98                f(&[byte]);
99            }
100        }
101        _ => return false,
102    }
103    true
104}
105
106/// Ingest every non-null value of `array` into `hll`, hashing as the
107/// canonical byte form described at the crate root. Unsupported types
108/// are silently skipped — they contribute zero values to the sketch.
109pub fn ingest_array_into_hll(array: &dyn Array, hll: &mut HllSketch) {
110    let _ = for_each_value(array, |bytes| hll.add(bytes));
111}
112
113/// Ingest every non-null value of `array` into `bloom`.
114pub fn ingest_array_into_bloom(array: &dyn Array, bloom: &mut BloomFilter) {
115    let _ = for_each_value(array, |bytes| bloom.insert(bytes));
116}
117
118/// Ingest every non-null value of `array` into `cms`, with a fixed
119/// per-value count weight. Use `count_per_value = 1` to count
120/// occurrences directly; use a higher weight to pre-aggregate.
121pub fn ingest_array_into_cms(array: &dyn Array, cms: &mut CountMinSketch, count_per_value: u32) {
122    let _ = for_each_value(array, |bytes| cms.add(bytes, count_per_value));
123}
124
/// Append every non-null value of a primitive Arrow array to `$out`,
/// converted to `f64` with an `as` cast. For 64-bit integers, the cast
/// may round.
///
/// `$array` is downcast to `$arr_ty`; the surrounding `DataType` match
/// arm guarantees the downcast succeeds. Used by the histogram path.
macro_rules! collect_primitive_as_f64 {
    ($array:expr, $arr_ty:ty, $out:expr) => {{
        let typed: &$arr_ty = $array
            .as_any()
            .downcast_ref::<$arr_ty>()
            .expect("downcast guarded by data_type match arm");
        $out.extend(typed.iter().flatten().map(|value| value as f64));
    }};
}
138
139/// Extract non-null numeric values from `array` as `f64`, ready to feed
140/// into [`samkhya_core::sketches::EquiDepthHistogram::from_values`].
141/// Returns an [`Error::InvalidSketch`] for non-numeric arrays — the
142/// histogram has no meaningful interpretation over strings / bytes /
143/// booleans.
144pub fn ingest_array_into_histogram_values(array: &dyn Array) -> Result<Vec<f64>> {
145    let mut out: Vec<f64> = Vec::with_capacity(array.len());
146    match array.data_type() {
147        DataType::Int8 => collect_primitive_as_f64!(array, Int8Array, out),
148        DataType::Int16 => collect_primitive_as_f64!(array, Int16Array, out),
149        DataType::Int32 => collect_primitive_as_f64!(array, Int32Array, out),
150        DataType::Int64 => collect_primitive_as_f64!(array, Int64Array, out),
151        DataType::UInt8 => collect_primitive_as_f64!(array, UInt8Array, out),
152        DataType::UInt16 => collect_primitive_as_f64!(array, UInt16Array, out),
153        DataType::UInt32 => collect_primitive_as_f64!(array, UInt32Array, out),
154        DataType::UInt64 => collect_primitive_as_f64!(array, UInt64Array, out),
155        DataType::Float32 => collect_primitive_as_f64!(array, Float32Array, out),
156        DataType::Float64 => collect_primitive_as_f64!(array, Float64Array, out),
157        // Date / timestamp columns are integer-backed and order-preserving
158        // under the f64 cast, so they remain meaningful for range
159        // selectivity. Nanosecond timestamps past 2^53 lose precision,
160        // but the equi-depth histogram is a lossy summary anyway.
161        DataType::Date32 => collect_primitive_as_f64!(array, Date32Array, out),
162        DataType::Date64 => collect_primitive_as_f64!(array, Date64Array, out),
163        DataType::Timestamp(TimeUnit::Nanosecond, _) => {
164            collect_primitive_as_f64!(array, TimestampNanosecondArray, out)
165        }
166        other => {
167            return Err(Error::InvalidSketch(format!(
168                "histogram requires a numeric Arrow type, got {other:?}"
169            )));
170        }
171    }
172    Ok(out)
173}