use crate::executor::operators::{AggregateFunction, JoinType, Operator, ScanMode};
use crate::executor::stats::Statistics;
use crate::executor::{ExecutionError, Result};
use ordered_float::OrderedFloat;
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::fmt;
use std::hash::{Hash, Hasher};
/// A query plan expressed as a tree of logical [`PlanNode`]s together with the
/// schema of the rows it produces.
#[derive(Debug, Clone)]
pub struct LogicalPlan {
    /// Root of the logical operator tree.
    pub root: PlanNode,
    /// Output columns of the plan.
    pub schema: QuerySchema,
}

impl LogicalPlan {
    /// Bundles an operator tree with its output schema.
    pub fn new(root: PlanNode, schema: QuerySchema) -> Self {
        LogicalPlan { root, schema }
    }

    /// Returns a key of the form `plan_<hex>` identifying this plan.
    ///
    /// The key is a 64-bit hash of the plan's `Debug` rendering, which covers both
    /// `root` and `schema`. Plans with identical renderings always share a key, and
    /// distinct plans can (rarely) collide. `DefaultHasher`'s algorithm is unspecified
    /// across Rust releases, so keys are only stable within a single build and should
    /// not be persisted.
    pub fn cache_key(&self) -> String {
        let rendered = format!("{:?}", self);
        let mut state = DefaultHasher::new();
        rendered.hash(&mut state);
        let digest = state.finish();
        format!("plan_{:x}", digest)
    }

    /// Whether the whole plan may run with more than one worker (delegates to the root).
    pub fn is_parallelizable(&self) -> bool {
        let root = &self.root;
        root.is_parallelizable()
    }

    /// Heuristic estimate of the number of rows the plan yields (delegates to the root).
    pub fn estimate_cardinality(&self) -> usize {
        let root = &self.root;
        root.estimate_cardinality()
    }
}
/// Executable form of a [`LogicalPlan`]: a flat list of operators produced by
/// `PhysicalPlan::compile_node`, with each node's inputs emitted before the node itself.
pub struct PhysicalPlan {
    /// Operators in emission order (inputs before the operator that consumes them).
    pub operators: Vec<Box<dyn Operator>>,
    /// Indices into `operators` recorded at pipeline boundaries: the slot of each
    /// `Aggregate`/`Sort` operator, and for a `Join` the slot where its right input begins.
    pub pipeline_breakers: Vec<usize>,
    /// Worker count: `num_cpus::get()` when the logical plan is parallelizable, else 1.
    pub parallelism: usize,
}

