1use arrow::datatypes::FieldRef;
22use datafusion_common::arrow::array::ArrayRef;
23use datafusion_common::arrow::array::{Float64Array, UInt64Array};
24use datafusion_common::arrow::compute::SortOptions;
25use datafusion_common::arrow::datatypes::DataType;
26use datafusion_common::arrow::datatypes::Field;
27use datafusion_common::utils::get_row_at_idx;
28use datafusion_common::{Result, ScalarValue, exec_err};
29use datafusion_doc::window_doc_sections::DOC_SECTION_RANKING;
30use datafusion_expr::{
31 Documentation, LimitEffect, PartitionEvaluator, Signature, Volatility, WindowUDFImpl,
32};
33use datafusion_functions_window_common::field;
34use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
35use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
36use field::WindowUDFFieldArgs;
37use std::any::Any;
38use std::fmt::Debug;
39use std::hash::Hash;
40use std::iter;
41use std::ops::Range;
42use std::sync::{Arc, LazyLock};
43
// Declares the `rank` window function. The arguments are, in order: a type
// name, a function name, a documentation string, and the constructor
// (`Rank::basic`) used to build the implementation.
// NOTE(review): `define_udwf_and_expr!` is defined outside this file; confirm
// exactly which items it generates from these arguments.
define_udwf_and_expr!(
    Rank,
    rank,
    "Returns rank of the current row with gaps. Same as `row_number` of its first peer",
    Rank::basic
);

// Declares the `dense_rank` window function, backed by `Rank::dense_rank`.
// NOTE(review): `DenseRank` is not declared in this file, so it is presumably
// produced by the macro — verify against the macro definition.
define_udwf_and_expr!(
    DenseRank,
    dense_rank,
    "Returns rank of the current row without gaps. This function counts peer groups",
    Rank::dense_rank
);

