Skip to main content

datafusion_distributed/
distributed_ext.rs

1use crate::config_extension_ext::{
2    set_distributed_option_extension, set_distributed_option_extension_from_headers,
3};
4use crate::distributed_planner::set_distributed_task_estimator;
5use crate::networking::{set_distributed_channel_resolver, set_distributed_worker_resolver};
6use crate::passthrough_headers::set_passthrough_headers;
7use crate::protobuf::{set_distributed_user_codec, set_distributed_user_codec_arc};
8use crate::{ChannelResolver, DistributedConfig, TaskEstimator, WorkerResolver};
9use arrow_ipc::CompressionType;
10use datafusion::common::DataFusionError;
11use datafusion::config::ConfigExtension;
12use datafusion::execution::{SessionState, SessionStateBuilder};
13use datafusion::prelude::{SessionConfig, SessionContext};
14use datafusion_proto::physical_plan::PhysicalExtensionCodec;
15use delegate::delegate;
16use http::HeaderMap;
17use std::sync::Arc;
18
19/// Extends DataFusion with distributed capabilities.
pub trait DistributedExt: Sized {
    /// Adds the provided [ConfigExtension] to the distributed context. The [ConfigExtension] will
    /// be serialized using gRPC metadata and sent across tasks. Users are expected to call this
    /// method with their own extensions to be able to access them in any place in the
    /// plan.
    ///
    /// This method also adds the provided [ConfigExtension] to the current session option
    /// extensions, the same as calling [SessionConfig::with_option_extension].
    ///
    /// Example:
    ///
    /// ```rust
    /// # use async_trait::async_trait;
    /// # use datafusion::common::{extensions_options, DataFusionError};
    /// # use datafusion::config::ConfigExtension;
    /// # use datafusion::execution::{SessionState, SessionStateBuilder};
    /// # use datafusion::prelude::SessionConfig;
    /// # use datafusion_distributed::{DistributedExt, WorkerSessionBuilder, WorkerQueryContext};
    ///
    /// extensions_options! {
    ///     pub struct CustomExtension {
    ///         pub foo: String, default = "".to_string()
    ///         pub bar: usize, default = 0
    ///         pub baz: bool, default = false
    ///     }
    /// }
    ///
    /// impl ConfigExtension for CustomExtension {
    ///     const PREFIX: &'static str = "custom";
    /// }
    ///
    /// let mut my_custom_extension = CustomExtension::default();
    /// // Now, the CustomExtension will be able to cross network boundaries. Upon making an Arrow
    /// // Flight request, it will be sent through gRPC metadata.
    /// let state = SessionStateBuilder::new()
    ///     .with_distributed_option_extension(my_custom_extension)
    ///     .build();
    ///
    /// async fn build_state(ctx: WorkerQueryContext) -> Result<SessionState, DataFusionError> {
    ///     // This function can be provided to a Worker to tell it how to
    ///     // build sessions that retrieve the CustomExtension from gRPC metadata.
    ///     Ok(ctx
    ///         .builder
    ///         .with_distributed_option_extension_from_headers::<CustomExtension>(&ctx.headers)?
    ///         .build())
    /// }
    /// ```
    fn with_distributed_option_extension<T: ConfigExtension + Default>(self, t: T) -> Self;

    /// Same as [DistributedExt::with_distributed_option_extension] but with an in-place mutation.
    fn set_distributed_option_extension<T: ConfigExtension + Default>(&mut self, t: T);

    /// Adds the provided [ConfigExtension] to the distributed context. The [ConfigExtension] will
    /// be serialized using gRPC metadata and sent across tasks. Users are expected to call this
    /// method with their own extensions to be able to access them in any place in the
    /// plan.
    ///
    /// - If there was a [ConfigExtension] of the same type already present, it's updated with an
    ///   in-place mutation based on the headers that came over the wire.
    /// - If there was no [ConfigExtension] set before, it will get added, as if
    ///   [SessionConfig::with_option_extension] was being called.
    ///
    /// Example:
    ///
    /// ```rust
    /// # use async_trait::async_trait;
    /// # use datafusion::common::{extensions_options, DataFusionError};
    /// # use datafusion::config::ConfigExtension;
    /// # use datafusion::execution::{SessionState, SessionStateBuilder};
    /// # use datafusion::prelude::SessionConfig;
    /// # use datafusion_distributed::{DistributedExt, WorkerSessionBuilder, WorkerQueryContext};
    ///
    /// extensions_options! {
    ///     pub struct CustomExtension {
    ///         pub foo: String, default = "".to_string()
    ///         pub bar: usize, default = 0
    ///         pub baz: bool, default = false
    ///     }
    /// }
    ///
    /// impl ConfigExtension for CustomExtension {
    ///     const PREFIX: &'static str = "custom";
    /// }
    ///
    /// let mut my_custom_extension = CustomExtension::default();
    /// // Now, the CustomExtension will be able to cross network boundaries. Upon making an Arrow
    /// // Flight request, it will be sent through gRPC metadata.
    /// let state = SessionStateBuilder::new()
    ///     .with_distributed_option_extension(my_custom_extension)
    ///     .build();
    ///
    /// async fn build_state(ctx: WorkerQueryContext) -> Result<SessionState, DataFusionError> {
    ///     // This function can be provided to a Worker to tell it how to
    ///     // build sessions that retrieve the CustomExtension from gRPC metadata.
    ///     Ok(ctx
    ///         .builder
    ///         .with_distributed_option_extension_from_headers::<CustomExtension>(&ctx.headers)?
    ///         .build())
    /// }
    /// ```
    fn with_distributed_option_extension_from_headers<T: ConfigExtension + Default>(
        self,
        headers: &HeaderMap,
    ) -> Result<Self, DataFusionError>;

    /// Same as [DistributedExt::with_distributed_option_extension_from_headers] but with an
    /// in-place mutation.
    fn set_distributed_option_extension_from_headers<T: ConfigExtension + Default>(
        &mut self,
        headers: &HeaderMap,
    ) -> Result<(), DataFusionError>;

