Skip to main content

reifydb_engine/flow/compiler/operator/
apply.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_catalog::store::ttl::create::create_operator_ttl;
5use reifydb_core::{interface::catalog::flow::FlowNodeId, row::Ttl};
6use reifydb_rql::{expression::Expression, flow::node::FlowNodeType::Apply, nodes::ApplyNode, query::QueryPlan};
7use reifydb_transaction::transaction::Transaction;
8use reifydb_type::{Result, fragment::Fragment};
9
10use crate::flow::compiler::{CompileOperator, FlowCompiler};
11
12pub(crate) struct ApplyCompiler {
13	pub input: Option<Box<QueryPlan>>,
14	pub operator: Fragment,
15	pub arguments: Vec<Expression>,
16	pub ttl: Option<Ttl>,
17}
18
19impl From<ApplyNode> for ApplyCompiler {
20	fn from(node: ApplyNode) -> Self {
21		Self {
22			input: node.input,
23			operator: node.operator,
24			arguments: node.expressions,
25			ttl: node.ttl,
26		}
27	}
28}
29
30impl CompileOperator for ApplyCompiler {
31	fn compile(self, compiler: &mut FlowCompiler, txn: &mut Transaction<'_>) -> Result<FlowNodeId> {
32		let input_node = if let Some(input) = self.input {
33			Some(compiler.compile_plan(txn, *input)?)
34		} else {
35			None
36		};
37
38		let ttl = self.ttl.clone();
39		let node_id = compiler.add_node(
40			txn,
41			Apply {
42				operator: self.operator.text().to_string(),
43				expressions: self.arguments,
44				ttl: self.ttl,
45			},
46		)?;
47
48		if let Some(ttl) = ttl
49			&& let Transaction::Admin(admin) = txn
50		{
51			create_operator_ttl(admin, node_id, &ttl)?;
52		}
53
54		if let Some(input) = input_node {
55			compiler.add_edge(txn, &input, &node_id)?;
56		}
57
58		Ok(node_id)
59	}
60}