Skip to main content

datafusion_distributed/distributed_planner/
distributed_config.rs

1use crate::TaskEstimator;
2use crate::distributed_planner::task_estimator::CombinedTaskEstimator;
3use crate::networking::{ChannelResolverExtension, WorkerResolverExtension};
4use crate::work_unit_feed::WorkUnitFeedRegistry;
5use datafusion::common::{DataFusionError, extensions_options, not_impl_err, plan_err};
6use datafusion::config::{ConfigExtension, ConfigField, ConfigOptions, Visit};
7use std::fmt::{Debug, Formatter};
8use std::sync::Arc;
9
10extensions_options! {
11    /// Configuration for the distributed planner.
12    pub struct DistributedConfig {
13        /// Sets the number of bytes each partitions is expected to scan from parquet files. If
14        /// more partitions than the ones available in one machine would be needed, several machines
15        /// are used, and the scan is distributed.
16        /// Lowering this number will increase parallelism.
17        pub file_scan_config_bytes_per_partition: usize, default = 16 * 1024 * 1024
18        /// Task multiplying factor for when a node declares that it changes the cardinality
19        /// of the data:
20        /// - If a node is increasing the cardinality of the data, this factor will increase.
21        /// - If a node reduces the cardinality of the data, this factor will decrease.
22        /// - In any other situation, this factor is left intact.
23        pub cardinality_task_count_factor: f64, default = cardinality_task_count_factor_default()
24        /// When encountering a UNION operation, isolate its children depending on the task context.
25        /// For example, on a UNION operation with 3 children running in 3 distributed tasks,
26        /// instead of executing the 3 children in each 3 tasks with a DistributedTaskContext of
27        /// 1/3, 2/3, and 3/3 respectively, Execute:
28        /// - The first child in the first task with a DistributedTaskContext of 1/1
29        /// - The second child in the second task with a DistributedTaskContext of 1/1
30        /// - The third child in the third task with a DistributedTaskContext of 1/1
31        pub children_isolator_unions: bool, default = true
32        /// Propagate collected metrics from all nodes in the plan across network boundaries
33        /// so that they can be reconstructed on the head node of the plan.
34        pub collect_metrics: bool, default = true
35        /// Enable broadcast joins for CollectLeft hash joins. When enabled, the build side of
36        /// a CollectLeft join is broadcast to all consumer tasks.
37        /// TODO: This option exists temporarily until we become smarter about when to actually
38        /// use broadcasting like checking build side size.
39        /// For now, broadcasting all CollectLeft joins is not always beneficial.
40        pub broadcast_joins: bool, default = false
41        /// The compression used for sending data over the network between workers.
42        /// It can be set to either `zstd`, `lz4` or `none`.
43        pub compression: String, default = "lz4".to_string()
44        /// Overrides `datafusion.execution.batch_size` for worker-executed stages. Because
45        /// `RepartitionExec` reads `session_config().batch_size()` at execute time to size its
46        /// output batches (via its internal `LimitedBatchCoalescer`), this knob lets users tune
47        /// shuffle batch sizes independently of the global `datafusion.execution.batch_size`.
48        ///
49        /// Set to 0 (the default) to apply no override and inherit `datafusion.execution.batch_size`.
50        pub shuffle_batch_size: usize, default = 0
51        /// Maximum tasks that will be assigned per stage during distributed planning.
52        /// If set to 0, this value is the number of workers returned by the provided `WorkerResolver`.
53        /// It defaults to 0.
54        pub max_tasks_per_stage: usize, default = 0
55        /// Enable the PartialReduce optimization, which inserts an extra aggregation pass
56        /// above hash RepartitionExec before network shuffles to reduce shuffle data size.
57        /// Disabled by default because its effectiveness is workload-dependent: it helps when
58        /// aggregation significantly reduces cardinality, but adds overhead when it does not.
59        pub partial_reduce: bool, default = false
60        /// Soft byte budget that each per-worker connection will buffer in memory before pausing
61        /// the gRPC pull from that worker. Per-partition channels are unbounded (to avoid
62        /// head-of-line blocking between sibling partitions), so backpressure is enforced
63        /// globally per [WorkerConnection] using this budget. A single message larger than this
64        /// budget will still be admitted (otherwise we would livelock), so the actual peak per
65        /// connection is `worker_connection_buffer_budget_bytes + max_message_size`.
66        pub worker_connection_buffer_budget_bytes: usize, default = 64 * 1024 * 1024
67        /// Collection of [TaskEstimator]s that will be applied to leaf nodes in order to
68        /// estimate how many tasks should be spawned for the [Stage] containing the leaf node.
69        pub(crate) __private_task_estimator: CombinedTaskEstimator, default = CombinedTaskEstimator::default()
70        /// [ChannelResolver] implementation that tells the distributed planner information about
71        /// the available workers ready to execute distributed tasks.
72        pub(crate) __private_channel_resolver: ChannelResolverExtension, default = ChannelResolverExtension::default()
73        /// [WorkerResolver] implementation that tells the distributed planner information about
74        /// the available workers ready to execute distributed tasks.
75        pub(crate) __private_worker_resolver: WorkerResolverExtension, default = WorkerResolverExtension::not_implemented()
76        /// [WorkUnitFeedRegistry] that contains a set of getters that, applied to each node in a
77        /// plan, will return the [crate::WorkUnitFeed]s present in all nodes.
78        pub(crate) __private_work_unit_feed_registry: WorkUnitFeedRegistry, default = WorkUnitFeedRegistry::default()
79    }
80}
81
/// Default value for the `cardinality_task_count_factor` setting.
///
/// Evaluates to `1.5` in test builds or when the `integration` feature is enabled, and to
/// `1.0` otherwise. NOTE(review): the larger value in test-like builds presumably ensures tests
/// exercise cardinality-driven task scaling — confirm with the original author.
fn cardinality_task_count_factor_default() -> f64 {
    let test_like_build = cfg!(test) || cfg!(feature = "integration");
    if test_like_build { 1.5 } else { 1.0 }
}
89
90impl DistributedConfig {
91    /// Appends a [TaskEstimator] to the list. [TaskEstimator] will be executed sequentially in
92    /// order on leaf nodes, and the first one to provide a value is the one that gets to decide
93    /// how many tasks are used for that [Stage].
94    pub fn with_task_estimator(
95        mut self,
96        task_estimator: impl TaskEstimator + Send + Sync + 'static,
97    ) -> Self {
98        self.__private_task_estimator
99            .user_provided
100            .push(Arc::new(task_estimator));
101        self
102    }
103
104    /// Gets the [DistributedConfig] from the [ConfigOptions]'s extensions.
105    pub fn from_config_options(cfg: &ConfigOptions) -> Result<&Self, DataFusionError> {
106        let Some(distributed_cfg) = cfg.extensions.get::<DistributedConfig>() else {
107            return plan_err!("DistributedConfig is not in ConfigOptions.extensions");
108        };
109        Ok(distributed_cfg)
110    }
111
112    /// Gets the [DistributedConfig] from the [ConfigOptions]'s extensions.
113    pub fn from_config_options_mut(cfg: &mut ConfigOptions) -> Result<&mut Self, DataFusionError> {
114        let Some(distributed_cfg) = cfg.extensions.get_mut::<DistributedConfig>() else {
115            return plan_err!("DistributedConfig is not in ConfigOptions.extensions");
116        };
117        Ok(distributed_cfg)
118    }
119}
120
/// Registers [DistributedConfig] as a [ConfigOptions] extension. Its settings are namespaced
/// under the `distributed` prefix (e.g. `distributed.collect_metrics`).
impl ConfigExtension for DistributedConfig {
    const PREFIX: &'static str = "distributed";
}
124
// FIXME: Ideally, ChannelResolverExtension, WorkerResolverExtension, CombinedTaskEstimator and
//  WorkUnitFeedRegistry would be passed as extensions in SessionConfig's AnyMap instead of the
//  ConfigOptions. However, they must be passed through ConfigOptions because these fields need
//  to be present during planning in the DistributedQueryPlanner, and the signature of the
//  create_physical_plan() method there accepts a SessionState which only provides ConfigOptions.
//  The following PR addresses this: https://github.com/apache/datafusion/pull/18168
//  but it has not been accepted or merged yet.
//  Because of this, all the boilerplate trait implementations below are needed.
133impl ConfigField for ChannelResolverExtension {
134    fn visit<V: Visit>(&self, _: &mut V, _: &str, _: &'static str) {
135        // nothing to do.
136    }
137
138    fn set(&mut self, _: &str, _: &str) -> datafusion::common::Result<()> {
139        not_impl_err!("Not implemented")
140    }
141}
142
143impl Debug for ChannelResolverExtension {
144    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
145        write!(f, "ChannelResolverExtension")
146    }
147}
148
149impl ConfigField for WorkerResolverExtension {
150    fn visit<V: Visit>(&self, _: &mut V, _: &str, _: &'static str) {
151        // nothing to do.
152    }
153
154    fn set(&mut self, _: &str, _: &str) -> datafusion::common::Result<()> {
155        not_impl_err!("Not implemented")
156    }
157}
158
159impl Debug for WorkerResolverExtension {
160    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
161        write!(f, "WorkerResolverExtension")
162    }
163}
164
165impl ConfigField for CombinedTaskEstimator {
166    fn visit<V: Visit>(&self, _: &mut V, _: &str, _: &'static str) {
167        //nothing to do.
168    }
169
170    fn set(&mut self, _: &str, _: &str) -> Result<(), DataFusionError> {
171        not_impl_err!("not implemented")
172    }
173}
174
175impl Debug for CombinedTaskEstimator {
176    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
177        write!(f, "TaskEstimators")
178    }
179}
180
181impl ConfigField for WorkUnitFeedRegistry {
182    fn visit<V: Visit>(&self, _: &mut V, _: &str, _: &'static str) {
183        //nothing to do.
184    }
185
186    fn set(&mut self, _: &str, _: &str) -> Result<(), DataFusionError> {
187        not_impl_err!("not implemented")
188    }
189}
190
191impl Debug for WorkUnitFeedRegistry {
192    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
193        write!(f, "WorkUnitFeedRegistry")
194    }
195}