// Hand-written so the output shows only the operator count rather than the boxed
// operators themselves (presumably because `dyn Operator` is not `Debug` — confirm).
impl fmt::Debug for PhysicalPlan {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let operator_count = self.operators.len();
        let mut out = f.debug_struct("PhysicalPlan");
        out.field("operator_count", &operator_count);
        out.field("pipeline_breakers", &self.pipeline_breakers);
        out.field("parallelism", &self.parallelism);
        out.finish()
    }
}
impl PhysicalPlan {
pub fn from_logical(logical: &LogicalPlan, stats: &Statistics) -> Result<Self> {
let mut operators = Vec::new();
let mut pipeline_breakers = Vec::new();
Self::compile_node(&logical.root, stats, &mut operators, &mut pipeline_breakers)?;
let parallelism = if logical.is_parallelizable() {
num_cpus::get()
} else {
1
};
Ok(Self {
operators,
pipeline_breakers,
parallelism,
})
}
fn compile_node(
node: &PlanNode,
stats: &Statistics,
operators: &mut Vec<Box<dyn Operator>>,
pipeline_breakers: &mut Vec<usize>,
) -> Result<()> {
match node {
PlanNode::NodeScan { mode, filter } => {
operators.push(Box::new(crate::executor::operators::NodeScan::new(
mode.clone(),
filter.clone(),
)));
}
PlanNode::EdgeScan { mode, filter } => {
operators.push(Box::new(crate::executor::operators::EdgeScan::new(
mode.clone(),
filter.clone(),
)));
}
PlanNode::Filter { input, predicate } => {
Self::compile_node(input, stats, operators, pipeline_breakers)?;
operators.push(Box::new(crate::executor::operators::Filter::new(
predicate.clone(),
)));
}
PlanNode::Join {
left,
right,
join_type,
on,
} => {
Self::compile_node(left, stats, operators, pipeline_breakers)?;
pipeline_breakers.push(operators.len());
Self::compile_node(right, stats, operators, pipeline_breakers)?;
operators.push(Box::new(crate::executor::operators::Join::new(
*join_type,
on.clone(),
)));
}
PlanNode::Aggregate {
input,
group_by,
aggregates,
} => {
Self::compile_node(input, stats, operators, pipeline_breakers)?;
pipeline_breakers.push(operators.len());
operators.push(Box::new(crate::executor::operators::Aggregate::new(
group_by.clone(),
aggregates.clone(),
)));
}
PlanNode::Sort { input, order_by } => {
Self::compile_node(input, stats, operators, pipeline_breakers)?;
pipeline_breakers.push(operators.len());
operators.push(Box::new(crate::executor::operators::Sort::new(
order_by.clone(),
)));
}
PlanNode::Limit {
input,
limit,
offset,
} => {
Self::compile_node(input, stats, operators, pipeline_breakers)?;
operators.push(Box::new(crate::executor::operators::Limit::new(
*limit, *offset,
)));
}
PlanNode::Project { input, columns } => {
Self::compile_node(input, stats, operators, pipeline_breakers)?;
operators.push(Box::new(crate::executor::operators::Project::new(
columns.clone(),
)));
}
PlanNode::HyperedgeScan { mode, filter } => {
operators.push(Box::new(crate::executor::operators::HyperedgeScan::new(
mode.clone(),
filter.clone(),
)));
}
}
Ok(())
}
}
/// A node in the logical query-plan tree.
///
/// Scan variants are leaves. Every other variant owns one boxed input subtree,
/// except `Join`, which owns two.
#[derive(Debug, Clone)]
pub enum PlanNode {
    /// Scans graph nodes using `mode`, with an optional `filter` handed to the scan operator.
    NodeScan {
        mode: ScanMode,
        filter: Option<Predicate>,
    },
    /// Scans edges using `mode`, with an optional `filter` handed to the scan operator.
    EdgeScan {
        mode: ScanMode,
        filter: Option<Predicate>,
    },
    /// Scans hyperedges using `mode`, with an optional `filter` handed to the scan operator.
    HyperedgeScan {
        mode: ScanMode,
        filter: Option<Predicate>,
    },
    /// Keeps only the input rows matching `predicate`.
    Filter {
        input: Box<PlanNode>,
        predicate: Predicate,
    },
    /// Joins two inputs. Each `on` pair presumably names a (left, right) column to
    /// equate — confirm against the `Join` operator.
    Join {
        left: Box<PlanNode>,
        right: Box<PlanNode>,
        join_type: JoinType,
        on: Vec<(String, String)>,
    },
    /// Groups by `group_by` and evaluates each `(function, column)` aggregate.
    Aggregate {
        input: Box<PlanNode>,
        group_by: Vec<String>,
        aggregates: Vec<(AggregateFunction, String)>,
    },
    /// Orders rows by the listed `(column, direction)` keys.
    Sort {
        input: Box<PlanNode>,
        order_by: Vec<(String, SortOrder)>,
    },
    /// Emits at most `limit` rows after skipping `offset` rows (conventional LIMIT/OFFSET
    /// semantics assumed — confirm against the `Limit` operator).
    Limit {
        input: Box<PlanNode>,
        limit: usize,
        offset: usize,
    },
    /// Keeps only the listed `columns`.
    Project {
        input: Box<PlanNode>,
        columns: Vec<String>,
    },
}

impl PlanNode {
    /// Reports whether every operator in this subtree may run with more than one worker.
    ///
    /// Scans are always parallelizable and `Limit` never is. Every other node is
    /// parallelizable only if all of its inputs are. A `Limit` anywhere in the tree
    /// therefore forces single-threaded execution, because `PhysicalPlan` applies one
    /// parallelism value to the whole plan.
    pub fn is_parallelizable(&self) -> bool {
        match self {
            PlanNode::NodeScan { .. }
            | PlanNode::EdgeScan { .. }
            | PlanNode::HyperedgeScan { .. } => true,
            PlanNode::Limit { .. } => false,
            PlanNode::Filter { input, .. }
            | PlanNode::Aggregate { input, .. }
            | PlanNode::Sort { input, .. }
            | PlanNode::Project { input, .. } => input.is_parallelizable(),
            PlanNode::Join { left, right, .. } => {
                left.is_parallelizable() && right.is_parallelizable()
            }
        }
    }

    /// Heuristic estimate of the number of rows this subtree yields.
    ///
    /// The leaf sizes and selectivity divisors are fixed guesses; no statistics are
    /// consulted. Join estimates saturate at `usize::MAX` instead of overflowing on
    /// deep join trees. A `Limit` never estimates more rows than its input can supply
    /// after the offset.
    pub fn estimate_cardinality(&self) -> usize {
        match self {
            PlanNode::NodeScan { .. } => 1000,
            PlanNode::EdgeScan { .. } => 5000,
            PlanNode::HyperedgeScan { .. } => 500,
            PlanNode::Filter { input, .. } => input.estimate_cardinality() / 10,
            PlanNode::Join { left, right, .. } => {
                // saturating_mul: the plain product overflows usize for a few nested joins.
                left.estimate_cardinality()
                    .saturating_mul(right.estimate_cardinality())
                    / 100
            }
            PlanNode::Aggregate { input, .. } => input.estimate_cardinality() / 20,
            PlanNode::Sort { input, .. } | PlanNode::Project { input, .. } => {
                input.estimate_cardinality()
            }
            PlanNode::Limit {
                input,
                limit,
                offset,
            } => input
                .estimate_cardinality()
                .saturating_sub(*offset)
                .min(*limit),
        }
    }
}
/// Output schema of a query: the ordered list of columns it produces.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QuerySchema {
    /// Columns in output order.
    pub columns: Vec<ColumnDef>,
}

