1use std::sync::Arc;
5
6use query::{
7 aggregate::AggregateNode,
8 assign::AssignNode,
9 compile::compile,
10 declare::DeclareNode,
11 dictionary_scan::DictionaryScan,
12 environment::EnvironmentNode,
13 extend::{ExtendNode, ExtendWithoutInputNode},
14 filter::FilterNode,
15 generator::GeneratorNode,
16 index_scan::IndexScanNode,
17 inline::InlineDataNode,
18 join::{InnerJoinNode, LeftJoinNode, NaturalJoinNode},
19 map::{MapNode, MapWithoutInputNode},
20 ring_buffer_scan::RingBufferScan,
21 row_lookup::{RowListLookupNode, RowPointLookupNode, RowRangeScanNode},
22 scalarize::ScalarizeNode,
23 sort::SortNode,
24 table_scan::TableScanNode,
25 table_virtual_scan::VirtualScanNode,
26 take::TakeNode,
27 variable::VariableNode,
28 view_scan::ViewScanNode,
29};
30use reifydb_core::{
31 Frame,
32 interface::{Command, Execute, ExecuteCommand, ExecuteQuery, Params, Query, ResolvedSource},
33 value::column::{Column, ColumnData, Columns, headers::ColumnHeaders},
34};
35use reifydb_rql::{
36 ast,
37 plan::{physical::PhysicalPlan, plan},
38};
39use tracing::instrument;
40
41use crate::{
42 StandardCommandTransaction, StandardQueryTransaction, StandardTransaction,
43 function::{Functions, generator, math},
44 stack::{Stack, Variable},
45 table_virtual::{TableVirtualUserRegistry, system::FlowOperatorStore},
46};
47
48mod catalog;
49mod mutate;
50mod query;
51
/// Pull-based execution interface implemented by every node of an executable plan.
///
/// `Executor::query` drives a node by calling `initialize` once and then calling
/// `next` repeatedly until it yields `Ok(None)`.
pub(crate) trait QueryNode<'a> {
    /// Prepares the node for execution.
    ///
    /// `Executor::query` calls this once, right after compiling the plan and before
    /// the first call to `next`. The context is only borrowed immutably here.
    fn initialize(&mut self, rx: &mut StandardTransaction<'a>, ctx: &ExecutionContext<'a>) -> crate::Result<()>;

    /// Produces the next batch of columns.
    ///
    /// Returns `Ok(Some(batch))` while there is output and `Ok(None)` once the node
    /// has nothing more to yield; the driver loop in `Executor::query` stops at the
    /// first `Ok(None)`. The context is mutable, and `Executor::query` copies
    /// `ctx.stack` back to its caller after the loop finishes.
    fn next(
        &mut self,
        rx: &mut StandardTransaction<'a>,
        ctx: &mut ExecutionContext<'a>,
    ) -> crate::Result<Option<Batch<'a>>>;

    /// Returns the column headers of this node's output, if it has any.
    ///
    /// `Executor::query` applies them to the accumulated result, or uses them to
    /// build an empty result when no batch was produced.
    fn headers(&self) -> Option<ColumnHeaders<'a>>;
}
70
/// State handed to every `QueryNode` while a plan is initialized and executed.
#[derive(Clone)]
pub struct ExecutionContext<'a> {
    /// Handle to the executor running the plan (`Executor::query` stores a clone of itself).
    pub executor: Executor,
    /// Resolved source the current node operates on; `Executor::query` starts with `None`.
    pub source: Option<ResolvedSource<'a>>,
    /// Batch size hint; `Executor::query` sets it to 1024.
    /// NOTE(review): presumably the maximum number of rows per `Batch` — confirm against the scan nodes.
    pub batch_size: u64,
    /// Parameters supplied with the statement being executed.
    pub params: Params,
    /// Variable stack; `Executor::query` writes the final state back to its caller
    /// after the node has been drained.
    pub stack: Stack,
}
79
/// One unit of output returned by `QueryNode::next`.
#[derive(Debug)]
pub struct Batch<'a> {
    /// The columns carried by this batch; `Executor::query` appends them to the
    /// columns of earlier batches to form the final result.
    pub columns: Columns<'a>,
}
84
/// Executable form of a plan: one variant per concrete node type.
///
/// The `QueryNode` implementation for this enum forwards every call to the node
/// wrapped by the active variant.
pub(crate) enum ExecutionPlan<'a> {
    Aggregate(AggregateNode<'a>),
    DictionaryScan(DictionaryScan<'a>),
    Filter(FilterNode<'a>),
    IndexScan(IndexScanNode<'a>),
    InlineData(InlineDataNode<'a>),
    InnerJoin(InnerJoinNode<'a>),
    LeftJoin(LeftJoinNode<'a>),
    NaturalJoin(NaturalJoinNode<'a>),
    Map(MapNode<'a>),
    MapWithoutInput(MapWithoutInputNode<'a>),
    Extend(ExtendNode<'a>),
    ExtendWithoutInput(ExtendWithoutInputNode<'a>),
    Sort(SortNode<'a>),
    TableScan(TableScanNode<'a>),
    Take(TakeNode<'a>),
    ViewScan(ViewScanNode<'a>),
    Variable(VariableNode<'a>),
    // The only node type here that carries no `'a` lifetime parameter.
    Environment(EnvironmentNode),
    VirtualScan(VirtualScanNode<'a>),
    RingBufferScan(RingBufferScan<'a>),
    Generator(GeneratorNode<'a>),
    Declare(DeclareNode<'a>),
    Assign(AssignNode<'a>),
    Conditional(query::conditional::ConditionalNode<'a>),
    Scalarize(ScalarizeNode<'a>),
    RowPointLookup(RowPointLookupNode<'a>),
    RowListLookup(RowListLookupNode<'a>),
    RowRangeScan(RowRangeScanNode<'a>),
}
116
117impl<'a> QueryNode<'a> for Box<ExecutionPlan<'a>> {
119 fn initialize(&mut self, rx: &mut StandardTransaction<'a>, ctx: &ExecutionContext<'a>) -> crate::Result<()> {
120 (**self).initialize(rx, ctx)
121 }
122
123 fn next(
124 &mut self,
125 rx: &mut StandardTransaction<'a>,
126 ctx: &mut ExecutionContext<'a>,
127 ) -> crate::Result<Option<Batch<'a>>> {
128 (**self).next(rx, ctx)
129 }
130
131 fn headers(&self) -> Option<ColumnHeaders<'a>> {
132 (**self).headers()
133 }
134}
135
136impl<'a> QueryNode<'a> for ExecutionPlan<'a> {
137 fn initialize(&mut self, rx: &mut StandardTransaction<'a>, ctx: &ExecutionContext<'a>) -> crate::Result<()> {
138 match self {
139 ExecutionPlan::Aggregate(node) => node.initialize(rx, ctx),
140 ExecutionPlan::DictionaryScan(node) => node.initialize(rx, ctx),
141 ExecutionPlan::Filter(node) => node.initialize(rx, ctx),
142 ExecutionPlan::IndexScan(node) => node.initialize(rx, ctx),
143 ExecutionPlan::InlineData(node) => node.initialize(rx, ctx),
144 ExecutionPlan::InnerJoin(node) => node.initialize(rx, ctx),
145 ExecutionPlan::LeftJoin(node) => node.initialize(rx, ctx),
146 ExecutionPlan::NaturalJoin(node) => node.initialize(rx, ctx),
147 ExecutionPlan::Map(node) => node.initialize(rx, ctx),
148 ExecutionPlan::MapWithoutInput(node) => node.initialize(rx, ctx),
149 ExecutionPlan::Extend(node) => node.initialize(rx, ctx),
150 ExecutionPlan::ExtendWithoutInput(node) => node.initialize(rx, ctx),
151 ExecutionPlan::Sort(node) => node.initialize(rx, ctx),
152 ExecutionPlan::TableScan(node) => node.initialize(rx, ctx),
153 ExecutionPlan::Take(node) => node.initialize(rx, ctx),
154 ExecutionPlan::ViewScan(node) => node.initialize(rx, ctx),
155 ExecutionPlan::Variable(node) => node.initialize(rx, ctx),
156 ExecutionPlan::Environment(node) => node.initialize(rx, ctx),
157 ExecutionPlan::VirtualScan(node) => node.initialize(rx, ctx),
158 ExecutionPlan::RingBufferScan(node) => node.initialize(rx, ctx),
159 ExecutionPlan::Generator(node) => node.initialize(rx, ctx),
160 ExecutionPlan::Declare(node) => node.initialize(rx, ctx),
161 ExecutionPlan::Assign(node) => node.initialize(rx, ctx),
162 ExecutionPlan::Conditional(node) => node.initialize(rx, ctx),
163 ExecutionPlan::Scalarize(node) => node.initialize(rx, ctx),
164 ExecutionPlan::RowPointLookup(node) => node.initialize(rx, ctx),
165 ExecutionPlan::RowListLookup(node) => node.initialize(rx, ctx),
166 ExecutionPlan::RowRangeScan(node) => node.initialize(rx, ctx),
167 }
168 }
169
170 fn next(
171 &mut self,
172 rx: &mut StandardTransaction<'a>,
173 ctx: &mut ExecutionContext<'a>,
174 ) -> crate::Result<Option<Batch<'a>>> {
175 match self {
176 ExecutionPlan::Aggregate(node) => node.next(rx, ctx),
177 ExecutionPlan::DictionaryScan(node) => node.next(rx, ctx),
178 ExecutionPlan::Filter(node) => node.next(rx, ctx),
179 ExecutionPlan::IndexScan(node) => node.next(rx, ctx),
180 ExecutionPlan::InlineData(node) => node.next(rx, ctx),
181 ExecutionPlan::InnerJoin(node) => node.next(rx, ctx),
182 ExecutionPlan::LeftJoin(node) => node.next(rx, ctx),
183 ExecutionPlan::NaturalJoin(node) => node.next(rx, ctx),
184 ExecutionPlan::Map(node) => node.next(rx, ctx),
185 ExecutionPlan::MapWithoutInput(node) => node.next(rx, ctx),
186 ExecutionPlan::Extend(node) => node.next(rx, ctx),
187 ExecutionPlan::ExtendWithoutInput(node) => node.next(rx, ctx),
188 ExecutionPlan::Sort(node) => node.next(rx, ctx),
189 ExecutionPlan::TableScan(node) => node.next(rx, ctx),
190 ExecutionPlan::Take(node) => node.next(rx, ctx),
191 ExecutionPlan::ViewScan(node) => node.next(rx, ctx),
192 ExecutionPlan::Variable(node) => node.next(rx, ctx),
193 ExecutionPlan::Environment(node) => node.next(rx, ctx),
194 ExecutionPlan::VirtualScan(node) => node.next(rx, ctx),
195 ExecutionPlan::RingBufferScan(node) => node.next(rx, ctx),
196 ExecutionPlan::Generator(node) => node.next(rx, ctx),
197 ExecutionPlan::Declare(node) => node.next(rx, ctx),
198 ExecutionPlan::Assign(node) => node.next(rx, ctx),
199 ExecutionPlan::Conditional(node) => node.next(rx, ctx),
200 ExecutionPlan::Scalarize(node) => node.next(rx, ctx),
201 ExecutionPlan::RowPointLookup(node) => node.next(rx, ctx),
202 ExecutionPlan::RowListLookup(node) => node.next(rx, ctx),
203 ExecutionPlan::RowRangeScan(node) => node.next(rx, ctx),
204 }
205 }
206
207 fn headers(&self) -> Option<ColumnHeaders<'a>> {
208 match self {
209 ExecutionPlan::Aggregate(node) => node.headers(),
210 ExecutionPlan::DictionaryScan(node) => node.headers(),
211 ExecutionPlan::Filter(node) => node.headers(),
212 ExecutionPlan::IndexScan(node) => node.headers(),
213 ExecutionPlan::InlineData(node) => node.headers(),
214 ExecutionPlan::InnerJoin(node) => node.headers(),
215 ExecutionPlan::LeftJoin(node) => node.headers(),
216 ExecutionPlan::NaturalJoin(node) => node.headers(),
217 ExecutionPlan::Map(node) => node.headers(),
218 ExecutionPlan::MapWithoutInput(node) => node.headers(),
219 ExecutionPlan::Extend(node) => node.headers(),
220 ExecutionPlan::ExtendWithoutInput(node) => node.headers(),
221 ExecutionPlan::Sort(node) => node.headers(),
222 ExecutionPlan::TableScan(node) => node.headers(),
223 ExecutionPlan::Take(node) => node.headers(),
224 ExecutionPlan::ViewScan(node) => node.headers(),
225 ExecutionPlan::Variable(node) => node.headers(),
226 ExecutionPlan::Environment(node) => node.headers(),
227 ExecutionPlan::VirtualScan(node) => node.headers(),
228 ExecutionPlan::RingBufferScan(node) => node.headers(),
229 ExecutionPlan::Generator(node) => node.headers(),
230 ExecutionPlan::Declare(node) => node.headers(),
231 ExecutionPlan::Assign(node) => node.headers(),
232 ExecutionPlan::Conditional(node) => node.headers(),
233 ExecutionPlan::Scalarize(node) => node.headers(),
234 ExecutionPlan::RowPointLookup(node) => node.headers(),
235 ExecutionPlan::RowListLookup(node) => node.headers(),
236 ExecutionPlan::RowRangeScan(node) => node.headers(),
237 }
238 }
239}
240
/// Shared handle to the executor state.
///
/// Wraps an `Arc<ExecutorInner>`; the `Clone` and `Deref` impls in this file make it
/// cloneable and give direct access to the fields of `ExecutorInner`.
pub struct Executor(Arc<ExecutorInner>);
242
/// The state shared by all clones of an `Executor`.
pub struct ExecutorInner {
    /// Registered functions (aggregate, scalar and generator functions are registered
    /// through `Functions::builder()` in `Executor::testing`).
    pub functions: Functions,
    /// Store of flow operators supplied by the caller at construction time.
    pub flow_operator_store: FlowOperatorStore,
    /// Registry of user-defined virtual tables; `Executor::new` installs a freshly
    /// constructed registry, `Executor::with_virtual_table_registry` a caller-supplied one.
    pub virtual_table_registry: TableVirtualUserRegistry,
}
248
249impl Clone for Executor {
250 fn clone(&self) -> Self {
251 Self(self.0.clone())
252 }
253}
254
255impl std::ops::Deref for Executor {
256 type Target = ExecutorInner;
257
258 fn deref(&self) -> &Self::Target {
259 &self.0
260 }
261}
262
263impl Executor {
264 pub fn new(functions: Functions, flow_operator_store: FlowOperatorStore) -> Self {
265 Self(Arc::new(ExecutorInner {
266 functions,
267 flow_operator_store,
268 virtual_table_registry: TableVirtualUserRegistry::new(),
269 }))
270 }
271
272 pub fn with_virtual_table_registry(
273 functions: Functions,
274 flow_operator_store: FlowOperatorStore,
275 virtual_table_registry: TableVirtualUserRegistry,
276 ) -> Self {
277 Self(Arc::new(ExecutorInner {
278 functions,
279 flow_operator_store,
280 virtual_table_registry,
281 }))
282 }
283
284 #[allow(dead_code)]
285 pub fn testing() -> Self {
286 Self::new(
287 Functions::builder()
288 .register_aggregate("math::sum", math::aggregate::Sum::new)
289 .register_aggregate("math::min", math::aggregate::Min::new)
290 .register_aggregate("math::max", math::aggregate::Max::new)
291 .register_aggregate("math::avg", math::aggregate::Avg::new)
292 .register_aggregate("math::count", math::aggregate::Count::new)
293 .register_scalar("math::abs", math::scalar::Abs::new)
294 .register_scalar("math::avg", math::scalar::Avg::new)
295 .register_generator("generate_series", generator::GenerateSeries::new)
296 .build(),
297 FlowOperatorStore::new(),
298 )
299 }
300}
301
302impl ExecuteCommand<StandardCommandTransaction> for Executor {
303 #[instrument(level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
304 fn execute_command(&self, txn: &mut StandardCommandTransaction, cmd: Command<'_>) -> crate::Result<Vec<Frame>> {
305 let mut result = vec![];
306 let statements = ast::parse_str(cmd.rql)?;
307
308 let mut persistent_stack = Stack::new();
310
311 match &cmd.params {
313 Params::Positional(values) => {
314 for (index, value) in values.iter().enumerate() {
316 let param_name = (index + 1).to_string(); persistent_stack.set(param_name, Variable::Scalar(value.clone()), false)?;
318 }
319 }
320 Params::Named(map) => {
321 for (name, value) in map {
323 persistent_stack.set(name.clone(), Variable::Scalar(value.clone()), false)?;
324 }
325 }
326 Params::None => {
327 }
329 }
330
331 for statement in statements {
332 if let Some(plan) = plan(txn, statement)? {
333 if let Some(er) =
334 self.execute_command_plan(txn, plan, cmd.params.clone(), &mut persistent_stack)?
335 {
336 result.push(Frame::from(er));
337 }
338 }
339 }
340
341 Ok(result)
342 }
343}
344
345impl ExecuteQuery<StandardQueryTransaction> for Executor {
346 #[instrument(level = "debug", skip(self, txn, qry), fields(rql = %qry.rql))]
347 fn execute_query(&self, txn: &mut StandardQueryTransaction, qry: Query<'_>) -> crate::Result<Vec<Frame>> {
348 let mut result = vec![];
349 let statements = ast::parse_str(qry.rql)?;
350
351 let mut persistent_stack = Stack::new();
353
354 match &qry.params {
356 Params::Positional(values) => {
357 for (index, value) in values.iter().enumerate() {
359 let param_name = (index + 1).to_string(); persistent_stack.set(param_name, Variable::Scalar(value.clone()), false)?;
361 }
362 }
363 Params::Named(map) => {
364 for (name, value) in map {
366 persistent_stack.set(name.clone(), Variable::Scalar(value.clone()), false)?;
367 }
368 }
369 Params::None => {
370 }
372 }
373
374 for statement in statements {
375 if let Some(plan) = plan(txn, statement)? {
376 if let Some(er) =
377 self.execute_query_plan(txn, plan, qry.params.clone(), &mut persistent_stack)?
378 {
379 result.push(Frame::from(er));
380 }
381 }
382 }
383
384 Ok(result)
385 }
386}
387
// Empty impl: `Execute` needs no additional items for `Executor`.
// NOTE(review): presumably `Execute` just combines `ExecuteCommand` and `ExecuteQuery` — confirm in reifydb_core.
impl Execute<StandardCommandTransaction, StandardQueryTransaction> for Executor {}
389
390impl Executor {
391 #[instrument(level = "debug", skip(self, rx, plan, params, stack))]
392 pub(crate) fn execute_query_plan<'a>(
393 &self,
394 rx: &'a mut StandardQueryTransaction,
395 plan: PhysicalPlan<'a>,
396 params: Params,
397 stack: &mut Stack,
398 ) -> crate::Result<Option<Columns<'a>>> {
399 match plan {
400 PhysicalPlan::Aggregate(_)
402 | PhysicalPlan::DictionaryScan(_)
403 | PhysicalPlan::Filter(_)
404 | PhysicalPlan::IndexScan(_)
405 | PhysicalPlan::JoinInner(_)
406 | PhysicalPlan::JoinLeft(_)
407 | PhysicalPlan::JoinNatural(_)
408 | PhysicalPlan::Take(_)
409 | PhysicalPlan::Sort(_)
410 | PhysicalPlan::Map(_)
411 | PhysicalPlan::Extend(_)
412 | PhysicalPlan::InlineData(_)
413 | PhysicalPlan::Generator(_)
414 | PhysicalPlan::Delete(_)
415 | PhysicalPlan::DeleteRingBuffer(_)
416 | PhysicalPlan::InsertTable(_)
417 | PhysicalPlan::InsertRingBuffer(_)
418 | PhysicalPlan::InsertDictionary(_)
419 | PhysicalPlan::Update(_)
420 | PhysicalPlan::UpdateRingBuffer(_)
421 | PhysicalPlan::TableScan(_)
422 | PhysicalPlan::ViewScan(_)
423 | PhysicalPlan::FlowScan(_)
424 | PhysicalPlan::TableVirtualScan(_)
425 | PhysicalPlan::RingBufferScan(_)
426 | PhysicalPlan::Variable(_)
427 | PhysicalPlan::Environment(_)
428 | PhysicalPlan::Conditional(_)
429 | PhysicalPlan::Scalarize(_)
430 | PhysicalPlan::RowPointLookup(_)
431 | PhysicalPlan::RowListLookup(_)
432 | PhysicalPlan::RowRangeScan(_) => {
433 let mut std_txn = StandardTransaction::from(rx);
434 self.query(&mut std_txn, plan, params, stack)
435 }
436 PhysicalPlan::Declare(_) | PhysicalPlan::Assign(_) => {
437 let mut std_txn = StandardTransaction::from(rx);
438 self.query(&mut std_txn, plan, params, stack)?;
439 Ok(None)
440 }
441 PhysicalPlan::AlterSequence(_)
442 | PhysicalPlan::AlterTable(_)
443 | PhysicalPlan::AlterView(_)
444 | PhysicalPlan::AlterFlow(_)
445 | PhysicalPlan::CreateDeferredView(_)
446 | PhysicalPlan::CreateTransactionalView(_)
447 | PhysicalPlan::CreateNamespace(_)
448 | PhysicalPlan::CreateTable(_)
449 | PhysicalPlan::CreateRingBuffer(_)
450 | PhysicalPlan::CreateFlow(_)
451 | PhysicalPlan::CreateDictionary(_)
452 | PhysicalPlan::Distinct(_)
453 | PhysicalPlan::Apply(_) => {
454 unimplemented!(
457 "Apply operator is only supported in deferred views and requires the flow engine. Use within a CREATE DEFERRED VIEW statement."
458 )
459 }
460 PhysicalPlan::Window(_) => {
461 unimplemented!(
464 "Window operator is only supported in deferred views and requires the flow engine. Use within a CREATE DEFERRED VIEW statement."
465 )
466 }
467 PhysicalPlan::Merge(_) => {
468 unimplemented!(
470 "Merge operator is only supported in deferred views and requires the flow engine. Use within a CREATE DEFERRED VIEW statement."
471 )
472 }
473 }
474 }
475
476 #[instrument(level = "debug", skip(self, txn, plan, params, stack))]
477 pub fn execute_command_plan<'a>(
478 &self,
479 txn: &'a mut StandardCommandTransaction,
480 plan: PhysicalPlan<'a>,
481 params: Params,
482 stack: &mut Stack,
483 ) -> crate::Result<Option<Columns<'a>>> {
484 match plan {
485 PhysicalPlan::AlterSequence(plan) => Ok(Some(self.alter_table_sequence(txn, plan)?)),
486 PhysicalPlan::CreateDeferredView(plan) => Ok(Some(self.create_deferred_view(txn, plan)?)),
487 PhysicalPlan::CreateTransactionalView(plan) => {
488 Ok(Some(self.create_transactional_view(txn, plan)?))
489 }
490 PhysicalPlan::CreateNamespace(plan) => Ok(Some(self.create_namespace(txn, plan)?)),
491 PhysicalPlan::CreateTable(plan) => Ok(Some(self.create_table(txn, plan)?)),
492 PhysicalPlan::CreateRingBuffer(plan) => Ok(Some(self.create_ring_buffer(txn, plan)?)),
493 PhysicalPlan::CreateFlow(plan) => Ok(Some(self.create_flow(txn, plan)?)),
494 PhysicalPlan::CreateDictionary(plan) => Ok(Some(self.create_dictionary(txn, plan)?)),
495 PhysicalPlan::Delete(plan) => Ok(Some(self.delete(txn, plan, params)?)),
496 PhysicalPlan::DeleteRingBuffer(plan) => Ok(Some(self.delete_ring_buffer(txn, plan, params)?)),
497 PhysicalPlan::InsertTable(plan) => Ok(Some(self.insert_table(txn, plan, stack)?)),
498 PhysicalPlan::InsertRingBuffer(plan) => Ok(Some(self.insert_ring_buffer(txn, plan, params)?)),
499 PhysicalPlan::InsertDictionary(plan) => Ok(Some(self.insert_dictionary(txn, plan, stack)?)),
500 PhysicalPlan::Update(plan) => Ok(Some(self.update_table(txn, plan, params)?)),
501 PhysicalPlan::UpdateRingBuffer(plan) => Ok(Some(self.update_ring_buffer(txn, plan, params)?)),
502
503 PhysicalPlan::Aggregate(_)
504 | PhysicalPlan::DictionaryScan(_)
505 | PhysicalPlan::Filter(_)
506 | PhysicalPlan::IndexScan(_)
507 | PhysicalPlan::JoinInner(_)
508 | PhysicalPlan::JoinLeft(_)
509 | PhysicalPlan::JoinNatural(_)
510 | PhysicalPlan::Take(_)
511 | PhysicalPlan::Sort(_)
512 | PhysicalPlan::Map(_)
513 | PhysicalPlan::Extend(_)
514 | PhysicalPlan::InlineData(_)
515 | PhysicalPlan::Generator(_)
516 | PhysicalPlan::TableScan(_)
517 | PhysicalPlan::ViewScan(_)
518 | PhysicalPlan::FlowScan(_)
519 | PhysicalPlan::TableVirtualScan(_)
520 | PhysicalPlan::RingBufferScan(_)
521 | PhysicalPlan::Distinct(_)
522 | PhysicalPlan::Variable(_)
523 | PhysicalPlan::Environment(_)
524 | PhysicalPlan::Apply(_)
525 | PhysicalPlan::Conditional(_)
526 | PhysicalPlan::Scalarize(_)
527 | PhysicalPlan::RowPointLookup(_)
528 | PhysicalPlan::RowListLookup(_)
529 | PhysicalPlan::RowRangeScan(_) => {
530 let mut std_txn = StandardTransaction::from(txn);
531 self.query(&mut std_txn, plan, params, stack)
532 }
533 PhysicalPlan::Declare(_) | PhysicalPlan::Assign(_) => {
534 let mut std_txn = StandardTransaction::from(txn);
535 self.query(&mut std_txn, plan, params, stack)?;
536 Ok(None)
537 }
538 PhysicalPlan::Window(_) => {
539 let mut std_txn = StandardTransaction::from(txn);
540 self.query(&mut std_txn, plan, params, stack)
541 }
542 PhysicalPlan::Merge(_) => {
543 let mut std_txn = StandardTransaction::from(txn);
544 self.query(&mut std_txn, plan, params, stack)
545 }
546
547 PhysicalPlan::AlterTable(plan) => Ok(Some(self.alter_table(txn, plan)?)),
548 PhysicalPlan::AlterView(plan) => Ok(Some(self.execute_alter_view(txn, plan)?)),
549 PhysicalPlan::AlterFlow(plan) => Ok(Some(self.execute_alter_flow(txn, plan)?)),
550 }
551 }
552
553 #[instrument(level = "debug", skip(self, rx, plan, params, stack))]
554 fn query<'a>(
555 &self,
556 rx: &mut StandardTransaction<'a>,
557 plan: PhysicalPlan<'a>,
558 params: Params,
559 stack: &mut Stack,
560 ) -> crate::Result<Option<Columns<'a>>> {
561 let context = Arc::new(ExecutionContext {
562 executor: self.clone(),
563 source: None,
564 batch_size: 1024,
565 params: params.clone(),
566 stack: stack.clone(),
567 });
568 let mut node = compile(plan, rx, context.clone());
569
570 node.initialize(rx, &context)?;
572
573 let mut result: Option<Columns> = None;
574 let mut mutable_context = (*context).clone();
575
576 while let Some(Batch {
577 columns,
578 }) = node.next(rx, &mut mutable_context)?
579 {
580 if let Some(mut result_columns) = result.take() {
581 result_columns.append_columns(columns)?;
582 result = Some(result_columns);
583 } else {
584 result = Some(columns);
585 }
586 }
587
588 *stack = mutable_context.stack;
590
591 let headers = node.headers();
592
593 if let Some(mut columns) = result {
594 if let Some(headers) = headers {
595 columns.apply_headers(&headers);
596 }
597
598 Ok(columns.into())
599 } else {
600 let columns: Vec<Column<'a>> = node
603 .headers()
604 .unwrap_or(ColumnHeaders {
605 columns: vec![],
606 })
607 .columns
608 .into_iter()
609 .map(|name| Column {
610 name,
611 data: ColumnData::undefined(0),
612 })
613 .collect();
614
615 Ok(Some(Columns::new(columns)))
616 }
617 }
618}