Skip to main content

reifydb_function/math/scalar/
min.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::value::column::data::ColumnData;
5use reifydb_type::value::{container::number::NumberContainer, decimal::Decimal, int::Int, r#type::Type, uint::Uint};
6
7use crate::{
8	ScalarFunction, ScalarFunctionContext,
9	error::{ScalarFunctionError, ScalarFunctionResult},
10	propagate_options,
11};
12
/// Scalar `min` function: a stateless marker type whose behaviour lives in its
/// `ScalarFunction` implementation.
#[derive(Debug, Default, Clone, Copy)]
pub struct Min;

impl Min {
	/// Creates the function object. Equivalent to `Min::default()`.
	pub const fn new() -> Self {
		Self
	}
}
20
21impl ScalarFunction for Min {
22	fn scalar(&self, ctx: ScalarFunctionContext) -> ScalarFunctionResult<ColumnData> {
23		if let Some(result) = propagate_options(self, &ctx) {
24			return result;
25		}
26		let columns = ctx.columns;
27		let row_count = ctx.row_count;
28
29		// Validate at least 1 argument
30		if columns.is_empty() {
31			return Err(ScalarFunctionError::ArityMismatch {
32				function: ctx.fragment.clone(),
33				expected: 1,
34				actual: 0,
35			});
36		}
37
38		// For min function, we need to find the minimum value across all columns for each row
39		let first_column = columns.get(0).unwrap();
40
41		match first_column.data() {
42			ColumnData::Int1(_) => {
43				let mut result = Vec::with_capacity(row_count);
44				let mut bitvec = Vec::with_capacity(row_count);
45
46				for row_idx in 0..row_count {
47					let mut min_value: Option<i8> = None;
48
49					// Check all columns for this row
50					for column in columns.iter() {
51						if let ColumnData::Int1(container) = column.data() {
52							if let Some(value) = container.get(row_idx) {
53								min_value = Some(match min_value {
54									None => *value,
55									Some(current_min) => current_min.min(*value),
56								});
57							}
58						}
59					}
60
61					match min_value {
62						Some(v) => {
63							result.push(v);
64							bitvec.push(true);
65						}
66						None => {
67							result.push(0);
68							bitvec.push(false);
69						}
70					}
71				}
72
73				Ok(ColumnData::int1_with_bitvec(result, bitvec))
74			}
75			ColumnData::Int2(_) => {
76				let mut result = Vec::with_capacity(row_count);
77				let mut bitvec = Vec::with_capacity(row_count);
78
79				for row_idx in 0..row_count {
80					let mut min_value: Option<i16> = None;
81
82					for column in columns.iter() {
83						if let ColumnData::Int2(container) = column.data() {
84							if let Some(value) = container.get(row_idx) {
85								min_value = Some(match min_value {
86									None => *value,
87									Some(current_min) => current_min.min(*value),
88								});
89							}
90						}
91					}
92
93					match min_value {
94						Some(v) => {
95							result.push(v);
96							bitvec.push(true);
97						}
98						None => {
99							result.push(0);
100							bitvec.push(false);
101						}
102					}
103				}
104
105				Ok(ColumnData::int2_with_bitvec(result, bitvec))
106			}
107			ColumnData::Int4(_) => {
108				let mut result = Vec::with_capacity(row_count);
109				let mut bitvec = Vec::with_capacity(row_count);
110
111				for row_idx in 0..row_count {
112					let mut min_value: Option<i32> = None;
113
114					for column in columns.iter() {
115						if let ColumnData::Int4(container) = column.data() {
116							if let Some(value) = container.get(row_idx) {
117								min_value = Some(match min_value {
118									None => *value,
119									Some(current_min) => current_min.min(*value),
120								});
121							}
122						}
123					}
124
125					match min_value {
126						Some(v) => {
127							result.push(v);
128							bitvec.push(true);
129						}
130						None => {
131							result.push(0);
132							bitvec.push(false);
133						}
134					}
135				}
136
137				Ok(ColumnData::int4_with_bitvec(result, bitvec))
138			}
139			ColumnData::Int8(_) => {
140				let mut result = Vec::with_capacity(row_count);
141				let mut bitvec = Vec::with_capacity(row_count);
142
143				for row_idx in 0..row_count {
144					let mut min_value: Option<i64> = None;
145
146					for column in columns.iter() {
147						if let ColumnData::Int8(container) = column.data() {
148							if let Some(value) = container.get(row_idx) {
149								min_value = Some(match min_value {
150									None => *value,
151									Some(current_min) => current_min.min(*value),
152								});
153							}
154						}
155					}
156
157					match min_value {
158						Some(v) => {
159							result.push(v);
160							bitvec.push(true);
161						}
162						None => {
163							result.push(0);
164							bitvec.push(false);
165						}
166					}
167				}
168
169				Ok(ColumnData::int8_with_bitvec(result, bitvec))
170			}
171			ColumnData::Int16(_) => {
172				let mut result = Vec::with_capacity(row_count);
173				let mut bitvec = Vec::with_capacity(row_count);
174
175				for row_idx in 0..row_count {
176					let mut min_value: Option<i128> = None;
177
178					for column in columns.iter() {
179						if let ColumnData::Int16(container) = column.data() {
180							if let Some(value) = container.get(row_idx) {
181								min_value = Some(match min_value {
182									None => *value,
183									Some(current_min) => current_min.min(*value),
184								});
185							}
186						}
187					}
188
189					match min_value {
190						Some(v) => {
191							result.push(v);
192							bitvec.push(true);
193						}
194						None => {
195							result.push(0);
196							bitvec.push(false);
197						}
198					}
199				}
200
201				Ok(ColumnData::int16_with_bitvec(result, bitvec))
202			}
203			ColumnData::Uint1(_) => {
204				let mut result = Vec::with_capacity(row_count);
205				let mut bitvec = Vec::with_capacity(row_count);
206
207				for row_idx in 0..row_count {
208					let mut min_value: Option<u8> = None;
209
210					for column in columns.iter() {
211						if let ColumnData::Uint1(container) = column.data() {
212							if let Some(value) = container.get(row_idx) {
213								min_value = Some(match min_value {
214									None => *value,
215									Some(current_min) => current_min.min(*value),
216								});
217							}
218						}
219					}
220
221					match min_value {
222						Some(v) => {
223							result.push(v);
224							bitvec.push(true);
225						}
226						None => {
227							result.push(0);
228							bitvec.push(false);
229						}
230					}
231				}
232
233				Ok(ColumnData::uint1_with_bitvec(result, bitvec))
234			}
235			ColumnData::Uint2(_) => {
236				let mut result = Vec::with_capacity(row_count);
237				let mut bitvec = Vec::with_capacity(row_count);
238
239				for row_idx in 0..row_count {
240					let mut min_value: Option<u16> = None;
241
242					for column in columns.iter() {
243						if let ColumnData::Uint2(container) = column.data() {
244							if let Some(value) = container.get(row_idx) {
245								min_value = Some(match min_value {
246									None => *value,
247									Some(current_min) => current_min.min(*value),
248								});
249							}
250						}
251					}
252
253					match min_value {
254						Some(v) => {
255							result.push(v);
256							bitvec.push(true);
257						}
258						None => {
259							result.push(0);
260							bitvec.push(false);
261						}
262					}
263				}
264
265				Ok(ColumnData::uint2_with_bitvec(result, bitvec))
266			}
267			ColumnData::Uint4(_) => {
268				let mut result = Vec::with_capacity(row_count);
269				let mut bitvec = Vec::with_capacity(row_count);
270
271				for row_idx in 0..row_count {
272					let mut min_value: Option<u32> = None;
273
274					for column in columns.iter() {
275						if let ColumnData::Uint4(container) = column.data() {
276							if let Some(value) = container.get(row_idx) {
277								min_value = Some(match min_value {
278									None => *value,
279									Some(current_min) => current_min.min(*value),
280								});
281							}
282						}
283					}
284
285					match min_value {
286						Some(v) => {
287							result.push(v);
288							bitvec.push(true);
289						}
290						None => {
291							result.push(0);
292							bitvec.push(false);
293						}
294					}
295				}
296
297				Ok(ColumnData::uint4_with_bitvec(result, bitvec))
298			}
299			ColumnData::Uint8(_) => {
300				let mut result = Vec::with_capacity(row_count);
301				let mut bitvec = Vec::with_capacity(row_count);
302
303				for row_idx in 0..row_count {
304					let mut min_value: Option<u64> = None;
305
306					for column in columns.iter() {
307						if let ColumnData::Uint8(container) = column.data() {
308							if let Some(value) = container.get(row_idx) {
309								min_value = Some(match min_value {
310									None => *value,
311									Some(current_min) => current_min.min(*value),
312								});
313							}
314						}
315					}
316
317					match min_value {
318						Some(v) => {
319							result.push(v);
320							bitvec.push(true);
321						}
322						None => {
323							result.push(0);
324							bitvec.push(false);
325						}
326					}
327				}
328
329				Ok(ColumnData::uint8_with_bitvec(result, bitvec))
330			}
331			ColumnData::Uint16(_) => {
332				let mut result = Vec::with_capacity(row_count);
333				let mut bitvec = Vec::with_capacity(row_count);
334
335				for row_idx in 0..row_count {
336					let mut min_value: Option<u128> = None;
337
338					for column in columns.iter() {
339						if let ColumnData::Uint16(container) = column.data() {
340							if let Some(value) = container.get(row_idx) {
341								min_value = Some(match min_value {
342									None => *value,
343									Some(current_min) => current_min.min(*value),
344								});
345							}
346						}
347					}
348
349					match min_value {
350						Some(v) => {
351							result.push(v);
352							bitvec.push(true);
353						}
354						None => {
355							result.push(0);
356							bitvec.push(false);
357						}
358					}
359				}
360
361				Ok(ColumnData::uint16_with_bitvec(result, bitvec))
362			}
363			ColumnData::Float4(_) => {
364				let mut result = Vec::with_capacity(row_count);
365				let mut bitvec = Vec::with_capacity(row_count);
366
367				for row_idx in 0..row_count {
368					let mut min_value: Option<f32> = None;
369
370					for column in columns.iter() {
371						if let ColumnData::Float4(container) = column.data() {
372							if let Some(value) = container.get(row_idx) {
373								min_value = Some(match min_value {
374									None => *value,
375									Some(current_min) => current_min.min(*value),
376								});
377							}
378						}
379					}
380
381					match min_value {
382						Some(v) => {
383							result.push(v);
384							bitvec.push(true);
385						}
386						None => {
387							result.push(0.0);
388							bitvec.push(false);
389						}
390					}
391				}
392
393				Ok(ColumnData::float4_with_bitvec(result, bitvec))
394			}
395			ColumnData::Float8(_) => {
396				let mut result = Vec::with_capacity(row_count);
397				let mut bitvec = Vec::with_capacity(row_count);
398
399				for row_idx in 0..row_count {
400					let mut min_value: Option<f64> = None;
401
402					for column in columns.iter() {
403						if let ColumnData::Float8(container) = column.data() {
404							if let Some(value) = container.get(row_idx) {
405								min_value = Some(match min_value {
406									None => *value,
407									Some(current_min) => current_min.min(*value),
408								});
409							}
410						}
411					}
412
413					match min_value {
414						Some(v) => {
415							result.push(v);
416							bitvec.push(true);
417						}
418						None => {
419							result.push(0.0);
420							bitvec.push(false);
421						}
422					}
423				}
424
425				Ok(ColumnData::float8_with_bitvec(result, bitvec))
426			}
427			ColumnData::Int {
428				max_bytes,
429				..
430			} => {
431				let mut result = Vec::with_capacity(row_count);
432				let mut bitvec = Vec::with_capacity(row_count);
433
434				for row_idx in 0..row_count {
435					let mut min_value: Option<Int> = None;
436
437					for column in columns.iter() {
438						if let ColumnData::Int {
439							container,
440							..
441						} = column.data()
442						{
443							if let Some(value) = container.get(row_idx) {
444								min_value = Some(match min_value {
445									None => value.clone(),
446									Some(current_min) => {
447										if value < &current_min {
448											value.clone()
449										} else {
450											current_min
451										}
452									}
453								});
454							}
455						}
456					}
457
458					match min_value {
459						Some(v) => {
460							result.push(v);
461							bitvec.push(true);
462						}
463						None => {
464							result.push(Int::default());
465							bitvec.push(false);
466						}
467					}
468				}
469
470				Ok(ColumnData::Int {
471					container: NumberContainer::new(result),
472					max_bytes: *max_bytes,
473				})
474			}
475			ColumnData::Uint {
476				max_bytes,
477				..
478			} => {
479				let mut result = Vec::with_capacity(row_count);
480				let mut bitvec = Vec::with_capacity(row_count);
481
482				for row_idx in 0..row_count {
483					let mut min_value: Option<Uint> = None;
484
485					for column in columns.iter() {
486						if let ColumnData::Uint {
487							container,
488							..
489						} = column.data()
490						{
491							if let Some(value) = container.get(row_idx) {
492								min_value = Some(match min_value {
493									None => value.clone(),
494									Some(current_min) => {
495										if value < &current_min {
496											value.clone()
497										} else {
498											current_min
499										}
500									}
501								});
502							}
503						}
504					}
505
506					match min_value {
507						Some(v) => {
508							result.push(v);
509							bitvec.push(true);
510						}
511						None => {
512							result.push(Uint::default());
513							bitvec.push(false);
514						}
515					}
516				}
517
518				Ok(ColumnData::Uint {
519					container: NumberContainer::new(result),
520					max_bytes: *max_bytes,
521				})
522			}
523			ColumnData::Decimal {
524				precision,
525				scale,
526				..
527			} => {
528				let mut result = Vec::with_capacity(row_count);
529				let mut bitvec = Vec::with_capacity(row_count);
530
531				for row_idx in 0..row_count {
532					let mut min_value: Option<Decimal> = None;
533
534					for column in columns.iter() {
535						if let ColumnData::Decimal {
536							container,
537							..
538						} = column.data()
539						{
540							if let Some(value) = container.get(row_idx) {
541								min_value = Some(match min_value {
542									None => value.clone(),
543									Some(current_min) => {
544										if value < &current_min {
545											value.clone()
546										} else {
547											current_min
548										}
549									}
550								});
551							}
552						}
553					}
554
555					match min_value {
556						Some(v) => {
557							result.push(v);
558							bitvec.push(true);
559						}
560						None => {
561							result.push(Decimal::default());
562							bitvec.push(false);
563						}
564					}
565				}
566
567				Ok(ColumnData::Decimal {
568					container: NumberContainer::new(result),
569					precision: *precision,
570					scale: *scale,
571				})
572			}
573			other => Err(ScalarFunctionError::InvalidArgumentType {
574				function: ctx.fragment.clone(),
575				argument_index: 0,
576				expected: vec![
577					Type::Int1,
578					Type::Int2,
579					Type::Int4,
580					Type::Int8,
581					Type::Int16,
582					Type::Uint1,
583					Type::Uint2,
584					Type::Uint4,
585					Type::Uint8,
586					Type::Uint16,
587					Type::Float4,
588					Type::Float8,
589					Type::Int,
590					Type::Uint,
591					Type::Decimal,
592				],
593				actual: other.get_type(),
594			}),
595		}
596	}
597
598	fn return_type(&self, input_types: &[Type]) -> Type {
599		input_types[0].clone()
600	}
601}