reifydb_engine/flow/compiler/
mod.rs1use reifydb_catalog::catalog::Catalog;
10use reifydb_core::interface::catalog::{
11 flow::{FlowEdgeDef, FlowEdgeId, FlowId, FlowNodeDef, FlowNodeId},
12 subscription::SubscriptionDef,
13 view::ViewDef,
14};
15use reifydb_rql::{
16 flow::{
17 flow::{FlowBuilder, FlowDag},
18 node::{FlowEdge, FlowNode, FlowNodeType},
19 },
20 query::QueryPlan,
21};
22use reifydb_type::{Result, value::blob::Blob};
23
24pub mod operator;
25pub mod primitive;
26
27use reifydb_transaction::transaction::admin::AdminTransaction;
28
29use crate::flow::compiler::{
30 operator::{
31 aggregate::AggregateCompiler, append::AppendCompiler, apply::ApplyCompiler, distinct::DistinctCompiler,
32 extend::ExtendCompiler, filter::FilterCompiler, join::JoinCompiler, map::MapCompiler,
33 sort::SortCompiler, take::TakeCompiler, window::WindowCompiler,
34 },
35 primitive::{
36 flow_scan::FlowScanCompiler, inline_data::InlineDataCompiler, table_scan::TableScanCompiler,
37 view_scan::ViewScanCompiler,
38 },
39};
40
41pub fn compile_flow(
43 catalog: &Catalog,
44 txn: &mut AdminTransaction,
45 plan: QueryPlan,
46 sink: Option<&ViewDef>,
47 flow_id: FlowId,
48) -> Result<FlowDag> {
49 let compiler = FlowCompiler::new(catalog.clone(), flow_id);
50 compiler.compile(txn, plan, sink)
51}
52
53pub fn compile_subscription_flow(
54 catalog: &Catalog,
55 txn: &mut AdminTransaction,
56 plan: QueryPlan,
57 subscription: &SubscriptionDef,
58 flow_id: FlowId,
59) -> Result<FlowDag> {
60 let compiler = FlowCompiler::new(catalog.clone(), flow_id);
61 compiler.compile_with_subscription(txn, plan, subscription)
62}
63
64pub(crate) struct FlowCompiler {
66 pub(crate) catalog: Catalog,
68 builder: FlowBuilder,
70 pub(crate) sink: Option<ViewDef>,
72}
73
74impl FlowCompiler {
75 pub fn new(catalog: Catalog, flow_id: FlowId) -> Self {
77 Self {
78 catalog,
79 builder: FlowDag::builder(flow_id),
80 sink: None,
81 }
82 }
83
84 fn next_node_id(&mut self, txn: &mut AdminTransaction) -> Result<FlowNodeId> {
86 self.catalog.next_flow_node_id(txn).map_err(Into::into)
87 }
88
89 fn next_edge_id(&mut self, txn: &mut AdminTransaction) -> Result<FlowEdgeId> {
91 self.catalog.next_flow_edge_id(txn).map_err(Into::into)
92 }
93
94 pub(crate) fn add_edge(
96 &mut self,
97 txn: &mut AdminTransaction,
98 from: &FlowNodeId,
99 to: &FlowNodeId,
100 ) -> Result<()> {
101 let edge_id = self.next_edge_id(txn)?;
102 let flow_id = self.builder.id();
103
104 let edge_def = FlowEdgeDef {
106 id: edge_id,
107 flow: flow_id,
108 source: *from,
109 target: *to,
110 };
111
112 self.catalog.create_flow_edge(txn, &edge_def)?;
114
115 self.builder.add_edge(FlowEdge::new(edge_id, *from, *to))?;
117 Ok(())
118 }
119
120 pub(crate) fn add_node(&mut self, txn: &mut AdminTransaction, node_type: FlowNodeType) -> Result<FlowNodeId> {
122 let node_id = self.next_node_id(txn)?;
123 let flow_id = self.builder.id();
124
125 let data = postcard::to_stdvec(&node_type).map_err(|e| {
127 reifydb_type::error::Error(reifydb_core::internal!("Failed to serialize FlowNodeType: {}", e))
128 })?;
129
130 let node_def = FlowNodeDef {
132 id: node_id,
133 flow: flow_id,
134 node_type: node_type.discriminator(),
135 data: Blob::from(data),
136 };
137
138 self.catalog.create_flow_node(txn, &node_def)?;
140
141 self.builder.add_node(FlowNode::new(node_id, node_type));
143 Ok(node_id)
144 }
145
146 pub(crate) fn compile(
148 mut self,
149 txn: &mut AdminTransaction,
150 plan: QueryPlan,
151 sink: Option<&ViewDef>,
152 ) -> Result<FlowDag> {
153 self.sink = sink.cloned();
155 let root_node_id = self.compile_plan(txn, plan)?;
156
157 if let Some(sink_view) = sink {
159 let result_node = self.add_node(
160 txn,
161 FlowNodeType::SinkView {
162 view: sink_view.id,
163 },
164 )?;
165
166 self.add_edge(txn, &root_node_id, &result_node)?;
167 }
168
169 Ok(self.builder.build())
170 }
171
172 pub(crate) fn compile_with_subscription(
174 mut self,
175 txn: &mut AdminTransaction,
176 plan: QueryPlan,
177 subscription: &SubscriptionDef,
178 ) -> Result<FlowDag> {
179 let root_node_id = self.compile_plan(txn, plan)?;
180
181 let result_node = self.add_node(
183 txn,
184 FlowNodeType::SinkSubscription {
185 subscription: subscription.id,
186 },
187 )?;
188
189 self.add_edge(txn, &root_node_id, &result_node)?;
190
191 Ok(self.builder.build())
192 }
193
194 pub(crate) fn compile_plan(&mut self, txn: &mut AdminTransaction, plan: QueryPlan) -> Result<FlowNodeId> {
196 match plan {
197 QueryPlan::IndexScan(_index_scan) => {
198 unimplemented!("IndexScan compilation not yet implemented for flow")
200 }
201 QueryPlan::TableScan(table_scan) => TableScanCompiler::from(table_scan).compile(self, txn),
202 QueryPlan::ViewScan(view_scan) => ViewScanCompiler::from(view_scan).compile(self, txn),
203 QueryPlan::InlineData(inline_data) => InlineDataCompiler::from(inline_data).compile(self, txn),
204 QueryPlan::Filter(filter) => FilterCompiler::from(filter).compile(self, txn),
205 QueryPlan::Map(map) => MapCompiler::from(map).compile(self, txn),
206 QueryPlan::Extend(extend) => ExtendCompiler::from(extend).compile(self, txn),
207 QueryPlan::Apply(apply) => ApplyCompiler::from(apply).compile(self, txn),
208 QueryPlan::Aggregate(aggregate) => AggregateCompiler::from(aggregate).compile(self, txn),
209 QueryPlan::Distinct(distinct) => DistinctCompiler::from(distinct).compile(self, txn),
210 QueryPlan::Take(take) => TakeCompiler::from(take).compile(self, txn),
211 QueryPlan::Sort(sort) => SortCompiler::from(sort).compile(self, txn),
212 QueryPlan::JoinInner(join) => JoinCompiler::from(join).compile(self, txn),
213 QueryPlan::JoinLeft(join) => JoinCompiler::from(join).compile(self, txn),
214 QueryPlan::JoinNatural(_) => {
215 unimplemented!()
216 }
217 QueryPlan::Append(append) => AppendCompiler::from(append).compile(self, txn),
218 QueryPlan::Patch(_) => {
219 unimplemented!("Patch compilation not yet implemented for flow")
220 }
221 QueryPlan::FlowScan(flow_scan) => FlowScanCompiler::from(flow_scan).compile(self, txn),
222 QueryPlan::TableVirtualScan(_scan) => {
223 unimplemented!("VirtualScan compilation not yet implemented")
225 }
226 QueryPlan::RingBufferScan(_scan) => {
227 unimplemented!("RingBufferScan compilation not yet implemented for flow")
229 }
230 QueryPlan::Generator(_generator) => {
231 unimplemented!("Generator compilation not yet implemented for flow")
233 }
234 QueryPlan::Window(window) => WindowCompiler::from(window).compile(self, txn),
235 QueryPlan::Variable(_) => {
236 panic!("Variable references are not supported in flow graphs");
237 }
238 QueryPlan::Scalarize(_) => {
239 panic!("Scalarize operations are not supported in flow graphs");
240 }
241 QueryPlan::Environment(_) => {
242 panic!("Environment operations are not supported in flow graphs");
243 }
244 QueryPlan::RowPointLookup(_) => {
245 unimplemented!("RowPointLookup compilation not yet implemented for flow")
247 }
248 QueryPlan::RowListLookup(_) => {
249 unimplemented!("RowListLookup compilation not yet implemented for flow")
251 }
252 QueryPlan::RowRangeScan(_) => {
253 unimplemented!("RowRangeScan compilation not yet implemented for flow")
255 }
256 QueryPlan::DictionaryScan(_) => {
257 unimplemented!("DictionaryScan compilation not yet implemented for flow")
259 }
260 QueryPlan::Assert(_) => {
261 unimplemented!("Assert compilation not yet implemented for flow")
262 }
263 }
264 }
265}
266
267pub(crate) trait CompileOperator {
269 fn compile(self, compiler: &mut FlowCompiler, txn: &mut AdminTransaction) -> Result<FlowNodeId>;
271}