1use reifydb_catalog::{catalog::Catalog, store::operator_settings::create::create_operator_settings};
5use reifydb_core::{
6 error::diagnostic::flow::{
7 flow_ephemeral_id_capacity_exceeded, flow_remote_source_unsupported, flow_source_required,
8 },
9 interface::catalog::{
10 flow::{FlowEdge, FlowEdgeId, FlowId, FlowNode, FlowNodeId},
11 id::SubscriptionId,
12 view::View,
13 },
14 internal,
15 row::{JoinTtl, OperatorSettings, Ttl},
16};
17use reifydb_rql::{
18 flow::{
19 flow::{FlowBuilder, FlowDag},
20 node::{self, FlowNodeType},
21 },
22 query::QueryPlan,
23};
24use reifydb_value::{Result, error::Error, value::blob::Blob};
25
26pub mod operator;
27pub mod primitive;
28
29use postcard::to_stdvec;
30use reifydb_transaction::transaction::{Transaction, admin::AdminTransaction};
31
32use crate::flow::compiler::{
33 operator::{
34 aggregate::AggregateCompiler, append::AppendCompiler, apply::ApplyCompiler, distinct::DistinctCompiler,
35 extend::ExtendCompiler, filter::FilterCompiler, gate::GateCompiler, join::JoinCompiler,
36 map::MapCompiler, sort::SortCompiler, take::TakeCompiler, window::WindowCompiler,
37 },
38 primitive::{
39 inline_data::InlineDataCompiler, ringbuffer_scan::RingBufferScanCompiler,
40 series_scan::SeriesScanCompiler, table_scan::TableScanCompiler, view_scan::ViewScanCompiler,
41 },
42};
43
44pub fn compile_flow(
45 catalog: &Catalog,
46 txn: &mut AdminTransaction,
47 plan: QueryPlan,
48 sink: Option<&View>,
49 flow_id: FlowId,
50) -> Result<FlowDag> {
51 let compiler = FlowCompiler::new(catalog.clone(), flow_id);
52 compiler.compile(&mut Transaction::Admin(txn), plan, sink)
53}
54
55pub fn compile_subscription_flow_ephemeral(
56 catalog: &Catalog,
57 txn: &mut Transaction<'_>,
58 plan: QueryPlan,
59 subscription_id: SubscriptionId,
60 flow_id: FlowId,
61) -> Result<FlowDag> {
62 let compiler = FlowCompiler::new_ephemeral(catalog.clone(), flow_id);
63 compiler.compile_with_subscription_id(txn, plan, subscription_id)
64}
65
/// Lowers a [`QueryPlan`] into a [`FlowDag`], one node at a time.
///
/// In persistent mode (`FlowCompiler::new`) node and edge ids come from the catalog and
/// every node, edge and operator setting is recorded there. In ephemeral mode
/// (`FlowCompiler::new_ephemeral`) ids are allocated from local counters and all catalog
/// writes are skipped.
pub(crate) struct FlowCompiler {
	/// Catalog handle: allocates ids and persists nodes/edges for persistent flows.
	pub(crate) catalog: Catalog,

	/// Accumulates the nodes and edges of the flow being compiled; also supplies its flow id.
	builder: FlowBuilder,

	/// View the compiled flow feeds, if any; assigned at the start of `compile`.
	pub(crate) sink: Option<View>,

	/// `true` for ephemeral flows: ids come from the local counters and catalog writes are skipped.
	ephemeral: bool,

	/// Most recently allocated ephemeral node id (starts at the block base, which is never handed out).
	local_node_counter: u64,

	/// Most recently allocated ephemeral edge id (same scheme as `local_node_counter`).
	local_edge_counter: u64,

