1use crate::{expr::Sort, lit};
27use std::fmt::{self, Formatter};
28use std::hash::Hash;
29
30use datafusion_common::{plan_err, Result, ScalarValue};
31#[cfg(feature = "sql")]
32use sqlparser::ast::{self, ValueWithSpan};
33
34#[derive(Clone, PartialEq, Eq, PartialOrd, Hash)]
39pub struct WindowFrame {
40 pub units: WindowFrameUnits,
42 pub start_bound: WindowFrameBound,
44 pub end_bound: WindowFrameBound,
46 causal: bool,
93}
94
95impl fmt::Display for WindowFrame {
96 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
97 write!(
98 f,
99 "{} BETWEEN {} AND {}",
100 self.units, self.start_bound, self.end_bound
101 )?;
102 Ok(())
103 }
104}
105
106impl fmt::Debug for WindowFrame {
107 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
108 write!(
109 f,
110 "WindowFrame {{ units: {:?}, start_bound: {:?}, end_bound: {:?}, is_causal: {:?} }}",
111 self.units, self.start_bound, self.end_bound, self.causal
112 )?;
113 Ok(())
114 }
115}
116
#[cfg(feature = "sql")]
impl TryFrom<ast::WindowFrame> for WindowFrame {
    type Error = datafusion_common::error::DataFusionError;

