use std::sync::{Arc, Mutex};
use crate::error::{Error, Result};
use crate::lock_safe;
use super::config::DistributedConfig;
use super::execution::{ExecutionEngine, ExecutionContext, ExecutionPlan, ExecutionResult, Operation, AggregateExpr, JoinType, SortExpr};
use super::partition::{PartitionSet, PartitionStrategy, Partitioner};
use super::ToDistributed;
/// A DataFrame whose operations are planned against — and executed by — a
/// distributed execution engine (currently DataFusion) rather than locally.
///
/// In lazy mode (the default) operations accumulate as `ExecutionPlan`s in
/// `pending_operations` and only run when `execute` is called.
pub struct DistributedDataFrame {
    // Configuration controlling executor type, concurrency, etc.
    config: DistributedConfig,
    // Engine used to create execution contexts; recreated on clone because
    // trait objects cannot be cloned.
    engine: Box<dyn ExecutionEngine>,
    // Shared, mutex-guarded execution context that plans are executed against.
    context: Arc<Mutex<Box<dyn ExecutionContext>>>,
    // Result of the most recent execution, if any.
    current_result: Option<ExecutionResult>,
    // Identifier of the dataset registered with the execution context.
    id: String,
    // When true, operations are queued rather than executed immediately.
    lazy: bool,
    // Plans accumulated while in lazy mode, in application order.
    pending_operations: Vec<ExecutionPlan>,
}
impl DistributedDataFrame {
    /// Builds a DataFrame from pre-constructed components.
    ///
    /// Starts in lazy mode with no pending operations and no materialized
    /// result; `id` should match a dataset already registered with `context`.
    pub fn new(
        config: DistributedConfig,
        engine: Box<dyn ExecutionEngine>,
        context: Box<dyn ExecutionContext>,
        id: String,
    ) -> Self {
        Self {
            config,
            engine,
            context: Arc::new(Mutex::new(context)),
            current_result: None,
            id,
            lazy: true,
            pending_operations: Vec::new(),
        }
    }

    /// Converts a local DataFrame into a distributed one: splits it into
    /// record batches, wraps each batch as a partition, and registers the
    /// resulting partition set with a freshly created execution context.
    ///
    /// # Errors
    /// Returns `Error::FeatureNotAvailable` when compiled without the
    /// `distributed` feature; otherwise propagates engine/conversion errors.
    pub fn from_local(
        df: &crate::dataframe::DataFrame,
        config: DistributedConfig,
    ) -> Result<Self> {
        #[cfg(feature = "distributed")]
        {
            use std::time::{SystemTime, UNIX_EPOCH};
            // NOTE(review): every executor type currently maps to the
            // DataFusion engine, including the catch-all arm — confirm
            // whether other executors are intended here.
            let mut engine: Box<dyn ExecutionEngine> = match config.executor_type() {
                crate::distributed::config::ExecutorType::DataFusion => {
                    Box::new(crate::distributed::datafusion::DataFusionEngine::new())
                },
                _ => {
                    Box::new(crate::distributed::datafusion::DataFusionEngine::new())
                }
            };
            engine.initialize(&config)?;
            let mut context = engine.create_context(&config)?;
            // Nanosecond timestamp used as a (probabilistically) unique
            // dataset identifier.
            let now = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap_or_default()
                .as_nanos();
            let id = format!("df_{:x}", now);
            use crate::distributed::datafusion::conversion::dataframe_to_record_batches;
            let row_count = df.shape()?.0;
            // Target roughly one batch per configured concurrency unit; both
            // operands are clamped so the result is never zero.
            let batch_size = std::cmp::max(
                1,
                row_count / std::cmp::max(1, config.concurrency())
            );
            let batches = dataframe_to_record_batches(df, batch_size)?;
            let mut partitions = Vec::new();
            for (i, batch) in batches.iter().enumerate() {
                let partition = crate::distributed::partition::Partition::new(i, batch.clone());
                partitions.push(std::sync::Arc::new(partition));
            }
            // An empty DataFrame yields an empty schema rather than an error.
            let schema_ref = if !batches.is_empty() {
                batches[0].schema()
            } else {
                std::sync::Arc::new(arrow::datatypes::Schema::empty())
            };
            let partition_set = crate::distributed::partition::PartitionSet::new(
                partitions,
                schema_ref,
            );
            context.register_dataset(&id, partition_set)?;
            Ok(Self {
                config,
                engine,
                context: std::sync::Arc::new(std::sync::Mutex::new(context)),
                current_result: None,
                id,
                lazy: true,
                pending_operations: Vec::new(),
            })
        }
        #[cfg(not(feature = "distributed"))]
        {
            Err(Error::FeatureNotAvailable(
                "Distributed processing is not available. Recompile with the 'distributed' feature flag.".to_string()
            ))
        }
    }

