1use crate::{expr::Sort, lit};
27use arrow::datatypes::DataType;
28use std::fmt::{self, Formatter};
29use std::hash::Hash;
30
31use datafusion_common::{plan_err, sql_err, DataFusionError, Result, ScalarValue};
32use sqlparser::ast::{self, ValueWithSpan};
33use sqlparser::parser::ParserError::ParserError;
34
/// A window frame: the unit it is measured in plus its start and end bounds.
///
/// `Display` renders it as `<units> BETWEEN <start_bound> AND <end_bound>`.
#[derive(Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct WindowFrame {
    /// Unit the bounds are expressed in (`ROWS`, `RANGE` or `GROUPS`).
    pub units: WindowFrameUnits,
    /// Bound at which the frame starts.
    pub start_bound: WindowFrameBound,
    /// Bound at which the frame ends.
    pub end_bound: WindowFrameBound,
    /// Causality flag, read through `is_causal()`.
    ///
    /// It is private and set by the constructors: `new_bounds` derives it from
    /// `units` and `end_bound` (for `ROWS`, the end bound must not be a
    /// non-zero or unbounded `FOLLOWING`; for `RANGE` / `GROUPS`, the end bound
    /// must be an unbounded or strictly positive `PRECEDING`).
    causal: bool,
}
95
96impl fmt::Display for WindowFrame {
97 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
98 write!(
99 f,
100 "{} BETWEEN {} AND {}",
101 self.units, self.start_bound, self.end_bound
102 )?;
103 Ok(())
104 }
105}
106
107impl fmt::Debug for WindowFrame {
108 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
109 write!(
110 f,
111 "WindowFrame {{ units: {:?}, start_bound: {:?}, end_bound: {:?}, is_causal: {:?} }}",
112 self.units, self.start_bound, self.end_bound, self.causal
113 )?;
114 Ok(())
115 }
116}
117
118impl TryFrom<ast::WindowFrame> for WindowFrame {
119 type Error = DataFusionError;
120
121 fn try_from(value: ast::WindowFrame) -> Result<Self> {
122 let start_bound = WindowFrameBound::try_parse(value.start_bound, &value.units)?;
123 let end_bound = match value.end_bound {
124 Some(bound) => WindowFrameBound::try_parse(bound, &value.units)?,
125 None => WindowFrameBound::CurrentRow,
126 };
127
128 if let WindowFrameBound::Following(val) = &start_bound {
129 if val.is_null() {
130 plan_err!(
131 "Invalid window frame: start bound cannot be UNBOUNDED FOLLOWING"
132 )?
133 }
134 } else if let WindowFrameBound::Preceding(val) = &end_bound {
135 if val.is_null() {
136 plan_err!(
137 "Invalid window frame: end bound cannot be UNBOUNDED PRECEDING"
138 )?
139 }
140 };
141
142 let units = value.units.into();
143 Ok(Self::new_bounds(units, start_bound, end_bound))
144 }
145}
146
147impl WindowFrame {
148 pub fn new(order_by: Option<bool>) -> Self {
152 if let Some(strict) = order_by {
153 Self {
158 units: if strict {
159 WindowFrameUnits::Rows
160 } else {
161 WindowFrameUnits::Range
162 },
163 start_bound: WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
164 end_bound: WindowFrameBound::CurrentRow,
165 causal: strict,
166 }
167 } else {
168 Self {
172 units: WindowFrameUnits::Rows,
173 start_bound: WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
174 end_bound: WindowFrameBound::Following(ScalarValue::UInt64(None)),
175 causal: false,
176 }
177 }
178 }
179
180 pub fn reverse(&self) -> Self {
184 let start_bound = match &self.end_bound {
185 WindowFrameBound::Preceding(value) => {
186 WindowFrameBound::Following(value.clone())
187 }
188 WindowFrameBound::Following(value) => {
189 WindowFrameBound::Preceding(value.clone())
190 }
191 WindowFrameBound::CurrentRow => WindowFrameBound::CurrentRow,
192 };
193 let end_bound = match &self.start_bound {
194 WindowFrameBound::Preceding(value) => {
195 WindowFrameBound::Following(value.clone())
196 }
197 WindowFrameBound::Following(value) => {
198 WindowFrameBound::Preceding(value.clone())
199 }
200 WindowFrameBound::CurrentRow => WindowFrameBound::CurrentRow,
201 };
202 Self::new_bounds(self.units, start_bound, end_bound)
203 }
204
205 pub fn is_causal(&self) -> bool {
207 self.causal
208 }
209
210 pub fn new_bounds(
212 units: WindowFrameUnits,
213 start_bound: WindowFrameBound,
214 end_bound: WindowFrameBound,
215 ) -> Self {
216 let causal = match units {
217 WindowFrameUnits::Rows => match &end_bound {
218 WindowFrameBound::Following(value) => {
219 if value.is_null() {
220 false
222 } else {
223 let zero = ScalarValue::new_zero(&value.data_type());
224 zero.map(|zero| value.eq(&zero)).unwrap_or(false)
225 }
226 }
227 _ => true,
228 },
229 WindowFrameUnits::Range | WindowFrameUnits::Groups => match &end_bound {
230 WindowFrameBound::Preceding(value) => {
231 if value.is_null() {
232 true
234 } else {
235 let zero = ScalarValue::new_zero(&value.data_type());
236 zero.map(|zero| value.gt(&zero)).unwrap_or(false)
237 }
238 }
239 _ => false,
240 },
241 };
242 Self {
243 units,
244 start_bound,
245 end_bound,
246 causal,
247 }
248 }
249
250 pub fn regularize_order_bys(&self, order_by: &mut Vec<Sort>) -> Result<()> {
252 match self.units {
253 WindowFrameUnits::Range if self.free_range() => {
258 if order_by.is_empty() {
263 order_by.push(lit(1u64).sort(true, false));
264 }
265 }
266 WindowFrameUnits::Range if order_by.len() != 1 => {
267 return plan_err!("RANGE requires exactly one ORDER BY column");
268 }
269 WindowFrameUnits::Groups if order_by.is_empty() => {
270 return plan_err!("GROUPS requires an ORDER BY clause");
271 }
272 _ => {}
273 }
274 Ok(())
275 }
276
277 pub fn can_accept_multi_orderby(&self) -> bool {
279 match self.units {
280 WindowFrameUnits::Rows => true,
281 WindowFrameUnits::Range => self.free_range(),
282 WindowFrameUnits::Groups => true,
283 }
284 }
285
286 fn free_range(&self) -> bool {
289 (self.start_bound.is_unbounded()
290 || self.start_bound == WindowFrameBound::CurrentRow)
291 && (self.end_bound.is_unbounded()
292 || self.end_bound == WindowFrameBound::CurrentRow)
293 }
294
295 pub fn is_ever_expanding(&self) -> bool {
299 self.start_bound.is_unbounded()
300 }
301}
302
/// One edge of a window frame.
///
/// For `Preceding` and `Following`, a null offset denotes an `UNBOUNDED`
/// bound (see `is_unbounded` and the `Display` implementation).
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum WindowFrameBound {
    /// `<offset> PRECEDING`, or `UNBOUNDED PRECEDING` when the offset is null.
    Preceding(ScalarValue),
    /// `CURRENT ROW`.
    CurrentRow,
    /// `<offset> FOLLOWING`, or `UNBOUNDED FOLLOWING` when the offset is null.
    Following(ScalarValue),
}
334
335impl WindowFrameBound {
336 pub fn is_unbounded(&self) -> bool {
337 match self {
338 WindowFrameBound::Preceding(elem) => elem.is_null(),
339 WindowFrameBound::CurrentRow => false,
340 WindowFrameBound::Following(elem) => elem.is_null(),
341 }
342 }
343}
344
345impl WindowFrameBound {
346 fn try_parse(
347 value: ast::WindowFrameBound,
348 units: &ast::WindowFrameUnits,
349 ) -> Result<Self> {
350 Ok(match value {
351 ast::WindowFrameBound::Preceding(Some(v)) => {
352 Self::Preceding(convert_frame_bound_to_scalar_value(*v, units)?)
353 }
354 ast::WindowFrameBound::Preceding(None) => {
355 Self::Preceding(ScalarValue::UInt64(None))
356 }
357 ast::WindowFrameBound::Following(Some(v)) => {
358 Self::Following(convert_frame_bound_to_scalar_value(*v, units)?)
359 }
360 ast::WindowFrameBound::Following(None) => {
361 Self::Following(ScalarValue::UInt64(None))
362 }
363 ast::WindowFrameBound::CurrentRow => Self::CurrentRow,
364 })
365 }
366}
367
368fn convert_frame_bound_to_scalar_value(
369 v: ast::Expr,
370 units: &ast::WindowFrameUnits,
371) -> Result<ScalarValue> {
372 match units {
373 ast::WindowFrameUnits::Rows | ast::WindowFrameUnits::Groups => match v {
375 ast::Expr::Value(ValueWithSpan{value: ast::Value::Number(value, false), span: _}) => {
376 Ok(ScalarValue::try_from_string(value, &DataType::UInt64)?)
377 },
378 ast::Expr::Interval(ast::Interval {
379 value,
380 leading_field: None,
381 leading_precision: None,
382 last_field: None,
383 fractional_seconds_precision: None,
384 }) => {
385 let value = match *value {
386 ast::Expr::Value(ValueWithSpan{value: ast::Value::SingleQuotedString(item), span: _}) => item,
387 e => {
388 return sql_err!(ParserError(format!(
389 "INTERVAL expression cannot be {e:?}"
390 )));
391 }
392 };
393 Ok(ScalarValue::try_from_string(value, &DataType::UInt64)?)
394 }
395 _ => plan_err!(
396 "Invalid window frame: frame offsets for ROWS / GROUPS must be non negative integers"
397 ),
398 },
399 ast::WindowFrameUnits::Range => Ok(ScalarValue::Utf8(Some(match v {
402 ast::Expr::Value(ValueWithSpan{value: ast::Value::Number(value, false), span: _}) => value,
403 ast::Expr::Interval(ast::Interval {
404 value,
405 leading_field,
406 ..
407 }) => {
408 let result = match *value {
409 ast::Expr::Value(ValueWithSpan{value: ast::Value::SingleQuotedString(item), span: _}) => item,
410 e => {
411 return sql_err!(ParserError(format!(
412 "INTERVAL expression cannot be {e:?}"
413 )));
414 }
415 };
416 if let Some(leading_field) = leading_field {
417 format!("{result} {leading_field}")
418 } else {
419 result
420 }
421 }
422 _ => plan_err!(
423 "Invalid window frame: frame offsets for RANGE must be either a numeric value, a string value or an interval"
424 )?,
425 }))),
426 }
427}
428
429impl fmt::Display for WindowFrameBound {
430 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
431 match self {
432 WindowFrameBound::Preceding(n) => {
433 if n.is_null() {
434 f.write_str("UNBOUNDED PRECEDING")
435 } else {
436 write!(f, "{n} PRECEDING")
437 }
438 }
439 WindowFrameBound::CurrentRow => f.write_str("CURRENT ROW"),
440 WindowFrameBound::Following(n) => {
441 if n.is_null() {
442 f.write_str("UNBOUNDED FOLLOWING")
443 } else {
444 write!(f, "{n} FOLLOWING")
445 }
446 }
447 }
448 }
449}
450
/// Unit in which the bounds of a window frame are expressed.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Hash)]
pub enum WindowFrameUnits {
    /// `ROWS` frame.
    Rows,
    /// `RANGE` frame.
    Range,
    /// `GROUPS` frame.
    Groups,
}

