Skip to main content

reifydb_engine/flow/compiler/operator/
window.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4use std::time;
5
6use reifydb_core::{
7	common::{WindowSize, WindowSlide, WindowType},
8	interface::catalog::flow::FlowNodeId,
9};
10use reifydb_rql::{expression::Expression, flow::node::FlowNodeType::Window, nodes::WindowNode, query::QueryPlan};
11use reifydb_transaction::transaction::admin::AdminTransaction;
12use reifydb_type::Result;
13
14use crate::flow::compiler::{CompileOperator, FlowCompiler};
15
16pub(crate) struct WindowCompiler {
17	pub input: Option<Box<QueryPlan>>,
18	pub window_type: WindowType,
19	pub size: WindowSize,
20	pub slide: Option<WindowSlide>,
21	pub group_by: Vec<Expression>,
22	pub aggregations: Vec<Expression>,
23	pub min_events: usize,
24	pub max_window_count: Option<usize>,
25	pub max_window_age: Option<time::Duration>,
26}
27
28impl From<WindowNode> for WindowCompiler {
29	fn from(node: WindowNode) -> Self {
30		Self {
31			input: node.input,
32			window_type: node.window_type,
33			size: node.size,
34			slide: node.slide,
35			group_by: node.group_by,
36			aggregations: node.aggregations,
37			min_events: node.min_events,
38			max_window_count: node.max_window_count,
39			max_window_age: node.max_window_age,
40		}
41	}
42}
43
44impl CompileOperator for WindowCompiler {
45	fn compile(self, compiler: &mut FlowCompiler, txn: &mut AdminTransaction) -> Result<FlowNodeId> {
46		// Compile input first if present
47		let input_node = if let Some(input) = self.input {
48			Some(compiler.compile_plan(txn, *input)?)
49		} else {
50			None
51		};
52
53		let node_id = compiler.add_node(
54			txn,
55			Window {
56				window_type: self.window_type,
57				size: self.size,
58				slide: self.slide,
59				group_by: self.group_by,
60				aggregations: self.aggregations,
61				min_events: self.min_events,
62				max_window_count: self.max_window_count,
63				max_window_age: self.max_window_age,
64			},
65		)?;
66
67		// Add input edge if we have one
68		if let Some(input_node) = input_node {
69			compiler.add_edge(txn, &input_node, &node_id)?;
70		}
71
72		Ok(node_id)
73	}
74}