datafusion_functions_window/
cume_dist.rs1use datafusion_common::arrow::array::{ArrayRef, Float64Array};
21use datafusion_common::arrow::datatypes::DataType;
22use datafusion_common::arrow::datatypes::Field;
23use datafusion_common::Result;
24use datafusion_expr::{
25 Documentation, PartitionEvaluator, Signature, Volatility, WindowUDFImpl,
26};
27use datafusion_functions_window_common::field;
28use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
29use datafusion_macros::user_doc;
30use field::WindowUDFFieldArgs;
31use std::any::Any;
32use std::fmt::Debug;
33use std::iter;
34use std::ops::Range;
35use std::sync::Arc;
36
// Invokes the crate's `define_udwf_and_expr!` macro (defined outside this view)
// with the `CumeDist` type, the `cume_dist` identifier, and a one-line description.
// NOTE(review): the exact items generated (e.g. a UDWF accessor and an expression
// helper named `cume_dist`) are determined by the macro — confirm at its definition.
define_udwf_and_expr!(
    CumeDist,
    cume_dist,
    "Calculates the cumulative distribution of a value in a group of values."
);
42
/// The `cume_dist()` window function: the relative rank of the current row,
/// i.e. (number of rows preceding or peer with the current row) / (total rows),
/// as stated in the `description` of the `#[user_doc]` attribute below.
#[user_doc(
    doc_section(label = "Ranking Functions"),
    description = "Relative rank of the current row: (number of rows preceding or peer with current row) / (total rows).",
    syntax_example = "cume_dist()"
)]
#[derive(Debug)]
pub struct CumeDist {
    /// Call signature returned by `WindowUDFImpl::signature`; `CumeDist::new`
    /// builds it as a nullary, `Volatility::Immutable` signature.
    signature: Signature,
}
53
54impl CumeDist {
55 pub fn new() -> Self {
56 Self {
57 signature: Signature::nullary(Volatility::Immutable),
58 }
59 }
60}
61
62impl Default for CumeDist {
63 fn default() -> Self {
64 Self::new()
65 }
66}
67
68impl WindowUDFImpl for CumeDist {
69 fn as_any(&self) -> &dyn Any {
71 self
72 }
73
74 fn name(&self) -> &str {
75 "cume_dist"
76 }
77
78 fn signature(&self) -> &Signature {
79 &self.signature
80 }
81
82 fn partition_evaluator(
83 &self,
84 _partition_evaluator_args: PartitionEvaluatorArgs,
85 ) -> Result<Box<dyn PartitionEvaluator>> {
86 Ok(Box::<CumeDistEvaluator>::default())
87 }
88
89 fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
90 Ok(Field::new(field_args.name(), DataType::Float64, false))
91 }
92
93 fn documentation(&self) -> Option<&Documentation> {
94 self.doc()
95 }
96}
97
/// Partition evaluator for `cume_dist`. It is a unit struct with no state; its
/// `PartitionEvaluator` impl computes results in `evaluate_all_with_rank`.
#[derive(Debug, Default)]
pub(crate) struct CumeDistEvaluator;
100
101impl PartitionEvaluator for CumeDistEvaluator {
102 fn evaluate_all_with_rank(
104 &self,
105 num_rows: usize,
106 ranks_in_partition: &[Range<usize>],
107 ) -> Result<ArrayRef> {
108 let scalar = num_rows as f64;
109 let result = Float64Array::from_iter_values(
110 ranks_in_partition
111 .iter()
112 .scan(0_u64, |acc, range| {
113 let len = range.end - range.start;
114 *acc += len as u64;
115 let value: f64 = (*acc as f64) / scalar;
116 let result = iter::repeat(value).take(len);
117 Some(result)
118 })
119 .flatten(),
120 );
121 Ok(Arc::new(result))
122 }
123
124 fn include_rank(&self) -> bool {
125 true
126 }
127}
128
#[cfg(test)]
mod tests {
    use super::*;
    use datafusion_common::cast::as_float64_array;

    /// Runs `CumeDistEvaluator::evaluate_all_with_rank` over the given peer-group
    /// ranges and asserts that the produced values equal `expected` exactly
    /// (length included, since slice equality compares lengths).
    fn test_f64_result(
        num_rows: usize,
        ranks: &[Range<usize>],
        expected: &[f64],
    ) -> Result<()> {
        let evaluator = CumeDistEvaluator;
        let result = evaluator.evaluate_all_with_rank(num_rows, ranks)?;
        let result = as_float64_array(&result)?;
        let actual = result.values().to_vec();
        assert_eq!(expected, actual.as_slice());
        Ok(())
    }

    #[test]
    // `single_range_in_vec_init` flags literals such as `[0..1]`, but here a
    // one-element collection of ranges (a single peer group) is intended.
    #[allow(clippy::single_range_in_vec_init)]
    fn test_cume_dist() -> Result<()> {
        test_f64_result(0, &[], &[])?;

        test_f64_result(1, &[0..1], &[1.0])?;

        test_f64_result(2, &[0..2], &[1.0, 1.0])?;

        test_f64_result(4, &[0..2, 2..4], &[0.5, 0.5, 1.0, 1.0])?;

        // Every row is its own peer group: values step by 1/num_rows.
        test_f64_result(4, &[0..1, 1..2, 2..3, 3..4], &[0.25, 0.5, 0.75, 1.0])?;

        // Unequal peer groups: each row counts all rows up to the end of its
        // own group, so peers share the value of the group's last row.
        test_f64_result(5, &[0..1, 1..4, 4..5], &[0.2, 0.8, 0.8, 0.8, 1.0])?;

        Ok(())
    }
}