datafusion_expr/
window_frame.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Window frame module
19//!
20//! The frame-spec determines which output rows are read by an aggregate window function. The frame-spec consists of four parts:
21//! - A frame type - either ROWS, RANGE or GROUPS,
22//! - A starting frame boundary,
23//! - An ending frame boundary,
24//! - An EXCLUDE clause.
25
26use crate::{expr::Sort, lit};
27use arrow::datatypes::DataType;
28use std::fmt::{self, Formatter};
29use std::hash::Hash;
30
31use datafusion_common::{plan_err, sql_err, DataFusionError, Result, ScalarValue};
32use sqlparser::ast::{self, ValueWithSpan};
33use sqlparser::parser::ParserError::ParserError;
34
/// The frame specification determines which output rows are read by an aggregate
/// window function. The ending frame boundary can be omitted if the `BETWEEN`
/// and `AND` keywords that surround the starting frame boundary are also omitted,
/// in which case the ending frame boundary defaults to `CURRENT ROW`.
///
/// Instances are built with [`WindowFrame::new`], [`WindowFrame::new_bounds`] or
/// `TryFrom<ast::WindowFrame>`; the private `causal` flag is computed by these
/// constructors rather than supplied by the caller.
// NOTE: field order matters for the derived `PartialOrd`, which compares fields
// lexicographically in declaration order.
#[derive(Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct WindowFrame {
    /// Frame type - either `ROWS`, `RANGE` or `GROUPS`
    pub units: WindowFrameUnits,
    /// Starting frame boundary (an `UNBOUNDED` bound carries a null offset value)
    pub start_bound: WindowFrameBound,
    /// Ending frame boundary (an `UNBOUNDED` bound carries a null offset value)
    pub end_bound: WindowFrameBound,
    /// Flag indicating whether the frame is causal (i.e. computing the result
    /// for the current row doesn't depend on any subsequent rows).
    ///
    /// Example causal window frames:
    /// ```text
    ///                +--------------+
    ///      Future    |              |
    ///         |      |              |
    ///         |      |              |
    ///    Current Row |+------------+|  ---
    ///         |      |              |   |
    ///         |      |              |   |
    ///         |      |              |   |  Window Frame 1
    ///       Past     |              |   |
    ///                |              |   |
    ///                |              |  ---
    ///                +--------------+
    ///
    ///                +--------------+
    ///      Future    |              |
    ///         |      |              |
    ///         |      |              |
    ///    Current Row |+------------+|
    ///         |      |              |
    ///         |      |              | ---
    ///         |      |              |  |
    ///       Past     |              |  |  Window Frame 2
    ///                |              |  |
    ///                |              | ---
    ///                +--------------+
    /// ```
    /// Example non-causal window frame:
    /// ```text
    ///                +--------------+
    ///      Future    |              |
    ///         |      |              |
    ///         |      |              | ---
    ///    Current Row |+------------+|  |
    ///         |      |              |  |  Window Frame 3
    ///         |      |              |  |
    ///         |      |              | ---
    ///       Past     |              |
    ///                |              |
    ///                |              |
    ///                +--------------+
    /// ```
    causal: bool,
}
95
96impl fmt::Display for WindowFrame {
97    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
98        write!(
99            f,
100            "{} BETWEEN {} AND {}",
101            self.units, self.start_bound, self.end_bound
102        )?;
103        Ok(())
104    }
105}
106
107impl fmt::Debug for WindowFrame {
108    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
109        write!(
110            f,
111            "WindowFrame {{ units: {:?}, start_bound: {:?}, end_bound: {:?}, is_causal: {:?} }}",
112            self.units, self.start_bound, self.end_bound, self.causal
113        )?;
114        Ok(())
115    }
116}
117
118impl TryFrom<ast::WindowFrame> for WindowFrame {
119    type Error = DataFusionError;
120
121    fn try_from(value: ast::WindowFrame) -> Result<Self> {
122        let start_bound = WindowFrameBound::try_parse(value.start_bound, &value.units)?;
123        let end_bound = match value.end_bound {
124            Some(bound) => WindowFrameBound::try_parse(bound, &value.units)?,
125            None => WindowFrameBound::CurrentRow,
126        };
127
128        if let WindowFrameBound::Following(val) = &start_bound {
129            if val.is_null() {
130                plan_err!(
131                    "Invalid window frame: start bound cannot be UNBOUNDED FOLLOWING"
132                )?
133            }
134        } else if let WindowFrameBound::Preceding(val) = &end_bound {
135            if val.is_null() {
136                plan_err!(
137                    "Invalid window frame: end bound cannot be UNBOUNDED PRECEDING"
138                )?
139            }
140        };
141
142        let units = value.units.into();
143        Ok(Self::new_bounds(units, start_bound, end_bound))
144    }
145}
146
147impl WindowFrame {
148    /// Creates a new, default window frame (with the meaning of default
149    /// depending on whether the frame contains an `ORDER BY` clause and this
150    /// ordering is strict (i.e. no ties).
151    pub fn new(order_by: Option<bool>) -> Self {
152        if let Some(strict) = order_by {
153            // This window frame covers the table (or partition if `PARTITION BY`
154            // is used) from beginning to the `CURRENT ROW` (with same rank). It
155            // is used when the `OVER` clause contains an `ORDER BY` clause but
156            // no frame.
157            Self {
158                units: if strict {
159                    WindowFrameUnits::Rows
160                } else {
161                    WindowFrameUnits::Range
162                },
163                start_bound: WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
164                end_bound: WindowFrameBound::CurrentRow,
165                causal: strict,
166            }
167        } else {
168            // This window frame covers the whole table (or partition if `PARTITION BY`
169            // is used). It is used when the `OVER` clause does not contain an
170            // `ORDER BY` clause and there is no frame.
171            Self {
172                units: WindowFrameUnits::Rows,
173                start_bound: WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
174                end_bound: WindowFrameBound::Following(ScalarValue::UInt64(None)),
175                causal: false,
176            }
177        }
178    }
179
180    /// Get reversed window frame. For example
181    /// `3 ROWS PRECEDING AND 2 ROWS FOLLOWING` -->
182    /// `2 ROWS PRECEDING AND 3 ROWS FOLLOWING`
183    pub fn reverse(&self) -> Self {
184        let start_bound = match &self.end_bound {
185            WindowFrameBound::Preceding(value) => {
186                WindowFrameBound::Following(value.clone())
187            }
188            WindowFrameBound::Following(value) => {
189                WindowFrameBound::Preceding(value.clone())
190            }
191            WindowFrameBound::CurrentRow => WindowFrameBound::CurrentRow,
192        };
193        let end_bound = match &self.start_bound {
194            WindowFrameBound::Preceding(value) => {
195                WindowFrameBound::Following(value.clone())
196            }
197            WindowFrameBound::Following(value) => {
198                WindowFrameBound::Preceding(value.clone())
199            }
200            WindowFrameBound::CurrentRow => WindowFrameBound::CurrentRow,
201        };
202        Self::new_bounds(self.units, start_bound, end_bound)
203    }
204
205    /// Get whether window frame is causal
206    pub fn is_causal(&self) -> bool {
207        self.causal
208    }
209
210    /// Initializes window frame from units (type), start bound and end bound.
211    pub fn new_bounds(
212        units: WindowFrameUnits,
213        start_bound: WindowFrameBound,
214        end_bound: WindowFrameBound,
215    ) -> Self {
216        let causal = match units {
217            WindowFrameUnits::Rows => match &end_bound {
218                WindowFrameBound::Following(value) => {
219                    if value.is_null() {
220                        // Unbounded following
221                        false
222                    } else {
223                        let zero = ScalarValue::new_zero(&value.data_type());
224                        zero.map(|zero| value.eq(&zero)).unwrap_or(false)
225                    }
226                }
227                _ => true,
228            },
229            WindowFrameUnits::Range | WindowFrameUnits::Groups => match &end_bound {
230                WindowFrameBound::Preceding(value) => {
231                    if value.is_null() {
232                        // Unbounded preceding
233                        true
234                    } else {
235                        let zero = ScalarValue::new_zero(&value.data_type());
236                        zero.map(|zero| value.gt(&zero)).unwrap_or(false)
237                    }
238                }
239                _ => false,
240            },
241        };
242        Self {
243            units,
244            start_bound,
245            end_bound,
246            causal,
247        }
248    }
249
250    /// Regularizes the ORDER BY clause of the window frame.
251    pub fn regularize_order_bys(&self, order_by: &mut Vec<Sort>) -> Result<()> {
252        match self.units {
253            // Normally, RANGE frames require an ORDER BY clause with exactly
254            // one column. However, an ORDER BY clause may be absent or have
255            // more than one column when the start/end bounds are UNBOUNDED or
256            // CURRENT ROW.
257            WindowFrameUnits::Range if self.free_range() => {
258                // If an ORDER BY clause is absent, it is equivalent to an
259                // ORDER BY clause with constant value as sort key. If an
260                // ORDER BY clause is present but has more than one column,
261                // it is unchanged. Note that this follows PostgreSQL behavior.
262                if order_by.is_empty() {
263                    order_by.push(lit(1u64).sort(true, false));
264                }
265            }
266            WindowFrameUnits::Range if order_by.len() != 1 => {
267                return plan_err!("RANGE requires exactly one ORDER BY column");
268            }
269            WindowFrameUnits::Groups if order_by.is_empty() => {
270                return plan_err!("GROUPS requires an ORDER BY clause");
271            }
272            _ => {}
273        }
274        Ok(())
275    }
276
277    /// Returns whether the window frame can accept multiple ORDER BY expressions.
278    pub fn can_accept_multi_orderby(&self) -> bool {
279        match self.units {
280            WindowFrameUnits::Rows => true,
281            WindowFrameUnits::Range => self.free_range(),
282            WindowFrameUnits::Groups => true,
283        }
284    }
285
286    /// Returns whether the window frame is "free range"; i.e. its start/end
287    /// bounds are UNBOUNDED or CURRENT ROW.
288    fn free_range(&self) -> bool {
289        (self.start_bound.is_unbounded()
290            || self.start_bound == WindowFrameBound::CurrentRow)
291            && (self.end_bound.is_unbounded()
292                || self.end_bound == WindowFrameBound::CurrentRow)
293    }
294
295    /// Is the window frame ever-expanding (it always grows in the superset sense).
296    /// Useful when understanding if set-monotonicity properties of functions can
297    /// be exploited.
298    pub fn is_ever_expanding(&self) -> bool {
299        self.start_bound.is_unbounded()
300    }
301}
302
/// There are five ways to describe starting and ending frame boundaries:
///
/// 1. UNBOUNDED PRECEDING
/// 2. `<expr>` PRECEDING
/// 3. CURRENT ROW
/// 4. `<expr>` FOLLOWING
/// 5. UNBOUNDED FOLLOWING
///
/// These map onto three variants. The `UNBOUNDED` forms are represented by a
/// `Preceding` / `Following` whose [`ScalarValue`] offset is null.
// NOTE: variant order matters for the derived `PartialOrd`.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum WindowFrameBound {
    /// 1. UNBOUNDED PRECEDING
    ///    The frame boundary is the first row in the partition.
    ///
    /// 2. `<expr>` PRECEDING
    ///    `<expr>` must be a non-negative constant numeric expression. The boundary is a row that
    ///    is `<expr>` "units" prior to the current row.
    Preceding(ScalarValue),
    /// 3. The current row.
    ///
    /// For RANGE and GROUPS frame types, peers of the current row are also
    /// included in the frame, unless specifically excluded by the EXCLUDE clause.
    /// This is true regardless of whether CURRENT ROW is used as the starting or ending frame
    /// boundary.
    CurrentRow,
    /// 4. This is the same as "`<expr>` PRECEDING" except that the boundary is `<expr>` units
    ///    after the current row rather than before it.
    ///
    /// 5. UNBOUNDED FOLLOWING
    ///    The frame boundary is the last row in the partition.
    Following(ScalarValue),
}
334
335impl WindowFrameBound {
336    pub fn is_unbounded(&self) -> bool {
337        match self {
338            WindowFrameBound::Preceding(elem) => elem.is_null(),
339            WindowFrameBound::CurrentRow => false,
340            WindowFrameBound::Following(elem) => elem.is_null(),
341        }
342    }
343}
344
345impl WindowFrameBound {
346    fn try_parse(
347        value: ast::WindowFrameBound,
348        units: &ast::WindowFrameUnits,
349    ) -> Result<Self> {
350        Ok(match value {
351            ast::WindowFrameBound::Preceding(Some(v)) => {
352                Self::Preceding(convert_frame_bound_to_scalar_value(*v, units)?)
353            }
354            ast::WindowFrameBound::Preceding(None) => {
355                Self::Preceding(ScalarValue::UInt64(None))
356            }
357            ast::WindowFrameBound::Following(Some(v)) => {
358                Self::Following(convert_frame_bound_to_scalar_value(*v, units)?)
359            }
360            ast::WindowFrameBound::Following(None) => {
361                Self::Following(ScalarValue::UInt64(None))
362            }
363            ast::WindowFrameBound::CurrentRow => Self::CurrentRow,
364        })
365    }
366}
367
368fn convert_frame_bound_to_scalar_value(
369    v: ast::Expr,
370    units: &ast::WindowFrameUnits,
371) -> Result<ScalarValue> {
372    match units {
373        // For ROWS and GROUPS we are sure that the ScalarValue must be a non-negative integer ...
374        ast::WindowFrameUnits::Rows | ast::WindowFrameUnits::Groups => match v {
375            ast::Expr::Value(ValueWithSpan{value: ast::Value::Number(value, false), span: _}) => {
376                Ok(ScalarValue::try_from_string(value, &DataType::UInt64)?)
377            },
378            ast::Expr::Interval(ast::Interval {
379                value,
380                leading_field: None,
381                leading_precision: None,
382                last_field: None,
383                fractional_seconds_precision: None,
384            }) => {
385                let value = match *value {
386                    ast::Expr::Value(ValueWithSpan{value: ast::Value::SingleQuotedString(item), span: _}) => item,
387                    e => {
388                        return sql_err!(ParserError(format!(
389                            "INTERVAL expression cannot be {e:?}"
390                        )));
391                    }
392                };
393                Ok(ScalarValue::try_from_string(value, &DataType::UInt64)?)
394            }
395            _ => plan_err!(
396                "Invalid window frame: frame offsets for ROWS / GROUPS must be non negative integers"
397            ),
398        },
399        // ... instead for RANGE it could be anything depending on the type of the ORDER BY clause,
400        // so we use a ScalarValue::Utf8.
401        ast::WindowFrameUnits::Range => Ok(ScalarValue::Utf8(Some(match v {
402            ast::Expr::Value(ValueWithSpan{value: ast::Value::Number(value, false), span: _}) => value,
403            ast::Expr::Interval(ast::Interval {
404                value,
405                leading_field,
406                ..
407            }) => {
408                let result = match *value {
409                    ast::Expr::Value(ValueWithSpan{value: ast::Value::SingleQuotedString(item), span: _}) => item,
410                    e => {
411                        return sql_err!(ParserError(format!(
412                            "INTERVAL expression cannot be {e:?}"
413                        )));
414                    }
415                };
416                if let Some(leading_field) = leading_field {
417                    format!("{result} {leading_field}")
418                } else {
419                    result
420                }
421            }
422            _ => plan_err!(
423                "Invalid window frame: frame offsets for RANGE must be either a numeric value, a string value or an interval"
424            )?,
425        }))),
426    }
427}
428
429impl fmt::Display for WindowFrameBound {
430    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
431        match self {
432            WindowFrameBound::Preceding(n) => {
433                if n.is_null() {
434                    f.write_str("UNBOUNDED PRECEDING")
435                } else {
436                    write!(f, "{n} PRECEDING")
437                }
438            }
439            WindowFrameBound::CurrentRow => f.write_str("CURRENT ROW"),
440            WindowFrameBound::Following(n) => {
441                if n.is_null() {
442                    f.write_str("UNBOUNDED FOLLOWING")
443                } else {
444                    write!(f, "{n} FOLLOWING")
445                }
446            }
447        }
448    }
449}
450
/// There are three frame types: ROWS, GROUPS, and RANGE. The frame type determines how the
/// starting and ending boundaries of the frame are measured.
// NOTE: variant order matters for the derived `PartialOrd`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Hash)]
pub enum WindowFrameUnits {
    /// The ROWS frame type means that the starting and ending boundaries for the frame are
    /// determined by counting individual rows relative to the current row.
    Rows,
    /// The RANGE frame type requires that the ORDER BY clause of the window have exactly one
    /// term. Call that term "X". With the RANGE frame type, the elements of the frame are
    /// determined by computing the value of expression X for all rows in the partition and framing
    /// those rows for which the value of X is within a certain range of the value of X for the
    /// current row.
    ///
    /// The single-term requirement is relaxed when both bounds are UNBOUNDED or CURRENT ROW
    /// (see `WindowFrame::regularize_order_bys`).
    Range,
    /// The GROUPS frame type means that the starting and ending boundaries are determined
    /// by counting "groups" relative to the current group. A "group" is a set of rows that all have
    /// equivalent values for all terms of the window ORDER BY clause.
    Groups,
}
469
470impl fmt::Display for WindowFrameUnits {
471    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
472        f.write_str(match self {
473            WindowFrameUnits::Rows => "ROWS",
474            WindowFrameUnits::Range => "RANGE",
475            WindowFrameUnits::Groups => "GROUPS",
476        })
477    }
478}
479
480impl From<ast::WindowFrameUnits> for WindowFrameUnits {
481    fn from(value: ast::WindowFrameUnits) -> Self {
482        match value {
483            ast::WindowFrameUnits::Range => Self::Range,
484            ast::WindowFrameUnits::Groups => Self::Groups,
485            ast::WindowFrameUnits::Rows => Self::Rows,
486        }
487    }
488}
489
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks `TryFrom<ast::WindowFrame>`: the invalid unbounded bounds are
    /// rejected with planning errors, and a valid ROWS frame parses its numeric
    /// offsets as `UInt64`.
    #[test]
    fn test_window_frame_creation() -> Result<()> {
        // Start bound UNBOUNDED FOLLOWING is rejected.
        let window_frame = ast::WindowFrame {
            units: ast::WindowFrameUnits::Range,
            start_bound: ast::WindowFrameBound::Following(None),
            end_bound: None,
        };
        let err = WindowFrame::try_from(window_frame).unwrap_err();
        assert_eq!(
            err.strip_backtrace(),
            "Error during planning: Invalid window frame: start bound cannot be UNBOUNDED FOLLOWING".to_owned()
        );

        // End bound UNBOUNDED PRECEDING is rejected.
        let window_frame = ast::WindowFrame {
            units: ast::WindowFrameUnits::Range,
            start_bound: ast::WindowFrameBound::Preceding(None),
            end_bound: Some(ast::WindowFrameBound::Preceding(None)),
        };
        let err = WindowFrame::try_from(window_frame).unwrap_err();
        assert_eq!(
            err.strip_backtrace(),
            "Error during planning: Invalid window frame: end bound cannot be UNBOUNDED PRECEDING".to_owned()
        );

        // `ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING` is accepted.
        let window_frame = ast::WindowFrame {
            units: ast::WindowFrameUnits::Rows,
            start_bound: ast::WindowFrameBound::Preceding(Some(Box::new(
                ast::Expr::value(ast::Value::Number("2".to_string(), false)),
            ))),
            end_bound: Some(ast::WindowFrameBound::Preceding(Some(Box::new(
                ast::Expr::value(ast::Value::Number("1".to_string(), false)),
            )))),
        };

        let window_frame = WindowFrame::try_from(window_frame)?;
        assert_eq!(window_frame.units, WindowFrameUnits::Rows);
        assert_eq!(
            window_frame.start_bound,
            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(2)))
        );
        assert_eq!(
            window_frame.end_bound,
            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(1)))
        );

        Ok(())
    }

    // Parses `$value` as both a PRECEDING and a FOLLOWING bound for `$unit` and
    // asserts that each wraps `$expected`. Uses `?`, so it must be invoked inside
    // a function returning `Result`.
    macro_rules! test_bound {
        ($unit:ident, $value:expr, $expected:expr) => {
            let preceding = WindowFrameBound::try_parse(
                ast::WindowFrameBound::Preceding($value),
                &ast::WindowFrameUnits::$unit,
            )?;
            assert_eq!(preceding, WindowFrameBound::Preceding($expected));
            let following = WindowFrameBound::try_parse(
                ast::WindowFrameBound::Following($value),
                &ast::WindowFrameUnits::$unit,
            )?;
            assert_eq!(following, WindowFrameBound::Following($expected));
        };
    }

    // Like `test_bound!`, but asserts that parsing fails in both directions with
    // the error message `$expected` (backtrace stripped).
    macro_rules! test_bound_err {
        ($unit:ident, $value:expr, $expected:expr) => {
            let err = WindowFrameBound::try_parse(
                ast::WindowFrameBound::Preceding($value),
                &ast::WindowFrameUnits::$unit,
            )
            .unwrap_err();
            assert_eq!(err.strip_backtrace(), $expected);
            let err = WindowFrameBound::try_parse(
                ast::WindowFrameBound::Following($value),
                &ast::WindowFrameUnits::$unit,
            )
            .unwrap_err();
            assert_eq!(err.strip_backtrace(), $expected);
        };
    }

    /// Covers bound parsing for unbounded, numeric and interval offsets across
    /// all three frame units.
    #[test]
    fn test_window_frame_bound_creation() -> Result<()> {
        //  Unbounded: every unit maps a missing offset to a null UInt64.
        test_bound!(Rows, None, ScalarValue::UInt64(None));
        test_bound!(Groups, None, ScalarValue::UInt64(None));
        test_bound!(Range, None, ScalarValue::UInt64(None));

        // Number: ROWS / GROUPS yield UInt64, RANGE keeps the text as Utf8.
        let number = Some(Box::new(ast::Expr::Value(
            ast::Value::Number("42".to_string(), false).into(),
        )));
        test_bound!(Rows, number.clone(), ScalarValue::UInt64(Some(42)));
        test_bound!(Groups, number.clone(), ScalarValue::UInt64(Some(42)));
        test_bound!(
            Range,
            number.clone(),
            ScalarValue::Utf8(Some("42".to_string()))
        );

        // Interval `INTERVAL '1' DAY`: rejected for ROWS / GROUPS (it has a
        // leading field), rendered as the string "1 DAY" for RANGE.
        let number = Some(Box::new(ast::Expr::Interval(ast::Interval {
            value: Box::new(ast::Expr::Value(
                ast::Value::SingleQuotedString("1".to_string()).into(),
            )),
            leading_field: Some(ast::DateTimeField::Day),
            fractional_seconds_precision: None,
            last_field: None,
            leading_precision: None,
        })));
        test_bound_err!(Rows, number.clone(), "Error during planning: Invalid window frame: frame offsets for ROWS / GROUPS must be non negative integers");
        test_bound_err!(Groups, number.clone(), "Error during planning: Invalid window frame: frame offsets for ROWS / GROUPS must be non negative integers");
        test_bound!(
            Range,
            number.clone(),
            ScalarValue::Utf8(Some("1 DAY".to_string()))
        );

        Ok(())
    }
}