reifydb_function/math/aggregate/
min.rs1use std::mem;
5
6use indexmap::IndexMap;
7use reifydb_core::value::column::data::ColumnData;
8use reifydb_type::value::Value;
9
10use crate::{AggregateFunction, AggregateFunctionContext, error::AggregateFunctionResult};
11
12pub struct Min {
13 pub mins: IndexMap<Vec<Value>, f64>,
14}
15
16impl Min {
17 pub fn new() -> Self {
18 Self {
19 mins: IndexMap::new(),
20 }
21 }
22}
23
24impl AggregateFunction for Min {
25 fn aggregate(&mut self, ctx: AggregateFunctionContext) -> AggregateFunctionResult<()> {
26 let column = ctx.column;
27 let groups = &ctx.groups;
28
29 match &column.data() {
30 ColumnData::Float8(container) => {
31 for (group, indices) in groups.iter() {
32 let min_val = indices
33 .iter()
34 .filter_map(|&i| container.get(i))
35 .min_by(|a, b| a.partial_cmp(b).unwrap());
36
37 if let Some(min_val) = min_val {
38 self.mins
39 .entry(group.clone())
40 .and_modify(|v| *v = f64::min(*v, *min_val))
41 .or_insert(*min_val);
42 }
43 }
44 Ok(())
45 }
46 ColumnData::Float4(container) => {
47 for (group, indices) in groups.iter() {
48 let min_val = indices
49 .iter()
50 .filter_map(|&i| container.get(i))
51 .min_by(|a, b| a.partial_cmp(b).unwrap());
52
53 if let Some(min_val) = min_val {
54 self.mins
55 .entry(group.clone())
56 .and_modify(|v| *v = f64::min(*v, *min_val as f64))
57 .or_insert(*min_val as f64);
58 }
59 }
60 Ok(())
61 }
62 ColumnData::Int2(container) => {
63 for (group, indices) in groups.iter() {
64 let min_val = indices
65 .iter()
66 .filter_map(|&i| container.get(i))
67 .min_by(|a, b| a.partial_cmp(b).unwrap());
68
69 if let Some(min_val) = min_val {
70 self.mins
71 .entry(group.clone())
72 .and_modify(|v| *v = f64::min(*v, *min_val as f64))
73 .or_insert(*min_val as f64);
74 }
75 }
76 Ok(())
77 }
78 ColumnData::Int4(container) => {
79 for (group, indices) in groups.iter() {
80 let min_val = indices
81 .iter()
82 .filter_map(|&i| container.get(i))
83 .min_by(|a, b| a.partial_cmp(b).unwrap());
84
85 if let Some(min_val) = min_val {
86 self.mins
87 .entry(group.clone())
88 .and_modify(|v| *v = f64::min(*v, *min_val as f64))
89 .or_insert(*min_val as f64);
90 }
91 }
92 Ok(())
93 }
94 _ => unimplemented!(),
95 }
96 }
97
98 fn finalize(&mut self) -> AggregateFunctionResult<(Vec<Vec<Value>>, ColumnData)> {
99 let mut keys = Vec::with_capacity(self.mins.len());
100 let mut data = ColumnData::float8_with_capacity(self.mins.len());
101
102 for (key, min) in mem::take(&mut self.mins) {
103 keys.push(key);
104 data.push_value(Value::float8(min));
105 }
106
107 Ok((keys, data))
108 }
109}