reifydb_engine/execute/
mod.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use std::sync::Arc;
5
6use query::{
7	aggregate::AggregateNode,
8	assign::AssignNode,
9	compile::compile,
10	declare::DeclareNode,
11	dictionary_scan::DictionaryScan,
12	environment::EnvironmentNode,
13	extend::{ExtendNode, ExtendWithoutInputNode},
14	filter::FilterNode,
15	generator::GeneratorNode,
16	index_scan::IndexScanNode,
17	inline::InlineDataNode,
18	join::{InnerJoinNode, LeftJoinNode, NaturalJoinNode},
19	map::{MapNode, MapWithoutInputNode},
20	ring_buffer_scan::RingBufferScan,
21	row_lookup::{RowListLookupNode, RowPointLookupNode, RowRangeScanNode},
22	scalarize::ScalarizeNode,
23	sort::SortNode,
24	table_scan::TableScanNode,
25	table_virtual_scan::VirtualScanNode,
26	take::TakeNode,
27	variable::VariableNode,
28	view_scan::ViewScanNode,
29};
30use reifydb_core::{
31	Frame,
32	interface::{Command, Execute, ExecuteCommand, ExecuteQuery, Params, Query, ResolvedSource},
33	value::column::{Column, ColumnData, Columns, headers::ColumnHeaders},
34};
35use reifydb_rql::{
36	ast,
37	plan::{physical::PhysicalPlan, plan},
38};
39
40use crate::{
41	StandardCommandTransaction, StandardQueryTransaction, StandardTransaction,
42	function::{Functions, generator, math},
43	stack::{Stack, Variable},
44	table_virtual::system::FlowOperatorStore,
45};
46
47mod catalog;
48mod mutate;
49mod query;
50
/// Unified trait for query execution nodes following the volcano iterator
/// pattern.
///
/// A node is driven in two phases: `initialize` is called once, then `next`
/// is called repeatedly until it yields `Ok(None)`.
pub(crate) trait QueryNode<'a> {
	/// Initialize the operator with the execution context.
	///
	/// Called once before iteration begins. The context is only borrowed
	/// immutably here; `next` receives a mutable one.
	fn initialize(&mut self, rx: &mut StandardTransaction<'a>, ctx: &ExecutionContext<'a>) -> crate::Result<()>;

	/// Get the next batch of results (volcano iterator pattern).
	///
	/// Returns `Ok(None)` when the node is exhausted and an error if
	/// producing the batch failed.
	fn next(
		&mut self,
		rx: &mut StandardTransaction<'a>,
		ctx: &mut ExecutionContext<'a>,
	) -> crate::Result<Option<Batch<'a>>>;

	/// Get the headers of the columns this node produces, or `None` if
	/// the node has no headers to report.
	fn headers(&self) -> Option<ColumnHeaders<'a>>;
}
69
/// State handed to every [`QueryNode`] while a plan is being executed.
#[derive(Clone)]
pub struct ExecutionContext<'a> {
	/// Handle to the executor that is running the plan.
	pub executor: Executor,
	/// Optional resolved source; `Executor::query` starts with `None`.
	pub source: Option<ResolvedSource<'a>>,
	/// Batch size hint; `Executor::query` sets it to 1024.
	/// NOTE(review): presumably the maximum number of rows per batch — confirm.
	pub batch_size: u64,
	/// Parameters supplied with the statement being executed.
	pub params: Params,
	/// Variable stack; `Executor::query` copies it back to its caller once
	/// the plan has been drained.
	pub stack: Stack,
}
78
/// One unit of output returned by [`QueryNode::next`].
#[derive(Debug)]
pub struct Batch<'a> {
	/// The columns that make up this batch.
	pub columns: Columns<'a>,
}
83
/// Executable plan node: one variant per concrete operator node type.
///
/// The enum implements [`QueryNode`] by delegating every call to the node
/// wrapped by the active variant.
pub(crate) enum ExecutionPlan<'a> {
	Aggregate(AggregateNode<'a>),
	DictionaryScan(DictionaryScan<'a>),
	Filter(FilterNode<'a>),
	IndexScan(IndexScanNode<'a>),
	InlineData(InlineDataNode<'a>),
	InnerJoin(InnerJoinNode<'a>),
	LeftJoin(LeftJoinNode<'a>),
	NaturalJoin(NaturalJoinNode<'a>),
	Map(MapNode<'a>),
	MapWithoutInput(MapWithoutInputNode<'a>),
	Extend(ExtendNode<'a>),
	ExtendWithoutInput(ExtendWithoutInputNode<'a>),
	Sort(SortNode<'a>),
	TableScan(TableScanNode<'a>),
	Take(TakeNode<'a>),
	ViewScan(ViewScanNode<'a>),
	Variable(VariableNode<'a>),
	Environment(EnvironmentNode),
	VirtualScan(VirtualScanNode<'a>),
	RingBufferScan(RingBufferScan<'a>),
	Generator(GeneratorNode<'a>),
	Declare(DeclareNode<'a>),
	Assign(AssignNode<'a>),
	Conditional(query::conditional::ConditionalNode<'a>),
	Scalarize(ScalarizeNode<'a>),
	// Row-number optimized access
	RowPointLookup(RowPointLookupNode<'a>),
	RowListLookup(RowListLookupNode<'a>),
	RowRangeScan(RowRangeScanNode<'a>),
}
115
116// Implement QueryNode for Box<ExecutionPlan> to allow chaining
117impl<'a> QueryNode<'a> for Box<ExecutionPlan<'a>> {
118	fn initialize(&mut self, rx: &mut StandardTransaction<'a>, ctx: &ExecutionContext<'a>) -> crate::Result<()> {
119		(**self).initialize(rx, ctx)
120	}
121
122	fn next(
123		&mut self,
124		rx: &mut StandardTransaction<'a>,
125		ctx: &mut ExecutionContext<'a>,
126	) -> crate::Result<Option<Batch<'a>>> {
127		(**self).next(rx, ctx)
128	}
129
130	fn headers(&self) -> Option<ColumnHeaders<'a>> {
131		(**self).headers()
132	}
133}
134
135impl<'a> QueryNode<'a> for ExecutionPlan<'a> {
136	fn initialize(&mut self, rx: &mut StandardTransaction<'a>, ctx: &ExecutionContext<'a>) -> crate::Result<()> {
137		match self {
138			ExecutionPlan::Aggregate(node) => node.initialize(rx, ctx),
139			ExecutionPlan::DictionaryScan(node) => node.initialize(rx, ctx),
140			ExecutionPlan::Filter(node) => node.initialize(rx, ctx),
141			ExecutionPlan::IndexScan(node) => node.initialize(rx, ctx),
142			ExecutionPlan::InlineData(node) => node.initialize(rx, ctx),
143			ExecutionPlan::InnerJoin(node) => node.initialize(rx, ctx),
144			ExecutionPlan::LeftJoin(node) => node.initialize(rx, ctx),
145			ExecutionPlan::NaturalJoin(node) => node.initialize(rx, ctx),
146			ExecutionPlan::Map(node) => node.initialize(rx, ctx),
147			ExecutionPlan::MapWithoutInput(node) => node.initialize(rx, ctx),
148			ExecutionPlan::Extend(node) => node.initialize(rx, ctx),
149			ExecutionPlan::ExtendWithoutInput(node) => node.initialize(rx, ctx),
150			ExecutionPlan::Sort(node) => node.initialize(rx, ctx),
151			ExecutionPlan::TableScan(node) => node.initialize(rx, ctx),
152			ExecutionPlan::Take(node) => node.initialize(rx, ctx),
153			ExecutionPlan::ViewScan(node) => node.initialize(rx, ctx),
154			ExecutionPlan::Variable(node) => node.initialize(rx, ctx),
155			ExecutionPlan::Environment(node) => node.initialize(rx, ctx),
156			ExecutionPlan::VirtualScan(node) => node.initialize(rx, ctx),
157			ExecutionPlan::RingBufferScan(node) => node.initialize(rx, ctx),
158			ExecutionPlan::Generator(node) => node.initialize(rx, ctx),
159			ExecutionPlan::Declare(node) => node.initialize(rx, ctx),
160			ExecutionPlan::Assign(node) => node.initialize(rx, ctx),
161			ExecutionPlan::Conditional(node) => node.initialize(rx, ctx),
162			ExecutionPlan::Scalarize(node) => node.initialize(rx, ctx),
163			ExecutionPlan::RowPointLookup(node) => node.initialize(rx, ctx),
164			ExecutionPlan::RowListLookup(node) => node.initialize(rx, ctx),
165			ExecutionPlan::RowRangeScan(node) => node.initialize(rx, ctx),
166		}
167	}
168
169	fn next(
170		&mut self,
171		rx: &mut StandardTransaction<'a>,
172		ctx: &mut ExecutionContext<'a>,
173	) -> crate::Result<Option<Batch<'a>>> {
174		match self {
175			ExecutionPlan::Aggregate(node) => node.next(rx, ctx),
176			ExecutionPlan::DictionaryScan(node) => node.next(rx, ctx),
177			ExecutionPlan::Filter(node) => node.next(rx, ctx),
178			ExecutionPlan::IndexScan(node) => node.next(rx, ctx),
179			ExecutionPlan::InlineData(node) => node.next(rx, ctx),
180			ExecutionPlan::InnerJoin(node) => node.next(rx, ctx),
181			ExecutionPlan::LeftJoin(node) => node.next(rx, ctx),
182			ExecutionPlan::NaturalJoin(node) => node.next(rx, ctx),
183			ExecutionPlan::Map(node) => node.next(rx, ctx),
184			ExecutionPlan::MapWithoutInput(node) => node.next(rx, ctx),
185			ExecutionPlan::Extend(node) => node.next(rx, ctx),
186			ExecutionPlan::ExtendWithoutInput(node) => node.next(rx, ctx),
187			ExecutionPlan::Sort(node) => node.next(rx, ctx),
188			ExecutionPlan::TableScan(node) => node.next(rx, ctx),
189			ExecutionPlan::Take(node) => node.next(rx, ctx),
190			ExecutionPlan::ViewScan(node) => node.next(rx, ctx),
191			ExecutionPlan::Variable(node) => node.next(rx, ctx),
192			ExecutionPlan::Environment(node) => node.next(rx, ctx),
193			ExecutionPlan::VirtualScan(node) => node.next(rx, ctx),
194			ExecutionPlan::RingBufferScan(node) => node.next(rx, ctx),
195			ExecutionPlan::Generator(node) => node.next(rx, ctx),
196			ExecutionPlan::Declare(node) => node.next(rx, ctx),
197			ExecutionPlan::Assign(node) => node.next(rx, ctx),
198			ExecutionPlan::Conditional(node) => node.next(rx, ctx),
199			ExecutionPlan::Scalarize(node) => node.next(rx, ctx),
200			ExecutionPlan::RowPointLookup(node) => node.next(rx, ctx),
201			ExecutionPlan::RowListLookup(node) => node.next(rx, ctx),
202			ExecutionPlan::RowRangeScan(node) => node.next(rx, ctx),
203		}
204	}
205
206	fn headers(&self) -> Option<ColumnHeaders<'a>> {
207		match self {
208			ExecutionPlan::Aggregate(node) => node.headers(),
209			ExecutionPlan::DictionaryScan(node) => node.headers(),
210			ExecutionPlan::Filter(node) => node.headers(),
211			ExecutionPlan::IndexScan(node) => node.headers(),
212			ExecutionPlan::InlineData(node) => node.headers(),
213			ExecutionPlan::InnerJoin(node) => node.headers(),
214			ExecutionPlan::LeftJoin(node) => node.headers(),
215			ExecutionPlan::NaturalJoin(node) => node.headers(),
216			ExecutionPlan::Map(node) => node.headers(),
217			ExecutionPlan::MapWithoutInput(node) => node.headers(),
218			ExecutionPlan::Extend(node) => node.headers(),
219			ExecutionPlan::ExtendWithoutInput(node) => node.headers(),
220			ExecutionPlan::Sort(node) => node.headers(),
221			ExecutionPlan::TableScan(node) => node.headers(),
222			ExecutionPlan::Take(node) => node.headers(),
223			ExecutionPlan::ViewScan(node) => node.headers(),
224			ExecutionPlan::Variable(node) => node.headers(),
225			ExecutionPlan::Environment(node) => node.headers(),
226			ExecutionPlan::VirtualScan(node) => node.headers(),
227			ExecutionPlan::RingBufferScan(node) => node.headers(),
228			ExecutionPlan::Generator(node) => node.headers(),
229			ExecutionPlan::Declare(node) => node.headers(),
230			ExecutionPlan::Assign(node) => node.headers(),
231			ExecutionPlan::Conditional(node) => node.headers(),
232			ExecutionPlan::Scalarize(node) => node.headers(),
233			ExecutionPlan::RowPointLookup(node) => node.headers(),
234			ExecutionPlan::RowListLookup(node) => node.headers(),
235			ExecutionPlan::RowRangeScan(node) => node.headers(),
236		}
237	}
238}
239
/// Handle to the shared [`ExecutorInner`] state, held behind an `Arc`.
///
/// Dereferences to [`ExecutorInner`], so its fields can be read directly
/// through an `Executor`.
pub struct Executor(Arc<ExecutorInner>);
241
/// State shared by all clones of an [`Executor`].
pub struct ExecutorInner {
	/// Registry of the functions available to executed plans.
	pub functions: Functions,
	/// Store of flow operators.
	/// NOTE(review): presumably backs the system virtual tables — confirm.
	pub flow_operator_store: FlowOperatorStore,
}
246
247impl Clone for Executor {
248	fn clone(&self) -> Self {
249		Self(self.0.clone())
250	}
251}
252
253impl std::ops::Deref for Executor {
254	type Target = ExecutorInner;
255
256	fn deref(&self) -> &Self::Target {
257		&self.0
258	}
259}
260
261impl Executor {
262	pub fn new(functions: Functions, flow_operator_store: FlowOperatorStore) -> Self {
263		Self(Arc::new(ExecutorInner {
264			functions,
265			flow_operator_store,
266		}))
267	}
268
269	#[allow(dead_code)]
270	pub fn testing() -> Self {
271		Self::new(
272			Functions::builder()
273				.register_aggregate("math::sum", math::aggregate::Sum::new)
274				.register_aggregate("math::min", math::aggregate::Min::new)
275				.register_aggregate("math::max", math::aggregate::Max::new)
276				.register_aggregate("math::avg", math::aggregate::Avg::new)
277				.register_aggregate("math::count", math::aggregate::Count::new)
278				.register_scalar("math::abs", math::scalar::Abs::new)
279				.register_scalar("math::avg", math::scalar::Avg::new)
280				.register_generator("generate_series", generator::GenerateSeries::new)
281				.build(),
282			FlowOperatorStore::new(),
283		)
284	}
285}
286
287impl ExecuteCommand<StandardCommandTransaction> for Executor {
288	fn execute_command(&self, txn: &mut StandardCommandTransaction, cmd: Command<'_>) -> crate::Result<Vec<Frame>> {
289		let mut result = vec![];
290		let statements = ast::parse_str(cmd.rql)?;
291
292		// Create a single persistent Stack for all statements in this command
293		let mut persistent_stack = Stack::new();
294
295		// Populate the stack with parameters so they can be accessed as variables
296		match &cmd.params {
297			reifydb_core::interface::Params::Positional(values) => {
298				// For positional parameters, use $1, $2, $3, etc.
299				for (index, value) in values.iter().enumerate() {
300					let param_name = (index + 1).to_string(); // 1-based indexing
301					persistent_stack.set(param_name, Variable::Scalar(value.clone()), false)?;
302				}
303			}
304			reifydb_core::interface::Params::Named(map) => {
305				// For named parameters, use the parameter name directly
306				for (name, value) in map {
307					persistent_stack.set(name.clone(), Variable::Scalar(value.clone()), false)?;
308				}
309			}
310			reifydb_core::interface::Params::None => {
311				// No parameters to populate
312			}
313		}
314
315		for statement in statements {
316			if let Some(plan) = plan(txn, statement)? {
317				if let Some(er) =
318					self.execute_command_plan(txn, plan, cmd.params.clone(), &mut persistent_stack)?
319				{
320					result.push(Frame::from(er));
321				}
322			}
323		}
324
325		Ok(result)
326	}
327}
328
329impl ExecuteQuery<StandardQueryTransaction> for Executor {
330	fn execute_query(&self, txn: &mut StandardQueryTransaction, qry: Query<'_>) -> crate::Result<Vec<Frame>> {
331		let mut result = vec![];
332		let statements = ast::parse_str(qry.rql)?;
333
334		// Create a single persistent Stack for all statements in this query
335		let mut persistent_stack = Stack::new();
336
337		// Populate the stack with parameters so they can be accessed as variables
338		match &qry.params {
339			reifydb_core::interface::Params::Positional(values) => {
340				// For positional parameters, use $1, $2, $3, etc.
341				for (index, value) in values.iter().enumerate() {
342					let param_name = (index + 1).to_string(); // 1-based indexing
343					persistent_stack.set(param_name, Variable::Scalar(value.clone()), false)?;
344				}
345			}
346			reifydb_core::interface::Params::Named(map) => {
347				// For named parameters, use the parameter name directly
348				for (name, value) in map {
349					persistent_stack.set(name.clone(), Variable::Scalar(value.clone()), false)?;
350				}
351			}
352			reifydb_core::interface::Params::None => {
353				// No parameters to populate
354			}
355		}
356
357		for statement in statements {
358			if let Some(plan) = plan(txn, statement)? {
359				if let Some(er) =
360					self.execute_query_plan(txn, plan, qry.params.clone(), &mut persistent_stack)?
361				{
362					result.push(Frame::from(er));
363				}
364			}
365		}
366
367		Ok(result)
368	}
369}
370
// Empty impl: no items are defined here.
// NOTE(review): presumably `Execute` only combines `ExecuteCommand` and
// `ExecuteQuery`, both implemented for `Executor` in this file — confirm
// against the trait definition.
impl Execute<StandardCommandTransaction, StandardQueryTransaction> for Executor {}
372
373impl Executor {
374	pub(crate) fn execute_query_plan<'a>(
375		&self,
376		rx: &'a mut StandardQueryTransaction,
377		plan: PhysicalPlan<'a>,
378		params: Params,
379		stack: &mut Stack,
380	) -> crate::Result<Option<Columns<'a>>> {
381		match plan {
382			// Query
383			PhysicalPlan::Aggregate(_)
384			| PhysicalPlan::DictionaryScan(_)
385			| PhysicalPlan::Filter(_)
386			| PhysicalPlan::IndexScan(_)
387			| PhysicalPlan::JoinInner(_)
388			| PhysicalPlan::JoinLeft(_)
389			| PhysicalPlan::JoinNatural(_)
390			| PhysicalPlan::Take(_)
391			| PhysicalPlan::Sort(_)
392			| PhysicalPlan::Map(_)
393			| PhysicalPlan::Extend(_)
394			| PhysicalPlan::InlineData(_)
395			| PhysicalPlan::Generator(_)
396			| PhysicalPlan::Delete(_)
397			| PhysicalPlan::DeleteRingBuffer(_)
398			| PhysicalPlan::InsertTable(_)
399			| PhysicalPlan::InsertRingBuffer(_)
400			| PhysicalPlan::InsertDictionary(_)
401			| PhysicalPlan::Update(_)
402			| PhysicalPlan::UpdateRingBuffer(_)
403			| PhysicalPlan::TableScan(_)
404			| PhysicalPlan::ViewScan(_)
405			| PhysicalPlan::FlowScan(_)
406			| PhysicalPlan::TableVirtualScan(_)
407			| PhysicalPlan::RingBufferScan(_)
408			| PhysicalPlan::Variable(_)
409			| PhysicalPlan::Environment(_)
410			| PhysicalPlan::Conditional(_)
411			| PhysicalPlan::Scalarize(_)
412			| PhysicalPlan::RowPointLookup(_)
413			| PhysicalPlan::RowListLookup(_)
414			| PhysicalPlan::RowRangeScan(_) => {
415				let mut std_txn = StandardTransaction::from(rx);
416				self.query(&mut std_txn, plan, params, stack)
417			}
418			PhysicalPlan::Declare(_) | PhysicalPlan::Assign(_) => {
419				let mut std_txn = StandardTransaction::from(rx);
420				self.query(&mut std_txn, plan, params, stack)?;
421				Ok(None)
422			}
423			PhysicalPlan::AlterSequence(_)
424			| PhysicalPlan::AlterTable(_)
425			| PhysicalPlan::AlterView(_)
426			| PhysicalPlan::AlterFlow(_)
427			| PhysicalPlan::CreateDeferredView(_)
428			| PhysicalPlan::CreateTransactionalView(_)
429			| PhysicalPlan::CreateNamespace(_)
430			| PhysicalPlan::CreateTable(_)
431			| PhysicalPlan::CreateRingBuffer(_)
432			| PhysicalPlan::CreateFlow(_)
433			| PhysicalPlan::CreateDictionary(_)
434			| PhysicalPlan::Distinct(_)
435			| PhysicalPlan::Apply(_) => {
436				// Apply operator requires flow engine for mod
437				// execution
438				unimplemented!(
439					"Apply operator is only supported in deferred views and requires the flow engine. Use within a CREATE DEFERRED VIEW statement."
440				)
441			}
442			PhysicalPlan::Window(_) => {
443				// Window operator requires flow engine for mod
444				// execution
445				unimplemented!(
446					"Window operator is only supported in deferred views and requires the flow engine. Use within a CREATE DEFERRED VIEW statement."
447				)
448			}
449		}
450	}
451
452	pub fn execute_command_plan<'a>(
453		&self,
454		txn: &'a mut StandardCommandTransaction,
455		plan: PhysicalPlan<'a>,
456		params: Params,
457		stack: &mut Stack,
458	) -> crate::Result<Option<Columns<'a>>> {
459		match plan {
460			PhysicalPlan::AlterSequence(plan) => Ok(Some(self.alter_table_sequence(txn, plan)?)),
461			PhysicalPlan::CreateDeferredView(plan) => Ok(Some(self.create_deferred_view(txn, plan)?)),
462			PhysicalPlan::CreateTransactionalView(plan) => {
463				Ok(Some(self.create_transactional_view(txn, plan)?))
464			}
465			PhysicalPlan::CreateNamespace(plan) => Ok(Some(self.create_namespace(txn, plan)?)),
466			PhysicalPlan::CreateTable(plan) => Ok(Some(self.create_table(txn, plan)?)),
467			PhysicalPlan::CreateRingBuffer(plan) => Ok(Some(self.create_ring_buffer(txn, plan)?)),
468			PhysicalPlan::CreateFlow(plan) => Ok(Some(self.create_flow(txn, plan)?)),
469			PhysicalPlan::CreateDictionary(plan) => Ok(Some(self.create_dictionary(txn, plan)?)),
470			PhysicalPlan::Delete(plan) => Ok(Some(self.delete(txn, plan, params)?)),
471			PhysicalPlan::DeleteRingBuffer(plan) => Ok(Some(self.delete_ring_buffer(txn, plan, params)?)),
472			PhysicalPlan::InsertTable(plan) => Ok(Some(self.insert_table(txn, plan, stack)?)),
473			PhysicalPlan::InsertRingBuffer(plan) => Ok(Some(self.insert_ring_buffer(txn, plan, params)?)),
474			PhysicalPlan::InsertDictionary(plan) => Ok(Some(self.insert_dictionary(txn, plan, stack)?)),
475			PhysicalPlan::Update(plan) => Ok(Some(self.update_table(txn, plan, params)?)),
476			PhysicalPlan::UpdateRingBuffer(plan) => Ok(Some(self.update_ring_buffer(txn, plan, params)?)),
477
478			PhysicalPlan::Aggregate(_)
479			| PhysicalPlan::DictionaryScan(_)
480			| PhysicalPlan::Filter(_)
481			| PhysicalPlan::IndexScan(_)
482			| PhysicalPlan::JoinInner(_)
483			| PhysicalPlan::JoinLeft(_)
484			| PhysicalPlan::JoinNatural(_)
485			| PhysicalPlan::Take(_)
486			| PhysicalPlan::Sort(_)
487			| PhysicalPlan::Map(_)
488			| PhysicalPlan::Extend(_)
489			| PhysicalPlan::InlineData(_)
490			| PhysicalPlan::Generator(_)
491			| PhysicalPlan::TableScan(_)
492			| PhysicalPlan::ViewScan(_)
493			| PhysicalPlan::FlowScan(_)
494			| PhysicalPlan::TableVirtualScan(_)
495			| PhysicalPlan::RingBufferScan(_)
496			| PhysicalPlan::Distinct(_)
497			| PhysicalPlan::Variable(_)
498			| PhysicalPlan::Environment(_)
499			| PhysicalPlan::Apply(_)
500			| PhysicalPlan::Conditional(_)
501			| PhysicalPlan::Scalarize(_)
502			| PhysicalPlan::RowPointLookup(_)
503			| PhysicalPlan::RowListLookup(_)
504			| PhysicalPlan::RowRangeScan(_) => {
505				let mut std_txn = StandardTransaction::from(txn);
506				self.query(&mut std_txn, plan, params, stack)
507			}
508			PhysicalPlan::Declare(_) | PhysicalPlan::Assign(_) => {
509				let mut std_txn = StandardTransaction::from(txn);
510				self.query(&mut std_txn, plan, params, stack)?;
511				Ok(None)
512			}
513			PhysicalPlan::Window(_) => {
514				let mut std_txn = StandardTransaction::from(txn);
515				self.query(&mut std_txn, plan, params, stack)
516			}
517
518			PhysicalPlan::AlterTable(plan) => Ok(Some(self.alter_table(txn, plan)?)),
519			PhysicalPlan::AlterView(plan) => Ok(Some(self.execute_alter_view(txn, plan)?)),
520			PhysicalPlan::AlterFlow(plan) => Ok(Some(self.execute_alter_flow(txn, plan)?)),
521		}
522	}
523
524	fn query<'a>(
525		&self,
526		rx: &mut StandardTransaction<'a>,
527		plan: PhysicalPlan<'a>,
528		params: Params,
529		stack: &mut Stack,
530	) -> crate::Result<Option<Columns<'a>>> {
531		let context = Arc::new(ExecutionContext {
532			executor: self.clone(),
533			source: None,
534			batch_size: 1024,
535			params: params.clone(),
536			stack: stack.clone(),
537		});
538		let mut node = compile(plan, rx, context.clone());
539
540		// Initialize the operator before execution
541		node.initialize(rx, &context)?;
542
543		let mut result: Option<Columns> = None;
544		let mut mutable_context = (*context).clone();
545
546		while let Some(Batch {
547			columns,
548		}) = node.next(rx, &mut mutable_context)?
549		{
550			if let Some(mut result_columns) = result.take() {
551				result_columns.append_columns(columns)?;
552				result = Some(result_columns);
553			} else {
554				result = Some(columns);
555			}
556		}
557
558		// Copy stack changes back to persistent stack
559		*stack = mutable_context.stack;
560
561		let headers = node.headers();
562
563		if let Some(mut columns) = result {
564			if let Some(headers) = headers {
565				columns.apply_headers(&headers);
566			}
567
568			Ok(columns.into())
569		} else {
570			// empty columns - reconstruct table,
571			// for better UX
572			let columns: Vec<Column<'a>> = node
573				.headers()
574				.unwrap_or(ColumnHeaders {
575					columns: vec![],
576				})
577				.columns
578				.into_iter()
579				.map(|name| Column {
580					name,
581					data: ColumnData::undefined(0),
582				})
583				.collect();
584
585			Ok(Some(Columns::new(columns)))
586		}
587	}
588}