Skip to main content

reifydb_engine/flow/compiler/operator/
window.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::{
5	common::{WindowSize, WindowSlide, WindowType},
6	interface::catalog::flow::FlowNodeId,
7};
8use reifydb_rql::{expression::Expression, flow::node::FlowNodeType::Window, nodes::WindowNode, query::QueryPlan};
9use reifydb_transaction::transaction::admin::AdminTransaction;
10use reifydb_type::Result;
11
12use crate::flow::compiler::{CompileOperator, FlowCompiler};
13
14pub(crate) struct WindowCompiler {
15	pub input: Option<Box<QueryPlan>>,
16	pub window_type: WindowType,
17	pub size: WindowSize,
18	pub slide: Option<WindowSlide>,
19	pub group_by: Vec<Expression>,
20	pub aggregations: Vec<Expression>,
21	pub min_events: usize,
22	pub max_window_count: Option<usize>,
23	pub max_window_age: Option<std::time::Duration>,
24}
25
26impl From<WindowNode> for WindowCompiler {
27	fn from(node: WindowNode) -> Self {
28		Self {
29			input: node.input,
30			window_type: node.window_type,
31			size: node.size,
32			slide: node.slide,
33			group_by: node.group_by,
34			aggregations: node.aggregations,
35			min_events: node.min_events,
36			max_window_count: node.max_window_count,
37			max_window_age: node.max_window_age,
38		}
39	}
40}
41
42impl CompileOperator for WindowCompiler {
43	fn compile(self, compiler: &mut FlowCompiler, txn: &mut AdminTransaction) -> Result<FlowNodeId> {
44		// Compile input first if present
45		let input_node = if let Some(input) = self.input {
46			Some(compiler.compile_plan(txn, *input)?)
47		} else {
48			None
49		};
50
51		let node_id = compiler.add_node(
52			txn,
53			Window {
54				window_type: self.window_type,
55				size: self.size,
56				slide: self.slide,
57				group_by: self.group_by,
58				aggregations: self.aggregations,
59				min_events: self.min_events,
60				max_window_count: self.max_window_count,
61				max_window_age: self.max_window_age,
62			},
63		)?;
64
65		// Add input edge if we have one
66		if let Some(input_node) = input_node {
67			compiler.add_edge(txn, &input_node, &node_id)?;
68		}
69
70		Ok(node_id)
71	}
72}