Skip to main content

reifydb_engine/flow/compiler/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! Flow compilation - compiles RQL physical plans into Flows
5//!
6//! This module uses AdminTransaction directly instead of being generic
7//! over MultiVersionAdminTransaction to avoid lifetime issues with async recursion.
8
9use reifydb_catalog::catalog::Catalog;
10use reifydb_core::{
11	error::diagnostic::flow::{flow_remote_source_unsupported, flow_source_required},
12	interface::catalog::{
13		flow::{FlowEdge, FlowEdgeId, FlowId, FlowNode, FlowNodeId},
14		subscription::Subscription,
15		view::View,
16	},
17	internal,
18};
19use reifydb_rql::{
20	flow::{
21		flow::{FlowBuilder, FlowDag},
22		node::{self, FlowNodeType},
23	},
24	query::QueryPlan,
25};
26use reifydb_type::{Result, error::Error, value::blob::Blob};
27
28pub mod operator;
29pub mod primitive;
30
31use postcard::to_stdvec;
32use reifydb_transaction::transaction::admin::AdminTransaction;
33
34use crate::flow::compiler::{
35	operator::{
36		aggregate::AggregateCompiler, append::AppendCompiler, apply::ApplyCompiler, distinct::DistinctCompiler,
37		extend::ExtendCompiler, filter::FilterCompiler, gate::GateCompiler, join::JoinCompiler,
38		map::MapCompiler, sort::SortCompiler, take::TakeCompiler, window::WindowCompiler,
39	},
40	primitive::{
41		inline_data::InlineDataCompiler, ringbuffer_scan::RingBufferScanCompiler,
42		series_scan::SeriesScanCompiler, table_scan::TableScanCompiler, view_scan::ViewScanCompiler,
43	},
44};
45
46/// Public API for compiling logical plans to Flows with an existing flow ID.
47pub fn compile_flow(
48	catalog: &Catalog,
49	txn: &mut AdminTransaction,
50	plan: QueryPlan,
51	sink: Option<&View>,
52	flow_id: FlowId,
53) -> Result<FlowDag> {
54	let compiler = FlowCompiler::new(catalog.clone(), flow_id);
55	compiler.compile(txn, plan, sink)
56}
57
58pub fn compile_subscription_flow(
59	catalog: &Catalog,
60	txn: &mut AdminTransaction,
61	plan: QueryPlan,
62	subscription: &Subscription,
63	flow_id: FlowId,
64) -> Result<FlowDag> {
65	let compiler = FlowCompiler::new(catalog.clone(), flow_id);
66	compiler.compile_with_subscription(txn, plan, subscription)
67}
68
/// Compiler for converting RQL plans into executable Flows.
///
/// Nodes and edges added through the compiler are persisted via the catalog
/// and mirrored into the in-memory flow builder.
pub(crate) struct FlowCompiler {
	/// The catalog used to persist flow nodes and edges
	pub(crate) catalog: Catalog,
	/// The in-memory builder accumulating the flow under construction
	builder: FlowBuilder,
	/// The sink view, if any; `compile` stores it here before compiling the plan
	pub(crate) sink: Option<View>,
}
78
79impl FlowCompiler {
80	/// Creates a new FlowCompiler instance with an existing flow ID
81	pub fn new(catalog: Catalog, flow_id: FlowId) -> Self {
82		Self {
83			catalog,
84			builder: FlowDag::builder(flow_id),
85			sink: None,
86		}
87	}
88
89	/// Gets the next available operator ID
90	fn next_node_id(&mut self, txn: &mut AdminTransaction) -> Result<FlowNodeId> {
91		self.catalog.next_flow_node_id(txn).map_err(Into::into)
92	}
93
94	/// Gets the next available edge ID
95	fn next_edge_id(&mut self, txn: &mut AdminTransaction) -> Result<FlowEdgeId> {
96		self.catalog.next_flow_edge_id(txn).map_err(Into::into)
97	}
98
99	/// Adds an edge between two nodes
100	pub(crate) fn add_edge(
101		&mut self,
102		txn: &mut AdminTransaction,
103		from: &FlowNodeId,
104		to: &FlowNodeId,
105	) -> Result<()> {
106		let edge_id = self.next_edge_id(txn)?;
107		let flow_id = self.builder.id();
108
109		// Create the catalog entry
110		let edge_def = FlowEdge {
111			id: edge_id,
112			flow: flow_id,
113			source: *from,
114			target: *to,
115		};
116
117		// Persist to catalog
118		self.catalog.create_flow_edge(txn, &edge_def)?;
119
120		// Add to in-memory builder
121		self.builder.add_edge(node::FlowEdge::new(edge_id, *from, *to))?;
122		Ok(())
123	}
124
125	/// Adds a operator to the flow graph
126	pub(crate) fn add_node(&mut self, txn: &mut AdminTransaction, node_type: FlowNodeType) -> Result<FlowNodeId> {
127		let node_id = self.next_node_id(txn)?;
128		let flow_id = self.builder.id();
129
130		// Serialize the node type to blob
131		let data = to_stdvec(&node_type)
132			.map_err(|e| Error(internal!("Failed to serialize FlowNodeType: {}", e)))?;
133
134		// Create the catalog entry
135		let node_def = FlowNode {
136			id: node_id,
137			flow: flow_id,
138			node_type: node_type.discriminator(),
139			data: Blob::from(data),
140		};
141
142		// Persist to catalog
143		self.catalog.create_flow_node(txn, &node_def)?;
144
145		// Add to in-memory builder
146		self.builder.add_node(node::FlowNode::new(node_id, node_type));
147		Ok(node_id)
148	}
149
150	/// Compiles a query plan into a FlowGraph
151	pub(crate) fn compile(
152		mut self,
153		txn: &mut AdminTransaction,
154		plan: QueryPlan,
155		sink: Option<&View>,
156	) -> Result<FlowDag> {
157		// Store sink view for terminal nodes (if provided)
158		self.sink = sink.cloned();
159		let root_node_id = self.compile_plan(txn, plan)?;
160
161		if let Some(sink_view) = sink {
162			let node_type = match sink_view {
163				View::Table(t) => FlowNodeType::SinkTableView {
164					view: sink_view.id(),
165					table: t.underlying,
166				},
167				View::RingBuffer(rb) => FlowNodeType::SinkRingBufferView {
168					view: sink_view.id(),
169					ringbuffer: rb.underlying,
170					capacity: rb.capacity,
171					propagate_evictions: rb.propagate_evictions,
172				},
173				View::Series(s) => FlowNodeType::SinkSeriesView {
174					view: sink_view.id(),
175					series: s.underlying,
176					key: s.key.clone(),
177				},
178			};
179			let result_node = self.add_node(txn, node_type)?;
180			self.add_edge(txn, &root_node_id, &result_node)?;
181		}
182
183		let flow = self.builder.build();
184
185		if !has_real_source(&flow) {
186			return Err(Error(flow_source_required()));
187		}
188
189		Ok(flow)
190	}
191
192	/// Compiles a query plan into a FlowGraph with a subscription sink
193	pub(crate) fn compile_with_subscription(
194		mut self,
195		txn: &mut AdminTransaction,
196		plan: QueryPlan,
197		subscription: &Subscription,
198	) -> Result<FlowDag> {
199		let root_node_id = self.compile_plan(txn, plan)?;
200
201		// Add SinkSubscription node
202		let result_node = self.add_node(
203			txn,
204			FlowNodeType::SinkSubscription {
205				subscription: subscription.id,
206			},
207		)?;
208
209		self.add_edge(txn, &root_node_id, &result_node)?;
210
211		let flow = self.builder.build();
212
213		if !has_real_source(&flow) {
214			return Err(Error(flow_source_required()));
215		}
216
217		Ok(flow)
218	}
219
220	/// Compiles a query plan operator into the FlowGraph
221	pub(crate) fn compile_plan(&mut self, txn: &mut AdminTransaction, plan: QueryPlan) -> Result<FlowNodeId> {
222		match plan {
223			QueryPlan::IndexScan(_index_scan) => {
224				// TODO: Implement IndexScanCompiler for flow
225				unimplemented!("IndexScan compilation not yet implemented for flow")
226			}
227			QueryPlan::TableScan(table_scan) => TableScanCompiler::from(table_scan).compile(self, txn),
228			QueryPlan::ViewScan(view_scan) => ViewScanCompiler::from(view_scan).compile(self, txn),
229			QueryPlan::InlineData(inline_data) => InlineDataCompiler::from(inline_data).compile(self, txn),
230			QueryPlan::Filter(filter) => FilterCompiler::from(filter).compile(self, txn),
231			QueryPlan::Gate(gate) => GateCompiler::from(gate).compile(self, txn),
232			QueryPlan::Map(map) => MapCompiler::from(map).compile(self, txn),
233			QueryPlan::Extend(extend) => ExtendCompiler::from(extend).compile(self, txn),
234			QueryPlan::Apply(apply) => ApplyCompiler::from(apply).compile(self, txn),
235			QueryPlan::Aggregate(aggregate) => AggregateCompiler::from(aggregate).compile(self, txn),
236			QueryPlan::Distinct(distinct) => DistinctCompiler::from(distinct).compile(self, txn),
237			QueryPlan::Take(take) => TakeCompiler::from(take).compile(self, txn),
238			QueryPlan::Sort(sort) => SortCompiler::from(sort).compile(self, txn),
239			QueryPlan::JoinInner(join) => JoinCompiler::from(join).compile(self, txn),
240			QueryPlan::JoinLeft(join) => JoinCompiler::from(join).compile(self, txn),
241			QueryPlan::JoinNatural(_) => {
242				unimplemented!()
243			}
244			QueryPlan::Append(append) => AppendCompiler::from(append).compile(self, txn),
245			QueryPlan::Patch(_) => {
246				unimplemented!("Patch compilation not yet implemented for flow")
247			}
248			QueryPlan::TableVirtualScan(_scan) => {
249				// TODO: Implement VirtualScanCompiler
250				unimplemented!("VirtualScan compilation not yet implemented")
251			}
252			QueryPlan::RingBufferScan(scan) => RingBufferScanCompiler::from(scan).compile(self, txn),
253			QueryPlan::Generator(_generator) => {
254				// TODO: Implement GeneratorCompiler for flow
255				unimplemented!("Generator compilation not yet implemented for flow")
256			}
257			QueryPlan::Window(window) => WindowCompiler::from(window).compile(self, txn),
258			QueryPlan::Variable(_) => {
259				panic!("Variable references are not supported in flow graphs");
260			}
261			QueryPlan::Scalarize(_) => {
262				panic!("Scalarize operations are not supported in flow graphs");
263			}
264			QueryPlan::Environment(_) => {
265				panic!("Environment operations are not supported in flow graphs");
266			}
267			QueryPlan::RowPointLookup(_) => {
268				// TODO: Implement optimized row point lookup for flow graphs
269				unimplemented!("RowPointLookup compilation not yet implemented for flow")
270			}
271			QueryPlan::RowListLookup(_) => {
272				// TODO: Implement optimized row list lookup for flow graphs
273				unimplemented!("RowListLookup compilation not yet implemented for flow")
274			}
275			QueryPlan::RowRangeScan(_) => {
276				// TODO: Implement optimized row range scan for flow graphs
277				unimplemented!("RowRangeScan compilation not yet implemented for flow")
278			}
279			QueryPlan::DictionaryScan(_) => {
280				// TODO: Implement DictionaryScan for flow graphs
281				unimplemented!("DictionaryScan compilation not yet implemented for flow")
282			}
283			QueryPlan::Assert(_) => {
284				unimplemented!("Assert compilation not yet implemented for flow")
285			}
286			QueryPlan::SeriesScan(series_scan) => SeriesScanCompiler::from(series_scan).compile(self, txn),
287			QueryPlan::RemoteScan(_) => {
288				return Err(Error(flow_remote_source_unsupported()));
289			}
290			QueryPlan::RunTests(_) => {
291				panic!("RunTests is not supported in flow graphs");
292			}
293			QueryPlan::CallFunction(_) => {
294				panic!("CallFunction is not supported in flow graphs");
295			}
296		}
297	}
298}
299
300/// Returns true if the flow contains at least one real source node
301/// (i.e., not just inline data).
302fn has_real_source(flow: &FlowDag) -> bool {
303	flow.get_node_ids().any(|node_id| {
304		if let Some(node) = flow.get_node(&node_id) {
305			matches!(
306				node.ty,
307				FlowNodeType::SourceTable { .. }
308					| FlowNodeType::SourceView { .. } | FlowNodeType::SourceFlow { .. }
309					| FlowNodeType::SourceRingBuffer { .. }
310					| FlowNodeType::SourceSeries { .. }
311			)
312		} else {
313			false
314		}
315	})
316}
317
/// Trait implemented by the per-operator compilers that turn a plan node
/// into flow nodes.
pub(crate) trait CompileOperator {
	/// Compiles this operator into the flow held by `compiler`, consuming
	/// `self`, and returns the ID of the resulting flow node.
	fn compile(self, compiler: &mut FlowCompiler, txn: &mut AdminTransaction) -> Result<FlowNodeId>;
}
322}