datafusion_distributed/distributed_planner/distributed_config.rs
1use crate::TaskEstimator;
2use crate::distributed_planner::task_estimator::CombinedTaskEstimator;
3use crate::networking::{ChannelResolverExtension, WorkerResolverExtension};
4use crate::work_unit_feed::WorkUnitFeedRegistry;
5use datafusion::common::{DataFusionError, extensions_options, not_impl_err, plan_err};
6use datafusion::config::{ConfigExtension, ConfigField, ConfigOptions, Visit};
7use std::fmt::{Debug, Formatter};
8use std::sync::Arc;
9
extensions_options! {
    /// Configuration for the distributed planner.
    pub struct DistributedConfig {
        /// Sets the number of bytes each partition is expected to scan from parquet files. If
        /// more partitions than the ones available in one machine would be needed, several machines
        /// are used, and the scan is distributed.
        /// Lowering this number will increase parallelism.
        // The default below evaluates to 16 MiB.
        pub file_scan_config_bytes_per_partition: usize, default = 16 * 1024 * 1024
        /// Task multiplying factor for when a node declares that it changes the cardinality
        /// of the data:
        /// - If a node is increasing the cardinality of the data, this factor will increase.
        /// - If a node reduces the cardinality of the data, this factor will decrease.
        /// - In any other situation, this factor is left intact.
        // The default is computed by `cardinality_task_count_factor_default()`: 1.5 when built
        // under `cfg(test)` or with the `integration` feature, 1.0 otherwise.
        pub cardinality_task_count_factor: f64, default = cardinality_task_count_factor_default()
        /// When encountering a UNION operation, isolate its children depending on the task context.
        /// For example, on a UNION operation with 3 children running in 3 distributed tasks,
        /// instead of executing the 3 children in each of the 3 tasks with a
        /// DistributedTaskContext of 1/3, 2/3, and 3/3 respectively, execute:
        /// - The first child in the first task with a DistributedTaskContext of 1/1
        /// - The second child in the second task with a DistributedTaskContext of 1/1
        /// - The third child in the third task with a DistributedTaskContext of 1/1
        pub children_isolator_unions: bool, default = true
        /// Propagate collected metrics from all nodes in the plan across network boundaries
        /// so that they can be reconstructed on the head node of the plan.
        pub collect_metrics: bool, default = true
        /// Enable broadcast joins for CollectLeft hash joins. When enabled, the build side of
        /// a CollectLeft join is broadcast to all consumer tasks.
        /// TODO: This option exists temporarily until we become smarter about when to actually
        ///  use broadcasting like checking build side size.
        ///  For now, broadcasting all CollectLeft joins is not always beneficial.
        pub broadcast_joins: bool, default = false
        /// The compression used for sending data over the network between workers.
        /// It can be set to either `zstd`, `lz4` or `none`.
        // NOTE(review): stored as a free-form `String`; where (or whether) the value is
        // validated against the three documented options is not visible in this file.
        pub compression: String, default = "lz4".to_string()
        /// Overrides `datafusion.execution.batch_size` for worker-executed stages. Because
        /// `RepartitionExec` reads `session_config().batch_size()` at execute time to size its
        /// output batches (via its internal `LimitedBatchCoalescer`), this knob lets users tune
        /// shuffle batch sizes independently of the global `datafusion.execution.batch_size`.
        ///
        /// Set to 0 (the default) to apply no override and inherit `datafusion.execution.batch_size`.
        pub shuffle_batch_size: usize, default = 0
        /// Maximum tasks that will be assigned per stage during distributed planning.
        /// If set to 0, this value is the number of workers returned by the provided `WorkerResolver`.
        /// It defaults to 0.
        pub max_tasks_per_stage: usize, default = 0
        /// Enable the PartialReduce optimization, which inserts an extra aggregation pass
        /// above hash RepartitionExec before network shuffles to reduce shuffle data size.
        /// Disabled by default because its effectiveness is workload-dependent: it helps when
        /// aggregation significantly reduces cardinality, but adds overhead when it does not.
        pub partial_reduce: bool, default = false
        /// Soft byte budget that each per-worker connection will buffer in memory before pausing
        /// the gRPC pull from that worker. Per-partition channels are unbounded (to avoid
        /// head-of-line blocking between sibling partitions), so backpressure is enforced
        /// globally per [WorkerConnection] using this budget. A single message larger than this
        /// budget will still be admitted (otherwise we would livelock), so the actual peak per
        /// connection is `worker_connection_buffer_budget_bytes + max_message_size`.
        // The default below evaluates to 64 MiB.
        pub worker_connection_buffer_budget_bytes: usize, default = 64 * 1024 * 1024
        /// Collection of [TaskEstimator]s that will be applied to leaf nodes in order to
        /// estimate how many tasks should be spawned for the [Stage] containing the leaf node.
        // Populated through `DistributedConfig::with_task_estimator`, which appends to its
        // `user_provided` list.
        pub(crate) __private_task_estimator: CombinedTaskEstimator, default = CombinedTaskEstimator::default()
        /// [ChannelResolver] implementation that tells the distributed planner information about
        /// the available workers ready to execute distributed tasks.
        // NOTE(review): this description is word-for-word the one used for
        // `__private_worker_resolver` below; confirm what the channel resolver specifically
        // provides and reword accordingly.
        pub(crate) __private_channel_resolver: ChannelResolverExtension, default = ChannelResolverExtension::default()
        /// [WorkerResolver] implementation that tells the distributed planner information about
        /// the available workers ready to execute distributed tasks.
        // Unlike the other private fields, the default here is built with `not_implemented()`
        // rather than `default()`.
        pub(crate) __private_worker_resolver: WorkerResolverExtension, default = WorkerResolverExtension::not_implemented()
        /// [WorkUnitFeedRegistry] that contains a set of getters that, applied to each node in a
        /// plan, will return the [crate::WorkUnitFeed]s present in all nodes.
        pub(crate) __private_work_unit_feed_registry: WorkUnitFeedRegistry, default = WorkUnitFeedRegistry::default()
    }
}
81
/// Default value for `DistributedConfig::cardinality_task_count_factor`.
///
/// Returns `1.5` when the crate is compiled for tests or with the `integration` feature
/// enabled, and `1.0` in every other build.
fn cardinality_task_count_factor_default() -> f64 {
    // `cfg!` is resolved at compile time, so a given build always yields the same value.
    let test_or_integration_build = cfg!(any(test, feature = "integration"));
    if test_or_integration_build { 1.5 } else { 1.0 }
}
89
90impl DistributedConfig {
91 /// Appends a [TaskEstimator] to the list. [TaskEstimator] will be executed sequentially in
92 /// order on leaf nodes, and the first one to provide a value is the one that gets to decide
93 /// how many tasks are used for that [Stage].
94 pub fn with_task_estimator(
95 mut self,
96 task_estimator: impl TaskEstimator + Send + Sync + 'static,
97 ) -> Self {
98 self.__private_task_estimator
99 .user_provided
100 .push(Arc::new(task_estimator));
101 self
102 }
103
104 /// Gets the [DistributedConfig] from the [ConfigOptions]'s extensions.
105 pub fn from_config_options(cfg: &ConfigOptions) -> Result<&Self, DataFusionError> {
106 let Some(distributed_cfg) = cfg.extensions.get::<DistributedConfig>() else {
107 return plan_err!("DistributedConfig is not in ConfigOptions.extensions");
108 };
109 Ok(distributed_cfg)
110 }
111
112 /// Gets the [DistributedConfig] from the [ConfigOptions]'s extensions.
113 pub fn from_config_options_mut(cfg: &mut ConfigOptions) -> Result<&mut Self, DataFusionError> {
114 let Some(distributed_cfg) = cfg.extensions.get_mut::<DistributedConfig>() else {
115 return plan_err!("DistributedConfig is not in ConfigOptions.extensions");
116 };
117 Ok(distributed_cfg)
118 }
119}
120
/// Marks [DistributedConfig] as a DataFusion [ConfigExtension].
impl ConfigExtension for DistributedConfig {
    /// Prefix associated with this extension.
    // NOTE(review): presumably options are addressed as `distributed.<field_name>` (for
    // example `distributed.collect_metrics`); confirm against the `ConfigExtension` docs.
    const PREFIX: &'static str = "distributed";
}
124
// FIXME: Ideally, ChannelResolverExtension, TaskEstimators and the other non-textual
// extensions implemented below would be passed as extensions in SessionConfig's AnyMap
// instead of in ConfigOptions. However, we need to pass them through ConfigOptions because
// these fields must be present during planning in the DistributedQueryPlanner, and the
// signature of the create_physical_plan() method there accepts a SessionState which only
// provides ConfigOptions.
// The following PR addresses this: https://github.com/apache/datafusion/pull/18168
// but it still has not been accepted or merged.
// Because of this, all the boilerplate trait implementations below are needed.
133impl ConfigField for ChannelResolverExtension {
134 fn visit<V: Visit>(&self, _: &mut V, _: &str, _: &'static str) {
135 // nothing to do.
136 }
137
138 fn set(&mut self, _: &str, _: &str) -> datafusion::common::Result<()> {
139 not_impl_err!("Not implemented")
140 }
141}
142
143impl Debug for ChannelResolverExtension {
144 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
145 write!(f, "ChannelResolverExtension")
146 }
147}
148
149impl ConfigField for WorkerResolverExtension {
150 fn visit<V: Visit>(&self, _: &mut V, _: &str, _: &'static str) {
151 // nothing to do.
152 }
153
154 fn set(&mut self, _: &str, _: &str) -> datafusion::common::Result<()> {
155 not_impl_err!("Not implemented")
156 }
157}
158
159impl Debug for WorkerResolverExtension {
160 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
161 write!(f, "WorkerResolverExtension")
162 }
163}
164
165impl ConfigField for CombinedTaskEstimator {
166 fn visit<V: Visit>(&self, _: &mut V, _: &str, _: &'static str) {
167 //nothing to do.
168 }
169
170 fn set(&mut self, _: &str, _: &str) -> Result<(), DataFusionError> {
171 not_impl_err!("not implemented")
172 }
173}
174
175impl Debug for CombinedTaskEstimator {
176 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
177 write!(f, "TaskEstimators")
178 }
179}
180
181impl ConfigField for WorkUnitFeedRegistry {
182 fn visit<V: Visit>(&self, _: &mut V, _: &str, _: &'static str) {
183 //nothing to do.
184 }
185
186 fn set(&mut self, _: &str, _: &str) -> Result<(), DataFusionError> {
187 not_impl_err!("not implemented")
188 }
189}
190
191impl Debug for WorkUnitFeedRegistry {
192 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
193 write!(f, "WorkUnitFeedRegistry")
194 }
195}