reifydb_engine/function/math/aggregate/
min.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use indexmap::IndexMap;
5use reifydb_core::value::column::ColumnData;
6use reifydb_type::Value;
7
8use crate::function::{AggregateFunction, AggregateFunctionContext};
9
10pub struct Min {
11	pub mins: IndexMap<Vec<Value>, f64>,
12}
13
14impl Min {
15	pub fn new() -> Self {
16		Self {
17			mins: IndexMap::new(),
18		}
19	}
20}
21
22impl AggregateFunction for Min {
23	fn aggregate(&mut self, ctx: AggregateFunctionContext) -> crate::Result<()> {
24		let column = ctx.column;
25		let groups = &ctx.groups;
26
27		match &column.data() {
28			ColumnData::Float8(container) => {
29				for (group, indices) in groups.iter() {
30					let min_val = indices
31						.iter()
32						.filter_map(|&i| container.get(i))
33						.min_by(|a, b| a.partial_cmp(b).unwrap());
34
35					if let Some(min_val) = min_val {
36						self.mins
37							.entry(group.clone())
38							.and_modify(|v| *v = f64::min(*v, *min_val))
39							.or_insert(*min_val);
40					}
41				}
42				Ok(())
43			}
44			ColumnData::Float4(container) => {
45				for (group, indices) in groups.iter() {
46					let min_val = indices
47						.iter()
48						.filter_map(|&i| container.get(i))
49						.min_by(|a, b| a.partial_cmp(b).unwrap());
50
51					if let Some(min_val) = min_val {
52						self.mins
53							.entry(group.clone())
54							.and_modify(|v| *v = f64::min(*v, *min_val as f64))
55							.or_insert(*min_val as f64);
56					}
57				}
58				Ok(())
59			}
60			ColumnData::Int2(container) => {
61				for (group, indices) in groups.iter() {
62					let min_val = indices
63						.iter()
64						.filter_map(|&i| container.get(i))
65						.min_by(|a, b| a.partial_cmp(b).unwrap());
66
67					if let Some(min_val) = min_val {
68						self.mins
69							.entry(group.clone())
70							.and_modify(|v| *v = f64::min(*v, *min_val as f64))
71							.or_insert(*min_val as f64);
72					}
73				}
74				Ok(())
75			}
76			ColumnData::Int4(container) => {
77				for (group, indices) in groups.iter() {
78					let min_val = indices
79						.iter()
80						.filter_map(|&i| container.get(i))
81						.min_by(|a, b| a.partial_cmp(b).unwrap());
82
83					if let Some(min_val) = min_val {
84						self.mins
85							.entry(group.clone())
86							.and_modify(|v| *v = f64::min(*v, *min_val as f64))
87							.or_insert(*min_val as f64);
88					}
89				}
90				Ok(())
91			}
92			_ => unimplemented!(),
93		}
94	}
95
96	fn finalize(&mut self) -> crate::Result<(Vec<Vec<Value>>, ColumnData)> {
97		let mut keys = Vec::with_capacity(self.mins.len());
98		let mut data = ColumnData::float8_with_capacity(self.mins.len());
99
100		for (key, min) in std::mem::take(&mut self.mins) {
101			keys.push(key);
102			data.push_value(Value::float8(min));
103		}
104
105		Ok((keys, data))
106	}
107}