reifydb_routine/function/math/
min.rs1use std::mem;
5
6use indexmap::IndexMap;
7use reifydb_core::value::column::{
8 Column,
9 columns::Columns,
10 data::ColumnData,
11 view::group_by::{GroupByView, GroupKey},
12};
13use reifydb_type::{
14 fragment::Fragment,
15 value::{
16 Value,
17 decimal::Decimal,
18 int::Int,
19 r#type::{Type, input_types::InputTypes},
20 uint::Uint,
21 },
22};
23
24use crate::function::{Accumulator, Function, FunctionCapability, FunctionContext, FunctionInfo, error::FunctionError};
25
26pub struct Min {
27 info: FunctionInfo,
28}
29
30impl Default for Min {
31 fn default() -> Self {
32 Self::new()
33 }
34}
35
36impl Min {
37 pub fn new() -> Self {
38 Self {
39 info: FunctionInfo::new("math::min"),
40 }
41 }
42}
43
44impl Function for Min {
45 fn info(&self) -> &FunctionInfo {
46 &self.info
47 }
48
49 fn capabilities(&self) -> &[FunctionCapability] {
50 &[FunctionCapability::Scalar, FunctionCapability::Aggregate]
51 }
52
53 fn return_type(&self, input_types: &[Type]) -> Type {
54 input_types.first().cloned().unwrap_or(Type::Float8)
55 }
56
57 fn accepted_types(&self) -> InputTypes {
58 InputTypes::numeric()
59 }
60
61 fn execute(&self, ctx: &FunctionContext, args: &Columns) -> Result<Columns, FunctionError> {
62 if args.is_empty() {
63 return Err(FunctionError::ArityMismatch {
64 function: ctx.fragment.clone(),
65 expected: 1,
66 actual: 0,
67 });
68 }
69
70 for (i, col) in args.iter().enumerate() {
71 if !col.get_type().is_number() {
72 return Err(FunctionError::InvalidArgumentType {
73 function: ctx.fragment.clone(),
74 argument_index: i,
75 expected: InputTypes::numeric().expected_at(0).to_vec(),
76 actual: col.get_type(),
77 });
78 }
79 }
80
81 let row_count = args.row_count();
82 let input_type = args[0].get_type();
83 let mut data = ColumnData::with_capacity(input_type, row_count);
84
85 for i in 0..row_count {
86 let mut row_min: Option<Value> = None;
87 for col in args.iter() {
88 if col.data().is_defined(i) {
89 let val = col.data().get_value(i);
90 row_min = Some(match row_min {
91 Some(current) if val < current => val,
92 Some(current) => current,
93 None => val,
94 });
95 }
96 }
97 data.push_value(row_min.unwrap_or(Value::none()));
98 }
99
100 Ok(Columns::new(vec![Column::new(ctx.fragment.clone(), data)]))
101 }
102
103 fn accumulator(&self, _ctx: &FunctionContext) -> Option<Box<dyn Accumulator>> {
104 Some(Box::new(MinAccumulator::new()))
105 }
106}
107
108struct MinAccumulator {
109 pub mins: IndexMap<GroupKey, Value>,
110 input_type: Option<Type>,
111}
112
113impl MinAccumulator {
114 pub fn new() -> Self {
115 Self {
116 mins: IndexMap::new(),
117 input_type: None,
118 }
119 }
120}
121
122macro_rules! min_arm {
123 ($self:expr, $column:expr, $groups:expr, $container:expr, $ctor:expr) => {
124 for (group, indices) in $groups.iter() {
125 let mut min = None;
126 for &i in indices {
127 if $column.data().is_defined(i) {
128 if let Some(&val) = $container.get(i) {
129 min = Some(match min {
130 Some(current) if val < current => val,
131 Some(current) => current,
132 None => val,
133 });
134 }
135 }
136 }
137 if let Some(v) = min {
138 $self.mins.insert(group.clone(), $ctor(v));
139 } else {
140 $self.mins.entry(group.clone()).or_insert(Value::none());
141 }
142 }
143 };
144}
145
146impl Accumulator for MinAccumulator {
147 fn update(&mut self, args: &Columns, groups: &GroupByView) -> Result<(), FunctionError> {
148 let column = &args[0];
149 let (data, _bitvec) = column.data().unwrap_option();
150
151 if self.input_type.is_none() {
152 self.input_type = Some(data.get_type());
153 }
154
155 match data {
156 ColumnData::Int1(container) => {
157 min_arm!(self, column, groups, container, Value::Int1);
158 Ok(())
159 }
160 ColumnData::Int2(container) => {
161 min_arm!(self, column, groups, container, Value::Int2);
162 Ok(())
163 }
164 ColumnData::Int4(container) => {
165 min_arm!(self, column, groups, container, Value::Int4);
166 Ok(())
167 }
168 ColumnData::Int8(container) => {
169 min_arm!(self, column, groups, container, Value::Int8);
170 Ok(())
171 }
172 ColumnData::Int16(container) => {
173 min_arm!(self, column, groups, container, Value::Int16);
174 Ok(())
175 }
176 ColumnData::Uint1(container) => {
177 min_arm!(self, column, groups, container, Value::Uint1);
178 Ok(())
179 }
180 ColumnData::Uint2(container) => {
181 min_arm!(self, column, groups, container, Value::Uint2);
182 Ok(())
183 }
184 ColumnData::Uint4(container) => {
185 min_arm!(self, column, groups, container, Value::Uint4);
186 Ok(())
187 }
188 ColumnData::Uint8(container) => {
189 min_arm!(self, column, groups, container, Value::Uint8);
190 Ok(())
191 }
192 ColumnData::Uint16(container) => {
193 min_arm!(self, column, groups, container, Value::Uint16);
194 Ok(())
195 }
196 ColumnData::Float4(container) => {
197 for (group, indices) in groups.iter() {
198 let mut min: Option<f32> = None;
199 for &i in indices {
200 if column.data().is_defined(i)
201 && let Some(&val) = container.get(i)
202 {
203 min = Some(match min {
204 Some(current) => f32::min(current, val),
205 None => val,
206 });
207 }
208 }
209 if let Some(v) = min {
210 self.mins.insert(group.clone(), Value::float4(v));
211 } else {
212 self.mins.entry(group.clone()).or_insert(Value::none());
213 }
214 }
215 Ok(())
216 }
217 ColumnData::Float8(container) => {
218 for (group, indices) in groups.iter() {
219 let mut min: Option<f64> = None;
220 for &i in indices {
221 if column.data().is_defined(i)
222 && let Some(&val) = container.get(i)
223 {
224 min = Some(match min {
225 Some(current) => f64::min(current, val),
226 None => val,
227 });
228 }
229 }
230 if let Some(v) = min {
231 self.mins.insert(group.clone(), Value::float8(v));
232 } else {
233 self.mins.entry(group.clone()).or_insert(Value::none());
234 }
235 }
236 Ok(())
237 }
238 ColumnData::Int {
239 container,
240 ..
241 } => {
242 for (group, indices) in groups.iter() {
243 let mut min: Option<Int> = None;
244 for &i in indices {
245 if column.data().is_defined(i)
246 && let Some(val) = container.get(i)
247 {
248 min = Some(match min {
249 Some(current) if *val < current => val.clone(),
250 Some(current) => current,
251 None => val.clone(),
252 });
253 }
254 }
255 if let Some(v) = min {
256 self.mins.insert(group.clone(), Value::Int(v));
257 } else {
258 self.mins.entry(group.clone()).or_insert(Value::none());
259 }
260 }
261 Ok(())
262 }
263 ColumnData::Uint {
264 container,
265 ..
266 } => {
267 for (group, indices) in groups.iter() {
268 let mut min: Option<Uint> = None;
269 for &i in indices {
270 if column.data().is_defined(i)
271 && let Some(val) = container.get(i)
272 {
273 min = Some(match min {
274 Some(current) if *val < current => val.clone(),
275 Some(current) => current,
276 None => val.clone(),
277 });
278 }
279 }
280 if let Some(v) = min {
281 self.mins.insert(group.clone(), Value::Uint(v));
282 } else {
283 self.mins.entry(group.clone()).or_insert(Value::none());
284 }
285 }
286 Ok(())
287 }
288 ColumnData::Decimal {
289 container,
290 ..
291 } => {
292 for (group, indices) in groups.iter() {
293 let mut min: Option<Decimal> = None;
294 for &i in indices {
295 if column.data().is_defined(i)
296 && let Some(val) = container.get(i)
297 {
298 min = Some(match min {
299 Some(current) if *val < current => val.clone(),
300 Some(current) => current,
301 None => val.clone(),
302 });
303 }
304 }
305 if let Some(v) = min {
306 self.mins.insert(group.clone(), Value::Decimal(v));
307 } else {
308 self.mins.entry(group.clone()).or_insert(Value::none());
309 }
310 }
311 Ok(())
312 }
313 other => Err(FunctionError::InvalidArgumentType {
314 function: Fragment::internal("math::min"),
315 argument_index: 0,
316 expected: InputTypes::numeric().expected_at(0).to_vec(),
317 actual: other.get_type(),
318 }),
319 }
320 }
321
322 fn finalize(&mut self) -> Result<(Vec<GroupKey>, ColumnData), FunctionError> {
323 let ty = self.input_type.take().unwrap_or(Type::Float8);
324 let mut keys = Vec::with_capacity(self.mins.len());
325 let mut data = ColumnData::with_capacity(ty, self.mins.len());
326
327 for (key, min) in mem::take(&mut self.mins) {
328 keys.push(key);
329 data.push_value(min);
330 }
331
332 Ok((keys, data))
333 }
334}