Skip to main content

datafusion_expr/
window_frame.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Window frame module
19//!
20//! The frame-spec determines which output rows are read by an aggregate window function. The frame-spec consists of four parts:
21//! - A frame type - either ROWS, RANGE or GROUPS,
22//! - A starting frame boundary,
23//! - An ending frame boundary,
24//! - An EXCLUDE clause.
25
26use crate::{expr::Sort, lit};
27use std::fmt::{self, Formatter};
28use std::hash::Hash;
29
30use datafusion_common::{Result, ScalarValue, plan_err};
31#[cfg(feature = "sql")]
32use sqlparser::ast::{self, ValueWithSpan};
33
/// The frame specification determines which output rows are read by an aggregate
/// window function.
///
/// The ending frame boundary can be omitted if the `BETWEEN` and `AND` keywords
/// that surround the starting frame boundary are also omitted, in which case
/// the ending frame boundary defaults to `CURRENT ROW`.
#[derive(Clone, PartialEq, Eq, PartialOrd, Hash)]
pub struct WindowFrame {
    /// Frame type - either `ROWS`, `RANGE` or `GROUPS`
    pub units: WindowFrameUnits,
    /// Starting frame boundary
    pub start_bound: WindowFrameBound,
    /// Ending frame boundary
    pub end_bound: WindowFrameBound,
    /// Flag indicating whether the frame is causal (i.e. computing the result
    /// for the current row doesn't depend on any subsequent rows).
    ///
    /// This field is private: it is filled in by the constructors rather than
    /// supplied by callers.
    ///
    /// Example causal window frames:
    /// ```text
    ///                +--------------+
    ///      Future    |              |
    ///         |      |              |
    ///         |      |              |
    ///    Current Row |+------------+|  ---
    ///         |      |              |   |
    ///         |      |              |   |
    ///         |      |              |   |  Window Frame 1
    ///       Past     |              |   |
    ///                |              |   |
    ///                |              |  ---
    ///                +--------------+
    ///
    ///                +--------------+
    ///      Future    |              |
    ///         |      |              |
    ///         |      |              |
    ///    Current Row |+------------+|
    ///         |      |              |
    ///         |      |              | ---
    ///         |      |              |  |
    ///       Past     |              |  |  Window Frame 2
    ///                |              |  |
    ///                |              | ---
    ///                +--------------+
    /// ```
    /// Example non-causal window frame:
    /// ```text
    ///                +--------------+
    ///      Future    |              |
    ///         |      |              |
    ///         |      |              | ---
    ///    Current Row |+------------+|  |
    ///         |      |              |  |  Window Frame 3
    ///         |      |              |  |
    ///         |      |              | ---
    ///       Past     |              |
    ///                |              |
    ///                |              |
    ///                +--------------+
    /// ```
    causal: bool,
}
94
95impl fmt::Display for WindowFrame {
96    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
97        write!(
98            f,
99            "{} BETWEEN {} AND {}",
100            self.units, self.start_bound, self.end_bound
101        )?;
102        Ok(())
103    }
104}
105
106impl fmt::Debug for WindowFrame {
107    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
108        write!(
109            f,
110            "WindowFrame {{ units: {:?}, start_bound: {:?}, end_bound: {:?}, is_causal: {:?} }}",
111            self.units, self.start_bound, self.end_bound, self.causal
112        )?;
113        Ok(())
114    }
115}
116
/// Converts a parsed SQL window frame into a [`WindowFrame`].
///
/// # Errors
///
/// Returns a planning error when:
/// - a frame offset cannot be converted for the given frame units,
/// - the start bound is `UNBOUNDED FOLLOWING`, or
/// - the end bound is `UNBOUNDED PRECEDING`.
#[cfg(feature = "sql")]
impl TryFrom<ast::WindowFrame> for WindowFrame {
    type Error = datafusion_common::error::DataFusionError;

