Skip to main content

reifydb_engine/flow/compiler/
mod.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4//! Flow compilation - compiles RQL physical plans into Flows
5//!
6//! This module uses AdminTransaction directly instead of being generic
7//! over MultiVersionAdminTransaction to avoid lifetime issues with async recursion.
8
9use reifydb_catalog::catalog::Catalog;
10use reifydb_core::{
11	interface::catalog::{
12		flow::{FlowEdgeDef, FlowEdgeId, FlowId, FlowNodeDef, FlowNodeId},
13		subscription::SubscriptionDef,
14		view::ViewDef,
15	},
16	internal,
17};
18use reifydb_rql::{
19	flow::{
20		flow::{FlowBuilder, FlowDag},
21		node::{FlowEdge, FlowNode, FlowNodeType},
22	},
23	query::QueryPlan,
24};
25use reifydb_type::{Result, error::Error, value::blob::Blob};
26
27pub mod operator;
28pub mod primitive;
29
30use postcard::to_stdvec;
31use reifydb_transaction::transaction::admin::AdminTransaction;
32
33use crate::flow::compiler::{
34	operator::{
35		aggregate::AggregateCompiler, append::AppendCompiler, apply::ApplyCompiler, distinct::DistinctCompiler,
36		extend::ExtendCompiler, filter::FilterCompiler, gate::GateCompiler, join::JoinCompiler,
37		map::MapCompiler, sort::SortCompiler, take::TakeCompiler, window::WindowCompiler,
38	},
39	primitive::{
40		inline_data::InlineDataCompiler, ringbuffer_scan::RingBufferScanCompiler,
41		series_scan::SeriesScanCompiler, table_scan::TableScanCompiler, view_scan::ViewScanCompiler,
42	},
43};
44
45/// Public API for compiling logical plans to Flows with an existing flow ID.
46pub fn compile_flow(
47	catalog: &Catalog,
48	txn: &mut AdminTransaction,
49	plan: QueryPlan,
50	sink: Option<&ViewDef>,
51	flow_id: FlowId,
52) -> Result<FlowDag> {
53	let compiler = FlowCompiler::new(catalog.clone(), flow_id);
54	compiler.compile(txn, plan, sink)
55}
56
57pub fn compile_subscription_flow(
58	catalog: &Catalog,
59	txn: &mut AdminTransaction,
60	plan: QueryPlan,
61	subscription: &SubscriptionDef,
62	flow_id: FlowId,
63) -> Result<FlowDag> {
64	let compiler = FlowCompiler::new(catalog.clone(), flow_id);
65	compiler.compile_with_subscription(txn, plan, subscription)
66}
67
68/// Compiler for converting RQL plans into executable Flows
69pub(crate) struct FlowCompiler {
70	/// The catalog for persisting flow nodes and edges
71	pub(crate) catalog: Catalog,
72	/// The flow builder being used for construction
73	builder: FlowBuilder,
74	/// The sink view schema (for terminal nodes)
75	pub(crate) sink: Option<ViewDef>,
76}
77
78impl FlowCompiler {
79	/// Creates a new FlowCompiler instance with an existing flow ID
80	pub fn new(catalog: Catalog, flow_id: FlowId) -> Self {
81		Self {
82			catalog,
83			builder: FlowDag::builder(flow_id),
84			sink: None,
85		}
86	}
87
88	/// Gets the next available operator ID
89	fn next_node_id(&mut self, txn: &mut AdminTransaction) -> Result<FlowNodeId> {
90		self.catalog.next_flow_node_id(txn).map_err(Into::into)
91	}
92
93	/// Gets the next available edge ID
94	fn next_edge_id(&mut self, txn: &mut AdminTransaction) -> Result<FlowEdgeId> {
95		self.catalog.next_flow_edge_id(txn).map_err(Into::into)
96	}
97
98	/// Adds an edge between two nodes
99	pub(crate) fn add_edge(
100		&mut self,
101		txn: &mut AdminTransaction,
102		from: &FlowNodeId,
103		to: &FlowNodeId,
104	) -> Result<()> {
105		let edge_id = self.next_edge_id(txn)?;
106		let flow_id = self.builder.id();
107
108		// Create the catalog entry
109		let edge_def = FlowEdgeDef {
110			id: edge_id,
111			flow: flow_id,
112			source: *from,
113			target: *to,
114		};
115
116		// Persist to catalog
117		self.catalog.create_flow_edge(txn, &edge_def)?;
118
119		// Add to in-memory builder
120		self.builder.add_edge(FlowEdge::new(edge_id, *from, *to))?;
121		Ok(())
122	}
123
124	/// Adds a operator to the flow graph
125	pub(crate) fn add_node(&mut self, txn: &mut AdminTransaction, node_type: FlowNodeType) -> Result<FlowNodeId> {
126		let node_id = self.next_node_id(txn)?;
127		let flow_id = self.builder.id();
128
129		// Serialize the node type to blob
130		let data = to_stdvec(&node_type)
131			.map_err(|e| Error(internal!("Failed to serialize FlowNodeType: {}", e)))?;
132
133		// Create the catalog entry
134		let node_def = FlowNodeDef {
135			id: node_id,
136			flow: flow_id,
137			node_type: node_type.discriminator(),
138			data: Blob::from(data),
139		};
140
141		// Persist to catalog
142		self.catalog.create_flow_node(txn, &node_def)?;
143
144		// Add to in-memory builder
145		self.builder.add_node(FlowNode::new(node_id, node_type));
146		Ok(node_id)
147	}
148
149	/// Compiles a query plan into a FlowGraph
150	pub(crate) fn compile(
151		mut self,
152		txn: &mut AdminTransaction,
153		plan: QueryPlan,
154		sink: Option<&ViewDef>,
155	) -> Result<FlowDag> {
156		// Store sink view for terminal nodes (if provided)
157		self.sink = sink.cloned();
158		let root_node_id = self.compile_plan(txn, plan)?;
159
160		// Only add SinkView node if sink is provided
161		if let Some(sink_view) = sink {
162			let result_node = self.add_node(
163				txn,
164				FlowNodeType::SinkView {
165					view: sink_view.id,
166				},
167			)?;
168
169			self.add_edge(txn, &root_node_id, &result_node)?;
170		}
171
172		Ok(self.builder.build())
173	}
174
175	/// Compiles a query plan into a FlowGraph with a subscription sink
176	pub(crate) fn compile_with_subscription(
177		mut self,
178		txn: &mut AdminTransaction,
179		plan: QueryPlan,
180		subscription: &SubscriptionDef,
181	) -> Result<FlowDag> {
182		let root_node_id = self.compile_plan(txn, plan)?;
183
184		// Add SinkSubscription node
185		let result_node = self.add_node(
186			txn,
187			FlowNodeType::SinkSubscription {
188				subscription: subscription.id,
189			},
190		)?;
191
192		self.add_edge(txn, &root_node_id, &result_node)?;
193
194		Ok(self.builder.build())
195	}
196
197	/// Compiles a query plan operator into the FlowGraph
198	pub(crate) fn compile_plan(&mut self, txn: &mut AdminTransaction, plan: QueryPlan) -> Result<FlowNodeId> {
199		match plan {
200			QueryPlan::IndexScan(_index_scan) => {
201				// TODO: Implement IndexScanCompiler for flow
202				unimplemented!("IndexScan compilation not yet implemented for flow")
203			}
204			QueryPlan::TableScan(table_scan) => TableScanCompiler::from(table_scan).compile(self, txn),
205			QueryPlan::ViewScan(view_scan) => ViewScanCompiler::from(view_scan).compile(self, txn),
206			QueryPlan::InlineData(inline_data) => InlineDataCompiler::from(inline_data).compile(self, txn),
207			QueryPlan::Filter(filter) => FilterCompiler::from(filter).compile(self, txn),
208			QueryPlan::Gate(gate) => GateCompiler::from(gate).compile(self, txn),
209			QueryPlan::Map(map) => MapCompiler::from(map).compile(self, txn),
210			QueryPlan::Extend(extend) => ExtendCompiler::from(extend).compile(self, txn),
211			QueryPlan::Apply(apply) => ApplyCompiler::from(apply).compile(self, txn),
212			QueryPlan::Aggregate(aggregate) => AggregateCompiler::from(aggregate).compile(self, txn),
213			QueryPlan::Distinct(distinct) => DistinctCompiler::from(distinct).compile(self, txn),
214			QueryPlan::Take(take) => TakeCompiler::from(take).compile(self, txn),
215			QueryPlan::Sort(sort) => SortCompiler::from(sort).compile(self, txn),
216			QueryPlan::JoinInner(join) => JoinCompiler::from(join).compile(self, txn),
217			QueryPlan::JoinLeft(join) => JoinCompiler::from(join).compile(self, txn),
218			QueryPlan::JoinNatural(_) => {
219				unimplemented!()
220			}
221			QueryPlan::Append(append) => AppendCompiler::from(append).compile(self, txn),
222			QueryPlan::Patch(_) => {
223				unimplemented!("Patch compilation not yet implemented for flow")
224			}
225			QueryPlan::TableVirtualScan(_scan) => {
226				// TODO: Implement VirtualScanCompiler
227				unimplemented!("VirtualScan compilation not yet implemented")
228			}
229			QueryPlan::RingBufferScan(scan) => RingBufferScanCompiler::from(scan).compile(self, txn),
230			QueryPlan::Generator(_generator) => {
231				// TODO: Implement GeneratorCompiler for flow
232				unimplemented!("Generator compilation not yet implemented for flow")
233			}
234			QueryPlan::Window(window) => WindowCompiler::from(window).compile(self, txn),
235			QueryPlan::Variable(_) => {
236				panic!("Variable references are not supported in flow graphs");
237			}
238			QueryPlan::Scalarize(_) => {
239				panic!("Scalarize operations are not supported in flow graphs");
240			}
241			QueryPlan::Environment(_) => {
242				panic!("Environment operations are not supported in flow graphs");
243			}
244			QueryPlan::RowPointLookup(_) => {
245				// TODO: Implement optimized row point lookup for flow graphs
246				unimplemented!("RowPointLookup compilation not yet implemented for flow")
247			}
248			QueryPlan::RowListLookup(_) => {
249				// TODO: Implement optimized row list lookup for flow graphs
250				unimplemented!("RowListLookup compilation not yet implemented for flow")
251			}
252			QueryPlan::RowRangeScan(_) => {
253				// TODO: Implement optimized row range scan for flow graphs
254				unimplemented!("RowRangeScan compilation not yet implemented for flow")
255			}
256			QueryPlan::DictionaryScan(_) => {
257				// TODO: Implement DictionaryScan for flow graphs
258				unimplemented!("DictionaryScan compilation not yet implemented for flow")
259			}
260			QueryPlan::Assert(_) => {
261				unimplemented!("Assert compilation not yet implemented for flow")
262			}
263			QueryPlan::SeriesScan(series_scan) => SeriesScanCompiler::from(series_scan).compile(self, txn),
264		}
265	}
266}
267
268/// Trait for compiling operator from physical plans to flow nodes
269pub(crate) trait CompileOperator {
270	/// Compiles this operator into a flow operator
271	fn compile(self, compiler: &mut FlowCompiler, txn: &mut AdminTransaction) -> Result<FlowNodeId>;
272}