Skip to main content

datafusion_distributed/distributed_planner/
distributed_config.rs

1use crate::TaskEstimator;
2use crate::distributed_planner::task_estimator::CombinedTaskEstimator;
3use crate::networking::{ChannelResolverExtension, WorkerResolverExtension};
4use datafusion::common::utils::get_available_parallelism;
5use datafusion::common::{DataFusionError, extensions_options, not_impl_err, plan_err};
6use datafusion::config::{ConfigExtension, ConfigField, ConfigOptions, Visit};
7use std::fmt::{Debug, Formatter};
8use std::sync::Arc;
9
10extensions_options! {
11    /// Configuration for the distributed planner.
12    pub struct DistributedConfig {
13        /// Sets the maximum amount of files that will be assigned to each task. Reducing this
14        /// number will spawn more tasks for the same number of files. This only applies when
15        /// estimating tasks for stages containing `DataSourceExec` nodes with `FileScanConfig`
16        /// implementations.
17        pub files_per_task: usize, default = files_per_task_default()
18        /// Task multiplying factor for when a node declares that it changes the cardinality
19        /// of the data:
20        /// - If a node is increasing the cardinality of the data, this factor will increase.
21        /// - If a node reduces the cardinality of the data, this factor will decrease.
22        /// - In any other situation, this factor is left intact.
23        pub cardinality_task_count_factor: f64, default = cardinality_task_count_factor_default()
24        /// Upon shuffling over the network, data streams need to be disassembled in a lot of output
25        /// partitions, which means the resulting streams might contain a lot of tiny record batches
26        /// to be sent over the wire. This parameter controls the batch size in number of rows for
27        /// the CoalesceBatchExec operator that is placed at the top of the stage for sending bigger
28        /// batches over the wire.
29        /// If set to 0, batch coalescing is disabled on network shuffle operations.
30        pub shuffle_batch_size: usize, default = 8192
31        /// When encountering a UNION operation, isolate its children depending on the task context.
32        /// For example, on a UNION operation with 3 children running in 3 distributed tasks,
33        /// instead of executing the 3 children in each 3 tasks with a DistributedTaskContext of
34        /// 1/3, 2/3, and 3/3 respectively, Execute:
35        /// - The first child in the first task with a DistributedTaskContext of 1/1
36        /// - The second child in the second task with a DistributedTaskContext of 1/1
37        /// - The third child in the third task with a DistributedTaskContext of 1/1
38        pub children_isolator_unions: bool, default = true
39        /// Propagate collected metrics from all nodes in the plan across network boundaries
40        /// so that they can be reconstructed on the head node of the plan.
41        pub collect_metrics: bool, default = true
42        /// Enable broadcast joins for CollectLeft hash joins. When enabled, the build side of
43        /// a CollectLeft join is broadcast to all consumer tasks.
44        /// TODO: This option exists temporarily until we become smarter about when to actually
45        /// use broadcasting like checking build side size.
46        /// For now, broadcasting all CollectLeft joins is not always beneficial.
47        pub broadcast_joins: bool, default = false
48        /// The compression used for sending data over the network between workers.
49        /// It can be set to either `zstd`, `lz4` or `none`.
50        pub compression: String, default = "lz4".to_string()
51        /// Maximum tasks that will be assigned per stage during distributed planning.
52        /// If set to 0, this value is the number of workers returned by the provided `WorkerResolver`.
53        /// It defaults to 0.
54        pub max_tasks_per_stage: usize, default = 0
55        /// Collection of [TaskEstimator]s that will be applied to leaf nodes in order to
56        /// estimate how many tasks should be spawned for the [Stage] containing the leaf node.
57        pub(crate) __private_task_estimator: CombinedTaskEstimator, default = CombinedTaskEstimator::default()
58        /// [ChannelResolver] implementation that tells the distributed planner information about
59        /// the available workers ready to execute distributed tasks.
60        pub(crate) __private_channel_resolver: ChannelResolverExtension, default = ChannelResolverExtension::default()
61        /// [WorkerResolver] implementation that tells the distributed planner information about
62        /// the available workers ready to execute distributed tasks.
63        pub(crate) __private_worker_resolver: WorkerResolverExtension, default = WorkerResolverExtension::not_implemented()
64    }
65}
66
67fn files_per_task_default() -> usize {
68    if cfg!(test) || cfg!(feature = "integration") {
69        1
70    } else {
71        get_available_parallelism()
72    }
73}
74
/// Default value for the `cardinality_task_count_factor` option.
///
/// Test builds and builds with the `integration` feature use `1.5`; every other build uses
/// `1.0`, a neutral multiplier.
fn cardinality_task_count_factor_default() -> f64 {
    const TESTING_FACTOR: f64 = 1.5;
    const NEUTRAL_FACTOR: f64 = 1.0;

    let testing_build = cfg!(any(test, feature = "integration"));
    if testing_build { TESTING_FACTOR } else { NEUTRAL_FACTOR }
}
82
83impl DistributedConfig {
84    /// Appends a [TaskEstimator] to the list. [TaskEstimator] will be executed sequentially in
85    /// order on leaf nodes, and the first one to provide a value is the one that gets to decide
86    /// how many tasks are used for that [Stage].
87    pub fn with_task_estimator(
88        mut self,
89        task_estimator: impl TaskEstimator + Send + Sync + 'static,
90    ) -> Self {
91        self.__private_task_estimator
92            .user_provided
93            .push(Arc::new(task_estimator));
94        self
95    }
96
97    /// Gets the [DistributedConfig] from the [ConfigOptions]'s extensions.
98    pub fn from_config_options(cfg: &ConfigOptions) -> Result<&Self, DataFusionError> {
99        let Some(distributed_cfg) = cfg.extensions.get::<DistributedConfig>() else {
100            return plan_err!("DistributedConfig is not in ConfigOptions.extensions");
101        };
102        Ok(distributed_cfg)
103    }
104
105    /// Gets the [DistributedConfig] from the [ConfigOptions]'s extensions.
106    pub fn from_config_options_mut(cfg: &mut ConfigOptions) -> Result<&mut Self, DataFusionError> {
107        let Some(distributed_cfg) = cfg.extensions.get_mut::<DistributedConfig>() else {
108            return plan_err!("DistributedConfig is not in ConfigOptions.extensions");
109        };
110        Ok(distributed_cfg)
111    }
112}
113
114impl ConfigExtension for DistributedConfig {
115    const PREFIX: &'static str = "distributed";
116}
117
// FIXME: Ideally, ChannelResolverExtension, WorkerResolverExtension and the task estimators
//  (CombinedTaskEstimator) would be passed as extensions in SessionConfig's AnyMap instead of
//  the ConfigOptions. However, they must be passed through ConfigOptions because these fields
//  need to be present during planning in the DistributedPhysicalOptimizerRule, and the
//  signature of its optimize() method accepts a ConfigOptions instead of a SessionConfig.
//  The following PR addresses this: https://github.com/apache/datafusion/pull/18168
//  but at the time of writing it has not been accepted or merged.
//  Because of this, all the boilerplate trait implementations below are needed.
126impl ConfigField for ChannelResolverExtension {
127    fn visit<V: Visit>(&self, _: &mut V, _: &str, _: &'static str) {
128        // nothing to do.
129    }
130
131    fn set(&mut self, _: &str, _: &str) -> datafusion::common::Result<()> {
132        not_impl_err!("Not implemented")
133    }
134}
135
136impl Debug for ChannelResolverExtension {
137    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
138        write!(f, "ChannelResolverExtension")
139    }
140}
141
142impl ConfigField for WorkerResolverExtension {
143    fn visit<V: Visit>(&self, _: &mut V, _: &str, _: &'static str) {
144        // nothing to do.
145    }
146
147    fn set(&mut self, _: &str, _: &str) -> datafusion::common::Result<()> {
148        not_impl_err!("Not implemented")
149    }
150}
151
152impl Debug for WorkerResolverExtension {
153    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
154        write!(f, "WorkerResolverExtension")
155    }
156}
157
158impl ConfigField for CombinedTaskEstimator {
159    fn visit<V: Visit>(&self, _: &mut V, _: &str, _: &'static str) {
160        //nothing to do.
161    }
162
163    fn set(&mut self, _: &str, _: &str) -> Result<(), DataFusionError> {
164        not_impl_err!("not implemented")
165    }
166}
167
168impl Debug for CombinedTaskEstimator {
169    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
170        write!(f, "TaskEstimators")
171    }
172}