use std::sync::Arc;
use crate::distributed::config::DistributedConfig;
use crate::distributed::partition::{Partition, PartitionSet};
use crate::error::Result;
/// Backend-agnostic entry point for a distributed query engine.
///
/// An engine is initialized once from a [`DistributedConfig`] and then hands
/// out per-session [`ExecutionContext`] values for running plans and queries.
pub trait ExecutionEngine: Send + Sync {
/// Initializes the engine with the given configuration.
fn initialize(&mut self, config: &DistributedConfig) -> Result<()>;
/// Reports whether `initialize` has completed successfully.
fn is_initialized(&self) -> bool;
/// Creates a new execution context bound to this engine.
fn create_context(&self, config: &DistributedConfig) -> Result<Box<dyn ExecutionContext>>;
}
/// A session against the execution engine.
///
/// Datasets are registered under string names and then referenced by
/// [`ExecutionPlan`] inputs or SQL queries run through this context.
pub trait ExecutionContext: Send + Sync {
/// Runs a single plan node, returning the result partitions and metrics.
fn execute(&self, plan: &ExecutionPlan) -> Result<ExecutionResult>;
/// Registers an in-memory partition set under `name`.
fn register_dataset(&mut self, name: &str, partitions: PartitionSet) -> Result<()>;
/// Registers the CSV file at `path` as the dataset `name`.
fn register_csv(&mut self, name: &str, path: &str) -> Result<()>;
/// Registers the Parquet file at `path` as the dataset `name`.
fn register_parquet(&mut self, name: &str, path: &str) -> Result<()>;
/// Executes a SQL query over the registered datasets.
fn sql(&self, query: &str) -> Result<ExecutionResult>;
/// Renders a human-readable description of `plan`, optionally with statistics.
fn explain_plan(&self, plan: &ExecutionPlan, with_statistics: bool) -> Result<String>;
}
/// A single node of a distributed query: one [`Operation`] applied to named
/// input datasets, producing a named output dataset.
#[derive(Debug, Clone)]
pub struct ExecutionPlan {
// The relational operation this node performs.
operation: Operation,
// Names of the datasets the operation reads.
inputs: Vec<String>,
// Name under which the result is made available.
output: String,
}
/// The set of relational operations a plan node can perform.
///
/// Each variant carries only that operation's parameters; the datasets it
/// reads come from the owning [`ExecutionPlan`]'s `inputs`.
#[derive(Debug, Clone)]
pub enum Operation {
/// Projects the listed columns.
Select {
columns: Vec<String>,
},
/// Keeps rows matching `predicate` (a string expression; syntax is
/// engine-specific — interpreted by the `ExecutionContext` impl).
Filter {
predicate: String,
},
/// Joins the input with the dataset named `right`.
Join {
right: String,
join_type: JoinType,
// Join key columns from the left (input) side.
left_keys: Vec<String>,
// Join key columns from `right`; presumably paired positionally with
// `left_keys` — confirm against the engine implementation.
right_keys: Vec<String>,
},
/// Groups by `keys` and evaluates the given aggregates.
GroupBy {
keys: Vec<String>,
aggregates: Vec<AggregateExpr>,
},
/// Sorts by the given sort expressions.
OrderBy {
sort_exprs: Vec<SortExpr>,
},
/// Truncates the result to at most `limit` rows.
Limit {
limit: usize,
},
/// Evaluates window functions over the input.
Window {
window_functions: Vec<crate::distributed::window::WindowFunction>,
},
/// An engine-specific operation identified by `name`, with free-form
/// string parameters.
Custom {
name: String,
params: std::collections::HashMap<String, String>,
},
}
/// The kind of join to perform; variants mirror standard SQL join types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JoinType {
/// Only rows with matches on both sides.
Inner,
/// All left rows, unmatched right columns filled with nulls.
Left,
/// All right rows, unmatched left columns filled with nulls.
Right,
/// All rows from both sides.
Full,
/// Cartesian product; join keys are not used.
Cross,
}
/// One aggregate computation within an [`Operation::GroupBy`].
#[derive(Debug, Clone)]
pub struct AggregateExpr {
// Aggregate function name; interpretation is engine-specific.
function: String,
// Input column the function is applied to.
input: String,
// Name given to the resulting column.
output: String,
}
/// One sort key within an [`Operation::OrderBy`].
#[derive(Debug, Clone)]
pub struct SortExpr {
// Column to sort by.
column: String,
// Sort ascending when true, descending otherwise.
ascending: bool,
// Place null values before non-null values when true.
nulls_first: bool,
}
impl ExecutionPlan {
pub fn new(operation: Operation, inputs: Vec<String>, output: String) -> Self {
Self {
operation,
inputs,
output,
}
}
pub fn operation(&self) -> &Operation {
&self.operation
}
pub fn inputs(&self) -> &[String] {
&self.inputs
}
pub fn output(&self) -> &str {
&self.output
}
}
/// The outcome of executing a plan: the result data plus run metrics.
#[derive(Debug, Clone)]
pub struct ExecutionResult {
// Partitions holding the result data.
partitions: PartitionSet,
// Metrics gathered while producing the result.
metrics: ExecutionMetrics,
}
/// Counters describing a single execution run.
///
/// `Default` yields all-zero counters and no query id.
#[derive(Debug, Clone, Default)]
pub struct ExecutionMetrics {
// Execution time in milliseconds.
execution_time_ms: u64,
// Number of rows processed.
rows_processed: usize,
// Number of partitions processed.
partitions_processed: usize,
// Bytes processed during execution.
bytes_processed: usize,
// Bytes in the produced output.
bytes_output: usize,
// Optional identifier tying these metrics to a specific query.
query_id: Option<String>,
}
impl ExecutionResult {
    /// Bundles result partitions with the metrics gathered while producing them.
    pub fn new(partitions: PartitionSet, metrics: ExecutionMetrics) -> Self {
        Self {
            partitions,
            metrics,
        }
    }

