1use crate::define_udwf_and_expr;
22use arrow::datatypes::FieldRef;
23use datafusion_common::arrow::array::ArrayRef;
24use datafusion_common::arrow::array::{Float64Array, UInt64Array};
25use datafusion_common::arrow::compute::SortOptions;
26use datafusion_common::arrow::datatypes::DataType;
27use datafusion_common::arrow::datatypes::Field;
28use datafusion_common::utils::get_row_at_idx;
29use datafusion_common::{exec_err, Result, ScalarValue};
30use datafusion_expr::window_doc_sections::DOC_SECTION_RANKING;
31use datafusion_expr::{
32 Documentation, LimitEffect, PartitionEvaluator, Signature, Volatility, WindowUDFImpl,
33};
34use datafusion_functions_window_common::field;
35use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
36use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
37use field::WindowUDFFieldArgs;
38use std::any::Any;
39use std::fmt::Debug;
40use std::hash::Hash;
41use std::iter;
42use std::ops::Range;
43use std::sync::{Arc, LazyLock};
44
// Invokes `define_udwf_and_expr!` for the `rank` window function, passing the
// type name `Rank`, the identifier `rank`, a description string, and
// `Rank::basic` as the constructor.
// NOTE(review): the items this macro generates are determined by its
// definition, which is not visible here — confirm there before relying on them.
define_udwf_and_expr!(
    Rank,
    rank,
    "Returns rank of the current row with gaps. Same as `row_number` of its first peer",
    Rank::basic
);

// Same macro for `dense_rank`, constructed through `Rank::dense_rank`.
define_udwf_and_expr!(
    DenseRank,
    dense_rank,
    "Returns rank of the current row without gaps. This function counts peer groups",
    Rank::dense_rank
);

