NestedLoopJoinExec

Struct NestedLoopJoinExec 

Source
pub struct NestedLoopJoinExec { /* private fields */ }

NestedLoopJoinExec is a build-probe join operator designed for joins that do not have equijoin keys in their ON clause.
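
For example (an illustrative query, not taken from the source docs), SELECT * FROM t1 JOIN t2 ON t1.a < t2.b has no equality condition to build a hash table on, so the planner typically falls back to a nested loop join that tests the predicate against every (left, right) row pair.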

§Execution Flow

                                               Incoming right batch
               Left Side Buffered Batches
                      ┌───────────┐              ┌───────────────┐
                      │ ┌───────┐ │              │               │
                      │ │       │ │              │               │
 Current Left Row ───▶│ ├───────├─┤──────────┐   │               │
                      │ │       │ │          │   └───────────────┘
                      │ │       │ │          │           │
                      │ │       │ │          │           │
                      │ └───────┘ │          │           │
                      │ ┌───────┐ │          │           │
                      │ │       │ │          │     ┌─────┘
                      │ │       │ │          │     │
                      │ │       │ │          │     │
                      │ │       │ │          │     │
                      │ │       │ │          │     │
                      │ └───────┘ │          ▼     ▼
                      │   ......  │  ┌──────────────────────┐
                      │           │  │X (Cartesian Product) │
                      │           │  └──────────┬───────────┘
                      └───────────┘             │
                                                │
                                                ▼
                                     ┌───────┬───────────────┐
                                     │       │               │
                                     │       │               │
                                     │       │               │
                                     └───────┴───────────────┘
                                       Intermediate Batch
                                 (For join predicate evaluation)

The execution follows a two-phase design:

§1. Buffering Left Input

  • The operator eagerly buffers all left-side input batches into memory until a memory limit is reached. Currently, an out-of-memory error is thrown if the left-side input batches cannot all fit into memory at once; in the future, this case may be made to finish execution (see the ‘Memory-limited Execution’ section). A simplified sketch of this phase follows the list.
  • The rationale for buffering the left side is that scanning the right side can be expensive (e.g., decoding Parquet files), so buffering more left rows reduces the number of right-side scan passes required.
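
As a rough illustration of the buffering phase, the following standalone sketch buffers batches until a memory budget is exceeded and then errors out, matching the current out-of-memory behavior. The Batch type and mem_budget parameter are illustrative stand-ins, not DataFusion types or configuration options.

// Illustrative sketch only: `Batch` and `mem_budget` are stand-ins, not
// DataFusion types or settings.
struct Batch {
    rows: Vec<i64>,
}

fn buffer_left_side(
    batches: impl IntoIterator<Item = Batch>,
    mem_budget: usize,
) -> Result<Vec<Batch>, String> {
    let mut buffered = Vec::new();
    let mut used_bytes = 0usize;
    for batch in batches {
        used_bytes += batch.rows.len() * std::mem::size_of::<i64>();
        if used_bytes > mem_budget {
            // Current behavior: fail if the left side does not fit in memory at once.
            return Err("left side exceeds the memory budget".to_string());
        }
        buffered.push(batch);
    }
    Ok(buffered)
}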

§2. Probing Right Input

  • Right-side input is streamed batch by batch.
  • For each right-side batch:
    • It evaluates the join filter against the full buffered left input: each inner loop iteration forms the Cartesian product of the right batch with one buffered left row and applies the join predicate/filter to it (see the sketch after this list).
    • Matched results are accumulated into an output buffer (see the ‘Output Buffering Strategy’ section for details).
  • This process continues until all right-side input is consumed.
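
The probe loop itself can be pictured with the following standalone sketch, which uses plain integer rows and a closure in place of Arrow record batches and a JoinFilter (illustrative stand-ins, not the actual DataFusion types). It forms the Cartesian product of one right batch with the buffered left rows and keeps the pairs that satisfy the predicate.

// Illustrative sketch only: real batches are Arrow RecordBatches and the
// predicate is a JoinFilter expression evaluated over columns, not a closure.
fn probe_one_right_batch(
    buffered_left: &[i64],
    right_batch: &[i64],
    predicate: impl Fn(i64, i64) -> bool,
) -> Vec<(i64, i64)> {
    let mut matched = Vec::new();
    for &left_row in buffered_left {
        // One inner-loop iteration: pair the current left row with the whole
        // right batch and apply the join predicate to each pair.
        for &right_row in right_batch {
            if predicate(left_row, right_row) {
                matched.push((left_row, right_row));
            }
        }
    }
    matched
}

fn main() {
    let buffered_left = vec![1, 5, 9];
    let right_batch = vec![3, 7];
    // Non-equi predicate, e.g. `l.a < r.b`.
    let matches = probe_one_right_batch(&buffered_left, &right_batch, |l, r| l < r);
    assert_eq!(matches, vec![(1, 3), (1, 7), (5, 7)]);
}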

§Producing unmatched build-side data

  • For join types like left/full joins, unmatched rows must also be output. During execution, bitmaps are kept for both the left and right sides of the input; they are handled by dedicated states in NLJStream.
  • The final output of the unmatched left-side rows is handled by a single partition for simplicity, since it accounts for only a small portion of the execution time (e.g., if the probe side has 10k rows, emitting the unmatched build side accounts for roughly 1/10k of the total time). A simplified sketch of this step follows the list.
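
A minimal sketch of emitting the unmatched build-side rows for a LEFT join, assuming the matched bitmap has already been filled in while probing (the Vec<bool> here is an illustrative stand-in for the actual bitmap type):

// Illustrative sketch only: during probing, each left row that satisfies the
// predicate flips its bit; once all right input is consumed, rows whose bit is
// still false are emitted with a NULL (None) probe side.
fn emit_left_unmatched(buffered_left: &[i64], matched: &[bool]) -> Vec<(i64, Option<i64>)> {
    debug_assert_eq!(buffered_left.len(), matched.len());
    let mut out = Vec::new();
    for (i, &left_row) in buffered_left.iter().enumerate() {
        if !matched[i] {
            // Unmatched build-side row: pad the probe side with NULL.
            out.push((left_row, None));
        }
    }
    out
}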

§Output Buffering Strategy

The operator uses an intermediate output buffer to accumulate results. Once the output threshold is reached (currently set to the same value as batch_size in the configuration), the accumulated results are eagerly emitted.
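
A standalone sketch of this flush-on-threshold behavior follows; the row type and the batch_size field are illustrative stand-ins (the real buffer holds Arrow arrays and reads batch_size from the session configuration):

// Illustrative sketch only: accumulate matched rows and emit a full batch as
// soon as the buffer reaches the configured threshold.
struct OutputBuffer {
    rows: Vec<(i64, i64)>,
    batch_size: usize,
}

impl OutputBuffer {
    // Returns Some(batch) when the threshold is reached, None otherwise.
    fn push(&mut self, row: (i64, i64)) -> Option<Vec<(i64, i64)>> {
        self.rows.push(row);
        if self.rows.len() >= self.batch_size {
            // Threshold reached: eagerly hand the accumulated rows to the output.
            Some(std::mem::take(&mut self.rows))
        } else {
            None
        }
    }
}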

§Extra Notes

  • The operator always considers the left side as the build (buffered) side. Therefore, the physical optimizer should assign the smaller input to the left.
  • The design tries to keep the intermediate data size to approximately one batch, for better cache locality and memory efficiency.

§TODO: Memory-limited Execution

If the memory budget is exceeded during left-side buffering, fallback strategies such as streaming left batches and re-scanning the right side may be implemented in the future.

Tracking issue: https://github.com/apache/datafusion/issues/15760

§Clone / Shared State

Note that this structure includes a [OnceAsync] that is used to coordinate loading the left side with the processing in each output stream. Therefore, it cannot be Clone.
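
The coordination is roughly analogous to a build-once async cell that every output stream awaits. A minimal sketch using tokio::sync::OnceCell as an assumed stand-in for the internal OnceAsync (the real type and its integration with the operator differ):

use std::sync::Arc;
use tokio::sync::OnceCell;

#[tokio::main]
async fn main() {
    // Shared, build-once cell standing in for the buffered left side.
    let left_side: Arc<OnceCell<Vec<i64>>> = Arc::new(OnceCell::new());

    let mut handles = Vec::new();
    for partition in 0..4 {
        let left_side = Arc::clone(&left_side);
        handles.push(tokio::spawn(async move {
            // Every output partition awaits the same build; only the first
            // caller actually runs the initialization future.
            let left = left_side.get_or_init(|| async { vec![1, 2, 3] }).await;
            println!("partition {partition} probes {} buffered left rows", left.len());
        }));
    }
    for handle in handles {
        handle.await.unwrap();
    }
}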

Implementations§

Source§

impl NestedLoopJoinExec

Source

pub fn try_new( left: Arc<dyn ExecutionPlan>, right: Arc<dyn ExecutionPlan>, filter: Option<JoinFilter>, join_type: &JoinType, projection: Option<Vec<usize>>, ) -> Result<Self>

Try to create a new NestedLoopJoinExec

Source

pub fn left(&self) -> &Arc<dyn ExecutionPlan>

left side

Source

pub fn right(&self) -> &Arc<dyn ExecutionPlan>

right side

Source

pub fn filter(&self) -> Option<&JoinFilter>

Filters applied before join output

Source

pub fn join_type(&self) -> &JoinType

How the join is performed

Source

pub fn projection(&self) -> Option<&Vec<usize>>

Source

pub fn contains_projection(&self) -> bool

Source

pub fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self>

Source

pub fn swap_inputs(&self) -> Result<Arc<dyn ExecutionPlan>>

Returns a new ExecutionPlan that runs the nested loop join with the left and right inputs swapped.

§Notes:

This function should be called BEFORE inserting any repartitioning operators on the join’s children. Check super::HashJoinExec::swap_inputs for more details.

Trait Implementations§

Source§

impl Debug for NestedLoopJoinExec

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more
Source§

impl DisplayAs for NestedLoopJoinExec

Source§

fn fmt_as(&self, t: DisplayFormatType, f: &mut Formatter<'_>) -> Result

Format according to DisplayFormatType, used when verbose representation looks different from the default one Read more
Source§

impl EmbeddedProjection for NestedLoopJoinExec

Source§

fn with_projection(&self, projection: Option<Vec<usize>>) -> Result<Self>

Source§

impl ExecutionPlan for NestedLoopJoinExec

Source§

fn try_swapping_with_projection( &self, projection: &ProjectionExec, ) -> Result<Option<Arc<dyn ExecutionPlan>>>

Tries to push projection down through nested_loop_join. If possible, performs the pushdown and returns a new NestedLoopJoinExec as the top plan which has projections as its children. Otherwise, returns None.

Source§

fn name(&self) -> &'static str

Short name for the ExecutionPlan, such as ‘DataSourceExec’. Read more
Source§

fn as_any(&self) -> &dyn Any

Returns the execution plan as Any so that it can be downcast to a specific implementation.
Source§

fn properties(&self) -> &PlanProperties

Return properties of the output of the ExecutionPlan, such as output ordering(s), partitioning information etc. Read more
Source§

fn required_input_distribution(&self) -> Vec<Distribution>

Specifies the data distribution requirements for all the children of this ExecutionPlan. By default it's [Distribution::UnspecifiedDistribution] for each child.
Source§

fn maintains_input_order(&self) -> Vec<bool>

Returns false if this ExecutionPlan’s implementation may reorder rows within or between partitions. Read more
Source§

fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>>

Get a list of children ExecutionPlans that act as inputs to this plan. The returned list will be empty for leaf nodes such as scans, will contain a single value for unary nodes, or two values for binary nodes (such as joins).
Source§

fn with_new_children( self: Arc<Self>, children: Vec<Arc<dyn ExecutionPlan>>, ) -> Result<Arc<dyn ExecutionPlan>>

Returns a new ExecutionPlan where all existing children were replaced by the children, in order
Source§

fn execute( &self, partition: usize, context: Arc<TaskContext>, ) -> Result<SendableRecordBatchStream>

Begin execution of partition, returning a Stream of RecordBatches. Read more
Source§

fn metrics(&self) -> Option<MetricsSet>

Return a snapshot of the set of Metrics for this ExecutionPlan. If no Metrics are available, return None. Read more
Source§

fn statistics(&self) -> Result<Statistics>

👎Deprecated since 48.0.0: Use partition_statistics method instead
Returns statistics for this ExecutionPlan node. If statistics are not available, should return Statistics::new_unknown (the default), not an error. Read more
Source§

fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics>

Returns statistics for a specific partition of this ExecutionPlan node. If statistics are not available, should return Statistics::new_unknown (the default), not an error. If partition is None, it returns statistics for the entire plan.
Source§

fn static_name() -> &'static str
where Self: Sized,

Short name for the ExecutionPlan, such as ‘DataSourceExec’. Like name but can be called without an instance.
Source§

fn schema(&self) -> SchemaRef

Get the schema for this execution plan
Source§

fn check_invariants(&self, check: InvariantLevel) -> Result<()>

Returns an error if this individual node does not conform to its invariants. These invariants are typically only checked in debug mode. Read more
Source§

fn required_input_ordering(&self) -> Vec<Option<OrderingRequirements>>

Specifies the ordering required for all of the children of this ExecutionPlan. Read more
Source§

fn benefits_from_input_partitioning(&self) -> Vec<bool>

Specifies whether the ExecutionPlan benefits from increased parallelization at its input for each child. Read more
Source§

fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>>

Reset any internal state within this ExecutionPlan. Read more
Source§

fn repartitioned( &self, _target_partitions: usize, _config: &ConfigOptions, ) -> Result<Option<Arc<dyn ExecutionPlan>>>

If supported, attempt to increase the partitioning of this ExecutionPlan to produce target_partitions partitions. Read more
Source§

fn supports_limit_pushdown(&self) -> bool

Returns true if a limit can be safely pushed down through this ExecutionPlan node. Read more
Source§

fn with_fetch(&self, _limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>>

Returns a fetching variant of this ExecutionPlan node, if it supports fetch limits. Returns None otherwise.
Source§

fn fetch(&self) -> Option<usize>

Gets the fetch count for the operator, None means there is no fetch.
Source§

fn cardinality_effect(&self) -> CardinalityEffect

Gets the effect on cardinality, if known
Source§

fn gather_filters_for_pushdown( &self, _phase: FilterPushdownPhase, parent_filters: Vec<Arc<dyn PhysicalExpr>>, _config: &ConfigOptions, ) -> Result<FilterDescription>

Collect filters that this node can push down to its children. Filters that are being pushed down from parents are passed in, and the node may generate additional filters to push down. For example, given the plan FilterExec -> HashJoinExec -> DataSourceExec, what will happen is that we recurse down the plan calling ExecutionPlan::gather_filters_for_pushdown: Read more
Source§

fn handle_child_pushdown_result( &self, _phase: FilterPushdownPhase, child_pushdown_result: ChildPushdownResult, _config: &ConfigOptions, ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>>

Handle the result of a child pushdown. This method is called as we recurse back up the plan tree after pushing filters down to child nodes via ExecutionPlan::gather_filters_for_pushdown. It allows the current node to process the results of filter pushdown from its children, deciding whether to absorb filters, modify the plan, or pass filters back up to its parent. Read more
Source§

fn with_new_state( &self, _state: Arc<dyn Any + Send + Sync>, ) -> Option<Arc<dyn ExecutionPlan>>

Injects arbitrary run-time state into this execution plan, returning a new plan instance that incorporates that state if it is relevant to the concrete node implementation. Read more
