vortex_array/aggregate_fn/fns/bounded_max/
mod.rs1use std::fmt::Display;
5use std::fmt::Formatter;
6use std::num::NonZeroUsize;
7
8use vortex_buffer::BufferString;
9use vortex_buffer::ByteBuffer;
10use vortex_error::VortexExpect;
11use vortex_error::VortexResult;
12use vortex_error::vortex_ensure;
13use vortex_session::VortexSession;
14
15use crate::ArrayRef;
16use crate::Columnar;
17use crate::ExecutionCtx;
18use crate::IntoArray;
19use crate::aggregate_fn::AggregateFnId;
20use crate::aggregate_fn::AggregateFnRef;
21use crate::aggregate_fn::AggregateFnSatisfaction;
22use crate::aggregate_fn::AggregateFnVTable;
23use crate::aggregate_fn::EmptyOptions;
24use crate::aggregate_fn::fns::max::Max;
25use crate::aggregate_fn::fns::min_max::MinMax;
26use crate::aggregate_fn::fns::min_max::min_max;
27use crate::dtype::DType;
28use crate::partial_ord::partial_max;
29use crate::scalar::Scalar;
30use crate::scalar::ScalarTruncation;
31use crate::scalar::upper_bound;
32
/// Options for the [`BoundedMax`] aggregate function.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BoundedMaxOptions {
    /// Byte budget handed to the truncation step for `Utf8` and `Binary`
    /// maxima. It is non-zero by construction.
    pub max_bytes: NonZeroUsize,
}
39
40impl Display for BoundedMaxOptions {
41 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
42 write!(f, "{}", self.max_bytes.get())
43 }
44}
45
/// Marker type for the `vortex.bounded_max` aggregate function.
///
/// It carries no state of its own; all configuration lives in
/// [`BoundedMaxOptions`].
#[derive(Debug, Clone)]
pub struct BoundedMax;
49
50enum BoundedMaxState {
51 Empty,
52 Value(Scalar),
53 Unknown,
54}
55
56pub struct BoundedMaxPartial {
58 state: BoundedMaxState,
59 element_dtype: DType,
60 max_bytes: NonZeroUsize,
61}
62
63impl BoundedMaxPartial {
64 fn merge(&mut self, max: Scalar) {
65 if max.is_null() {
66 self.state = BoundedMaxState::Unknown;
70 return;
71 }
72
73 self.state = match std::mem::replace(&mut self.state, BoundedMaxState::Empty) {
74 BoundedMaxState::Empty => BoundedMaxState::Value(max),
75 BoundedMaxState::Value(current) => BoundedMaxState::Value(
76 partial_max(max, current).vortex_expect("incomparable bounded max scalars"),
77 ),
78 BoundedMaxState::Unknown => BoundedMaxState::Unknown,
79 };
80 }
81
82 fn unknown(&mut self) {
83 self.state = BoundedMaxState::Unknown;
84 }
85}
86
87impl AggregateFnVTable for BoundedMax {
88 type Options = BoundedMaxOptions;
89 type Partial = BoundedMaxPartial;
90
91 fn id(&self) -> AggregateFnId {
92 AggregateFnId::new("vortex.bounded_max")
93 }
94
95 fn serialize(&self, options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
96 let max_bytes = u64::try_from(options.max_bytes.get())?;
97 Ok(Some(max_bytes.to_le_bytes().to_vec()))
98 }
99
100 fn deserialize(
101 &self,
102 metadata: &[u8],
103 _session: &VortexSession,
104 ) -> VortexResult<Self::Options> {
105 vortex_ensure!(
106 metadata.len() == size_of::<u64>(),
107 "BoundedMax options expected {} bytes, got {}",
108 size_of::<u64>(),
109 metadata.len()
110 );
111 let mut bytes = [0u8; size_of::<u64>()];
112 bytes.copy_from_slice(metadata);
113 let max_bytes = usize::try_from(u64::from_le_bytes(bytes))?;
114 vortex_ensure!(max_bytes > 0, "BoundedMax requires max_bytes > 0");
115 Ok(BoundedMaxOptions {
116 max_bytes: NonZeroUsize::new(max_bytes).vortex_expect("checked non-zero max_bytes"),
117 })
118 }
119
120 fn return_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option<DType> {
121 supported_dtype(options, input_dtype).map(DType::as_nullable)
122 }
123
124 fn can_satisfy(
125 &self,
126 options: &Self::Options,
127 requested: &AggregateFnRef,
128 ) -> AggregateFnSatisfaction {
129 if let Some(other) = requested.as_opt::<Self>() {
130 return if other == options {
131 AggregateFnSatisfaction::Exact
132 } else if options.max_bytes >= other.max_bytes {
133 AggregateFnSatisfaction::Approximate
134 } else {
135 AggregateFnSatisfaction::No
136 };
137 }
138
139 if requested.is::<Max>() {
140 AggregateFnSatisfaction::Approximate
141 } else {
142 AggregateFnSatisfaction::No
143 }
144 }
145
146 fn partial_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option<DType> {
147 self.return_dtype(options, input_dtype)
148 }
149
150 fn empty_partial(
151 &self,
152 options: &Self::Options,
153 input_dtype: &DType,
154 ) -> VortexResult<Self::Partial> {
155 Ok(BoundedMaxPartial {
156 state: BoundedMaxState::Empty,
157 element_dtype: input_dtype.clone(),
158 max_bytes: options.max_bytes,
159 })
160 }
161
162 fn combine_partials(&self, partial: &mut Self::Partial, other: Scalar) -> VortexResult<()> {
163 partial.merge(other);
164 Ok(())
165 }
166
167 fn to_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
168 let dtype = partial.element_dtype.as_nullable();
169 match &partial.state {
170 BoundedMaxState::Value(max) => max.cast(&dtype),
171 BoundedMaxState::Empty | BoundedMaxState::Unknown => Ok(Scalar::null(dtype)),
172 }
173 }
174
175 fn reset(&self, partial: &mut Self::Partial) {
176 partial.state = BoundedMaxState::Empty;
177 }
178
179 fn is_saturated(&self, partial: &Self::Partial) -> bool {
180 matches!(partial.state, BoundedMaxState::Unknown)
181 }
182
183 fn accumulate(
184 &self,
185 partial: &mut Self::Partial,
186 batch: &Columnar,
187 ctx: &mut ExecutionCtx,
188 ) -> VortexResult<()> {
189 let array = match batch {
192 Columnar::Canonical(canonical) => canonical.clone().into_array(),
193 Columnar::Constant(constant) => constant.clone().into_array(),
194 };
195 let Some(result) = min_max(&array, ctx)? else {
196 return Ok(());
197 };
198 match truncate_max(result.max, partial.max_bytes.get())? {
199 Some(bound) => partial.merge(bound),
200 None => partial.unknown(),
201 }
202 Ok(())
203 }
204
205 fn finalize(&self, partials: ArrayRef) -> VortexResult<ArrayRef> {
206 Ok(partials)
207 }
208
209 fn finalize_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
210 self.to_scalar(partial)
211 }
212}
213
214fn supported_dtype<'a>(_options: &BoundedMaxOptions, input_dtype: &'a DType) -> Option<&'a DType> {
215 MinMax
216 .return_dtype(&EmptyOptions, input_dtype)
217 .map(|_| input_dtype)
218}
219
220fn truncate_max(value: Scalar, max_bytes: usize) -> VortexResult<Option<Scalar>> {
221 let nullability = value.dtype().nullability();
222 match value.dtype() {
223 DType::Utf8(_) => {
224 Ok(
225 upper_bound(BufferString::from_scalar(value)?, max_bytes, nullability)
226 .map(|(bound, _)| bound),
227 )
228 }
229 DType::Binary(_) => {
230 Ok(
231 upper_bound(ByteBuffer::from_scalar(value)?, max_bytes, nullability)
232 .map(|(bound, _)| bound),
233 )
234 }
235 _ => Ok(Some(value)),
236 }
237}
238
#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use vortex_buffer::buffer;
    use vortex_error::VortexExpect;
    use vortex_error::VortexResult;
    use vortex_session::VortexSession;

    use crate::IntoArray;
    use crate::LEGACY_SESSION;
    use crate::VortexSessionExecute;
    use crate::aggregate_fn::Accumulator;
    use crate::aggregate_fn::AggregateFnSatisfaction;
    use crate::aggregate_fn::AggregateFnVTable;
    use crate::aggregate_fn::AggregateFnVTableExt;
    use crate::aggregate_fn::DynAccumulator;
    use crate::aggregate_fn::EmptyOptions;
    use crate::aggregate_fn::fns::bounded_max::BoundedMax;
    use crate::aggregate_fn::fns::bounded_max::BoundedMaxOptions;
    use crate::aggregate_fn::fns::max::Max;
    use crate::aggregate_fn::fns::min::Min;
    use crate::arrays::PrimitiveArray;
    use crate::arrays::VarBinViewArray;
    use crate::dtype::Nullability;
    use crate::scalar::Scalar;
    use crate::session::ArraySession;
    use crate::validity::Validity;

    /// Builds options with the given byte limit; panics if `limit` is zero.
    fn bounded(limit: usize) -> BoundedMaxOptions {
        let max_bytes = NonZeroUsize::new(limit).vortex_expect("non-zero max_bytes");
        BoundedMaxOptions { max_bytes }
    }

    /// An otherwise empty session that has an `ArraySession` attached.
    fn fresh_session() -> VortexSession {
        VortexSession::empty().with::<ArraySession>()
    }

    #[test]
    fn bounded_max_truncates_utf8_to_upper_bound() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let input = VarBinViewArray::from_iter_str(["aardvark", "char🪩"]).into_array();
        let mut acc = Accumulator::try_new(BoundedMax, bounded(5), input.dtype().clone())?;

        acc.accumulate(&input, &mut ctx)?;

        // Five bytes cannot hold the 4-byte emoji after "char", so the
        // bound is "char" with its last byte incremented.
        assert_eq!(acc.finish()?, Scalar::utf8("chas", Nullability::Nullable));
        Ok(())
    }

    #[test]
    fn bounded_max_unknown_upper_bound_returns_null() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let input = VarBinViewArray::from_iter_bin([&[255u8, 255, 255][..]]).into_array();
        let mut acc = Accumulator::try_new(BoundedMax, bounded(2), input.dtype().clone())?;

        acc.accumulate(&input, &mut ctx)?;

        assert_eq!(acc.finish()?, Scalar::null(input.dtype().as_nullable()));
        Ok(())
    }

    #[test]
    fn bounded_max_empty_does_not_poison_later_values() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let no_rows = VarBinViewArray::from_iter_bin(Vec::<&[u8]>::new()).into_array();
        let one_row = VarBinViewArray::from_iter_bin([&[1u8][..]]).into_array();
        let mut acc = Accumulator::try_new(BoundedMax, bounded(2), no_rows.dtype().clone())?;

        acc.accumulate(&no_rows, &mut ctx)?;
        acc.accumulate(&one_row, &mut ctx)?;

        assert_eq!(
            acc.finish()?,
            Scalar::binary(buffer![1u8], Nullability::Nullable)
        );
        Ok(())
    }

    #[test]
    fn bounded_max_unknown_poisons_later_values() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let unbounded = VarBinViewArray::from_iter_bin([&[255u8, 255, 255][..]]).into_array();
        let one_row = VarBinViewArray::from_iter_bin([&[1u8][..]]).into_array();
        let mut acc = Accumulator::try_new(BoundedMax, bounded(2), unbounded.dtype().clone())?;

        acc.accumulate(&unbounded, &mut ctx)?;
        acc.accumulate(&one_row, &mut ctx)?;

        assert_eq!(acc.finish()?, Scalar::null(unbounded.dtype().as_nullable()));
        Ok(())
    }

    #[test]
    fn bounded_max_null_partial_poisons_existing_bound() -> VortexResult<()> {
        let mut ctx = fresh_session().create_execution_ctx();
        let one_row = VarBinViewArray::from_iter_bin([&[1u8][..]]).into_array();
        let mut acc = Accumulator::try_new(BoundedMax, bounded(2), one_row.dtype().clone())?;

        acc.accumulate(&one_row, &mut ctx)?;
        acc.combine_partials(Scalar::null(one_row.dtype().as_nullable()))?;

        assert_eq!(acc.finish()?, Scalar::null(one_row.dtype().as_nullable()));
        Ok(())
    }

    #[test]
    fn bounded_max_keeps_fixed_width_values_exact() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let input = PrimitiveArray::new(buffer![10i32, 20, 5], Validity::NonNullable).into_array();
        let mut acc = Accumulator::try_new(BoundedMax, bounded(9), input.dtype().clone())?;

        acc.accumulate(&input, &mut ctx)?;

        assert_eq!(
            acc.finish()?,
            Scalar::primitive(20i32, Nullability::Nullable)
        );
        Ok(())
    }

    #[test]
    fn bounded_max_satisfies_max_bounds() {
        let stored = BoundedMax.bind(bounded(5));
        let same = BoundedMax.bind(bounded(5));
        // Fewer bytes give a looser bound; more bytes give a tighter one.
        let looser_bounded = BoundedMax.bind(bounded(4));
        let tighter_bounded = BoundedMax.bind(bounded(6));
        let plain_max = Max.bind(EmptyOptions);
        let plain_min = Min.bind(EmptyOptions);

        assert_eq!(stored.can_satisfy(&same), AggregateFnSatisfaction::Exact);
        assert_eq!(
            stored.can_satisfy(&looser_bounded),
            AggregateFnSatisfaction::Approximate
        );
        assert_eq!(
            stored.can_satisfy(&tighter_bounded),
            AggregateFnSatisfaction::No
        );
        assert_eq!(
            stored.can_satisfy(&plain_max),
            AggregateFnSatisfaction::Approximate
        );
        assert_eq!(
            plain_max.can_satisfy(&stored),
            AggregateFnSatisfaction::Approximate
        );
        assert_eq!(
            stored.can_satisfy(&plain_min),
            AggregateFnSatisfaction::No
        );
    }

    #[test]
    fn bounded_max_options_round_trip() -> VortexResult<()> {
        let options = bounded(64);
        let metadata = BoundedMax
            .serialize(&options)?
            .expect("serializable options");

        let roundtrip = BoundedMax.deserialize(&metadata, &VortexSession::empty())?;

        assert_eq!(roundtrip, options);
        Ok(())
    }
}