impl QuerySchema {
    /// Builds a schema from column definitions, keeping their order.
    pub fn new(columns: Vec<ColumnDef>) -> Self {
        Self { columns }
    }
}

/// Definition of a single output column.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ColumnDef {
    /// Column name.
    pub name: String,
    /// Type of the column's values.
    pub data_type: DataType,
    /// Whether the column may hold null values.
    pub nullable: bool,
}

/// Column value types. Equality is structural, so types can be compared and hashed.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum DataType {
    Int64,
    Float64,
    String,
    Boolean,
    Bytes,
    /// A list whose elements all have the boxed type.
    List(Box<DataType>),
}

/// Direction of a sort key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum SortOrder {
    Ascending,
    Descending,
}
/// A boolean condition over row values. In the comparison variants the `String` is
/// presumably a column name — confirm against the `Filter` operator.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Predicate {
    Equals(String, Value),
    NotEquals(String, Value),
    GreaterThan(String, Value),
    GreaterThanOrEqual(String, Value),
    LessThan(String, Value),
    LessThanOrEqual(String, Value),
    /// True when the column's value is one of the listed values.
    In(String, Vec<Value>),
    /// Pattern match; the second `String` is the pattern.
    Like(String, String),
    And(Vec<Predicate>),
    Or(Vec<Predicate>),
    Not(Box<Predicate>),
}

/// A scalar value handled by the planner and executor.
///
/// Equality is total so that `Value` can soundly implement `Eq` and serve as a hash
/// key. Two `Float64` values are equal when both are NaN or when `==` holds (so
/// `0.0 == -0.0`). This is the same rule `OrderedFloat` uses, which keeps equality
/// consistent with `impl Hash for Value`. Values of different variants are never equal.
#[derive(Debug, Clone)]
pub enum Value {
    Int64(i64),
    Float64(f64),
    String(String),
    Boolean(bool),
    Bytes(Vec<u8>),
    Null,
}

impl PartialEq for Value {
    fn eq(&self, other: &Self) -> bool {
        match (self, other) {
            (Value::Int64(a), Value::Int64(b)) => a == b,
            // NaN must equal NaN, otherwise `Eq`'s reflexivity requirement is violated.
            (Value::Float64(a), Value::Float64(b)) => (a.is_nan() && b.is_nan()) || a == b,
            (Value::String(a), Value::String(b)) => a == b,
            (Value::Boolean(a), Value::Boolean(b)) => a == b,
            (Value::Bytes(a), Value::Bytes(b)) => a == b,
            (Value::Null, Value::Null) => true,
            _ => false,
        }
    }
}

// Sound because `eq` above is reflexive, including for NaN.
impl Eq for Value {}
/// Feeds the variant tag into `state` first, then the payload (nothing for `Null`).
///
/// `f64` has no `Hash` impl of its own, so `Float64` payloads are hashed through
/// `OrderedFloat`.
impl Hash for Value {
    fn hash<H: Hasher>(&self, state: &mut H) {
        let tag = std::mem::discriminant(self);
        tag.hash(state);
        match *self {
            Value::Null => {}
            Value::Int64(ref n) => n.hash(state),
            Value::Float64(x) => OrderedFloat(x).hash(state),
            Value::String(ref text) => text.hash(state),
            Value::Boolean(ref flag) => flag.hash(state),
            Value::Bytes(ref raw) => raw.hash(state),
        }
    }
}
impl Value {
    /// Orders two values of the same variant.
    ///
    /// Returns `None` in three cases:
    /// - the variants differ (there is no implicit `Int64`/`Float64` coercion);
    /// - either side is `Null`;
    /// - a `Float64` comparison involves NaN.
    ///
    /// `Bytes` values compare lexicographically by byte.
    pub fn compare(&self, other: &Value) -> Option<std::cmp::Ordering> {
        match (self, other) {
            (Value::Int64(a), Value::Int64(b)) => Some(a.cmp(b)),
            (Value::Float64(a), Value::Float64(b)) => a.partial_cmp(b),
            (Value::String(a), Value::String(b)) => Some(a.cmp(b)),
            (Value::Boolean(a), Value::Boolean(b)) => Some(a.cmp(b)),
            // Previously fell through to `None`, although byte vectors are totally ordered.
            (Value::Bytes(a), Value::Bytes(b)) => Some(a.cmp(b)),
            _ => None,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::cmp::Ordering;

    /// A single sequential node scan yields a parallelizable plan.
    #[test]
    fn test_logical_plan_creation() {
        let id_column = ColumnDef {
            name: String::from("id"),
            data_type: DataType::Int64,
            nullable: false,
        };
        let scan = PlanNode::NodeScan {
            mode: ScanMode::Sequential,
            filter: None,
        };
        let plan = LogicalPlan::new(scan, QuerySchema::new(vec![id_column]));
        assert!(plan.is_parallelizable());
    }

    /// Integers order numerically.
    #[test]
    fn test_value_comparison() {
        let smaller = Value::Int64(42);
        let larger = Value::Int64(100);
        let ordering = smaller.compare(&larger);
        assert_eq!(ordering, Some(Ordering::Less));
    }
}