pub trait DistributedExt: Sized {
Show 38 methods
// Required methods
fn with_distributed_option_extension<T: ConfigExtension + Default>(
self,
t: T,
) -> Self;
fn set_distributed_option_extension<T: ConfigExtension + Default>(
&mut self,
t: T,
);
fn with_distributed_option_extension_from_headers<T: ConfigExtension + Default>(
self,
headers: &HeaderMap,
) -> Result<Self, DataFusionError>;
fn set_distributed_option_extension_from_headers<T: ConfigExtension + Default>(
&mut self,
headers: &HeaderMap,
) -> Result<(), DataFusionError>;
fn with_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(
self,
codec: T,
) -> Self;
fn set_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(
&mut self,
codec: T,
);
fn with_distributed_user_codec_arc(
self,
codec: Arc<dyn PhysicalExtensionCodec>,
) -> Self;
fn set_distributed_user_codec_arc(
&mut self,
codec: Arc<dyn PhysicalExtensionCodec>,
);
fn with_distributed_worker_resolver<T: WorkerResolver + Send + Sync + 'static>(
self,
resolver: T,
) -> Self;
fn set_distributed_worker_resolver<T: WorkerResolver + Send + Sync + 'static>(
&mut self,
resolver: T,
);
fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(
self,
resolver: T,
) -> Self;
fn set_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(
&mut self,
resolver: T,
);
fn with_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(
self,
estimator: T,
) -> Self;
fn set_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(
&mut self,
estimator: T,
);
fn with_distributed_file_scan_config_bytes_per_partition(
self,
bytes_per_partition: usize,
) -> Result<Self, DataFusionError>;
fn set_distributed_file_scan_config_bytes_per_partition(
&mut self,
bytes_per_partition: usize,
) -> Result<(), DataFusionError>;
fn with_distributed_cardinality_effect_task_scale_factor(
self,
factor: f64,
) -> Result<Self, DataFusionError>;
fn set_distributed_cardinality_effect_task_scale_factor(
&mut self,
factor: f64,
) -> Result<(), DataFusionError>;
fn with_distributed_metrics_collection(
self,
enabled: bool,
) -> Result<Self, DataFusionError>;
fn set_distributed_metrics_collection(
&mut self,
enabled: bool,
) -> Result<(), DataFusionError>;
fn with_distributed_children_isolator_unions(
self,
enabled: bool,
) -> Result<Self, DataFusionError>;
fn set_distributed_children_isolator_unions(
&mut self,
enabled: bool,
) -> Result<(), DataFusionError>;
fn with_distributed_broadcast_joins(
self,
enabled: bool,
) -> Result<Self, DataFusionError>;
fn set_distributed_broadcast_joins(
&mut self,
enabled: bool,
) -> Result<(), DataFusionError>;
fn with_distributed_compression(
self,
compression: Option<CompressionType>,
) -> Result<Self, DataFusionError>;
fn set_distributed_compression(
&mut self,
compression: Option<CompressionType>,
) -> Result<(), DataFusionError>;
fn with_distributed_shuffle_batch_size(
self,
batch_size: usize,
) -> Result<Self, DataFusionError>;
fn set_distributed_shuffle_batch_size(
&mut self,
batch_size: usize,
) -> Result<(), DataFusionError>;
fn with_distributed_passthrough_headers(
self,
headers: HeaderMap,
) -> Result<Self, DataFusionError>;
fn set_distributed_passthrough_headers(
&mut self,
headers: HeaderMap,
) -> Result<(), DataFusionError>;
fn with_distributed_max_tasks_per_stage(
self,
max_tasks_per_stage: usize,
) -> Result<Self, DataFusionError>;
fn set_distributed_max_tasks_per_stage(
&mut self,
max_tasks_per_stage: usize,
) -> Result<(), DataFusionError>;
fn with_distributed_partial_reduce(
self,
enabled: bool,
) -> Result<Self, DataFusionError>;
fn set_distributed_partial_reduce(
&mut self,
enabled: bool,
) -> Result<(), DataFusionError>;
fn with_distributed_worker_connection_buffer_budget_bytes(
self,
budget_bytes: usize,
) -> Result<Self, DataFusionError>;
fn set_distributed_worker_connection_buffer_budget_bytes(
&mut self,
budget_bytes: usize,
) -> Result<(), DataFusionError>;
fn with_distributed_work_unit_feed<T, P, F>(self, getter: F) -> Self
where T: ExecutionPlan + 'static,
P: WorkUnitFeedProvider + 'static,
P::WorkUnit: 'static,
F: Fn(&T) -> Option<&WorkUnitFeed<P>> + Send + Sync + 'static;
fn set_distributed_work_unit_feed<T, P, F>(&mut self, getter: F)
where T: ExecutionPlan + 'static,
P: WorkUnitFeedProvider + 'static,
P::WorkUnit: 'static,
F: Fn(&T) -> Option<&WorkUnitFeed<P>> + Send + Sync + 'static;
}Expand description
Extends DataFusion with distributed capabilities.
Required Methods§
Sourcefn with_distributed_option_extension<T: ConfigExtension + Default>(
self,
t: T,
) -> Self
fn with_distributed_option_extension<T: ConfigExtension + Default>( self, t: T, ) -> Self
Adds the provided ConfigExtension to the distributed context. The ConfigExtension will be serialized using gRPC metadata and sent across tasks. Users are expected to call this method with their own extensions to be able to access them in any place in the plan.
This method also adds the provided ConfigExtension to the current session option extensions, the same as calling SessionConfig::with_option_extension.
Example:
extensions_options! {
pub struct CustomExtension {
pub foo: String, default = "".to_string()
pub bar: usize, default = 0
pub baz: bool, default = false
}
}
impl ConfigExtension for CustomExtension {
const PREFIX: &'static str = "custom";
}
let mut my_custom_extension = CustomExtension::default();
// Now, the CustomExtension will be able to cross network boundaries. Upon making an Arrow
// Flight request, it will be sent through gRPC metadata.
let state = SessionStateBuilder::new()
.with_distributed_option_extension(my_custom_extension)
.build();
async fn build_state(ctx: WorkerQueryContext) -> Result<SessionState, DataFusionError> {
// This function can be provided to a Worker to tell it how to
// build sessions that retrieve the CustomExtension from gRPC metadata.
Ok(ctx
.builder
.with_distributed_option_extension_from_headers::<CustomExtension>(&ctx.headers)?
.build())
}Sourcefn set_distributed_option_extension<T: ConfigExtension + Default>(
&mut self,
t: T,
)
fn set_distributed_option_extension<T: ConfigExtension + Default>( &mut self, t: T, )
Same as DistributedExt::with_distributed_option_extension but with an in-place mutation
Sourcefn with_distributed_option_extension_from_headers<T: ConfigExtension + Default>(
self,
headers: &HeaderMap,
) -> Result<Self, DataFusionError>
fn with_distributed_option_extension_from_headers<T: ConfigExtension + Default>( self, headers: &HeaderMap, ) -> Result<Self, DataFusionError>
Adds a ConfigExtension of type T, populated from the provided headers, to the distributed context. The ConfigExtension will be serialized using gRPC metadata and sent across tasks. Users are expected to call this method with their own extensions to be able to access them in any place in the plan.
- If there was a ConfigExtension of the same type already present, it’s updated with an in-place mutation based on the headers that came over the wire.
- If there was no ConfigExtension set before, it will get added, as if SessionConfig::with_option_extension was being called.
Example:
extensions_options! {
pub struct CustomExtension {
pub foo: String, default = "".to_string()
pub bar: usize, default = 0
pub baz: bool, default = false
}
}
impl ConfigExtension for CustomExtension {
const PREFIX: &'static str = "custom";
}
let mut my_custom_extension = CustomExtension::default();
// Now, the CustomExtension will be able to cross network boundaries. Upon making an Arrow
// Flight request, it will be sent through gRPC metadata.
let state = SessionStateBuilder::new()
.with_distributed_option_extension(my_custom_extension)
.build();
async fn build_state(ctx: WorkerQueryContext) -> Result<SessionState, DataFusionError> {
// This function can be provided to a Worker to tell it how to
// build sessions that retrieve the CustomExtension from gRPC metadata.
Ok(ctx
.builder
.with_distributed_option_extension_from_headers::<CustomExtension>(&ctx.headers)?
.build())
}Sourcefn set_distributed_option_extension_from_headers<T: ConfigExtension + Default>(
&mut self,
headers: &HeaderMap,
) -> Result<(), DataFusionError>
fn set_distributed_option_extension_from_headers<T: ConfigExtension + Default>( &mut self, headers: &HeaderMap, ) -> Result<(), DataFusionError>
Same as DistributedExt::with_distributed_option_extension_from_headers but with an in-place mutation
Sourcefn with_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(
self,
codec: T,
) -> Self
fn with_distributed_user_codec<T: PhysicalExtensionCodec + 'static>( self, codec: T, ) -> Self
Injects a user-defined PhysicalExtensionCodec that is capable of encoding/decoding custom execution nodes. Multiple user-defined PhysicalExtensionCodec can be added by calling this method several times.
Example:
#[derive(Debug)]
struct CustomExecCodec;
impl PhysicalExtensionCodec for CustomExecCodec {
fn try_decode(&self, buf: &[u8], inputs: &[Arc<dyn ExecutionPlan>], ctx: &TaskContext) -> datafusion::common::Result<Arc<dyn ExecutionPlan>> {
todo!()
}
fn try_encode(&self, node: Arc<dyn ExecutionPlan>, buf: &mut Vec<u8>) -> datafusion::common::Result<()> {
todo!()
}
}
let state = SessionStateBuilder::new()
.with_distributed_user_codec(CustomExecCodec)
.build();
async fn build_state(ctx: WorkerQueryContext) -> Result<SessionState, DataFusionError> {
// This function can be provided to a Worker to tell it how to
// encode/decode CustomExec nodes.
Ok(SessionStateBuilder::new()
.with_distributed_user_codec(CustomExecCodec)
.build())
}Sourcefn set_distributed_user_codec<T: PhysicalExtensionCodec + 'static>(
&mut self,
codec: T,
)
fn set_distributed_user_codec<T: PhysicalExtensionCodec + 'static>( &mut self, codec: T, )
Same as DistributedExt::with_distributed_user_codec but with an in-place mutation
Sourcefn with_distributed_user_codec_arc(
self,
codec: Arc<dyn PhysicalExtensionCodec>,
) -> Self
fn with_distributed_user_codec_arc( self, codec: Arc<dyn PhysicalExtensionCodec>, ) -> Self
Same as DistributedExt::with_distributed_user_codec but with a dynamic argument.
Sourcefn set_distributed_user_codec_arc(
&mut self,
codec: Arc<dyn PhysicalExtensionCodec>,
)
fn set_distributed_user_codec_arc( &mut self, codec: Arc<dyn PhysicalExtensionCodec>, )
Same as DistributedExt::set_distributed_user_codec but with a dynamic argument.
Sourcefn with_distributed_worker_resolver<T: WorkerResolver + Send + Sync + 'static>(
self,
resolver: T,
) -> Self
fn with_distributed_worker_resolver<T: WorkerResolver + Send + Sync + 'static>( self, resolver: T, ) -> Self
This is what tells Distributed DataFusion the URLs of the workers available for serving queries.
It injects a WorkerResolver implementation for Distributed DataFusion to resolve worker nodes in the cluster. When running in distributed mode, setting a WorkerResolver is required.
Although a WorkerResolver is required in the SessionContext that first initiates and plans the query, it does not need to be present in a Worker’s session state builder, as no planning happens there.
Example:
struct CustomWorkerResolver;
#[async_trait]
impl WorkerResolver for CustomWorkerResolver {
fn get_urls(&self) -> Result<Vec<Url>, DataFusionError> {
todo!()
}
}
// This tweaks the SessionState so that it can plan for distributed queries and execute them.
let state = SessionStateBuilder::new()
.with_distributed_worker_resolver(CustomWorkerResolver)
.with_distributed_planner()
.build();Sourcefn set_distributed_worker_resolver<T: WorkerResolver + Send + Sync + 'static>(
&mut self,
resolver: T,
)
fn set_distributed_worker_resolver<T: WorkerResolver + Send + Sync + 'static>( &mut self, resolver: T, )
Same as DistributedExt::with_distributed_worker_resolver but with an in-place mutation.
Sourcefn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(
self,
resolver: T,
) -> Self
fn with_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>( self, resolver: T, ) -> Self
This is what tells Distributed DataFusion how to build a Worker gRPC client out of a worker URL.
There’s a default implementation that caches the Worker client instances so that there’s only one per URL, but users can decide to override that behavior in favor of their own solution.
Example:
struct CustomChannelResolver;
#[async_trait]
impl ChannelResolver for CustomChannelResolver {
async fn get_worker_client_for_url(&self, url: &Url) -> Result<WorkerServiceClient<BoxCloneSyncChannel>, DataFusionError> {
// Build a custom WorkerServiceClient wrapped with tower layers or something similar.
todo!()
}
}
// This tweaks the SessionState so that it can plan for distributed queries and execute them.
let state = SessionStateBuilder::new()
.with_distributed_channel_resolver(CustomChannelResolver)
.with_distributed_planner()
.build();
// This function can be provided to a Worker so that, upon receiving a distributed
// part of a plan, it knows how to resolve gRPC channels from URLs for making network calls to other nodes.
async fn build_state(ctx: WorkerQueryContext) -> Result<SessionState, DataFusionError> {
// If you have a custom channel resolver, it should also be passed in the
// Worker session builder.
Ok(ctx
.builder
.with_distributed_channel_resolver(CustomChannelResolver)
.build())
}Sourcefn set_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>(
&mut self,
resolver: T,
)
fn set_distributed_channel_resolver<T: ChannelResolver + Send + Sync + 'static>( &mut self, resolver: T, )
Same as DistributedExt::with_distributed_channel_resolver but with an in-place mutation.
Sourcefn with_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(
self,
estimator: T,
) -> Self
fn with_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>( self, estimator: T, ) -> Self
Adds a distributed task count estimator. TaskEstimators are executed on each node sequentially until one returns an estimation on the number of tasks that should be used for the stage containing that node.
Many nodes might decide to provide an estimation, so a reconciliation between all of them is performed internally during planning.
┌───────────────────────┐
│SortPreservingMergeExec│
└───────────────────────┘
▲
┌ ─ ─ ─ ─ ─ ─ ─ ┼ ─ ─ ─ ─ ─ ─ ─ ─ Stage 2
┌───────────┴───────────┐ │
│ │ SortExec │
└───────────────────────┘ │
│ ┌───────────────────────┐
│ AggregateExec │ │
│ └───────────────────────┘
─ ─ ─ ─ ─ ─ ─ ─▲─ ─ ─ ─ ─ ─ ─ ─ ┘
┌ ─ ─ ─ ─ ─ ─ ─ ┴ ─ ─ ─ ─ ─ ─ ─ ─ Stage 1
┌───────────────────────┐ │
│ │ FilterExec │
└───────────────────────┘ │
│ ┌───────────────────────┐ a TaskEstimator estimates the amount of tasks
│ SomeExec │◀───┼── based on how much data will be pulled.
│ └───────────────────────┘
─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘Sourcefn set_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>(
&mut self,
estimator: T,
)
fn set_distributed_task_estimator<T: TaskEstimator + Send + Sync + 'static>( &mut self, estimator: T, )
Same as DistributedExt::with_distributed_task_estimator but with an in-place mutation.
Sourcefn with_distributed_file_scan_config_bytes_per_partition(
self,
bytes_per_partition: usize,
) -> Result<Self, DataFusionError>
fn with_distributed_file_scan_config_bytes_per_partition( self, bytes_per_partition: usize, ) -> Result<Self, DataFusionError>
Sets the number of bytes each partition in a stage with a FileScanConfig node is
expected to scan. A task runs target_partitions partitions, so the task count is
roughly total_scan_bytes / bytes_per_partition / target_partitions (capped at the
number of available workers). Reducing this value increases the number of tasks.
┌───────────────────────┐
│SortPreservingMergeExec│
└───────────────────────┘
▲
┌ ─ ─ ─ ─ ─ ─ ─ ┼ ─ ─ ─ ─ ─ ─ ─ ─ Stage 2
┌───────────┴───────────┐ │
│ │ SortExec │
└───────────────────────┘ │
│ ┌───────────────────────┐
│ AggregateExec │ │
│ └───────────────────────┘
─ ─ ─ ─ ─ ─ ─ ─▲─ ─ ─ ─ ─ ─ ─ ─ ┘
┌ ─ ─ ─ ─ ─ ─ ─ ┴ ─ ─ ─ ─ ─ ─ ─ ─ Stage 1
┌───────────────────────┐ │
│ │ FilterExec │
└───────────────────────┘ │
│ ┌───────────────────────┐ Sets the bytes scanned per
│ FileScanConfig │◀───┼─ partition. Less
│ └───────────────────────┘ bytes_per_partition == more tasks
─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘Sourcefn set_distributed_file_scan_config_bytes_per_partition(
&mut self,
bytes_per_partition: usize,
) -> Result<(), DataFusionError>
fn set_distributed_file_scan_config_bytes_per_partition( &mut self, bytes_per_partition: usize, ) -> Result<(), DataFusionError>
Same as DistributedExt::with_distributed_file_scan_config_bytes_per_partition but with an in-place mutation.
Sourcefn with_distributed_cardinality_effect_task_scale_factor(
self,
factor: f64,
) -> Result<Self, DataFusionError>
fn with_distributed_cardinality_effect_task_scale_factor( self, factor: f64, ) -> Result<Self, DataFusionError>
The number of tasks in each stage is calculated in a bottom-to-top fashion.
Bottom stages containing leaf nodes will provide an estimate of the number of tasks for those stages, but upper stages might see a reduction (or increase) in the number of tasks based on the effect the bottom stages have on the cardinality of the data.
For example: if there are two stages, and the leaf stage is estimated to use 10 tasks, the upper stage might use fewer (e.g. 5) if it sees that the leaf stage is returning less data because of filters or aggregations.
This function sets the scale factor applied when encountering nodes that change the cardinality of the data. For example, if a stage with 10 tasks contains an AggregateExec node, and the scale factor is 2.0, the following stage will use 10 / 2.0 = 5 tasks.
┌───────────────────────┐
│SortPreservingMergeExec│
└───────────────────────┘
▲
┌ ─ ─ ─ ─ ─ ─ ─ ┼ ─ ─ ─ ─ ─ ─ ─ ─ Stage 2 (N/scale_factor tasks)
┌───────────┴───────────┐ │
│ │ SortExec │
└───────────────────────┘ │
│ ┌───────────────────────┐
│ AggregateExec │ │
│ └───────────────────────┘
─ ─ ─ ─ ─ ─ ─ ─▲─ ─ ─ ─ ─ ─ ─ ─ ┘
┌ ─ ─ ─ ─ ─ ─ ─ ┴ ─ ─ ─ ─ ─ ─ ─ ─ Stage 1 (N tasks)
┌───────────────────────┐ │ A filter reduces cardinality,
│ │ FilterExec │◀────────therefore the next stage will have
└───────────────────────┘ │ less tasks according to this factor
│ ┌───────────────────────┐
│ FileScanConfig │ │
│ └───────────────────────┘
─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘Sourcefn set_distributed_cardinality_effect_task_scale_factor(
&mut self,
factor: f64,
) -> Result<(), DataFusionError>
fn set_distributed_cardinality_effect_task_scale_factor( &mut self, factor: f64, ) -> Result<(), DataFusionError>
Same as DistributedExt::with_distributed_cardinality_effect_task_scale_factor but with an in-place mutation.
Sourcefn with_distributed_metrics_collection(
self,
enabled: bool,
) -> Result<Self, DataFusionError>
fn with_distributed_metrics_collection( self, enabled: bool, ) -> Result<Self, DataFusionError>
Enables metrics collection across network boundaries so that all the metrics gathered in each node are accessible from the head stage that started running the query.
Sourcefn set_distributed_metrics_collection(
&mut self,
enabled: bool,
) -> Result<(), DataFusionError>
fn set_distributed_metrics_collection( &mut self, enabled: bool, ) -> Result<(), DataFusionError>
Same as DistributedExt::with_distributed_metrics_collection but with an in-place mutation.
Sourcefn with_distributed_children_isolator_unions(
self,
enabled: bool,
) -> Result<Self, DataFusionError>
fn with_distributed_children_isolator_unions( self, enabled: bool, ) -> Result<Self, DataFusionError>
Enables children isolator unions for distributing UNION operations across as many tasks as the sum of all the tasks required for each child.
For example, if there is a UNION with 3 children, requiring one task each, it will result in a plan with 3 tasks where each task runs one child:
┌─────────────────────────────┐┌─────────────────────────────┐┌─────────────────────────────┐
│ Task 1 ││ Task 2 ││ Task 3 │
│┌───────────────────────────┐││┌───────────────────────────┐││┌───────────────────────────┐│
││ ChildrenIsolatorUnionExec ││││ ChildrenIsolatorUnionExec ││││ ChildrenIsolatorUnionExec ││
│└───▲─────────▲─────────▲───┘││└───▲─────────▲─────────▲───┘││└───▲─────────▲─────────▲───┘│
│ │ ││ │ ││ │ │
│┌───┴───┐ ┌ ─│ ─ ┌ ─│ ─ ││┌ ─│ ─ ┌───┴───┐ ┌ ─│ ─ ││┌ ─│ ─ ┌ ─│ ─ ┌───┴───┐│
││Child 1│ Child 2│ Child 3│││ Child 1│ │Child 2│ Child 3│││ Child 1│ Child 2│ │Child 3││
│└───────┘ └ ─ ─ └ ─ ─ ││└ ─ ─ └───────┘ └ ─ ─ ││└ ─ ─ └ ─ ─ └───────┘│
└─────────────────────────────┘└─────────────────────────────┘└─────────────────────────────┘Sourcefn set_distributed_children_isolator_unions(
&mut self,
enabled: bool,
) -> Result<(), DataFusionError>
fn set_distributed_children_isolator_unions( &mut self, enabled: bool, ) -> Result<(), DataFusionError>
Same as DistributedExt::with_distributed_children_isolator_unions but with an in-place mutation.
Sourcefn with_distributed_broadcast_joins(
self,
enabled: bool,
) -> Result<Self, DataFusionError>
fn with_distributed_broadcast_joins( self, enabled: bool, ) -> Result<Self, DataFusionError>
Enables broadcast joins for CollectLeft hash joins. When enabled, the build side of a CollectLeft join is broadcast to all consumer tasks instead of being coalesced into a single partition.
Note: This option is disabled by default until the implementation is smarter about when to broadcast.
Sourcefn set_distributed_broadcast_joins(
&mut self,
enabled: bool,
) -> Result<(), DataFusionError>
fn set_distributed_broadcast_joins( &mut self, enabled: bool, ) -> Result<(), DataFusionError>
Same as DistributedExt::with_distributed_broadcast_joins but with an in-place mutation.
Sourcefn with_distributed_compression(
self,
compression: Option<CompressionType>,
) -> Result<Self, DataFusionError>
fn with_distributed_compression( self, compression: Option<CompressionType>, ) -> Result<Self, DataFusionError>
The compression type to use for sending data over the wire.
The default is CompressionType::LZ4_FRAME.
Sourcefn set_distributed_compression(
&mut self,
compression: Option<CompressionType>,
) -> Result<(), DataFusionError>
fn set_distributed_compression( &mut self, compression: Option<CompressionType>, ) -> Result<(), DataFusionError>
Same as DistributedExt::with_distributed_compression but with an in-place mutation.
Sourcefn with_distributed_shuffle_batch_size(
self,
batch_size: usize,
) -> Result<Self, DataFusionError>
fn with_distributed_shuffle_batch_size( self, batch_size: usize, ) -> Result<Self, DataFusionError>
Overrides datafusion.execution.batch_size for worker-executed stages, letting users
tune shuffle batch sizes (specifically RepartitionExec’s output batching via its
internal LimitedBatchCoalescer) independently of the global batch size.
Set to 0 (the default) to apply no override.
Sourcefn set_distributed_shuffle_batch_size(
&mut self,
batch_size: usize,
) -> Result<(), DataFusionError>
fn set_distributed_shuffle_batch_size( &mut self, batch_size: usize, ) -> Result<(), DataFusionError>
Same as DistributedExt::with_distributed_shuffle_batch_size but with an in-place mutation.
Sourcefn with_distributed_passthrough_headers(
self,
headers: HeaderMap,
) -> Result<Self, DataFusionError>
fn with_distributed_passthrough_headers( self, headers: HeaderMap, ) -> Result<Self, DataFusionError>
Sets arbitrary HTTP headers that will be forwarded unchanged to worker nodes. These headers are included in outgoing Arrow Flight requests to workers.
Returns an error if any header name starts with the reserved prefix
x-datafusion-distributed-config-, which is used internally.
Example:
let mut passthrough = HeaderMap::new();
passthrough.insert("x-custom-priority", "high".parse().unwrap());
let state = SessionStateBuilder::new()
.with_distributed_passthrough_headers(passthrough)
.unwrap()
.build();Sourcefn set_distributed_passthrough_headers(
&mut self,
headers: HeaderMap,
) -> Result<(), DataFusionError>
fn set_distributed_passthrough_headers( &mut self, headers: HeaderMap, ) -> Result<(), DataFusionError>
Same as DistributedExt::with_distributed_passthrough_headers but with an in-place mutation.
Sourcefn with_distributed_max_tasks_per_stage(
self,
max_tasks_per_stage: usize,
) -> Result<Self, DataFusionError>
fn with_distributed_max_tasks_per_stage( self, max_tasks_per_stage: usize, ) -> Result<Self, DataFusionError>
Sets the maximum number of tasks that will be assigned to each stage.
If not specified, the number of workers returned by the provided WorkerResolver is used.
Sourcefn set_distributed_max_tasks_per_stage(
&mut self,
max_tasks_per_stage: usize,
) -> Result<(), DataFusionError>
fn set_distributed_max_tasks_per_stage( &mut self, max_tasks_per_stage: usize, ) -> Result<(), DataFusionError>
Same as DistributedExt::with_distributed_max_tasks_per_stage but with an in-place mutation.
Sourcefn with_distributed_partial_reduce(
self,
enabled: bool,
) -> Result<Self, DataFusionError>
fn with_distributed_partial_reduce( self, enabled: bool, ) -> Result<Self, DataFusionError>
Enables or disables the PartialReduce optimization, which inserts an extra aggregation pass above hash RepartitionExec before network shuffles to reduce shuffle data size. Disabled by default because its effectiveness is workload-dependent: it helps when aggregation significantly reduces cardinality, but adds overhead when it does not.
Sourcefn set_distributed_partial_reduce(
&mut self,
enabled: bool,
) -> Result<(), DataFusionError>
fn set_distributed_partial_reduce( &mut self, enabled: bool, ) -> Result<(), DataFusionError>
Same as DistributedExt::with_distributed_partial_reduce but with an in-place mutation.
Sourcefn with_distributed_worker_connection_buffer_budget_bytes(
self,
budget_bytes: usize,
) -> Result<Self, DataFusionError>
fn with_distributed_worker_connection_buffer_budget_bytes( self, budget_bytes: usize, ) -> Result<Self, DataFusionError>
Sets the soft byte budget that each per-worker connection will buffer in memory before pausing the gRPC pull from that worker. Per-partition channels are unbounded (to avoid head-of-line blocking between sibling partitions), so backpressure is enforced globally per worker connection using this budget.
Sourcefn set_distributed_worker_connection_buffer_budget_bytes(
&mut self,
budget_bytes: usize,
) -> Result<(), DataFusionError>
fn set_distributed_worker_connection_buffer_budget_bytes( &mut self, budget_bytes: usize, ) -> Result<(), DataFusionError>
Same as DistributedExt::with_distributed_worker_connection_buffer_budget_bytes but with an in-place mutation.
Sourcefn with_distributed_work_unit_feed<T, P, F>(self, getter: F) -> Selfwhere
T: ExecutionPlan + 'static,
P: WorkUnitFeedProvider + 'static,
P::WorkUnit: 'static,
F: Fn(&T) -> Option<&WorkUnitFeed<P>> + Send + Sync + 'static,
fn with_distributed_work_unit_feed<T, P, F>(self, getter: F) -> Selfwhere
T: ExecutionPlan + 'static,
P: WorkUnitFeedProvider + 'static,
P::WorkUnit: 'static,
F: Fn(&T) -> Option<&WorkUnitFeed<P>> + Send + Sync + 'static,
Registers a WorkUnitFeed so that Distributed DataFusion can discover it while traversing plans. For more info, refer to WorkUnitFeed docs.
This method uses some type system trickery so that users can provide a callback like this:
SessionStateBuilder::new()
.with_distributed_work_unit_feed(|p: &MyCustomPlan| &p.my_work_unit_feed);Sourcefn set_distributed_work_unit_feed<T, P, F>(&mut self, getter: F)where
T: ExecutionPlan + 'static,
P: WorkUnitFeedProvider + 'static,
P::WorkUnit: 'static,
F: Fn(&T) -> Option<&WorkUnitFeed<P>> + Send + Sync + 'static,
fn set_distributed_work_unit_feed<T, P, F>(&mut self, getter: F)where
T: ExecutionPlan + 'static,
P: WorkUnitFeedProvider + 'static,
P::WorkUnit: 'static,
F: Fn(&T) -> Option<&WorkUnitFeed<P>> + Send + Sync + 'static,
Same as DistributedExt::with_distributed_work_unit_feed but with an in-place mutation.
Dyn Compatibility§
This trait is not dyn compatible.
In older versions of Rust, dyn compatibility was called "object safety".