datafusion_functions_window/
cume_dist.rs1use arrow::datatypes::FieldRef;
21use datafusion_common::arrow::array::{ArrayRef, Float64Array};
22use datafusion_common::arrow::datatypes::DataType;
23use datafusion_common::arrow::datatypes::Field;
24use datafusion_common::Result;
25use datafusion_expr::{
26 Documentation, PartitionEvaluator, Signature, Volatility, WindowUDFImpl,
27};
28use datafusion_functions_window_common::field;
29use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
30use datafusion_macros::user_doc;
31use field::WindowUDFFieldArgs;
32use std::any::Any;
33use std::fmt::Debug;
34use std::iter;
35use std::ops::Range;
36use std::sync::Arc;
37
38define_udwf_and_expr!(
39 CumeDist,
40 cume_dist,
41 "Calculates the cumulative distribution of a value in a group of values."
42);
43
44#[user_doc(
46 doc_section(label = "Ranking Functions"),
47 description = "Relative rank of the current row: (number of rows preceding or peer with the current row) / (total rows).",
48 syntax_example = "cume_dist()",
49 sql_example = r#"
50```sql
51-- Example usage of the cume_dist window function:
52SELECT salary,
53 cume_dist() OVER (ORDER BY salary) AS cume_dist
54FROM employees;
55
56+--------+-----------+
57| salary | cume_dist |
58+--------+-----------+
59| 30000 | 0.33 |
60| 50000 | 0.67 |
61| 70000 | 1.00 |
62+--------+-----------+
63```
64"#
65)]
66#[derive(Debug, PartialEq, Eq, Hash)]
67pub struct CumeDist {
68 signature: Signature,
69}
70
71impl CumeDist {
72 pub fn new() -> Self {
73 Self {
74 signature: Signature::nullary(Volatility::Immutable),
75 }
76 }
77}
78
79impl Default for CumeDist {
80 fn default() -> Self {
81 Self::new()
82 }
83}
84
85impl WindowUDFImpl for CumeDist {
86 fn as_any(&self) -> &dyn Any {
88 self
89 }
90
91 fn name(&self) -> &str {
92 "cume_dist"
93 }
94
95 fn signature(&self) -> &Signature {
96 &self.signature
97 }
98
99 fn partition_evaluator(
100 &self,
101 _partition_evaluator_args: PartitionEvaluatorArgs,
102 ) -> Result<Box<dyn PartitionEvaluator>> {
103 Ok(Box::<CumeDistEvaluator>::default())
104 }
105
106 fn field(&self, field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
107 Ok(Field::new(field_args.name(), DataType::Float64, false).into())
108 }
109
110 fn documentation(&self) -> Option<&Documentation> {
111 self.doc()
112 }
113}
114
115#[derive(Debug, Default)]
116pub(crate) struct CumeDistEvaluator;
117
118impl PartitionEvaluator for CumeDistEvaluator {
119 fn evaluate_all_with_rank(
121 &self,
122 num_rows: usize,
123 ranks_in_partition: &[Range<usize>],
124 ) -> Result<ArrayRef> {
125 let scalar = num_rows as f64;
126 let result = Float64Array::from_iter_values(
127 ranks_in_partition
128 .iter()
129 .scan(0_u64, |acc, range| {
130 let len = range.end - range.start;
131 *acc += len as u64;
132 let value: f64 = (*acc as f64) / scalar;
133 let result = iter::repeat_n(value, len);
134 Some(result)
135 })
136 .flatten(),
137 );
138 Ok(Arc::new(result))
139 }
140
141 fn include_rank(&self) -> bool {
142 true
143 }
144}
145
#[cfg(test)]
mod tests {
    use super::*;
    use datafusion_common::cast::as_float64_array;

    /// Evaluates `cume_dist` for the given peer-group ranges and asserts that
    /// the produced values equal `expected`.
    fn test_f64_result(
        num_rows: usize,
        ranks: Vec<Range<usize>>,
        expected: Vec<f64>,
    ) -> Result<()> {
        let array = CumeDistEvaluator.evaluate_all_with_rank(num_rows, &ranks)?;
        let actual = as_float64_array(&array)?.values().to_vec();
        assert_eq!(expected, actual);
        Ok(())
    }

    #[test]
    // Several cases intentionally use a vector holding a single range.
    #[allow(clippy::single_range_in_vec_init)]
    fn test_cume_dist() -> Result<()> {
        // (num_rows, peer-group ranges, expected values)
        let cases: Vec<(usize, Vec<Range<usize>>, Vec<f64>)> = vec![
            (0, vec![], vec![]),
            (1, vec![0..1], vec![1.0]),
            (2, vec![0..2], vec![1.0, 1.0]),
            (4, vec![0..2, 2..4], vec![0.5, 0.5, 1.0, 1.0]),
        ];

        for (num_rows, ranks, expected) in cases {
            test_f64_result(num_rows, ranks, expected)?;
        }

        Ok(())
    }
}