reifydb_engine/flow/compiler/
mod.rs1use reifydb_catalog::catalog::Catalog;
10use reifydb_core::{
11 error::diagnostic::flow::flow_remote_source_unsupported,
12 interface::catalog::{
13 flow::{FlowEdgeDef, FlowEdgeId, FlowId, FlowNodeDef, FlowNodeId},
14 subscription::SubscriptionDef,
15 view::ViewDef,
16 },
17 internal,
18};
19use reifydb_rql::{
20 flow::{
21 flow::{FlowBuilder, FlowDag},
22 node::{FlowEdge, FlowNode, FlowNodeType},
23 },
24 query::QueryPlan,
25};
26use reifydb_type::{Result, error::Error, value::blob::Blob};
27
28pub mod operator;
29pub mod primitive;
30
31use postcard::to_stdvec;
32use reifydb_transaction::transaction::admin::AdminTransaction;
33
34use crate::flow::compiler::{
35 operator::{
36 aggregate::AggregateCompiler, append::AppendCompiler, apply::ApplyCompiler, distinct::DistinctCompiler,
37 extend::ExtendCompiler, filter::FilterCompiler, gate::GateCompiler, join::JoinCompiler,
38 map::MapCompiler, sort::SortCompiler, take::TakeCompiler, window::WindowCompiler,
39 },
40 primitive::{
41 inline_data::InlineDataCompiler, ringbuffer_scan::RingBufferScanCompiler,
42 series_scan::SeriesScanCompiler, table_scan::TableScanCompiler, view_scan::ViewScanCompiler,
43 },
44};
45
46pub fn compile_flow(
48 catalog: &Catalog,
49 txn: &mut AdminTransaction,
50 plan: QueryPlan,
51 sink: Option<&ViewDef>,
52 flow_id: FlowId,
53) -> Result<FlowDag> {
54 let compiler = FlowCompiler::new(catalog.clone(), flow_id);
55 compiler.compile(txn, plan, sink)
56}
57
58pub fn compile_subscription_flow(
59 catalog: &Catalog,
60 txn: &mut AdminTransaction,
61 plan: QueryPlan,
62 subscription: &SubscriptionDef,
63 flow_id: FlowId,
64) -> Result<FlowDag> {
65 let compiler = FlowCompiler::new(catalog.clone(), flow_id);
66 compiler.compile_with_subscription(txn, plan, subscription)
67}
68
69pub(crate) struct FlowCompiler {
71 pub(crate) catalog: Catalog,
73 builder: FlowBuilder,
75 pub(crate) sink: Option<ViewDef>,
77}
78
79impl FlowCompiler {
80 pub fn new(catalog: Catalog, flow_id: FlowId) -> Self {
82 Self {
83 catalog,
84 builder: FlowDag::builder(flow_id),
85 sink: None,
86 }
87 }
88
89 fn next_node_id(&mut self, txn: &mut AdminTransaction) -> Result<FlowNodeId> {
91 self.catalog.next_flow_node_id(txn).map_err(Into::into)
92 }
93
94 fn next_edge_id(&mut self, txn: &mut AdminTransaction) -> Result<FlowEdgeId> {
96 self.catalog.next_flow_edge_id(txn).map_err(Into::into)
97 }
98
99 pub(crate) fn add_edge(
101 &mut self,
102 txn: &mut AdminTransaction,
103 from: &FlowNodeId,
104 to: &FlowNodeId,
105 ) -> Result<()> {
106 let edge_id = self.next_edge_id(txn)?;
107 let flow_id = self.builder.id();
108
109 let edge_def = FlowEdgeDef {
111 id: edge_id,
112 flow: flow_id,
113 source: *from,
114 target: *to,
115 };
116
117 self.catalog.create_flow_edge(txn, &edge_def)?;
119
120 self.builder.add_edge(FlowEdge::new(edge_id, *from, *to))?;
122 Ok(())
123 }
124
125 pub(crate) fn add_node(&mut self, txn: &mut AdminTransaction, node_type: FlowNodeType) -> Result<FlowNodeId> {
127 let node_id = self.next_node_id(txn)?;
128 let flow_id = self.builder.id();
129
130 let data = to_stdvec(&node_type)
132 .map_err(|e| Error(internal!("Failed to serialize FlowNodeType: {}", e)))?;
133
134 let node_def = FlowNodeDef {
136 id: node_id,
137 flow: flow_id,
138 node_type: node_type.discriminator(),
139 data: Blob::from(data),
140 };
141
142 self.catalog.create_flow_node(txn, &node_def)?;
144
145 self.builder.add_node(FlowNode::new(node_id, node_type));
147 Ok(node_id)
148 }
149
150 pub(crate) fn compile(
152 mut self,
153 txn: &mut AdminTransaction,
154 plan: QueryPlan,
155 sink: Option<&ViewDef>,
156 ) -> Result<FlowDag> {
157 self.sink = sink.cloned();
159 let root_node_id = self.compile_plan(txn, plan)?;
160
161 if let Some(sink_view) = sink {
163 let result_node = self.add_node(
164 txn,
165 FlowNodeType::SinkView {
166 view: sink_view.id,
167 },
168 )?;
169
170 self.add_edge(txn, &root_node_id, &result_node)?;
171 }
172
173 Ok(self.builder.build())
174 }
175
176 pub(crate) fn compile_with_subscription(
178 mut self,
179 txn: &mut AdminTransaction,
180 plan: QueryPlan,
181 subscription: &SubscriptionDef,
182 ) -> Result<FlowDag> {
183 let root_node_id = self.compile_plan(txn, plan)?;
184
185 let result_node = self.add_node(
187 txn,
188 FlowNodeType::SinkSubscription {
189 subscription: subscription.id,
190 },
191 )?;
192
193 self.add_edge(txn, &root_node_id, &result_node)?;
194
195 Ok(self.builder.build())
196 }
197
198 pub(crate) fn compile_plan(&mut self, txn: &mut AdminTransaction, plan: QueryPlan) -> Result<FlowNodeId> {
200 match plan {
201 QueryPlan::IndexScan(_index_scan) => {
202 unimplemented!("IndexScan compilation not yet implemented for flow")
204 }
205 QueryPlan::TableScan(table_scan) => TableScanCompiler::from(table_scan).compile(self, txn),
206 QueryPlan::ViewScan(view_scan) => ViewScanCompiler::from(view_scan).compile(self, txn),
207 QueryPlan::InlineData(inline_data) => InlineDataCompiler::from(inline_data).compile(self, txn),
208 QueryPlan::Filter(filter) => FilterCompiler::from(filter).compile(self, txn),
209 QueryPlan::Gate(gate) => GateCompiler::from(gate).compile(self, txn),
210 QueryPlan::Map(map) => MapCompiler::from(map).compile(self, txn),
211 QueryPlan::Extend(extend) => ExtendCompiler::from(extend).compile(self, txn),
212 QueryPlan::Apply(apply) => ApplyCompiler::from(apply).compile(self, txn),
213 QueryPlan::Aggregate(aggregate) => AggregateCompiler::from(aggregate).compile(self, txn),
214 QueryPlan::Distinct(distinct) => DistinctCompiler::from(distinct).compile(self, txn),
215 QueryPlan::Take(take) => TakeCompiler::from(take).compile(self, txn),
216 QueryPlan::Sort(sort) => SortCompiler::from(sort).compile(self, txn),
217 QueryPlan::JoinInner(join) => JoinCompiler::from(join).compile(self, txn),
218 QueryPlan::JoinLeft(join) => JoinCompiler::from(join).compile(self, txn),
219 QueryPlan::JoinNatural(_) => {
220 unimplemented!()
221 }
222 QueryPlan::Append(append) => AppendCompiler::from(append).compile(self, txn),
223 QueryPlan::Patch(_) => {
224 unimplemented!("Patch compilation not yet implemented for flow")
225 }
226 QueryPlan::TableVirtualScan(_scan) => {
227 unimplemented!("VirtualScan compilation not yet implemented")
229 }
230 QueryPlan::RingBufferScan(scan) => RingBufferScanCompiler::from(scan).compile(self, txn),
231 QueryPlan::Generator(_generator) => {
232 unimplemented!("Generator compilation not yet implemented for flow")
234 }
235 QueryPlan::Window(window) => WindowCompiler::from(window).compile(self, txn),
236 QueryPlan::Variable(_) => {
237 panic!("Variable references are not supported in flow graphs");
238 }
239 QueryPlan::Scalarize(_) => {
240 panic!("Scalarize operations are not supported in flow graphs");
241 }
242 QueryPlan::Environment(_) => {
243 panic!("Environment operations are not supported in flow graphs");
244 }
245 QueryPlan::RowPointLookup(_) => {
246 unimplemented!("RowPointLookup compilation not yet implemented for flow")
248 }
249 QueryPlan::RowListLookup(_) => {
250 unimplemented!("RowListLookup compilation not yet implemented for flow")
252 }
253 QueryPlan::RowRangeScan(_) => {
254 unimplemented!("RowRangeScan compilation not yet implemented for flow")
256 }
257 QueryPlan::DictionaryScan(_) => {
258 unimplemented!("DictionaryScan compilation not yet implemented for flow")
260 }
261 QueryPlan::Assert(_) => {
262 unimplemented!("Assert compilation not yet implemented for flow")
263 }
264 QueryPlan::SeriesScan(series_scan) => SeriesScanCompiler::from(series_scan).compile(self, txn),
265 QueryPlan::RemoteScan(_) => {
266 return Err(Error(flow_remote_source_unsupported()));
267 }
268 QueryPlan::RunTests(_) => {
269 panic!("RunTests is not supported in flow graphs");
270 }
271 QueryPlan::CallFunction(_) => {
272 panic!("CallFunction is not supported in flow graphs");
273 }
274 }
275 }
276}
277
278pub(crate) trait CompileOperator {
280 fn compile(self, compiler: &mut FlowCompiler, txn: &mut AdminTransaction) -> Result<FlowNodeId>;
282}