Skip to main content

datafusion_functions_window/
rank.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Implementation of `rank`, `dense_rank`, and `percent_rank` window functions,
19//! which can be evaluated at runtime during query execution.
20
21use arrow::datatypes::FieldRef;
22use datafusion_common::arrow::array::ArrayRef;
23use datafusion_common::arrow::array::{Float64Array, UInt64Array};
24use datafusion_common::arrow::compute::SortOptions;
25use datafusion_common::arrow::datatypes::DataType;
26use datafusion_common::arrow::datatypes::Field;
27use datafusion_common::utils::get_row_at_idx;
28use datafusion_common::{Result, ScalarValue, exec_err};
29use datafusion_doc::window_doc_sections::DOC_SECTION_RANKING;
30use datafusion_expr::{
31    Documentation, LimitEffect, PartitionEvaluator, Signature, Volatility, WindowUDFImpl,
32};
33use datafusion_functions_window_common::field;
34use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
35use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
36use field::WindowUDFFieldArgs;
37use std::fmt::Debug;
38use std::hash::Hash;
39use std::iter;
40use std::ops::Range;
41use std::sync::{Arc, LazyLock};
42
// Declares the `rank` window function via the `define_udwf_and_expr!` macro,
// which is defined outside this file.
// NOTE(review): presumably this generates the `Rank` UDWF wrapper, the `rank`
// expression helper, and the `rank_udwf` accessor, all backed by
// `Rank::basic` -- confirm against the macro definition.
define_udwf_and_expr!(
    Rank,
    rank,
    rank_udwf,
    "Returns rank of the current row with gaps. Same as `row_number` of its first peer",
    Rank::basic
);
50
// Declares the `dense_rank` window function via the `define_udwf_and_expr!`
// macro, which is defined outside this file.
// NOTE(review): presumably this generates the `DenseRank` UDWF wrapper, the
// `dense_rank` expression helper, and the `dense_rank_udwf` accessor, all
// backed by `Rank::dense_rank` -- confirm against the macro definition.
define_udwf_and_expr!(
    DenseRank,
    dense_rank,
    dense_rank_udwf,
    "Returns rank of the current row without gaps. This function counts peer groups",
    Rank::dense_rank
);
58
// Declares the `percent_rank` window function via the `define_udwf_and_expr!`
// macro, which is defined outside this file.
// NOTE(review): presumably this generates the `PercentRank` UDWF wrapper, the
// `percent_rank` expression helper, and the `percent_rank_udwf` accessor, all
// backed by `Rank::percent_rank` -- confirm against the macro definition.
define_udwf_and_expr!(
    PercentRank,
    percent_rank,
    percent_rank_udwf,
    "Returns the relative rank of the current row: (rank - 1) / (total rows - 1)",
    Rank::percent_rank
);
66
/// Window function implementation shared by `rank`, `dense_rank`, and
/// `percent_rank`; the variant to compute is selected by `rank_type`.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct Rank {
    /// Function name returned by `WindowUDFImpl::name`.
    name: String,
    /// Signature returned by `WindowUDFImpl::signature`; `Rank::new` sets it
    /// to a nullary, immutable signature.
    signature: Signature,
    /// Which ranking variant this instance computes.
    rank_type: RankType,
}
74
75impl Rank {
76    /// Create a new `rank` function with the specified name and rank type
77    pub fn new(name: String, rank_type: RankType) -> Self {
78        Self {
79            name,
80            signature: Signature::nullary(Volatility::Immutable),
81            rank_type,
82        }
83    }
84
85    /// Create a `rank` window function
86    pub fn basic() -> Self {
87        Rank::new("rank".to_string(), RankType::Basic)
88    }
89
90    /// Create a `dense_rank` window function
91    pub fn dense_rank() -> Self {
92        Rank::new("dense_rank".to_string(), RankType::Dense)
93    }
94
95    /// Create a `percent_rank` window function
96    pub fn percent_rank() -> Self {
97        Rank::new("percent_rank".to_string(), RankType::Percent)
98    }
99}
100
/// Selects which ranking variant a [`Rank`] instance computes.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum RankType {
    /// `rank`: produced by `Rank::basic`; yields `UInt64` values.
    Basic,
    /// `dense_rank`: produced by `Rank::dense_rank`; yields `UInt64` values.
    Dense,
    /// `percent_rank`: produced by `Rank::percent_rank`; yields `Float64` values.
    Percent,
}
107
108static RANK_DOCUMENTATION: LazyLock<Documentation> = LazyLock::new(|| {
109    Documentation::builder(
110        DOC_SECTION_RANKING,
111            "Returns the rank of the current row within its partition, allowing \
112            gaps between ranks. This function provides a ranking similar to `row_number`, but \
113            skips ranks for identical values.",
114
115        "rank()")
116        .with_sql_example(r#"
117```sql
118-- Example usage of the rank window function:
119SELECT department,
120    salary,
121    rank() OVER (PARTITION BY department ORDER BY salary DESC) AS rank
122FROM employees;
123
124+-------------+--------+------+
125| department  | salary | rank |
126+-------------+--------+------+
127| Sales       | 70000  | 1    |
128| Sales       | 50000  | 2    |
129| Sales       | 50000  | 2    |
130| Sales       | 30000  | 4    |
131| Engineering | 90000  | 1    |
132| Engineering | 80000  | 2    |
133+-------------+--------+------+
134```
135"#)
136        .build()
137});
138
139fn get_rank_doc() -> &'static Documentation {
140    &RANK_DOCUMENTATION
141}
142
143static DENSE_RANK_DOCUMENTATION: LazyLock<Documentation> = LazyLock::new(|| {
144    Documentation::builder(DOC_SECTION_RANKING, "Returns the rank of the current row without gaps. This function ranks \
145            rows in a dense manner, meaning consecutive ranks are assigned even for identical \
146            values.", "dense_rank()")
147        .with_sql_example(r#"
148```sql
149-- Example usage of the dense_rank window function:
150SELECT department,
151    salary,
152    dense_rank() OVER (PARTITION BY department ORDER BY salary DESC) AS dense_rank
153FROM employees;
154
155+-------------+--------+------------+
156| department  | salary | dense_rank |
157+-------------+--------+------------+
158| Sales       | 70000  | 1          |
159| Sales       | 50000  | 2          |
160| Sales       | 50000  | 2          |
161| Sales       | 30000  | 3          |
162| Engineering | 90000  | 1          |
163| Engineering | 80000  | 2          |
164+-------------+--------+------------+
165```"#)
166        .build()
167});
168
169fn get_dense_rank_doc() -> &'static Documentation {
170    &DENSE_RANK_DOCUMENTATION
171}
172
173static PERCENT_RANK_DOCUMENTATION: LazyLock<Documentation> = LazyLock::new(|| {
174    Documentation::builder(DOC_SECTION_RANKING, "Returns the percentage rank of the current row within its partition. \
175            The value ranges from 0 to 1 and is computed as `(rank - 1) / (total_rows - 1)`.", "percent_rank()")
176        .with_sql_example(r#"```sql
177    -- Example usage of the percent_rank window function:
178SELECT employee_id,
179    salary,
180    percent_rank() OVER (ORDER BY salary) AS percent_rank
181FROM employees;
182
183+-------------+--------+---------------+
184| employee_id | salary | percent_rank  |
185+-------------+--------+---------------+
186| 1           | 30000  | 0.00          |
187| 2           | 50000  | 0.50          |
188| 3           | 70000  | 1.00          |
189+-------------+--------+---------------+
190```"#)
191        .build()
192});
193
194fn get_percent_rank_doc() -> &'static Documentation {
195    &PERCENT_RANK_DOCUMENTATION
196}
197
198impl WindowUDFImpl for Rank {
199    fn name(&self) -> &str {
200        &self.name
201    }
202
203    fn signature(&self) -> &Signature {
204        &self.signature
205    }
206
207    fn partition_evaluator(
208        &self,
209        _partition_evaluator_args: PartitionEvaluatorArgs,
210    ) -> Result<Box<dyn PartitionEvaluator>> {
211        Ok(Box::new(RankEvaluator {
212            state: RankState::default(),
213            rank_type: self.rank_type,
214        }))
215    }
216
217    fn field(&self, field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
218        let return_type = match self.rank_type {
219            RankType::Basic | RankType::Dense => DataType::UInt64,
220            RankType::Percent => DataType::Float64,
221        };
222
223        let nullable = false;
224        Ok(Field::new(field_args.name(), return_type, nullable).into())
225    }
226
227    fn sort_options(&self) -> Option<SortOptions> {
228        Some(SortOptions {
229            descending: false,
230            nulls_first: false,
231        })
232    }
233
234    fn documentation(&self) -> Option<&Documentation> {
235        match self.rank_type {
236            RankType::Basic => Some(get_rank_doc()),
237            RankType::Dense => Some(get_dense_rank_doc()),
238            RankType::Percent => Some(get_percent_rank_doc()),
239        }
240    }
241
242    fn limit_effect(&self, _args: &[Arc<dyn PhysicalExpr>]) -> LimitEffect {
243        match self.rank_type {
244            RankType::Basic => LimitEffect::None,
245            RankType::Dense => LimitEffect::None,
246            RankType::Percent => LimitEffect::Unknown,
247        }
248    }
249}
250
/// Running state kept by the row-by-row (`evaluate`) path of the rank
/// window functions.
#[derive(Debug, Clone, Default)]
pub struct RankState {
    /// Ordering-column values of the most recently evaluated row, or `None`
    /// before the first row; a row with different values starts a new rank.
    pub last_rank_data: Option<Vec<ScalarValue>>,
    /// Number of rows evaluated before the current peer group began;
    /// `rank` reports this value plus one.
    pub last_rank_boundary: usize,
    /// Number of rows evaluated so far in the current peer group.
    pub current_group_count: usize,
    /// Number of peer groups started so far; `dense_rank` reports this value.
    pub n_rank: usize,
}
263
/// Partition evaluator backing `rank`, `dense_rank`, and `percent_rank`.
#[derive(Debug)]
struct RankEvaluator {
    /// Running state updated by each call to `evaluate`.
    state: RankState,
    /// Which ranking variant to produce.
    rank_type: RankType,
}
270
271impl PartitionEvaluator for RankEvaluator {
272    fn is_causal(&self) -> bool {
273        matches!(self.rank_type, RankType::Basic | RankType::Dense)
274    }
275
276    fn evaluate(
277        &mut self,
278        values: &[ArrayRef],
279        range: &Range<usize>,
280    ) -> Result<ScalarValue> {
281        let row_idx = range.start;
282        // There is no argument, values are order by column values (where rank is calculated)
283        let range_columns = values;
284        let last_rank_data = get_row_at_idx(range_columns, row_idx)?;
285        let new_rank_encountered =
286            if let Some(state_last_rank_data) = &self.state.last_rank_data {
287                // if rank data changes, new rank is encountered
288                state_last_rank_data != &last_rank_data
289            } else {
290                // First rank seen
291                true
292            };
293        if new_rank_encountered {
294            self.state.last_rank_data = Some(last_rank_data);
295            self.state.last_rank_boundary += self.state.current_group_count;
296            self.state.current_group_count = 1;
297            self.state.n_rank += 1;
298        } else {
299            // data is still in the same rank
300            self.state.current_group_count += 1;
301        }
302
303        match self.rank_type {
304            RankType::Basic => Ok(ScalarValue::UInt64(Some(
305                self.state.last_rank_boundary as u64 + 1,
306            ))),
307            RankType::Dense => Ok(ScalarValue::UInt64(Some(self.state.n_rank as u64))),
308            RankType::Percent => {
309                exec_err!("Can not execute PERCENT_RANK in a streaming fashion")
310            }
311        }
312    }
313
314    fn evaluate_all_with_rank(
315        &self,
316        num_rows: usize,
317        ranks_in_partition: &[Range<usize>],
318    ) -> Result<ArrayRef> {
319        let result: ArrayRef = match self.rank_type {
320            RankType::Basic => Arc::new(UInt64Array::from_iter_values(
321                ranks_in_partition
322                    .iter()
323                    .scan(1_u64, |acc, range| {
324                        let len = range.end - range.start;
325                        let result = iter::repeat_n(*acc, len);
326                        *acc += len as u64;
327                        Some(result)
328                    })
329                    .flatten(),
330            )),
331
332            RankType::Dense => Arc::new(UInt64Array::from_iter_values(
333                ranks_in_partition
334                    .iter()
335                    .zip(1u64..)
336                    .flat_map(|(range, rank)| {
337                        let len = range.end - range.start;
338                        iter::repeat_n(rank, len)
339                    }),
340            )),
341
342            RankType::Percent => {
343                let denominator = num_rows as f64;
344
345                Arc::new(Float64Array::from_iter_values(
346                    ranks_in_partition
347                        .iter()
348                        .scan(0_u64, |acc, range| {
349                            let len = range.end - range.start;
350                            let value = (*acc as f64) / (denominator - 1.0).max(1.0);
351                            let result = iter::repeat_n(value, len);
352                            *acc += len as u64;
353                            Some(result)
354                        })
355                        .flatten(),
356                ))
357            }
358        };
359
360        Ok(result)
361    }
362
363    fn supports_bounded_execution(&self) -> bool {
364        matches!(self.rank_type, RankType::Basic | RankType::Dense)
365    }
366
367    fn include_rank(&self) -> bool {
368        true
369    }
370}
371
#[cfg(test)]
mod tests {
    use super::*;
    use datafusion_common::cast::{as_float64_array, as_uint64_array};

    /// Bulk-evaluates `expr` over one partition of `num_rows` rows whose peer
    /// groups are `peer_groups`.
    fn evaluate_partition(
        expr: &Rank,
        num_rows: usize,
        peer_groups: &[Range<usize>],
    ) -> Result<ArrayRef> {
        expr.partition_evaluator(PartitionEvaluatorArgs::default())?
            .evaluate_all_with_rank(num_rows, peer_groups)
    }

    /// Checks `expr` on eight rows split into five peer groups.
    fn test_with_rank(expr: &Rank, expected: Vec<u64>) -> Result<()> {
        test_i32_result(expr, vec![0..2, 2..3, 3..6, 6..7, 7..8], expected)
    }

    /// Checks `expr` on eight rows that all fall into one peer group.
    #[expect(clippy::single_range_in_vec_init)]
    fn test_without_rank(expr: &Rank, expected: Vec<u64>) -> Result<()> {
        test_i32_result(expr, vec![0..8], expected)
    }

    /// Evaluates an eight-row partition and compares the `UInt64` output.
    fn test_i32_result(
        expr: &Rank,
        ranks: Vec<Range<usize>>,
        expected: Vec<u64>,
    ) -> Result<()> {
        let output = evaluate_partition(expr, 8, &ranks)?;
        let actual = as_uint64_array(&output)?.values();
        assert_eq!(expected, *actual);
        Ok(())
    }

    /// Evaluates a `num_rows`-row partition and compares the `Float64` output.
    fn test_f64_result(
        expr: &Rank,
        num_rows: usize,
        ranks: Vec<Range<usize>>,
        expected: Vec<f64>,
    ) -> Result<()> {
        let output = evaluate_partition(expr, num_rows, &ranks)?;
        let actual = as_float64_array(&output)?.values();
        assert_eq!(expected, *actual);
        Ok(())
    }

    #[test]
    fn test_rank() -> Result<()> {
        let rank_fn = Rank::basic();
        test_without_rank(&rank_fn, vec![1; 8])?;
        test_with_rank(&rank_fn, vec![1, 1, 3, 4, 4, 4, 7, 8])?;
        Ok(())
    }

    #[test]
    fn test_dense_rank() -> Result<()> {
        let dense_fn = Rank::dense_rank();
        test_without_rank(&dense_fn, vec![1; 8])?;
        test_with_rank(&dense_fn, vec![1, 1, 2, 3, 3, 3, 4, 5])?;
        Ok(())
    }

    #[test]
    #[expect(clippy::single_range_in_vec_init)]
    fn test_percent_rank() -> Result<()> {
        let percent_fn = Rank::percent_rank();

        // Empty partition: no rows and no peer groups.
        test_f64_result(&percent_fn, 0, vec![0..0; 0], vec![0.0; 0])?;

        // Single row.
        test_f64_result(&percent_fn, 1, vec![0..1], vec![0.0])?;

        // All seven rows are peers.
        test_f64_result(&percent_fn, 7, vec![0..7], vec![0.0; 7])?;

        // Two peer groups: the second starts after 3 of 7 rows, so 3 / 6 = 0.5.
        test_f64_result(
            &percent_fn,
            7,
            vec![0..3, 3..7],
            vec![0.0, 0.0, 0.0, 0.5, 0.5, 0.5, 0.5],
        )?;

        Ok(())
    }
}