datafusion_functions_window/
rank.rs1use std::any::Any;
22use std::fmt::Debug;
23use std::iter;
24use std::ops::Range;
25use std::sync::{Arc, LazyLock};
26
27use crate::define_udwf_and_expr;
28use datafusion_common::arrow::array::ArrayRef;
29use datafusion_common::arrow::array::{Float64Array, UInt64Array};
30use datafusion_common::arrow::compute::SortOptions;
31use datafusion_common::arrow::datatypes::DataType;
32use datafusion_common::arrow::datatypes::Field;
33use datafusion_common::utils::get_row_at_idx;
34use datafusion_common::{exec_err, Result, ScalarValue};
35use datafusion_expr::window_doc_sections::DOC_SECTION_RANKING;
36use datafusion_expr::{
37 Documentation, PartitionEvaluator, Signature, Volatility, WindowUDFImpl,
38};
39use datafusion_functions_window_common::field;
40use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
41use field::WindowUDFFieldArgs;
42
43define_udwf_and_expr!(
44 Rank,
45 rank,
46 "Returns rank of the current row with gaps. Same as `row_number` of its first peer",
47 Rank::basic
48);
49
50define_udwf_and_expr!(
51 DenseRank,
52 dense_rank,
53 "Returns rank of the current row without gaps. This function counts peer groups",
54 Rank::dense_rank
55);
56
57define_udwf_and_expr!(
58 PercentRank,
59 percent_rank,
60 "Returns the relative rank of the current row: (rank - 1) / (total rows - 1)",
61 Rank::percent_rank
62);
63
64#[derive(Debug)]
66pub struct Rank {
67 name: String,
68 signature: Signature,
69 rank_type: RankType,
70}
71
72impl Rank {
73 pub fn new(name: String, rank_type: RankType) -> Self {
75 Self {
76 name,
77 signature: Signature::nullary(Volatility::Immutable),
78 rank_type,
79 }
80 }
81
82 pub fn basic() -> Self {
84 Rank::new("rank".to_string(), RankType::Basic)
85 }
86
87 pub fn dense_rank() -> Self {
89 Rank::new("dense_rank".to_string(), RankType::Dense)
90 }
91
92 pub fn percent_rank() -> Self {
94 Rank::new("percent_rank".to_string(), RankType::Percent)
95 }
96}
97
/// Selects which ranking function a `Rank` instance computes.
///
/// The type is a plain field-less enum, so besides `Copy` it also derives
/// `PartialEq`, `Eq` and `Hash`; this lets callers compare variants with `==`
/// or use them as map/set keys instead of pattern matching everywhere.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum RankType {
    /// `rank()`: rank with gaps after groups of tied rows.
    Basic,
    /// `dense_rank()`: rank without gaps; counts peer groups.
    Dense,
    /// `percent_rank()`: relative rank, `(rank - 1) / (total rows - 1)`.
    Percent,
}
104
105static RANK_DOCUMENTATION: LazyLock<Documentation> = LazyLock::new(|| {
106 Documentation::builder(
107 DOC_SECTION_RANKING,
108 "Returns the rank of the current row within its partition, allowing \
109 gaps between ranks. This function provides a ranking similar to `row_number`, but \
110 skips ranks for identical values.",
111
112 "rank()")
113 .build()
114});
115
116fn get_rank_doc() -> &'static Documentation {
117 &RANK_DOCUMENTATION
118}
119
120static DENSE_RANK_DOCUMENTATION: LazyLock<Documentation> = LazyLock::new(|| {
121 Documentation::builder(DOC_SECTION_RANKING, "Returns the rank of the current row without gaps. This function ranks \
122 rows in a dense manner, meaning consecutive ranks are assigned even for identical \
123 values.", "dense_rank()")
124 .build()
125});
126
127fn get_dense_rank_doc() -> &'static Documentation {
128 &DENSE_RANK_DOCUMENTATION
129}
130
131static PERCENT_RANK_DOCUMENTATION: LazyLock<Documentation> = LazyLock::new(|| {
132 Documentation::builder(DOC_SECTION_RANKING, "Returns the percentage rank of the current row within its partition. \
133 The value ranges from 0 to 1 and is computed as `(rank - 1) / (total_rows - 1)`.", "percent_rank()")
134 .build()
135});
136
137fn get_percent_rank_doc() -> &'static Documentation {
138 &PERCENT_RANK_DOCUMENTATION
139}
140
141impl WindowUDFImpl for Rank {
142 fn as_any(&self) -> &dyn Any {
143 self
144 }
145
146 fn name(&self) -> &str {
147 &self.name
148 }
149
150 fn signature(&self) -> &Signature {
151 &self.signature
152 }
153
154 fn partition_evaluator(
155 &self,
156 _partition_evaluator_args: PartitionEvaluatorArgs,
157 ) -> Result<Box<dyn PartitionEvaluator>> {
158 Ok(Box::new(RankEvaluator {
159 state: RankState::default(),
160 rank_type: self.rank_type,
161 }))
162 }
163
164 fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
165 let return_type = match self.rank_type {
166 RankType::Basic | RankType::Dense => DataType::UInt64,
167 RankType::Percent => DataType::Float64,
168 };
169
170 let nullable = false;
171 Ok(Field::new(field_args.name(), return_type, nullable))
172 }
173
174 fn sort_options(&self) -> Option<SortOptions> {
175 Some(SortOptions {
176 descending: false,
177 nulls_first: false,
178 })
179 }
180
181 fn documentation(&self) -> Option<&Documentation> {
182 match self.rank_type {
183 RankType::Basic => Some(get_rank_doc()),
184 RankType::Dense => Some(get_dense_rank_doc()),
185 RankType::Percent => Some(get_percent_rank_doc()),
186 }
187 }
188}
189
190#[derive(Debug, Clone, Default)]
192pub struct RankState {
193 pub last_rank_data: Option<Vec<ScalarValue>>,
195 pub last_rank_boundary: usize,
197 pub current_group_count: usize,
199 pub n_rank: usize,
201}
202
203#[derive(Debug)]
205struct RankEvaluator {
206 state: RankState,
207 rank_type: RankType,
208}
209
210impl PartitionEvaluator for RankEvaluator {
211 fn is_causal(&self) -> bool {
212 matches!(self.rank_type, RankType::Basic | RankType::Dense)
213 }
214
215 fn evaluate(
216 &mut self,
217 values: &[ArrayRef],
218 range: &Range<usize>,
219 ) -> Result<ScalarValue> {
220 let row_idx = range.start;
221 let range_columns = values;
223 let last_rank_data = get_row_at_idx(range_columns, row_idx)?;
224 let new_rank_encountered =
225 if let Some(state_last_rank_data) = &self.state.last_rank_data {
226 state_last_rank_data != &last_rank_data
228 } else {
229 true
231 };
232 if new_rank_encountered {
233 self.state.last_rank_data = Some(last_rank_data);
234 self.state.last_rank_boundary += self.state.current_group_count;
235 self.state.current_group_count = 1;
236 self.state.n_rank += 1;
237 } else {
238 self.state.current_group_count += 1;
240 }
241
242 match self.rank_type {
243 RankType::Basic => Ok(ScalarValue::UInt64(Some(
244 self.state.last_rank_boundary as u64 + 1,
245 ))),
246 RankType::Dense => Ok(ScalarValue::UInt64(Some(self.state.n_rank as u64))),
247 RankType::Percent => {
248 exec_err!("Can not execute PERCENT_RANK in a streaming fashion")
249 }
250 }
251 }
252
253 fn evaluate_all_with_rank(
254 &self,
255 num_rows: usize,
256 ranks_in_partition: &[Range<usize>],
257 ) -> Result<ArrayRef> {
258 let result: ArrayRef = match self.rank_type {
259 RankType::Basic => Arc::new(UInt64Array::from_iter_values(
260 ranks_in_partition
261 .iter()
262 .scan(1_u64, |acc, range| {
263 let len = range.end - range.start;
264 let result = iter::repeat(*acc).take(len);
265 *acc += len as u64;
266 Some(result)
267 })
268 .flatten(),
269 )),
270
271 RankType::Dense => Arc::new(UInt64Array::from_iter_values(
272 ranks_in_partition
273 .iter()
274 .zip(1u64..)
275 .flat_map(|(range, rank)| {
276 let len = range.end - range.start;
277 iter::repeat(rank).take(len)
278 }),
279 )),
280
281 RankType::Percent => {
282 let denominator = num_rows as f64;
283
284 Arc::new(Float64Array::from_iter_values(
285 ranks_in_partition
286 .iter()
287 .scan(0_u64, |acc, range| {
288 let len = range.end - range.start;
289 let value = (*acc as f64) / (denominator - 1.0).max(1.0);
290 let result = iter::repeat(value).take(len);
291 *acc += len as u64;
292 Some(result)
293 })
294 .flatten(),
295 ))
296 }
297 };
298
299 Ok(result)
300 }
301
302 fn supports_bounded_execution(&self) -> bool {
303 matches!(self.rank_type, RankType::Basic | RankType::Dense)
304 }
305
306 fn include_rank(&self) -> bool {
307 true
308 }
309}
310
#[cfg(test)]
mod tests {
    use super::*;
    use datafusion_common::cast::{as_float64_array, as_uint64_array};

