1use std::sync::Arc;
5
6use query::{
7 aggregate::AggregateNode,
8 assign::AssignNode,
9 compile::compile,
10 declare::DeclareNode,
11 dictionary_scan::DictionaryScan,
12 environment::EnvironmentNode,
13 extend::{ExtendNode, ExtendWithoutInputNode},
14 filter::FilterNode,
15 generator::GeneratorNode,
16 index_scan::IndexScanNode,
17 inline::InlineDataNode,
18 join::{InnerJoinNode, LeftJoinNode, NaturalJoinNode},
19 map::{MapNode, MapWithoutInputNode},
20 ring_buffer_scan::RingBufferScan,
21 row_lookup::{RowListLookupNode, RowPointLookupNode, RowRangeScanNode},
22 scalarize::ScalarizeNode,
23 sort::SortNode,
24 table_scan::TableScanNode,
25 table_virtual_scan::VirtualScanNode,
26 take::TakeNode,
27 variable::VariableNode,
28 view_scan::ViewScanNode,
29};
30use reifydb_core::{
31 Frame,
32 interface::{Command, Execute, ExecuteCommand, ExecuteQuery, Params, Query, ResolvedSource},
33 value::column::{Column, ColumnData, Columns, headers::ColumnHeaders},
34};
35use reifydb_rql::{
36 ast,
37 plan::{physical::PhysicalPlan, plan},
38};
39use tracing::instrument;
40
41use crate::{
42 StandardCommandTransaction, StandardQueryTransaction, StandardTransaction,
43 function::{Functions, generator, math},
44 stack::{Stack, Variable},
45 table_virtual::{TableVirtualUserRegistry, system::FlowOperatorStore},
46};
47
48mod catalog;
49mod mutate;
50mod query;
51
52pub(crate) trait QueryNode<'a> {
55 fn initialize(&mut self, rx: &mut StandardTransaction<'a>, ctx: &ExecutionContext<'a>) -> crate::Result<()>;
58
59 fn next(
62 &mut self,
63 rx: &mut StandardTransaction<'a>,
64 ctx: &mut ExecutionContext<'a>,
65 ) -> crate::Result<Option<Batch<'a>>>;
66
67 fn headers(&self) -> Option<ColumnHeaders<'a>>;
69}
70
71#[derive(Clone)]
72pub struct ExecutionContext<'a> {
73 pub executor: Executor,
74 pub source: Option<ResolvedSource<'a>>,
75 pub batch_size: u64,
76 pub params: Params,
77 pub stack: Stack,
78}
79
80#[derive(Debug)]
81pub struct Batch<'a> {
82 pub columns: Columns<'a>,
83}
84
85pub(crate) enum ExecutionPlan<'a> {
86 Aggregate(AggregateNode<'a>),
87 DictionaryScan(DictionaryScan<'a>),
88 Filter(FilterNode<'a>),
89 IndexScan(IndexScanNode<'a>),
90 InlineData(InlineDataNode<'a>),
91 InnerJoin(InnerJoinNode<'a>),
92 LeftJoin(LeftJoinNode<'a>),
93 NaturalJoin(NaturalJoinNode<'a>),
94 Map(MapNode<'a>),
95 MapWithoutInput(MapWithoutInputNode<'a>),
96 Extend(ExtendNode<'a>),
97 ExtendWithoutInput(ExtendWithoutInputNode<'a>),
98 Sort(SortNode<'a>),
99 TableScan(TableScanNode<'a>),
100 Take(TakeNode<'a>),
101 ViewScan(ViewScanNode<'a>),
102 Variable(VariableNode<'a>),
103 Environment(EnvironmentNode),
104 VirtualScan(VirtualScanNode<'a>),
105 RingBufferScan(RingBufferScan<'a>),
106 Generator(GeneratorNode<'a>),
107 Declare(DeclareNode<'a>),
108 Assign(AssignNode<'a>),
109 Conditional(query::conditional::ConditionalNode<'a>),
110 Scalarize(ScalarizeNode<'a>),
111 RowPointLookup(RowPointLookupNode<'a>),
113 RowListLookup(RowListLookupNode<'a>),
114 RowRangeScan(RowRangeScanNode<'a>),
115}
116
117impl<'a> QueryNode<'a> for Box<ExecutionPlan<'a>> {
119 fn initialize(&mut self, rx: &mut StandardTransaction<'a>, ctx: &ExecutionContext<'a>) -> crate::Result<()> {
120 (**self).initialize(rx, ctx)
121 }
122
123 fn next(
124 &mut self,
125 rx: &mut StandardTransaction<'a>,
126 ctx: &mut ExecutionContext<'a>,
127 ) -> crate::Result<Option<Batch<'a>>> {
128 (**self).next(rx, ctx)
129 }
130
131 fn headers(&self) -> Option<ColumnHeaders<'a>> {
132 (**self).headers()
133 }
134}
135
136impl<'a> QueryNode<'a> for ExecutionPlan<'a> {
137 fn initialize(&mut self, rx: &mut StandardTransaction<'a>, ctx: &ExecutionContext<'a>) -> crate::Result<()> {
138 match self {
139 ExecutionPlan::Aggregate(node) => node.initialize(rx, ctx),
140 ExecutionPlan::DictionaryScan(node) => node.initialize(rx, ctx),
141 ExecutionPlan::Filter(node) => node.initialize(rx, ctx),
142 ExecutionPlan::IndexScan(node) => node.initialize(rx, ctx),
143 ExecutionPlan::InlineData(node) => node.initialize(rx, ctx),
144 ExecutionPlan::InnerJoin(node) => node.initialize(rx, ctx),
145 ExecutionPlan::LeftJoin(node) => node.initialize(rx, ctx),
146 ExecutionPlan::NaturalJoin(node) => node.initialize(rx, ctx),
147 ExecutionPlan::Map(node) => node.initialize(rx, ctx),
148 ExecutionPlan::MapWithoutInput(node) => node.initialize(rx, ctx),
149 ExecutionPlan::Extend(node) => node.initialize(rx, ctx),
150 ExecutionPlan::ExtendWithoutInput(node) => node.initialize(rx, ctx),
151 ExecutionPlan::Sort(node) => node.initialize(rx, ctx),
152 ExecutionPlan::TableScan(node) => node.initialize(rx, ctx),
153 ExecutionPlan::Take(node) => node.initialize(rx, ctx),
154 ExecutionPlan::ViewScan(node) => node.initialize(rx, ctx),
155 ExecutionPlan::Variable(node) => node.initialize(rx, ctx),
156 ExecutionPlan::Environment(node) => node.initialize(rx, ctx),
157 ExecutionPlan::VirtualScan(node) => node.initialize(rx, ctx),
158 ExecutionPlan::RingBufferScan(node) => node.initialize(rx, ctx),
159 ExecutionPlan::Generator(node) => node.initialize(rx, ctx),
160 ExecutionPlan::Declare(node) => node.initialize(rx, ctx),
161 ExecutionPlan::Assign(node) => node.initialize(rx, ctx),
162 ExecutionPlan::Conditional(node) => node.initialize(rx, ctx),
163 ExecutionPlan::Scalarize(node) => node.initialize(rx, ctx),
164 ExecutionPlan::RowPointLookup(node) => node.initialize(rx, ctx),
165 ExecutionPlan::RowListLookup(node) => node.initialize(rx, ctx),
166 ExecutionPlan::RowRangeScan(node) => node.initialize(rx, ctx),
167 }
168 }
169
170 fn next(
171 &mut self,
172 rx: &mut StandardTransaction<'a>,
173 ctx: &mut ExecutionContext<'a>,
174 ) -> crate::Result<Option<Batch<'a>>> {
175 match self {
176 ExecutionPlan::Aggregate(node) => node.next(rx, ctx),
177 ExecutionPlan::DictionaryScan(node) => node.next(rx, ctx),
178 ExecutionPlan::Filter(node) => node.next(rx, ctx),
179 ExecutionPlan::IndexScan(node) => node.next(rx, ctx),
180 ExecutionPlan::InlineData(node) => node.next(rx, ctx),
181 ExecutionPlan::InnerJoin(node) => node.next(rx, ctx),
182 ExecutionPlan::LeftJoin(node) => node.next(rx, ctx),
183 ExecutionPlan::NaturalJoin(node) => node.next(rx, ctx),
184 ExecutionPlan::Map(node) => node.next(rx, ctx),
185 ExecutionPlan::MapWithoutInput(node) => node.next(rx, ctx),
186 ExecutionPlan::Extend(node) => node.next(rx, ctx),
187 ExecutionPlan::ExtendWithoutInput(node) => node.next(rx, ctx),
188 ExecutionPlan::Sort(node) => node.next(rx, ctx),
189 ExecutionPlan::TableScan(node) => node.next(rx, ctx),
190 ExecutionPlan::Take(node) => node.next(rx, ctx),
191 ExecutionPlan::ViewScan(node) => node.next(rx, ctx),
192 ExecutionPlan::Variable(node) => node.next(rx, ctx),
193 ExecutionPlan::Environment(node) => node.next(rx, ctx),
194 ExecutionPlan::VirtualScan(node) => node.next(rx, ctx),
195 ExecutionPlan::RingBufferScan(node) => node.next(rx, ctx),
196 ExecutionPlan::Generator(node) => node.next(rx, ctx),
197 ExecutionPlan::Declare(node) => node.next(rx, ctx),
198 ExecutionPlan::Assign(node) => node.next(rx, ctx),
199 ExecutionPlan::Conditional(node) => node.next(rx, ctx),
200 ExecutionPlan::Scalarize(node) => node.next(rx, ctx),
201 ExecutionPlan::RowPointLookup(node) => node.next(rx, ctx),
202 ExecutionPlan::RowListLookup(node) => node.next(rx, ctx),
203 ExecutionPlan::RowRangeScan(node) => node.next(rx, ctx),
204 }
205 }
206
207 fn headers(&self) -> Option<ColumnHeaders<'a>> {
208 match self {
209 ExecutionPlan::Aggregate(node) => node.headers(),
210 ExecutionPlan::DictionaryScan(node) => node.headers(),
211 ExecutionPlan::Filter(node) => node.headers(),
212 ExecutionPlan::IndexScan(node) => node.headers(),
213 ExecutionPlan::InlineData(node) => node.headers(),
214 ExecutionPlan::InnerJoin(node) => node.headers(),
215 ExecutionPlan::LeftJoin(node) => node.headers(),
216 ExecutionPlan::NaturalJoin(node) => node.headers(),
217 ExecutionPlan::Map(node) => node.headers(),
218 ExecutionPlan::MapWithoutInput(node) => node.headers(),
219 ExecutionPlan::Extend(node) => node.headers(),
220 ExecutionPlan::ExtendWithoutInput(node) => node.headers(),
221 ExecutionPlan::Sort(node) => node.headers(),
222 ExecutionPlan::TableScan(node) => node.headers(),
223 ExecutionPlan::Take(node) => node.headers(),
224 ExecutionPlan::ViewScan(node) => node.headers(),
225 ExecutionPlan::Variable(node) => node.headers(),
226 ExecutionPlan::Environment(node) => node.headers(),
227 ExecutionPlan::VirtualScan(node) => node.headers(),
228 ExecutionPlan::RingBufferScan(node) => node.headers(),
229 ExecutionPlan::Generator(node) => node.headers(),
230 ExecutionPlan::Declare(node) => node.headers(),
231 ExecutionPlan::Assign(node) => node.headers(),
232 ExecutionPlan::Conditional(node) => node.headers(),
233 ExecutionPlan::Scalarize(node) => node.headers(),
234 ExecutionPlan::RowPointLookup(node) => node.headers(),
235 ExecutionPlan::RowListLookup(node) => node.headers(),
236 ExecutionPlan::RowRangeScan(node) => node.headers(),
237 }
238 }
239}
240
241pub struct Executor(Arc<ExecutorInner>);
242
243pub struct ExecutorInner {
244 pub functions: Functions,
245 pub flow_operator_store: FlowOperatorStore,
246 pub virtual_table_registry: TableVirtualUserRegistry,
247}
248
249impl Clone for Executor {
250 fn clone(&self) -> Self {
251 Self(self.0.clone())
252 }
253}
254
255impl std::ops::Deref for Executor {
256 type Target = ExecutorInner;
257
258 fn deref(&self) -> &Self::Target {
259 &self.0
260 }
261}
262
263impl Executor {
264 pub fn new(functions: Functions, flow_operator_store: FlowOperatorStore) -> Self {
265 Self(Arc::new(ExecutorInner {
266 functions,
267 flow_operator_store,
268 virtual_table_registry: TableVirtualUserRegistry::new(),
269 }))
270 }
271
272 pub fn with_virtual_table_registry(
273 functions: Functions,
274 flow_operator_store: FlowOperatorStore,
275 virtual_table_registry: TableVirtualUserRegistry,
276 ) -> Self {
277 Self(Arc::new(ExecutorInner {
278 functions,
279 flow_operator_store,
280 virtual_table_registry,
281 }))
282 }
283
284 #[allow(dead_code)]
285 pub fn testing() -> Self {
286 Self::new(
287 Functions::builder()
288 .register_aggregate("math::sum", math::aggregate::Sum::new)
289 .register_aggregate("math::min", math::aggregate::Min::new)
290 .register_aggregate("math::max", math::aggregate::Max::new)
291 .register_aggregate("math::avg", math::aggregate::Avg::new)
292 .register_aggregate("math::count", math::aggregate::Count::new)
293 .register_scalar("math::abs", math::scalar::Abs::new)
294 .register_scalar("math::avg", math::scalar::Avg::new)
295 .register_generator("generate_series", generator::GenerateSeries::new)
296 .build(),
297 FlowOperatorStore::new(),
298 )
299 }
300}
301
302impl ExecuteCommand<StandardCommandTransaction> for Executor {
303 #[instrument(level = "debug", skip(self, txn, cmd), fields(rql = %cmd.rql))]
304 fn execute_command(&self, txn: &mut StandardCommandTransaction, cmd: Command<'_>) -> crate::Result<Vec<Frame>> {
305 let mut result = vec![];
306 let statements = ast::parse_str(cmd.rql)?;
307
308 let mut persistent_stack = Stack::new();
310
311 match &cmd.params {
313 reifydb_core::interface::Params::Positional(values) => {
314 for (index, value) in values.iter().enumerate() {
316 let param_name = (index + 1).to_string(); persistent_stack.set(param_name, Variable::Scalar(value.clone()), false)?;
318 }
319 }
320 reifydb_core::interface::Params::Named(map) => {
321 for (name, value) in map {
323 persistent_stack.set(name.clone(), Variable::Scalar(value.clone()), false)?;
324 }
325 }
326 reifydb_core::interface::Params::None => {
327 }
329 }
330
331 for statement in statements {
332 if let Some(plan) = plan(txn, statement)? {
333 if let Some(er) =
334 self.execute_command_plan(txn, plan, cmd.params.clone(), &mut persistent_stack)?
335 {
336 result.push(Frame::from(er));
337 }
338 }
339 }
340
341 Ok(result)
342 }
343}
344
345impl ExecuteQuery<StandardQueryTransaction> for Executor {
346 #[instrument(level = "debug", skip(self, txn, qry), fields(rql = %qry.rql))]
347 fn execute_query(&self, txn: &mut StandardQueryTransaction, qry: Query<'_>) -> crate::Result<Vec<Frame>> {
348 let mut result = vec![];
349 let statements = ast::parse_str(qry.rql)?;
350
351 let mut persistent_stack = Stack::new();
353
354 match &qry.params {
356 reifydb_core::interface::Params::Positional(values) => {
357 for (index, value) in values.iter().enumerate() {
359 let param_name = (index + 1).to_string(); persistent_stack.set(param_name, Variable::Scalar(value.clone()), false)?;
361 }
362 }
363 reifydb_core::interface::Params::Named(map) => {
364 for (name, value) in map {
366 persistent_stack.set(name.clone(), Variable::Scalar(value.clone()), false)?;
367 }
368 }
369 reifydb_core::interface::Params::None => {
370 }
372 }
373
374 for statement in statements {
375 if let Some(plan) = plan(txn, statement)? {
376 if let Some(er) =
377 self.execute_query_plan(txn, plan, qry.params.clone(), &mut persistent_stack)?
378 {
379 result.push(Frame::from(er));
380 }
381 }
382 }
383
384 Ok(result)
385 }
386}
387
388impl Execute<StandardCommandTransaction, StandardQueryTransaction> for Executor {}
389
390impl Executor {
391 pub(crate) fn execute_query_plan<'a>(
392 &self,
393 rx: &'a mut StandardQueryTransaction,
394 plan: PhysicalPlan<'a>,
395 params: Params,
396 stack: &mut Stack,
397 ) -> crate::Result<Option<Columns<'a>>> {
398 match plan {
399 PhysicalPlan::Aggregate(_)
401 | PhysicalPlan::DictionaryScan(_)
402 | PhysicalPlan::Filter(_)
403 | PhysicalPlan::IndexScan(_)
404 | PhysicalPlan::JoinInner(_)
405 | PhysicalPlan::JoinLeft(_)
406 | PhysicalPlan::JoinNatural(_)
407 | PhysicalPlan::Take(_)
408 | PhysicalPlan::Sort(_)
409 | PhysicalPlan::Map(_)
410 | PhysicalPlan::Extend(_)
411 | PhysicalPlan::InlineData(_)
412 | PhysicalPlan::Generator(_)
413 | PhysicalPlan::Delete(_)
414 | PhysicalPlan::DeleteRingBuffer(_)
415 | PhysicalPlan::InsertTable(_)
416 | PhysicalPlan::InsertRingBuffer(_)
417 | PhysicalPlan::InsertDictionary(_)
418 | PhysicalPlan::Update(_)
419 | PhysicalPlan::UpdateRingBuffer(_)
420 | PhysicalPlan::TableScan(_)
421 | PhysicalPlan::ViewScan(_)
422 | PhysicalPlan::FlowScan(_)
423 | PhysicalPlan::TableVirtualScan(_)
424 | PhysicalPlan::RingBufferScan(_)
425 | PhysicalPlan::Variable(_)
426 | PhysicalPlan::Environment(_)
427 | PhysicalPlan::Conditional(_)
428 | PhysicalPlan::Scalarize(_)
429 | PhysicalPlan::RowPointLookup(_)
430 | PhysicalPlan::RowListLookup(_)
431 | PhysicalPlan::RowRangeScan(_) => {
432 let mut std_txn = StandardTransaction::from(rx);
433 self.query(&mut std_txn, plan, params, stack)
434 }
435 PhysicalPlan::Declare(_) | PhysicalPlan::Assign(_) => {
436 let mut std_txn = StandardTransaction::from(rx);
437 self.query(&mut std_txn, plan, params, stack)?;
438 Ok(None)
439 }
440 PhysicalPlan::AlterSequence(_)
441 | PhysicalPlan::AlterTable(_)
442 | PhysicalPlan::AlterView(_)
443 | PhysicalPlan::AlterFlow(_)
444 | PhysicalPlan::CreateDeferredView(_)
445 | PhysicalPlan::CreateTransactionalView(_)
446 | PhysicalPlan::CreateNamespace(_)
447 | PhysicalPlan::CreateTable(_)
448 | PhysicalPlan::CreateRingBuffer(_)
449 | PhysicalPlan::CreateFlow(_)
450 | PhysicalPlan::CreateDictionary(_)
451 | PhysicalPlan::Distinct(_)
452 | PhysicalPlan::Apply(_) => {
453 unimplemented!(
456 "Apply operator is only supported in deferred views and requires the flow engine. Use within a CREATE DEFERRED VIEW statement."
457 )
458 }
459 PhysicalPlan::Window(_) => {
460 unimplemented!(
463 "Window operator is only supported in deferred views and requires the flow engine. Use within a CREATE DEFERRED VIEW statement."
464 )
465 }
466 PhysicalPlan::Merge(_) => {
467 unimplemented!(
469 "Merge operator is only supported in deferred views and requires the flow engine. Use within a CREATE DEFERRED VIEW statement."
470 )
471 }
472 }
473 }
474
475 pub fn execute_command_plan<'a>(
476 &self,
477 txn: &'a mut StandardCommandTransaction,
478 plan: PhysicalPlan<'a>,
479 params: Params,
480 stack: &mut Stack,
481 ) -> crate::Result<Option<Columns<'a>>> {
482 match plan {
483 PhysicalPlan::AlterSequence(plan) => Ok(Some(self.alter_table_sequence(txn, plan)?)),
484 PhysicalPlan::CreateDeferredView(plan) => Ok(Some(self.create_deferred_view(txn, plan)?)),
485 PhysicalPlan::CreateTransactionalView(plan) => {
486 Ok(Some(self.create_transactional_view(txn, plan)?))
487 }
488 PhysicalPlan::CreateNamespace(plan) => Ok(Some(self.create_namespace(txn, plan)?)),
489 PhysicalPlan::CreateTable(plan) => Ok(Some(self.create_table(txn, plan)?)),
490 PhysicalPlan::CreateRingBuffer(plan) => Ok(Some(self.create_ring_buffer(txn, plan)?)),
491 PhysicalPlan::CreateFlow(plan) => Ok(Some(self.create_flow(txn, plan)?)),
492 PhysicalPlan::CreateDictionary(plan) => Ok(Some(self.create_dictionary(txn, plan)?)),
493 PhysicalPlan::Delete(plan) => Ok(Some(self.delete(txn, plan, params)?)),
494 PhysicalPlan::DeleteRingBuffer(plan) => Ok(Some(self.delete_ring_buffer(txn, plan, params)?)),
495 PhysicalPlan::InsertTable(plan) => Ok(Some(self.insert_table(txn, plan, stack)?)),
496 PhysicalPlan::InsertRingBuffer(plan) => Ok(Some(self.insert_ring_buffer(txn, plan, params)?)),
497 PhysicalPlan::InsertDictionary(plan) => Ok(Some(self.insert_dictionary(txn, plan, stack)?)),
498 PhysicalPlan::Update(plan) => Ok(Some(self.update_table(txn, plan, params)?)),
499 PhysicalPlan::UpdateRingBuffer(plan) => Ok(Some(self.update_ring_buffer(txn, plan, params)?)),
500
501 PhysicalPlan::Aggregate(_)
502 | PhysicalPlan::DictionaryScan(_)
503 | PhysicalPlan::Filter(_)
504 | PhysicalPlan::IndexScan(_)
505 | PhysicalPlan::JoinInner(_)
506 | PhysicalPlan::JoinLeft(_)
507 | PhysicalPlan::JoinNatural(_)
508 | PhysicalPlan::Take(_)
509 | PhysicalPlan::Sort(_)
510 | PhysicalPlan::Map(_)
511 | PhysicalPlan::Extend(_)
512 | PhysicalPlan::InlineData(_)
513 | PhysicalPlan::Generator(_)
514 | PhysicalPlan::TableScan(_)
515 | PhysicalPlan::ViewScan(_)
516 | PhysicalPlan::FlowScan(_)
517 | PhysicalPlan::TableVirtualScan(_)
518 | PhysicalPlan::RingBufferScan(_)
519 | PhysicalPlan::Distinct(_)
520 | PhysicalPlan::Variable(_)
521 | PhysicalPlan::Environment(_)
522 | PhysicalPlan::Apply(_)
523 | PhysicalPlan::Conditional(_)
524 | PhysicalPlan::Scalarize(_)
525 | PhysicalPlan::RowPointLookup(_)
526 | PhysicalPlan::RowListLookup(_)
527 | PhysicalPlan::RowRangeScan(_) => {
528 let mut std_txn = StandardTransaction::from(txn);
529 self.query(&mut std_txn, plan, params, stack)
530 }
531 PhysicalPlan::Declare(_) | PhysicalPlan::Assign(_) => {
532 let mut std_txn = StandardTransaction::from(txn);
533 self.query(&mut std_txn, plan, params, stack)?;
534 Ok(None)
535 }
536 PhysicalPlan::Window(_) => {
537 let mut std_txn = StandardTransaction::from(txn);
538 self.query(&mut std_txn, plan, params, stack)
539 }
540 PhysicalPlan::Merge(_) => {
541 let mut std_txn = StandardTransaction::from(txn);
542 self.query(&mut std_txn, plan, params, stack)
543 }
544
545 PhysicalPlan::AlterTable(plan) => Ok(Some(self.alter_table(txn, plan)?)),
546 PhysicalPlan::AlterView(plan) => Ok(Some(self.execute_alter_view(txn, plan)?)),
547 PhysicalPlan::AlterFlow(plan) => Ok(Some(self.execute_alter_flow(txn, plan)?)),
548 }
549 }
550
551 fn query<'a>(
552 &self,
553 rx: &mut StandardTransaction<'a>,
554 plan: PhysicalPlan<'a>,
555 params: Params,
556 stack: &mut Stack,
557 ) -> crate::Result<Option<Columns<'a>>> {
558 let context = Arc::new(ExecutionContext {
559 executor: self.clone(),
560 source: None,
561 batch_size: 1024,
562 params: params.clone(),
563 stack: stack.clone(),
564 });
565 let mut node = compile(plan, rx, context.clone());
566
567 node.initialize(rx, &context)?;
569
570 let mut result: Option<Columns> = None;
571 let mut mutable_context = (*context).clone();
572
573 while let Some(Batch {
574 columns,
575 }) = node.next(rx, &mut mutable_context)?
576 {
577 if let Some(mut result_columns) = result.take() {
578 result_columns.append_columns(columns)?;
579 result = Some(result_columns);
580 } else {
581 result = Some(columns);
582 }
583 }
584
585 *stack = mutable_context.stack;
587
588 let headers = node.headers();
589
590 if let Some(mut columns) = result {
591 if let Some(headers) = headers {
592 columns.apply_headers(&headers);
593 }
594
595 Ok(columns.into())
596 } else {
597 let columns: Vec<Column<'a>> = node
600 .headers()
601 .unwrap_or(ColumnHeaders {
602 columns: vec![],
603 })
604 .columns
605 .into_iter()
606 .map(|name| Column {
607 name,
608 data: ColumnData::undefined(0),
609 })
610 .collect();
611
612 Ok(Some(Columns::new(columns)))
613 }
614 }
615}