Skip to main content

reifydb_engine/flow/compiler/operator/
apply.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2026 ReifyDB
3
4use reifydb_core::{interface::catalog::flow::FlowNodeId, row::Ttl};
5use reifydb_rql::{expression::Expression, flow::node::FlowNodeType::Apply, nodes::ApplyNode, query::QueryPlan};
6use reifydb_transaction::transaction::Transaction;
7use reifydb_value::{Result, fragment::Fragment};
8
9use crate::flow::compiler::{CompileOperator, FlowCompiler};
10
11pub(crate) struct ApplyCompiler {
12	pub input: Option<Box<QueryPlan>>,
13	pub operator: Fragment,
14	pub arguments: Vec<Expression>,
15	pub ttl: Option<Ttl>,
16}
17
18impl From<ApplyNode> for ApplyCompiler {
19	fn from(node: ApplyNode) -> Self {
20		Self {
21			input: node.input,
22			operator: node.operator,
23			arguments: node.expressions,
24			ttl: node.ttl,
25		}
26	}
27}
28
29impl CompileOperator for ApplyCompiler {
30	fn compile(self, compiler: &mut FlowCompiler, txn: &mut Transaction<'_>) -> Result<FlowNodeId> {
31		let input_node = if let Some(input) = self.input {
32			Some(compiler.compile_plan(txn, *input)?)
33		} else {
34			None
35		};
36
37		let node_id = compiler.add_node(
38			txn,
39			Apply {
40				operator: self.operator.text().to_string(),
41				expressions: self.arguments,
42			},
43		)?;
44
45		compiler.write_operator_settings(txn, node_id, self.ttl)?;
46
47		if let Some(input) = input_node {
48			compiler.add_edge(txn, &input, &node_id)?;
49		}
50
51		Ok(node_id)
52	}
53}