1use reifydb_catalog::catalog::Catalog;
5use reifydb_core::{
6 error::diagnostic::flow::{
7 flow_ephemeral_id_capacity_exceeded, flow_remote_source_unsupported, flow_source_required,
8 },
9 interface::catalog::{
10 flow::{FlowEdge, FlowEdgeId, FlowId, FlowNode, FlowNodeId},
11 id::SubscriptionId,
12 view::View,
13 },
14 internal,
15};
16use reifydb_rql::{
17 flow::{
18 flow::{FlowBuilder, FlowDag},
19 node::{self, FlowNodeType},
20 },
21 query::QueryPlan,
22};
23use reifydb_type::{Result, error::Error, value::blob::Blob};
24
25pub mod operator;
26pub mod primitive;
27
28use postcard::to_stdvec;
29use reifydb_transaction::transaction::{Transaction, admin::AdminTransaction};
30
31use crate::flow::compiler::{
32 operator::{
33 aggregate::AggregateCompiler, append::AppendCompiler, apply::ApplyCompiler, distinct::DistinctCompiler,
34 extend::ExtendCompiler, filter::FilterCompiler, gate::GateCompiler, join::JoinCompiler,
35 map::MapCompiler, sort::SortCompiler, take::TakeCompiler, window::WindowCompiler,
36 },
37 primitive::{
38 inline_data::InlineDataCompiler, ringbuffer_scan::RingBufferScanCompiler,
39 series_scan::SeriesScanCompiler, table_scan::TableScanCompiler, view_scan::ViewScanCompiler,
40 },
41};
42
43pub fn compile_flow(
45 catalog: &Catalog,
46 txn: &mut AdminTransaction,
47 plan: QueryPlan,
48 sink: Option<&View>,
49 flow_id: FlowId,
50) -> Result<FlowDag> {
51 let compiler = FlowCompiler::new(catalog.clone(), flow_id);
52 compiler.compile(&mut Transaction::Admin(txn), plan, sink)
53}
54
55pub fn compile_subscription_flow_ephemeral(
60 catalog: &Catalog,
61 txn: &mut Transaction<'_>,
62 plan: QueryPlan,
63 subscription_id: SubscriptionId,
64 flow_id: FlowId,
65) -> Result<FlowDag> {
66 let compiler = FlowCompiler::new_ephemeral(catalog.clone(), flow_id);
67 compiler.compile_with_subscription_id(txn, plan, subscription_id)
68}
69
70pub(crate) struct FlowCompiler {
72 pub(crate) catalog: Catalog,
74 builder: FlowBuilder,
76 pub(crate) sink: Option<View>,
78 ephemeral: bool,
80 local_node_counter: u64,
82 local_edge_counter: u64,
84 local_id_limit: u64,
86}
87
88impl FlowCompiler {
89 pub fn new(catalog: Catalog, flow_id: FlowId) -> Self {
91 Self {
92 catalog,
93 builder: FlowDag::builder(flow_id),
94 sink: None,
95 ephemeral: false,
96 local_node_counter: 0,
97 local_edge_counter: 0,
98 local_id_limit: 0,
99 }
100 }
101
102 pub fn new_ephemeral(catalog: Catalog, flow_id: FlowId) -> Self {
108 let base = flow_id.0 * 100;
109 Self {
110 catalog,
111 builder: FlowDag::builder(flow_id),
112 sink: None,
113 ephemeral: true,
114 local_node_counter: base,
115 local_edge_counter: base,
116 local_id_limit: base + 99,
117 }
118 }
119
120 fn next_node_id(&mut self, txn: &mut Transaction<'_>) -> Result<FlowNodeId> {
122 if self.ephemeral {
123 if self.local_node_counter >= self.local_id_limit {
124 return Err(Error(Box::new(flow_ephemeral_id_capacity_exceeded(self.builder.id().0))));
125 }
126 self.local_node_counter += 1;
127 Ok(FlowNodeId(self.local_node_counter))
128 } else {
129 self.catalog.next_flow_node_id(txn.admin_mut())
130 }
131 }
132
133 fn next_edge_id(&mut self, txn: &mut Transaction<'_>) -> Result<FlowEdgeId> {
135 if self.ephemeral {
136 if self.local_edge_counter >= self.local_id_limit {
137 return Err(Error(Box::new(flow_ephemeral_id_capacity_exceeded(self.builder.id().0))));
138 }
139 self.local_edge_counter += 1;
140 Ok(FlowEdgeId(self.local_edge_counter))
141 } else {
142 self.catalog.next_flow_edge_id(txn.admin_mut())
143 }
144 }
145
146 pub(crate) fn add_edge(&mut self, txn: &mut Transaction<'_>, from: &FlowNodeId, to: &FlowNodeId) -> Result<()> {
148 let edge_id = self.next_edge_id(txn)?;
149 let flow_id = self.builder.id();
150
151 if !self.ephemeral {
152 let edge_def = FlowEdge {
154 id: edge_id,
155 flow: flow_id,
156 source: *from,
157 target: *to,
158 };
159
160 self.catalog.create_flow_edge(txn.admin_mut(), &edge_def)?;
162 }
163
164 self.builder.add_edge(node::FlowEdge::new(edge_id, *from, *to))?;
166 Ok(())
167 }
168
169 pub(crate) fn add_node(&mut self, txn: &mut Transaction<'_>, node_type: FlowNodeType) -> Result<FlowNodeId> {
171 let node_id = self.next_node_id(txn)?;
172 let flow_id = self.builder.id();
173
174 if !self.ephemeral {
175 let data = to_stdvec(&node_type)
177 .map_err(|e| Error(Box::new(internal!("Failed to serialize FlowNodeType: {}", e))))?;
178
179 let node_def = FlowNode {
181 id: node_id,
182 flow: flow_id,
183 node_type: node_type.discriminator(),
184 data: Blob::from(data),
185 };
186
187 self.catalog.create_flow_node(txn.admin_mut(), &node_def)?;
189 }
190
191 self.builder.add_node(node::FlowNode::new(node_id, node_type));
193 Ok(node_id)
194 }
195
196 pub(crate) fn compile(
198 mut self,
199 txn: &mut Transaction<'_>,
200 plan: QueryPlan,
201 sink: Option<&View>,
202 ) -> Result<FlowDag> {
203 self.sink = sink.cloned();
205 let root_node_id = self.compile_plan(txn, plan)?;
206
207 if let Some(sink_view) = sink {
208 let node_type = match sink_view {
209 View::Table(t) => FlowNodeType::SinkTableView {
210 view: sink_view.id(),
211 table: t.underlying,
212 },
213 View::RingBuffer(rb) => FlowNodeType::SinkRingBufferView {
214 view: sink_view.id(),
215 ringbuffer: rb.underlying,
216 capacity: rb.capacity,
217 propagate_evictions: rb.propagate_evictions,
218 },
219 View::Series(s) => FlowNodeType::SinkSeriesView {
220 view: sink_view.id(),
221 series: s.underlying,
222 key: s.key.clone(),
223 },
224 };
225 let result_node = self.add_node(txn, node_type)?;
226 self.add_edge(txn, &root_node_id, &result_node)?;
227 }
228
229 let flow = self.builder.build();
230
231 if !has_real_source(&flow) {
232 return Err(Error(Box::new(flow_source_required())));
233 }
234
235 Ok(flow)
236 }
237
238 pub(crate) fn compile_with_subscription_id(
239 mut self,
240 txn: &mut Transaction<'_>,
241 plan: QueryPlan,
242 subscription_id: SubscriptionId,
243 ) -> Result<FlowDag> {
244 let root_node_id = self.compile_plan(txn, plan)?;
245
246 let result_node = self.add_node(
248 txn,
249 FlowNodeType::SinkSubscription {
250 subscription: subscription_id,
251 },
252 )?;
253
254 self.add_edge(txn, &root_node_id, &result_node)?;
255
256 let flow = self.builder.build();
257
258 if !has_real_source(&flow) {
259 return Err(Error(Box::new(flow_source_required())));
260 }
261
262 Ok(flow)
263 }
264
265 pub(crate) fn compile_plan(&mut self, txn: &mut Transaction<'_>, plan: QueryPlan) -> Result<FlowNodeId> {
267 match plan {
268 QueryPlan::IndexScan(_index_scan) => {
269 unimplemented!("IndexScan compilation not yet implemented for flow")
271 }
272 QueryPlan::TableScan(table_scan) => TableScanCompiler::from(table_scan).compile(self, txn),
273 QueryPlan::ViewScan(view_scan) => ViewScanCompiler::from(view_scan).compile(self, txn),
274 QueryPlan::InlineData(inline_data) => InlineDataCompiler::from(inline_data).compile(self, txn),
275 QueryPlan::Filter(filter) => FilterCompiler::from(filter).compile(self, txn),
276 QueryPlan::Gate(gate) => GateCompiler::from(gate).compile(self, txn),
277 QueryPlan::Map(map) => MapCompiler::from(map).compile(self, txn),
278 QueryPlan::Extend(extend) => ExtendCompiler::from(extend).compile(self, txn),
279 QueryPlan::Apply(apply) => ApplyCompiler::from(apply).compile(self, txn),
280 QueryPlan::Aggregate(aggregate) => AggregateCompiler::from(aggregate).compile(self, txn),
281 QueryPlan::Distinct(distinct) => DistinctCompiler::from(distinct).compile(self, txn),
282 QueryPlan::Take(take) => TakeCompiler::from(take).compile(self, txn),
283 QueryPlan::Sort(sort) => SortCompiler::from(sort).compile(self, txn),
284 QueryPlan::JoinInner(join) => JoinCompiler::from(join).compile(self, txn),
285 QueryPlan::JoinLeft(join) => JoinCompiler::from(join).compile(self, txn),
286 QueryPlan::JoinNatural(_) => {
287 unimplemented!()
288 }
289 QueryPlan::Append(append) => AppendCompiler::from(append).compile(self, txn),
290 QueryPlan::Patch(_) => {
291 unimplemented!("Patch compilation not yet implemented for flow")
292 }
293 QueryPlan::TableVirtualScan(_scan) => {
294 unimplemented!("VirtualScan compilation not yet implemented")
296 }
297 QueryPlan::RingBufferScan(scan) => RingBufferScanCompiler::from(scan).compile(self, txn),
298 QueryPlan::Generator(_generator) => {
299 unimplemented!("Generator compilation not yet implemented for flow")
301 }
302 QueryPlan::Window(window) => WindowCompiler::from(window).compile(self, txn),
303 QueryPlan::Variable(_) => {
304 panic!("Variable references are not supported in flow graphs");
305 }
306 QueryPlan::Scalarize(_) => {
307 panic!("Scalarize operations are not supported in flow graphs");
308 }
309 QueryPlan::Environment(_) => {
310 panic!("Environment operations are not supported in flow graphs");
311 }
312 QueryPlan::RowPointLookup(_) => {
313 unimplemented!("RowPointLookup compilation not yet implemented for flow")
315 }
316 QueryPlan::RowListLookup(_) => {
317 unimplemented!("RowListLookup compilation not yet implemented for flow")
319 }
320 QueryPlan::RowRangeScan(_) => {
321 unimplemented!("RowRangeScan compilation not yet implemented for flow")
323 }
324 QueryPlan::DictionaryScan(_) => {
325 unimplemented!("DictionaryScan compilation not yet implemented for flow")
327 }
328 QueryPlan::Assert(_) => {
329 unimplemented!("Assert compilation not yet implemented for flow")
330 }
331 QueryPlan::SeriesScan(series_scan) => SeriesScanCompiler::from(series_scan).compile(self, txn),
332 QueryPlan::RemoteScan(_) => Err(Error(Box::new(flow_remote_source_unsupported()))),
333 QueryPlan::RunTests(_) => {
334 panic!("RunTests is not supported in flow graphs");
335 }
336 QueryPlan::CallFunction(_) => {
337 panic!("CallFunction is not supported in flow graphs");
338 }
339 }
340 }
341}
342
343fn has_real_source(flow: &FlowDag) -> bool {
346 flow.get_node_ids().any(|node_id| {
347 if let Some(node) = flow.get_node(&node_id) {
348 matches!(
349 node.ty,
350 FlowNodeType::SourceTable { .. }
351 | FlowNodeType::SourceView { .. } | FlowNodeType::SourceFlow { .. }
352 | FlowNodeType::SourceRingBuffer { .. }
353 | FlowNodeType::SourceSeries { .. }
354 )
355 } else {
356 false
357 }
358 })
359}
360
361pub(crate) trait CompileOperator {
363 fn compile(self, compiler: &mut FlowCompiler, txn: &mut Transaction<'_>) -> Result<FlowNodeId>;
365}