1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
use crate::TaskEstimator;
use crate::distributed_planner::task_estimator::CombinedTaskEstimator;
use crate::networking::{ChannelResolverExtension, WorkerResolverExtension};
use crate::work_unit_feed::WorkUnitFeedRegistry;
use datafusion::common::{DataFusionError, extensions_options, not_impl_err, plan_err};
use datafusion::config::{ConfigExtension, ConfigField, ConfigOptions, Visit};
use std::fmt::{Debug, Formatter};
use std::sync::Arc;
// Declares `DistributedConfig` through DataFusion's `extensions_options!` macro. Every entry in
// the struct body has the shape `<visibility> <name>: <type>, default = <expr>`.
//
// NOTE(review): the `///` comments on the entries are presumably picked up by the macro and
// surfaced as the user-visible option descriptions -- confirm against the DataFusion docs
// before rewording any of them. For that reason, the notes added here use plain `//` comments.
extensions_options! {
/// Configuration for the distributed planner.
pub struct DistributedConfig {
// Default is 16 MiB.
// NOTE(review): the description below has a typo ("each partitions" -> "each partition").
/// Sets the number of bytes each partitions is expected to scan from parquet files. If
/// more partitions than the ones available in one machine would be needed, several machines
/// are used, and the scan is distributed.
/// Lowering this number will increase parallelism.
pub file_scan_config_bytes_per_partition: usize, default = 16 * 1024 * 1024
// The default comes from `cardinality_task_count_factor_default()`, defined right after this
// macro invocation: 1.5 in test builds or with the `integration` feature, 1.0 otherwise.
/// Task multiplying factor for when a node declares that it changes the cardinality
/// of the data:
/// - If a node is increasing the cardinality of the data, this factor will increase.
/// - If a node reduces the cardinality of the data, this factor will decrease.
/// - In any other situation, this factor is left intact.
pub cardinality_task_count_factor: f64, default = cardinality_task_count_factor_default()
/// When encountering a UNION operation, isolate its children depending on the task context.
/// For example, on a UNION operation with 3 children running in 3 distributed tasks,
/// instead of executing the 3 children in each 3 tasks with a DistributedTaskContext of
/// 1/3, 2/3, and 3/3 respectively, Execute:
/// - The first child in the first task with a DistributedTaskContext of 1/1
/// - The second child in the second task with a DistributedTaskContext of 1/1
/// - The third child in the third task with a DistributedTaskContext of 1/1
pub children_isolator_unions: bool, default = true
/// Propagate collected metrics from all nodes in the plan across network boundaries
/// so that they can be reconstructed on the head node of the plan.
pub collect_metrics: bool, default = true
/// Enable broadcast joins for CollectLeft hash joins. When enabled, the build side of
/// a CollectLeft join is broadcast to all consumer tasks.
/// TODO: This option exists temporarily until we become smarter about when to actually
/// use broadcasting like checking build side size.
/// For now, broadcasting all CollectLeft joins is not always beneficial.
pub broadcast_joins: bool, default = false
// NOTE(review): this is a free-form `String`; where (or whether) the three accepted values
// are validated is not visible in this file.
/// The compression used for sending data over the network between workers.
/// It can be set to either `zstd`, `lz4` or `none`.
pub compression: String, default = "lz4".to_string()
/// Overrides `datafusion.execution.batch_size` for worker-executed stages. Because
/// `RepartitionExec` reads `session_config().batch_size()` at execute time to size its
/// output batches (via its internal `LimitedBatchCoalescer`), this knob lets users tune
/// shuffle batch sizes independently of the global `datafusion.execution.batch_size`.
///
/// Set to 0 (the default) to apply no override and inherit `datafusion.execution.batch_size`.
pub shuffle_batch_size: usize, default = 0
/// Maximum tasks that will be assigned per stage during distributed planning.
/// If set to 0, this value is the number of workers returned by the provided `WorkerResolver`.
/// It defaults to 0.
pub max_tasks_per_stage: usize, default = 0
/// Enable the PartialReduce optimization, which inserts an extra aggregation pass
/// above hash RepartitionExec before network shuffles to reduce shuffle data size.
/// Disabled by default because its effectiveness is workload-dependent: it helps when
/// aggregation significantly reduces cardinality, but adds overhead when it does not.
pub partial_reduce: bool, default = false
// Default is 64 MiB.
/// Soft byte budget that each per-worker connection will buffer in memory before pausing
/// the gRPC pull from that worker. Per-partition channels are unbounded (to avoid
/// head-of-line blocking between sibling partitions), so backpressure is enforced
/// globally per [WorkerConnection] using this budget. A single message larger than this
/// budget will still be admitted (otherwise we would livelock), so the actual peak per
/// connection is `worker_connection_buffer_budget_bytes + max_message_size`.
pub worker_connection_buffer_budget_bytes: usize, default = 64 * 1024 * 1024
// The `__private_*` entries below are `pub(crate)` and hold objects rather than plain values.
// Their `ConfigField` impls further down in this file make `visit` a no-op and make `set`
// always return a not-implemented error, so they cannot be assigned from a string.
/// Collection of [TaskEstimator]s that will be applied to leaf nodes in order to
/// estimate how many tasks should be spawned for the [Stage] containing the leaf node.
pub(crate) __private_task_estimator: CombinedTaskEstimator, default = CombinedTaskEstimator::default()
// NOTE(review): apart from the type name, this description is identical to the one on
// `__private_worker_resolver` below -- possibly a copy-paste; confirm what the channel
// resolver actually provides.
/// [ChannelResolver] implementation that tells the distributed planner information about
/// the available workers ready to execute distributed tasks.
pub(crate) __private_channel_resolver: ChannelResolverExtension, default = ChannelResolverExtension::default()
// Unlike the other private entries, the default here is built with `not_implemented()` rather
// than `default()`.
/// [WorkerResolver] implementation that tells the distributed planner information about
/// the available workers ready to execute distributed tasks.
pub(crate) __private_worker_resolver: WorkerResolverExtension, default = WorkerResolverExtension::not_implemented()
/// [WorkUnitFeedRegistry] that contains a set of getters that, applied to each node in a
/// plan, will return the [crate::WorkUnitFeed]s present in all nodes.
pub(crate) __private_work_unit_feed_registry: WorkUnitFeedRegistry, default = WorkUnitFeedRegistry::default()
}
}
/// Default value for `DistributedConfig::cardinality_task_count_factor`.
///
/// Yields `1.5` when compiled for tests or with the `integration` feature enabled, and `1.0`
/// for every other build.
fn cardinality_task_count_factor_default() -> f64 {
    // Both conditions are resolved at compile time, so they fold into a single constant.
    const TEST_LIKE_BUILD: bool = cfg!(any(test, feature = "integration"));

    if TEST_LIKE_BUILD { 1.5 } else { 1.0 }
}
impl DistributedConfig {
    /// Registers one more [TaskEstimator] after those already present.
    ///
    /// Estimators run sequentially on leaf nodes in the order they were added; the first one
    /// that provides a value decides how many tasks are used for that [Stage].
    pub fn with_task_estimator(
        mut self,
        task_estimator: impl TaskEstimator + Send + Sync + 'static,
    ) -> Self {
        let estimators = &mut self.__private_task_estimator.user_provided;
        estimators.push(Arc::new(task_estimator));
        self
    }

