#![deny(rust_2018_idioms)]
#![deny(clippy::all)]
mod util;
pub mod graph;
use ordered_float::OrderedFloat;
use partiql_common::catalog::ObjectId;
use partiql_value::BindingsName;
use rust_decimal::Decimal as RustDecimal;
use rustc_hash::FxHashMap;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
use std::fmt::{Debug, Display, Formatter};
/// A logical plan: a directed graph whose nodes are operators of type `T` and whose edges
/// ("flows") connect a source operator to a destination operator.
///
/// Operators are addressed by [`OpId`], which is 1-based: the first operator added gets id 1.
/// Every flow carries a `u8` branch number; [`LogicalPlan::add_flow`] records branch 0.
#[derive(Debug, Clone, Eq, PartialEq, Default)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct LogicalPlan<T>
where
    T: Default + Debug,
{
    nodes: Vec<T>,
    edges: Vec<(OpId, OpId, u8)>,
}

impl<T> LogicalPlan<T>
where
    T: Default + Debug,
{
    /// Creates an empty plan with no operators and no flows.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Appends `op` to the plan and returns its id (its 1-based position).
    pub fn add_operator(&mut self, op: T) -> OpId {
        self.nodes.push(op);
        OpId(self.operator_count())
    }

    /// Adds a flow from `src` to `dst` with branch number 0.
    ///
    /// # Panics
    /// Panics if `src` or `dst` is not the id of an operator in this plan.
    #[inline]
    pub fn add_flow(&mut self, src: OpId, dst: OpId) {
        self.add_flow_with_branch_num(src, dst, 0);
    }

    /// Adds a flow from `src` to `dst` tagged with `branch_num`.
    ///
    /// # Panics
    /// Panics if `src` or `dst` is not the id of an operator in this plan. Id 0 is rejected
    /// because ids are 1-based.
    #[inline]
    pub fn add_flow_with_branch_num(&mut self, src: OpId, dst: OpId, branch_num: u8) {
        assert!(
            self.is_valid_id(src),
            "flow source {src:?} is not an operator of this plan ({} operators)",
            self.operator_count()
        );
        assert!(
            self.is_valid_id(dst),
            "flow destination {dst:?} is not an operator of this plan ({} operators)",
            self.operator_count()
        );
        self.edges.push((src, dst, branch_num));
    }

    /// Adds every `(src, dst)` pair in `flows` with branch number 0, in order.
    ///
    /// # Panics
    /// Panics under the same conditions as [`LogicalPlan::add_flow`].
    #[inline]
    pub fn extend_with_flows(&mut self, flows: &[(OpId, OpId)]) {
        for &(src, dst) in flows {
            self.add_flow(src, dst);
        }
    }

    /// Moves every operator and flow of `other` into this plan.
    ///
    /// `other`'s operators are appended after this plan's existing `n` operators, so an id
    /// `OpId(i)` from `other` becomes `OpId(i + n)`. Flows are remapped accordingly and keep
    /// their branch numbers.
    ///
    /// # Panics
    /// Panics if a flow in `other` references an id that is not one of `other`'s operators.
    #[inline]
    pub fn merge_plan(&mut self, other: Self) {
        let LogicalPlan { nodes, edges } = other;
        // Ids are positional, so remapping is a constant shift; no lookup table is needed.
        let offset = self.operator_count();
        let other_count = nodes.len();
        let remap = |id: OpId| {
            assert!(
                (1..=other_count).contains(&id.index()),
                "merged plan has a flow referencing unknown operator {id:?}"
            );
            OpId(id.index() + offset)
        };
        self.nodes.extend(nodes);
        self.edges.extend(
            edges
                .into_iter()
                .map(|(src, dst, branch)| (remap(src), remap(dst), branch)),
        );
    }

    /// Returns the number of operators in the plan.
    #[inline]
    #[must_use]
    pub fn operator_count(&self) -> usize {
        self.nodes.len()
    }

    /// Returns all operators in insertion order; the element at position `i` has id `i + 1`.
    #[must_use]
    pub fn operators(&self) -> &Vec<T> {
        &self.nodes
    }

    /// Iterates over `(id, operator)` pairs in insertion order.
    pub fn operators_by_id(&self) -> impl Iterator<Item = (OpId, &T)> {
        self.nodes.iter().enumerate().map(|(i, n)| (OpId(i + 1), n))
    }

    /// Returns all flows as `(src, dst, branch_num)` triples in insertion order.
    #[must_use]
    pub fn flows(&self) -> &Vec<(OpId, OpId, u8)> {
        &self.edges
    }

    /// Returns the operator with the given id, or `None` if there is no such operator.
    ///
    /// Id 0 yields `None` instead of underflowing.
    #[must_use]
    pub fn operator(&self, id: OpId) -> Option<&T> {
        let index = id.index().checked_sub(1)?;
        self.nodes.get(index)
    }

    /// Mutable counterpart of [`LogicalPlan::operator`]; also returns `None` for id 0.
    pub fn operator_as_mut(&mut self, id: OpId) -> Option<&mut T> {
        let index = id.index().checked_sub(1)?;
        self.nodes.get_mut(index)
    }

    /// Returns `true` if `id` refers to an operator of this plan (`1..=operator_count`).
    fn is_valid_id(&self, id: OpId) -> bool {
        (1..=self.operator_count()).contains(&id.index())
    }
}

