Skip to main content

reifydb_engine/flow/compiler/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_catalog::catalog::Catalog;
5use reifydb_core::{
6	error::diagnostic::flow::{
7		flow_ephemeral_id_capacity_exceeded, flow_remote_source_unsupported, flow_source_required,
8	},
9	interface::catalog::{
10		flow::{FlowEdge, FlowEdgeId, FlowId, FlowNode, FlowNodeId},
11		id::SubscriptionId,
12		view::View,
13	},
14	internal,
15};
16use reifydb_rql::{
17	flow::{
18		flow::{FlowBuilder, FlowDag},
19		node::{self, FlowNodeType},
20	},
21	query::QueryPlan,
22};
23use reifydb_type::{Result, error::Error, value::blob::Blob};
24
25pub mod operator;
26pub mod primitive;
27
28use postcard::to_stdvec;
29use reifydb_transaction::transaction::{Transaction, admin::AdminTransaction};
30
31use crate::flow::compiler::{
32	operator::{
33		aggregate::AggregateCompiler, append::AppendCompiler, apply::ApplyCompiler, distinct::DistinctCompiler,
34		extend::ExtendCompiler, filter::FilterCompiler, gate::GateCompiler, join::JoinCompiler,
35		map::MapCompiler, sort::SortCompiler, take::TakeCompiler, window::WindowCompiler,
36	},
37	primitive::{
38		inline_data::InlineDataCompiler, ringbuffer_scan::RingBufferScanCompiler,
39		series_scan::SeriesScanCompiler, table_scan::TableScanCompiler, view_scan::ViewScanCompiler,
40	},
41};
42
43pub fn compile_flow(
44	catalog: &Catalog,
45	txn: &mut AdminTransaction,
46	plan: QueryPlan,
47	sink: Option<&View>,
48	flow_id: FlowId,
49) -> Result<FlowDag> {
50	let compiler = FlowCompiler::new(catalog.clone(), flow_id);
51	compiler.compile(&mut Transaction::Admin(txn), plan, sink)
52}
53
54pub fn compile_subscription_flow_ephemeral(
55	catalog: &Catalog,
56	txn: &mut Transaction<'_>,
57	plan: QueryPlan,
58	subscription_id: SubscriptionId,
59	flow_id: FlowId,
60) -> Result<FlowDag> {
61	let compiler = FlowCompiler::new_ephemeral(catalog.clone(), flow_id);
62	compiler.compile_with_subscription_id(txn, plan, subscription_id)
63}
64
/// Turns a `QueryPlan` into a `FlowDag`, one node and edge at a time.
///
/// In non-ephemeral mode ids are requested from the catalog and every node and
/// edge is also written to the catalog; in ephemeral mode ids come from the
/// local counters below and the catalog is not written to.
pub(crate) struct FlowCompiler {
	/// Catalog used for id allocation and for persisting nodes and edges (non-ephemeral mode).
	pub(crate) catalog: Catalog,

	/// Collects the nodes and edges of the flow; `build()` produces the final `FlowDag`.
	builder: FlowBuilder,

	/// Sink view of the flow. `None` after construction; `compile` stores a clone of its `sink` argument here.
	pub(crate) sink: Option<View>,

	/// When `true`, ids are taken from the local counters and nothing is persisted to the catalog.
	ephemeral: bool,

	/// Last node id handed out in ephemeral mode; incremented before each new id is returned.
	local_node_counter: u64,

	/// Last edge id handed out in ephemeral mode; incremented before each new id is returned.
	local_edge_counter: u64,

