1use crate::{expr::Sort, lit};
27use std::fmt::{self, Formatter};
28use std::hash::Hash;
29
30use datafusion_common::{Result, ScalarValue, plan_err};
31#[cfg(feature = "sql")]
32use sqlparser::ast::{self, ValueWithSpan};
33
/// A SQL window frame specification: a unit together with a start bound and
/// an end bound.
///
/// `Display` renders it as `<units> BETWEEN <start_bound> AND <end_bound>`.
/// `Debug` is implemented by hand in this file, which is why it is absent
/// from the derive list.
#[derive(Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct WindowFrame {
    /// Unit in which the frame is expressed (`ROWS`, `RANGE` or `GROUPS`).
    pub units: WindowFrameUnits,
    /// Bound at which the frame starts.
    pub start_bound: WindowFrameBound,
    /// Bound at which the frame ends.
    pub end_bound: WindowFrameBound,
    /// Causality flag returned by `is_causal`. It is private, so it can only
    /// be set by the constructors of this type; `new_bounds` derives it from
    /// `units` and `end_bound`.
    causal: bool,
}
94
95impl fmt::Display for WindowFrame {
96 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
97 write!(
98 f,
99 "{} BETWEEN {} AND {}",
100 self.units, self.start_bound, self.end_bound
101 )?;
102 Ok(())
103 }
104}
105
106impl fmt::Debug for WindowFrame {
107 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
108 write!(
109 f,
110 "WindowFrame {{ units: {:?}, start_bound: {:?}, end_bound: {:?}, is_causal: {:?} }}",
111 self.units, self.start_bound, self.end_bound, self.causal
112 )?;
113 Ok(())
114 }
115}
116
/// Converts a parsed SQL window frame into a [`WindowFrame`].
#[cfg(feature = "sql")]
impl TryFrom<ast::WindowFrame> for WindowFrame {
    type Error = datafusion_common::error::DataFusionError;

