1use reifydb_catalog::catalog::Catalog;
5use reifydb_core::{
6 error::diagnostic::flow::{
7 flow_ephemeral_id_capacity_exceeded, flow_remote_source_unsupported, flow_source_required,
8 },
9 interface::catalog::{
10 flow::{FlowEdge, FlowEdgeId, FlowId, FlowNode, FlowNodeId},
11 id::SubscriptionId,
12 view::View,
13 },
14 internal,
15};
16use reifydb_rql::{
17 flow::{
18 flow::{FlowBuilder, FlowDag},
19 node::{self, FlowNodeType},
20 },
21 query::QueryPlan,
22};
23use reifydb_type::{Result, error::Error, value::blob::Blob};
24
25pub mod operator;
26pub mod primitive;
27
28use postcard::to_stdvec;
29use reifydb_transaction::transaction::{Transaction, admin::AdminTransaction};
30
31use crate::flow::compiler::{
32 operator::{
33 aggregate::AggregateCompiler, append::AppendCompiler, apply::ApplyCompiler, distinct::DistinctCompiler,
34 extend::ExtendCompiler, filter::FilterCompiler, gate::GateCompiler, join::JoinCompiler,
35 map::MapCompiler, sort::SortCompiler, take::TakeCompiler, window::WindowCompiler,
36 },
37 primitive::{
38 inline_data::InlineDataCompiler, ringbuffer_scan::RingBufferScanCompiler,
39 series_scan::SeriesScanCompiler, table_scan::TableScanCompiler, view_scan::ViewScanCompiler,
40 },
41};
42
43pub fn compile_flow(
44 catalog: &Catalog,
45 txn: &mut AdminTransaction,
46 plan: QueryPlan,
47 sink: Option<&View>,
48 flow_id: FlowId,
49) -> Result<FlowDag> {
50 let compiler = FlowCompiler::new(catalog.clone(), flow_id);
51 compiler.compile(&mut Transaction::Admin(txn), plan, sink)
52}
53
54pub fn compile_subscription_flow_ephemeral(
55 catalog: &Catalog,
56 txn: &mut Transaction<'_>,
57 plan: QueryPlan,
58 subscription_id: SubscriptionId,
59 flow_id: FlowId,
60) -> Result<FlowDag> {
61 let compiler = FlowCompiler::new_ephemeral(catalog.clone(), flow_id);
62 compiler.compile_with_subscription_id(txn, plan, subscription_id)
63}
64
65pub(crate) struct FlowCompiler {
66 pub(crate) catalog: Catalog,
67
68 builder: FlowBuilder,
69
70 pub(crate) sink: Option<View>,
71
72 ephemeral: bool,
73
74 local_node_counter: u64,
75
76 local_edge_counter: u64,
77
78 local_id_limit: u64,
79}
80
81impl FlowCompiler {
82 pub fn new(catalog: Catalog, flow_id: FlowId) -> Self {
83 Self {
84 catalog,
85 builder: FlowDag::builder(flow_id),
86 sink: None,
87 ephemeral: false,
88 local_node_counter: 0,
89 local_edge_counter: 0,
90 local_id_limit: 0,
91 }
92 }
93
94 pub fn new_ephemeral(catalog: Catalog, flow_id: FlowId) -> Self {
95 let base = flow_id.0 * 100;
96 Self {
97 catalog,
98 builder: FlowDag::builder(flow_id),
99 sink: None,
100 ephemeral: true,
101 local_node_counter: base,
102 local_edge_counter: base,
103 local_id_limit: base + 99,
104 }
105 }
106
107 fn next_node_id(&mut self, txn: &mut Transaction<'_>) -> Result<FlowNodeId> {
108 if self.ephemeral {
109 if self.local_node_counter >= self.local_id_limit {
110 return Err(Error(Box::new(flow_ephemeral_id_capacity_exceeded(self.builder.id().0))));
111 }
112 self.local_node_counter += 1;
113 Ok(FlowNodeId(self.local_node_counter))
114 } else {
115 self.catalog.next_flow_node_id(txn.admin_mut())
116 }
117 }
118
119 fn next_edge_id(&mut self, txn: &mut Transaction<'_>) -> Result<FlowEdgeId> {
120 if self.ephemeral {
121 if self.local_edge_counter >= self.local_id_limit {
122 return Err(Error(Box::new(flow_ephemeral_id_capacity_exceeded(self.builder.id().0))));
123 }
124 self.local_edge_counter += 1;
125 Ok(FlowEdgeId(self.local_edge_counter))
126 } else {
127 self.catalog.next_flow_edge_id(txn.admin_mut())
128 }
129 }
130
131 pub(crate) fn add_edge(&mut self, txn: &mut Transaction<'_>, from: &FlowNodeId, to: &FlowNodeId) -> Result<()> {
132 let edge_id = self.next_edge_id(txn)?;
133 let flow_id = self.builder.id();
134
135 if !self.ephemeral {
136 let edge_def = FlowEdge {
137 id: edge_id,
138 flow: flow_id,
139 source: *from,
140 target: *to,
141 };
142
143 self.catalog.create_flow_edge(txn.admin_mut(), &edge_def)?;
144 }
145
146 self.builder.add_edge(node::FlowEdge::new(edge_id, *from, *to))?;
147 Ok(())
148 }
149
150 pub(crate) fn add_node(&mut self, txn: &mut Transaction<'_>, node_type: FlowNodeType) -> Result<FlowNodeId> {
151 let node_id = self.next_node_id(txn)?;
152 let flow_id = self.builder.id();
153
154 if !self.ephemeral {
155 let data = to_stdvec(&node_type)
156 .map_err(|e| Error(Box::new(internal!("Failed to serialize FlowNodeType: {}", e))))?;
157
158 let node_def = FlowNode {
159 id: node_id,
160 flow: flow_id,
161 node_type: node_type.discriminator(),
162 data: Blob::from(data),
163 };
164
165 self.catalog.create_flow_node(txn.admin_mut(), &node_def)?;
166 }
167
168 self.builder.add_node(node::FlowNode::new(node_id, node_type));
169 Ok(node_id)
170 }
171
172 pub(crate) fn compile(
173 mut self,
174 txn: &mut Transaction<'_>,
175 plan: QueryPlan,
176 sink: Option<&View>,
177 ) -> Result<FlowDag> {
178 self.sink = sink.cloned();
179 let root_node_id = self.compile_plan(txn, plan)?;
180
181 if let Some(sink_view) = sink {
182 let node_type = match sink_view {
183 View::Table(t) => FlowNodeType::SinkTableView {
184 view: sink_view.id(),
185 table: t.underlying,
186 },
187 View::RingBuffer(rb) => FlowNodeType::SinkRingBufferView {
188 view: sink_view.id(),
189 ringbuffer: rb.underlying,
190 capacity: rb.capacity,
191 propagate_evictions: rb.propagate_evictions,
192 },
193 View::Series(s) => FlowNodeType::SinkSeriesView {
194 view: sink_view.id(),
195 series: s.underlying,
196 key: s.key.clone(),
197 },
198 };
199 let result_node = self.add_node(txn, node_type)?;
200 self.add_edge(txn, &root_node_id, &result_node)?;
201 }
202
203 let flow = self.builder.build();
204
205 if !has_real_source(&flow) {
206 return Err(Error(Box::new(flow_source_required())));
207 }
208
209 Ok(flow)
210 }
211
212 pub(crate) fn compile_with_subscription_id(
213 mut self,
214 txn: &mut Transaction<'_>,
215 plan: QueryPlan,
216 subscription_id: SubscriptionId,
217 ) -> Result<FlowDag> {
218 let root_node_id = self.compile_plan(txn, plan)?;
219
220 let result_node = self.add_node(
221 txn,
222 FlowNodeType::SinkSubscription {
223 subscription: subscription_id,
224 },
225 )?;
226
227 self.add_edge(txn, &root_node_id, &result_node)?;
228
229 let flow = self.builder.build();
230
231 if !has_real_source(&flow) {
232 return Err(Error(Box::new(flow_source_required())));
233 }
234
235 Ok(flow)
236 }
237
238 pub(crate) fn compile_plan(&mut self, txn: &mut Transaction<'_>, plan: QueryPlan) -> Result<FlowNodeId> {
239 match plan {
240 QueryPlan::IndexScan(_index_scan) => {
241 unimplemented!("IndexScan compilation not yet implemented for flow")
243 }
244 QueryPlan::TableScan(table_scan) => TableScanCompiler::from(table_scan).compile(self, txn),
245 QueryPlan::ViewScan(view_scan) => ViewScanCompiler::from(view_scan).compile(self, txn),
246 QueryPlan::InlineData(inline_data) => InlineDataCompiler::from(inline_data).compile(self, txn),
247 QueryPlan::Filter(filter) => FilterCompiler::from(filter).compile(self, txn),
248 QueryPlan::Gate(gate) => GateCompiler::from(gate).compile(self, txn),
249 QueryPlan::Map(map) => MapCompiler::from(map).compile(self, txn),
250 QueryPlan::Extend(extend) => ExtendCompiler::from(extend).compile(self, txn),
251 QueryPlan::Apply(apply) => ApplyCompiler::from(apply).compile(self, txn),
252 QueryPlan::Aggregate(aggregate) => AggregateCompiler::from(aggregate).compile(self, txn),
253 QueryPlan::Distinct(distinct) => DistinctCompiler::from(distinct).compile(self, txn),
254 QueryPlan::Take(take) => TakeCompiler::from(take).compile(self, txn),
255 QueryPlan::Sort(sort) => SortCompiler::from(sort).compile(self, txn),
256 QueryPlan::JoinInner(join) => JoinCompiler::from(join).compile(self, txn),
257 QueryPlan::JoinLeft(join) => JoinCompiler::from(join).compile(self, txn),
258 QueryPlan::JoinNatural(_) => {
259 unimplemented!()
260 }
261 QueryPlan::Append(append) => AppendCompiler::from(append).compile(self, txn),
262 QueryPlan::Patch(_) => {
263 unimplemented!("Patch compilation not yet implemented for flow")
264 }
265 QueryPlan::TableVirtualScan(_scan) => {
266 unimplemented!("VirtualScan compilation not yet implemented")
268 }
269 QueryPlan::RingBufferScan(scan) => RingBufferScanCompiler::from(scan).compile(self, txn),
270 QueryPlan::Generator(_generator) => {
271 unimplemented!("Generator compilation not yet implemented for flow")
273 }
274 QueryPlan::Window(window) => WindowCompiler::from(window).compile(self, txn),
275 QueryPlan::Variable(_) => {
276 panic!("Variable references are not supported in flow graphs");
277 }
278 QueryPlan::Scalarize(_) => {
279 panic!("Scalarize operations are not supported in flow graphs");
280 }
281 QueryPlan::Environment(_) => {
282 panic!("Environment operations are not supported in flow graphs");
283 }
284 QueryPlan::RowPointLookup(_) => {
285 unimplemented!("RowPointLookup compilation not yet implemented for flow")
287 }
288 QueryPlan::RowListLookup(_) => {
289 unimplemented!("RowListLookup compilation not yet implemented for flow")
291 }
292 QueryPlan::RowRangeScan(_) => {
293 unimplemented!("RowRangeScan compilation not yet implemented for flow")
295 }
296 QueryPlan::DictionaryScan(_) => {
297 unimplemented!("DictionaryScan compilation not yet implemented for flow")
299 }
300 QueryPlan::Assert(_) => {
301 unimplemented!("Assert compilation not yet implemented for flow")
302 }
303 QueryPlan::SeriesScan(series_scan) => SeriesScanCompiler::from(series_scan).compile(self, txn),
304 QueryPlan::RemoteScan(_) => Err(Error(Box::new(flow_remote_source_unsupported()))),
305 QueryPlan::RunTests(_) => {
306 panic!("RunTests is not supported in flow graphs");
307 }
308 QueryPlan::CallFunction(_) => {
309 panic!("CallFunction is not supported in flow graphs");
310 }
311 }
312 }
313}
314
315fn has_real_source(flow: &FlowDag) -> bool {
316 flow.get_node_ids().any(|node_id| {
317 if let Some(node) = flow.get_node(&node_id) {
318 matches!(
319 node.ty,
320 FlowNodeType::SourceTable { .. }
321 | FlowNodeType::SourceView { .. } | FlowNodeType::SourceFlow { .. }
322 | FlowNodeType::SourceRingBuffer { .. }
323 | FlowNodeType::SourceSeries { .. }
324 )
325 } else {
326 false
327 }
328 })
329}
330
331pub(crate) trait CompileOperator {
332 fn compile(self, compiler: &mut FlowCompiler, txn: &mut Transaction<'_>) -> Result<FlowNodeId>;
333}