#[cfg(feature = "distributed")]
use std::collections::HashMap;
#[cfg(feature = "distributed")]
use std::path::Path;
#[cfg(feature = "distributed")]
use std::sync::{Arc, Mutex};
#[cfg(feature = "distributed")]
use super::config::DistributedConfig;
#[cfg(feature = "distributed")]
use crate::dataframe::DataFrame;
#[cfg(feature = "distributed")]
use crate::distributed::core::dataframe::DistributedDataFrame;
#[cfg(feature = "distributed")]
use crate::distributed::execution::{ExecutionContext, ExecutionEngine, ExecutionMetrics};
#[cfg(feature = "distributed")]
use crate::distributed::expr::ExprSchema;
#[cfg(feature = "distributed")]
use crate::distributed::schema_validator::SchemaValidator;
#[cfg(feature = "distributed")]
use crate::distributed::ToDistributed;
use crate::error::{Error, Result};
#[cfg(feature = "distributed")]
use crate::lock_safe;
#[cfg(feature = "distributed")]
/// Entry point for distributed processing: owns the execution engine, a
/// shared execution context, and the set of datasets registered by name.
pub struct DistributedContext {
    // Configuration the engine and context were built from.
    config: DistributedConfig,
    // Backend execution engine (currently always DataFusion; see `new`).
    engine: Box<dyn ExecutionEngine>,
    // Engine-created execution context, shared and lock-protected so it can
    // be handed out via `execution_context()`.
    context: Arc<Mutex<Box<dyn ExecutionContext>>>,
    // Registered distributed datasets, keyed by registration name.
    datasets: HashMap<String, DistributedDataFrame>,
}
#[cfg(feature = "distributed")]
impl DistributedContext {
    /// Creates a context backed by a default configuration with the given
    /// concurrency level.
    pub fn new_local(concurrency: usize) -> Result<Self> {
        let config = DistributedConfig::new().with_concurrency(concurrency);
        Self::new(config)
    }

    /// Creates a context from an explicit configuration.
    ///
    /// # Errors
    ///
    /// Returns an error if the engine fails to initialize or cannot create
    /// an execution context.
    pub fn new(config: DistributedConfig) -> Result<Self> {
        // DataFusion is currently the only implemented backend, so every
        // configured `ExecutorType` resolves to it. Reintroduce a match on
        // `config.executor_type()` once additional engines exist.
        let mut engine: Box<dyn ExecutionEngine> =
            Box::new(crate::distributed::engines::datafusion::DataFusionEngine::new());
        engine.initialize(&config)?;
        let context = engine.create_context(&config)?;
        Ok(Self {
            config,
            engine,
            context: Arc::new(Mutex::new(context)),
            datasets: HashMap::new(),
        })
    }

    /// Registers a local [`DataFrame`] under `name`.
    ///
    /// The frame is first converted with `to_distributed`, which validates it
    /// against the current configuration; the converted value itself is
    /// discarded and a context-bound [`DistributedDataFrame`] is stored
    /// instead.
    ///
    /// # Errors
    ///
    /// Returns an error if the conversion fails or the context lock is
    /// poisoned.
    pub fn register_dataframe(&mut self, name: &str, df: &DataFrame) -> Result<()> {
        // The conversion acts as validation only; `let _ =` makes the
        // intentional discard explicit instead of leaving an unused binding.
        let _ = df.to_distributed(self.config.clone())?;
        let dist_df_with_context = DistributedDataFrame::new(
            self.config.clone(),
            self.engine.clone(),
            lock_safe!(self.context, "distributed context lock")?
                .as_ref()
                .clone(),
            name.to_string(),
        );
        self.datasets.insert(name.to_string(), dist_df_with_context);
        Ok(())
    }

    /// Registers a CSV file with the execution context under `name`.
    pub fn register_csv(&mut self, name: &str, path: &str) -> Result<()> {
        let mut context = lock_safe!(self.context, "distributed context lock")?;
        context.register_csv(name, path)?;
        Ok(())
    }

    /// Registers a Parquet file with the execution context under `name`.
    pub fn register_parquet(&mut self, name: &str, path: &str) -> Result<()> {
        let mut context = lock_safe!(self.context, "distributed context lock")?;
        context.register_parquet(name, path)?;
        Ok(())
    }

    /// Returns the dataset registered under `name`, if any.
    pub fn get_dataset(&self, name: &str) -> Option<&DistributedDataFrame> {
        self.datasets.get(name)
    }

