reifydb_engine/flow/compiler/operator/
window.rs1use reifydb_core::{common::WindowKind, interface::catalog::flow::FlowNodeId};
5use reifydb_rql::{expression::Expression, flow::node::FlowNodeType::Window, nodes::WindowNode, query::QueryPlan};
6use reifydb_transaction::transaction::admin::AdminTransaction;
7use reifydb_type::Result;
8
9use crate::flow::compiler::{CompileOperator, FlowCompiler};
10
11pub(crate) struct WindowCompiler {
12 pub input: Option<Box<QueryPlan>>,
13 pub kind: WindowKind,
14 pub group_by: Vec<Expression>,
15 pub aggregations: Vec<Expression>,
16 pub ts: Option<String>,
17}
18
19impl From<WindowNode> for WindowCompiler {
20 fn from(node: WindowNode) -> Self {
21 Self {
22 input: node.input,
23 kind: node.kind,
24 group_by: node.group_by,
25 aggregations: node.aggregations,
26 ts: node.ts,
27 }
28 }
29}
30
31impl CompileOperator for WindowCompiler {
32 fn compile(self, compiler: &mut FlowCompiler, txn: &mut AdminTransaction) -> Result<FlowNodeId> {
33 let input_node = if let Some(input) = self.input {
34 Some(compiler.compile_plan(txn, *input)?)
35 } else {
36 None
37 };
38
39 let node_id = compiler.add_node(
40 txn,
41 Window {
42 kind: self.kind,
43 group_by: self.group_by,
44 aggregations: self.aggregations,
45 ts: self.ts,
46 },
47 )?;
48
49 if let Some(input_node) = input_node {
50 compiler.add_edge(txn, &input_node, &node_id)?;
51 }
52
53 Ok(node_id)
54 }
55}