    fn try_from(value: ast::WindowFrame) -> Result<Self> {
        let start_bound = WindowFrameBound::try_parse(value.start_bound, &value.units)?;
        // An omitted end bound (no `BETWEEN ... AND ...`) means `CURRENT ROW`.
        let end_bound = match value.end_bound {
            Some(bound) => WindowFrameBound::try_parse(bound, &value.units)?,
            None => WindowFrameBound::CurrentRow,
        };

        // The two checks are independent of each other: a frame such as
        // `BETWEEN 1 FOLLOWING AND UNBOUNDED PRECEDING` has a valid start
        // bound but must still be rejected because of its end bound.
        if matches!(&start_bound, WindowFrameBound::Following(val) if val.is_null()) {
            return plan_err!(
                "Invalid window frame: start bound cannot be UNBOUNDED FOLLOWING"
            );
        }
        if matches!(&end_bound, WindowFrameBound::Preceding(val) if val.is_null()) {
            return plan_err!(
                "Invalid window frame: end bound cannot be UNBOUNDED PRECEDING"
            );
        }

        let units = value.units.into();
        Ok(Self::new_bounds(units, start_bound, end_bound))
    }
}
144
145impl WindowFrame {
146    /// Creates a new, default window frame (with the meaning of default
147    /// depending on whether the frame contains an `ORDER BY` clause and this
148    /// ordering is strict (i.e. no ties).
149    pub fn new(order_by: Option<bool>) -> Self {
150        if let Some(strict) = order_by {
151            // This window frame covers the table (or partition if `PARTITION BY`
152            // is used) from beginning to the `CURRENT ROW` (with same rank). It
153            // is used when the `OVER` clause contains an `ORDER BY` clause but
154            // no frame.
155            Self {
156                units: if strict {
157                    WindowFrameUnits::Rows
158                } else {
159                    WindowFrameUnits::Range
160                },
161                start_bound: WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
162                end_bound: WindowFrameBound::CurrentRow,
163                causal: strict,
164            }
165        } else {
166            // This window frame covers the whole table (or partition if `PARTITION BY`
167            // is used). It is used when the `OVER` clause does not contain an
168            // `ORDER BY` clause and there is no frame.
169            Self {
170                units: WindowFrameUnits::Rows,
171                start_bound: WindowFrameBound::Preceding(ScalarValue::UInt64(None)),
172                end_bound: WindowFrameBound::Following(ScalarValue::UInt64(None)),
173                causal: false,
174            }
175        }
176    }
177
178    /// Get reversed window frame. For example
179    /// `3 ROWS PRECEDING AND 2 ROWS FOLLOWING` -->
180    /// `2 ROWS PRECEDING AND 3 ROWS FOLLOWING`
181    pub fn reverse(&self) -> Self {
182        let start_bound = match &self.end_bound {
183            WindowFrameBound::Preceding(value) => {
184                WindowFrameBound::Following(value.clone())
185            }
186            WindowFrameBound::Following(value) => {
187                WindowFrameBound::Preceding(value.clone())
188            }
189            WindowFrameBound::CurrentRow => WindowFrameBound::CurrentRow,
190        };
191        let end_bound = match &self.start_bound {
192            WindowFrameBound::Preceding(value) => {
193                WindowFrameBound::Following(value.clone())
194            }
195            WindowFrameBound::Following(value) => {
196                WindowFrameBound::Preceding(value.clone())
197            }
198            WindowFrameBound::CurrentRow => WindowFrameBound::CurrentRow,
199        };
200        Self::new_bounds(self.units, start_bound, end_bound)
201    }
202
203    /// Get whether window frame is causal
204    pub fn is_causal(&self) -> bool {
205        self.causal
206    }
207
208    /// Initializes window frame from units (type), start bound and end bound.
209    pub fn new_bounds(
210        units: WindowFrameUnits,
211        start_bound: WindowFrameBound,
212        end_bound: WindowFrameBound,
213    ) -> Self {
214        let causal = match units {
215            WindowFrameUnits::Rows => match &end_bound {
216                WindowFrameBound::Following(value) => {
217                    if value.is_null() {
218                        // Unbounded following
219                        false
220                    } else {
221                        let zero = ScalarValue::new_zero(&value.data_type());
222                        zero.map(|zero| value.eq(&zero)).unwrap_or(false)
223                    }
224                }
225                _ => true,
226            },
227            WindowFrameUnits::Range | WindowFrameUnits::Groups => match &end_bound {
228                WindowFrameBound::Preceding(value) => {
229                    if value.is_null() {
230                        // Unbounded preceding
231                        true
232                    } else {
233                        let zero = ScalarValue::new_zero(&value.data_type());
234                        zero.map(|zero| value.gt(&zero)).unwrap_or(false)
235                    }
236                }
237                _ => false,
238            },
239        };
240        Self {
241            units,
242            start_bound,
243            end_bound,
244            causal,
245        }
246    }
247
248    /// Regularizes the ORDER BY clause of the window frame.
249    pub fn regularize_order_bys(&self, order_by: &mut Vec<Sort>) -> Result<()> {
250        match self.units {
251            // Normally, RANGE frames require an ORDER BY clause with exactly
252            // one column. However, an ORDER BY clause may be absent or have
253            // more than one column when the start/end bounds are UNBOUNDED or
254            // CURRENT ROW.
255            WindowFrameUnits::Range if self.free_range() && order_by.is_empty() => {
256                // If an ORDER BY clause is absent, it is equivalent to an
257                // ORDER BY clause with constant value as sort key. If an
258                // ORDER BY clause is present but has more than one column,
259                // it is unchanged. Note that this follows PostgreSQL behavior.
260                order_by.push(lit(1u64).sort(true, false));
261            }
262            WindowFrameUnits::Range if self.free_range() => {}
263            WindowFrameUnits::Range if order_by.len() != 1 => {
264                return plan_err!("RANGE requires exactly one ORDER BY column");
265            }
266            WindowFrameUnits::Groups if order_by.is_empty() => {
267                return plan_err!("GROUPS requires an ORDER BY clause");
268            }
269            _ => {}
270        }
271        Ok(())
272    }
273
274    /// Returns whether the window frame can accept multiple ORDER BY expressions.
275    pub fn can_accept_multi_orderby(&self) -> bool {
276        match self.units {
277            WindowFrameUnits::Rows => true,
278            WindowFrameUnits::Range => self.free_range(),
279            WindowFrameUnits::Groups => true,
280        }
281    }
282
283    /// Returns whether the window frame is "free range"; i.e. its start/end
284    /// bounds are UNBOUNDED or CURRENT ROW.
285    fn free_range(&self) -> bool {
286        (self.start_bound.is_unbounded()
287            || self.start_bound == WindowFrameBound::CurrentRow)
288            && (self.end_bound.is_unbounded()
289                || self.end_bound == WindowFrameBound::CurrentRow)
290    }
291
292    /// Is the window frame ever-expanding (it always grows in the superset sense).
293    /// Useful when understanding if set-monotonicity properties of functions can
294    /// be exploited.
295    pub fn is_ever_expanding(&self) -> bool {
296        self.start_bound.is_unbounded()
297    }
298}
299
/// There are five ways to describe starting and ending frame boundaries:
///
/// 1. UNBOUNDED PRECEDING
/// 2. `<expr>` PRECEDING
/// 3. CURRENT ROW
/// 4. `<expr>` FOLLOWING
/// 5. UNBOUNDED FOLLOWING
///
/// The UNBOUNDED forms are represented by a null offset inside the
/// `Preceding` / `Following` variants.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum WindowFrameBound {
    /// 1. UNBOUNDED PRECEDING
    ///    The frame boundary is the first row in the partition.
    ///
    /// 2. `<expr>` PRECEDING
    ///    `<expr>` must be a non-negative constant numeric expression. The boundary is a row that
    ///    is `<expr>` "units" prior to the current row.
    Preceding(ScalarValue),
    /// 3. The current row.
    ///
    /// For RANGE and GROUPS frame types, peers of the current row are also
    /// included in the frame, unless specifically excluded by the EXCLUDE clause.
    /// This is true regardless of whether CURRENT ROW is used as the starting or ending frame
    /// boundary.
    CurrentRow,
    /// 4. `<expr>` FOLLOWING
    ///    This is the same as "`<expr>` PRECEDING" except that the boundary is `<expr>` units
    ///    after the current row rather than before it.
    ///
    /// 5. UNBOUNDED FOLLOWING
    ///    The frame boundary is the last row in the partition.
    Following(ScalarValue),
}
330
331impl WindowFrameBound {
332    pub fn is_unbounded(&self) -> bool {
333        match self {
334            WindowFrameBound::Preceding(elem) => elem.is_null(),
335            WindowFrameBound::CurrentRow => false,
336            WindowFrameBound::Following(elem) => elem.is_null(),
337        }
338    }
339}
340
341impl WindowFrameBound {
342    #[cfg(feature = "sql")]
343    fn try_parse(
344        value: ast::WindowFrameBound,
345        units: &ast::WindowFrameUnits,
346    ) -> Result<Self> {
347        Ok(match value {
348            ast::WindowFrameBound::Preceding(Some(v)) => {
349                Self::Preceding(convert_frame_bound_to_scalar_value(*v, units)?)
350            }
351            ast::WindowFrameBound::Preceding(None) => {
352                Self::Preceding(ScalarValue::UInt64(None))
353            }
354            ast::WindowFrameBound::Following(Some(v)) => {
355                Self::Following(convert_frame_bound_to_scalar_value(*v, units)?)
356            }
357            ast::WindowFrameBound::Following(None) => {
358                Self::Following(ScalarValue::UInt64(None))
359            }
360            ast::WindowFrameBound::CurrentRow => Self::CurrentRow,
361        })
362    }
363}
364
/// Converts the offset expression of a frame bound into a [`ScalarValue`].
///
/// - For `ROWS` and `GROUPS` the offset is parsed as a `UInt64`. It may be
///   written as an unsuffixed number or as an `INTERVAL` without any
///   field/precision qualifiers whose body is a single-quoted string.
/// - For `RANGE` the offset is kept as text in a `Utf8` scalar: either the
///   number as written, or the interval string followed by its leading
///   field when one is given.
///
/// # Errors
///
/// Returns a planning error for unsupported offset expressions, an execution
/// error when an `INTERVAL` body is not a single-quoted string, and forwards
/// any failure of the `UInt64` conversion.
#[cfg(feature = "sql")]
fn convert_frame_bound_to_scalar_value(
    v: ast::Expr,
    units: &ast::WindowFrameUnits,
) -> Result<ScalarValue> {
    use arrow::datatypes::DataType;
    use datafusion_common::exec_err;

    // Extracts the quoted text that forms the body of an INTERVAL expression.
    let interval_literal = |body: ast::Expr| -> Result<String> {
        match body {
            ast::Expr::Value(ValueWithSpan {
                value: ast::Value::SingleQuotedString(item),
                ..
            }) => Ok(item),
            e => exec_err!("INTERVAL expression cannot be {e:?}"),
        }
    };

    match units {
        // For ROWS and GROUPS we are sure that the ScalarValue must be a non-negative integer ...
        ast::WindowFrameUnits::Rows | ast::WindowFrameUnits::Groups => {
            let digits = match v {
                ast::Expr::Value(ValueWithSpan {
                    value: ast::Value::Number(value, false),
                    ..
                }) => value,
                // Only a bare interval (no fields, no precisions) is accepted.
                ast::Expr::Interval(ast::Interval {
                    value,
                    leading_field: None,
                    leading_precision: None,
                    last_field: None,
                    fractional_seconds_precision: None,
                }) => interval_literal(*value)?,
                _ => {
                    return plan_err!(
                        "Invalid window frame: frame offsets for ROWS / GROUPS must be non negative integers"
                    );
                }
            };
            Ok(ScalarValue::try_from_string(digits, &DataType::UInt64)?)
        }
        // ... instead for RANGE it could be anything depending on the type of the ORDER BY clause,
        // so we use a ScalarValue::Utf8.
        ast::WindowFrameUnits::Range => {
            let text = match v {
                ast::Expr::Value(ValueWithSpan {
                    value: ast::Value::Number(value, false),
                    ..
                }) => value,
                ast::Expr::Interval(ast::Interval {
                    value,
                    leading_field,
                    ..
                }) => {
                    let result = interval_literal(*value)?;
                    // Append the leading field (e.g. `DAY`) when present.
                    match leading_field {
                        Some(leading_field) => format!("{result} {leading_field}"),
                        None => result,
                    }
                }
                _ => {
                    return plan_err!(
                        "Invalid window frame: frame offsets for RANGE must be either a numeric value, a string value or an interval"
                    );
                }
            };
            Ok(ScalarValue::Utf8(Some(text)))
        }
    }
}
434
435impl fmt::Display for WindowFrameBound {
436    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
437        match self {
438            WindowFrameBound::Preceding(n) => {
439                if n.is_null() {
440                    f.write_str("UNBOUNDED PRECEDING")
441                } else {
442                    write!(f, "{n} PRECEDING")
443                }
444            }
445            WindowFrameBound::CurrentRow => f.write_str("CURRENT ROW"),
446            WindowFrameBound::Following(n) => {
447                if n.is_null() {
448                    f.write_str("UNBOUNDED FOLLOWING")
449                } else {
450                    write!(f, "{n} FOLLOWING")
451                }
452            }
453        }
454    }
455}
456
/// There are three frame types: ROWS, GROUPS, and RANGE. The frame type determines how the
/// starting and ending boundaries of the frame are measured.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Hash)]
pub enum WindowFrameUnits {
    /// The ROWS frame type means that the starting and ending boundaries for the frame are
    /// determined by counting individual rows relative to the current row.
    Rows,
    /// The RANGE frame type requires that the ORDER BY clause of the window have exactly one
    /// term. Call that term "X". With the RANGE frame type, the elements of the frame are
    /// determined by computing the value of expression X for all rows in the partition and framing
    /// those rows for which the value of X is within a certain range of the value of X for the
    /// current row.
    Range,
    /// The GROUPS frame type means that the starting and ending boundaries are determined
    /// by counting "groups" relative to the current group. A "group" is a set of rows that all have
    /// equivalent values for all terms of the window ORDER BY clause.
    Groups,
}
475
476impl fmt::Display for WindowFrameUnits {
477    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
478        f.write_str(match self {
479            WindowFrameUnits::Rows => "ROWS",
480            WindowFrameUnits::Range => "RANGE",
481            WindowFrameUnits::Groups => "GROUPS",
482        })
483    }
484}
485
/// Maps the SQL parser's frame units one-to-one onto [`WindowFrameUnits`].
#[cfg(feature = "sql")]
impl From<ast::WindowFrameUnits> for WindowFrameUnits {
    fn from(value: ast::WindowFrameUnits) -> Self {
        use ast::WindowFrameUnits as Parsed;

        match value {
            Parsed::Rows => WindowFrameUnits::Rows,
            Parsed::Range => WindowFrameUnits::Range,
            Parsed::Groups => WindowFrameUnits::Groups,
        }
    }
}
496
// Every test below goes through the SQL parser's AST (`ast::*`) and the
// `sql`-gated conversions, so the module is only compiled when that feature
// is enabled; otherwise the test build would fail on unresolved names.
#[cfg(all(test, feature = "sql"))]
mod tests {
    use super::*;

