pub struct SortMergeJoinExec {
    pub left: Arc<dyn ExecutionPlan>,
    pub right: Arc<dyn ExecutionPlan>,
    pub on: JoinOn,
    pub filter: Option<JoinFilter>,
    pub join_type: JoinType,
    pub sort_options: Vec<SortOptions>,
    pub null_equality: NullEquality,
    /* private fields */
}
Join execution plan that executes equi-join predicates on multiple partitions using the sort-merge join algorithm and applies an optional filter after the join. It can be used to join arbitrarily large inputs where one or both of the inputs don’t fit in the available memory.
§Join Expressions
Equi-join predicate expressions (e.g. <col1> = <col2>) are represented by Self::on.
Non-equality predicates that cannot be pushed down to the join inputs (e.g.
<col1> != <col2>) are known as “filter expressions” and are evaluated
after the equi-join predicates. These optional expressions are represented by Self::filter.
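The split between the two kinds of predicate can be sketched in plain Rust. This is a minimal illustration and not the DataFusion implementation: the Row type and the join_with_filter function are hypothetical, and a nested loop stands in for the merge step to keep the example short.

```rust
/// Hypothetical row type, used for illustration only.
#[derive(Debug, Clone, PartialEq)]
pub struct Row {
    pub key: i32,
    pub value: i32,
}

/// Pairs rows whose keys are equal (the role of `on`), then keeps only the
/// pairs accepted by the optional `filter` closure (the role of `filter`).
pub fn join_with_filter(
    left: &[Row],
    right: &[Row],
    filter: Option<&dyn Fn(&Row, &Row) -> bool>,
) -> Vec<(Row, Row)> {
    let mut out = Vec::new();
    for l in left {
        for r in right {
            // Equi-join predicate: evaluated first.
            if l.key != r.key {
                continue;
            }
            // Filter expression: evaluated only on key-matched pairs.
            // With no filter, every key-matched pair is kept.
            if filter.map_or(true, |f| f(l, r)) {
                out.push((l.clone(), r.clone()));
            }
        }
    }
    out
}
```

Passing None keeps every key-matched pair; passing a closure such as one comparing l.value != r.value mirrors a <col1> != <col2> filter expression.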
§Sorting
This plan assumes that both the left and right inputs to the join are pre-sorted. Sorting the inputs is not the responsibility of this execution plan.
§“Streamed” vs “Buffered”
The number of streamed record batches held in memory at any time depends
on the output batch size of the execution plan. There is no spilling support for the streamed input.
Join key values in the streamed input are compared with join key values in the
buffered input. One row in a streamed record batch can match multiple rows in
the buffered input batches. The streamed input is managed through the states in StreamedState,
and streamed input batches are represented by StreamedBatch.
The buffered input is held for all record batches that share the same join key value.
If memory usage exceeds the specified limit and spilling is enabled,
buffered batches can be spilled to disk. If spilling is disabled, execution
fails under the same conditions. Multiple buffered record batches can reside
in memory or on disk at the same time during execution. How many do depends
on the number of buffered rows that have the same join key value
as the streamed rows currently in memory. Because the inputs are pre-sorted,
the algorithm can tell when buffered batches are no longer needed and releases them
from memory or disk. The buffered input is managed through the states in BufferedState,
and buffered input batches are represented by BufferedBatch.
Depending on the type of join, either the left or the right input is selected as the streamed input, and the other input is buffered. For example, in a left-outer join the left execution plan is selected as the streamed input, while in a right-outer join the right execution plan is selected as the streamed input.
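The streamed and buffered roles can be sketched on plain sorted slices. This is a simplified illustration under stated assumptions, not the actual implementation: the function name left_outer_merge_join is hypothetical, rows are (key, payload) tuples rather than record batches, there is a single integer join key, and neither spilling nor filters are modelled.

```rust
/// Left-outer sort-merge join over key-sorted slices of `(key, payload)` rows.
/// The left side plays the "streamed" role, the right side the "buffered" role.
pub fn left_outer_merge_join(
    streamed: &[(i32, &'static str)],
    buffered: &[(i32, &'static str)],
) -> Vec<(i32, &'static str, Option<&'static str>)> {
    let mut out = Vec::new();
    // Start of the buffered group that is currently being held.
    let mut group_start = 0;
    for &(key, s_val) in streamed {
        // Both inputs are sorted, so buffered rows with smaller keys can
        // never match again; advancing `group_start` releases them.
        while group_start < buffered.len() && buffered[group_start].0 < key {
            group_start += 1;
        }
        // Hold every buffered row that shares the current streamed key.
        let mut group_end = group_start;
        while group_end < buffered.len() && buffered[group_end].0 == key {
            group_end += 1;
        }
        if group_start == group_end {
            // No match: a left-outer join still emits the streamed row.
            out.push((key, s_val, None));
        } else {
            // One streamed row can match several buffered rows.
            for &(_, b_val) in &buffered[group_start..group_end] {
                out.push((key, s_val, Some(b_val)));
            }
        }
        // `group_start` is not moved past the group, so the next streamed
        // row with the same key reuses the same buffered rows.
    }
    out
}
```

The buffered group is only dropped once a streamed row with a larger key arrives, which is the point at which pre-sorted inputs guarantee the group cannot match again.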
Reference for the algorithm: https://en.wikipedia.org/wiki/Sort-merge_join.
Helpful short video demonstration: https://www.youtube.com/watch?v=jiWCPJtDE2c.
Fields§
left: Arc<dyn ExecutionPlan>
Left sorted joining execution plan
right: Arc<dyn ExecutionPlan>
Right sorted joining execution plan
on: JoinOn
Set of common columns used to join on
filter: Option<JoinFilter>
Filters which are applied while finding matching rows
join_type: JoinType
How the join is performed
sort_options: Vec<SortOptions>
Sort options of join columns used in sorting left and right execution plans
null_equality: NullEquality
Defines the null equality for the join.
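What null equality means for key comparison can be sketched as follows. This is an illustration only: the NullMode enum, its variant names, and keys_match are hypothetical stand-ins chosen for this example, with Option<i32> modelling a nullable join key.

```rust
/// Hypothetical stand-in for the two null-handling modes of a join.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum NullMode {
    /// A null key never equals anything, including another null.
    NullEqualsNothing,
    /// Two null keys are treated as equal to each other.
    NullEqualsNull,
}

/// Returns true when two nullable keys should be joined under `mode`.
pub fn keys_match(left: Option<i32>, right: Option<i32>, mode: NullMode) -> bool {
    match (left, right) {
        // Two non-null keys match on plain equality in either mode.
        (Some(l), Some(r)) => l == r,
        // Two nulls match only when nulls are defined to equal nulls.
        (None, None) => mode == NullMode::NullEqualsNull,
        // A null never matches a non-null value.
        _ => false,
    }
}
```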
Implementations§
impl SortMergeJoinExec
pub fn try_new(
    left: Arc<dyn ExecutionPlan>,
    right: Arc<dyn ExecutionPlan>,
    on: JoinOn,
    filter: Option<JoinFilter>,
    join_type: JoinType,
    sort_options: Vec<SortOptions>,
    null_equality: NullEquality,
) -> Result<Self>
Tries to create a new SortMergeJoinExec.
The inputs must be sorted on the columns in on, with sort_options applied to those columns.
§Error
This function returns an error when the left and right sides cannot be joined on the keys in on.
pub fn probe_side(join_type: &JoinType) -> JoinSide
Gets the probe side (i.e. the streaming side) for this sort-merge join. In the current implementation, the probe side is determined by the join type.
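A minimal sketch of choosing the streamed side from the join type is shown below. The SketchJoinType and SketchSide enums and the sketch_probe_side function are hypothetical and cover only four join types. The Left and Right cases follow the struct-level description (left-outer streams the left input, right-outer streams the right input); the choice for Inner and Full is an assumption made for this example.

```rust
/// Hypothetical, reduced set of join types for illustration.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum SketchJoinType {
    Inner,
    Left,
    Right,
    Full,
}

/// Hypothetical marker for which input is streamed.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum SketchSide {
    Left,
    Right,
}

/// Picks the streamed (probe) side purely from the join type.
pub fn sketch_probe_side(join_type: SketchJoinType) -> SketchSide {
    match join_type {
        // Right-outer join: the right input is streamed.
        SketchJoinType::Right => SketchSide::Right,
        // Left-outer join: the left input is streamed.
        SketchJoinType::Left => SketchSide::Left,
        // Assumption for this sketch: symmetric join types stream the left input.
        SketchJoinType::Inner | SketchJoinType::Full => SketchSide::Left,
    }
}
```

Because the function takes only the join type, the choice can be made before any plan is constructed.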
pub fn on(&self) -> &[(PhysicalExprRef, PhysicalExprRef)]
Set of common columns used to join on
pub fn right(&self) -> &Arc<dyn ExecutionPlan>
Ref to right execution plan
pub fn left(&self) -> &Arc<dyn ExecutionPlan>
Ref to left execution plan
pub fn filter(&self) -> &Option<JoinFilter>
Ref to join filter
pub fn sort_options(&self) -> &[SortOptions]
Ref to sort options
pub fn null_equality(&self) -> NullEquality
Null equality
pub fn swap_inputs(&self) -> Result<Arc<dyn ExecutionPlan>>
§Notes:
This function should be called BEFORE inserting any repartitioning
operators on the join’s children. See HashJoinExec::swap_inputs
for more details.
Trait Implementations§
impl Clone for SortMergeJoinExec
fn clone(&self) -> SortMergeJoinExec
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from source.
impl Debug for SortMergeJoinExec
impl DisplayAs for SortMergeJoinExec
impl ExecutionPlan for SortMergeJoinExec
fn try_swapping_with_projection(
    &self,
    projection: &ProjectionExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>>
Tries to swap the projection with its input SortMergeJoinExec. If it can be done,
it returns the new swapped version having the SortMergeJoinExec as the top plan.
Otherwise, it returns None.
fn name(&self) -> &'static str
fn as_any(&self) -> &dyn Any
Returns the execution plan as Any so that it can be
downcast to a specific implementation.
fn properties(&self) -> &PlanProperties
Returns properties of the output of the ExecutionPlan, such as output
ordering(s), partitioning information etc.
fn required_input_distribution(&self) -> Vec<Distribution>
Specifies the data distribution requirements for all the children of this
ExecutionPlan. By default it’s Distribution::UnspecifiedDistribution for each child.
fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>>
Specifies the ordering required for all of the children of this ExecutionPlan.
fn maintains_input_order(&self) -> Vec<bool>
Returns false if this ExecutionPlan’s implementation may reorder
rows within or between partitions.
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>>
Gets a list of children ExecutionPlans that act as inputs to this plan.
The returned list will be empty for leaf nodes such as scans, will contain
a single value for unary nodes, or two values for binary nodes (such as
joins).
fn with_new_children(
    self: Arc<Self>,
    children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>>
Returns a new ExecutionPlan where all existing children were replaced
by the children, in order.
fn execute(
    &self,
    partition: usize,
    context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream>
fn metrics(&self) -> Option<MetricsSet>
Returns a snapshot of the set of Metrics for this
ExecutionPlan. If no Metrics are available, return None.
fn statistics(&self) -> Result<Statistics>
Deprecated: use the partition_statistics method instead.
Returns statistics for this ExecutionPlan node. If statistics are not
available, should return Statistics::new_unknown (the default), not
an error.
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics>
Returns statistics for a specific partition of this ExecutionPlan node.
If statistics are not available, should return Statistics::new_unknown
(the default), not an error.
If partition is None, it returns statistics for the entire plan.
fn static_name() -> &'static str
where
    Self: Sized,
Like name, but can be called without an instance.
fn check_invariants(&self, check: InvariantLevel) -> Result<()>
fn benefits_from_input_partitioning(&self) -> Vec<bool>
Specifies whether the ExecutionPlan benefits from increased
parallelization at its input for each child.
fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>>
Resets any internal state within this ExecutionPlan.
fn repartitioned(
    &self,
    _target_partitions: usize,
    _config: &ConfigOptions,
) -> Result<Option<Arc<dyn ExecutionPlan>>>
If supported, attempts to increase the partitioning of this ExecutionPlan to
produce target_partitions partitions.
fn supports_limit_pushdown(&self) -> bool
fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>>
Returns a fetching variant of this ExecutionPlan node, if it supports
fetch limits. Returns None otherwise.
fn fetch(&self) -> Option<usize>
Gets the fetch count for the operator. None means there is no fetch.
fn cardinality_effect(&self) -> CardinalityEffect
fn gather_filters_for_pushdown(
    &self,
    _phase: FilterPushdownPhase,
    parent_filters: Vec<Arc<dyn PhysicalExpr>>,
    _config: &ConfigOptions,
) -> Result<FilterDescription>
Collects filters that this node can push down to its children. See ExecutionPlan::gather_filters_for_pushdown.
fn handle_child_pushdown_result(
    &self,
    _phase: FilterPushdownPhase,
    child_pushdown_result: ChildPushdownResult,
    _config: &ConfigOptions,
) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>>
Handles the result of a child pushdown, after filters were pushed down to child nodes via ExecutionPlan::gather_filters_for_pushdown.
It allows the current node to process the results of filter pushdown from
its children, deciding whether to absorb filters, modify the plan, or pass
filters back up to its parent.
Auto Trait Implementations§
impl Freeze for SortMergeJoinExec
impl !RefUnwindSafe for SortMergeJoinExec
impl Send for SortMergeJoinExec
impl Sync for SortMergeJoinExec
impl Unpin for SortMergeJoinExec
impl !UnwindSafe for SortMergeJoinExec
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> CloneToUninit for T
where
    T: Clone,
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise.