datafusion_functions_window/
rank.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Implementation of `rank`, `dense_rank`, and `percent_rank` window functions,
19//! which can be evaluated at runtime during query execution.
20
21use std::any::Any;
22use std::fmt::Debug;
23use std::iter;
24use std::ops::Range;
25use std::sync::{Arc, LazyLock};
26
27use crate::define_udwf_and_expr;
28use datafusion_common::arrow::array::ArrayRef;
29use datafusion_common::arrow::array::{Float64Array, UInt64Array};
30use datafusion_common::arrow::compute::SortOptions;
31use datafusion_common::arrow::datatypes::DataType;
32use datafusion_common::arrow::datatypes::Field;
33use datafusion_common::utils::get_row_at_idx;
34use datafusion_common::{exec_err, Result, ScalarValue};
35use datafusion_expr::window_doc_sections::DOC_SECTION_RANKING;
36use datafusion_expr::{
37    Documentation, PartitionEvaluator, Signature, Volatility, WindowUDFImpl,
38};
39use datafusion_functions_window_common::field;
40use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
41use field::WindowUDFFieldArgs;
42
// Invokes the crate-level `define_udwf_and_expr!` macro for the `rank` window
// function, passing the `Rank` name, the `rank` identifier, a description
// string and `Rank::basic` as the constructor.
// NOTE(review): the macro is defined outside this file; presumably it generates
// the UDWF accessor and an expression helper for `rank` — confirm against the
// macro definition.
define_udwf_and_expr!(
    Rank,
    rank,
    "Returns rank of the current row with gaps. Same as `row_number` of its first peer",
    Rank::basic
);
49
// Invokes the crate-level `define_udwf_and_expr!` macro for the `dense_rank`
// window function, passing the `DenseRank` name, the `dense_rank` identifier,
// a description string and `Rank::dense_rank` as the constructor.
// NOTE(review): the macro is defined outside this file; presumably it generates
// the UDWF accessor and an expression helper for `dense_rank` — confirm against
// the macro definition.
define_udwf_and_expr!(
    DenseRank,
    dense_rank,
    "Returns rank of the current row without gaps. This function counts peer groups",
    Rank::dense_rank
);
56
// Invokes the crate-level `define_udwf_and_expr!` macro for the `percent_rank`
// window function, passing the `PercentRank` name, the `percent_rank`
// identifier, a description string and `Rank::percent_rank` as the constructor.
// NOTE(review): the macro is defined outside this file; presumably it generates
// the UDWF accessor and an expression helper for `percent_rank` — confirm
// against the macro definition.
define_udwf_and_expr!(
    PercentRank,
    percent_rank,
    "Returns the relative rank of the current row: (rank - 1) / (total rows - 1)",
    Rank::percent_rank
);
63
/// Window function implementation shared by `rank`, `dense_rank` and
/// `percent_rank`; the flavor that is computed is selected by `rank_type`.
#[derive(Debug)]
pub struct Rank {
    /// Name returned by `WindowUDFImpl::name` (e.g. `"rank"`).
    name: String,
    /// Signature returned by `WindowUDFImpl::signature`; `Rank::new` builds it
    /// as a nullary, immutable signature.
    signature: Signature,
    /// Which ranking flavor this instance computes.
    rank_type: RankType,
}
71
72impl Rank {
73    /// Create a new `rank` function with the specified name and rank type
74    pub fn new(name: String, rank_type: RankType) -> Self {
75        Self {
76            name,
77            signature: Signature::nullary(Volatility::Immutable),
78            rank_type,
79        }
80    }
81
82    /// Create a `rank` window function
83    pub fn basic() -> Self {
84        Rank::new("rank".to_string(), RankType::Basic)
85    }
86
87    /// Create a `dense_rank` window function
88    pub fn dense_rank() -> Self {
89        Rank::new("dense_rank".to_string(), RankType::Dense)
90    }
91
92    /// Create a `percent_rank` window function
93    pub fn percent_rank() -> Self {
94        Rank::new("percent_rank".to_string(), RankType::Percent)
95    }
96}
97
/// Selects which ranking function a `Rank` instance evaluates.
#[derive(Debug, Copy, Clone)]
pub enum RankType {
    /// `rank`: used by `Rank::basic`; produces `UInt64` values.
    Basic,
    /// `dense_rank`: used by `Rank::dense_rank`; produces `UInt64` values.
    Dense,
    /// `percent_rank`: used by `Rank::percent_rank`; produces `Float64` values.
    Percent,
}
104
105static RANK_DOCUMENTATION: LazyLock<Documentation> = LazyLock::new(|| {
106    Documentation::builder(
107        DOC_SECTION_RANKING,
108            "Returns the rank of the current row within its partition, allowing \
109            gaps between ranks. This function provides a ranking similar to `row_number`, but \
110            skips ranks for identical values.",
111
112        "rank()")
113        .build()
114});
115
116fn get_rank_doc() -> &'static Documentation {
117    &RANK_DOCUMENTATION
118}
119
120static DENSE_RANK_DOCUMENTATION: LazyLock<Documentation> = LazyLock::new(|| {
121    Documentation::builder(DOC_SECTION_RANKING, "Returns the rank of the current row without gaps. This function ranks \
122            rows in a dense manner, meaning consecutive ranks are assigned even for identical \
123            values.", "dense_rank()")
124        .build()
125});
126
127fn get_dense_rank_doc() -> &'static Documentation {
128    &DENSE_RANK_DOCUMENTATION
129}
130
131static PERCENT_RANK_DOCUMENTATION: LazyLock<Documentation> = LazyLock::new(|| {
132    Documentation::builder(DOC_SECTION_RANKING, "Returns the percentage rank of the current row within its partition. \
133            The value ranges from 0 to 1 and is computed as `(rank - 1) / (total_rows - 1)`.", "percent_rank()")
134        .build()
135});
136
137fn get_percent_rank_doc() -> &'static Documentation {
138    &PERCENT_RANK_DOCUMENTATION
139}
140
141impl WindowUDFImpl for Rank {
142    fn as_any(&self) -> &dyn Any {
143        self
144    }
145
146    fn name(&self) -> &str {
147        &self.name
148    }
149
150    fn signature(&self) -> &Signature {
151        &self.signature
152    }
153
154    fn partition_evaluator(
155        &self,
156        _partition_evaluator_args: PartitionEvaluatorArgs,
157    ) -> Result<Box<dyn PartitionEvaluator>> {
158        Ok(Box::new(RankEvaluator {
159            state: RankState::default(),
160            rank_type: self.rank_type,
161        }))
162    }
163
164    fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
165        let return_type = match self.rank_type {
166            RankType::Basic | RankType::Dense => DataType::UInt64,
167            RankType::Percent => DataType::Float64,
168        };
169
170        let nullable = false;
171        Ok(Field::new(field_args.name(), return_type, nullable))
172    }
173
174    fn sort_options(&self) -> Option<SortOptions> {
175        Some(SortOptions {
176            descending: false,
177            nulls_first: false,
178        })
179    }
180
181    fn documentation(&self) -> Option<&Documentation> {
182        match self.rank_type {
183            RankType::Basic => Some(get_rank_doc()),
184            RankType::Dense => Some(get_dense_rank_doc()),
185            RankType::Percent => Some(get_percent_rank_doc()),
186        }
187    }
188}
189
/// Running state kept by the ranking evaluator while rows are evaluated one
/// at a time.
#[derive(Debug, Clone, Default)]
pub struct RankState {
    /// Values of the most recently evaluated row that started a new rank;
    /// `None` until the first row is seen. When an incoming row differs from
    /// these values, a new rank begins and `n_rank` is incremented.
    pub last_rank_data: Option<Vec<ScalarValue>>,
    /// Number of rows that precede the current group of equal rows; `rank`
    /// is reported as this value plus one.
    pub last_rank_boundary: usize,
    /// Number of rows seen so far in the current group of equal rows.
    pub current_group_count: usize,
    /// Number of distinct groups seen so far; reported as `dense_rank`.
    pub n_rank: usize,
}
202
/// Partition evaluator created by `Rank::partition_evaluator` for the
/// `rank`, `dense_rank` and `percent_rank` window functions.
#[derive(Debug)]
struct RankEvaluator {
    /// Running state updated row by row in `evaluate`.
    state: RankState,
    /// Which ranking flavor this evaluator computes.
    rank_type: RankType,
}
209
210impl PartitionEvaluator for RankEvaluator {
211    fn is_causal(&self) -> bool {
212        matches!(self.rank_type, RankType::Basic | RankType::Dense)
213    }
214
215    fn evaluate(
216        &mut self,
217        values: &[ArrayRef],
218        range: &Range<usize>,
219    ) -> Result<ScalarValue> {
220        let row_idx = range.start;
221        // There is no argument, values are order by column values (where rank is calculated)
222        let range_columns = values;
223        let last_rank_data = get_row_at_idx(range_columns, row_idx)?;
224        let new_rank_encountered =
225            if let Some(state_last_rank_data) = &self.state.last_rank_data {
226                // if rank data changes, new rank is encountered
227                state_last_rank_data != &last_rank_data
228            } else {
229                // First rank seen
230                true
231            };
232        if new_rank_encountered {
233            self.state.last_rank_data = Some(last_rank_data);
234            self.state.last_rank_boundary += self.state.current_group_count;
235            self.state.current_group_count = 1;
236            self.state.n_rank += 1;
237        } else {
238            // data is still in the same rank
239            self.state.current_group_count += 1;
240        }
241
242        match self.rank_type {
243            RankType::Basic => Ok(ScalarValue::UInt64(Some(
244                self.state.last_rank_boundary as u64 + 1,
245            ))),
246            RankType::Dense => Ok(ScalarValue::UInt64(Some(self.state.n_rank as u64))),
247            RankType::Percent => {
248                exec_err!("Can not execute PERCENT_RANK in a streaming fashion")
249            }
250        }
251    }
252
253    fn evaluate_all_with_rank(
254        &self,
255        num_rows: usize,
256        ranks_in_partition: &[Range<usize>],
257    ) -> Result<ArrayRef> {
258        let result: ArrayRef = match self.rank_type {
259            RankType::Basic => Arc::new(UInt64Array::from_iter_values(
260                ranks_in_partition
261                    .iter()
262                    .scan(1_u64, |acc, range| {
263                        let len = range.end - range.start;
264                        let result = iter::repeat(*acc).take(len);
265                        *acc += len as u64;
266                        Some(result)
267                    })
268                    .flatten(),
269            )),
270
271            RankType::Dense => Arc::new(UInt64Array::from_iter_values(
272                ranks_in_partition
273                    .iter()
274                    .zip(1u64..)
275                    .flat_map(|(range, rank)| {
276                        let len = range.end - range.start;
277                        iter::repeat(rank).take(len)
278                    }),
279            )),
280
281            RankType::Percent => {
282                let denominator = num_rows as f64;
283
284                Arc::new(Float64Array::from_iter_values(
285                    ranks_in_partition
286                        .iter()
287                        .scan(0_u64, |acc, range| {
288                            let len = range.end - range.start;
289                            let value = (*acc as f64) / (denominator - 1.0).max(1.0);
290                            let result = iter::repeat(value).take(len);
291                            *acc += len as u64;
292                            Some(result)
293                        })
294                        .flatten(),
295                ))
296            }
297        };
298
299        Ok(result)
300    }
301
302    fn supports_bounded_execution(&self) -> bool {
303        matches!(self.rank_type, RankType::Basic | RankType::Dense)
304    }
305
306    fn include_rank(&self) -> bool {
307        true
308    }
309}
310
#[cfg(test)]
mod tests {
    use super::*;
    use datafusion_common::cast::{as_float64_array, as_uint64_array};