    /// Checks the conversion from a parsed SQL frame: both invalid UNBOUNDED
    /// combinations are rejected and a valid ROWS frame keeps its offsets.
    #[test]
    fn test_window_frame_creation() -> Result<()> {
        // Start bound of UNBOUNDED FOLLOWING is invalid.
        let window_frame = ast::WindowFrame {
            units: ast::WindowFrameUnits::Range,
            start_bound: ast::WindowFrameBound::Following(None),
            end_bound: None,
        };
        let err = WindowFrame::try_from(window_frame).unwrap_err();
        assert_eq!(
            err.strip_backtrace(),
            "Error during planning: Invalid window frame: start bound cannot be UNBOUNDED FOLLOWING".to_owned()
        );

        // End bound of UNBOUNDED PRECEDING is invalid.
        let window_frame = ast::WindowFrame {
            units: ast::WindowFrameUnits::Range,
            start_bound: ast::WindowFrameBound::Preceding(None),
            end_bound: Some(ast::WindowFrameBound::Preceding(None)),
        };
        let err = WindowFrame::try_from(window_frame).unwrap_err();
        assert_eq!(
            err.strip_backtrace(),
            "Error during planning: Invalid window frame: end bound cannot be UNBOUNDED PRECEDING".to_owned()
        );

        // `ROWS BETWEEN 2 PRECEDING AND 1 PRECEDING` is valid.
        let window_frame = ast::WindowFrame {
            units: ast::WindowFrameUnits::Rows,
            start_bound: ast::WindowFrameBound::Preceding(Some(Box::new(
                ast::Expr::value(ast::Value::Number("2".to_string(), false)),
            ))),
            end_bound: Some(ast::WindowFrameBound::Preceding(Some(Box::new(
                ast::Expr::value(ast::Value::Number("1".to_string(), false)),
            )))),
        };

        let window_frame = WindowFrame::try_from(window_frame)?;
        assert_eq!(window_frame.units, WindowFrameUnits::Rows);
        assert_eq!(
            window_frame.start_bound,
            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(2)))
        );
        assert_eq!(
            window_frame.end_bound,
            WindowFrameBound::Preceding(ScalarValue::UInt64(Some(1)))
        );

        Ok(())
    }

    // Asserts that `$value` parses to `$expected` for the given frame units,
    // both as a PRECEDING and as a FOLLOWING bound.
    macro_rules! test_bound {
        ($unit:ident, $value:expr, $expected:expr) => {
            let preceding = WindowFrameBound::try_parse(
                ast::WindowFrameBound::Preceding($value),
                &ast::WindowFrameUnits::$unit,
            )?;
            assert_eq!(preceding, WindowFrameBound::Preceding($expected));
            let following = WindowFrameBound::try_parse(
                ast::WindowFrameBound::Following($value),
                &ast::WindowFrameUnits::$unit,
            )?;
            assert_eq!(following, WindowFrameBound::Following($expected));
        };
    }

    // Asserts that parsing `$value` fails with message `$expected` for the
    // given frame units, both as a PRECEDING and as a FOLLOWING bound.
    macro_rules! test_bound_err {
        ($unit:ident, $value:expr, $expected:expr) => {
            let err = WindowFrameBound::try_parse(
                ast::WindowFrameBound::Preceding($value),
                &ast::WindowFrameUnits::$unit,
            )
            .unwrap_err();
            assert_eq!(err.strip_backtrace(), $expected);
            let err = WindowFrameBound::try_parse(
                ast::WindowFrameBound::Following($value),
                &ast::WindowFrameUnits::$unit,
            )
            .unwrap_err();
            assert_eq!(err.strip_backtrace(), $expected);
        };
    }

    /// Checks how unbounded, numeric and interval offsets are parsed for
    /// each kind of frame units.
    #[test]
    fn test_window_frame_bound_creation() -> Result<()> {
        //  Unbounded
        test_bound!(Rows, None, ScalarValue::UInt64(None));
        test_bound!(Groups, None, ScalarValue::UInt64(None));
        test_bound!(Range, None, ScalarValue::UInt64(None));

        // Number
        let number = Some(Box::new(ast::Expr::Value(
            ast::Value::Number("42".to_string(), false).into(),
        )));
        test_bound!(Rows, number.clone(), ScalarValue::UInt64(Some(42)));
        test_bound!(Groups, number.clone(), ScalarValue::UInt64(Some(42)));
        test_bound!(
            Range,
            number.clone(),
            ScalarValue::Utf8(Some("42".to_string()))
        );

        // Interval
        let interval = Some(Box::new(ast::Expr::Interval(ast::Interval {
            value: Box::new(ast::Expr::Value(
                ast::Value::SingleQuotedString("1".to_string()).into(),
            )),
            leading_field: Some(ast::DateTimeField::Day),
            fractional_seconds_precision: None,
            last_field: None,
            leading_precision: None,
        })));
        test_bound_err!(
            Rows,
            interval.clone(),
            "Error during planning: Invalid window frame: frame offsets for ROWS / GROUPS must be non negative integers"
        );
        test_bound_err!(
            Groups,
            interval.clone(),
            "Error during planning: Invalid window frame: frame offsets for ROWS / GROUPS must be non negative integers"
        );
        test_bound!(
            Range,
            interval.clone(),
            ScalarValue::Utf8(Some("1 DAY".to_string()))
        );

        Ok(())
    }
}