reifydb_engine/flow/compiler/
mod.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4//! Flow compilation - compiles RQL physical plans into Flows
5//!
6//! This module uses StandardCommandTransaction directly instead of being generic
7//! over CommandTransaction to avoid lifetime issues with async recursion.
8
9use async_recursion::async_recursion;
10use reifydb_catalog::{
11	CatalogStore,
12	store::sequence::flow::{next_flow_edge_id, next_flow_node_id},
13};
14use reifydb_core::{
15	Result,
16	interface::{FlowEdgeDef, FlowEdgeId, FlowId, FlowNodeDef, FlowNodeId, ViewDef},
17};
18use reifydb_rql::{
19	flow::{Flow, FlowBuilder, FlowEdge, FlowNode, FlowNodeType},
20	plan::physical::PhysicalPlan,
21};
22use reifydb_type::Blob;
23
24use crate::StandardCommandTransaction;
25
26mod operator;
27mod source;
28
29use operator::{
30	AggregateCompiler, ApplyCompiler, DistinctCompiler, ExtendCompiler, FilterCompiler, JoinCompiler, MapCompiler,
31	MergeCompiler, SortCompiler, TakeCompiler, WindowCompiler,
32};
33use source::{FlowScanCompiler, InlineDataCompiler, TableScanCompiler, ViewScanCompiler};
34
35/// Public API for compiling logical plans to Flows with an existing flow ID.
36pub async fn compile_flow(
37	txn: &mut StandardCommandTransaction,
38	plan: PhysicalPlan,
39	sink: Option<&ViewDef>,
40	flow_id: FlowId,
41) -> Result<Flow> {
42	let compiler = FlowCompiler::new(flow_id);
43	compiler.compile(txn, plan, sink).await
44}
45
46/// Compiler for converting RQL plans into executable Flows
47pub(crate) struct FlowCompiler {
48	/// The flow builder being used for construction
49	builder: FlowBuilder,
50	/// The sink view schema (for terminal nodes)
51	pub(crate) sink: Option<ViewDef>,
52}
53
54impl FlowCompiler {
55	/// Creates a new FlowCompiler instance with an existing flow ID
56	pub fn new(flow_id: FlowId) -> Self {
57		Self {
58			builder: Flow::builder(flow_id),
59			sink: None,
60		}
61	}
62
63	/// Gets the next available operator ID
64	async fn next_node_id(&mut self, txn: &mut StandardCommandTransaction) -> Result<FlowNodeId> {
65		next_flow_node_id(txn).await.map_err(Into::into)
66	}
67
68	/// Gets the next available edge ID
69	async fn next_edge_id(&mut self, txn: &mut StandardCommandTransaction) -> Result<FlowEdgeId> {
70		next_flow_edge_id(txn).await.map_err(Into::into)
71	}
72
73	/// Adds an edge between two nodes
74	pub(crate) async fn add_edge(
75		&mut self,
76		txn: &mut StandardCommandTransaction,
77		from: &FlowNodeId,
78		to: &FlowNodeId,
79	) -> Result<()> {
80		let edge_id = self.next_edge_id(txn).await?;
81		let flow_id = self.builder.id();
82
83		// Create the catalog entry
84		let edge_def = FlowEdgeDef {
85			id: edge_id,
86			flow: flow_id,
87			source: *from,
88			target: *to,
89		};
90
91		// Persist to catalog
92		CatalogStore::create_flow_edge(txn, &edge_def).await?;
93
94		// Add to in-memory builder
95		self.builder.add_edge(FlowEdge::new(edge_id, *from, *to))?;
96		Ok(())
97	}
98
99	/// Adds a operator to the flow graph
100	pub(crate) async fn add_node(
101		&mut self,
102		txn: &mut StandardCommandTransaction,
103		node_type: FlowNodeType,
104	) -> Result<FlowNodeId> {
105		let node_id = self.next_node_id(txn).await?;
106		let flow_id = self.builder.id();
107
108		// Serialize the node type to blob
109		let data = postcard::to_stdvec(&node_type).map_err(|e| {
110			reifydb_core::Error(reifydb_type::internal!("Failed to serialize FlowNodeType: {}", e))
111		})?;
112
113		// Create the catalog entry
114		let node_def = FlowNodeDef {
115			id: node_id,
116			flow: flow_id,
117			node_type: node_type.discriminator(),
118			data: Blob::from(data),
119		};
120
121		// Persist to catalog
122		CatalogStore::create_flow_node(txn, &node_def).await?;
123
124		// Add to in-memory builder
125		self.builder.add_node(FlowNode::new(node_id, node_type));
126		Ok(node_id)
127	}
128
129	/// Compiles a physical plan into a FlowGraph
130	pub(crate) async fn compile(
131		mut self,
132		txn: &mut StandardCommandTransaction,
133		plan: PhysicalPlan,
134		sink: Option<&ViewDef>,
135	) -> Result<Flow> {
136		// Store sink view for terminal nodes (if provided)
137		self.sink = sink.cloned();
138		let root_node_id = self.compile_plan(txn, plan).await?;
139
140		// Only add SinkView node if sink is provided
141		if let Some(sink_view) = sink {
142			let result_node = self
143				.add_node(
144					txn,
145					FlowNodeType::SinkView {
146						view: sink_view.id,
147					},
148				)
149				.await?;
150
151			self.add_edge(txn, &root_node_id, &result_node).await?;
152		}
153
154		Ok(self.builder.build())
155	}
156
157	/// Compiles a physical plan operator into the FlowGraph
158	///
159	/// Uses async_recursion to handle the recursive async calls.
160	/// With the concrete StandardCommandTransaction type, the future is Send.
161	#[async_recursion]
162	pub(crate) async fn compile_plan(
163		&mut self,
164		txn: &mut StandardCommandTransaction,
165		plan: PhysicalPlan,
166	) -> Result<FlowNodeId> {
167		match plan {
168			PhysicalPlan::IndexScan(_index_scan) => {
169				// TODO: Implement IndexScanCompiler for flow
170				unimplemented!("IndexScan compilation not yet implemented for flow")
171			}
172			PhysicalPlan::TableScan(table_scan) => {
173				TableScanCompiler::from(table_scan).compile(self, txn).await
174			}
175			PhysicalPlan::ViewScan(view_scan) => ViewScanCompiler::from(view_scan).compile(self, txn).await,
176			PhysicalPlan::InlineData(inline_data) => {
177				InlineDataCompiler::from(inline_data).compile(self, txn).await
178			}
179			PhysicalPlan::Filter(filter) => FilterCompiler::from(filter).compile(self, txn).await,
180			PhysicalPlan::Map(map) => MapCompiler::from(map).compile(self, txn).await,
181			PhysicalPlan::Extend(extend) => ExtendCompiler::from(extend).compile(self, txn).await,
182			PhysicalPlan::Apply(apply) => ApplyCompiler::from(apply).compile(self, txn).await,
183			PhysicalPlan::Aggregate(aggregate) => {
184				AggregateCompiler::from(aggregate).compile(self, txn).await
185			}
186			PhysicalPlan::Distinct(distinct) => DistinctCompiler::from(distinct).compile(self, txn).await,
187			PhysicalPlan::Take(take) => TakeCompiler::from(take).compile(self, txn).await,
188			PhysicalPlan::Sort(sort) => SortCompiler::from(sort).compile(self, txn).await,
189			PhysicalPlan::JoinInner(join) => JoinCompiler::from(join).compile(self, txn).await,
190			PhysicalPlan::JoinLeft(join) => JoinCompiler::from(join).compile(self, txn).await,
191			PhysicalPlan::JoinNatural(_) => {
192				unimplemented!()
193			}
194			PhysicalPlan::Merge(merge) => MergeCompiler::from(merge).compile(self, txn).await,
195
196			PhysicalPlan::CreateNamespace(_)
197			| PhysicalPlan::CreateTable(_)
198			| PhysicalPlan::CreateRingBuffer(_)
199			| PhysicalPlan::CreateFlow(_)
200			| PhysicalPlan::CreateDictionary(_)
201			| PhysicalPlan::AlterSequence(_)
202			| PhysicalPlan::AlterTable(_)
203			| PhysicalPlan::AlterView(_)
204			| PhysicalPlan::AlterFlow(_)
205			| PhysicalPlan::CreateDeferredView(_)
206			| PhysicalPlan::CreateTransactionalView(_)
207			| PhysicalPlan::InsertTable(_)
208			| PhysicalPlan::InsertRingBuffer(_)
209			| PhysicalPlan::InsertDictionary(_)
210			| PhysicalPlan::Update(_)
211			| PhysicalPlan::UpdateRingBuffer(_)
212			| PhysicalPlan::Delete(_)
213			| PhysicalPlan::DeleteRingBuffer(_) => {
214				unreachable!()
215			}
216			PhysicalPlan::FlowScan(flow_scan) => FlowScanCompiler::from(flow_scan).compile(self, txn).await,
217			PhysicalPlan::TableVirtualScan(_scan) => {
218				// TODO: Implement VirtualScanCompiler
219				// For now, return a placeholder
220				unimplemented!("VirtualScan compilation not yet implemented")
221			}
222			PhysicalPlan::RingBufferScan(_scan) => {
223				// TODO: Implement RingBufferScanCompiler for flow
224				unimplemented!("RingBufferScan compilation not yet implemented for flow")
225			}
226			PhysicalPlan::Generator(_generator) => {
227				// TODO: Implement GeneratorCompiler for flow
228				unimplemented!("Generator compilation not yet implemented for flow")
229			}
230			PhysicalPlan::Window(window) => WindowCompiler::from(window).compile(self, txn).await,
231			PhysicalPlan::Declare(_) => {
232				panic!("Declare statements are not supported in flow graphs");
233			}
234
235			PhysicalPlan::Assign(_) => {
236				panic!("Assign statements are not supported in flow graphs");
237			}
238
239			PhysicalPlan::Conditional(_) => {
240				panic!("Conditional statements are not supported in flow graphs");
241			}
242
243			PhysicalPlan::Variable(_) => {
244				panic!("Variable references are not supported in flow graphs");
245			}
246
247			PhysicalPlan::Scalarize(_) => {
248				panic!("Scalarize operations are not supported in flow graphs");
249			}
250
251			PhysicalPlan::Environment(_) => {
252				panic!("Environment operations are not supported in flow graphs");
253			}
254
255			PhysicalPlan::RowPointLookup(_) => {
256				// TODO: Implement optimized row point lookup for flow graphs
257				unimplemented!("RowPointLookup compilation not yet implemented for flow")
258			}
259
260			PhysicalPlan::RowListLookup(_) => {
261				// TODO: Implement optimized row list lookup for flow graphs
262				unimplemented!("RowListLookup compilation not yet implemented for flow")
263			}
264
265			PhysicalPlan::RowRangeScan(_) => {
266				// TODO: Implement optimized row range scan for flow graphs
267				unimplemented!("RowRangeScan compilation not yet implemented for flow")
268			}
269
270			PhysicalPlan::DictionaryScan(_) => {
271				// TODO: Implement DictionaryScan for flow graphs
272				unimplemented!("DictionaryScan compilation not yet implemented for flow")
273			}
274		}
275	}
276}
277
278/// Trait for compiling operator from physical plans to flow nodes
279pub(crate) trait CompileOperator {
280	/// Compiles this operator into a flow operator
281	async fn compile(self, compiler: &mut FlowCompiler, txn: &mut StandardCommandTransaction)
282	-> Result<FlowNodeId>;
283}