datafusion_distributed/distributed_planner/distributed_config.rs
1use crate::TaskEstimator;
2use crate::distributed_planner::task_estimator::CombinedTaskEstimator;
3use crate::networking::{ChannelResolverExtension, WorkerResolverExtension};
4use datafusion::common::utils::get_available_parallelism;
5use datafusion::common::{DataFusionError, extensions_options, not_impl_err, plan_err};
6use datafusion::config::{ConfigExtension, ConfigField, ConfigOptions, Visit};
7use std::fmt::{Debug, Formatter};
8use std::sync::Arc;
9
10extensions_options! {
11 /// Configuration for the distributed planner.
12 pub struct DistributedConfig {
13 /// Sets the maximum amount of files that will be assigned to each task. Reducing this
14 /// number will spawn more tasks for the same number of files. This only applies when
15 /// estimating tasks for stages containing `DataSourceExec` nodes with `FileScanConfig`
16 /// implementations.
17 pub files_per_task: usize, default = files_per_task_default()
18 /// Task multiplying factor for when a node declares that it changes the cardinality
19 /// of the data:
20 /// - If a node is increasing the cardinality of the data, this factor will increase.
21 /// - If a node reduces the cardinality of the data, this factor will decrease.
22 /// - In any other situation, this factor is left intact.
23 pub cardinality_task_count_factor: f64, default = cardinality_task_count_factor_default()
24 /// Upon shuffling over the network, data streams need to be disassembled in a lot of output
25 /// partitions, which means the resulting streams might contain a lot of tiny record batches
26 /// to be sent over the wire. This parameter controls the batch size in number of rows for
27 /// the CoalesceBatchExec operator that is placed at the top of the stage for sending bigger
28 /// batches over the wire.
29 /// If set to 0, batch coalescing is disabled on network shuffle operations.
30 pub shuffle_batch_size: usize, default = 8192
31 /// When encountering a UNION operation, isolate its children depending on the task context.
32 /// For example, on a UNION operation with 3 children running in 3 distributed tasks,
33 /// instead of executing the 3 children in each 3 tasks with a DistributedTaskContext of
34 /// 1/3, 2/3, and 3/3 respectively, Execute:
35 /// - The first child in the first task with a DistributedTaskContext of 1/1
36 /// - The second child in the second task with a DistributedTaskContext of 1/1
37 /// - The third child in the third task with a DistributedTaskContext of 1/1
38 pub children_isolator_unions: bool, default = true
39 /// Propagate collected metrics from all nodes in the plan across network boundaries
40 /// so that they can be reconstructed on the head node of the plan.
41 pub collect_metrics: bool, default = true
42 /// Enable broadcast joins for CollectLeft hash joins. When enabled, the build side of
43 /// a CollectLeft join is broadcast to all consumer tasks.
44 /// TODO: This option exists temporarily until we become smarter about when to actually
45 /// use broadcasting like checking build side size.
46 /// For now, broadcasting all CollectLeft joins is not always beneficial.
47 pub broadcast_joins: bool, default = false
48 /// The compression used for sending data over the network between workers.
49 /// It can be set to either `zstd`, `lz4` or `none`.
50 pub compression: String, default = "lz4".to_string()
51 /// Maximum tasks that will be assigned per stage during distributed planning.
52 /// If set to 0, this value is the number of workers returned by the provided `WorkerResolver`.
53 /// It defaults to 0.
54 pub max_tasks_per_stage: usize, default = 0
55 /// Collection of [TaskEstimator]s that will be applied to leaf nodes in order to
56 /// estimate how many tasks should be spawned for the [Stage] containing the leaf node.
57 pub(crate) __private_task_estimator: CombinedTaskEstimator, default = CombinedTaskEstimator::default()
58 /// [ChannelResolver] implementation that tells the distributed planner information about
59 /// the available workers ready to execute distributed tasks.
60 pub(crate) __private_channel_resolver: ChannelResolverExtension, default = ChannelResolverExtension::default()
61 /// [WorkerResolver] implementation that tells the distributed planner information about
62 /// the available workers ready to execute distributed tasks.
63 pub(crate) __private_worker_resolver: WorkerResolverExtension, default = WorkerResolverExtension::not_implemented()
64 }
65}
66
67fn files_per_task_default() -> usize {
68 if cfg!(test) || cfg!(feature = "integration") {
69 1
70 } else {
71 get_available_parallelism()
72 }
73}
74
/// Default value for `cardinality_task_count_factor`.
///
/// Test builds and builds with the `integration` feature use `1.5`; all other builds use `1.0`.
fn cardinality_task_count_factor_default() -> f64 {
    const TEST_LIKE_FACTOR: f64 = 1.5;
    const REGULAR_FACTOR: f64 = 1.0;

    if !(cfg!(test) || cfg!(feature = "integration")) {
        return REGULAR_FACTOR;
    }
    TEST_LIKE_FACTOR
}
82
83impl DistributedConfig {
84 /// Appends a [TaskEstimator] to the list. [TaskEstimator] will be executed sequentially in
85 /// order on leaf nodes, and the first one to provide a value is the one that gets to decide
86 /// how many tasks are used for that [Stage].
87 pub fn with_task_estimator(
88 mut self,
89 task_estimator: impl TaskEstimator + Send + Sync + 'static,
90 ) -> Self {
91 self.__private_task_estimator
92 .user_provided
93 .push(Arc::new(task_estimator));
94 self
95 }
96
97 /// Gets the [DistributedConfig] from the [ConfigOptions]'s extensions.
98 pub fn from_config_options(cfg: &ConfigOptions) -> Result<&Self, DataFusionError> {
99 let Some(distributed_cfg) = cfg.extensions.get::<DistributedConfig>() else {
100 return plan_err!("DistributedConfig is not in ConfigOptions.extensions");
101 };
102 Ok(distributed_cfg)
103 }
104
105 /// Gets the [DistributedConfig] from the [ConfigOptions]'s extensions.
106 pub fn from_config_options_mut(cfg: &mut ConfigOptions) -> Result<&mut Self, DataFusionError> {
107 let Some(distributed_cfg) = cfg.extensions.get_mut::<DistributedConfig>() else {
108 return plan_err!("DistributedConfig is not in ConfigOptions.extensions");
109 };
110 Ok(distributed_cfg)
111 }
112}
113
114impl ConfigExtension for DistributedConfig {
115 const PREFIX: &'static str = "distributed";
116}
117
// FIXME: Ideally, ChannelResolverExtension, WorkerResolverExtension and the TaskEstimators
// would be passed as extensions in SessionConfig's AnyMap instead of the ConfigOptions.
// However, we need to pass them in ConfigOptions because these fields must be present
// during planning in the DistributedPhysicalOptimizerRule, and the signature of the
// optimize() method there accepts a ConfigOptions instead of a SessionConfig.
// The following PR addresses this: https://github.com/apache/datafusion/pull/18168
// but it has not been accepted or merged yet.
// Because of this, all the boilerplate trait implementations below are needed.
126impl ConfigField for ChannelResolverExtension {
127 fn visit<V: Visit>(&self, _: &mut V, _: &str, _: &'static str) {
128 // nothing to do.
129 }
130
131 fn set(&mut self, _: &str, _: &str) -> datafusion::common::Result<()> {
132 not_impl_err!("Not implemented")
133 }
134}
135
136impl Debug for ChannelResolverExtension {
137 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
138 write!(f, "ChannelResolverExtension")
139 }
140}
141
142impl ConfigField for WorkerResolverExtension {
143 fn visit<V: Visit>(&self, _: &mut V, _: &str, _: &'static str) {
144 // nothing to do.
145 }
146
147 fn set(&mut self, _: &str, _: &str) -> datafusion::common::Result<()> {
148 not_impl_err!("Not implemented")
149 }
150}
151
152impl Debug for WorkerResolverExtension {
153 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
154 write!(f, "WorkerResolverExtension")
155 }
156}
157
158impl ConfigField for CombinedTaskEstimator {
159 fn visit<V: Visit>(&self, _: &mut V, _: &str, _: &'static str) {
160 //nothing to do.
161 }
162
163 fn set(&mut self, _: &str, _: &str) -> Result<(), DataFusionError> {
164 not_impl_err!("not implemented")
165 }
166}
167
168impl Debug for CombinedTaskEstimator {
169 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
170 write!(f, "TaskEstimators")
171 }
172}