    /// Converts a parsed SQL window frame into a [`WindowFrame`].
    ///
    /// A missing end bound defaults to `CURRENT ROW`.
    ///
    /// # Errors
    ///
    /// Returns a planning error when
    /// * either bound fails to parse (see `WindowFrameBound::try_parse`),
    /// * the start bound is `UNBOUNDED FOLLOWING`, or
    /// * the end bound is `UNBOUNDED PRECEDING`.
    fn try_from(value: ast::WindowFrame) -> Result<Self> {
        let start_bound = WindowFrameBound::try_parse(value.start_bound, &value.units)?;
        let end_bound = match value.end_bound {
            Some(bound) => WindowFrameBound::try_parse(bound, &value.units)?,
            None => WindowFrameBound::CurrentRow,
        };

        // The two validations are deliberately independent. Chaining them with
        // `else if` would skip the end-bound check whenever the start bound is
        // any FOLLOWING bound (e.g. `1 FOLLOWING AND UNBOUNDED PRECEDING`).
        if matches!(&start_bound, WindowFrameBound::Following(val) if val.is_null()) {
            return plan_err!(
                "Invalid window frame: start bound cannot be UNBOUNDED FOLLOWING"
            );
        }
        if matches!(&end_bound, WindowFrameBound::Preceding(val) if val.is_null()) {
            return plan_err!(
                "Invalid window frame: end bound cannot be UNBOUNDED PRECEDING"
            );
        }

        let units = value.units.into();
        Ok(Self::new_bounds(units, start_bound, end_bound))
    }
}
146
147impl WindowFrame {
148 pub fn new(order_by: Option<bool>) -> Self {
152 if let Some(strict) = order_by {
153 Self {
158 units: if strict {
159 WindowFrameUnits::Rows
160 } else {
161 WindowFrameUnits::Range
162 },
163 start_bound: WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
164 end_bound: WindowFrameBound::CurrentRow,
165 causal: strict,
166 }
167 } else {
168 Self {
172 units: WindowFrameUnits::Rows,
173 start_bound: WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
174 end_bound: WindowFrameBound::Following(ScalarValue::UInt64(None)),
175 causal: false,
176 }
177 }
178 }
179
180 pub fn reverse(&self) -> Self {
184 let start_bound = match &self.end_bound {
185 WindowFrameBound::Preceding(value) => {
186 WindowFrameBound::Following(value.clone())
187 }
188 WindowFrameBound::Following(value) => {
189 WindowFrameBound::Preceding(value.clone())
190 }
191 WindowFrameBound::CurrentRow => WindowFrameBound::CurrentRow,
192 };
193 let end_bound = match &self.start_bound {
194 WindowFrameBound::Preceding(value) => {
195 WindowFrameBound::Following(value.clone())
196 }
197 WindowFrameBound::Following(value) => {
198 WindowFrameBound::Preceding(value.clone())
199 }
200 WindowFrameBound::CurrentRow => WindowFrameBound::CurrentRow,
201 };
202 Self::new_bounds(self.units, start_bound, end_bound)
203 }
204
205 pub fn is_causal(&self) -> bool {
207 self.causal
208 }
209
210 pub fn new_bounds(
212 units: WindowFrameUnits,
213 start_bound: WindowFrameBound,
214 end_bound: WindowFrameBound,
215 ) -> Self {
216 let causal = match units {
217 WindowFrameUnits::Rows => match &end_bound {
218 WindowFrameBound::Following(value) => {
219 if value.is_null() {
220 false
222 } else {
223 let zero = ScalarValue::new_zero(&value.data_type());
224 zero.map(|zero| value.eq(&zero)).unwrap_or(false)
225 }
226 }
227 _ => true,
228 },
229 WindowFrameUnits::Range | WindowFrameUnits::Groups => match &end_bound {
230 WindowFrameBound::Preceding(value) => {
231 if value.is_null() {
232 true
234 } else {
235 let zero = ScalarValue::new_zero(&value.data_type());
236 zero.map(|zero| value.gt(&zero)).unwrap_or(false)
237 }
238 }
239 _ => false,
240 },
241 };
242 Self {
243 units,
244 start_bound,
245 end_bound,
246 causal,
247 }
248 }
249
250 pub fn regularize_order_bys(&self, order_by: &mut Vec<Sort>) -> Result<()> {
252 match self.units {
253 WindowFrameUnits::Range if self.free_range() => {
258 if order_by.is_empty() {
263 order_by.push(lit(1u64).sort(true, false));
264 }
265 }
266 WindowFrameUnits::Range if order_by.len() != 1 => {
267 return plan_err!("RANGE requires exactly one ORDER BY column");
268 }
269 WindowFrameUnits::Groups if order_by.is_empty() => {
270 return plan_err!("GROUPS requires an ORDER BY clause");
271 }
272 _ => {}
273 }
274 Ok(())
275 }
276
277 pub fn can_accept_multi_orderby(&self) -> bool {
279 match self.units {
280 WindowFrameUnits::Rows => true,
281 WindowFrameUnits::Range => self.free_range(),
282 WindowFrameUnits::Groups => true,
283 }
284 }
285
286 fn free_range(&self) -> bool {
289 (self.start_bound.is_unbounded()
290 || self.start_bound == WindowFrameBound::CurrentRow)
291 && (self.end_bound.is_unbounded()
292 || self.end_bound == WindowFrameBound::CurrentRow)
293 }
294
295 pub fn is_ever_expanding(&self) -> bool {
299 self.start_bound.is_unbounded()
300 }
301}
302
303#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
311pub enum WindowFrameBound {
312 Preceding(ScalarValue),
319 CurrentRow,
326 Following(ScalarValue),
332}
333
334impl WindowFrameBound {
335 pub fn is_unbounded(&self) -> bool {
336 match self {
337 WindowFrameBound::Preceding(elem) => elem.is_null(),
338 WindowFrameBound::CurrentRow => false,
339 WindowFrameBound::Following(elem) => elem.is_null(),
340 }
341 }
342}
343
344impl WindowFrameBound {
345 #[cfg(feature = "sql")]
346 fn try_parse(
347 value: ast::WindowFrameBound,
348 units: &ast::WindowFrameUnits,
349 ) -> Result<Self> {
350 Ok(match value {
351 ast::WindowFrameBound::Preceding(Some(v)) => {
352 Self::Preceding(convert_frame_bound_to_scalar_value(*v, units)?)
353 }
354 ast::WindowFrameBound::Preceding(None) => {
355 Self::Preceding(ScalarValue::UInt64(None))
356 }
357 ast::WindowFrameBound::Following(Some(v)) => {
358 Self::Following(convert_frame_bound_to_scalar_value(*v, units)?)
359 }
360 ast::WindowFrameBound::Following(None) => {
361 Self::Following(ScalarValue::UInt64(None))
362 }
363 ast::WindowFrameBound::CurrentRow => Self::CurrentRow,
364 })
365 }
366}
367
/// Converts the offset expression of a parsed frame bound into a `ScalarValue`.
///
/// * `ROWS` / `GROUPS`: accepts a numeric literal, or an `INTERVAL` that has
///   no field/precision qualifiers and wraps a single-quoted string. The text
///   is parsed as `UInt64`.
/// * `RANGE`: accepts a numeric literal or an `INTERVAL` wrapping a
///   single-quoted string. The result is a `Utf8` scalar holding the literal
///   text, with the interval's leading field appended after a space when one
///   is present.
///
/// # Errors
///
/// * Planning error when the expression has none of the accepted shapes.
/// * Execution error when an `INTERVAL` wraps anything other than a
///   single-quoted string.
/// * Any error produced while parsing the text as `UInt64`.
#[cfg(feature = "sql")]
fn convert_frame_bound_to_scalar_value(
    v: ast::Expr,
    units: &ast::WindowFrameUnits,
) -> Result<ScalarValue> {
    use arrow::datatypes::DataType;
    use datafusion_common::exec_err;

    // Pulls the quoted text out of the expression wrapped by an INTERVAL.
    let interval_literal = |inner: ast::Expr| -> Result<String> {
        match inner {
            ast::Expr::Value(ValueWithSpan {
                value: ast::Value::SingleQuotedString(item),
                ..
            }) => Ok(item),
            e => exec_err!("INTERVAL expression cannot be {e:?}"),
        }
    };

    match units {
        ast::WindowFrameUnits::Rows | ast::WindowFrameUnits::Groups => {
            let digits = match v {
                ast::Expr::Value(ValueWithSpan {
                    value: ast::Value::Number(value, false),
                    ..
                }) => value,
                // Only a bare interval (no fields, no precisions) is accepted.
                ast::Expr::Interval(ast::Interval {
                    value,
                    leading_field: None,
                    leading_precision: None,
                    last_field: None,
                    fractional_seconds_precision: None,
                }) => interval_literal(*value)?,
                _ => {
                    return plan_err!(
                        "Invalid window frame: frame offsets for ROWS / GROUPS must be non negative integers"
                    );
                }
            };
            ScalarValue::try_from_string(digits, &DataType::UInt64)
        }
        ast::WindowFrameUnits::Range => {
            let text = match v {
                ast::Expr::Value(ValueWithSpan {
                    value: ast::Value::Number(value, false),
                    ..
                }) => value,
                ast::Expr::Interval(ast::Interval {
                    value,
                    leading_field,
                    ..
                }) => {
                    let result = interval_literal(*value)?;
                    match leading_field {
                        Some(leading_field) => format!("{result} {leading_field}"),
                        None => result,
                    }
                }
                _ => {
                    return plan_err!(
                        "Invalid window frame: frame offsets for RANGE must be either a numeric value, a string value or an interval"
                    );
                }
            };
            Ok(ScalarValue::Utf8(Some(text)))
        }
    }
}
431
432impl fmt::Display for WindowFrameBound {
433 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
434 match self {
435 WindowFrameBound::Preceding(n) => {
436 if n.is_null() {
437 f.write_str("UNBOUNDED PRECEDING")
438 } else {
439 write!(f, "{n} PRECEDING")
440 }
441 }
442 WindowFrameBound::CurrentRow => f.write_str("CURRENT ROW"),
443 WindowFrameBound::Following(n) => {
444 if n.is_null() {
445 f.write_str("UNBOUNDED FOLLOWING")
446 } else {
447 write!(f, "{n} FOLLOWING")
448 }
449 }
450 }
451 }
452}
453
/// Unit in which the bounds of a window frame are expressed.
///
/// Variant order is significant: the derived `PartialOrd` orders
/// `Rows < Range < Groups`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Hash)]
pub enum WindowFrameUnits {
    /// `ROWS` frames.
    Rows,
    /// `RANGE` frames.
    Range,
    /// `GROUPS` frames.
    Groups,
}
472
473impl fmt::Display for WindowFrameUnits {
474 fn fmt(&self, f: &mut Formatter) -> fmt::Result {
475 f.write_str(match self {
476 WindowFrameUnits::Rows => "ROWS",
477 WindowFrameUnits::Range => "RANGE",
478 WindowFrameUnits::Groups => "GROUPS",
479 })
480 }
481}
482
#[cfg(feature = "sql")]
impl From<ast::WindowFrameUnits> for WindowFrameUnits {
    /// Maps each parsed SQL frame unit onto the variant of the same name.
    fn from(value: ast::WindowFrameUnits) -> Self {
        use ast::WindowFrameUnits as Parsed;

        match value {
            Parsed::Rows => Self::Rows,
            Parsed::Range => Self::Range,
            Parsed::Groups => Self::Groups,
        }
    }
}
493
// These tests build `sqlparser` AST values and call `WindowFrameBound::try_parse`,
// both of which only exist when the `sql` feature is enabled. Gating the module
// on the feature as well keeps test builds without `sql` compiling.
#[cfg(all(test, feature = "sql"))]
mod tests {
    use super::*;