    /// Injects a user-defined [PhysicalExtensionCodec] that is capable of encoding/decoding
    /// custom execution nodes. Multiple user-defined [PhysicalExtensionCodec] can be added
    /// by calling this method several times.
    ///
    /// Example:
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use datafusion::common::DataFusionError;
    /// # use datafusion::execution::{SessionState, FunctionRegistry, SessionStateBuilder, TaskContext};
    /// # use datafusion::physical_plan::ExecutionPlan;
    /// # use datafusion::prelude::SessionConfig;
    /// # use datafusion_proto::physical_plan::PhysicalExtensionCodec;
    /// # use datafusion_distributed::{DistributedExt, WorkerQueryContext};
    ///
    /// #[derive(Debug)]
    /// struct CustomExecCodec;
    ///
    /// impl PhysicalExtensionCodec for CustomExecCodec {
    ///     fn try_decode(&self, buf: &[u8], inputs: &[Arc<dyn ExecutionPlan>], ctx: &TaskContext) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
    ///         todo!()
    ///     }
    ///
    ///     fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> datafusion::common::Result<()> {
    ///         todo!()
    ///     }
    /// }
    ///
    /// let state = SessionStateBuilder::new()
    ///     .with_distributed_user_codec(CustomExecCodec)
    ///     .build();
    ///
    /// async fn build_state(ctx: WorkerQueryContext) -> Result<SessionState, DataFusionError> {
    ///     // This function can be provided to a Worker to tell it how to
    ///     // encode/decode CustomExec nodes.
    ///     Ok(SessionStateBuilder::new()
    ///         .with_distributed_user_codec(CustomExecCodec)
    ///         .build())
    /// }
    /// ```
    fn with_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(self, codec: T) -> Self;

    /// Same as [DistributedExt::with_distributed_user_codec] but with an in-place mutation
    fn set_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(&mut self, codec: T);

    /// Same as [DistributedExt::with_distributed_user_codec] but with a dynamic argument.
    fn with_distributed_user_codec_arc(self, codec: Arc<dyn PhysicalExtensionCodec>) -> Self;

    /// Same as [DistributedExt::set_distributed_user_codec] but with a dynamic argument.
    fn set_distributed_user_codec_arc(&mut self, codec: Arc<dyn PhysicalExtensionCodec>);

