use std::fmt::{Debug, Display};
use std::sync::{Arc, Mutex};
use std::{any::Any, pin::Pin};
use crate::execution::context::ExecutionContextState;
use crate::logical_plan::LogicalPlan;
use crate::{error::Result, scalar::ScalarValue};
use arrow::datatypes::{DataType, Schema, SchemaRef};
use arrow::error::Result as ArrowResult;
use arrow::record_batch::RecordBatch;
use arrow::{array::ArrayRef, datatypes::Field};
use async_trait::async_trait;
use futures::stream::Stream;
use self::merge::MergeExec;
use hashbrown::HashMap;
pub trait RecordBatchStream: Stream<Item = ArrowResult<RecordBatch>> {
fn schema(&self) -> SchemaRef;
}
pub type SendableRecordBatchStream = Pin<Box<dyn RecordBatchStream + Send + Sync>>;
#[derive(Debug, Clone)]
pub enum MetricType {
Counter,
TimeNanos,
}
#[derive(Debug, Clone)]
pub struct SQLMetric {
name: String,
value: usize,
metric_type: MetricType,
}
impl SQLMetric {
pub fn counter(name: &str) -> Arc<Mutex<SQLMetric>> {
Arc::new(Mutex::new(SQLMetric::new(name, MetricType::Counter)))
}
pub fn time_nanos(name: &str) -> Arc<Mutex<SQLMetric>> {
Arc::new(Mutex::new(SQLMetric::new(name, MetricType::TimeNanos)))
}
pub fn new(name: &str, metric_type: MetricType) -> Self {
Self {
name: name.to_owned(),
value: 0,
metric_type,
}
}
pub fn add(&mut self, n: usize) {
self.value += n;
}
pub fn value(&self) -> usize {
self.value
}
}
pub trait PhysicalPlanner {
fn create_physical_plan(
&self,
logical_plan: &LogicalPlan,
ctx_state: &ExecutionContextState,
) -> Result<Arc<dyn ExecutionPlan>>;
}
#[async_trait]
pub trait ExecutionPlan: Debug + Send + Sync {
fn as_any(&self) -> &dyn Any;
fn schema(&self) -> SchemaRef;
fn output_partitioning(&self) -> Partitioning;
fn required_child_distribution(&self) -> Distribution {
Distribution::UnspecifiedDistribution
}
fn children(&self) -> Vec<Arc<dyn ExecutionPlan>>;
fn with_new_children(
&self,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>>;
async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream>;
fn metrics(&self) -> HashMap<String, SQLMetric> {
HashMap::new()
}
}
pub async fn collect(plan: Arc<dyn ExecutionPlan>) -> Result<Vec<RecordBatch>> {
match plan.output_partitioning().partition_count() {
0 => Ok(vec![]),
1 => {
let it = plan.execute(0).await?;
common::collect(it).await
}
_ => {
let plan = MergeExec::new(plan.clone());
assert_eq!(1, plan.output_partitioning().partition_count());
common::collect(plan.execute(0).await?).await
}
}
}
pub async fn collect_partitioned(
plan: Arc<dyn ExecutionPlan>,
) -> Result<Vec<Vec<RecordBatch>>> {
match plan.output_partitioning().partition_count() {
0 => Ok(vec![]),
1 => {
let it = plan.execute(0).await?;
Ok(vec![common::collect(it).await?])
}
_ => {
let mut partitions = vec![];
for i in 0..plan.output_partitioning().partition_count() {
partitions.push(common::collect(plan.execute(i).await?).await?)
}
Ok(partitions)
}
}
}
#[derive(Debug, Clone)]
pub enum Partitioning {
RoundRobinBatch(usize),
Hash(Vec<Arc<dyn PhysicalExpr>>, usize),
UnknownPartitioning(usize),
}
impl Partitioning {
pub fn partition_count(&self) -> usize {
use Partitioning::*;
match self {
RoundRobinBatch(n) => *n,
Hash(_, n) => *n,
UnknownPartitioning(n) => *n,
}
}
}
#[derive(Debug, Clone, PartialEq)]
pub enum Distribution {
UnspecifiedDistribution,
SinglePartition,
}
#[derive(Clone)]
pub enum ColumnarValue {
Array(ArrayRef),
Scalar(ScalarValue),
}
impl ColumnarValue {
fn data_type(&self) -> DataType {
match self {
ColumnarValue::Array(array_value) => array_value.data_type().clone(),
ColumnarValue::Scalar(scalar_value) => scalar_value.get_datatype(),
}
}
fn into_array(self, num_rows: usize) -> ArrayRef {
match self {
ColumnarValue::Array(array) => array,
ColumnarValue::Scalar(scalar) => scalar.to_array_of_size(num_rows),
}
}
}
pub trait PhysicalExpr: Send + Sync + Display + Debug {
fn as_any(&self) -> &dyn Any;
fn data_type(&self, input_schema: &Schema) -> Result<DataType>;
fn nullable(&self, input_schema: &Schema) -> Result<bool>;
fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue>;
}
pub trait AggregateExpr: Send + Sync + Debug {
fn as_any(&self) -> &dyn Any;
fn field(&self) -> Result<Field>;
fn create_accumulator(&self) -> Result<Box<dyn Accumulator>>;
fn state_fields(&self) -> Result<Vec<Field>>;
fn expressions(&self) -> Vec<Arc<dyn PhysicalExpr>>;
}
pub trait Accumulator: Send + Sync + Debug {
fn state(&self) -> Result<Vec<ScalarValue>>;
fn update(&mut self, values: &[ScalarValue]) -> Result<()>;
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
if values.is_empty() {
return Ok(());
};
(0..values[0].len()).try_for_each(|index| {
let v = values
.iter()
.map(|array| ScalarValue::try_from_array(array, index))
.collect::<Result<Vec<_>>>()?;
self.update(&v)
})
}
fn merge(&mut self, states: &[ScalarValue]) -> Result<()>;
fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
if states.is_empty() {
return Ok(());
};
(0..states[0].len()).try_for_each(|index| {
let v = states
.iter()
.map(|array| ScalarValue::try_from_array(array, index))
.collect::<Result<Vec<_>>>()?;
self.merge(&v)
})
}
fn evaluate(&self) -> Result<ScalarValue>;
}
pub mod aggregates;
pub mod array_expressions;
pub mod coalesce_batches;
pub mod common;
#[cfg(feature = "crypto_expressions")]
pub mod crypto_expressions;
pub mod csv;
pub mod datetime_expressions;
pub mod distinct_expressions;
pub mod empty;
pub mod explain;
pub mod expressions;
pub mod filter;
pub mod functions;
pub mod group_scalar;
pub mod hash_aggregate;
pub mod hash_join;
pub mod hash_utils;
pub mod limit;
pub mod math_expressions;
pub mod memory;
pub mod merge;
pub mod parquet;
pub mod planner;
pub mod projection;
#[cfg(feature = "regex_expressions")]
pub mod regex_expressions;
pub mod repartition;
pub mod sort;
pub mod string_expressions;
pub mod type_coercion;
pub mod udaf;
pub mod udf;
#[cfg(feature = "unicode_expressions")]
pub mod unicode_expressions;
pub mod union;