    /// Creates a distributed DataFrame backed by a CSV file, registering the
    /// path with a freshly created execution context.
    ///
    /// # Errors
    /// `Error::IoError` if the file does not exist;
    /// `Error::FeatureNotAvailable` without the `distributed` feature.
    pub fn from_csv(
        path: &str,
        config: DistributedConfig,
    ) -> Result<Self> {
        #[cfg(feature = "distributed")]
        {
            use std::time::{SystemTime, UNIX_EPOCH};
            use std::path::Path;
            // Fail fast with a clear message before spinning up an engine.
            if !Path::new(path).exists() {
                return Err(Error::IoError(format!("CSV file not found: {}", path)));
            }
            // NOTE(review): same all-roads-lead-to-DataFusion dispatch as
            // `from_local`.
            let mut engine: Box<dyn ExecutionEngine> = match config.executor_type() {
                crate::distributed::config::ExecutorType::DataFusion => {
                    Box::new(crate::distributed::datafusion::DataFusionEngine::new())
                },
                _ => {
                    Box::new(crate::distributed::datafusion::DataFusionEngine::new())
                }
            };
            engine.initialize(&config)?;
            let mut context = engine.create_context(&config)?;
            let now = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap_or_default()
                .as_nanos();
            let id = format!("df_{:x}", now);
            context.register_csv(&id, path)?;
            Ok(Self {
                config,
                engine,
                context: std::sync::Arc::new(std::sync::Mutex::new(context)),
                current_result: None,
                id,
                lazy: true,
                pending_operations: Vec::new(),
            })
        }
        #[cfg(not(feature = "distributed"))]
        {
            Err(Error::FeatureNotAvailable(
                "Distributed processing is not available. Recompile with the 'distributed' feature flag.".to_string()
            ))
        }
    }

    /// Creates a distributed DataFrame backed by a Parquet file, registering
    /// the path with a freshly created execution context.
    ///
    /// # Errors
    /// `Error::IoError` if the file does not exist;
    /// `Error::FeatureNotAvailable` without the `distributed` feature.
    pub fn from_parquet(
        path: &str,
        config: DistributedConfig,
    ) -> Result<Self> {
        #[cfg(feature = "distributed")]
        {
            use std::time::{SystemTime, UNIX_EPOCH};
            use std::path::Path;
            if !Path::new(path).exists() {
                return Err(Error::IoError(format!("Parquet file not found: {}", path)));
            }
            let mut engine: Box<dyn ExecutionEngine> = match config.executor_type() {
                crate::distributed::config::ExecutorType::DataFusion => {
                    Box::new(crate::distributed::datafusion::DataFusionEngine::new())
                },
                _ => {
                    Box::new(crate::distributed::datafusion::DataFusionEngine::new())
                }
            };
            engine.initialize(&config)?;
            let mut context = engine.create_context(&config)?;
            let now = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap_or_default()
                .as_nanos();
            let id = format!("df_{:x}", now);
            context.register_parquet(&id, path)?;
            Ok(Self {
                config,
                engine,
                context: std::sync::Arc::new(std::sync::Mutex::new(context)),
                current_result: None,
                id,
                lazy: true,
                pending_operations: Vec::new(),
            })
        }
        #[cfg(not(feature = "distributed"))]
        {
            Err(Error::FeatureNotAvailable(
                "Distributed processing is not available. Recompile with the 'distributed' feature flag.".to_string()
            ))
        }
    }