    /// This is what tells Distributed DataFusion the URLs of the workers available for serving queries.
    ///
    /// It injects a [WorkerResolver] implementation for Distributed DataFusion to resolve worker
    /// nodes in the cluster. When running in distributed mode, setting a [WorkerResolver] is required.
    ///
    /// Even if this is required to be present in the [SessionContext] that first initiates and
    /// plans the query, it's not necessary to be present in a Worker's session state builder,
    /// as no planning happens there.
    ///
    /// Example:
    ///
    /// ```
    /// # use async_trait::async_trait;
    /// # use datafusion::common::DataFusionError;
    /// # use datafusion::execution::{SessionState, SessionStateBuilder};
    /// # use datafusion::prelude::SessionConfig;
    /// # use url::Url;
    /// # use std::sync::Arc;
    /// # use datafusion_distributed::{BoxCloneSyncChannel, WorkerResolver, DistributedExt, DistributedPhysicalOptimizerRule, WorkerQueryContext};
    ///
    /// struct CustomWorkerResolver;
    ///
    /// #[async_trait]
    /// impl WorkerResolver for CustomWorkerResolver {
    ///     fn get_urls(&self) -> Result<Vec<Url>, DataFusionError> {
    ///         todo!()
    ///     }
    /// }
    ///
    /// // This tweaks the SessionState so that it can plan for distributed queries and execute them.
    /// let state = SessionStateBuilder::new()
    ///     .with_distributed_worker_resolver(CustomWorkerResolver)
    ///     // the DistributedPhysicalOptimizerRule also needs to be passed so that query plans
    ///     // get distributed.
    ///     .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
    ///     .build();
    /// ```
    fn with_distributed_worker_resolver<T: WorkerResolver + Send + Sync + 'static>(
        self,
        resolver: T,
    ) -> Self;

    /// Same as [DistributedExt::with_distributed_worker_resolver] but with an in-place mutation.
    fn set_distributed_worker_resolver<T: WorkerResolver + Send + Sync + 'static>(
        &mut self,
        resolver: T,
    );

    /// This is what tells Distributed DataFusion how to build a Worker gRPC client out of a worker URL.
    ///
    /// There's a default implementation that caches the Worker client instances so that there's
    /// only one per URL, but users can decide to override that behavior in favor of their own solution.
    ///
    /// Example:
    ///
    /// ```
    /// # use async_trait::async_trait;
    /// # use datafusion::common::DataFusionError;
    /// # use datafusion::execution::{SessionState, SessionStateBuilder};
    /// # use datafusion::prelude::SessionConfig;
    /// # use url::Url;
    /// # use std::sync::Arc;
    /// # use datafusion_distributed::{BoxCloneSyncChannel, ChannelResolver, DistributedExt, DistributedPhysicalOptimizerRule, WorkerQueryContext, WorkerServiceClient};
    ///
    /// struct CustomChannelResolver;
    ///
    /// #[async_trait]
    /// impl ChannelResolver for CustomChannelResolver {
    ///     async fn get_worker_client_for_url(&self, url: &Url) -> Result<WorkerServiceClient<BoxCloneSyncChannel>, DataFusionError> {
    ///         // Build a custom WorkerServiceClient wrapped with tower layers or something similar.
    ///         todo!()
    ///     }
    /// }
    ///
    /// // This tweaks the SessionState so that it can plan for distributed queries and execute them.
    /// let state = SessionStateBuilder::new()
    ///     .with_distributed_channel_resolver(CustomChannelResolver)
    ///     // the DistributedPhysicalOptimizerRule also needs to be passed so that query plans
    ///     // get distributed.
    ///     .with_physical_optimizer_rule(Arc::new(DistributedPhysicalOptimizerRule))
    ///     .build();
    ///
    /// // This function can be provided to a Worker so that, upon receiving a distributed
    /// // part of a plan, it knows how to resolve gRPC channels from URLs for making network calls to other nodes.
    /// async fn build_state(ctx: WorkerQueryContext) -> Result<SessionState, DataFusionError> {
    ///     // If you have a custom channel resolver, it should also be passed in the
    ///     // Worker session builder.
    ///     Ok(ctx
    ///         .builder
    ///         .with_distributed_channel_resolver(CustomChannelResolver)
    ///         .build())
    /// }
    /// ```
    fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(
        self,
        resolver: T,
    ) -> Self;

    /// Same as [DistributedExt::with_distributed_channel_resolver] but with an in-place mutation.
    fn set_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(
        &mut self,
        resolver: T,
    );

    /// Adds a distributed task count estimator. [TaskEstimator]s are executed on each node
    /// sequentially until one returns an estimation on the number of tasks that should be
    /// used for the stage containing that node.
    ///
    /// Many nodes might decide to provide an estimation, so a reconciliation between all of them
    /// is performed internally during planning.
    ///
    /// ```text
    ///     ┌───────────────────────┐
    ///     │SortPreservingMergeExec│
    ///     └───────────────────────┘
    ///                 ▲
    /// ┌ ─ ─ ─ ─ ─ ─ ─ ┼ ─ ─ ─ ─ ─ ─ ─ ─ Stage 2
    ///     ┌───────────┴───────────┐    │
    /// │   │       SortExec        │
    ///     └───────────────────────┘    │
    /// │   ┌───────────────────────┐
    ///     │     AggregateExec     │    │
    /// │   └───────────────────────┘
    ///  ─ ─ ─ ─ ─ ─ ─ ─▲─ ─ ─ ─ ─ ─ ─ ─ ┘
    /// ┌ ─ ─ ─ ─ ─ ─ ─ ┴ ─ ─ ─ ─ ─ ─ ─ ─ Stage 1
    ///     ┌───────────────────────┐    │
    /// │   │      FilterExec       │
    ///     └───────────────────────┘    │
    /// │   ┌───────────────────────┐       a TaskEstimator estimates the amount of tasks
    ///     │       SomeExec        │◀───┼──  based on how much data will be pulled.
    /// │   └───────────────────────┘
    ///  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
    /// ```
    fn with_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(
        self,
        estimator: T,
    ) -> Self;

    /// Same as [DistributedExt::with_distributed_task_estimator] but with an in-place mutation.
    fn set_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(
        &mut self,
        estimator: T,
    );

    /// Sets the maximum number of files each task in a stage with a FileScanConfig node will
    /// handle. Reducing this number will increase the number of tasks. By default, this
    /// is close to the number of cores in the machine.
    ///
    /// ```text
    ///     ┌───────────────────────┐
    ///     │SortPreservingMergeExec│
    ///     └───────────────────────┘
    ///                 ▲
    /// ┌ ─ ─ ─ ─ ─ ─ ─ ┼ ─ ─ ─ ─ ─ ─ ─ ─ Stage 2
    ///     ┌───────────┴───────────┐    │
    /// │   │       SortExec        │
    ///     └───────────────────────┘    │
    /// │   ┌───────────────────────┐
    ///     │     AggregateExec     │    │
    /// │   └───────────────────────┘
    ///  ─ ─ ─ ─ ─ ─ ─ ─▲─ ─ ─ ─ ─ ─ ─ ─ ┘
    /// ┌ ─ ─ ─ ─ ─ ─ ─ ┴ ─ ─ ─ ─ ─ ─ ─ ─ Stage 1
    ///     ┌───────────────────────┐    │
    /// │   │      FilterExec       │
    ///     └───────────────────────┘    │
    /// │   ┌───────────────────────┐        Sets the max number of files
    ///     │    FileScanConfig     │◀───┼─   each task will handle. Less
    /// │   └───────────────────────┘        files_per_task == more tasks
    ///  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
    /// ```
    fn with_distributed_files_per_task(
        self,
        files_per_task: usize,
    ) -> Result<Self, DataFusionError>;

    /// Same as [DistributedExt::with_distributed_files_per_task] but with an in-place mutation.
    fn set_distributed_files_per_task(
        &mut self,
        files_per_task: usize,
    ) -> Result<(), DataFusionError>;

    /// The number of tasks in each stage is calculated in a bottom-to-top fashion.
    ///
    /// Bottom stages containing leaf nodes will provide an estimation of the amount of tasks
    /// for those stages, but upper stages might see a reduction (or increase) in the amount
    /// of tasks based on the cardinality effect bottom stages have in the data.
    ///
    /// For example: If there are two stages, and the leaf stage is estimated to use 10 tasks,
    /// the upper stage might use less (e.g. 5) if it sees that the leaf stage is returning
    /// less data because of filters or aggregations.
    ///
    /// This function sets the scale factor for when encountering these nodes that change the
    /// cardinality of the data. For example, if a stage with 10 tasks contains an AggregateExec
    /// node, and the scale factor is 2.0, the following stage will use 10 / 2.0 = 5 tasks.
    ///
    /// ```text
    ///     ┌───────────────────────┐
    ///     │SortPreservingMergeExec│
    ///     └───────────────────────┘
    ///                 ▲
    /// ┌ ─ ─ ─ ─ ─ ─ ─ ┼ ─ ─ ─ ─ ─ ─ ─ ─ Stage 2 (N/scale_factor tasks)
    ///     ┌───────────┴───────────┐    │
    /// │   │       SortExec        │
    ///     └───────────────────────┘    │
    /// │   ┌───────────────────────┐
    ///     │     AggregateExec     │    │
    /// │   └───────────────────────┘
    ///  ─ ─ ─ ─ ─ ─ ─ ─▲─ ─ ─ ─ ─ ─ ─ ─ ┘
    /// ┌ ─ ─ ─ ─ ─ ─ ─ ┴ ─ ─ ─ ─ ─ ─ ─ ─ Stage 1 (N tasks)
    ///     ┌───────────────────────┐    │       A filter reduces cardinality,
    /// │   │      FilterExec       │◀────────therefore the next stage will have
    ///     └───────────────────────┘    │    less tasks according to this factor
    /// │   ┌───────────────────────┐
    ///     │    FileScanConfig     │    │
    /// │   └───────────────────────┘
    ///  ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
    /// ```
    fn with_distributed_cardinality_effect_task_scale_factor(
        self,
        factor: f64,
    ) -> Result<Self, DataFusionError>;

    /// Same as [DistributedExt::with_distributed_cardinality_effect_task_scale_factor] but with
    /// an in-place mutation.
    fn set_distributed_cardinality_effect_task_scale_factor(
        &mut self,
        factor: f64,
    ) -> Result<(), DataFusionError>;

    /// Enables metrics collection across network boundaries so that all the metrics gathered in
    /// each node are accessible from the head stage that started running the query.
    fn with_distributed_metrics_collection(self, enabled: bool) -> Result<Self, DataFusionError>;

    /// Same as [DistributedExt::with_distributed_metrics_collection] but with an in-place mutation.
    fn set_distributed_metrics_collection(&mut self, enabled: bool) -> Result<(), DataFusionError>;

    /// Enables children isolator unions for distributing UNION operations across as many tasks as
    /// the sum of all the tasks required for each child.
    ///
    /// For example, if there is a UNION with 3 children, requiring one task each, it will result
    /// in a plan with 3 tasks where each task runs one child:
    ///
    /// ```text
    /// ┌─────────────────────────────┐┌─────────────────────────────┐┌─────────────────────────────┐
    /// │           Task 1            ││           Task 2            ││           Task 3            │
    /// │┌───────────────────────────┐││┌───────────────────────────┐││┌───────────────────────────┐│
    /// ││ ChildrenIsolatorUnionExec ││││ ChildrenIsolatorUnionExec ││││ ChildrenIsolatorUnionExec ││
    /// │└───▲─────────▲─────────▲───┘││└───▲─────────▲─────────▲───┘││└───▲─────────▲─────────▲───┘│
    /// │    │                        ││              │              ││                        │    │
    /// │┌───┴───┐ ┌  ─│ ─   ┌  ─│ ─  ││┌  ─│ ─   ┌───┴───┐ ┌  ─│ ─  ││┌  ─│ ─   ┌  ─│ ─   ┌───┴───┐│
    /// ││Child 1│  Child 2│  Child 3│││ Child 1│ │Child 2│  Child 3│││ Child 1│  Child 2│ │Child 3││
    /// │└───────┘ └  ─  ─   └  ─  ─  ││└  ─  ─   └───────┘ └  ─  ─  ││└  ─  ─   └  ─  ─   └───────┘│
    /// └─────────────────────────────┘└─────────────────────────────┘└─────────────────────────────┘
    /// ```
    fn with_distributed_children_isolator_unions(
        self,
        enabled: bool,
    ) -> Result<Self, DataFusionError>;

    /// Same as [DistributedExt::with_distributed_children_isolator_unions] but with an in-place mutation.
    fn set_distributed_children_isolator_unions(
        &mut self,
        enabled: bool,
    ) -> Result<(), DataFusionError>;

    /// Enables broadcast joins for CollectLeft hash joins. When enabled, the build side of
    /// a CollectLeft join is broadcast to all consumer tasks instead of being coalesced
    /// into a single partition.
    ///
    /// Note: This option is disabled by default until the implementation is smarter about when to
    /// broadcast.
    fn with_distributed_broadcast_joins(self, enabled: bool) -> Result<Self, DataFusionError>;

    /// Same as [DistributedExt::with_distributed_broadcast_joins] but with an in-place mutation.
    fn set_distributed_broadcast_joins(&mut self, enabled: bool) -> Result<(), DataFusionError>;

    /// The compression type to use for sending data over the wire.
    ///
    /// The default is [CompressionType::LZ4_FRAME].
    fn with_distributed_compression(
        self,
        compression: Option<CompressionType>,
    ) -> Result<Self, DataFusionError>;

    /// Same as [DistributedExt::with_distributed_compression] but with an in-place mutation.
    fn set_distributed_compression(
        &mut self,
        compression: Option<CompressionType>,
    ) -> Result<(), DataFusionError>;

    /// How many rows to collect in each record batch before sending it over the wire in a
    /// shuffle operation. This value defaults to the same as `datafusion.execution.batch_size`.
    ///
    /// Setting it to something smaller than `datafusion.execution.batch_size` has no effect.
    ///
    /// It's preferable to set `datafusion.execution.batch_size` directly instead of this
    /// parameter if the specific use case allows it.
    fn with_distributed_shuffle_batch_size(
        self,
        batch_size: usize,
    ) -> Result<Self, DataFusionError>;

    /// Same as [DistributedExt::with_distributed_shuffle_batch_size] but with an in-place mutation.
    fn set_distributed_shuffle_batch_size(
        &mut self,
        batch_size: usize,
    ) -> Result<(), DataFusionError>;

    /// Sets arbitrary HTTP headers that will be forwarded unchanged to worker nodes.
    /// These headers are included in outgoing Arrow Flight requests to workers.
    ///
    /// Returns an error if any header name starts with the reserved prefix
    /// `x-datafusion-distributed-config-`, which is used internally.
    ///
    /// Example:
    ///
    /// ```rust
    /// # use datafusion::execution::SessionStateBuilder;
    /// # use datafusion_distributed::DistributedExt;
    /// # use http::HeaderMap;
    ///
    /// let mut passthrough = HeaderMap::new();
    /// passthrough.insert("x-custom-priority", "high".parse().unwrap());
    ///
    /// let state = SessionStateBuilder::new()
    ///     .with_distributed_passthrough_headers(passthrough)
    ///     .unwrap()
    ///     .build();
    /// ```
    fn with_distributed_passthrough_headers(
        self,
        headers: HeaderMap,
    ) -> Result<Self, DataFusionError>;

    /// Same as [DistributedExt::with_distributed_passthrough_headers] but with an in-place mutation.
    fn set_distributed_passthrough_headers(
        &mut self,
        headers: HeaderMap,
    ) -> Result<(), DataFusionError>;

    /// Sets the maximum tasks that will be assigned for each stage.
    ///
    /// If not specified, the number of workers returned by the provided [WorkerResolver] is taken.
    fn with_distributed_max_tasks_per_stage(
        self,
        max_tasks_per_stage: usize,
    ) -> Result<Self, DataFusionError>;

    /// Same as [DistributedExt::with_distributed_max_tasks_per_stage] but with an in-place mutation.
    fn set_distributed_max_tasks_per_stage(
        &mut self,
        max_tasks_per_stage: usize,
    ) -> Result<(), DataFusionError>;
}
536
537impl DistributedExt for SessionConfig {
538    fn set_distributed_option_extension<T: ConfigExtension + Default>(&mut self, t: T) {
539        set_distributed_option_extension(self, t)
540    }
541
542    fn set_distributed_option_extension_from_headers<T: ConfigExtension + Default>(
543        &mut self,
544        headers: &HeaderMap,
545    ) -> Result<(), DataFusionError> {
546        set_distributed_option_extension_from_headers::<T>(self, headers)?;
547        Ok(())
548    }
549
550    fn set_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(&mut self, codec: T) {
551        set_distributed_user_codec(self, codec)
552    }
553
554    fn set_distributed_user_codec_arc(&mut self, codec: Arc<dyn PhysicalExtensionCodec>) {
555        set_distributed_user_codec_arc(self, codec)
556    }
557
558    fn set_distributed_worker_resolver<T: WorkerResolver + Send + Sync + 'static>(
559        &mut self,
560        resolver: T,
561    ) {
562        set_distributed_worker_resolver(self, resolver);
563    }
564
565    fn set_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(
566        &mut self,
567        resolver: T,
568    ) {
569        set_distributed_channel_resolver(self, resolver);
570    }
571
572    fn set_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(
573        &mut self,
574        estimator: T,
575    ) {
576        set_distributed_task_estimator(self, estimator)
577    }
578
579    fn set_distributed_files_per_task(
580        &mut self,
581        files_per_task: usize,
582    ) -> Result<(), DataFusionError> {
583        let d_cfg = DistributedConfig::from_config_options_mut(self.options_mut())?;
584        d_cfg.files_per_task = files_per_task;
585        Ok(())
586    }
587
588    fn set_distributed_cardinality_effect_task_scale_factor(
589        &mut self,
590        factor: f64,
591    ) -> Result<(), DataFusionError> {
592        let d_cfg = DistributedConfig::from_config_options_mut(self.options_mut())?;
593        d_cfg.cardinality_task_count_factor = factor;
594        Ok(())
595    }
596
597    fn set_distributed_metrics_collection(&mut self, enabled: bool) -> Result<(), DataFusionError> {
598        let d_cfg = DistributedConfig::from_config_options_mut(self.options_mut())?;
599        d_cfg.collect_metrics = enabled;
600        Ok(())
601    }
602
603    fn set_distributed_children_isolator_unions(
604        &mut self,
605        enabled: bool,
606    ) -> Result<(), DataFusionError> {
607        let d_cfg = DistributedConfig::from_config_options_mut(self.options_mut())?;
608        d_cfg.children_isolator_unions = enabled;
609        Ok(())
610    }
611
612    fn set_distributed_broadcast_joins(&mut self, enabled: bool) -> Result<(), DataFusionError> {
613        let d_cfg = DistributedConfig::from_config_options_mut(self.options_mut())?;
614        d_cfg.broadcast_joins = enabled;
615        Ok(())
616    }
617
618    fn set_distributed_compression(
619        &mut self,
620        compression: Option<CompressionType>,
621    ) -> Result<(), DataFusionError> {
622        let d_cfg = DistributedConfig::from_config_options_mut(self.options_mut())?;
623        d_cfg.compression = match compression {
624            Some(CompressionType::ZSTD) => "zstd".to_string(),
625            Some(CompressionType::LZ4_FRAME) => "lz4".to_string(),
626            _ => "none".to_string(),
627        };
628        Ok(())
629    }
630
631    fn set_distributed_shuffle_batch_size(
632        &mut self,
633        batch_size: usize,
634    ) -> Result<(), DataFusionError> {
635        let d_cfg = DistributedConfig::from_config_options_mut(self.options_mut())?;
636        d_cfg.shuffle_batch_size = batch_size;
637        Ok(())
638    }
639
640    fn set_distributed_passthrough_headers(
641        &mut self,
642        headers: HeaderMap,
643    ) -> Result<(), DataFusionError> {
644        set_passthrough_headers(self, headers)
645    }
646
647    fn set_distributed_max_tasks_per_stage(
648        &mut self,
649        max_tasks_per_stage: usize,
650    ) -> Result<(), DataFusionError> {
651        let d_cfg = DistributedConfig::from_config_options_mut(self.options_mut())?;
652        d_cfg.max_tasks_per_stage = max_tasks_per_stage;
653        Ok(())
654    }
655
656    delegate! {
        // Generates the consuming, builder-style `with_*` methods of this impl. Each one is
        // forwarded to `self`, calling the `set_*` method named in its `#[call(...)]` attribute
        // with the same arguments.
        //
        // The `#[expr(...)]` attribute is a template for the generated body in which `$` stands
        // for the delegated call:
        // - `$;self` runs the setter and then returns `self` (used where `with_*` returns `Self`).
        // - `$?;Ok(self)` propagates the setter's error with `?` and otherwise returns `Ok(self)`
        //   (used where `with_*` returns `Result<Self, DataFusionError>`).
        //
        // Every receiver is `mut self` so the by-value `self` can be mutated by the setter
        // before it is handed back.
657        to self {
658            #[call(set_distributed_option_extension)]
659            #[expr($;self)]
660            fn with_distributed_option_extension<T: ConfigExtension + Default>(mut self, t: T) -> Self;
661
662            #[call(set_distributed_option_extension_from_headers)]
663            #[expr($?;Ok(self))]
664            fn with_distributed_option_extension_from_headers<T: ConfigExtension + Default>(mut self, headers: &HeaderMap) -> Result<Self, DataFusionError>;
665
666            #[call(set_distributed_user_codec)]
667            #[expr($;self)]
668            fn with_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(mut self, codec: T) -> Self;
669
670            #[call(set_distributed_user_codec_arc)]
671            #[expr($;self)]
672            fn with_distributed_user_codec_arc(mut self, codec: Arc<dyn PhysicalExtensionCodec>) -> Self;
673
674            #[call(set_distributed_worker_resolver)]
675            #[expr($;self)]
676            fn with_distributed_worker_resolver<T: WorkerResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;
677
678            #[call(set_distributed_channel_resolver)]
679            #[expr($;self)]
680            fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;
681
682            #[call(set_distributed_task_estimator)]
683            #[expr($;self)]
684            fn with_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(mut self, estimator: T) -> Self;
685
            // From here on every setter is fallible, so the wrappers return `Result<Self, _>`.
686            #[call(set_distributed_files_per_task)]
687            #[expr($?;Ok(self))]
688            fn with_distributed_files_per_task(mut self, files_per_task: usize) -> Result<Self, DataFusionError>;
689
690            #[call(set_distributed_cardinality_effect_task_scale_factor)]
691            #[expr($?;Ok(self))]
692            fn with_distributed_cardinality_effect_task_scale_factor(mut self, factor: f64) -> Result<Self, DataFusionError>;
693
694            #[call(set_distributed_metrics_collection)]
695            #[expr($?;Ok(self))]
696            fn with_distributed_metrics_collection(mut self, enabled: bool) -> Result<Self, DataFusionError>;
697
698            #[call(set_distributed_children_isolator_unions)]
699            #[expr($?;Ok(self))]
700            fn with_distributed_children_isolator_unions(mut self, enabled: bool) -> Result<Self, DataFusionError>;
701
702            #[call(set_distributed_broadcast_joins)]
703            #[expr($?;Ok(self))]
704            fn with_distributed_broadcast_joins(mut self, enabled: bool) -> Result<Self, DataFusionError>;
705
706            #[call(set_distributed_compression)]
707            #[expr($?;Ok(self))]
708            fn with_distributed_compression(mut self, compression: Option<CompressionType>) -> Result<Self, DataFusionError>;
709
710            #[call(set_distributed_shuffle_batch_size)]
711            #[expr($?;Ok(self))]
712            fn with_distributed_shuffle_batch_size(mut self, batch_size: usize) -> Result<Self, DataFusionError>;
713
714            #[call(set_distributed_passthrough_headers)]
715            #[expr($?;Ok(self))]
716            fn with_distributed_passthrough_headers(mut self, headers: HeaderMap) -> Result<Self, DataFusionError>;
717
718            #[call(set_distributed_max_tasks_per_stage)]
719            #[expr($?;Ok(self))]
720            fn with_distributed_max_tasks_per_stage(mut self, max_tasks_per_stage: usize) -> Result<Self, DataFusionError>;
721        }
722    }
723}
724
725impl DistributedExt for SessionStateBuilder {
    // Implements `DistributedExt` for `SessionStateBuilder` purely by delegation: every method
    // below is forwarded to the value returned by `self.config().get_or_insert_default()`.
    // NOTE(review): presumably that is the builder's `SessionConfig`, created with default
    // values when none has been set yet; confirm against `SessionStateBuilder::config`.
    //
    // Methods come in pairs:
    // - `set_*` carries no attributes, so it is forwarded under the same name with the same
    //   arguments and its return value is passed through unchanged.
    // - `with_*` consumes the builder, invokes the `set_*` method named in `#[call(...)]` on the
    //   delegation target, and then evaluates the `#[expr(...)]` template in which `$` stands
    //   for that call: `$;self` returns `self`, while `$?;Ok(self)` propagates an error with
    //   `?` and otherwise returns `Ok(self)`.
726    delegate! {
727        to self.config().get_or_insert_default() {
728            fn set_distributed_option_extension<T: ConfigExtension + Default>(&mut self, t: T);
729            #[call(set_distributed_option_extension)]
730            #[expr($;self)]
731            fn with_distributed_option_extension<T: ConfigExtension + Default>(mut self, t: T) -> Self;
732
733            fn set_distributed_option_extension_from_headers<T: ConfigExtension + Default>(&mut self, h: &HeaderMap) -> Result<(), DataFusionError>;
734            #[call(set_distributed_option_extension_from_headers)]
735            #[expr($?;Ok(self))]
736            fn with_distributed_option_extension_from_headers<T: ConfigExtension + Default>(mut self, headers: &HeaderMap) -> Result<Self, DataFusionError>;
737
738            fn set_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(&mut self, codec: T);
739            #[call(set_distributed_user_codec)]
740            #[expr($;self)]
741            fn with_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(mut self, codec: T) -> Self;
742
743            fn set_distributed_user_codec_arc(&mut self, codec: Arc<dyn PhysicalExtensionCodec>);
744            #[call(set_distributed_user_codec_arc)]
745            #[expr($;self)]
746            fn with_distributed_user_codec_arc(mut self, codec: Arc<dyn PhysicalExtensionCodec>) -> Self;
747
748            fn set_distributed_worker_resolver<T: WorkerResolver + Send + Sync + 'static>(&mut self, resolver: T);
749            #[call(set_distributed_worker_resolver)]
750            #[expr($;self)]
751            fn with_distributed_worker_resolver<T: WorkerResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;
752
753            fn set_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(&mut self, resolver: T);
754            #[call(set_distributed_channel_resolver)]
755            #[expr($;self)]
756            fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;
757
758            fn set_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(&mut self, estimator: T);
759            #[call(set_distributed_task_estimator)]
760            #[expr($;self)]
761            fn with_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(mut self, estimator: T) -> Self;
762
763            fn set_distributed_files_per_task(&mut self, files_per_task: usize) -> Result<(), DataFusionError>;
764            #[call(set_distributed_files_per_task)]
765            #[expr($?;Ok(self))]
766            fn with_distributed_files_per_task(mut self, files_per_task: usize) -> Result<Self, DataFusionError>;
767
768            fn set_distributed_cardinality_effect_task_scale_factor(&mut self, factor: f64) -> Result<(), DataFusionError>;
769            #[call(set_distributed_cardinality_effect_task_scale_factor)]
770            #[expr($?;Ok(self))]
771            fn with_distributed_cardinality_effect_task_scale_factor(mut self, factor: f64) -> Result<Self, DataFusionError>;
772
773            fn set_distributed_metrics_collection(&mut self, enabled: bool) -> Result<(), DataFusionError>;
774            #[call(set_distributed_metrics_collection)]
775            #[expr($?;Ok(self))]
776            fn with_distributed_metrics_collection(mut self, enabled: bool) -> Result<Self, DataFusionError>;
777
778            fn set_distributed_children_isolator_unions(&mut self, enabled: bool) -> Result<(), DataFusionError>;
779            #[call(set_distributed_children_isolator_unions)]
780            #[expr($?;Ok(self))]
781            fn with_distributed_children_isolator_unions(mut self, enabled: bool) -> Result<Self, DataFusionError>;
782
783            fn set_distributed_broadcast_joins(&mut self, enabled: bool) -> Result<(), DataFusionError>;
784            #[call(set_distributed_broadcast_joins)]
785            #[expr($?;Ok(self))]
786            fn with_distributed_broadcast_joins(mut self, enabled: bool) -> Result<Self, DataFusionError>;
787
788            fn set_distributed_compression(&mut self, compression: Option<CompressionType>) -> Result<(), DataFusionError>;
789            #[call(set_distributed_compression)]
790            #[expr($?;Ok(self))]
791            fn with_distributed_compression(mut self, compression: Option<CompressionType>) -> Result<Self, DataFusionError>;
792
793            fn set_distributed_shuffle_batch_size(&mut self, batch_size: usize) -> Result<(), DataFusionError>;
794            #[call(set_distributed_shuffle_batch_size)]
795            #[expr($?;Ok(self))]
796            fn with_distributed_shuffle_batch_size(mut self, batch_size: usize) -> Result<Self, DataFusionError>;
797
798            fn set_distributed_passthrough_headers(&mut self, headers: HeaderMap) -> Result<(), DataFusionError>;
799            #[call(set_distributed_passthrough_headers)]
800            #[expr($?;Ok(self))]
801            fn with_distributed_passthrough_headers(mut self, headers: HeaderMap) -> Result<Self, DataFusionError>;
802
803            fn set_distributed_max_tasks_per_stage(&mut self, max_tasks_per_stage: usize) -> Result<(), DataFusionError>;
804            #[call(set_distributed_max_tasks_per_stage)]
805            #[expr($?;Ok(self))]
806            fn with_distributed_max_tasks_per_stage(mut self, max_tasks_per_stage: usize) -> Result<Self, DataFusionError>;
807        }
808    }
809}
810
811impl DistributedExt for SessionState {
    // Implements `DistributedExt` for `SessionState` purely by delegation: every method below
    // is forwarded to the value returned by `self.config_mut()`.
    // NOTE(review): presumably that is a mutable reference to the state's `SessionConfig`;
    // confirm against `SessionState::config_mut`.
    //
    // Methods come in pairs:
    // - `set_*` carries no attributes, so it is forwarded under the same name with the same
    //   arguments and its return value is passed through unchanged.
    // - `with_*` consumes the state, invokes the `set_*` method named in `#[call(...)]` on the
    //   delegation target, and then evaluates the `#[expr(...)]` template in which `$` stands
    //   for that call: `$;self` returns `self`, while `$?;Ok(self)` propagates an error with
    //   `?` and otherwise returns `Ok(self)`.
812    delegate! {
813        to self.config_mut() {
814            fn set_distributed_option_extension<T: ConfigExtension + Default>(&mut self, t: T);
815            #[call(set_distributed_option_extension)]
816            #[expr($;self)]
817            fn with_distributed_option_extension<T: ConfigExtension + Default>(mut self, t: T) -> Self;
818
819            fn set_distributed_option_extension_from_headers<T: ConfigExtension + Default>(&mut self, h: &HeaderMap) -> Result<(), DataFusionError>;
820            #[call(set_distributed_option_extension_from_headers)]
821            #[expr($?;Ok(self))]
822            fn with_distributed_option_extension_from_headers<T: ConfigExtension + Default>(mut self, headers: &HeaderMap) -> Result<Self, DataFusionError>;
823
824            fn set_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(&mut self, codec: T);
825            #[call(set_distributed_user_codec)]
826            #[expr($;self)]
827            fn with_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(mut self, codec: T) -> Self;
828
829            fn set_distributed_user_codec_arc(&mut self, codec: Arc<dyn PhysicalExtensionCodec>);
830            #[call(set_distributed_user_codec_arc)]
831            #[expr($;self)]
832            fn with_distributed_user_codec_arc(mut self, codec: Arc<dyn PhysicalExtensionCodec>) -> Self;
833
834            fn set_distributed_worker_resolver<T: WorkerResolver + Send + Sync + 'static>(&mut self, resolver: T);
835            #[call(set_distributed_worker_resolver)]
836            #[expr($;self)]
837            fn with_distributed_worker_resolver<T: WorkerResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;
838
839            fn set_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(&mut self, resolver: T);
840            #[call(set_distributed_channel_resolver)]
841            #[expr($;self)]
842            fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(mut self, resolver: T) -> Self;
843
844            fn set_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(&mut self, estimator: T);
845            #[call(set_distributed_task_estimator)]
846            #[expr($;self)]
847            fn with_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(mut self, estimator: T) -> Self;
848
849            fn set_distributed_files_per_task(&mut self, files_per_task: usize) -> Result<(), DataFusionError>;
850            #[call(set_distributed_files_per_task)]
851            #[expr($?;Ok(self))]
852            fn with_distributed_files_per_task(mut self, files_per_task: usize) -> Result<Self, DataFusionError>;
853
854            fn set_distributed_cardinality_effect_task_scale_factor(&mut self, factor: f64) -> Result<(), DataFusionError>;
855            #[call(set_distributed_cardinality_effect_task_scale_factor)]
856            #[expr($?;Ok(self))]
857            fn with_distributed_cardinality_effect_task_scale_factor(mut self, factor: f64) -> Result<Self, DataFusionError>;
858
859            fn set_distributed_metrics_collection(&mut self, enabled: bool) -> Result<(), DataFusionError>;
860            #[call(set_distributed_metrics_collection)]
861            #[expr($?;Ok(self))]
862            fn with_distributed_metrics_collection(mut self, enabled: bool) -> Result<Self, DataFusionError>;
863
864            fn set_distributed_children_isolator_unions(&mut self, enabled: bool) -> Result<(), DataFusionError>;
865            #[call(set_distributed_children_isolator_unions)]
866            #[expr($?;Ok(self))]
867            fn with_distributed_children_isolator_unions(mut self, enabled: bool) -> Result<Self, DataFusionError>;
868
869            fn set_distributed_broadcast_joins(&mut self, enabled: bool) -> Result<(), DataFusionError>;
870            #[call(set_distributed_broadcast_joins)]
871            #[expr($?;Ok(self))]
872            fn with_distributed_broadcast_joins(mut self, enabled: bool) -> Result<Self, DataFusionError>;
873
874            fn set_distributed_compression(&mut self, compression: Option<CompressionType>) -> Result<(), DataFusionError>;
875            #[call(set_distributed_compression)]
876            #[expr($?;Ok(self))]
877            fn with_distributed_compression(mut self, compression: Option<CompressionType>) -> Result<Self, DataFusionError>;
878
879            fn set_distributed_shuffle_batch_size(&mut self, batch_size: usize) -> Result<(), DataFusionError>;
880            #[call(set_distributed_shuffle_batch_size)]
881            #[expr($?;Ok(self))]
882            fn with_distributed_shuffle_batch_size(mut self, batch_size: usize) -> Result<Self, DataFusionError>;
883
884            fn set_distributed_passthrough_headers(&mut self, headers: HeaderMap) -> Result<(), DataFusionError>;
885            #[call(set_distributed_passthrough_headers)]
886            #[expr($?;Ok(self))]
887            fn with_distributed_passthrough_headers(mut self, headers: HeaderMap) -> Result<Self, DataFusionError>;
888
889            fn set_distributed_max_tasks_per_stage(&mut self, max_tasks_per_stage: usize) -> Result<(), DataFusionError>;
890            #[call(set_distributed_max_tasks_per_stage)]
891            #[expr($?;Ok(self))]
892            fn with_distributed_max_tasks_per_stage(mut self, max_tasks_per_stage: usize) -> Result<Self, DataFusionError>;
893        }
894    }
895}
896
897impl DistributedExt for SessionContext {
    // Implements `DistributedExt` for `SessionContext` purely by delegation: every method below
    // is forwarded to the value returned by `self.state_ref().write().config_mut()`.
    // NOTE(review): presumably `write()` takes a write lock on the context's shared session
    // state and `config_mut()` yields its `SessionConfig`, with the lock held only for the
    // duration of the delegated call; confirm against `SessionContext::state_ref`.
    //
    // Methods come in pairs:
    // - `set_*` carries no attributes, so it is forwarded under the same name with the same
    //   arguments and its return value is passed through unchanged.
    // - `with_*` consumes the context, invokes the `set_*` method named in `#[call(...)]` on
    //   the delegation target, and then evaluates the `#[expr(...)]` template in which `$`
    //   stands for that call: `$;self` returns `self`, while `$?;Ok(self)` propagates an error
    //   with `?` and otherwise returns `Ok(self)`.
    //
    // Unlike the other impls in this file, the `with_*` receivers here are plain `self` rather
    // than `mut self`: the delegation target is reached through `state_ref().write()`, so no
    // mutable binding of the context itself is declared.
898    delegate! {
899        to self.state_ref().write().config_mut() {
900            fn set_distributed_option_extension<T: ConfigExtension + Default>(&mut self, t: T);
901            #[call(set_distributed_option_extension)]
902            #[expr($;self)]
903            fn with_distributed_option_extension<T: ConfigExtension + Default>(self, t: T) -> Self;
904
905            fn set_distributed_option_extension_from_headers<T: ConfigExtension + Default>(&mut self, h: &HeaderMap) -> Result<(), DataFusionError>;
906            #[call(set_distributed_option_extension_from_headers)]
907            #[expr($?;Ok(self))]
908            fn with_distributed_option_extension_from_headers<T: ConfigExtension + Default>(self, headers: &HeaderMap) -> Result<Self, DataFusionError>;
909
910            fn set_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(&mut self, codec: T);
911            #[call(set_distributed_user_codec)]
912            #[expr($;self)]
913            fn with_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(self, codec: T) -> Self;
914
915            fn set_distributed_user_codec_arc(&mut self, codec: Arc<dyn PhysicalExtensionCodec>);
916            #[call(set_distributed_user_codec_arc)]
917            #[expr($;self)]
918            fn with_distributed_user_codec_arc(self, codec: Arc<dyn PhysicalExtensionCodec>) -> Self;
919
920            fn set_distributed_worker_resolver<T: WorkerResolver + Send + Sync + 'static>(&mut self, resolver: T);
921            #[call(set_distributed_worker_resolver)]
922            #[expr($;self)]
923            fn with_distributed_worker_resolver<T: WorkerResolver + Send + Sync + 'static>(self, resolver: T) -> Self;
924
925            fn set_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(&mut self, resolver: T);
926            #[call(set_distributed_channel_resolver)]
927            #[expr($;self)]
928            fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(self, resolver: T) -> Self;
929
930            fn set_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(&mut self, estimator: T);
931            #[call(set_distributed_task_estimator)]
932            #[expr($;self)]
933            fn with_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(self, estimator: T) -> Self;
934
935            fn set_distributed_files_per_task(&mut self, files_per_task: usize) -> Result<(), DataFusionError>;
936            #[call(set_distributed_files_per_task)]
937            #[expr($?;Ok(self))]
938            fn with_distributed_files_per_task(self, files_per_task: usize) -> Result<Self, DataFusionError>;
939
940            fn set_distributed_cardinality_effect_task_scale_factor(&mut self, factor: f64) -> Result<(), DataFusionError>;
941            #[call(set_distributed_cardinality_effect_task_scale_factor)]
942            #[expr($?;Ok(self))]
943            fn with_distributed_cardinality_effect_task_scale_factor(self, factor: f64) -> Result<Self, DataFusionError>;
944
945            fn set_distributed_metrics_collection(&mut self, enabled: bool) -> Result<(), DataFusionError>;
946            #[call(set_distributed_metrics_collection)]
947            #[expr($?;Ok(self))]
948            fn with_distributed_metrics_collection(self, enabled: bool) -> Result<Self, DataFusionError>;
949
950            fn set_distributed_children_isolator_unions(&mut self, enabled: bool) -> Result<(), DataFusionError>;
951            #[call(set_distributed_children_isolator_unions)]
952            #[expr($?;Ok(self))]
953            fn with_distributed_children_isolator_unions(self, enabled: bool) -> Result<Self, DataFusionError>;
954
955            fn set_distributed_broadcast_joins(&mut self, enabled: bool) -> Result<(), DataFusionError>;
956            #[call(set_distributed_broadcast_joins)]
957            #[expr($?;Ok(self))]
958            fn with_distributed_broadcast_joins(self, enabled: bool) -> Result<Self, DataFusionError>;
959
960            fn set_distributed_compression(&mut self, compression: Option<CompressionType>) -> Result<(), DataFusionError>;
961            #[call(set_distributed_compression)]
962            #[expr($?;Ok(self))]
963            fn with_distributed_compression(self, compression: Option<CompressionType>) -> Result<Self, DataFusionError>;
964
965            fn set_distributed_shuffle_batch_size(&mut self, batch_size: usize) -> Result<(), DataFusionError>;
966            #[call(set_distributed_shuffle_batch_size)]
967            #[expr($?;Ok(self))]
968            fn with_distributed_shuffle_batch_size(self, batch_size: usize) -> Result<Self, DataFusionError>;
969
970            fn set_distributed_passthrough_headers(&mut self, headers: HeaderMap) -> Result<(), DataFusionError>;
971            #[call(set_distributed_passthrough_headers)]
972            #[expr($?;Ok(self))]
973            fn with_distributed_passthrough_headers(self, headers: HeaderMap) -> Result<Self, DataFusionError>;
974
975            fn set_distributed_max_tasks_per_stage(&mut self, max_tasks_per_stage: usize) -> Result<(), DataFusionError>;
976            #[call(set_distributed_max_tasks_per_stage)]
977            #[expr($?;Ok(self))]
978            fn with_distributed_max_tasks_per_stage(self, max_tasks_per_stage: usize) -> Result<Self, DataFusionError>;
979        }
980    }
981}