    /// Checks `expr` against eight rows split into five peer groups of sizes
    /// 2, 1, 3, 1 and 1.
    fn assert_ranks_with_groups(expr: &Rank, expected: Vec<u64>) -> Result<()> {
        let groups = vec![0..2, 2..3, 3..6, 6..7, 7..8];
        assert_u64_ranks(expr, groups, expected)
    }

    /// Checks `expr` against eight rows that all belong to one peer group.
    #[allow(clippy::single_range_in_vec_init)]
    fn assert_ranks_single_group(expr: &Rank, expected: Vec<u64>) -> Result<()> {
        let groups = vec![0..8];
        assert_u64_ranks(expr, groups, expected)
    }

    /// Evaluates `expr` over an eight-row partition described by `ranks` and
    /// compares the resulting `UInt64` values with `expected`.
    fn assert_u64_ranks(
        expr: &Rank,
        ranks: Vec<Range<usize>>,
        expected: Vec<u64>,
    ) -> Result<()> {
        let evaluator = expr.partition_evaluator(PartitionEvaluatorArgs::default())?;
        let output = evaluator.evaluate_all_with_rank(8, &ranks)?;
        let actual = as_uint64_array(&output)?;
        assert_eq!(expected, *actual.values());
        Ok(())
    }

    /// Evaluates `expr` over a `num_rows`-row partition described by `ranks`
    /// and compares the resulting `Float64` values with `expected`.
    fn assert_f64_ranks(
        expr: &Rank,
        num_rows: usize,
        ranks: Vec<Range<usize>>,
        expected: Vec<f64>,
    ) -> Result<()> {
        let evaluator = expr.partition_evaluator(PartitionEvaluatorArgs::default())?;
        let output = evaluator.evaluate_all_with_rank(num_rows, &ranks)?;
        let actual = as_float64_array(&output)?;
        assert_eq!(expected, *actual.values());
        Ok(())
    }

