vortex_array/aggregate_fn/fns/bounded_min/
mod.rs1use std::fmt::Display;
5use std::fmt::Formatter;
6use std::num::NonZeroUsize;
7
8use vortex_buffer::BufferString;
9use vortex_buffer::ByteBuffer;
10use vortex_error::VortexExpect;
11use vortex_error::VortexResult;
12use vortex_error::vortex_ensure;
13use vortex_session::VortexSession;
14
15use crate::ArrayRef;
16use crate::Columnar;
17use crate::ExecutionCtx;
18use crate::IntoArray;
19use crate::aggregate_fn::AggregateFnId;
20use crate::aggregate_fn::AggregateFnRef;
21use crate::aggregate_fn::AggregateFnSatisfaction;
22use crate::aggregate_fn::AggregateFnVTable;
23use crate::aggregate_fn::EmptyOptions;
24use crate::aggregate_fn::fns::min::Min;
25use crate::aggregate_fn::fns::min_max::MinMax;
26use crate::aggregate_fn::fns::min_max::min_max;
27use crate::dtype::DType;
28use crate::partial_ord::partial_min;
29use crate::scalar::Scalar;
30use crate::scalar::ScalarTruncation;
31use crate::scalar::lower_bound;
32
33#[derive(Clone, Debug, PartialEq, Eq, Hash)]
35pub struct BoundedMinOptions {
36 pub max_bytes: NonZeroUsize,
38}
39
40impl Display for BoundedMinOptions {
41 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
42 write!(f, "{}", self.max_bytes.get())
43 }
44}
45
46#[derive(Clone, Debug)]
48pub struct BoundedMin;
49
50enum BoundedMinState {
51 Empty,
52 Value(Scalar),
53}
54
55pub struct BoundedMinPartial {
57 state: BoundedMinState,
58 element_dtype: DType,
59 max_bytes: NonZeroUsize,
60}
61
62impl BoundedMinPartial {
63 fn merge(&mut self, min: Scalar) {
64 if min.is_null() {
65 return;
66 }
67
68 self.state = match std::mem::replace(&mut self.state, BoundedMinState::Empty) {
69 BoundedMinState::Empty => BoundedMinState::Value(min),
70 BoundedMinState::Value(current) => BoundedMinState::Value(
71 partial_min(min, current).vortex_expect("incomparable bounded min scalars"),
72 ),
73 };
74 }
75}
76
77impl AggregateFnVTable for BoundedMin {
78 type Options = BoundedMinOptions;
79 type Partial = BoundedMinPartial;
80
81 fn id(&self) -> AggregateFnId {
82 AggregateFnId::new("vortex.bounded_min")
83 }
84
85 fn serialize(&self, options: &Self::Options) -> VortexResult<Option<Vec<u8>>> {
86 let max_bytes = u64::try_from(options.max_bytes.get())?;
87 Ok(Some(max_bytes.to_le_bytes().to_vec()))
88 }
89
90 fn deserialize(
91 &self,
92 metadata: &[u8],
93 _session: &VortexSession,
94 ) -> VortexResult<Self::Options> {
95 vortex_ensure!(
96 metadata.len() == size_of::<u64>(),
97 "BoundedMin options expected {} bytes, got {}",
98 size_of::<u64>(),
99 metadata.len()
100 );
101 let mut bytes = [0u8; size_of::<u64>()];
102 bytes.copy_from_slice(metadata);
103 let max_bytes = usize::try_from(u64::from_le_bytes(bytes))?;
104 vortex_ensure!(max_bytes > 0, "BoundedMin requires max_bytes > 0");
105 Ok(BoundedMinOptions {
106 max_bytes: NonZeroUsize::new(max_bytes).vortex_expect("checked non-zero max_bytes"),
107 })
108 }
109
110 fn return_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option<DType> {
111 supported_dtype(options, input_dtype).map(DType::as_nullable)
112 }
113
114 fn can_satisfy(
115 &self,
116 options: &Self::Options,
117 requested: &AggregateFnRef,
118 ) -> AggregateFnSatisfaction {
119 if let Some(other) = requested.as_opt::<Self>() {
120 return if other == options {
121 AggregateFnSatisfaction::Exact
122 } else if options.max_bytes >= other.max_bytes {
123 AggregateFnSatisfaction::Approximate
124 } else {
125 AggregateFnSatisfaction::No
126 };
127 }
128
129 if requested.is::<Min>() {
130 AggregateFnSatisfaction::Approximate
131 } else {
132 AggregateFnSatisfaction::No
133 }
134 }
135
136 fn partial_dtype(&self, options: &Self::Options, input_dtype: &DType) -> Option<DType> {
137 self.return_dtype(options, input_dtype)
138 }
139
140 fn empty_partial(
141 &self,
142 options: &Self::Options,
143 input_dtype: &DType,
144 ) -> VortexResult<Self::Partial> {
145 Ok(BoundedMinPartial {
146 state: BoundedMinState::Empty,
147 element_dtype: input_dtype.clone(),
148 max_bytes: options.max_bytes,
149 })
150 }
151
152 fn combine_partials(&self, partial: &mut Self::Partial, other: Scalar) -> VortexResult<()> {
153 partial.merge(other);
154 Ok(())
155 }
156
157 fn to_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
158 let dtype = partial.element_dtype.as_nullable();
159 match &partial.state {
160 BoundedMinState::Empty => Ok(Scalar::null(dtype)),
161 BoundedMinState::Value(min) => min.cast(&dtype),
162 }
163 }
164
165 fn reset(&self, partial: &mut Self::Partial) {
166 partial.state = BoundedMinState::Empty;
167 }
168
169 fn is_saturated(&self, _partial: &Self::Partial) -> bool {
170 false
171 }
172
173 fn accumulate(
174 &self,
175 partial: &mut Self::Partial,
176 batch: &Columnar,
177 ctx: &mut ExecutionCtx,
178 ) -> VortexResult<()> {
179 let array = match batch {
182 Columnar::Canonical(canonical) => canonical.clone().into_array(),
183 Columnar::Constant(constant) => constant.clone().into_array(),
184 };
185 let Some(result) = min_max(&array, ctx)? else {
186 return Ok(());
187 };
188 if let Some(bound) = truncate_min(result.min, partial.max_bytes.get())? {
189 partial.merge(bound);
190 }
191 Ok(())
192 }
193
194 fn finalize(&self, partials: ArrayRef) -> VortexResult<ArrayRef> {
195 Ok(partials)
196 }
197
198 fn finalize_scalar(&self, partial: &Self::Partial) -> VortexResult<Scalar> {
199 self.to_scalar(partial)
200 }
201}
202
203fn supported_dtype<'a>(_options: &BoundedMinOptions, input_dtype: &'a DType) -> Option<&'a DType> {
204 MinMax
205 .return_dtype(&EmptyOptions, input_dtype)
206 .map(|_| input_dtype)
207}
208
209fn truncate_min(value: Scalar, max_bytes: usize) -> VortexResult<Option<Scalar>> {
210 let nullability = value.dtype().nullability();
211 match value.dtype() {
212 DType::Utf8(_) => {
213 Ok(
214 lower_bound(BufferString::from_scalar(value)?, max_bytes, nullability)
215 .map(|(bound, _)| bound),
216 )
217 }
218 DType::Binary(_) => {
219 Ok(
220 lower_bound(ByteBuffer::from_scalar(value)?, max_bytes, nullability)
221 .map(|(bound, _)| bound),
222 )
223 }
224 _ => Ok(Some(value)),
225 }
226}
#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use vortex_buffer::buffer;
    use vortex_error::VortexExpect;
    use vortex_error::VortexResult;
    use vortex_session::VortexSession;

    use crate::IntoArray;
    use crate::LEGACY_SESSION;
    use crate::VortexSessionExecute;
    use crate::aggregate_fn::Accumulator;
    use crate::aggregate_fn::AggregateFnSatisfaction;
    use crate::aggregate_fn::AggregateFnVTable;
    use crate::aggregate_fn::AggregateFnVTableExt;
    use crate::aggregate_fn::DynAccumulator;
    use crate::aggregate_fn::EmptyOptions;
    use crate::aggregate_fn::fns::bounded_min::BoundedMin;
    use crate::aggregate_fn::fns::bounded_min::BoundedMinOptions;
    use crate::aggregate_fn::fns::max::Max;
    use crate::aggregate_fn::fns::min::Min;
    use crate::arrays::PrimitiveArray;
    use crate::arrays::VarBinViewArray;
    use crate::dtype::Nullability;
    use crate::scalar::Scalar;
    use crate::session::ArraySession;
    use crate::validity::Validity;

    /// Wraps `value` in a `NonZeroUsize`, panicking on zero.
    fn max_bytes(value: usize) -> NonZeroUsize {
        NonZeroUsize::new(value).vortex_expect("non-zero max_bytes")
    }

    /// Shorthand for `BoundedMinOptions` with the given byte budget.
    fn options(limit: usize) -> BoundedMinOptions {
        BoundedMinOptions {
            max_bytes: max_bytes(limit),
        }
    }

    /// A minimal session: `VortexSession::empty()` plus an `ArraySession`.
    fn fresh_session() -> VortexSession {
        VortexSession::empty().with::<ArraySession>()
    }

    #[test]
    fn bounded_min_truncates_utf8_to_lower_bound() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let strings =
            VarBinViewArray::from_iter_str(["snowman⛄️snowman", "untruncated"]).into_array();
        let mut acc = Accumulator::try_new(BoundedMin, options(9), strings.dtype().clone())?;

        acc.accumulate(&strings, &mut ctx)?;

        // With a 9-byte budget the expected bound is the 7-byte prefix "snowman".
        let expected = Scalar::utf8("snowman", Nullability::Nullable);
        assert_eq!(acc.finish()?, expected);
        Ok(())
    }

    #[test]
    fn bounded_min_keeps_fixed_width_values_exact() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let ints = PrimitiveArray::new(buffer![10i32, 20, 5], Validity::NonNullable).into_array();
        let mut acc = Accumulator::try_new(BoundedMin, options(9), ints.dtype().clone())?;

        acc.accumulate(&ints, &mut ctx)?;

        // Primitive minimums are not truncated, only made nullable.
        let expected = Scalar::primitive(5i32, Nullability::Nullable);
        assert_eq!(acc.finish()?, expected);
        Ok(())
    }

    #[test]
    fn bounded_min_null_partial_does_not_poison_existing_bound() -> VortexResult<()> {
        let mut ctx = fresh_session().create_execution_ctx();
        let bytes = VarBinViewArray::from_iter_bin([&[1u8][..]]).into_array();
        let mut acc = Accumulator::try_new(BoundedMin, options(2), bytes.dtype().clone())?;

        acc.accumulate(&bytes, &mut ctx)?;
        // Combining a null partial must leave the existing bound untouched.
        let null_partial = Scalar::null(bytes.dtype().as_nullable());
        acc.combine_partials(null_partial)?;

        let expected = Scalar::binary(buffer![1u8], Nullability::Nullable);
        assert_eq!(acc.finish()?, expected);
        Ok(())
    }

    #[test]
    fn bounded_min_satisfies_min_bounds() {
        let stored = BoundedMin.bind(options(5));
        let identical = BoundedMin.bind(options(5));
        let smaller_budget = BoundedMin.bind(options(4));
        let larger_budget = BoundedMin.bind(options(6));

        assert_eq!(stored.can_satisfy(&identical), AggregateFnSatisfaction::Exact);
        assert_eq!(
            stored.can_satisfy(&smaller_budget),
            AggregateFnSatisfaction::Approximate
        );
        assert_eq!(
            stored.can_satisfy(&larger_budget),
            AggregateFnSatisfaction::No
        );

        // Bounded and exact minimums approximate each other in both directions...
        assert_eq!(
            stored.can_satisfy(&Min.bind(EmptyOptions)),
            AggregateFnSatisfaction::Approximate
        );
        assert_eq!(
            Min.bind(EmptyOptions).can_satisfy(&stored),
            AggregateFnSatisfaction::Approximate
        );
        // ...but a minimum says nothing about a maximum.
        assert_eq!(
            stored.can_satisfy(&Max.bind(EmptyOptions)),
            AggregateFnSatisfaction::No
        );
    }

    #[test]
    fn bounded_min_options_round_trip() -> VortexResult<()> {
        let original = options(64);
        let encoded = BoundedMin
            .serialize(&original)?
            .expect("serializable options");
        let decoded = BoundedMin.deserialize(&encoded, &VortexSession::empty())?;

        assert_eq!(decoded, original);
        Ok(())
    }
}