Skip to main content

reifydb_engine/flow/compiler/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_catalog::catalog::Catalog;
5use reifydb_core::{
6	error::diagnostic::flow::{
7		flow_ephemeral_id_capacity_exceeded, flow_remote_source_unsupported, flow_source_required,
8	},
9	interface::catalog::{
10		flow::{FlowEdge, FlowEdgeId, FlowId, FlowNode, FlowNodeId},
11		id::SubscriptionId,
12		view::View,
13	},
14	internal,
15};
16use reifydb_rql::{
17	flow::{
18		flow::{FlowBuilder, FlowDag},
19		node::{self, FlowNodeType},
20	},
21	query::QueryPlan,
22};
23use reifydb_type::{Result, error::Error, value::blob::Blob};
24
25pub mod operator;
26pub mod primitive;
27
28use postcard::to_stdvec;
29use reifydb_transaction::transaction::{Transaction, admin::AdminTransaction};
30
31use crate::flow::compiler::{
32	operator::{
33		aggregate::AggregateCompiler, append::AppendCompiler, apply::ApplyCompiler, distinct::DistinctCompiler,
34		extend::ExtendCompiler, filter::FilterCompiler, gate::GateCompiler, join::JoinCompiler,
35		map::MapCompiler, sort::SortCompiler, take::TakeCompiler, window::WindowCompiler,
36	},
37	primitive::{
38		inline_data::InlineDataCompiler, ringbuffer_scan::RingBufferScanCompiler,
39		series_scan::SeriesScanCompiler, table_scan::TableScanCompiler, view_scan::ViewScanCompiler,
40	},
41};
42
43/// Public API for compiling logical plans to Flows with an existing flow ID.
44pub fn compile_flow(
45	catalog: &Catalog,
46	txn: &mut AdminTransaction,
47	plan: QueryPlan,
48	sink: Option<&View>,
49	flow_id: FlowId,
50) -> Result<FlowDag> {
51	let compiler = FlowCompiler::new(catalog.clone(), flow_id);
52	compiler.compile(&mut Transaction::Admin(txn), plan, sink)
53}
54
55/// Compile a subscription flow without persisting to the catalog.
56///
57/// Uses local ID counters and skips catalog writes. The resulting FlowDag
58/// is used for ephemeral in-memory subscription flow registration.
59pub fn compile_subscription_flow_ephemeral(
60	catalog: &Catalog,
61	txn: &mut Transaction<'_>,
62	plan: QueryPlan,
63	subscription_id: SubscriptionId,
64	flow_id: FlowId,
65) -> Result<FlowDag> {
66	let compiler = FlowCompiler::new_ephemeral(catalog.clone(), flow_id);
67	compiler.compile_with_subscription_id(txn, plan, subscription_id)
68}
69
70/// Compiler for converting RQL plans into executable Flows
71pub(crate) struct FlowCompiler {
72	/// The catalog for persisting flow nodes and edges
73	pub(crate) catalog: Catalog,
74	/// The flow builder being used for construction
75	builder: FlowBuilder,
76	/// The sink view shape (for terminal nodes)
77	pub(crate) sink: Option<View>,
78	/// When true, skip catalog persistence and use local ID counters.
79	ephemeral: bool,
80	/// Local node ID counter for ephemeral mode.
81	local_node_counter: u64,
82	/// Local edge ID counter for ephemeral mode.
83	local_edge_counter: u64,
84	/// Maximum ID value before overflow in ephemeral mode.
85	local_id_limit: u64,
86}
87
88impl FlowCompiler {
89	/// Creates a new FlowCompiler instance with an existing flow ID
90	pub fn new(catalog: Catalog, flow_id: FlowId) -> Self {
91		Self {
92			catalog,
93			builder: FlowDag::builder(flow_id),
94			sink: None,
95			ephemeral: false,
96			local_node_counter: 0,
97			local_edge_counter: 0,
98			local_id_limit: 0,
99		}
100	}
101
102	/// Creates a new ephemeral FlowCompiler that builds in-memory only.
103	///
104	/// Does not persist nodes/edges to the catalog and uses local ID counters.
105	/// Node/edge IDs are offset by `flow_id * 100` to avoid collisions when
106	/// multiple ephemeral flows share the same FlowEngine. Each flow is limited to 99 IDs.
107	pub fn new_ephemeral(catalog: Catalog, flow_id: FlowId) -> Self {
108		let base = flow_id.0 * 100;
109		Self {
110			catalog,
111			builder: FlowDag::builder(flow_id),
112			sink: None,
113			ephemeral: true,
114			local_node_counter: base,
115			local_edge_counter: base,
116			local_id_limit: base + 99,
117		}
118	}
119
120	/// Gets the next available operator ID
121	fn next_node_id(&mut self, txn: &mut Transaction<'_>) -> Result<FlowNodeId> {
122		if self.ephemeral {
123			if self.local_node_counter >= self.local_id_limit {
124				return Err(Error(Box::new(flow_ephemeral_id_capacity_exceeded(self.builder.id().0))));
125			}
126			self.local_node_counter += 1;
127			Ok(FlowNodeId(self.local_node_counter))
128		} else {
129			self.catalog.next_flow_node_id(txn.admin_mut())
130		}
131	}
132
133	/// Gets the next available edge ID
134	fn next_edge_id(&mut self, txn: &mut Transaction<'_>) -> Result<FlowEdgeId> {
135		if self.ephemeral {
136			if self.local_edge_counter >= self.local_id_limit {
137				return Err(Error(Box::new(flow_ephemeral_id_capacity_exceeded(self.builder.id().0))));
138			}
139			self.local_edge_counter += 1;
140			Ok(FlowEdgeId(self.local_edge_counter))
141		} else {
142			self.catalog.next_flow_edge_id(txn.admin_mut())
143		}
144	}
145
146	/// Adds an edge between two nodes
147	pub(crate) fn add_edge(&mut self, txn: &mut Transaction<'_>, from: &FlowNodeId, to: &FlowNodeId) -> Result<()> {
148		let edge_id = self.next_edge_id(txn)?;
149		let flow_id = self.builder.id();
150
151		if !self.ephemeral {
152			// Create the catalog entry
153			let edge_def = FlowEdge {
154				id: edge_id,
155				flow: flow_id,
156				source: *from,
157				target: *to,
158			};
159
160			// Persist to catalog
161			self.catalog.create_flow_edge(txn.admin_mut(), &edge_def)?;
162		}
163
164		// Add to in-memory builder
165		self.builder.add_edge(node::FlowEdge::new(edge_id, *from, *to))?;
166		Ok(())
167	}
168
169	/// Adds a operator to the flow graph
170	pub(crate) fn add_node(&mut self, txn: &mut Transaction<'_>, node_type: FlowNodeType) -> Result<FlowNodeId> {
171		let node_id = self.next_node_id(txn)?;
172		let flow_id = self.builder.id();
173
174		if !self.ephemeral {
175			// Serialize the node type to blob
176			let data = to_stdvec(&node_type)
177				.map_err(|e| Error(Box::new(internal!("Failed to serialize FlowNodeType: {}", e))))?;
178
179			// Create the catalog entry
180			let node_def = FlowNode {
181				id: node_id,
182				flow: flow_id,
183				node_type: node_type.discriminator(),
184				data: Blob::from(data),
185			};
186
187			// Persist to catalog
188			self.catalog.create_flow_node(txn.admin_mut(), &node_def)?;
189		}
190
191		// Add to in-memory builder
192		self.builder.add_node(node::FlowNode::new(node_id, node_type));
193		Ok(node_id)
194	}
195
196	/// Compiles a query plan into a FlowGraph
197	pub(crate) fn compile(
198		mut self,
199		txn: &mut Transaction<'_>,
200		plan: QueryPlan,
201		sink: Option<&View>,
202	) -> Result<FlowDag> {
203		// Store sink view for terminal nodes (if provided)
204		self.sink = sink.cloned();
205		let root_node_id = self.compile_plan(txn, plan)?;
206
207		if let Some(sink_view) = sink {
208			let node_type = match sink_view {
209				View::Table(t) => FlowNodeType::SinkTableView {
210					view: sink_view.id(),
211					table: t.underlying,
212				},
213				View::RingBuffer(rb) => FlowNodeType::SinkRingBufferView {
214					view: sink_view.id(),
215					ringbuffer: rb.underlying,
216					capacity: rb.capacity,
217					propagate_evictions: rb.propagate_evictions,
218				},
219				View::Series(s) => FlowNodeType::SinkSeriesView {
220					view: sink_view.id(),
221					series: s.underlying,
222					key: s.key.clone(),
223				},
224			};
225			let result_node = self.add_node(txn, node_type)?;
226			self.add_edge(txn, &root_node_id, &result_node)?;
227		}
228
229		let flow = self.builder.build();
230
231		if !has_real_source(&flow) {
232			return Err(Error(Box::new(flow_source_required())));
233		}
234
235		Ok(flow)
236	}
237
238	pub(crate) fn compile_with_subscription_id(
239		mut self,
240		txn: &mut Transaction<'_>,
241		plan: QueryPlan,
242		subscription_id: SubscriptionId,
243	) -> Result<FlowDag> {
244		let root_node_id = self.compile_plan(txn, plan)?;
245
246		// Add SinkSubscription node
247		let result_node = self.add_node(
248			txn,
249			FlowNodeType::SinkSubscription {
250				subscription: subscription_id,
251			},
252		)?;
253
254		self.add_edge(txn, &root_node_id, &result_node)?;
255
256		let flow = self.builder.build();
257
258		if !has_real_source(&flow) {
259			return Err(Error(Box::new(flow_source_required())));
260		}
261
262		Ok(flow)
263	}
264
265	/// Compiles a query plan operator into the FlowGraph
266	pub(crate) fn compile_plan(&mut self, txn: &mut Transaction<'_>, plan: QueryPlan) -> Result<FlowNodeId> {
267		match plan {
268			QueryPlan::IndexScan(_index_scan) => {
269				// TODO: Implement IndexScanCompiler for flow
270				unimplemented!("IndexScan compilation not yet implemented for flow")
271			}
272			QueryPlan::TableScan(table_scan) => TableScanCompiler::from(table_scan).compile(self, txn),
273			QueryPlan::ViewScan(view_scan) => ViewScanCompiler::from(view_scan).compile(self, txn),
274			QueryPlan::InlineData(inline_data) => InlineDataCompiler::from(inline_data).compile(self, txn),
275			QueryPlan::Filter(filter) => FilterCompiler::from(filter).compile(self, txn),
276			QueryPlan::Gate(gate) => GateCompiler::from(gate).compile(self, txn),
277			QueryPlan::Map(map) => MapCompiler::from(map).compile(self, txn),
278			QueryPlan::Extend(extend) => ExtendCompiler::from(extend).compile(self, txn),
279			QueryPlan::Apply(apply) => ApplyCompiler::from(apply).compile(self, txn),
280			QueryPlan::Aggregate(aggregate) => AggregateCompiler::from(aggregate).compile(self, txn),
281			QueryPlan::Distinct(distinct) => DistinctCompiler::from(distinct).compile(self, txn),
282			QueryPlan::Take(take) => TakeCompiler::from(take).compile(self, txn),
283			QueryPlan::Sort(sort) => SortCompiler::from(sort).compile(self, txn),
284			QueryPlan::JoinInner(join) => JoinCompiler::from(join).compile(self, txn),
285			QueryPlan::JoinLeft(join) => JoinCompiler::from(join).compile(self, txn),
286			QueryPlan::JoinNatural(_) => {
287				unimplemented!()
288			}
289			QueryPlan::Append(append) => AppendCompiler::from(append).compile(self, txn),
290			QueryPlan::Patch(_) => {
291				unimplemented!("Patch compilation not yet implemented for flow")
292			}
293			QueryPlan::TableVirtualScan(_scan) => {
294				// TODO: Implement VirtualScanCompiler
295				unimplemented!("VirtualScan compilation not yet implemented")
296			}
297			QueryPlan::RingBufferScan(scan) => RingBufferScanCompiler::from(scan).compile(self, txn),
298			QueryPlan::Generator(_generator) => {
299				// TODO: Implement GeneratorCompiler for flow
300				unimplemented!("Generator compilation not yet implemented for flow")
301			}
302			QueryPlan::Window(window) => WindowCompiler::from(window).compile(self, txn),
303			QueryPlan::Variable(_) => {
304				panic!("Variable references are not supported in flow graphs");
305			}
306			QueryPlan::Scalarize(_) => {
307				panic!("Scalarize operations are not supported in flow graphs");
308			}
309			QueryPlan::Environment(_) => {
310				panic!("Environment operations are not supported in flow graphs");
311			}
312			QueryPlan::RowPointLookup(_) => {
313				// TODO: Implement optimized row point lookup for flow graphs
314				unimplemented!("RowPointLookup compilation not yet implemented for flow")
315			}
316			QueryPlan::RowListLookup(_) => {
317				// TODO: Implement optimized row list lookup for flow graphs
318				unimplemented!("RowListLookup compilation not yet implemented for flow")
319			}
320			QueryPlan::RowRangeScan(_) => {
321				// TODO: Implement optimized row range scan for flow graphs
322				unimplemented!("RowRangeScan compilation not yet implemented for flow")
323			}
324			QueryPlan::DictionaryScan(_) => {
325				// TODO: Implement DictionaryScan for flow graphs
326				unimplemented!("DictionaryScan compilation not yet implemented for flow")
327			}
328			QueryPlan::Assert(_) => {
329				unimplemented!("Assert compilation not yet implemented for flow")
330			}
331			QueryPlan::SeriesScan(series_scan) => SeriesScanCompiler::from(series_scan).compile(self, txn),
332			QueryPlan::RemoteScan(_) => Err(Error(Box::new(flow_remote_source_unsupported()))),
333			QueryPlan::RunTests(_) => {
334				panic!("RunTests is not supported in flow graphs");
335			}
336			QueryPlan::CallFunction(_) => {
337				panic!("CallFunction is not supported in flow graphs");
338			}
339		}
340	}
341}
342
343/// Returns true if the flow contains at least one real source node
344/// (i.e., not just inline data).
345fn has_real_source(flow: &FlowDag) -> bool {
346	flow.get_node_ids().any(|node_id| {
347		if let Some(node) = flow.get_node(&node_id) {
348			matches!(
349				node.ty,
350				FlowNodeType::SourceTable { .. }
351					| FlowNodeType::SourceView { .. } | FlowNodeType::SourceFlow { .. }
352					| FlowNodeType::SourceRingBuffer { .. }
353					| FlowNodeType::SourceSeries { .. }
354			)
355		} else {
356			false
357		}
358	})
359}
360
361/// Trait for compiling operator from physical plans to flow nodes
362pub(crate) trait CompileOperator {
363	/// Compiles this operator into a flow operator
364	fn compile(self, compiler: &mut FlowCompiler, txn: &mut Transaction<'_>) -> Result<FlowNodeId>;
365}