use std::sync::Arc;
use crate::distributed::core::config::DistributedConfig;
use crate::distributed::core::partition::{Partition, PartitionSet};
use crate::error::Result;
/// Abstraction over a backend that can be configured and then asked for
/// [`ExecutionContext`] objects.
///
/// Implementors must be `Send + Sync`. Every method is object-safe, so the
/// trait can be used as `Box<dyn ExecutionEngine>`.
pub trait ExecutionEngine: Send + Sync {
/// Prepares the engine using the supplied distributed configuration.
///
/// # Errors
/// Returns the crate-level error type when the implementation cannot be
/// initialized.
fn initialize(&mut self, config: &DistributedConfig) -> Result<()>;
/// Reports whether the engine is initialized.
// NOTE(review): presumably `true` only after a successful `initialize` call —
// confirm against the implementors.
fn is_initialized(&self) -> bool;
/// Builds a new boxed [`ExecutionContext`] for the given configuration.
///
/// # Errors
/// Returns an error when the implementation cannot create a context.
fn create_context(&self, config: &DistributedConfig) -> Result<Box<dyn ExecutionContext>>;
/// Returns this engine as a new boxed trait object.
///
/// This is a method of this trait, not `Clone::clone`: the trait has no
/// `Clone` supertrait, so boxed engines are duplicated through this method.
fn clone(&self) -> Box<dyn ExecutionEngine>;
}
/// A session-like handle on an engine: data sources are registered under
/// names, plans or SQL text are executed, and results can be written out.
///
/// Implementors must be `Send + Sync`. Only the signatures are defined here;
/// the exact semantics of each call are implementation-defined.
pub trait ExecutionContext: Send + Sync {
/// Executes `plan` (taken by value) and returns its [`ExecutionResult`].
fn execute_plan(&mut self, plan: ExecutionPlan) -> Result<ExecutionResult>;
/// Registers the given partitions under the table name `name`.
fn register_in_memory_table(&mut self, name: &str, partitions: PartitionSet) -> Result<()>;
/// Registers the CSV source at `path` under the table name `name`.
// NOTE(review): whether `path` may be a directory, glob or URL is not visible here.
fn register_csv(&mut self, name: &str, path: &str) -> Result<()>;
/// Registers the Parquet source at `path` under the table name `name`.
fn register_parquet(&mut self, name: &str, path: &str) -> Result<()>;
/// Runs the SQL text `query` and returns its [`ExecutionResult`].
fn sql(&mut self, query: &str) -> Result<ExecutionResult>;
/// Looks up the Arrow schema associated with the name `name`.
// NOTE(review): presumably fails for names that were never registered — confirm.
fn table_schema(&self, name: &str) -> Result<arrow::datatypes::SchemaRef>;
/// Produces a textual explanation of `plan`.
///
/// `with_statistics` asks the implementation to include statistics in the
/// explanation; what that adds is implementation-defined.
fn explain_plan(&self, plan: &ExecutionPlan, with_statistics: bool) -> Result<String>;
/// Writes `result` to `path` in Parquet format.
fn write_parquet(&mut self, result: &ExecutionResult, path: &str) -> Result<()>;
/// Writes `result` to `path` in CSV format.
fn write_csv(&mut self, result: &ExecutionResult, path: &str) -> Result<()>;
/// Returns the metrics the context currently reports.
// NOTE(review): whether these are cumulative or per-last-query is not visible here.
fn metrics(&self) -> Result<ExecutionMetrics>;
/// Returns this context as a new boxed trait object.
///
/// This is a method of this trait, not `Clone::clone`: the trait has no
/// `Clone` supertrait.
fn clone(&self) -> Box<dyn ExecutionContext>;
}
/// A description of work to run: one named input plus an ordered list of
/// [`Operation`] steps.
///
/// The plan only stores the steps in the order they were added; it does not
/// validate or interpret them.
#[derive(Debug, Clone)]
pub struct ExecutionPlan {
    /// Name of the primary input the plan starts from.
    input: String,
    /// Steps in insertion order.
    operations: Vec<Operation>,
}

impl ExecutionPlan {
    /// Creates a plan over `input` that contains no operations yet.
    pub fn new(input: &str) -> Self {
        Self {
            operations: Vec::new(),
            input: input.to_owned(),
        }
    }

