datafusion_functions_window/
cume_dist.rs1use arrow::datatypes::FieldRef;
21use datafusion_common::arrow::array::{ArrayRef, Float64Array};
22use datafusion_common::arrow::datatypes::DataType;
23use datafusion_common::arrow::datatypes::Field;
24use datafusion_common::Result;
25use datafusion_expr::{
26 Documentation, LimitEffect, PartitionEvaluator, Signature, Volatility, WindowUDFImpl,
27};
28use datafusion_functions_window_common::field;
29use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
30use datafusion_macros::user_doc;
31use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
32use field::WindowUDFFieldArgs;
33use std::any::Any;
34use std::fmt::Debug;
35use std::iter;
36use std::ops::Range;
37use std::sync::Arc;
38
// Invokes the `define_udwf_and_expr!` macro (defined outside this file) with the
// `CumeDist` type, the `cume_dist` identifier and a one-line description string.
// NOTE(review): presumably this generates the UDWF accessor and the `cume_dist()`
// expression helper -- confirm against the macro definition.
define_udwf_and_expr!(
    CumeDist,
    cume_dist,
    "Calculates the cumulative distribution of a value in a group of values."
);
44
/// Window UDF named `cume_dist`.
///
/// The only state it carries is the function [`Signature`]; the user-facing
/// documentation is supplied through the `user_doc` attribute below.
#[user_doc(
    doc_section(label = "Ranking Functions"),
    description = "Relative rank of the current row: (number of rows preceding or peer with the current row) / (total rows).",
    syntax_example = "cume_dist()",
    sql_example = r#"
```sql
-- Example usage of the cume_dist window function:
SELECT salary,
 cume_dist() OVER (ORDER BY salary) AS cume_dist
FROM employees;

+--------+-----------+
| salary | cume_dist |
+--------+-----------+
| 30000 | 0.33 |
| 50000 | 0.67 |
| 70000 | 1.00 |
+--------+-----------+
```
"#
)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct CumeDist {
    // Signature returned by `WindowUDFImpl::signature`; set once in `new`.
    signature: Signature,
}
71
72impl CumeDist {
73 pub fn new() -> Self {
74 Self {
75 signature: Signature::nullary(Volatility::Immutable),
76 }
77 }
78}
79
80impl Default for CumeDist {
81 fn default() -> Self {
82 Self::new()
83 }
84}
85
86impl WindowUDFImpl for CumeDist {
87 fn as_any(&self) -> &dyn Any {
89 self
90 }
91
92 fn name(&self) -> &str {
93 "cume_dist"
94 }
95
96 fn signature(&self) -> &Signature {
97 &self.signature
98 }
99
100 fn partition_evaluator(
101 &self,
102 _partition_evaluator_args: PartitionEvaluatorArgs,
103 ) -> Result<Box<dyn PartitionEvaluator>> {
104 Ok(Box::<CumeDistEvaluator>::default())
105 }
106
107 fn field(&self, field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
108 Ok(Field::new(field_args.name(), DataType::Float64, false).into())
109 }
110
111 fn documentation(&self) -> Option<&Documentation> {
112 self.doc()
113 }
114
115 fn limit_effect(&self, _args: &[Arc<dyn PhysicalExpr>]) -> LimitEffect {
116 LimitEffect::Unknown
117 }
118}
119
/// Partition evaluator returned by `CumeDist::partition_evaluator`.
///
/// It is a unit struct and holds no state of its own.
#[derive(Debug, Default)]
pub(crate) struct CumeDistEvaluator;
122
123impl PartitionEvaluator for CumeDistEvaluator {
124 fn evaluate_all_with_rank(
126 &self,
127 num_rows: usize,
128 ranks_in_partition: &[Range<usize>],
129 ) -> Result<ArrayRef> {
130 let scalar = num_rows as f64;
131 let result = Float64Array::from_iter_values(
132 ranks_in_partition
133 .iter()
134 .scan(0_u64, |acc, range| {
135 let len = range.end - range.start;
136 *acc += len as u64;
137 let value: f64 = (*acc as f64) / scalar;
138 let result = iter::repeat_n(value, len);
139 Some(result)
140 })
141 .flatten(),
142 );
143 Ok(Arc::new(result))
144 }
145
146 fn include_rank(&self) -> bool {
147 true
148 }
149}
150
#[cfg(test)]
mod tests {
    use super::*;
    use datafusion_common::cast::as_float64_array;

    /// Evaluates `ranks` over a partition of `num_rows` rows and asserts that
    /// the resulting `f64` values equal `expected`.
    fn test_f64_result(
        num_rows: usize,
        ranks: Vec<Range<usize>>,
        expected: Vec<f64>,
    ) -> Result<()> {
        let output = CumeDistEvaluator.evaluate_all_with_rank(num_rows, &ranks)?;
        let actual: Vec<f64> = as_float64_array(&output)?.values().to_vec();
        assert_eq!(expected, actual);
        Ok(())
    }

    #[test]
    #[allow(clippy::single_range_in_vec_init)]
    fn test_cume_dist() -> Result<()> {
        // (row count, rank ranges, expected cumulative distribution values)
        let cases: Vec<(usize, Vec<Range<usize>>, Vec<f64>)> = vec![
            (0, vec![], vec![]),
            (1, vec![0..1], vec![1.0]),
            (2, vec![0..2], vec![1.0, 1.0]),
            (4, vec![0..2, 2..4], vec![0.5, 0.5, 1.0, 1.0]),
        ];

        for (num_rows, ranks, expected) in cases {
            test_f64_result(num_rows, ranks, expected)?;
        }

        Ok(())
    }
}
182}