Skip to main content

vortex_array/aggregate_fn/fns/bounded_max/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::fmt::Display;
5use std::fmt::Formatter;
6use std::num::NonZeroUsize;
7
8use vortex_buffer::BufferString;
9use vortex_buffer::ByteBuffer;
10use vortex_error::VortexExpect;
11use vortex_error::VortexResult;
12use vortex_error::vortex_ensure;
13use vortex_session::VortexSession;
14
15use crate::ArrayRef;
16use crate::Columnar;
17use crate::ExecutionCtx;
18use crate::IntoArray;
19use crate::aggregate_fn::AggregateFnId;
20use crate::aggregate_fn::AggregateFnRef;
21use crate::aggregate_fn::AggregateFnSatisfaction;
22use crate::aggregate_fn::AggregateFnVTable;
23use crate::aggregate_fn::EmptyOptions;
24use crate::aggregate_fn::fns::max::Max;
25use crate::aggregate_fn::fns::min_max::MinMax;
26use crate::aggregate_fn::fns::min_max::min_max;
27use crate::dtype::DType;
28use crate::partial_ord::partial_max;
29use crate::scalar::Scalar;
30use crate::scalar::ScalarTruncation;
31use crate::scalar::upper_bound;
32
/// Options for [`BoundedMax`].
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct BoundedMaxOptions {
    /// Maximum byte length for UTF8/Binary bounds.
    ///
    /// Held as a [`NonZeroUsize`], so a zero-byte budget cannot be represented.
    pub max_bytes: NonZeroUsize,
}
39
40impl Display for BoundedMaxOptions {
41    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
42        write!(f, "{}", self.max_bytes.get())
43    }
44}
45
/// Compute a byte-bounded upper bound for the maximum non-null value of a UTF8/Binary array.
///
/// This is a zero-sized marker type; the byte budget is carried separately in
/// [`BoundedMaxOptions`].
#[derive(Clone, Debug)]
pub struct BoundedMax;
49
50enum BoundedMaxState {
51    Empty,
52    Value(Scalar),
53    Unknown,
54}
55
56/// Partial accumulator state for the bounded maximum aggregate.
57pub struct BoundedMaxPartial {
58    state: BoundedMaxState,
59    element_dtype: DType,
60    max_bytes: NonZeroUsize,
61}
62
63impl BoundedMaxPartial {
64    fn merge(&mut self, max: Scalar) {
65        if max.is_null() {
66            // Serialized partials encode both empty input and unknown upper bounds as null.
67            // Treat null as unknown when merging; this may lose a bound from an empty shard, but
68            // it preserves pruning soundness.
69            self.state = BoundedMaxState::Unknown;
70            return;
71        }
72
73        self.state = match std::mem::replace(&mut self.state, BoundedMaxState::Empty) {
74            BoundedMaxState::Empty => BoundedMaxState::Value(max),
75            BoundedMaxState::Value(current) => BoundedMaxState::Value(
76                partial_max(max, current).vortex_expect("incomparable bounded max scalars"),
77            ),
78            BoundedMaxState::Unknown => BoundedMaxState::Unknown,
79        };
80    }
81
82    fn unknown(&mut self) {
83        self.state = BoundedMaxState::Unknown;
84    }
85}
86
87impl AggregateFnVTable for BoundedMax {
88    type Options = BoundedMaxOptions;
89    type Partial = BoundedMaxPartial;
90
91    fn id(&self) -> AggregateFnId {
92        AggregateFnId::new("vortex.bounded_max")
93    }
94
95    fn serialize(&self, options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
96        let max_bytes = u64::try_from(options.max_bytes.get())?;
97        Ok(Some(max_bytes.to_le_bytes().to_vec()))
98    }
99
100    fn deserialize(
101        &self,
102        metadata: &[u8],
103        _session: &VortexSession,
104    ) -> VortexResult<Self::Options> {
105        vortex_ensure!(
106            metadata.len() == size_of::<u64>(),
107            "BoundedMax options expected {} bytes, got {}",
108            size_of::<u64>(),
109            metadata.len()
110        );
111        let mut bytes = [0u8; size_of::<u64>()];
112        bytes.copy_from_slice(metadata);
113        let max_bytes = usize::try_from(u64::from_le_bytes(bytes))?;
114        vortex_ensure!(max_bytes > 0, "BoundedMax requires max_bytes > 0");
115        Ok(BoundedMaxOptions {
116            max_bytes: NonZeroUsize::new(max_bytes).vortex_expect("checked non-zero max_bytes"),
117        })
118    }
119
120    fn return_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option<DType> {
121        supported_dtype(options, input_dtype).map(DType::as_nullable)
122    }
123
124    fn can_satisfy(
125        &self,
126        options: &Self::Options,
127        requested: &AggregateFnRef,
128    ) -> AggregateFnSatisfaction {
129        if let Some(other) = requested.as_opt::<Self>() {
130            return if other == options {
131                AggregateFnSatisfaction::Exact
132            } else if options.max_bytes >= other.max_bytes {
133                AggregateFnSatisfaction::Approximate
134            } else {
135                AggregateFnSatisfaction::No
136            };
137        }
138
139        if requested.is::<Max>() {
140            AggregateFnSatisfaction::Approximate
141        } else {
142            AggregateFnSatisfaction::No
143        }
144    }
145
146    fn partial_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option<DType> {
147        self.return_dtype(options, input_dtype)
148    }
149
150    fn empty_partial(
151        &self,
152        options: &Self::Options,
153        input_dtype: &DType,
154    ) -> VortexResult<Self::Partial> {
155        Ok(BoundedMaxPartial {
156            state: BoundedMaxState::Empty,
157            element_dtype: input_dtype.clone(),
158            max_bytes: options.max_bytes,
159        })
160    }
161
162    fn combine_partials(&self, partial: &mut Self::Partial, other: Scalar) -> VortexResult<()> {
163        partial.merge(other);
164        Ok(())
165    }
166
167    fn to_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
168        let dtype = partial.element_dtype.as_nullable();
169        match &partial.state {
170            BoundedMaxState::Value(max) => max.cast(&dtype),
171            BoundedMaxState::Empty | BoundedMaxState::Unknown => Ok(Scalar::null(dtype)),
172        }
173    }
174
175    fn reset(&self, partial: &mut Self::Partial) {
176        partial.state = BoundedMaxState::Empty;
177    }
178
179    fn is_saturated(&self, partial: &Self::Partial) -> bool {
180        matches!(partial.state, BoundedMaxState::Unknown)
181    }
182
183    fn accumulate(
184        &self,
185        partial: &mut Self::Partial,
186        batch: &Columnar,
187        ctx: &mut ExecutionCtx,
188    ) -> VortexResult<()> {
189        // Delegate to the existing min_max implementation for now. A dedicated bounded-max
190        // aggregate would avoid computing min when only max is needed.
191        let array = match batch {
192            Columnar::Canonical(canonical) => canonical.clone().into_array(),
193            Columnar::Constant(constant) => constant.clone().into_array(),
194        };
195        let Some(result) = min_max(&array, ctx)? else {
196            return Ok(());
197        };
198        match truncate_max(result.max, partial.max_bytes.get())? {
199            Some(bound) => partial.merge(bound),
200            None => partial.unknown(),
201        }
202        Ok(())
203    }
204
205    fn finalize(&self, partials: ArrayRef) -> VortexResult<ArrayRef> {
206        Ok(partials)
207    }
208
209    fn finalize_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
210        self.to_scalar(partial)
211    }
212}
213
214fn supported_dtype<'a>(_options: &BoundedMaxOptions, input_dtype: &'a DType) -> Option<&'a DType> {
215    MinMax
216        .return_dtype(&EmptyOptions, input_dtype)
217        .map(|_| input_dtype)
218}
219
220fn truncate_max(value: Scalar, max_bytes: usize) -> VortexResult<Option<Scalar>> {
221    let nullability = value.dtype().nullability();
222    match value.dtype() {
223        DType::Utf8(_) => {
224            Ok(
225                upper_bound(BufferString::from_scalar(value)?, max_bytes, nullability)
226                    .map(|(bound, _)| bound),
227            )
228        }
229        DType::Binary(_) => {
230            Ok(
231                upper_bound(ByteBuffer::from_scalar(value)?, max_bytes, nullability)
232                    .map(|(bound, _)| bound),
233            )
234        }
235        _ => Ok(Some(value)),
236    }
237}
238
#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use vortex_buffer::buffer;
    use vortex_error::VortexExpect;
    use vortex_error::VortexResult;
    use vortex_session::VortexSession;

    use crate::IntoArray;
    use crate::LEGACY_SESSION;
    use crate::VortexSessionExecute;
    use crate::aggregate_fn::Accumulator;
    use crate::aggregate_fn::AggregateFnSatisfaction;
    use crate::aggregate_fn::AggregateFnVTable;
    use crate::aggregate_fn::AggregateFnVTableExt;
    use crate::aggregate_fn::DynAccumulator;
    use crate::aggregate_fn::EmptyOptions;
    use crate::aggregate_fn::fns::bounded_max::BoundedMax;
    use crate::aggregate_fn::fns::bounded_max::BoundedMaxOptions;
    use crate::aggregate_fn::fns::max::Max;
    use crate::aggregate_fn::fns::min::Min;
    use crate::arrays::PrimitiveArray;
    use crate::arrays::VarBinViewArray;
    use crate::dtype::Nullability;
    use crate::scalar::Scalar;
    use crate::session::ArraySession;
    use crate::validity::Validity;

    /// Build a byte budget, panicking if `value` is zero.
    fn max_bytes(value: usize) -> NonZeroUsize {
        NonZeroUsize::new(value).vortex_expect("non-zero max_bytes")
    }

    /// Shorthand for [`BoundedMaxOptions`] with the given byte budget.
    fn options(limit: usize) -> BoundedMaxOptions {
        BoundedMaxOptions {
            max_bytes: max_bytes(limit),
        }
    }

    /// An empty session with only the array session registered.
    fn fresh_session() -> VortexSession {
        VortexSession::empty().with::<ArraySession>()
    }

    // A UTF8 maximum longer than the budget is replaced by a shorter upper bound.
    #[test]
    fn bounded_max_truncates_utf8_to_upper_bound() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let array = VarBinViewArray::from_iter_str(["aardvark", "char🪩"]).into_array();
        let mut acc = Accumulator::try_new(BoundedMax, options(5), array.dtype().clone())?;

        acc.accumulate(&array, &mut ctx)?;

        let expected = Scalar::utf8("chas", Nullability::Nullable);
        assert_eq!(acc.finish()?, expected);
        Ok(())
    }

    // A maximum with no upper bound inside the budget finishes as null.
    #[test]
    fn bounded_max_unknown_upper_bound_returns_null() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let array = VarBinViewArray::from_iter_bin([&[255u8, 255, 255][..]]).into_array();
        let mut acc = Accumulator::try_new(BoundedMax, options(2), array.dtype().clone())?;

        acc.accumulate(&array, &mut ctx)?;

        let expected = Scalar::null(array.dtype().as_nullable());
        assert_eq!(acc.finish()?, expected);
        Ok(())
    }

    // An empty batch leaves the accumulator able to pick up a later bound.
    #[test]
    fn bounded_max_empty_does_not_poison_later_values() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let empty = VarBinViewArray::from_iter_bin(Vec::<&[u8]>::new()).into_array();
        let values = VarBinViewArray::from_iter_bin([&[1u8][..]]).into_array();
        let mut acc = Accumulator::try_new(BoundedMax, options(2), empty.dtype().clone())?;

        acc.accumulate(&empty, &mut ctx)?;
        acc.accumulate(&values, &mut ctx)?;

        let expected = Scalar::binary(buffer![1u8], Nullability::Nullable);
        assert_eq!(acc.finish()?, expected);
        Ok(())
    }

    // Once a batch yields no bound, later batches cannot restore one.
    #[test]
    fn bounded_max_unknown_poisons_later_values() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let unknown = VarBinViewArray::from_iter_bin([&[255u8, 255, 255][..]]).into_array();
        let values = VarBinViewArray::from_iter_bin([&[1u8][..]]).into_array();
        let mut acc = Accumulator::try_new(BoundedMax, options(2), unknown.dtype().clone())?;

        acc.accumulate(&unknown, &mut ctx)?;
        acc.accumulate(&values, &mut ctx)?;

        let expected = Scalar::null(unknown.dtype().as_nullable());
        assert_eq!(acc.finish()?, expected);
        Ok(())
    }

    // Combining with a null partial discards a bound that was already held.
    #[test]
    fn bounded_max_null_partial_poisons_existing_bound() -> VortexResult<()> {
        let mut ctx = fresh_session().create_execution_ctx();
        let values = VarBinViewArray::from_iter_bin([&[1u8][..]]).into_array();
        let mut acc = Accumulator::try_new(BoundedMax, options(2), values.dtype().clone())?;

        acc.accumulate(&values, &mut ctx)?;
        acc.combine_partials(Scalar::null(values.dtype().as_nullable()))?;

        let expected = Scalar::null(values.dtype().as_nullable());
        assert_eq!(acc.finish()?, expected);
        Ok(())
    }

    // Fixed-width inputs are not truncated: the exact maximum is returned.
    #[test]
    fn bounded_max_keeps_fixed_width_values_exact() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let array = PrimitiveArray::new(buffer![10i32, 20, 5], Validity::NonNullable).into_array();
        let mut acc = Accumulator::try_new(BoundedMax, options(9), array.dtype().clone())?;

        acc.accumulate(&array, &mut ctx)?;

        let expected = Scalar::primitive(20i32, Nullability::Nullable);
        assert_eq!(acc.finish()?, expected);
        Ok(())
    }

    // Satisfaction matrix between bounded max instances, `Max` and `Min`.
    #[test]
    fn bounded_max_satisfies_max_bounds() {
        let stored = BoundedMax.bind(options(5));
        let same = BoundedMax.bind(options(5));
        // A smaller byte budget is the looser request; a larger one is the tighter request.
        let looser_bounded = BoundedMax.bind(options(4));
        let tighter_bounded = BoundedMax.bind(options(6));

        assert_eq!(stored.can_satisfy(&same), AggregateFnSatisfaction::Exact);
        assert_eq!(
            stored.can_satisfy(&looser_bounded),
            AggregateFnSatisfaction::Approximate
        );
        assert_eq!(
            stored.can_satisfy(&tighter_bounded),
            AggregateFnSatisfaction::No
        );
        assert_eq!(
            stored.can_satisfy(&Max.bind(EmptyOptions)),
            AggregateFnSatisfaction::Approximate
        );
        assert_eq!(
            Max.bind(EmptyOptions).can_satisfy(&stored),
            AggregateFnSatisfaction::Approximate
        );
        assert_eq!(
            stored.can_satisfy(&Min.bind(EmptyOptions)),
            AggregateFnSatisfaction::No
        );
    }

    // Options survive a serialize/deserialize round trip unchanged.
    #[test]
    fn bounded_max_options_round_trip() -> VortexResult<()> {
        let original = options(64);
        let metadata = BoundedMax
            .serialize(&original)?
            .expect("serializable options");
        let roundtrip = BoundedMax.deserialize(&metadata, &VortexSession::empty())?;

        assert_eq!(roundtrip, original);
        Ok(())
    }
}