reifydb_engine/flow/compiler/
mod.rs1use async_recursion::async_recursion;
10use reifydb_catalog::{
11 CatalogStore,
12 store::sequence::flow::{next_flow_edge_id, next_flow_node_id},
13};
14use reifydb_core::{
15 Result,
16 interface::{FlowEdgeDef, FlowEdgeId, FlowId, FlowNodeDef, FlowNodeId, ViewDef},
17};
18use reifydb_rql::{
19 flow::{Flow, FlowBuilder, FlowEdge, FlowNode, FlowNodeType},
20 plan::physical::PhysicalPlan,
21};
22use reifydb_type::Blob;
23
24use crate::StandardCommandTransaction;
25
26mod operator;
27mod source;
28
29use operator::{
30 AggregateCompiler, ApplyCompiler, DistinctCompiler, ExtendCompiler, FilterCompiler, JoinCompiler, MapCompiler,
31 MergeCompiler, SortCompiler, TakeCompiler, WindowCompiler,
32};
33use source::{FlowScanCompiler, InlineDataCompiler, TableScanCompiler, ViewScanCompiler};
34
35pub async fn compile_flow(
37 txn: &mut StandardCommandTransaction,
38 plan: PhysicalPlan,
39 sink: Option<&ViewDef>,
40 flow_id: FlowId,
41) -> Result<Flow> {
42 let compiler = FlowCompiler::new(flow_id);
43 compiler.compile(txn, plan, sink).await
44}
45
46pub(crate) struct FlowCompiler {
48 builder: FlowBuilder,
50 pub(crate) sink: Option<ViewDef>,
52}
53
54impl FlowCompiler {
55 pub fn new(flow_id: FlowId) -> Self {
57 Self {
58 builder: Flow::builder(flow_id),
59 sink: None,
60 }
61 }
62
63 async fn next_node_id(&mut self, txn: &mut StandardCommandTransaction) -> Result<FlowNodeId> {
65 next_flow_node_id(txn).await.map_err(Into::into)
66 }
67
68 async fn next_edge_id(&mut self, txn: &mut StandardCommandTransaction) -> Result<FlowEdgeId> {
70 next_flow_edge_id(txn).await.map_err(Into::into)
71 }
72
73 pub(crate) async fn add_edge(
75 &mut self,
76 txn: &mut StandardCommandTransaction,
77 from: &FlowNodeId,
78 to: &FlowNodeId,
79 ) -> Result<()> {
80 let edge_id = self.next_edge_id(txn).await?;
81 let flow_id = self.builder.id();
82
83 let edge_def = FlowEdgeDef {
85 id: edge_id,
86 flow: flow_id,
87 source: *from,
88 target: *to,
89 };
90
91 CatalogStore::create_flow_edge(txn, &edge_def).await?;
93
94 self.builder.add_edge(FlowEdge::new(edge_id, *from, *to))?;
96 Ok(())
97 }
98
99 pub(crate) async fn add_node(
101 &mut self,
102 txn: &mut StandardCommandTransaction,
103 node_type: FlowNodeType,
104 ) -> Result<FlowNodeId> {
105 let node_id = self.next_node_id(txn).await?;
106 let flow_id = self.builder.id();
107
108 let data = postcard::to_stdvec(&node_type).map_err(|e| {
110 reifydb_core::Error(reifydb_type::internal!("Failed to serialize FlowNodeType: {}", e))
111 })?;
112
113 let node_def = FlowNodeDef {
115 id: node_id,
116 flow: flow_id,
117 node_type: node_type.discriminator(),
118 data: Blob::from(data),
119 };
120
121 CatalogStore::create_flow_node(txn, &node_def).await?;
123
124 self.builder.add_node(FlowNode::new(node_id, node_type));
126 Ok(node_id)
127 }
128
129 pub(crate) async fn compile(
131 mut self,
132 txn: &mut StandardCommandTransaction,
133 plan: PhysicalPlan,
134 sink: Option<&ViewDef>,
135 ) -> Result<Flow> {
136 self.sink = sink.cloned();
138 let root_node_id = self.compile_plan(txn, plan).await?;
139
140 if let Some(sink_view) = sink {
142 let result_node = self
143 .add_node(
144 txn,
145 FlowNodeType::SinkView {
146 view: sink_view.id,
147 },
148 )
149 .await?;
150
151 self.add_edge(txn, &root_node_id, &result_node).await?;
152 }
153
154 Ok(self.builder.build())
155 }
156
157 #[async_recursion]
162 pub(crate) async fn compile_plan(
163 &mut self,
164 txn: &mut StandardCommandTransaction,
165 plan: PhysicalPlan,
166 ) -> Result<FlowNodeId> {
167 match plan {
168 PhysicalPlan::IndexScan(_index_scan) => {
169 unimplemented!("IndexScan compilation not yet implemented for flow")
171 }
172 PhysicalPlan::TableScan(table_scan) => {
173 TableScanCompiler::from(table_scan).compile(self, txn).await
174 }
175 PhysicalPlan::ViewScan(view_scan) => ViewScanCompiler::from(view_scan).compile(self, txn).await,
176 PhysicalPlan::InlineData(inline_data) => {
177 InlineDataCompiler::from(inline_data).compile(self, txn).await
178 }
179 PhysicalPlan::Filter(filter) => FilterCompiler::from(filter).compile(self, txn).await,
180 PhysicalPlan::Map(map) => MapCompiler::from(map).compile(self, txn).await,
181 PhysicalPlan::Extend(extend) => ExtendCompiler::from(extend).compile(self, txn).await,
182 PhysicalPlan::Apply(apply) => ApplyCompiler::from(apply).compile(self, txn).await,
183 PhysicalPlan::Aggregate(aggregate) => {
184 AggregateCompiler::from(aggregate).compile(self, txn).await
185 }
186 PhysicalPlan::Distinct(distinct) => DistinctCompiler::from(distinct).compile(self, txn).await,
187 PhysicalPlan::Take(take) => TakeCompiler::from(take).compile(self, txn).await,
188 PhysicalPlan::Sort(sort) => SortCompiler::from(sort).compile(self, txn).await,
189 PhysicalPlan::JoinInner(join) => JoinCompiler::from(join).compile(self, txn).await,
190 PhysicalPlan::JoinLeft(join) => JoinCompiler::from(join).compile(self, txn).await,
191 PhysicalPlan::JoinNatural(_) => {
192 unimplemented!()
193 }
194 PhysicalPlan::Merge(merge) => MergeCompiler::from(merge).compile(self, txn).await,
195
196 PhysicalPlan::CreateNamespace(_)
197 | PhysicalPlan::CreateTable(_)
198 | PhysicalPlan::CreateRingBuffer(_)
199 | PhysicalPlan::CreateFlow(_)
200 | PhysicalPlan::CreateDictionary(_)
201 | PhysicalPlan::AlterSequence(_)
202 | PhysicalPlan::AlterTable(_)
203 | PhysicalPlan::AlterView(_)
204 | PhysicalPlan::AlterFlow(_)
205 | PhysicalPlan::CreateDeferredView(_)
206 | PhysicalPlan::CreateTransactionalView(_)
207 | PhysicalPlan::InsertTable(_)
208 | PhysicalPlan::InsertRingBuffer(_)
209 | PhysicalPlan::InsertDictionary(_)
210 | PhysicalPlan::Update(_)
211 | PhysicalPlan::UpdateRingBuffer(_)
212 | PhysicalPlan::Delete(_)
213 | PhysicalPlan::DeleteRingBuffer(_) => {
214 unreachable!()
215 }
216 PhysicalPlan::FlowScan(flow_scan) => FlowScanCompiler::from(flow_scan).compile(self, txn).await,
217 PhysicalPlan::TableVirtualScan(_scan) => {
218 unimplemented!("VirtualScan compilation not yet implemented")
221 }
222 PhysicalPlan::RingBufferScan(_scan) => {
223 unimplemented!("RingBufferScan compilation not yet implemented for flow")
225 }
226 PhysicalPlan::Generator(_generator) => {
227 unimplemented!("Generator compilation not yet implemented for flow")
229 }
230 PhysicalPlan::Window(window) => WindowCompiler::from(window).compile(self, txn).await,
231 PhysicalPlan::Declare(_) => {
232 panic!("Declare statements are not supported in flow graphs");
233 }
234
235 PhysicalPlan::Assign(_) => {
236 panic!("Assign statements are not supported in flow graphs");
237 }
238
239 PhysicalPlan::Conditional(_) => {
240 panic!("Conditional statements are not supported in flow graphs");
241 }
242
243 PhysicalPlan::Variable(_) => {
244 panic!("Variable references are not supported in flow graphs");
245 }
246
247 PhysicalPlan::Scalarize(_) => {
248 panic!("Scalarize operations are not supported in flow graphs");
249 }
250
251 PhysicalPlan::Environment(_) => {
252 panic!("Environment operations are not supported in flow graphs");
253 }
254
255 PhysicalPlan::RowPointLookup(_) => {
256 unimplemented!("RowPointLookup compilation not yet implemented for flow")
258 }
259
260 PhysicalPlan::RowListLookup(_) => {
261 unimplemented!("RowListLookup compilation not yet implemented for flow")
263 }
264
265 PhysicalPlan::RowRangeScan(_) => {
266 unimplemented!("RowRangeScan compilation not yet implemented for flow")
268 }
269
270 PhysicalPlan::DictionaryScan(_) => {
271 unimplemented!("DictionaryScan compilation not yet implemented for flow")
273 }
274 }
275 }
276}
277
278pub(crate) trait CompileOperator {
280 async fn compile(self, compiler: &mut FlowCompiler, txn: &mut StandardCommandTransaction)
282 -> Result<FlowNodeId>;
283}