Skip to main content

reifydb_engine/flow/compiler/
mod.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4//! Flow compilation - compiles RQL physical plans into Flows
5//!
6//! This module uses AdminTransaction directly instead of being generic
7//! over MultiVersionAdminTransaction to avoid lifetime issues with async recursion.
8
9use reifydb_catalog::catalog::Catalog;
10use reifydb_core::interface::catalog::{
11	flow::{FlowEdgeDef, FlowEdgeId, FlowId, FlowNodeDef, FlowNodeId},
12	subscription::SubscriptionDef,
13	view::ViewDef,
14};
15use reifydb_rql::{
16	flow::{
17		flow::{FlowBuilder, FlowDag},
18		node::{FlowEdge, FlowNode, FlowNodeType},
19	},
20	query::QueryPlan,
21};
22use reifydb_type::{Result, value::blob::Blob};
23
24pub mod operator;
25pub mod primitive;
26
27use reifydb_transaction::transaction::admin::AdminTransaction;
28
29use crate::flow::compiler::{
30	operator::{
31		aggregate::AggregateCompiler, append::AppendCompiler, apply::ApplyCompiler, distinct::DistinctCompiler,
32		extend::ExtendCompiler, filter::FilterCompiler, join::JoinCompiler, map::MapCompiler,
33		sort::SortCompiler, take::TakeCompiler, window::WindowCompiler,
34	},
35	primitive::{
36		flow_scan::FlowScanCompiler, inline_data::InlineDataCompiler, table_scan::TableScanCompiler,
37		view_scan::ViewScanCompiler,
38	},
39};
40
41/// Public API for compiling logical plans to Flows with an existing flow ID.
42pub fn compile_flow(
43	catalog: &Catalog,
44	txn: &mut AdminTransaction,
45	plan: QueryPlan,
46	sink: Option<&ViewDef>,
47	flow_id: FlowId,
48) -> Result<FlowDag> {
49	let compiler = FlowCompiler::new(catalog.clone(), flow_id);
50	compiler.compile(txn, plan, sink)
51}
52
53pub fn compile_subscription_flow(
54	catalog: &Catalog,
55	txn: &mut AdminTransaction,
56	plan: QueryPlan,
57	subscription: &SubscriptionDef,
58	flow_id: FlowId,
59) -> Result<FlowDag> {
60	let compiler = FlowCompiler::new(catalog.clone(), flow_id);
61	compiler.compile_with_subscription(txn, plan, subscription)
62}
63
64/// Compiler for converting RQL plans into executable Flows
65pub(crate) struct FlowCompiler {
66	/// The catalog for persisting flow nodes and edges
67	pub(crate) catalog: Catalog,
68	/// The flow builder being used for construction
69	builder: FlowBuilder,
70	/// The sink view schema (for terminal nodes)
71	pub(crate) sink: Option<ViewDef>,
72}
73
74impl FlowCompiler {
75	/// Creates a new FlowCompiler instance with an existing flow ID
76	pub fn new(catalog: Catalog, flow_id: FlowId) -> Self {
77		Self {
78			catalog,
79			builder: FlowDag::builder(flow_id),
80			sink: None,
81		}
82	}
83
84	/// Gets the next available operator ID
85	fn next_node_id(&mut self, txn: &mut AdminTransaction) -> Result<FlowNodeId> {
86		self.catalog.next_flow_node_id(txn).map_err(Into::into)
87	}
88
89	/// Gets the next available edge ID
90	fn next_edge_id(&mut self, txn: &mut AdminTransaction) -> Result<FlowEdgeId> {
91		self.catalog.next_flow_edge_id(txn).map_err(Into::into)
92	}
93
94	/// Adds an edge between two nodes
95	pub(crate) fn add_edge(
96		&mut self,
97		txn: &mut AdminTransaction,
98		from: &FlowNodeId,
99		to: &FlowNodeId,
100	) -> Result<()> {
101		let edge_id = self.next_edge_id(txn)?;
102		let flow_id = self.builder.id();
103
104		// Create the catalog entry
105		let edge_def = FlowEdgeDef {
106			id: edge_id,
107			flow: flow_id,
108			source: *from,
109			target: *to,
110		};
111
112		// Persist to catalog
113		self.catalog.create_flow_edge(txn, &edge_def)?;
114
115		// Add to in-memory builder
116		self.builder.add_edge(FlowEdge::new(edge_id, *from, *to))?;
117		Ok(())
118	}
119
120	/// Adds a operator to the flow graph
121	pub(crate) fn add_node(&mut self, txn: &mut AdminTransaction, node_type: FlowNodeType) -> Result<FlowNodeId> {
122		let node_id = self.next_node_id(txn)?;
123		let flow_id = self.builder.id();
124
125		// Serialize the node type to blob
126		let data = postcard::to_stdvec(&node_type).map_err(|e| {
127			reifydb_type::error::Error(reifydb_core::internal!("Failed to serialize FlowNodeType: {}", e))
128		})?;
129
130		// Create the catalog entry
131		let node_def = FlowNodeDef {
132			id: node_id,
133			flow: flow_id,
134			node_type: node_type.discriminator(),
135			data: Blob::from(data),
136		};
137
138		// Persist to catalog
139		self.catalog.create_flow_node(txn, &node_def)?;
140
141		// Add to in-memory builder
142		self.builder.add_node(FlowNode::new(node_id, node_type));
143		Ok(node_id)
144	}
145
146	/// Compiles a query plan into a FlowGraph
147	pub(crate) fn compile(
148		mut self,
149		txn: &mut AdminTransaction,
150		plan: QueryPlan,
151		sink: Option<&ViewDef>,
152	) -> Result<FlowDag> {
153		// Store sink view for terminal nodes (if provided)
154		self.sink = sink.cloned();
155		let root_node_id = self.compile_plan(txn, plan)?;
156
157		// Only add SinkView node if sink is provided
158		if let Some(sink_view) = sink {
159			let result_node = self.add_node(
160				txn,
161				FlowNodeType::SinkView {
162					view: sink_view.id,
163				},
164			)?;
165
166			self.add_edge(txn, &root_node_id, &result_node)?;
167		}
168
169		Ok(self.builder.build())
170	}
171
172	/// Compiles a query plan into a FlowGraph with a subscription sink
173	pub(crate) fn compile_with_subscription(
174		mut self,
175		txn: &mut AdminTransaction,
176		plan: QueryPlan,
177		subscription: &SubscriptionDef,
178	) -> Result<FlowDag> {
179		let root_node_id = self.compile_plan(txn, plan)?;
180
181		// Add SinkSubscription node
182		let result_node = self.add_node(
183			txn,
184			FlowNodeType::SinkSubscription {
185				subscription: subscription.id,
186			},
187		)?;
188
189		self.add_edge(txn, &root_node_id, &result_node)?;
190
191		Ok(self.builder.build())
192	}
193
194	/// Compiles a query plan operator into the FlowGraph
195	pub(crate) fn compile_plan(&mut self, txn: &mut AdminTransaction, plan: QueryPlan) -> Result<FlowNodeId> {
196		match plan {
197			QueryPlan::IndexScan(_index_scan) => {
198				// TODO: Implement IndexScanCompiler for flow
199				unimplemented!("IndexScan compilation not yet implemented for flow")
200			}
201			QueryPlan::TableScan(table_scan) => TableScanCompiler::from(table_scan).compile(self, txn),
202			QueryPlan::ViewScan(view_scan) => ViewScanCompiler::from(view_scan).compile(self, txn),
203			QueryPlan::InlineData(inline_data) => InlineDataCompiler::from(inline_data).compile(self, txn),
204			QueryPlan::Filter(filter) => FilterCompiler::from(filter).compile(self, txn),
205			QueryPlan::Map(map) => MapCompiler::from(map).compile(self, txn),
206			QueryPlan::Extend(extend) => ExtendCompiler::from(extend).compile(self, txn),
207			QueryPlan::Apply(apply) => ApplyCompiler::from(apply).compile(self, txn),
208			QueryPlan::Aggregate(aggregate) => AggregateCompiler::from(aggregate).compile(self, txn),
209			QueryPlan::Distinct(distinct) => DistinctCompiler::from(distinct).compile(self, txn),
210			QueryPlan::Take(take) => TakeCompiler::from(take).compile(self, txn),
211			QueryPlan::Sort(sort) => SortCompiler::from(sort).compile(self, txn),
212			QueryPlan::JoinInner(join) => JoinCompiler::from(join).compile(self, txn),
213			QueryPlan::JoinLeft(join) => JoinCompiler::from(join).compile(self, txn),
214			QueryPlan::JoinNatural(_) => {
215				unimplemented!()
216			}
217			QueryPlan::Append(append) => AppendCompiler::from(append).compile(self, txn),
218			QueryPlan::Patch(_) => {
219				unimplemented!("Patch compilation not yet implemented for flow")
220			}
221			QueryPlan::FlowScan(flow_scan) => FlowScanCompiler::from(flow_scan).compile(self, txn),
222			QueryPlan::TableVirtualScan(_scan) => {
223				// TODO: Implement VirtualScanCompiler
224				unimplemented!("VirtualScan compilation not yet implemented")
225			}
226			QueryPlan::RingBufferScan(_scan) => {
227				// TODO: Implement RingBufferScanCompiler for flow
228				unimplemented!("RingBufferScan compilation not yet implemented for flow")
229			}
230			QueryPlan::Generator(_generator) => {
231				// TODO: Implement GeneratorCompiler for flow
232				unimplemented!("Generator compilation not yet implemented for flow")
233			}
234			QueryPlan::Window(window) => WindowCompiler::from(window).compile(self, txn),
235			QueryPlan::Variable(_) => {
236				panic!("Variable references are not supported in flow graphs");
237			}
238			QueryPlan::Scalarize(_) => {
239				panic!("Scalarize operations are not supported in flow graphs");
240			}
241			QueryPlan::Environment(_) => {
242				panic!("Environment operations are not supported in flow graphs");
243			}
244			QueryPlan::RowPointLookup(_) => {
245				// TODO: Implement optimized row point lookup for flow graphs
246				unimplemented!("RowPointLookup compilation not yet implemented for flow")
247			}
248			QueryPlan::RowListLookup(_) => {
249				// TODO: Implement optimized row list lookup for flow graphs
250				unimplemented!("RowListLookup compilation not yet implemented for flow")
251			}
252			QueryPlan::RowRangeScan(_) => {
253				// TODO: Implement optimized row range scan for flow graphs
254				unimplemented!("RowRangeScan compilation not yet implemented for flow")
255			}
256			QueryPlan::DictionaryScan(_) => {
257				// TODO: Implement DictionaryScan for flow graphs
258				unimplemented!("DictionaryScan compilation not yet implemented for flow")
259			}
260			QueryPlan::Assert(_) => {
261				unimplemented!("Assert compilation not yet implemented for flow")
262			}
263		}
264	}
265}
266
267/// Trait for compiling operator from physical plans to flow nodes
268pub(crate) trait CompileOperator {
269	/// Compiles this operator into a flow operator
270	fn compile(self, compiler: &mut FlowCompiler, txn: &mut AdminTransaction) -> Result<FlowNodeId>;
271}