    /// Returns the name of the plan's primary input.
    pub fn input(&self) -> &str {
        self.input.as_str()
    }

    /// Returns the plan's inputs as a list.
    ///
    /// The list always holds exactly one element, the primary input; names
    /// carried inside operations (for example a join's `right`) are not
    /// included.
    pub fn inputs(&self) -> Vec<&str> {
        vec![self.input.as_str()]
    }

    /// Returns the operations in the order they were added.
    pub fn operations(&self) -> &Vec<Operation> {
        &self.operations
    }

    /// Appends a single operation and returns `self` so calls can be chained.
    pub fn add_operation(&mut self, operation: Operation) -> &mut Self {
        self.operations.push(operation);
        self
    }

    /// Appends every operation in `operations`, preserving their order, and
    /// returns `self` so calls can be chained.
    pub fn add_operations(&mut self, operations: Vec<Operation>) -> &mut Self {
        let mut incoming = operations;
        self.operations.append(&mut incoming);
        self
    }
}
/// One step of an [`ExecutionPlan`].
///
/// The enum only carries data; how each variant is interpreted is decided by
/// the engine that executes the plan. Payload meanings below that are not
/// evident from the field names are marked as assumptions.
#[derive(Debug, Clone)]
pub enum Operation {
/// Selection step carrying a list of strings (presumably column names — confirm).
Select(Vec<String>),
/// Filter step carrying a single string (presumably a predicate expression — confirm).
Filter(String),
/// Join of the current data with another input.
Join {
/// Name of the right-hand side of the join.
right: String,
/// Kind of join to perform.
join_type: JoinType,
/// Key names on the left side.
left_keys: Vec<String>,
/// Key names on the right side.
// NOTE(review): presumably paired positionally with `left_keys` — confirm.
right_keys: Vec<String>,
},
/// Aggregation carrying a list of strings and a list of aggregate expressions.
// NOTE(review): the first element looks like grouping keys, which would overlap
// with `GroupBy` — confirm which variant the engines expect.
Aggregate(Vec<String>, Vec<AggregateExpr>),
/// Grouped aggregation with named fields.
GroupBy {
/// Grouping key names.
keys: Vec<String>,
/// Aggregates computed for the groups.
aggregates: Vec<AggregateExpr>,
},
/// Ordering step with one [`SortExpr`] per sort key.
OrderBy(Vec<SortExpr>),
/// Limit step carrying a count.
Limit(usize),
/// Window step carrying a list of strings (presumably window expressions — confirm).
Window(Vec<String>),
/// Projection carrying pairs of strings.
// NOTE(review): presumably (expression, alias) or (source, target) — the order
// is not visible here; confirm against the engine.
Project(Vec<(String, String)>),
/// Duplicate-removal step; carries no data.
Distinct,
/// Set union with the named other input.
Union(String),
/// Set intersection with the named other input.
Intersect(String),
/// Set difference against the named other input.
Except(String),
/// Escape hatch for steps not covered by the other variants.
Custom {
/// Identifier of the custom operation.
name: String,
/// Free-form string parameters for the custom operation.
params: std::collections::HashMap<String, String>,
},
}
/// Kind of join requested by [`Operation::Join`].
///
/// The variant names follow the usual SQL join kinds; the exact semantics are
/// defined by the engine that interprets the plan.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JoinType {
/// Inner join.
Inner,
/// Left join.
Left,
/// Right join.
Right,
/// Full join.
Full,
/// Cross join.
Cross,
}
/// A single aggregate used by the aggregation variants of [`Operation`].
///
/// All three parts are plain strings; nothing is validated here.
#[derive(Debug, Clone)]
pub struct AggregateExpr {
/// Name of the column the aggregate applies to.
pub column: String,
/// Name of the aggregate function.
// NOTE(review): the set of accepted function names is engine-defined — confirm.
pub function: String,
/// Alias for the aggregate (presumably the output column name — confirm).
pub alias: String,
}
/// One sort key used by [`Operation::OrderBy`].
#[derive(Debug, Clone)]
pub struct SortExpr {
/// Name of the column to sort by.
pub column: String,
/// Sort direction flag: `true` requests ascending order.
pub ascending: bool,
/// Null placement flag: `true` requests nulls first.
pub nulls_first: bool,
}
/// Output of an execution: the produced partitions, their Arrow schema and
/// the metrics recorded for the run.
#[derive(Debug, Clone)]
pub struct ExecutionResult {
    /// Partitions holding the result data.
    partitions: PartitionSet,
    /// Arrow schema supplied together with the partitions.
    schema: arrow::datatypes::SchemaRef,
    /// Metrics supplied together with the partitions.
    metrics: ExecutionMetrics,
}

