datafusion_optimizer/
eliminate_limit.rs1use crate::optimizer::ApplyOrder;
20use crate::{OptimizerConfig, OptimizerRule};
21use datafusion_common::Result;
22use datafusion_common::tree_node::Transformed;
23use datafusion_expr::logical_plan::{EmptyRelation, FetchType, LogicalPlan, SkipType};
24use std::sync::Arc;
25
/// Optimizer rule that simplifies `Limit` nodes whose outcome is known at plan time.
///
/// Two cases are handled by this rule's `rewrite` implementation:
/// - a `Limit` with a literal `fetch` of 0 is replaced by an `EmptyRelation`
///   that produces no rows and keeps the input's schema;
/// - a `Limit` with no `fetch` and a literal `skip` of 0 is removed in favor of
///   its input.
#[derive(Default, Debug)]
pub struct EliminateLimit;

impl EliminateLimit {
    /// Creates a new `EliminateLimit` rule.
    ///
    /// The rule carries no state, so this is equivalent to `EliminateLimit::default()`.
    pub fn new() -> Self {
        Self
    }
}
42
43impl OptimizerRule for EliminateLimit {
44 fn name(&self) -> &str {
45 "eliminate_limit"
46 }
47
48 fn apply_order(&self) -> Option<ApplyOrder> {
49 Some(ApplyOrder::BottomUp)
50 }
51
52 fn supports_rewrite(&self) -> bool {
53 true
54 }
55
56 fn rewrite(
57 &self,
58 plan: LogicalPlan,
59 _config: &dyn OptimizerConfig,
60 ) -> Result<Transformed<LogicalPlan>, datafusion_common::DataFusionError> {
61 match plan {
62 LogicalPlan::Limit(limit) => {
63 let FetchType::Literal(fetch) = limit.get_fetch_type()? else {
65 return Ok(Transformed::no(LogicalPlan::Limit(limit)));
66 };
67
68 if let Some(v) = fetch {
69 if v == 0 {
70 return Ok(Transformed::yes(LogicalPlan::EmptyRelation(
71 EmptyRelation {
72 produce_one_row: false,
73 schema: Arc::clone(limit.input.schema()),
74 },
75 )));
76 }
77 } else if matches!(limit.get_skip_type()?, SkipType::Literal(0)) {
78 #[expect(clippy::used_underscore_binding)]
81 return self.rewrite(Arc::unwrap_or_clone(limit.input), _config);
82 }
83 Ok(Transformed::no(LogicalPlan::Limit(limit)))
84 }
85 _ => Ok(Transformed::no(plan)),
86 }
87 }
88}
89
#[cfg(test)]
mod tests {
    use super::*;
    use crate::OptimizerContext;
    use crate::test::*;
    use datafusion_common::Column;
    use datafusion_expr::{
        col,
        logical_plan::{JoinType, builder::LogicalPlanBuilder},
    };

    use crate::assert_optimized_plan_eq_snapshot;
    use crate::push_down_limit::PushDownLimit;
    use datafusion_expr::test::function_stub::sum;

    // Optimizes `$plan` with `EliminateLimit` as the only rule and compares the
    // result against the inline snapshot `$expected`.
    macro_rules! assert_optimized_plan_equal {
        (
            $plan:expr,
            @ $expected:literal $(,)?
        ) => {{
            let rules: Vec<Arc<dyn crate::OptimizerRule + Send + Sync>> = vec![Arc::new(EliminateLimit::new())];
            let optimizer_ctx = OptimizerContext::new();
            assert_optimized_plan_eq_snapshot!(
                optimizer_ctx,
                rules,
                $plan,
                @ $expected,
            )
        }};
    }

    // Same as `assert_optimized_plan_equal!`, but runs `PushDownLimit` before
    // `EliminateLimit` and restricts the optimizer to a single pass.
    macro_rules! assert_optimized_plan_eq_with_pushdown {
        (
            $plan:expr,
            @ $expected:literal $(,)?
        ) => {{
            let optimizer_ctx = OptimizerContext::new().with_max_passes(1);
            let rules: Vec<Arc<dyn crate::OptimizerRule + Send + Sync>> = vec![
                Arc::new(PushDownLimit::new()),
                Arc::new(EliminateLimit::new())
            ];
            assert_optimized_plan_eq_snapshot!(
                optimizer_ctx,
                rules,
                $plan,
                @ $expected,
            )
        }};
    }

    #[test]
    fn limit_0_root() -> Result<()> {
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(table_scan)
            .aggregate(vec![col("a")], vec![sum(col("b"))])?
            .limit(0, Some(0))?
            .build()?;
        // A root `fetch = 0` limit collapses the whole plan into an empty relation.
        assert_optimized_plan_equal!(
            plan,
            @ "EmptyRelation: rows=0"
        )
    }

