1use crate::{expr::Sort, lit};
27use std::fmt::{self, Formatter};
28use std::hash::Hash;
29
30use datafusion_common::{Result, ScalarValue, plan_err};
31#[cfg(feature = "sql")]
32use sqlparser::ast::{self, ValueWithSpan};
33
/// A window frame: the unit the frame is measured in together with its start
/// and end bounds.
///
/// The `Display` implementation renders a frame as
/// `<units> BETWEEN <start_bound> AND <end_bound>`.
#[derive(Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct WindowFrame {
    /// Frame unit (`ROWS`, `RANGE` or `GROUPS`).
    pub units: WindowFrameUnits,
    /// Bound at which the frame starts.
    pub start_bound: WindowFrameBound,
    /// Bound at which the frame ends.
    pub end_bound: WindowFrameBound,
    /// Causality flag reported by [`WindowFrame::is_causal`].
    ///
    /// It is derived from `units` and `end_bound` in
    /// [`WindowFrame::new_bounds`] (and set directly by [`WindowFrame::new`]),
    /// which is why it is kept private rather than being caller-assignable.
    causal: bool,
}
94
95impl fmt::Display for WindowFrame {
96 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
97 write!(
98 f,
99 "{} BETWEEN {} AND {}",
100 self.units, self.start_bound, self.end_bound
101 )?;
102 Ok(())
103 }
104}
105
106impl fmt::Debug for WindowFrame {
107 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
108 write!(
109 f,
110 "WindowFrame {{ units: {:?}, start_bound: {:?}, end_bound: {:?}, is_causal: {:?} }}",
111 self.units, self.start_bound, self.end_bound, self.causal
112 )?;
113 Ok(())
114 }
115}
116
#[cfg(feature = "sql")]
impl TryFrom<ast::WindowFrame> for WindowFrame {
    type Error = datafusion_common::error::DataFusionError;

