1use reifydb_catalog::catalog::Catalog;
10use reifydb_core::{
11 error::diagnostic::flow::{
12 flow_ephemeral_id_capacity_exceeded, flow_remote_source_unsupported, flow_source_required,
13 },
14 interface::catalog::{
15 flow::{FlowEdge, FlowEdgeId, FlowId, FlowNode, FlowNodeId},
16 id::SubscriptionId,
17 view::View,
18 },
19 internal,
20};
21use reifydb_rql::{
22 flow::{
23 flow::{FlowBuilder, FlowDag},
24 node::{self, FlowNodeType},
25 },
26 query::QueryPlan,
27};
28use reifydb_type::{Result, error::Error, value::blob::Blob};
29
30pub mod operator;
31pub mod primitive;
32
33use postcard::to_stdvec;
34use reifydb_transaction::transaction::{Transaction, admin::AdminTransaction};
35
36use crate::flow::compiler::{
37 operator::{
38 aggregate::AggregateCompiler, append::AppendCompiler, apply::ApplyCompiler, distinct::DistinctCompiler,
39 extend::ExtendCompiler, filter::FilterCompiler, gate::GateCompiler, join::JoinCompiler,
40 map::MapCompiler, sort::SortCompiler, take::TakeCompiler, window::WindowCompiler,
41 },
42 primitive::{
43 inline_data::InlineDataCompiler, ringbuffer_scan::RingBufferScanCompiler,
44 series_scan::SeriesScanCompiler, table_scan::TableScanCompiler, view_scan::ViewScanCompiler,
45 },
46};
47
48pub fn compile_flow(
50 catalog: &Catalog,
51 txn: &mut AdminTransaction,
52 plan: QueryPlan,
53 sink: Option<&View>,
54 flow_id: FlowId,
55) -> Result<FlowDag> {
56 let compiler = FlowCompiler::new(catalog.clone(), flow_id);
57 compiler.compile(&mut Transaction::Admin(txn), plan, sink)
58}
59
60pub fn compile_subscription_flow_ephemeral(
65 catalog: &Catalog,
66 txn: &mut Transaction<'_>,
67 plan: QueryPlan,
68 subscription_id: SubscriptionId,
69 flow_id: FlowId,
70) -> Result<FlowDag> {
71 let compiler = FlowCompiler::new_ephemeral(catalog.clone(), flow_id);
72 compiler.compile_with_subscription_id(txn, plan, subscription_id)
73}
74
/// Incrementally builds a [`FlowDag`] from a [`QueryPlan`].
///
/// In non-ephemeral mode, node and edge ids are allocated through the catalog and every node
/// and edge is also written to the catalog. In ephemeral mode, ids come from the local counters
/// below and nothing is written to the catalog.
pub(crate) struct FlowCompiler {
	/// Catalog used to allocate ids and to persist nodes and edges of non-ephemeral flows.
	pub(crate) catalog: Catalog,
	/// Accumulates the nodes and edges of the DAG under construction.
	builder: FlowBuilder,
	/// Sink view of the flow, if any; set by `compile` from its `sink` argument.
	// NOTE(review): presumably read by the operator/primitive compilers - confirm.
	pub(crate) sink: Option<View>,
	/// When `true`, ids are allocated locally and catalog writes are skipped.
	ephemeral: bool,
	/// Last node id handed out in ephemeral mode.
	local_node_counter: u64,
	/// Last edge id handed out in ephemeral mode.
	local_edge_counter: u64,
	/// Highest value either local counter may reach in ephemeral mode.
	local_id_limit: u64,
}
92
93impl FlowCompiler {
94 pub fn new(catalog: Catalog, flow_id: FlowId) -> Self {
96 Self {
97 catalog,
98 builder: FlowDag::builder(flow_id),
99 sink: None,
100 ephemeral: false,
101 local_node_counter: 0,
102 local_edge_counter: 0,
103 local_id_limit: 0,
104 }
105 }
106
107 pub fn new_ephemeral(catalog: Catalog, flow_id: FlowId) -> Self {
113 let base = flow_id.0 * 100;
114 Self {
115 catalog,
116 builder: FlowDag::builder(flow_id),
117 sink: None,
118 ephemeral: true,
119 local_node_counter: base,
120 local_edge_counter: base,
121 local_id_limit: base + 99,
122 }
123 }
124
125 fn next_node_id(&mut self, txn: &mut Transaction<'_>) -> Result<FlowNodeId> {
127 if self.ephemeral {
128 if self.local_node_counter >= self.local_id_limit {
129 return Err(Error(Box::new(flow_ephemeral_id_capacity_exceeded(self.builder.id().0))));
130 }
131 self.local_node_counter += 1;
132 Ok(FlowNodeId(self.local_node_counter))
133 } else {
134 self.catalog.next_flow_node_id(txn.admin_mut())
135 }
136 }
137
138 fn next_edge_id(&mut self, txn: &mut Transaction<'_>) -> Result<FlowEdgeId> {
140 if self.ephemeral {
141 if self.local_edge_counter >= self.local_id_limit {
142 return Err(Error(Box::new(flow_ephemeral_id_capacity_exceeded(self.builder.id().0))));
143 }
144 self.local_edge_counter += 1;
145 Ok(FlowEdgeId(self.local_edge_counter))
146 } else {
147 self.catalog.next_flow_edge_id(txn.admin_mut())
148 }
149 }
150
151 pub(crate) fn add_edge(&mut self, txn: &mut Transaction<'_>, from: &FlowNodeId, to: &FlowNodeId) -> Result<()> {
153 let edge_id = self.next_edge_id(txn)?;
154 let flow_id = self.builder.id();
155
156 if !self.ephemeral {
157 let edge_def = FlowEdge {
159 id: edge_id,
160 flow: flow_id,
161 source: *from,
162 target: *to,
163 };
164
165 self.catalog.create_flow_edge(txn.admin_mut(), &edge_def)?;
167 }
168
169 self.builder.add_edge(node::FlowEdge::new(edge_id, *from, *to))?;
171 Ok(())
172 }
173
174 pub(crate) fn add_node(&mut self, txn: &mut Transaction<'_>, node_type: FlowNodeType) -> Result<FlowNodeId> {
176 let node_id = self.next_node_id(txn)?;
177 let flow_id = self.builder.id();
178
179 if !self.ephemeral {
180 let data = to_stdvec(&node_type)
182 .map_err(|e| Error(Box::new(internal!("Failed to serialize FlowNodeType: {}", e))))?;
183
184 let node_def = FlowNode {
186 id: node_id,
187 flow: flow_id,
188 node_type: node_type.discriminator(),
189 data: Blob::from(data),
190 };
191
192 self.catalog.create_flow_node(txn.admin_mut(), &node_def)?;
194 }
195
196 self.builder.add_node(node::FlowNode::new(node_id, node_type));
198 Ok(node_id)
199 }
200
201 pub(crate) fn compile(
203 mut self,
204 txn: &mut Transaction<'_>,
205 plan: QueryPlan,
206 sink: Option<&View>,
207 ) -> Result<FlowDag> {
208 self.sink = sink.cloned();
210 let root_node_id = self.compile_plan(txn, plan)?;
211
212 if let Some(sink_view) = sink {
213 let node_type = match sink_view {
214 View::Table(t) => FlowNodeType::SinkTableView {
215 view: sink_view.id(),
216 table: t.underlying,
217 },
218 View::RingBuffer(rb) => FlowNodeType::SinkRingBufferView {
219 view: sink_view.id(),
220 ringbuffer: rb.underlying,
221 capacity: rb.capacity,
222 propagate_evictions: rb.propagate_evictions,
223 },
224 View::Series(s) => FlowNodeType::SinkSeriesView {
225 view: sink_view.id(),
226 series: s.underlying,
227 key: s.key.clone(),
228 },
229 };
230 let result_node = self.add_node(txn, node_type)?;
231 self.add_edge(txn, &root_node_id, &result_node)?;
232 }
233
234 let flow = self.builder.build();
235
236 if !has_real_source(&flow) {
237 return Err(Error(Box::new(flow_source_required())));
238 }
239
240 Ok(flow)
241 }
242
243 pub(crate) fn compile_with_subscription_id(
244 mut self,
245 txn: &mut Transaction<'_>,
246 plan: QueryPlan,
247 subscription_id: SubscriptionId,
248 ) -> Result<FlowDag> {
249 let root_node_id = self.compile_plan(txn, plan)?;
250
251 let result_node = self.add_node(
253 txn,
254 FlowNodeType::SinkSubscription {
255 subscription: subscription_id,
256 },
257 )?;
258
259 self.add_edge(txn, &root_node_id, &result_node)?;
260
261 let flow = self.builder.build();
262
263 if !has_real_source(&flow) {
264 return Err(Error(Box::new(flow_source_required())));
265 }
266
267 Ok(flow)
268 }
269
270 pub(crate) fn compile_plan(&mut self, txn: &mut Transaction<'_>, plan: QueryPlan) -> Result<FlowNodeId> {
272 match plan {
273 QueryPlan::IndexScan(_index_scan) => {
274 unimplemented!("IndexScan compilation not yet implemented for flow")
276 }
277 QueryPlan::TableScan(table_scan) => TableScanCompiler::from(table_scan).compile(self, txn),
278 QueryPlan::ViewScan(view_scan) => ViewScanCompiler::from(view_scan).compile(self, txn),
279 QueryPlan::InlineData(inline_data) => InlineDataCompiler::from(inline_data).compile(self, txn),
280 QueryPlan::Filter(filter) => FilterCompiler::from(filter).compile(self, txn),
281 QueryPlan::Gate(gate) => GateCompiler::from(gate).compile(self, txn),
282 QueryPlan::Map(map) => MapCompiler::from(map).compile(self, txn),
283 QueryPlan::Extend(extend) => ExtendCompiler::from(extend).compile(self, txn),
284 QueryPlan::Apply(apply) => ApplyCompiler::from(apply).compile(self, txn),
285 QueryPlan::Aggregate(aggregate) => AggregateCompiler::from(aggregate).compile(self, txn),
286 QueryPlan::Distinct(distinct) => DistinctCompiler::from(distinct).compile(self, txn),
287 QueryPlan::Take(take) => TakeCompiler::from(take).compile(self, txn),
288 QueryPlan::Sort(sort) => SortCompiler::from(sort).compile(self, txn),
289 QueryPlan::JoinInner(join) => JoinCompiler::from(join).compile(self, txn),
290 QueryPlan::JoinLeft(join) => JoinCompiler::from(join).compile(self, txn),
291 QueryPlan::JoinNatural(_) => {
292 unimplemented!()
293 }
294 QueryPlan::Append(append) => AppendCompiler::from(append).compile(self, txn),
295 QueryPlan::Patch(_) => {
296 unimplemented!("Patch compilation not yet implemented for flow")
297 }
298 QueryPlan::TableVirtualScan(_scan) => {
299 unimplemented!("VirtualScan compilation not yet implemented")
301 }
302 QueryPlan::RingBufferScan(scan) => RingBufferScanCompiler::from(scan).compile(self, txn),
303 QueryPlan::Generator(_generator) => {
304 unimplemented!("Generator compilation not yet implemented for flow")
306 }
307 QueryPlan::Window(window) => WindowCompiler::from(window).compile(self, txn),
308 QueryPlan::Variable(_) => {
309 panic!("Variable references are not supported in flow graphs");
310 }
311 QueryPlan::Scalarize(_) => {
312 panic!("Scalarize operations are not supported in flow graphs");
313 }
314 QueryPlan::Environment(_) => {
315 panic!("Environment operations are not supported in flow graphs");
316 }
317 QueryPlan::RowPointLookup(_) => {
318 unimplemented!("RowPointLookup compilation not yet implemented for flow")
320 }
321 QueryPlan::RowListLookup(_) => {
322 unimplemented!("RowListLookup compilation not yet implemented for flow")
324 }
325 QueryPlan::RowRangeScan(_) => {
326 unimplemented!("RowRangeScan compilation not yet implemented for flow")
328 }
329 QueryPlan::DictionaryScan(_) => {
330 unimplemented!("DictionaryScan compilation not yet implemented for flow")
332 }
333 QueryPlan::Assert(_) => {
334 unimplemented!("Assert compilation not yet implemented for flow")
335 }
336 QueryPlan::SeriesScan(series_scan) => SeriesScanCompiler::from(series_scan).compile(self, txn),
337 QueryPlan::RemoteScan(_) => Err(Error(Box::new(flow_remote_source_unsupported()))),
338 QueryPlan::RunTests(_) => {
339 panic!("RunTests is not supported in flow graphs");
340 }
341 QueryPlan::CallFunction(_) => {
342 panic!("CallFunction is not supported in flow graphs");
343 }
344 }
345 }
346}
347
348fn has_real_source(flow: &FlowDag) -> bool {
351 flow.get_node_ids().any(|node_id| {
352 if let Some(node) = flow.get_node(&node_id) {
353 matches!(
354 node.ty,
355 FlowNodeType::SourceTable { .. }
356 | FlowNodeType::SourceView { .. } | FlowNodeType::SourceFlow { .. }
357 | FlowNodeType::SourceRingBuffer { .. }
358 | FlowNodeType::SourceSeries { .. }
359 )
360 } else {
361 false
362 }
363 })
364}
365
/// Compiles one operator or source into the flow under construction.
// NOTE(review): appears to be the trait behind the `.compile(self, txn)` calls in
// `FlowCompiler::compile_plan` - confirm against the operator/primitive modules.
pub(crate) trait CompileOperator {
	/// Consumes the operator compiler and emits its node(s) through `compiler`, using `txn`
	/// for any catalog access.
	///
	/// Returns a node id; `FlowCompiler` treats the id returned for the top-level plan as the
	/// node to connect to the sink.
	fn compile(self, compiler: &mut FlowCompiler, txn: &mut Transaction<'_>) -> Result<FlowNodeId>;
}