    /// Projects the given columns. Queued when lazy, executed immediately
    /// otherwise.
    pub fn select(&self, columns: &[&str]) -> Result<Self> {
        let columns = columns.iter().map(|s| s.to_string()).collect();
        let operation = Operation::Select { columns };
        if self.lazy {
            let mut new_df = self.clone_empty();
            new_df.add_pending_operation(operation, vec![self.id.clone()]);
            Ok(new_df)
        } else {
            self.execute_operation(operation, vec![self.id.clone()])
        }
    }

    /// Filters rows by a predicate expression string (interpreted by the
    /// execution engine). Queued when lazy, executed immediately otherwise.
    pub fn filter(&self, predicate: &str) -> Result<Self> {
        let operation = Operation::Filter {
            predicate: predicate.to_string(),
        };
        if self.lazy {
            let mut new_df = self.clone_empty();
            new_df.add_pending_operation(operation, vec![self.id.clone()]);
            Ok(new_df)
        } else {
            self.execute_operation(operation, vec![self.id.clone()])
        }
    }

    /// Joins this DataFrame with another on the given key columns.
    /// Both frames' ids become inputs of the resulting plan.
    pub fn join(
        &self,
        other: &Self,
        left_keys: &[&str],
        right_keys: &[&str],
        join_type: JoinType,
    ) -> Result<Self> {
        let left_keys = left_keys.iter().map(|s| s.to_string()).collect();
        let right_keys = right_keys.iter().map(|s| s.to_string()).collect();
        let operation = Operation::Join {
            right: other.id.clone(),
            join_type,
            left_keys,
            right_keys,
        };
        if self.lazy {
            let mut new_df = self.clone_empty();
            new_df.add_pending_operation(operation, vec![self.id.clone(), other.id.clone()]);
            Ok(new_df)
        } else {
            self.execute_operation(operation, vec![self.id.clone(), other.id.clone()])
        }
    }

    /// Starts a group-by; the actual aggregation is specified via
    /// `DistributedGroupBy::aggregate`.
    pub fn groupby(&self, keys: &[&str]) -> Result<DistributedGroupBy> {
        let keys = keys.iter().map(|s| s.to_string()).collect();
        Ok(DistributedGroupBy::new(self.clone(), keys))
    }

    /// Sorts by the given columns. Missing entries in `ascending` default to
    /// true; descending columns place nulls first (`nulls_first: !ascending`).
    pub fn sort(&self, columns: &[&str], ascending: &[bool]) -> Result<Self> {
        let mut sort_exprs = Vec::with_capacity(columns.len());
        for (i, col) in columns.iter().enumerate() {
            let ascending = if i < ascending.len() { ascending[i] } else { true };
            sort_exprs.push(SortExpr {
                column: col.to_string(),
                ascending,
                nulls_first: !ascending,
            });
        }
        let operation = Operation::OrderBy { sort_exprs };
        if self.lazy {
            let mut new_df = self.clone_empty();
            new_df.add_pending_operation(operation, vec![self.id.clone()]);
            Ok(new_df)
        } else {
            self.execute_operation(operation, vec![self.id.clone()])
        }
    }

    /// Restricts the result to at most `limit` rows.
    pub fn limit(&self, limit: usize) -> Result<Self> {
        let operation = Operation::Limit { limit };
        if self.lazy {
            let mut new_df = self.clone_empty();
            new_df.add_pending_operation(operation, vec![self.id.clone()]);
            Ok(new_df)
        } else {
            self.execute_operation(operation, vec![self.id.clone()])
        }
    }