    /// Converts a parsed SQL window frame into a [`WindowFrame`].
    ///
    /// Both bounds are parsed with `WindowFrameBound::try_parse` using the
    /// frame's units; a missing end bound defaults to `CURRENT ROW`. The
    /// causality flag is then derived by [`WindowFrame::new_bounds`].
    ///
    /// # Errors
    ///
    /// Returns a planning error when
    /// * the start bound is `UNBOUNDED FOLLOWING`, or
    /// * the end bound is `UNBOUNDED PRECEDING`,
    ///
    /// and propagates any error raised while parsing a bound.
    fn try_from(value: ast::WindowFrame) -> Result<Self> {
        let start_bound = WindowFrameBound::try_parse(value.start_bound, &value.units)?;
        let end_bound = match value.end_bound {
            Some(bound) => WindowFrameBound::try_parse(bound, &value.units)?,
            None => WindowFrameBound::CurrentRow,
        };

        // The two validations are deliberately independent: the end bound must
        // be checked even when the start bound is a (bounded) FOLLOWING,
        // otherwise `BETWEEN 1 FOLLOWING AND UNBOUNDED PRECEDING` slips through.
        if matches!(&start_bound, WindowFrameBound::Following(val) if val.is_null()) {
            return plan_err!(
                "Invalid window frame: start bound cannot be UNBOUNDED FOLLOWING"
            );
        }
        if matches!(&end_bound, WindowFrameBound::Preceding(val) if val.is_null()) {
            return plan_err!(
                "Invalid window frame: end bound cannot be UNBOUNDED PRECEDING"
            );
        }

        let units = value.units.into();
        Ok(Self::new_bounds(units, start_bound, end_bound))
    }
}
144
145impl WindowFrame {
146 pub fn new(order_by: Option<bool>) -> Self {
150 if let Some(strict) = order_by {
151 Self {
156 units: if strict {
157 WindowFrameUnits::Rows
158 } else {
159 WindowFrameUnits::Range
160 },
161 start_bound: WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
162 end_bound: WindowFrameBound::CurrentRow,
163 causal: strict,
164 }
165 } else {
166 Self {
170 units: WindowFrameUnits::Rows,
171 start_bound: WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
172 end_bound: WindowFrameBound::Following(ScalarValue::UInt64(None)),
173 causal: false,
174 }
175 }
176 }
177
178 pub fn reverse(&self) -> Self {
182 let start_bound = match &self.end_bound {
183 WindowFrameBound::Preceding(value) => {
184 WindowFrameBound::Following(value.clone())
185 }
186 WindowFrameBound::Following(value) => {
187 WindowFrameBound::Preceding(value.clone())
188 }
189 WindowFrameBound::CurrentRow => WindowFrameBound::CurrentRow,
190 };
191 let end_bound = match &self.start_bound {
192 WindowFrameBound::Preceding(value) => {
193 WindowFrameBound::Following(value.clone())
194 }
195 WindowFrameBound::Following(value) => {
196 WindowFrameBound::Preceding(value.clone())
197 }
198 WindowFrameBound::CurrentRow => WindowFrameBound::CurrentRow,
199 };
200 Self::new_bounds(self.units, start_bound, end_bound)
201 }
202
203 pub fn is_causal(&self) -> bool {
205 self.causal
206 }
207
208 pub fn new_bounds(
210 units: WindowFrameUnits,
211 start_bound: WindowFrameBound,
212 end_bound: WindowFrameBound,
213 ) -> Self {
214 let causal = match units {
215 WindowFrameUnits::Rows => match &end_bound {
216 WindowFrameBound::Following(value) => {
217 if value.is_null() {
218 false
220 } else {
221 let zero = ScalarValue::new_zero(&value.data_type());
222 zero.map(|zero| value.eq(&zero)).unwrap_or(false)
223 }
224 }
225 _ => true,
226 },
227 WindowFrameUnits::Range | WindowFrameUnits::Groups => match &end_bound {
228 WindowFrameBound::Preceding(value) => {
229 if value.is_null() {
230 true
232 } else {
233 let zero = ScalarValue::new_zero(&value.data_type());
234 zero.map(|zero| value.gt(&zero)).unwrap_or(false)
235 }
236 }
237 _ => false,
238 },
239 };
240 Self {
241 units,
242 start_bound,
243 end_bound,
244 causal,
245 }
246 }
247
248 pub fn regularize_order_bys(&self, order_by: &mut Vec<Sort>) -> Result<()> {
250 match self.units {
251 WindowFrameUnits::Range if self.free_range() && order_by.is_empty() => {
256 order_by.push(lit(1u64).sort(true, false));
261 }
262 WindowFrameUnits::Range if self.free_range() => {}
263 WindowFrameUnits::Range if order_by.len() != 1 => {
264 return plan_err!("RANGE requires exactly one ORDER BY column");
265 }
266 WindowFrameUnits::Groups if order_by.is_empty() => {
267 return plan_err!("GROUPS requires an ORDER BY clause");
268 }
269 _ => {}
270 }
271 Ok(())
272 }
273
274 pub fn can_accept_multi_orderby(&self) -> bool {
276 match self.units {
277 WindowFrameUnits::Rows => true,
278 WindowFrameUnits::Range => self.free_range(),
279 WindowFrameUnits::Groups => true,
280 }
281 }
282
283 fn free_range(&self) -> bool {
286 (self.start_bound.is_unbounded()
287 || self.start_bound == WindowFrameBound::CurrentRow)
288 && (self.end_bound.is_unbounded()
289 || self.end_bound == WindowFrameBound::CurrentRow)
290 }
291
292 pub fn is_ever_expanding(&self) -> bool {
296 self.start_bound.is_unbounded()
297 }
298}
299
/// One end (start or end) of a window frame.
///
/// For `Preceding` and `Following`, a null [`ScalarValue`] offset denotes an
/// unbounded bound (see [`WindowFrameBound::is_unbounded`]); it is displayed
/// as `UNBOUNDED PRECEDING` / `UNBOUNDED FOLLOWING`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum WindowFrameBound {
    /// `<offset> PRECEDING`, or `UNBOUNDED PRECEDING` when the offset is null.
    Preceding(ScalarValue),
    /// `CURRENT ROW`.
    CurrentRow,
    /// `<offset> FOLLOWING`, or `UNBOUNDED FOLLOWING` when the offset is null.
    Following(ScalarValue),
}
330
331impl WindowFrameBound {
332 pub fn is_unbounded(&self) -> bool {
333 match self {
334 WindowFrameBound::Preceding(elem) => elem.is_null(),
335 WindowFrameBound::CurrentRow => false,
336 WindowFrameBound::Following(elem) => elem.is_null(),
337 }
338 }
339}
340
341impl WindowFrameBound {
342 #[cfg(feature = "sql")]
343 fn try_parse(
344 value: ast::WindowFrameBound,
345 units: &ast::WindowFrameUnits,
346 ) -> Result<Self> {
347 Ok(match value {
348 ast::WindowFrameBound::Preceding(Some(v)) => {
349 Self::Preceding(convert_frame_bound_to_scalar_value(*v, units)?)
350 }
351 ast::WindowFrameBound::Preceding(None) => {
352 Self::Preceding(ScalarValue::UInt64(None))
353 }
354 ast::WindowFrameBound::Following(Some(v)) => {
355 Self::Following(convert_frame_bound_to_scalar_value(*v, units)?)
356 }
357 ast::WindowFrameBound::Following(None) => {
358 Self::Following(ScalarValue::UInt64(None))
359 }
360 ast::WindowFrameBound::CurrentRow => Self::CurrentRow,
361 })
362 }
363}
364
/// Converts the offset expression of a frame bound into a [`ScalarValue`].
///
/// * `ROWS` / `GROUPS`: accepts an unsuffixed number literal, or an
///   `INTERVAL '<text>'` without any field or precision qualifiers; the text
///   is parsed as a `UInt64`.
/// * `RANGE`: accepts an unsuffixed number literal or an `INTERVAL '<text>'`
///   (optionally followed by a leading field, which is appended after a
///   space); the text is returned unparsed as `Utf8`.
///
/// # Errors
///
/// * A planning error when the expression has any other shape.
/// * An execution error when an `INTERVAL` does not wrap a single-quoted
///   string.
/// * Whatever `ScalarValue::try_from_string` reports for text that is not a
///   valid `UInt64` (`ROWS` / `GROUPS` only).
#[cfg(feature = "sql")]
fn convert_frame_bound_to_scalar_value(
    v: ast::Expr,
    units: &ast::WindowFrameUnits,
) -> Result<ScalarValue> {
    use arrow::datatypes::DataType;
    use datafusion_common::exec_err;

    // Pulls the quoted text out of the expression an INTERVAL wraps.
    let interval_text = |inner: ast::Expr| -> Result<String> {
        match inner {
            ast::Expr::Value(ValueWithSpan {
                value: ast::Value::SingleQuotedString(text),
                ..
            }) => Ok(text),
            other => exec_err!("INTERVAL expression cannot be {other:?}"),
        }
    };

    match units {
        ast::WindowFrameUnits::Rows | ast::WindowFrameUnits::Groups => {
            let digits = match v {
                ast::Expr::Value(ValueWithSpan {
                    value: ast::Value::Number(digits, false),
                    ..
                }) => digits,
                // Only a bare `INTERVAL '<text>'` is tolerated here; any
                // qualifier falls through to the planning error below.
                ast::Expr::Interval(ast::Interval {
                    value,
                    leading_field: None,
                    leading_precision: None,
                    last_field: None,
                    fractional_seconds_precision: None,
                }) => interval_text(*value)?,
                _ => {
                    return plan_err!(
                        "Invalid window frame: frame offsets for ROWS / GROUPS must be non negative integers"
                    );
                }
            };
            ScalarValue::try_from_string(digits, &DataType::UInt64)
        }
        ast::WindowFrameUnits::Range => {
            let text = match v {
                ast::Expr::Value(ValueWithSpan {
                    value: ast::Value::Number(text, false),
                    ..
                }) => text,
                ast::Expr::Interval(ast::Interval {
                    value,
                    leading_field,
                    ..
                }) => {
                    let literal = interval_text(*value)?;
                    match leading_field {
                        Some(field) => format!("{literal} {field}"),
                        None => literal,
                    }
                }
                _ => {
                    return plan_err!(
                        "Invalid window frame: frame offsets for RANGE must be either a numeric value, a string value or an interval"
                    );
                }
            };
            Ok(ScalarValue::Utf8(Some(text)))
        }
    }
}
434
435impl fmt::Display for WindowFrameBound {
436 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
437 match self {
438 WindowFrameBound::Preceding(n) => {
439 if n.is_null() {
440 f.write_str("UNBOUNDED PRECEDING")
441 } else {
442 write!(f, "{n} PRECEDING")
443 }
444 }
445 WindowFrameBound::CurrentRow => f.write_str("CURRENT ROW"),
446 WindowFrameBound::Following(n) => {
447 if n.is_null() {
448 f.write_str("UNBOUNDED FOLLOWING")
449 } else {
450 write!(f, "{n} FOLLOWING")
451 }
452 }
453 }
454 }
455}
456
/// The unit in which a window frame's bounds are expressed.
///
/// Each variant is displayed as the matching SQL keyword.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Hash)]
pub enum WindowFrameUnits {
    /// The `ROWS` frame unit.
    Rows,
    /// The `RANGE` frame unit.
    Range,
    /// The `GROUPS` frame unit.
    Groups,
}

