Skip to main content

datafusion_distributed/
distributed_ext.rs

1use crate::config_extension_ext::{
2    set_distributed_option_extension, set_distributed_option_extension_from_headers,
3};
4use crate::distributed_planner::set_distributed_task_estimator;
5use crate::networking::{set_distributed_channel_resolver, set_distributed_worker_resolver};
6use crate::passthrough_headers::set_passthrough_headers;
7use crate::protobuf::{set_distributed_user_codec, set_distributed_user_codec_arc};
8use crate::work_unit_feed::set_distributed_work_unit_feed;
9use crate::{
10    ChannelResolver, DistributedConfig, TaskEstimator, WorkUnitFeed, WorkUnitFeedProvider,
11    WorkerResolver,
12};
13use arrow_ipc::CompressionType;
14use datafusion::common::DataFusionError;
15use datafusion::config::ConfigExtension;
16use datafusion::execution::{SessionState, SessionStateBuilder};
17use datafusion::physical_plan::ExecutionPlan;
18use datafusion::prelude::{SessionConfig, SessionContext};
19use datafusion_proto::physical_plan::PhysicalExtensionCodec;
20use delegate::delegate;
21use http::HeaderMap;
22use std::sync::Arc;
23
24/// Extends DataFusion with distributed capabilities.
25pub trait DistributedExt: Sized {
26    /// Adds the provided [ConfigExtension] to the distributed context. The [ConfigExtension] will
27    /// be serialized using gRPC metadata and sent across tasks. Users are expected to call this
28    /// method with their own extensions to be able to access them in any place in the
29    /// plan.
30    ///
31    /// This method also adds the provided [ConfigExtension] to the current session option
32    /// extensions, the same as calling [SessionConfig::with_option_extension].
33    ///
34    /// Example:
35    ///
36    /// ```rust
37    /// # use async_trait::async_trait;
38    /// # use datafusion::common::{extensions_options, DataFusionError};
39    /// # use datafusion::config::ConfigExtension;
40    /// # use datafusion::execution::{SessionState, SessionStateBuilder};
41    /// # use datafusion::prelude::SessionConfig;
42    /// # use datafusion_distributed::{DistributedExt, WorkerSessionBuilder, WorkerQueryContext};
43    ///
44    /// extensions_options! {
45    ///     pub struct CustomExtension {
46    ///         pub foo: String, default = "".to_string()
47    ///         pub bar: usize, default = 0
48    ///         pub baz: bool, default = false
49    ///     }
50    /// }
51    ///
52    /// impl ConfigExtension for CustomExtension {
53    ///     const PREFIX: &'static str = "custom";
54    /// }
55    ///
56    /// let mut my_custom_extension = CustomExtension::default();
57    /// // Now, the CustomExtension will be able to cross network boundaries. Upon making an Arrow
58    /// // Flight request, it will be sent through gRPC metadata.
59    /// let state = SessionStateBuilder::new()
60    ///     .with_distributed_option_extension(my_custom_extension)
61    ///     .build();
62    ///
63    /// async fn build_state(ctx: WorkerQueryContext) -> Result<SessionState, DataFusionError> {
64    ///     // This function can be provided to a Worker to tell it how to
65    ///     // build sessions that retrieve the CustomExtension from gRPC metadata.
66    ///     Ok(ctx
67    ///         .builder
68    ///         .with_distributed_option_extension_from_headers::<CustomExtension>(&ctx.headers)?
69    ///         .build())
70    /// }
71    /// ```
72    fn with_distributed_option_extension<T: ConfigExtension + Default>(self, t: T) -> Self;
73
74    /// Same as [DistributedExt::with_distributed_option_extension] but with an in-place mutation
75    fn set_distributed_option_extension<T: ConfigExtension + Default>(&mut self, t: T);
76
77    /// Adds the provided [ConfigExtension] to the distributed context. The [ConfigExtension] will
78    /// be serialized using gRPC metadata and sent across tasks. Users are expected to call this
79    /// method with their own extensions to be able to access them in any place in the
80    /// plan.
81    ///
82    /// - If there was a [ConfigExtension] of the same type already present, it's updated with an
83    ///   in-place mutation based on the headers that came over the wire.
84    /// - If there was no [ConfigExtension] set before, it will get added, as if
85    ///   [SessionConfig::with_option_extension] was being called.
86    ///
87    /// Example:
88    ///
89    /// ```rust
90    /// # use async_trait::async_trait;
91    /// # use datafusion::common::{extensions_options, DataFusionError};
92    /// # use datafusion::config::ConfigExtension;
93    /// # use datafusion::execution::{SessionState, SessionStateBuilder};
94    /// # use datafusion::prelude::SessionConfig;
95    /// # use datafusion_distributed::{DistributedExt, WorkerSessionBuilder, WorkerQueryContext};
96    ///
97    /// extensions_options! {
98    ///     pub struct CustomExtension {
99    ///         pub foo: String, default = "".to_string()
100    ///         pub bar: usize, default = 0
101    ///         pub baz: bool, default = false
102    ///     }
103    /// }
104    ///
105    /// impl ConfigExtension for CustomExtension {
106    ///     const PREFIX: &'static str = "custom";
107    /// }
108    ///
109    /// let mut my_custom_extension = CustomExtension::default();
110    /// // Now, the CustomExtension will be able to cross network boundaries. Upon making an Arrow
111    /// // Flight request, it will be sent through gRPC metadata.
112    /// let state = SessionStateBuilder::new()
113    ///     .with_distributed_option_extension(my_custom_extension)
114    ///     .build();
115    ///
116    /// async fn build_state(ctx: WorkerQueryContext) -> Result<SessionState, DataFusionError> {
117    ///     // This function can be provided to a Worker to tell it how to
118    ///     // build sessions that retrieve the CustomExtension from gRPC metadata.
119    ///     Ok(ctx
120    ///         .builder
121    ///         .with_distributed_option_extension_from_headers::<CustomExtension>(&ctx.headers)?
122    ///         .build())
123    /// }
124    /// ```
125    fn with_distributed_option_extension_from_headers<T: ConfigExtension + Default>(
126        self,
127        headers: &HeaderMap,
128    ) -> Result<Self, DataFusionError>;
129
130    /// Same as [DistributedExt::with_distributed_option_extension_from_headers] but with an in-place mutation
131    fn set_distributed_option_extension_from_headers<T: ConfigExtension + Default>(
132        &mut self,
133        headers: &HeaderMap,
134    ) -> Result<(), DataFusionError>;
135
136    /// Injects a user-defined [PhysicalExtensionCodec] that is capable of encoding/decoding
137    /// custom execution nodes. Multiple user-defined [PhysicalExtensionCodec] can be added
138    /// by calling this method several times.
139    ///
140    /// Example:
141    ///
142    /// ```
143    /// # use std::sync::Arc;
144    /// # use datafusion::common::DataFusionError;
145    /// # use datafusion::execution::{SessionState, FunctionRegistry, SessionStateBuilder, TaskContext};
146    /// # use datafusion::physical_plan::ExecutionPlan;
147    /// # use datafusion::prelude::SessionConfig;
148    /// # use datafusion_proto::physical_plan::PhysicalExtensionCodec;
149    /// # use datafusion_distributed::{DistributedExt, WorkerQueryContext};
150    ///
151    /// #[derive(Debug)]
152    /// struct CustomExecCodec;
153    ///
154    /// impl PhysicalExtensionCodec for CustomExecCodec {
155    ///     fn try_decode(&self, buf: &[u8], inputs: &[Arc<dyn ExecutionPlan>], ctx: &TaskContext) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
156    ///         todo!()
157    ///     }
158    ///
159    ///     fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> datafusion::common::Result<()> {
160    ///         todo!()
161    ///     }
162    /// }
163    ///
164    /// let state = SessionStateBuilder::new()
165    ///     .with_distributed_user_codec(CustomExecCodec)
166    ///     .build();
167    ///
168    /// async fn build_state(ctx: WorkerQueryContext) -> Result<SessionState, DataFusionError> {
169    ///     // This function can be provided to a Worker to tell it how to
170    ///     // encode/decode CustomExec nodes.
171    ///     Ok(SessionStateBuilder::new()
172    ///         .with_distributed_user_codec(CustomExecCodec)
173    ///         .build())
174    /// }
175    /// ```
176    fn with_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(self, codec: T) -> Self;
177
178    /// Same as [DistributedExt::with_distributed_user_codec] but with an in-place mutation
179    fn set_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(&mut self, codec: T);
180
181    /// Same as [DistributedExt::with_distributed_user_codec] but with a dynamic argument.
182    fn with_distributed_user_codec_arc(self, codec: Arc<dyn PhysicalExtensionCodec>) -> Self;
183
184    /// Same as [DistributedExt::set_distributed_user_codec] but with a dynamic argument.
185    fn set_distributed_user_codec_arc(&mut self, codec: Arc<dyn PhysicalExtensionCodec>);
186
187    /// This is what tells Distributed DataFusion the URLs of the workers available for serving queries.
188    ///
189    /// It injects a [WorkerResolver] implementation for Distributed DataFusion to resolve worker
190    /// nodes in the cluster. When running in distributed mode, setting a [WorkerResolver] is required.
191    ///
192    /// Even if this is required to be present in the [SessionContext] that first initiates and
193    /// plans the query, it's not necessary to be present in a Worker's session state builder,
194    /// as no planning happens there.
195    ///
196    /// Example:
197    ///
198    /// ```
199    /// # use async_trait::async_trait;
200    /// # use datafusion::common::DataFusionError;
201    /// # use datafusion::execution::{SessionState, SessionStateBuilder};
202    /// # use datafusion::prelude::SessionConfig;
203    /// # use url::Url;
204    /// # use std::sync::Arc;
205    /// # use datafusion_distributed::{BoxCloneSyncChannel, WorkerResolver, DistributedExt, SessionStateBuilderExt, WorkerQueryContext};
206    ///
207    /// struct CustomWorkerResolver;
208    ///
209    /// #[async_trait]
210    /// impl WorkerResolver for CustomWorkerResolver {
211    ///     fn get_urls(&self) -> Result<Vec<Url>, DataFusionError> {
212    ///         todo!()
213    ///     }
214    /// }
215    ///
216    /// // This tweaks the SessionState so that it can plan for distributed queries and execute them.
217    /// let state = SessionStateBuilder::new()
218    ///     .with_distributed_worker_resolver(CustomWorkerResolver)
219    ///     .with_distributed_planner()
220    ///     .build();
221    /// ```
222    fn with_distributed_worker_resolver<T: WorkerResolver + Send + Sync + 'static>(
223        self,
224        resolver: T,
225    ) -> Self;
226
227    /// Same as [DistributedExt::with_distributed_worker_resolver] but with an in-place mutation.
228    fn set_distributed_worker_resolver<T: WorkerResolver + Send + Sync + 'static>(
229        &mut self,
230        resolver: T,
231    );
232
233    /// This is what tells Distributed DataFusion how to build a Worker gRPC client out of a worker URL.
234    ///
235    /// There's a default implementation that caches the Worker client instances so that there's
236    /// only one per URL, but users can decide to override that behavior in favor of their own solution.
237    ///
238    /// Example:
239    ///
240    /// ```
241    /// # use async_trait::async_trait;
242    /// # use datafusion::common::DataFusionError;
243    /// # use datafusion::execution::{SessionState, SessionStateBuilder};
244    /// # use datafusion::prelude::SessionConfig;
245    /// # use url::Url;
246    /// # use std::sync::Arc;
247    /// # use datafusion_distributed::{BoxCloneSyncChannel, ChannelResolver, DistributedExt, SessionStateBuilderExt, WorkerQueryContext, WorkerServiceClient};
248    ///
249    /// struct CustomChannelResolver;
250    ///
251    /// #[async_trait]
252    /// impl ChannelResolver for CustomChannelResolver {
253    ///     async fn get_worker_client_for_url(&self, url: &Url) -> Result<WorkerServiceClient<BoxCloneSyncChannel>, DataFusionError> {
254    ///         // Build a custom WorkerServiceClient wrapped with tower layers or something similar.
255    ///         todo!()
256    ///     }
257    /// }
258    ///
259    /// // This tweaks the SessionState so that it can plan for distributed queries and execute them.
260    /// let state = SessionStateBuilder::new()
261    ///     .with_distributed_channel_resolver(CustomChannelResolver)
262    ///     .with_distributed_planner()
263    ///     .build();
264    ///
265    /// // This function can be provided to a Worker so that, upon receiving a distributed
266    /// // part of a plan, it knows how to resolve gRPC channels from URLs for making network calls to other nodes.
267    /// async fn build_state(ctx: WorkerQueryContext) -> Result<SessionState, DataFusionError> {
268    ///     // If you have a custom channel resolver, it should also be passed in the
269    ///     // Worker session builder.
270    ///     Ok(ctx
271    ///         .builder
272    ///         .with_distributed_channel_resolver(CustomChannelResolver)
273    ///         .build())
274    /// }
275    /// ```
276    fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(
277        self,
278        resolver: T,
279    ) -> Self;
280
281    /// Same as [DistributedExt::with_distributed_channel_resolver] but with an in-place mutation.
282    fn set_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(
283        &mut self,
284        resolver: T,
285    );
286
287    /// Adds a distributed task count estimator. [TaskEstimator]s are executed on each node
288    /// sequentially until one returns an estimation on the number of tasks that should be
289    /// used for the stage containing that node.
290    ///
291    /// Many nodes might decide to provide an estimation, so a reconciliation between all of them
292    /// is performed internally during planning.
293    ///
294    /// ```text
295    ///     ┌───────────────────────┐
296    ///     │SortPreservingMergeExec│
297    ///     └───────────────────────┘
298    ///                 ▲
299    /// ┌ ─ ─ ─ ─ ─ ─ ─ ┼ ─ ─ ─ ─ ─ ─ ─ ─ Stage 2
300    ///     ┌───────────┴───────────┐    │
301    /// │   │       SortExec        │
302    ///     └───────────────────────┘    │
303    /// │   ┌───────────────────────┐
304    ///     │     AggregateExec     │    │
305    /// │   └───────────────────────┘
306    ///  ─ ─ ─ ─ ─ ─ ─ ─▲─ ─ ─ ─ ─ ─ ─ ─ ┘
307    /// ┌ ─ ─ ─ ─ ─ ─ ─ ┴ ─ ─ ─ ─ ─ ─ ─ ─ Stage 1
308    ///     ┌───────────────────────┐    │
309    /// │   │      FilterExec       │
310    ///     └───────────────────────┘    │
311    /// │   ┌───────────────────────┐       a TaskEstimator estimates the amount of tasks
312    ///     │       SomeExec        │◀───┼──  based on how much data will be pulled.
313    /// │   └───────────────────────┘
314    ///  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
315    /// ```
316    fn with_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(
317        self,
318        estimator: T,
319    ) -> Self;
320
321    /// Same as [DistributedExt::with_distributed_task_estimator] but with an in-place mutation.
322    fn set_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(
323        &mut self,
324        estimator: T,
325    );
326
327    /// Sets the number of bytes each partition in a stage with a FileScanConfig node is
328    /// expected to scan. A task runs `target_partitions` partitions, so the task count is
329    /// roughly `total_scan_bytes / bytes_per_partition / target_partitions` (capped at the
330    /// number of available workers). Reducing this number increases the amount of tasks.
331    ///
332    /// ```text
333    ///     ┌───────────────────────┐
334    ///     │SortPreservingMergeExec│
335    ///     └───────────────────────┘
336    ///                 ▲
337    /// ┌ ─ ─ ─ ─ ─ ─ ─ ┼ ─ ─ ─ ─ ─ ─ ─ ─ Stage 2
338    ///     ┌───────────┴───────────┐    │
339    /// │   │       SortExec        │
340    ///     └───────────────────────┘    │
341    /// │   ┌───────────────────────┐
342    ///     │     AggregateExec     │    │
343    /// │   └───────────────────────┘
344    ///  ─ ─ ─ ─ ─ ─ ─ ─▲─ ─ ─ ─ ─ ─ ─ ─ ┘
345    /// ┌ ─ ─ ─ ─ ─ ─ ─ ┴ ─ ─ ─ ─ ─ ─ ─ ─ Stage 1
346    ///     ┌───────────────────────┐    │
347    /// │   │      FilterExec       │
348    ///     └───────────────────────┘    │
349    /// │   ┌───────────────────────┐        Sets the bytes scanned per
350    ///     │    FileScanConfig     │◀───┼─   partition. Less
351    /// │   └───────────────────────┘        bytes_per_partition == more tasks
352    ///  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
353    /// ```
354    fn with_distributed_file_scan_config_bytes_per_partition(
355        self,
356        bytes_per_partition: usize,
357    ) -> Result<Self, DataFusionError>;
358
359    /// Same as [DistributedExt::with_distributed_file_scan_config_bytes_per_partition] but with an in-place mutation.
360    fn set_distributed_file_scan_config_bytes_per_partition(
361        &mut self,
362        bytes_per_partition: usize,
363    ) -> Result<(), DataFusionError>;
364
365    /// The number of tasks in each stage is calculated in a bottom-to-top fashion.
366    ///
367    /// Bottom stages containing leaf nodes will provide an estimation of the amount of tasks
368    /// for those stages, but upper stages might see a reduction (or increment) in the amount
369    /// of tasks based on the cardinality effect bottom stages have in the data.
370    ///
371    /// For example: If there are two stages, and the leaf stage is estimated to use 10 tasks,
372    ///  the upper stage might use less (e.g. 5) if it sees that the leaf stage is returning
373    ///  less data because of filters or aggregations.
374    ///
375    /// This function sets the scale factor for when encountering these nodes that change the
376    /// cardinality of the data. For example, if a stage with 10 tasks contains an AggregateExec
377    /// node, and the scale factor is 2.0, the following stage will use  10 / 2.0 = 5 tasks.
378    ///
379    /// ```text
380    ///     ┌───────────────────────┐
381    ///     │SortPreservingMergeExec│
382    ///     └───────────────────────┘
383    ///                 ▲
384    /// ┌ ─ ─ ─ ─ ─ ─ ─ ┼ ─ ─ ─ ─ ─ ─ ─ ─ Stage 2 (N/scale_factor tasks)
385    ///     ┌───────────┴───────────┐    │
386    /// │   │       SortExec        │
387    ///     └───────────────────────┘    │
388    /// │   ┌───────────────────────┐
389    ///     │     AggregateExec     │    │
390    /// │   └───────────────────────┘
391    ///  ─ ─ ─ ─ ─ ─ ─ ─▲─ ─ ─ ─ ─ ─ ─ ─ ┘
392    /// ┌ ─ ─ ─ ─ ─ ─ ─ ┴ ─ ─ ─ ─ ─ ─ ─ ─ Stage 1 (N tasks)
393    ///     ┌───────────────────────┐    │       A filter reduces cardinality,
394    /// │   │      FilterExec       │◀────────therefore the next stage will have
395    ///     └───────────────────────┘    │    less tasks according to this factor
396    /// │   ┌───────────────────────┐
397    ///     │    FileScanConfig     │    │
398    /// │   └───────────────────────┘
399    ///  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
400    /// ```
401    fn with_distributed_cardinality_effect_task_scale_factor(
402        self,
403        factor: f64,
404    ) -> Result<Self, DataFusionError>;
405
406    /// Same as [DistributedExt::with_distributed_cardinality_effect_task_scale_factor] but with
407    /// an in-place mutation.
408    fn set_distributed_cardinality_effect_task_scale_factor(
409        &mut self,
410        factor: f64,
411    ) -> Result<(), DataFusionError>;
412
413    /// Enables metrics collection across network boundaries so that all the metrics gathered in
414    /// each node are accessible from the head stage that started running the query.
415    fn with_distributed_metrics_collection(self, enabled: bool) -> Result<Self, DataFusionError>;
416
417    /// Same as [DistributedExt::with_distributed_metrics_collection] but with an in-place mutation.
418    fn set_distributed_metrics_collection(&mut self, enabled: bool) -> Result<(), DataFusionError>;
419
420    /// Enables children isolator unions for distributing UNION operations across as many tasks as
421    /// the sum of all the tasks required for each child.
422    ///
423    /// For example, if there is a UNION with 3 children, requiring one task each, it will result
424    /// in a plan with 3 tasks where each task runs one child:
425    ///
426    /// ```text
427    /// ┌─────────────────────────────┐┌─────────────────────────────┐┌─────────────────────────────┐
428    /// │           Task 1            ││           Task 2            ││           Task 3            │
429    /// │┌───────────────────────────┐││┌───────────────────────────┐││┌───────────────────────────┐│
430    /// ││ ChildrenIsolatorUnionExec ││││ ChildrenIsolatorUnionExec ││││ ChildrenIsolatorUnionExec ││
431    /// │└───▲─────────▲─────────▲───┘││└───▲─────────▲─────────▲───┘││└───▲─────────▲─────────▲───┘│
432    /// │    │                        ││              │              ││                        │    │
433    /// │┌───┴───┐ ┌  ─│ ─   ┌  ─│ ─  ││┌  ─│ ─   ┌───┴───┐ ┌  ─│ ─  ││┌  ─│ ─   ┌  ─│ ─   ┌───┴───┐│
434    /// ││Child 1│  Child 2│  Child 3│││ Child 1│ │Child 2│  Child 3│││ Child 1│  Child 2│ │Child 3││
435    /// │└───────┘ └  ─  ─   └  ─  ─  ││└  ─  ─   └───────┘ └  ─  ─  ││└  ─  ─   └  ─  ─   └───────┘│
436    /// └─────────────────────────────┘└─────────────────────────────┘└─────────────────────────────┘
437    /// ```
438    fn with_distributed_children_isolator_unions(
439        self,
440        enabled: bool,
441    ) -> Result<Self, DataFusionError>;
442
443    /// Same as [DistributedExt::with_distributed_children_isolator_unions] but with an in-place mutation.
444    fn set_distributed_children_isolator_unions(
445        &mut self,
446        enabled: bool,
447    ) -> Result<(), DataFusionError>;
448
449    /// Enables broadcast joins for CollectLeft hash joins. When enabled, the build side of
450    /// a CollectLeft join is broadcast to all consumer tasks instead of being coalesced
451    /// into a single partition.
452    ///
453    /// Note: This option is disabled by default until the implementation is smarter about when to
454    /// broadcast.
455    fn with_distributed_broadcast_joins(self, enabled: bool) -> Result<Self, DataFusionError>;
456
457    /// Same as [DistributedExt::with_distributed_broadcast_joins] but with an in-place mutation.
458    fn set_distributed_broadcast_joins(&mut self, enabled: bool) -> Result<(), DataFusionError>;
459
460    /// The compression type to use for sending data over the wire.
461    ///
462    /// The default is [CompressionType::LZ4_FRAME].
463    fn with_distributed_compression(
464        self,
465        compression: Option<CompressionType>,
466    ) -> Result<Self, DataFusionError>;
467
468    /// Same as [DistributedExt::with_distributed_compression] but with an in-place mutation.
469    fn set_distributed_compression(
470        &mut self,
471        compression: Option<CompressionType>,
472    ) -> Result<(), DataFusionError>;
473
474    /// Overrides `datafusion.execution.batch_size` for worker-executed stages, letting users
475    /// tune shuffle batch sizes (specifically `RepartitionExec`'s output batching via its
476    /// internal `LimitedBatchCoalescer`) independently of the global batch size.
477    ///
478    /// Set to 0 (the default) to apply no override.
479    fn with_distributed_shuffle_batch_size(
480        self,
481        batch_size: usize,
482    ) -> Result<Self, DataFusionError>;
483
484    /// Same as [DistributedExt::with_distributed_shuffle_batch_size] but with an in-place mutation.
485    fn set_distributed_shuffle_batch_size(
486        &mut self,
487        batch_size: usize,
488    ) -> Result<(), DataFusionError>;
489
490    /// Sets arbitrary HTTP headers that will be forwarded unchanged to worker nodes.
491    /// These headers are included in outgoing Arrow Flight requests to workers.
492    ///
493    /// Returns an error if any header name starts with the reserved prefix
494    /// `x-datafusion-distributed-config-`, which is used internally.
495    ///
496    /// Example:
497    ///
498    /// ```rust
499    /// # use datafusion::execution::SessionStateBuilder;
500    /// # use datafusion_distributed::DistributedExt;
501    /// # use http::HeaderMap;
502    ///
503    /// let mut passthrough = HeaderMap::new();
504    /// passthrough.insert("x-custom-priority", "high".parse().unwrap());
505    ///
506    /// let state = SessionStateBuilder::new()
507    ///     .with_distributed_passthrough_headers(passthrough)
508    ///     .unwrap()
509    ///     .build();
510    /// ```
511    fn with_distributed_passthrough_headers(
512        self,
513        headers: HeaderMap,
514    ) -> Result<Self, DataFusionError>;
515
516    /// Same as [DistributedExt::with_distributed_passthrough_headers] but with an in-place mutation.
517    fn set_distributed_passthrough_headers(
518        &mut self,
519        headers: HeaderMap,
520    ) -> Result<(), DataFusionError>;
521
522    /// Sets the maximum tasks that will be assigned for each stage.
523    ///
524    /// If not specified, the number of workers returned by the provided [WorkerResolver] is taken.
525    fn with_distributed_max_tasks_per_stage(
526        self,
527        max_tasks_per_stage: usize,
528    ) -> Result<Self, DataFusionError>;
529
530    /// Same as [DistributedExt::with_distributed_max_tasks_per_stage] but with an in-place mutation.
531    fn set_distributed_max_tasks_per_stage(
532        &mut self,
533        max_tasks_per_stage: usize,
534    ) -> Result<(), DataFusionError>;
535
536    /// Enables or disables the PartialReduce optimization, which inserts an extra aggregation
537    /// pass above hash RepartitionExec before network shuffles to reduce shuffle data size.
538    /// Disabled by default because its effectiveness is workload-dependent: it helps when
539    /// aggregation significantly reduces cardinality, but adds overhead when it does not.
540    fn with_distributed_partial_reduce(self, enabled: bool) -> Result<Self, DataFusionError>;
541
542    /// Same as [DistributedExt::with_distributed_partial_reduce] but with an in-place mutation.
543    fn set_distributed_partial_reduce(&mut self, enabled: bool) -> Result<(), DataFusionError>;
544
545    /// Sets the soft byte budget that each per-worker connection will buffer in memory before
546    /// pausing the gRPC pull from that worker. Per-partition channels are unbounded (to avoid
547    /// head-of-line blocking between sibling partitions), so backpressure is enforced globally
548    /// per worker connection using this budget.
549    fn with_distributed_worker_connection_buffer_budget_bytes(
550        self,
551        budget_bytes: usize,
552    ) -> Result<Self, DataFusionError>;
553
554    /// Same as [DistributedExt::with_distributed_worker_connection_buffer_budget_bytes] but with
555    /// an in-place mutation.
556    fn set_distributed_worker_connection_buffer_budget_bytes(
557        &mut self,
558        budget_bytes: usize,
559    ) -> Result<(), DataFusionError>;
560
561    /// Registers a [WorkUnitFeed] so that Distributed DataFusion can discover it while traversing
562    /// plans. For more info, refer to [WorkUnitFeed] docs.
563    ///
564    /// This method uses some type system trickery so that users can provide a callback like this:
565    ///
566    /// ```ignore
567    /// # use datafusion::execution::SessionStateBuilder;
568    ///
569    /// SessionStateBuilder::new()
570    ///     .with_distributed_work_unit_feed(|p: &MyCustomPlan| Some(&p.my_work_unit_feed));
571    /// ```
572    fn with_distributed_work_unit_feed<T, P, F>(self, getter: F) -> Self
573    where
574        T: ExecutionPlan + 'static,
575        P: WorkUnitFeedProvider + 'static,
576        P::WorkUnit: 'static,
577        F: Fn(&T) -> Option<&WorkUnitFeed<P>> + Send + Sync + 'static;
578
579    /// Same as [DistributedExt::with_distributed_work_unit_feed] but with an in-place mutation.
580    fn set_distributed_work_unit_feed<T, P, F>(&mut self, getter: F)
581    where
582        T: ExecutionPlan + 'static,
583        P: WorkUnitFeedProvider + 'static,
584        P::WorkUnit: 'static,
585        F: Fn(&T) -> Option<&WorkUnitFeed<P>> + Send + Sync + 'static;
586}
587
588impl DistributedExt for SessionConfig {
589    fn set_distributed_option_extension<T: ConfigExtension + Default>(&mut self, t: T) {
           // Not recursion: an unqualified call resolves to the free function of the same name
           // imported from `crate::config_extension_ext`, which receives this `SessionConfig`.
590        set_distributed_option_extension(self, t)
591    }
592
593    fn set_distributed_option_extension_from_headers<T: ConfigExtension + Default>(
594        &mut self,
595        headers: &HeaderMap,
596    ) -> Result<(), DataFusionError> {
           // Calls the free function imported from `crate::config_extension_ext` (not this
           // method). `?` propagates its error; whatever it yields on success is discarded.
597        set_distributed_option_extension_from_headers::<T>(self, headers)?;
598        Ok(())
599    }
600
601    fn set_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(&mut self, codec: T) {
           // Not recursion: this is the free function imported from `crate::protobuf`, called
           // with this `SessionConfig` as its target.
602        set_distributed_user_codec(self, codec)
603    }
604
605    fn set_distributed_user_codec_arc(&mut self, codec: Arc<dyn PhysicalExtensionCodec>) {
           // Not recursion: this is the free function imported from `crate::protobuf`; the
           // already type-erased codec is handed over as-is.
606        set_distributed_user_codec_arc(self, codec)
607    }
608
609    fn set_distributed_worker_resolver<T: WorkerResolver + Send + Sync + 'static>(
610        &mut self,
611        resolver: T,
612    ) {
           // Not recursion: this is the free function imported from `crate::networking`, called
           // with this `SessionConfig` as its target.
613        set_distributed_worker_resolver(self, resolver);
614    }
615
616    fn set_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(
617        &mut self,
618        resolver: T,
619    ) {
           // Not recursion: this is the free function imported from `crate::networking`, called
           // with this `SessionConfig` as its target.
620        set_distributed_channel_resolver(self, resolver);
621    }
622
623    fn set_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(
624        &mut self,
625        estimator: T,
626    ) {
           // Not recursion: this is the free function imported from
           // `crate::distributed_planner`, called with this `SessionConfig` as its target.
627        set_distributed_task_estimator(self, estimator)
628    }
629
630    fn set_distributed_file_scan_config_bytes_per_partition(
631        &mut self,
632        bytes_per_partition: usize,
633    ) -> Result<(), DataFusionError> {
           // Mutably borrow the `DistributedConfig` through this session's config options; `?`
           // returns early with the error if it cannot be obtained.
634        let d_cfg = DistributedConfig::from_config_options_mut(self.options_mut())?;
           // Plain overwrite: the value is stored as given, with no range check in this method.
635        d_cfg.file_scan_config_bytes_per_partition = bytes_per_partition;
636        Ok(())
637    }
638
639    fn set_distributed_cardinality_effect_task_scale_factor(
640        &mut self,
641        factor: f64,
642    ) -> Result<(), DataFusionError> {
           // Mutably borrow the `DistributedConfig` through this session's config options; `?`
           // returns early with the error if it cannot be obtained.
643        let d_cfg = DistributedConfig::from_config_options_mut(self.options_mut())?;
           // The backing field is named `cardinality_task_count_factor`, unlike this setter.
           // The factor is stored unchecked here: zero, negative and NaN values are not rejected
           // by this method.
644        d_cfg.cardinality_task_count_factor = factor;
645        Ok(())
646    }
647
648    fn set_distributed_metrics_collection(&mut self, enabled: bool) -> Result<(), DataFusionError> {
           // Mutably borrow the `DistributedConfig` through this session's config options; `?`
           // returns early with the error if it cannot be obtained.
649        let d_cfg = DistributedConfig::from_config_options_mut(self.options_mut())?;
           // `enabled` is written straight into the `collect_metrics` flag.
650        d_cfg.collect_metrics = enabled;
651        Ok(())
652    }
653
654    fn set_distributed_children_isolator_unions(
655        &mut self,
656        enabled: bool,
657    ) -> Result<(), DataFusionError> {
           // Mutably borrow the `DistributedConfig` through this session's config options; `?`
           // returns early with the error if it cannot be obtained.
658        let d_cfg = DistributedConfig::from_config_options_mut(self.options_mut())?;
           // `enabled` is written straight into the `children_isolator_unions` flag.
659        d_cfg.children_isolator_unions = enabled;
660        Ok(())
661    }
662
663    fn set_distributed_broadcast_joins(&mut self, enabled: bool) -> Result<(), DataFusionError> {
           // Mutably borrow the `DistributedConfig` through this session's config options; `?`
           // returns early with the error if it cannot be obtained.
664        let d_cfg = DistributedConfig::from_config_options_mut(self.options_mut())?;
           // `enabled` is written straight into the `broadcast_joins` flag.
665        d_cfg.broadcast_joins = enabled;
666        Ok(())
667    }
668
669    fn set_distributed_compression(
670        &mut self,
671        compression: Option<CompressionType>,
672    ) -> Result<(), DataFusionError> {
           // Mutably borrow the `DistributedConfig` through this session's config options; `?`
           // returns early with the error if it cannot be obtained.
673        let d_cfg = DistributedConfig::from_config_options_mut(self.options_mut())?;
           // The setting is stored as a string name rather than as a `CompressionType`.
674        d_cfg.compression = match compression {
675            Some(CompressionType::ZSTD) => "zstd".to_string(),
676            Some(CompressionType::LZ4_FRAME) => "lz4".to_string(),
               // Catch-all: covers `None` and also any other `CompressionType` value, so an
               // unrecognized codec is stored as "none" instead of producing an error.
677            _ => "none".to_string(),
678        };
679        Ok(())
680    }
681
682    fn set_distributed_shuffle_batch_size(
683        &mut self,
684        batch_size: usize,
685    ) -> Result<(), DataFusionError> {
686        let d_cfg = DistributedConfig::from_config_options_mut(self.options_mut())?;
687        d_cfg.shuffle_batch_size = batch_size;
688        Ok(())
689    }
690
691    fn set_distributed_passthrough_headers(
692        &mut self,
693        headers: HeaderMap,
694    ) -> Result<(), DataFusionError> {
695        set_passthrough_headers(self, headers)
696    }
697
698    fn set_distributed_max_tasks_per_stage(
699        &mut self,
700        max_tasks_per_stage: usize,
701    ) -> Result<(), DataFusionError> {
702        let d_cfg = DistributedConfig::from_config_options_mut(self.options_mut())?;
703        d_cfg.max_tasks_per_stage = max_tasks_per_stage;
704        Ok(())
705    }
706
707    fn set_distributed_partial_reduce(&mut self, enabled: bool) -> Result<(), DataFusionError> {
708        let d_cfg = DistributedConfig::from_config_options_mut(self.options_mut())?;
709        d_cfg.partial_reduce = enabled;
710        Ok(())
711    }
712
713    fn set_distributed_worker_connection_buffer_budget_bytes(
714        &mut self,
715        budget_bytes: usize,
716    ) -> Result<(), DataFusionError> {
717        let d_cfg = DistributedConfig::from_config_options_mut(self.options_mut())?;
718        d_cfg.worker_connection_buffer_budget_bytes = budget_bytes;
719        Ok(())
720    }
721
722    fn set_distributed_work_unit_feed<T, P, F>(&mut self, getter: F)
723    where
724        T: ExecutionPlan + 'static,
725        P: WorkUnitFeedProvider + 'static,
726        P::WorkUnit: 'static,
727        F: Fn(&T) -> Option<&WorkUnitFeed<P>> + Send + Sync + 'static,
728    {
729        set_distributed_work_unit_feed(self, move |plan: &Arc<dyn ExecutionPlan>| {
730            plan.downcast_ref::<T>().and_then(&getter)
731        })
732    }
733
734    delegate! {
735        to self {
736            #[call(set_distributed_option_extension)]
737            #[expr($;self)]
738            fn with_distributed_option_extension<T: ConfigExtension + Default>(mut self, t: T) -> Self;
739
740            #[call(set_distributed_option_extension_from_headers)]
741            #[expr($?;Ok(self))]
742            fn with_distributed_option_extension_from_headers<T: ConfigExtension + Default>(mut self, headers: &HeaderMap) -> Result<Self, DataFusionError>;
743
744            #[call(set_distributed_user_codec)]
745            #[expr($;self)]
746            fn with_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(mut self, codec: T) -> Self;
747
748            #[call(set_distributed_user_codec_arc)]
749            #[expr($;self)]
750            fn with_distributed_user_codec_arc(mut self, codec: Arc<dyn PhysicalExtensionCodec>) -> Self;
751
752            #[call(set_distributed_worker_resolver)]
753            #[expr($;self)]
754            fn with_distributed_worker_resolver<T: WorkerResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;
755
756            #[call(set_distributed_channel_resolver)]
757            #[expr($;self)]
758            fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;
759
760            #[call(set_distributed_task_estimator)]
761            #[expr($;self)]
762            fn with_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(mut self, estimator: T) -> Self;
763
764            #[call(set_distributed_file_scan_config_bytes_per_partition)]
765            #[expr($?;Ok(self))]
766            fn with_distributed_file_scan_config_bytes_per_partition(mut self, bytes_per_partition: usize) -> Result<Self, DataFusionError>;
767
768            #[call(set_distributed_cardinality_effect_task_scale_factor)]
769            #[expr($?;Ok(self))]
770            fn with_distributed_cardinality_effect_task_scale_factor(mut self, factor: f64) -> Result<Self, DataFusionError>;
771
772            #[call(set_distributed_metrics_collection)]
773            #[expr($?;Ok(self))]
774            fn with_distributed_metrics_collection(mut self, enabled: bool) -> Result<Self, DataFusionError>;
775
776            #[call(set_distributed_children_isolator_unions)]
777            #[expr($?;Ok(self))]
778            fn with_distributed_children_isolator_unions(mut self, enabled: bool) -> Result<Self, DataFusionError>;
779
780            #[call(set_distributed_broadcast_joins)]
781            #[expr($?;Ok(self))]
782            fn with_distributed_broadcast_joins(mut self, enabled: bool) -> Result<Self, DataFusionError>;
783
784            #[call(set_distributed_compression)]
785            #[expr($?;Ok(self))]
786            fn with_distributed_compression(mut self, compression: Option<CompressionType>) -> Result<Self, DataFusionError>;
787
788            #[call(set_distributed_shuffle_batch_size)]
789            #[expr($?;Ok(self))]
790            fn with_distributed_shuffle_batch_size(mut self, batch_size: usize) -> Result<Self, DataFusionError>;
791
792            #[call(set_distributed_passthrough_headers)]
793            #[expr($?;Ok(self))]
794            fn with_distributed_passthrough_headers(mut self, headers: HeaderMap) -> Result<Self, DataFusionError>;
795
796            #[call(set_distributed_max_tasks_per_stage)]
797            #[expr($?;Ok(self))]
798            fn with_distributed_max_tasks_per_stage(mut self, max_tasks_per_stage: usize) -> Result<Self, DataFusionError>;
799
800            #[call(set_distributed_partial_reduce)]
801            #[expr($?;Ok(self))]
802            fn with_distributed_partial_reduce(mut self, enabled: bool) -> Result<Self, DataFusionError>;
803
804            #[call(set_distributed_worker_connection_buffer_budget_bytes)]
805            #[expr($?;Ok(self))]
806            fn with_distributed_worker_connection_buffer_budget_bytes(mut self, budget_bytes: usize) -> Result<Self, DataFusionError>;
807
808            #[call(set_distributed_work_unit_feed)]
809            #[expr($;self)]
810            fn with_distributed_work_unit_feed<T, P, F>(mut self, getter: F) -> Self
811            where
812                T: ExecutionPlan + 'static,
813                P: WorkUnitFeedProvider + 'static,
814                P::WorkUnit: 'static,
815                F: Fn(&T) -> Option<&WorkUnitFeed<P>> + Send + Sync + 'static;
816        }
817    }
818}
819
820impl DistributedExt for SessionStateBuilder {
821    delegate! {
822        to self.config().get_or_insert_default() {
823            fn set_distributed_option_extension<T: ConfigExtension + Default>(&mut self, t: T);
824            #[call(set_distributed_option_extension)]
825            #[expr($;self)]
826            fn with_distributed_option_extension<T: ConfigExtension + Default>(mut self, t: T) -> Self;
827
828            fn set_distributed_option_extension_from_headers<T: ConfigExtension + Default>(&mut self, h: &HeaderMap) -> Result<(), DataFusionError>;
829            #[call(set_distributed_option_extension_from_headers)]
830            #[expr($?;Ok(self))]
831            fn with_distributed_option_extension_from_headers<T: ConfigExtension + Default>(mut self, headers: &HeaderMap) -> Result<Self, DataFusionError>;
832
833            fn set_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(&mut self, codec: T);
834            #[call(set_distributed_user_codec)]
835            #[expr($;self)]
836            fn with_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(mut self, codec: T) -> Self;
837
838            fn set_distributed_user_codec_arc(&mut self, codec: Arc<dyn PhysicalExtensionCodec>);
839            #[call(set_distributed_user_codec_arc)]
840            #[expr($;self)]
841            fn with_distributed_user_codec_arc(mut self, codec: Arc<dyn PhysicalExtensionCodec>) -> Self;
842
843            fn set_distributed_worker_resolver<T: WorkerResolver + Send + Sync + 'static>(&mut self, resolver: T);
844            #[call(set_distributed_worker_resolver)]
845            #[expr($;self)]
846            fn with_distributed_worker_resolver<T: WorkerResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;
847
848            fn set_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(&mut self, resolver: T);
849            #[call(set_distributed_channel_resolver)]
850            #[expr($;self)]
851            fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;
852
853            fn set_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(&mut self, estimator: T);
854            #[call(set_distributed_task_estimator)]
855            #[expr($;self)]
856            fn with_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(mut self, estimator: T) -> Self;
857
858            fn set_distributed_file_scan_config_bytes_per_partition(&mut self, bytes_per_partition: usize) -> Result<(), DataFusionError>;
859            #[call(set_distributed_file_scan_config_bytes_per_partition)]
860            #[expr($?;Ok(self))]
861            fn with_distributed_file_scan_config_bytes_per_partition(mut self, bytes_per_partition: usize) -> Result<Self, DataFusionError>;
862
863            fn set_distributed_cardinality_effect_task_scale_factor(&mut self, factor: f64) -> Result<(), DataFusionError>;
864            #[call(set_distributed_cardinality_effect_task_scale_factor)]
865            #[expr($?;Ok(self))]
866            fn with_distributed_cardinality_effect_task_scale_factor(mut self, factor: f64) -> Result<Self, DataFusionError>;
867
868            fn set_distributed_metrics_collection(&mut self, enabled: bool) -> Result<(), DataFusionError>;
869            #[call(set_distributed_metrics_collection)]
870            #[expr($?;Ok(self))]
871            fn with_distributed_metrics_collection(mut self, enabled: bool) -> Result<Self, DataFusionError>;
872
873            fn set_distributed_children_isolator_unions(&mut self, enabled: bool) -> Result<(), DataFusionError>;
874            #[call(set_distributed_children_isolator_unions)]
875            #[expr($?;Ok(self))]
876            fn with_distributed_children_isolator_unions(mut self, enabled: bool) -> Result<Self, DataFusionError>;
877
878            fn set_distributed_broadcast_joins(&mut self, enabled: bool) -> Result<(), DataFusionError>;
879            #[call(set_distributed_broadcast_joins)]
880            #[expr($?;Ok(self))]
881            fn with_distributed_broadcast_joins(mut self, enabled: bool) -> Result<Self, DataFusionError>;
882
883            fn set_distributed_compression(&mut self, compression: Option<CompressionType>) -> Result<(), DataFusionError>;
884            #[call(set_distributed_compression)]
885            #[expr($?;Ok(self))]
886            fn with_distributed_compression(mut self, compression: Option<CompressionType>) -> Result<Self, DataFusionError>;
887
888            fn set_distributed_shuffle_batch_size(&mut self, batch_size: usize) -> Result<(), DataFusionError>;
889            #[call(set_distributed_shuffle_batch_size)]
890            #[expr($?;Ok(self))]
891            fn with_distributed_shuffle_batch_size(mut self, batch_size: usize) -> Result<Self, DataFusionError>;
892
893            fn set_distributed_passthrough_headers(&mut self, headers: HeaderMap) -> Result<(), DataFusionError>;
894            #[call(set_distributed_passthrough_headers)]
895            #[expr($?;Ok(self))]
896            fn with_distributed_passthrough_headers(mut self, headers: HeaderMap) -> Result<Self, DataFusionError>;
897
898            fn set_distributed_max_tasks_per_stage(&mut self, max_tasks_per_stage: usize) -> Result<(), DataFusionError>;
899            #[call(set_distributed_max_tasks_per_stage)]
900            #[expr($?;Ok(self))]
901            fn with_distributed_max_tasks_per_stage(mut self, max_tasks_per_stage: usize) -> Result<Self, DataFusionError>;
902
903            fn set_distributed_partial_reduce(&mut self, enabled: bool) -> Result<(), DataFusionError>;
904            #[call(set_distributed_partial_reduce)]
905            #[expr($?;Ok(self))]
906            fn with_distributed_partial_reduce(mut self, enabled: bool) -> Result<Self, DataFusionError>;
907
908            fn set_distributed_worker_connection_buffer_budget_bytes(&mut self, budget_bytes: usize) -> Result<(), DataFusionError>;
909            #[call(set_distributed_worker_connection_buffer_budget_bytes)]
910            #[expr($?;Ok(self))]
911            fn with_distributed_worker_connection_buffer_budget_bytes(mut self, budget_bytes: usize) -> Result<Self, DataFusionError>;
912
913            fn set_distributed_work_unit_feed<T, P, F>(&mut self, getter: F)
914            where
915                T: ExecutionPlan + 'static,
916                P: WorkUnitFeedProvider + 'static,
917                P::WorkUnit: 'static,
918                F: Fn(&T) -> Option<&WorkUnitFeed<P>> + Send + Sync + 'static;
919            #[call(set_distributed_work_unit_feed)]
920            #[expr($;self)]
921            fn with_distributed_work_unit_feed<T, P, F>(mut self, getter: F) -> Self
922            where
923                T: ExecutionPlan + 'static,
924                P: WorkUnitFeedProvider + 'static,
925                P::WorkUnit: 'static,
926                F: Fn(&T) -> Option<&WorkUnitFeed<P>> + Send + Sync + 'static;
927        }
928    }
929}
930
931impl DistributedExt for SessionState {
932    delegate! {
933        to self.config_mut() {
934            fn set_distributed_option_extension<T: ConfigExtension + Default>(&mut self, t: T);
935            #[call(set_distributed_option_extension)]
936            #[expr($;self)]
937            fn with_distributed_option_extension<T: ConfigExtension + Default>(mut self, t: T) -> Self;
938
939            fn set_distributed_option_extension_from_headers<T: ConfigExtension + Default>(&mut self, h: &HeaderMap) -> Result<(), DataFusionError>;
940            #[call(set_distributed_option_extension_from_headers)]
941            #[expr($?;Ok(self))]
942            fn with_distributed_option_extension_from_headers<T: ConfigExtension + Default>(mut self, headers: &HeaderMap) -> Result<Self, DataFusionError>;
943
944            fn set_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(&mut self, codec: T);
945            #[call(set_distributed_user_codec)]
946            #[expr($;self)]
947            fn with_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(mut self, codec: T) -> Self;
948
949            fn set_distributed_user_codec_arc(&mut self, codec: Arc<dyn PhysicalExtensionCodec>);
950            #[call(set_distributed_user_codec_arc)]
951            #[expr($;self)]
952            fn with_distributed_user_codec_arc(mut self, codec: Arc<dyn PhysicalExtensionCodec>) -> Self;
953
954            fn set_distributed_worker_resolver<T: WorkerResolver + Send + Sync + 'static>(&mut self, resolver: T);
955            #[call(set_distributed_worker_resolver)]
956            #[expr($;self)]
957            fn with_distributed_worker_resolver<T: WorkerResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;
958
959            fn set_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(&mut self, resolver: T);
960            #[call(set_distributed_channel_resolver)]
961            #[expr($;self)]
962            fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;
963
964            fn set_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(&mut self, estimator: T);
965            #[call(set_distributed_task_estimator)]
966            #[expr($;self)]
967            fn with_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(mut self, estimator: T) -> Self;
968
969            fn set_distributed_file_scan_config_bytes_per_partition(&mut self, bytes_per_partition: usize) -> Result<(), DataFusionError>;
970            #[call(set_distributed_file_scan_config_bytes_per_partition)]
971            #[expr($?;Ok(self))]
972            fn with_distributed_file_scan_config_bytes_per_partition(mut self, bytes_per_partition: usize) -> Result<Self, DataFusionError>;
973
974            fn set_distributed_cardinality_effect_task_scale_factor(&mut self, factor: f64) -> Result<(), DataFusionError>;
975            #[call(set_distributed_cardinality_effect_task_scale_factor)]
976            #[expr($?;Ok(self))]
977            fn with_distributed_cardinality_effect_task_scale_factor(mut self, factor: f64) -> Result<Self, DataFusionError>;
978
979            fn set_distributed_metrics_collection(&mut self, enabled: bool) -> Result<(), DataFusionError>;
980            #[call(set_distributed_metrics_collection)]
981            #[expr($?;Ok(self))]
982            fn with_distributed_metrics_collection(mut self, enabled: bool) -> Result<Self, DataFusionError>;
983
984            fn set_distributed_children_isolator_unions(&mut self, enabled: bool) -> Result<(), DataFusionError>;
985            #[call(set_distributed_children_isolator_unions)]
986            #[expr($?;Ok(self))]
987            fn with_distributed_children_isolator_unions(mut self, enabled: bool) -> Result<Self, DataFusionError>;
988
989            fn set_distributed_broadcast_joins(&mut self, enabled: bool) -> Result<(), DataFusionError>;
990            #[call(set_distributed_broadcast_joins)]
991            #[expr($?;Ok(self))]
992            fn with_distributed_broadcast_joins(mut self, enabled: bool) -> Result<Self, DataFusionError>;
993
994            fn set_distributed_compression(&mut self, compression: Option<CompressionType>) -> Result<(), DataFusionError>;
995            #[call(set_distributed_compression)]
996            #[expr($?;Ok(self))]
997            fn with_distributed_compression(mut self, compression: Option<CompressionType>) -> Result<Self, DataFusionError>;
998
999            fn set_distributed_shuffle_batch_size(&mut self, batch_size: usize) -> Result<(), DataFusionError>;
1000            #[call(set_distributed_shuffle_batch_size)]
1001            #[expr($?;Ok(self))]
1002            fn with_distributed_shuffle_batch_size(mut self, batch_size: usize) -> Result<Self, DataFusionError>;
1003
1004            fn set_distributed_passthrough_headers(&mut self, headers: HeaderMap) -> Result<(), DataFusionError>;
1005            #[call(set_distributed_passthrough_headers)]
1006            #[expr($?;Ok(self))]
1007            fn with_distributed_passthrough_headers(mut self, headers: HeaderMap) -> Result<Self, DataFusionError>;
1008
1009            fn set_distributed_max_tasks_per_stage(&mut self, max_tasks_per_stage: usize) -> Result<(), DataFusionError>;
1010            #[call(set_distributed_max_tasks_per_stage)]
1011            #[expr($?;Ok(self))]
1012            fn with_distributed_max_tasks_per_stage(mut self, max_tasks_per_stage: usize) -> Result<Self, DataFusionError>;
1013
1014            fn set_distributed_partial_reduce(&mut self, enabled: bool) -> Result<(), DataFusionError>;
1015            #[call(set_distributed_partial_reduce)]
1016            #[expr($?;Ok(self))]
1017            fn with_distributed_partial_reduce(mut self, enabled: bool) -> Result<Self, DataFusionError>;
1018
1019            fn set_distributed_worker_connection_buffer_budget_bytes(&mut self, budget_bytes: usize) -> Result<(), DataFusionError>;
1020            #[call(set_distributed_worker_connection_buffer_budget_bytes)]
1021            #[expr($?;Ok(self))]
1022            fn with_distributed_worker_connection_buffer_budget_bytes(mut self, budget_bytes: usize) -> Result<Self, DataFusionError>;
1023
1024            fn set_distributed_work_unit_feed<T, P, F>(&mut self, getter: F)
1025            where
1026                T: ExecutionPlan + 'static,
1027                P: WorkUnitFeedProvider + 'static,
1028                P::WorkUnit: 'static,
1029                F: Fn(&T) -> Option<&WorkUnitFeed<P>> + Send + Sync + 'static;
1030            #[call(set_distributed_work_unit_feed)]
1031            #[expr($;self)]
1032            fn with_distributed_work_unit_feed<T, P, F>(mut self, getter: F) -> Self
1033            where
1034                T: ExecutionPlan + 'static,
1035                P: WorkUnitFeedProvider + 'static,
1036                P::WorkUnit: 'static,
1037                F: Fn(&T) -> Option<&WorkUnitFeed<P>> + Send + Sync + 'static;
1038        }
1039    }
1040}
1041
1042impl DistributedExt for SessionContext {
1043    delegate! {
1044        to self.state_ref().write().config_mut() {
1045            fn set_distributed_option_extension<T: ConfigExtension + Default>(&mut self, t: T);
1046            #[call(set_distributed_option_extension)]
1047            #[expr($;self)]
1048            fn with_distributed_option_extension<T: ConfigExtension + Default>(self, t: T) -> Self;
1049
1050            fn set_distributed_option_extension_from_headers<T: ConfigExtension + Default>(&mut self, h: &HeaderMap) -> Result<(), DataFusionError>;
1051            #[call(set_distributed_option_extension_from_headers)]
1052            #[expr($?;Ok(self))]
1053            fn with_distributed_option_extension_from_headers<T: ConfigExtension + Default>(self, headers: &HeaderMap) -> Result<Self, DataFusionError>;
1054
1055            fn set_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(&mut self, codec: T);
1056            #[call(set_distributed_user_codec)]
1057            #[expr($;self)]
1058            fn with_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(self, codec: T) -> Self;
1059
1060            fn set_distributed_user_codec_arc(&mut self, codec: Arc<dyn PhysicalExtensionCodec>);
1061            #[call(set_distributed_user_codec_arc)]
1062            #[expr($;self)]
1063            fn with_distributed_user_codec_arc(self, codec: Arc<dyn PhysicalExtensionCodec>) -> Self;
1064
1065            fn set_distributed_worker_resolver<T: WorkerResolver + Send + Sync + 'static>(&mut self, resolver: T);
1066            #[call(set_distributed_worker_resolver)]
1067            #[expr($;self)]
1068            fn with_distributed_worker_resolver<T: WorkerResolver + Send + Sync + 'static>(self, resolver: T) -> Self;
1069
1070            fn set_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(&mut self, resolver: T);
1071            #[call(set_distributed_channel_resolver)]
1072            #[expr($;self)]
1073            fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(self, resolver: T) -> Self;
1074
1075            fn set_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(&mut self, estimator: T);
1076            #[call(set_distributed_task_estimator)]
1077            #[expr($;self)]
1078            fn with_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(self, estimator: T) -> Self;
1079
1080            fn set_distributed_file_scan_config_bytes_per_partition(&mut self, bytes_per_partition: usize) -> Result<(), DataFusionError>;
1081            #[call(set_distributed_file_scan_config_bytes_per_partition)]
1082            #[expr($?;Ok(self))]
1083            fn with_distributed_file_scan_config_bytes_per_partition(self, bytes_per_partition: usize) -> Result<Self, DataFusionError>;
1084
1085            fn set_distributed_cardinality_effect_task_scale_factor(&mut self, factor: f64) -> Result<(), DataFusionError>;
1086            #[call(set_distributed_cardinality_effect_task_scale_factor)]
1087            #[expr($?;Ok(self))]
1088            fn with_distributed_cardinality_effect_task_scale_factor(self, factor: f64) -> Result<Self, DataFusionError>;
1089
1090            fn set_distributed_metrics_collection(&mut self, enabled: bool) -> Result<(), DataFusionError>;
1091            #[call(set_distributed_metrics_collection)]
1092            #[expr($?;Ok(self))]
1093            fn with_distributed_metrics_collection(self, enabled: bool) -> Result<Self, DataFusionError>;
1094
1095            fn set_distributed_children_isolator_unions(&mut self, enabled: bool) -> Result<(), DataFusionError>;
1096            #[call(set_distributed_children_isolator_unions)]
1097            #[expr($?;Ok(self))]
1098            fn with_distributed_children_isolator_unions(self, enabled: bool) -> Result<Self, DataFusionError>;
1099
1100            fn set_distributed_broadcast_joins(&mut self, enabled: bool) -> Result<(), DataFusionError>;
1101            #[call(set_distributed_broadcast_joins)]
1102            #[expr($?;Ok(self))]
1103            fn with_distributed_broadcast_joins(self, enabled: bool) -> Result<Self, DataFusionError>;
1104
1105            fn set_distributed_compression(&mut self, compression: Option<CompressionType>) -> Result<(), DataFusionError>;
1106            #[call(set_distributed_compression)]
1107            #[expr($?;Ok(self))]
1108            fn with_distributed_compression(self, compression: Option<CompressionType>) -> Result<Self, DataFusionError>;
1109
1110            fn set_distributed_shuffle_batch_size(&mut self, batch_size: usize) -> Result<(), DataFusionError>;
1111            #[call(set_distributed_shuffle_batch_size)]
1112            #[expr($?;Ok(self))]
1113            fn with_distributed_shuffle_batch_size(self, batch_size: usize) -> Result<Self, DataFusionError>;
1114
1115            fn set_distributed_passthrough_headers(&mut self, headers: HeaderMap) -> Result<(), DataFusionError>;
1116            #[call(set_distributed_passthrough_headers)]
1117            #[expr($?;Ok(self))]
1118            fn with_distributed_passthrough_headers(self, headers: HeaderMap) -> Result<Self, DataFusionError>;
1119
1120            fn set_distributed_max_tasks_per_stage(&mut self, max_tasks_per_stage: usize) -> Result<(), DataFusionError>;
1121            #[call(set_distributed_max_tasks_per_stage)]
1122            #[expr($?;Ok(self))]
1123            fn with_distributed_max_tasks_per_stage(self, max_tasks_per_stage: usize) -> Result<Self, DataFusionError>;
1124
1125            fn set_distributed_partial_reduce(&mut self, enabled: bool) -> Result<(), DataFusionError>;
1126            #[call(set_distributed_partial_reduce)]
1127            #[expr($?;Ok(self))]
1128            fn with_distributed_partial_reduce(self, enabled: bool) -> Result<Self, DataFusionError>;
1129
1130            fn set_distributed_worker_connection_buffer_budget_bytes(&mut self, budget_bytes: usize) -> Result<(), DataFusionError>;
1131            #[call(set_distributed_worker_connection_buffer_budget_bytes)]
1132            #[expr($?;Ok(self))]
1133            fn with_distributed_worker_connection_buffer_budget_bytes(self, budget_bytes: usize) -> Result<Self, DataFusionError>;
1134
1135            fn set_distributed_work_unit_feed<T, P, F>(&mut self, getter: F)
1136            where
1137                T: ExecutionPlan + 'static,
1138                P: WorkUnitFeedProvider + 'static,
1139                P::WorkUnit: 'static,
1140                F: Fn(&T) -> Option<&WorkUnitFeed<P>> + Send + Sync + 'static;
1141            #[call(set_distributed_work_unit_feed)]
1142            #[expr($;self)]
1143            fn with_distributed_work_unit_feed<T, P, F>(self, getter: F) -> Self
1144            where
1145                T: ExecutionPlan + 'static,
1146                P: WorkUnitFeedProvider + 'static,
1147                P::WorkUnit: 'static,
1148                F: Fn(&T) -> Option<&WorkUnitFeed<P>> + Send + Sync + 'static;
1149        }
1150    }
1151}