    #[test]
    fn limit_0_nested() -> Result<()> {
        let table_scan = test_table_scan()?;
        let plan1 = LogicalPlanBuilder::from(table_scan.clone())
            .aggregate(vec![col("a")], vec![sum(col("b"))])?
            .build()?;
        let plan = LogicalPlanBuilder::from(table_scan)
            .aggregate(vec![col("a")], vec![sum(col("b"))])?
            .limit(0, Some(0))?
            .union(plan1)?
            .build()?;

        // Only the union branch that carried the `fetch = 0` limit is replaced.
        assert_optimized_plan_equal!(
            plan,
            @ r"
        Union
          EmptyRelation: rows=0
          Aggregate: groupBy=[[test.a]], aggr=[[sum(test.b)]]
            TableScan: test
        "
        )
    }

    #[test]
    fn limit_fetch_with_ancestor_limit_skip() -> Result<()> {
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(table_scan)
            .aggregate(vec![col("a")], vec![sum(col("b"))])?
            .limit(0, Some(2))?
            .limit(2, None)?
            .build()?;

        // The outer skip discards every row the inner fetch lets through, so the
        // expected plan is empty.
        assert_optimized_plan_eq_with_pushdown!(
            plan,
            @ "EmptyRelation: rows=0"
        )
    }

    #[test]
    fn multi_limit_offset_sort_eliminate() -> Result<()> {
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(table_scan)
            .aggregate(vec![col("a")], vec![sum(col("b"))])?
            .limit(0, Some(2))?
            .sort_by(vec![col("a")])?
            .limit(2, Some(1))?
            .build()?;

        // Neither limit is removed here; the snapshot additionally records a
        // `fetch` on the `Sort` node.
        assert_optimized_plan_eq_with_pushdown!(
            plan,
            @ r"
        Limit: skip=2, fetch=1
          Sort: test.a ASC NULLS LAST, fetch=3
            Limit: skip=0, fetch=2
              Aggregate: groupBy=[[test.a]], aggr=[[sum(test.b)]]
                TableScan: test
        "
        )
    }

    #[test]
    fn limit_fetch_with_ancestor_limit_fetch() -> Result<()> {
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(table_scan)
            .aggregate(vec![col("a")], vec![sum(col("b"))])?
            .limit(0, Some(2))?
            .sort_by(vec![col("a")])?
            .limit(0, Some(1))?
            .build()?;

        // Both limits have a non-zero `fetch`, so the plan is left as built.
        assert_optimized_plan_equal!(
            plan,
            @ r"
        Limit: skip=0, fetch=1
          Sort: test.a ASC NULLS LAST
            Limit: skip=0, fetch=2
              Aggregate: groupBy=[[test.a]], aggr=[[sum(test.b)]]
                TableScan: test
        "
        )
    }

    #[test]
    fn limit_with_ancestor_limit() -> Result<()> {
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(table_scan)
            .aggregate(vec![col("a")], vec![sum(col("b"))])?
            .limit(2, Some(1))?
            .sort_by(vec![col("a")])?
            .limit(3, Some(1))?
            .build()?;

        // Non-zero `skip` and `fetch` on both limits: nothing to eliminate.
        assert_optimized_plan_equal!(
            plan,
            @ r"
        Limit: skip=3, fetch=1
          Sort: test.a ASC NULLS LAST
            Limit: skip=2, fetch=1
              Aggregate: groupBy=[[test.a]], aggr=[[sum(test.b)]]
                TableScan: test
        "
        )
    }

    #[test]
    fn limit_join_with_ancestor_limit() -> Result<()> {
        let table_scan = test_table_scan()?;
        let table_scan_inner = test_table_scan_with_name("test1")?;
        let plan = LogicalPlanBuilder::from(table_scan)
            .limit(2, Some(1))?
            .join_using(
                table_scan_inner,
                JoinType::Inner,
                vec![Column::from_name("a".to_string())],
            )?
            .limit(3, Some(1))?
            .build()?;

        // Limits above and below the join both have non-zero `fetch` and stay in place.
        assert_optimized_plan_equal!(
            plan,
            @ r"
        Limit: skip=3, fetch=1
          Inner Join: Using test.a = test1.a
            Limit: skip=2, fetch=1
              TableScan: test
            TableScan: test1
        "
        )
    }

    #[test]
    fn remove_zero_offset() -> Result<()> {
        let table_scan = test_table_scan()?;
        let plan = LogicalPlanBuilder::from(table_scan)
            .aggregate(vec![col("a")], vec![sum(col("b"))])?
            .limit(0, None)?
            .build()?;

        // `skip = 0` without a `fetch` is a no-op limit and is removed.
        assert_optimized_plan_equal!(
            plan,
            @ r"
        Aggregate: groupBy=[[test.a]], aggr=[[sum(test.b)]]
          TableScan: test
        "
        )
    }
}