impl fmt::Display for WindowFrameUnits {
    /// Writes the upper-case SQL keyword for the unit.
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        let keyword = match self {
            Self::Rows => "ROWS",
            Self::Range => "RANGE",
            Self::Groups => "GROUPS",
        };
        f.write_str(keyword)
    }
}
485
#[cfg(feature = "sql")]
impl From<ast::WindowFrameUnits> for WindowFrameUnits {
    /// Maps each parsed SQL frame unit onto the variant of the same name.
    fn from(value: ast::WindowFrameUnits) -> Self {
        use ast::WindowFrameUnits as Parsed;

        match value {
            Parsed::Rows => Self::Rows,
            Parsed::Range => Self::Range,
            Parsed::Groups => Self::Groups,
        }
    }
}
496
// Every test below goes through `sqlparser::ast` and
// `WindowFrameBound::try_parse`, both of which only exist when the `sql`
// feature is enabled, so the module is gated on that feature as well.
#[cfg(all(test, feature = "sql"))]
mod tests {
    use super::*;

    /// Checks the bound validation and the happy path of
    /// `WindowFrame::try_from`.
    #[test]
    fn test_window_frame_creation() -> Result<()> {
        // UNBOUNDED FOLLOWING is not a valid start bound.
        let window_frame = ast::WindowFrame {
            units: ast::WindowFrameUnits::Range,
            start_bound: ast::WindowFrameBound::Following(None),
            end_bound: None,
        };
        let err = WindowFrame::try_from(window_frame).unwrap_err();
        assert_eq!(
            err.strip_backtrace(),
            "Error during planning: Invalid window frame: start bound cannot be UNBOUNDED FOLLOWING".to_owned()
        );

        // UNBOUNDED PRECEDING is not a valid end bound.
        let window_frame = ast::WindowFrame {
            units: ast::WindowFrameUnits::Range,
            start_bound: ast::WindowFrameBound::Preceding(None),
            end_bound: Some(ast::WindowFrameBound::Preceding(None)),
        };
        let err = WindowFrame::try_from(window_frame).unwrap_err();
        assert_eq!(
            err.strip_backtrace(),
            "Error during planning: Invalid window frame: end bound cannot be UNBOUNDED PRECEDING".to_owned()
        );

        // ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING parses into UInt64 offsets.
        let window_frame = ast::WindowFrame {
            units: ast::WindowFrameUnits::Rows,
            start_bound: ast::WindowFrameBound::Preceding(Some(Box::new(
                ast::Expr::value(ast::Value::Number("2".to_string(), false)),
            ))),
            end_bound: Some(ast::WindowFrameBound::Preceding(Some(Box::new(
                ast::Expr::value(ast::Value::Number("1".to_string(), false)),
            )))),
        };

        let window_frame = WindowFrame::try_from(window_frame)?;
        assert_eq!(window_frame.units, WindowFrameUnits::Rows);
        assert_eq!(
            window_frame.start_bound,
            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(2)))
        );
        assert_eq!(
            window_frame.end_bound,
            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(1)))
        );

        Ok(())
    }

    // Parses `$value` as both a PRECEDING and a FOLLOWING bound for `$unit`
    // and expects the given scalar. `$value` and `$expected` are expanded
    // twice, so callers must pass expressions that can be evaluated twice
    // (hence the `.clone()` at the call sites).
    macro_rules! test_bound {
        ($unit:ident, $value:expr, $expected:expr) => {
            let preceding = WindowFrameBound::try_parse(
                ast::WindowFrameBound::Preceding($value),
                &ast::WindowFrameUnits::$unit,
            )?;
            assert_eq!(preceding, WindowFrameBound::Preceding($expected));
            let following = WindowFrameBound::try_parse(
                ast::WindowFrameBound::Following($value),
                &ast::WindowFrameUnits::$unit,
            )?;
            assert_eq!(following, WindowFrameBound::Following($expected));
        };
    }

    // Same as `test_bound!`, but expects parsing to fail with the given
    // message in both directions.
    macro_rules! test_bound_err {
        ($unit:ident, $value:expr, $expected:expr) => {
            let err = WindowFrameBound::try_parse(
                ast::WindowFrameBound::Preceding($value),
                &ast::WindowFrameUnits::$unit,
            )
            .unwrap_err();
            assert_eq!(err.strip_backtrace(), $expected);
            let err = WindowFrameBound::try_parse(
                ast::WindowFrameBound::Following($value),
                &ast::WindowFrameUnits::$unit,
            )
            .unwrap_err();
            assert_eq!(err.strip_backtrace(), $expected);
        };
    }

    /// Checks how `WindowFrameBound::try_parse` converts offsets per unit.
    #[test]
    fn test_window_frame_bound_creation() -> Result<()> {
        // Unbounded: no offset expression at all.
        test_bound!(Rows, None, ScalarValue::UInt64(None));
        test_bound!(Groups, None, ScalarValue::UInt64(None));
        test_bound!(Range, None, ScalarValue::UInt64(None));

        // Number literal: UInt64 for ROWS / GROUPS, raw text for RANGE.
        let number = Some(Box::new(ast::Expr::Value(
            ast::Value::Number("42".to_string(), false).into(),
        )));
        test_bound!(Rows, number.clone(), ScalarValue::UInt64(Some(42)));
        test_bound!(Groups, number.clone(), ScalarValue::UInt64(Some(42)));
        test_bound!(
            Range,
            number.clone(),
            ScalarValue::Utf8(Some("42".to_string()))
        );

        // Interval with a leading field: rejected for ROWS / GROUPS, kept as
        // "<text> <field>" for RANGE.
        let number = Some(Box::new(ast::Expr::Interval(ast::Interval {
            value: Box::new(ast::Expr::Value(
                ast::Value::SingleQuotedString("1".to_string()).into(),
            )),
            leading_field: Some(ast::DateTimeField::Day),
            fractional_seconds_precision: None,
            last_field: None,
            leading_precision: None,
        })));
        test_bound_err!(
            Rows,
            number.clone(),
            "Error during planning: Invalid window frame: frame offsets for ROWS / GROUPS must be non negative integers"
        );
        test_bound_err!(
            Groups,
            number.clone(),
            "Error during planning: Invalid window frame: frame offsets for ROWS / GROUPS must be non negative integers"
        );
        test_bound!(
            Range,
            number.clone(),
            ScalarValue::Utf8(Some("1 DAY".to_string()))
        );

        Ok(())
    }
}