    /// Checks `expr` over eight rows split into five groups of equal rows.
    fn test_with_rank(expr: &Rank, expected: Vec<u64>) -> Result<()> {
        let groups = vec![0..2, 2..3, 3..6, 6..7, 7..8];
        test_u64_result(expr, groups, expected)
    }

    /// Checks `expr` over eight rows that all fall into a single group.
    #[allow(clippy::single_range_in_vec_init)]
    fn test_without_rank(expr: &Rank, expected: Vec<u64>) -> Result<()> {
        let groups = vec![0..8];
        test_u64_result(expr, groups, expected)
    }

    /// Evaluates `expr` over an eight-row partition described by `ranks` and
    /// compares the resulting `UInt64` values with `expected`.
    fn test_u64_result(
        expr: &Rank,
        ranks: Vec<Range<usize>>,
        expected: Vec<u64>,
    ) -> Result<()> {
        let evaluator = expr.partition_evaluator(PartitionEvaluatorArgs::default())?;
        let output = evaluator.evaluate_all_with_rank(8, &ranks)?;
        let actual = as_uint64_array(&output)?;
        assert_eq!(expected, *actual.values());
        Ok(())
    }

    /// Evaluates `expr` over a partition of `num_rows` rows described by
    /// `ranks` and compares the resulting `Float64` values with `expected`.
    fn test_f64_result(
        expr: &Rank,
        num_rows: usize,
        ranks: Vec<Range<usize>>,
        expected: Vec<f64>,
    ) -> Result<()> {
        let evaluator = expr.partition_evaluator(PartitionEvaluatorArgs::default())?;
        let output = evaluator.evaluate_all_with_rank(num_rows, &ranks)?;
        let actual = as_float64_array(&output)?;
        assert_eq!(expected, *actual.values());
        Ok(())
    }