    /// Parses both bounds and validates them before building the frame.
    ///
    /// A missing end bound is treated as `CURRENT ROW`.
    ///
    /// # Errors
    ///
    /// Returns a planning error when the start bound is `UNBOUNDED FOLLOWING`
    /// or when the end bound is `UNBOUNDED PRECEDING`, and propagates any
    /// error produced while parsing a bound offset.
    fn try_from(value: ast::WindowFrame) -> Result<Self> {
        let start_bound = WindowFrameBound::try_parse(value.start_bound, &value.units)?;
        let end_bound = match value.end_bound {
            Some(bound) => WindowFrameBound::try_parse(bound, &value.units)?,
            None => WindowFrameBound::CurrentRow,
        };

        // The two validations are independent of each other. They must not be
        // chained with `else if`: a start bound such as `1 FOLLOWING` would
        // otherwise skip the end-bound check and let
        // `BETWEEN 1 FOLLOWING AND UNBOUNDED PRECEDING` through.
        if matches!(&start_bound, WindowFrameBound::Following(val) if val.is_null()) {
            return plan_err!(
                "Invalid window frame: start bound cannot be UNBOUNDED FOLLOWING"
            );
        }
        if matches!(&end_bound, WindowFrameBound::Preceding(val) if val.is_null()) {
            return plan_err!("Invalid window frame: end bound cannot be UNBOUNDED PRECEDING");
        }

        let units = value.units.into();
        Ok(Self::new_bounds(units, start_bound, end_bound))
    }
}
144
145impl WindowFrame {
146 pub fn new(order_by: Option<bool>) -> Self {
150 if let Some(strict) = order_by {
151 Self {
156 units: if strict {
157 WindowFrameUnits::Rows
158 } else {
159 WindowFrameUnits::Range
160 },
161 start_bound: WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
162 end_bound: WindowFrameBound::CurrentRow,
163 causal: strict,
164 }
165 } else {
166 Self {
170 units: WindowFrameUnits::Rows,
171 start_bound: WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
172 end_bound: WindowFrameBound::Following(ScalarValue::UInt64(None)),
173 causal: false,
174 }
175 }
176 }
177
178 pub fn reverse(&self) -> Self {
182 let start_bound = match &self.end_bound {
183 WindowFrameBound::Preceding(value) => {
184 WindowFrameBound::Following(value.clone())
185 }
186 WindowFrameBound::Following(value) => {
187 WindowFrameBound::Preceding(value.clone())
188 }
189 WindowFrameBound::CurrentRow => WindowFrameBound::CurrentRow,
190 };
191 let end_bound = match &self.start_bound {
192 WindowFrameBound::Preceding(value) => {
193 WindowFrameBound::Following(value.clone())
194 }
195 WindowFrameBound::Following(value) => {
196 WindowFrameBound::Preceding(value.clone())
197 }
198 WindowFrameBound::CurrentRow => WindowFrameBound::CurrentRow,
199 };
200 Self::new_bounds(self.units, start_bound, end_bound)
201 }
202
203 pub fn is_causal(&self) -> bool {
205 self.causal
206 }
207
208 pub fn new_bounds(
210 units: WindowFrameUnits,
211 start_bound: WindowFrameBound,
212 end_bound: WindowFrameBound,
213 ) -> Self {
214 let causal = match units {
215 WindowFrameUnits::Rows => match &end_bound {
216 WindowFrameBound::Following(value) => {
217 if value.is_null() {
218 false
220 } else {
221 let zero = ScalarValue::new_zero(&value.data_type());
222 zero.map(|zero| value.eq(&zero)).unwrap_or(false)
223 }
224 }
225 _ => true,
226 },
227 WindowFrameUnits::Range | WindowFrameUnits::Groups => match &end_bound {
228 WindowFrameBound::Preceding(value) => {
229 if value.is_null() {
230 true
232 } else {
233 let zero = ScalarValue::new_zero(&value.data_type());
234 zero.map(|zero| value.gt(&zero)).unwrap_or(false)
235 }
236 }
237 _ => false,
238 },
239 };
240 Self {
241 units,
242 start_bound,
243 end_bound,
244 causal,
245 }
246 }
247
248 pub fn regularize_order_bys(&self, order_by: &mut Vec<Sort>) -> Result<()> {
250 match self.units {
251 WindowFrameUnits::Range if self.free_range() => {
256 if order_by.is_empty() {
261 order_by.push(lit(1u64).sort(true, false));
262 }
263 }
264 WindowFrameUnits::Range if order_by.len() != 1 => {
265 return plan_err!("RANGE requires exactly one ORDER BY column");
266 }
267 WindowFrameUnits::Groups if order_by.is_empty() => {
268 return plan_err!("GROUPS requires an ORDER BY clause");
269 }
270 _ => {}
271 }
272 Ok(())
273 }
274
275 pub fn can_accept_multi_orderby(&self) -> bool {
277 match self.units {
278 WindowFrameUnits::Rows => true,
279 WindowFrameUnits::Range => self.free_range(),
280 WindowFrameUnits::Groups => true,
281 }
282 }
283
284 fn free_range(&self) -> bool {
287 (self.start_bound.is_unbounded()
288 || self.start_bound == WindowFrameBound::CurrentRow)
289 && (self.end_bound.is_unbounded()
290 || self.end_bound == WindowFrameBound::CurrentRow)
291 }
292
293 pub fn is_ever_expanding(&self) -> bool {
297 self.start_bound.is_unbounded()
298 }
299}
300
/// One end of a window frame.
///
/// For `Preceding` and `Following`, a null offset represents `UNBOUNDED`
/// (see `is_unbounded` and the `Display` implementation in this file).
///
/// The variant order is significant: the derived `PartialOrd` compares
/// variants in declaration order.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum WindowFrameBound {
    /// `<offset> PRECEDING`, or `UNBOUNDED PRECEDING` when the offset is null.
    Preceding(ScalarValue),
    /// `CURRENT ROW`.
    CurrentRow,
    /// `<offset> FOLLOWING`, or `UNBOUNDED FOLLOWING` when the offset is null.
    Following(ScalarValue),
}
331
332impl WindowFrameBound {
333 pub fn is_unbounded(&self) -> bool {
334 match self {
335 WindowFrameBound::Preceding(elem) => elem.is_null(),
336 WindowFrameBound::CurrentRow => false,
337 WindowFrameBound::Following(elem) => elem.is_null(),
338 }
339 }
340}
341
342impl WindowFrameBound {
343 #[cfg(feature = "sql")]
344 fn try_parse(
345 value: ast::WindowFrameBound,
346 units: &ast::WindowFrameUnits,
347 ) -> Result<Self> {
348 Ok(match value {
349 ast::WindowFrameBound::Preceding(Some(v)) => {
350 Self::Preceding(convert_frame_bound_to_scalar_value(*v, units)?)
351 }
352 ast::WindowFrameBound::Preceding(None) => {
353 Self::Preceding(ScalarValue::UInt64(None))
354 }
355 ast::WindowFrameBound::Following(Some(v)) => {
356 Self::Following(convert_frame_bound_to_scalar_value(*v, units)?)
357 }
358 ast::WindowFrameBound::Following(None) => {
359 Self::Following(ScalarValue::UInt64(None))
360 }
361 ast::WindowFrameBound::CurrentRow => Self::CurrentRow,
362 })
363 }
364}
365
/// Converts the offset expression of a `PRECEDING` / `FOLLOWING` bound into a
/// [`ScalarValue`].
///
/// * `ROWS` / `GROUPS`: accepts `Value::Number(_, false)` or an `INTERVAL`
///   that has no leading field, precision or last field and whose value is a
///   single-quoted string. The text is parsed as a `UInt64` scalar.
/// * `RANGE`: accepts `Value::Number(_, false)` or an `INTERVAL` whose value
///   is a single-quoted string. The text is kept as is in a `Utf8` scalar,
///   with the leading field appended after a space when there is one.
///
/// # Errors
///
/// * A planning error when the expression has none of the accepted shapes.
/// * An execution error when an `INTERVAL` value is not a single-quoted string.
/// * Whatever `ScalarValue::try_from_string` reports for `ROWS` / `GROUPS`.
#[cfg(feature = "sql")]
fn convert_frame_bound_to_scalar_value(
    v: ast::Expr,
    units: &ast::WindowFrameUnits,
) -> Result<ScalarValue> {
    use arrow::datatypes::DataType;
    use datafusion_common::exec_err;

    match units {
        ast::WindowFrameUnits::Rows | ast::WindowFrameUnits::Groups => {
            // First extract the textual offset, then parse it in one place.
            let digits = match v {
                ast::Expr::Value(ValueWithSpan {
                    value: ast::Value::Number(number, false),
                    ..
                }) => number,
                ast::Expr::Interval(ast::Interval {
                    value,
                    leading_field: None,
                    leading_precision: None,
                    last_field: None,
                    fractional_seconds_precision: None,
                }) => match *value {
                    ast::Expr::Value(ValueWithSpan {
                        value: ast::Value::SingleQuotedString(item),
                        ..
                    }) => item,
                    e => {
                        return exec_err!("INTERVAL expression cannot be {e:?}");
                    }
                },
                _ => {
                    return plan_err!(
                        "Invalid window frame: frame offsets for ROWS / GROUPS must be non negative integers"
                    );
                }
            };
            Ok(ScalarValue::try_from_string(digits, &DataType::UInt64)?)
        }
        ast::WindowFrameUnits::Range => {
            // The offset stays textual here; no numeric parsing is attempted.
            let text = match v {
                ast::Expr::Value(ValueWithSpan {
                    value: ast::Value::Number(number, false),
                    ..
                }) => number,
                ast::Expr::Interval(ast::Interval {
                    value,
                    leading_field,
                    ..
                }) => {
                    let result = match *value {
                        ast::Expr::Value(ValueWithSpan {
                            value: ast::Value::SingleQuotedString(item),
                            ..
                        }) => item,
                        e => {
                            return exec_err!("INTERVAL expression cannot be {e:?}");
                        }
                    };
                    match leading_field {
                        Some(leading_field) => format!("{result} {leading_field}"),
                        None => result,
                    }
                }
                _ => {
                    return plan_err!(
                        "Invalid window frame: frame offsets for RANGE must be either a numeric value, a string value or an interval"
                    );
                }
            };
            Ok(ScalarValue::Utf8(Some(text)))
        }
    }
}
435
436impl fmt::Display for WindowFrameBound {
437 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
438 match self {
439 WindowFrameBound::Preceding(n) => {
440 if n.is_null() {
441 f.write_str("UNBOUNDED PRECEDING")
442 } else {
443 write!(f, "{n} PRECEDING")
444 }
445 }
446 WindowFrameBound::CurrentRow => f.write_str("CURRENT ROW"),
447 WindowFrameBound::Following(n) => {
448 if n.is_null() {
449 f.write_str("UNBOUNDED FOLLOWING")
450 } else {
451 write!(f, "{n} FOLLOWING")
452 }
453 }
454 }
455 }
456}
457
/// Unit in which the bounds of a window frame are expressed.
///
/// The variant order is significant: the derived `PartialOrd` compares
/// variants in declaration order.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Hash)]
pub enum WindowFrameUnits {
    /// Displayed as `ROWS`.
    Rows,
    /// Displayed as `RANGE`.
    Range,
    /// Displayed as `GROUPS`.
    Groups,
}

