use crate::TaskEstimator;
use crate::distributed_planner::task_estimator::CombinedTaskEstimator;
use crate::networking::{ChannelResolverExtension, WorkerResolverExtension};
use datafusion::common::utils::get_available_parallelism;
use datafusion::common::{DataFusionError, extensions_options, not_impl_err, plan_err};
use datafusion::config::{ConfigExtension, ConfigField, ConfigOptions, Visit};
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
// Declares `DistributedConfig` through DataFusion's `extensions_options!` macro.
// Each entry below is a field, its type, and the expression used as its default.
// NOTE(review): plain `//` comments are used on purpose. `///` docs inside this
// macro may be forwarded as the option's description string; confirm before
// converting any of these to doc comments.
extensions_options! {
pub struct DistributedConfig {
// Default comes from `files_per_task_default()`: 1 in test / "integration"
// builds, the available parallelism otherwise.
// Presumably the number of files grouped into a single task; TODO confirm.
pub files_per_task: usize, default = files_per_task_default()
// Default comes from `cardinality_task_count_factor_default()`: 1.5 in test /
// "integration" builds, 1.0 otherwise.
// Presumably a multiplier applied when deriving task counts; TODO confirm.
pub cardinality_task_count_factor: f64, default = cardinality_task_count_factor_default()
// Defaults to 8192. Presumably the batch size used when shuffling; the unit
// (rows vs. bytes) is not visible here; TODO confirm.
pub shuffle_batch_size: usize, default = 8192
// Enabled by default. The exact planner behavior it toggles is not visible here.
pub children_isolator_unions: bool, default = true
// Enabled by default.
pub collect_metrics: bool, default = true
// Disabled by default.
pub broadcast_joins: bool, default = false
// Defaults to the string "lz4". The set of accepted values is not visible here.
pub compression: String, default = "lz4".to_string()
// Defaults to 0. NOTE(review): 0 presumably means "no limit"; confirm against
// the code that reads this field.
pub max_tasks_per_stage: usize, default = 0
// Crate-internal slots below. The `ConfigField` impls for these types in this
// file have an empty `visit` and a `set` that returns a not-implemented error.
// User-provided estimators are appended via `DistributedConfig::with_task_estimator`.
pub(crate) __private_task_estimator: CombinedTaskEstimator, default = CombinedTaskEstimator::default()
pub(crate) __private_channel_resolver: ChannelResolverExtension, default = ChannelResolverExtension::default()
// Starts out as `WorkerResolverExtension::not_implemented()`.
pub(crate) __private_worker_resolver: WorkerResolverExtension, default = WorkerResolverExtension::not_implemented()
}
}
/// Default value for `DistributedConfig::files_per_task`.
///
/// Builds compiled for tests or with the `integration` feature always use `1`;
/// every other build uses whatever `get_available_parallelism()` reports.
fn files_per_task_default() -> usize {
    if cfg!(any(test, feature = "integration")) {
        return 1;
    }
    get_available_parallelism()
}
/// Default value for `DistributedConfig::cardinality_task_count_factor`.
///
/// Builds compiled for tests or with the `integration` feature use `1.5`;
/// every other build uses `1.0`.
fn cardinality_task_count_factor_default() -> f64 {
    let test_like_build = cfg!(any(test, feature = "integration"));
    if !test_like_build {
        return 1.0;
    }
    1.5
}
impl DistributedConfig {
    /// Registers an additional user-supplied [`TaskEstimator`] and hands the
    /// config back, builder style.
    ///
    /// The estimator is wrapped in an [`Arc`] and pushed onto the
    /// user-provided list of the internal combined estimator.
    pub fn with_task_estimator(
        mut self,
        task_estimator: impl TaskEstimator + Send + Sync + 'static,
    ) -> Self {
        let shared = Arc::new(task_estimator);
        self.__private_task_estimator.user_provided.push(shared);
        self
    }

    /// Borrows the [`DistributedConfig`] stored in `cfg.extensions`.
    ///
    /// # Errors
    ///
    /// Returns a plan error when no [`DistributedConfig`] is present in the
    /// extensions of `cfg`.
    pub fn from_config_options(cfg: &ConfigOptions) -> Result<&Self, DataFusionError> {
        match cfg.extensions.get::<Self>() {
            Some(distributed_cfg) => Ok(distributed_cfg),
            None => plan_err!("DistributedConfig is not in ConfigOptions.extensions"),
        }
    }

    /// Mutably borrows the [`DistributedConfig`] stored in `cfg.extensions`.
    ///
    /// # Errors
    ///
    /// Returns a plan error when no [`DistributedConfig`] is present in the
    /// extensions of `cfg`.
    pub fn from_config_options_mut(cfg: &mut ConfigOptions) -> Result<&mut Self, DataFusionError> {
        match cfg.extensions.get_mut::<Self>() {
            Some(distributed_cfg) => Ok(distributed_cfg),
            None => plan_err!("DistributedConfig is not in ConfigOptions.extensions"),
        }
    }
}
// Registers `DistributedConfig` as a DataFusion config extension.
// NOTE(review): `PREFIX` is presumably the namespace under which these options
// are addressed (e.g. `distributed.files_per_task`); confirm against the
// `ConfigExtension` documentation.
impl ConfigExtension for DistributedConfig {
const PREFIX: &'static str = "distributed";
}
impl ConfigField for ChannelResolverExtension {
fn visit<V: Visit>(&self, _: &mut V, _: &str, _: &'static str) {
}
fn set(&mut self, _: &str, _: &str) -> datafusion::common::Result<()> {
not_impl_err!("Not implemented")
}
}
/// Debug output is just the type name; no inner state is printed.
impl Debug for ChannelResolverExtension {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("ChannelResolverExtension")
    }
}
impl ConfigField for WorkerResolverExtension {
fn visit<V: Visit>(&self, _: &mut V, _: &str, _: &'static str) {
}
fn set(&mut self, _: &str, _: &str) -> datafusion::common::Result<()> {
not_impl_err!("Not implemented")
}
}
/// Debug output is just the type name; no inner state is printed.
impl Debug for WorkerResolverExtension {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("WorkerResolverExtension")
    }
}
impl ConfigField for CombinedTaskEstimator {
fn visit<V: Visit>(&self, _: &mut V, _: &str, _: &'static str) {
}
fn set(&mut self, _: &str, _: &str) -> Result<(), DataFusionError> {
not_impl_err!("not implemented")
}
}
/// Debug output is the fixed label `TaskEstimators`; the contained estimators
/// are not printed.
impl Debug for CombinedTaskEstimator {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str("TaskEstimators")
    }
}