// Same macro for `percent_rank`, constructed through `Rank::percent_rank`.
define_udwf_and_expr!(
    PercentRank,
    percent_rank,
    "Returns the relative rank of the current row: (rank - 1) / (total rows - 1)",
    Rank::percent_rank
);
65
/// Window function implementation shared by `rank`, `dense_rank` and
/// `percent_rank`; the variant is selected by `rank_type`.
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct Rank {
    /// Function name, returned by `WindowUDFImpl::name`.
    name: String,
    /// Signature returned by `WindowUDFImpl::signature`; `Rank::new` builds it
    /// as a nullary, immutable signature.
    signature: Signature,
    /// Which ranking variant this instance computes.
    rank_type: RankType,
}
73
74impl Rank {
75 pub fn new(name: String, rank_type: RankType) -> Self {
77 Self {
78 name,
79 signature: Signature::nullary(Volatility::Immutable),
80 rank_type,
81 }
82 }
83
84 pub fn basic() -> Self {
86 Rank::new("rank".to_string(), RankType::Basic)
87 }
88
89 pub fn dense_rank() -> Self {
91 Rank::new("dense_rank".to_string(), RankType::Dense)
92 }
93
94 pub fn percent_rank() -> Self {
96 Rank::new("percent_rank".to_string(), RankType::Percent)
97 }
98}
99
/// Selects which ranking variant a `Rank` / `RankEvaluator` computes.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum RankType {
    /// `rank`: rank with gaps; produces `UInt64` values.
    Basic,
    /// `dense_rank`: rank without gaps; produces `UInt64` values.
    Dense,
    /// `percent_rank`: relative rank; produces `Float64` values.
    Percent,
}
106
/// Lazily constructed documentation for `rank()`: a description, the syntax
/// example `rank()`, and a SQL usage example, filed under
/// `DOC_SECTION_RANKING`.
static RANK_DOCUMENTATION: LazyLock<Documentation> = LazyLock::new(|| {
    Documentation::builder(
        DOC_SECTION_RANKING,
        "Returns the rank of the current row within its partition, allowing \
        gaps between ranks. This function provides a ranking similar to `row_number`, but \
        skips ranks for identical values.",

        "rank()")
        .with_sql_example(r#"
```sql
-- Example usage of the rank window function:
SELECT department,
 salary,
 rank() OVER (PARTITION BY department ORDER BY salary DESC) AS rank
FROM employees;

+-------------+--------+------+
| department | salary | rank |
+-------------+--------+------+
| Sales | 70000 | 1 |
| Sales | 50000 | 2 |
| Sales | 50000 | 2 |
| Sales | 30000 | 4 |
| Engineering | 90000 | 1 |
| Engineering | 80000 | 2 |
+-------------+--------+------+
```
"#)
        .build()
});
137
138fn get_rank_doc() -> &'static Documentation {
139 &RANK_DOCUMENTATION
140}
141
/// Lazily constructed documentation for `dense_rank()`: a description, the
/// syntax example `dense_rank()`, and a SQL usage example, filed under
/// `DOC_SECTION_RANKING`.
// NOTE(review): the description's "consecutive ranks are assigned even for
// identical values" reads as if ties get different ranks, while the SQL
// example below shows tied salaries sharing rank 2 — consider rewording.
static DENSE_RANK_DOCUMENTATION: LazyLock<Documentation> = LazyLock::new(|| {
    Documentation::builder(DOC_SECTION_RANKING, "Returns the rank of the current row without gaps. This function ranks \
        rows in a dense manner, meaning consecutive ranks are assigned even for identical \
        values.", "dense_rank()")
        .with_sql_example(r#"
```sql
-- Example usage of the dense_rank window function:
SELECT department,
 salary,
 dense_rank() OVER (PARTITION BY department ORDER BY salary DESC) AS dense_rank
FROM employees;

+-------------+--------+------------+
| department | salary | dense_rank |
+-------------+--------+------------+
| Sales | 70000 | 1 |
| Sales | 50000 | 2 |
| Sales | 50000 | 2 |
| Sales | 30000 | 3 |
| Engineering | 90000 | 1 |
| Engineering | 80000 | 2 |
+-------------+--------+------------+
```"#)
        .build()
});
167
168fn get_dense_rank_doc() -> &'static Documentation {
169 &DENSE_RANK_DOCUMENTATION
170}
171
/// Lazily constructed documentation for `percent_rank()`: a description, the
/// syntax example `percent_rank()`, and a SQL usage example, filed under
/// `DOC_SECTION_RANKING`.
static PERCENT_RANK_DOCUMENTATION: LazyLock<Documentation> = LazyLock::new(|| {
    Documentation::builder(DOC_SECTION_RANKING, "Returns the percentage rank of the current row within its partition. \
        The value ranges from 0 to 1 and is computed as `(rank - 1) / (total_rows - 1)`.", "percent_rank()")
        .with_sql_example(r#"```sql
 -- Example usage of the percent_rank window function:
SELECT employee_id,
 salary,
 percent_rank() OVER (ORDER BY salary) AS percent_rank
FROM employees;

+-------------+--------+---------------+
| employee_id | salary | percent_rank |
+-------------+--------+---------------+
| 1 | 30000 | 0.00 |
| 2 | 50000 | 0.50 |
| 3 | 70000 | 1.00 |
+-------------+--------+---------------+
```"#)
        .build()
});
192
193fn get_percent_rank_doc() -> &'static Documentation {
194 &PERCENT_RANK_DOCUMENTATION
195}
196
197impl WindowUDFImpl for Rank {
198 fn as_any(&self) -> &dyn Any {
199 self
200 }
201
202 fn name(&self) -> &str {
203 &self.name
204 }
205
206 fn signature(&self) -> &Signature {
207 &self.signature
208 }
209
210 fn partition_evaluator(
211 &self,
212 _partition_evaluator_args: PartitionEvaluatorArgs,
213 ) -> Result<Box<dyn PartitionEvaluator>> {
214 Ok(Box::new(RankEvaluator {
215 state: RankState::default(),
216 rank_type: self.rank_type,
217 }))
218 }
219
220 fn field(&self, field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
221 let return_type = match self.rank_type {
222 RankType::Basic | RankType::Dense => DataType::UInt64,
223 RankType::Percent => DataType::Float64,
224 };
225
226 let nullable = false;
227 Ok(Field::new(field_args.name(), return_type, nullable).into())
228 }
229
230 fn sort_options(&self) -> Option<SortOptions> {
231 Some(SortOptions {
232 descending: false,
233 nulls_first: false,
234 })
235 }
236
237 fn documentation(&self) -> Option<&Documentation> {
238 match self.rank_type {
239 RankType::Basic => Some(get_rank_doc()),
240 RankType::Dense => Some(get_dense_rank_doc()),
241 RankType::Percent => Some(get_percent_rank_doc()),
242 }
243 }
244
245 fn limit_effect(&self, _args: &[Arc<dyn PhysicalExpr>]) -> LimitEffect {
246 match self.rank_type {
247 RankType::Basic => LimitEffect::None,
248 RankType::Dense => LimitEffect::None,
249 RankType::Percent => LimitEffect::Unknown,
250 }
251 }
252}
253
/// Running state kept by `RankEvaluator::evaluate` between calls.
#[derive(Debug, Clone, Default)]
pub struct RankState {
    /// Row values recorded when the most recent new rank was encountered;
    /// `None` until the first row is evaluated.
    pub last_rank_data: Option<Vec<ScalarValue>>,
    /// Number of rows seen before the current group of equal rows; advanced by
    /// the previous group's size whenever a new rank starts. `rank` reports
    /// this value plus one.
    pub last_rank_boundary: usize,
    /// Number of rows seen so far in the current group of equal rows.
    pub current_group_count: usize,
    /// Number of distinct groups seen so far; reported by `dense_rank`.
    pub n_rank: usize,
}
266
/// Partition evaluator that computes `rank`, `dense_rank` or `percent_rank`
/// depending on `rank_type`.
#[derive(Debug)]
struct RankEvaluator {
    /// Running state updated by `evaluate`.
    state: RankState,
    /// Variant this evaluator computes.
    rank_type: RankType,
}
273
274impl PartitionEvaluator for RankEvaluator {
275 fn is_causal(&self) -> bool {
276 matches!(self.rank_type, RankType::Basic | RankType::Dense)
277 }
278
279 fn evaluate(
280 &mut self,
281 values: &[ArrayRef],
282 range: &Range<usize>,
283 ) -> Result<ScalarValue> {
284 let row_idx = range.start;
285 let range_columns = values;
287 let last_rank_data = get_row_at_idx(range_columns, row_idx)?;
288 let new_rank_encountered =
289 if let Some(state_last_rank_data) = &self.state.last_rank_data {
290 state_last_rank_data != &last_rank_data
292 } else {
293 true
295 };
296 if new_rank_encountered {
297 self.state.last_rank_data = Some(last_rank_data);
298 self.state.last_rank_boundary += self.state.current_group_count;
299 self.state.current_group_count = 1;
300 self.state.n_rank += 1;
301 } else {
302 self.state.current_group_count += 1;
304 }
305
306 match self.rank_type {
307 RankType::Basic => Ok(ScalarValue::UInt64(Some(
308 self.state.last_rank_boundary as u64 + 1,
309 ))),
310 RankType::Dense => Ok(ScalarValue::UInt64(Some(self.state.n_rank as u64))),
311 RankType::Percent => {
312 exec_err!("Can not execute PERCENT_RANK in a streaming fashion")
313 }
314 }
315 }
316
317 fn evaluate_all_with_rank(
318 &self,
319 num_rows: usize,
320 ranks_in_partition: &[Range<usize>],
321 ) -> Result<ArrayRef> {
322 let result: ArrayRef = match self.rank_type {
323 RankType::Basic => Arc::new(UInt64Array::from_iter_values(
324 ranks_in_partition
325 .iter()
326 .scan(1_u64, |acc, range| {
327 let len = range.end - range.start;
328 let result = iter::repeat_n(*acc, len);
329 *acc += len as u64;
330 Some(result)
331 })
332 .flatten(),
333 )),
334
335 RankType::Dense => Arc::new(UInt64Array::from_iter_values(
336 ranks_in_partition
337 .iter()
338 .zip(1u64..)
339 .flat_map(|(range, rank)| {
340 let len = range.end - range.start;
341 iter::repeat_n(rank, len)
342 }),
343 )),
344
345 RankType::Percent => {
346 let denominator = num_rows as f64;
347
348 Arc::new(Float64Array::from_iter_values(
349 ranks_in_partition
350 .iter()
351 .scan(0_u64, |acc, range| {
352 let len = range.end - range.start;
353 let value = (*acc as f64) / (denominator - 1.0).max(1.0);
354 let result = iter::repeat_n(value, len);
355 *acc += len as u64;
356 Some(result)
357 })
358 .flatten(),
359 ))
360 }
361 };
362
363 Ok(result)
364 }
365
366 fn supports_bounded_execution(&self) -> bool {
367 matches!(self.rank_type, RankType::Basic | RankType::Dense)
368 }
369
370 fn include_rank(&self) -> bool {
371 true
372 }
373}
374
#[cfg(test)]
mod tests {
    use super::*;
    use datafusion_common::cast::{as_float64_array, as_uint64_array};

    /// Checks `expr` against an 8-row partition split into five peer groups.
    fn test_with_rank(expr: &Rank, expected: Vec<u64>) -> Result<()> {
        let peer_groups = vec![0..2, 2..3, 3..6, 6..7, 7..8];
        test_i32_result(expr, peer_groups, expected)
    }

    /// Checks `expr` against an 8-row partition that forms a single peer group.
    #[allow(clippy::single_range_in_vec_init)]
    fn test_without_rank(expr: &Rank, expected: Vec<u64>) -> Result<()> {
        let single_group = vec![0..8];
        test_i32_result(expr, single_group, expected)
    }

    /// Evaluates `expr` over 8 rows with the given peer groups and compares the
    /// resulting `UInt64` values with `expected`.
    fn test_i32_result(
        expr: &Rank,
        ranks: Vec<Range<usize>>,
        expected: Vec<u64>,
    ) -> Result<()> {
        let evaluator = expr.partition_evaluator(PartitionEvaluatorArgs::default())?;
        let output = evaluator.evaluate_all_with_rank(8, &ranks)?;
        let actual = as_uint64_array(&output)?.values();
        assert_eq!(expected, *actual);
        Ok(())
    }

    /// Evaluates `expr` over `num_rows` rows with the given peer groups and
    /// compares the resulting `Float64` values with `expected`.
    fn test_f64_result(
        expr: &Rank,
        num_rows: usize,
        ranks: Vec<Range<usize>>,
        expected: Vec<f64>,
    ) -> Result<()> {
        let evaluator = expr.partition_evaluator(PartitionEvaluatorArgs::default())?;
        let output = evaluator.evaluate_all_with_rank(num_rows, &ranks)?;
        let actual = as_float64_array(&output)?.values();
        assert_eq!(expected, *actual);
        Ok(())
    }

    #[test]
    fn test_rank() -> Result<()> {
        let rank_fn = Rank::basic();
        test_without_rank(&rank_fn, vec![1; 8])?;
        test_with_rank(&rank_fn, vec![1, 1, 3, 4, 4, 4, 7, 8])?;
        Ok(())
    }

    #[test]
    fn test_dense_rank() -> Result<()> {
        let dense_fn = Rank::dense_rank();
        test_without_rank(&dense_fn, vec![1; 8])?;
        test_with_rank(&dense_fn, vec![1, 1, 2, 3, 3, 3, 4, 5])?;
        Ok(())
    }

    #[test]
    #[allow(clippy::single_range_in_vec_init)]
    fn test_percent_rank() -> Result<()> {
        let percent_fn = Rank::percent_rank();

        // (row count, peer groups, expected output)
        let cases: Vec<(usize, Vec<Range<usize>>, Vec<f64>)> = vec![
            // empty partition
            (0, vec![0..0; 0], vec![0.0; 0]),
            // a single row
            (1, vec![0..1], vec![0.0]),
            // all rows in one peer group
            (7, vec![0..7], vec![0.0; 7]),
            // two peer groups
            (
                7,
                vec![0..3, 3..7],
                vec![0.0, 0.0, 0.0, 0.5, 0.5, 0.5, 0.5],
            ),
        ];

        for (num_rows, ranks, expected) in cases {
            test_f64_result(&percent_fn, num_rows, ranks, expected)?;
        }

        Ok(())
    }
}
459}