reifydb_engine/flow/compiler/operator/
apply.rs1use reifydb_catalog::store::ttl::create::create_operator_ttl;
5use reifydb_core::{interface::catalog::flow::FlowNodeId, row::Ttl};
6use reifydb_rql::{expression::Expression, flow::node::FlowNodeType::Apply, nodes::ApplyNode, query::QueryPlan};
7use reifydb_transaction::transaction::Transaction;
8use reifydb_type::{Result, fragment::Fragment};
9
10use crate::flow::compiler::{CompileOperator, FlowCompiler};
11
12pub(crate) struct ApplyCompiler {
13 pub input: Option<Box<QueryPlan>>,
14 pub operator: Fragment,
15 pub arguments: Vec<Expression>,
16 pub ttl: Option<Ttl>,
17}
18
19impl From<ApplyNode> for ApplyCompiler {
20 fn from(node: ApplyNode) -> Self {
21 Self {
22 input: node.input,
23 operator: node.operator,
24 arguments: node.expressions,
25 ttl: node.ttl,
26 }
27 }
28}
29
30impl CompileOperator for ApplyCompiler {
31 fn compile(self, compiler: &mut FlowCompiler, txn: &mut Transaction<'_>) -> Result<FlowNodeId> {
32 let input_node = if let Some(input) = self.input {
33 Some(compiler.compile_plan(txn, *input)?)
34 } else {
35 None
36 };
37
38 let ttl = self.ttl.clone();
39 let node_id = compiler.add_node(
40 txn,
41 Apply {
42 operator: self.operator.text().to_string(),
43 expressions: self.arguments,
44 ttl: self.ttl,
45 },
46 )?;
47
48 if let Some(ttl) = ttl
49 && let Transaction::Admin(admin) = txn
50 {
51 create_operator_ttl(admin, node_id, &ttl)?;
52 }
53
54 if let Some(input) = input_node {
55 compiler.add_edge(txn, &input, &node_id)?;
56 }
57
58 Ok(node_id)
59 }
60}