    /// Runs all pending operations and returns an eagerly-evaluated copy.
    ///
    /// A single pending plan is executed directly. Multiple plans are fused
    /// into one SQL statement via a CTE chain when the context turns out to
    /// be a `DataFusionContext`; on any failure along that path (poisoned
    /// lock, failed downcast, SQL conversion/execution error) it falls back
    /// to executing each plan sequentially.
    pub fn execute(&self) -> Result<Self> {
        // Nothing to do for eager frames or when no operations are queued.
        if !self.lazy || self.pending_operations.is_empty() {
            return Ok(self.clone());
        }
        let mut result = self.clone();
        result.lazy = false;
        if self.pending_operations.len() == 1 {
            let context = lock_safe!(result.context, "distributed dataframe result context lock")?;
            let exec_result = context.execute(&self.pending_operations[0])?;
            result.current_result = Some(exec_result);
        } else {
            let mut combined_sql = String::new();
            let mut executed = false;
            #[cfg(feature = "distributed")]
            {
                // Best-effort fast path: errors here are swallowed so the
                // sequential fallback below still runs.
                if let Ok(context) = result.context.lock() {
                    if let Some(df_context) = context.downcast_ref::<crate::distributed::datafusion::DataFusionContext>() {
                        let mut cte_parts = Vec::new();
                        for (i, plan) in self.pending_operations.iter().enumerate() {
                            if let Ok(sql) = df_context.convert_operation_to_sql(plan) {
                                if i == 0 {
                                    cte_parts.push(format!("WITH cte_0 AS ({})", sql));
                                } else if i < self.pending_operations.len() - 1 {
                                    // Rewrite this plan's input table name to
                                    // the previous CTE so the chain links up.
                                    let modified_sql = sql.replace(&plan.inputs()[0], &format!("cte_{}", i-1));
                                    cte_parts.push(format!("cte_{} AS ({})", i, modified_sql));
                                } else {
                                    // Final plan becomes the outer query over
                                    // the assembled CTE chain.
                                    let modified_sql = sql.replace(&plan.inputs()[0], &format!("cte_{}", i-1));
                                    combined_sql = format!("{} {}", cte_parts.join(", "), modified_sql);
                                    if let Ok(exec_result) = context.sql(&combined_sql) {
                                        result.current_result = Some(exec_result);
                                        executed = true;
                                        break;
                                    }
                                }
                            }
                        }
                    }
                }
            }
            if !executed {
                // Fallback: run each plan in order. The lock is re-acquired
                // per iteration and only the last result is retained.
                for plan in &self.pending_operations {
                    let context = lock_safe!(result.context, "distributed dataframe result context lock")?;
                    let exec_result = context.execute(plan)?;
                    result.current_result = Some(exec_result);
                }
            }
        }
        Ok(result)
    }

    /// Metrics from the most recent execution, if one has happened.
    pub fn execution_metrics(&self) -> Option<&crate::distributed::execution::ExecutionMetrics> {
        self.current_result.as_ref().map(|result| result.metrics())
    }

    /// Renders the engine's explanation of the last pending plan, optionally
    /// with statistics.
    pub fn explain(&self, with_statistics: bool) -> Result<String> {
        if self.pending_operations.is_empty() {
            return Ok("No pending operations to explain.".to_string());
        }
        // Safe: emptiness was checked just above.
        let plan = self.pending_operations.last().expect("operation should succeed");
        // NOTE(review): locks manually instead of using the `lock_safe!`
        // macro used elsewhere in this impl — consider unifying.
        let context = self.context.lock()
            .map_err(|_| Error::DistributedProcessing("Failed to lock context".to_string()))?;
        context.explain_plan(plan, with_statistics)
    }

    /// Executes pending operations (if any) and returns a human-readable
    /// summary of the execution metrics.
    ///
    /// # Errors
    /// `Error::InvalidOperation` when no metrics are available after
    /// execution.
    pub fn summarize(&self) -> Result<String> {
        let executed = self.execute()?;
        if let Some(metrics) = executed.execution_metrics() {
            let mut summary = String::new();
            summary.push_str(&format!("Execution Summary:\n"));
            summary.push_str(&format!("-----------------\n"));
            summary.push_str(&format!("Rows processed: {}\n", metrics.rows_processed()));
            summary.push_str(&format!("Partitions processed: {}\n", metrics.partitions_processed()));
            let execution_time_sec = metrics.execution_time_ms() as f64 / 1000.0;
            summary.push_str(&format!("Execution time: {:.3}s\n", execution_time_sec));
            // Throughput only when both operands are non-zero (avoids
            // division by zero and meaningless rates).
            if metrics.rows_processed() > 0 && metrics.execution_time_ms() > 0 {
                let rows_per_sec = metrics.rows_processed() as f64 / execution_time_sec;
                summary.push_str(&format!("Processing speed: {:.0} rows/s\n", rows_per_sec));
            }
            let mb = 1024.0 * 1024.0;
            if metrics.bytes_processed() > 0 {
                summary.push_str(&format!("Memory processed: {:.2} MB\n",
                    metrics.bytes_processed() as f64 / mb));
            }
            if metrics.bytes_output() > 0 {
                summary.push_str(&format!("Memory output: {:.2} MB\n",
                    metrics.bytes_output() as f64 / mb));
            }
            summary.push_str(&format!("\nLazy evaluation: {}\n", self.lazy));
            summary.push_str(&format!("Pending operations: {}\n", self.pending_operations.len()));
            Ok(summary)
        } else {
            Err(Error::InvalidOperation("No execution metrics available. The DataFrame may not have been executed yet.".to_string()))
        }
    }

