#[cfg(feature = "distributed")]
use std::sync::{Arc, Mutex};
#[cfg(feature = "distributed")]
use super::config::DistributedConfig;
#[cfg(feature = "distributed")]
use super::partition::{PartitionSet, PartitionStrategy, Partitioner};
#[cfg(feature = "distributed")]
use crate::distributed::execution::{
AggregateExpr, ExecutionContext, ExecutionEngine, ExecutionPlan, ExecutionResult, JoinType,
Operation, SortExpr,
};
#[cfg(feature = "distributed")]
use crate::distributed::ToDistributed;
use crate::error::{Error, Result};
#[cfg(feature = "distributed")]
use crate::lock_safe;
/// A DataFrame whose operations are delegated to a pluggable distributed
/// execution engine via an [`ExecutionContext`].
///
/// In lazy mode (the default), transformations such as `select` / `filter` /
/// `aggregate` are queued in `pending_operations` and only run when a
/// materializing call (`execute`, `collect`, `row_count`, …) is made.
#[cfg(feature = "distributed")]
pub struct DistributedDataFrame {
    /// Configuration used to initialize the engine and create contexts.
    config: DistributedConfig,
    /// The execution engine backing this frame.
    engine: Box<dyn ExecutionEngine>,
    /// Mutex-guarded execution context, shared between derived/cloned frames.
    context: Arc<Mutex<Box<dyn ExecutionContext>>>,
    /// Result of the most recent execution, if any has been run.
    current_result: Option<ExecutionResult>,
    /// Identifier of the table registered for this frame in the context.
    id: String,
    /// Whether operations are deferred (`true`) or executed eagerly.
    lazy: bool,
    /// Plans queued while in lazy mode; drained by `execute`.
    pending_operations: Vec<ExecutionPlan>,
}
#[cfg(feature = "distributed")]
impl DistributedDataFrame {
pub fn new(
config: DistributedConfig,
engine: Box<dyn ExecutionEngine>,
context: Box<dyn ExecutionContext>,
id: String,
) -> Self {
Self {
config,
engine,
context: Arc::new(Mutex::new(context)),
current_result: None,
id,
lazy: true,
pending_operations: Vec::new(),
}
}
/// Builds a distributed frame from a local `DataFrame`.
///
/// The frame is split into roughly one record batch per configured worker,
/// registered as an in-memory table under a freshly generated id, and
/// returned as a lazy frame.
///
/// # Errors
/// Returns `Error::InvalidInput` if `df` yields no record batches, and
/// propagates engine-initialization / table-registration failures.
pub fn from_local(df: &crate::dataframe::DataFrame, config: DistributedConfig) -> Result<Self> {
    use std::time::{SystemTime, UNIX_EPOCH};
    use crate::distributed::engines::datafusion::conversion::dataframe_to_record_batches;
    // Only the DataFusion engine is implemented; the original match had
    // identical arms for every executor type, so it is collapsed here.
    // TODO: dispatch on `config.executor_type()` once other engines exist.
    let mut engine: Box<dyn ExecutionEngine> =
        Box::new(crate::distributed::engines::datafusion::DataFusionEngine::new());
    engine.initialize(&config)?;
    let mut context = engine.create_context(&config)?;
    // Unique table id derived from the current time in nanoseconds.
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_nanos();
    let id = format!("df_{:x}", now);
    // Aim for one batch per worker, but never a zero-sized batch.
    let row_count = df.row_count();
    let batch_size = std::cmp::max(1, row_count / std::cmp::max(1, config.concurrency()));
    let batches = dataframe_to_record_batches(df, batch_size)?;
    // Fail fast before doing any partition work on an empty frame.
    if batches.is_empty() {
        return Err(Error::InvalidInput("DataFrame is empty".to_string()));
    }
    let schema = batches[0].schema();
    let partitions: Vec<_> = batches
        .iter()
        .enumerate()
        .map(|(i, batch)| {
            Arc::new(crate::distributed::core::partition::Partition::new(i, batch.clone()))
        })
        .collect();
    let partition_set =
        crate::distributed::core::partition::PartitionSet::new(partitions, schema);
    context.register_in_memory_table(&id, partition_set)?;
    Ok(Self {
        config,
        engine,
        context: Arc::new(Mutex::new(context)),
        current_result: None,
        id,
        lazy: true,
        pending_operations: Vec::new(),
    })
}
pub fn with_result(
config: DistributedConfig,
engine: Box<dyn ExecutionEngine>,
context: Box<dyn ExecutionContext>,
id: String,
result: ExecutionResult,
) -> Self {
Self {
config,
engine,
context: Arc::new(Mutex::new(context)),
current_result: Some(result),
id,
lazy: true,
pending_operations: Vec::new(),
}
}
/// Returns the identifier under which this frame is registered.
pub fn id(&self) -> &str {
    self.id.as_str()
}
/// Returns the Arrow schema of this frame.
///
/// Prefers the schema of the cached execution result; otherwise asks the
/// execution context for the schema of the registered table.
///
/// # Errors
/// Propagates lock failures and context schema-lookup failures.
pub fn schema(&self) -> Result<arrow::datatypes::SchemaRef> {
    // Avoid taking the context lock at all when the cached result can answer
    // (the previous version locked unconditionally).
    if let Some(result) = &self.current_result {
        return Ok(result.schema().clone());
    }
    let context = lock_safe!(self.context, "distributed dataframe context lock")?;
    context.table_schema(&self.id)
}
pub fn execute(&mut self) -> Result<&ExecutionResult> {
if self.pending_operations.is_empty() && self.current_result.is_some() {
return self
.current_result
.as_ref()
.ok_or_else(|| Error::InvalidOperation("No result computed yet".into()))
.map(|r| r);
}
let mut context = lock_safe!(self.context, "distributed dataframe context lock")?;
let mut plan = ExecutionPlan::new(&self.id);
for op in &self.pending_operations {
plan.add_operations(op.operations().clone());
}
let result = context.execute_plan(plan)?;
self.current_result = Some(result);
self.pending_operations.clear();
self.current_result
.as_ref()
.ok_or_else(|| Error::InvalidOperation("No result computed yet".into()))
}
/// Executes any pending work and converts the result into a local DataFrame.
///
/// # Errors
/// Propagates execution and record-batch conversion failures.
pub fn collect(&mut self) -> Result<crate::dataframe::DataFrame> {
    use crate::distributed::engines::datafusion::conversion::record_batches_to_dataframe;
    let batches = self.execute()?.collect()?;
    record_batches_to_dataframe(&batches)
}
pub fn write_parquet(&mut self, path: &str) -> Result<()> {
if !self.pending_operations.is_empty() || self.current_result.is_none() {
self.execute()?;
}
let mut context = lock_safe!(self.context, "distributed dataframe context lock")?;
if let Some(result) = &self.current_result {
context.write_parquet(result, path)
} else {
Err(Error::InvalidValue("No result available".to_string()))
}
}
pub fn write_csv(&mut self, path: &str) -> Result<()> {
if !self.pending_operations.is_empty() || self.current_result.is_none() {
self.execute()?;
}
let mut context = lock_safe!(self.context, "distributed dataframe context lock")?;
if let Some(result) = &self.current_result {
context.write_csv(result, path)
} else {
Err(Error::InvalidValue("No result available".to_string()))
}
}
/// Number of rows in the materialized result (executes pending work first).
pub fn row_count(&mut self) -> Result<usize> {
    self.execute().map(|result| result.row_count())
}
/// `(rows, columns)` of the materialized result (executes pending work first).
pub fn shape(&mut self) -> Result<(usize, usize)> {
    let result = self.execute()?;
    let column_count = result.schema().fields().len();
    Ok((result.row_count(), column_count))
}
/// Projects the frame onto the named `columns`.
///
/// In lazy mode the projection is queued (on this frame too) and a derived
/// lazy frame sharing the context and queue is returned; in eager mode the
/// plan runs immediately against the context.
pub fn select(&mut self, columns: &[&str]) -> Result<Self> {
    let projected: Vec<String> = columns.iter().map(|c| c.to_string()).collect();
    let mut plan = ExecutionPlan::new(&self.id);
    plan.add_operation(Operation::Select(projected));
    if !self.lazy {
        let mut context = lock_safe!(self.context, "distributed dataframe context lock")?;
        let result = context.execute_plan(plan)?;
        return Ok(Self {
            config: self.config.clone(),
            engine: self.engine.clone(),
            context: self.context.clone(),
            current_result: Some(result),
            id: format!("{}_{}", self.id, "select"),
            lazy: false,
            pending_operations: Vec::new(),
        });
    }
    self.pending_operations.push(plan);
    Ok(Self {
        config: self.config.clone(),
        engine: self.engine.clone(),
        context: self.context.clone(),
        current_result: None,
        id: format!("{}_{}", self.id, self.pending_operations.len()),
        lazy: true,
        pending_operations: self.pending_operations.clone(),
    })
}
/// Filters rows by the given `condition` expression string.
///
/// In lazy mode the filter is queued (on this frame too) and a derived lazy
/// frame sharing the context and queue is returned; in eager mode the plan
/// runs immediately against the context.
pub fn filter(&mut self, condition: &str) -> Result<Self> {
    let mut plan = ExecutionPlan::new(&self.id);
    plan.add_operation(Operation::Filter(condition.to_string()));
    if !self.lazy {
        let mut context = lock_safe!(self.context, "distributed dataframe context lock")?;
        let result = context.execute_plan(plan)?;
        return Ok(Self {
            config: self.config.clone(),
            engine: self.engine.clone(),
            context: self.context.clone(),
            current_result: Some(result),
            id: format!("{}_{}", self.id, "filter"),
            lazy: false,
            pending_operations: Vec::new(),
        });
    }
    self.pending_operations.push(plan);
    Ok(Self {
        config: self.config.clone(),
        engine: self.engine.clone(),
        context: self.context.clone(),
        current_result: None,
        id: format!("{}_{}", self.id, self.pending_operations.len()),
        lazy: true,
        pending_operations: self.pending_operations.clone(),
    })
}
pub fn aggregate(
&mut self,
group_by: &[&str],
aggregates: &[(&str, &str, &str)],
) -> Result<Self> {
let mut plan = ExecutionPlan::new(&self.id);
let group_by = group_by.iter().map(|s| s.to_string()).collect();
let mut agg_exprs = Vec::new();
for (column, func, alias) in aggregates {
agg_exprs.push(AggregateExpr {
column: column.to_string(),
function: func.to_string(),
alias: alias.to_string(),
});
}
plan.add_operation(Operation::Aggregate(group_by, agg_exprs));
if self.lazy {
self.pending_operations.push(plan);
let id = format!("{}_{}", self.id, self.pending_operations.len());
Ok(Self {
config: self.config.clone(),
engine: self.engine.clone(),
context: self.context.clone(),
current_result: None,
id,
lazy: true,
pending_operations: self.pending_operations.clone(),
})
} else {
let mut context = lock_safe!(self.context, "distributed dataframe context lock")?;
let result = context.execute_plan(plan)?;
let id = format!("{}_{}", self.id, "aggregate");
Ok(Self {
config: self.config.clone(),
engine: self.engine.clone(),
context: self.context.clone(),
current_result: Some(result),
id,
lazy: false,
pending_operations: Vec::new(),
})
}
}
/// Returns a shared handle to the underlying execution context.
pub fn context(&self) -> Arc<Mutex<Box<dyn ExecutionContext>>> {
    // `Arc::clone` makes the cheap reference-count bump explicit.
    Arc::clone(&self.context)
}
/// Returns the distributed configuration this frame was created with.
pub fn config(&self) -> &DistributedConfig {
    &self.config
}
/// Toggles lazy evaluation; returns `&mut self` for chaining.
///
/// Note: already-queued operations are kept even when switching to eager.
pub fn with_lazy(&mut self, lazy: bool) -> &mut Self {
    self.lazy = lazy;
    self
}
/// Whether transformations are currently being deferred.
pub fn is_lazy(&self) -> bool {
    self.lazy
}
/// Creates a frame sharing this frame's engine/config/context but carrying
/// no cached result and no queued operations.
pub fn clone_empty(&self) -> Self {
    let empty_id = format!("{}_empty", self.id);
    Self {
        id: empty_id,
        lazy: self.lazy,
        config: self.config.clone(),
        engine: self.engine.clone(),
        context: self.context.clone(),
        current_result: None,
        pending_operations: Vec::new(),
    }
}
/// Queues an operation plan for later execution.
///
/// NOTE(review): `_inputs` (presumably ids of upstream frames) is currently
/// ignored — confirm whether multi-input plans should be wired up here.
pub fn add_pending_operation(&mut self, operation: ExecutionPlan, _inputs: Vec<String>) {
    self.pending_operations.push(operation);
}
/// Stub: returns a clone of `self` without executing anything.
///
/// NOTE(review): both `_operation` and `_inputs` are ignored, so callers get
/// the *unmodified* frame back — confirm this placeholder is intentional.
pub fn execute_operation(
    &self,
    _operation: ExecutionPlan,
    _inputs: Vec<String>,
) -> Result<Self> {
    Ok(self.clone())
}
}
#[cfg(feature = "distributed")]
impl Clone for DistributedDataFrame {
    // Manual impl because `engine` is a boxed trait object.
    // NOTE(review): `self.engine.clone()` presumably resolves through a
    // clone capability declared on `ExecutionEngine` itself — a plain
    // `Box<dyn Trait>` is not `Clone`; confirm against the trait definition.
    fn clone(&self) -> Self {
        Self {
            config: self.config.clone(),
            engine: self.engine.clone(),
            // The context Arc is shared between clones, not deep-copied.
            context: self.context.clone(),
            current_result: self.current_result.clone(),
            id: self.id.clone(),
            lazy: self.lazy,
            pending_operations: self.pending_operations.clone(),
        }
    }
}
/// Zero-sized stand-in used when the `distributed` feature is disabled.
///
/// Derives added: `Default` (clippy `new_without_default` — `new()` existed
/// without a `Default` impl), plus `Debug`/`Clone`/`Copy`, all free on a
/// unit struct and backward compatible.
#[cfg(not(feature = "distributed"))]
#[derive(Debug, Default, Clone, Copy)]
pub struct DistributedDataFrame;

#[cfg(not(feature = "distributed"))]
impl DistributedDataFrame {
    /// Creates the (zero-sized) stub.
    pub fn new() -> Self {
        Self
    }
}