datafusion_functions_window/
cume_dist.rs1use arrow::datatypes::FieldRef;
21use datafusion_common::arrow::array::{ArrayRef, Float64Array};
22use datafusion_common::arrow::datatypes::DataType;
23use datafusion_common::arrow::datatypes::Field;
24use datafusion_common::Result;
25use datafusion_expr::{
26 Documentation, PartitionEvaluator, Signature, Volatility, WindowUDFImpl,
27};
28use datafusion_functions_window_common::field;
29use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
30use datafusion_macros::user_doc;
31use field::WindowUDFFieldArgs;
32use std::any::Any;
33use std::fmt::Debug;
34use std::iter;
35use std::ops::Range;
36use std::sync::Arc;
37
38define_udwf_and_expr!(
39 CumeDist,
40 cume_dist,
41 "Calculates the cumulative distribution of a value in a group of values."
42);
43
44#[user_doc(
46 doc_section(label = "Ranking Functions"),
47 description = "Relative rank of the current row: (number of rows preceding or peer with the current row) / (total rows).",
48 syntax_example = "cume_dist()",
49 sql_example = r#"```sql
50 --Example usage of the cume_dist window function:
51 SELECT salary,
52 cume_dist() OVER (ORDER BY salary) AS cume_dist
53 FROM employees;
54```
55```sql
56+--------+-----------+
57| salary | cume_dist |
58+--------+-----------+
59| 30000 | 0.33 |
60| 50000 | 0.67 |
61| 70000 | 1.00 |
62+--------+-----------+
63```"#
64)]
65#[derive(Debug)]
66pub struct CumeDist {
67 signature: Signature,
68}
69
70impl CumeDist {
71 pub fn new() -> Self {
72 Self {
73 signature: Signature::nullary(Volatility::Immutable),
74 }
75 }
76}
77
78impl Default for CumeDist {
79 fn default() -> Self {
80 Self::new()
81 }
82}
83
84impl WindowUDFImpl for CumeDist {
85 fn as_any(&self) -> &dyn Any {
87 self
88 }
89
90 fn name(&self) -> &str {
91 "cume_dist"
92 }
93
94 fn signature(&self) -> &Signature {
95 &self.signature
96 }
97
98 fn partition_evaluator(
99 &self,
100 _partition_evaluator_args: PartitionEvaluatorArgs,
101 ) -> Result<Box<dyn PartitionEvaluator>> {
102 Ok(Box::<CumeDistEvaluator>::default())
103 }
104
105 fn field(&self, field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
106 Ok(Field::new(field_args.name(), DataType::Float64, false).into())
107 }
108
109 fn documentation(&self) -> Option<&Documentation> {
110 self.doc()
111 }
112}
113
/// Stateless evaluator for `cume_dist()`: every output value is derived from the
/// partition's peer-group ranges, so nothing is carried between calls.
#[derive(Default, Debug)]
pub(crate) struct CumeDistEvaluator;
116
117impl PartitionEvaluator for CumeDistEvaluator {
118 fn evaluate_all_with_rank(
120 &self,
121 num_rows: usize,
122 ranks_in_partition: &[Range<usize>],
123 ) -> Result<ArrayRef> {
124 let scalar = num_rows as f64;
125 let result = Float64Array::from_iter_values(
126 ranks_in_partition
127 .iter()
128 .scan(0_u64, |acc, range| {
129 let len = range.end - range.start;
130 *acc += len as u64;
131 let value: f64 = (*acc as f64) / scalar;
132 let result = iter::repeat_n(value, len);
133 Some(result)
134 })
135 .flatten(),
136 );
137 Ok(Arc::new(result))
138 }
139
140 fn include_rank(&self) -> bool {
141 true
142 }
143}
144
#[cfg(test)]
mod tests {
    use super::*;
    use datafusion_common::arrow::array::Array;
    use datafusion_common::cast::as_float64_array;

    /// Runs the evaluator over `ranks` and checks the produced values. Also checks
    /// that the output has no nulls, since the field is declared non-nullable.
    fn test_f64_result(
        num_rows: usize,
        ranks: Vec<Range<usize>>,
        expected: Vec<f64>,
    ) -> Result<()> {
        let evaluator = CumeDistEvaluator;
        let result = evaluator.evaluate_all_with_rank(num_rows, &ranks)?;
        let result = as_float64_array(&result)?;
        assert_eq!(result.null_count(), 0);
        let result = result.values().to_vec();
        assert_eq!(expected, result);
        Ok(())
    }

    #[test]
    // `vec![0..n]` intentionally builds a one-element Vec holding a single peer
    // group (not the collected range), which this clippy lint would flag.
    #[allow(clippy::single_range_in_vec_init)]
    fn test_cume_dist() -> Result<()> {
        // Empty partition.
        test_f64_result(0, vec![], vec![])?;

        // A single row is its own (only) peer group.
        test_f64_result(1, vec![0..1], vec![1.0])?;

        // All rows tie, so every row sees the whole partition.
        test_f64_result(2, vec![0..2], vec![1.0, 1.0])?;

        // Two equal-sized peer groups.
        test_f64_result(4, vec![0..2, 2..4], vec![0.5, 0.5, 1.0, 1.0])?;

        // No ties: each row advances the running count by one.
        test_f64_result(4, vec![0..1, 1..2, 2..3, 3..4], vec![0.25, 0.5, 0.75, 1.0])?;

        // Unequal peer groups: the running count must accumulate group sizes.
        test_f64_result(5, vec![0..1, 1..3, 3..5], vec![0.2, 0.6, 0.6, 1.0, 1.0])?;

        Ok(())
    }
}