reifydb_engine/flow/compiler/operator/
window.rs1use reifydb_core::{
5 common::{WindowSize, WindowSlide, WindowType},
6 interface::catalog::flow::FlowNodeId,
7};
8use reifydb_rql::{expression::Expression, flow::node::FlowNodeType::Window, nodes::WindowNode, query::QueryPlan};
9use reifydb_transaction::transaction::admin::AdminTransaction;
10use reifydb_type::Result;
11
12use crate::flow::compiler::{CompileOperator, FlowCompiler};
13
14pub(crate) struct WindowCompiler {
15 pub input: Option<Box<QueryPlan>>,
16 pub window_type: WindowType,
17 pub size: WindowSize,
18 pub slide: Option<WindowSlide>,
19 pub group_by: Vec<Expression>,
20 pub aggregations: Vec<Expression>,
21 pub min_events: usize,
22 pub max_window_count: Option<usize>,
23 pub max_window_age: Option<std::time::Duration>,
24}
25
26impl From<WindowNode> for WindowCompiler {
27 fn from(node: WindowNode) -> Self {
28 Self {
29 input: node.input,
30 window_type: node.window_type,
31 size: node.size,
32 slide: node.slide,
33 group_by: node.group_by,
34 aggregations: node.aggregations,
35 min_events: node.min_events,
36 max_window_count: node.max_window_count,
37 max_window_age: node.max_window_age,
38 }
39 }
40}
41
42impl CompileOperator for WindowCompiler {
43 fn compile(self, compiler: &mut FlowCompiler, txn: &mut AdminTransaction) -> Result<FlowNodeId> {
44 let input_node = if let Some(input) = self.input {
46 Some(compiler.compile_plan(txn, *input)?)
47 } else {
48 None
49 };
50
51 let node_id = compiler.add_node(
52 txn,
53 Window {
54 window_type: self.window_type,
55 size: self.size,
56 slide: self.slide,
57 group_by: self.group_by,
58 aggregations: self.aggregations,
59 min_events: self.min_events,
60 max_window_count: self.max_window_count,
61 max_window_age: self.max_window_age,
62 },
63 )?;
64
65 if let Some(input_node) = input_node {
67 compiler.add_edge(txn, &input_node, &node_id)?;
68 }
69
70 Ok(node_id)
71 }
72}