/// Renders the unit as its upper-case SQL keyword.
impl fmt::Display for WindowFrameUnits {
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        let keyword = match *self {
            Self::Rows => "ROWS",
            Self::Range => "RANGE",
            Self::Groups => "GROUPS",
        };
        f.write_str(keyword)
    }
}
479
480impl From<ast::WindowFrameUnits> for WindowFrameUnits {
481 fn from(value: ast::WindowFrameUnits) -> Self {
482 match value {
483 ast::WindowFrameUnits::Range => Self::Range,
484 ast::WindowFrameUnits::Groups => Self::Groups,
485 ast::WindowFrameUnits::Rows => Self::Rows,
486 }
487 }
488}
489
#[cfg(test)]
mod tests {
    use super::*;

    /// Exercises `WindowFrame::try_from`: both invalid unbounded combinations
    /// are rejected and a valid `ROWS` frame keeps its units and bounds.
    #[test]
    fn test_window_frame_creation() -> Result<()> {
        // Start bound UNBOUNDED FOLLOWING must be rejected.
        let window_frame = ast::WindowFrame {
            units: ast::WindowFrameUnits::Range,
            start_bound: ast::WindowFrameBound::Following(None),
            end_bound: None,
        };
        let err = WindowFrame::try_from(window_frame).unwrap_err();
        assert_eq!(
            err.strip_backtrace(),
            "Error during planning: Invalid window frame: start bound cannot be UNBOUNDED FOLLOWING".to_owned()
        );

        // End bound UNBOUNDED PRECEDING must be rejected.
        let window_frame = ast::WindowFrame {
            units: ast::WindowFrameUnits::Range,
            start_bound: ast::WindowFrameBound::Preceding(None),
            end_bound: Some(ast::WindowFrameBound::Preceding(None)),
        };
        let err = WindowFrame::try_from(window_frame).unwrap_err();
        assert_eq!(
            err.strip_backtrace(),
            "Error during planning: Invalid window frame: end bound cannot be UNBOUNDED PRECEDING".to_owned()
        );

        // ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING is valid.
        let window_frame = ast::WindowFrame {
            units: ast::WindowFrameUnits::Rows,
            start_bound: ast::WindowFrameBound::Preceding(Some(Box::new(
                ast::Expr::value(ast::Value::Number("2".to_string(), false)),
            ))),
            end_bound: Some(ast::WindowFrameBound::Preceding(Some(Box::new(
                ast::Expr::value(ast::Value::Number("1".to_string(), false)),
            )))),
        };

        let window_frame = WindowFrame::try_from(window_frame)?;
        assert_eq!(window_frame.units, WindowFrameUnits::Rows);
        assert_eq!(
            window_frame.start_bound,
            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(2)))
        );
        assert_eq!(
            window_frame.end_bound,
            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(1)))
        );

        Ok(())
    }

    // Parses `$value` as both a PRECEDING and a FOLLOWING bound for `$unit`
    // and asserts that each yields `$expected` as its offset. Uses `?`, so it
    // must be invoked inside a function returning `Result`.
    macro_rules! test_bound {
        ($unit:ident, $value:expr, $expected:expr) => {
            let preceding = WindowFrameBound::try_parse(
                ast::WindowFrameBound::Preceding($value),
                &ast::WindowFrameUnits::$unit,
            )?;
            assert_eq!(preceding, WindowFrameBound::Preceding($expected));
            let following = WindowFrameBound::try_parse(
                ast::WindowFrameBound::Following($value),
                &ast::WindowFrameUnits::$unit,
            )?;
            assert_eq!(following, WindowFrameBound::Following($expected));
        };
    }

    // Parses `$value` as both a PRECEDING and a FOLLOWING bound for `$unit`
    // and asserts that each fails with the message `$expected`.
    macro_rules! test_bound_err {
        ($unit:ident, $value:expr, $expected:expr) => {
            let err = WindowFrameBound::try_parse(
                ast::WindowFrameBound::Preceding($value),
                &ast::WindowFrameUnits::$unit,
            )
            .unwrap_err();
            assert_eq!(err.strip_backtrace(), $expected);
            let err = WindowFrameBound::try_parse(
                ast::WindowFrameBound::Following($value),
                &ast::WindowFrameUnits::$unit,
            )
            .unwrap_err();
            assert_eq!(err.strip_backtrace(), $expected);
        };
    }

    /// Exercises `WindowFrameBound::try_parse` for unbounded, numeric and
    /// interval offsets across all three frame units.
    #[test]
    fn test_window_frame_bound_creation() -> Result<()> {
        // Unbounded: a missing offset becomes a null UInt64 for every unit.
        test_bound!(Rows, None, ScalarValue::UInt64(None));
        test_bound!(Groups, None, ScalarValue::UInt64(None));
        test_bound!(Range, None, ScalarValue::UInt64(None));

        // Numeric offset: UInt64 for ROWS / GROUPS, raw text for RANGE.
        let number = Some(Box::new(ast::Expr::Value(
            ast::Value::Number("42".to_string(), false).into(),
        )));
        test_bound!(Rows, number.clone(), ScalarValue::UInt64(Some(42)));
        test_bound!(Groups, number.clone(), ScalarValue::UInt64(Some(42)));
        test_bound!(
            Range,
            number.clone(),
            ScalarValue::Utf8(Some("42".to_string()))
        );

        // Interval with a leading field: rejected for ROWS / GROUPS, rendered
        // as "<text> <field>" for RANGE.
        let number = Some(Box::new(ast::Expr::Interval(ast::Interval {
            value: Box::new(ast::Expr::Value(
                ast::Value::SingleQuotedString("1".to_string()).into(),
            )),
            leading_field: Some(ast::DateTimeField::Day),
            fractional_seconds_precision: None,
            last_field: None,
            leading_precision: None,
        })));
        test_bound_err!(Rows, number.clone(), "Error during planning: Invalid window frame: frame offsets for ROWS / GROUPS must be non negative integers");
        test_bound_err!(Groups, number.clone(), "Error during planning: Invalid window frame: frame offsets for ROWS / GROUPS must be non negative integers");
        test_bound!(
            Range,
            number.clone(),
            ScalarValue::Utf8(Some("1 DAY".to_string()))
        );

        Ok(())
    }
}