reifydb_engine/execute/
mod.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use std::sync::Arc;
5
6use query::{
7	aggregate::AggregateNode,
8	assign::AssignNode,
9	compile::compile,
10	declare::DeclareNode,
11	environment::EnvironmentNode,
12	extend::{ExtendNode, ExtendWithoutInputNode},
13	filter::FilterNode,
14	generator::GeneratorNode,
15	index_scan::IndexScanNode,
16	inline::InlineDataNode,
17	join::{InnerJoinNode, LeftJoinNode, NaturalJoinNode},
18	map::{MapNode, MapWithoutInputNode},
19	ring_buffer_scan::RingBufferScan,
20	scalarize::ScalarizeNode,
21	sort::SortNode,
22	table_scan::TableScanNode,
23	table_virtual_scan::VirtualScanNode,
24	take::TakeNode,
25	variable::VariableNode,
26	view_scan::ViewScanNode,
27};
28use reifydb_core::{
29	Frame,
30	interface::{Command, Execute, ExecuteCommand, ExecuteQuery, Params, Query, ResolvedSource},
31	value::column::{Column, ColumnData, Columns, headers::ColumnHeaders},
32};
33use reifydb_rql::{
34	ast,
35	plan::{physical::PhysicalPlan, plan},
36};
37
38use crate::{
39	StandardCommandTransaction, StandardQueryTransaction, StandardTransaction,
40	function::{Functions, generator, math},
41	stack::{Stack, Variable},
42	table_virtual::system::FlowOperatorStore,
43};
44
45mod catalog;
46mod mutate;
47mod query;
48
49/// Unified trait for query execution nodes following the volcano iterator
50/// pattern
51pub(crate) trait QueryNode<'a> {
52	/// Initialize the operator with execution context
53	/// Called once before iteration begins
54	fn initialize(&mut self, rx: &mut StandardTransaction<'a>, ctx: &ExecutionContext<'a>) -> crate::Result<()>;
55
56	/// Get the next batch of results (volcano iterator pattern)
57	/// Returns None when exhausted
58	fn next(
59		&mut self,
60		rx: &mut StandardTransaction<'a>,
61		ctx: &mut ExecutionContext<'a>,
62	) -> crate::Result<Option<Batch<'a>>>;
63
64	/// Get the headers of columns this node produces
65	fn headers(&self) -> Option<ColumnHeaders<'a>>;
66}
67
68#[derive(Clone)]
69pub struct ExecutionContext<'a> {
70	pub executor: Executor,
71	pub source: Option<ResolvedSource<'a>>,
72	pub batch_size: u64,
73	pub params: Params,
74	pub stack: Stack,
75}
76
77#[derive(Debug)]
78pub struct Batch<'a> {
79	pub columns: Columns<'a>,
80}
81
82pub(crate) enum ExecutionPlan<'a> {
83	Aggregate(AggregateNode<'a>),
84	Filter(FilterNode<'a>),
85	IndexScan(IndexScanNode<'a>),
86	InlineData(InlineDataNode<'a>),
87	InnerJoin(InnerJoinNode<'a>),
88	LeftJoin(LeftJoinNode<'a>),
89	NaturalJoin(NaturalJoinNode<'a>),
90	Map(MapNode<'a>),
91	MapWithoutInput(MapWithoutInputNode<'a>),
92	Extend(ExtendNode<'a>),
93	ExtendWithoutInput(ExtendWithoutInputNode<'a>),
94	Sort(SortNode<'a>),
95	TableScan(TableScanNode<'a>),
96	Take(TakeNode<'a>),
97	ViewScan(ViewScanNode<'a>),
98	Variable(VariableNode<'a>),
99	Environment(EnvironmentNode),
100	VirtualScan(VirtualScanNode<'a>),
101	RingBufferScan(RingBufferScan<'a>),
102	Generator(GeneratorNode<'a>),
103	Declare(DeclareNode<'a>),
104	Assign(AssignNode<'a>),
105	Conditional(query::conditional::ConditionalNode<'a>),
106	Scalarize(ScalarizeNode<'a>),
107}
108
109// Implement QueryNode for Box<ExecutionPlan> to allow chaining
110impl<'a> QueryNode<'a> for Box<ExecutionPlan<'a>> {
111	fn initialize(&mut self, rx: &mut StandardTransaction<'a>, ctx: &ExecutionContext<'a>) -> crate::Result<()> {
112		(**self).initialize(rx, ctx)
113	}
114
115	fn next(
116		&mut self,
117		rx: &mut StandardTransaction<'a>,
118		ctx: &mut ExecutionContext<'a>,
119	) -> crate::Result<Option<Batch<'a>>> {
120		(**self).next(rx, ctx)
121	}
122
123	fn headers(&self) -> Option<ColumnHeaders<'a>> {
124		(**self).headers()
125	}
126}
127
128impl<'a> QueryNode<'a> for ExecutionPlan<'a> {
129	fn initialize(&mut self, rx: &mut StandardTransaction<'a>, ctx: &ExecutionContext<'a>) -> crate::Result<()> {
130		match self {
131			ExecutionPlan::Aggregate(node) => node.initialize(rx, ctx),
132			ExecutionPlan::Filter(node) => node.initialize(rx, ctx),
133			ExecutionPlan::IndexScan(node) => node.initialize(rx, ctx),
134			ExecutionPlan::InlineData(node) => node.initialize(rx, ctx),
135			ExecutionPlan::InnerJoin(node) => node.initialize(rx, ctx),
136			ExecutionPlan::LeftJoin(node) => node.initialize(rx, ctx),
137			ExecutionPlan::NaturalJoin(node) => node.initialize(rx, ctx),
138			ExecutionPlan::Map(node) => node.initialize(rx, ctx),
139			ExecutionPlan::MapWithoutInput(node) => node.initialize(rx, ctx),
140			ExecutionPlan::Extend(node) => node.initialize(rx, ctx),
141			ExecutionPlan::ExtendWithoutInput(node) => node.initialize(rx, ctx),
142			ExecutionPlan::Sort(node) => node.initialize(rx, ctx),
143			ExecutionPlan::TableScan(node) => node.initialize(rx, ctx),
144			ExecutionPlan::Take(node) => node.initialize(rx, ctx),
145			ExecutionPlan::ViewScan(node) => node.initialize(rx, ctx),
146			ExecutionPlan::Variable(node) => node.initialize(rx, ctx),
147			ExecutionPlan::Environment(node) => node.initialize(rx, ctx),
148			ExecutionPlan::VirtualScan(node) => node.initialize(rx, ctx),
149			ExecutionPlan::RingBufferScan(node) => node.initialize(rx, ctx),
150			ExecutionPlan::Generator(node) => node.initialize(rx, ctx),
151			ExecutionPlan::Declare(node) => node.initialize(rx, ctx),
152			ExecutionPlan::Assign(node) => node.initialize(rx, ctx),
153			ExecutionPlan::Conditional(node) => node.initialize(rx, ctx),
154			ExecutionPlan::Scalarize(node) => node.initialize(rx, ctx),
155		}
156	}
157
158	fn next(
159		&mut self,
160		rx: &mut StandardTransaction<'a>,
161		ctx: &mut ExecutionContext<'a>,
162	) -> crate::Result<Option<Batch<'a>>> {
163		match self {
164			ExecutionPlan::Aggregate(node) => node.next(rx, ctx),
165			ExecutionPlan::Filter(node) => node.next(rx, ctx),
166			ExecutionPlan::IndexScan(node) => node.next(rx, ctx),
167			ExecutionPlan::InlineData(node) => node.next(rx, ctx),
168			ExecutionPlan::InnerJoin(node) => node.next(rx, ctx),
169			ExecutionPlan::LeftJoin(node) => node.next(rx, ctx),
170			ExecutionPlan::NaturalJoin(node) => node.next(rx, ctx),
171			ExecutionPlan::Map(node) => node.next(rx, ctx),
172			ExecutionPlan::MapWithoutInput(node) => node.next(rx, ctx),
173			ExecutionPlan::Extend(node) => node.next(rx, ctx),
174			ExecutionPlan::ExtendWithoutInput(node) => node.next(rx, ctx),
175			ExecutionPlan::Sort(node) => node.next(rx, ctx),
176			ExecutionPlan::TableScan(node) => node.next(rx, ctx),
177			ExecutionPlan::Take(node) => node.next(rx, ctx),
178			ExecutionPlan::ViewScan(node) => node.next(rx, ctx),
179			ExecutionPlan::Variable(node) => node.next(rx, ctx),
180			ExecutionPlan::Environment(node) => node.next(rx, ctx),
181			ExecutionPlan::VirtualScan(node) => node.next(rx, ctx),
182			ExecutionPlan::RingBufferScan(node) => node.next(rx, ctx),
183			ExecutionPlan::Generator(node) => node.next(rx, ctx),
184			ExecutionPlan::Declare(node) => node.next(rx, ctx),
185			ExecutionPlan::Assign(node) => node.next(rx, ctx),
186			ExecutionPlan::Conditional(node) => node.next(rx, ctx),
187			ExecutionPlan::Scalarize(node) => node.next(rx, ctx),
188		}
189	}
190
191	fn headers(&self) -> Option<ColumnHeaders<'a>> {
192		match self {
193			ExecutionPlan::Aggregate(node) => node.headers(),
194			ExecutionPlan::Filter(node) => node.headers(),
195			ExecutionPlan::IndexScan(node) => node.headers(),
196			ExecutionPlan::InlineData(node) => node.headers(),
197			ExecutionPlan::InnerJoin(node) => node.headers(),
198			ExecutionPlan::LeftJoin(node) => node.headers(),
199			ExecutionPlan::NaturalJoin(node) => node.headers(),
200			ExecutionPlan::Map(node) => node.headers(),
201			ExecutionPlan::MapWithoutInput(node) => node.headers(),
202			ExecutionPlan::Extend(node) => node.headers(),
203			ExecutionPlan::ExtendWithoutInput(node) => node.headers(),
204			ExecutionPlan::Sort(node) => node.headers(),
205			ExecutionPlan::TableScan(node) => node.headers(),
206			ExecutionPlan::Take(node) => node.headers(),
207			ExecutionPlan::ViewScan(node) => node.headers(),
208			ExecutionPlan::Variable(node) => node.headers(),
209			ExecutionPlan::Environment(node) => node.headers(),
210			ExecutionPlan::VirtualScan(node) => node.headers(),
211			ExecutionPlan::RingBufferScan(node) => node.headers(),
212			ExecutionPlan::Generator(node) => node.headers(),
213			ExecutionPlan::Declare(node) => node.headers(),
214			ExecutionPlan::Assign(node) => node.headers(),
215			ExecutionPlan::Conditional(node) => node.headers(),
216			ExecutionPlan::Scalarize(node) => node.headers(),
217		}
218	}
219}
220
221pub struct Executor(Arc<ExecutorInner>);
222
223pub struct ExecutorInner {
224	pub functions: Functions,
225	pub flow_operator_store: FlowOperatorStore,
226}
227
228impl Clone for Executor {
229	fn clone(&self) -> Self {
230		Self(self.0.clone())
231	}
232}
233
234impl std::ops::Deref for Executor {
235	type Target = ExecutorInner;
236
237	fn deref(&self) -> &Self::Target {
238		&self.0
239	}
240}
241
242impl Executor {
243	pub fn new(functions: Functions, flow_operator_store: FlowOperatorStore) -> Self {
244		Self(Arc::new(ExecutorInner {
245			functions,
246			flow_operator_store,
247		}))
248	}
249
250	#[allow(dead_code)]
251	pub fn testing() -> Self {
252		Self::new(
253			Functions::builder()
254				.register_aggregate("math::sum", math::aggregate::Sum::new)
255				.register_aggregate("math::min", math::aggregate::Min::new)
256				.register_aggregate("math::max", math::aggregate::Max::new)
257				.register_aggregate("math::avg", math::aggregate::Avg::new)
258				.register_aggregate("math::count", math::aggregate::Count::new)
259				.register_scalar("math::abs", math::scalar::Abs::new)
260				.register_scalar("math::avg", math::scalar::Avg::new)
261				.register_generator("generate_series", generator::GenerateSeries::new)
262				.build(),
263			FlowOperatorStore::new(),
264		)
265	}
266}
267
268impl ExecuteCommand<StandardCommandTransaction> for Executor {
269	fn execute_command(&self, txn: &mut StandardCommandTransaction, cmd: Command<'_>) -> crate::Result<Vec<Frame>> {
270		let mut result = vec![];
271		let statements = ast::parse_str(cmd.rql)?;
272
273		// Create a single persistent Stack for all statements in this command
274		let mut persistent_stack = Stack::new();
275
276		// Populate the stack with parameters so they can be accessed as variables
277		match &cmd.params {
278			reifydb_core::interface::Params::Positional(values) => {
279				// For positional parameters, use $1, $2, $3, etc.
280				for (index, value) in values.iter().enumerate() {
281					let param_name = (index + 1).to_string(); // 1-based indexing
282					persistent_stack.set(param_name, Variable::Scalar(value.clone()), false)?;
283				}
284			}
285			reifydb_core::interface::Params::Named(map) => {
286				// For named parameters, use the parameter name directly
287				for (name, value) in map {
288					persistent_stack.set(name.clone(), Variable::Scalar(value.clone()), false)?;
289				}
290			}
291			reifydb_core::interface::Params::None => {
292				// No parameters to populate
293			}
294		}
295
296		for statement in statements {
297			if let Some(plan) = plan(txn, statement)? {
298				if let Some(er) =
299					self.execute_command_plan(txn, plan, cmd.params.clone(), &mut persistent_stack)?
300				{
301					result.push(Frame::from(er));
302				}
303			}
304		}
305
306		Ok(result)
307	}
308}
309
310impl ExecuteQuery<StandardQueryTransaction> for Executor {
311	fn execute_query(&self, txn: &mut StandardQueryTransaction, qry: Query<'_>) -> crate::Result<Vec<Frame>> {
312		let mut result = vec![];
313		let statements = ast::parse_str(qry.rql)?;
314
315		// Create a single persistent Stack for all statements in this query
316		let mut persistent_stack = Stack::new();
317
318		// Populate the stack with parameters so they can be accessed as variables
319		match &qry.params {
320			reifydb_core::interface::Params::Positional(values) => {
321				// For positional parameters, use $1, $2, $3, etc.
322				for (index, value) in values.iter().enumerate() {
323					let param_name = (index + 1).to_string(); // 1-based indexing
324					persistent_stack.set(param_name, Variable::Scalar(value.clone()), false)?;
325				}
326			}
327			reifydb_core::interface::Params::Named(map) => {
328				// For named parameters, use the parameter name directly
329				for (name, value) in map {
330					persistent_stack.set(name.clone(), Variable::Scalar(value.clone()), false)?;
331				}
332			}
333			reifydb_core::interface::Params::None => {
334				// No parameters to populate
335			}
336		}
337
338		for statement in statements {
339			if let Some(plan) = plan(txn, statement)? {
340				if let Some(er) =
341					self.execute_query_plan(txn, plan, qry.params.clone(), &mut persistent_stack)?
342				{
343					result.push(Frame::from(er));
344				}
345			}
346		}
347
348		Ok(result)
349	}
350}
351
352impl Execute<StandardCommandTransaction, StandardQueryTransaction> for Executor {}
353
354impl Executor {
355	pub(crate) fn execute_query_plan<'a>(
356		&self,
357		rx: &'a mut StandardQueryTransaction,
358		plan: PhysicalPlan<'a>,
359		params: Params,
360		stack: &mut Stack,
361	) -> crate::Result<Option<Columns<'a>>> {
362		match plan {
363			// Query
364			PhysicalPlan::Aggregate(_)
365			| PhysicalPlan::Filter(_)
366			| PhysicalPlan::IndexScan(_)
367			| PhysicalPlan::JoinInner(_)
368			| PhysicalPlan::JoinLeft(_)
369			| PhysicalPlan::JoinNatural(_)
370			| PhysicalPlan::Take(_)
371			| PhysicalPlan::Sort(_)
372			| PhysicalPlan::Map(_)
373			| PhysicalPlan::Extend(_)
374			| PhysicalPlan::InlineData(_)
375			| PhysicalPlan::Generator(_)
376			| PhysicalPlan::Delete(_)
377			| PhysicalPlan::DeleteRingBuffer(_)
378			| PhysicalPlan::InsertTable(_)
379			| PhysicalPlan::InsertRingBuffer(_)
380			| PhysicalPlan::Update(_)
381			| PhysicalPlan::UpdateRingBuffer(_)
382			| PhysicalPlan::TableScan(_)
383			| PhysicalPlan::ViewScan(_)
384			| PhysicalPlan::FlowScan(_)
385			| PhysicalPlan::TableVirtualScan(_)
386			| PhysicalPlan::RingBufferScan(_)
387			| PhysicalPlan::Variable(_)
388			| PhysicalPlan::Environment(_)
389			| PhysicalPlan::Conditional(_)
390			| PhysicalPlan::Scalarize(_) => {
391				let mut std_txn = StandardTransaction::from(rx);
392				self.query(&mut std_txn, plan, params, stack)
393			}
394			PhysicalPlan::Declare(_) | PhysicalPlan::Assign(_) => {
395				let mut std_txn = StandardTransaction::from(rx);
396				self.query(&mut std_txn, plan, params, stack)?;
397				Ok(None)
398			}
399			PhysicalPlan::AlterSequence(_)
400			| PhysicalPlan::AlterTable(_)
401			| PhysicalPlan::AlterView(_)
402			| PhysicalPlan::AlterFlow(_)
403			| PhysicalPlan::CreateDeferredView(_)
404			| PhysicalPlan::CreateTransactionalView(_)
405			| PhysicalPlan::CreateNamespace(_)
406			| PhysicalPlan::CreateTable(_)
407			| PhysicalPlan::CreateRingBuffer(_)
408			| PhysicalPlan::CreateFlow(_)
409			| PhysicalPlan::Distinct(_)
410			| PhysicalPlan::Apply(_) => {
411				// Apply operator requires flow engine for mod
412				// execution
413				unimplemented!(
414					"Apply operator is only supported in deferred views and requires the flow engine. Use within a CREATE DEFERRED VIEW statement."
415				)
416			}
417			PhysicalPlan::Window(_) => {
418				// Window operator requires flow engine for mod
419				// execution
420				unimplemented!(
421					"Window operator is only supported in deferred views and requires the flow engine. Use within a CREATE DEFERRED VIEW statement."
422				)
423			}
424		}
425	}
426
427	pub fn execute_command_plan<'a>(
428		&self,
429		txn: &'a mut StandardCommandTransaction,
430		plan: PhysicalPlan<'a>,
431		params: Params,
432		stack: &mut Stack,
433	) -> crate::Result<Option<Columns<'a>>> {
434		match plan {
435			PhysicalPlan::AlterSequence(plan) => Ok(Some(self.alter_table_sequence(txn, plan)?)),
436			PhysicalPlan::CreateDeferredView(plan) => Ok(Some(self.create_deferred_view(txn, plan)?)),
437			PhysicalPlan::CreateTransactionalView(plan) => {
438				Ok(Some(self.create_transactional_view(txn, plan)?))
439			}
440			PhysicalPlan::CreateNamespace(plan) => Ok(Some(self.create_namespace(txn, plan)?)),
441			PhysicalPlan::CreateTable(plan) => Ok(Some(self.create_table(txn, plan)?)),
442			PhysicalPlan::CreateRingBuffer(plan) => Ok(Some(self.create_ring_buffer(txn, plan)?)),
443			PhysicalPlan::CreateFlow(_plan) => {
444				// TODO: Implement create_flow
445				unimplemented!("CREATE FLOW not yet implemented")
446			}
447			PhysicalPlan::Delete(plan) => Ok(Some(self.delete(txn, plan, params)?)),
448			PhysicalPlan::DeleteRingBuffer(plan) => Ok(Some(self.delete_ring_buffer(txn, plan, params)?)),
449			PhysicalPlan::InsertTable(plan) => Ok(Some(self.insert_table(txn, plan, stack)?)),
450			PhysicalPlan::InsertRingBuffer(plan) => Ok(Some(self.insert_ring_buffer(txn, plan, params)?)),
451			PhysicalPlan::Update(plan) => Ok(Some(self.update_table(txn, plan, params)?)),
452			PhysicalPlan::UpdateRingBuffer(plan) => Ok(Some(self.update_ring_buffer(txn, plan, params)?)),
453
454			PhysicalPlan::Aggregate(_)
455			| PhysicalPlan::Filter(_)
456			| PhysicalPlan::IndexScan(_)
457			| PhysicalPlan::JoinInner(_)
458			| PhysicalPlan::JoinLeft(_)
459			| PhysicalPlan::JoinNatural(_)
460			| PhysicalPlan::Take(_)
461			| PhysicalPlan::Sort(_)
462			| PhysicalPlan::Map(_)
463			| PhysicalPlan::Extend(_)
464			| PhysicalPlan::InlineData(_)
465			| PhysicalPlan::Generator(_)
466			| PhysicalPlan::TableScan(_)
467			| PhysicalPlan::ViewScan(_)
468			| PhysicalPlan::FlowScan(_)
469			| PhysicalPlan::TableVirtualScan(_)
470			| PhysicalPlan::RingBufferScan(_)
471			| PhysicalPlan::Distinct(_)
472			| PhysicalPlan::Variable(_)
473			| PhysicalPlan::Environment(_)
474			| PhysicalPlan::Apply(_)
475			| PhysicalPlan::Conditional(_)
476			| PhysicalPlan::Scalarize(_) => {
477				let mut std_txn = StandardTransaction::from(txn);
478				self.query(&mut std_txn, plan, params, stack)
479			}
480			PhysicalPlan::Declare(_) | PhysicalPlan::Assign(_) => {
481				let mut std_txn = StandardTransaction::from(txn);
482				self.query(&mut std_txn, plan, params, stack)?;
483				Ok(None)
484			}
485			PhysicalPlan::Window(_) => {
486				let mut std_txn = StandardTransaction::from(txn);
487				self.query(&mut std_txn, plan, params, stack)
488			}
489
490			PhysicalPlan::AlterTable(plan) => Ok(Some(self.alter_table(txn, plan)?)),
491			PhysicalPlan::AlterView(plan) => Ok(Some(self.execute_alter_view(txn, plan)?)),
492			PhysicalPlan::AlterFlow(_plan) => {
493				// TODO: Implement alter_flow
494				unimplemented!("ALTER FLOW not yet implemented")
495			}
496		}
497	}
498
499	fn query<'a>(
500		&self,
501		rx: &mut StandardTransaction<'a>,
502		plan: PhysicalPlan<'a>,
503		params: Params,
504		stack: &mut Stack,
505	) -> crate::Result<Option<Columns<'a>>> {
506		let context = Arc::new(ExecutionContext {
507			executor: self.clone(),
508			source: None,
509			batch_size: 1024,
510			params: params.clone(),
511			stack: stack.clone(),
512		});
513		let mut node = compile(plan, rx, context.clone());
514
515		// Initialize the operator before execution
516		node.initialize(rx, &context)?;
517
518		let mut result: Option<Columns> = None;
519		let mut mutable_context = (*context).clone();
520
521		while let Some(Batch {
522			columns,
523		}) = node.next(rx, &mut mutable_context)?
524		{
525			if let Some(mut result_columns) = result.take() {
526				result_columns.append_columns(columns)?;
527				result = Some(result_columns);
528			} else {
529				result = Some(columns);
530			}
531		}
532
533		// Copy stack changes back to persistent stack
534		*stack = mutable_context.stack;
535
536		let headers = node.headers();
537
538		if let Some(mut columns) = result {
539			if let Some(headers) = headers {
540				columns.apply_headers(&headers);
541			}
542
543			Ok(columns.into())
544		} else {
545			// empty columns - reconstruct table,
546			// for better UX
547			let columns: Vec<Column<'a>> = node
548				.headers()
549				.unwrap_or(ColumnHeaders {
550					columns: vec![],
551				})
552				.columns
553				.into_iter()
554				.map(|name| Column {
555					name,
556					data: ColumnData::undefined(0),
557				})
558				.collect();
559
560			Ok(Some(Columns::new(columns)))
561		}
562	}
563}