use std::sync::Arc;
use egg::{Id, Language};
use futures::stream::{BoxStream, Stream, StreamExt};
use futures_async_stream::try_stream;
use itertools::Itertools;
use self::copy_from_file::*;
use self::copy_to_file::*;
use self::create::*;
use self::delete::*;
use self::drop::*;
use self::evaluator::*;
use self::explain::*;
use self::filter::*;
use self::hash_agg::*;
use self::hash_join::*;
use self::insert::*;
use self::limit::*;
use self::nested_loop_join::*;
use self::order::*;
use self::projection::*;
use self::simple_agg::*;
use self::table_scan::*;
use self::top_n::TopNExecutor;
use self::values::*;
use crate::array::DataChunk;
use crate::catalog::RootCatalogRef;
use crate::planner::{Expr, RecExpr, TypeSchemaAnalysis};
use crate::storage::{Storage, TracedStorageError};
use crate::types::{ColumnIndex, ConvertError, DataType};
mod copy_from_file;
mod copy_to_file;
mod create;
mod delete;
mod drop;
mod evaluator;
mod explain;
mod filter;
mod hash_agg;
mod hash_join;
mod insert;
mod limit;
mod nested_loop_join;
mod order;
mod projection;
mod simple_agg;
mod table_scan;
mod top_n;
mod values;
/// The error type produced by executors at runtime.
#[derive(thiserror::Error, Debug)]
pub enum ExecutorError {
    /// An error propagated from the storage layer.
    #[error("storage error: {0}")]
    Storage(
        #[from]
        #[backtrace]
        #[source]
        TracedStorageError,
    ),
    /// A value failed to convert between data types.
    #[error("conversion error: {0}")]
    Convert(#[from] ConvertError),
    /// A tuple had a different number of fields than the schema expects.
    #[error("tuple length mismatch: expected {expected} but got {actual}")]
    LengthMismatch { expected: usize, actual: usize },
    /// An I/O error (e.g. while copying to/from files).
    #[error("io error")]
    Io(
        #[from]
        #[source]
        std::io::Error,
    ),
    /// A CSV parsing/serialization error during COPY.
    #[error("csv error")]
    Csv(
        #[from]
        #[source]
        csv::Error,
    ),
    /// A NULL was written to a column declared NOT NULL.
    #[error("value can not be null")]
    NotNullable,
    /// A string exceeded the declared char/varchar width.
    #[error("exceed char/varchar length limit: item length {length} > char/varchar width {width}")]
    ExceedLengthLimit { length: u64, width: u64 },
    /// Execution was aborted.
    #[error("abort")]
    Abort,
}
// Batch size used by executors when chunking their output.
// NOTE(review): not referenced in this file — presumably used by the
// executor submodules; confirm before removing.
const PROCESSING_WINDOW_SIZE: usize = 1024;

/// The output stream type of all executors: a boxed stream of data chunks.
pub type BoxedExecutor = BoxStream<'static, Result<DataChunk, ExecutorError>>;
/// Build the executor tree for a physical `plan` and return the stream that
/// yields its result chunks.
pub fn build(catalog: RootCatalogRef, storage: Arc<impl Storage>, plan: &RecExpr) -> BoxedExecutor {
    let builder = Builder::new(catalog, storage, plan);
    builder.build()
}
/// A builder that turns a physical plan into a tree of executors.
struct Builder<S: Storage> {
    // Storage backend handed to executors that read/write tables.
    storage: Arc<S>,
    // Catalog, used e.g. by EXPLAIN to pretty-print the plan.
    catalog: RootCatalogRef,
    // E-graph holding the plan; its analysis provides type and schema
    // information for every node.
    egraph: egg::EGraph<Expr, TypeSchemaAnalysis>,
    // The e-class id of the plan root.
    root: Id,
}
impl<S: Storage> Builder<S> {
    /// Create a new builder: loads `plan` into an e-graph so that the
    /// `TypeSchemaAnalysis` derives type and schema for every node.
    fn new(catalog: RootCatalogRef, storage: Arc<S>, plan: &RecExpr) -> Self {
        let mut egraph = egg::EGraph::new(TypeSchemaAnalysis {
            catalog: catalog.clone(),
        });
        // `add_expr` returns the id of the e-class containing the plan root.
        let root = egraph.add_expr(plan);
        Builder {
            storage,
            catalog,
            egraph,
            root,
        }
    }

