reifydb_engine/execute/
mod.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later, see license.md file
3
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use query::{
8	aggregate::AggregateNode,
9	assign::AssignNode,
10	compile::compile,
11	declare::DeclareNode,
12	dictionary_scan::DictionaryScanNode,
13	environment::EnvironmentNode,
14	extend::{ExtendNode, ExtendWithoutInputNode},
15	filter::FilterNode,
16	generator::GeneratorNode,
17	index_scan::IndexScanNode,
18	inline::InlineDataNode,
19	join::{InnerJoinNode, LeftJoinNode, NaturalJoinNode},
20	map::{MapNode, MapWithoutInputNode},
21	ringbuffer_scan::RingBufferScan,
22	row_lookup::{RowListLookupNode, RowPointLookupNode, RowRangeScanNode},
23	scalarize::ScalarizeNode,
24	sort::SortNode,
25	table_scan::TableScanNode,
26	table_virtual_scan::VirtualScanNode,
27	take::TakeNode,
28	top_k::TopKNode,
29	variable::VariableNode,
30	view_scan::ViewScanNode,
31};
32use reifydb_core::{
33	Frame,
34	interface::{Command, Execute, ExecuteCommand, ExecuteQuery, Params, Query, ResolvedSource},
35	value::column::{Column, ColumnData, Columns, headers::ColumnHeaders},
36};
37use reifydb_rql::{
38	ast,
39	plan::{physical::PhysicalPlan, plan},
40};
41use reifydb_transaction::StorageTracker;
42use tracing::instrument;
43
44use crate::{
45	StandardCommandTransaction, StandardQueryTransaction, StandardTransaction,
46	function::{Functions, generator, math},
47	stack::{Stack, Variable},
48	table_virtual::{TableVirtualUserRegistry, system::FlowOperatorStore},
49};
50
51mod catalog;
52pub(crate) mod mutate;
53mod query;
54
/// Unified trait for query execution nodes following the volcano iterator pattern.
///
/// The driver (`Executor::query` in this file) calls `initialize` once and then
/// keeps calling `next` until it yields `None`; `headers` is consulted after
/// the node has been drained.
#[async_trait]
pub(crate) trait QueryNode {
	/// Initialize the operator with execution context
	/// Called once before iteration begins
	async fn initialize<'a>(
		&mut self,
		rx: &mut StandardTransaction<'a>,
		ctx: &ExecutionContext,
	) -> crate::Result<()>;

	/// Get the next batch of results (volcano iterator pattern)
	/// Returns None when exhausted
	///
	/// The context is handed in mutably; the driver copies `ctx.stack` back
	/// to the caller once iteration has finished.
	async fn next<'a>(
		&mut self,
		rx: &mut StandardTransaction<'a>,
		ctx: &mut ExecutionContext,
	) -> crate::Result<Option<Batch>>;

	/// Get the headers of columns this node produces
	///
	/// `None` means the node has no headers to report; the driver then leaves
	/// the produced column names untouched.
	fn headers(&self) -> Option<ColumnHeaders>;
}
77
/// Per-execution state shared with every node of a compiled plan.
///
/// `Executor::query` builds one context per physical plan, hands a clone of it
/// to `compile` and passes the original to the nodes during iteration.
#[derive(Clone)]
pub struct ExecutionContext {
	/// Handle to the executor that runs the plan.
	pub executor: Executor,
	/// Resolved source the execution refers to; `Executor::query` starts with `None`.
	pub source: Option<ResolvedSource>,
	/// Batch size hint; `Executor::query` sets it to 1024.
	/// NOTE(review): presumably the target number of rows per batch - confirm in the node implementations.
	pub batch_size: u64,
	/// Parameters supplied together with the statement.
	pub params: Params,
	/// Variable stack; `Executor::query` copies it back to the caller's stack after execution.
	pub stack: Stack,
}
86
/// One unit of output returned by a single `QueryNode::next` call.
#[derive(Debug)]
pub struct Batch {
	/// The columns that make up this batch.
	pub columns: Columns,
}
91
/// Closed set of executable query nodes.
///
/// Each variant wraps one concrete node type; the `QueryNode` implementation
/// for this enum forwards every call to the wrapped node.
pub(crate) enum ExecutionPlan {
	Aggregate(AggregateNode),
	DictionaryScan(DictionaryScanNode),
	Filter(FilterNode),
	IndexScan(IndexScanNode),
	InlineData(InlineDataNode),
	InnerJoin(InnerJoinNode),
	LeftJoin(LeftJoinNode),
	NaturalJoin(NaturalJoinNode),
	Map(MapNode),
	MapWithoutInput(MapWithoutInputNode),
	Extend(ExtendNode),
	ExtendWithoutInput(ExtendWithoutInputNode),
	Sort(SortNode),
	TableScan(TableScanNode),
	Take(TakeNode),
	TopK(TopKNode),
	ViewScan(ViewScanNode),
	Variable(VariableNode),
	Environment(EnvironmentNode),
	VirtualScan(VirtualScanNode),
	RingBufferScan(RingBufferScan),
	Generator(GeneratorNode),
	Declare(DeclareNode),
	Assign(AssignNode),
	Conditional(query::conditional::ConditionalNode),
	Scalarize(ScalarizeNode),
	// Row-number optimized access
	RowPointLookup(RowPointLookupNode),
	RowListLookup(RowListLookupNode),
	RowRangeScan(RowRangeScanNode),
}
124
125// Implement QueryNode for Box<ExecutionPlan> to allow chaining
126#[async_trait]
127impl QueryNode for Box<ExecutionPlan> {
128	async fn initialize<'a>(
129		&mut self,
130		rx: &mut StandardTransaction<'a>,
131		ctx: &ExecutionContext,
132	) -> crate::Result<()> {
133		(**self).initialize(rx, ctx).await
134	}
135
136	async fn next<'a>(
137		&mut self,
138		rx: &mut StandardTransaction<'a>,
139		ctx: &mut ExecutionContext,
140	) -> crate::Result<Option<Batch>> {
141		(**self).next(rx, ctx).await
142	}
143
144	fn headers(&self) -> Option<ColumnHeaders> {
145		(**self).headers()
146	}
147}
148
149#[async_trait]
150impl QueryNode for ExecutionPlan {
151	async fn initialize<'a>(
152		&mut self,
153		rx: &mut StandardTransaction<'a>,
154		ctx: &ExecutionContext,
155	) -> crate::Result<()> {
156		match self {
157			ExecutionPlan::Aggregate(node) => node.initialize(rx, ctx).await,
158			ExecutionPlan::DictionaryScan(node) => node.initialize(rx, ctx).await,
159			ExecutionPlan::Filter(node) => node.initialize(rx, ctx).await,
160			ExecutionPlan::IndexScan(node) => node.initialize(rx, ctx).await,
161			ExecutionPlan::InlineData(node) => node.initialize(rx, ctx).await,
162			ExecutionPlan::InnerJoin(node) => node.initialize(rx, ctx).await,
163			ExecutionPlan::LeftJoin(node) => node.initialize(rx, ctx).await,
164			ExecutionPlan::NaturalJoin(node) => node.initialize(rx, ctx).await,
165			ExecutionPlan::Map(node) => node.initialize(rx, ctx).await,
166			ExecutionPlan::MapWithoutInput(node) => node.initialize(rx, ctx).await,
167			ExecutionPlan::Extend(node) => node.initialize(rx, ctx).await,
168			ExecutionPlan::ExtendWithoutInput(node) => node.initialize(rx, ctx).await,
169			ExecutionPlan::Sort(node) => node.initialize(rx, ctx).await,
170			ExecutionPlan::TableScan(node) => node.initialize(rx, ctx).await,
171			ExecutionPlan::Take(node) => node.initialize(rx, ctx).await,
172			ExecutionPlan::TopK(node) => node.initialize(rx, ctx).await,
173			ExecutionPlan::ViewScan(node) => node.initialize(rx, ctx).await,
174			ExecutionPlan::Variable(node) => node.initialize(rx, ctx).await,
175			ExecutionPlan::Environment(node) => node.initialize(rx, ctx).await,
176			ExecutionPlan::VirtualScan(node) => node.initialize(rx, ctx).await,
177			ExecutionPlan::RingBufferScan(node) => node.initialize(rx, ctx).await,
178			ExecutionPlan::Generator(node) => node.initialize(rx, ctx).await,
179			ExecutionPlan::Declare(node) => node.initialize(rx, ctx).await,
180			ExecutionPlan::Assign(node) => node.initialize(rx, ctx).await,
181			ExecutionPlan::Conditional(node) => node.initialize(rx, ctx).await,
182			ExecutionPlan::Scalarize(node) => node.initialize(rx, ctx).await,
183			ExecutionPlan::RowPointLookup(node) => node.initialize(rx, ctx).await,
184			ExecutionPlan::RowListLookup(node) => node.initialize(rx, ctx).await,
185			ExecutionPlan::RowRangeScan(node) => node.initialize(rx, ctx).await,
186		}
187	}
188
189	async fn next<'a>(
190		&mut self,
191		rx: &mut StandardTransaction<'a>,
192		ctx: &mut ExecutionContext,
193	) -> crate::Result<Option<Batch>> {
194		match self {
195			ExecutionPlan::Aggregate(node) => node.next(rx, ctx).await,
196			ExecutionPlan::DictionaryScan(node) => node.next(rx, ctx).await,
197			ExecutionPlan::Filter(node) => node.next(rx, ctx).await,
198			ExecutionPlan::IndexScan(node) => node.next(rx, ctx).await,
199			ExecutionPlan::InlineData(node) => node.next(rx, ctx).await,
200			ExecutionPlan::InnerJoin(node) => node.next(rx, ctx).await,
201			ExecutionPlan::LeftJoin(node) => node.next(rx, ctx).await,
202			ExecutionPlan::NaturalJoin(node) => node.next(rx, ctx).await,
203			ExecutionPlan::Map(node) => node.next(rx, ctx).await,
204			ExecutionPlan::MapWithoutInput(node) => node.next(rx, ctx).await,
205			ExecutionPlan::Extend(node) => node.next(rx, ctx).await,
206			ExecutionPlan::ExtendWithoutInput(node) => node.next(rx, ctx).await,
207			ExecutionPlan::Sort(node) => node.next(rx, ctx).await,
208			ExecutionPlan::TableScan(node) => node.next(rx, ctx).await,
209			ExecutionPlan::Take(node) => node.next(rx, ctx).await,
210			ExecutionPlan::TopK(node) => node.next(rx, ctx).await,
211			ExecutionPlan::ViewScan(node) => node.next(rx, ctx).await,
212			ExecutionPlan::Variable(node) => node.next(rx, ctx).await,
213			ExecutionPlan::Environment(node) => node.next(rx, ctx).await,
214			ExecutionPlan::VirtualScan(node) => node.next(rx, ctx).await,
215			ExecutionPlan::RingBufferScan(node) => node.next(rx, ctx).await,
216			ExecutionPlan::Generator(node) => node.next(rx, ctx).await,
217			ExecutionPlan::Declare(node) => node.next(rx, ctx).await,
218			ExecutionPlan::Assign(node) => node.next(rx, ctx).await,
219			ExecutionPlan::Conditional(node) => node.next(rx, ctx).await,
220			ExecutionPlan::Scalarize(node) => node.next(rx, ctx).await,
221			ExecutionPlan::RowPointLookup(node) => node.next(rx, ctx).await,
222			ExecutionPlan::RowListLookup(node) => node.next(rx, ctx).await,
223			ExecutionPlan::RowRangeScan(node) => node.next(rx, ctx).await,
224		}
225	}
226
227	fn headers(&self) -> Option<ColumnHeaders> {
228		match self {
229			ExecutionPlan::Aggregate(node) => node.headers(),
230			ExecutionPlan::DictionaryScan(node) => node.headers(),
231			ExecutionPlan::Filter(node) => node.headers(),
232			ExecutionPlan::IndexScan(node) => node.headers(),
233			ExecutionPlan::InlineData(node) => node.headers(),
234			ExecutionPlan::InnerJoin(node) => node.headers(),
235			ExecutionPlan::LeftJoin(node) => node.headers(),
236			ExecutionPlan::NaturalJoin(node) => node.headers(),
237			ExecutionPlan::Map(node) => node.headers(),
238			ExecutionPlan::MapWithoutInput(node) => node.headers(),
239			ExecutionPlan::Extend(node) => node.headers(),
240			ExecutionPlan::ExtendWithoutInput(node) => node.headers(),
241			ExecutionPlan::Sort(node) => node.headers(),
242			ExecutionPlan::TableScan(node) => node.headers(),
243			ExecutionPlan::Take(node) => node.headers(),
244			ExecutionPlan::TopK(node) => node.headers(),
245			ExecutionPlan::ViewScan(node) => node.headers(),
246			ExecutionPlan::Variable(node) => node.headers(),
247			ExecutionPlan::Environment(node) => node.headers(),
248			ExecutionPlan::VirtualScan(node) => node.headers(),
249			ExecutionPlan::RingBufferScan(node) => node.headers(),
250			ExecutionPlan::Generator(node) => node.headers(),
251			ExecutionPlan::Declare(node) => node.headers(),
252			ExecutionPlan::Assign(node) => node.headers(),
253			ExecutionPlan::Conditional(node) => node.headers(),
254			ExecutionPlan::Scalarize(node) => node.headers(),
255			ExecutionPlan::RowPointLookup(node) => node.headers(),
256			ExecutionPlan::RowListLookup(node) => node.headers(),
257			ExecutionPlan::RowRangeScan(node) => node.headers(),
258		}
259	}
260}
261
262pub struct Executor(Arc<ExecutorInner>);
263
264pub struct ExecutorInner {
265	pub functions: Functions,
266	pub flow_operator_store: FlowOperatorStore,
267	pub virtual_table_registry: TableVirtualUserRegistry,
268	pub stats_tracker: StorageTracker,
269}
270
271impl Clone for Executor {
272	fn clone(&self) -> Self {
273		Self(self.0.clone())
274	}
275}
276
277impl std::ops::Deref for Executor {
278	type Target = ExecutorInner;
279
280	fn deref(&self) -> &Self::Target {
281		&self.0
282	}
283}
284
285impl Executor {
286	pub fn new(
287		functions: Functions,
288		flow_operator_store: FlowOperatorStore,
289		stats_tracker: StorageTracker,
290	) -> Self {
291		Self(Arc::new(ExecutorInner {
292			functions,
293			flow_operator_store,
294			virtual_table_registry: TableVirtualUserRegistry::new(),
295			stats_tracker,
296		}))
297	}
298
299	pub fn with_virtual_table_registry(
300		functions: Functions,
301		flow_operator_store: FlowOperatorStore,
302		virtual_table_registry: TableVirtualUserRegistry,
303		stats_tracker: StorageTracker,
304	) -> Self {
305		Self(Arc::new(ExecutorInner {
306			functions,
307			flow_operator_store,
308			virtual_table_registry,
309			stats_tracker,
310		}))
311	}
312
313	#[allow(dead_code)]
314	pub fn testing() -> Self {
315		Self::new(
316			Functions::builder()
317				.register_aggregate("math::sum", math::aggregate::Sum::new)
318				.register_aggregate("math::min", math::aggregate::Min::new)
319				.register_aggregate("math::max", math::aggregate::Max::new)
320				.register_aggregate("math::avg", math::aggregate::Avg::new)
321				.register_aggregate("math::count", math::aggregate::Count::new)
322				.register_scalar("math::abs", math::scalar::Abs::new)
323				.register_scalar("math::avg", math::scalar::Avg::new)
324				.register_generator("generate_series", generator::GenerateSeries::new)
325				.build(),
326			FlowOperatorStore::new(),
327			StorageTracker::with_defaults(),
328		)
329	}
330}
331
332#[async_trait]
333impl ExecuteCommand<StandardCommandTransaction> for Executor {
334	#[instrument(name = "executor::execute_command", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
335	async fn execute_command(
336		&self,
337		txn: &mut StandardCommandTransaction,
338		cmd: Command<'_>,
339	) -> crate::Result<Vec<Frame>> {
340		let mut result = vec![];
341		let statements = ast::parse_str(cmd.rql)?;
342
343		// Create a single persistent Stack for all statements in this command
344		let mut persistent_stack = Stack::new();
345
346		// Populate the stack with parameters so they can be accessed as variables
347		match &cmd.params {
348			Params::Positional(values) => {
349				// For positional parameters, use $1, $2, $3, etc.
350				for (index, value) in values.iter().enumerate() {
351					let param_name = (index + 1).to_string(); // 1-based indexing
352					persistent_stack.set(param_name, Variable::Scalar(value.clone()), false)?;
353				}
354			}
355			Params::Named(map) => {
356				// For named parameters, use the parameter name directly
357				for (name, value) in map {
358					persistent_stack.set(name.clone(), Variable::Scalar(value.clone()), false)?;
359				}
360			}
361			Params::None => {
362				// No parameters to populate
363			}
364		}
365
366		for statement in statements {
367			if let Some(plan) = plan(txn, statement).await? {
368				if let Some(er) = self
369					.execute_command_plan(txn, plan, cmd.params.clone(), &mut persistent_stack)
370					.await?
371				{
372					result.push(Frame::from(er));
373				}
374			}
375		}
376
377		Ok(result)
378	}
379}
380
381#[async_trait]
382impl ExecuteQuery<StandardQueryTransaction> for Executor {
383	#[instrument(name = "executor::execute_query", level = "debug", skip(self, txn, qry), fields(rql = %qry.rql))]
384	async fn execute_query(&self, txn: &mut StandardQueryTransaction, qry: Query<'_>) -> crate::Result<Vec<Frame>> {
385		let mut result = vec![];
386		let statements = ast::parse_str(qry.rql)?;
387
388		// Create a single persistent Stack for all statements in this query
389		let mut persistent_stack = Stack::new();
390
391		// Populate the stack with parameters so they can be accessed as variables
392		match &qry.params {
393			Params::Positional(values) => {
394				// For positional parameters, use $1, $2, $3, etc.
395				for (index, value) in values.iter().enumerate() {
396					let param_name = (index + 1).to_string(); // 1-based indexing
397					persistent_stack.set(param_name, Variable::Scalar(value.clone()), false)?;
398				}
399			}
400			Params::Named(map) => {
401				// For named parameters, use the parameter name directly
402				for (name, value) in map {
403					persistent_stack.set(name.clone(), Variable::Scalar(value.clone()), false)?;
404				}
405			}
406			Params::None => {
407				// No parameters to populate
408			}
409		}
410
411		for statement in statements {
412			if let Some(plan) = plan(txn, statement).await? {
413				if let Some(er) = self
414					.execute_query_plan(txn, plan, qry.params.clone(), &mut persistent_stack)
415					.await?
416				{
417					result.push(Frame::from(er));
418				}
419			}
420		}
421
422		Ok(result)
423	}
424}
425
// The body is empty: `Execute` asks for nothing beyond what the
// `ExecuteCommand` / `ExecuteQuery` implementations in this file provide.
// NOTE(review): presumably a combined supertrait - confirm in reifydb_core.
impl Execute<StandardCommandTransaction, StandardQueryTransaction> for Executor {}
427
428impl Executor {
429	#[instrument(name = "executor::plan::query", level = "debug", skip(self, rx, plan, params, stack))]
430	pub(crate) async fn execute_query_plan<'a>(
431		&self,
432		rx: &'a mut StandardQueryTransaction,
433		plan: PhysicalPlan,
434		params: Params,
435		stack: &mut Stack,
436	) -> crate::Result<Option<Columns>> {
437		match plan {
438			// Query
439			PhysicalPlan::Aggregate(_)
440			| PhysicalPlan::DictionaryScan(_)
441			| PhysicalPlan::Filter(_)
442			| PhysicalPlan::IndexScan(_)
443			| PhysicalPlan::JoinInner(_)
444			| PhysicalPlan::JoinLeft(_)
445			| PhysicalPlan::JoinNatural(_)
446			| PhysicalPlan::Take(_)
447			| PhysicalPlan::Sort(_)
448			| PhysicalPlan::Map(_)
449			| PhysicalPlan::Extend(_)
450			| PhysicalPlan::InlineData(_)
451			| PhysicalPlan::Generator(_)
452			| PhysicalPlan::Delete(_)
453			| PhysicalPlan::DeleteRingBuffer(_)
454			| PhysicalPlan::InsertTable(_)
455			| PhysicalPlan::InsertRingBuffer(_)
456			| PhysicalPlan::InsertDictionary(_)
457			| PhysicalPlan::Update(_)
458			| PhysicalPlan::UpdateRingBuffer(_)
459			| PhysicalPlan::TableScan(_)
460			| PhysicalPlan::ViewScan(_)
461			| PhysicalPlan::FlowScan(_)
462			| PhysicalPlan::TableVirtualScan(_)
463			| PhysicalPlan::RingBufferScan(_)
464			| PhysicalPlan::Variable(_)
465			| PhysicalPlan::Environment(_)
466			| PhysicalPlan::Conditional(_)
467			| PhysicalPlan::Scalarize(_)
468			| PhysicalPlan::RowPointLookup(_)
469			| PhysicalPlan::RowListLookup(_)
470			| PhysicalPlan::RowRangeScan(_) => {
471				let mut std_txn = StandardTransaction::from(rx);
472				self.query(&mut std_txn, plan, params, stack).await
473			}
474			PhysicalPlan::Declare(_) | PhysicalPlan::Assign(_) => {
475				let mut std_txn = StandardTransaction::from(rx);
476				self.query(&mut std_txn, plan, params, stack).await?;
477				Ok(None)
478			}
479			PhysicalPlan::AlterSequence(_)
480			| PhysicalPlan::AlterTable(_)
481			| PhysicalPlan::AlterView(_)
482			| PhysicalPlan::AlterFlow(_)
483			| PhysicalPlan::CreateDeferredView(_)
484			| PhysicalPlan::CreateTransactionalView(_)
485			| PhysicalPlan::CreateNamespace(_)
486			| PhysicalPlan::CreateTable(_)
487			| PhysicalPlan::CreateRingBuffer(_)
488			| PhysicalPlan::CreateFlow(_)
489			| PhysicalPlan::CreateDictionary(_)
490			| PhysicalPlan::Distinct(_)
491			| PhysicalPlan::Apply(_) => {
492				// Apply operator requires flow engine for mod
493				// execution
494				unimplemented!(
495					"Apply operator is only supported in deferred views and requires the flow engine. Use within a CREATE DEFERRED VIEW statement."
496				)
497			}
498			PhysicalPlan::Window(_) => {
499				// Window operator requires flow engine for mod
500				// execution
501				unimplemented!(
502					"Window operator is only supported in deferred views and requires the flow engine. Use within a CREATE DEFERRED VIEW statement."
503				)
504			}
505			PhysicalPlan::Merge(_) => {
506				// Merge operator requires flow engine
507				unimplemented!(
508					"Merge operator is only supported in deferred views and requires the flow engine. Use within a CREATE DEFERRED VIEW statement."
509				)
510			}
511		}
512	}
513
514	#[instrument(name = "executor::plan::command", level = "debug", skip(self, txn, plan, params, stack))]
515	pub async fn execute_command_plan<'a>(
516		&self,
517		txn: &'a mut StandardCommandTransaction,
518		plan: PhysicalPlan,
519		params: Params,
520		stack: &mut Stack,
521	) -> crate::Result<Option<Columns>> {
522		match plan {
523			PhysicalPlan::AlterSequence(plan) => Ok(Some(self.alter_table_sequence(txn, plan).await?)),
524			PhysicalPlan::CreateDeferredView(plan) => {
525				Ok(Some(self.create_deferred_view(txn, plan).await?))
526			}
527			PhysicalPlan::CreateTransactionalView(plan) => {
528				Ok(Some(self.create_transactional_view(txn, plan).await?))
529			}
530			PhysicalPlan::CreateNamespace(plan) => Ok(Some(self.create_namespace(txn, plan).await?)),
531			PhysicalPlan::CreateTable(plan) => Ok(Some(self.create_table(txn, plan).await?)),
532			PhysicalPlan::CreateRingBuffer(plan) => Ok(Some(self.create_ringbuffer(txn, plan).await?)),
533			PhysicalPlan::CreateFlow(plan) => Ok(Some(self.create_flow(txn, plan).await?)),
534			PhysicalPlan::CreateDictionary(plan) => Ok(Some(self.create_dictionary(txn, plan).await?)),
535			PhysicalPlan::Delete(plan) => Ok(Some(self.delete(txn, plan, params).await?)),
536			PhysicalPlan::DeleteRingBuffer(plan) => {
537				Ok(Some(self.delete_ringbuffer(txn, plan, params).await?))
538			}
539			PhysicalPlan::InsertTable(plan) => Ok(Some(self.insert_table(txn, plan, stack).await?)),
540			PhysicalPlan::InsertRingBuffer(plan) => {
541				Ok(Some(self.insert_ringbuffer(txn, plan, params).await?))
542			}
543			PhysicalPlan::InsertDictionary(plan) => {
544				Ok(Some(self.insert_dictionary(txn, plan, stack).await?))
545			}
546			PhysicalPlan::Update(plan) => Ok(Some(self.update_table(txn, plan, params).await?)),
547			PhysicalPlan::UpdateRingBuffer(plan) => {
548				Ok(Some(self.update_ringbuffer(txn, plan, params).await?))
549			}
550
551			PhysicalPlan::Aggregate(_)
552			| PhysicalPlan::DictionaryScan(_)
553			| PhysicalPlan::Filter(_)
554			| PhysicalPlan::IndexScan(_)
555			| PhysicalPlan::JoinInner(_)
556			| PhysicalPlan::JoinLeft(_)
557			| PhysicalPlan::JoinNatural(_)
558			| PhysicalPlan::Take(_)
559			| PhysicalPlan::Sort(_)
560			| PhysicalPlan::Map(_)
561			| PhysicalPlan::Extend(_)
562			| PhysicalPlan::InlineData(_)
563			| PhysicalPlan::Generator(_)
564			| PhysicalPlan::TableScan(_)
565			| PhysicalPlan::ViewScan(_)
566			| PhysicalPlan::FlowScan(_)
567			| PhysicalPlan::TableVirtualScan(_)
568			| PhysicalPlan::RingBufferScan(_)
569			| PhysicalPlan::Distinct(_)
570			| PhysicalPlan::Variable(_)
571			| PhysicalPlan::Environment(_)
572			| PhysicalPlan::Apply(_)
573			| PhysicalPlan::Conditional(_)
574			| PhysicalPlan::Scalarize(_)
575			| PhysicalPlan::RowPointLookup(_)
576			| PhysicalPlan::RowListLookup(_)
577			| PhysicalPlan::RowRangeScan(_) => {
578				let mut std_txn = StandardTransaction::from(txn);
579				self.query(&mut std_txn, plan, params, stack).await
580			}
581			PhysicalPlan::Declare(_) | PhysicalPlan::Assign(_) => {
582				let mut std_txn = StandardTransaction::from(txn);
583				self.query(&mut std_txn, plan, params, stack).await?;
584				Ok(None)
585			}
586			PhysicalPlan::Window(_) => {
587				let mut std_txn = StandardTransaction::from(txn);
588				self.query(&mut std_txn, plan, params, stack).await
589			}
590			PhysicalPlan::Merge(_) => {
591				let mut std_txn = StandardTransaction::from(txn);
592				self.query(&mut std_txn, plan, params, stack).await
593			}
594
595			PhysicalPlan::AlterTable(plan) => Ok(Some(self.alter_table(txn, plan).await?)),
596			PhysicalPlan::AlterView(plan) => Ok(Some(self.execute_alter_view(txn, plan).await?)),
597			PhysicalPlan::AlterFlow(plan) => Ok(Some(self.execute_alter_flow(txn, plan).await?)),
598		}
599	}
600
601	#[instrument(name = "executor::query", level = "debug", skip(self, rx, plan, params, stack))]
602	async fn query<'a>(
603		&self,
604		rx: &mut StandardTransaction<'a>,
605		plan: PhysicalPlan,
606		params: Params,
607		stack: &mut Stack,
608	) -> crate::Result<Option<Columns>> {
609		let mut context = ExecutionContext {
610			executor: self.clone(),
611			source: None,
612			batch_size: 1024,
613			params: params.clone(),
614			stack: stack.clone(),
615		};
616		let mut node = compile(plan, rx, Arc::new(context.clone())).await;
617
618		// Initialize the operator before execution
619		node.initialize(rx, &context).await?;
620
621		let mut result: Option<Columns> = None;
622
623		while let Some(Batch {
624			columns,
625		}) = node.next(rx, &mut context).await?
626		{
627			if let Some(mut result_columns) = result.take() {
628				result_columns.append_columns(columns)?;
629				result = Some(result_columns);
630			} else {
631				result = Some(columns);
632			}
633		}
634
635		// Copy stack changes back to persistent stack
636		*stack = context.stack;
637
638		let headers = node.headers();
639
640		if let Some(mut columns) = result {
641			if let Some(headers) = headers {
642				columns.apply_headers(&headers);
643			}
644
645			Ok(columns.into())
646		} else {
647			// empty columns - reconstruct table,
648			// for better UX
649			let columns: Vec<Column> = node
650				.headers()
651				.unwrap_or(ColumnHeaders {
652					columns: vec![],
653				})
654				.columns
655				.into_iter()
656				.map(|name| Column {
657					name,
658					data: ColumnData::undefined(0),
659				})
660				.collect();
661
662			Ok(Some(Columns::new(columns)))
663		}
664	}
665}