Skip to main content

reifydb_function/math/aggregate/
min.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use std::mem;
5
6use indexmap::IndexMap;
7use reifydb_core::value::column::data::ColumnData;
8use reifydb_type::value::Value;
9
10use crate::{AggregateFunction, AggregateFunctionContext, error::AggregateFunctionResult};
11
12pub struct Min {
13	pub mins: IndexMap<Vec<Value>, f64>,
14}
15
16impl Min {
17	pub fn new() -> Self {
18		Self {
19			mins: IndexMap::new(),
20		}
21	}
22}
23
24impl AggregateFunction for Min {
25	fn aggregate(&mut self, ctx: AggregateFunctionContext) -> AggregateFunctionResult<()> {
26		let column = ctx.column;
27		let groups = &ctx.groups;
28
29		match &column.data() {
30			ColumnData::Float8(container) => {
31				for (group, indices) in groups.iter() {
32					let min_val = indices
33						.iter()
34						.filter_map(|&i| container.get(i))
35						.min_by(|a, b| a.partial_cmp(b).unwrap());
36
37					if let Some(min_val) = min_val {
38						self.mins
39							.entry(group.clone())
40							.and_modify(|v| *v = f64::min(*v, *min_val))
41							.or_insert(*min_val);
42					}
43				}
44				Ok(())
45			}
46			ColumnData::Float4(container) => {
47				for (group, indices) in groups.iter() {
48					let min_val = indices
49						.iter()
50						.filter_map(|&i| container.get(i))
51						.min_by(|a, b| a.partial_cmp(b).unwrap());
52
53					if let Some(min_val) = min_val {
54						self.mins
55							.entry(group.clone())
56							.and_modify(|v| *v = f64::min(*v, *min_val as f64))
57							.or_insert(*min_val as f64);
58					}
59				}
60				Ok(())
61			}
62			ColumnData::Int2(container) => {
63				for (group, indices) in groups.iter() {
64					let min_val = indices
65						.iter()
66						.filter_map(|&i| container.get(i))
67						.min_by(|a, b| a.partial_cmp(b).unwrap());
68
69					if let Some(min_val) = min_val {
70						self.mins
71							.entry(group.clone())
72							.and_modify(|v| *v = f64::min(*v, *min_val as f64))
73							.or_insert(*min_val as f64);
74					}
75				}
76				Ok(())
77			}
78			ColumnData::Int4(container) => {
79				for (group, indices) in groups.iter() {
80					let min_val = indices
81						.iter()
82						.filter_map(|&i| container.get(i))
83						.min_by(|a, b| a.partial_cmp(b).unwrap());
84
85					if let Some(min_val) = min_val {
86						self.mins
87							.entry(group.clone())
88							.and_modify(|v| *v = f64::min(*v, *min_val as f64))
89							.or_insert(*min_val as f64);
90					}
91				}
92				Ok(())
93			}
94			_ => unimplemented!(),
95		}
96	}
97
98	fn finalize(&mut self) -> AggregateFunctionResult<(Vec<Vec<Value>>, ColumnData)> {
99		let mut keys = Vec::with_capacity(self.mins.len());
100		let mut data = ColumnData::float8_with_capacity(self.mins.len());
101
102		for (key, min) in mem::take(&mut self.mins) {
103			keys.push(key);
104			data.push_value(Value::float8(min));
105		}
106
107		Ok((keys, data))
108	}
109}