Skip to main content

vortex_array/aggregate_fn/fns/bounded_min/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::fmt::Display;
5use std::fmt::Formatter;
6use std::num::NonZeroUsize;
7
8use vortex_buffer::BufferString;
9use vortex_buffer::ByteBuffer;
10use vortex_error::VortexExpect;
11use vortex_error::VortexResult;
12use vortex_error::vortex_ensure;
13use vortex_session::VortexSession;
14use vortex_session::registry::CachedId;
15
16use crate::ArrayRef;
17use crate::Columnar;
18use crate::ExecutionCtx;
19use crate::IntoArray;
20use crate::aggregate_fn::AggregateFnId;
21use crate::aggregate_fn::AggregateFnRef;
22use crate::aggregate_fn::AggregateFnSatisfaction;
23use crate::aggregate_fn::AggregateFnVTable;
24use crate::aggregate_fn::NumericalAggregateOpts;
25use crate::aggregate_fn::fns::min::Min;
26use crate::aggregate_fn::fns::min_max::MinMax;
27use crate::aggregate_fn::fns::min_max::min_max;
28use crate::dtype::DType;
29use crate::partial_ord::partial_min;
30use crate::scalar::Scalar;
31use crate::scalar::ScalarTruncation;
32use crate::scalar::lower_bound;
33
/// Configuration for the [`BoundedMin`] aggregate.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BoundedMinOptions {
    /// Upper limit, in bytes, on the length of a UTF8/Binary bound. Never zero.
    pub max_bytes: NonZeroUsize,
}
40
41impl Display for BoundedMinOptions {
42    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
43        write!(f, "{}", self.max_bytes.get())
44    }
45}
46
/// Aggregate that yields a lower bound on the minimum non-null value of an array.
///
/// For UTF8/Binary inputs the bound is truncated so that it is at most
/// [`BoundedMinOptions::max_bytes`] bytes long; minima of any other supported dtype are
/// passed through untruncated.
#[derive(Debug, Clone)]
pub struct BoundedMin;
50
51enum BoundedMinState {
52    Empty,
53    Value(Scalar),
54}
55
56/// Partial accumulator state for the bounded minimum aggregate.
57pub struct BoundedMinPartial {
58    state: BoundedMinState,
59    element_dtype: DType,
60    max_bytes: NonZeroUsize,
61}
62
63impl BoundedMinPartial {
64    fn merge(&mut self, min: Scalar) {
65        if min.is_null() {
66            return;
67        }
68
69        self.state = match std::mem::replace(&mut self.state, BoundedMinState::Empty) {
70            BoundedMinState::Empty => BoundedMinState::Value(min),
71            BoundedMinState::Value(current) => BoundedMinState::Value(
72                partial_min(min, current).vortex_expect("incomparable bounded min scalars"),
73            ),
74        };
75    }
76}
77
78impl AggregateFnVTable for BoundedMin {
79    type Options = BoundedMinOptions;
80    type Partial = BoundedMinPartial;
81
82    fn id(&self) -> AggregateFnId {
83        static ID: CachedId = CachedId::new("vortex.bounded_min");
84        *ID
85    }
86
87    fn serialize(&self, options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
88        let max_bytes = u64::try_from(options.max_bytes.get())?;
89        Ok(Some(max_bytes.to_le_bytes().to_vec()))
90    }
91
92    fn deserialize(
93        &self,
94        metadata: &[u8],
95        _session: &VortexSession,
96    ) -> VortexResult<Self::Options> {
97        vortex_ensure!(
98            metadata.len() == size_of::<u64>(),
99            "BoundedMin options expected {} bytes, got {}",
100            size_of::<u64>(),
101            metadata.len()
102        );
103        let mut bytes = [0u8; size_of::<u64>()];
104        bytes.copy_from_slice(metadata);
105        let max_bytes = usize::try_from(u64::from_le_bytes(bytes))?;
106        vortex_ensure!(max_bytes > 0, "BoundedMin requires max_bytes > 0");
107        Ok(BoundedMinOptions {
108            max_bytes: NonZeroUsize::new(max_bytes).vortex_expect("checked non-zero max_bytes"),
109        })
110    }
111
112    fn return_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option<DType> {
113        supported_dtype(options, input_dtype).map(DType::as_nullable)
114    }
115
116    fn can_satisfy(
117        &self,
118        options: &Self::Options,
119        requested: &AggregateFnRef,
120    ) -> AggregateFnSatisfaction {
121        if let Some(other) = requested.as_opt::<Self>() {
122            return if other == options {
123                AggregateFnSatisfaction::Exact
124            } else if options.max_bytes >= other.max_bytes {
125                AggregateFnSatisfaction::Approximate
126            } else {
127                AggregateFnSatisfaction::No
128            };
129        }
130
131        // The stored bound skips NaNs, so it cannot stand in for a NaN-including minimum.
132        if requested
133            .as_opt::<Min>()
134            .is_some_and(|options| options.skip_nans)
135        {
136            AggregateFnSatisfaction::Approximate
137        } else {
138            AggregateFnSatisfaction::No
139        }
140    }
141
142    fn partial_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option<DType> {
143        self.return_dtype(options, input_dtype)
144    }
145
146    fn empty_partial(
147        &self,
148        options: &Self::Options,
149        input_dtype: &DType,
150    ) -> VortexResult<Self::Partial> {
151        Ok(BoundedMinPartial {
152            state: BoundedMinState::Empty,
153            element_dtype: input_dtype.clone(),
154            max_bytes: options.max_bytes,
155        })
156    }
157
158    fn combine_partials(&self, partial: &mut Self::Partial, other: Scalar) -> VortexResult<()> {
159        partial.merge(other);
160        Ok(())
161    }
162
163    fn to_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
164        let dtype = partial.element_dtype.as_nullable();
165        match &partial.state {
166            BoundedMinState::Empty => Ok(Scalar::null(dtype)),
167            BoundedMinState::Value(min) => min.cast(&dtype),
168        }
169    }
170
171    fn reset(&self, partial: &mut Self::Partial) {
172        partial.state = BoundedMinState::Empty;
173    }
174
175    fn is_saturated(&self, _partial: &Self::Partial) -> bool {
176        false
177    }
178
179    fn accumulate(
180        &self,
181        partial: &mut Self::Partial,
182        batch: &Columnar,
183        ctx: &mut ExecutionCtx,
184    ) -> VortexResult<()> {
185        // Delegate to the existing min_max implementation for now. A dedicated bounded-min
186        // aggregate would avoid computing max when only min is needed.
187        let array = match batch {
188            Columnar::Canonical(canonical) => canonical.clone().into_array(),
189            Columnar::Constant(constant) => constant.clone().into_array(),
190        };
191        let Some(result) = min_max(&array, ctx, NumericalAggregateOpts::default())? else {
192            return Ok(());
193        };
194        if let Some(bound) = truncate_min(result.min, partial.max_bytes.get())? {
195            partial.merge(bound);
196        }
197        Ok(())
198    }
199
200    fn finalize(&self, partials: ArrayRef) -> VortexResult<ArrayRef> {
201        Ok(partials)
202    }
203
204    fn finalize_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
205        self.to_scalar(partial)
206    }
207}
208
209fn supported_dtype<'a>(_options: &BoundedMinOptions, input_dtype: &'a DType) -> Option<&'a DType> {
210    MinMax
211        .return_dtype(&NumericalAggregateOpts::default(), input_dtype)
212        .map(|_| input_dtype)
213}
214
215fn truncate_min(value: Scalar, max_bytes: usize) -> VortexResult<Option<Scalar>> {
216    let nullability = value.dtype().nullability();
217    match value.dtype() {
218        DType::Utf8(_) => {
219            Ok(
220                lower_bound(BufferString::from_scalar(value)?, max_bytes, nullability)
221                    .map(|(bound, _)| bound),
222            )
223        }
224        DType::Binary(_) => {
225            Ok(
226                lower_bound(ByteBuffer::from_scalar(value)?, max_bytes, nullability)
227                    .map(|(bound, _)| bound),
228            )
229        }
230        _ => Ok(Some(value)),
231    }
232}
#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use vortex_buffer::buffer;
    use vortex_error::VortexExpect;
    use vortex_error::VortexResult;
    use vortex_session::VortexSession;

    use crate::IntoArray;
    use crate::VortexSessionExecute;
    use crate::aggregate_fn::Accumulator;
    use crate::aggregate_fn::AggregateFnSatisfaction;
    use crate::aggregate_fn::AggregateFnVTable;
    use crate::aggregate_fn::AggregateFnVTableExt;
    use crate::aggregate_fn::DynAccumulator;
    use crate::aggregate_fn::NumericalAggregateOpts;
    use crate::aggregate_fn::fns::bounded_min::BoundedMin;
    use crate::aggregate_fn::fns::bounded_min::BoundedMinOptions;
    use crate::aggregate_fn::fns::max::Max;
    use crate::aggregate_fn::fns::min::Min;
    use crate::array_session;
    use crate::arrays::PrimitiveArray;
    use crate::arrays::VarBinViewArray;
    use crate::dtype::Nullability;
    use crate::scalar::Scalar;
    use crate::validity::Validity;

    /// Wraps `value` as a non-zero byte limit; panics when `value` is zero.
    fn max_bytes(value: usize) -> NonZeroUsize {
        NonZeroUsize::new(value).vortex_expect("non-zero max_bytes")
    }

    /// Session from which each test creates its execution context.
    fn fresh_session() -> VortexSession {
        array_session()
    }

    /// Options carrying the given byte limit.
    fn bounded(limit: usize) -> BoundedMinOptions {
        BoundedMinOptions {
            max_bytes: max_bytes(limit),
        }
    }

    /// A UTF8 minimum longer than the limit is cut back to a shorter lower bound.
    #[test]
    fn bounded_min_truncates_utf8_to_lower_bound() -> VortexResult<()> {
        let mut ctx = fresh_session().create_execution_ctx();
        let strings =
            VarBinViewArray::from_iter_str(["snowman⛄️snowman", "untruncated"]).into_array();
        let mut acc = Accumulator::try_new(BoundedMin, bounded(9), strings.dtype().clone())?;

        acc.accumulate(&strings, &mut ctx)?;
        let bound = acc.finish()?;

        assert_eq!(bound, Scalar::utf8("snowman", Nullability::Nullable));
        Ok(())
    }

    /// Primitive minima are reported exactly regardless of the byte limit.
    #[test]
    fn bounded_min_keeps_fixed_width_values_exact() -> VortexResult<()> {
        let mut ctx = fresh_session().create_execution_ctx();
        let numbers =
            PrimitiveArray::new(buffer![10i32, 20, 5], Validity::NonNullable).into_array();
        let mut acc = Accumulator::try_new(BoundedMin, bounded(9), numbers.dtype().clone())?;

        acc.accumulate(&numbers, &mut ctx)?;
        let bound = acc.finish()?;

        assert_eq!(bound, Scalar::primitive(5i32, Nullability::Nullable));
        Ok(())
    }

    /// Combining with a null partial leaves a previously accumulated bound in place.
    #[test]
    fn bounded_min_null_partial_does_not_poison_existing_bound() -> VortexResult<()> {
        let mut ctx = fresh_session().create_execution_ctx();
        let values = VarBinViewArray::from_iter_bin([&[1u8][..]]).into_array();
        let mut acc = Accumulator::try_new(BoundedMin, bounded(2), values.dtype().clone())?;

        acc.accumulate(&values, &mut ctx)?;
        let null_partial = Scalar::null(values.dtype().as_nullable());
        acc.combine_partials(null_partial)?;
        let bound = acc.finish()?;

        assert_eq!(bound, Scalar::binary(buffer![1u8], Nullability::Nullable));
        Ok(())
    }

    /// Exercises `can_satisfy` between bounded minima and plain `Min`/`Max` requests.
    #[test]
    fn bounded_min_satisfies_min_bounds() {
        let stored = BoundedMin.bind(bounded(5));
        let same = BoundedMin.bind(bounded(5));
        let looser_bounded = BoundedMin.bind(bounded(4));
        let tighter_bounded = BoundedMin.bind(bounded(6));

        // Bounded-to-bounded requests.
        assert_eq!(stored.can_satisfy(&same), AggregateFnSatisfaction::Exact);
        assert_eq!(
            stored.can_satisfy(&looser_bounded),
            AggregateFnSatisfaction::Approximate
        );
        assert_eq!(
            stored.can_satisfy(&tighter_bounded),
            AggregateFnSatisfaction::No
        );

        // A stored bounded minimum answering plain `Min` requests.
        let min_skipping_nans = Min.bind(NumericalAggregateOpts::default());
        let min_including_nans = Min.bind(NumericalAggregateOpts::include_nans());
        assert_eq!(
            stored.can_satisfy(&min_skipping_nans),
            AggregateFnSatisfaction::Approximate
        );
        assert_eq!(
            stored.can_satisfy(&min_including_nans),
            AggregateFnSatisfaction::No
        );

        // A stored plain `Min` answering a bounded-minimum request.
        assert_eq!(
            min_including_nans.can_satisfy(&stored),
            AggregateFnSatisfaction::No
        );
        assert_eq!(
            min_skipping_nans.can_satisfy(&stored),
            AggregateFnSatisfaction::Approximate
        );

        // Unrelated aggregates are never satisfied.
        let max = Max.bind(NumericalAggregateOpts::default());
        assert_eq!(stored.can_satisfy(&max), AggregateFnSatisfaction::No);
    }

    /// Serialized options decode back to an equal value.
    #[test]
    fn bounded_min_options_round_trip() -> VortexResult<()> {
        let options = bounded(64);
        let metadata = BoundedMin
            .serialize(&options)?
            .expect("serializable options");
        let session = VortexSession::empty();
        let roundtrip = BoundedMin.deserialize(&metadata, &session)?;

        assert_eq!(roundtrip, options);
        Ok(())
    }
}