reifydb_engine/flow/compiler/operator/
window.rs1use std::time;
5
6use reifydb_core::{
7 common::{WindowSize, WindowSlide, WindowType},
8 interface::catalog::flow::FlowNodeId,
9};
10use reifydb_rql::{expression::Expression, flow::node::FlowNodeType::Window, nodes::WindowNode, query::QueryPlan};
11use reifydb_transaction::transaction::admin::AdminTransaction;
12use reifydb_type::Result;
13
14use crate::flow::compiler::{CompileOperator, FlowCompiler};
15
16pub(crate) struct WindowCompiler {
17 pub input: Option<Box<QueryPlan>>,
18 pub window_type: WindowType,
19 pub size: WindowSize,
20 pub slide: Option<WindowSlide>,
21 pub group_by: Vec<Expression>,
22 pub aggregations: Vec<Expression>,
23 pub min_events: usize,
24 pub max_window_count: Option<usize>,
25 pub max_window_age: Option<time::Duration>,
26}
27
28impl From<WindowNode> for WindowCompiler {
29 fn from(node: WindowNode) -> Self {
30 Self {
31 input: node.input,
32 window_type: node.window_type,
33 size: node.size,
34 slide: node.slide,
35 group_by: node.group_by,
36 aggregations: node.aggregations,
37 min_events: node.min_events,
38 max_window_count: node.max_window_count,
39 max_window_age: node.max_window_age,
40 }
41 }
42}
43
44impl CompileOperator for WindowCompiler {
45 fn compile(self, compiler: &mut FlowCompiler, txn: &mut AdminTransaction) -> Result<FlowNodeId> {
46 let input_node = if let Some(input) = self.input {
48 Some(compiler.compile_plan(txn, *input)?)
49 } else {
50 None
51 };
52
53 let node_id = compiler.add_node(
54 txn,
55 Window {
56 window_type: self.window_type,
57 size: self.size,
58 slide: self.slide,
59 group_by: self.group_by,
60 aggregations: self.aggregations,
61 min_events: self.min_events,
62 max_window_count: self.max_window_count,
63 max_window_age: self.max_window_age,
64 },
65 )?;
66
67 if let Some(input_node) = input_node {
69 compiler.add_edge(txn, &input_node, &node_id)?;
70 }
71
72 Ok(node_id)
73 }
74}