Skip to main content

reifydb_function/math/aggregate/
max.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use std::mem;
5
6use indexmap::IndexMap;
7use reifydb_core::value::column::data::ColumnData;
8use reifydb_type::value::Value;
9
10use crate::{AggregateFunction, AggregateFunctionContext, error::AggregateFunctionResult};
11
/// Aggregate state for a grouped `max` computation.
///
/// Holds one running maximum per group, always widened to `f64` regardless
/// of the numeric type of the column the values came from.
pub struct Max {
	/// Running maximum per group, keyed by the group's values.
	pub maxs: IndexMap<Vec<Value>, f64>,
}
15
16impl Max {
17	pub fn new() -> Self {
18		Self {
19			maxs: IndexMap::new(),
20		}
21	}
22}
23
24impl AggregateFunction for Max {
25	fn aggregate(&mut self, ctx: AggregateFunctionContext) -> AggregateFunctionResult<()> {
26		let column = ctx.column;
27		let groups = &ctx.groups;
28
29		match &column.data() {
30			ColumnData::Float8(container) => {
31				for (group, indices) in groups.iter() {
32					let max_val = indices
33						.iter()
34						.filter_map(|&i| container.get(i))
35						.max_by(|a, b| a.partial_cmp(b).unwrap());
36
37					if let Some(max_val) = max_val {
38						self.maxs
39							.entry(group.clone())
40							.and_modify(|v| *v = f64::max(*v, *max_val))
41							.or_insert(*max_val);
42					}
43				}
44				Ok(())
45			}
46			ColumnData::Float4(container) => {
47				for (group, indices) in groups.iter() {
48					let max_val = indices
49						.iter()
50						.filter_map(|&i| container.get(i))
51						.max_by(|a, b| a.partial_cmp(b).unwrap());
52
53					if let Some(max_val) = max_val {
54						self.maxs
55							.entry(group.clone())
56							.and_modify(|v| *v = f64::max(*v, *max_val as f64))
57							.or_insert(*max_val as f64);
58					}
59				}
60				Ok(())
61			}
62			ColumnData::Int4(container) => {
63				for (group, indices) in groups.iter() {
64					let max_val = indices
65						.iter()
66						.filter_map(|&i| container.get(i))
67						.max_by(|a, b| a.partial_cmp(b).unwrap());
68
69					if let Some(max_val) = max_val {
70						self.maxs
71							.entry(group.clone())
72							.and_modify(|v| *v = f64::max(*v, *max_val as f64))
73							.or_insert(*max_val as f64);
74					}
75				}
76				Ok(())
77			}
78			_ => unimplemented!(),
79		}
80	}
81
82	fn finalize(&mut self) -> AggregateFunctionResult<(Vec<Vec<Value>>, ColumnData)> {
83		let mut keys = Vec::with_capacity(self.maxs.len());
84		let mut data = ColumnData::float8_with_capacity(self.maxs.len());
85
86		for (key, max) in mem::take(&mut self.maxs) {
87			keys.push(key);
88			data.push_value(Value::float8(max));
89		}
90
91		Ok((keys, data))
92	}
93}