impl ExecutionResult {
    /// Bundles partitions, schema and metrics into a result.
    ///
    /// The three parts are stored as given; no consistency check is made
    /// between them.
    pub fn new(
        partitions: PartitionSet,
        schema: arrow::datatypes::SchemaRef,
        metrics: ExecutionMetrics,
    ) -> Self {
        Self {
            metrics,
            schema,
            partitions,
        }
    }

    /// Borrows the result partitions.
    pub fn partitions(&self) -> &PartitionSet {
        &self.partitions
    }

    /// Borrows the result schema.
    pub fn schema(&self) -> &arrow::datatypes::SchemaRef {
        &self.schema
    }

    /// Borrows the metrics stored with this result.
    pub fn metrics(&self) -> &ExecutionMetrics {
        &self.metrics
    }

    /// Returns the row count reported by `PartitionSet::total_rows`.
    pub fn row_count(&self) -> usize {
        self.partitions.total_rows()
    }

    /// Gathers a clone of every batch held by the partitions.
    ///
    /// Partitions whose `data()` is `None` are skipped. Batches appear in
    /// partition iteration order. This implementation never returns `Err`.
    pub fn collect(&self) -> Result<Vec<arrow::record_batch::RecordBatch>> {
        let batches = self
            .partitions
            .partitions()
            .into_iter()
            // Clone inside the closure so no borrow of the partition escapes it.
            .filter_map(|partition| partition.data().map(|batch| batch.clone()))
            .collect();
        Ok(batches)
    }
}
/// Counters and timing information recorded for an execution.
///
/// All numeric fields start at zero and `query_id` at `None` (see
/// [`ExecutionMetrics::new`]). Accumulating methods saturate at the numeric
/// maximum instead of overflowing, so a very large counter can never panic
/// or wrap around to a small value.
#[derive(Debug, Clone, Default)]
pub struct ExecutionMetrics {
    /// Execution time in milliseconds.
    pub execution_time_ms: u64,
    /// Number of rows processed.
    pub rows_processed: usize,
    /// Number of partitions processed.
    pub partitions_processed: usize,
    /// Number of bytes processed.
    pub bytes_processed: usize,
    /// Number of bytes output.
    pub bytes_output: usize,
    /// Optional identifier of the query these metrics belong to.
    pub query_id: Option<String>,
    /// Number of input rows; omitted from [`ExecutionMetrics::summary`] when zero.
    pub input_rows: usize,
    /// Number of output rows; omitted from [`ExecutionMetrics::summary`] when zero.
    pub output_rows: usize,
    /// Free-form additional metrics, keyed by name.
    pub custom_metrics: std::collections::HashMap<String, String>,
}

impl ExecutionMetrics {
    /// Creates metrics with every counter at zero, no query id and no custom
    /// metrics.
    pub fn new() -> Self {
        Self::default()
    }

    /// Builder: sets the execution time in milliseconds.
    pub fn with_execution_time(mut self, time_ms: u64) -> Self {
        self.execution_time_ms = time_ms;
        self
    }

    /// Builder: sets the number of rows processed.
    pub fn with_rows_processed(mut self, rows: usize) -> Self {
        self.rows_processed = rows;
        self
    }

    /// Builder: sets the number of partitions processed.
    pub fn with_partitions_processed(mut self, partitions: usize) -> Self {
        self.partitions_processed = partitions;
        self
    }

    /// Builder: sets the number of bytes processed.
    pub fn with_bytes_processed(mut self, bytes: usize) -> Self {
        self.bytes_processed = bytes;
        self
    }

    /// Builder: sets the number of bytes output.
    pub fn with_bytes_output(mut self, bytes: usize) -> Self {
        self.bytes_output = bytes;
        self
    }

    /// Builder: sets the query id.
    pub fn with_query_id(mut self, id: impl Into<String>) -> Self {
        self.query_id = Some(id.into());
        self
    }

