Skip to main content

datafusion_physical_optimizer/
limit_pushdown.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`LimitPushdown`] pushes `LIMIT` down through `ExecutionPlan`s to reduce
19//! data transfer as much as possible.
20//!
21//! # Plan Limit Absorption
22//! In addition to pushing down `GlobalLimitExec` and `LocalLimitExec` nodes in
23//! the plan, some operators can "absorb" a limit and stop early during
24//! execution.
25//!
26//! ## Background: vectorized volcano execution model
27//! DataFusion uses a batched volcano model. For most operators, output is
28//! produced in batches of `datafusion.execution.batch_size` (default 8192), so
29//! the batch sizes typically look like:
30//! ```text
31//! 8192, 8192, ..., 8192, 100 (the final batch may be partial)
32//! ```
33//!
34//! ## Example
35//! For a join with an expensive, selective predicate:
36//! ```text
37//! GlobalLimitExec: skip=0, fetch=10
38//! -- NestedLoopJoinExec(on=expr_expensive_and_selective)
39//! --- DataSourceExec()
40//! --- DataSourceExec()
41//! ```
42//!
43//! Under this model, `NestedLoopJoinExec` would keep working until it can emit
44//! a full batch (8192 rows), even though the query only needs 10. If the limit
45//! cannot be pushed below the join, we can still embed it inside the join so it
46//! stops once the limit is satisfied. The transformed plan looks like:
47//!
48//! ```text
49//! NestedLoopJoinExec(on=expr_expensive_and_selective, fetch=10)
50//! --- DataSourceExec()
51//! --- DataSourceExec()
52//! ```
53//!
54//! ## Implementation
55//! The current optimizer rule optionally pushes `fetch` requirements into
56//! operators via [`ExecutionPlan::with_fetch`].
57//!
58//! To support early termination in operators, [`LimitedBatchCoalescer`](https://docs.rs/datafusion/latest/datafusion/physical_plan/coalesce/struct.LimitedBatchCoalescer.html)
59//! can help manage the output buffer.
60//!
61//! Reference implementation in Hash Join: <https://github.com/apache/datafusion/pull/20228>
62
63use std::fmt::Debug;
64use std::sync::Arc;
65
66use crate::PhysicalOptimizerRule;
67
68use datafusion_common::config::ConfigOptions;
69use datafusion_common::error::Result;
70use datafusion_common::stats::Precision;
71use datafusion_common::tree_node::{Transformed, TreeNodeRecursion};
72use datafusion_common::utils::combine_limit;
73use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
74use datafusion_physical_plan::empty::EmptyExec;
75use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
76use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
77use datafusion_physical_plan::projection::ProjectionExec;
78use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
79use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
/// Physical optimizer rule that walks an [`ExecutionPlan`] tree and moves
/// fetch (`LIMIT`) requirements from parent operators towards their children
/// wherever that is applicable.
#[derive(Debug, Default)]
pub struct LimitPushdown {}
84
/// Plain "data class" used by the [`LimitPushdown`] rule while it pushes
/// limits down the plan. It is the rule-wide state that is threaded through
/// the traversal and carries the pending fetch and skip information, together
/// with a flag telling whether the "current" plan is already valid with
/// respect to those limits.
///
/// For example: if the plan is already satisfied with the current fetch
/// information, the rule decides not to add a `LocalLimitExec`.
///
/// [`LimitPushdown`]: crate::limit_pushdown::LimitPushdown
#[derive(Debug, Clone, Default)]
pub struct GlobalRequirements {
    /// Number of rows that still has to be fetched, if a fetch is pending.
    fetch: Option<usize>,
    /// Number of leading rows that still has to be skipped.
    skip: usize,
    /// `true` when the current plan already honors the limit, so no extra
    /// limit node has to be added for it.
    satisfied: bool,
    /// Copied from the most recently removed limit node (whether it had a
    /// required ordering) and forwarded to `with_preserve_order` when a fetch
    /// is embedded into an operator.
    preserve_order: bool,
}
100
101impl LimitPushdown {
102    #[expect(missing_docs)]
103    pub fn new() -> Self {
104        Self {}
105    }
106}
107
108impl PhysicalOptimizerRule for LimitPushdown {
109    fn optimize(
110        &self,
111        plan: Arc<dyn ExecutionPlan>,
112        _config: &ConfigOptions,
113    ) -> Result<Arc<dyn ExecutionPlan>> {
114        let global_state = GlobalRequirements {
115            fetch: None,
116            skip: 0,
117            satisfied: false,
118            preserve_order: false,
119        };
120        pushdown_limits(plan, global_state)
121    }
122
123    fn name(&self) -> &str {
124        "LimitPushdown"
125    }
126
127    fn schema_check(&self) -> bool {
128        true
129    }
130}
131
132struct LimitInfo {
133    input: Arc<dyn ExecutionPlan>,
134    fetch: Option<usize>,
135    skip: usize,
136    preserve_order: bool,
137}
138
139/// This function is the main helper function of the `LimitPushDown` rule.
140/// The helper takes an `ExecutionPlan` and a global (algorithm) state which is
141/// an instance of `GlobalRequirements` and modifies these parameters while
142/// checking if the limits can be pushed down or not.
143///
144/// If a limit is encountered, a [`TreeNodeRecursion::Stop`] is returned. Otherwise,
145/// return a [`TreeNodeRecursion::Continue`].
146pub fn pushdown_limit_helper(
147    mut pushdown_plan: Arc<dyn ExecutionPlan>,
148    mut global_state: GlobalRequirements,
149) -> Result<(Transformed<Arc<dyn ExecutionPlan>>, GlobalRequirements)> {
150    // Extract limit, if exist, and return child inputs.
151    if let Some(limit_info) = extract_limit(&pushdown_plan) {
152        // If we have fetch/skip info in the global state already, we need to
153        // decide which one to continue with:
154        let (skip, fetch) = combine_limit(
155            global_state.skip,
156            global_state.fetch,
157            limit_info.skip,
158            limit_info.fetch,
159        );
160        global_state.skip = skip;
161        global_state.fetch = fetch;
162        global_state.preserve_order = limit_info.preserve_order;
163        global_state.satisfied = false;
164
165        if let Some(fetch) = fetch
166            && limit_satisfied_by_input(&limit_info.input, skip, fetch)?
167        {
168            // The input already produces at most `fetch` rows, so no new limit
169            // node is needed. Mark satisfied so downstream won't re-add one,
170            // but preserve skip/fetch so any nested limit nodes (e.g. an inner
171            // GlobalLimitExec) can still be merged with the outer constraint.
172            global_state.satisfied = true;
173
174            return Ok((
175                Transformed {
176                    data: limit_info.input,
177                    transformed: true,
178                    tnr: TreeNodeRecursion::Stop,
179                },
180                global_state,
181            ));
182        }
183
184        // Now the global state has the most recent information, we can remove
185        // the limit node. We will decide later if we should add it again or
186        // not.
187        return Ok((
188            Transformed {
189                data: limit_info.input,
190                transformed: true,
191                tnr: TreeNodeRecursion::Stop,
192            },
193            global_state,
194        ));
195    }
196
197    // If we have a non-limit operator with fetch capability, update global
198    // state as necessary:
199    if pushdown_plan.fetch().is_some() {
200        if global_state.skip == 0 {
201            global_state.satisfied = true;
202        }
203        (global_state.skip, global_state.fetch) = combine_limit(
204            global_state.skip,
205            global_state.fetch,
206            0,
207            pushdown_plan.fetch(),
208        );
209    }
210
211    let Some(global_fetch) = global_state.fetch else {
212        // There's no valid fetch information, exit early:
213        return if global_state.skip > 0 && !global_state.satisfied {
214            // There might be a case with only offset, if so add a global limit:
215            global_state.satisfied = true;
216            Ok((
217                Transformed::yes(add_global_limit(
218                    pushdown_plan,
219                    global_state.skip,
220                    None,
221                )),
222                global_state,
223            ))
224        } else {
225            // There's no info on offset or fetch, nothing to do:
226            Ok((Transformed::no(pushdown_plan), global_state))
227        };
228    };
229
230    let skip_and_fetch = Some(global_fetch + global_state.skip);
231
232    if pushdown_plan.supports_limit_pushdown() {
233        if !combines_input_partitions(&pushdown_plan) {
234            // We have information in the global state and the plan pushes down,
235            // continue:
236            Ok((Transformed::no(pushdown_plan), global_state))
237        } else if let Some(plan_with_fetch) = pushdown_plan.with_fetch(skip_and_fetch) {
238            // This plan is combining input partitions, so we need to add the
239            // fetch info to plan if possible. If not, we must add a limit node
240            // with the information from the global state.
241            let mut new_plan = plan_with_fetch;
242            // Execution plans can't (yet) handle skip, so if we have one,
243            // we still need to add a global limit
244            if global_state.skip > 0 {
245                new_plan =
246                    add_global_limit(new_plan, global_state.skip, global_state.fetch);
247            }
248            global_state.fetch = skip_and_fetch;
249            global_state.skip = 0;
250            global_state.satisfied = true;
251            Ok((Transformed::yes(new_plan), global_state))
252        } else if global_state.satisfied {
253            // If the plan is already satisfied, do not add a limit:
254            Ok((Transformed::no(pushdown_plan), global_state))
255        } else {
256            global_state.satisfied = true;
257            Ok((
258                Transformed::yes(add_limit(
259                    pushdown_plan,
260                    global_state.skip,
261                    global_fetch,
262                )),
263                global_state,
264            ))
265        }
266    } else {
267        // The plan does not support push down and it is not a limit. We will need
268        // to add a limit or a fetch. If the plan is already satisfied, we will try
269        // to add the fetch info and return the plan.
270
271        // There's no push down, change fetch & skip to default values:
272        let global_skip = global_state.skip;
273        global_state.fetch = None;
274        global_state.skip = 0;
275
276        let maybe_fetchable = pushdown_plan.with_fetch(skip_and_fetch);
277        if global_state.satisfied {
278            if let Some(plan_with_fetch) = maybe_fetchable {
279                let plan_with_preserve_order = plan_with_fetch
280                    .with_preserve_order(global_state.preserve_order)
281                    .unwrap_or(plan_with_fetch);
282                Ok((Transformed::yes(plan_with_preserve_order), global_state))
283            } else {
284                Ok((Transformed::no(pushdown_plan), global_state))
285            }
286        } else {
287            global_state.satisfied = true;
288            pushdown_plan = if let Some(plan_with_fetch) = maybe_fetchable {
289                let plan_with_preserve_order = plan_with_fetch
290                    .with_preserve_order(global_state.preserve_order)
291                    .unwrap_or(plan_with_fetch);
292
293                if global_skip > 0 {
294                    add_global_limit(
295                        plan_with_preserve_order,
296                        global_skip,
297                        Some(global_fetch),
298                    )
299                } else {
300                    plan_with_preserve_order
301                }
302            } else {
303                add_limit(pushdown_plan, global_skip, global_fetch)
304            };
305            Ok((Transformed::yes(pushdown_plan), global_state))
306        }
307    }
308}
309
310/// Returns true if exact input statistics prove that applying the limit would
311/// not remove any rows.
312fn limit_satisfied_by_input(
313    plan: &Arc<dyn ExecutionPlan>,
314    skip: usize,
315    fetch: usize,
316) -> Result<bool> {
317    if skip > 0 {
318        return Ok(false);
319    }
320
321    if plan.output_partitioning().partition_count() != 1 {
322        return Ok(false);
323    }
324
325    let Some(num_rows) = limit_eliminable_exact_num_rows(plan)? else {
326        return Ok(false);
327    };
328
329    Ok(num_rows <= fetch)
330}
331
332/// Returns exact row counts only from a conservative whitelist of operators
333/// whose row-count guarantees are strong enough to remove a limit.
334fn limit_eliminable_exact_num_rows(
335    plan: &Arc<dyn ExecutionPlan>,
336) -> Result<Option<usize>> {
337    // Unwrap any wrapping ProjectionExec layers; projections preserve row count
338    // but may derive statistics in ways that are not trustworthy, so we peek
339    // through them to the underlying producer.
340    let mut current = plan;
341    while let Some(projection) = current.downcast_ref::<ProjectionExec>() {
342        current = projection.input();
343    }
344
345    if current.is::<EmptyExec>() {
346        return Ok(Some(0));
347    }
348
349    if current.is::<PlaceholderRowExec>() {
350        return Ok(Some(1));
351    }
352
353    if matches!(
354        current.partition_statistics(None)?.num_rows,
355        Precision::Exact(0)
356    ) {
357        return Ok(Some(0));
358    }
359
360    Ok(None)
361}
362
363/// Pushes down the limit through the plan.
364pub(crate) fn pushdown_limits(
365    pushdown_plan: Arc<dyn ExecutionPlan>,
366    global_state: GlobalRequirements,
367) -> Result<Arc<dyn ExecutionPlan>> {
368    // Call pushdown_limit_helper.
369    // This will either extract the limit node (returning the child), or apply the limit pushdown.
370    let (mut new_node, mut global_state) =
371        pushdown_limit_helper(pushdown_plan, global_state)?;
372
373    // While limits exist, continue combining the global_state.
374    while new_node.tnr == TreeNodeRecursion::Stop {
375        (new_node, global_state) = pushdown_limit_helper(new_node.data, global_state)?;
376    }
377
378    // Once a limit has been materialized above the current node, child
379    // subtrees should not inherit its `skip`. Keep `fetch`, but clear
380    // `skip` before recursing so child-local limits are not merged with
381    // an `OFFSET` that has already been applied.
382    if global_state.satisfied {
383        global_state.skip = 0;
384    }
385
386    // Apply pushdown limits in children
387    let children = new_node.data.children();
388    let mut changed = false;
389    let new_children = children
390        .into_iter()
391        .map(|child: &Arc<dyn ExecutionPlan>| {
392            let new_child = pushdown_limits(
393                Arc::<dyn ExecutionPlan>::clone(child),
394                global_state.clone(),
395            )?;
396            // Tracking if any of the children changed
397            changed |= !Arc::ptr_eq(child, &new_child);
398            Ok(new_child)
399        })
400        .collect::<Result<_>>()?;
401
402    if changed {
403        new_node.data.with_new_children(new_children)
404    } else {
405        Ok(new_node.data)
406    }
407}
408
409/// Extracts limit information from the [`ExecutionPlan`] if it is a
410/// [`GlobalLimitExec`] or a [`LocalLimitExec`].
411fn extract_limit(plan: &Arc<dyn ExecutionPlan>) -> Option<LimitInfo> {
412    if let Some(global_limit) = plan.downcast_ref::<GlobalLimitExec>() {
413        Some(LimitInfo {
414            input: Arc::clone(global_limit.input()),
415            fetch: global_limit.fetch(),
416            skip: global_limit.skip(),
417            preserve_order: global_limit.required_ordering().is_some(),
418        })
419    } else {
420        plan.downcast_ref::<LocalLimitExec>()
421            .map(|local_limit| LimitInfo {
422                input: Arc::clone(local_limit.input()),
423                fetch: Some(local_limit.fetch()),
424                skip: 0,
425                preserve_order: local_limit.required_ordering().is_some(),
426            })
427    }
428}
429
430/// Checks if the given plan combines input partitions.
431fn combines_input_partitions(plan: &Arc<dyn ExecutionPlan>) -> bool {
432    plan.is::<CoalescePartitionsExec>() || plan.is::<SortPreservingMergeExec>()
433}
434
435/// Adds a limit to the plan, chooses between global and local limits based on
436/// skip value and the number of partitions.
437fn add_limit(
438    pushdown_plan: Arc<dyn ExecutionPlan>,
439    skip: usize,
440    fetch: usize,
441) -> Arc<dyn ExecutionPlan> {
442    if skip > 0 || pushdown_plan.output_partitioning().partition_count() == 1 {
443        add_global_limit(pushdown_plan, skip, Some(fetch))
444    } else {
445        Arc::new(LocalLimitExec::new(pushdown_plan, fetch + skip)) as _
446    }
447}
448
449/// Adds a global limit to the plan.
450fn add_global_limit(
451    pushdown_plan: Arc<dyn ExecutionPlan>,
452    skip: usize,
453    fetch: Option<usize>,
454) -> Arc<dyn ExecutionPlan> {
455    Arc::new(GlobalLimitExec::new(pushdown_plan, skip, fetch)) as _
456}
457
458// See tests in datafusion/core/tests/physical_optimizer