reifydb_engine/flow/compiler/operator/
apply.rs1use reifydb_core::{interface::catalog::flow::FlowNodeId, row::Ttl};
5use reifydb_rql::{expression::Expression, flow::node::FlowNodeType::Apply, nodes::ApplyNode, query::QueryPlan};
6use reifydb_transaction::transaction::Transaction;
7use reifydb_value::{Result, fragment::Fragment};
8
9use crate::flow::compiler::{CompileOperator, FlowCompiler};
10
11pub(crate) struct ApplyCompiler {
12 pub input: Option<Box<QueryPlan>>,
13 pub operator: Fragment,
14 pub arguments: Vec<Expression>,
15 pub ttl: Option<Ttl>,
16}
17
18impl From<ApplyNode> for ApplyCompiler {
19 fn from(node: ApplyNode) -> Self {
20 Self {
21 input: node.input,
22 operator: node.operator,
23 arguments: node.expressions,
24 ttl: node.ttl,
25 }
26 }
27}
28
29impl CompileOperator for ApplyCompiler {
30 fn compile(self, compiler: &mut FlowCompiler, txn: &mut Transaction<'_>) -> Result<FlowNodeId> {
31 let input_node = if let Some(input) = self.input {
32 Some(compiler.compile_plan(txn, *input)?)
33 } else {
34 None
35 };
36
37 let node_id = compiler.add_node(
38 txn,
39 Apply {
40 operator: self.operator.text().to_string(),
41 expressions: self.arguments,
42 },
43 )?;
44
45 compiler.write_operator_settings(txn, node_id, self.ttl)?;
46
47 if let Some(input) = input_node {
48 compiler.add_edge(txn, &input, &node_id)?;
49 }
50
51 Ok(node_id)
52 }
53}