datafusion_functions_window/
rank.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Implementation of `rank`, `dense_rank`, and `percent_rank` window functions,
//! which can be evaluated at runtime during query execution.

use crate::define_udwf_and_expr;
use arrow::datatypes::FieldRef;
use datafusion_common::arrow::array::ArrayRef;
use datafusion_common::arrow::array::{Float64Array, UInt64Array};
use datafusion_common::arrow::compute::SortOptions;
use datafusion_common::arrow::datatypes::DataType;
use datafusion_common::arrow::datatypes::Field;
use datafusion_common::utils::get_row_at_idx;
use datafusion_common::{exec_err, Result, ScalarValue};
use datafusion_expr::window_doc_sections::DOC_SECTION_RANKING;
use datafusion_expr::{
    Documentation, LimitEffect, PartitionEvaluator, Signature, Volatility, WindowUDFImpl,
};
use datafusion_functions_window_common::field;
use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
use field::WindowUDFFieldArgs;
use std::any::Any;
use std::fmt::Debug;
use std::hash::Hash;
use std::iter;
use std::ops::Range;
use std::sync::{Arc, LazyLock};

define_udwf_and_expr!(
    Rank,
    rank,
    "Returns rank of the current row with gaps. Same as `row_number` of its first peer",
    Rank::basic
);

define_udwf_and_expr!(
    DenseRank,
    dense_rank,
    "Returns rank of the current row without gaps. This function counts peer groups",
    Rank::dense_rank
);

define_udwf_and_expr!(
    PercentRank,
    percent_rank,
    "Returns the relative rank of the current row: (rank - 1) / (total rows - 1)",
    Rank::percent_rank
);
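
// Worked example (illustrative; not taken from the test suite below): for a
// single partition whose ORDER BY column contains [10, 10, 20, 30, 30, 30]:
//   rank()         -> 1, 1, 3, 4, 4, 4   (tied rows share a rank, gaps follow)
//   dense_rank()   -> 1, 1, 2, 3, 3, 3   (tied rows share a rank, no gaps)
//   percent_rank() -> (rank - 1) / (6 - 1)
//                  -> 0.0, 0.0, 0.4, 0.6, 0.6, 0.6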

/// `Rank` calculates the rank of a row within its window partition, based on
/// the window's `ORDER BY` expressions
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct Rank {
    name: String,
    signature: Signature,
    rank_type: RankType,
}

impl Rank {
    /// Create a new `rank` function with the specified name and rank type
    pub fn new(name: String, rank_type: RankType) -> Self {
        Self {
            name,
            signature: Signature::nullary(Volatility::Immutable),
            rank_type,
        }
    }

    /// Create a `rank` window function
    pub fn basic() -> Self {
        Rank::new("rank".to_string(), RankType::Basic)
    }

    /// Create a `dense_rank` window function
    pub fn dense_rank() -> Self {
        Rank::new("dense_rank".to_string(), RankType::Dense)
    }

    /// Create a `percent_rank` window function
    pub fn percent_rank() -> Self {
        Rank::new("percent_rank".to_string(), RankType::Percent)
    }
}

#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum RankType {
    Basic,
    Dense,
    Percent,
}

static RANK_DOCUMENTATION: LazyLock<Documentation> = LazyLock::new(|| {
    Documentation::builder(
        DOC_SECTION_RANKING,
        "Returns the rank of the current row within its partition, allowing \
        gaps between ranks. This function provides a ranking similar to `row_number`, but \
        skips ranks for identical values.",
        "rank()")
        .with_sql_example(r#"
```sql
-- Example usage of the rank window function:
SELECT department,
    salary,
    rank() OVER (PARTITION BY department ORDER BY salary DESC) AS rank
FROM employees;

+-------------+--------+------+
| department  | salary | rank |
+-------------+--------+------+
| Sales       | 70000  | 1    |
| Sales       | 50000  | 2    |
| Sales       | 50000  | 2    |
| Sales       | 30000  | 4    |
| Engineering | 90000  | 1    |
| Engineering | 80000  | 2    |
+-------------+--------+------+
```
"#)
        .build()
});

fn get_rank_doc() -> &'static Documentation {
    &RANK_DOCUMENTATION
}

static DENSE_RANK_DOCUMENTATION: LazyLock<Documentation> = LazyLock::new(|| {
    Documentation::builder(
        DOC_SECTION_RANKING,
        "Returns the rank of the current row without gaps. This function ranks \
        rows in a dense manner, meaning consecutive ranks are assigned even for identical \
        values.",
        "dense_rank()")
        .with_sql_example(r#"
```sql
-- Example usage of the dense_rank window function:
SELECT department,
    salary,
    dense_rank() OVER (PARTITION BY department ORDER BY salary DESC) AS dense_rank
FROM employees;

+-------------+--------+------------+
| department  | salary | dense_rank |
+-------------+--------+------------+
| Sales       | 70000  | 1          |
| Sales       | 50000  | 2          |
| Sales       | 50000  | 2          |
| Sales       | 30000  | 3          |
| Engineering | 90000  | 1          |
| Engineering | 80000  | 2          |
+-------------+--------+------------+
```"#)
        .build()
});

fn get_dense_rank_doc() -> &'static Documentation {
    &DENSE_RANK_DOCUMENTATION
}

static PERCENT_RANK_DOCUMENTATION: LazyLock<Documentation> = LazyLock::new(|| {
    Documentation::builder(
        DOC_SECTION_RANKING,
        "Returns the percentage rank of the current row within its partition. \
        The value ranges from 0 to 1 and is computed as `(rank - 1) / (total_rows - 1)`.",
        "percent_rank()")
        .with_sql_example(r#"
```sql
-- Example usage of the percent_rank window function:
SELECT employee_id,
    salary,
    percent_rank() OVER (ORDER BY salary) AS percent_rank
FROM employees;

+-------------+--------+---------------+
| employee_id | salary | percent_rank  |
+-------------+--------+---------------+
| 1           | 30000  | 0.00          |
| 2           | 50000  | 0.50          |
| 3           | 70000  | 1.00          |
+-------------+--------+---------------+
```"#)
        .build()
});

fn get_percent_rank_doc() -> &'static Documentation {
    &PERCENT_RANK_DOCUMENTATION
}

impl WindowUDFImpl for Rank {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn name(&self) -> &str {
        &self.name
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    fn partition_evaluator(
        &self,
        _partition_evaluator_args: PartitionEvaluatorArgs,
    ) -> Result<Box<dyn PartitionEvaluator>> {
        Ok(Box::new(RankEvaluator {
            state: RankState::default(),
            rank_type: self.rank_type,
        }))
    }

    fn field(&self, field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
        let return_type = match self.rank_type {
            RankType::Basic | RankType::Dense => DataType::UInt64,
            RankType::Percent => DataType::Float64,
        };

        let nullable = false;
        Ok(Field::new(field_args.name(), return_type, nullable).into())
    }

    fn sort_options(&self) -> Option<SortOptions> {
        Some(SortOptions {
            descending: false,
            nulls_first: false,
        })
    }

    fn documentation(&self) -> Option<&Documentation> {
        match self.rank_type {
            RankType::Basic => Some(get_rank_doc()),
            RankType::Dense => Some(get_dense_rank_doc()),
            RankType::Percent => Some(get_percent_rank_doc()),
        }
    }

    fn limit_effect(&self, _args: &[Arc<dyn PhysicalExpr>]) -> LimitEffect {
        match self.rank_type {
            RankType::Basic => LimitEffect::None,
            RankType::Dense => LimitEffect::None,
            RankType::Percent => LimitEffect::Unknown,
        }
    }
}

/// State for the `rank` built-in window function.
#[derive(Debug, Clone, Default)]
pub struct RankState {
    /// The ORDER BY values of the most recently seen row; when these values
    /// change, `n_rank` is incremented
    pub last_rank_data: Option<Vec<ScalarValue>>,
    /// The row index at which the current rank group starts
    pub last_rank_boundary: usize,
    /// The number of rows in the current rank group
    pub current_group_count: usize,
    /// The number of distinct rank groups seen so far (the dense rank)
    pub n_rank: usize,
}
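
// Illustrative state trace (a sketch): streaming `rank` over the ORDER BY
// values [10, 10, 20] updates `RankState` as follows:
//   row 0 (10): last_rank_boundary = 0, current_group_count = 1, n_rank = 1 -> rank 1
//   row 1 (10): last_rank_boundary = 0, current_group_count = 2, n_rank = 1 -> rank 1
//   row 2 (20): last_rank_boundary = 2, current_group_count = 1, n_rank = 2 -> rank 3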

/// Evaluator for the `rank`, `dense_rank`, and `percent_rank` window functions.
#[derive(Debug)]
struct RankEvaluator {
    state: RankState,
    rank_type: RankType,
}

impl PartitionEvaluator for RankEvaluator {
    fn is_causal(&self) -> bool {
        matches!(self.rank_type, RankType::Basic | RankType::Dense)
    }

    fn evaluate(
        &mut self,
        values: &[ArrayRef],
        range: &Range<usize>,
    ) -> Result<ScalarValue> {
        let row_idx = range.start;
        // There are no function arguments; `values` contains the ORDER BY column
        // values over which the rank is calculated
        let range_columns = values;
        let last_rank_data = get_row_at_idx(range_columns, row_idx)?;
        let new_rank_encountered =
            if let Some(state_last_rank_data) = &self.state.last_rank_data {
                // if the ORDER BY values change, a new rank group is encountered
                state_last_rank_data != &last_rank_data
            } else {
                // First rank seen
                true
            };
        if new_rank_encountered {
            self.state.last_rank_data = Some(last_rank_data);
            self.state.last_rank_boundary += self.state.current_group_count;
            self.state.current_group_count = 1;
            self.state.n_rank += 1;
        } else {
            // data is still in the same rank group
            self.state.current_group_count += 1;
        }

        match self.rank_type {
            RankType::Basic => Ok(ScalarValue::UInt64(Some(
                self.state.last_rank_boundary as u64 + 1,
            ))),
            RankType::Dense => Ok(ScalarValue::UInt64(Some(self.state.n_rank as u64))),
            RankType::Percent => {
                exec_err!("Can not execute PERCENT_RANK in a streaming fashion")
            }
        }
    }

    fn evaluate_all_with_rank(
        &self,
        num_rows: usize,
        ranks_in_partition: &[Range<usize>],
    ) -> Result<ArrayRef> {
        let result: ArrayRef = match self.rank_type {
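            // For example (illustrative): rank groups [0..2, 2..3, 3..6] yield
            // ranks [1, 1, 3, 4, 4, 4]; `acc` tracks the 1-based row number of
            // the first row in each group.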
            RankType::Basic => Arc::new(UInt64Array::from_iter_values(
                ranks_in_partition
                    .iter()
                    .scan(1_u64, |acc, range| {
                        let len = range.end - range.start;
                        let result = iter::repeat_n(*acc, len);
                        *acc += len as u64;
                        Some(result)
                    })
                    .flatten(),
            )),

            RankType::Dense => Arc::new(UInt64Array::from_iter_values(
                ranks_in_partition
                    .iter()
                    .zip(1u64..)
                    .flat_map(|(range, rank)| {
                        let len = range.end - range.start;
                        iter::repeat_n(rank, len)
                    }),
            )),

            RankType::Percent => {
                let denominator = num_rows as f64;

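                // Illustrative only: with num_rows = 4 and rank groups
                // [0..2, 2..3, 3..4], the cumulative offsets are 0, 2, 3, so
                // the output is [0/3, 0/3, 2/3, 3/3].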
                Arc::new(Float64Array::from_iter_values(
                    ranks_in_partition
                        .iter()
                        .scan(0_u64, |acc, range| {
                            let len = range.end - range.start;
                            let value = (*acc as f64) / (denominator - 1.0).max(1.0);
                            let result = iter::repeat_n(value, len);
                            *acc += len as u64;
                            Some(result)
                        })
                        .flatten(),
                ))
            }
        };

        Ok(result)
    }

    fn supports_bounded_execution(&self) -> bool {
        matches!(self.rank_type, RankType::Basic | RankType::Dense)
    }

    fn include_rank(&self) -> bool {
        true
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use datafusion_common::cast::{as_float64_array, as_uint64_array};

    fn test_with_rank(expr: &Rank, expected: Vec<u64>) -> Result<()> {
        test_i32_result(expr, vec![0..2, 2..3, 3..6, 6..7, 7..8], expected)
    }

    #[allow(clippy::single_range_in_vec_init)]
    fn test_without_rank(expr: &Rank, expected: Vec<u64>) -> Result<()> {
        test_i32_result(expr, vec![0..8], expected)
    }

    fn test_i32_result(
        expr: &Rank,
        ranks: Vec<Range<usize>>,
        expected: Vec<u64>,
    ) -> Result<()> {
        let args = PartitionEvaluatorArgs::default();
        let result = expr
            .partition_evaluator(args)?
            .evaluate_all_with_rank(8, &ranks)?;
        let result = as_uint64_array(&result)?;
        let result = result.values();
        assert_eq!(expected, *result);
        Ok(())
    }

    fn test_f64_result(
        expr: &Rank,
        num_rows: usize,
        ranks: Vec<Range<usize>>,
        expected: Vec<f64>,
    ) -> Result<()> {
        let args = PartitionEvaluatorArgs::default();
        let result = expr
            .partition_evaluator(args)?
            .evaluate_all_with_rank(num_rows, &ranks)?;
        let result = as_float64_array(&result)?;
        let result = result.values();
        assert_eq!(expected, *result);
        Ok(())
    }

    #[test]
    fn test_rank() -> Result<()> {
        let r = Rank::basic();
        test_without_rank(&r, vec![1; 8])?;
        test_with_rank(&r, vec![1, 1, 3, 4, 4, 4, 7, 8])?;
        Ok(())
    }

    #[test]
    fn test_dense_rank() -> Result<()> {
        let r = Rank::dense_rank();
        test_without_rank(&r, vec![1; 8])?;
        test_with_rank(&r, vec![1, 1, 2, 3, 3, 3, 4, 5])?;
        Ok(())
    }

    #[test]
    #[allow(clippy::single_range_in_vec_init)]
    fn test_percent_rank() -> Result<()> {
        let r = Rank::percent_rank();

        // empty case
        let expected = vec![0.0; 0];
        test_f64_result(&r, 0, vec![0..0; 0], expected)?;

        // singleton case
        let expected = vec![0.0];
        test_f64_result(&r, 1, vec![0..1], expected)?;

        // uniform case
        let expected = vec![0.0; 7];
        test_f64_result(&r, 7, vec![0..7], expected)?;

        // non-trivial case
        let expected = vec![0.0, 0.0, 0.0, 0.5, 0.5, 0.5, 0.5];
        test_f64_result(&r, 7, vec![0..3, 3..7], expected)?;

        Ok(())
    }
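
    // A streaming sketch (not part of the upstream test suite): drive the
    // `rank` evaluator row-by-row over a hypothetical ORDER BY column and
    // check that the rank only advances when the ordering value changes.
    #[test]
    fn test_rank_streaming_evaluate() -> Result<()> {
        use datafusion_common::arrow::array::Int32Array;

        let r = Rank::basic();
        let mut evaluator = r.partition_evaluator(PartitionEvaluatorArgs::default())?;
        // The ORDER BY column: two tied values followed by a new one.
        let values: Vec<ArrayRef> = vec![Arc::new(Int32Array::from(vec![10, 10, 20]))];

        let mut results = vec![];
        for idx in 0..3 {
            results.push(evaluator.evaluate(&values, &(idx..idx + 1))?);
        }
        assert_eq!(
            results,
            vec![
                ScalarValue::UInt64(Some(1)),
                ScalarValue::UInt64(Some(1)),
                ScalarValue::UInt64(Some(3)),
            ]
        );
        Ok(())
    }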
}