Skip to main content

datafusion_functions_window/
row_number.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! `row_number` window function implementation
19
20use arrow::datatypes::FieldRef;
21use datafusion_common::arrow::array::ArrayRef;
22use datafusion_common::arrow::array::UInt64Array;
23use datafusion_common::arrow::compute::SortOptions;
24use datafusion_common::arrow::datatypes::DataType;
25use datafusion_common::arrow::datatypes::Field;
26use datafusion_common::{Result, ScalarValue};
27use datafusion_expr::{
28    Documentation, LimitEffect, PartitionEvaluator, Signature, Volatility, WindowUDFImpl,
29};
30use datafusion_functions_window_common::field;
31use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
32use datafusion_macros::user_doc;
33use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
34use field::WindowUDFFieldArgs;
35use std::fmt::Debug;
36use std::ops::Range;
37use std::sync::Arc;
38
// Registers `RowNumber` as a window UDF through the crate's
// `define_udwf_and_expr!` macro.
// NOTE(review): judging by the arguments, this presumably generates a
// `row_number()` expression builder and a `row_number_udwf()` accessor, both
// documented with the string below. Confirm against the macro definition.
define_udwf_and_expr!(
    RowNumber,
    row_number,
    row_number_udwf,
    "Returns a unique row number for each row in window partition beginning at 1."
);
45
/// The `row_number` window function.
///
/// Assigns every row its 1-based position within its window partition.
/// Rows that tie on the `ORDER BY` key still receive distinct, consecutive
/// numbers. In the SQL example below, the two `50000` salaries get 2 and 3.
#[user_doc(
    doc_section(label = "Ranking Functions"),
    description = "Number of the current row within its partition, counting from 1.",
    syntax_example = "row_number()",
    sql_example = r#"
```sql
-- Example usage of the row_number window function:
SELECT department,
  salary,
  row_number() OVER (PARTITION BY department ORDER BY salary DESC) AS row_num
FROM employees;

+-------------+--------+---------+
| department  | salary | row_num |
+-------------+--------+---------+
| Sales       | 70000  | 1       |
| Sales       | 50000  | 2       |
| Sales       | 50000  | 3       |
| Sales       | 30000  | 4       |
| Engineering | 90000  | 1       |
| Engineering | 80000  | 2       |
+-------------+--------+---------+
```
"#
)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct RowNumber {
    /// Call signature of the function. `row_number` takes no arguments, so
    /// `RowNumber::new` builds this as a nullary, immutable signature.
    signature: Signature,
}
76
77impl RowNumber {
78    /// Create a new `row_number` function
79    pub fn new() -> Self {
80        Self {
81            signature: Signature::nullary(Volatility::Immutable),
82        }
83    }
84}
85
86impl Default for RowNumber {
87    fn default() -> Self {
88        Self::new()
89    }
90}
91
92impl WindowUDFImpl for RowNumber {
93    fn name(&self) -> &str {
94        "row_number"
95    }
96
97    fn signature(&self) -> &Signature {
98        &self.signature
99    }
100
101    fn partition_evaluator(
102        &self,
103        _partition_evaluator_args: PartitionEvaluatorArgs,
104    ) -> Result<Box<dyn PartitionEvaluator>> {
105        Ok(Box::<NumRowsEvaluator>::default())
106    }
107
108    fn field(&self, field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
109        Ok(Field::new(field_args.name(), DataType::UInt64, false).into())
110    }
111
112    fn sort_options(&self) -> Option<SortOptions> {
113        Some(SortOptions {
114            descending: false,
115            nulls_first: false,
116        })
117    }
118
119    fn documentation(&self) -> Option<&Documentation> {
120        self.doc()
121    }
122
123    fn limit_effect(&self, _args: &[Arc<dyn PhysicalExpr>]) -> LimitEffect {
124        LimitEffect::None
125    }
126}
127
128/// State for the `row_number` built-in window function.
129#[derive(Debug, Default)]
130struct NumRowsEvaluator {
131    n_rows: usize,
132}
133
134impl PartitionEvaluator for NumRowsEvaluator {
135    fn is_causal(&self) -> bool {
136        // The row_number function doesn't need "future" values to emit results:
137        true
138    }
139
140    fn evaluate_all(
141        &mut self,
142        _values: &[ArrayRef],
143        num_rows: usize,
144    ) -> Result<ArrayRef> {
145        Ok(Arc::new(UInt64Array::from_iter_values(
146            1..(num_rows as u64) + 1,
147        )))
148    }
149
150    fn evaluate(
151        &mut self,
152        _values: &[ArrayRef],
153        _range: &Range<usize>,
154    ) -> Result<ScalarValue> {
155        self.n_rows += 1;
156        Ok(ScalarValue::UInt64(Some(self.n_rows as u64)))
157    }
158
159    fn supports_bounded_execution(&self) -> bool {
160        true
161    }
162}
163
#[cfg(test)]
mod tests {

    use datafusion_common::arrow::array::{Array, BooleanArray};
    use datafusion_common::cast::as_uint64_array;

    use super::*;

    /// Runs `row_number` over `values` as one partition via `evaluate_all`
    /// and returns the produced row numbers.
    fn row_numbers_via_evaluate_all(values: ArrayRef) -> Result<Vec<u64>> {
        let num_rows = values.len();
        let actual = RowNumber::default()
            .partition_evaluator(PartitionEvaluatorArgs::default())?
            .evaluate_all(&[values], num_rows)?;
        Ok(as_uint64_array(&actual)?.values().to_vec())
    }

    #[test]
    fn row_number_all_null() -> Result<()> {
        let values: ArrayRef = Arc::new(BooleanArray::from(vec![
            None, None, None, None, None, None, None, None,
        ]));

        assert_eq!(
            vec![1, 2, 3, 4, 5, 6, 7, 8],
            row_numbers_via_evaluate_all(values)?
        );
        Ok(())
    }

    #[test]
    fn row_number_all_values() -> Result<()> {
        let values: ArrayRef = Arc::new(BooleanArray::from(vec![
            true, false, true, false, false, true, false, true,
        ]));

        assert_eq!(
            vec![1, 2, 3, 4, 5, 6, 7, 8],
            row_numbers_via_evaluate_all(values)?
        );
        Ok(())
    }

    /// An empty partition must yield an empty result, not a single `1`.
    #[test]
    fn row_number_empty_partition() -> Result<()> {
        let values: ArrayRef = Arc::new(BooleanArray::from(Vec::<bool>::new()));

        assert!(row_numbers_via_evaluate_all(values)?.is_empty());
        Ok(())
    }

    /// The incremental `evaluate` path, used for bounded execution, must count
    /// up from 1, one row per call. A fresh evaluator must restart at 1.
    #[test]
    fn row_number_incremental_evaluate() -> Result<()> {
        let values: ArrayRef =
            Arc::new(BooleanArray::from(vec![true, false, true, false]));

        for _ in 0..2 {
            let mut evaluator = RowNumber::default()
                .partition_evaluator(PartitionEvaluatorArgs::default())?;
            for row in 0..values.len() {
                let actual =
                    evaluator.evaluate(&[Arc::clone(&values)], &(0..row + 1))?;
                assert_eq!(ScalarValue::UInt64(Some(row as u64 + 1)), actual);
            }
        }
        Ok(())
    }
}