1use std::sync::Arc;
5
6use query::{
7 aggregate::AggregateNode,
8 assign::AssignNode,
9 compile::compile,
10 declare::DeclareNode,
11 dictionary_scan::DictionaryScan,
12 environment::EnvironmentNode,
13 extend::{ExtendNode, ExtendWithoutInputNode},
14 filter::FilterNode,
15 generator::GeneratorNode,
16 index_scan::IndexScanNode,
17 inline::InlineDataNode,
18 join::{InnerJoinNode, LeftJoinNode, NaturalJoinNode},
19 map::{MapNode, MapWithoutInputNode},
20 ring_buffer_scan::RingBufferScan,
21 row_lookup::{RowListLookupNode, RowPointLookupNode, RowRangeScanNode},
22 scalarize::ScalarizeNode,
23 sort::SortNode,
24 table_scan::TableScanNode,
25 table_virtual_scan::VirtualScanNode,
26 take::TakeNode,
27 variable::VariableNode,
28 view_scan::ViewScanNode,
29};
30use reifydb_core::{
31 Frame,
32 interface::{Command, Execute, ExecuteCommand, ExecuteQuery, Params, Query, ResolvedSource},
33 value::column::{Column, ColumnData, Columns, headers::ColumnHeaders},
34};
35use reifydb_rql::{
36 ast,
37 plan::{physical::PhysicalPlan, plan},
38};
39
40use crate::{
41 StandardCommandTransaction, StandardQueryTransaction, StandardTransaction,
42 function::{Functions, generator, math},
43 stack::{Stack, Variable},
44 table_virtual::system::FlowOperatorStore,
45};
46
47mod catalog;
48mod mutate;
49mod query;
50
51pub(crate) trait QueryNode<'a> {
54 fn initialize(&mut self, rx: &mut StandardTransaction<'a>, ctx: &ExecutionContext<'a>) -> crate::Result<()>;
57
58 fn next(
61 &mut self,
62 rx: &mut StandardTransaction<'a>,
63 ctx: &mut ExecutionContext<'a>,
64 ) -> crate::Result<Option<Batch<'a>>>;
65
66 fn headers(&self) -> Option<ColumnHeaders<'a>>;
68}
69
70#[derive(Clone)]
71pub struct ExecutionContext<'a> {
72 pub executor: Executor,
73 pub source: Option<ResolvedSource<'a>>,
74 pub batch_size: u64,
75 pub params: Params,
76 pub stack: Stack,
77}
78
79#[derive(Debug)]
80pub struct Batch<'a> {
81 pub columns: Columns<'a>,
82}
83
84pub(crate) enum ExecutionPlan<'a> {
85 Aggregate(AggregateNode<'a>),
86 DictionaryScan(DictionaryScan<'a>),
87 Filter(FilterNode<'a>),
88 IndexScan(IndexScanNode<'a>),
89 InlineData(InlineDataNode<'a>),
90 InnerJoin(InnerJoinNode<'a>),
91 LeftJoin(LeftJoinNode<'a>),
92 NaturalJoin(NaturalJoinNode<'a>),
93 Map(MapNode<'a>),
94 MapWithoutInput(MapWithoutInputNode<'a>),
95 Extend(ExtendNode<'a>),
96 ExtendWithoutInput(ExtendWithoutInputNode<'a>),
97 Sort(SortNode<'a>),
98 TableScan(TableScanNode<'a>),
99 Take(TakeNode<'a>),
100 ViewScan(ViewScanNode<'a>),
101 Variable(VariableNode<'a>),
102 Environment(EnvironmentNode),
103 VirtualScan(VirtualScanNode<'a>),
104 RingBufferScan(RingBufferScan<'a>),
105 Generator(GeneratorNode<'a>),
106 Declare(DeclareNode<'a>),
107 Assign(AssignNode<'a>),
108 Conditional(query::conditional::ConditionalNode<'a>),
109 Scalarize(ScalarizeNode<'a>),
110 RowPointLookup(RowPointLookupNode<'a>),
112 RowListLookup(RowListLookupNode<'a>),
113 RowRangeScan(RowRangeScanNode<'a>),
114}
115
116impl<'a> QueryNode<'a> for Box<ExecutionPlan<'a>> {
118 fn initialize(&mut self, rx: &mut StandardTransaction<'a>, ctx: &ExecutionContext<'a>) -> crate::Result<()> {
119 (**self).initialize(rx, ctx)
120 }
121
122 fn next(
123 &mut self,
124 rx: &mut StandardTransaction<'a>,
125 ctx: &mut ExecutionContext<'a>,
126 ) -> crate::Result<Option<Batch<'a>>> {
127 (**self).next(rx, ctx)
128 }
129
130 fn headers(&self) -> Option<ColumnHeaders<'a>> {
131 (**self).headers()
132 }
133}
134
135impl<'a> QueryNode<'a> for ExecutionPlan<'a> {
136 fn initialize(&mut self, rx: &mut StandardTransaction<'a>, ctx: &ExecutionContext<'a>) -> crate::Result<()> {
137 match self {
138 ExecutionPlan::Aggregate(node) => node.initialize(rx, ctx),
139 ExecutionPlan::DictionaryScan(node) => node.initialize(rx, ctx),
140 ExecutionPlan::Filter(node) => node.initialize(rx, ctx),
141 ExecutionPlan::IndexScan(node) => node.initialize(rx, ctx),
142 ExecutionPlan::InlineData(node) => node.initialize(rx, ctx),
143 ExecutionPlan::InnerJoin(node) => node.initialize(rx, ctx),
144 ExecutionPlan::LeftJoin(node) => node.initialize(rx, ctx),
145 ExecutionPlan::NaturalJoin(node) => node.initialize(rx, ctx),
146 ExecutionPlan::Map(node) => node.initialize(rx, ctx),
147 ExecutionPlan::MapWithoutInput(node) => node.initialize(rx, ctx),
148 ExecutionPlan::Extend(node) => node.initialize(rx, ctx),
149 ExecutionPlan::ExtendWithoutInput(node) => node.initialize(rx, ctx),
150 ExecutionPlan::Sort(node) => node.initialize(rx, ctx),
151 ExecutionPlan::TableScan(node) => node.initialize(rx, ctx),
152 ExecutionPlan::Take(node) => node.initialize(rx, ctx),
153 ExecutionPlan::ViewScan(node) => node.initialize(rx, ctx),
154 ExecutionPlan::Variable(node) => node.initialize(rx, ctx),
155 ExecutionPlan::Environment(node) => node.initialize(rx, ctx),
156 ExecutionPlan::VirtualScan(node) => node.initialize(rx, ctx),
157 ExecutionPlan::RingBufferScan(node) => node.initialize(rx, ctx),
158 ExecutionPlan::Generator(node) => node.initialize(rx, ctx),
159 ExecutionPlan::Declare(node) => node.initialize(rx, ctx),
160 ExecutionPlan::Assign(node) => node.initialize(rx, ctx),
161 ExecutionPlan::Conditional(node) => node.initialize(rx, ctx),
162 ExecutionPlan::Scalarize(node) => node.initialize(rx, ctx),
163 ExecutionPlan::RowPointLookup(node) => node.initialize(rx, ctx),
164 ExecutionPlan::RowListLookup(node) => node.initialize(rx, ctx),
165 ExecutionPlan::RowRangeScan(node) => node.initialize(rx, ctx),
166 }
167 }
168
169 fn next(
170 &mut self,
171 rx: &mut StandardTransaction<'a>,
172 ctx: &mut ExecutionContext<'a>,
173 ) -> crate::Result<Option<Batch<'a>>> {
174 match self {
175 ExecutionPlan::Aggregate(node) => node.next(rx, ctx),
176 ExecutionPlan::DictionaryScan(node) => node.next(rx, ctx),
177 ExecutionPlan::Filter(node) => node.next(rx, ctx),
178 ExecutionPlan::IndexScan(node) => node.next(rx, ctx),
179 ExecutionPlan::InlineData(node) => node.next(rx, ctx),
180 ExecutionPlan::InnerJoin(node) => node.next(rx, ctx),
181 ExecutionPlan::LeftJoin(node) => node.next(rx, ctx),
182 ExecutionPlan::NaturalJoin(node) => node.next(rx, ctx),
183 ExecutionPlan::Map(node) => node.next(rx, ctx),
184 ExecutionPlan::MapWithoutInput(node) => node.next(rx, ctx),
185 ExecutionPlan::Extend(node) => node.next(rx, ctx),
186 ExecutionPlan::ExtendWithoutInput(node) => node.next(rx, ctx),
187 ExecutionPlan::Sort(node) => node.next(rx, ctx),
188 ExecutionPlan::TableScan(node) => node.next(rx, ctx),
189 ExecutionPlan::Take(node) => node.next(rx, ctx),
190 ExecutionPlan::ViewScan(node) => node.next(rx, ctx),
191 ExecutionPlan::Variable(node) => node.next(rx, ctx),
192 ExecutionPlan::Environment(node) => node.next(rx, ctx),
193 ExecutionPlan::VirtualScan(node) => node.next(rx, ctx),
194 ExecutionPlan::RingBufferScan(node) => node.next(rx, ctx),
195 ExecutionPlan::Generator(node) => node.next(rx, ctx),
196 ExecutionPlan::Declare(node) => node.next(rx, ctx),
197 ExecutionPlan::Assign(node) => node.next(rx, ctx),
198 ExecutionPlan::Conditional(node) => node.next(rx, ctx),
199 ExecutionPlan::Scalarize(node) => node.next(rx, ctx),
200 ExecutionPlan::RowPointLookup(node) => node.next(rx, ctx),
201 ExecutionPlan::RowListLookup(node) => node.next(rx, ctx),
202 ExecutionPlan::RowRangeScan(node) => node.next(rx, ctx),
203 }
204 }
205
206 fn headers(&self) -> Option<ColumnHeaders<'a>> {
207 match self {
208 ExecutionPlan::Aggregate(node) => node.headers(),
209 ExecutionPlan::DictionaryScan(node) => node.headers(),
210 ExecutionPlan::Filter(node) => node.headers(),
211 ExecutionPlan::IndexScan(node) => node.headers(),
212 ExecutionPlan::InlineData(node) => node.headers(),
213 ExecutionPlan::InnerJoin(node) => node.headers(),
214 ExecutionPlan::LeftJoin(node) => node.headers(),
215 ExecutionPlan::NaturalJoin(node) => node.headers(),
216 ExecutionPlan::Map(node) => node.headers(),
217 ExecutionPlan::MapWithoutInput(node) => node.headers(),
218 ExecutionPlan::Extend(node) => node.headers(),
219 ExecutionPlan::ExtendWithoutInput(node) => node.headers(),
220 ExecutionPlan::Sort(node) => node.headers(),
221 ExecutionPlan::TableScan(node) => node.headers(),
222 ExecutionPlan::Take(node) => node.headers(),
223 ExecutionPlan::ViewScan(node) => node.headers(),
224 ExecutionPlan::Variable(node) => node.headers(),
225 ExecutionPlan::Environment(node) => node.headers(),
226 ExecutionPlan::VirtualScan(node) => node.headers(),
227 ExecutionPlan::RingBufferScan(node) => node.headers(),
228 ExecutionPlan::Generator(node) => node.headers(),
229 ExecutionPlan::Declare(node) => node.headers(),
230 ExecutionPlan::Assign(node) => node.headers(),
231 ExecutionPlan::Conditional(node) => node.headers(),
232 ExecutionPlan::Scalarize(node) => node.headers(),
233 ExecutionPlan::RowPointLookup(node) => node.headers(),
234 ExecutionPlan::RowListLookup(node) => node.headers(),
235 ExecutionPlan::RowRangeScan(node) => node.headers(),
236 }
237 }
238}
239
240pub struct Executor(Arc<ExecutorInner>);
241
242pub struct ExecutorInner {
243 pub functions: Functions,
244 pub flow_operator_store: FlowOperatorStore,
245}
246
247impl Clone for Executor {
248 fn clone(&self) -> Self {
249 Self(self.0.clone())
250 }
251}
252
253impl std::ops::Deref for Executor {
254 type Target = ExecutorInner;
255
256 fn deref(&self) -> &Self::Target {
257 &self.0
258 }
259}
260
261impl Executor {
262 pub fn new(functions: Functions, flow_operator_store: FlowOperatorStore) -> Self {
263 Self(Arc::new(ExecutorInner {
264 functions,
265 flow_operator_store,
266 }))
267 }
268
269 #[allow(dead_code)]
270 pub fn testing() -> Self {
271 Self::new(
272 Functions::builder()
273 .register_aggregate("math::sum", math::aggregate::Sum::new)
274 .register_aggregate("math::min", math::aggregate::Min::new)
275 .register_aggregate("math::max", math::aggregate::Max::new)
276 .register_aggregate("math::avg", math::aggregate::Avg::new)
277 .register_aggregate("math::count", math::aggregate::Count::new)
278 .register_scalar("math::abs", math::scalar::Abs::new)
279 .register_scalar("math::avg", math::scalar::Avg::new)
280 .register_generator("generate_series", generator::GenerateSeries::new)
281 .build(),
282 FlowOperatorStore::new(),
283 )
284 }
285}
286
287impl ExecuteCommand<StandardCommandTransaction> for Executor {
288 fn execute_command(&self, txn: &mut StandardCommandTransaction, cmd: Command<'_>) -> crate::Result<Vec<Frame>> {
289 let mut result = vec![];
290 let statements = ast::parse_str(cmd.rql)?;
291
292 let mut persistent_stack = Stack::new();
294
295 match &cmd.params {
297 reifydb_core::interface::Params::Positional(values) => {
298 for (index, value) in values.iter().enumerate() {
300 let param_name = (index + 1).to_string(); persistent_stack.set(param_name, Variable::Scalar(value.clone()), false)?;
302 }
303 }
304 reifydb_core::interface::Params::Named(map) => {
305 for (name, value) in map {
307 persistent_stack.set(name.clone(), Variable::Scalar(value.clone()), false)?;
308 }
309 }
310 reifydb_core::interface::Params::None => {
311 }
313 }
314
315 for statement in statements {
316 if let Some(plan) = plan(txn, statement)? {
317 if let Some(er) =
318 self.execute_command_plan(txn, plan, cmd.params.clone(), &mut persistent_stack)?
319 {
320 result.push(Frame::from(er));
321 }
322 }
323 }
324
325 Ok(result)
326 }
327}
328
329impl ExecuteQuery<StandardQueryTransaction> for Executor {
330 fn execute_query(&self, txn: &mut StandardQueryTransaction, qry: Query<'_>) -> crate::Result<Vec<Frame>> {
331 let mut result = vec![];
332 let statements = ast::parse_str(qry.rql)?;
333
334 let mut persistent_stack = Stack::new();
336
337 match &qry.params {
339 reifydb_core::interface::Params::Positional(values) => {
340 for (index, value) in values.iter().enumerate() {
342 let param_name = (index + 1).to_string(); persistent_stack.set(param_name, Variable::Scalar(value.clone()), false)?;
344 }
345 }
346 reifydb_core::interface::Params::Named(map) => {
347 for (name, value) in map {
349 persistent_stack.set(name.clone(), Variable::Scalar(value.clone()), false)?;
350 }
351 }
352 reifydb_core::interface::Params::None => {
353 }
355 }
356
357 for statement in statements {
358 if let Some(plan) = plan(txn, statement)? {
359 if let Some(er) =
360 self.execute_query_plan(txn, plan, qry.params.clone(), &mut persistent_stack)?
361 {
362 result.push(Frame::from(er));
363 }
364 }
365 }
366
367 Ok(result)
368 }
369}
370
371impl Execute<StandardCommandTransaction, StandardQueryTransaction> for Executor {}
372
373impl Executor {
374 pub(crate) fn execute_query_plan<'a>(
375 &self,
376 rx: &'a mut StandardQueryTransaction,
377 plan: PhysicalPlan<'a>,
378 params: Params,
379 stack: &mut Stack,
380 ) -> crate::Result<Option<Columns<'a>>> {
381 match plan {
382 PhysicalPlan::Aggregate(_)
384 | PhysicalPlan::DictionaryScan(_)
385 | PhysicalPlan::Filter(_)
386 | PhysicalPlan::IndexScan(_)
387 | PhysicalPlan::JoinInner(_)
388 | PhysicalPlan::JoinLeft(_)
389 | PhysicalPlan::JoinNatural(_)
390 | PhysicalPlan::Take(_)
391 | PhysicalPlan::Sort(_)
392 | PhysicalPlan::Map(_)
393 | PhysicalPlan::Extend(_)
394 | PhysicalPlan::InlineData(_)
395 | PhysicalPlan::Generator(_)
396 | PhysicalPlan::Delete(_)
397 | PhysicalPlan::DeleteRingBuffer(_)
398 | PhysicalPlan::InsertTable(_)
399 | PhysicalPlan::InsertRingBuffer(_)
400 | PhysicalPlan::InsertDictionary(_)
401 | PhysicalPlan::Update(_)
402 | PhysicalPlan::UpdateRingBuffer(_)
403 | PhysicalPlan::TableScan(_)
404 | PhysicalPlan::ViewScan(_)
405 | PhysicalPlan::FlowScan(_)
406 | PhysicalPlan::TableVirtualScan(_)
407 | PhysicalPlan::RingBufferScan(_)
408 | PhysicalPlan::Variable(_)
409 | PhysicalPlan::Environment(_)
410 | PhysicalPlan::Conditional(_)
411 | PhysicalPlan::Scalarize(_)
412 | PhysicalPlan::RowPointLookup(_)
413 | PhysicalPlan::RowListLookup(_)
414 | PhysicalPlan::RowRangeScan(_) => {
415 let mut std_txn = StandardTransaction::from(rx);
416 self.query(&mut std_txn, plan, params, stack)
417 }
418 PhysicalPlan::Declare(_) | PhysicalPlan::Assign(_) => {
419 let mut std_txn = StandardTransaction::from(rx);
420 self.query(&mut std_txn, plan, params, stack)?;
421 Ok(None)
422 }
423 PhysicalPlan::AlterSequence(_)
424 | PhysicalPlan::AlterTable(_)
425 | PhysicalPlan::AlterView(_)
426 | PhysicalPlan::AlterFlow(_)
427 | PhysicalPlan::CreateDeferredView(_)
428 | PhysicalPlan::CreateTransactionalView(_)
429 | PhysicalPlan::CreateNamespace(_)
430 | PhysicalPlan::CreateTable(_)
431 | PhysicalPlan::CreateRingBuffer(_)
432 | PhysicalPlan::CreateFlow(_)
433 | PhysicalPlan::CreateDictionary(_)
434 | PhysicalPlan::Distinct(_)
435 | PhysicalPlan::Apply(_) => {
436 unimplemented!(
439 "Apply operator is only supported in deferred views and requires the flow engine. Use within a CREATE DEFERRED VIEW statement."
440 )
441 }
442 PhysicalPlan::Window(_) => {
443 unimplemented!(
446 "Window operator is only supported in deferred views and requires the flow engine. Use within a CREATE DEFERRED VIEW statement."
447 )
448 }
449 }
450 }
451
452 pub fn execute_command_plan<'a>(
453 &self,
454 txn: &'a mut StandardCommandTransaction,
455 plan: PhysicalPlan<'a>,
456 params: Params,
457 stack: &mut Stack,
458 ) -> crate::Result<Option<Columns<'a>>> {
459 match plan {
460 PhysicalPlan::AlterSequence(plan) => Ok(Some(self.alter_table_sequence(txn, plan)?)),
461 PhysicalPlan::CreateDeferredView(plan) => Ok(Some(self.create_deferred_view(txn, plan)?)),
462 PhysicalPlan::CreateTransactionalView(plan) => {
463 Ok(Some(self.create_transactional_view(txn, plan)?))
464 }
465 PhysicalPlan::CreateNamespace(plan) => Ok(Some(self.create_namespace(txn, plan)?)),
466 PhysicalPlan::CreateTable(plan) => Ok(Some(self.create_table(txn, plan)?)),
467 PhysicalPlan::CreateRingBuffer(plan) => Ok(Some(self.create_ring_buffer(txn, plan)?)),
468 PhysicalPlan::CreateFlow(plan) => Ok(Some(self.create_flow(txn, plan)?)),
469 PhysicalPlan::CreateDictionary(plan) => Ok(Some(self.create_dictionary(txn, plan)?)),
470 PhysicalPlan::Delete(plan) => Ok(Some(self.delete(txn, plan, params)?)),
471 PhysicalPlan::DeleteRingBuffer(plan) => Ok(Some(self.delete_ring_buffer(txn, plan, params)?)),
472 PhysicalPlan::InsertTable(plan) => Ok(Some(self.insert_table(txn, plan, stack)?)),
473 PhysicalPlan::InsertRingBuffer(plan) => Ok(Some(self.insert_ring_buffer(txn, plan, params)?)),
474 PhysicalPlan::InsertDictionary(plan) => Ok(Some(self.insert_dictionary(txn, plan, stack)?)),
475 PhysicalPlan::Update(plan) => Ok(Some(self.update_table(txn, plan, params)?)),
476 PhysicalPlan::UpdateRingBuffer(plan) => Ok(Some(self.update_ring_buffer(txn, plan, params)?)),
477
478 PhysicalPlan::Aggregate(_)
479 | PhysicalPlan::DictionaryScan(_)
480 | PhysicalPlan::Filter(_)
481 | PhysicalPlan::IndexScan(_)
482 | PhysicalPlan::JoinInner(_)
483 | PhysicalPlan::JoinLeft(_)
484 | PhysicalPlan::JoinNatural(_)
485 | PhysicalPlan::Take(_)
486 | PhysicalPlan::Sort(_)
487 | PhysicalPlan::Map(_)
488 | PhysicalPlan::Extend(_)
489 | PhysicalPlan::InlineData(_)
490 | PhysicalPlan::Generator(_)
491 | PhysicalPlan::TableScan(_)
492 | PhysicalPlan::ViewScan(_)
493 | PhysicalPlan::FlowScan(_)
494 | PhysicalPlan::TableVirtualScan(_)
495 | PhysicalPlan::RingBufferScan(_)
496 | PhysicalPlan::Distinct(_)
497 | PhysicalPlan::Variable(_)
498 | PhysicalPlan::Environment(_)
499 | PhysicalPlan::Apply(_)
500 | PhysicalPlan::Conditional(_)
501 | PhysicalPlan::Scalarize(_)
502 | PhysicalPlan::RowPointLookup(_)
503 | PhysicalPlan::RowListLookup(_)
504 | PhysicalPlan::RowRangeScan(_) => {
505 let mut std_txn = StandardTransaction::from(txn);
506 self.query(&mut std_txn, plan, params, stack)
507 }
508 PhysicalPlan::Declare(_) | PhysicalPlan::Assign(_) => {
509 let mut std_txn = StandardTransaction::from(txn);
510 self.query(&mut std_txn, plan, params, stack)?;
511 Ok(None)
512 }
513 PhysicalPlan::Window(_) => {
514 let mut std_txn = StandardTransaction::from(txn);
515 self.query(&mut std_txn, plan, params, stack)
516 }
517
518 PhysicalPlan::AlterTable(plan) => Ok(Some(self.alter_table(txn, plan)?)),
519 PhysicalPlan::AlterView(plan) => Ok(Some(self.execute_alter_view(txn, plan)?)),
520 PhysicalPlan::AlterFlow(plan) => Ok(Some(self.execute_alter_flow(txn, plan)?)),
521 }
522 }
523
524 fn query<'a>(
525 &self,
526 rx: &mut StandardTransaction<'a>,
527 plan: PhysicalPlan<'a>,
528 params: Params,
529 stack: &mut Stack,
530 ) -> crate::Result<Option<Columns<'a>>> {
531 let context = Arc::new(ExecutionContext {
532 executor: self.clone(),
533 source: None,
534 batch_size: 1024,
535 params: params.clone(),
536 stack: stack.clone(),
537 });
538 let mut node = compile(plan, rx, context.clone());
539
540 node.initialize(rx, &context)?;
542
543 let mut result: Option<Columns> = None;
544 let mut mutable_context = (*context).clone();
545
546 while let Some(Batch {
547 columns,
548 }) = node.next(rx, &mut mutable_context)?
549 {
550 if let Some(mut result_columns) = result.take() {
551 result_columns.append_columns(columns)?;
552 result = Some(result_columns);
553 } else {
554 result = Some(columns);
555 }
556 }
557
558 *stack = mutable_context.stack;
560
561 let headers = node.headers();
562
563 if let Some(mut columns) = result {
564 if let Some(headers) = headers {
565 columns.apply_headers(&headers);
566 }
567
568 Ok(columns.into())
569 } else {
570 let columns: Vec<Column<'a>> = node
573 .headers()
574 .unwrap_or(ColumnHeaders {
575 columns: vec![],
576 })
577 .columns
578 .into_iter()
579 .map(|name| Column {
580 name,
581 data: ColumnData::undefined(0),
582 })
583 .collect();
584
585 Ok(Some(Columns::new(columns)))
586 }
587 }
588}