Skip to main content

reifydb_engine/flow/compiler/
mod.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2026 ReifyDB
3
4use reifydb_catalog::{catalog::Catalog, store::operator_settings::create::create_operator_settings};
5use reifydb_core::{
6	error::diagnostic::flow::{
7		flow_ephemeral_id_capacity_exceeded, flow_remote_source_unsupported, flow_source_required,
8	},
9	interface::catalog::{
10		flow::{FlowEdge, FlowEdgeId, FlowId, FlowNode, FlowNodeId},
11		id::SubscriptionId,
12		view::View,
13	},
14	internal,
15	row::{JoinTtl, OperatorSettings, Ttl},
16};
17use reifydb_rql::{
18	flow::{
19		flow::{FlowBuilder, FlowDag},
20		node::{self, FlowNodeType},
21	},
22	query::QueryPlan,
23};
24use reifydb_value::{Result, error::Error, value::blob::Blob};
25
26pub mod operator;
27pub mod primitive;
28
29use postcard::to_stdvec;
30use reifydb_transaction::transaction::{Transaction, admin::AdminTransaction};
31
32use crate::flow::compiler::{
33	operator::{
34		aggregate::AggregateCompiler, append::AppendCompiler, apply::ApplyCompiler, distinct::DistinctCompiler,
35		extend::ExtendCompiler, filter::FilterCompiler, gate::GateCompiler, join::JoinCompiler,
36		map::MapCompiler, sort::SortCompiler, take::TakeCompiler, window::WindowCompiler,
37	},
38	primitive::{
39		inline_data::InlineDataCompiler, ringbuffer_scan::RingBufferScanCompiler,
40		series_scan::SeriesScanCompiler, table_scan::TableScanCompiler, view_scan::ViewScanCompiler,
41	},
42};
43
44pub fn compile_flow(
45	catalog: &Catalog,
46	txn: &mut AdminTransaction,
47	plan: QueryPlan,
48	sink: Option<&View>,
49	flow_id: FlowId,
50) -> Result<FlowDag> {
51	let compiler = FlowCompiler::new(catalog.clone(), flow_id);
52	compiler.compile(&mut Transaction::Admin(txn), plan, sink)
53}
54
55pub fn compile_subscription_flow_ephemeral(
56	catalog: &Catalog,
57	txn: &mut Transaction<'_>,
58	plan: QueryPlan,
59	subscription_id: SubscriptionId,
60	flow_id: FlowId,
61) -> Result<FlowDag> {
62	let compiler = FlowCompiler::new_ephemeral(catalog.clone(), flow_id);
63	compiler.compile_with_subscription_id(txn, plan, subscription_id)
64}
65
66pub(crate) struct FlowCompiler {
67	pub(crate) catalog: Catalog,
68
69	builder: FlowBuilder,
70
71	pub(crate) sink: Option<View>,
72
73	ephemeral: bool,
74
75	local_node_counter: u64,
76
77	local_edge_counter: u64,
78
79	local_id_limit: u64,
80}
81
82impl FlowCompiler {
83	pub fn new(catalog: Catalog, flow_id: FlowId) -> Self {
84		Self {
85			catalog,
86			builder: FlowDag::builder(flow_id),
87			sink: None,
88			ephemeral: false,
89			local_node_counter: 0,
90			local_edge_counter: 0,
91			local_id_limit: 0,
92		}
93	}
94
95	pub fn new_ephemeral(catalog: Catalog, flow_id: FlowId) -> Self {
96		let base = flow_id.0 * 100;
97		Self {
98			catalog,
99			builder: FlowDag::builder(flow_id),
100			sink: None,
101			ephemeral: true,
102			local_node_counter: base,
103			local_edge_counter: base,
104			local_id_limit: base + 99,
105		}
106	}
107
108	fn next_node_id(&mut self, txn: &mut Transaction<'_>) -> Result<FlowNodeId> {
109		if self.ephemeral {
110			if self.local_node_counter >= self.local_id_limit {
111				return Err(Error(Box::new(flow_ephemeral_id_capacity_exceeded(self.builder.id().0))));
112			}
113			self.local_node_counter += 1;
114			Ok(FlowNodeId(self.local_node_counter))
115		} else {
116			self.catalog.next_flow_node_id(txn.admin_mut())
117		}
118	}
119
120	fn next_edge_id(&mut self, txn: &mut Transaction<'_>) -> Result<FlowEdgeId> {
121		if self.ephemeral {
122			if self.local_edge_counter >= self.local_id_limit {
123				return Err(Error(Box::new(flow_ephemeral_id_capacity_exceeded(self.builder.id().0))));
124			}
125			self.local_edge_counter += 1;
126			Ok(FlowEdgeId(self.local_edge_counter))
127		} else {
128			self.catalog.next_flow_edge_id(txn.admin_mut())
129		}
130	}
131
132	pub(crate) fn add_edge(&mut self, txn: &mut Transaction<'_>, from: &FlowNodeId, to: &FlowNodeId) -> Result<()> {
133		let edge_id = self.next_edge_id(txn)?;
134		let flow_id = self.builder.id();
135
136		if !self.ephemeral {
137			let edge_def = FlowEdge {
138				id: edge_id,
139				flow: flow_id,
140				source: *from,
141				target: *to,
142			};
143
144			self.catalog.create_flow_edge(txn.admin_mut(), &edge_def)?;
145		}
146
147		self.builder.add_edge(node::FlowEdge::new(edge_id, *from, *to))?;
148		Ok(())
149	}
150
151	pub(crate) fn add_node(&mut self, txn: &mut Transaction<'_>, node_type: FlowNodeType) -> Result<FlowNodeId> {
152		let node_id = self.next_node_id(txn)?;
153		let flow_id = self.builder.id();
154
155		if !self.ephemeral {
156			let data = to_stdvec(&node_type)
157				.map_err(|e| Error(Box::new(internal!("Failed to serialize FlowNodeType: {}", e))))?;
158
159			let node_def = FlowNode {
160				id: node_id,
161				flow: flow_id,
162				node_type: node_type.discriminator(),
163				data: Blob::from(data),
164			};
165
166			self.catalog.create_flow_node(txn.admin_mut(), &node_def)?;
167		}
168
169		self.builder.add_node(node::FlowNode::new(node_id, node_type));
170		Ok(node_id)
171	}
172
173	pub(crate) fn write_operator_settings(
174		&self,
175		txn: &mut Transaction<'_>,
176		node_id: FlowNodeId,
177		ttl: Option<Ttl>,
178	) -> Result<()> {
179		if self.ephemeral {
180			return Ok(());
181		}
182		if let Some(ttl) = ttl {
183			create_operator_settings(
184				txn.admin_mut(),
185				node_id,
186				&OperatorSettings {
187					ttl: Some(ttl),
188					join: None,
189				},
190			)?;
191		}
192		Ok(())
193	}
194
195	pub(crate) fn write_operator_settings_join(
196		&self,
197		txn: &mut Transaction<'_>,
198		node_id: FlowNodeId,
199		join: Option<JoinTtl>,
200	) -> Result<()> {
201		if self.ephemeral {
202			return Ok(());
203		}
204		let Some(join) = join else {
205			return Ok(());
206		};
207		if join.left.is_none() && join.right.is_none() {
208			return Ok(());
209		}
210		create_operator_settings(
211			txn.admin_mut(),
212			node_id,
213			&OperatorSettings {
214				ttl: None,
215				join: Some(join),
216			},
217		)?;
218		Ok(())
219	}
220
221	pub(crate) fn compile(
222		mut self,
223		txn: &mut Transaction<'_>,
224		plan: QueryPlan,
225		sink: Option<&View>,
226	) -> Result<FlowDag> {
227		self.sink = sink.cloned();
228		let root_node_id = self.compile_plan(txn, plan)?;
229
230		if let Some(sink_view) = sink {
231			let node_type = match sink_view {
232				View::Table(t) => FlowNodeType::SinkTableView {
233					view: sink_view.id(),
234					table: t.underlying,
235				},
236				View::RingBuffer(rb) => FlowNodeType::SinkRingBufferView {
237					view: sink_view.id(),
238					ringbuffer: rb.underlying,
239					capacity: rb.capacity,
240					propagate_evictions: rb.propagate_evictions,
241				},
242				View::Series(s) => FlowNodeType::SinkSeriesView {
243					view: sink_view.id(),
244					series: s.underlying,
245					key: s.key.clone(),
246				},
247			};
248			let result_node = self.add_node(txn, node_type)?;
249			self.add_edge(txn, &root_node_id, &result_node)?;
250		}
251
252		let flow = self.builder.build();
253
254		if !has_real_source(&flow) {
255			return Err(Error(Box::new(flow_source_required())));
256		}
257
258		Ok(flow)
259	}
260
261	pub(crate) fn compile_with_subscription_id(
262		mut self,
263		txn: &mut Transaction<'_>,
264		plan: QueryPlan,
265		subscription_id: SubscriptionId,
266	) -> Result<FlowDag> {
267		let root_node_id = self.compile_plan(txn, plan)?;
268
269		let result_node = self.add_node(
270			txn,
271			FlowNodeType::SinkSubscription {
272				subscription: subscription_id,
273			},
274		)?;
275
276		self.add_edge(txn, &root_node_id, &result_node)?;
277
278		let flow = self.builder.build();
279
280		if !has_real_source(&flow) {
281			return Err(Error(Box::new(flow_source_required())));
282		}
283
284		Ok(flow)
285	}
286
287	pub(crate) fn compile_plan(&mut self, txn: &mut Transaction<'_>, plan: QueryPlan) -> Result<FlowNodeId> {
288		match plan {
289			QueryPlan::IndexScan(_index_scan) => {
290				// TODO: Implement IndexScanCompiler for flow
291				unimplemented!("IndexScan compilation not yet implemented for flow")
292			}
293			QueryPlan::TableScan(table_scan) => TableScanCompiler::from(table_scan).compile(self, txn),
294			QueryPlan::ViewScan(view_scan) => ViewScanCompiler::from(view_scan).compile(self, txn),
295			QueryPlan::InlineData(inline_data) => InlineDataCompiler::from(inline_data).compile(self, txn),
296			QueryPlan::Filter(filter) => FilterCompiler::from(filter).compile(self, txn),
297			QueryPlan::Gate(gate) => GateCompiler::from(gate).compile(self, txn),
298			QueryPlan::Map(map) => MapCompiler::from(map).compile(self, txn),
299			QueryPlan::Extend(extend) => ExtendCompiler::from(extend).compile(self, txn),
300			QueryPlan::Apply(apply) => ApplyCompiler::from(apply).compile(self, txn),
301			QueryPlan::Aggregate(aggregate) => AggregateCompiler::from(aggregate).compile(self, txn),
302			QueryPlan::Distinct(distinct) => DistinctCompiler::from(distinct).compile(self, txn),
303			QueryPlan::Take(take) => TakeCompiler::from(take).compile(self, txn),
304			QueryPlan::Sort(sort) => SortCompiler::from(sort).compile(self, txn),
305			QueryPlan::JoinInner(join) => JoinCompiler::from(join).compile(self, txn),
306			QueryPlan::JoinLeft(join) => JoinCompiler::from(join).compile(self, txn),
307			QueryPlan::JoinNatural(_) => {
308				unimplemented!()
309			}
310			QueryPlan::Append(append) => AppendCompiler::from(append).compile(self, txn),
311			QueryPlan::Patch(_) => {
312				unimplemented!("Patch compilation not yet implemented for flow")
313			}
314			QueryPlan::TableVirtualScan(_scan) => {
315				// TODO: Implement VirtualScanCompiler
316				unimplemented!("VirtualScan compilation not yet implemented")
317			}
318			QueryPlan::RingBufferScan(scan) => RingBufferScanCompiler::from(scan).compile(self, txn),
319			QueryPlan::Generator(_generator) => {
320				// TODO: Implement GeneratorCompiler for flow
321				unimplemented!("Generator compilation not yet implemented for flow")
322			}
323			QueryPlan::Window(window) => WindowCompiler::from(window).compile(self, txn),
324			QueryPlan::Variable(_) => {
325				panic!("Variable references are not supported in flow graphs");
326			}
327			QueryPlan::Scalarize(_) => {
328				panic!("Scalarize operations are not supported in flow graphs");
329			}
330			QueryPlan::Environment(_) => {
331				panic!("Environment operations are not supported in flow graphs");
332			}
333			QueryPlan::RowPointLookup(_) => {
334				// TODO: Implement optimized row point lookup for flow graphs
335				unimplemented!("RowPointLookup compilation not yet implemented for flow")
336			}
337			QueryPlan::RowListLookup(_) => {
338				// TODO: Implement optimized row list lookup for flow graphs
339				unimplemented!("RowListLookup compilation not yet implemented for flow")
340			}
341			QueryPlan::RowRangeScan(_) => {
342				// TODO: Implement optimized row range scan for flow graphs
343				unimplemented!("RowRangeScan compilation not yet implemented for flow")
344			}
345			QueryPlan::DictionaryScan(_) => {
346				// TODO: Implement DictionaryScan for flow graphs
347				unimplemented!("DictionaryScan compilation not yet implemented for flow")
348			}
349			QueryPlan::Assert(_) => {
350				unimplemented!("Assert compilation not yet implemented for flow")
351			}
352			QueryPlan::SeriesScan(series_scan) => SeriesScanCompiler::from(series_scan).compile(self, txn),
353			QueryPlan::RemoteScan(_) => Err(Error(Box::new(flow_remote_source_unsupported()))),
354			QueryPlan::RunTests(_) => {
355				panic!("RunTests is not supported in flow graphs");
356			}
357			QueryPlan::CallFunction(_) => {
358				panic!("CallFunction is not supported in flow graphs");
359			}
360		}
361	}
362}
363
364fn has_real_source(flow: &FlowDag) -> bool {
365	flow.get_node_ids().any(|node_id| {
366		if let Some(node) = flow.get_node(&node_id) {
367			matches!(
368				node.ty,
369				FlowNodeType::SourceTable { .. }
370					| FlowNodeType::SourceView { .. } | FlowNodeType::SourceFlow { .. }
371					| FlowNodeType::SourceRingBuffer { .. }
372					| FlowNodeType::SourceSeries { .. }
373			)
374		} else {
375			false
376		}
377	})
378}
379
380pub(crate) trait CompileOperator {
381	fn compile(self, compiler: &mut FlowCompiler, txn: &mut Transaction<'_>) -> Result<FlowNodeId>;
382}