// vortex_array/aggregate_fn/fns/bounded_min/mod.rs

use std::fmt::Display;
use std::fmt::Formatter;
use std::num::NonZeroUsize;

use vortex_buffer::BufferString;
use vortex_buffer::ByteBuffer;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_ensure;
use vortex_error::vortex_err;
use vortex_session::VortexSession;
use vortex_session::registry::CachedId;

use crate::ArrayRef;
use crate::Columnar;
use crate::ExecutionCtx;
use crate::IntoArray;
use crate::aggregate_fn::AggregateFnId;
use crate::aggregate_fn::AggregateFnRef;
use crate::aggregate_fn::AggregateFnSatisfaction;
use crate::aggregate_fn::AggregateFnVTable;
use crate::aggregate_fn::NumericalAggregateOpts;
use crate::aggregate_fn::fns::min::Min;
use crate::aggregate_fn::fns::min_max::MinMax;
use crate::aggregate_fn::fns::min_max::min_max;
use crate::dtype::DType;
use crate::partial_ord::partial_min;
use crate::scalar::Scalar;
use crate::scalar::ScalarTruncation;
use crate::scalar::lower_bound;
33
/// Configuration for the [`BoundedMin`] aggregate.
///
/// `max_bytes` caps the size of the stored bound for variable-width (`Utf8` / `Binary`)
/// inputs; fixed-width values are kept as-is regardless of this budget.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct BoundedMinOptions {
    /// Maximum number of bytes the stored lower bound may occupy. Always at least one.
    pub max_bytes: NonZeroUsize,
}
40
41impl Display for BoundedMinOptions {
42 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
43 write!(f, "{}", self.max_bytes.get())
44 }
45}
46
/// Aggregate that tracks a size-bounded lower bound of the minimum value of its input.
///
/// `Utf8` / `Binary` minima are truncated to at most [`BoundedMinOptions::max_bytes`] bytes;
/// other supported types keep their exact minimum.
#[derive(Clone, Debug)]
pub struct BoundedMin;
50
51enum BoundedMinState {
52 Empty,
53 Value(Scalar),
54}
55
56pub struct BoundedMinPartial {
58 state: BoundedMinState,
59 element_dtype: DType,
60 max_bytes: NonZeroUsize,
61}
62
63impl BoundedMinPartial {
64 fn merge(&mut self, min: Scalar) {
65 if min.is_null() {
66 return;
67 }
68
69 self.state = match std::mem::replace(&mut self.state, BoundedMinState::Empty) {
70 BoundedMinState::Empty => BoundedMinState::Value(min),
71 BoundedMinState::Value(current) => BoundedMinState::Value(
72 partial_min(min, current).vortex_expect("incomparable bounded min scalars"),
73 ),
74 };
75 }
76}
77
78impl AggregateFnVTable for BoundedMin {
79 type Options = BoundedMinOptions;
80 type Partial = BoundedMinPartial;
81
82 fn id(&self) -> AggregateFnId {
83 static ID: CachedId = CachedId::new("vortex.bounded_min");
84 *ID
85 }
86
87 fn serialize(&self, options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
88 let max_bytes = u64::try_from(options.max_bytes.get())?;
89 Ok(Some(max_bytes.to_le_bytes().to_vec()))
90 }
91
92 fn deserialize(
93 &self,
94 metadata: &[u8],
95 _session: &VortexSession,
96 ) -> VortexResult<Self::Options> {
97 vortex_ensure!(
98 metadata.len() == size_of::<u64>(),
99 "BoundedMin options expected {} bytes, got {}",
100 size_of::<u64>(),
101 metadata.len()
102 );
103 let mut bytes = [0u8; size_of::<u64>()];
104 bytes.copy_from_slice(metadata);
105 let max_bytes = usize::try_from(u64::from_le_bytes(bytes))?;
106 vortex_ensure!(max_bytes > 0, "BoundedMin requires max_bytes > 0");
107 Ok(BoundedMinOptions {
108 max_bytes: NonZeroUsize::new(max_bytes).vortex_expect("checked non-zero max_bytes"),
109 })
110 }
111
112 fn return_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option<DType> {
113 supported_dtype(options, input_dtype).map(DType::as_nullable)
114 }
115
116 fn can_satisfy(
117 &self,
118 options: &Self::Options,
119 requested: &AggregateFnRef,
120 ) -> AggregateFnSatisfaction {
121 if let Some(other) = requested.as_opt::<Self>() {
122 return if other == options {
123 AggregateFnSatisfaction::Exact
124 } else if options.max_bytes >= other.max_bytes {
125 AggregateFnSatisfaction::Approximate
126 } else {
127 AggregateFnSatisfaction::No
128 };
129 }
130
131 if requested
133 .as_opt::<Min>()
134 .is_some_and(|options| options.skip_nans)
135 {
136 AggregateFnSatisfaction::Approximate
137 } else {
138 AggregateFnSatisfaction::No
139 }
140 }
141
142 fn partial_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option<DType> {
143 self.return_dtype(options, input_dtype)
144 }
145
146 fn empty_partial(
147 &self,
148 options: &Self::Options,
149 input_dtype: &DType,
150 ) -> VortexResult<Self::Partial> {
151 Ok(BoundedMinPartial {
152 state: BoundedMinState::Empty,
153 element_dtype: input_dtype.clone(),
154 max_bytes: options.max_bytes,
155 })
156 }
157
158 fn combine_partials(&self, partial: &mut Self::Partial, other: Scalar) -> VortexResult<()> {
159 partial.merge(other);
160 Ok(())
161 }
162
163 fn to_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
164 let dtype = partial.element_dtype.as_nullable();
165 match &partial.state {
166 BoundedMinState::Empty => Ok(Scalar::null(dtype)),
167 BoundedMinState::Value(min) => min.cast(&dtype),
168 }
169 }
170
171 fn reset(&self, partial: &mut Self::Partial) {
172 partial.state = BoundedMinState::Empty;
173 }
174
175 fn is_saturated(&self, _partial: &Self::Partial) -> bool {
176 false
177 }
178
179 fn accumulate(
180 &self,
181 partial: &mut Self::Partial,
182 batch: &Columnar,
183 ctx: &mut ExecutionCtx,
184 ) -> VortexResult<()> {
185 let array = match batch {
188 Columnar::Canonical(canonical) => canonical.clone().into_array(),
189 Columnar::Constant(constant) => constant.clone().into_array(),
190 };
191 let Some(result) = min_max(&array, ctx, NumericalAggregateOpts::default())? else {
192 return Ok(());
193 };
194 if let Some(bound) = truncate_min(result.min, partial.max_bytes.get())? {
195 partial.merge(bound);
196 }
197 Ok(())
198 }
199
200 fn finalize(&self, partials: ArrayRef) -> VortexResult<ArrayRef> {
201 Ok(partials)
202 }
203
204 fn finalize_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
205 self.to_scalar(partial)
206 }
207}
208
209fn supported_dtype<'a>(_options: &BoundedMinOptions, input_dtype: &'a DType) -> Option<&'a DType> {
210 MinMax
211 .return_dtype(&NumericalAggregateOpts::default(), input_dtype)
212 .map(|_| input_dtype)
213}
214
215fn truncate_min(value: Scalar, max_bytes: usize) -> VortexResult<Option<Scalar>> {
216 let nullability = value.dtype().nullability();
217 match value.dtype() {
218 DType::Utf8(_) => {
219 Ok(
220 lower_bound(BufferString::from_scalar(value)?, max_bytes, nullability)
221 .map(|(bound, _)| bound),
222 )
223 }
224 DType::Binary(_) => {
225 Ok(
226 lower_bound(ByteBuffer::from_scalar(value)?, max_bytes, nullability)
227 .map(|(bound, _)| bound),
228 )
229 }
230 _ => Ok(Some(value)),
231 }
232}
#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use vortex_buffer::buffer;
    use vortex_error::VortexExpect;
    use vortex_error::VortexResult;
    use vortex_session::VortexSession;

    use crate::IntoArray;
    use crate::VortexSessionExecute;
    use crate::aggregate_fn::Accumulator;
    use crate::aggregate_fn::AggregateFnSatisfaction;
    use crate::aggregate_fn::AggregateFnVTable;
    use crate::aggregate_fn::AggregateFnVTableExt;
    use crate::aggregate_fn::DynAccumulator;
    use crate::aggregate_fn::NumericalAggregateOpts;
    use crate::aggregate_fn::fns::bounded_min::BoundedMin;
    use crate::aggregate_fn::fns::bounded_min::BoundedMinOptions;
    use crate::aggregate_fn::fns::max::Max;
    use crate::aggregate_fn::fns::min::Min;
    use crate::array_session;
    use crate::arrays::PrimitiveArray;
    use crate::arrays::VarBinViewArray;
    use crate::dtype::Nullability;
    use crate::scalar::Scalar;
    use crate::validity::Validity;

    /// Options with the given byte budget; a zero budget is a bug in the test itself.
    fn options(limit: usize) -> BoundedMinOptions {
        BoundedMinOptions {
            max_bytes: NonZeroUsize::new(limit).vortex_expect("non-zero max_bytes"),
        }
    }

    #[test]
    fn bounded_min_truncates_utf8_to_lower_bound() -> VortexResult<()> {
        let mut ctx = array_session().create_execution_ctx();
        let strings =
            VarBinViewArray::from_iter_str(["snowman⛄️snowman", "untruncated"]).into_array();
        let mut acc = Accumulator::try_new(BoundedMin, options(9), strings.dtype().clone())?;

        acc.accumulate(&strings, &mut ctx)?;

        // The multi-byte snowman does not fit in the 9-byte budget, so it is cut off entirely.
        let expected = Scalar::utf8("snowman", Nullability::Nullable);
        assert_eq!(acc.finish()?, expected);
        Ok(())
    }

    #[test]
    fn bounded_min_keeps_fixed_width_values_exact() -> VortexResult<()> {
        let mut ctx = array_session().create_execution_ctx();
        let ints = PrimitiveArray::new(buffer![10i32, 20, 5], Validity::NonNullable).into_array();
        let mut acc = Accumulator::try_new(BoundedMin, options(9), ints.dtype().clone())?;

        acc.accumulate(&ints, &mut ctx)?;

        let expected = Scalar::primitive(5i32, Nullability::Nullable);
        assert_eq!(acc.finish()?, expected);
        Ok(())
    }

    #[test]
    fn bounded_min_null_partial_does_not_poison_existing_bound() -> VortexResult<()> {
        let mut ctx = array_session().create_execution_ctx();
        let values = VarBinViewArray::from_iter_bin([&[1u8][..]]).into_array();
        let mut acc = Accumulator::try_new(BoundedMin, options(2), values.dtype().clone())?;

        acc.accumulate(&values, &mut ctx)?;
        // A null partial (e.g. from an empty chunk) must be ignored rather than replace the bound.
        acc.combine_partials(Scalar::null(values.dtype().as_nullable()))?;

        let expected = Scalar::binary(buffer![1u8], Nullability::Nullable);
        assert_eq!(acc.finish()?, expected);
        Ok(())
    }

    #[test]
    fn bounded_min_satisfies_min_bounds() {
        let bounded = |limit: usize| BoundedMin.bind(options(limit));
        let stored = bounded(5);

        let cases = [
            // Identical budget.
            (stored.can_satisfy(&bounded(5)), AggregateFnSatisfaction::Exact),
            // Requested budget is smaller than the stored one.
            (stored.can_satisfy(&bounded(4)), AggregateFnSatisfaction::Approximate),
            // Requested budget is larger than the stored one.
            (stored.can_satisfy(&bounded(6)), AggregateFnSatisfaction::No),
            (
                stored.can_satisfy(&Min.bind(NumericalAggregateOpts::default())),
                AggregateFnSatisfaction::Approximate,
            ),
            (
                stored.can_satisfy(&Min.bind(NumericalAggregateOpts::include_nans())),
                AggregateFnSatisfaction::No,
            ),
            (
                Min.bind(NumericalAggregateOpts::include_nans()).can_satisfy(&stored),
                AggregateFnSatisfaction::No,
            ),
            (
                Min.bind(NumericalAggregateOpts::default()).can_satisfy(&stored),
                AggregateFnSatisfaction::Approximate,
            ),
            (
                stored.can_satisfy(&Max.bind(NumericalAggregateOpts::default())),
                AggregateFnSatisfaction::No,
            ),
        ];

        for (index, (actual, expected)) in cases.into_iter().enumerate() {
            assert_eq!(actual, expected, "satisfaction case #{index}");
        }
    }

    #[test]
    fn bounded_min_options_round_trip() -> VortexResult<()> {
        let original = options(64);

        let metadata = BoundedMin
            .serialize(&original)?
            .expect("serializable options");
        let decoded = BoundedMin.deserialize(&metadata, &VortexSession::empty())?;

        assert_eq!(decoded, original);
        Ok(())
    }
}