Skip to main content

reifydb_routine/function/math/
min.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::mem;
5
6use indexmap::IndexMap;
7use reifydb_core::value::column::{
8	Column,
9	columns::Columns,
10	data::ColumnData,
11	view::group_by::{GroupByView, GroupKey},
12};
13use reifydb_type::{
14	fragment::Fragment,
15	value::{
16		Value,
17		decimal::Decimal,
18		int::Int,
19		r#type::{Type, input_types::InputTypes},
20		uint::Uint,
21	},
22};
23
24use crate::function::{Accumulator, Function, FunctionCapability, FunctionContext, FunctionInfo, error::FunctionError};
25
26pub struct Min {
27	info: FunctionInfo,
28}
29
30impl Default for Min {
31	fn default() -> Self {
32		Self::new()
33	}
34}
35
36impl Min {
37	pub fn new() -> Self {
38		Self {
39			info: FunctionInfo::new("math::min"),
40		}
41	}
42}
43
44impl Function for Min {
45	fn info(&self) -> &FunctionInfo {
46		&self.info
47	}
48
49	fn capabilities(&self) -> &[FunctionCapability] {
50		&[FunctionCapability::Scalar, FunctionCapability::Aggregate]
51	}
52
53	fn return_type(&self, input_types: &[Type]) -> Type {
54		input_types.first().cloned().unwrap_or(Type::Float8)
55	}
56
57	fn accepted_types(&self) -> InputTypes {
58		InputTypes::numeric()
59	}
60
61	fn execute(&self, ctx: &FunctionContext, args: &Columns) -> Result<Columns, FunctionError> {
62		if args.is_empty() {
63			return Err(FunctionError::ArityMismatch {
64				function: ctx.fragment.clone(),
65				expected: 1,
66				actual: 0,
67			});
68		}
69
70		for (i, col) in args.iter().enumerate() {
71			if !col.get_type().is_number() {
72				return Err(FunctionError::InvalidArgumentType {
73					function: ctx.fragment.clone(),
74					argument_index: i,
75					expected: InputTypes::numeric().expected_at(0).to_vec(),
76					actual: col.get_type(),
77				});
78			}
79		}
80
81		let row_count = args.row_count();
82		let input_type = args[0].get_type();
83		let mut data = ColumnData::with_capacity(input_type, row_count);
84
85		for i in 0..row_count {
86			let mut row_min: Option<Value> = None;
87			for col in args.iter() {
88				if col.data().is_defined(i) {
89					let val = col.data().get_value(i);
90					row_min = Some(match row_min {
91						Some(current) if val < current => val,
92						Some(current) => current,
93						None => val,
94					});
95				}
96			}
97			data.push_value(row_min.unwrap_or(Value::none()));
98		}
99
100		Ok(Columns::new(vec![Column::new(ctx.fragment.clone(), data)]))
101	}
102
103	fn accumulator(&self, _ctx: &FunctionContext) -> Option<Box<dyn Accumulator>> {
104		Some(Box::new(MinAccumulator::new()))
105	}
106}
107
108struct MinAccumulator {
109	pub mins: IndexMap<GroupKey, Value>,
110	input_type: Option<Type>,
111}
112
113impl MinAccumulator {
114	pub fn new() -> Self {
115		Self {
116			mins: IndexMap::new(),
117			input_type: None,
118		}
119	}
120}
121
122macro_rules! min_arm {
123	($self:expr, $column:expr, $groups:expr, $container:expr, $ctor:expr) => {
124		for (group, indices) in $groups.iter() {
125			let mut min = None;
126			for &i in indices {
127				if $column.data().is_defined(i) {
128					if let Some(&val) = $container.get(i) {
129						min = Some(match min {
130							Some(current) if val < current => val,
131							Some(current) => current,
132							None => val,
133						});
134					}
135				}
136			}
137			if let Some(v) = min {
138				$self.mins.insert(group.clone(), $ctor(v));
139			} else {
140				$self.mins.entry(group.clone()).or_insert(Value::none());
141			}
142		}
143	};
144}
145
146impl Accumulator for MinAccumulator {
147	fn update(&mut self, args: &Columns, groups: &GroupByView) -> Result<(), FunctionError> {
148		let column = &args[0];
149		let (data, _bitvec) = column.data().unwrap_option();
150
151		if self.input_type.is_none() {
152			self.input_type = Some(data.get_type());
153		}
154
155		match data {
156			ColumnData::Int1(container) => {
157				min_arm!(self, column, groups, container, Value::Int1);
158				Ok(())
159			}
160			ColumnData::Int2(container) => {
161				min_arm!(self, column, groups, container, Value::Int2);
162				Ok(())
163			}
164			ColumnData::Int4(container) => {
165				min_arm!(self, column, groups, container, Value::Int4);
166				Ok(())
167			}
168			ColumnData::Int8(container) => {
169				min_arm!(self, column, groups, container, Value::Int8);
170				Ok(())
171			}
172			ColumnData::Int16(container) => {
173				min_arm!(self, column, groups, container, Value::Int16);
174				Ok(())
175			}
176			ColumnData::Uint1(container) => {
177				min_arm!(self, column, groups, container, Value::Uint1);
178				Ok(())
179			}
180			ColumnData::Uint2(container) => {
181				min_arm!(self, column, groups, container, Value::Uint2);
182				Ok(())
183			}
184			ColumnData::Uint4(container) => {
185				min_arm!(self, column, groups, container, Value::Uint4);
186				Ok(())
187			}
188			ColumnData::Uint8(container) => {
189				min_arm!(self, column, groups, container, Value::Uint8);
190				Ok(())
191			}
192			ColumnData::Uint16(container) => {
193				min_arm!(self, column, groups, container, Value::Uint16);
194				Ok(())
195			}
196			ColumnData::Float4(container) => {
197				for (group, indices) in groups.iter() {
198					let mut min: Option<f32> = None;
199					for &i in indices {
200						if column.data().is_defined(i)
201							&& let Some(&val) = container.get(i)
202						{
203							min = Some(match min {
204								Some(current) => f32::min(current, val),
205								None => val,
206							});
207						}
208					}
209					if let Some(v) = min {
210						self.mins.insert(group.clone(), Value::float4(v));
211					} else {
212						self.mins.entry(group.clone()).or_insert(Value::none());
213					}
214				}
215				Ok(())
216			}
217			ColumnData::Float8(container) => {
218				for (group, indices) in groups.iter() {
219					let mut min: Option<f64> = None;
220					for &i in indices {
221						if column.data().is_defined(i)
222							&& let Some(&val) = container.get(i)
223						{
224							min = Some(match min {
225								Some(current) => f64::min(current, val),
226								None => val,
227							});
228						}
229					}
230					if let Some(v) = min {
231						self.mins.insert(group.clone(), Value::float8(v));
232					} else {
233						self.mins.entry(group.clone()).or_insert(Value::none());
234					}
235				}
236				Ok(())
237			}
238			ColumnData::Int {
239				container,
240				..
241			} => {
242				for (group, indices) in groups.iter() {
243					let mut min: Option<Int> = None;
244					for &i in indices {
245						if column.data().is_defined(i)
246							&& let Some(val) = container.get(i)
247						{
248							min = Some(match min {
249								Some(current) if *val < current => val.clone(),
250								Some(current) => current,
251								None => val.clone(),
252							});
253						}
254					}
255					if let Some(v) = min {
256						self.mins.insert(group.clone(), Value::Int(v));
257					} else {
258						self.mins.entry(group.clone()).or_insert(Value::none());
259					}
260				}
261				Ok(())
262			}
263			ColumnData::Uint {
264				container,
265				..
266			} => {
267				for (group, indices) in groups.iter() {
268					let mut min: Option<Uint> = None;
269					for &i in indices {
270						if column.data().is_defined(i)
271							&& let Some(val) = container.get(i)
272						{
273							min = Some(match min {
274								Some(current) if *val < current => val.clone(),
275								Some(current) => current,
276								None => val.clone(),
277							});
278						}
279					}
280					if let Some(v) = min {
281						self.mins.insert(group.clone(), Value::Uint(v));
282					} else {
283						self.mins.entry(group.clone()).or_insert(Value::none());
284					}
285				}
286				Ok(())
287			}
288			ColumnData::Decimal {
289				container,
290				..
291			} => {
292				for (group, indices) in groups.iter() {
293					let mut min: Option<Decimal> = None;
294					for &i in indices {
295						if column.data().is_defined(i)
296							&& let Some(val) = container.get(i)
297						{
298							min = Some(match min {
299								Some(current) if *val < current => val.clone(),
300								Some(current) => current,
301								None => val.clone(),
302							});
303						}
304					}
305					if let Some(v) = min {
306						self.mins.insert(group.clone(), Value::Decimal(v));
307					} else {
308						self.mins.entry(group.clone()).or_insert(Value::none());
309					}
310				}
311				Ok(())
312			}
313			other => Err(FunctionError::InvalidArgumentType {
314				function: Fragment::internal("math::min"),
315				argument_index: 0,
316				expected: InputTypes::numeric().expected_at(0).to_vec(),
317				actual: other.get_type(),
318			}),
319		}
320	}
321
322	fn finalize(&mut self) -> Result<(Vec<GroupKey>, ColumnData), FunctionError> {
323		let ty = self.input_type.take().unwrap_or(Type::Float8);
324		let mut keys = Vec::with_capacity(self.mins.len());
325		let mut data = ColumnData::with_capacity(ty, self.mins.len());
326
327		for (key, min) in mem::take(&mut self.mins) {
328			keys.push(key);
329			data.push_value(min);
330		}
331
332		Ok((keys, data))
333	}
334}