reifydb_engine/flow/compiler/
mod.rs1use reifydb_catalog::catalog::Catalog;
10use reifydb_core::{
11 interface::catalog::{
12 flow::{FlowEdgeDef, FlowEdgeId, FlowId, FlowNodeDef, FlowNodeId},
13 subscription::SubscriptionDef,
14 view::ViewDef,
15 },
16 internal,
17};
18use reifydb_rql::{
19 flow::{
20 flow::{FlowBuilder, FlowDag},
21 node::{FlowEdge, FlowNode, FlowNodeType},
22 },
23 query::QueryPlan,
24};
25use reifydb_type::{Result, error::Error, value::blob::Blob};
26
27pub mod operator;
28pub mod primitive;
29
30use postcard::to_stdvec;
31use reifydb_transaction::transaction::admin::AdminTransaction;
32
33use crate::flow::compiler::{
34 operator::{
35 aggregate::AggregateCompiler, append::AppendCompiler, apply::ApplyCompiler, distinct::DistinctCompiler,
36 extend::ExtendCompiler, filter::FilterCompiler, gate::GateCompiler, join::JoinCompiler,
37 map::MapCompiler, sort::SortCompiler, take::TakeCompiler, window::WindowCompiler,
38 },
39 primitive::{
40 inline_data::InlineDataCompiler, ringbuffer_scan::RingBufferScanCompiler,
41 series_scan::SeriesScanCompiler, table_scan::TableScanCompiler, view_scan::ViewScanCompiler,
42 },
43};
44
45pub fn compile_flow(
47 catalog: &Catalog,
48 txn: &mut AdminTransaction,
49 plan: QueryPlan,
50 sink: Option<&ViewDef>,
51 flow_id: FlowId,
52) -> Result<FlowDag> {
53 let compiler = FlowCompiler::new(catalog.clone(), flow_id);
54 compiler.compile(txn, plan, sink)
55}
56
57pub fn compile_subscription_flow(
58 catalog: &Catalog,
59 txn: &mut AdminTransaction,
60 plan: QueryPlan,
61 subscription: &SubscriptionDef,
62 flow_id: FlowId,
63) -> Result<FlowDag> {
64 let compiler = FlowCompiler::new(catalog.clone(), flow_id);
65 compiler.compile_with_subscription(txn, plan, subscription)
66}
67
68pub(crate) struct FlowCompiler {
70 pub(crate) catalog: Catalog,
72 builder: FlowBuilder,
74 pub(crate) sink: Option<ViewDef>,
76}
77
78impl FlowCompiler {
79 pub fn new(catalog: Catalog, flow_id: FlowId) -> Self {
81 Self {
82 catalog,
83 builder: FlowDag::builder(flow_id),
84 sink: None,
85 }
86 }
87
88 fn next_node_id(&mut self, txn: &mut AdminTransaction) -> Result<FlowNodeId> {
90 self.catalog.next_flow_node_id(txn).map_err(Into::into)
91 }
92
93 fn next_edge_id(&mut self, txn: &mut AdminTransaction) -> Result<FlowEdgeId> {
95 self.catalog.next_flow_edge_id(txn).map_err(Into::into)
96 }
97
98 pub(crate) fn add_edge(
100 &mut self,
101 txn: &mut AdminTransaction,
102 from: &FlowNodeId,
103 to: &FlowNodeId,
104 ) -> Result<()> {
105 let edge_id = self.next_edge_id(txn)?;
106 let flow_id = self.builder.id();
107
108 let edge_def = FlowEdgeDef {
110 id: edge_id,
111 flow: flow_id,
112 source: *from,
113 target: *to,
114 };
115
116 self.catalog.create_flow_edge(txn, &edge_def)?;
118
119 self.builder.add_edge(FlowEdge::new(edge_id, *from, *to))?;
121 Ok(())
122 }
123
124 pub(crate) fn add_node(&mut self, txn: &mut AdminTransaction, node_type: FlowNodeType) -> Result<FlowNodeId> {
126 let node_id = self.next_node_id(txn)?;
127 let flow_id = self.builder.id();
128
129 let data = to_stdvec(&node_type)
131 .map_err(|e| Error(internal!("Failed to serialize FlowNodeType: {}", e)))?;
132
133 let node_def = FlowNodeDef {
135 id: node_id,
136 flow: flow_id,
137 node_type: node_type.discriminator(),
138 data: Blob::from(data),
139 };
140
141 self.catalog.create_flow_node(txn, &node_def)?;
143
144 self.builder.add_node(FlowNode::new(node_id, node_type));
146 Ok(node_id)
147 }
148
149 pub(crate) fn compile(
151 mut self,
152 txn: &mut AdminTransaction,
153 plan: QueryPlan,
154 sink: Option<&ViewDef>,
155 ) -> Result<FlowDag> {
156 self.sink = sink.cloned();
158 let root_node_id = self.compile_plan(txn, plan)?;
159
160 if let Some(sink_view) = sink {
162 let result_node = self.add_node(
163 txn,
164 FlowNodeType::SinkView {
165 view: sink_view.id,
166 },
167 )?;
168
169 self.add_edge(txn, &root_node_id, &result_node)?;
170 }
171
172 Ok(self.builder.build())
173 }
174
175 pub(crate) fn compile_with_subscription(
177 mut self,
178 txn: &mut AdminTransaction,
179 plan: QueryPlan,
180 subscription: &SubscriptionDef,
181 ) -> Result<FlowDag> {
182 let root_node_id = self.compile_plan(txn, plan)?;
183
184 let result_node = self.add_node(
186 txn,
187 FlowNodeType::SinkSubscription {
188 subscription: subscription.id,
189 },
190 )?;
191
192 self.add_edge(txn, &root_node_id, &result_node)?;
193
194 Ok(self.builder.build())
195 }
196
197 pub(crate) fn compile_plan(&mut self, txn: &mut AdminTransaction, plan: QueryPlan) -> Result<FlowNodeId> {
199 match plan {
200 QueryPlan::IndexScan(_index_scan) => {
201 unimplemented!("IndexScan compilation not yet implemented for flow")
203 }
204 QueryPlan::TableScan(table_scan) => TableScanCompiler::from(table_scan).compile(self, txn),
205 QueryPlan::ViewScan(view_scan) => ViewScanCompiler::from(view_scan).compile(self, txn),
206 QueryPlan::InlineData(inline_data) => InlineDataCompiler::from(inline_data).compile(self, txn),
207 QueryPlan::Filter(filter) => FilterCompiler::from(filter).compile(self, txn),
208 QueryPlan::Gate(gate) => GateCompiler::from(gate).compile(self, txn),
209 QueryPlan::Map(map) => MapCompiler::from(map).compile(self, txn),
210 QueryPlan::Extend(extend) => ExtendCompiler::from(extend).compile(self, txn),
211 QueryPlan::Apply(apply) => ApplyCompiler::from(apply).compile(self, txn),
212 QueryPlan::Aggregate(aggregate) => AggregateCompiler::from(aggregate).compile(self, txn),
213 QueryPlan::Distinct(distinct) => DistinctCompiler::from(distinct).compile(self, txn),
214 QueryPlan::Take(take) => TakeCompiler::from(take).compile(self, txn),
215 QueryPlan::Sort(sort) => SortCompiler::from(sort).compile(self, txn),
216 QueryPlan::JoinInner(join) => JoinCompiler::from(join).compile(self, txn),
217 QueryPlan::JoinLeft(join) => JoinCompiler::from(join).compile(self, txn),
218 QueryPlan::JoinNatural(_) => {
219 unimplemented!()
220 }
221 QueryPlan::Append(append) => AppendCompiler::from(append).compile(self, txn),
222 QueryPlan::Patch(_) => {
223 unimplemented!("Patch compilation not yet implemented for flow")
224 }
225 QueryPlan::TableVirtualScan(_scan) => {
226 unimplemented!("VirtualScan compilation not yet implemented")
228 }
229 QueryPlan::RingBufferScan(scan) => RingBufferScanCompiler::from(scan).compile(self, txn),
230 QueryPlan::Generator(_generator) => {
231 unimplemented!("Generator compilation not yet implemented for flow")
233 }
234 QueryPlan::Window(window) => WindowCompiler::from(window).compile(self, txn),
235 QueryPlan::Variable(_) => {
236 panic!("Variable references are not supported in flow graphs");
237 }
238 QueryPlan::Scalarize(_) => {
239 panic!("Scalarize operations are not supported in flow graphs");
240 }
241 QueryPlan::Environment(_) => {
242 panic!("Environment operations are not supported in flow graphs");
243 }
244 QueryPlan::RowPointLookup(_) => {
245 unimplemented!("RowPointLookup compilation not yet implemented for flow")
247 }
248 QueryPlan::RowListLookup(_) => {
249 unimplemented!("RowListLookup compilation not yet implemented for flow")
251 }
252 QueryPlan::RowRangeScan(_) => {
253 unimplemented!("RowRangeScan compilation not yet implemented for flow")
255 }
256 QueryPlan::DictionaryScan(_) => {
257 unimplemented!("DictionaryScan compilation not yet implemented for flow")
259 }
260 QueryPlan::Assert(_) => {
261 unimplemented!("Assert compilation not yet implemented for flow")
262 }
263 QueryPlan::SeriesScan(series_scan) => SeriesScanCompiler::from(series_scan).compile(self, txn),
264 }
265 }
266}
267
268pub(crate) trait CompileOperator {
270 fn compile(self, compiler: &mut FlowCompiler, txn: &mut AdminTransaction) -> Result<FlowNodeId>;
272}