1use arrow::datatypes::FieldRef;
22use datafusion_common::arrow::array::ArrayRef;
23use datafusion_common::arrow::array::{Float64Array, UInt64Array};
24use datafusion_common::arrow::compute::SortOptions;
25use datafusion_common::arrow::datatypes::DataType;
26use datafusion_common::arrow::datatypes::Field;
27use datafusion_common::utils::get_row_at_idx;
28use datafusion_common::{Result, ScalarValue, exec_err};
29use datafusion_doc::window_doc_sections::DOC_SECTION_RANKING;
30use datafusion_expr::{
31 Documentation, LimitEffect, PartitionEvaluator, Signature, Volatility, WindowUDFImpl,
32};
33use datafusion_functions_window_common::field;
34use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
35use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
36use field::WindowUDFFieldArgs;
37use std::fmt::Debug;
38use std::hash::Hash;
39use std::iter;
40use std::ops::Range;
41use std::sync::{Arc, LazyLock};
42
// Declares the `rank` window function via `define_udwf_and_expr!` (the macro is defined
// outside this file). The arguments name the `Rank` / `rank` / `rank_udwf` identifiers,
// give a one-line description, and supply `Rank::basic` as the constructor.
// NOTE(review): the exact items the macro generates are not visible here — confirm
// against the macro definition.
define_udwf_and_expr!(
    Rank,
    rank,
    rank_udwf,
    "Returns rank of the current row with gaps. Same as `row_number` of its first peer",
    Rank::basic
);
50
// Declares the `dense_rank` window function via `define_udwf_and_expr!` (the macro is
// defined outside this file). The arguments name the `DenseRank` / `dense_rank` /
// `dense_rank_udwf` identifiers, give a one-line description, and supply
// `Rank::dense_rank` as the constructor.
// NOTE(review): the exact items the macro generates are not visible here — confirm
// against the macro definition.
define_udwf_and_expr!(
    DenseRank,
    dense_rank,
    dense_rank_udwf,
    "Returns rank of the current row without gaps. This function counts peer groups",
    Rank::dense_rank
);
58
// Declares the `percent_rank` window function via `define_udwf_and_expr!` (the macro is
// defined outside this file). The arguments name the `PercentRank` / `percent_rank` /
// `percent_rank_udwf` identifiers, give a one-line description, and supply
// `Rank::percent_rank` as the constructor.
// NOTE(review): the exact items the macro generates are not visible here — confirm
// against the macro definition.
define_udwf_and_expr!(
    PercentRank,
    percent_rank,
    percent_rank_udwf,
    "Returns the relative rank of the current row: (rank - 1) / (total rows - 1)",
    Rank::percent_rank
);
66
/// Window function implementation shared by `rank`, `dense_rank` and `percent_rank`.
///
/// The concrete flavour is selected by [`RankType`]; the name and signature are
/// returned verbatim by the `WindowUDFImpl` implementation.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct Rank {
    /// Function name reported by `WindowUDFImpl::name`.
    name: String,
    /// Function signature reported by `WindowUDFImpl::signature`.
    signature: Signature,
    /// Which ranking flavour this instance computes.
    rank_type: RankType,
}
74
75impl Rank {
76 pub fn new(name: String, rank_type: RankType) -> Self {
78 Self {
79 name,
80 signature: Signature::nullary(Volatility::Immutable),
81 rank_type,
82 }
83 }
84
85 pub fn basic() -> Self {
87 Rank::new("rank".to_string(), RankType::Basic)
88 }
89
90 pub fn dense_rank() -> Self {
92 Rank::new("dense_rank".to_string(), RankType::Dense)
93 }
94
95 pub fn percent_rank() -> Self {
97 Rank::new("percent_rank".to_string(), RankType::Percent)
98 }
99}
100
/// Selects which ranking flavour a [`Rank`] instance computes.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum RankType {
    /// `rank`: peers share a rank and the following rank skips ahead by the peer count.
    Basic,
    /// `dense_rank`: peers share a rank and ranks increase by one per peer group.
    Dense,
    /// `percent_rank`: the number of preceding rows divided by `total rows - 1`.
    Percent,
}
107
108static RANK_DOCUMENTATION: LazyLock<Documentation> = LazyLock::new(|| {
109 Documentation::builder(
110 DOC_SECTION_RANKING,
111 "Returns the rank of the current row within its partition, allowing \
112 gaps between ranks. This function provides a ranking similar to `row_number`, but \
113 skips ranks for identical values.",
114
115 "rank()")
116 .with_sql_example(r#"
117```sql
118-- Example usage of the rank window function:
119SELECT department,
120 salary,
121 rank() OVER (PARTITION BY department ORDER BY salary DESC) AS rank
122FROM employees;
123
124+-------------+--------+------+
125| department | salary | rank |
126+-------------+--------+------+
127| Sales | 70000 | 1 |
128| Sales | 50000 | 2 |
129| Sales | 50000 | 2 |
130| Sales | 30000 | 4 |
131| Engineering | 90000 | 1 |
132| Engineering | 80000 | 2 |
133+-------------+--------+------+
134```
135"#)
136 .build()
137});
138
139fn get_rank_doc() -> &'static Documentation {
140 &RANK_DOCUMENTATION
141}
142
143static DENSE_RANK_DOCUMENTATION: LazyLock<Documentation> = LazyLock::new(|| {
144 Documentation::builder(DOC_SECTION_RANKING, "Returns the rank of the current row without gaps. This function ranks \
145 rows in a dense manner, meaning consecutive ranks are assigned even for identical \
146 values.", "dense_rank()")
147 .with_sql_example(r#"
148```sql
149-- Example usage of the dense_rank window function:
150SELECT department,
151 salary,
152 dense_rank() OVER (PARTITION BY department ORDER BY salary DESC) AS dense_rank
153FROM employees;
154
155+-------------+--------+------------+
156| department | salary | dense_rank |
157+-------------+--------+------------+
158| Sales | 70000 | 1 |
159| Sales | 50000 | 2 |
160| Sales | 50000 | 2 |
161| Sales | 30000 | 3 |
162| Engineering | 90000 | 1 |
163| Engineering | 80000 | 2 |
164+-------------+--------+------------+
165```"#)
166 .build()
167});
168
169fn get_dense_rank_doc() -> &'static Documentation {
170 &DENSE_RANK_DOCUMENTATION
171}
172
173static PERCENT_RANK_DOCUMENTATION: LazyLock<Documentation> = LazyLock::new(|| {
174 Documentation::builder(DOC_SECTION_RANKING, "Returns the percentage rank of the current row within its partition. \
175 The value ranges from 0 to 1 and is computed as `(rank - 1) / (total_rows - 1)`.", "percent_rank()")
176 .with_sql_example(r#"```sql
177 -- Example usage of the percent_rank window function:
178SELECT employee_id,
179 salary,
180 percent_rank() OVER (ORDER BY salary) AS percent_rank
181FROM employees;
182
183+-------------+--------+---------------+
184| employee_id | salary | percent_rank |
185+-------------+--------+---------------+
186| 1 | 30000 | 0.00 |
187| 2 | 50000 | 0.50 |
188| 3 | 70000 | 1.00 |
189+-------------+--------+---------------+
190```"#)
191 .build()
192});
193
194fn get_percent_rank_doc() -> &'static Documentation {
195 &PERCENT_RANK_DOCUMENTATION
196}
197
198impl WindowUDFImpl for Rank {
199 fn name(&self) -> &str {
200 &self.name
201 }
202
203 fn signature(&self) -> &Signature {
204 &self.signature
205 }
206
207 fn partition_evaluator(
208 &self,
209 _partition_evaluator_args: PartitionEvaluatorArgs,
210 ) -> Result<Box<dyn PartitionEvaluator>> {
211 Ok(Box::new(RankEvaluator {
212 state: RankState::default(),
213 rank_type: self.rank_type,
214 }))
215 }
216
217 fn field(&self, field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
218 let return_type = match self.rank_type {
219 RankType::Basic | RankType::Dense => DataType::UInt64,
220 RankType::Percent => DataType::Float64,
221 };
222
223 let nullable = false;
224 Ok(Field::new(field_args.name(), return_type, nullable).into())
225 }
226
227 fn sort_options(&self) -> Option<SortOptions> {
228 Some(SortOptions {
229 descending: false,
230 nulls_first: false,
231 })
232 }
233
234 fn documentation(&self) -> Option<&Documentation> {
235 match self.rank_type {
236 RankType::Basic => Some(get_rank_doc()),
237 RankType::Dense => Some(get_dense_rank_doc()),
238 RankType::Percent => Some(get_percent_rank_doc()),
239 }
240 }
241
242 fn limit_effect(&self, _args: &[Arc<dyn PhysicalExpr>]) -> LimitEffect {
243 match self.rank_type {
244 RankType::Basic => LimitEffect::None,
245 RankType::Dense => LimitEffect::None,
246 RankType::Percent => LimitEffect::Unknown,
247 }
248 }
249}
250
/// Running state kept by the rank evaluator while rows are evaluated one at a time.
#[derive(Debug, Clone, Default)]
pub struct RankState {
    /// Row values of the row that opened the current peer group; `None` until the
    /// first row has been evaluated.
    /// NOTE(review): presumably these are the ORDER BY column values — confirm with the caller.
    pub last_rank_data: Option<Vec<ScalarValue>>,
    /// Number of rows seen before the current peer group started; `rank` reports
    /// this value plus one.
    pub last_rank_boundary: usize,
    /// Number of rows seen so far in the current peer group.
    pub current_group_count: usize,
    /// Number of peer groups seen so far; reported as the `dense_rank`.
    pub n_rank: usize,
}
263
/// Partition evaluator that computes `rank`, `dense_rank` or `percent_rank`.
#[derive(Debug)]
struct RankEvaluator {
    /// Running state updated by row-at-a-time evaluation.
    state: RankState,
    /// Which ranking flavour to produce.
    rank_type: RankType,
}
270
271impl PartitionEvaluator for RankEvaluator {
272 fn is_causal(&self) -> bool {
273 matches!(self.rank_type, RankType::Basic | RankType::Dense)
274 }
275
276 fn evaluate(
277 &mut self,
278 values: &[ArrayRef],
279 range: &Range<usize>,
280 ) -> Result<ScalarValue> {
281 let row_idx = range.start;
282 let range_columns = values;
284 let last_rank_data = get_row_at_idx(range_columns, row_idx)?;
285 let new_rank_encountered =
286 if let Some(state_last_rank_data) = &self.state.last_rank_data {
287 state_last_rank_data != &last_rank_data
289 } else {
290 true
292 };
293 if new_rank_encountered {
294 self.state.last_rank_data = Some(last_rank_data);
295 self.state.last_rank_boundary += self.state.current_group_count;
296 self.state.current_group_count = 1;
297 self.state.n_rank += 1;
298 } else {
299 self.state.current_group_count += 1;
301 }
302
303 match self.rank_type {
304 RankType::Basic => Ok(ScalarValue::UInt64(Some(
305 self.state.last_rank_boundary as u64 + 1,
306 ))),
307 RankType::Dense => Ok(ScalarValue::UInt64(Some(self.state.n_rank as u64))),
308 RankType::Percent => {
309 exec_err!("Can not execute PERCENT_RANK in a streaming fashion")
310 }
311 }
312 }
313
314 fn evaluate_all_with_rank(
315 &self,
316 num_rows: usize,
317 ranks_in_partition: &[Range<usize>],
318 ) -> Result<ArrayRef> {
319 let result: ArrayRef = match self.rank_type {
320 RankType::Basic => Arc::new(UInt64Array::from_iter_values(
321 ranks_in_partition
322 .iter()
323 .scan(1_u64, |acc, range| {
324 let len = range.end - range.start;
325 let result = iter::repeat_n(*acc, len);
326 *acc += len as u64;
327 Some(result)
328 })
329 .flatten(),
330 )),
331
332 RankType::Dense => Arc::new(UInt64Array::from_iter_values(
333 ranks_in_partition
334 .iter()
335 .zip(1u64..)
336 .flat_map(|(range, rank)| {
337 let len = range.end - range.start;
338 iter::repeat_n(rank, len)
339 }),
340 )),
341
342 RankType::Percent => {
343 let denominator = num_rows as f64;
344
345 Arc::new(Float64Array::from_iter_values(
346 ranks_in_partition
347 .iter()
348 .scan(0_u64, |acc, range| {
349 let len = range.end - range.start;
350 let value = (*acc as f64) / (denominator - 1.0).max(1.0);
351 let result = iter::repeat_n(value, len);
352 *acc += len as u64;
353 Some(result)
354 })
355 .flatten(),
356 ))
357 }
358 };
359
360 Ok(result)
361 }
362
363 fn supports_bounded_execution(&self) -> bool {
364 matches!(self.rank_type, RankType::Basic | RankType::Dense)
365 }
366
367 fn include_rank(&self) -> bool {
368 true
369 }
370}
371
#[cfg(test)]
mod tests {
    use super::*;
    use datafusion_common::cast::{as_float64_array, as_uint64_array};

    /// Checks `expr` over eight rows split into five peer groups.
    fn test_with_rank(expr: &Rank, expected: Vec<u64>) -> Result<()> {
        let peer_groups = vec![0..2, 2..3, 3..6, 6..7, 7..8];
        test_i32_result(expr, peer_groups, expected)
    }

    /// Checks `expr` over eight rows that all fall into one peer group.
    #[expect(clippy::single_range_in_vec_init)]
    fn test_without_rank(expr: &Rank, expected: Vec<u64>) -> Result<()> {
        let single_group = vec![0..8];
        test_i32_result(expr, single_group, expected)
    }

    /// Evaluates `expr` over an eight-row partition with the given peer groups and
    /// compares the `UInt64` output against `expected`.
    fn test_i32_result(
        expr: &Rank,
        ranks: Vec<Range<usize>>,
        expected: Vec<u64>,
    ) -> Result<()> {
        let evaluator = expr.partition_evaluator(PartitionEvaluatorArgs::default())?;
        let output = evaluator.evaluate_all_with_rank(8, &ranks)?;
        let actual = as_uint64_array(&output)?.values();
        assert_eq!(expected, *actual);
        Ok(())
    }

    /// Evaluates `expr` over a `num_rows`-row partition with the given peer groups
    /// and compares the `Float64` output against `expected`.
    fn test_f64_result(
        expr: &Rank,
        num_rows: usize,
        ranks: Vec<Range<usize>>,
        expected: Vec<f64>,
    ) -> Result<()> {
        let evaluator = expr.partition_evaluator(PartitionEvaluatorArgs::default())?;
        let output = evaluator.evaluate_all_with_rank(num_rows, &ranks)?;
        let actual = as_float64_array(&output)?.values();
        assert_eq!(expected, *actual);
        Ok(())
    }

    #[test]
    fn test_rank() -> Result<()> {
        let rank = Rank::basic();
        test_without_rank(&rank, vec![1; 8])?;
        test_with_rank(&rank, vec![1, 1, 3, 4, 4, 4, 7, 8])?;
        Ok(())
    }

    #[test]
    fn test_dense_rank() -> Result<()> {
        let dense = Rank::dense_rank();
        test_without_rank(&dense, vec![1; 8])?;
        test_with_rank(&dense, vec![1, 1, 2, 3, 3, 3, 4, 5])?;
        Ok(())
    }

    #[test]
    #[expect(clippy::single_range_in_vec_init)]
    fn test_percent_rank() -> Result<()> {
        let percent = Rank::percent_rank();

        // (row count, peer groups, expected output), checked in order.
        let cases: Vec<(usize, Vec<Range<usize>>, Vec<f64>)> = vec![
            // empty partition
            (0, vec![], vec![]),
            // single row
            (1, vec![0..1], vec![0.0]),
            // all rows are peers
            (7, vec![0..7], vec![0.0; 7]),
            // two peer groups
            (
                7,
                vec![0..3, 3..7],
                vec![0.0, 0.0, 0.0, 0.5, 0.5, 0.5, 0.5],
            ),
        ];

        for (num_rows, ranks, expected) in cases {
            test_f64_result(&percent, num_rows, ranks, expected)?;
        }

        Ok(())
    }
}