/// Renders the unit as its upper-case SQL keyword.
impl fmt::Display for WindowFrameUnits {
    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
        let keyword = match self {
            Self::Rows => "ROWS",
            Self::Range => "RANGE",
            Self::Groups => "GROUPS",
        };
        f.write_str(keyword)
    }
}
486
/// Maps each parsed SQL frame unit onto the variant of the same name.
#[cfg(feature = "sql")]
impl From<ast::WindowFrameUnits> for WindowFrameUnits {
    fn from(value: ast::WindowFrameUnits) -> Self {
        // Exhaustive on purpose: a new parser variant must fail to compile
        // here instead of being mapped silently.
        match value {
            ast::WindowFrameUnits::Rows => Self::Rows,
            ast::WindowFrameUnits::Range => Self::Range,
            ast::WindowFrameUnits::Groups => Self::Groups,
        }
    }
}
497
// Every test below relies on `ast` and on `WindowFrameBound::try_parse`, which
// only exist when the `sql` feature is enabled. The module is therefore gated
// on that feature as well, so that a test build without it still compiles.
#[cfg(all(test, feature = "sql"))]
mod tests {
    use super::*;

    /// Frame-level validation and a successful `ROWS` conversion.
    #[test]
    fn test_window_frame_creation() -> Result<()> {
        // The start bound cannot be UNBOUNDED FOLLOWING.
        let window_frame = ast::WindowFrame {
            units: ast::WindowFrameUnits::Range,
            start_bound: ast::WindowFrameBound::Following(None),
            end_bound: None,
        };
        let err = WindowFrame::try_from(window_frame).unwrap_err();
        assert_eq!(
            err.strip_backtrace(),
            "Error during planning: Invalid window frame: start bound cannot be UNBOUNDED FOLLOWING".to_owned()
        );

        // The end bound cannot be UNBOUNDED PRECEDING.
        let window_frame = ast::WindowFrame {
            units: ast::WindowFrameUnits::Range,
            start_bound: ast::WindowFrameBound::Preceding(None),
            end_bound: Some(ast::WindowFrameBound::Preceding(None)),
        };
        let err = WindowFrame::try_from(window_frame).unwrap_err();
        assert_eq!(
            err.strip_backtrace(),
            "Error during planning: Invalid window frame: end bound cannot be UNBOUNDED PRECEDING".to_owned()
        );

        // ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING converts to UInt64 offsets.
        let window_frame = ast::WindowFrame {
            units: ast::WindowFrameUnits::Rows,
            start_bound: ast::WindowFrameBound::Preceding(Some(Box::new(
                ast::Expr::value(ast::Value::Number("2".to_string(), false)),
            ))),
            end_bound: Some(ast::WindowFrameBound::Preceding(Some(Box::new(
                ast::Expr::value(ast::Value::Number("1".to_string(), false)),
            )))),
        };

        let window_frame = WindowFrame::try_from(window_frame)?;
        assert_eq!(window_frame.units, WindowFrameUnits::Rows);
        assert_eq!(
            window_frame.start_bound,
            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(2)))
        );
        assert_eq!(
            window_frame.end_bound,
            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(1)))
        );

        Ok(())
    }

    /// Asserts that `$value` parses, for both PRECEDING and FOLLOWING, into a
    /// bound holding `$expected` under the given frame unit.
    macro_rules! test_bound {
        ($unit:ident, $value:expr, $expected:expr) => {
            let preceding = WindowFrameBound::try_parse(
                ast::WindowFrameBound::Preceding($value),
                &ast::WindowFrameUnits::$unit,
            )?;
            assert_eq!(preceding, WindowFrameBound::Preceding($expected));
            let following = WindowFrameBound::try_parse(
                ast::WindowFrameBound::Following($value),
                &ast::WindowFrameUnits::$unit,
            )?;
            assert_eq!(following, WindowFrameBound::Following($expected));
        };
    }

    /// Asserts that `$value` is rejected, for both PRECEDING and FOLLOWING,
    /// with the error message `$expected` under the given frame unit.
    macro_rules! test_bound_err {
        ($unit:ident, $value:expr, $expected:expr) => {
            let err = WindowFrameBound::try_parse(
                ast::WindowFrameBound::Preceding($value),
                &ast::WindowFrameUnits::$unit,
            )
            .unwrap_err();
            assert_eq!(err.strip_backtrace(), $expected);
            let err = WindowFrameBound::try_parse(
                ast::WindowFrameBound::Following($value),
                &ast::WindowFrameUnits::$unit,
            )
            .unwrap_err();
            assert_eq!(err.strip_backtrace(), $expected);
        };
    }

    /// Bound-level parsing for every frame unit.
    #[test]
    fn test_window_frame_bound_creation() -> Result<()> {
        // A missing offset becomes a null UInt64 for every unit.
        test_bound!(Rows, None, ScalarValue::UInt64(None));
        test_bound!(Groups, None, ScalarValue::UInt64(None));
        test_bound!(Range, None, ScalarValue::UInt64(None));

        // Numeric literal: UInt64 for ROWS / GROUPS, text for RANGE.
        let number = Some(Box::new(ast::Expr::Value(
            ast::Value::Number("42".to_string(), false).into(),
        )));
        test_bound!(Rows, number.clone(), ScalarValue::UInt64(Some(42)));
        test_bound!(Groups, number.clone(), ScalarValue::UInt64(Some(42)));
        test_bound!(
            Range,
            number.clone(),
            ScalarValue::Utf8(Some("42".to_string()))
        );

        // Interval with a leading field: rejected for ROWS / GROUPS, kept as
        // "<value> <field>" text for RANGE.
        let number = Some(Box::new(ast::Expr::Interval(ast::Interval {
            value: Box::new(ast::Expr::Value(
                ast::Value::SingleQuotedString("1".to_string()).into(),
            )),
            leading_field: Some(ast::DateTimeField::Day),
            fractional_seconds_precision: None,
            last_field: None,
            leading_precision: None,
        })));
        test_bound_err!(
            Rows,
            number.clone(),
            "Error during planning: Invalid window frame: frame offsets for ROWS / GROUPS must be non negative integers"
        );
        test_bound_err!(
            Groups,
            number.clone(),
            "Error during planning: Invalid window frame: frame offsets for ROWS / GROUPS must be non negative integers"
        );
        test_bound!(
            Range,
            number.clone(),
            ScalarValue::Utf8(Some("1 DAY".to_string()))
        );

        Ok(())
    }
}