    /// Checks the bound validation done by `WindowFrame::try_from` and a
    /// successful conversion of a `ROWS` frame with numeric offsets.
    #[test]
    fn test_window_frame_creation() -> Result<()> {
        // Start bound UNBOUNDED FOLLOWING is rejected.
        let window_frame = ast::WindowFrame {
            units: ast::WindowFrameUnits::Range,
            start_bound: ast::WindowFrameBound::Following(None),
            end_bound: None,
        };
        let err = WindowFrame::try_from(window_frame).unwrap_err();
        assert_eq!(
            err.strip_backtrace(),
            "Error during planning: Invalid window frame: start bound cannot be UNBOUNDED FOLLOWING".to_owned()
        );

        // End bound UNBOUNDED PRECEDING is rejected.
        let window_frame = ast::WindowFrame {
            units: ast::WindowFrameUnits::Range,
            start_bound: ast::WindowFrameBound::Preceding(None),
            end_bound: Some(ast::WindowFrameBound::Preceding(None)),
        };
        let err = WindowFrame::try_from(window_frame).unwrap_err();
        assert_eq!(
            err.strip_backtrace(),
            "Error during planning: Invalid window frame: end bound cannot be UNBOUNDED PRECEDING".to_owned()
        );

        // ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING converts successfully.
        let window_frame = ast::WindowFrame {
            units: ast::WindowFrameUnits::Rows,
            start_bound: ast::WindowFrameBound::Preceding(Some(Box::new(
                ast::Expr::value(ast::Value::Number("2".to_string(), false)),
            ))),
            end_bound: Some(ast::WindowFrameBound::Preceding(Some(Box::new(
                ast::Expr::value(ast::Value::Number("1".to_string(), false)),
            )))),
        };

        let window_frame = WindowFrame::try_from(window_frame)?;
        assert_eq!(window_frame.units, WindowFrameUnits::Rows);
        assert_eq!(
            window_frame.start_bound,
            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(2)))
        );
        assert_eq!(
            window_frame.end_bound,
            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(1)))
        );

        Ok(())
    }

    /// Parses `$value` as both a PRECEDING and a FOLLOWING bound for `$unit`
    /// and asserts that the resulting offset equals `$expected`.
    macro_rules! test_bound {
        ($unit:ident, $value:expr, $expected:expr) => {
            let preceding = WindowFrameBound::try_parse(
                ast::WindowFrameBound::Preceding($value),
                &ast::WindowFrameUnits::$unit,
            )?;
            assert_eq!(preceding, WindowFrameBound::Preceding($expected));
            let following = WindowFrameBound::try_parse(
                ast::WindowFrameBound::Following($value),
                &ast::WindowFrameUnits::$unit,
            )?;
            assert_eq!(following, WindowFrameBound::Following($expected));
        };
    }

    /// Parses `$value` as both a PRECEDING and a FOLLOWING bound for `$unit`
    /// and asserts that parsing fails with the message `$expected`.
    macro_rules! test_bound_err {
        ($unit:ident, $value:expr, $expected:expr) => {
            let err = WindowFrameBound::try_parse(
                ast::WindowFrameBound::Preceding($value),
                &ast::WindowFrameUnits::$unit,
            )
            .unwrap_err();
            assert_eq!(err.strip_backtrace(), $expected);
            let err = WindowFrameBound::try_parse(
                ast::WindowFrameBound::Following($value),
                &ast::WindowFrameUnits::$unit,
            )
            .unwrap_err();
            assert_eq!(err.strip_backtrace(), $expected);
        };
    }

    /// Checks offset parsing for unbounded, numeric and interval offsets
    /// across all three frame units.
    #[test]
    fn test_window_frame_bound_creation() -> Result<()> {
        // Missing offset: unbounded for every unit.
        test_bound!(Rows, None, ScalarValue::UInt64(None));
        test_bound!(Groups, None, ScalarValue::UInt64(None));
        test_bound!(Range, None, ScalarValue::UInt64(None));

        // Numeric literal offset.
        let number = Some(Box::new(ast::Expr::Value(
            ast::Value::Number("42".to_string(), false).into(),
        )));
        test_bound!(Rows, number.clone(), ScalarValue::UInt64(Some(42)));
        test_bound!(Groups, number.clone(), ScalarValue::UInt64(Some(42)));
        test_bound!(
            Range,
            number.clone(),
            ScalarValue::Utf8(Some("42".to_string()))
        );

        // Interval offset with a leading field: only valid for RANGE.
        let number = Some(Box::new(ast::Expr::Interval(ast::Interval {
            value: Box::new(ast::Expr::Value(
                ast::Value::SingleQuotedString("1".to_string()).into(),
            )),
            leading_field: Some(ast::DateTimeField::Day),
            fractional_seconds_precision: None,
            last_field: None,
            leading_precision: None,
        })));
        test_bound_err!(Rows, number.clone(), "Error during planning: Invalid window frame: frame offsets for ROWS / GROUPS must be non negative integers");
        test_bound_err!(Groups, number.clone(), "Error during planning: Invalid window frame: frame offsets for ROWS / GROUPS must be non negative integers");
        test_bound!(
            Range,
            number.clone(),
            ScalarValue::Utf8(Some("1 DAY".to_string()))
        );

        Ok(())
    }
}