    /// Builder: sets the number of input rows.
    pub fn with_input_rows(mut self, rows: usize) -> Self {
        self.input_rows = rows;
        self
    }

    /// Builder: sets the number of output rows.
    pub fn with_output_rows(mut self, rows: usize) -> Self {
        self.output_rows = rows;
        self
    }

    /// Builder: inserts a custom metric, replacing any previous value stored
    /// under the same name.
    pub fn with_custom_metric(mut self, name: impl Into<String>, value: impl Into<String>) -> Self {
        self.custom_metrics.insert(name.into(), value.into());
        self
    }

    /// Renders the metrics as a multi-line, human-readable report.
    ///
    /// The five core counters are always printed. The query id is printed
    /// only when set, input and output rows only when non-zero, and the custom
    /// metrics section only when at least one custom metric exists. Custom
    /// metrics are listed in ascending name order so the report is
    /// deterministic. Every line ends with `\n`.
    pub fn summary(&self) -> String {
        let mut report = String::new();
        report.push_str(&format!("Execution time: {}ms\n", self.execution_time_ms));
        report.push_str(&format!("Rows processed: {}\n", self.rows_processed));
        report.push_str(&format!(
            "Partitions processed: {}\n",
            self.partitions_processed
        ));
        report.push_str(&format!("Bytes processed: {}\n", self.bytes_processed));
        report.push_str(&format!("Bytes output: {}\n", self.bytes_output));
        if let Some(query_id) = &self.query_id {
            report.push_str(&format!("Query ID: {}\n", query_id));
        }
        if self.input_rows > 0 {
            report.push_str(&format!("Input rows: {}\n", self.input_rows));
        }
        if self.output_rows > 0 {
            report.push_str(&format!("Output rows: {}\n", self.output_rows));
        }
        if !self.custom_metrics.is_empty() {
            report.push_str("Custom metrics:\n");
            // HashMap iteration order is unspecified and changes between runs;
            // sort by name so repeated calls produce identical text.
            let mut entries: Vec<(&String, &String)> = self.custom_metrics.iter().collect();
            entries.sort_unstable_by(|a, b| a.0.cmp(b.0));
            for (name, value) in entries {
                report.push_str(&format!(" {}: {}\n", name, value));
            }
        }
        report
    }

    /// Adds every counter of `other` to the matching counter of `self`.
    ///
    /// Sums saturate at the numeric maximum. Custom metrics from `other` are
    /// copied over; on a name collision the value from `other` wins.
    /// `query_id` is left untouched.
    pub fn merge(&mut self, other: &Self) {
        self.execution_time_ms = self.execution_time_ms.saturating_add(other.execution_time_ms);
        self.rows_processed = self.rows_processed.saturating_add(other.rows_processed);
        self.partitions_processed = self
            .partitions_processed
            .saturating_add(other.partitions_processed);
        self.bytes_processed = self.bytes_processed.saturating_add(other.bytes_processed);
        self.bytes_output = self.bytes_output.saturating_add(other.bytes_output);
        self.input_rows = self.input_rows.saturating_add(other.input_rows);
        self.output_rows = self.output_rows.saturating_add(other.output_rows);
        self.custom_metrics.extend(
            other
                .custom_metrics
                .iter()
                .map(|(name, value)| (name.clone(), value.clone())),
        );
    }

    /// Adds `time_ms` to the execution time, saturating at `u64::MAX`.
    pub fn add_execution_time(&mut self, time_ms: u64) {
        self.execution_time_ms = self.execution_time_ms.saturating_add(time_ms);
    }

    /// Adds `rows` to the processed-row counter, saturating at `usize::MAX`.
    pub fn add_rows_processed(&mut self, rows: usize) {
        self.rows_processed = self.rows_processed.saturating_add(rows);
    }

    /// Adds `partitions` to the processed-partition counter, saturating at
    /// `usize::MAX`.
    pub fn add_partitions_processed(&mut self, partitions: usize) {
        self.partitions_processed = self.partitions_processed.saturating_add(partitions);
    }

    /// Adds `bytes` to the processed-byte counter, saturating at `usize::MAX`.
    pub fn add_bytes_processed(&mut self, bytes: usize) {
        self.bytes_processed = self.bytes_processed.saturating_add(bytes);
    }
}