Skip to main content

reifydb_engine/flow/compiler/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! Flow compilation - compiles RQL physical plans into Flows
5//!
6//! This module uses AdminTransaction directly instead of being generic
7//! over MultiVersionAdminTransaction to avoid lifetime issues with async recursion.
8
9use reifydb_catalog::catalog::Catalog;
10use reifydb_core::{
11	error::diagnostic::flow::{
12		flow_ephemeral_id_capacity_exceeded, flow_remote_source_unsupported, flow_source_required,
13	},
14	interface::catalog::{
15		flow::{FlowEdge, FlowEdgeId, FlowId, FlowNode, FlowNodeId},
16		id::SubscriptionId,
17		view::View,
18	},
19	internal,
20};
21use reifydb_rql::{
22	flow::{
23		flow::{FlowBuilder, FlowDag},
24		node::{self, FlowNodeType},
25	},
26	query::QueryPlan,
27};
28use reifydb_type::{Result, error::Error, value::blob::Blob};
29
30pub mod operator;
31pub mod primitive;
32
33use postcard::to_stdvec;
34use reifydb_transaction::transaction::{Transaction, admin::AdminTransaction};
35
36use crate::flow::compiler::{
37	operator::{
38		aggregate::AggregateCompiler, append::AppendCompiler, apply::ApplyCompiler, distinct::DistinctCompiler,
39		extend::ExtendCompiler, filter::FilterCompiler, gate::GateCompiler, join::JoinCompiler,
40		map::MapCompiler, sort::SortCompiler, take::TakeCompiler, window::WindowCompiler,
41	},
42	primitive::{
43		inline_data::InlineDataCompiler, ringbuffer_scan::RingBufferScanCompiler,
44		series_scan::SeriesScanCompiler, table_scan::TableScanCompiler, view_scan::ViewScanCompiler,
45	},
46};
47
48/// Public API for compiling logical plans to Flows with an existing flow ID.
49pub fn compile_flow(
50	catalog: &Catalog,
51	txn: &mut AdminTransaction,
52	plan: QueryPlan,
53	sink: Option<&View>,
54	flow_id: FlowId,
55) -> Result<FlowDag> {
56	let compiler = FlowCompiler::new(catalog.clone(), flow_id);
57	compiler.compile(&mut Transaction::Admin(txn), plan, sink)
58}
59
60/// Compile a subscription flow without persisting to the catalog.
61///
62/// Uses local ID counters and skips catalog writes. The resulting FlowDag
63/// is used for ephemeral in-memory subscription flow registration.
64pub fn compile_subscription_flow_ephemeral(
65	catalog: &Catalog,
66	txn: &mut Transaction<'_>,
67	plan: QueryPlan,
68	subscription_id: SubscriptionId,
69	flow_id: FlowId,
70) -> Result<FlowDag> {
71	let compiler = FlowCompiler::new_ephemeral(catalog.clone(), flow_id);
72	compiler.compile_with_subscription_id(txn, plan, subscription_id)
73}
74
75/// Compiler for converting RQL plans into executable Flows
76pub(crate) struct FlowCompiler {
77	/// The catalog for persisting flow nodes and edges
78	pub(crate) catalog: Catalog,
79	/// The flow builder being used for construction
80	builder: FlowBuilder,
81	/// The sink view shape (for terminal nodes)
82	pub(crate) sink: Option<View>,
83	/// When true, skip catalog persistence and use local ID counters.
84	ephemeral: bool,
85	/// Local node ID counter for ephemeral mode.
86	local_node_counter: u64,
87	/// Local edge ID counter for ephemeral mode.
88	local_edge_counter: u64,
89	/// Maximum ID value before overflow in ephemeral mode.
90	local_id_limit: u64,
91}
92
93impl FlowCompiler {
94	/// Creates a new FlowCompiler instance with an existing flow ID
95	pub fn new(catalog: Catalog, flow_id: FlowId) -> Self {
96		Self {
97			catalog,
98			builder: FlowDag::builder(flow_id),
99			sink: None,
100			ephemeral: false,
101			local_node_counter: 0,
102			local_edge_counter: 0,
103			local_id_limit: 0,
104		}
105	}
106
107	/// Creates a new ephemeral FlowCompiler that builds in-memory only.
108	///
109	/// Does not persist nodes/edges to the catalog and uses local ID counters.
110	/// Node/edge IDs are offset by `flow_id * 100` to avoid collisions when
111	/// multiple ephemeral flows share the same FlowEngine. Each flow is limited to 99 IDs.
112	pub fn new_ephemeral(catalog: Catalog, flow_id: FlowId) -> Self {
113		let base = flow_id.0 * 100;
114		Self {
115			catalog,
116			builder: FlowDag::builder(flow_id),
117			sink: None,
118			ephemeral: true,
119			local_node_counter: base,
120			local_edge_counter: base,
121			local_id_limit: base + 99,
122		}
123	}
124
125	/// Gets the next available operator ID
126	fn next_node_id(&mut self, txn: &mut Transaction<'_>) -> Result<FlowNodeId> {
127		if self.ephemeral {
128			if self.local_node_counter >= self.local_id_limit {
129				return Err(Error(Box::new(flow_ephemeral_id_capacity_exceeded(self.builder.id().0))));
130			}
131			self.local_node_counter += 1;
132			Ok(FlowNodeId(self.local_node_counter))
133		} else {
134			self.catalog.next_flow_node_id(txn.admin_mut())
135		}
136	}
137
138	/// Gets the next available edge ID
139	fn next_edge_id(&mut self, txn: &mut Transaction<'_>) -> Result<FlowEdgeId> {
140		if self.ephemeral {
141			if self.local_edge_counter >= self.local_id_limit {
142				return Err(Error(Box::new(flow_ephemeral_id_capacity_exceeded(self.builder.id().0))));
143			}
144			self.local_edge_counter += 1;
145			Ok(FlowEdgeId(self.local_edge_counter))
146		} else {
147			self.catalog.next_flow_edge_id(txn.admin_mut())
148		}
149	}
150
151	/// Adds an edge between two nodes
152	pub(crate) fn add_edge(&mut self, txn: &mut Transaction<'_>, from: &FlowNodeId, to: &FlowNodeId) -> Result<()> {
153		let edge_id = self.next_edge_id(txn)?;
154		let flow_id = self.builder.id();
155
156		if !self.ephemeral {
157			// Create the catalog entry
158			let edge_def = FlowEdge {
159				id: edge_id,
160				flow: flow_id,
161				source: *from,
162				target: *to,
163			};
164
165			// Persist to catalog
166			self.catalog.create_flow_edge(txn.admin_mut(), &edge_def)?;
167		}
168
169		// Add to in-memory builder
170		self.builder.add_edge(node::FlowEdge::new(edge_id, *from, *to))?;
171		Ok(())
172	}
173
174	/// Adds a operator to the flow graph
175	pub(crate) fn add_node(&mut self, txn: &mut Transaction<'_>, node_type: FlowNodeType) -> Result<FlowNodeId> {
176		let node_id = self.next_node_id(txn)?;
177		let flow_id = self.builder.id();
178
179		if !self.ephemeral {
180			// Serialize the node type to blob
181			let data = to_stdvec(&node_type)
182				.map_err(|e| Error(Box::new(internal!("Failed to serialize FlowNodeType: {}", e))))?;
183
184			// Create the catalog entry
185			let node_def = FlowNode {
186				id: node_id,
187				flow: flow_id,
188				node_type: node_type.discriminator(),
189				data: Blob::from(data),
190			};
191
192			// Persist to catalog
193			self.catalog.create_flow_node(txn.admin_mut(), &node_def)?;
194		}
195
196		// Add to in-memory builder
197		self.builder.add_node(node::FlowNode::new(node_id, node_type));
198		Ok(node_id)
199	}
200
201	/// Compiles a query plan into a FlowGraph
202	pub(crate) fn compile(
203		mut self,
204		txn: &mut Transaction<'_>,
205		plan: QueryPlan,
206		sink: Option<&View>,
207	) -> Result<FlowDag> {
208		// Store sink view for terminal nodes (if provided)
209		self.sink = sink.cloned();
210		let root_node_id = self.compile_plan(txn, plan)?;
211
212		if let Some(sink_view) = sink {
213			let node_type = match sink_view {
214				View::Table(t) => FlowNodeType::SinkTableView {
215					view: sink_view.id(),
216					table: t.underlying,
217				},
218				View::RingBuffer(rb) => FlowNodeType::SinkRingBufferView {
219					view: sink_view.id(),
220					ringbuffer: rb.underlying,
221					capacity: rb.capacity,
222					propagate_evictions: rb.propagate_evictions,
223				},
224				View::Series(s) => FlowNodeType::SinkSeriesView {
225					view: sink_view.id(),
226					series: s.underlying,
227					key: s.key.clone(),
228				},
229			};
230			let result_node = self.add_node(txn, node_type)?;
231			self.add_edge(txn, &root_node_id, &result_node)?;
232		}
233
234		let flow = self.builder.build();
235
236		if !has_real_source(&flow) {
237			return Err(Error(Box::new(flow_source_required())));
238		}
239
240		Ok(flow)
241	}
242
243	pub(crate) fn compile_with_subscription_id(
244		mut self,
245		txn: &mut Transaction<'_>,
246		plan: QueryPlan,
247		subscription_id: SubscriptionId,
248	) -> Result<FlowDag> {
249		let root_node_id = self.compile_plan(txn, plan)?;
250
251		// Add SinkSubscription node
252		let result_node = self.add_node(
253			txn,
254			FlowNodeType::SinkSubscription {
255				subscription: subscription_id,
256			},
257		)?;
258
259		self.add_edge(txn, &root_node_id, &result_node)?;
260
261		let flow = self.builder.build();
262
263		if !has_real_source(&flow) {
264			return Err(Error(Box::new(flow_source_required())));
265		}
266
267		Ok(flow)
268	}
269
270	/// Compiles a query plan operator into the FlowGraph
271	pub(crate) fn compile_plan(&mut self, txn: &mut Transaction<'_>, plan: QueryPlan) -> Result<FlowNodeId> {
272		match plan {
273			QueryPlan::IndexScan(_index_scan) => {
274				// TODO: Implement IndexScanCompiler for flow
275				unimplemented!("IndexScan compilation not yet implemented for flow")
276			}
277			QueryPlan::TableScan(table_scan) => TableScanCompiler::from(table_scan).compile(self, txn),
278			QueryPlan::ViewScan(view_scan) => ViewScanCompiler::from(view_scan).compile(self, txn),
279			QueryPlan::InlineData(inline_data) => InlineDataCompiler::from(inline_data).compile(self, txn),
280			QueryPlan::Filter(filter) => FilterCompiler::from(filter).compile(self, txn),
281			QueryPlan::Gate(gate) => GateCompiler::from(gate).compile(self, txn),
282			QueryPlan::Map(map) => MapCompiler::from(map).compile(self, txn),
283			QueryPlan::Extend(extend) => ExtendCompiler::from(extend).compile(self, txn),
284			QueryPlan::Apply(apply) => ApplyCompiler::from(apply).compile(self, txn),
285			QueryPlan::Aggregate(aggregate) => AggregateCompiler::from(aggregate).compile(self, txn),
286			QueryPlan::Distinct(distinct) => DistinctCompiler::from(distinct).compile(self, txn),
287			QueryPlan::Take(take) => TakeCompiler::from(take).compile(self, txn),
288			QueryPlan::Sort(sort) => SortCompiler::from(sort).compile(self, txn),
289			QueryPlan::JoinInner(join) => JoinCompiler::from(join).compile(self, txn),
290			QueryPlan::JoinLeft(join) => JoinCompiler::from(join).compile(self, txn),
291			QueryPlan::JoinNatural(_) => {
292				unimplemented!()
293			}
294			QueryPlan::Append(append) => AppendCompiler::from(append).compile(self, txn),
295			QueryPlan::Patch(_) => {
296				unimplemented!("Patch compilation not yet implemented for flow")
297			}
298			QueryPlan::TableVirtualScan(_scan) => {
299				// TODO: Implement VirtualScanCompiler
300				unimplemented!("VirtualScan compilation not yet implemented")
301			}
302			QueryPlan::RingBufferScan(scan) => RingBufferScanCompiler::from(scan).compile(self, txn),
303			QueryPlan::Generator(_generator) => {
304				// TODO: Implement GeneratorCompiler for flow
305				unimplemented!("Generator compilation not yet implemented for flow")
306			}
307			QueryPlan::Window(window) => WindowCompiler::from(window).compile(self, txn),
308			QueryPlan::Variable(_) => {
309				panic!("Variable references are not supported in flow graphs");
310			}
311			QueryPlan::Scalarize(_) => {
312				panic!("Scalarize operations are not supported in flow graphs");
313			}
314			QueryPlan::Environment(_) => {
315				panic!("Environment operations are not supported in flow graphs");
316			}
317			QueryPlan::RowPointLookup(_) => {
318				// TODO: Implement optimized row point lookup for flow graphs
319				unimplemented!("RowPointLookup compilation not yet implemented for flow")
320			}
321			QueryPlan::RowListLookup(_) => {
322				// TODO: Implement optimized row list lookup for flow graphs
323				unimplemented!("RowListLookup compilation not yet implemented for flow")
324			}
325			QueryPlan::RowRangeScan(_) => {
326				// TODO: Implement optimized row range scan for flow graphs
327				unimplemented!("RowRangeScan compilation not yet implemented for flow")
328			}
329			QueryPlan::DictionaryScan(_) => {
330				// TODO: Implement DictionaryScan for flow graphs
331				unimplemented!("DictionaryScan compilation not yet implemented for flow")
332			}
333			QueryPlan::Assert(_) => {
334				unimplemented!("Assert compilation not yet implemented for flow")
335			}
336			QueryPlan::SeriesScan(series_scan) => SeriesScanCompiler::from(series_scan).compile(self, txn),
337			QueryPlan::RemoteScan(_) => Err(Error(Box::new(flow_remote_source_unsupported()))),
338			QueryPlan::RunTests(_) => {
339				panic!("RunTests is not supported in flow graphs");
340			}
341			QueryPlan::CallFunction(_) => {
342				panic!("CallFunction is not supported in flow graphs");
343			}
344		}
345	}
346}
347
348/// Returns true if the flow contains at least one real source node
349/// (i.e., not just inline data).
350fn has_real_source(flow: &FlowDag) -> bool {
351	flow.get_node_ids().any(|node_id| {
352		if let Some(node) = flow.get_node(&node_id) {
353			matches!(
354				node.ty,
355				FlowNodeType::SourceTable { .. }
356					| FlowNodeType::SourceView { .. } | FlowNodeType::SourceFlow { .. }
357					| FlowNodeType::SourceRingBuffer { .. }
358					| FlowNodeType::SourceSeries { .. }
359			)
360		} else {
361			false
362		}
363	})
364}
365
366/// Trait for compiling operator from physical plans to flow nodes
367pub(crate) trait CompileOperator {
368	/// Compiles this operator into a flow operator
369	fn compile(self, compiler: &mut FlowCompiler, txn: &mut Transaction<'_>) -> Result<FlowNodeId>;
370}