reifydb_engine/execute/
mod.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use std::sync::Arc;
5
6use query::{
7	aggregate::AggregateNode,
8	assign::AssignNode,
9	compile::compile,
10	declare::DeclareNode,
11	dictionary_scan::DictionaryScan,
12	environment::EnvironmentNode,
13	extend::{ExtendNode, ExtendWithoutInputNode},
14	filter::FilterNode,
15	generator::GeneratorNode,
16	index_scan::IndexScanNode,
17	inline::InlineDataNode,
18	join::{InnerJoinNode, LeftJoinNode, NaturalJoinNode},
19	map::{MapNode, MapWithoutInputNode},
20	ring_buffer_scan::RingBufferScan,
21	row_lookup::{RowListLookupNode, RowPointLookupNode, RowRangeScanNode},
22	scalarize::ScalarizeNode,
23	sort::SortNode,
24	table_scan::TableScanNode,
25	table_virtual_scan::VirtualScanNode,
26	take::TakeNode,
27	variable::VariableNode,
28	view_scan::ViewScanNode,
29};
30use reifydb_core::{
31	Frame,
32	interface::{Command, Execute, ExecuteCommand, ExecuteQuery, Params, Query, ResolvedSource},
33	value::column::{Column, ColumnData, Columns, headers::ColumnHeaders},
34};
35use reifydb_rql::{
36	ast,
37	plan::{physical::PhysicalPlan, plan},
38};
39use tracing::instrument;
40
41use crate::{
42	StandardCommandTransaction, StandardQueryTransaction, StandardTransaction,
43	function::{Functions, generator, math},
44	stack::{Stack, Variable},
45	table_virtual::{TableVirtualUserRegistry, system::FlowOperatorStore},
46};
47
48mod catalog;
49mod mutate;
50mod query;
51
52/// Unified trait for query execution nodes following the volcano iterator
53/// pattern
54pub(crate) trait QueryNode<'a> {
55	/// Initialize the operator with execution context
56	/// Called once before iteration begins
57	fn initialize(&mut self, rx: &mut StandardTransaction<'a>, ctx: &ExecutionContext<'a>) -> crate::Result<()>;
58
59	/// Get the next batch of results (volcano iterator pattern)
60	/// Returns None when exhausted
61	fn next(
62		&mut self,
63		rx: &mut StandardTransaction<'a>,
64		ctx: &mut ExecutionContext<'a>,
65	) -> crate::Result<Option<Batch<'a>>>;
66
67	/// Get the headers of columns this node produces
68	fn headers(&self) -> Option<ColumnHeaders<'a>>;
69}
70
71#[derive(Clone)]
72pub struct ExecutionContext<'a> {
73	pub executor: Executor,
74	pub source: Option<ResolvedSource<'a>>,
75	pub batch_size: u64,
76	pub params: Params,
77	pub stack: Stack,
78}
79
80#[derive(Debug)]
81pub struct Batch<'a> {
82	pub columns: Columns<'a>,
83}
84
85pub(crate) enum ExecutionPlan<'a> {
86	Aggregate(AggregateNode<'a>),
87	DictionaryScan(DictionaryScan<'a>),
88	Filter(FilterNode<'a>),
89	IndexScan(IndexScanNode<'a>),
90	InlineData(InlineDataNode<'a>),
91	InnerJoin(InnerJoinNode<'a>),
92	LeftJoin(LeftJoinNode<'a>),
93	NaturalJoin(NaturalJoinNode<'a>),
94	Map(MapNode<'a>),
95	MapWithoutInput(MapWithoutInputNode<'a>),
96	Extend(ExtendNode<'a>),
97	ExtendWithoutInput(ExtendWithoutInputNode<'a>),
98	Sort(SortNode<'a>),
99	TableScan(TableScanNode<'a>),
100	Take(TakeNode<'a>),
101	ViewScan(ViewScanNode<'a>),
102	Variable(VariableNode<'a>),
103	Environment(EnvironmentNode),
104	VirtualScan(VirtualScanNode<'a>),
105	RingBufferScan(RingBufferScan<'a>),
106	Generator(GeneratorNode<'a>),
107	Declare(DeclareNode<'a>),
108	Assign(AssignNode<'a>),
109	Conditional(query::conditional::ConditionalNode<'a>),
110	Scalarize(ScalarizeNode<'a>),
111	// Row-number optimized access
112	RowPointLookup(RowPointLookupNode<'a>),
113	RowListLookup(RowListLookupNode<'a>),
114	RowRangeScan(RowRangeScanNode<'a>),
115}
116
117// Implement QueryNode for Box<ExecutionPlan> to allow chaining
118impl<'a> QueryNode<'a> for Box<ExecutionPlan<'a>> {
119	fn initialize(&mut self, rx: &mut StandardTransaction<'a>, ctx: &ExecutionContext<'a>) -> crate::Result<()> {
120		(**self).initialize(rx, ctx)
121	}
122
123	fn next(
124		&mut self,
125		rx: &mut StandardTransaction<'a>,
126		ctx: &mut ExecutionContext<'a>,
127	) -> crate::Result<Option<Batch<'a>>> {
128		(**self).next(rx, ctx)
129	}
130
131	fn headers(&self) -> Option<ColumnHeaders<'a>> {
132		(**self).headers()
133	}
134}
135
136impl<'a> QueryNode<'a> for ExecutionPlan<'a> {
137	fn initialize(&mut self, rx: &mut StandardTransaction<'a>, ctx: &ExecutionContext<'a>) -> crate::Result<()> {
138		match self {
139			ExecutionPlan::Aggregate(node) => node.initialize(rx, ctx),
140			ExecutionPlan::DictionaryScan(node) => node.initialize(rx, ctx),
141			ExecutionPlan::Filter(node) => node.initialize(rx, ctx),
142			ExecutionPlan::IndexScan(node) => node.initialize(rx, ctx),
143			ExecutionPlan::InlineData(node) => node.initialize(rx, ctx),
144			ExecutionPlan::InnerJoin(node) => node.initialize(rx, ctx),
145			ExecutionPlan::LeftJoin(node) => node.initialize(rx, ctx),
146			ExecutionPlan::NaturalJoin(node) => node.initialize(rx, ctx),
147			ExecutionPlan::Map(node) => node.initialize(rx, ctx),
148			ExecutionPlan::MapWithoutInput(node) => node.initialize(rx, ctx),
149			ExecutionPlan::Extend(node) => node.initialize(rx, ctx),
150			ExecutionPlan::ExtendWithoutInput(node) => node.initialize(rx, ctx),
151			ExecutionPlan::Sort(node) => node.initialize(rx, ctx),
152			ExecutionPlan::TableScan(node) => node.initialize(rx, ctx),
153			ExecutionPlan::Take(node) => node.initialize(rx, ctx),
154			ExecutionPlan::ViewScan(node) => node.initialize(rx, ctx),
155			ExecutionPlan::Variable(node) => node.initialize(rx, ctx),
156			ExecutionPlan::Environment(node) => node.initialize(rx, ctx),
157			ExecutionPlan::VirtualScan(node) => node.initialize(rx, ctx),
158			ExecutionPlan::RingBufferScan(node) => node.initialize(rx, ctx),
159			ExecutionPlan::Generator(node) => node.initialize(rx, ctx),
160			ExecutionPlan::Declare(node) => node.initialize(rx, ctx),
161			ExecutionPlan::Assign(node) => node.initialize(rx, ctx),
162			ExecutionPlan::Conditional(node) => node.initialize(rx, ctx),
163			ExecutionPlan::Scalarize(node) => node.initialize(rx, ctx),
164			ExecutionPlan::RowPointLookup(node) => node.initialize(rx, ctx),
165			ExecutionPlan::RowListLookup(node) => node.initialize(rx, ctx),
166			ExecutionPlan::RowRangeScan(node) => node.initialize(rx, ctx),
167		}
168	}
169
170	fn next(
171		&mut self,
172		rx: &mut StandardTransaction<'a>,
173		ctx: &mut ExecutionContext<'a>,
174	) -> crate::Result<Option<Batch<'a>>> {
175		match self {
176			ExecutionPlan::Aggregate(node) => node.next(rx, ctx),
177			ExecutionPlan::DictionaryScan(node) => node.next(rx, ctx),
178			ExecutionPlan::Filter(node) => node.next(rx, ctx),
179			ExecutionPlan::IndexScan(node) => node.next(rx, ctx),
180			ExecutionPlan::InlineData(node) => node.next(rx, ctx),
181			ExecutionPlan::InnerJoin(node) => node.next(rx, ctx),
182			ExecutionPlan::LeftJoin(node) => node.next(rx, ctx),
183			ExecutionPlan::NaturalJoin(node) => node.next(rx, ctx),
184			ExecutionPlan::Map(node) => node.next(rx, ctx),
185			ExecutionPlan::MapWithoutInput(node) => node.next(rx, ctx),
186			ExecutionPlan::Extend(node) => node.next(rx, ctx),
187			ExecutionPlan::ExtendWithoutInput(node) => node.next(rx, ctx),
188			ExecutionPlan::Sort(node) => node.next(rx, ctx),
189			ExecutionPlan::TableScan(node) => node.next(rx, ctx),
190			ExecutionPlan::Take(node) => node.next(rx, ctx),
191			ExecutionPlan::ViewScan(node) => node.next(rx, ctx),
192			ExecutionPlan::Variable(node) => node.next(rx, ctx),
193			ExecutionPlan::Environment(node) => node.next(rx, ctx),
194			ExecutionPlan::VirtualScan(node) => node.next(rx, ctx),
195			ExecutionPlan::RingBufferScan(node) => node.next(rx, ctx),
196			ExecutionPlan::Generator(node) => node.next(rx, ctx),
197			ExecutionPlan::Declare(node) => node.next(rx, ctx),
198			ExecutionPlan::Assign(node) => node.next(rx, ctx),
199			ExecutionPlan::Conditional(node) => node.next(rx, ctx),
200			ExecutionPlan::Scalarize(node) => node.next(rx, ctx),
201			ExecutionPlan::RowPointLookup(node) => node.next(rx, ctx),
202			ExecutionPlan::RowListLookup(node) => node.next(rx, ctx),
203			ExecutionPlan::RowRangeScan(node) => node.next(rx, ctx),
204		}
205	}
206
207	fn headers(&self) -> Option<ColumnHeaders<'a>> {
208		match self {
209			ExecutionPlan::Aggregate(node) => node.headers(),
210			ExecutionPlan::DictionaryScan(node) => node.headers(),
211			ExecutionPlan::Filter(node) => node.headers(),
212			ExecutionPlan::IndexScan(node) => node.headers(),
213			ExecutionPlan::InlineData(node) => node.headers(),
214			ExecutionPlan::InnerJoin(node) => node.headers(),
215			ExecutionPlan::LeftJoin(node) => node.headers(),
216			ExecutionPlan::NaturalJoin(node) => node.headers(),
217			ExecutionPlan::Map(node) => node.headers(),
218			ExecutionPlan::MapWithoutInput(node) => node.headers(),
219			ExecutionPlan::Extend(node) => node.headers(),
220			ExecutionPlan::ExtendWithoutInput(node) => node.headers(),
221			ExecutionPlan::Sort(node) => node.headers(),
222			ExecutionPlan::TableScan(node) => node.headers(),
223			ExecutionPlan::Take(node) => node.headers(),
224			ExecutionPlan::ViewScan(node) => node.headers(),
225			ExecutionPlan::Variable(node) => node.headers(),
226			ExecutionPlan::Environment(node) => node.headers(),
227			ExecutionPlan::VirtualScan(node) => node.headers(),
228			ExecutionPlan::RingBufferScan(node) => node.headers(),
229			ExecutionPlan::Generator(node) => node.headers(),
230			ExecutionPlan::Declare(node) => node.headers(),
231			ExecutionPlan::Assign(node) => node.headers(),
232			ExecutionPlan::Conditional(node) => node.headers(),
233			ExecutionPlan::Scalarize(node) => node.headers(),
234			ExecutionPlan::RowPointLookup(node) => node.headers(),
235			ExecutionPlan::RowListLookup(node) => node.headers(),
236			ExecutionPlan::RowRangeScan(node) => node.headers(),
237		}
238	}
239}
240
241pub struct Executor(Arc<ExecutorInner>);
242
243pub struct ExecutorInner {
244	pub functions: Functions,
245	pub flow_operator_store: FlowOperatorStore,
246	pub virtual_table_registry: TableVirtualUserRegistry,
247}
248
249impl Clone for Executor {
250	fn clone(&self) -> Self {
251		Self(self.0.clone())
252	}
253}
254
255impl std::ops::Deref for Executor {
256	type Target = ExecutorInner;
257
258	fn deref(&self) -> &Self::Target {
259		&self.0
260	}
261}
262
263impl Executor {
264	pub fn new(functions: Functions, flow_operator_store: FlowOperatorStore) -> Self {
265		Self(Arc::new(ExecutorInner {
266			functions,
267			flow_operator_store,
268			virtual_table_registry: TableVirtualUserRegistry::new(),
269		}))
270	}
271
272	pub fn with_virtual_table_registry(
273		functions: Functions,
274		flow_operator_store: FlowOperatorStore,
275		virtual_table_registry: TableVirtualUserRegistry,
276	) -> Self {
277		Self(Arc::new(ExecutorInner {
278			functions,
279			flow_operator_store,
280			virtual_table_registry,
281		}))
282	}
283
284	#[allow(dead_code)]
285	pub fn testing() -> Self {
286		Self::new(
287			Functions::builder()
288				.register_aggregate("math::sum", math::aggregate::Sum::new)
289				.register_aggregate("math::min", math::aggregate::Min::new)
290				.register_aggregate("math::max", math::aggregate::Max::new)
291				.register_aggregate("math::avg", math::aggregate::Avg::new)
292				.register_aggregate("math::count", math::aggregate::Count::new)
293				.register_scalar("math::abs", math::scalar::Abs::new)
294				.register_scalar("math::avg", math::scalar::Avg::new)
295				.register_generator("generate_series", generator::GenerateSeries::new)
296				.build(),
297			FlowOperatorStore::new(),
298		)
299	}
300}
301
302impl ExecuteCommand<StandardCommandTransaction> for Executor {
303	#[instrument(level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
304	fn execute_command(&self, txn: &mut StandardCommandTransaction, cmd: Command<'_>) -> crate::Result<Vec<Frame>> {
305		let mut result = vec![];
306		let statements = ast::parse_str(cmd.rql)?;
307
308		// Create a single persistent Stack for all statements in this command
309		let mut persistent_stack = Stack::new();
310
311		// Populate the stack with parameters so they can be accessed as variables
312		match &cmd.params {
313			Params::Positional(values) => {
314				// For positional parameters, use $1, $2, $3, etc.
315				for (index, value) in values.iter().enumerate() {
316					let param_name = (index + 1).to_string(); // 1-based indexing
317					persistent_stack.set(param_name, Variable::Scalar(value.clone()), false)?;
318				}
319			}
320			Params::Named(map) => {
321				// For named parameters, use the parameter name directly
322				for (name, value) in map {
323					persistent_stack.set(name.clone(), Variable::Scalar(value.clone()), false)?;
324				}
325			}
326			Params::None => {
327				// No parameters to populate
328			}
329		}
330
331		for statement in statements {
332			if let Some(plan) = plan(txn, statement)? {
333				if let Some(er) =
334					self.execute_command_plan(txn, plan, cmd.params.clone(), &mut persistent_stack)?
335				{
336					result.push(Frame::from(er));
337				}
338			}
339		}
340
341		Ok(result)
342	}
343}
344
345impl ExecuteQuery<StandardQueryTransaction> for Executor {
346	#[instrument(level = "debug", skip(self, txn, qry), fields(rql = %qry.rql))]
347	fn execute_query(&self, txn: &mut StandardQueryTransaction, qry: Query<'_>) -> crate::Result<Vec<Frame>> {
348		let mut result = vec![];
349		let statements = ast::parse_str(qry.rql)?;
350
351		// Create a single persistent Stack for all statements in this query
352		let mut persistent_stack = Stack::new();
353
354		// Populate the stack with parameters so they can be accessed as variables
355		match &qry.params {
356			Params::Positional(values) => {
357				// For positional parameters, use $1, $2, $3, etc.
358				for (index, value) in values.iter().enumerate() {
359					let param_name = (index + 1).to_string(); // 1-based indexing
360					persistent_stack.set(param_name, Variable::Scalar(value.clone()), false)?;
361				}
362			}
363			Params::Named(map) => {
364				// For named parameters, use the parameter name directly
365				for (name, value) in map {
366					persistent_stack.set(name.clone(), Variable::Scalar(value.clone()), false)?;
367				}
368			}
369			Params::None => {
370				// No parameters to populate
371			}
372		}
373
374		for statement in statements {
375			if let Some(plan) = plan(txn, statement)? {
376				if let Some(er) =
377					self.execute_query_plan(txn, plan, qry.params.clone(), &mut persistent_stack)?
378				{
379					result.push(Frame::from(er));
380				}
381			}
382		}
383
384		Ok(result)
385	}
386}
387
388impl Execute<StandardCommandTransaction, StandardQueryTransaction> for Executor {}
389
390impl Executor {
391	#[instrument(level = "debug", skip(self, rx, plan, params, stack))]
392	pub(crate) fn execute_query_plan<'a>(
393		&self,
394		rx: &'a mut StandardQueryTransaction,
395		plan: PhysicalPlan<'a>,
396		params: Params,
397		stack: &mut Stack,
398	) -> crate::Result<Option<Columns<'a>>> {
399		match plan {
400			// Query
401			PhysicalPlan::Aggregate(_)
402			| PhysicalPlan::DictionaryScan(_)
403			| PhysicalPlan::Filter(_)
404			| PhysicalPlan::IndexScan(_)
405			| PhysicalPlan::JoinInner(_)
406			| PhysicalPlan::JoinLeft(_)
407			| PhysicalPlan::JoinNatural(_)
408			| PhysicalPlan::Take(_)
409			| PhysicalPlan::Sort(_)
410			| PhysicalPlan::Map(_)
411			| PhysicalPlan::Extend(_)
412			| PhysicalPlan::InlineData(_)
413			| PhysicalPlan::Generator(_)
414			| PhysicalPlan::Delete(_)
415			| PhysicalPlan::DeleteRingBuffer(_)
416			| PhysicalPlan::InsertTable(_)
417			| PhysicalPlan::InsertRingBuffer(_)
418			| PhysicalPlan::InsertDictionary(_)
419			| PhysicalPlan::Update(_)
420			| PhysicalPlan::UpdateRingBuffer(_)
421			| PhysicalPlan::TableScan(_)
422			| PhysicalPlan::ViewScan(_)
423			| PhysicalPlan::FlowScan(_)
424			| PhysicalPlan::TableVirtualScan(_)
425			| PhysicalPlan::RingBufferScan(_)
426			| PhysicalPlan::Variable(_)
427			| PhysicalPlan::Environment(_)
428			| PhysicalPlan::Conditional(_)
429			| PhysicalPlan::Scalarize(_)
430			| PhysicalPlan::RowPointLookup(_)
431			| PhysicalPlan::RowListLookup(_)
432			| PhysicalPlan::RowRangeScan(_) => {
433				let mut std_txn = StandardTransaction::from(rx);
434				self.query(&mut std_txn, plan, params, stack)
435			}
436			PhysicalPlan::Declare(_) | PhysicalPlan::Assign(_) => {
437				let mut std_txn = StandardTransaction::from(rx);
438				self.query(&mut std_txn, plan, params, stack)?;
439				Ok(None)
440			}
441			PhysicalPlan::AlterSequence(_)
442			| PhysicalPlan::AlterTable(_)
443			| PhysicalPlan::AlterView(_)
444			| PhysicalPlan::AlterFlow(_)
445			| PhysicalPlan::CreateDeferredView(_)
446			| PhysicalPlan::CreateTransactionalView(_)
447			| PhysicalPlan::CreateNamespace(_)
448			| PhysicalPlan::CreateTable(_)
449			| PhysicalPlan::CreateRingBuffer(_)
450			| PhysicalPlan::CreateFlow(_)
451			| PhysicalPlan::CreateDictionary(_)
452			| PhysicalPlan::Distinct(_)
453			| PhysicalPlan::Apply(_) => {
454				// Apply operator requires flow engine for mod
455				// execution
456				unimplemented!(
457					"Apply operator is only supported in deferred views and requires the flow engine. Use within a CREATE DEFERRED VIEW statement."
458				)
459			}
460			PhysicalPlan::Window(_) => {
461				// Window operator requires flow engine for mod
462				// execution
463				unimplemented!(
464					"Window operator is only supported in deferred views and requires the flow engine. Use within a CREATE DEFERRED VIEW statement."
465				)
466			}
467			PhysicalPlan::Merge(_) => {
468				// Merge operator requires flow engine
469				unimplemented!(
470					"Merge operator is only supported in deferred views and requires the flow engine. Use within a CREATE DEFERRED VIEW statement."
471				)
472			}
473		}
474	}
475
476	#[instrument(level = "debug", skip(self, txn, plan, params, stack))]
477	pub fn execute_command_plan<'a>(
478		&self,
479		txn: &'a mut StandardCommandTransaction,
480		plan: PhysicalPlan<'a>,
481		params: Params,
482		stack: &mut Stack,
483	) -> crate::Result<Option<Columns<'a>>> {
484		match plan {
485			PhysicalPlan::AlterSequence(plan) => Ok(Some(self.alter_table_sequence(txn, plan)?)),
486			PhysicalPlan::CreateDeferredView(plan) => Ok(Some(self.create_deferred_view(txn, plan)?)),
487			PhysicalPlan::CreateTransactionalView(plan) => {
488				Ok(Some(self.create_transactional_view(txn, plan)?))
489			}
490			PhysicalPlan::CreateNamespace(plan) => Ok(Some(self.create_namespace(txn, plan)?)),
491			PhysicalPlan::CreateTable(plan) => Ok(Some(self.create_table(txn, plan)?)),
492			PhysicalPlan::CreateRingBuffer(plan) => Ok(Some(self.create_ring_buffer(txn, plan)?)),
493			PhysicalPlan::CreateFlow(plan) => Ok(Some(self.create_flow(txn, plan)?)),
494			PhysicalPlan::CreateDictionary(plan) => Ok(Some(self.create_dictionary(txn, plan)?)),
495			PhysicalPlan::Delete(plan) => Ok(Some(self.delete(txn, plan, params)?)),
496			PhysicalPlan::DeleteRingBuffer(plan) => Ok(Some(self.delete_ring_buffer(txn, plan, params)?)),
497			PhysicalPlan::InsertTable(plan) => Ok(Some(self.insert_table(txn, plan, stack)?)),
498			PhysicalPlan::InsertRingBuffer(plan) => Ok(Some(self.insert_ring_buffer(txn, plan, params)?)),
499			PhysicalPlan::InsertDictionary(plan) => Ok(Some(self.insert_dictionary(txn, plan, stack)?)),
500			PhysicalPlan::Update(plan) => Ok(Some(self.update_table(txn, plan, params)?)),
501			PhysicalPlan::UpdateRingBuffer(plan) => Ok(Some(self.update_ring_buffer(txn, plan, params)?)),
502
503			PhysicalPlan::Aggregate(_)
504			| PhysicalPlan::DictionaryScan(_)
505			| PhysicalPlan::Filter(_)
506			| PhysicalPlan::IndexScan(_)
507			| PhysicalPlan::JoinInner(_)
508			| PhysicalPlan::JoinLeft(_)
509			| PhysicalPlan::JoinNatural(_)
510			| PhysicalPlan::Take(_)
511			| PhysicalPlan::Sort(_)
512			| PhysicalPlan::Map(_)
513			| PhysicalPlan::Extend(_)
514			| PhysicalPlan::InlineData(_)
515			| PhysicalPlan::Generator(_)
516			| PhysicalPlan::TableScan(_)
517			| PhysicalPlan::ViewScan(_)
518			| PhysicalPlan::FlowScan(_)
519			| PhysicalPlan::TableVirtualScan(_)
520			| PhysicalPlan::RingBufferScan(_)
521			| PhysicalPlan::Distinct(_)
522			| PhysicalPlan::Variable(_)
523			| PhysicalPlan::Environment(_)
524			| PhysicalPlan::Apply(_)
525			| PhysicalPlan::Conditional(_)
526			| PhysicalPlan::Scalarize(_)
527			| PhysicalPlan::RowPointLookup(_)
528			| PhysicalPlan::RowListLookup(_)
529			| PhysicalPlan::RowRangeScan(_) => {
530				let mut std_txn = StandardTransaction::from(txn);
531				self.query(&mut std_txn, plan, params, stack)
532			}
533			PhysicalPlan::Declare(_) | PhysicalPlan::Assign(_) => {
534				let mut std_txn = StandardTransaction::from(txn);
535				self.query(&mut std_txn, plan, params, stack)?;
536				Ok(None)
537			}
538			PhysicalPlan::Window(_) => {
539				let mut std_txn = StandardTransaction::from(txn);
540				self.query(&mut std_txn, plan, params, stack)
541			}
542			PhysicalPlan::Merge(_) => {
543				let mut std_txn = StandardTransaction::from(txn);
544				self.query(&mut std_txn, plan, params, stack)
545			}
546
547			PhysicalPlan::AlterTable(plan) => Ok(Some(self.alter_table(txn, plan)?)),
548			PhysicalPlan::AlterView(plan) => Ok(Some(self.execute_alter_view(txn, plan)?)),
549			PhysicalPlan::AlterFlow(plan) => Ok(Some(self.execute_alter_flow(txn, plan)?)),
550		}
551	}
552
553	#[instrument(level = "debug", skip(self, rx, plan, params, stack))]
554	fn query<'a>(
555		&self,
556		rx: &mut StandardTransaction<'a>,
557		plan: PhysicalPlan<'a>,
558		params: Params,
559		stack: &mut Stack,
560	) -> crate::Result<Option<Columns<'a>>> {
561		let context = Arc::new(ExecutionContext {
562			executor: self.clone(),
563			source: None,
564			batch_size: 1024,
565			params: params.clone(),
566			stack: stack.clone(),
567		});
568		let mut node = compile(plan, rx, context.clone());
569
570		// Initialize the operator before execution
571		node.initialize(rx, &context)?;
572
573		let mut result: Option<Columns> = None;
574		let mut mutable_context = (*context).clone();
575
576		while let Some(Batch {
577			columns,
578		}) = node.next(rx, &mut mutable_context)?
579		{
580			if let Some(mut result_columns) = result.take() {
581				result_columns.append_columns(columns)?;
582				result = Some(result_columns);
583			} else {
584				result = Some(columns);
585			}
586		}
587
588		// Copy stack changes back to persistent stack
589		*stack = mutable_context.stack;
590
591		let headers = node.headers();
592
593		if let Some(mut columns) = result {
594			if let Some(headers) = headers {
595				columns.apply_headers(&headers);
596			}
597
598			Ok(columns.into())
599		} else {
600			// empty columns - reconstruct table,
601			// for better UX
602			let columns: Vec<Column<'a>> = node
603				.headers()
604				.unwrap_or(ColumnHeaders {
605					columns: vec![],
606				})
607				.columns
608				.into_iter()
609				.map(|name| Column {
610					name,
611					data: ColumnData::undefined(0),
612				})
613				.collect();
614
615			Ok(Some(Columns::new(columns)))
616		}
617	}
618}