Skip to main content

reifydb_engine/flow/compiler/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! Flow compilation - compiles RQL physical plans into Flows
5//!
6//! This module uses AdminTransaction directly instead of being generic
7//! over MultiVersionAdminTransaction to avoid lifetime issues with async recursion.
8
9use reifydb_catalog::catalog::Catalog;
10use reifydb_core::{
11	error::diagnostic::flow::flow_remote_source_unsupported,
12	interface::catalog::{
13		flow::{FlowEdgeDef, FlowEdgeId, FlowId, FlowNodeDef, FlowNodeId},
14		subscription::SubscriptionDef,
15		view::ViewDef,
16	},
17	internal,
18};
19use reifydb_rql::{
20	flow::{
21		flow::{FlowBuilder, FlowDag},
22		node::{FlowEdge, FlowNode, FlowNodeType},
23	},
24	query::QueryPlan,
25};
26use reifydb_type::{Result, error::Error, value::blob::Blob};
27
28pub mod operator;
29pub mod primitive;
30
31use postcard::to_stdvec;
32use reifydb_transaction::transaction::admin::AdminTransaction;
33
34use crate::flow::compiler::{
35	operator::{
36		aggregate::AggregateCompiler, append::AppendCompiler, apply::ApplyCompiler, distinct::DistinctCompiler,
37		extend::ExtendCompiler, filter::FilterCompiler, gate::GateCompiler, join::JoinCompiler,
38		map::MapCompiler, sort::SortCompiler, take::TakeCompiler, window::WindowCompiler,
39	},
40	primitive::{
41		inline_data::InlineDataCompiler, ringbuffer_scan::RingBufferScanCompiler,
42		series_scan::SeriesScanCompiler, table_scan::TableScanCompiler, view_scan::ViewScanCompiler,
43	},
44};
45
46/// Public API for compiling logical plans to Flows with an existing flow ID.
47pub fn compile_flow(
48	catalog: &Catalog,
49	txn: &mut AdminTransaction,
50	plan: QueryPlan,
51	sink: Option<&ViewDef>,
52	flow_id: FlowId,
53) -> Result<FlowDag> {
54	let compiler = FlowCompiler::new(catalog.clone(), flow_id);
55	compiler.compile(txn, plan, sink)
56}
57
58pub fn compile_subscription_flow(
59	catalog: &Catalog,
60	txn: &mut AdminTransaction,
61	plan: QueryPlan,
62	subscription: &SubscriptionDef,
63	flow_id: FlowId,
64) -> Result<FlowDag> {
65	let compiler = FlowCompiler::new(catalog.clone(), flow_id);
66	compiler.compile_with_subscription(txn, plan, subscription)
67}
68
69/// Compiler for converting RQL plans into executable Flows
70pub(crate) struct FlowCompiler {
71	/// The catalog for persisting flow nodes and edges
72	pub(crate) catalog: Catalog,
73	/// The flow builder being used for construction
74	builder: FlowBuilder,
75	/// The sink view schema (for terminal nodes)
76	pub(crate) sink: Option<ViewDef>,
77}
78
79impl FlowCompiler {
80	/// Creates a new FlowCompiler instance with an existing flow ID
81	pub fn new(catalog: Catalog, flow_id: FlowId) -> Self {
82		Self {
83			catalog,
84			builder: FlowDag::builder(flow_id),
85			sink: None,
86		}
87	}
88
89	/// Gets the next available operator ID
90	fn next_node_id(&mut self, txn: &mut AdminTransaction) -> Result<FlowNodeId> {
91		self.catalog.next_flow_node_id(txn).map_err(Into::into)
92	}
93
94	/// Gets the next available edge ID
95	fn next_edge_id(&mut self, txn: &mut AdminTransaction) -> Result<FlowEdgeId> {
96		self.catalog.next_flow_edge_id(txn).map_err(Into::into)
97	}
98
99	/// Adds an edge between two nodes
100	pub(crate) fn add_edge(
101		&mut self,
102		txn: &mut AdminTransaction,
103		from: &FlowNodeId,
104		to: &FlowNodeId,
105	) -> Result<()> {
106		let edge_id = self.next_edge_id(txn)?;
107		let flow_id = self.builder.id();
108
109		// Create the catalog entry
110		let edge_def = FlowEdgeDef {
111			id: edge_id,
112			flow: flow_id,
113			source: *from,
114			target: *to,
115		};
116
117		// Persist to catalog
118		self.catalog.create_flow_edge(txn, &edge_def)?;
119
120		// Add to in-memory builder
121		self.builder.add_edge(FlowEdge::new(edge_id, *from, *to))?;
122		Ok(())
123	}
124
125	/// Adds a operator to the flow graph
126	pub(crate) fn add_node(&mut self, txn: &mut AdminTransaction, node_type: FlowNodeType) -> Result<FlowNodeId> {
127		let node_id = self.next_node_id(txn)?;
128		let flow_id = self.builder.id();
129
130		// Serialize the node type to blob
131		let data = to_stdvec(&node_type)
132			.map_err(|e| Error(internal!("Failed to serialize FlowNodeType: {}", e)))?;
133
134		// Create the catalog entry
135		let node_def = FlowNodeDef {
136			id: node_id,
137			flow: flow_id,
138			node_type: node_type.discriminator(),
139			data: Blob::from(data),
140		};
141
142		// Persist to catalog
143		self.catalog.create_flow_node(txn, &node_def)?;
144
145		// Add to in-memory builder
146		self.builder.add_node(FlowNode::new(node_id, node_type));
147		Ok(node_id)
148	}
149
150	/// Compiles a query plan into a FlowGraph
151	pub(crate) fn compile(
152		mut self,
153		txn: &mut AdminTransaction,
154		plan: QueryPlan,
155		sink: Option<&ViewDef>,
156	) -> Result<FlowDag> {
157		// Store sink view for terminal nodes (if provided)
158		self.sink = sink.cloned();
159		let root_node_id = self.compile_plan(txn, plan)?;
160
161		// Only add SinkView node if sink is provided
162		if let Some(sink_view) = sink {
163			let result_node = self.add_node(
164				txn,
165				FlowNodeType::SinkView {
166					view: sink_view.id,
167				},
168			)?;
169
170			self.add_edge(txn, &root_node_id, &result_node)?;
171		}
172
173		Ok(self.builder.build())
174	}
175
176	/// Compiles a query plan into a FlowGraph with a subscription sink
177	pub(crate) fn compile_with_subscription(
178		mut self,
179		txn: &mut AdminTransaction,
180		plan: QueryPlan,
181		subscription: &SubscriptionDef,
182	) -> Result<FlowDag> {
183		let root_node_id = self.compile_plan(txn, plan)?;
184
185		// Add SinkSubscription node
186		let result_node = self.add_node(
187			txn,
188			FlowNodeType::SinkSubscription {
189				subscription: subscription.id,
190			},
191		)?;
192
193		self.add_edge(txn, &root_node_id, &result_node)?;
194
195		Ok(self.builder.build())
196	}
197
198	/// Compiles a query plan operator into the FlowGraph
199	pub(crate) fn compile_plan(&mut self, txn: &mut AdminTransaction, plan: QueryPlan) -> Result<FlowNodeId> {
200		match plan {
201			QueryPlan::IndexScan(_index_scan) => {
202				// TODO: Implement IndexScanCompiler for flow
203				unimplemented!("IndexScan compilation not yet implemented for flow")
204			}
205			QueryPlan::TableScan(table_scan) => TableScanCompiler::from(table_scan).compile(self, txn),
206			QueryPlan::ViewScan(view_scan) => ViewScanCompiler::from(view_scan).compile(self, txn),
207			QueryPlan::InlineData(inline_data) => InlineDataCompiler::from(inline_data).compile(self, txn),
208			QueryPlan::Filter(filter) => FilterCompiler::from(filter).compile(self, txn),
209			QueryPlan::Gate(gate) => GateCompiler::from(gate).compile(self, txn),
210			QueryPlan::Map(map) => MapCompiler::from(map).compile(self, txn),
211			QueryPlan::Extend(extend) => ExtendCompiler::from(extend).compile(self, txn),
212			QueryPlan::Apply(apply) => ApplyCompiler::from(apply).compile(self, txn),
213			QueryPlan::Aggregate(aggregate) => AggregateCompiler::from(aggregate).compile(self, txn),
214			QueryPlan::Distinct(distinct) => DistinctCompiler::from(distinct).compile(self, txn),
215			QueryPlan::Take(take) => TakeCompiler::from(take).compile(self, txn),
216			QueryPlan::Sort(sort) => SortCompiler::from(sort).compile(self, txn),
217			QueryPlan::JoinInner(join) => JoinCompiler::from(join).compile(self, txn),
218			QueryPlan::JoinLeft(join) => JoinCompiler::from(join).compile(self, txn),
219			QueryPlan::JoinNatural(_) => {
220				unimplemented!()
221			}
222			QueryPlan::Append(append) => AppendCompiler::from(append).compile(self, txn),
223			QueryPlan::Patch(_) => {
224				unimplemented!("Patch compilation not yet implemented for flow")
225			}
226			QueryPlan::TableVirtualScan(_scan) => {
227				// TODO: Implement VirtualScanCompiler
228				unimplemented!("VirtualScan compilation not yet implemented")
229			}
230			QueryPlan::RingBufferScan(scan) => RingBufferScanCompiler::from(scan).compile(self, txn),
231			QueryPlan::Generator(_generator) => {
232				// TODO: Implement GeneratorCompiler for flow
233				unimplemented!("Generator compilation not yet implemented for flow")
234			}
235			QueryPlan::Window(window) => WindowCompiler::from(window).compile(self, txn),
236			QueryPlan::Variable(_) => {
237				panic!("Variable references are not supported in flow graphs");
238			}
239			QueryPlan::Scalarize(_) => {
240				panic!("Scalarize operations are not supported in flow graphs");
241			}
242			QueryPlan::Environment(_) => {
243				panic!("Environment operations are not supported in flow graphs");
244			}
245			QueryPlan::RowPointLookup(_) => {
246				// TODO: Implement optimized row point lookup for flow graphs
247				unimplemented!("RowPointLookup compilation not yet implemented for flow")
248			}
249			QueryPlan::RowListLookup(_) => {
250				// TODO: Implement optimized row list lookup for flow graphs
251				unimplemented!("RowListLookup compilation not yet implemented for flow")
252			}
253			QueryPlan::RowRangeScan(_) => {
254				// TODO: Implement optimized row range scan for flow graphs
255				unimplemented!("RowRangeScan compilation not yet implemented for flow")
256			}
257			QueryPlan::DictionaryScan(_) => {
258				// TODO: Implement DictionaryScan for flow graphs
259				unimplemented!("DictionaryScan compilation not yet implemented for flow")
260			}
261			QueryPlan::Assert(_) => {
262				unimplemented!("Assert compilation not yet implemented for flow")
263			}
264			QueryPlan::SeriesScan(series_scan) => SeriesScanCompiler::from(series_scan).compile(self, txn),
265			QueryPlan::RemoteScan(_) => {
266				return Err(Error(flow_remote_source_unsupported()));
267			}
268			QueryPlan::RunTests(_) => {
269				panic!("RunTests is not supported in flow graphs");
270			}
271			QueryPlan::CallFunction(_) => {
272				panic!("CallFunction is not supported in flow graphs");
273			}
274		}
275	}
276}
277
278/// Trait for compiling operator from physical plans to flow nodes
279pub(crate) trait CompileOperator {
280	/// Compiles this operator into a flow operator
281	fn compile(self, compiler: &mut FlowCompiler, txn: &mut AdminTransaction) -> Result<FlowNodeId>;
282}