    /// Borrows the [DistributedConfig] stored in the extensions of `cfg`.
    ///
    /// # Errors
    ///
    /// Returns a planning error when no [DistributedConfig] is registered in
    /// `cfg.extensions`.
    pub fn from_config_options(cfg: &ConfigOptions) -> Result<&Self, DataFusionError> {
        match cfg.extensions.get::<Self>() {
            Some(found) => Ok(found),
            None => plan_err!("DistributedConfig is not in ConfigOptions.extensions"),
        }
    }

    /// Mutably borrows the [DistributedConfig] stored in the extensions of `cfg`.
    ///
    /// # Errors
    ///
    /// Returns a planning error when no [DistributedConfig] is registered in
    /// `cfg.extensions`.
    pub fn from_config_options_mut(cfg: &mut ConfigOptions) -> Result<&mut Self, DataFusionError> {
        match cfg.extensions.get_mut::<Self>() {
            Some(found) => Ok(found),
            None => plan_err!("DistributedConfig is not in ConfigOptions.extensions"),
        }
    }
}
/// Makes [DistributedConfig] usable as a DataFusion [ConfigExtension], which is what allows it
/// to be stored in and fetched from `ConfigOptions::extensions` (see `from_config_options`).
impl ConfigExtension for DistributedConfig {
// Prefix identifying this extension's options.
// NOTE(review): presumably options are then addressed as `distributed.<field>` -- confirm
// against the DataFusion `ConfigExtension` docs.
const PREFIX: &'static str = "distributed";
}
// FIXME: Ideally, both ChannelResolverExtension and TaskEstimators would be passed as
// extensions in SessionConfig's AnyMap instead of the ConfigOptions. However, we need
// to pass this as ConfigOptions as we need these two fields to be present during
// planning in the DistributedQueryPlanner, and the signature of the create_physical_plan()
// method there accepts a SessionState which only provides ConfigOptions.
// The following PR addresses this: https://github.com/apache/datafusion/pull/18168
// but it still has not been accepted or merged.
// Because of this, all the boilerplate trait implementations below are needed.
/// Boilerplate so that [ChannelResolverExtension] can be a field of [DistributedConfig]; it is
/// not an option that can be listed or set from strings.
impl ConfigField for ChannelResolverExtension {
    /// Reports nothing to the visitor.
    fn visit<V>(&self, _visitor: &mut V, _key: &str, _description: &'static str)
    where
        V: Visit,
    {
        // nothing to do.
    }

    /// Always fails with a not-implemented error, regardless of key and value.
    fn set(&mut self, _key: &str, _value: &str) -> datafusion::common::Result<()> {
        not_impl_err!("Not implemented")
    }
}
/// Opaque [Debug] output: only a fixed type label is written, none of the internal state.
impl Debug for ChannelResolverExtension {
    fn fmt(&self, out: &mut Formatter<'_>) -> std::fmt::Result {
        out.write_str("ChannelResolverExtension")
    }
}
/// Boilerplate so that [WorkerResolverExtension] can be a field of [DistributedConfig]; it is
/// not an option that can be listed or set from strings.
impl ConfigField for WorkerResolverExtension {
    /// Reports nothing to the visitor.
    fn visit<V>(&self, _visitor: &mut V, _key: &str, _description: &'static str)
    where
        V: Visit,
    {
        // nothing to do.
    }

    /// Always fails with a not-implemented error, regardless of key and value.
    fn set(&mut self, _key: &str, _value: &str) -> datafusion::common::Result<()> {
        not_impl_err!("Not implemented")
    }
}
/// Opaque [Debug] output: only a fixed type label is written, none of the internal state.
impl Debug for WorkerResolverExtension {
    fn fmt(&self, out: &mut Formatter<'_>) -> std::fmt::Result {
        out.write_str("WorkerResolverExtension")
    }
}
/// Boilerplate so that [CombinedTaskEstimator] can be a field of [DistributedConfig]; it is
/// not an option that can be listed or set from strings.
impl ConfigField for CombinedTaskEstimator {
    /// Reports nothing to the visitor.
    fn visit<V>(&self, _visitor: &mut V, _key: &str, _description: &'static str)
    where
        V: Visit,
    {
        // nothing to do.
    }

    /// Always fails with a not-implemented error, regardless of key and value.
    fn set(&mut self, _key: &str, _value: &str) -> Result<(), DataFusionError> {
        not_impl_err!("not implemented")
    }
}
/// Opaque [Debug] output: writes the fixed label `TaskEstimators` and none of the registered
/// estimators.
impl Debug for CombinedTaskEstimator {
    fn fmt(&self, out: &mut Formatter<'_>) -> std::fmt::Result {
        out.write_str("TaskEstimators")
    }
}
/// Boilerplate so that [WorkUnitFeedRegistry] can be a field of [DistributedConfig]; it is
/// not an option that can be listed or set from strings.
impl ConfigField for WorkUnitFeedRegistry {
    /// Reports nothing to the visitor.
    fn visit<V>(&self, _visitor: &mut V, _key: &str, _description: &'static str)
    where
        V: Visit,
    {
        // nothing to do.
    }

    /// Always fails with a not-implemented error, regardless of key and value.
    fn set(&mut self, _key: &str, _value: &str) -> Result<(), DataFusionError> {
        not_impl_err!("not implemented")
    }
}
/// Opaque [Debug] output: only a fixed type label is written, none of the internal state.
impl Debug for WorkUnitFeedRegistry {
    fn fmt(&self, out: &mut Formatter<'_>) -> std::fmt::Result {
        out.write_str("WorkUnitFeedRegistry")
    }
}