// datafusion_physical_optimizer/limit_pushdown.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`LimitPushdown`] pushes `LIMIT` down through `ExecutionPlan`s to reduce
19//! data transfer as much as possible.
20
21use std::fmt::Debug;
22use std::sync::Arc;
23
24use crate::PhysicalOptimizerRule;
25
26use datafusion_common::config::ConfigOptions;
27use datafusion_common::error::Result;
28use datafusion_common::tree_node::{Transformed, TreeNodeRecursion};
29use datafusion_common::utils::combine_limit;
30use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
31use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
32use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
33use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
/// This rule inspects [`ExecutionPlan`]'s and pushes down the fetch limit from
/// the parent to the child if applicable.
///
/// The rule itself is stateless; all bookkeeping lives in [`GlobalRequirements`],
/// which is threaded through the recursion in `pushdown_limits`.
#[derive(Default, Debug)]
pub struct LimitPushdown {}
38
/// This is a "data class" we use within the [`LimitPushdown`] rule to push
/// down [`LimitExec`] in the plan. `GlobalRequirements` are held as a rule-wide
/// state and hold the fetch and skip information. The struct also has a field
/// named `satisfied` which means if the "current" plan is valid in terms of
/// limits or not.
///
/// For example: If the plan is satisfied with current fetch info, we decide to not add a LocalLimit
///
/// [`LimitPushdown`]: crate::limit_pushdown::LimitPushdown
/// [`LimitExec`]: crate::limit_pushdown::LimitExec
#[derive(Default, Clone, Debug)]
pub struct GlobalRequirements {
    /// Maximum number of rows to produce, if any limit has been seen so far.
    fetch: Option<usize>,
    /// Number of leading rows to skip (the `OFFSET` part of a limit).
    skip: usize,
    /// Whether the plan seen so far already enforces the fetch requirement,
    /// so no additional limit operator needs to be inserted.
    satisfied: bool,
    /// Whether an operator modified by this rule must preserve input ordering
    /// (set from the most recently absorbed limit's ordering requirement).
    preserve_order: bool,
}
55
56impl LimitPushdown {
57    #[expect(missing_docs)]
58    pub fn new() -> Self {
59        Self {}
60    }
61}
62
63impl PhysicalOptimizerRule for LimitPushdown {
64    fn optimize(
65        &self,
66        plan: Arc<dyn ExecutionPlan>,
67        _config: &ConfigOptions,
68    ) -> Result<Arc<dyn ExecutionPlan>> {
69        let global_state = GlobalRequirements {
70            fetch: None,
71            skip: 0,
72            satisfied: false,
73            preserve_order: false,
74        };
75        pushdown_limits(plan, global_state)
76    }
77
78    fn name(&self) -> &str {
79        "LimitPushdown"
80    }
81
82    fn schema_check(&self) -> bool {
83        true
84    }
85}
86
/// This enumeration makes `skip` and `fetch` calculations easier by providing
/// a single API for both local and global limit operators.
#[derive(Debug)]
pub enum LimitExec {
    /// Wraps a [`GlobalLimitExec`] (supports both `skip` and `fetch`).
    Global(GlobalLimitExec),
    /// Wraps a [`LocalLimitExec`] (supports `fetch` only; `skip` is always 0).
    Local(LocalLimitExec),
}
94
95impl LimitExec {
96    fn input(&self) -> &Arc<dyn ExecutionPlan> {
97        match self {
98            Self::Global(global) => global.input(),
99            Self::Local(local) => local.input(),
100        }
101    }
102
103    fn fetch(&self) -> Option<usize> {
104        match self {
105            Self::Global(global) => global.fetch(),
106            Self::Local(local) => Some(local.fetch()),
107        }
108    }
109
110    fn skip(&self) -> usize {
111        match self {
112            Self::Global(global) => global.skip(),
113            Self::Local(_) => 0,
114        }
115    }
116
117    fn preserve_order(&self) -> bool {
118        match self {
119            Self::Global(global) => global.required_ordering().is_some(),
120            Self::Local(local) => local.required_ordering().is_some(),
121        }
122    }
123}
124
125impl From<LimitExec> for Arc<dyn ExecutionPlan> {
126    fn from(limit_exec: LimitExec) -> Self {
127        match limit_exec {
128            LimitExec::Global(global) => Arc::new(global),
129            LimitExec::Local(local) => Arc::new(local),
130        }
131    }
132}
133
/// This function is the main helper function of the `LimitPushDown` rule.
/// The helper takes an `ExecutionPlan` and a global (algorithm) state which is
/// an instance of `GlobalRequirements` and modifies these parameters while
/// checking if the limits can be pushed down or not.
///
/// If a limit is encountered, a [`TreeNodeRecursion::Stop`] is returned. Otherwise,
/// return a [`TreeNodeRecursion::Continue`].
///
/// The updated `GlobalRequirements` is always returned alongside the
/// (possibly) transformed plan so the caller can thread it through recursion.
pub fn pushdown_limit_helper(
    mut pushdown_plan: Arc<dyn ExecutionPlan>,
    mut global_state: GlobalRequirements,
) -> Result<(Transformed<Arc<dyn ExecutionPlan>>, GlobalRequirements)> {
    // Extract limit, if exist, and return child inputs.
    if let Some(limit_exec) = extract_limit(&pushdown_plan) {
        // If we have fetch/skip info in the global state already, we need to
        // decide which one to continue with:
        let (skip, fetch) = combine_limit(
            global_state.skip,
            global_state.fetch,
            limit_exec.skip(),
            limit_exec.fetch(),
        );
        global_state.skip = skip;
        global_state.fetch = fetch;
        global_state.preserve_order = limit_exec.preserve_order();
        // A limit was just absorbed, so the plan below it is no longer known
        // to satisfy the (possibly tightened) requirements:
        global_state.satisfied = false;

        // Now the global state has the most recent information, we can remove
        // the `LimitExec` plan. We will decide later if we should add it again
        // or not.
        return Ok((
            Transformed {
                data: Arc::clone(limit_exec.input()),
                transformed: true,
                tnr: TreeNodeRecursion::Stop,
            },
            global_state,
        ));
    }

    // If we have a non-limit operator with fetch capability, update global
    // state as necessary:
    if pushdown_plan.fetch().is_some() {
        // The operator's own fetch can only satisfy the requirement when
        // there is no outstanding offset to apply first:
        if global_state.skip == 0 {
            global_state.satisfied = true;
        }
        (global_state.skip, global_state.fetch) = combine_limit(
            global_state.skip,
            global_state.fetch,
            0,
            pushdown_plan.fetch(),
        );
    }

    let Some(global_fetch) = global_state.fetch else {
        // There's no valid fetch information, exit early:
        return if global_state.skip > 0 && !global_state.satisfied {
            // There might be a case with only offset, if so add a global limit:
            global_state.satisfied = true;
            Ok((
                Transformed::yes(add_global_limit(
                    pushdown_plan,
                    global_state.skip,
                    None,
                )),
                global_state,
            ))
        } else {
            // There's no info on offset or fetch, nothing to do:
            Ok((Transformed::no(pushdown_plan), global_state))
        };
    };

    // An operator that cannot apply `skip` itself must produce `skip + fetch`
    // rows so the offset can still be taken above it:
    let skip_and_fetch = Some(global_fetch + global_state.skip);

    if pushdown_plan.supports_limit_pushdown() {
        if !combines_input_partitions(&pushdown_plan) {
            // We have information in the global state and the plan pushes down,
            // continue:
            Ok((Transformed::no(pushdown_plan), global_state))
        } else if let Some(plan_with_fetch) = pushdown_plan.with_fetch(skip_and_fetch) {
            // This plan is combining input partitions, so we need to add the
            // fetch info to plan if possible. If not, we must add a `LimitExec`
            // with the information from the global state.
            let mut new_plan = plan_with_fetch;
            // Execution plans can't (yet) handle skip, so if we have one,
            // we still need to add a global limit
            // (note: skip/fetch are read here *before* being reset below).
            if global_state.skip > 0 {
                new_plan =
                    add_global_limit(new_plan, global_state.skip, global_state.fetch);
            }
            global_state.fetch = skip_and_fetch;
            global_state.skip = 0;
            global_state.satisfied = true;
            Ok((Transformed::yes(new_plan), global_state))
        } else if global_state.satisfied {
            // If the plan is already satisfied, do not add a limit:
            Ok((Transformed::no(pushdown_plan), global_state))
        } else {
            global_state.satisfied = true;
            Ok((
                Transformed::yes(add_limit(
                    pushdown_plan,
                    global_state.skip,
                    global_fetch,
                )),
                global_state,
            ))
        }
    } else {
        // The plan does not support push down and it is not a limit. We will need
        // to add a limit or a fetch. If the plan is already satisfied, we will try
        // to add the fetch info and return the plan.

        // There's no push down, change fetch & skip to default values:
        // (snapshot `skip` first — it is still needed below to build the
        // global limit for this subtree).
        let global_skip = global_state.skip;
        global_state.fetch = None;
        global_state.skip = 0;

        let maybe_fetchable = pushdown_plan.with_fetch(skip_and_fetch);
        if global_state.satisfied {
            if let Some(plan_with_fetch) = maybe_fetchable {
                // Attach the fetch; fall back to the fetched plan unchanged if
                // it cannot take on the ordering-preservation requirement:
                let plan_with_preserve_order = plan_with_fetch
                    .with_preserve_order(global_state.preserve_order)
                    .unwrap_or(plan_with_fetch);
                Ok((Transformed::yes(plan_with_preserve_order), global_state))
            } else {
                Ok((Transformed::no(pushdown_plan), global_state))
            }
        } else {
            global_state.satisfied = true;
            pushdown_plan = if let Some(plan_with_fetch) = maybe_fetchable {
                let plan_with_preserve_order = plan_with_fetch
                    .with_preserve_order(global_state.preserve_order)
                    .unwrap_or(plan_with_fetch);

                // A remaining offset still needs an explicit GlobalLimitExec
                // on top, since `with_fetch` cannot express `skip`:
                if global_skip > 0 {
                    add_global_limit(
                        plan_with_preserve_order,
                        global_skip,
                        Some(global_fetch),
                    )
                } else {
                    plan_with_preserve_order
                }
            } else {
                add_limit(pushdown_plan, global_skip, global_fetch)
            };
            Ok((Transformed::yes(pushdown_plan), global_state))
        }
    }
}
285
286/// Pushes down the limit through the plan.
287pub(crate) fn pushdown_limits(
288    pushdown_plan: Arc<dyn ExecutionPlan>,
289    global_state: GlobalRequirements,
290) -> Result<Arc<dyn ExecutionPlan>> {
291    // Call pushdown_limit_helper.
292    // This will either extract the limit node (returning the child), or apply the limit pushdown.
293    let (mut new_node, mut global_state) =
294        pushdown_limit_helper(pushdown_plan, global_state)?;
295
296    // While limits exist, continue combining the global_state.
297    while new_node.tnr == TreeNodeRecursion::Stop {
298        (new_node, global_state) = pushdown_limit_helper(new_node.data, global_state)?;
299    }
300
301    // Apply pushdown limits in children
302    let children = new_node.data.children();
303    let new_children = children
304        .into_iter()
305        .map(|child| {
306            pushdown_limits(Arc::<dyn ExecutionPlan>::clone(child), global_state.clone())
307        })
308        .collect::<Result<_>>()?;
309    new_node.data.with_new_children(new_children)
310}
311
312/// Transforms the [`ExecutionPlan`] into a [`LimitExec`] if it is a
313/// [`GlobalLimitExec`] or a [`LocalLimitExec`].
314fn extract_limit(plan: &Arc<dyn ExecutionPlan>) -> Option<LimitExec> {
315    if let Some(global_limit) = plan.as_any().downcast_ref::<GlobalLimitExec>() {
316        Some(LimitExec::Global(GlobalLimitExec::new(
317            Arc::clone(global_limit.input()),
318            global_limit.skip(),
319            global_limit.fetch(),
320        )))
321    } else {
322        plan.as_any()
323            .downcast_ref::<LocalLimitExec>()
324            .map(|local_limit| {
325                LimitExec::Local(LocalLimitExec::new(
326                    Arc::clone(local_limit.input()),
327                    local_limit.fetch(),
328                ))
329            })
330    }
331}
332
333/// Checks if the given plan combines input partitions.
334fn combines_input_partitions(plan: &Arc<dyn ExecutionPlan>) -> bool {
335    let plan = plan.as_any();
336    plan.is::<CoalescePartitionsExec>() || plan.is::<SortPreservingMergeExec>()
337}
338
339/// Adds a limit to the plan, chooses between global and local limits based on
340/// skip value and the number of partitions.
341fn add_limit(
342    pushdown_plan: Arc<dyn ExecutionPlan>,
343    skip: usize,
344    fetch: usize,
345) -> Arc<dyn ExecutionPlan> {
346    if skip > 0 || pushdown_plan.output_partitioning().partition_count() == 1 {
347        add_global_limit(pushdown_plan, skip, Some(fetch))
348    } else {
349        Arc::new(LocalLimitExec::new(pushdown_plan, fetch + skip)) as _
350    }
351}
352
353/// Adds a global limit to the plan.
354fn add_global_limit(
355    pushdown_plan: Arc<dyn ExecutionPlan>,
356    skip: usize,
357    fetch: Option<usize>,
358) -> Arc<dyn ExecutionPlan> {
359    Arc::new(GlobalLimitExec::new(pushdown_plan, skip, fetch)) as _
360}
361
362// See tests in datafusion/core/tests/physical_optimizer