    #[test]
    fn test_rank() -> Result<()> {
        let rank = Rank::basic();
        assert_ranks_single_group(&rank, vec![1; 8])?;
        assert_ranks_with_groups(&rank, vec![1, 1, 3, 4, 4, 4, 7, 8])?;
        Ok(())
    }

    #[test]
    fn test_dense_rank() -> Result<()> {
        let dense = Rank::dense_rank();
        assert_ranks_single_group(&dense, vec![1; 8])?;
        assert_ranks_with_groups(&dense, vec![1, 1, 2, 3, 3, 3, 4, 5])?;
        Ok(())
    }

    #[test]
    #[allow(clippy::single_range_in_vec_init)]
    fn test_percent_rank() -> Result<()> {
        let percent = Rank::percent_rank();

        // Empty partition: no rows, no peer groups.
        assert_f64_ranks(&percent, 0, vec![0..0; 0], vec![0.0; 0])?;

        // Single row.
        assert_f64_ranks(&percent, 1, vec![0..1], vec![0.0])?;

        // Seven rows that are all peers of each other.
        assert_f64_ranks(&percent, 7, vec![0..7], vec![0.0; 7])?;

        // Two peer groups: three rows, then four rows.
        assert_f64_ranks(
            &percent,
            7,
            vec![0..3, 3..7],
            vec![0.0, 0.0, 0.0, 0.5, 0.5, 0.5, 0.5],
        )?;

        Ok(())
    }
}