/// Identifier of an operator within a [`LogicalPlan`].
///
/// Ids returned by [`LogicalPlan::add_operator`] are 1-based positions in the plan.
#[derive(Debug, Clone, Eq, PartialEq, Copy, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct OpId(usize);

impl OpId {
    /// Returns the raw 1-based index wrapped by this id.
    #[must_use]
    pub fn index(&self) -> usize {
        self.0
    }
}
impl<T> Display for LogicalPlan<T>
where
    T: Default + Debug,
{
    /// Writes a `LogicalPlan` / `---` header, then one `>>> [src] -> [dst]` line per flow in
    /// insertion order (operators shown via `Debug`), and finally an empty line.
    ///
    /// Panics if a flow references an id for which `operator` returns `None`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        writeln!(f, "LogicalPlan")?;
        writeln!(f, "---")?;
        self.flows()
            .iter()
            .try_for_each(|&(from, to, _branch)| {
                let src_node = self.operator(from).expect("Unable to get the src operator");
                let dst_node = self.operator(to).expect("Unable to get the dst operator");
                writeln!(f, ">>> [{src_node:?}] -> [{dst_node:?}]")
            })?;
        writeln!(f)
    }
}
/// An operator node over bindings, used as the node type of `LogicalPlan<BindingsOp>`
/// (for example in `SubQueryExpr::plan`). Most variants wrap a payload struct of the same name.
///
/// The `Default` value is [`BindingsOp::Sink`].
#[derive(Debug, Clone, Default, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum BindingsOp {
    /// See [`Scan`].
    Scan(Scan),
    /// See [`Pivot`].
    Pivot(Pivot),
    /// See [`Unpivot`].
    Unpivot(Unpivot),
    /// See [`Filter`].
    Filter(Filter),
    /// See [`OrderBy`].
    OrderBy(OrderBy),
    /// See [`LimitOffset`].
    LimitOffset(LimitOffset),
    /// See [`Join`].
    Join(Join),
    /// See [`BagOp`].
    BagOp(BagOp),
    /// See [`Project`].
    Project(Project),
    /// Project-all operator; the [`ProjectAllMode`] selects `Unwrap` or `PassThrough`.
    ProjectAll(ProjectAllMode),
    /// See [`ProjectValue`].
    ProjectValue(ProjectValue),
    /// See [`ExprQuery`].
    ExprQuery(ExprQuery),
    /// Carries no payload; presumably removes duplicate bindings (SQL `DISTINCT`) — TODO confirm.
    Distinct,
    /// See [`GroupBy`].
    GroupBy(GroupBy),
    /// See [`Having`].
    Having(Having),
    /// Default variant; carries no payload.
    #[default]
    Sink,
}
/// Mode for [`BindingsOp::ProjectAll`]. Defaults to [`ProjectAllMode::Unwrap`].
#[derive(Debug, Clone, Default, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum ProjectAllMode {
    #[default]
    Unwrap,
    PassThrough,
}
/// Scan over the value produced by `expr`.
///
/// Presumably models PartiQL `FROM <expr> AS <as_key> [AT <at_key>]` — TODO confirm.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct Scan {
    /// Expression producing the collection to scan.
    pub expr: ValueExpr,
    /// Name that each scanned element is bound to.
    pub as_key: String,
    /// Optional secondary binding name (presumably the `AT` position/key variable).
    pub at_key: Option<String>,
}
/// Pivot operator built from a `key` expression and a `value` expression
/// (presumably PartiQL `PIVOT <value> AT <key>` — TODO confirm).
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct Pivot {
    pub key: ValueExpr,
    pub value: ValueExpr,
}
/// Unpivot operator; same shape as [`Scan`] (source expression plus `AS` / optional `AT` names).
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct Unpivot {
    pub expr: ValueExpr,
    pub as_key: String,
    pub at_key: Option<String>,
}
/// Filter operator holding a single predicate expression (presumably `WHERE` — TODO confirm).
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct Filter {
    pub expr: ValueExpr,
}
/// Having operator holding a single predicate expression (presumably `HAVING` — TODO confirm).
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct Having {
    pub expr: ValueExpr,
}
/// Ordering operator; `specs` is the ordered list of sort keys.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct OrderBy {
    pub specs: Vec<SortSpec>,
}
/// Sort direction for a [`SortSpec`].
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum SortSpecOrder {
    Asc,
    Desc,
}
/// Placement of null values for a [`SortSpec`].
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum SortSpecNullOrder {
    First,
    Last,
}
/// One sort key of an [`OrderBy`]: the expression to sort by, its direction, and null placement.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct SortSpec {
    pub expr: ValueExpr,
    pub order: SortSpecOrder,
    pub null_order: SortSpecNullOrder,
}
/// Limit/offset operator; either expression may be absent.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct LimitOffset {
    pub limit: Option<ValueExpr>,
    pub offset: Option<ValueExpr>,
}
/// Set/bag operator node: which operator to apply and its `ALL`/`DISTINCT` quantifier.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct BagOp {
    pub bag_op: BagOperator,
    pub setq: SetQuantifier,
}
/// The operator used by a [`BagOp`].
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum BagOperator {
    Union,
    Except,
    Intersect,
    OuterUnion,
    OuterExcept,
    OuterIntersect,
}
/// Join of two boxed child operators with an optional join condition.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct Join {
    pub kind: JoinKind,
    pub left: Box<BindingsOp>,
    pub right: Box<BindingsOp>,
    /// Join condition; presumably `None` for joins without `ON` (e.g. cross joins) — TODO confirm.
    pub on: Option<ValueExpr>,
}
/// Kind of a [`Join`].
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum JoinKind {
    Inner,
    Left,
    Right,
    Full,
    Cross,
}
/// An aggregate computed by a [`GroupBy`]: a function applied to `expr` with a set quantifier.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct AggregateExpression {
    /// Name associated with the aggregate (presumably the output binding name — TODO confirm).
    pub name: String,
    pub expr: ValueExpr,
    pub func: AggFunc,
    pub setq: SetQuantifier,
}
/// Aggregate functions usable in an [`AggregateExpression`].
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum AggFunc {
    AggAvg,
    AggCount,
    AggMax,
    AggMin,
    AggSum,
    AggAny,
    AggEvery,
}
/// Grouping operator.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct GroupBy {
    pub strategy: GroupingStrategy,
    /// Grouping expressions keyed by name (presumably the group-key binding names — TODO confirm).
    /// Note: an `FxHashMap`, so iteration order is not the declaration order.
    pub exprs: FxHashMap<String, ValueExpr>,
    pub aggregate_exprs: Vec<AggregateExpression>,
    /// Optional alias, presumably from `GROUP AS <alias>` — TODO confirm.
    pub group_as_alias: Option<String>,
}
/// Grouping strategy of a [`GroupBy`].
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum GroupingStrategy {
    GroupFull,
    GroupPartial,
}
/// Projection of named expressions; `exprs` is an ordered list of `(name, expression)` pairs.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct Project {
    pub exprs: Vec<(String, ValueExpr)>,
}
/// Projection of a single value expression (presumably `SELECT VALUE` — TODO confirm).
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct ProjectValue {
    pub expr: ValueExpr,
}
/// A query consisting of a single value expression.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct ExprQuery {
    pub expr: ValueExpr,
}
/// A value-producing expression in the logical plan.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum ValueExpr {
    /// Unary operator applied to an operand.
    UnExpr(UnaryOp, Box<ValueExpr>),
    /// Binary operator applied to a left and right operand.
    BinaryExpr(BinaryOp, Box<ValueExpr>, Box<ValueExpr>),
    /// Literal value.
    Lit(Box<Lit>),
    /// List of candidate expressions (presumably tried in order to resolve a name — TODO confirm).
    DynamicLookup(Box<Vec<ValueExpr>>),
    /// Root expression followed by a sequence of path steps.
    Path(Box<ValueExpr>, Vec<PathComponent>),
    /// Reference to a named variable, tagged with how it is resolved.
    VarRef(BindingsName<'static>, VarRefType),
    TupleExpr(TupleExpr),
    ListExpr(ListExpr),
    BagExpr(BagExpr),
    BetweenExpr(BetweenExpr),
    PatternMatchExpr(PatternMatchExpr),
    /// Expression evaluating a nested logical plan.
    SubQueryExpr(SubQueryExpr),
    SimpleCase(SimpleCase),
    SearchedCase(SearchedCase),
    IsTypeExpr(IsTypeExpr),
    NullIfExpr(NullIfExpr),
    CoalesceExpr(CoalesceExpr),
    /// Function call; see [`CallExpr`].
    Call(CallExpr),
    GraphMatch(Box<GraphMatchExpr>),
}
/// Literal values.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum Lit {
    Null,
    Missing,
    Int8(i8),
    Int16(i16),
    Int32(i32),
    Int64(i64),
    Decimal(RustDecimal),
    // `OrderedFloat` is used so that `Lit` can derive `Eq`, which a bare `f64` does not implement.
    Double(OrderedFloat<f64>),
    Bool(bool),
    String(String),
    /// Raw bytes plus a string (presumably an encoding/type tag for the bytes — TODO confirm).
    Variant(Vec<u8>, String),
    /// Ordered `(field name, value)` pairs.
    Struct(Vec<(String, Lit)>),
    Bag(Vec<Lit>),
    List(Vec<Lit>),
}
/// Unary operators for [`ValueExpr::UnExpr`].
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum UnaryOp {
    Pos,
    Neg,
    Not,
}
/// Binary operators for [`ValueExpr::BinaryExpr`].
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum BinaryOp {
    And,
    Or,
    Concat,
    Eq,
    Neq,
    Gt,
    Gteq,
    Lt,
    Lteq,
    Add,
    Sub,
    Mul,
    Div,
    Mod,
    Exp,
    In,
}
/// One step of a [`ValueExpr::Path`]: a static key or index, or a computed key or index.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum PathComponent {
    Key(BindingsName<'static>),
    Index(i64),
    KeyExpr(Box<ValueExpr>),
    IndexExpr(Box<ValueExpr>),
}
/// Tuple constructor expression, stored as parallel lists of attribute-name expressions and
/// value expressions (presumably paired by position — TODO confirm).
#[derive(Clone, Debug, Default, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct TupleExpr {
    pub attrs: Vec<ValueExpr>,
    pub values: Vec<ValueExpr>,
}

