datafusion_optimizer/
eliminate_limit.rs1use crate::optimizer::ApplyOrder;
20use crate::{OptimizerConfig, OptimizerRule};
21use datafusion_common::tree_node::Transformed;
22use datafusion_common::Result;
23use datafusion_expr::logical_plan::{EmptyRelation, FetchType, LogicalPlan, SkipType};
24use std::sync::Arc;
25
26#[derive(Default, Debug)]
34pub struct EliminateLimit;
35
36impl EliminateLimit {
37 #[allow(missing_docs)]
38 pub fn new() -> Self {
39 Self {}
40 }
41}
42
43impl OptimizerRule for EliminateLimit {
44 fn name(&self) -> &str {
45 "eliminate_limit"
46 }
47
48 fn apply_order(&self) -> Option<ApplyOrder> {
49 Some(ApplyOrder::BottomUp)
50 }
51
52 fn supports_rewrite(&self) -> bool {
53 true
54 }
55
56 fn rewrite(
57 &self,
58 plan: LogicalPlan,
59 _config: &dyn OptimizerConfig,
60 ) -> Result<Transformed<LogicalPlan>, datafusion_common::DataFusionError> {
61 match plan {
62 LogicalPlan::Limit(limit) => {
63 let FetchType::Literal(fetch) = limit.get_fetch_type()? else {
65 return Ok(Transformed::no(LogicalPlan::Limit(limit)));
66 };
67
68 if let Some(v) = fetch {
69 if v == 0 {
70 return Ok(Transformed::yes(LogicalPlan::EmptyRelation(
71 EmptyRelation {
72 produce_one_row: false,
73 schema: Arc::clone(limit.input.schema()),
74 },
75 )));
76 }
77 } else if matches!(limit.get_skip_type()?, SkipType::Literal(0)) {
78 #[allow(clippy::used_underscore_binding)]
81 return self.rewrite(Arc::unwrap_or_clone(limit.input), _config);
82 }
83 Ok(Transformed::no(LogicalPlan::Limit(limit)))
84 }
85 _ => Ok(Transformed::no(plan)),
86 }
87 }
88}
89
90#[cfg(test)]
91mod tests {
92 use super::*;
93 use crate::test::*;
94 use crate::OptimizerContext;
95 use datafusion_common::Column;
96 use datafusion_expr::{
97 col,
98 logical_plan::{builder::LogicalPlanBuilder, JoinType},
99 };
100 use std::sync::Arc;
101
102 use crate::assert_optimized_plan_eq_snapshot;
103 use crate::push_down_limit::PushDownLimit;
104 use datafusion_expr::test::function_stub::sum;
105
106 macro_rules! assert_optimized_plan_equal {
107 (
108 $plan:expr,
109 @ $expected:literal $(,)?
110 ) => {{
111 let rules: Vec<Arc<dyn crate::OptimizerRule + Send + Sync>> = vec![Arc::new(EliminateLimit::new())];
112 let optimizer_ctx = OptimizerContext::new();
113 assert_optimized_plan_eq_snapshot!(
114 optimizer_ctx,
115 rules,
116 $plan,
117 @ $expected,
118 )
119 }};
120 }
121
122 macro_rules! assert_optimized_plan_eq_with_pushdown {
123 (
124 $plan:expr,
125 @ $expected:literal $(,)?
126 ) => {{
127 let optimizer_ctx = OptimizerContext::new().with_max_passes(1);
128 let rules: Vec<Arc<dyn crate::OptimizerRule + Send + Sync>> = vec![
129 Arc::new(PushDownLimit::new()),
130 Arc::new(EliminateLimit::new())
131 ];
132 assert_optimized_plan_eq_snapshot!(
133 optimizer_ctx,
134 rules,
135 $plan,
136 @ $expected,
137 )
138 }};
139 }
140
141 #[test]
142 fn limit_0_root() -> Result<()> {
143 let table_scan = test_table_scan().unwrap();
144 let plan = LogicalPlanBuilder::from(table_scan)
145 .aggregate(vec![col("a")], vec![sum(col("b"))])?
146 .limit(0, Some(0))?
147 .build()?;
148 assert_optimized_plan_equal!(
150 plan,
151 @ "EmptyRelation: rows=0"
152 )
153 }
154
155 #[test]
156 fn limit_0_nested() -> Result<()> {
157 let table_scan = test_table_scan()?;
158 let plan1 = LogicalPlanBuilder::from(table_scan.clone())
159 .aggregate(vec![col("a")], vec![sum(col("b"))])?
160 .build()?;
161 let plan = LogicalPlanBuilder::from(table_scan)
162 .aggregate(vec![col("a")], vec![sum(col("b"))])?
163 .limit(0, Some(0))?
164 .union(plan1)?
165 .build()?;
166
167 assert_optimized_plan_equal!(
169 plan,
170 @ r"
171 Union
172 EmptyRelation: rows=0
173 Aggregate: groupBy=[[test.a]], aggr=[[sum(test.b)]]
174 TableScan: test
175 "
176 )
177 }
178
179 #[test]
180 fn limit_fetch_with_ancestor_limit_skip() -> Result<()> {
181 let table_scan = test_table_scan()?;
182 let plan = LogicalPlanBuilder::from(table_scan)
183 .aggregate(vec![col("a")], vec![sum(col("b"))])?
184 .limit(0, Some(2))?
185 .limit(2, None)?
186 .build()?;
187
188 assert_optimized_plan_eq_with_pushdown!(
190 plan,
191 @ "EmptyRelation: rows=0"
192 )
193 }
194
195 #[test]
196 fn multi_limit_offset_sort_eliminate() -> Result<()> {
197 let table_scan = test_table_scan()?;
198 let plan = LogicalPlanBuilder::from(table_scan)
199 .aggregate(vec![col("a")], vec![sum(col("b"))])?
200 .limit(0, Some(2))?
201 .sort_by(vec![col("a")])?
202 .limit(2, Some(1))?
203 .build()?;
204
205 assert_optimized_plan_eq_with_pushdown!(
208 plan,
209 @ r"
210 Limit: skip=2, fetch=1
211 Sort: test.a ASC NULLS LAST, fetch=3
212 Limit: skip=0, fetch=2
213 Aggregate: groupBy=[[test.a]], aggr=[[sum(test.b)]]
214 TableScan: test
215 "
216 )
217 }
218
219 #[test]
220 fn limit_fetch_with_ancestor_limit_fetch() -> Result<()> {
221 let table_scan = test_table_scan()?;
222 let plan = LogicalPlanBuilder::from(table_scan)
223 .aggregate(vec![col("a")], vec![sum(col("b"))])?
224 .limit(0, Some(2))?
225 .sort_by(vec![col("a")])?
226 .limit(0, Some(1))?
227 .build()?;
228
229 assert_optimized_plan_equal!(
230 plan,
231 @ r"
232 Limit: skip=0, fetch=1
233 Sort: test.a ASC NULLS LAST
234 Limit: skip=0, fetch=2
235 Aggregate: groupBy=[[test.a]], aggr=[[sum(test.b)]]
236 TableScan: test
237 "
238 )
239 }
240
241 #[test]
242 fn limit_with_ancestor_limit() -> Result<()> {
243 let table_scan = test_table_scan().unwrap();
244 let plan = LogicalPlanBuilder::from(table_scan)
245 .aggregate(vec![col("a")], vec![sum(col("b"))])?
246 .limit(2, Some(1))?
247 .sort_by(vec![col("a")])?
248 .limit(3, Some(1))?
249 .build()?;
250
251 assert_optimized_plan_equal!(
252 plan,
253 @ r"
254 Limit: skip=3, fetch=1
255 Sort: test.a ASC NULLS LAST
256 Limit: skip=2, fetch=1
257 Aggregate: groupBy=[[test.a]], aggr=[[sum(test.b)]]
258 TableScan: test
259 "
260 )
261 }
262
263 #[test]
264 fn limit_join_with_ancestor_limit() -> Result<()> {
265 let table_scan = test_table_scan()?;
266 let table_scan_inner = test_table_scan_with_name("test1")?;
267 let plan = LogicalPlanBuilder::from(table_scan)
268 .limit(2, Some(1))?
269 .join_using(
270 table_scan_inner,
271 JoinType::Inner,
272 vec![Column::from_name("a".to_string())],
273 )?
274 .limit(3, Some(1))?
275 .build()?;
276
277 assert_optimized_plan_equal!(
278 plan,
279 @ r"
280 Limit: skip=3, fetch=1
281 Inner Join: Using test.a = test1.a
282 Limit: skip=2, fetch=1
283 TableScan: test
284 TableScan: test1
285 "
286 )
287 }
288
289 #[test]
290 fn remove_zero_offset() -> Result<()> {
291 let table_scan = test_table_scan()?;
292 let plan = LogicalPlanBuilder::from(table_scan)
293 .aggregate(vec![col("a")], vec![sum(col("b"))])?
294 .limit(0, None)?
295 .build()?;
296
297 assert_optimized_plan_equal!(
298 plan,
299 @ r"
300 Aggregate: groupBy=[[test.a]], aggr=[[sum(test.b)]]
301 TableScan: test
302 "
303 )
304 }
305}