1use std::sync::Arc;
5
6use async_trait::async_trait;
7use query::{
8 aggregate::AggregateNode,
9 assign::AssignNode,
10 compile::compile,
11 declare::DeclareNode,
12 dictionary_scan::DictionaryScanNode,
13 environment::EnvironmentNode,
14 extend::{ExtendNode, ExtendWithoutInputNode},
15 filter::FilterNode,
16 generator::GeneratorNode,
17 index_scan::IndexScanNode,
18 inline::InlineDataNode,
19 join::{InnerJoinNode, LeftJoinNode, NaturalJoinNode},
20 map::{MapNode, MapWithoutInputNode},
21 ringbuffer_scan::RingBufferScan,
22 row_lookup::{RowListLookupNode, RowPointLookupNode, RowRangeScanNode},
23 scalarize::ScalarizeNode,
24 sort::SortNode,
25 table_scan::TableScanNode,
26 table_virtual_scan::VirtualScanNode,
27 take::TakeNode,
28 top_k::TopKNode,
29 variable::VariableNode,
30 view_scan::ViewScanNode,
31};
32use reifydb_core::{
33 Frame,
34 interface::{Command, Execute, ExecuteCommand, ExecuteQuery, Params, Query, ResolvedSource},
35 value::column::{Column, ColumnData, Columns, headers::ColumnHeaders},
36};
37use reifydb_rql::{
38 ast,
39 plan::{physical::PhysicalPlan, plan},
40};
41use reifydb_transaction::StorageTracker;
42use tracing::instrument;
43
44use crate::{
45 StandardCommandTransaction, StandardQueryTransaction, StandardTransaction,
46 function::{Functions, generator, math},
47 stack::{Stack, Variable},
48 table_virtual::{TableVirtualUserRegistry, system::FlowOperatorStore},
49};
50
51mod catalog;
52pub(crate) mod mutate;
53mod query;
54
/// Pull-style operator interface implemented by the nodes of a compiled
/// execution plan.
///
/// `Executor::query` drives a node by calling [`QueryNode::initialize`] once
/// and then calling [`QueryNode::next`] in a loop until it returns `Ok(None)`.
#[async_trait]
pub(crate) trait QueryNode {
	/// Prepares the node for execution against the transaction `rx`.
	///
	/// The context is borrowed immutably, so this step cannot modify it.
	async fn initialize<'a>(
		&mut self,
		rx: &mut StandardTransaction<'a>,
		ctx: &ExecutionContext,
	) -> crate::Result<()>;

	/// Produces the next [`Batch`] of output.
	///
	/// The driver loop in `Executor::query` treats `Ok(None)` as the end of
	/// the node's output. The context is borrowed mutably, so a node may
	/// update it (for example its `stack`) while producing a batch.
	async fn next<'a>(
		&mut self,
		rx: &mut StandardTransaction<'a>,
		ctx: &mut ExecutionContext,
	) -> crate::Result<Option<Batch>>;

	/// Returns the node's column headers, or `None` when it has none to
	/// report. `Executor::query` applies them to the collected result and
	/// uses them to build an empty result when no batch was produced.
	fn headers(&self) -> Option<ColumnHeaders>;
}
77
/// Per-execution state handed to every [`QueryNode`] call.
///
/// `Executor::query` builds one context for each plan it runs.
#[derive(Clone)]
pub struct ExecutionContext {
	/// Handle to the executor running the plan (`Executor::query` stores a
	/// clone of itself here).
	pub executor: Executor,
	/// Optional resolved source; `Executor::query` initialises it to `None`.
	pub source: Option<ResolvedSource>,
	/// `Executor::query` sets this to 1024.
	/// NOTE(review): presumably the row budget per batch — confirm against
	/// the node implementations.
	pub batch_size: u64,
	/// Parameters supplied with the command or query being executed.
	pub params: Params,
	/// Variable stack. `Executor::query` starts from a clone of the caller's
	/// stack and writes this value back to the caller once the plan has
	/// finished without error.
	pub stack: Stack,
}
86
/// One chunk of output returned by [`QueryNode::next`].
#[derive(Debug)]
pub struct Batch {
	/// The columns that make up this chunk.
	pub columns: Columns,
}
91
/// Executable plan node: one variant per concrete node type, each wrapping
/// that node's struct.
///
/// [`QueryNode`] is implemented for this enum by matching on the variant and
/// forwarding the call to the wrapped node.
pub(crate) enum ExecutionPlan {
	Aggregate(AggregateNode),
	DictionaryScan(DictionaryScanNode),
	Filter(FilterNode),
	IndexScan(IndexScanNode),
	InlineData(InlineDataNode),
	InnerJoin(InnerJoinNode),
	LeftJoin(LeftJoinNode),
	NaturalJoin(NaturalJoinNode),
	Map(MapNode),
	MapWithoutInput(MapWithoutInputNode),
	Extend(ExtendNode),
	ExtendWithoutInput(ExtendWithoutInputNode),
	Sort(SortNode),
	TableScan(TableScanNode),
	Take(TakeNode),
	TopK(TopKNode),
	ViewScan(ViewScanNode),
	Variable(VariableNode),
	Environment(EnvironmentNode),
	VirtualScan(VirtualScanNode),
	RingBufferScan(RingBufferScan),
	Generator(GeneratorNode),
	Declare(DeclareNode),
	Assign(AssignNode),
	Conditional(query::conditional::ConditionalNode),
	Scalarize(ScalarizeNode),
	RowPointLookup(RowPointLookupNode),
	RowListLookup(RowListLookupNode),
	RowRangeScan(RowRangeScanNode),
}
124
125#[async_trait]
127impl QueryNode for Box<ExecutionPlan> {
128 async fn initialize<'a>(
129 &mut self,
130 rx: &mut StandardTransaction<'a>,
131 ctx: &ExecutionContext,
132 ) -> crate::Result<()> {
133 (**self).initialize(rx, ctx).await
134 }
135
136 async fn next<'a>(
137 &mut self,
138 rx: &mut StandardTransaction<'a>,
139 ctx: &mut ExecutionContext,
140 ) -> crate::Result<Option<Batch>> {
141 (**self).next(rx, ctx).await
142 }
143
144 fn headers(&self) -> Option<ColumnHeaders> {
145 (**self).headers()
146 }
147}
148
149#[async_trait]
150impl QueryNode for ExecutionPlan {
151 async fn initialize<'a>(
152 &mut self,
153 rx: &mut StandardTransaction<'a>,
154 ctx: &ExecutionContext,
155 ) -> crate::Result<()> {
156 match self {
157 ExecutionPlan::Aggregate(node) => node.initialize(rx, ctx).await,
158 ExecutionPlan::DictionaryScan(node) => node.initialize(rx, ctx).await,
159 ExecutionPlan::Filter(node) => node.initialize(rx, ctx).await,
160 ExecutionPlan::IndexScan(node) => node.initialize(rx, ctx).await,
161 ExecutionPlan::InlineData(node) => node.initialize(rx, ctx).await,
162 ExecutionPlan::InnerJoin(node) => node.initialize(rx, ctx).await,
163 ExecutionPlan::LeftJoin(node) => node.initialize(rx, ctx).await,
164 ExecutionPlan::NaturalJoin(node) => node.initialize(rx, ctx).await,
165 ExecutionPlan::Map(node) => node.initialize(rx, ctx).await,
166 ExecutionPlan::MapWithoutInput(node) => node.initialize(rx, ctx).await,
167 ExecutionPlan::Extend(node) => node.initialize(rx, ctx).await,
168 ExecutionPlan::ExtendWithoutInput(node) => node.initialize(rx, ctx).await,
169 ExecutionPlan::Sort(node) => node.initialize(rx, ctx).await,
170 ExecutionPlan::TableScan(node) => node.initialize(rx, ctx).await,
171 ExecutionPlan::Take(node) => node.initialize(rx, ctx).await,
172 ExecutionPlan::TopK(node) => node.initialize(rx, ctx).await,
173 ExecutionPlan::ViewScan(node) => node.initialize(rx, ctx).await,
174 ExecutionPlan::Variable(node) => node.initialize(rx, ctx).await,
175 ExecutionPlan::Environment(node) => node.initialize(rx, ctx).await,
176 ExecutionPlan::VirtualScan(node) => node.initialize(rx, ctx).await,
177 ExecutionPlan::RingBufferScan(node) => node.initialize(rx, ctx).await,
178 ExecutionPlan::Generator(node) => node.initialize(rx, ctx).await,
179 ExecutionPlan::Declare(node) => node.initialize(rx, ctx).await,
180 ExecutionPlan::Assign(node) => node.initialize(rx, ctx).await,
181 ExecutionPlan::Conditional(node) => node.initialize(rx, ctx).await,
182 ExecutionPlan::Scalarize(node) => node.initialize(rx, ctx).await,
183 ExecutionPlan::RowPointLookup(node) => node.initialize(rx, ctx).await,
184 ExecutionPlan::RowListLookup(node) => node.initialize(rx, ctx).await,
185 ExecutionPlan::RowRangeScan(node) => node.initialize(rx, ctx).await,
186 }
187 }
188
189 async fn next<'a>(
190 &mut self,
191 rx: &mut StandardTransaction<'a>,
192 ctx: &mut ExecutionContext,
193 ) -> crate::Result<Option<Batch>> {
194 match self {
195 ExecutionPlan::Aggregate(node) => node.next(rx, ctx).await,
196 ExecutionPlan::DictionaryScan(node) => node.next(rx, ctx).await,
197 ExecutionPlan::Filter(node) => node.next(rx, ctx).await,
198 ExecutionPlan::IndexScan(node) => node.next(rx, ctx).await,
199 ExecutionPlan::InlineData(node) => node.next(rx, ctx).await,
200 ExecutionPlan::InnerJoin(node) => node.next(rx, ctx).await,
201 ExecutionPlan::LeftJoin(node) => node.next(rx, ctx).await,
202 ExecutionPlan::NaturalJoin(node) => node.next(rx, ctx).await,
203 ExecutionPlan::Map(node) => node.next(rx, ctx).await,
204 ExecutionPlan::MapWithoutInput(node) => node.next(rx, ctx).await,
205 ExecutionPlan::Extend(node) => node.next(rx, ctx).await,
206 ExecutionPlan::ExtendWithoutInput(node) => node.next(rx, ctx).await,
207 ExecutionPlan::Sort(node) => node.next(rx, ctx).await,
208 ExecutionPlan::TableScan(node) => node.next(rx, ctx).await,
209 ExecutionPlan::Take(node) => node.next(rx, ctx).await,
210 ExecutionPlan::TopK(node) => node.next(rx, ctx).await,
211 ExecutionPlan::ViewScan(node) => node.next(rx, ctx).await,
212 ExecutionPlan::Variable(node) => node.next(rx, ctx).await,
213 ExecutionPlan::Environment(node) => node.next(rx, ctx).await,
214 ExecutionPlan::VirtualScan(node) => node.next(rx, ctx).await,
215 ExecutionPlan::RingBufferScan(node) => node.next(rx, ctx).await,
216 ExecutionPlan::Generator(node) => node.next(rx, ctx).await,
217 ExecutionPlan::Declare(node) => node.next(rx, ctx).await,
218 ExecutionPlan::Assign(node) => node.next(rx, ctx).await,
219 ExecutionPlan::Conditional(node) => node.next(rx, ctx).await,
220 ExecutionPlan::Scalarize(node) => node.next(rx, ctx).await,
221 ExecutionPlan::RowPointLookup(node) => node.next(rx, ctx).await,
222 ExecutionPlan::RowListLookup(node) => node.next(rx, ctx).await,
223 ExecutionPlan::RowRangeScan(node) => node.next(rx, ctx).await,
224 }
225 }
226
227 fn headers(&self) -> Option<ColumnHeaders> {
228 match self {
229 ExecutionPlan::Aggregate(node) => node.headers(),
230 ExecutionPlan::DictionaryScan(node) => node.headers(),
231 ExecutionPlan::Filter(node) => node.headers(),
232 ExecutionPlan::IndexScan(node) => node.headers(),
233 ExecutionPlan::InlineData(node) => node.headers(),
234 ExecutionPlan::InnerJoin(node) => node.headers(),
235 ExecutionPlan::LeftJoin(node) => node.headers(),
236 ExecutionPlan::NaturalJoin(node) => node.headers(),
237 ExecutionPlan::Map(node) => node.headers(),
238 ExecutionPlan::MapWithoutInput(node) => node.headers(),
239 ExecutionPlan::Extend(node) => node.headers(),
240 ExecutionPlan::ExtendWithoutInput(node) => node.headers(),
241 ExecutionPlan::Sort(node) => node.headers(),
242 ExecutionPlan::TableScan(node) => node.headers(),
243 ExecutionPlan::Take(node) => node.headers(),
244 ExecutionPlan::TopK(node) => node.headers(),
245 ExecutionPlan::ViewScan(node) => node.headers(),
246 ExecutionPlan::Variable(node) => node.headers(),
247 ExecutionPlan::Environment(node) => node.headers(),
248 ExecutionPlan::VirtualScan(node) => node.headers(),
249 ExecutionPlan::RingBufferScan(node) => node.headers(),
250 ExecutionPlan::Generator(node) => node.headers(),
251 ExecutionPlan::Declare(node) => node.headers(),
252 ExecutionPlan::Assign(node) => node.headers(),
253 ExecutionPlan::Conditional(node) => node.headers(),
254 ExecutionPlan::Scalarize(node) => node.headers(),
255 ExecutionPlan::RowPointLookup(node) => node.headers(),
256 ExecutionPlan::RowListLookup(node) => node.headers(),
257 ExecutionPlan::RowRangeScan(node) => node.headers(),
258 }
259 }
260}
261
/// Shared handle to the executor state.
///
/// Wraps [`ExecutorInner`] in an `Arc`; cloning the handle clones the `Arc`,
/// and `Deref` exposes the inner fields directly.
pub struct Executor(Arc<ExecutorInner>);
263
/// State shared by every clone of an [`Executor`].
pub struct ExecutorInner {
	/// Function registry; `Executor::testing` fills one with aggregate,
	/// scalar and generator functions through `Functions::builder()`.
	pub functions: Functions,
	/// Flow operator store supplied at construction.
	/// NOTE(review): how it is used is not visible in this file — confirm
	/// before documenting further.
	pub flow_operator_store: FlowOperatorStore,
	/// User virtual-table registry: created with
	/// `TableVirtualUserRegistry::new()` by `Executor::new`, or supplied by the
	/// caller of `Executor::with_virtual_table_registry`.
	pub virtual_table_registry: TableVirtualUserRegistry,
	/// Storage tracker supplied at construction.
	pub stats_tracker: StorageTracker,
}
270
271impl Clone for Executor {
272 fn clone(&self) -> Self {
273 Self(self.0.clone())
274 }
275}
276
277impl std::ops::Deref for Executor {
278 type Target = ExecutorInner;
279
280 fn deref(&self) -> &Self::Target {
281 &self.0
282 }
283}
284
285impl Executor {
286 pub fn new(
287 functions: Functions,
288 flow_operator_store: FlowOperatorStore,
289 stats_tracker: StorageTracker,
290 ) -> Self {
291 Self(Arc::new(ExecutorInner {
292 functions,
293 flow_operator_store,
294 virtual_table_registry: TableVirtualUserRegistry::new(),
295 stats_tracker,
296 }))
297 }
298
299 pub fn with_virtual_table_registry(
300 functions: Functions,
301 flow_operator_store: FlowOperatorStore,
302 virtual_table_registry: TableVirtualUserRegistry,
303 stats_tracker: StorageTracker,
304 ) -> Self {
305 Self(Arc::new(ExecutorInner {
306 functions,
307 flow_operator_store,
308 virtual_table_registry,
309 stats_tracker,
310 }))
311 }
312
313 #[allow(dead_code)]
314 pub fn testing() -> Self {
315 Self::new(
316 Functions::builder()
317 .register_aggregate("math::sum", math::aggregate::Sum::new)
318 .register_aggregate("math::min", math::aggregate::Min::new)
319 .register_aggregate("math::max", math::aggregate::Max::new)
320 .register_aggregate("math::avg", math::aggregate::Avg::new)
321 .register_aggregate("math::count", math::aggregate::Count::new)
322 .register_scalar("math::abs", math::scalar::Abs::new)
323 .register_scalar("math::avg", math::scalar::Avg::new)
324 .register_generator("generate_series", generator::GenerateSeries::new)
325 .build(),
326 FlowOperatorStore::new(),
327 StorageTracker::with_defaults(),
328 )
329 }
330}
331
332#[async_trait]
333impl ExecuteCommand<StandardCommandTransaction> for Executor {
334 #[instrument(name = "executor::execute_command", level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
335 async fn execute_command(
336 &self,
337 txn: &mut StandardCommandTransaction,
338 cmd: Command<'_>,
339 ) -> crate::Result<Vec<Frame>> {
340 let mut result = vec![];
341 let statements = ast::parse_str(cmd.rql)?;
342
343 let mut persistent_stack = Stack::new();
345
346 match &cmd.params {
348 Params::Positional(values) => {
349 for (index, value) in values.iter().enumerate() {
351 let param_name = (index + 1).to_string(); persistent_stack.set(param_name, Variable::Scalar(value.clone()), false)?;
353 }
354 }
355 Params::Named(map) => {
356 for (name, value) in map {
358 persistent_stack.set(name.clone(), Variable::Scalar(value.clone()), false)?;
359 }
360 }
361 Params::None => {
362 }
364 }
365
366 for statement in statements {
367 if let Some(plan) = plan(txn, statement).await? {
368 if let Some(er) = self
369 .execute_command_plan(txn, plan, cmd.params.clone(), &mut persistent_stack)
370 .await?
371 {
372 result.push(Frame::from(er));
373 }
374 }
375 }
376
377 Ok(result)
378 }
379}
380
381#[async_trait]
382impl ExecuteQuery<StandardQueryTransaction> for Executor {
383 #[instrument(name = "executor::execute_query", level = "debug", skip(self, txn, qry), fields(rql = %qry.rql))]
384 async fn execute_query(&self, txn: &mut StandardQueryTransaction, qry: Query<'_>) -> crate::Result<Vec<Frame>> {
385 let mut result = vec![];
386 let statements = ast::parse_str(qry.rql)?;
387
388 let mut persistent_stack = Stack::new();
390
391 match &qry.params {
393 Params::Positional(values) => {
394 for (index, value) in values.iter().enumerate() {
396 let param_name = (index + 1).to_string(); persistent_stack.set(param_name, Variable::Scalar(value.clone()), false)?;
398 }
399 }
400 Params::Named(map) => {
401 for (name, value) in map {
403 persistent_stack.set(name.clone(), Variable::Scalar(value.clone()), false)?;
404 }
405 }
406 Params::None => {
407 }
409 }
410
411 for statement in statements {
412 if let Some(plan) = plan(txn, statement).await? {
413 if let Some(er) = self
414 .execute_query_plan(txn, plan, qry.params.clone(), &mut persistent_stack)
415 .await?
416 {
417 result.push(Frame::from(er));
418 }
419 }
420 }
421
422 Ok(result)
423 }
424}
425
// This impl defines no items of its own; `ExecuteCommand` and `ExecuteQuery`
// are implemented for `Executor` elsewhere in this file.
// NOTE(review): presumably `Execute` is an umbrella trait satisfied by those
// two impls — confirm against `reifydb_core::interface`.
impl Execute<StandardCommandTransaction, StandardQueryTransaction> for Executor {}
427
428impl Executor {
429 #[instrument(name = "executor::plan::query", level = "debug", skip(self, rx, plan, params, stack))]
430 pub(crate) async fn execute_query_plan<'a>(
431 &self,
432 rx: &'a mut StandardQueryTransaction,
433 plan: PhysicalPlan,
434 params: Params,
435 stack: &mut Stack,
436 ) -> crate::Result<Option<Columns>> {
437 match plan {
438 PhysicalPlan::Aggregate(_)
440 | PhysicalPlan::DictionaryScan(_)
441 | PhysicalPlan::Filter(_)
442 | PhysicalPlan::IndexScan(_)
443 | PhysicalPlan::JoinInner(_)
444 | PhysicalPlan::JoinLeft(_)
445 | PhysicalPlan::JoinNatural(_)
446 | PhysicalPlan::Take(_)
447 | PhysicalPlan::Sort(_)
448 | PhysicalPlan::Map(_)
449 | PhysicalPlan::Extend(_)
450 | PhysicalPlan::InlineData(_)
451 | PhysicalPlan::Generator(_)
452 | PhysicalPlan::Delete(_)
453 | PhysicalPlan::DeleteRingBuffer(_)
454 | PhysicalPlan::InsertTable(_)
455 | PhysicalPlan::InsertRingBuffer(_)
456 | PhysicalPlan::InsertDictionary(_)
457 | PhysicalPlan::Update(_)
458 | PhysicalPlan::UpdateRingBuffer(_)
459 | PhysicalPlan::TableScan(_)
460 | PhysicalPlan::ViewScan(_)
461 | PhysicalPlan::FlowScan(_)
462 | PhysicalPlan::TableVirtualScan(_)
463 | PhysicalPlan::RingBufferScan(_)
464 | PhysicalPlan::Variable(_)
465 | PhysicalPlan::Environment(_)
466 | PhysicalPlan::Conditional(_)
467 | PhysicalPlan::Scalarize(_)
468 | PhysicalPlan::RowPointLookup(_)
469 | PhysicalPlan::RowListLookup(_)
470 | PhysicalPlan::RowRangeScan(_) => {
471 let mut std_txn = StandardTransaction::from(rx);
472 self.query(&mut std_txn, plan, params, stack).await
473 }
474 PhysicalPlan::Declare(_) | PhysicalPlan::Assign(_) => {
475 let mut std_txn = StandardTransaction::from(rx);
476 self.query(&mut std_txn, plan, params, stack).await?;
477 Ok(None)
478 }
479 PhysicalPlan::AlterSequence(_)
480 | PhysicalPlan::AlterTable(_)
481 | PhysicalPlan::AlterView(_)
482 | PhysicalPlan::AlterFlow(_)
483 | PhysicalPlan::CreateDeferredView(_)
484 | PhysicalPlan::CreateTransactionalView(_)
485 | PhysicalPlan::CreateNamespace(_)
486 | PhysicalPlan::CreateTable(_)
487 | PhysicalPlan::CreateRingBuffer(_)
488 | PhysicalPlan::CreateFlow(_)
489 | PhysicalPlan::CreateDictionary(_)
490 | PhysicalPlan::Distinct(_)
491 | PhysicalPlan::Apply(_) => {
492 unimplemented!(
495 "Apply operator is only supported in deferred views and requires the flow engine. Use within a CREATE DEFERRED VIEW statement."
496 )
497 }
498 PhysicalPlan::Window(_) => {
499 unimplemented!(
502 "Window operator is only supported in deferred views and requires the flow engine. Use within a CREATE DEFERRED VIEW statement."
503 )
504 }
505 PhysicalPlan::Merge(_) => {
506 unimplemented!(
508 "Merge operator is only supported in deferred views and requires the flow engine. Use within a CREATE DEFERRED VIEW statement."
509 )
510 }
511 }
512 }
513
514 #[instrument(name = "executor::plan::command", level = "debug", skip(self, txn, plan, params, stack))]
515 pub async fn execute_command_plan<'a>(
516 &self,
517 txn: &'a mut StandardCommandTransaction,
518 plan: PhysicalPlan,
519 params: Params,
520 stack: &mut Stack,
521 ) -> crate::Result<Option<Columns>> {
522 match plan {
523 PhysicalPlan::AlterSequence(plan) => Ok(Some(self.alter_table_sequence(txn, plan).await?)),
524 PhysicalPlan::CreateDeferredView(plan) => {
525 Ok(Some(self.create_deferred_view(txn, plan).await?))
526 }
527 PhysicalPlan::CreateTransactionalView(plan) => {
528 Ok(Some(self.create_transactional_view(txn, plan).await?))
529 }
530 PhysicalPlan::CreateNamespace(plan) => Ok(Some(self.create_namespace(txn, plan).await?)),
531 PhysicalPlan::CreateTable(plan) => Ok(Some(self.create_table(txn, plan).await?)),
532 PhysicalPlan::CreateRingBuffer(plan) => Ok(Some(self.create_ringbuffer(txn, plan).await?)),
533 PhysicalPlan::CreateFlow(plan) => Ok(Some(self.create_flow(txn, plan).await?)),
534 PhysicalPlan::CreateDictionary(plan) => Ok(Some(self.create_dictionary(txn, plan).await?)),
535 PhysicalPlan::Delete(plan) => Ok(Some(self.delete(txn, plan, params).await?)),
536 PhysicalPlan::DeleteRingBuffer(plan) => {
537 Ok(Some(self.delete_ringbuffer(txn, plan, params).await?))
538 }
539 PhysicalPlan::InsertTable(plan) => Ok(Some(self.insert_table(txn, plan, stack).await?)),
540 PhysicalPlan::InsertRingBuffer(plan) => {
541 Ok(Some(self.insert_ringbuffer(txn, plan, params).await?))
542 }
543 PhysicalPlan::InsertDictionary(plan) => {
544 Ok(Some(self.insert_dictionary(txn, plan, stack).await?))
545 }
546 PhysicalPlan::Update(plan) => Ok(Some(self.update_table(txn, plan, params).await?)),
547 PhysicalPlan::UpdateRingBuffer(plan) => {
548 Ok(Some(self.update_ringbuffer(txn, plan, params).await?))
549 }
550
551 PhysicalPlan::Aggregate(_)
552 | PhysicalPlan::DictionaryScan(_)
553 | PhysicalPlan::Filter(_)
554 | PhysicalPlan::IndexScan(_)
555 | PhysicalPlan::JoinInner(_)
556 | PhysicalPlan::JoinLeft(_)
557 | PhysicalPlan::JoinNatural(_)
558 | PhysicalPlan::Take(_)
559 | PhysicalPlan::Sort(_)
560 | PhysicalPlan::Map(_)
561 | PhysicalPlan::Extend(_)
562 | PhysicalPlan::InlineData(_)
563 | PhysicalPlan::Generator(_)
564 | PhysicalPlan::TableScan(_)
565 | PhysicalPlan::ViewScan(_)
566 | PhysicalPlan::FlowScan(_)
567 | PhysicalPlan::TableVirtualScan(_)
568 | PhysicalPlan::RingBufferScan(_)
569 | PhysicalPlan::Distinct(_)
570 | PhysicalPlan::Variable(_)
571 | PhysicalPlan::Environment(_)
572 | PhysicalPlan::Apply(_)
573 | PhysicalPlan::Conditional(_)
574 | PhysicalPlan::Scalarize(_)
575 | PhysicalPlan::RowPointLookup(_)
576 | PhysicalPlan::RowListLookup(_)
577 | PhysicalPlan::RowRangeScan(_) => {
578 let mut std_txn = StandardTransaction::from(txn);
579 self.query(&mut std_txn, plan, params, stack).await
580 }
581 PhysicalPlan::Declare(_) | PhysicalPlan::Assign(_) => {
582 let mut std_txn = StandardTransaction::from(txn);
583 self.query(&mut std_txn, plan, params, stack).await?;
584 Ok(None)
585 }
586 PhysicalPlan::Window(_) => {
587 let mut std_txn = StandardTransaction::from(txn);
588 self.query(&mut std_txn, plan, params, stack).await
589 }
590 PhysicalPlan::Merge(_) => {
591 let mut std_txn = StandardTransaction::from(txn);
592 self.query(&mut std_txn, plan, params, stack).await
593 }
594
595 PhysicalPlan::AlterTable(plan) => Ok(Some(self.alter_table(txn, plan).await?)),
596 PhysicalPlan::AlterView(plan) => Ok(Some(self.execute_alter_view(txn, plan).await?)),
597 PhysicalPlan::AlterFlow(plan) => Ok(Some(self.execute_alter_flow(txn, plan).await?)),
598 }
599 }
600
601 #[instrument(name = "executor::query", level = "debug", skip(self, rx, plan, params, stack))]
602 async fn query<'a>(
603 &self,
604 rx: &mut StandardTransaction<'a>,
605 plan: PhysicalPlan,
606 params: Params,
607 stack: &mut Stack,
608 ) -> crate::Result<Option<Columns>> {
609 let mut context = ExecutionContext {
610 executor: self.clone(),
611 source: None,
612 batch_size: 1024,
613 params: params.clone(),
614 stack: stack.clone(),
615 };
616 let mut node = compile(plan, rx, Arc::new(context.clone())).await;
617
618 node.initialize(rx, &context).await?;
620
621 let mut result: Option<Columns> = None;
622
623 while let Some(Batch {
624 columns,
625 }) = node.next(rx, &mut context).await?
626 {
627 if let Some(mut result_columns) = result.take() {
628 result_columns.append_columns(columns)?;
629 result = Some(result_columns);
630 } else {
631 result = Some(columns);
632 }
633 }
634
635 *stack = context.stack;
637
638 let headers = node.headers();
639
640 if let Some(mut columns) = result {
641 if let Some(headers) = headers {
642 columns.apply_headers(&headers);
643 }
644
645 Ok(columns.into())
646 } else {
647 let columns: Vec<Column> = node
650 .headers()
651 .unwrap_or(ColumnHeaders {
652 columns: vec![],
653 })
654 .columns
655 .into_iter()
656 .map(|name| Column {
657 name,
658 data: ColumnData::undefined(0),
659 })
660 .collect();
661
662 Ok(Some(Columns::new(columns)))
663 }
664 }
665}