    /// The distributed partitions that make up the result.
    pub fn partitions(&self) -> &PartitionSet {
        &self.partitions
    }

    /// Metrics collected during execution.
    pub fn metrics(&self) -> &ExecutionMetrics {
        &self.metrics
    }

    /// Gathers every partition's materialized record batches into a single
    /// local `DataFrame`.
    ///
    /// Partitions without in-memory data are skipped; if no partition has
    /// data, an empty `DataFrame` is returned. Without the `distributed`
    /// feature this always returns `Error::FeatureNotAvailable`.
    pub fn collect_to_local(&self) -> Result<crate::dataframe::DataFrame> {
        #[cfg(feature = "distributed")]
        {
            let mut batches = Vec::new();
            for partition in self.partitions.partitions() {
                if let Some(data) = partition.data() {
                    batches.push(data.clone());
                }
            }
            if batches.is_empty() {
                return Ok(crate::dataframe::DataFrame::new());
            }
            crate::distributed::datafusion::conversion::record_batches_to_dataframe(&batches)
        }
        #[cfg(not(feature = "distributed"))]
        {
            use crate::error::Error;
            Err(Error::FeatureNotAvailable(
                "Collecting to local DataFrame is not available. Recompile with the 'distributed' feature flag.".to_string()
            ))
        }
    }