    /// Executes pending operations and materializes the result as a local
    /// DataFrame.
    pub fn collect_to_local(&self) -> Result<crate::dataframe::DataFrame> {
        let df = self.execute()?;
        match &df.current_result {
            Some(result) => result.collect_to_local(),
            None => Err(Error::InvalidOperation("No data to collect".to_string())),
        }
    }

    /// Executes pending operations and writes the result to a Parquet file.
    pub fn write_parquet(&self, path: &str) -> Result<()> {
        let df = self.execute()?;
        match &df.current_result {
            Some(result) => result.write_parquet(path),
            None => Err(Error::InvalidOperation("No data to write".to_string())),
        }
    }

    /// Borrow the distributed configuration.
    pub fn config(&self) -> &DistributedConfig {
        &self.config
    }

    /// Clone of the shared execution context handle.
    pub fn context(&self) -> Arc<Mutex<Box<dyn ExecutionContext>>> {
        self.context.clone()
    }

    /// Identifier of the dataset registered with the execution context.
    pub fn id(&self) -> &str {
        &self.id
    }

    /// Whether operations are queued (true) or executed eagerly (false).
    pub fn is_lazy(&self) -> bool {
        self.lazy
    }

    /// Derives a new lazy frame sharing this frame's context but with a
    /// fresh id and an empty operation queue. Used as the starting point
    /// when queueing or executing a new operation.
    fn clone_empty(&self) -> Self {
        Self {
            config: self.config.clone(),
            engine: self.engine_clone(),
            context: self.context.clone(),
            current_result: None,
            id: format!("{}_derived_{}", self.id, generate_unique_id()),
            lazy: true,
            pending_operations: Vec::new(),
        }
    }

    /// Eagerly runs a single operation against the shared context and
    /// returns a non-lazy frame holding the result, identified by the plan's
    /// output name.
    fn execute_operation(
        &self,
        operation: Operation,
        inputs: Vec<String>,
    ) -> Result<Self> {
        let output = format!("{}_{}", self.id, generate_unique_id());
        let plan = ExecutionPlan::new(operation, inputs, output.clone());
        let context = lock_safe!(self.context, "distributed dataframe self context lock")?;
        let result = context.execute(&plan)?;
        let mut new_df = self.clone_empty();
        new_df.id = output;
        new_df.current_result = Some(result);
        new_df.lazy = false;
        Ok(new_df)
    }

    /// Queues an operation for lazy execution. `self.id` is replaced by the
    /// plan's output name so the next queued operation chains onto it.
    fn add_pending_operation(
        &mut self,
        operation: Operation,
        inputs: Vec<String>,
    ) {
        let output = format!("{}_{}", self.id, generate_unique_id());
        let plan = ExecutionPlan::new(operation, inputs, output.clone());
        self.pending_operations.push(plan);
        self.id = output;
    }

