datafusion_physical_optimizer/
limit_pushdown.rs1use std::fmt::Debug;
22use std::sync::Arc;
23
24use crate::PhysicalOptimizerRule;
25
26use datafusion_common::config::ConfigOptions;
27use datafusion_common::error::Result;
28use datafusion_common::tree_node::{Transformed, TreeNodeRecursion};
29use datafusion_common::utils::combine_limit;
30use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
31use datafusion_physical_plan::limit::{GlobalLimitExec, LocalLimitExec};
32use datafusion_physical_plan::sorts::sort_preserving_merge::SortPreservingMergeExec;
33use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};
/// Physical optimizer rule that pushes limit requirements (`skip` / `fetch`)
/// from limit operators down towards the inputs of a plan.
///
/// The rule itself holds no configuration or state; the traversal state lives
/// in [`GlobalRequirements`].
#[derive(Debug, Default)]
pub struct LimitPushdown {}
38
/// Traversal state carried through the plan while [`LimitPushdown`] runs.
///
/// It describes the limit that is currently being pushed down and whether
/// that limit has already been applied to the plan.
#[derive(Clone, Debug, Default)]
pub struct GlobalRequirements {
    /// Pending maximum number of rows; `None` means no fetch is pending.
    fetch: Option<usize>,
    /// Pending number of leading rows to skip.
    skip: usize,
    /// `true` once the pending limit has been applied to the plan, in which
    /// case no further limit operator is added for it.
    satisfied: bool,
    /// Whether the most recently absorbed limit operator reported a required
    /// ordering; forwarded to `with_preserve_order` when a fetch is attached
    /// to an operator.
    preserve_order: bool,
}
55
56impl LimitPushdown {
57 #[expect(missing_docs)]
58 pub fn new() -> Self {
59 Self {}
60 }
61}
62
63impl PhysicalOptimizerRule for LimitPushdown {
64 fn optimize(
65 &self,
66 plan: Arc<dyn ExecutionPlan>,
67 _config: &ConfigOptions,
68 ) -> Result<Arc<dyn ExecutionPlan>> {
69 let global_state = GlobalRequirements {
70 fetch: None,
71 skip: 0,
72 satisfied: false,
73 preserve_order: false,
74 };
75 pushdown_limits(plan, global_state)
76 }
77
78 fn name(&self) -> &str {
79 "LimitPushdown"
80 }
81
82 fn schema_check(&self) -> bool {
83 true
84 }
85}
86
87#[derive(Debug)]
90pub enum LimitExec {
91 Global(GlobalLimitExec),
92 Local(LocalLimitExec),
93}
94
95impl LimitExec {
96 fn input(&self) -> &Arc<dyn ExecutionPlan> {
97 match self {
98 Self::Global(global) => global.input(),
99 Self::Local(local) => local.input(),
100 }
101 }
102
103 fn fetch(&self) -> Option<usize> {
104 match self {
105 Self::Global(global) => global.fetch(),
106 Self::Local(local) => Some(local.fetch()),
107 }
108 }
109
110 fn skip(&self) -> usize {
111 match self {
112 Self::Global(global) => global.skip(),
113 Self::Local(_) => 0,
114 }
115 }
116
117 fn preserve_order(&self) -> bool {
118 match self {
119 Self::Global(global) => global.required_ordering().is_some(),
120 Self::Local(local) => local.required_ordering().is_some(),
121 }
122 }
123}
124
125impl From<LimitExec> for Arc<dyn ExecutionPlan> {
126 fn from(limit_exec: LimitExec) -> Self {
127 match limit_exec {
128 LimitExec::Global(global) => Arc::new(global),
129 LimitExec::Local(local) => Arc::new(local),
130 }
131 }
132}
133
134pub fn pushdown_limit_helper(
142 mut pushdown_plan: Arc<dyn ExecutionPlan>,
143 mut global_state: GlobalRequirements,
144) -> Result<(Transformed<Arc<dyn ExecutionPlan>>, GlobalRequirements)> {
145 if let Some(limit_exec) = extract_limit(&pushdown_plan) {
147 let (skip, fetch) = combine_limit(
150 global_state.skip,
151 global_state.fetch,
152 limit_exec.skip(),
153 limit_exec.fetch(),
154 );
155 global_state.skip = skip;
156 global_state.fetch = fetch;
157 global_state.preserve_order = limit_exec.preserve_order();
158 global_state.satisfied = false;
159
160 return Ok((
164 Transformed {
165 data: Arc::clone(limit_exec.input()),
166 transformed: true,
167 tnr: TreeNodeRecursion::Stop,
168 },
169 global_state,
170 ));
171 }
172
173 if pushdown_plan.fetch().is_some() {
176 if global_state.skip == 0 {
177 global_state.satisfied = true;
178 }
179 (global_state.skip, global_state.fetch) = combine_limit(
180 global_state.skip,
181 global_state.fetch,
182 0,
183 pushdown_plan.fetch(),
184 );
185 }
186
187 let Some(global_fetch) = global_state.fetch else {
188 return if global_state.skip > 0 && !global_state.satisfied {
190 global_state.satisfied = true;
192 Ok((
193 Transformed::yes(add_global_limit(
194 pushdown_plan,
195 global_state.skip,
196 None,
197 )),
198 global_state,
199 ))
200 } else {
201 Ok((Transformed::no(pushdown_plan), global_state))
203 };
204 };
205
206 let skip_and_fetch = Some(global_fetch + global_state.skip);
207
208 if pushdown_plan.supports_limit_pushdown() {
209 if !combines_input_partitions(&pushdown_plan) {
210 Ok((Transformed::no(pushdown_plan), global_state))
213 } else if let Some(plan_with_fetch) = pushdown_plan.with_fetch(skip_and_fetch) {
214 let mut new_plan = plan_with_fetch;
218 if global_state.skip > 0 {
221 new_plan =
222 add_global_limit(new_plan, global_state.skip, global_state.fetch);
223 }
224 global_state.fetch = skip_and_fetch;
225 global_state.skip = 0;
226 global_state.satisfied = true;
227 Ok((Transformed::yes(new_plan), global_state))
228 } else if global_state.satisfied {
229 Ok((Transformed::no(pushdown_plan), global_state))
231 } else {
232 global_state.satisfied = true;
233 Ok((
234 Transformed::yes(add_limit(
235 pushdown_plan,
236 global_state.skip,
237 global_fetch,
238 )),
239 global_state,
240 ))
241 }
242 } else {
243 let global_skip = global_state.skip;
249 global_state.fetch = None;
250 global_state.skip = 0;
251
252 let maybe_fetchable = pushdown_plan.with_fetch(skip_and_fetch);
253 if global_state.satisfied {
254 if let Some(plan_with_fetch) = maybe_fetchable {
255 let plan_with_preserve_order = plan_with_fetch
256 .with_preserve_order(global_state.preserve_order)
257 .unwrap_or(plan_with_fetch);
258 Ok((Transformed::yes(plan_with_preserve_order), global_state))
259 } else {
260 Ok((Transformed::no(pushdown_plan), global_state))
261 }
262 } else {
263 global_state.satisfied = true;
264 pushdown_plan = if let Some(plan_with_fetch) = maybe_fetchable {
265 let plan_with_preserve_order = plan_with_fetch
266 .with_preserve_order(global_state.preserve_order)
267 .unwrap_or(plan_with_fetch);
268
269 if global_skip > 0 {
270 add_global_limit(
271 plan_with_preserve_order,
272 global_skip,
273 Some(global_fetch),
274 )
275 } else {
276 plan_with_preserve_order
277 }
278 } else {
279 add_limit(pushdown_plan, global_skip, global_fetch)
280 };
281 Ok((Transformed::yes(pushdown_plan), global_state))
282 }
283 }
284}
285
286pub(crate) fn pushdown_limits(
288 pushdown_plan: Arc<dyn ExecutionPlan>,
289 global_state: GlobalRequirements,
290) -> Result<Arc<dyn ExecutionPlan>> {
291 let (mut new_node, mut global_state) =
294 pushdown_limit_helper(pushdown_plan, global_state)?;
295
296 while new_node.tnr == TreeNodeRecursion::Stop {
298 (new_node, global_state) = pushdown_limit_helper(new_node.data, global_state)?;
299 }
300
301 let children = new_node.data.children();
303 let new_children = children
304 .into_iter()
305 .map(|child| {
306 pushdown_limits(Arc::<dyn ExecutionPlan>::clone(child), global_state.clone())
307 })
308 .collect::<Result<_>>()?;
309 new_node.data.with_new_children(new_children)
310}
311
312fn extract_limit(plan: &Arc<dyn ExecutionPlan>) -> Option<LimitExec> {
315 if let Some(global_limit) = plan.as_any().downcast_ref::<GlobalLimitExec>() {
316 Some(LimitExec::Global(GlobalLimitExec::new(
317 Arc::clone(global_limit.input()),
318 global_limit.skip(),
319 global_limit.fetch(),
320 )))
321 } else {
322 plan.as_any()
323 .downcast_ref::<LocalLimitExec>()
324 .map(|local_limit| {
325 LimitExec::Local(LocalLimitExec::new(
326 Arc::clone(local_limit.input()),
327 local_limit.fetch(),
328 ))
329 })
330 }
331}
332
333fn combines_input_partitions(plan: &Arc<dyn ExecutionPlan>) -> bool {
335 let plan = plan.as_any();
336 plan.is::<CoalescePartitionsExec>() || plan.is::<SortPreservingMergeExec>()
337}
338
339fn add_limit(
342 pushdown_plan: Arc<dyn ExecutionPlan>,
343 skip: usize,
344 fetch: usize,
345) -> Arc<dyn ExecutionPlan> {
346 if skip > 0 || pushdown_plan.output_partitioning().partition_count() == 1 {
347 add_global_limit(pushdown_plan, skip, Some(fetch))
348 } else {
349 Arc::new(LocalLimitExec::new(pushdown_plan, fetch + skip)) as _
350 }
351}
352
353fn add_global_limit(
355 pushdown_plan: Arc<dyn ExecutionPlan>,
356 skip: usize,
357 fetch: Option<usize>,
358) -> Arc<dyn ExecutionPlan> {
359 Arc::new(GlobalLimitExec::new(pushdown_plan, skip, fetch)) as _
360}
361
362