Skip to main content

datafusion_functions_window/
cume_dist.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! `cume_dist` window function implementation
19
20use arrow::datatypes::FieldRef;
21use datafusion_common::Result;
22use datafusion_common::arrow::array::{ArrayRef, Float64Array};
23use datafusion_common::arrow::datatypes::DataType;
24use datafusion_common::arrow::datatypes::Field;
25use datafusion_expr::{
26    Documentation, LimitEffect, PartitionEvaluator, Signature, Volatility, WindowUDFImpl,
27};
28use datafusion_functions_window_common::field;
29use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
30use datafusion_macros::user_doc;
31use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
32use field::WindowUDFFieldArgs;
33use std::fmt::Debug;
34use std::iter;
35use std::ops::Range;
36use std::sync::Arc;
37
// Registers the `CumeDist` window UDF under the names `cume_dist` and
// `cume_dist_udwf`, with the string below as its description.
// NOTE(review): the macro is defined outside this file; presumably it
// generates a `cume_dist()` expression builder and a `cume_dist_udwf()`
// accessor — confirm against the macro definition.
define_udwf_and_expr!(
    CumeDist,
    cume_dist,
    cume_dist_udwf,
    "Calculates the cumulative distribution of a value in a group of values."
);
44
/// The `cume_dist` window function.
///
/// For each row it yields the fraction of rows in the partition that precede
/// the row or are peers of it, according to the window's `ORDER BY`.
/// The function itself takes no arguments.
#[user_doc(
    doc_section(label = "Ranking Functions"),
    description = "Relative rank of the current row: (number of rows preceding or peer with the current row) / (total rows).",
    syntax_example = "cume_dist()",
    sql_example = r#"
```sql
-- Example usage of the cume_dist window function:
SELECT salary,
    cume_dist() OVER (ORDER BY salary) AS cume_dist
FROM employees;

+--------+-----------+
| salary | cume_dist |
+--------+-----------+
| 30000  | 0.33      |
| 50000  | 0.67      |
| 70000  | 1.00      |
+--------+-----------+
```
"#
)]
#[derive(Debug, PartialEq, Eq, Hash)]
pub struct CumeDist {
    /// Accepted call signature, built in [`CumeDist::new`].
    signature: Signature,
}
71
72impl CumeDist {
73    pub fn new() -> Self {
74        Self {
75            signature: Signature::nullary(Volatility::Immutable),
76        }
77    }
78}
79
80impl Default for CumeDist {
81    fn default() -> Self {
82        Self::new()
83    }
84}
85
86impl WindowUDFImpl for CumeDist {
87    fn name(&self) -> &str {
88        "cume_dist"
89    }
90
91    fn signature(&self) -> &Signature {
92        &self.signature
93    }
94
95    fn partition_evaluator(
96        &self,
97        _partition_evaluator_args: PartitionEvaluatorArgs,
98    ) -> Result<Box<dyn PartitionEvaluator>> {
99        Ok(Box::<CumeDistEvaluator>::default())
100    }
101
102    fn field(&self, field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
103        Ok(Field::new(field_args.name(), DataType::Float64, false).into())
104    }
105
106    fn documentation(&self) -> Option<&Documentation> {
107        self.doc()
108    }
109
110    fn limit_effect(&self, _args: &[Arc<dyn PhysicalExpr>]) -> LimitEffect {
111        LimitEffect::Unknown
112    }
113}
114
/// Stateless evaluator for `cume_dist`.
///
/// All of its work happens in `evaluate_all_with_rank`, which derives every
/// row's value from the row count and the rank ranges it is handed.
#[derive(Debug, Default)]
pub(crate) struct CumeDistEvaluator;
117
118impl PartitionEvaluator for CumeDistEvaluator {
119    /// Computes the cumulative distribution for all rows in the partition
120    fn evaluate_all_with_rank(
121        &self,
122        num_rows: usize,
123        ranks_in_partition: &[Range<usize>],
124    ) -> Result<ArrayRef> {
125        let scalar = num_rows as f64;
126        let result = Float64Array::from_iter_values(
127            ranks_in_partition
128                .iter()
129                .scan(0_u64, |acc, range| {
130                    let len = range.end - range.start;
131                    *acc += len as u64;
132                    let value: f64 = (*acc as f64) / scalar;
133                    let result = iter::repeat_n(value, len);
134                    Some(result)
135                })
136                .flatten(),
137        );
138        Ok(Arc::new(result))
139    }
140
141    fn include_rank(&self) -> bool {
142        true
143    }
144}
145
#[cfg(test)]
mod tests {
    use super::*;
    use datafusion_common::cast::as_float64_array;

    /// Runs the evaluator over `ranks` for a partition of `num_rows` rows and
    /// asserts that the produced values equal `expected`.
    fn test_f64_result(
        num_rows: usize,
        ranks: Vec<Range<usize>>,
        expected: Vec<f64>,
    ) -> Result<()> {
        let array = CumeDistEvaluator.evaluate_all_with_rank(num_rows, &ranks)?;
        let actual = as_float64_array(&array)?.values().to_vec();
        assert_eq!(expected, actual);
        Ok(())
    }

    #[test]
    // The single-element vectors below really are lists containing one range.
    #[expect(clippy::single_range_in_vec_init)]
    fn test_cume_dist() -> Result<()> {
        // Empty partition yields no values.
        test_f64_result(0, vec![], vec![])?;

        // A single row is the whole distribution.
        test_f64_result(1, vec![0..1], vec![1.0])?;

        // Two rows in one range share the same value.
        test_f64_result(2, vec![0..2], vec![1.0, 1.0])?;

        // Two ranges of two rows each.
        test_f64_result(4, vec![0..2, 2..4], vec![0.5, 0.5, 1.0, 1.0])
    }
}