// Declares the `percent_rank` window function, backed by `Rank::percent_rank`.
// NOTE(review): `PercentRank` is not declared in this file, so it is presumably
// produced by the macro — verify against the macro definition.
define_udwf_and_expr!(
    PercentRank,
    percent_rank,
    "Returns the relative rank of the current row: (rank - 1) / (total rows - 1)",
    Rank::percent_rank
);
64
/// Window function implementation shared by `rank`, `dense_rank` and
/// `percent_rank`; the flavor is selected by [`RankType`].
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct Rank {
    /// Function name returned by `WindowUDFImpl::name`.
    name: String,
    /// Signature returned by `WindowUDFImpl::signature`; `Rank::new` always
    /// builds it as nullary and immutable.
    signature: Signature,
    /// Which ranking flavor this instance evaluates.
    rank_type: RankType,
}
72
73impl Rank {
74 pub fn new(name: String, rank_type: RankType) -> Self {
76 Self {
77 name,
78 signature: Signature::nullary(Volatility::Immutable),
79 rank_type,
80 }
81 }
82
83 pub fn basic() -> Self {
85 Rank::new("rank".to_string(), RankType::Basic)
86 }
87
88 pub fn dense_rank() -> Self {
90 Rank::new("dense_rank".to_string(), RankType::Dense)
91 }
92
93 pub fn percent_rank() -> Self {
95 Rank::new("percent_rank".to_string(), RankType::Percent)
96 }
97}
98
/// Selects which ranking flavor a [`Rank`] instance computes.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum RankType {
    /// `rank`: every row of a peer group gets one plus the number of rows
    /// that precede the group, so ranks have gaps after ties.
    Basic,
    /// `dense_rank`: peer groups are numbered 1, 2, 3, ... without gaps.
    Dense,
    /// `percent_rank`: a `Float64` value computed from the rows preceding the
    /// peer group divided by `total rows - 1` (clamped to at least 1).
    Percent,
}
105
/// Documentation for `rank()`: section, description, syntax example and a
/// SQL usage example. Built on first access through the `LazyLock`.
static RANK_DOCUMENTATION: LazyLock<Documentation> = LazyLock::new(|| {
    Documentation::builder(
        DOC_SECTION_RANKING,
        "Returns the rank of the current row within its partition, allowing \
        gaps between ranks. This function provides a ranking similar to `row_number`, but \
        skips ranks for identical values.",

        "rank()")
        .with_sql_example(r#"
```sql
-- Example usage of the rank window function:
SELECT department,
 salary,
 rank() OVER (PARTITION BY department ORDER BY salary DESC) AS rank
FROM employees;

+-------------+--------+------+
| department | salary | rank |
+-------------+--------+------+
| Sales | 70000 | 1 |
| Sales | 50000 | 2 |
| Sales | 50000 | 2 |
| Sales | 30000 | 4 |
| Engineering | 90000 | 1 |
| Engineering | 80000 | 2 |
+-------------+--------+------+
```
"#)
        .build()
});
136
137fn get_rank_doc() -> &'static Documentation {
138 &RANK_DOCUMENTATION
139}
140
/// Documentation for `dense_rank()`: section, description, syntax example and
/// a SQL usage example. Built on first access through the `LazyLock`.
// NOTE(review): the description says consecutive ranks are assigned "even for
// identical values", while the example below gives tied rows the same rank —
// the wording may deserve a follow-up.
static DENSE_RANK_DOCUMENTATION: LazyLock<Documentation> = LazyLock::new(|| {
    Documentation::builder(DOC_SECTION_RANKING, "Returns the rank of the current row without gaps. This function ranks \
    rows in a dense manner, meaning consecutive ranks are assigned even for identical \
    values.", "dense_rank()")
        .with_sql_example(r#"
```sql
-- Example usage of the dense_rank window function:
SELECT department,
 salary,
 dense_rank() OVER (PARTITION BY department ORDER BY salary DESC) AS dense_rank
FROM employees;

+-------------+--------+------------+
| department | salary | dense_rank |
+-------------+--------+------------+
| Sales | 70000 | 1 |
| Sales | 50000 | 2 |
| Sales | 50000 | 2 |
| Sales | 30000 | 3 |
| Engineering | 90000 | 1 |
| Engineering | 80000 | 2 |
+-------------+--------+------------+
```"#)
        .build()
});
166
167fn get_dense_rank_doc() -> &'static Documentation {
168 &DENSE_RANK_DOCUMENTATION
169}
170
/// Documentation for `percent_rank()`: section, description, syntax example
/// and a SQL usage example. Built on first access through the `LazyLock`.
static PERCENT_RANK_DOCUMENTATION: LazyLock<Documentation> = LazyLock::new(|| {
    Documentation::builder(DOC_SECTION_RANKING, "Returns the percentage rank of the current row within its partition. \
    The value ranges from 0 to 1 and is computed as `(rank - 1) / (total_rows - 1)`.", "percent_rank()")
        .with_sql_example(r#"```sql
 -- Example usage of the percent_rank window function:
SELECT employee_id,
 salary,
 percent_rank() OVER (ORDER BY salary) AS percent_rank
FROM employees;

+-------------+--------+---------------+
| employee_id | salary | percent_rank |
+-------------+--------+---------------+
| 1 | 30000 | 0.00 |
| 2 | 50000 | 0.50 |
| 3 | 70000 | 1.00 |
+-------------+--------+---------------+
```"#)
        .build()
});
191
192fn get_percent_rank_doc() -> &'static Documentation {
193 &PERCENT_RANK_DOCUMENTATION
194}
195
196impl WindowUDFImpl for Rank {
197 fn as_any(&self) -> &dyn Any {
198 self
199 }
200
201 fn name(&self) -> &str {
202 &self.name
203 }
204
205 fn signature(&self) -> &Signature {
206 &self.signature
207 }
208
209 fn partition_evaluator(
210 &self,
211 _partition_evaluator_args: PartitionEvaluatorArgs,
212 ) -> Result<Box<dyn PartitionEvaluator>> {
213 Ok(Box::new(RankEvaluator {
214 state: RankState::default(),
215 rank_type: self.rank_type,
216 }))
217 }
218
219 fn field(&self, field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
220 let return_type = match self.rank_type {
221 RankType::Basic | RankType::Dense => DataType::UInt64,
222 RankType::Percent => DataType::Float64,
223 };
224
225 let nullable = false;
226 Ok(Field::new(field_args.name(), return_type, nullable).into())
227 }
228
229 fn sort_options(&self) -> Option<SortOptions> {
230 Some(SortOptions {
231 descending: false,
232 nulls_first: false,
233 })
234 }
235
236 fn documentation(&self) -> Option<&Documentation> {
237 match self.rank_type {
238 RankType::Basic => Some(get_rank_doc()),
239 RankType::Dense => Some(get_dense_rank_doc()),
240 RankType::Percent => Some(get_percent_rank_doc()),
241 }
242 }
243
244 fn limit_effect(&self, _args: &[Arc<dyn PhysicalExpr>]) -> LimitEffect {
245 match self.rank_type {
246 RankType::Basic => LimitEffect::None,
247 RankType::Dense => LimitEffect::None,
248 RankType::Percent => LimitEffect::Unknown,
249 }
250 }
251}
252
/// Running state that `RankEvaluator::evaluate` carries from one row to the next.
#[derive(Debug, Clone, Default)]
pub struct RankState {
    /// Column values of the row that opened the current peer group (presumably
    /// the ORDER BY columns — confirm against the caller); `None` before the
    /// first row.
    pub last_rank_data: Option<Vec<ScalarValue>>,
    /// Number of rows that precede the current peer group; `rank` reports this
    /// value plus one.
    pub last_rank_boundary: usize,
    /// Number of rows seen so far in the current peer group.
    pub current_group_count: usize,
    /// Number of peer groups started so far; `dense_rank` reports this value.
    pub n_rank: usize,
}
265
/// Partition evaluator behind `rank`, `dense_rank` and `percent_rank`.
#[derive(Debug)]
struct RankEvaluator {
    /// Row-by-row state used by `evaluate`.
    state: RankState,
    /// Which ranking flavor to produce.
    rank_type: RankType,
}
272
273impl PartitionEvaluator for RankEvaluator {
274 fn is_causal(&self) -> bool {
275 matches!(self.rank_type, RankType::Basic | RankType::Dense)
276 }
277
278 fn evaluate(
279 &mut self,
280 values: &[ArrayRef],
281 range: &Range<usize>,
282 ) -> Result<ScalarValue> {
283 let row_idx = range.start;
284 let range_columns = values;
286 let last_rank_data = get_row_at_idx(range_columns, row_idx)?;
287 let new_rank_encountered =
288 if let Some(state_last_rank_data) = &self.state.last_rank_data {
289 state_last_rank_data != &last_rank_data
291 } else {
292 true
294 };
295 if new_rank_encountered {
296 self.state.last_rank_data = Some(last_rank_data);
297 self.state.last_rank_boundary += self.state.current_group_count;
298 self.state.current_group_count = 1;
299 self.state.n_rank += 1;
300 } else {
301 self.state.current_group_count += 1;
303 }
304
305 match self.rank_type {
306 RankType::Basic => Ok(ScalarValue::UInt64(Some(
307 self.state.last_rank_boundary as u64 + 1,
308 ))),
309 RankType::Dense => Ok(ScalarValue::UInt64(Some(self.state.n_rank as u64))),
310 RankType::Percent => {
311 exec_err!("Can not execute PERCENT_RANK in a streaming fashion")
312 }
313 }
314 }
315
316 fn evaluate_all_with_rank(
317 &self,
318 num_rows: usize,
319 ranks_in_partition: &[Range<usize>],
320 ) -> Result<ArrayRef> {
321 let result: ArrayRef = match self.rank_type {
322 RankType::Basic => Arc::new(UInt64Array::from_iter_values(
323 ranks_in_partition
324 .iter()
325 .scan(1_u64, |acc, range| {
326 let len = range.end - range.start;
327 let result = iter::repeat_n(*acc, len);
328 *acc += len as u64;
329 Some(result)
330 })
331 .flatten(),
332 )),
333
334 RankType::Dense => Arc::new(UInt64Array::from_iter_values(
335 ranks_in_partition
336 .iter()
337 .zip(1u64..)
338 .flat_map(|(range, rank)| {
339 let len = range.end - range.start;
340 iter::repeat_n(rank, len)
341 }),
342 )),
343
344 RankType::Percent => {
345 let denominator = num_rows as f64;
346
347 Arc::new(Float64Array::from_iter_values(
348 ranks_in_partition
349 .iter()
350 .scan(0_u64, |acc, range| {
351 let len = range.end - range.start;
352 let value = (*acc as f64) / (denominator - 1.0).max(1.0);
353 let result = iter::repeat_n(value, len);
354 *acc += len as u64;
355 Some(result)
356 })
357 .flatten(),
358 ))
359 }
360 };
361
362 Ok(result)
363 }
364
365 fn supports_bounded_execution(&self) -> bool {
366 matches!(self.rank_type, RankType::Basic | RankType::Dense)
367 }
368
369 fn include_rank(&self) -> bool {
370 true
371 }
372}
373
#[cfg(test)]
mod tests {
    use super::*;
    use datafusion_common::cast::{as_float64_array, as_uint64_array};

    /// Evaluates `expr` over eight rows split into five peer groups
    /// (sizes 2, 1, 3, 1, 1) and checks the `UInt64` output.
    fn test_with_rank(expr: &Rank, expected: Vec<u64>) -> Result<()> {
        let peer_groups = vec![0..2, 2..3, 3..6, 6..7, 7..8];
        test_i32_result(expr, peer_groups, expected)
    }

    /// Evaluates `expr` over eight rows that all fall into one peer group
    /// and checks the `UInt64` output.
    #[expect(clippy::single_range_in_vec_init)]
    fn test_without_rank(expr: &Rank, expected: Vec<u64>) -> Result<()> {
        let single_group = vec![0..8];
        test_i32_result(expr, single_group, expected)
    }

    /// Runs `evaluate_all_with_rank` for an eight-row partition and compares
    /// the resulting `UInt64` values with `expected`.
    fn test_i32_result(
        expr: &Rank,
        ranks: Vec<Range<usize>>,
        expected: Vec<u64>,
    ) -> Result<()> {
        let evaluator = expr.partition_evaluator(PartitionEvaluatorArgs::default())?;
        let array = evaluator.evaluate_all_with_rank(8, &ranks)?;
        let actual = as_uint64_array(&array)?.values();
        assert_eq!(expected, *actual);
        Ok(())
    }

    /// Runs `evaluate_all_with_rank` for a partition of `num_rows` rows and
    /// compares the resulting `Float64` values with `expected`.
    fn test_f64_result(
        expr: &Rank,
        num_rows: usize,
        ranks: Vec<Range<usize>>,
        expected: Vec<f64>,
    ) -> Result<()> {
        let evaluator = expr.partition_evaluator(PartitionEvaluatorArgs::default())?;
        let array = evaluator.evaluate_all_with_rank(num_rows, &ranks)?;
        let actual = as_float64_array(&array)?.values();
        assert_eq!(expected, *actual);
        Ok(())
    }

    #[test]
    fn test_rank() -> Result<()> {
        let rank = Rank::basic();
        test_without_rank(&rank, vec![1; 8])?;
        test_with_rank(&rank, vec![1, 1, 3, 4, 4, 4, 7, 8])?;
        Ok(())
    }

    #[test]
    fn test_dense_rank() -> Result<()> {
        let dense_rank = Rank::dense_rank();
        test_without_rank(&dense_rank, vec![1; 8])?;
        test_with_rank(&dense_rank, vec![1, 1, 2, 3, 3, 3, 4, 5])?;
        Ok(())
    }

    #[test]
    #[expect(clippy::single_range_in_vec_init)]
    fn test_percent_rank() -> Result<()> {
        let percent_rank = Rank::percent_rank();

        // (number of rows, peer groups, expected values)
        let cases: [(usize, Vec<Range<usize>>, Vec<f64>); 4] = [
            // empty partition
            (0, vec![0..0; 0], vec![0.0; 0]),
            // single row
            (1, vec![0..1], vec![0.0]),
            // all rows are peers
            (7, vec![0..7], vec![0.0; 7]),
            // two peer groups
            (
                7,
                vec![0..3, 3..7],
                vec![0.0, 0.0, 0.0, 0.5, 0.5, 0.5, 0.5],
            ),
        ];

        for (num_rows, ranks, expected) in cases {
            test_f64_result(&percent_rank, num_rows, ranks, expected)?;
        }

        Ok(())
    }
}
458}