    #[test]
    fn test_rank() -> Result<()> {
        let rank = Rank::basic();
        test_without_rank(&rank, vec![1; 8])?;
        test_with_rank(&rank, vec![1, 1, 3, 4, 4, 4, 7, 8])?;
        Ok(())
    }

    #[test]
    fn test_dense_rank() -> Result<()> {
        let dense = Rank::dense_rank();
        test_without_rank(&dense, vec![1; 8])?;
        test_with_rank(&dense, vec![1, 1, 2, 3, 3, 3, 4, 5])?;
        Ok(())
    }

    #[test]
    #[allow(clippy::single_range_in_vec_init)]
    fn test_percent_rank() -> Result<()> {
        let percent = Rank::percent_rank();

        // (num_rows, groups of equal rows, expected values)
        let cases: Vec<(usize, Vec<Range<usize>>, Vec<f64>)> = vec![
            // empty case
            (0, vec![], vec![]),
            // singleton case
            (1, vec![0..1], vec![0.0]),
            // uniform case
            (7, vec![0..7], vec![0.0; 7]),
            // non-trivial case
            (
                7,
                vec![0..3, 3..7],
                vec![0.0, 0.0, 0.0, 0.5, 0.5, 0.5, 0.5],
            ),
        ];

        for (num_rows, ranks, expected) in cases {
            test_f64_result(&percent, num_rows, ranks, expected)?;
        }

        Ok(())
    }
}