datafusion_physical_optimizer/limit_pushdown.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`LimitPushdown`] pushes `LIMIT` down through `ExecutionPlan`s to reduce
19//! data transfer as much as possible.
20//!
21//! # Plan Limit Absorption
22//! In addition to pushing down `GlobalLimitExec` and `LocalLimitExec` nodes in
23//! the plan, some operators can "absorb" a limit and stop early during
24//! execution.
25//!
26//! ## Background: vectorized volcano execution model
27//! DataFusion uses a batched volcano model. For most operators, output is
28//! produced in batches of `datafusion.execution.batch_size` (default 8192), so
29//! the batch sizes typically look like:
30//! ```text
31//! 8192, 8192, ..., 8192, 100 (the final batch may be partial)
32//! ```
33//!
34//! ## Example
35//! For a join with an expensive, selective predicate:
36//! ```text
37//! GlobalLimitExec: skip=0, fetch=10
38//! -- NestedLoopJoinExec(on=expr_expensive_and_selective)
39//! --- DataSourceExec()
40//! --- DataSourceExec()
41//! ```
42//!
43//! Under this model, `NestedLoopJoinExec` would keep working until it can emit
44//! a full batch (8192 rows), even though the query only needs 10. If the limit
45//! cannot be pushed below the join, we can still embed it inside the join so it
46//! stops once the limit is satisfied. The transformed plan looks like:
47//!
48//! ```text
49//! NestedLoopJoinExec(on=expr_expensive_and_selective, fetch=10)
50//! --- DataSourceExec()
51//! --- DataSourceExec()
52//! ```
53//!
54//! ## Implementation
55//! The current optimizer rule optionally pushes `fetch` requirements into
56//! operators via [`ExecutionPlan::with_fetch`].
57//!
58//! To support early termination in operators, [`LimitedBatchCoalescer`](https://docs.rs/datafusion/latest/datafusion/physical_plan/coalesce/struct.LimitedBatchCoalescer.html)
59//! can help manage the output buffer.
60//!
61//! Reference implementation in Hash Join: <https://github.com/apache/datafusion/pull/20228>
62
63use std::fmt::Debug;
64use std::sync::Arc;
65
66use crate::PhysicalOptimizerRule;
67
68use datafusion_common::config::ConfigOptions;
69use datafusion_common::error::Result;
70use datafusion_common::stats::Precision;
71use datafusion_common::tree_node::{Transformed, TreeNodeRecursion};
72use datafusion_common::utils::combine_limit;
73use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
74use datafusion_physical_plan::empty::EmptyExec;
75use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
76use datafusion_physical_plan::placeholder_row::PlaceholderRowExec;
77use datafusion_physical_plan::projection::ProjectionExec;
78use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
79use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
/// Physical optimizer rule that walks an [`ExecutionPlan`] tree and hands the
/// row limit (`fetch`) requested by a parent operator down to its children
/// wherever that is applicable.
#[derive(Debug, Default)]
pub struct LimitPushdown {}
84
/// Traversal state ("data class") used by the [`LimitPushdown`] rule while it
/// pushes limits down a plan.
///
/// The state is held rule-wide and carries the pending `fetch` and `skip`
/// information. It also records, via `satisfied`, whether the "current" plan
/// is already valid with respect to those limits.
///
/// For example: if the plan is satisfied with the current fetch info, the rule
/// decides not to add another limit node.
///
/// [`LimitPushdown`]: crate::limit_pushdown::LimitPushdown
#[derive(Clone, Debug, Default)]
pub struct GlobalRequirements {
    /// Maximum number of rows still required, or `None` when no row limit is
    /// pending.
    fetch: Option<usize>,
    /// Number of leading rows that still have to be skipped (`OFFSET`).
    skip: usize,
    /// `true` once the pending limit is already enforced for the current
    /// plan, so no additional limit node has to be inserted.
    satisfied: bool,
    /// Taken from the most recently removed limit node: whether that node
    /// declared a required ordering. Forwarded to `with_preserve_order` when a
    /// fetch is embedded into an operator.
    preserve_order: bool,
}
100
101impl LimitPushdown {
102 #[expect(missing_docs)]
103 pub fn new() -> Self {
104 Self {}
105 }
106}
107
108impl PhysicalOptimizerRule for LimitPushdown {
109 fn optimize(
110 &self,
111 plan: Arc<dyn ExecutionPlan>,
112 _config: &ConfigOptions,
113 ) -> Result<Arc<dyn ExecutionPlan>> {
114 let global_state = GlobalRequirements {
115 fetch: None,
116 skip: 0,
117 satisfied: false,
118 preserve_order: false,
119 };
120 pushdown_limits(plan, global_state)
121 }
122
123 fn name(&self) -> &str {
124 "LimitPushdown"
125 }
126
127 fn schema_check(&self) -> bool {
128 true
129 }
130}
131
132struct LimitInfo {
133 input: Arc<dyn ExecutionPlan>,
134 fetch: Option<usize>,
135 skip: usize,
136 preserve_order: bool,
137}
138
139/// This function is the main helper function of the `LimitPushDown` rule.
140/// The helper takes an `ExecutionPlan` and a global (algorithm) state which is
141/// an instance of `GlobalRequirements` and modifies these parameters while
142/// checking if the limits can be pushed down or not.
143///
144/// If a limit is encountered, a [`TreeNodeRecursion::Stop`] is returned. Otherwise,
145/// return a [`TreeNodeRecursion::Continue`].
146pub fn pushdown_limit_helper(
147 mut pushdown_plan: Arc<dyn ExecutionPlan>,
148 mut global_state: GlobalRequirements,
149) -> Result<(Transformed<Arc<dyn ExecutionPlan>>, GlobalRequirements)> {
150 // Extract limit, if exist, and return child inputs.
151 if let Some(limit_info) = extract_limit(&pushdown_plan) {
152 // If we have fetch/skip info in the global state already, we need to
153 // decide which one to continue with:
154 let (skip, fetch) = combine_limit(
155 global_state.skip,
156 global_state.fetch,
157 limit_info.skip,
158 limit_info.fetch,
159 );
160 global_state.skip = skip;
161 global_state.fetch = fetch;
162 global_state.preserve_order = limit_info.preserve_order;
163 global_state.satisfied = false;
164
165 if let Some(fetch) = fetch
166 && limit_satisfied_by_input(&limit_info.input, skip, fetch)?
167 {
168 // The input already produces at most `fetch` rows, so no new limit
169 // node is needed. Mark satisfied so downstream won't re-add one,
170 // but preserve skip/fetch so any nested limit nodes (e.g. an inner
171 // GlobalLimitExec) can still be merged with the outer constraint.
172 global_state.satisfied = true;
173
174 return Ok((
175 Transformed {
176 data: limit_info.input,
177 transformed: true,
178 tnr: TreeNodeRecursion::Stop,
179 },
180 global_state,
181 ));
182 }
183
184 // Now the global state has the most recent information, we can remove
185 // the limit node. We will decide later if we should add it again or
186 // not.
187 return Ok((
188 Transformed {
189 data: limit_info.input,
190 transformed: true,
191 tnr: TreeNodeRecursion::Stop,
192 },
193 global_state,
194 ));
195 }
196
197 // If we have a non-limit operator with fetch capability, update global
198 // state as necessary:
199 if pushdown_plan.fetch().is_some() {
200 if global_state.skip == 0 {
201 global_state.satisfied = true;
202 }
203 (global_state.skip, global_state.fetch) = combine_limit(
204 global_state.skip,
205 global_state.fetch,
206 0,
207 pushdown_plan.fetch(),
208 );
209 }
210
211 let Some(global_fetch) = global_state.fetch else {
212 // There's no valid fetch information, exit early:
213 return if global_state.skip > 0 && !global_state.satisfied {
214 // There might be a case with only offset, if so add a global limit:
215 global_state.satisfied = true;
216 Ok((
217 Transformed::yes(add_global_limit(
218 pushdown_plan,
219 global_state.skip,
220 None,
221 )),
222 global_state,
223 ))
224 } else {
225 // There's no info on offset or fetch, nothing to do:
226 Ok((Transformed::no(pushdown_plan), global_state))
227 };
228 };
229
230 let skip_and_fetch = Some(global_fetch + global_state.skip);
231
232 if pushdown_plan.supports_limit_pushdown() {
233 if !combines_input_partitions(&pushdown_plan) {
234 // We have information in the global state and the plan pushes down,
235 // continue:
236 Ok((Transformed::no(pushdown_plan), global_state))
237 } else if let Some(plan_with_fetch) = pushdown_plan.with_fetch(skip_and_fetch) {
238 // This plan is combining input partitions, so we need to add the
239 // fetch info to plan if possible. If not, we must add a limit node
240 // with the information from the global state.
241 let mut new_plan = plan_with_fetch;
242 // Execution plans can't (yet) handle skip, so if we have one,
243 // we still need to add a global limit
244 if global_state.skip > 0 {
245 new_plan =
246 add_global_limit(new_plan, global_state.skip, global_state.fetch);
247 }
248 global_state.fetch = skip_and_fetch;
249 global_state.skip = 0;
250 global_state.satisfied = true;
251 Ok((Transformed::yes(new_plan), global_state))
252 } else if global_state.satisfied {
253 // If the plan is already satisfied, do not add a limit:
254 Ok((Transformed::no(pushdown_plan), global_state))
255 } else {
256 global_state.satisfied = true;
257 Ok((
258 Transformed::yes(add_limit(
259 pushdown_plan,
260 global_state.skip,
261 global_fetch,
262 )),
263 global_state,
264 ))
265 }
266 } else {
267 // The plan does not support push down and it is not a limit. We will need
268 // to add a limit or a fetch. If the plan is already satisfied, we will try
269 // to add the fetch info and return the plan.
270
271 // There's no push down, change fetch & skip to default values:
272 let global_skip = global_state.skip;
273 global_state.fetch = None;
274 global_state.skip = 0;
275
276 let maybe_fetchable = pushdown_plan.with_fetch(skip_and_fetch);
277 if global_state.satisfied {
278 if let Some(plan_with_fetch) = maybe_fetchable {
279 let plan_with_preserve_order = plan_with_fetch
280 .with_preserve_order(global_state.preserve_order)
281 .unwrap_or(plan_with_fetch);
282 Ok((Transformed::yes(plan_with_preserve_order), global_state))
283 } else {
284 Ok((Transformed::no(pushdown_plan), global_state))
285 }
286 } else {
287 global_state.satisfied = true;
288 pushdown_plan = if let Some(plan_with_fetch) = maybe_fetchable {
289 let plan_with_preserve_order = plan_with_fetch
290 .with_preserve_order(global_state.preserve_order)
291 .unwrap_or(plan_with_fetch);
292
293 if global_skip > 0 {
294 add_global_limit(
295 plan_with_preserve_order,
296 global_skip,
297 Some(global_fetch),
298 )
299 } else {
300 plan_with_preserve_order
301 }
302 } else {
303 add_limit(pushdown_plan, global_skip, global_fetch)
304 };
305 Ok((Transformed::yes(pushdown_plan), global_state))
306 }
307 }
308}
309
310/// Returns true if exact input statistics prove that applying the limit would
311/// not remove any rows.
312fn limit_satisfied_by_input(
313 plan: &Arc<dyn ExecutionPlan>,
314 skip: usize,
315 fetch: usize,
316) -> Result<bool> {
317 if skip > 0 {
318 return Ok(false);
319 }
320
321 if plan.output_partitioning().partition_count() != 1 {
322 return Ok(false);
323 }
324
325 let Some(num_rows) = limit_eliminable_exact_num_rows(plan)? else {
326 return Ok(false);
327 };
328
329 Ok(num_rows <= fetch)
330}
331
332/// Returns exact row counts only from a conservative whitelist of operators
333/// whose row-count guarantees are strong enough to remove a limit.
334fn limit_eliminable_exact_num_rows(
335 plan: &Arc<dyn ExecutionPlan>,
336) -> Result<Option<usize>> {
337 // Unwrap any wrapping ProjectionExec layers; projections preserve row count
338 // but may derive statistics in ways that are not trustworthy, so we peek
339 // through them to the underlying producer.
340 let mut current = plan;
341 while let Some(projection) = current.downcast_ref::<ProjectionExec>() {
342 current = projection.input();
343 }
344
345 if current.is::<EmptyExec>() {
346 return Ok(Some(0));
347 }
348
349 if current.is::<PlaceholderRowExec>() {
350 return Ok(Some(1));
351 }
352
353 if matches!(
354 current.partition_statistics(None)?.num_rows,
355 Precision::Exact(0)
356 ) {
357 return Ok(Some(0));
358 }
359
360 Ok(None)
361}
362
363/// Pushes down the limit through the plan.
364pub(crate) fn pushdown_limits(
365 pushdown_plan: Arc<dyn ExecutionPlan>,
366 global_state: GlobalRequirements,
367) -> Result<Arc<dyn ExecutionPlan>> {
368 // Call pushdown_limit_helper.
369 // This will either extract the limit node (returning the child), or apply the limit pushdown.
370 let (mut new_node, mut global_state) =
371 pushdown_limit_helper(pushdown_plan, global_state)?;
372
373 // While limits exist, continue combining the global_state.
374 while new_node.tnr == TreeNodeRecursion::Stop {
375 (new_node, global_state) = pushdown_limit_helper(new_node.data, global_state)?;
376 }
377
378 // Once a limit has been materialized above the current node, child
379 // subtrees should not inherit its `skip`. Keep `fetch`, but clear
380 // `skip` before recursing so child-local limits are not merged with
381 // an `OFFSET` that has already been applied.
382 if global_state.satisfied {
383 global_state.skip = 0;
384 }
385
386 // Apply pushdown limits in children
387 let children = new_node.data.children();
388 let mut changed = false;
389 let new_children = children
390 .into_iter()
391 .map(|child: &Arc<dyn ExecutionPlan>| {
392 let new_child = pushdown_limits(
393 Arc::<dyn ExecutionPlan>::clone(child),
394 global_state.clone(),
395 )?;
396 // Tracking if any of the children changed
397 changed |= !Arc::ptr_eq(child, &new_child);
398 Ok(new_child)
399 })
400 .collect::<Result<_>>()?;
401
402 if changed {
403 new_node.data.with_new_children(new_children)
404 } else {
405 Ok(new_node.data)
406 }
407}
408
409/// Extracts limit information from the [`ExecutionPlan`] if it is a
410/// [`GlobalLimitExec`] or a [`LocalLimitExec`].
411fn extract_limit(plan: &Arc<dyn ExecutionPlan>) -> Option<LimitInfo> {
412 if let Some(global_limit) = plan.downcast_ref::<GlobalLimitExec>() {
413 Some(LimitInfo {
414 input: Arc::clone(global_limit.input()),
415 fetch: global_limit.fetch(),
416 skip: global_limit.skip(),
417 preserve_order: global_limit.required_ordering().is_some(),
418 })
419 } else {
420 plan.downcast_ref::<LocalLimitExec>()
421 .map(|local_limit| LimitInfo {
422 input: Arc::clone(local_limit.input()),
423 fetch: Some(local_limit.fetch()),
424 skip: 0,
425 preserve_order: local_limit.required_ordering().is_some(),
426 })
427 }
428}
429
430/// Checks if the given plan combines input partitions.
431fn combines_input_partitions(plan: &Arc<dyn ExecutionPlan>) -> bool {
432 plan.is::<CoalescePartitionsExec>() || plan.is::<SortPreservingMergeExec>()
433}
434
435/// Adds a limit to the plan, chooses between global and local limits based on
436/// skip value and the number of partitions.
437fn add_limit(
438 pushdown_plan: Arc<dyn ExecutionPlan>,
439 skip: usize,
440 fetch: usize,
441) -> Arc<dyn ExecutionPlan> {
442 if skip > 0 || pushdown_plan.output_partitioning().partition_count() == 1 {
443 add_global_limit(pushdown_plan, skip, Some(fetch))
444 } else {
445 Arc::new(LocalLimitExec::new(pushdown_plan, fetch + skip)) as _
446 }
447}
448
449/// Adds a global limit to the plan.
450fn add_global_limit(
451 pushdown_plan: Arc<dyn ExecutionPlan>,
452 skip: usize,
453 fetch: Option<usize>,
454) -> Arc<dyn ExecutionPlan> {
455 Arc::new(GlobalLimitExec::new(pushdown_plan, skip, fetch)) as _
456}
457
458// See tests in datafusion/core/tests/physical_optimizer