pub struct RepartitionExec { /* private fields */ }
Maps N input partitions to M output partitions based on a
Partitioning scheme.
§Background
DataFusion, like most commercial systems (DuckDB is a notable exception), uses the “Exchange Operator” approach to parallelism, which works well in practice given sufficient care in implementation.
DataFusion’s planner picks the target number of partitions and
then RepartitionExec redistributes RecordBatches to that number
of output partitions.
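The two schemes named in this page's diagram (hash and round robin) can be modelled with the standard library alone. The sketch below is only an illustration of the idea, not DataFusion's implementation: `Batch`, `round_robin`, and `hash_partition` are hypothetical names, a batch is reduced to a list of integer keys, and `DefaultHasher` stands in for whatever hash function the engine actually uses.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical stand-in for an Arrow `RecordBatch`: just a list of key values.
type Batch = Vec<u64>;

/// Round robin: whole batches are dealt to the outputs in turn,
/// so the outputs receive a near-equal number of batches.
fn round_robin(batches: Vec<Batch>, num_outputs: usize) -> Vec<Vec<Batch>> {
    let mut outputs = vec![Vec::new(); num_outputs];
    for (i, batch) in batches.into_iter().enumerate() {
        outputs[i % num_outputs].push(batch);
    }
    outputs
}

/// Hash: each row goes to the output selected by hashing its key,
/// so rows with equal keys always land in the same output.
fn hash_partition(batch: &Batch, num_outputs: usize) -> Vec<Batch> {
    let mut outputs = vec![Vec::new(); num_outputs];
    for &key in batch {
        let mut hasher = DefaultHasher::new();
        key.hash(&mut hasher);
        let target = (hasher.finish() % num_outputs as u64) as usize;
        outputs[target].push(key);
    }
    outputs
}
```

With four single-row batches and three outputs, `round_robin` hands the first output two batches and the others one each. `hash_partition` may produce uneven outputs, but it guarantees that a given key appears in exactly one of them, which is what a downstream partial aggregation relies on.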
For example, given target_partitions=3 (trying to use 3 cores)
but scanning an input with 2 partitions, RepartitionExec can be
used to get 3 even streams of RecordBatches:
        ▲                  ▲                  ▲
        │                  │                  │
        │                  │                  │
        │                  │                  │
┌───────────────┐  ┌───────────────┐  ┌───────────────┐
│    GroupBy    │  │    GroupBy    │  │    GroupBy    │
│   (Partial)   │  │   (Partial)   │  │   (Partial)   │
└───────────────┘  └───────────────┘  └───────────────┘
        ▲                  ▲                  ▲
        └──────────────────┼──────────────────┘
                           │
              ┌─────────────────────────┐
              │     RepartitionExec     │
              │   (hash/round robin)    │
              └─────────────────────────┘
                         ▲   ▲
             ┌───────────┘   └───────────┐
             │                           │
             │                           │
        .─────────.                 .─────────.
     ,─'           '─.           ,─'           '─.
    ;      Input      :         ;      Input      :
    :   Partition 0   ;         :   Partition 1   ;
     ╲               ╱           ╲               ╱
      '─.         ,─'             '─.         ,─'
         `───────'                   `───────'
§Error Handling
If any of the input partitions returns an error, the error is propagated to all output partitions and the inputs are not polled again.
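The rule above can be illustrated with a small standard-library model. This is a sketch under stated assumptions, not the operator's real code: it uses a single input for brevity, round-robin distribution, in-memory queues instead of streams, and the hypothetical names `Batch` and `repartition_with_errors`.

```rust
/// Hypothetical stand-in for a `RecordBatch`.
type Batch = Vec<i32>;

/// Drains `input` round robin into `num_outputs` queues.
/// On the first error, a copy of the error is appended to every output queue
/// and the input is not polled again.
/// Returns the output queues and the number of items pulled from the input.
fn repartition_with_errors<I>(
    input: I,
    num_outputs: usize,
) -> (Vec<Vec<Result<Batch, String>>>, usize)
where
    I: IntoIterator<Item = Result<Batch, String>>,
{
    let mut outputs: Vec<Vec<Result<Batch, String>>> = vec![Vec::new(); num_outputs];
    let mut polled = 0;
    for (i, item) in input.into_iter().enumerate() {
        polled += 1;
        match item {
            Ok(batch) => outputs[i % num_outputs].push(Ok(batch)),
            Err(e) => {
                // Every output partition observes the failure.
                for out in outputs.iter_mut() {
                    out.push(Err(e.clone()));
                }
                // Stop polling: later input items are never requested.
                break;
            }
        }
    }
    (outputs, polled)
}
```

In this model every consumer sees the batches it was already sent, followed by the error, and any input items after the failure are left unread.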
§Output Ordering
If more than one stream is being repartitioned, the output will be some
arbitrary interleaving (and thus unordered) unless
Self::with_preserve_order specifies otherwise.
§Spilling Architecture
RepartitionExec uses SpillPool channels to handle
memory pressure during repartitioning. Each (input partition, output partition)
pair gets its own SpillPool channel for FIFO ordering.
Input Partitions (N)          Output Partitions (M)
────────────────────          ─────────────────────

   Input 0 ──┐                      ┌──▶ Output 0
             │  ┌──────────────┐    │
             ├─▶│ SpillPool    │────┤
             │  │ [In0→Out0]   │    │
   Input 1 ──┤  └──────────────┘    ├──▶ Output 1
             │                      │
             │  ┌──────────────┐    │
             ├─▶│ SpillPool    │────┤
             │  │ [In1→Out0]   │    │
   Input 2 ──┤  └──────────────┘    ├──▶ Output 2
             │                      │
             │       ... (N×M SpillPools total)
             │                      │
             │  ┌──────────────┐    │
             └─▶│ SpillPool    │────┘
                │ [InN→OutM]   │
                └──────────────┘
Each SpillPool maintains FIFO order for its (input, output) pair.
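The N×M layout can be sketched with one `std::sync::mpsc` channel per (input, output) pair. This is only a model of the topology and its FIFO guarantee: `build_channels` is a hypothetical helper, the channels are unbounded and in memory, and the spilling that a real SpillPool performs under memory pressure is not modelled.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Builds one FIFO channel per (input, output) pair, N×M in total.
/// `senders[i][o]` is the sending half used by input `i` for output `o`;
/// `receivers[o][i]` is the receiving half read by output `o` for input `i`.
fn build_channels<T>(
    n_inputs: usize,
    m_outputs: usize,
) -> (Vec<Vec<Sender<T>>>, Vec<Vec<Receiver<T>>>) {
    let mut senders: Vec<Vec<Sender<T>>> = (0..n_inputs).map(|_| Vec::new()).collect();
    let mut receivers: Vec<Vec<Receiver<T>>> = (0..m_outputs).map(|_| Vec::new()).collect();
    for i in 0..n_inputs {
        for o in 0..m_outputs {
            let (tx, rx) = channel();
            senders[i].push(tx);
            // Inputs are visited in order, so this lands at index `i`.
            receivers[o].push(rx);
        }
    }
    (senders, receivers)
}
```

Because each pair has its own channel, items sent by one input to one output are received in the order they were sent, while no order is implied between different inputs feeding the same output.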
See `RepartitionBatch` for details on the memory/spill decision logic.
§Footnote
The “Exchange Operator” was first described in the 1989 paper “Encapsulation of parallelism in the Volcano query processing system”, which uses the term “Exchange” for the concept of repartitioning data across threads.
Implementations§
impl RepartitionExec
pub fn input(&self) -> &Arc<dyn ExecutionPlan>
Input execution plan
pub fn partitioning(&self) -> &Partitioning
Partitioning scheme to use
pub fn preserve_order(&self) -> bool
Get preserve_order flag of the RepartitionExec
true means SortPreservingRepartitionExec, false means RepartitionExec
impl RepartitionExec
pub fn try_new(
    input: Arc<dyn ExecutionPlan>,
    partitioning: Partitioning,
) -> Result<Self>
Create a new RepartitionExec that produces output with the given partitioning
and does not preserve the order of the input (see Self::with_preserve_order
for more details)
pub fn with_preserve_order(self) -> Self
Specify if this repartitioning operation should preserve the order of rows from its input when producing output. Preserving order is more expensive at runtime, so should only be set if the output of this operator can take advantage of it.
If the input is not ordered, or has only one partition, this is a no op,
and the node remains a RepartitionExec.
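One way to see why preserving order costs more at runtime: an arbitrary interleaving can forward whichever batch arrives first, whereas an ordered output has to compare the heads of all its input streams before emitting anything. The sketch below shows a k-way merge over already-sorted inputs. It assumes that order is preserved by merging on the sort key, which this page does not spell out, and `merge_sorted` is a hypothetical helper working on plain integers rather than RecordBatches.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merges already-sorted per-input queues into a single sorted output.
/// The heap always holds at most one pending value per input.
fn merge_sorted(inputs: Vec<Vec<i64>>) -> Vec<i64> {
    let mut iters: Vec<_> = inputs.into_iter().map(|v| v.into_iter()).collect();
    // `Reverse` turns the max-heap into a min-heap keyed on (value, input index).
    let mut heap = BinaryHeap::new();
    for (idx, it) in iters.iter_mut().enumerate() {
        if let Some(v) = it.next() {
            heap.push(Reverse((v, idx)));
        }
    }
    let mut out = Vec::new();
    while let Some(Reverse((v, idx))) = heap.pop() {
        out.push(v);
        // Refill from the input that just produced the smallest value.
        if let Some(next) = iters[idx].next() {
            heap.push(Reverse((next, idx)));
        }
    }
    out
}
```

With a single input, or with unsorted inputs, there is nothing for such a merge to preserve, which is consistent with the no-op case described above.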
Trait Implementations§
impl Clone for RepartitionExec
fn clone(&self) -> RepartitionExec
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from source.
impl Debug for RepartitionExec
impl DisplayAs for RepartitionExec
impl ExecutionPlan for RepartitionExec
fn name(&self) -> &'static str
fn properties(&self) -> &PlanProperties
Returns properties of the output of this ExecutionPlan, such as output
ordering(s), partitioning information etc.
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>>
Returns the child ExecutionPlans that act as inputs to this plan.
The returned list will be empty for leaf nodes such as scans, will contain
a single value for unary nodes, or two values for binary nodes (such as
joins).
fn with_new_children(
    self: Arc<Self>,
    children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>>
Returns a new ExecutionPlan where all existing children were replaced
by the children, in order
fn benefits_from_input_partitioning(&self) -> Vec<bool>
Specifies whether this ExecutionPlan benefits from increased
parallelization at its input for each child.
fn maintains_input_order(&self) -> Vec<bool>
Returns false if this ExecutionPlan’s implementation may reorder
rows within or between partitions.
fn execute(
    &self,
    partition: usize,
    context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream>
fn metrics(&self) -> Option<MetricsSet>
Returns the Metrics for this
ExecutionPlan. If no Metrics are available, return None.
fn statistics(&self) -> Result<Statistics>
Deprecated: use the partition_statistics method instead.
Returns statistics for this ExecutionPlan node. If statistics are not
available, should return Statistics::new_unknown (the default), not
an error.
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics>
Returns statistics for a partition of this ExecutionPlan node.
If statistics are not available, should return Statistics::new_unknown
(the default), not an error.
If partition is None, it returns statistics for the entire plan.
fn cardinality_effect(&self) -> CardinalityEffect
fn try_swapping_with_projection(
    &self,
    projection: &ProjectionExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>>
Tries to swap the given projection with this ExecutionPlan.
fn gather_filters_for_pushdown(
    &self,
    _phase: FilterPushdownPhase,
    parent_filters: Vec<Arc<dyn PhysicalExpr>>,
    _config: &ConfigOptions,
) -> Result<FilterDescription>
See ExecutionPlan::gather_filters_for_pushdown.
fn handle_child_pushdown_result(
    &self,
    _phase: FilterPushdownPhase,
    child_pushdown_result: ChildPushdownResult,
    _config: &ConfigOptions,
) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>>
Called after filters have been pushed down to child nodes via
ExecutionPlan::gather_filters_for_pushdown.
It allows the current node to process the results of filter pushdown from
its children, deciding whether to absorb filters, modify the plan, or pass
filters back up to its parent.
fn repartitioned(
    &self,
    target_partitions: usize,
    _config: &ConfigOptions,
) -> Result<Option<Arc<dyn ExecutionPlan>>>
If supported, attempts to change the partitioning of this ExecutionPlan to
produce target_partitions partitions.
fn static_name() -> &'static str
where
    Self: Sized,
Like name, but can be called without an instance.
fn check_invariants(&self, check: InvariantLevel) -> Result<()>
fn required_input_distribution(&self) -> Vec<Distribution>
Specifies the data distribution requirements for the children of this
ExecutionPlan. By default it is Distribution::UnspecifiedDistribution for each child.
fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>>
Specifies the ordering required for the children of this ExecutionPlan.
fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>>
Resets the internal state of this ExecutionPlan.
fn supports_limit_pushdown(&self) -> bool
fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>>
Returns a fetching variant of this ExecutionPlan node, if it supports
fetch limits. Returns None otherwise.
Auto Trait Implementations§
impl Freeze for RepartitionExec
impl !RefUnwindSafe for RepartitionExec
impl Send for RepartitionExec
impl Sync for RepartitionExec
impl Unpin for RepartitionExec
impl !UnwindSafe for RepartitionExec
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> CloneToUninit for T
where
    T: Clone,
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise.