1use std::sync::Arc;
5
6use query::{
7 aggregate::AggregateNode,
8 assign::AssignNode,
9 compile::compile,
10 declare::DeclareNode,
11 environment::EnvironmentNode,
12 extend::{ExtendNode, ExtendWithoutInputNode},
13 filter::FilterNode,
14 generator::GeneratorNode,
15 index_scan::IndexScanNode,
16 inline::InlineDataNode,
17 join::{InnerJoinNode, LeftJoinNode, NaturalJoinNode},
18 map::{MapNode, MapWithoutInputNode},
19 ring_buffer_scan::RingBufferScan,
20 scalarize::ScalarizeNode,
21 sort::SortNode,
22 table_scan::TableScanNode,
23 table_virtual_scan::VirtualScanNode,
24 take::TakeNode,
25 variable::VariableNode,
26 view_scan::ViewScanNode,
27};
28use reifydb_core::{
29 Frame,
30 interface::{Command, Execute, ExecuteCommand, ExecuteQuery, Params, Query, ResolvedSource},
31 value::column::{Column, ColumnData, Columns, headers::ColumnHeaders},
32};
33use reifydb_rql::{
34 ast,
35 plan::{physical::PhysicalPlan, plan},
36};
37
38use crate::{
39 StandardCommandTransaction, StandardQueryTransaction, StandardTransaction,
40 function::{Functions, generator, math},
41 stack::{Stack, Variable},
42 table_virtual::system::FlowOperatorStore,
43};
44
45mod catalog;
46mod mutate;
47mod query;
48
49pub(crate) trait QueryNode<'a> {
52 fn initialize(&mut self, rx: &mut StandardTransaction<'a>, ctx: &ExecutionContext<'a>) -> crate::Result<()>;
55
56 fn next(
59 &mut self,
60 rx: &mut StandardTransaction<'a>,
61 ctx: &mut ExecutionContext<'a>,
62 ) -> crate::Result<Option<Batch<'a>>>;
63
64 fn headers(&self) -> Option<ColumnHeaders<'a>>;
66}
67
68#[derive(Clone)]
69pub struct ExecutionContext<'a> {
70 pub executor: Executor,
71 pub source: Option<ResolvedSource<'a>>,
72 pub batch_size: u64,
73 pub params: Params,
74 pub stack: Stack,
75}
76
77#[derive(Debug)]
78pub struct Batch<'a> {
79 pub columns: Columns<'a>,
80}
81
82pub(crate) enum ExecutionPlan<'a> {
83 Aggregate(AggregateNode<'a>),
84 Filter(FilterNode<'a>),
85 IndexScan(IndexScanNode<'a>),
86 InlineData(InlineDataNode<'a>),
87 InnerJoin(InnerJoinNode<'a>),
88 LeftJoin(LeftJoinNode<'a>),
89 NaturalJoin(NaturalJoinNode<'a>),
90 Map(MapNode<'a>),
91 MapWithoutInput(MapWithoutInputNode<'a>),
92 Extend(ExtendNode<'a>),
93 ExtendWithoutInput(ExtendWithoutInputNode<'a>),
94 Sort(SortNode<'a>),
95 TableScan(TableScanNode<'a>),
96 Take(TakeNode<'a>),
97 ViewScan(ViewScanNode<'a>),
98 Variable(VariableNode<'a>),
99 Environment(EnvironmentNode),
100 VirtualScan(VirtualScanNode<'a>),
101 RingBufferScan(RingBufferScan<'a>),
102 Generator(GeneratorNode<'a>),
103 Declare(DeclareNode<'a>),
104 Assign(AssignNode<'a>),
105 Conditional(query::conditional::ConditionalNode<'a>),
106 Scalarize(ScalarizeNode<'a>),
107}
108
109impl<'a> QueryNode<'a> for Box<ExecutionPlan<'a>> {
111 fn initialize(&mut self, rx: &mut StandardTransaction<'a>, ctx: &ExecutionContext<'a>) -> crate::Result<()> {
112 (**self).initialize(rx, ctx)
113 }
114
115 fn next(
116 &mut self,
117 rx: &mut StandardTransaction<'a>,
118 ctx: &mut ExecutionContext<'a>,
119 ) -> crate::Result<Option<Batch<'a>>> {
120 (**self).next(rx, ctx)
121 }
122
123 fn headers(&self) -> Option<ColumnHeaders<'a>> {
124 (**self).headers()
125 }
126}
127
128impl<'a> QueryNode<'a> for ExecutionPlan<'a> {
129 fn initialize(&mut self, rx: &mut StandardTransaction<'a>, ctx: &ExecutionContext<'a>) -> crate::Result<()> {
130 match self {
131 ExecutionPlan::Aggregate(node) => node.initialize(rx, ctx),
132 ExecutionPlan::Filter(node) => node.initialize(rx, ctx),
133 ExecutionPlan::IndexScan(node) => node.initialize(rx, ctx),
134 ExecutionPlan::InlineData(node) => node.initialize(rx, ctx),
135 ExecutionPlan::InnerJoin(node) => node.initialize(rx, ctx),
136 ExecutionPlan::LeftJoin(node) => node.initialize(rx, ctx),
137 ExecutionPlan::NaturalJoin(node) => node.initialize(rx, ctx),
138 ExecutionPlan::Map(node) => node.initialize(rx, ctx),
139 ExecutionPlan::MapWithoutInput(node) => node.initialize(rx, ctx),
140 ExecutionPlan::Extend(node) => node.initialize(rx, ctx),
141 ExecutionPlan::ExtendWithoutInput(node) => node.initialize(rx, ctx),
142 ExecutionPlan::Sort(node) => node.initialize(rx, ctx),
143 ExecutionPlan::TableScan(node) => node.initialize(rx, ctx),
144 ExecutionPlan::Take(node) => node.initialize(rx, ctx),
145 ExecutionPlan::ViewScan(node) => node.initialize(rx, ctx),
146 ExecutionPlan::Variable(node) => node.initialize(rx, ctx),
147 ExecutionPlan::Environment(node) => node.initialize(rx, ctx),
148 ExecutionPlan::VirtualScan(node) => node.initialize(rx, ctx),
149 ExecutionPlan::RingBufferScan(node) => node.initialize(rx, ctx),
150 ExecutionPlan::Generator(node) => node.initialize(rx, ctx),
151 ExecutionPlan::Declare(node) => node.initialize(rx, ctx),
152 ExecutionPlan::Assign(node) => node.initialize(rx, ctx),
153 ExecutionPlan::Conditional(node) => node.initialize(rx, ctx),
154 ExecutionPlan::Scalarize(node) => node.initialize(rx, ctx),
155 }
156 }
157
158 fn next(
159 &mut self,
160 rx: &mut StandardTransaction<'a>,
161 ctx: &mut ExecutionContext<'a>,
162 ) -> crate::Result<Option<Batch<'a>>> {
163 match self {
164 ExecutionPlan::Aggregate(node) => node.next(rx, ctx),
165 ExecutionPlan::Filter(node) => node.next(rx, ctx),
166 ExecutionPlan::IndexScan(node) => node.next(rx, ctx),
167 ExecutionPlan::InlineData(node) => node.next(rx, ctx),
168 ExecutionPlan::InnerJoin(node) => node.next(rx, ctx),
169 ExecutionPlan::LeftJoin(node) => node.next(rx, ctx),
170 ExecutionPlan::NaturalJoin(node) => node.next(rx, ctx),
171 ExecutionPlan::Map(node) => node.next(rx, ctx),
172 ExecutionPlan::MapWithoutInput(node) => node.next(rx, ctx),
173 ExecutionPlan::Extend(node) => node.next(rx, ctx),
174 ExecutionPlan::ExtendWithoutInput(node) => node.next(rx, ctx),
175 ExecutionPlan::Sort(node) => node.next(rx, ctx),
176 ExecutionPlan::TableScan(node) => node.next(rx, ctx),
177 ExecutionPlan::Take(node) => node.next(rx, ctx),
178 ExecutionPlan::ViewScan(node) => node.next(rx, ctx),
179 ExecutionPlan::Variable(node) => node.next(rx, ctx),
180 ExecutionPlan::Environment(node) => node.next(rx, ctx),
181 ExecutionPlan::VirtualScan(node) => node.next(rx, ctx),
182 ExecutionPlan::RingBufferScan(node) => node.next(rx, ctx),
183 ExecutionPlan::Generator(node) => node.next(rx, ctx),
184 ExecutionPlan::Declare(node) => node.next(rx, ctx),
185 ExecutionPlan::Assign(node) => node.next(rx, ctx),
186 ExecutionPlan::Conditional(node) => node.next(rx, ctx),
187 ExecutionPlan::Scalarize(node) => node.next(rx, ctx),
188 }
189 }
190
191 fn headers(&self) -> Option<ColumnHeaders<'a>> {
192 match self {
193 ExecutionPlan::Aggregate(node) => node.headers(),
194 ExecutionPlan::Filter(node) => node.headers(),
195 ExecutionPlan::IndexScan(node) => node.headers(),
196 ExecutionPlan::InlineData(node) => node.headers(),
197 ExecutionPlan::InnerJoin(node) => node.headers(),
198 ExecutionPlan::LeftJoin(node) => node.headers(),
199 ExecutionPlan::NaturalJoin(node) => node.headers(),
200 ExecutionPlan::Map(node) => node.headers(),
201 ExecutionPlan::MapWithoutInput(node) => node.headers(),
202 ExecutionPlan::Extend(node) => node.headers(),
203 ExecutionPlan::ExtendWithoutInput(node) => node.headers(),
204 ExecutionPlan::Sort(node) => node.headers(),
205 ExecutionPlan::TableScan(node) => node.headers(),
206 ExecutionPlan::Take(node) => node.headers(),
207 ExecutionPlan::ViewScan(node) => node.headers(),
208 ExecutionPlan::Variable(node) => node.headers(),
209 ExecutionPlan::Environment(node) => node.headers(),
210 ExecutionPlan::VirtualScan(node) => node.headers(),
211 ExecutionPlan::RingBufferScan(node) => node.headers(),
212 ExecutionPlan::Generator(node) => node.headers(),
213 ExecutionPlan::Declare(node) => node.headers(),
214 ExecutionPlan::Assign(node) => node.headers(),
215 ExecutionPlan::Conditional(node) => node.headers(),
216 ExecutionPlan::Scalarize(node) => node.headers(),
217 }
218 }
219}
220
221pub struct Executor(Arc<ExecutorInner>);
222
223pub struct ExecutorInner {
224 pub functions: Functions,
225 pub flow_operator_store: FlowOperatorStore,
226}
227
228impl Clone for Executor {
229 fn clone(&self) -> Self {
230 Self(self.0.clone())
231 }
232}
233
234impl std::ops::Deref for Executor {
235 type Target = ExecutorInner;
236
237 fn deref(&self) -> &Self::Target {
238 &self.0
239 }
240}
241
242impl Executor {
243 pub fn new(functions: Functions, flow_operator_store: FlowOperatorStore) -> Self {
244 Self(Arc::new(ExecutorInner {
245 functions,
246 flow_operator_store,
247 }))
248 }
249
250 #[allow(dead_code)]
251 pub fn testing() -> Self {
252 Self::new(
253 Functions::builder()
254 .register_aggregate("math::sum", math::aggregate::Sum::new)
255 .register_aggregate("math::min", math::aggregate::Min::new)
256 .register_aggregate("math::max", math::aggregate::Max::new)
257 .register_aggregate("math::avg", math::aggregate::Avg::new)
258 .register_aggregate("math::count", math::aggregate::Count::new)
259 .register_scalar("math::abs", math::scalar::Abs::new)
260 .register_scalar("math::avg", math::scalar::Avg::new)
261 .register_generator("generate_series", generator::GenerateSeries::new)
262 .build(),
263 FlowOperatorStore::new(),
264 )
265 }
266}
267
268impl ExecuteCommand<StandardCommandTransaction> for Executor {
269 fn execute_command(&self, txn: &mut StandardCommandTransaction, cmd: Command<'_>) -> crate::Result<Vec<Frame>> {
270 let mut result = vec![];
271 let statements = ast::parse_str(cmd.rql)?;
272
273 let mut persistent_stack = Stack::new();
275
276 match &cmd.params {
278 reifydb_core::interface::Params::Positional(values) => {
279 for (index, value) in values.iter().enumerate() {
281 let param_name = (index + 1).to_string(); persistent_stack.set(param_name, Variable::Scalar(value.clone()), false)?;
283 }
284 }
285 reifydb_core::interface::Params::Named(map) => {
286 for (name, value) in map {
288 persistent_stack.set(name.clone(), Variable::Scalar(value.clone()), false)?;
289 }
290 }
291 reifydb_core::interface::Params::None => {
292 }
294 }
295
296 for statement in statements {
297 if let Some(plan) = plan(txn, statement)? {
298 if let Some(er) =
299 self.execute_command_plan(txn, plan, cmd.params.clone(), &mut persistent_stack)?
300 {
301 result.push(Frame::from(er));
302 }
303 }
304 }
305
306 Ok(result)
307 }
308}
309
310impl ExecuteQuery<StandardQueryTransaction> for Executor {
311 fn execute_query(&self, txn: &mut StandardQueryTransaction, qry: Query<'_>) -> crate::Result<Vec<Frame>> {
312 let mut result = vec![];
313 let statements = ast::parse_str(qry.rql)?;
314
315 let mut persistent_stack = Stack::new();
317
318 match &qry.params {
320 reifydb_core::interface::Params::Positional(values) => {
321 for (index, value) in values.iter().enumerate() {
323 let param_name = (index + 1).to_string(); persistent_stack.set(param_name, Variable::Scalar(value.clone()), false)?;
325 }
326 }
327 reifydb_core::interface::Params::Named(map) => {
328 for (name, value) in map {
330 persistent_stack.set(name.clone(), Variable::Scalar(value.clone()), false)?;
331 }
332 }
333 reifydb_core::interface::Params::None => {
334 }
336 }
337
338 for statement in statements {
339 if let Some(plan) = plan(txn, statement)? {
340 if let Some(er) =
341 self.execute_query_plan(txn, plan, qry.params.clone(), &mut persistent_stack)?
342 {
343 result.push(Frame::from(er));
344 }
345 }
346 }
347
348 Ok(result)
349 }
350}
351
352impl Execute<StandardCommandTransaction, StandardQueryTransaction> for Executor {}
353
354impl Executor {
355 pub(crate) fn execute_query_plan<'a>(
356 &self,
357 rx: &'a mut StandardQueryTransaction,
358 plan: PhysicalPlan<'a>,
359 params: Params,
360 stack: &mut Stack,
361 ) -> crate::Result<Option<Columns<'a>>> {
362 match plan {
363 PhysicalPlan::Aggregate(_)
365 | PhysicalPlan::Filter(_)
366 | PhysicalPlan::IndexScan(_)
367 | PhysicalPlan::JoinInner(_)
368 | PhysicalPlan::JoinLeft(_)
369 | PhysicalPlan::JoinNatural(_)
370 | PhysicalPlan::Take(_)
371 | PhysicalPlan::Sort(_)
372 | PhysicalPlan::Map(_)
373 | PhysicalPlan::Extend(_)
374 | PhysicalPlan::InlineData(_)
375 | PhysicalPlan::Generator(_)
376 | PhysicalPlan::Delete(_)
377 | PhysicalPlan::DeleteRingBuffer(_)
378 | PhysicalPlan::InsertTable(_)
379 | PhysicalPlan::InsertRingBuffer(_)
380 | PhysicalPlan::Update(_)
381 | PhysicalPlan::UpdateRingBuffer(_)
382 | PhysicalPlan::TableScan(_)
383 | PhysicalPlan::ViewScan(_)
384 | PhysicalPlan::FlowScan(_)
385 | PhysicalPlan::TableVirtualScan(_)
386 | PhysicalPlan::RingBufferScan(_)
387 | PhysicalPlan::Variable(_)
388 | PhysicalPlan::Environment(_)
389 | PhysicalPlan::Conditional(_)
390 | PhysicalPlan::Scalarize(_) => {
391 let mut std_txn = StandardTransaction::from(rx);
392 self.query(&mut std_txn, plan, params, stack)
393 }
394 PhysicalPlan::Declare(_) | PhysicalPlan::Assign(_) => {
395 let mut std_txn = StandardTransaction::from(rx);
396 self.query(&mut std_txn, plan, params, stack)?;
397 Ok(None)
398 }
399 PhysicalPlan::AlterSequence(_)
400 | PhysicalPlan::AlterTable(_)
401 | PhysicalPlan::AlterView(_)
402 | PhysicalPlan::AlterFlow(_)
403 | PhysicalPlan::CreateDeferredView(_)
404 | PhysicalPlan::CreateTransactionalView(_)
405 | PhysicalPlan::CreateNamespace(_)
406 | PhysicalPlan::CreateTable(_)
407 | PhysicalPlan::CreateRingBuffer(_)
408 | PhysicalPlan::CreateFlow(_)
409 | PhysicalPlan::Distinct(_)
410 | PhysicalPlan::Apply(_) => {
411 unimplemented!(
414 "Apply operator is only supported in deferred views and requires the flow engine. Use within a CREATE DEFERRED VIEW statement."
415 )
416 }
417 PhysicalPlan::Window(_) => {
418 unimplemented!(
421 "Window operator is only supported in deferred views and requires the flow engine. Use within a CREATE DEFERRED VIEW statement."
422 )
423 }
424 }
425 }
426
427 pub fn execute_command_plan<'a>(
428 &self,
429 txn: &'a mut StandardCommandTransaction,
430 plan: PhysicalPlan<'a>,
431 params: Params,
432 stack: &mut Stack,
433 ) -> crate::Result<Option<Columns<'a>>> {
434 match plan {
435 PhysicalPlan::AlterSequence(plan) => Ok(Some(self.alter_table_sequence(txn, plan)?)),
436 PhysicalPlan::CreateDeferredView(plan) => Ok(Some(self.create_deferred_view(txn, plan)?)),
437 PhysicalPlan::CreateTransactionalView(plan) => {
438 Ok(Some(self.create_transactional_view(txn, plan)?))
439 }
440 PhysicalPlan::CreateNamespace(plan) => Ok(Some(self.create_namespace(txn, plan)?)),
441 PhysicalPlan::CreateTable(plan) => Ok(Some(self.create_table(txn, plan)?)),
442 PhysicalPlan::CreateRingBuffer(plan) => Ok(Some(self.create_ring_buffer(txn, plan)?)),
443 PhysicalPlan::CreateFlow(_plan) => {
444 unimplemented!("CREATE FLOW not yet implemented")
446 }
447 PhysicalPlan::Delete(plan) => Ok(Some(self.delete(txn, plan, params)?)),
448 PhysicalPlan::DeleteRingBuffer(plan) => Ok(Some(self.delete_ring_buffer(txn, plan, params)?)),
449 PhysicalPlan::InsertTable(plan) => Ok(Some(self.insert_table(txn, plan, stack)?)),
450 PhysicalPlan::InsertRingBuffer(plan) => Ok(Some(self.insert_ring_buffer(txn, plan, params)?)),
451 PhysicalPlan::Update(plan) => Ok(Some(self.update_table(txn, plan, params)?)),
452 PhysicalPlan::UpdateRingBuffer(plan) => Ok(Some(self.update_ring_buffer(txn, plan, params)?)),
453
454 PhysicalPlan::Aggregate(_)
455 | PhysicalPlan::Filter(_)
456 | PhysicalPlan::IndexScan(_)
457 | PhysicalPlan::JoinInner(_)
458 | PhysicalPlan::JoinLeft(_)
459 | PhysicalPlan::JoinNatural(_)
460 | PhysicalPlan::Take(_)
461 | PhysicalPlan::Sort(_)
462 | PhysicalPlan::Map(_)
463 | PhysicalPlan::Extend(_)
464 | PhysicalPlan::InlineData(_)
465 | PhysicalPlan::Generator(_)
466 | PhysicalPlan::TableScan(_)
467 | PhysicalPlan::ViewScan(_)
468 | PhysicalPlan::FlowScan(_)
469 | PhysicalPlan::TableVirtualScan(_)
470 | PhysicalPlan::RingBufferScan(_)
471 | PhysicalPlan::Distinct(_)
472 | PhysicalPlan::Variable(_)
473 | PhysicalPlan::Environment(_)
474 | PhysicalPlan::Apply(_)
475 | PhysicalPlan::Conditional(_)
476 | PhysicalPlan::Scalarize(_) => {
477 let mut std_txn = StandardTransaction::from(txn);
478 self.query(&mut std_txn, plan, params, stack)
479 }
480 PhysicalPlan::Declare(_) | PhysicalPlan::Assign(_) => {
481 let mut std_txn = StandardTransaction::from(txn);
482 self.query(&mut std_txn, plan, params, stack)?;
483 Ok(None)
484 }
485 PhysicalPlan::Window(_) => {
486 let mut std_txn = StandardTransaction::from(txn);
487 self.query(&mut std_txn, plan, params, stack)
488 }
489
490 PhysicalPlan::AlterTable(plan) => Ok(Some(self.alter_table(txn, plan)?)),
491 PhysicalPlan::AlterView(plan) => Ok(Some(self.execute_alter_view(txn, plan)?)),
492 PhysicalPlan::AlterFlow(_plan) => {
493 unimplemented!("ALTER FLOW not yet implemented")
495 }
496 }
497 }
498
499 fn query<'a>(
500 &self,
501 rx: &mut StandardTransaction<'a>,
502 plan: PhysicalPlan<'a>,
503 params: Params,
504 stack: &mut Stack,
505 ) -> crate::Result<Option<Columns<'a>>> {
506 let context = Arc::new(ExecutionContext {
507 executor: self.clone(),
508 source: None,
509 batch_size: 1024,
510 params: params.clone(),
511 stack: stack.clone(),
512 });
513 let mut node = compile(plan, rx, context.clone());
514
515 node.initialize(rx, &context)?;
517
518 let mut result: Option<Columns> = None;
519 let mut mutable_context = (*context).clone();
520
521 while let Some(Batch {
522 columns,
523 }) = node.next(rx, &mut mutable_context)?
524 {
525 if let Some(mut result_columns) = result.take() {
526 result_columns.append_columns(columns)?;
527 result = Some(result_columns);
528 } else {
529 result = Some(columns);
530 }
531 }
532
533 *stack = mutable_context.stack;
535
536 let headers = node.headers();
537
538 if let Some(mut columns) = result {
539 if let Some(headers) = headers {
540 columns.apply_headers(&headers);
541 }
542
543 Ok(columns.into())
544 } else {
545 let columns: Vec<Column<'a>> = node
548 .headers()
549 .unwrap_or(ColumnHeaders {
550 columns: vec![],
551 })
552 .columns
553 .into_iter()
554 .map(|name| Column {
555 name,
556 data: ColumnData::undefined(0),
557 })
558 .collect();
559
560 Ok(Some(Columns::new(columns)))
561 }
562 }
563}