    /// Returns a mutable reference to the dataset registered under `name`.
    pub fn get_dataset_mut(&mut self, name: &str) -> Option<&mut DistributedDataFrame> {
        self.datasets.get_mut(name)
    }

    /// Returns the configuration this context was built with.
    pub fn config(&self) -> &DistributedConfig {
        &self.config
    }

    /// Returns the underlying execution engine.
    pub fn engine(&self) -> &dyn ExecutionEngine {
        &*self.engine
    }

    /// Returns a shared handle to the execution context.
    pub fn execution_context(&self) -> Arc<Mutex<Box<dyn ExecutionContext>>> {
        self.context.clone()
    }

    /// Returns the engine's current execution metrics.
    pub fn metrics(&self) -> Result<ExecutionMetrics> {
        let context = lock_safe!(self.context, "distributed context lock")?;
        context.metrics()
    }

    /// Validates that every column referenced by `schema` exists in at least
    /// one registered dataset.
    ///
    /// # Errors
    ///
    /// Returns [`Error::InvalidOperation`] naming the first column not found
    /// in any registered dataset.
    pub fn validate_schema(&self, schema: &ExprSchema) -> Result<()> {
        let mut validator = SchemaValidator::new();
        // Register the schema of every dataset whose schema is obtainable;
        // datasets that fail to report a schema are silently skipped.
        for (name, dataset) in &self.datasets {
            if let Ok(dataset_schema) = dataset.schema() {
                let expr_schema = convert_arrow_schema_to_expr_schema(&dataset_schema)?;
                validator.register_schema(name.clone(), expr_schema);
            }
        }
        for (column_name, _column_meta) in schema.columns() {
            let found = validator
                .schemas()
                .values()
                .any(|dataset_schema| dataset_schema.has_column(column_name));
            if !found {
                return Err(Error::InvalidOperation(format!(
                    "Column '{}' not found in any registered dataset",
                    column_name
                )));
            }
        }
        Ok(())
    }
}
#[cfg(feature = "distributed")]
/// Builds an `ExprSchema` mirroring the fields of an Arrow schema.
///
/// Each Arrow field is mapped to the closest `ExprDataType`; types with no
/// direct counterpart fall back to `String`. Field names and nullability are
/// carried over unchanged.
fn convert_arrow_schema_to_expr_schema(
    arrow_schema: &arrow::datatypes::SchemaRef,
) -> Result<ExprSchema> {
    use arrow::datatypes::DataType;

    use crate::distributed::expr::{ColumnMeta, ExprDataType};

    let mut expr_schema = ExprSchema::new();
    for field in arrow_schema.fields() {
        let mapped_type = match field.data_type() {
            DataType::Boolean => ExprDataType::Boolean,
            DataType::Int8 | DataType::Int16 | DataType::Int32 | DataType::Int64 => {
                ExprDataType::Integer
            }
            DataType::Float32 | DataType::Float64 => ExprDataType::Float,
            DataType::Utf8 | DataType::LargeUtf8 => ExprDataType::String,
            DataType::Date32 | DataType::Date64 => ExprDataType::Date,
            DataType::Timestamp(_, _) => ExprDataType::Timestamp,
            // No direct counterpart (unsigned ints, binary, lists, ...):
            // fall back to String.
            _ => ExprDataType::String,
        };
        expr_schema.add_column(ColumnMeta::new(
            field.name().clone(),
            mapped_type,
            field.is_nullable(),
            None,
        ));
    }
    Ok(expr_schema)
}
#[cfg(not(feature = "distributed"))]
/// Stub standing in for the distributed context when the `distributed`
/// feature is disabled; constructors always fail (see the impl below).
pub struct DistributedContext;
#[cfg(not(feature = "distributed"))]
impl DistributedContext {
    /// Always fails with [`Error::FeatureNotAvailable`]; the `distributed`
    /// feature is not compiled in.
    pub fn new(_config: super::DistributedConfig) -> Result<Self> {
        Err(Error::FeatureNotAvailable(
            "Distributed processing is not available. Recompile with the 'distributed' feature flag.".to_string()
        ))
    }

    /// Mirrors the feature-enabled `new_local` so callers fail with a clear
    /// runtime error instead of a missing-method compile error when the
    /// `distributed` feature is disabled.
    pub fn new_local(_concurrency: usize) -> Result<Self> {
        Err(Error::FeatureNotAvailable(
            "Distributed processing is not available. Recompile with the 'distributed' feature flag.".to_string()
        ))
    }
}