    /// Returns the expression node of the e-class `id`.
    // NOTE(review): takes the first node of the class — assumes no rewrites
    // ran on this e-graph, so each class holds exactly one node; confirm.
    fn node(&self, id: Id) -> &Expr {
        &self.egraph[id].nodes[0]
    }

    /// Rebuilds the full expression tree rooted at `id` as a `RecExpr`.
    fn recexpr(&self, id: Id) -> RecExpr {
        self.node(id).build_recexpr(|id| self.node(id).clone())
    }

    /// Returns the output column types of the plan node `id`.
    ///
    /// # Panics
    /// Panics if the analysis failed to derive a type for the node.
    fn plan_types(&self, id: Id) -> &[DataType] {
        let ty = self.egraph[id].data.type_.as_ref().unwrap();
        ty.kind.as_struct()
    }

    /// Rewrites the expression `expr`, replacing every sub-expression that
    /// appears in the output schema of `plan` with its positional
    /// `ColumnIndex` into that schema.
    ///
    /// # Panics
    /// Panics if a column reference cannot be resolved against the schema.
    fn resolve_column_index(&self, expr: Id, plan: Id) -> RecExpr {
        let schema = self.egraph[plan].data.schema.as_ref().expect("no schema");
        self.node(expr).build_recexpr(|id| {
            // Whole sub-expressions (not just bare columns) may match a
            // schema entry, e.g. when projecting an already-computed value.
            if let Some(idx) = schema.iter().position(|x| *x == id) {
                return Expr::ColumnIndex(ColumnIndex(idx as _));
            }
            match self.node(id) {
                // A column that survives to this point was not produced by
                // the input plan — that is a planner bug.
                Expr::Column(c) => panic!("column {c} not found from input"),
                e => e.clone(),
            }
        })
    }

    /// Builds the executor tree for the whole plan.
    fn build(self) -> BoxedExecutor {
        self.build_id(self.root)
    }