    /// Creates a fresh engine instance for cloned frames (the boxed trait
    /// object itself cannot be cloned).
    fn engine_clone(&self) -> Box<dyn ExecutionEngine> {
        #[cfg(feature = "distributed")]
        {
            // NOTE(review): stringly-typed dispatch; the constructors above
            // match on `ExecutorType` and fall back to DataFusion, whereas
            // this falls back to Ballista — confirm which is intended.
            if self.config.executor_type().to_string() == "datafusion" {
                Box::new(super::datafusion::DataFusionEngine::new())
            } else {
                Box::new(super::ballista::BallistaEngine::new())
            }
        }
        #[cfg(not(feature = "distributed"))]
        {
            Box::new(PlaceholderEngine::new())
        }
    }
}
impl Clone for DistributedDataFrame {
fn clone(&self) -> Self {
Self {
config: self.config.clone(),
engine: self.engine_clone(),
context: self.context.clone(),
current_result: self.current_result.clone(),
id: self.id.clone(),
lazy: self.lazy,
pending_operations: self.pending_operations.clone(),
}
}
}
/// Intermediate builder produced by `DistributedDataFrame::groupby`; holds
/// the source frame and grouping keys until an aggregation is requested.
pub struct DistributedGroupBy {
    // DataFrame the grouping applies to (cloned at creation).
    df: DistributedDataFrame,
    // Column names to group by.
    keys: Vec<String>,
}
impl DistributedGroupBy {
pub fn new(df: DistributedDataFrame, keys: Vec<String>) -> Self {
Self { df, keys }
}
pub fn aggregate(
&self,
agg_columns: &[&str],
agg_functions: &[&str],
) -> Result<DistributedDataFrame> {
let mut agg_exprs = Vec::with_capacity(agg_columns.len());
for (i, col) in agg_columns.iter().enumerate() {
let func = if i < agg_functions.len() {
agg_functions[i]
} else {
"sum"
};
agg_exprs.push(AggregateExpr {
function: func.to_string(),
input: col.to_string(),
output: format!("{}_{}", func, col),
});
}
let operation = Operation::GroupBy {
keys: self.keys.clone(),
aggregates: agg_exprs,
};
if self.df.is_lazy() {
let mut new_df = self.df.clone_empty();
new_df.add_pending_operation(operation, vec![self.df.id().to_string()]);
Ok(new_df)
} else {
self.df.execute_operation(operation, vec![self.df.id().to_string()])
}
}
}
/// Lets any local DataFrame be lifted into a distributed one via
/// `df.to_distributed(config)`.
impl ToDistributed for crate::dataframe::DataFrame {
    // Delegates to `DistributedDataFrame::from_local`, which partitions the
    // data and registers it with a new execution context.
    fn to_distributed(&self, config: DistributedConfig) -> Result<DistributedDataFrame> {
        DistributedDataFrame::from_local(self, config)
    }
}
/// Generates a process-unique identifier string.
///
/// Fix: the previous implementation used only a nanosecond timestamp, so two
/// calls within the same clock tick (or on platforms where `SystemTime` has
/// coarse resolution) produced identical ids, which could silently alias
/// dataset names registered with the execution context. A monotonically
/// increasing atomic counter is appended to guarantee uniqueness within the
/// process; callers treat the id as opaque, so the format change is safe.
fn generate_unique_id() -> String {
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};
    // Process-wide sequence number. Relaxed ordering suffices: only the
    // uniqueness of each fetched value matters, not ordering with other data.
    static COUNTER: AtomicU64 = AtomicU64::new(0);
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos();
    let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
    format!("{:x}_{:x}", now, seq)
}
// Stand-in engine compiled only when the `distributed` feature is disabled;
// every operation on it reports that the feature is unavailable.
#[cfg(not(feature = "distributed"))]
struct PlaceholderEngine {
    // NOTE(review): never set to true anywhere in this file; `is_initialized`
    // ignores it and returns false unconditionally — likely dead state.
    initialized: bool,
}
#[cfg(not(feature = "distributed"))]
impl PlaceholderEngine {
    // Constructs the placeholder in its (permanent) uninitialized state.
    fn new() -> Self {
        Self { initialized: false }
    }
}
#[cfg(not(feature = "distributed"))]
impl ExecutionEngine for PlaceholderEngine {
fn initialize(&mut self, _config: &DistributedConfig) -> Result<()> {
Err(crate::error::Error::InvalidOperation(
"Distributed feature not enabled. Cannot initialize execution engine.".into()
))
}
fn is_initialized(&self) -> bool {
false
}
fn create_context(&self, _config: &DistributedConfig) -> Result<Box<dyn ExecutionContext>> {
Err(crate::error::Error::InvalidOperation(
"Distributed feature not enabled. Cannot create execution context.".into()
))
}
}