reifydb_engine/execute/
mod.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use std::sync::Arc;
5
6use query::{
7	aggregate::AggregateNode,
8	assign::AssignNode,
9	compile::compile,
10	declare::DeclareNode,
11	dictionary_scan::DictionaryScan,
12	environment::EnvironmentNode,
13	extend::{ExtendNode, ExtendWithoutInputNode},
14	filter::FilterNode,
15	generator::GeneratorNode,
16	index_scan::IndexScanNode,
17	inline::InlineDataNode,
18	join::{InnerJoinNode, LeftJoinNode, NaturalJoinNode},
19	map::{MapNode, MapWithoutInputNode},
20	ring_buffer_scan::RingBufferScan,
21	row_lookup::{RowListLookupNode, RowPointLookupNode, RowRangeScanNode},
22	scalarize::ScalarizeNode,
23	sort::SortNode,
24	table_scan::TableScanNode,
25	table_virtual_scan::VirtualScanNode,
26	take::TakeNode,
27	variable::VariableNode,
28	view_scan::ViewScanNode,
29};
30use reifydb_core::{
31	Frame,
32	interface::{Command, Execute, ExecuteCommand, ExecuteQuery, Params, Query, ResolvedSource},
33	value::column::{Column, ColumnData, Columns, headers::ColumnHeaders},
34};
35use reifydb_rql::{
36	ast,
37	plan::{physical::PhysicalPlan, plan},
38};
39use tracing::instrument;
40
41use crate::{
42	StandardCommandTransaction, StandardQueryTransaction, StandardTransaction,
43	function::{Functions, generator, math},
44	stack::{Stack, Variable},
45	table_virtual::{TableVirtualUserRegistry, system::FlowOperatorStore},
46};
47
48mod catalog;
49mod mutate;
50mod query;
51
/// Unified trait for query execution nodes following the volcano iterator
/// pattern.
///
/// Each variant of [`ExecutionPlan`] wraps a node and forwards these three
/// calls to it.
pub(crate) trait QueryNode<'a> {
	/// Initialize the operator with the execution context.
	///
	/// Called once before iteration begins.
	fn initialize(&mut self, rx: &mut StandardTransaction<'a>, ctx: &ExecutionContext<'a>) -> crate::Result<()>;

	/// Get the next batch of results (volcano iterator pattern).
	///
	/// Returns `Ok(None)` when the node is exhausted. The context is passed
	/// mutably; `Executor::query` copies `ctx.stack` back to its caller
	/// once iteration has finished.
	fn next(
		&mut self,
		rx: &mut StandardTransaction<'a>,
		ctx: &mut ExecutionContext<'a>,
	) -> crate::Result<Option<Batch<'a>>>;

	/// Get the headers of the columns this node produces, or `None` if the
	/// node has none to report.
	fn headers(&self) -> Option<ColumnHeaders<'a>>;
}
70
/// State handed to every [`QueryNode`] while a plan is initialized and
/// iterated.
#[derive(Clone)]
pub struct ExecutionContext<'a> {
	/// Handle to the executor that created this context.
	pub executor: Executor,
	/// Resolved source associated with the execution; `Executor::query`
	/// creates the context with `None`.
	pub source: Option<ResolvedSource<'a>>,
	/// Batch size hint; `Executor::query` sets it to 1024.
	// NOTE(review): presumably the maximum number of rows per batch —
	// confirm against the node implementations.
	pub batch_size: u64,
	/// Parameters supplied with the command or query.
	pub params: Params,
	/// Variable stack; `Executor::query` writes it back to the caller's
	/// stack after all batches have been pulled.
	pub stack: Stack,
}
79
/// One unit of output returned by [`QueryNode::next`].
#[derive(Debug)]
pub struct Batch<'a> {
	/// The columns produced for this batch.
	pub columns: Columns<'a>,
}
84
/// Closed set of executable nodes.
///
/// Every variant wraps one node type; the [`QueryNode`] implementation for
/// this enum forwards `initialize`, `next` and `headers` to the wrapped node.
pub(crate) enum ExecutionPlan<'a> {
	Aggregate(AggregateNode<'a>),
	DictionaryScan(DictionaryScan<'a>),
	Filter(FilterNode<'a>),
	IndexScan(IndexScanNode<'a>),
	InlineData(InlineDataNode<'a>),
	InnerJoin(InnerJoinNode<'a>),
	LeftJoin(LeftJoinNode<'a>),
	NaturalJoin(NaturalJoinNode<'a>),
	Map(MapNode<'a>),
	MapWithoutInput(MapWithoutInputNode<'a>),
	Extend(ExtendNode<'a>),
	ExtendWithoutInput(ExtendWithoutInputNode<'a>),
	Sort(SortNode<'a>),
	TableScan(TableScanNode<'a>),
	Take(TakeNode<'a>),
	ViewScan(ViewScanNode<'a>),
	Variable(VariableNode<'a>),
	// The only node type without a lifetime parameter
	Environment(EnvironmentNode),
	VirtualScan(VirtualScanNode<'a>),
	RingBufferScan(RingBufferScan<'a>),
	Generator(GeneratorNode<'a>),
	Declare(DeclareNode<'a>),
	Assign(AssignNode<'a>),
	Conditional(query::conditional::ConditionalNode<'a>),
	Scalarize(ScalarizeNode<'a>),
	// Row-number optimized access
	RowPointLookup(RowPointLookupNode<'a>),
	RowListLookup(RowListLookupNode<'a>),
	RowRangeScan(RowRangeScanNode<'a>),
}
116
117// Implement QueryNode for Box<ExecutionPlan> to allow chaining
118impl<'a> QueryNode<'a> for Box<ExecutionPlan<'a>> {
119	fn initialize(&mut self, rx: &mut StandardTransaction<'a>, ctx: &ExecutionContext<'a>) -> crate::Result<()> {
120		(**self).initialize(rx, ctx)
121	}
122
123	fn next(
124		&mut self,
125		rx: &mut StandardTransaction<'a>,
126		ctx: &mut ExecutionContext<'a>,
127	) -> crate::Result<Option<Batch<'a>>> {
128		(**self).next(rx, ctx)
129	}
130
131	fn headers(&self) -> Option<ColumnHeaders<'a>> {
132		(**self).headers()
133	}
134}
135
136impl<'a> QueryNode<'a> for ExecutionPlan<'a> {
137	fn initialize(&mut self, rx: &mut StandardTransaction<'a>, ctx: &ExecutionContext<'a>) -> crate::Result<()> {
138		match self {
139			ExecutionPlan::Aggregate(node) => node.initialize(rx, ctx),
140			ExecutionPlan::DictionaryScan(node) => node.initialize(rx, ctx),
141			ExecutionPlan::Filter(node) => node.initialize(rx, ctx),
142			ExecutionPlan::IndexScan(node) => node.initialize(rx, ctx),
143			ExecutionPlan::InlineData(node) => node.initialize(rx, ctx),
144			ExecutionPlan::InnerJoin(node) => node.initialize(rx, ctx),
145			ExecutionPlan::LeftJoin(node) => node.initialize(rx, ctx),
146			ExecutionPlan::NaturalJoin(node) => node.initialize(rx, ctx),
147			ExecutionPlan::Map(node) => node.initialize(rx, ctx),
148			ExecutionPlan::MapWithoutInput(node) => node.initialize(rx, ctx),
149			ExecutionPlan::Extend(node) => node.initialize(rx, ctx),
150			ExecutionPlan::ExtendWithoutInput(node) => node.initialize(rx, ctx),
151			ExecutionPlan::Sort(node) => node.initialize(rx, ctx),
152			ExecutionPlan::TableScan(node) => node.initialize(rx, ctx),
153			ExecutionPlan::Take(node) => node.initialize(rx, ctx),
154			ExecutionPlan::ViewScan(node) => node.initialize(rx, ctx),
155			ExecutionPlan::Variable(node) => node.initialize(rx, ctx),
156			ExecutionPlan::Environment(node) => node.initialize(rx, ctx),
157			ExecutionPlan::VirtualScan(node) => node.initialize(rx, ctx),
158			ExecutionPlan::RingBufferScan(node) => node.initialize(rx, ctx),
159			ExecutionPlan::Generator(node) => node.initialize(rx, ctx),
160			ExecutionPlan::Declare(node) => node.initialize(rx, ctx),
161			ExecutionPlan::Assign(node) => node.initialize(rx, ctx),
162			ExecutionPlan::Conditional(node) => node.initialize(rx, ctx),
163			ExecutionPlan::Scalarize(node) => node.initialize(rx, ctx),
164			ExecutionPlan::RowPointLookup(node) => node.initialize(rx, ctx),
165			ExecutionPlan::RowListLookup(node) => node.initialize(rx, ctx),
166			ExecutionPlan::RowRangeScan(node) => node.initialize(rx, ctx),
167		}
168	}
169
170	fn next(
171		&mut self,
172		rx: &mut StandardTransaction<'a>,
173		ctx: &mut ExecutionContext<'a>,
174	) -> crate::Result<Option<Batch<'a>>> {
175		match self {
176			ExecutionPlan::Aggregate(node) => node.next(rx, ctx),
177			ExecutionPlan::DictionaryScan(node) => node.next(rx, ctx),
178			ExecutionPlan::Filter(node) => node.next(rx, ctx),
179			ExecutionPlan::IndexScan(node) => node.next(rx, ctx),
180			ExecutionPlan::InlineData(node) => node.next(rx, ctx),
181			ExecutionPlan::InnerJoin(node) => node.next(rx, ctx),
182			ExecutionPlan::LeftJoin(node) => node.next(rx, ctx),
183			ExecutionPlan::NaturalJoin(node) => node.next(rx, ctx),
184			ExecutionPlan::Map(node) => node.next(rx, ctx),
185			ExecutionPlan::MapWithoutInput(node) => node.next(rx, ctx),
186			ExecutionPlan::Extend(node) => node.next(rx, ctx),
187			ExecutionPlan::ExtendWithoutInput(node) => node.next(rx, ctx),
188			ExecutionPlan::Sort(node) => node.next(rx, ctx),
189			ExecutionPlan::TableScan(node) => node.next(rx, ctx),
190			ExecutionPlan::Take(node) => node.next(rx, ctx),
191			ExecutionPlan::ViewScan(node) => node.next(rx, ctx),
192			ExecutionPlan::Variable(node) => node.next(rx, ctx),
193			ExecutionPlan::Environment(node) => node.next(rx, ctx),
194			ExecutionPlan::VirtualScan(node) => node.next(rx, ctx),
195			ExecutionPlan::RingBufferScan(node) => node.next(rx, ctx),
196			ExecutionPlan::Generator(node) => node.next(rx, ctx),
197			ExecutionPlan::Declare(node) => node.next(rx, ctx),
198			ExecutionPlan::Assign(node) => node.next(rx, ctx),
199			ExecutionPlan::Conditional(node) => node.next(rx, ctx),
200			ExecutionPlan::Scalarize(node) => node.next(rx, ctx),
201			ExecutionPlan::RowPointLookup(node) => node.next(rx, ctx),
202			ExecutionPlan::RowListLookup(node) => node.next(rx, ctx),
203			ExecutionPlan::RowRangeScan(node) => node.next(rx, ctx),
204		}
205	}
206
207	fn headers(&self) -> Option<ColumnHeaders<'a>> {
208		match self {
209			ExecutionPlan::Aggregate(node) => node.headers(),
210			ExecutionPlan::DictionaryScan(node) => node.headers(),
211			ExecutionPlan::Filter(node) => node.headers(),
212			ExecutionPlan::IndexScan(node) => node.headers(),
213			ExecutionPlan::InlineData(node) => node.headers(),
214			ExecutionPlan::InnerJoin(node) => node.headers(),
215			ExecutionPlan::LeftJoin(node) => node.headers(),
216			ExecutionPlan::NaturalJoin(node) => node.headers(),
217			ExecutionPlan::Map(node) => node.headers(),
218			ExecutionPlan::MapWithoutInput(node) => node.headers(),
219			ExecutionPlan::Extend(node) => node.headers(),
220			ExecutionPlan::ExtendWithoutInput(node) => node.headers(),
221			ExecutionPlan::Sort(node) => node.headers(),
222			ExecutionPlan::TableScan(node) => node.headers(),
223			ExecutionPlan::Take(node) => node.headers(),
224			ExecutionPlan::ViewScan(node) => node.headers(),
225			ExecutionPlan::Variable(node) => node.headers(),
226			ExecutionPlan::Environment(node) => node.headers(),
227			ExecutionPlan::VirtualScan(node) => node.headers(),
228			ExecutionPlan::RingBufferScan(node) => node.headers(),
229			ExecutionPlan::Generator(node) => node.headers(),
230			ExecutionPlan::Declare(node) => node.headers(),
231			ExecutionPlan::Assign(node) => node.headers(),
232			ExecutionPlan::Conditional(node) => node.headers(),
233			ExecutionPlan::Scalarize(node) => node.headers(),
234			ExecutionPlan::RowPointLookup(node) => node.headers(),
235			ExecutionPlan::RowListLookup(node) => node.headers(),
236			ExecutionPlan::RowRangeScan(node) => node.headers(),
237		}
238	}
239}
240
/// Cloneable handle to the shared executor state.
///
/// Wraps [`ExecutorInner`] in an `Arc`; the `Deref` impl in this file exposes
/// the inner fields directly on the handle.
pub struct Executor(Arc<ExecutorInner>);
242
/// State shared by all clones of an [`Executor`].
pub struct ExecutorInner {
	/// Function registry; `Executor::testing` populates one with aggregate,
	/// scalar and generator functions.
	pub functions: Functions,
	/// Store of flow operators supplied at construction.
	pub flow_operator_store: FlowOperatorStore,
	/// Registry of user-defined virtual tables; `Executor::new` starts with
	/// `TableVirtualUserRegistry::new()`.
	pub virtual_table_registry: TableVirtualUserRegistry,
}
248
249impl Clone for Executor {
250	fn clone(&self) -> Self {
251		Self(self.0.clone())
252	}
253}
254
255impl std::ops::Deref for Executor {
256	type Target = ExecutorInner;
257
258	fn deref(&self) -> &Self::Target {
259		&self.0
260	}
261}
262
263impl Executor {
264	pub fn new(functions: Functions, flow_operator_store: FlowOperatorStore) -> Self {
265		Self(Arc::new(ExecutorInner {
266			functions,
267			flow_operator_store,
268			virtual_table_registry: TableVirtualUserRegistry::new(),
269		}))
270	}
271
272	pub fn with_virtual_table_registry(
273		functions: Functions,
274		flow_operator_store: FlowOperatorStore,
275		virtual_table_registry: TableVirtualUserRegistry,
276	) -> Self {
277		Self(Arc::new(ExecutorInner {
278			functions,
279			flow_operator_store,
280			virtual_table_registry,
281		}))
282	}
283
284	#[allow(dead_code)]
285	pub fn testing() -> Self {
286		Self::new(
287			Functions::builder()
288				.register_aggregate("math::sum", math::aggregate::Sum::new)
289				.register_aggregate("math::min", math::aggregate::Min::new)
290				.register_aggregate("math::max", math::aggregate::Max::new)
291				.register_aggregate("math::avg", math::aggregate::Avg::new)
292				.register_aggregate("math::count", math::aggregate::Count::new)
293				.register_scalar("math::abs", math::scalar::Abs::new)
294				.register_scalar("math::avg", math::scalar::Avg::new)
295				.register_generator("generate_series", generator::GenerateSeries::new)
296				.build(),
297			FlowOperatorStore::new(),
298		)
299	}
300}
301
302impl ExecuteCommand<StandardCommandTransaction> for Executor {
303	#[instrument(level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
304	fn execute_command(&self, txn: &mut StandardCommandTransaction, cmd: Command<'_>) -> crate::Result<Vec<Frame>> {
305		let mut result = vec![];
306		let statements = ast::parse_str(cmd.rql)?;
307
308		// Create a single persistent Stack for all statements in this command
309		let mut persistent_stack = Stack::new();
310
311		// Populate the stack with parameters so they can be accessed as variables
312		match &cmd.params {
313			reifydb_core::interface::Params::Positional(values) => {
314				// For positional parameters, use $1, $2, $3, etc.
315				for (index, value) in values.iter().enumerate() {
316					let param_name = (index + 1).to_string(); // 1-based indexing
317					persistent_stack.set(param_name, Variable::Scalar(value.clone()), false)?;
318				}
319			}
320			reifydb_core::interface::Params::Named(map) => {
321				// For named parameters, use the parameter name directly
322				for (name, value) in map {
323					persistent_stack.set(name.clone(), Variable::Scalar(value.clone()), false)?;
324				}
325			}
326			reifydb_core::interface::Params::None => {
327				// No parameters to populate
328			}
329		}
330
331		for statement in statements {
332			if let Some(plan) = plan(txn, statement)? {
333				if let Some(er) =
334					self.execute_command_plan(txn, plan, cmd.params.clone(), &mut persistent_stack)?
335				{
336					result.push(Frame::from(er));
337				}
338			}
339		}
340
341		Ok(result)
342	}
343}
344
345impl ExecuteQuery<StandardQueryTransaction> for Executor {
346	#[instrument(level = "debug", skip(self, txn, qry), fields(rql = %qry.rql))]
347	fn execute_query(&self, txn: &mut StandardQueryTransaction, qry: Query<'_>) -> crate::Result<Vec<Frame>> {
348		let mut result = vec![];
349		let statements = ast::parse_str(qry.rql)?;
350
351		// Create a single persistent Stack for all statements in this query
352		let mut persistent_stack = Stack::new();
353
354		// Populate the stack with parameters so they can be accessed as variables
355		match &qry.params {
356			reifydb_core::interface::Params::Positional(values) => {
357				// For positional parameters, use $1, $2, $3, etc.
358				for (index, value) in values.iter().enumerate() {
359					let param_name = (index + 1).to_string(); // 1-based indexing
360					persistent_stack.set(param_name, Variable::Scalar(value.clone()), false)?;
361				}
362			}
363			reifydb_core::interface::Params::Named(map) => {
364				// For named parameters, use the parameter name directly
365				for (name, value) in map {
366					persistent_stack.set(name.clone(), Variable::Scalar(value.clone()), false)?;
367				}
368			}
369			reifydb_core::interface::Params::None => {
370				// No parameters to populate
371			}
372		}
373
374		for statement in statements {
375			if let Some(plan) = plan(txn, statement)? {
376				if let Some(er) =
377					self.execute_query_plan(txn, plan, qry.params.clone(), &mut persistent_stack)?
378				{
379					result.push(Frame::from(er));
380				}
381			}
382		}
383
384		Ok(result)
385	}
386}
387
// Empty impl: no methods are defined here.
// NOTE(review): presumably `Execute` only combines `ExecuteCommand` and
// `ExecuteQuery`, both implemented above — confirm in `reifydb_core`.
impl Execute<StandardCommandTransaction, StandardQueryTransaction> for Executor {}
389
390impl Executor {
391	pub(crate) fn execute_query_plan<'a>(
392		&self,
393		rx: &'a mut StandardQueryTransaction,
394		plan: PhysicalPlan<'a>,
395		params: Params,
396		stack: &mut Stack,
397	) -> crate::Result<Option<Columns<'a>>> {
398		match plan {
399			// Query
400			PhysicalPlan::Aggregate(_)
401			| PhysicalPlan::DictionaryScan(_)
402			| PhysicalPlan::Filter(_)
403			| PhysicalPlan::IndexScan(_)
404			| PhysicalPlan::JoinInner(_)
405			| PhysicalPlan::JoinLeft(_)
406			| PhysicalPlan::JoinNatural(_)
407			| PhysicalPlan::Take(_)
408			| PhysicalPlan::Sort(_)
409			| PhysicalPlan::Map(_)
410			| PhysicalPlan::Extend(_)
411			| PhysicalPlan::InlineData(_)
412			| PhysicalPlan::Generator(_)
413			| PhysicalPlan::Delete(_)
414			| PhysicalPlan::DeleteRingBuffer(_)
415			| PhysicalPlan::InsertTable(_)
416			| PhysicalPlan::InsertRingBuffer(_)
417			| PhysicalPlan::InsertDictionary(_)
418			| PhysicalPlan::Update(_)
419			| PhysicalPlan::UpdateRingBuffer(_)
420			| PhysicalPlan::TableScan(_)
421			| PhysicalPlan::ViewScan(_)
422			| PhysicalPlan::FlowScan(_)
423			| PhysicalPlan::TableVirtualScan(_)
424			| PhysicalPlan::RingBufferScan(_)
425			| PhysicalPlan::Variable(_)
426			| PhysicalPlan::Environment(_)
427			| PhysicalPlan::Conditional(_)
428			| PhysicalPlan::Scalarize(_)
429			| PhysicalPlan::RowPointLookup(_)
430			| PhysicalPlan::RowListLookup(_)
431			| PhysicalPlan::RowRangeScan(_) => {
432				let mut std_txn = StandardTransaction::from(rx);
433				self.query(&mut std_txn, plan, params, stack)
434			}
435			PhysicalPlan::Declare(_) | PhysicalPlan::Assign(_) => {
436				let mut std_txn = StandardTransaction::from(rx);
437				self.query(&mut std_txn, plan, params, stack)?;
438				Ok(None)
439			}
440			PhysicalPlan::AlterSequence(_)
441			| PhysicalPlan::AlterTable(_)
442			| PhysicalPlan::AlterView(_)
443			| PhysicalPlan::AlterFlow(_)
444			| PhysicalPlan::CreateDeferredView(_)
445			| PhysicalPlan::CreateTransactionalView(_)
446			| PhysicalPlan::CreateNamespace(_)
447			| PhysicalPlan::CreateTable(_)
448			| PhysicalPlan::CreateRingBuffer(_)
449			| PhysicalPlan::CreateFlow(_)
450			| PhysicalPlan::CreateDictionary(_)
451			| PhysicalPlan::Distinct(_)
452			| PhysicalPlan::Apply(_) => {
453				// Apply operator requires flow engine for mod
454				// execution
455				unimplemented!(
456					"Apply operator is only supported in deferred views and requires the flow engine. Use within a CREATE DEFERRED VIEW statement."
457				)
458			}
459			PhysicalPlan::Window(_) => {
460				// Window operator requires flow engine for mod
461				// execution
462				unimplemented!(
463					"Window operator is only supported in deferred views and requires the flow engine. Use within a CREATE DEFERRED VIEW statement."
464				)
465			}
466			PhysicalPlan::Merge(_) => {
467				// Merge operator requires flow engine
468				unimplemented!(
469					"Merge operator is only supported in deferred views and requires the flow engine. Use within a CREATE DEFERRED VIEW statement."
470				)
471			}
472		}
473	}
474
475	pub fn execute_command_plan<'a>(
476		&self,
477		txn: &'a mut StandardCommandTransaction,
478		plan: PhysicalPlan<'a>,
479		params: Params,
480		stack: &mut Stack,
481	) -> crate::Result<Option<Columns<'a>>> {
482		match plan {
483			PhysicalPlan::AlterSequence(plan) => Ok(Some(self.alter_table_sequence(txn, plan)?)),
484			PhysicalPlan::CreateDeferredView(plan) => Ok(Some(self.create_deferred_view(txn, plan)?)),
485			PhysicalPlan::CreateTransactionalView(plan) => {
486				Ok(Some(self.create_transactional_view(txn, plan)?))
487			}
488			PhysicalPlan::CreateNamespace(plan) => Ok(Some(self.create_namespace(txn, plan)?)),
489			PhysicalPlan::CreateTable(plan) => Ok(Some(self.create_table(txn, plan)?)),
490			PhysicalPlan::CreateRingBuffer(plan) => Ok(Some(self.create_ring_buffer(txn, plan)?)),
491			PhysicalPlan::CreateFlow(plan) => Ok(Some(self.create_flow(txn, plan)?)),
492			PhysicalPlan::CreateDictionary(plan) => Ok(Some(self.create_dictionary(txn, plan)?)),
493			PhysicalPlan::Delete(plan) => Ok(Some(self.delete(txn, plan, params)?)),
494			PhysicalPlan::DeleteRingBuffer(plan) => Ok(Some(self.delete_ring_buffer(txn, plan, params)?)),
495			PhysicalPlan::InsertTable(plan) => Ok(Some(self.insert_table(txn, plan, stack)?)),
496			PhysicalPlan::InsertRingBuffer(plan) => Ok(Some(self.insert_ring_buffer(txn, plan, params)?)),
497			PhysicalPlan::InsertDictionary(plan) => Ok(Some(self.insert_dictionary(txn, plan, stack)?)),
498			PhysicalPlan::Update(plan) => Ok(Some(self.update_table(txn, plan, params)?)),
499			PhysicalPlan::UpdateRingBuffer(plan) => Ok(Some(self.update_ring_buffer(txn, plan, params)?)),
500
501			PhysicalPlan::Aggregate(_)
502			| PhysicalPlan::DictionaryScan(_)
503			| PhysicalPlan::Filter(_)
504			| PhysicalPlan::IndexScan(_)
505			| PhysicalPlan::JoinInner(_)
506			| PhysicalPlan::JoinLeft(_)
507			| PhysicalPlan::JoinNatural(_)
508			| PhysicalPlan::Take(_)
509			| PhysicalPlan::Sort(_)
510			| PhysicalPlan::Map(_)
511			| PhysicalPlan::Extend(_)
512			| PhysicalPlan::InlineData(_)
513			| PhysicalPlan::Generator(_)
514			| PhysicalPlan::TableScan(_)
515			| PhysicalPlan::ViewScan(_)
516			| PhysicalPlan::FlowScan(_)
517			| PhysicalPlan::TableVirtualScan(_)
518			| PhysicalPlan::RingBufferScan(_)
519			| PhysicalPlan::Distinct(_)
520			| PhysicalPlan::Variable(_)
521			| PhysicalPlan::Environment(_)
522			| PhysicalPlan::Apply(_)
523			| PhysicalPlan::Conditional(_)
524			| PhysicalPlan::Scalarize(_)
525			| PhysicalPlan::RowPointLookup(_)
526			| PhysicalPlan::RowListLookup(_)
527			| PhysicalPlan::RowRangeScan(_) => {
528				let mut std_txn = StandardTransaction::from(txn);
529				self.query(&mut std_txn, plan, params, stack)
530			}
531			PhysicalPlan::Declare(_) | PhysicalPlan::Assign(_) => {
532				let mut std_txn = StandardTransaction::from(txn);
533				self.query(&mut std_txn, plan, params, stack)?;
534				Ok(None)
535			}
536			PhysicalPlan::Window(_) => {
537				let mut std_txn = StandardTransaction::from(txn);
538				self.query(&mut std_txn, plan, params, stack)
539			}
540			PhysicalPlan::Merge(_) => {
541				let mut std_txn = StandardTransaction::from(txn);
542				self.query(&mut std_txn, plan, params, stack)
543			}
544
545			PhysicalPlan::AlterTable(plan) => Ok(Some(self.alter_table(txn, plan)?)),
546			PhysicalPlan::AlterView(plan) => Ok(Some(self.execute_alter_view(txn, plan)?)),
547			PhysicalPlan::AlterFlow(plan) => Ok(Some(self.execute_alter_flow(txn, plan)?)),
548		}
549	}
550
551	fn query<'a>(
552		&self,
553		rx: &mut StandardTransaction<'a>,
554		plan: PhysicalPlan<'a>,
555		params: Params,
556		stack: &mut Stack,
557	) -> crate::Result<Option<Columns<'a>>> {
558		let context = Arc::new(ExecutionContext {
559			executor: self.clone(),
560			source: None,
561			batch_size: 1024,
562			params: params.clone(),
563			stack: stack.clone(),
564		});
565		let mut node = compile(plan, rx, context.clone());
566
567		// Initialize the operator before execution
568		node.initialize(rx, &context)?;
569
570		let mut result: Option<Columns> = None;
571		let mut mutable_context = (*context).clone();
572
573		while let Some(Batch {
574			columns,
575		}) = node.next(rx, &mut mutable_context)?
576		{
577			if let Some(mut result_columns) = result.take() {
578				result_columns.append_columns(columns)?;
579				result = Some(result_columns);
580			} else {
581				result = Some(columns);
582			}
583		}
584
585		// Copy stack changes back to persistent stack
586		*stack = mutable_context.stack;
587
588		let headers = node.headers();
589
590		if let Some(mut columns) = result {
591			if let Some(headers) = headers {
592				columns.apply_headers(&headers);
593			}
594
595			Ok(columns.into())
596		} else {
597			// empty columns - reconstruct table,
598			// for better UX
599			let columns: Vec<Column<'a>> = node
600				.headers()
601				.unwrap_or(ColumnHeaders {
602					columns: vec![],
603				})
604				.columns
605				.into_iter()
606				.map(|name| Column {
607					name,
608					data: ColumnData::undefined(0),
609				})
610				.collect();
611
612			Ok(Some(Columns::new(columns)))
613		}
614	}
615}