1#![deny(rust_2018_idioms)]
2#![deny(clippy::all)]
3
4mod util;
17
18pub mod graph;
19
20use ordered_float::OrderedFloat;
21use partiql_common::catalog::ObjectId;
22use partiql_value::BindingsName;
23use rust_decimal::Decimal as RustDecimal;
63use rustc_hash::FxHashMap;
64#[cfg(feature = "serde")]
65use serde::{Deserialize, Serialize};
66use std::fmt::{Debug, Display, Formatter};
67
68#[derive(Debug, Clone, Eq, PartialEq, Default)]
70#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
71pub struct LogicalPlan<T>
72where
73 T: Default + Debug,
74{
75 nodes: Vec<T>,
76 edges: Vec<(OpId, OpId, u8)>,
78}
79
80impl<T> LogicalPlan<T>
81where
82 T: Default + Debug,
83{
84 #[must_use]
86 pub fn new() -> Self {
87 Self::default()
88 }
89
90 pub fn add_operator(&mut self, op: T) -> OpId {
92 self.nodes.push(op);
93 OpId(self.operator_count())
94 }
95
96 #[inline]
98 pub fn add_flow(&mut self, src: OpId, dst: OpId) {
99 assert!(src.index() <= self.operator_count());
100 assert!(dst.index() <= self.operator_count());
101
102 self.edges.push((src, dst, 0));
103 }
104
105 #[inline]
107 pub fn add_flow_with_branch_num(&mut self, src: OpId, dst: OpId, branch_num: u8) {
108 assert!(src.index() <= self.operator_count());
109 assert!(dst.index() <= self.operator_count());
110
111 self.edges.push((src, dst, branch_num));
112 }
113
114 #[inline]
136 pub fn extend_with_flows(&mut self, flows: &[(OpId, OpId)]) {
137 flows.iter().for_each(|&(s, d)| self.add_flow(s, d));
138 }
139
140 #[inline]
141 pub fn merge_plan(&mut self, other: Self) {
142 let LogicalPlan { nodes, edges } = other;
143 let mut mapping = FxHashMap::default();
144 for (old_id, op) in nodes.into_iter().enumerate().map(|(i, n)| (OpId(i + 1), n)) {
145 let new_id = self.add_operator(op);
146 mapping.insert(old_id, new_id);
147 }
148
149 for (old1, old2, branch) in edges {
150 let new1 = mapping[&old1];
151 let new2 = mapping[&old2];
152 self.edges.push((new1, new2, branch));
153 }
154 }
155
156 #[inline]
158 #[must_use]
159 pub fn operator_count(&self) -> usize {
160 self.nodes.len()
161 }
162
163 #[must_use]
165 pub fn operators(&self) -> &Vec<T> {
166 &self.nodes
167 }
168
169 pub fn operators_by_id(&self) -> impl Iterator<Item = (OpId, &T)> {
171 self.nodes.iter().enumerate().map(|(i, n)| (OpId(i + 1), n))
172 }
173
174 #[must_use]
176 pub fn flows(&self) -> &Vec<(OpId, OpId, u8)> {
177 &self.edges
178 }
179
180 #[must_use]
181 pub fn operator(&self, id: OpId) -> Option<&T> {
182 self.nodes.get(id.0 - 1)
183 }
184
185 pub fn operator_as_mut(&mut self, id: OpId) -> Option<&mut T> {
186 self.nodes.get_mut(id.0 - 1)
187 }
188
189 }
191
/// Identifier of an operator inside a [`LogicalPlan`].
///
/// The wrapped number is private; ids are handed out by [`LogicalPlan::add_operator`],
/// which numbers operators starting at 1.
#[derive(Debug, Clone, Eq, PartialEq, Copy, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct OpId(usize);

impl OpId {
    /// Returns the raw number wrapped by this id.
    #[must_use]
    pub fn index(&self) -> usize {
        let Self(raw) = *self;
        raw
    }
}
204
205impl<T> Display for LogicalPlan<T>
206where
207 T: Default + Debug,
208{
209 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
210 let flows = self.flows();
211 writeln!(f, "LogicalPlan")?;
212 writeln!(f, "---")?;
213 for (s, d, _w) in flows {
214 let src_node = self.operator(*s).expect("Unable to get the src operator");
215 let dst_node = self.operator(*d).expect("Unable to get the dst operator");
216 writeln!(f, ">>> [{src_node:?}] -> [{dst_node:?}]")?;
217 }
218 writeln!(f)
219 }
220}
221
/// The operator kinds that can be stored as nodes of a [`LogicalPlan`].
///
/// Most variants wrap a payload type declared in this file that carries the operator's
/// arguments; `Distinct` and `Sink` carry no data. `Sink` is the `Default` variant.
#[derive(Debug, Clone, Default, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum BindingsOp {
    /// See [`Scan`].
    Scan(Scan),
    /// See [`Pivot`].
    Pivot(Pivot),
    /// See [`Unpivot`].
    Unpivot(Unpivot),
    /// See [`Filter`].
    Filter(Filter),
    /// See [`OrderBy`].
    OrderBy(OrderBy),
    /// See [`LimitOffset`].
    LimitOffset(LimitOffset),
    /// See [`Join`].
    Join(Join),
    /// See [`BagOp`].
    BagOp(BagOp),
    /// See [`Project`].
    Project(Project),
    /// See [`ProjectAllMode`].
    ProjectAll(ProjectAllMode),
    /// See [`ProjectValue`].
    ProjectValue(ProjectValue),
    /// See [`ExprQuery`].
    ExprQuery(ExprQuery),
    /// Operator without arguments.
    Distinct,
    /// See [`GroupBy`].
    GroupBy(GroupBy),
    /// See [`Having`].
    Having(Having),
    /// Operator without arguments; the default value of this enum.
    #[default]
    Sink,
}
245
/// Argument of [`BindingsOp::ProjectAll`]; `Unwrap` is the default.
///
/// NOTE(review): the exact difference between the two modes is defined by the evaluator,
/// not by this file — confirm before relying on it.
#[derive(Debug, Clone, Default, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum ProjectAllMode {
    /// Default mode.
    #[default]
    Unwrap,
    /// Alternative mode.
    PassThrough,
}
257
/// Arguments of [`BindingsOp::Scan`]: an expression plus the names it is bound under.
///
/// NOTE(review): `as_key` / `at_key` presumably mirror the `AS` / `AT` aliases of a
/// `FROM` item — confirm against the plan builder.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct Scan {
    /// Expression being scanned.
    pub expr: ValueExpr,
    /// Mandatory binding name.
    pub as_key: String,
    /// Optional second binding name.
    pub at_key: Option<String>,
}
266
/// Arguments of [`BindingsOp::Pivot`]: a key expression and a value expression.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct Pivot {
    /// Expression producing the key.
    pub key: ValueExpr,
    /// Expression producing the value.
    pub value: ValueExpr,
}
277
/// Arguments of [`BindingsOp::Unpivot`]; same field layout as [`Scan`].
///
/// NOTE(review): `as_key` / `at_key` presumably mirror the `AS` / `AT` aliases of an
/// `UNPIVOT` item — confirm against the plan builder.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct Unpivot {
    /// Expression being unpivoted.
    pub expr: ValueExpr,
    /// Mandatory binding name.
    pub as_key: String,
    /// Optional second binding name.
    pub at_key: Option<String>,
}
286
/// Argument of [`BindingsOp::Filter`]: a single expression (presumably the predicate).
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct Filter {
    /// The filter expression.
    pub expr: ValueExpr,
}
293
/// Argument of [`BindingsOp::Having`]: a single expression (presumably the predicate).
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct Having {
    /// The having expression.
    pub expr: ValueExpr,
}
300
/// Argument of [`BindingsOp::OrderBy`]: the list of sort specifications.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct OrderBy {
    /// Sort specifications, in the order they were given.
    pub specs: Vec<SortSpec>,
}
307
/// Sort direction of a [`SortSpec`].
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum SortSpecOrder {
    /// Ascending.
    Asc,
    /// Descending.
    Desc,
}
314
/// Null placement of a [`SortSpec`].
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum SortSpecNullOrder {
    /// Nulls first.
    First,
    /// Nulls last.
    Last,
}
321
/// One entry of an [`OrderBy`]: what to sort by, in which direction, and where nulls go.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct SortSpec {
    /// Expression whose value is the sort key.
    pub expr: ValueExpr,
    /// Sort direction.
    pub order: SortSpecOrder,
    /// Null placement.
    pub null_order: SortSpecNullOrder,
}
330
/// Arguments of [`BindingsOp::LimitOffset`]; both parts are optional expressions.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct LimitOffset {
    /// Limit expression, if any.
    pub limit: Option<ValueExpr>,
    /// Offset expression, if any.
    pub offset: Option<ValueExpr>,
}
338
/// Arguments of [`BindingsOp::BagOp`]: which bag operator to apply and its set quantifier.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct BagOp {
    /// The operator.
    pub bag_op: BagOperator,
    /// `All` or `Distinct`.
    pub setq: SetQuantifier,
}
346
/// The operators selectable in a [`BagOp`]: union, except and intersect, each in a plain
/// and an `Outer` form.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum BagOperator {
    Union,
    Except,
    Intersect,
    OuterUnion,
    OuterExcept,
    OuterIntersect,
}
358
/// Arguments of [`BindingsOp::Join`].
///
/// Both inputs are stored inline as boxed operators rather than as flows of the plan.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct Join {
    /// Kind of join.
    pub kind: JoinKind,
    /// Left input operator.
    pub left: Box<BindingsOp>,
    /// Right input operator.
    pub right: Box<BindingsOp>,
    /// Optional join condition.
    pub on: Option<ValueExpr>,
}
369
/// The kinds of join selectable in a [`Join`].
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum JoinKind {
    Inner,
    Left,
    Right,
    Full,
    Cross,
}
380
/// One aggregate computed by a [`GroupBy`].
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct AggregateExpression {
    /// Name associated with the aggregate (presumably the name its result is bound to).
    pub name: String,
    /// Expression fed to the aggregate function.
    pub expr: ValueExpr,
    /// The aggregate function.
    pub func: AggFunc,
    /// `All` or `Distinct`.
    pub setq: SetQuantifier,
}
390
/// The aggregate functions selectable in an [`AggregateExpression`].
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum AggFunc {
    /// Average.
    AggAvg,
    /// Count.
    AggCount,
    /// Maximum.
    AggMax,
    /// Minimum.
    AggMin,
    /// Sum.
    AggSum,
    /// Any.
    AggAny,
    /// Every.
    AggEvery,
}
411
/// Arguments of [`BindingsOp::GroupBy`].
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct GroupBy {
    /// Grouping strategy.
    pub strategy: GroupingStrategy,
    /// Grouping expressions keyed by name; being a hash map, they carry no order.
    pub exprs: FxHashMap<String, ValueExpr>,
    /// Aggregates to compute.
    pub aggregate_exprs: Vec<AggregateExpression>,
    /// Optional alias (presumably from `GROUP AS` — confirm).
    pub group_as_alias: Option<String>,
}
421
/// Grouping strategy of a [`GroupBy`].
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum GroupingStrategy {
    GroupFull,
    GroupPartial,
}
429
/// Arguments of [`BindingsOp::Project`]: an ordered list of `(name, expression)` pairs.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct Project {
    /// Projected expressions with their names, in order.
    pub exprs: Vec<(String, ValueExpr)>,
}
436
/// Argument of [`BindingsOp::ProjectValue`]: a single expression.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct ProjectValue {
    /// The projected expression.
    pub expr: ValueExpr,
}
444
/// Argument of [`BindingsOp::ExprQuery`]: a single expression.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct ExprQuery {
    /// The query expression.
    pub expr: ValueExpr,
}
451
/// An expression tree.
///
/// Recursive positions are boxed. Variants with more elaborate arguments wrap a
/// dedicated type declared in this file.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum ValueExpr {
    /// Unary operator applied to an operand.
    UnExpr(UnaryOp, Box<ValueExpr>),
    /// Binary operator applied to two operands.
    BinaryExpr(BinaryOp, Box<ValueExpr>, Box<ValueExpr>),
    /// Literal value.
    Lit(Box<Lit>),
    /// A list of expressions (presumably lookup candidates tried in order — confirm).
    DynamicLookup(Box<Vec<ValueExpr>>),
    /// A root expression followed by path components.
    Path(Box<ValueExpr>, Vec<PathComponent>),
    /// Reference to a named binding, tagged with a [`VarRefType`].
    VarRef(BindingsName<'static>, VarRefType),
    /// See [`TupleExpr`].
    TupleExpr(TupleExpr),
    /// See [`ListExpr`].
    ListExpr(ListExpr),
    /// See [`BagExpr`].
    BagExpr(BagExpr),
    /// See [`BetweenExpr`].
    BetweenExpr(BetweenExpr),
    /// See [`PatternMatchExpr`].
    PatternMatchExpr(PatternMatchExpr),
    /// See [`SubQueryExpr`].
    SubQueryExpr(SubQueryExpr),
    /// See [`SimpleCase`].
    SimpleCase(SimpleCase),
    /// See [`SearchedCase`].
    SearchedCase(SearchedCase),
    /// See [`IsTypeExpr`].
    IsTypeExpr(IsTypeExpr),
    /// See [`NullIfExpr`].
    NullIfExpr(NullIfExpr),
    /// See [`CoalesceExpr`].
    CoalesceExpr(CoalesceExpr),
    /// See [`CallExpr`].
    Call(CallExpr),
    /// See [`GraphMatchExpr`].
    GraphMatch(Box<GraphMatchExpr>),
}
477
/// A literal value, used by [`ValueExpr::Lit`].
///
/// Collection literals (`Struct`, `Bag`, `List`) nest further `Lit`s.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum Lit {
    Null,
    Missing,
    Int8(i8),
    Int16(i16),
    Int32(i32),
    Int64(i64),
    Decimal(RustDecimal),
    /// Stored as `OrderedFloat` so that the enum can implement `Eq`.
    Double(OrderedFloat<f64>),
    Bool(bool),
    String(String),
    /// Raw bytes plus a string (presumably the payload and its type name — confirm).
    Variant(Vec<u8>, String),
    /// Ordered `(field name, value)` pairs.
    Struct(Vec<(String, Lit)>),
    Bag(Vec<Lit>),
    List(Vec<Lit>),
}
/// The unary operators usable in [`ValueExpr::UnExpr`].
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum UnaryOp {
    Pos,
    Neg,
    Not,
}
506
/// The binary operators usable in [`ValueExpr::BinaryExpr`]: logical, concatenation and
/// comparison operators, then arithmetic operators, then `In`.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum BinaryOp {
    And,
    Or,
    Concat,
    Eq,
    Neq,
    Gt,
    Gteq,
    Lt,
    Lteq,

    // Arithmetic operators.
    Add,
    Sub,
    Mul,
    Div,
    Mod,
    Exp,

    In,
}
532
/// One step of a [`ValueExpr::Path`].
///
/// Keys and indexes are either given directly or computed by an expression.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum PathComponent {
    /// A fixed key.
    Key(BindingsName<'static>),
    /// A fixed index.
    Index(i64),
    /// A key computed by an expression.
    KeyExpr(Box<ValueExpr>),
    /// An index computed by an expression.
    IndexExpr(Box<ValueExpr>),
}
544
545#[derive(Clone, Debug, Default, Eq, PartialEq)]
547#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
548pub struct TupleExpr {
549 pub attrs: Vec<ValueExpr>,
550 pub values: Vec<ValueExpr>,
551}
552
553impl TupleExpr {
554 #[must_use]
556 pub fn new() -> Self {
557 Self::default()
558 }
559}
560
561#[derive(Clone, Debug, Default, Eq, PartialEq)]
563#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
564pub struct ListExpr {
565 pub elements: Vec<ValueExpr>,
566}
567
568impl ListExpr {
569 #[must_use]
571 pub fn new() -> Self {
572 Self::default()
573 }
574}
575
576#[derive(Clone, Debug, Default, Eq, PartialEq)]
578#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
579pub struct BagExpr {
580 pub elements: Vec<ValueExpr>,
581}
582
583impl BagExpr {
584 #[must_use]
586 pub fn new() -> Self {
587 Self::default()
588 }
589}
590
/// Arguments of [`ValueExpr::BetweenExpr`]: a value and the two bounds it is tested against.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct BetweenExpr {
    /// Expression being tested.
    pub value: Box<ValueExpr>,
    /// Lower-bound expression.
    pub from: Box<ValueExpr>,
    /// Upper-bound expression.
    pub to: Box<ValueExpr>,
}
599
/// Arguments of [`ValueExpr::PatternMatchExpr`]: a value and the pattern it is matched with.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct PatternMatchExpr {
    /// Expression being matched.
    pub value: Box<ValueExpr>,
    /// The pattern.
    pub pattern: Pattern,
}
607
/// The pattern forms usable in a [`PatternMatchExpr`].
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum Pattern {
    /// `LIKE` whose pattern and escape are known strings; see [`LikeMatch`].
    Like(LikeMatch),
    /// `LIKE` whose pattern and escape are expressions; see
    /// [`LikeNonStringNonLiteralMatch`].
    LikeNonStringNonLiteral(LikeNonStringNonLiteralMatch),
}
614
/// A `LIKE` pattern whose pattern and escape are both plain strings.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct LikeMatch {
    /// The pattern text.
    pub pattern: String,
    /// The escape text.
    pub escape: String,
}
623
/// A `LIKE` pattern whose pattern and escape are given as expressions rather than strings.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct LikeNonStringNonLiteralMatch {
    /// Expression producing the pattern.
    pub pattern: Box<ValueExpr>,
    /// Expression producing the escape.
    pub escape: Box<ValueExpr>,
}
632
/// Arguments of [`ValueExpr::GraphMatch`]: a value and the graph path pattern applied to it.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct GraphMatchExpr {
    /// Expression being matched (presumably evaluates to a graph — confirm).
    pub value: Box<ValueExpr>,
    /// The pattern, defined in the [`graph`] module.
    pub pattern: graph::PathPatternMatch,
}
/// A struct with no fields.
///
/// NOTE(review): nothing in this file uses this type; it looks like a placeholder —
/// confirm whether it is still needed.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct GraphPathPattern {}
643
/// Argument of [`ValueExpr::SubQueryExpr`]: a complete nested plan.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct SubQueryExpr {
    /// The plan of the sub-query.
    pub plan: LogicalPlan<BindingsOp>,
}
651
/// Arguments of [`ValueExpr::SimpleCase`]: a subject expression, a list of case pairs
/// and an optional default.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct SimpleCase {
    /// Subject expression.
    pub expr: Box<ValueExpr>,
    /// Case pairs (presumably `(WHEN value, THEN result)` — confirm).
    pub cases: Vec<(Box<ValueExpr>, Box<ValueExpr>)>,
    /// Optional default expression.
    pub default: Option<Box<ValueExpr>>,
}
661
/// Arguments of [`ValueExpr::SearchedCase`]: a list of case pairs and an optional default.
///
/// Unlike [`SimpleCase`] there is no subject expression.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct SearchedCase {
    /// Case pairs (presumably `(WHEN condition, THEN result)` — confirm).
    pub cases: Vec<(Box<ValueExpr>, Box<ValueExpr>)>,
    /// Optional default expression.
    pub default: Option<Box<ValueExpr>>,
}
670
/// Arguments of [`ValueExpr::IsTypeExpr`]: an expression, the type it is tested against
/// and a negation flag.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct IsTypeExpr {
    /// Negation flag (presumably `true` for `IS NOT` — confirm).
    pub not: bool,
    /// Expression being tested.
    pub expr: Box<ValueExpr>,
    /// Type tested against.
    pub is_type: Type,
}
679
/// The type names usable in an [`IsTypeExpr`].
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum Type {
    NullType,
    BooleanType,
    Integer2Type,
    Integer4Type,
    Integer8Type,
    DecimalType,
    NumericType,
    RealType,
    DoublePrecisionType,
    TimestampType,
    CharacterType,
    CharacterVaryingType,
    MissingType,
    StringType,
    SymbolType,
    BlobType,
    ClobType,
    DateType,
    TimeType,
    ZonedTimestampType,
    StructType,
    TupleType,
    ListType,
    SexpType,
    BagType,
    AnyType,
}
712
/// Arguments of [`ValueExpr::NullIfExpr`]: the two operand expressions.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct NullIfExpr {
    /// Left operand.
    pub lhs: Box<ValueExpr>,
    /// Right operand.
    pub rhs: Box<ValueExpr>,
}
720
/// Arguments of [`ValueExpr::CoalesceExpr`]: the operand expressions in order.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct CoalesceExpr {
    /// Operand expressions.
    pub elements: Vec<ValueExpr>,
}
728
/// Arguments of [`ValueExpr::Call`]: which function to call and with which arguments.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct CallExpr {
    /// The function being called.
    pub name: CallName,
    /// Argument expressions, in order.
    pub arguments: Vec<ValueExpr>,
}
736
/// The functions a [`CallExpr`] can refer to.
///
/// Most variants name one fixed function. The `Coll*` variants additionally carry a
/// [`SetQuantifier`], and the last two variants identify a function by name or by id.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum CallName {
    Lower,
    Upper,
    CharLength,
    OctetLength,
    BitLength,
    LTrim,
    BTrim,
    RTrim,
    Substring,
    Position,
    Overlay,
    Exists,
    Abs,
    Mod,
    Cardinality,
    ExtractYear,
    ExtractMonth,
    ExtractDay,
    ExtractHour,
    ExtractMinute,
    ExtractSecond,
    ExtractTimezoneHour,
    ExtractTimezoneMinute,
    CollAvg(SetQuantifier),
    CollCount(SetQuantifier),
    CollMax(SetQuantifier),
    CollMin(SetQuantifier),
    CollSum(SetQuantifier),
    CollAny(SetQuantifier),
    CollEvery(SetQuantifier),
    /// A function identified by name only.
    ByName(String),
    /// A function identified by a string, a catalog [`ObjectId`] and a number.
    /// NOTE(review): presumably (name, catalog entry, overload/arity index) — confirm.
    ById(String, ObjectId, usize),
}
774
/// Set quantifier attached to bag operators, aggregates and `Coll*` calls.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum SetQuantifier {
    All,
    Distinct,
}
782
/// Tag carried by [`ValueExpr::VarRef`].
///
/// NOTE(review): presumably selects the scope in which the name is looked up — confirm
/// against the evaluator.
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum VarRefType {
    /// Global reference.
    Global,
    /// Local reference.
    Local,
}
792
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a plan with four operators and four flows and checks both counts.
    #[test]
    fn test_plan() {
        let mut plan = LogicalPlan::<BindingsOp>::new();

        let order_by = plan.add_operator(BindingsOp::OrderBy(OrderBy { specs: Vec::new() }));
        let sink = plan.add_operator(BindingsOp::Sink);
        let limit_offset = plan.add_operator(BindingsOp::LimitOffset(LimitOffset {
            limit: None,
            offset: None,
        }));
        let group_by = plan.add_operator(BindingsOp::GroupBy(GroupBy {
            strategy: GroupingStrategy::GroupFull,
            exprs: Default::default(),
            aggregate_exprs: Vec::new(),
            group_as_alias: None,
        }));

        // Two flows added one at a time, two added in bulk.
        plan.add_flow(order_by, sink);
        plan.add_flow(order_by, limit_offset);
        plan.extend_with_flows(&[(limit_offset, group_by), (sink, limit_offset)]);

        assert_eq!(4, plan.operators().len());
        assert_eq!(4, plan.flows().len());
    }
}