Skip to main content

vortex_array/aggregate_fn/fns/bounded_min/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::fmt::Display;
5use std::fmt::Formatter;
6use std::num::NonZeroUsize;
7
8use vortex_buffer::BufferString;
9use vortex_buffer::ByteBuffer;
10use vortex_error::VortexExpect;
11use vortex_error::VortexResult;
12use vortex_error::vortex_ensure;
13use vortex_session::VortexSession;
14
15use crate::ArrayRef;
16use crate::Columnar;
17use crate::ExecutionCtx;
18use crate::IntoArray;
19use crate::aggregate_fn::AggregateFnId;
20use crate::aggregate_fn::AggregateFnRef;
21use crate::aggregate_fn::AggregateFnSatisfaction;
22use crate::aggregate_fn::AggregateFnVTable;
23use crate::aggregate_fn::EmptyOptions;
24use crate::aggregate_fn::fns::min::Min;
25use crate::aggregate_fn::fns::min_max::MinMax;
26use crate::aggregate_fn::fns::min_max::min_max;
27use crate::dtype::DType;
28use crate::partial_ord::partial_min;
29use crate::scalar::Scalar;
30use crate::scalar::ScalarTruncation;
31use crate::scalar::lower_bound;
32
/// Configuration for the [`BoundedMin`] aggregate.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BoundedMinOptions {
    /// Upper limit, in bytes, on the length of a UTF8/Binary bound.
    ///
    /// The non-zero type means a zero-byte budget cannot be represented.
    pub max_bytes: NonZeroUsize,
}
39
40impl Display for BoundedMinOptions {
41    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
42        write!(f, "{}", self.max_bytes.get())
43    }
44}
45
/// Aggregate producing a lower bound on the minimum non-null value of an array.
///
/// UTF8 and Binary minima are shortened to at most
/// [`BoundedMinOptions::max_bytes`] bytes; minima of every other type are kept as-is.
#[derive(Debug, Clone)]
pub struct BoundedMin;
49
50enum BoundedMinState {
51    Empty,
52    Value(Scalar),
53}
54
55/// Partial accumulator state for the bounded minimum aggregate.
56pub struct BoundedMinPartial {
57    state: BoundedMinState,
58    element_dtype: DType,
59    max_bytes: NonZeroUsize,
60}
61
62impl BoundedMinPartial {
63    fn merge(&mut self, min: Scalar) {
64        if min.is_null() {
65            return;
66        }
67
68        self.state = match std::mem::replace(&mut self.state, BoundedMinState::Empty) {
69            BoundedMinState::Empty => BoundedMinState::Value(min),
70            BoundedMinState::Value(current) => BoundedMinState::Value(
71                partial_min(min, current).vortex_expect("incomparable bounded min scalars"),
72            ),
73        };
74    }
75}
76
77impl AggregateFnVTable for BoundedMin {
78    type Options = BoundedMinOptions;
79    type Partial = BoundedMinPartial;
80
81    fn id(&self) -> AggregateFnId {
82        AggregateFnId::new("vortex.bounded_min")
83    }
84
85    fn serialize(&self, options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
86        let max_bytes = u64::try_from(options.max_bytes.get())?;
87        Ok(Some(max_bytes.to_le_bytes().to_vec()))
88    }
89
90    fn deserialize(
91        &self,
92        metadata: &[u8],
93        _session: &VortexSession,
94    ) -> VortexResult<Self::Options> {
95        vortex_ensure!(
96            metadata.len() == size_of::<u64>(),
97            "BoundedMin options expected {} bytes, got {}",
98            size_of::<u64>(),
99            metadata.len()
100        );
101        let mut bytes = [0u8; size_of::<u64>()];
102        bytes.copy_from_slice(metadata);
103        let max_bytes = usize::try_from(u64::from_le_bytes(bytes))?;
104        vortex_ensure!(max_bytes > 0, "BoundedMin requires max_bytes > 0");
105        Ok(BoundedMinOptions {
106            max_bytes: NonZeroUsize::new(max_bytes).vortex_expect("checked non-zero max_bytes"),
107        })
108    }
109
110    fn return_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option<DType> {
111        supported_dtype(options, input_dtype).map(DType::as_nullable)
112    }
113
114    fn can_satisfy(
115        &self,
116        options: &Self::Options,
117        requested: &AggregateFnRef,
118    ) -> AggregateFnSatisfaction {
119        if let Some(other) = requested.as_opt::<Self>() {
120            return if other == options {
121                AggregateFnSatisfaction::Exact
122            } else if options.max_bytes >= other.max_bytes {
123                AggregateFnSatisfaction::Approximate
124            } else {
125                AggregateFnSatisfaction::No
126            };
127        }
128
129        if requested.is::<Min>() {
130            AggregateFnSatisfaction::Approximate
131        } else {
132            AggregateFnSatisfaction::No
133        }
134    }
135
136    fn partial_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option<DType> {
137        self.return_dtype(options, input_dtype)
138    }
139
140    fn empty_partial(
141        &self,
142        options: &Self::Options,
143        input_dtype: &DType,
144    ) -> VortexResult<Self::Partial> {
145        Ok(BoundedMinPartial {
146            state: BoundedMinState::Empty,
147            element_dtype: input_dtype.clone(),
148            max_bytes: options.max_bytes,
149        })
150    }
151
152    fn combine_partials(&self, partial: &mut Self::Partial, other: Scalar) -> VortexResult<()> {
153        partial.merge(other);
154        Ok(())
155    }
156
157    fn to_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
158        let dtype = partial.element_dtype.as_nullable();
159        match &partial.state {
160            BoundedMinState::Empty => Ok(Scalar::null(dtype)),
161            BoundedMinState::Value(min) => min.cast(&dtype),
162        }
163    }
164
165    fn reset(&self, partial: &mut Self::Partial) {
166        partial.state = BoundedMinState::Empty;
167    }
168
169    fn is_saturated(&self, _partial: &Self::Partial) -> bool {
170        false
171    }
172
173    fn accumulate(
174        &self,
175        partial: &mut Self::Partial,
176        batch: &Columnar,
177        ctx: &mut ExecutionCtx,
178    ) -> VortexResult<()> {
179        // Delegate to the existing min_max implementation for now. A dedicated bounded-min
180        // aggregate would avoid computing max when only min is needed.
181        let array = match batch {
182            Columnar::Canonical(canonical) => canonical.clone().into_array(),
183            Columnar::Constant(constant) => constant.clone().into_array(),
184        };
185        let Some(result) = min_max(&array, ctx)? else {
186            return Ok(());
187        };
188        if let Some(bound) = truncate_min(result.min, partial.max_bytes.get())? {
189            partial.merge(bound);
190        }
191        Ok(())
192    }
193
194    fn finalize(&self, partials: ArrayRef) -> VortexResult<ArrayRef> {
195        Ok(partials)
196    }
197
198    fn finalize_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
199        self.to_scalar(partial)
200    }
201}
202
203fn supported_dtype<'a>(_options: &BoundedMinOptions, input_dtype: &'a DType) -> Option<&'a DType> {
204    MinMax
205        .return_dtype(&EmptyOptions, input_dtype)
206        .map(|_| input_dtype)
207}
208
209fn truncate_min(value: Scalar, max_bytes: usize) -> VortexResult<Option<Scalar>> {
210    let nullability = value.dtype().nullability();
211    match value.dtype() {
212        DType::Utf8(_) => {
213            Ok(
214                lower_bound(BufferString::from_scalar(value)?, max_bytes, nullability)
215                    .map(|(bound, _)| bound),
216            )
217        }
218        DType::Binary(_) => {
219            Ok(
220                lower_bound(ByteBuffer::from_scalar(value)?, max_bytes, nullability)
221                    .map(|(bound, _)| bound),
222            )
223        }
224        _ => Ok(Some(value)),
225    }
226}
#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use vortex_buffer::buffer;
    use vortex_error::VortexExpect;
    use vortex_error::VortexResult;
    use vortex_session::VortexSession;

    use crate::IntoArray;
    use crate::LEGACY_SESSION;
    use crate::VortexSessionExecute;
    use crate::aggregate_fn::Accumulator;
    use crate::aggregate_fn::AggregateFnSatisfaction;
    use crate::aggregate_fn::AggregateFnVTable;
    use crate::aggregate_fn::AggregateFnVTableExt;
    use crate::aggregate_fn::DynAccumulator;
    use crate::aggregate_fn::EmptyOptions;
    use crate::aggregate_fn::fns::bounded_min::BoundedMin;
    use crate::aggregate_fn::fns::bounded_min::BoundedMinOptions;
    use crate::aggregate_fn::fns::max::Max;
    use crate::aggregate_fn::fns::min::Min;
    use crate::arrays::PrimitiveArray;
    use crate::arrays::VarBinViewArray;
    use crate::dtype::Nullability;
    use crate::scalar::Scalar;
    use crate::session::ArraySession;
    use crate::validity::Validity;

    /// Wraps a literal byte limit in `NonZeroUsize`, panicking on zero.
    fn max_bytes(value: usize) -> NonZeroUsize {
        NonZeroUsize::new(value).vortex_expect("non-zero max_bytes")
    }

    /// Builds options carrying the given byte limit.
    fn options(limit: usize) -> BoundedMinOptions {
        BoundedMinOptions {
            max_bytes: max_bytes(limit),
        }
    }

    /// A session with only the array extension registered.
    fn fresh_session() -> VortexSession {
        VortexSession::empty().with::<ArraySession>()
    }

    #[test]
    fn bounded_min_truncates_utf8_to_lower_bound() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let strings =
            VarBinViewArray::from_iter_str(["snowman⛄️snowman", "untruncated"]).into_array();
        let mut acc = Accumulator::try_new(BoundedMin, options(9), strings.dtype().clone())?;

        acc.accumulate(&strings, &mut ctx)?;

        // The expected bound is the minimum with everything from the emoji onward dropped.
        let expected = Scalar::utf8("snowman", Nullability::Nullable);
        assert_eq!(acc.finish()?, expected);
        Ok(())
    }

    #[test]
    fn bounded_min_keeps_fixed_width_values_exact() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let numbers =
            PrimitiveArray::new(buffer![10i32, 20, 5], Validity::NonNullable).into_array();
        let mut acc = Accumulator::try_new(BoundedMin, options(9), numbers.dtype().clone())?;

        acc.accumulate(&numbers, &mut ctx)?;

        let expected = Scalar::primitive(5i32, Nullability::Nullable);
        assert_eq!(acc.finish()?, expected);
        Ok(())
    }

    #[test]
    fn bounded_min_null_partial_does_not_poison_existing_bound() -> VortexResult<()> {
        let mut ctx = fresh_session().create_execution_ctx();
        let values = VarBinViewArray::from_iter_bin([&[1u8][..]]).into_array();
        let mut acc = Accumulator::try_new(BoundedMin, options(2), values.dtype().clone())?;

        acc.accumulate(&values, &mut ctx)?;
        // Merging a null partial afterwards must leave the recorded bound in place.
        let null_partial = Scalar::null(values.dtype().as_nullable());
        acc.combine_partials(null_partial)?;

        let expected = Scalar::binary(buffer![1u8], Nullability::Nullable);
        assert_eq!(acc.finish()?, expected);
        Ok(())
    }

    #[test]
    fn bounded_min_satisfies_min_bounds() {
        let stored = BoundedMin.bind(options(5));
        let same = BoundedMin.bind(options(5));
        // Asks for fewer bytes than were stored.
        let looser_bounded = BoundedMin.bind(options(4));
        // Asks for more bytes than were stored.
        let tighter_bounded = BoundedMin.bind(options(6));
        let plain_min = Min.bind(EmptyOptions);
        let plain_max = Max.bind(EmptyOptions);

        assert_eq!(stored.can_satisfy(&same), AggregateFnSatisfaction::Exact);
        assert_eq!(
            stored.can_satisfy(&looser_bounded),
            AggregateFnSatisfaction::Approximate
        );
        assert_eq!(
            stored.can_satisfy(&tighter_bounded),
            AggregateFnSatisfaction::No
        );
        assert_eq!(
            stored.can_satisfy(&plain_min),
            AggregateFnSatisfaction::Approximate
        );
        assert_eq!(
            plain_min.can_satisfy(&stored),
            AggregateFnSatisfaction::Approximate
        );
        assert_eq!(
            stored.can_satisfy(&plain_max),
            AggregateFnSatisfaction::No
        );
    }

    #[test]
    fn bounded_min_options_round_trip() -> VortexResult<()> {
        let original = options(64);
        let metadata = BoundedMin
            .serialize(&original)?
            .expect("serializable options");
        let session = VortexSession::empty();
        let roundtrip = BoundedMin.deserialize(&metadata, &session)?;

        assert_eq!(roundtrip, original);
        Ok(())
    }
}