reifydb_engine/flow/compiler/
mod.rs1use reifydb_catalog::catalog::Catalog;
10use reifydb_core::{
11 error::diagnostic::flow::{flow_remote_source_unsupported, flow_source_required},
12 interface::catalog::{
13 flow::{FlowEdge, FlowEdgeId, FlowId, FlowNode, FlowNodeId},
14 subscription::Subscription,
15 view::View,
16 },
17 internal,
18};
19use reifydb_rql::{
20 flow::{
21 flow::{FlowBuilder, FlowDag},
22 node::{self, FlowNodeType},
23 },
24 query::QueryPlan,
25};
26use reifydb_type::{Result, error::Error, value::blob::Blob};
27
28pub mod operator;
29pub mod primitive;
30
31use postcard::to_stdvec;
32use reifydb_transaction::transaction::admin::AdminTransaction;
33
34use crate::flow::compiler::{
35 operator::{
36 aggregate::AggregateCompiler, append::AppendCompiler, apply::ApplyCompiler, distinct::DistinctCompiler,
37 extend::ExtendCompiler, filter::FilterCompiler, gate::GateCompiler, join::JoinCompiler,
38 map::MapCompiler, sort::SortCompiler, take::TakeCompiler, window::WindowCompiler,
39 },
40 primitive::{
41 inline_data::InlineDataCompiler, ringbuffer_scan::RingBufferScanCompiler,
42 series_scan::SeriesScanCompiler, table_scan::TableScanCompiler, view_scan::ViewScanCompiler,
43 },
44};
45
46pub fn compile_flow(
48 catalog: &Catalog,
49 txn: &mut AdminTransaction,
50 plan: QueryPlan,
51 sink: Option<&View>,
52 flow_id: FlowId,
53) -> Result<FlowDag> {
54 let compiler = FlowCompiler::new(catalog.clone(), flow_id);
55 compiler.compile(txn, plan, sink)
56}
57
58pub fn compile_subscription_flow(
59 catalog: &Catalog,
60 txn: &mut AdminTransaction,
61 plan: QueryPlan,
62 subscription: &Subscription,
63 flow_id: FlowId,
64) -> Result<FlowDag> {
65 let compiler = FlowCompiler::new(catalog.clone(), flow_id);
66 compiler.compile_with_subscription(txn, plan, subscription)
67}
68
69pub(crate) struct FlowCompiler {
71 pub(crate) catalog: Catalog,
73 builder: FlowBuilder,
75 pub(crate) sink: Option<View>,
77}
78
79impl FlowCompiler {
80 pub fn new(catalog: Catalog, flow_id: FlowId) -> Self {
82 Self {
83 catalog,
84 builder: FlowDag::builder(flow_id),
85 sink: None,
86 }
87 }
88
89 fn next_node_id(&mut self, txn: &mut AdminTransaction) -> Result<FlowNodeId> {
91 self.catalog.next_flow_node_id(txn).map_err(Into::into)
92 }
93
94 fn next_edge_id(&mut self, txn: &mut AdminTransaction) -> Result<FlowEdgeId> {
96 self.catalog.next_flow_edge_id(txn).map_err(Into::into)
97 }
98
99 pub(crate) fn add_edge(
101 &mut self,
102 txn: &mut AdminTransaction,
103 from: &FlowNodeId,
104 to: &FlowNodeId,
105 ) -> Result<()> {
106 let edge_id = self.next_edge_id(txn)?;
107 let flow_id = self.builder.id();
108
109 let edge_def = FlowEdge {
111 id: edge_id,
112 flow: flow_id,
113 source: *from,
114 target: *to,
115 };
116
117 self.catalog.create_flow_edge(txn, &edge_def)?;
119
120 self.builder.add_edge(node::FlowEdge::new(edge_id, *from, *to))?;
122 Ok(())
123 }
124
125 pub(crate) fn add_node(&mut self, txn: &mut AdminTransaction, node_type: FlowNodeType) -> Result<FlowNodeId> {
127 let node_id = self.next_node_id(txn)?;
128 let flow_id = self.builder.id();
129
130 let data = to_stdvec(&node_type)
132 .map_err(|e| Error(internal!("Failed to serialize FlowNodeType: {}", e)))?;
133
134 let node_def = FlowNode {
136 id: node_id,
137 flow: flow_id,
138 node_type: node_type.discriminator(),
139 data: Blob::from(data),
140 };
141
142 self.catalog.create_flow_node(txn, &node_def)?;
144
145 self.builder.add_node(node::FlowNode::new(node_id, node_type));
147 Ok(node_id)
148 }
149
150 pub(crate) fn compile(
152 mut self,
153 txn: &mut AdminTransaction,
154 plan: QueryPlan,
155 sink: Option<&View>,
156 ) -> Result<FlowDag> {
157 self.sink = sink.cloned();
159 let root_node_id = self.compile_plan(txn, plan)?;
160
161 if let Some(sink_view) = sink {
162 let node_type = match sink_view {
163 View::Table(t) => FlowNodeType::SinkTableView {
164 view: sink_view.id(),
165 table: t.underlying,
166 },
167 View::RingBuffer(rb) => FlowNodeType::SinkRingBufferView {
168 view: sink_view.id(),
169 ringbuffer: rb.underlying,
170 capacity: rb.capacity,
171 propagate_evictions: rb.propagate_evictions,
172 },
173 View::Series(s) => FlowNodeType::SinkSeriesView {
174 view: sink_view.id(),
175 series: s.underlying,
176 key: s.key.clone(),
177 },
178 };
179 let result_node = self.add_node(txn, node_type)?;
180 self.add_edge(txn, &root_node_id, &result_node)?;
181 }
182
183 let flow = self.builder.build();
184
185 if !has_real_source(&flow) {
186 return Err(Error(flow_source_required()));
187 }
188
189 Ok(flow)
190 }
191
192 pub(crate) fn compile_with_subscription(
194 mut self,
195 txn: &mut AdminTransaction,
196 plan: QueryPlan,
197 subscription: &Subscription,
198 ) -> Result<FlowDag> {
199 let root_node_id = self.compile_plan(txn, plan)?;
200
201 let result_node = self.add_node(
203 txn,
204 FlowNodeType::SinkSubscription {
205 subscription: subscription.id,
206 },
207 )?;
208
209 self.add_edge(txn, &root_node_id, &result_node)?;
210
211 let flow = self.builder.build();
212
213 if !has_real_source(&flow) {
214 return Err(Error(flow_source_required()));
215 }
216
217 Ok(flow)
218 }
219
220 pub(crate) fn compile_plan(&mut self, txn: &mut AdminTransaction, plan: QueryPlan) -> Result<FlowNodeId> {
222 match plan {
223 QueryPlan::IndexScan(_index_scan) => {
224 unimplemented!("IndexScan compilation not yet implemented for flow")
226 }
227 QueryPlan::TableScan(table_scan) => TableScanCompiler::from(table_scan).compile(self, txn),
228 QueryPlan::ViewScan(view_scan) => ViewScanCompiler::from(view_scan).compile(self, txn),
229 QueryPlan::InlineData(inline_data) => InlineDataCompiler::from(inline_data).compile(self, txn),
230 QueryPlan::Filter(filter) => FilterCompiler::from(filter).compile(self, txn),
231 QueryPlan::Gate(gate) => GateCompiler::from(gate).compile(self, txn),
232 QueryPlan::Map(map) => MapCompiler::from(map).compile(self, txn),
233 QueryPlan::Extend(extend) => ExtendCompiler::from(extend).compile(self, txn),
234 QueryPlan::Apply(apply) => ApplyCompiler::from(apply).compile(self, txn),
235 QueryPlan::Aggregate(aggregate) => AggregateCompiler::from(aggregate).compile(self, txn),
236 QueryPlan::Distinct(distinct) => DistinctCompiler::from(distinct).compile(self, txn),
237 QueryPlan::Take(take) => TakeCompiler::from(take).compile(self, txn),
238 QueryPlan::Sort(sort) => SortCompiler::from(sort).compile(self, txn),
239 QueryPlan::JoinInner(join) => JoinCompiler::from(join).compile(self, txn),
240 QueryPlan::JoinLeft(join) => JoinCompiler::from(join).compile(self, txn),
241 QueryPlan::JoinNatural(_) => {
242 unimplemented!()
243 }
244 QueryPlan::Append(append) => AppendCompiler::from(append).compile(self, txn),
245 QueryPlan::Patch(_) => {
246 unimplemented!("Patch compilation not yet implemented for flow")
247 }
248 QueryPlan::TableVirtualScan(_scan) => {
249 unimplemented!("VirtualScan compilation not yet implemented")
251 }
252 QueryPlan::RingBufferScan(scan) => RingBufferScanCompiler::from(scan).compile(self, txn),
253 QueryPlan::Generator(_generator) => {
254 unimplemented!("Generator compilation not yet implemented for flow")
256 }
257 QueryPlan::Window(window) => WindowCompiler::from(window).compile(self, txn),
258 QueryPlan::Variable(_) => {
259 panic!("Variable references are not supported in flow graphs");
260 }
261 QueryPlan::Scalarize(_) => {
262 panic!("Scalarize operations are not supported in flow graphs");
263 }
264 QueryPlan::Environment(_) => {
265 panic!("Environment operations are not supported in flow graphs");
266 }
267 QueryPlan::RowPointLookup(_) => {
268 unimplemented!("RowPointLookup compilation not yet implemented for flow")
270 }
271 QueryPlan::RowListLookup(_) => {
272 unimplemented!("RowListLookup compilation not yet implemented for flow")
274 }
275 QueryPlan::RowRangeScan(_) => {
276 unimplemented!("RowRangeScan compilation not yet implemented for flow")
278 }
279 QueryPlan::DictionaryScan(_) => {
280 unimplemented!("DictionaryScan compilation not yet implemented for flow")
282 }
283 QueryPlan::Assert(_) => {
284 unimplemented!("Assert compilation not yet implemented for flow")
285 }
286 QueryPlan::SeriesScan(series_scan) => SeriesScanCompiler::from(series_scan).compile(self, txn),
287 QueryPlan::RemoteScan(_) => {
288 return Err(Error(flow_remote_source_unsupported()));
289 }
290 QueryPlan::RunTests(_) => {
291 panic!("RunTests is not supported in flow graphs");
292 }
293 QueryPlan::CallFunction(_) => {
294 panic!("CallFunction is not supported in flow graphs");
295 }
296 }
297 }
298}
299
300fn has_real_source(flow: &FlowDag) -> bool {
303 flow.get_node_ids().any(|node_id| {
304 if let Some(node) = flow.get_node(&node_id) {
305 matches!(
306 node.ty,
307 FlowNodeType::SourceTable { .. }
308 | FlowNodeType::SourceView { .. } | FlowNodeType::SourceFlow { .. }
309 | FlowNodeType::SourceRingBuffer { .. }
310 | FlowNodeType::SourceSeries { .. }
311 )
312 } else {
313 false
314 }
315 })
316}
317
318pub(crate) trait CompileOperator {
320 fn compile(self, compiler: &mut FlowCompiler, txn: &mut AdminTransaction) -> Result<FlowNodeId>;
322}