    /// Writes all materialized partitions to a single Parquet file at `path`.
    ///
    /// Returns `Error::InvalidOperation` when no partition has in-memory
    /// data. The writer uses the first batch's schema; all batches are
    /// assumed to share it. Without the `distributed` feature this always
    /// returns `Error::FeatureNotAvailable`.
    pub fn write_parquet(&self, path: &str) -> Result<()> {
        #[cfg(feature = "distributed")]
        {
            // Bug fix: `Error` was not in scope in this cfg branch (only the
            // `not(feature)` branch imported it), so every `Error::…` below
            // failed to resolve. Also dropped the unused `SchemaRef`/`Arc`
            // imports that used to sit here.
            use crate::error::Error;
            use std::fs::File;

            let mut batches = Vec::new();
            for partition in self.partitions.partitions() {
                if let Some(data) = partition.data() {
                    batches.push(data.clone());
                }
            }
            if batches.is_empty() {
                return Err(Error::InvalidOperation("No data to write".to_string()));
            }
            let file = File::create(path)
                .map_err(|e| Error::IoError(format!("Failed to create Parquet file: {}", e)))?;
            let schema = batches[0].schema();
            let props = parquet::file::properties::WriterProperties::builder().build();
            let mut writer = parquet::arrow::ArrowWriter::try_new(file, schema, Some(props))
                .map_err(|e| {
                    Error::ParquetError(format!("Failed to create Parquet writer: {}", e))
                })?;
            for batch in batches {
                writer.write(&batch).map_err(|e| {
                    Error::ParquetError(format!("Failed to write batch to Parquet: {}", e))
                })?;
            }
            // `close` finalizes the file (footer/metadata); merely dropping
            // the writer would leave the file incomplete.
            writer.close().map_err(|e| {
                Error::ParquetError(format!("Failed to close Parquet writer: {}", e))
            })?;
            Ok(())
        }
        #[cfg(not(feature = "distributed"))]
        {
            use crate::error::Error;
            Err(Error::FeatureNotAvailable(
                "Writing to Parquet is not available. Recompile with the 'distributed' feature flag.".to_string()
            ))
        }
    }
}
impl ExecutionMetrics {
pub fn new(
execution_time_ms: u64,
rows_processed: usize,
partitions_processed: usize,
bytes_processed: usize,
bytes_output: usize,
) -> Self {
Self {
execution_time_ms,
rows_processed,
partitions_processed,
bytes_processed,
bytes_output,
query_id: None,
}
}
pub fn with_query_id(
execution_time_ms: u64,
rows_processed: usize,
partitions_processed: usize,
bytes_processed: usize,
bytes_output: usize,
query_id: impl Into<String>,
) -> Self {
Self {
execution_time_ms,
rows_processed,
partitions_processed,
bytes_processed,
bytes_output,
query_id: Some(query_id.into()),
}
}
pub fn execution_time_ms(&self) -> u64 {
self.execution_time_ms
}
pub fn rows_processed(&self) -> usize {
self.rows_processed
}
pub fn partitions_processed(&self) -> usize {
self.partitions_processed
}
pub fn bytes_processed(&self) -> usize {
self.bytes_processed
}
pub fn bytes_output(&self) -> usize {
self.bytes_output
}
pub fn query_id(&self) -> Option<&str> {
self.query_id.as_deref()
}
pub fn with_id(mut self, id: impl Into<String>) -> Self {
self.query_id = Some(id.into());
self
}
pub fn format(&self) -> String {
let mut result = String::new();
if let Some(id) = &self.query_id {
result.push_str(&format!("Query ID: {}\n", id));
}
let execution_time_sec = self.execution_time_ms as f64 / 1000.0;
result.push_str(&format!("Execution time: {:.3}s\n", execution_time_sec));
result.push_str(&format!("Rows processed: {}\n", self.rows_processed));
result.push_str(&format!("Partitions: {}\n", self.partitions_processed));
if self.execution_time_ms > 0 && self.rows_processed > 0 {
let rows_per_sec =
(self.rows_processed as f64 * 1000.0) / self.execution_time_ms as f64;
result.push_str(&format!("Processing rate: {:.1} rows/s\n", rows_per_sec));
}
if self.bytes_processed > 0 {
let mb = 1024.0 * 1024.0;
result.push_str(&format!(
"Memory processed: {:.2} MB\n",
self.bytes_processed as f64 / mb
));
}
if self.bytes_output > 0 {
let mb = 1024.0 * 1024.0;
result.push_str(&format!(
"Memory output: {:.2} MB\n",
self.bytes_output as f64 / mb
));
}
result
}
}