Skip to main content

reifydb_engine/flow/compiler/operator/
window.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::{common::WindowKind, interface::catalog::flow::FlowNodeId};
5use reifydb_rql::{expression::Expression, flow::node::FlowNodeType::Window, nodes::WindowNode, query::QueryPlan};
6use reifydb_transaction::transaction::admin::AdminTransaction;
7use reifydb_type::Result;
8
9use crate::flow::compiler::{CompileOperator, FlowCompiler};
10
11pub(crate) struct WindowCompiler {
12	pub input: Option<Box<QueryPlan>>,
13	pub kind: WindowKind,
14	pub group_by: Vec<Expression>,
15	pub aggregations: Vec<Expression>,
16	pub ts: Option<String>,
17}
18
19impl From<WindowNode> for WindowCompiler {
20	fn from(node: WindowNode) -> Self {
21		Self {
22			input: node.input,
23			kind: node.kind,
24			group_by: node.group_by,
25			aggregations: node.aggregations,
26			ts: node.ts,
27		}
28	}
29}
30
31impl CompileOperator for WindowCompiler {
32	fn compile(self, compiler: &mut FlowCompiler, txn: &mut AdminTransaction) -> Result<FlowNodeId> {
33		let input_node = if let Some(input) = self.input {
34			Some(compiler.compile_plan(txn, *input)?)
35		} else {
36			None
37		};
38
39		let node_id = compiler.add_node(
40			txn,
41			Window {
42				kind: self.kind,
43				group_by: self.group_by,
44				aggregations: self.aggregations,
45				ts: self.ts,
46			},
47		)?;
48
49		if let Some(input_node) = input_node {
50			compiler.add_edge(txn, &input_node, &node_id)?;
51		}
52
53		Ok(node_id)
54	}
55}