datafusion_functions_window/
row_number.rs1use arrow::datatypes::FieldRef;
21use datafusion_common::arrow::array::ArrayRef;
22use datafusion_common::arrow::array::UInt64Array;
23use datafusion_common::arrow::compute::SortOptions;
24use datafusion_common::arrow::datatypes::DataType;
25use datafusion_common::arrow::datatypes::Field;
26use datafusion_common::{Result, ScalarValue};
27use datafusion_expr::{
28 Documentation, LimitEffect, PartitionEvaluator, Signature, Volatility, WindowUDFImpl,
29};
30use datafusion_functions_window_common::field;
31use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
32use datafusion_macros::user_doc;
33use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
34use field::WindowUDFFieldArgs;
35use std::fmt::Debug;
36use std::ops::Range;
37use std::sync::Arc;
38
39define_udwf_and_expr!(
40 RowNumber,
41 row_number,
42 row_number_udwf,
43 "Returns a unique row number for each row in window partition beginning at 1."
44);
45
46#[user_doc(
48 doc_section(label = "Ranking Functions"),
49 description = "Number of the current row within its partition, counting from 1.",
50 syntax_example = "row_number()",
51 sql_example = r#"
52```sql
53-- Example usage of the row_number window function:
54SELECT department,
55 salary,
56 row_number() OVER (PARTITION BY department ORDER BY salary DESC) AS row_num
57FROM employees;
58
59+-------------+--------+---------+
60| department | salary | row_num |
61+-------------+--------+---------+
62| Sales | 70000 | 1 |
63| Sales | 50000 | 2 |
64| Sales | 50000 | 3 |
65| Sales | 30000 | 4 |
66| Engineering | 90000 | 1 |
67| Engineering | 80000 | 2 |
68+-------------+--------+---------+
69```
70"#
71)]
72#[derive(Debug, PartialEq, Eq, Hash)]
73pub struct RowNumber {
74 signature: Signature,
75}
76
77impl RowNumber {
78 pub fn new() -> Self {
80 Self {
81 signature: Signature::nullary(Volatility::Immutable),
82 }
83 }
84}
85
86impl Default for RowNumber {
87 fn default() -> Self {
88 Self::new()
89 }
90}
91
92impl WindowUDFImpl for RowNumber {
93 fn name(&self) -> &str {
94 "row_number"
95 }
96
97 fn signature(&self) -> &Signature {
98 &self.signature
99 }
100
101 fn partition_evaluator(
102 &self,
103 _partition_evaluator_args: PartitionEvaluatorArgs,
104 ) -> Result<Box<dyn PartitionEvaluator>> {
105 Ok(Box::<NumRowsEvaluator>::default())
106 }
107
108 fn field(&self, field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
109 Ok(Field::new(field_args.name(), DataType::UInt64, false).into())
110 }
111
112 fn sort_options(&self) -> Option<SortOptions> {
113 Some(SortOptions {
114 descending: false,
115 nulls_first: false,
116 })
117 }
118
119 fn documentation(&self) -> Option<&Documentation> {
120 self.doc()
121 }
122
123 fn limit_effect(&self, _args: &[Arc<dyn PhysicalExpr>]) -> LimitEffect {
124 LimitEffect::None
125 }
126}
127
/// Partition evaluator that produces row numbers for `RowNumber`.
#[derive(Default, Debug)]
struct NumRowsEvaluator {
    /// Running count of rows numbered so far by `evaluate`; starts at zero
    /// through the derived `Default`.
    n_rows: usize,
}
133
134impl PartitionEvaluator for NumRowsEvaluator {
135 fn is_causal(&self) -> bool {
136 true
138 }
139
140 fn evaluate_all(
141 &mut self,
142 _values: &[ArrayRef],
143 num_rows: usize,
144 ) -> Result<ArrayRef> {
145 Ok(Arc::new(UInt64Array::from_iter_values(
146 1..(num_rows as u64) + 1,
147 )))
148 }
149
150 fn evaluate(
151 &mut self,
152 _values: &[ArrayRef],
153 _range: &Range<usize>,
154 ) -> Result<ScalarValue> {
155 self.n_rows += 1;
156 Ok(ScalarValue::UInt64(Some(self.n_rows as u64)))
157 }
158
159 fn supports_bounded_execution(&self) -> bool {
160 true
161 }
162}
163
#[cfg(test)]
mod tests {
    use super::*;

    use datafusion_common::arrow::array::{Array, BooleanArray};
    use datafusion_common::cast::as_uint64_array;

    /// Runs `evaluate_all` over `values` with a default `RowNumber` evaluator
    /// and asserts that the result is the sequence 1 through 8.
    fn assert_numbered_one_to_eight(values: ArrayRef) -> Result<()> {
        let num_rows = values.len();

        let mut evaluator =
            RowNumber::default().partition_evaluator(PartitionEvaluatorArgs::default())?;
        let evaluated = evaluator.evaluate_all(&[values], num_rows)?;
        let row_numbers = as_uint64_array(&evaluated)?;

        assert_eq!(vec![1, 2, 3, 4, 5, 6, 7, 8], *row_numbers.values());
        Ok(())
    }

    /// Eight null inputs are still numbered 1 through 8.
    #[test]
    fn row_number_all_null() -> Result<()> {
        let nulls: Vec<Option<bool>> = vec![None; 8];
        assert_numbered_one_to_eight(Arc::new(BooleanArray::from(nulls)))
    }

    /// Eight non-null inputs are numbered 1 through 8.
    #[test]
    fn row_number_all_values() -> Result<()> {
        let flags = vec![true, false, true, false, false, true, false, true];
        assert_numbered_one_to_eight(Arc::new(BooleanArray::from(flags)))
    }
}