reifydb_engine/flow/compiler/operator/
aggregate.rs1use reifydb_core::interface::catalog::flow::FlowNodeId;
5use reifydb_rql::{
6 expression::Expression, flow::node::FlowNodeType::Aggregate, nodes::AggregateNode, query::QueryPlan,
7};
8use reifydb_transaction::transaction::admin::AdminTransaction;
9use reifydb_type::Result;
10
11use crate::flow::compiler::{CompileOperator, FlowCompiler};
12
13pub(crate) struct AggregateCompiler {
14 pub input: Box<QueryPlan>,
15 pub by: Vec<Expression>,
16 pub map: Vec<Expression>,
17}
18
19impl From<AggregateNode> for AggregateCompiler {
20 fn from(node: AggregateNode) -> Self {
21 Self {
22 input: node.input,
23 by: node.by,
24 map: node.map,
25 }
26 }
27}
28
29impl CompileOperator for AggregateCompiler {
30 fn compile(self, compiler: &mut FlowCompiler, txn: &mut AdminTransaction) -> Result<FlowNodeId> {
31 let input_node = compiler.compile_plan(txn, *self.input)?;
32
33 let node_id = compiler.add_node(
34 txn,
35 Aggregate {
36 by: self.by,
37 map: self.map,
38 },
39 )?;
40
41 compiler.add_edge(txn, &input_node, &node_id)?;
42 Ok(node_id)
43 }
44}