datafusion_functions_window/
cume_dist.rs1use arrow::datatypes::FieldRef;
21use datafusion_common::Result;
22use datafusion_common::arrow::array::{ArrayRef, Float64Array};
23use datafusion_common::arrow::datatypes::DataType;
24use datafusion_common::arrow::datatypes::Field;
25use datafusion_expr::{
26 Documentation, LimitEffect, PartitionEvaluator, Signature, Volatility, WindowUDFImpl,
27};
28use datafusion_functions_window_common::field;
29use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
30use datafusion_macros::user_doc;
31use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
32use field::WindowUDFFieldArgs;
33use std::fmt::Debug;
34use std::iter;
35use std::ops::Range;
36use std::sync::Arc;
37
38define_udwf_and_expr!(
39 CumeDist,
40 cume_dist,
41 cume_dist_udwf,
42 "Calculates the cumulative distribution of a value in a group of values."
43);
44
45#[user_doc(
47 doc_section(label = "Ranking Functions"),
48 description = "Relative rank of the current row: (number of rows preceding or peer with the current row) / (total rows).",
49 syntax_example = "cume_dist()",
50 sql_example = r#"
51```sql
52-- Example usage of the cume_dist window function:
53SELECT salary,
54 cume_dist() OVER (ORDER BY salary) AS cume_dist
55FROM employees;
56
57+--------+-----------+
58| salary | cume_dist |
59+--------+-----------+
60| 30000 | 0.33 |
61| 50000 | 0.67 |
62| 70000 | 1.00 |
63+--------+-----------+
64```
65"#
66)]
67#[derive(Debug, PartialEq, Eq, Hash)]
68pub struct CumeDist {
69 signature: Signature,
70}
71
72impl CumeDist {
73 pub fn new() -> Self {
74 Self {
75 signature: Signature::nullary(Volatility::Immutable),
76 }
77 }
78}
79
80impl Default for CumeDist {
81 fn default() -> Self {
82 Self::new()
83 }
84}
85
86impl WindowUDFImpl for CumeDist {
87 fn name(&self) -> &str {
88 "cume_dist"
89 }
90
91 fn signature(&self) -> &Signature {
92 &self.signature
93 }
94
95 fn partition_evaluator(
96 &self,
97 _partition_evaluator_args: PartitionEvaluatorArgs,
98 ) -> Result<Box<dyn PartitionEvaluator>> {
99 Ok(Box::<CumeDistEvaluator>::default())
100 }
101
102 fn field(&self, field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
103 Ok(Field::new(field_args.name(), DataType::Float64, false).into())
104 }
105
106 fn documentation(&self) -> Option<&Documentation> {
107 self.doc()
108 }
109
110 fn limit_effect(&self, _args: &[Arc<dyn PhysicalExpr>]) -> LimitEffect {
111 LimitEffect::Unknown
112 }
113}
114
/// Stateless [`PartitionEvaluator`] that computes the `cume_dist` values for a
/// partition from its rank ranges.
#[derive(Debug, Default)]
pub(crate) struct CumeDistEvaluator;
117
118impl PartitionEvaluator for CumeDistEvaluator {
119 fn evaluate_all_with_rank(
121 &self,
122 num_rows: usize,
123 ranks_in_partition: &[Range<usize>],
124 ) -> Result<ArrayRef> {
125 let scalar = num_rows as f64;
126 let result = Float64Array::from_iter_values(
127 ranks_in_partition
128 .iter()
129 .scan(0_u64, |acc, range| {
130 let len = range.end - range.start;
131 *acc += len as u64;
132 let value: f64 = (*acc as f64) / scalar;
133 let result = iter::repeat_n(value, len);
134 Some(result)
135 })
136 .flatten(),
137 );
138 Ok(Arc::new(result))
139 }
140
141 fn include_rank(&self) -> bool {
142 true
143 }
144}
145
#[cfg(test)]
mod tests {
    use super::*;
    use datafusion_common::cast::as_float64_array;

    /// Evaluates `CumeDistEvaluator` for `num_rows` / `ranks` and asserts that
    /// the resulting `Float64Array` values equal `expected`.
    fn test_f64_result(
        num_rows: usize,
        ranks: Vec<Range<usize>>,
        expected: Vec<f64>,
    ) -> Result<()> {
        let array = CumeDistEvaluator.evaluate_all_with_rank(num_rows, &ranks)?;
        let actual = as_float64_array(&array)?.values().to_vec();
        assert_eq!(expected, actual);
        Ok(())
    }

    #[test]
    #[expect(clippy::single_range_in_vec_init)]
    fn test_cume_dist() -> Result<()> {
        // (num_rows, rank ranges, expected cume_dist per row)
        let cases = [
            // Empty partition.
            (0, vec![], vec![]),
            // Single row.
            (1, vec![0..1], vec![1.0]),
            // Two rows sharing one range.
            (2, vec![0..2], vec![1.0, 1.0]),
            // Two ranges of two rows each.
            (4, vec![0..2, 2..4], vec![0.5, 0.5, 1.0, 1.0]),
        ];

        for (num_rows, ranks, expected) in cases {
            test_f64_result(num_rows, ranks, expected)?;
        }

        Ok(())
    }
}