impl TupleExpr {
    /// Returns a tuple expression with no attributes and no values.
    #[must_use]
    pub fn new() -> Self {
        TupleExpr {
            attrs: Vec::new(),
            values: Vec::new(),
        }
    }
}

/// List constructor expression; `elements` are kept in order.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct ListExpr {
    pub elements: Vec<ValueExpr>,
}

impl ListExpr {
    /// Returns a list expression with no elements.
    #[must_use]
    pub fn new() -> Self {
        ListExpr {
            elements: Vec::new(),
        }
    }
}

/// Bag constructor expression.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct BagExpr {
    pub elements: Vec<ValueExpr>,
}

impl BagExpr {
    /// Returns a bag expression with no elements.
    #[must_use]
    pub fn new() -> Self {
        BagExpr {
            elements: Vec::new(),
        }
    }
}
/// Range test on `value` with `from` and `to` bounds (presumably `value BETWEEN from AND to`).
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct BetweenExpr {
    pub value: Box<ValueExpr>,
    pub from: Box<ValueExpr>,
    pub to: Box<ValueExpr>,
}
/// Matches `value` against a [`Pattern`].
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct PatternMatchExpr {
    pub value: Box<ValueExpr>,
    pub pattern: Pattern,
}
/// Pattern forms for a [`PatternMatchExpr`].
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum Pattern {
    /// `LIKE` with pattern and escape given as plain strings.
    Like(LikeMatch),
    /// `LIKE` whose pattern and escape are arbitrary expressions.
    LikeNonStringNonLiteral(LikeNonStringNonLiteralMatch),
}
/// `LIKE` pattern and escape, both known as strings.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct LikeMatch {
    pub pattern: String,
    pub escape: String,
}
/// `LIKE` pattern and escape supplied as expressions.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct LikeNonStringNonLiteralMatch {
    pub pattern: Box<ValueExpr>,
    pub escape: Box<ValueExpr>,
}
/// Graph pattern match of `value` against a `graph::PathPatternMatch`.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct GraphMatchExpr {
    pub value: Box<ValueExpr>,
    pub pattern: graph::PathPatternMatch,
}
/// Empty struct with no fields; not referenced elsewhere in the visible source.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct GraphPathPattern {}
/// Subquery expression wrapping a nested logical plan of [`BindingsOp`] nodes.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct SubQueryExpr {
    pub plan: LogicalPlan<BindingsOp>,
}
/// `CASE expr WHEN .. THEN .. [ELSE ..] END`: `cases` holds `(when, then)` pairs in order.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct SimpleCase {
    pub expr: Box<ValueExpr>,
    pub cases: Vec<(Box<ValueExpr>, Box<ValueExpr>)>,
    /// Optional `ELSE` expression.
    pub default: Option<Box<ValueExpr>>,
}
/// `CASE WHEN cond THEN .. [ELSE ..] END`: `cases` holds `(condition, result)` pairs in order.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct SearchedCase {
    pub cases: Vec<(Box<ValueExpr>, Box<ValueExpr>)>,
    /// Optional `ELSE` expression.
    pub default: Option<Box<ValueExpr>>,
}
/// Type test `expr IS [NOT] is_type`; `not` selects the negated form.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct IsTypeExpr {
    pub not: bool,
    pub expr: Box<ValueExpr>,
    pub is_type: Type,
}
/// Types that can be named in an [`IsTypeExpr`].
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum Type {
    NullType,
    BooleanType,
    Integer2Type,
    Integer4Type,
    Integer8Type,
    DecimalType,
    NumericType,
    RealType,
    DoublePrecisionType,
    TimestampType,
    CharacterType,
    CharacterVaryingType,
    MissingType,
    StringType,
    SymbolType,
    BlobType,
    ClobType,
    DateType,
    TimeType,
    ZonedTimestampType,
    StructType,
    TupleType,
    ListType,
    SexpType,
    BagType,
    AnyType,
}
/// `NULLIF(lhs, rhs)`.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct NullIfExpr {
    pub lhs: Box<ValueExpr>,
    pub rhs: Box<ValueExpr>,
}
/// `COALESCE(elements...)`, with arguments in order.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct CoalesceExpr {
    pub elements: Vec<ValueExpr>,
}
/// Function call: the callee and its ordered arguments.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub struct CallExpr {
    pub name: CallName,
    pub arguments: Vec<ValueExpr>,
}
/// Callee of a [`CallExpr`]: either a built-in function or a function identified by name or id.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum CallName {
    Lower,
    Upper,
    CharLength,
    OctetLength,
    BitLength,
    LTrim,
    BTrim,
    RTrim,
    Substring,
    Position,
    Overlay,
    Exists,
    Abs,
    Mod,
    Cardinality,
    ExtractYear,
    ExtractMonth,
    ExtractDay,
    ExtractHour,
    ExtractMinute,
    ExtractSecond,
    ExtractTimezoneHour,
    ExtractTimezoneMinute,
    // Collection aggregate functions; each carries its ALL/DISTINCT quantifier.
    CollAvg(SetQuantifier),
    CollCount(SetQuantifier),
    CollMax(SetQuantifier),
    CollMin(SetQuantifier),
    CollSum(SetQuantifier),
    CollAny(SetQuantifier),
    CollEvery(SetQuantifier),
    /// Function referenced only by its name.
    ByName(String),
    /// Function referenced by a name, a catalog `ObjectId`, and a `usize`
    /// (presumably an index/overload number — TODO confirm).
    ById(String, ObjectId, usize),
}
/// `ALL` / `DISTINCT` quantifier used by bag operators, aggregates and collection functions.
#[derive(Debug, Clone, Eq, PartialEq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum SetQuantifier {
    All,
    Distinct,
}
/// How a [`ValueExpr::VarRef`] is resolved
/// (presumably global catalog names vs. local bindings — TODO confirm).
#[derive(Clone, Debug, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum VarRefType {
    Global,
    Local,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a four-operator plan with four flows and checks both counts.
    #[test]
    fn test_plan() {
        let mut plan: LogicalPlan<BindingsOp> = LogicalPlan::new();

        let order_by = plan.add_operator(BindingsOp::OrderBy(OrderBy { specs: Vec::new() }));
        let sink = plan.add_operator(BindingsOp::Sink);
        let limit = plan.add_operator(BindingsOp::LimitOffset(LimitOffset {
            limit: None,
            offset: None,
        }));
        let group_by = plan.add_operator(BindingsOp::GroupBy(GroupBy {
            strategy: GroupingStrategy::GroupFull,
            exprs: Default::default(),
            aggregate_exprs: Vec::new(),
            group_as_alias: None,
        }));

        plan.add_flow(order_by, sink);
        plan.add_flow(order_by, limit);
        plan.extend_with_flows(&[(limit, group_by), (sink, limit)]);

        assert_eq!(plan.operators().len(), 4);
        assert_eq!(plan.flows().len(), 4);
    }
}