	/// Inclusive upper bound shared by both ephemeral counters; left at 0 for persistent flows.
	local_id_limit: u64,
}
81
82impl FlowCompiler {
83 pub fn new(catalog: Catalog, flow_id: FlowId) -> Self {
84 Self {
85 catalog,
86 builder: FlowDag::builder(flow_id),
87 sink: None,
88 ephemeral: false,
89 local_node_counter: 0,
90 local_edge_counter: 0,
91 local_id_limit: 0,
92 }
93 }
94
95 pub fn new_ephemeral(catalog: Catalog, flow_id: FlowId) -> Self {
96 let base = flow_id.0 * 100;
97 Self {
98 catalog,
99 builder: FlowDag::builder(flow_id),
100 sink: None,
101 ephemeral: true,
102 local_node_counter: base,
103 local_edge_counter: base,
104 local_id_limit: base + 99,
105 }
106 }
107
108 fn next_node_id(&mut self, txn: &mut Transaction<'_>) -> Result<FlowNodeId> {
109 if self.ephemeral {
110 if self.local_node_counter >= self.local_id_limit {
111 return Err(Error(Box::new(flow_ephemeral_id_capacity_exceeded(self.builder.id().0))));
112 }
113 self.local_node_counter += 1;
114 Ok(FlowNodeId(self.local_node_counter))
115 } else {
116 self.catalog.next_flow_node_id(txn.admin_mut())
117 }
118 }
119
120 fn next_edge_id(&mut self, txn: &mut Transaction<'_>) -> Result<FlowEdgeId> {
121 if self.ephemeral {
122 if self.local_edge_counter >= self.local_id_limit {
123 return Err(Error(Box::new(flow_ephemeral_id_capacity_exceeded(self.builder.id().0))));
124 }
125 self.local_edge_counter += 1;
126 Ok(FlowEdgeId(self.local_edge_counter))
127 } else {
128 self.catalog.next_flow_edge_id(txn.admin_mut())
129 }
130 }
131
132 pub(crate) fn add_edge(&mut self, txn: &mut Transaction<'_>, from: &FlowNodeId, to: &FlowNodeId) -> Result<()> {
133 let edge_id = self.next_edge_id(txn)?;
134 let flow_id = self.builder.id();
135
136 if !self.ephemeral {
137 let edge_def = FlowEdge {
138 id: edge_id,
139 flow: flow_id,
140 source: *from,
141 target: *to,
142 };
143
144 self.catalog.create_flow_edge(txn.admin_mut(), &edge_def)?;
145 }
146
147 self.builder.add_edge(node::FlowEdge::new(edge_id, *from, *to))?;
148 Ok(())
149 }
150
151 pub(crate) fn add_node(&mut self, txn: &mut Transaction<'_>, node_type: FlowNodeType) -> Result<FlowNodeId> {
152 let node_id = self.next_node_id(txn)?;
153 let flow_id = self.builder.id();
154
155 if !self.ephemeral {
156 let data = to_stdvec(&node_type)
157 .map_err(|e| Error(Box::new(internal!("Failed to serialize FlowNodeType: {}", e))))?;
158
159 let node_def = FlowNode {
160 id: node_id,
161 flow: flow_id,
162 node_type: node_type.discriminator(),
163 data: Blob::from(data),
164 };
165
166 self.catalog.create_flow_node(txn.admin_mut(), &node_def)?;
167 }
168
169 self.builder.add_node(node::FlowNode::new(node_id, node_type));
170 Ok(node_id)
171 }
172
173 pub(crate) fn write_operator_settings(
174 &self,
175 txn: &mut Transaction<'_>,
176 node_id: FlowNodeId,
177 ttl: Option<Ttl>,
178 ) -> Result<()> {
179 if self.ephemeral {
180 return Ok(());
181 }
182 if let Some(ttl) = ttl {
183 create_operator_settings(
184 txn.admin_mut(),
185 node_id,
186 &OperatorSettings {
187 ttl: Some(ttl),
188 join: None,
189 },
190 )?;
191 }
192 Ok(())
193 }
194
195 pub(crate) fn write_operator_settings_join(
196 &self,
197 txn: &mut Transaction<'_>,
198 node_id: FlowNodeId,
199 join: Option<JoinTtl>,
200 ) -> Result<()> {
201 if self.ephemeral {
202 return Ok(());
203 }
204 let Some(join) = join else {
205 return Ok(());
206 };
207 if join.left.is_none() && join.right.is_none() {
208 return Ok(());
209 }
210 create_operator_settings(
211 txn.admin_mut(),
212 node_id,
213 &OperatorSettings {
214 ttl: None,
215 join: Some(join),
216 },
217 )?;
218 Ok(())
219 }
220
221 pub(crate) fn compile(
222 mut self,
223 txn: &mut Transaction<'_>,
224 plan: QueryPlan,
225 sink: Option<&View>,
226 ) -> Result<FlowDag> {
227 self.sink = sink.cloned();
228 let root_node_id = self.compile_plan(txn, plan)?;
229
230 if let Some(sink_view) = sink {
231 let node_type = match sink_view {
232 View::Table(t) => FlowNodeType::SinkTableView {
233 view: sink_view.id(),
234 table: t.underlying,
235 },
236 View::RingBuffer(rb) => FlowNodeType::SinkRingBufferView {
237 view: sink_view.id(),
238 ringbuffer: rb.underlying,
239 capacity: rb.capacity,
240 propagate_evictions: rb.propagate_evictions,
241 },
242 View::Series(s) => FlowNodeType::SinkSeriesView {
243 view: sink_view.id(),
244 series: s.underlying,
245 key: s.key.clone(),
246 },
247 };
248 let result_node = self.add_node(txn, node_type)?;
249 self.add_edge(txn, &root_node_id, &result_node)?;
250 }
251
252 let flow = self.builder.build();
253
254 if !has_real_source(&flow) {
255 return Err(Error(Box::new(flow_source_required())));
256 }
257
258 Ok(flow)
259 }
260
261 pub(crate) fn compile_with_subscription_id(
262 mut self,
263 txn: &mut Transaction<'_>,
264 plan: QueryPlan,
265 subscription_id: SubscriptionId,
266 ) -> Result<FlowDag> {
267 let root_node_id = self.compile_plan(txn, plan)?;
268
269 let result_node = self.add_node(
270 txn,
271 FlowNodeType::SinkSubscription {
272 subscription: subscription_id,
273 },
274 )?;
275
276 self.add_edge(txn, &root_node_id, &result_node)?;
277
278 let flow = self.builder.build();
279
280 if !has_real_source(&flow) {
281 return Err(Error(Box::new(flow_source_required())));
282 }
283
284 Ok(flow)
285 }
286
287 pub(crate) fn compile_plan(&mut self, txn: &mut Transaction<'_>, plan: QueryPlan) -> Result<FlowNodeId> {
288 match plan {
289 QueryPlan::IndexScan(_index_scan) => {
290 unimplemented!("IndexScan compilation not yet implemented for flow")
292 }
293 QueryPlan::TableScan(table_scan) => TableScanCompiler::from(table_scan).compile(self, txn),
294 QueryPlan::ViewScan(view_scan) => ViewScanCompiler::from(view_scan).compile(self, txn),
295 QueryPlan::InlineData(inline_data) => InlineDataCompiler::from(inline_data).compile(self, txn),
296 QueryPlan::Filter(filter) => FilterCompiler::from(filter).compile(self, txn),
297 QueryPlan::Gate(gate) => GateCompiler::from(gate).compile(self, txn),
298 QueryPlan::Map(map) => MapCompiler::from(map).compile(self, txn),
299 QueryPlan::Extend(extend) => ExtendCompiler::from(extend).compile(self, txn),
300 QueryPlan::Apply(apply) => ApplyCompiler::from(apply).compile(self, txn),
301 QueryPlan::Aggregate(aggregate) => AggregateCompiler::from(aggregate).compile(self, txn),
302 QueryPlan::Distinct(distinct) => DistinctCompiler::from(distinct).compile(self, txn),
303 QueryPlan::Take(take) => TakeCompiler::from(take).compile(self, txn),
304 QueryPlan::Sort(sort) => SortCompiler::from(sort).compile(self, txn),
305 QueryPlan::JoinInner(join) => JoinCompiler::from(join).compile(self, txn),
306 QueryPlan::JoinLeft(join) => JoinCompiler::from(join).compile(self, txn),
307 QueryPlan::JoinNatural(_) => {
308 unimplemented!()
309 }
310 QueryPlan::Append(append) => AppendCompiler::from(append).compile(self, txn),
311 QueryPlan::Patch(_) => {
312 unimplemented!("Patch compilation not yet implemented for flow")
313 }
314 QueryPlan::TableVirtualScan(_scan) => {
315 unimplemented!("VirtualScan compilation not yet implemented")
317 }
318 QueryPlan::RingBufferScan(scan) => RingBufferScanCompiler::from(scan).compile(self, txn),
319 QueryPlan::Generator(_generator) => {
320 unimplemented!("Generator compilation not yet implemented for flow")
322 }
323 QueryPlan::Window(window) => WindowCompiler::from(window).compile(self, txn),
324 QueryPlan::Variable(_) => {
325 panic!("Variable references are not supported in flow graphs");
326 }
327 QueryPlan::Scalarize(_) => {
328 panic!("Scalarize operations are not supported in flow graphs");
329 }
330 QueryPlan::Environment(_) => {
331 panic!("Environment operations are not supported in flow graphs");
332 }
333 QueryPlan::RowPointLookup(_) => {
334 unimplemented!("RowPointLookup compilation not yet implemented for flow")
336 }
337 QueryPlan::RowListLookup(_) => {
338 unimplemented!("RowListLookup compilation not yet implemented for flow")
340 }
341 QueryPlan::RowRangeScan(_) => {
342 unimplemented!("RowRangeScan compilation not yet implemented for flow")
344 }
345 QueryPlan::DictionaryScan(_) => {
346 unimplemented!("DictionaryScan compilation not yet implemented for flow")
348 }
349 QueryPlan::Assert(_) => {
350 unimplemented!("Assert compilation not yet implemented for flow")
351 }
352 QueryPlan::SeriesScan(series_scan) => SeriesScanCompiler::from(series_scan).compile(self, txn),
353 QueryPlan::RemoteScan(_) => Err(Error(Box::new(flow_remote_source_unsupported()))),
354 QueryPlan::RunTests(_) => {
355 panic!("RunTests is not supported in flow graphs");
356 }
357 QueryPlan::CallFunction(_) => {
358 panic!("CallFunction is not supported in flow graphs");
359 }
360 }
361 }
362}
363
364fn has_real_source(flow: &FlowDag) -> bool {
365 flow.get_node_ids().any(|node_id| {
366 if let Some(node) = flow.get_node(&node_id) {
367 matches!(
368 node.ty,
369 FlowNodeType::SourceTable { .. }
370 | FlowNodeType::SourceView { .. } | FlowNodeType::SourceFlow { .. }
371 | FlowNodeType::SourceRingBuffer { .. }
372 | FlowNodeType::SourceSeries { .. }
373 )
374 } else {
375 false
376 }
377 })
378}
379
/// Lowers one query-plan node into the flow under construction.
///
/// Implemented by the per-operator and per-source compilers that
/// `FlowCompiler::compile_plan` dispatches to.
pub(crate) trait CompileOperator {
	/// Consumes the plan node and returns the id of the flow node that carries its
	/// output. For the root plan, this is the node the sink gets connected to.
	fn compile(self, compiler: &mut FlowCompiler, txn: &mut Transaction<'_>) -> Result<FlowNodeId>;
}