    /// Recursively builds the executor for the plan node `id`, dispatching
    /// on the node kind, then wraps the resulting stream in its own task.
    ///
    /// # Panics
    /// Panics if `id` is not a plan node.
    fn build_id(&self, id: Id) -> BoxedExecutor {
        use Expr::*;
        let stream = match self.node(id).clone() {
            Scan([table, list]) => TableScanExecutor {
                table_id: self.node(table).as_table(),
                columns: (self.node(list).as_list().iter())
                    .map(|id| self.node(*id).as_column())
                    .collect(),
                storage: self.storage.clone(),
            }
            .execute(),
            Values(rows) => ValuesExecutor {
                column_types: self.plan_types(id).to_vec(),
                values: {
                    // Each row is a list of expressions to be evaluated.
                    rows.iter()
                        .map(|row| {
                            (self.node(*row).as_list().iter())
                                .map(|id| self.recexpr(*id))
                                .collect()
                        })
                        .collect()
                },
            }
            .execute(),
            Proj([projs, child]) => ProjectionExecutor {
                // Projections are rewritten to reference input columns by
                // position in the child's output schema.
                projs: self.resolve_column_index(projs, child),
            }
            .execute(self.build_id(child)),
            Filter([cond, child]) => FilterExecutor {
                condition: self.resolve_column_index(cond, child),
            }
            .execute(self.build_id(child)),
            Order([order_keys, child]) => OrderExecutor {
                order_keys: self.resolve_column_index(order_keys, child),
                types: self.plan_types(id).to_vec(),
            }
            .execute(self.build_id(child)),
            Limit([limit, offset, child]) => LimitExecutor {
                // A NULL limit means "no limit"; use usize::MAX / 2 so that
                // limit + offset cannot overflow — presumably; TODO confirm.
                limit: (self.node(limit).as_const().as_usize().unwrap()).unwrap_or(usize::MAX / 2),
                offset: self.node(offset).as_const().as_usize().unwrap().unwrap(),
            }
            .execute(self.build_id(child)),
            TopN([limit, offset, order_keys, child]) => TopNExecutor {
                // Same NULL-limit convention as `Limit` above.
                limit: (self.node(limit).as_const().as_usize().unwrap()).unwrap_or(usize::MAX / 2),
                offset: self.node(offset).as_const().as_usize().unwrap().unwrap(),
                order_keys: self.resolve_column_index(order_keys, child),
                types: self.plan_types(id).to_vec(),
            }
            .execute(self.build_id(child)),
            Join([op, on, left, right]) => NestedLoopJoinExecutor {
                op: self.node(op).clone(),
                // The join condition may reference columns from both sides,
                // so it is resolved against the join's own output schema.
                condition: self.resolve_column_index(on, id),
                left_types: self.plan_types(left).to_vec(),
                right_types: self.plan_types(right).to_vec(),
            }
            .execute(self.build_id(left), self.build_id(right)),
            HashJoin([op, lkeys, rkeys, left, right]) => HashJoinExecutor {
                op: self.node(op).clone(),
                // Equi-join keys are resolved against their own side.
                left_keys: self.resolve_column_index(lkeys, left),
                right_keys: self.resolve_column_index(rkeys, right),
                left_types: self.plan_types(left).to_vec(),
                right_types: self.plan_types(right).to_vec(),
            }
            .execute(self.build_id(left), self.build_id(right)),
            Agg([aggs, group_keys, child]) => {
                let aggs = self.resolve_column_index(aggs, child);
                let group_keys = self.resolve_column_index(group_keys, child);
                // No group keys -> a single global group: use the simple
                // aggregator; otherwise group by hash.
                if group_keys.as_ref().last().unwrap().as_list().is_empty() {
                    SimpleAggExecutor { aggs }.execute(self.build_id(child))
                } else {
                    HashAggExecutor {
                        aggs,
                        group_keys,
                        types: self.plan_types(id).to_vec(),
                    }
                    .execute(self.build_id(child))
                }
            }
            CreateTable(plan) => CreateTableExecutor {
                plan,
                storage: self.storage.clone(),
            }
            .execute(),
            Drop(plan) => DropExecutor {
                plan,
                storage: self.storage.clone(),
            }
            .execute(),
            Insert([table, cols, child]) => InsertExecutor {
                table_id: self.node(table).as_table(),
                column_ids: (self.node(cols).as_list().iter())
                    .map(|id| self.node(*id).as_column().column_id)
                    .collect(),
                storage: self.storage.clone(),
            }
            .execute(self.build_id(child)),
            Delete([table, child]) => DeleteExecutor {
                table_id: self.node(table).as_table(),
                storage: self.storage.clone(),
            }
            .execute(self.build_id(child)),
            CopyFrom([src, types]) => CopyFromFileExecutor {
                source: self.node(src).as_ext_source(),
                types: self.node(types).as_type().as_struct().to_vec(),
            }
            .execute(),
            CopyTo([src, child]) => CopyToFileExecutor {
                source: self.node(src).as_ext_source(),
            }
            .execute(self.build_id(child)),
            Explain(plan) => ExplainExecutor {
                plan: self.recexpr(plan),
                catalog: self.catalog.clone(),
            }
            .execute(),
            // An empty plan yields no rows.
            Empty(_) => futures::stream::empty().boxed(),
            node => panic!("not a plan: {node:?}"),
        };
        // Run each executor on its own task so that operators in the
        // pipeline execute concurrently.
        spawn(&self.node(id).to_string(), stream)
    }
}
/// Drives `stream` on a dedicated tokio task named `name`, returning a stream
/// that receives the produced chunks through a bounded channel.
///
/// Dropping the returned stream aborts the background task, so an abandoned
/// executor does not keep running.
fn spawn(name: &str, mut stream: BoxedExecutor) -> BoxedExecutor {
    use std::pin::Pin;
    use std::task::{Context, Poll};

    /// A stream reading from the channel; aborts the producer task on drop.
    struct SpawnedStream {
        rx: tokio::sync::mpsc::Receiver<Result<DataChunk, ExecutorError>>,
        handle: tokio::task::JoinHandle<()>,
    }

    impl Stream for SpawnedStream {
        type Item = Result<DataChunk, ExecutorError>;

        fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            self.rx.poll_recv(cx)
        }
    }

    impl Drop for SpawnedStream {
        fn drop(&mut self) {
            self.handle.abort();
        }
    }

    let (tx, rx) = tokio::sync::mpsc::channel(16);
    let task = async move {
        while let Some(chunk) = stream.next().await {
            // The receiver was dropped: stop driving the stream.
            if tx.send(chunk).await.is_err() {
                break;
            }
        }
    };
    let handle = tokio::task::Builder::default()
        .name(name)
        .spawn(task)
        .expect("failed to spawn task");
    Box::pin(SpawnedStream { rx, handle })
}