reifydb_function/math/aggregate/
max.rs1use std::mem;
5
6use indexmap::IndexMap;
7use reifydb_core::value::column::data::ColumnData;
8use reifydb_type::value::Value;
9
10use crate::{AggregateFunction, AggregateFunctionContext, error::AggregateFunctionResult};
11
12pub struct Max {
13 pub maxs: IndexMap<Vec<Value>, f64>,
14}
15
16impl Max {
17 pub fn new() -> Self {
18 Self {
19 maxs: IndexMap::new(),
20 }
21 }
22}
23
24impl AggregateFunction for Max {
25 fn aggregate(&mut self, ctx: AggregateFunctionContext) -> AggregateFunctionResult<()> {
26 let column = ctx.column;
27 let groups = &ctx.groups;
28
29 match &column.data() {
30 ColumnData::Float8(container) => {
31 for (group, indices) in groups.iter() {
32 let max_val = indices
33 .iter()
34 .filter_map(|&i| container.get(i))
35 .max_by(|a, b| a.partial_cmp(b).unwrap());
36
37 if let Some(max_val) = max_val {
38 self.maxs
39 .entry(group.clone())
40 .and_modify(|v| *v = f64::max(*v, *max_val))
41 .or_insert(*max_val);
42 }
43 }
44 Ok(())
45 }
46 ColumnData::Float4(container) => {
47 for (group, indices) in groups.iter() {
48 let max_val = indices
49 .iter()
50 .filter_map(|&i| container.get(i))
51 .max_by(|a, b| a.partial_cmp(b).unwrap());
52
53 if let Some(max_val) = max_val {
54 self.maxs
55 .entry(group.clone())
56 .and_modify(|v| *v = f64::max(*v, *max_val as f64))
57 .or_insert(*max_val as f64);
58 }
59 }
60 Ok(())
61 }
62 ColumnData::Int4(container) => {
63 for (group, indices) in groups.iter() {
64 let max_val = indices
65 .iter()
66 .filter_map(|&i| container.get(i))
67 .max_by(|a, b| a.partial_cmp(b).unwrap());
68
69 if let Some(max_val) = max_val {
70 self.maxs
71 .entry(group.clone())
72 .and_modify(|v| *v = f64::max(*v, *max_val as f64))
73 .or_insert(*max_val as f64);
74 }
75 }
76 Ok(())
77 }
78 _ => unimplemented!(),
79 }
80 }
81
82 fn finalize(&mut self) -> AggregateFunctionResult<(Vec<Vec<Value>>, ColumnData)> {
83 let mut keys = Vec::with_capacity(self.maxs.len());
84 let mut data = ColumnData::float8_with_capacity(self.maxs.len());
85
86 for (key, max) in mem::take(&mut self.maxs) {
87 keys.push(key);
88 data.push_value(Value::float8(max));
89 }
90
91 Ok((keys, data))
92 }
93}