	/// Upper bound for both local counters; once a counter reaches it, id allocation fails.
	local_id_limit: u64,
}
80
81impl FlowCompiler {
82	pub fn new(catalog: Catalog, flow_id: FlowId) -> Self {
83		Self {
84			catalog,
85			builder: FlowDag::builder(flow_id),
86			sink: None,
87			ephemeral: false,
88			local_node_counter: 0,
89			local_edge_counter: 0,
90			local_id_limit: 0,
91		}
92	}
93
94	pub fn new_ephemeral(catalog: Catalog, flow_id: FlowId) -> Self {
95		let base = flow_id.0 * 100;
96		Self {
97			catalog,
98			builder: FlowDag::builder(flow_id),
99			sink: None,
100			ephemeral: true,
101			local_node_counter: base,
102			local_edge_counter: base,
103			local_id_limit: base + 99,
104		}
105	}
106
107	fn next_node_id(&mut self, txn: &mut Transaction<'_>) -> Result<FlowNodeId> {
108		if self.ephemeral {
109			if self.local_node_counter >= self.local_id_limit {
110				return Err(Error(Box::new(flow_ephemeral_id_capacity_exceeded(self.builder.id().0))));
111			}
112			self.local_node_counter += 1;
113			Ok(FlowNodeId(self.local_node_counter))
114		} else {
115			self.catalog.next_flow_node_id(txn.admin_mut())
116		}
117	}
118
119	fn next_edge_id(&mut self, txn: &mut Transaction<'_>) -> Result<FlowEdgeId> {
120		if self.ephemeral {
121			if self.local_edge_counter >= self.local_id_limit {
122				return Err(Error(Box::new(flow_ephemeral_id_capacity_exceeded(self.builder.id().0))));
123			}
124			self.local_edge_counter += 1;
125			Ok(FlowEdgeId(self.local_edge_counter))
126		} else {
127			self.catalog.next_flow_edge_id(txn.admin_mut())
128		}
129	}
130
131	pub(crate) fn add_edge(&mut self, txn: &mut Transaction<'_>, from: &FlowNodeId, to: &FlowNodeId) -> Result<()> {
132		let edge_id = self.next_edge_id(txn)?;
133		let flow_id = self.builder.id();
134
135		if !self.ephemeral {
136			let edge_def = FlowEdge {
137				id: edge_id,
138				flow: flow_id,
139				source: *from,
140				target: *to,
141			};
142
143			self.catalog.create_flow_edge(txn.admin_mut(), &edge_def)?;
144		}
145
146		self.builder.add_edge(node::FlowEdge::new(edge_id, *from, *to))?;
147		Ok(())
148	}
149
150	pub(crate) fn add_node(&mut self, txn: &mut Transaction<'_>, node_type: FlowNodeType) -> Result<FlowNodeId> {
151		let node_id = self.next_node_id(txn)?;
152		let flow_id = self.builder.id();
153
154		if !self.ephemeral {
155			let data = to_stdvec(&node_type)
156				.map_err(|e| Error(Box::new(internal!("Failed to serialize FlowNodeType: {}", e))))?;
157
158			let node_def = FlowNode {
159				id: node_id,
160				flow: flow_id,
161				node_type: node_type.discriminator(),
162				data: Blob::from(data),
163			};
164
165			self.catalog.create_flow_node(txn.admin_mut(), &node_def)?;
166		}
167
168		self.builder.add_node(node::FlowNode::new(node_id, node_type));
169		Ok(node_id)
170	}
171
172	pub(crate) fn compile(
173		mut self,
174		txn: &mut Transaction<'_>,
175		plan: QueryPlan,
176		sink: Option<&View>,
177	) -> Result<FlowDag> {
178		self.sink = sink.cloned();
179		let root_node_id = self.compile_plan(txn, plan)?;
180
181		if let Some(sink_view) = sink {
182			let node_type = match sink_view {
183				View::Table(t) => FlowNodeType::SinkTableView {
184					view: sink_view.id(),
185					table: t.underlying,
186				},
187				View::RingBuffer(rb) => FlowNodeType::SinkRingBufferView {
188					view: sink_view.id(),
189					ringbuffer: rb.underlying,
190					capacity: rb.capacity,
191					propagate_evictions: rb.propagate_evictions,
192				},
193				View::Series(s) => FlowNodeType::SinkSeriesView {
194					view: sink_view.id(),
195					series: s.underlying,
196					key: s.key.clone(),
197				},
198			};
199			let result_node = self.add_node(txn, node_type)?;
200			self.add_edge(txn, &root_node_id, &result_node)?;
201		}
202
203		let flow = self.builder.build();
204
205		if !has_real_source(&flow) {
206			return Err(Error(Box::new(flow_source_required())));
207		}
208
209		Ok(flow)
210	}
211
212	pub(crate) fn compile_with_subscription_id(
213		mut self,
214		txn: &mut Transaction<'_>,
215		plan: QueryPlan,
216		subscription_id: SubscriptionId,
217	) -> Result<FlowDag> {
218		let root_node_id = self.compile_plan(txn, plan)?;
219
220		let result_node = self.add_node(
221			txn,
222			FlowNodeType::SinkSubscription {
223				subscription: subscription_id,
224			},
225		)?;
226
227		self.add_edge(txn, &root_node_id, &result_node)?;
228
229		let flow = self.builder.build();
230
231		if !has_real_source(&flow) {
232			return Err(Error(Box::new(flow_source_required())));
233		}
234
235		Ok(flow)
236	}
237
238	pub(crate) fn compile_plan(&mut self, txn: &mut Transaction<'_>, plan: QueryPlan) -> Result<FlowNodeId> {
239		match plan {
240			QueryPlan::IndexScan(_index_scan) => {
241				// TODO: Implement IndexScanCompiler for flow
242				unimplemented!("IndexScan compilation not yet implemented for flow")
243			}
244			QueryPlan::TableScan(table_scan) => TableScanCompiler::from(table_scan).compile(self, txn),
245			QueryPlan::ViewScan(view_scan) => ViewScanCompiler::from(view_scan).compile(self, txn),
246			QueryPlan::InlineData(inline_data) => InlineDataCompiler::from(inline_data).compile(self, txn),
247			QueryPlan::Filter(filter) => FilterCompiler::from(filter).compile(self, txn),
248			QueryPlan::Gate(gate) => GateCompiler::from(gate).compile(self, txn),
249			QueryPlan::Map(map) => MapCompiler::from(map).compile(self, txn),
250			QueryPlan::Extend(extend) => ExtendCompiler::from(extend).compile(self, txn),
251			QueryPlan::Apply(apply) => ApplyCompiler::from(apply).compile(self, txn),
252			QueryPlan::Aggregate(aggregate) => AggregateCompiler::from(aggregate).compile(self, txn),
253			QueryPlan::Distinct(distinct) => DistinctCompiler::from(distinct).compile(self, txn),
254			QueryPlan::Take(take) => TakeCompiler::from(take).compile(self, txn),
255			QueryPlan::Sort(sort) => SortCompiler::from(sort).compile(self, txn),
256			QueryPlan::JoinInner(join) => JoinCompiler::from(join).compile(self, txn),
257			QueryPlan::JoinLeft(join) => JoinCompiler::from(join).compile(self, txn),
258			QueryPlan::JoinNatural(_) => {
259				unimplemented!()
260			}
261			QueryPlan::Append(append) => AppendCompiler::from(append).compile(self, txn),
262			QueryPlan::Patch(_) => {
263				unimplemented!("Patch compilation not yet implemented for flow")
264			}
265			QueryPlan::TableVirtualScan(_scan) => {
266				// TODO: Implement VirtualScanCompiler
267				unimplemented!("VirtualScan compilation not yet implemented")
268			}
269			QueryPlan::RingBufferScan(scan) => RingBufferScanCompiler::from(scan).compile(self, txn),
270			QueryPlan::Generator(_generator) => {
271				// TODO: Implement GeneratorCompiler for flow
272				unimplemented!("Generator compilation not yet implemented for flow")
273			}
274			QueryPlan::Window(window) => WindowCompiler::from(window).compile(self, txn),
275			QueryPlan::Variable(_) => {
276				panic!("Variable references are not supported in flow graphs");
277			}
278			QueryPlan::Scalarize(_) => {
279				panic!("Scalarize operations are not supported in flow graphs");
280			}
281			QueryPlan::Environment(_) => {
282				panic!("Environment operations are not supported in flow graphs");
283			}
284			QueryPlan::RowPointLookup(_) => {
285				// TODO: Implement optimized row point lookup for flow graphs
286				unimplemented!("RowPointLookup compilation not yet implemented for flow")
287			}
288			QueryPlan::RowListLookup(_) => {
289				// TODO: Implement optimized row list lookup for flow graphs
290				unimplemented!("RowListLookup compilation not yet implemented for flow")
291			}
292			QueryPlan::RowRangeScan(_) => {
293				// TODO: Implement optimized row range scan for flow graphs
294				unimplemented!("RowRangeScan compilation not yet implemented for flow")
295			}
296			QueryPlan::DictionaryScan(_) => {
297				// TODO: Implement DictionaryScan for flow graphs
298				unimplemented!("DictionaryScan compilation not yet implemented for flow")
299			}
300			QueryPlan::Assert(_) => {
301				unimplemented!("Assert compilation not yet implemented for flow")
302			}
303			QueryPlan::SeriesScan(series_scan) => SeriesScanCompiler::from(series_scan).compile(self, txn),
304			QueryPlan::RemoteScan(_) => Err(Error(Box::new(flow_remote_source_unsupported()))),
305			QueryPlan::RunTests(_) => {
306				panic!("RunTests is not supported in flow graphs");
307			}
308			QueryPlan::CallFunction(_) => {
309				panic!("CallFunction is not supported in flow graphs");
310			}
311		}
312	}
313}
314
315fn has_real_source(flow: &FlowDag) -> bool {
316	flow.get_node_ids().any(|node_id| {
317		if let Some(node) = flow.get_node(&node_id) {
318			matches!(
319				node.ty,
320				FlowNodeType::SourceTable { .. }
321					| FlowNodeType::SourceView { .. } | FlowNodeType::SourceFlow { .. }
322					| FlowNodeType::SourceRingBuffer { .. }
323					| FlowNodeType::SourceSeries { .. }
324			)
325		} else {
326			false
327		}
328	})
329}
330
/// A compilation step that turns one plan node into flow nodes and edges.
///
/// `FlowCompiler::compile_plan` invokes `compile` on the operator and primitive
/// compilers it builds from plan nodes — presumably through this trait; confirm
/// against the implementations in the `operator` and `primitive` submodules.
pub(crate) trait CompileOperator {
	/// Consumes the step, registers whatever it produces on `compiler`, and
	/// returns the id of a flow node; callers use that id as the point to attach
	/// downstream nodes to.
	fn compile(self, compiler: &mut FlowCompiler, txn: &mut Transaction<'_>) -> Result<FlowNodeId>;
}