pub struct NestedLoopJoinExec { /* private fields */ }
NestedLoopJoinExec is a build-probe join operator designed for joins that
do not have equijoin keys in their ON clause.
§Execution Flow
                                               Incoming right batch
               Left Side Buffered Batches
                      ┌───────────┐              ┌───────────────┐
                      │ ┌───────┐ │              │               │
                      │ │       │ │              │               │
 Current Left Row ───▶│ ├───────├─┤──────────┐   │               │
                      │ │       │ │          │   └───────────────┘
                      │ │       │ │          │           │
                      │ │       │ │          │           │
                      │ └───────┘ │          │           │
                      │ ┌───────┐ │          │           │
                      │ │       │ │          │     ┌─────┘
                      │ │       │ │          │     │
                      │ │       │ │          │     │
                      │ │       │ │          │     │
                      │ │       │ │          │     │
                      │ └───────┘ │          ▼     ▼
                      │   ......  │  ┌──────────────────────┐
                      │           │  │X (Cartesian Product) │
                      │           │  └──────────┬───────────┘
                      └───────────┘             │
                                                │
                                                ▼
                                     ┌───────┬───────────────┐
                                     │       │               │
                                     │       │               │
                                     │       │               │
                                     └───────┴───────────────┘
                                        Intermediate Batch
                                  (For join predicate evaluation)
The execution follows a two-phase design:
§1. Buffering Left Input
- The operator eagerly buffers all left-side input batches into memory, until a memory limit is reached. Currently, an out-of-memory error is returned if the left-side input batches cannot all fit into memory at once. In the future, this case may be able to finish execution (see the ‘Memory-limited Execution’ section).
- The rationale for buffering the left side is that scanning the right side can be expensive (e.g., decoding Parquet files), so buffering more left rows reduces the number of right-side scan passes required.
§2. Probing Right Input
- Right-side input is streamed batch by batch.
- For each right-side batch:
  - The join filter is evaluated against the full buffered left input. Each inner-loop iteration forms the Cartesian product of the right batch and one left row, then applies the join predicate/filter to it.
  - Matched results are accumulated into an output buffer (see the Output Buffering Strategy section).
- This process continues until all right-side input is consumed.
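The two phases above can be sketched in plain Rust. This is a simplified illustration and not the operator's actual code: a "batch" is modeled as a Vec<i64>, and the names Batch, buffer_left, and probe_right are hypothetical.

```rust
/// A "batch" is modeled as a plain vector of integer rows (an assumption for brevity).
type Batch = Vec<i64>;

/// Phase 1: eagerly buffer every left-side batch into one in-memory vector.
fn buffer_left(left_batches: Vec<Batch>) -> Vec<i64> {
    left_batches.into_iter().flatten().collect()
}

/// Phase 2: stream right-side batches. For each batch, pair every buffered
/// left row with every row of the batch and keep the pairs that pass the predicate.
fn probe_right<F>(left_rows: &[i64], right_batches: &[Batch], predicate: F) -> Vec<(i64, i64)>
where
    F: Fn(i64, i64) -> bool,
{
    let mut output = Vec::new();
    for right_batch in right_batches {
        for &l in left_rows {
            // Intermediate result: one left row crossed with the whole right batch.
            for &r in right_batch {
                if predicate(l, r) {
                    output.push((l, r));
                }
            }
        }
    }
    output
}

fn main() {
    let left = buffer_left(vec![vec![1, 5], vec![9]]);
    let right = vec![vec![2, 4], vec![6, 8]];
    // A non-equijoin condition: left < right.
    let joined = probe_right(&left, &right, |l, r| l < r);
    assert_eq!(joined, vec![(1, 2), (1, 4), (1, 6), (1, 8), (5, 6), (5, 8)]);
    println!("{:?}", joined);
}
```

The right side is visited exactly once here because the whole left side fits in the buffer.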
§Producing unmatched build-side data
- For special join types such as left/full joins, unmatched rows must also be output. During execution, bitmaps are kept for both the left and right sides of the input; they are handled by dedicated states in NLJStream.
- The final output of the unmatched left-side rows is handled by a single partition for simplicity, since it accounts for only a small portion of the execution time (e.g., if the probe side has 10k rows, emitting the unmatched build-side rows takes roughly 1/10k of the total time).
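As a rough sketch of the bitmap idea for a left join: a Vec<bool> stands in for the build-side bitmap, and the function name left_join is hypothetical. The real operator keeps its bitmaps inside its stream states; this only shows the principle.

```rust
/// Joins `left` with the streamed `right_batches` as a LEFT join, tracking
/// matched build-side rows in a bitmap (modeled here as a Vec<bool>).
fn left_join<F>(
    left: &[i32],
    right_batches: &[Vec<i32>],
    predicate: F,
) -> Vec<(i32, Option<i32>)>
where
    F: Fn(i32, i32) -> bool,
{
    let mut left_matched = vec![false; left.len()];
    let mut output = Vec::new();

    // Probe phase: emit matches and mark the matched left rows.
    for batch in right_batches {
        for (i, &l) in left.iter().enumerate() {
            for &r in batch {
                if predicate(l, r) {
                    left_matched[i] = true;
                    output.push((l, Some(r)));
                }
            }
        }
    }

    // Final phase: once all right input is consumed, emit the left rows that
    // never matched, padded with a NULL (None) on the right side.
    for (i, &l) in left.iter().enumerate() {
        if !left_matched[i] {
            output.push((l, None));
        }
    }
    output
}

fn main() {
    let rows = left_join(&[1, 7, 20], &[vec![5], vec![10]], |l, r| l > r);
    assert_eq!(
        rows,
        vec![(7, Some(5)), (20, Some(5)), (20, Some(10)), (1, None)]
    );
    println!("{:?}", rows);
}
```

The unmatched rows can only be emitted at the very end, because a later right batch might still match them.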
§Output Buffering Strategy
The operator uses an intermediate output buffer to accumulate results. Once
the output threshold is reached (currently set to the same value as
batch_size in the configuration), the results will be eagerly output.
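A minimal sketch of such a threshold-based buffer follows. OutputBuffer and its methods are hypothetical names, and rows are modeled as integer pairs; the operator's real buffer works on Arrow record batches.

```rust
/// Accumulates joined rows and releases a batch whenever `threshold` rows
/// have been buffered (the threshold plays the role of batch_size).
struct OutputBuffer {
    threshold: usize,
    rows: Vec<(i32, i32)>,
}

impl OutputBuffer {
    fn new(threshold: usize) -> Self {
        Self { threshold, rows: Vec::with_capacity(threshold) }
    }

    /// Adds one row and returns a full batch once the threshold is reached.
    fn push(&mut self, row: (i32, i32)) -> Option<Vec<(i32, i32)>> {
        self.rows.push(row);
        if self.rows.len() >= self.threshold {
            Some(std::mem::take(&mut self.rows))
        } else {
            None
        }
    }

    /// Releases whatever is left once the input is exhausted.
    fn finish(&mut self) -> Option<Vec<(i32, i32)>> {
        if self.rows.is_empty() {
            None
        } else {
            Some(std::mem::take(&mut self.rows))
        }
    }
}

fn main() {
    let mut buffer = OutputBuffer::new(2);
    let mut batch_sizes = Vec::new();
    for i in 0..5 {
        if let Some(batch) = buffer.push((i, i * 10)) {
            batch_sizes.push(batch.len());
        }
    }
    if let Some(batch) = buffer.finish() {
        batch_sizes.push(batch.len());
    }
    // Five rows with a threshold of two: two full batches and one remainder.
    assert_eq!(batch_sizes, vec![2, 2, 1]);
}
```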
§Extra Notes
- The operator always considers the left side as the build (buffered) side. Therefore, the physical optimizer should assign the smaller input to the left.
- The design tries to keep the intermediate data size to approximately 1 batch, for better cache locality and memory efficiency.
§TODO: Memory-limited Execution
If the memory budget is exceeded during left-side buffering, fallback strategies such as streaming left batches and re-scanning the right side may be implemented in the future.
Tracking issue: https://github.com/apache/datafusion/issues/15760
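One way to picture the possible fallback is the sketch below. It is an illustration only and not an implemented feature: chunked_join and max_buffered_rows are hypothetical names. It buffers the left side in bounded chunks and re-scans the right side once per chunk, which also shows why buffering more left rows means fewer right-side passes.

```rust
/// Joins by buffering at most `max_buffered_rows` left rows at a time and
/// re-scanning the right side once per buffered chunk. Returns the joined
/// pairs and the number of right-side scan passes.
fn chunked_join<F>(
    left: &[i32],
    right: &[i32],
    max_buffered_rows: usize,
    predicate: F,
) -> (Vec<(i32, i32)>, usize)
where
    F: Fn(i32, i32) -> bool,
{
    assert!(max_buffered_rows > 0, "the buffer must hold at least one row");
    let mut output = Vec::new();
    let mut right_passes = 0;
    for left_chunk in left.chunks(max_buffered_rows) {
        // Each buffered chunk requires one full pass over the right side.
        right_passes += 1;
        for &r in right {
            for &l in left_chunk {
                if predicate(l, r) {
                    output.push((l, r));
                }
            }
        }
    }
    (output, right_passes)
}

fn main() {
    let left = [1, 2, 3, 4, 5];
    let right = [3, 4];
    let (mut full, full_passes) = chunked_join(&left, &right, 5, |l, r| l >= r);
    let (mut chunked, chunked_passes) = chunked_join(&left, &right, 2, |l, r| l >= r);
    full.sort();
    chunked.sort();
    // Same result either way; only the number of right-side passes differs.
    assert_eq!(full, chunked);
    assert_eq!(full_passes, 1);
    assert_eq!(chunked_passes, 3);
}
```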
§Clone / Shared State
Note that this structure includes a OnceAsync that is used to coordinate the
loading of the left side with the processing in each output stream.
Therefore, it cannot be Clone.
Implementations§
impl NestedLoopJoinExec
pub fn try_new(
    left: Arc<dyn ExecutionPlan>,
    right: Arc<dyn ExecutionPlan>,
    filter: Option<JoinFilter>,
    join_type: &JoinType,
    projection: Option<Vec<usize>>,
) -> Result<Self>
Try to create a new NestedLoopJoinExec
pub fn left(&self) -> &Arc<dyn ExecutionPlan>
left side
pub fn right(&self) -> &Arc<dyn ExecutionPlan>
right side
pub fn filter(&self) -> Option<&JoinFilter>
Filters applied before join output
pub fn projection(&self) -> Option<&Vec<usize>>
pub fn contains_projection(&self) -> bool
pub fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self>
pub fn swap_inputs(&self) -> Result<Arc<dyn ExecutionPlan>>
Returns a new ExecutionPlan that runs a NestedLoopJoinExec with the left
and right inputs swapped.
§Notes:
This function should be called BEFORE inserting any repartitioning
operators on the join’s children. Check super::HashJoinExec::swap_inputs
for more details.
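To illustrate the general idea only (a simplified sketch, not this method's implementation): swapping the inputs of a join usually means mirroring the join type, flipping the predicate's arguments, and restoring the original column order afterwards. JoinKind, swap_kind, and inner_join are hypothetical names introduced for this example.

```rust
/// A reduced set of join types (hypothetical, for illustration).
#[derive(Debug, Clone, Copy, PartialEq)]
enum JoinKind {
    Inner,
    Left,
    Right,
    Full,
}

/// Mirrors the join type so the swapped join keeps the same semantics.
fn swap_kind(kind: JoinKind) -> JoinKind {
    match kind {
        JoinKind::Left => JoinKind::Right,
        JoinKind::Right => JoinKind::Left,
        other => other,
    }
}

/// A plain nested-loop inner join; `left` is the buffered (build) side.
fn inner_join<F>(left: &[i32], right: &[i32], predicate: F) -> Vec<(i32, i32)>
where
    F: Fn(i32, i32) -> bool,
{
    let mut output = Vec::new();
    for &l in left {
        for &r in right {
            if predicate(l, r) {
                output.push((l, r));
            }
        }
    }
    output
}

fn main() {
    assert_eq!(swap_kind(JoinKind::Left), JoinKind::Right);
    assert_eq!(swap_kind(JoinKind::Right), JoinKind::Left);
    assert_eq!(swap_kind(JoinKind::Inner), JoinKind::Inner);
    assert_eq!(swap_kind(JoinKind::Full), JoinKind::Full);

    let a = [1, 4, 6];
    let b = [3, 5];
    let mut original = inner_join(&a, &b, |l, r| l < r);

    // Swapped inputs: `b` becomes the build side, the predicate's arguments
    // are flipped, and the columns are put back in their original order.
    let mut swapped: Vec<(i32, i32)> = inner_join(&b, &a, |l, r| r < l)
        .into_iter()
        .map(|(from_b, from_a)| (from_a, from_b))
        .collect();

    original.sort();
    swapped.sort();
    assert_eq!(original, swapped);
}
```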
Trait Implementations§
impl Debug for NestedLoopJoinExec
impl DisplayAs for NestedLoopJoinExec
impl ExecutionPlan for NestedLoopJoinExec
fn try_swapping_with_projection(
    &self,
    projection: &ProjectionExec,
) -> Result<Option<Arc<dyn ExecutionPlan>>>
Tries to push projection down through nested_loop_join. If possible, performs the
pushdown and returns a new NestedLoopJoinExec as the top plan which has projections
as its children. Otherwise, returns None.
fn name(&self) -> &'static str
fn as_any(&self) -> &dyn Any
Returns the execution plan as Any so that it can be
downcast to a specific implementation.
fn properties(&self) -> &PlanProperties
Returns properties of the output of the ExecutionPlan, such as output
ordering(s), partitioning information, etc.
fn required_input_distribution(&self) -> Vec<Distribution>
Specifies the data distribution requirements for all the children of this
ExecutionPlan. By default, it is Distribution::UnspecifiedDistribution for each child.
fn maintains_input_order(&self) -> Vec<bool>
Returns false if this ExecutionPlan’s implementation may reorder
rows within or between partitions.
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>>
Gets a list of the child ExecutionPlans that act as inputs to this plan.
The returned list will be empty for leaf nodes such as scans, will contain
a single value for unary nodes, or two values for binary nodes (such as
joins).
fn with_new_children(
    self: Arc<Self>,
    children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>>
Returns a new ExecutionPlan where all existing children were replaced
by the children, in order.
fn execute(
    &self,
    partition: usize,
    context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream>
fn metrics(&self) -> Option<MetricsSet>
Returns a snapshot of the set of Metrics for this
ExecutionPlan. If no Metrics are available, returns None.
fn statistics(&self) -> Result<Statistics>
Deprecated: use the partition_statistics method instead.
Returns statistics for this ExecutionPlan node. If statistics are not
available, should return Statistics::new_unknown (the default), not
an error.
fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics>
Returns statistics for a specific partition of this ExecutionPlan node.
If statistics are not available, should return Statistics::new_unknown
(the default), not an error.
If partition is None, it returns statistics for the entire plan.
fn static_name() -> &'static str
where
    Self: Sized,
Like name, but can be called without an instance.
fn check_invariants(&self, check: InvariantLevel) -> Result<()>
fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>>
Specifies the ordering required for all of the children of this
ExecutionPlan.
fn benefits_from_input_partitioning(&self) -> Vec<bool>
Specifies whether the ExecutionPlan benefits from increased
parallelization at its input for each child.
fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>>
Resets any internal state within this ExecutionPlan.
fn repartitioned(
    &self,
    _target_partitions: usize,
    _config: &ConfigOptions,
) -> Result<Option<Arc<dyn ExecutionPlan>>>
If supported, attempts to increase the partitioning of this ExecutionPlan to
produce target_partitions partitions.
fn supports_limit_pushdown(&self) -> bool
fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>>
Returns a fetching variant of this ExecutionPlan node, if it supports
fetch limits. Returns None otherwise.
fn fetch(&self) -> Option<usize>
Gets the fetch count for the operator; None means there is no fetch.
fn cardinality_effect(&self) -> CardinalityEffect
fn gather_filters_for_pushdown(
    &self,
    _phase: FilterPushdownPhase,
    parent_filters: Vec<Arc<dyn PhysicalExpr>>,
    _config: &ConfigOptions,
) -> Result<FilterDescription>
Collects the filters that this node can push down to its children.
fn handle_child_pushdown_result(
    &self,
    _phase: FilterPushdownPhase,
    child_pushdown_result: ChildPushdownResult,
    _config: &ConfigOptions,
) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>>
Handles the result of a child pushdown. This method is called while recursing
back up the plan tree, after filters have been pushed down to child nodes via
ExecutionPlan::gather_filters_for_pushdown.
It allows the current node to process the results of filter pushdown from
its children, deciding whether to absorb filters, modify the plan, or pass
filters back up to its parent.
Auto Trait Implementations§
impl !Freeze for NestedLoopJoinExec
impl !RefUnwindSafe for NestedLoopJoinExec
impl Send for NestedLoopJoinExec
impl Sync for NestedLoopJoinExec
impl Unpin for NestedLoopJoinExec
impl !UnwindSafe for NestedLoopJoinExec
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise.