datafusion_functions_window/
cume_dist.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! `cume_dist` window function implementation
19
20use datafusion_common::arrow::array::{ArrayRef, Float64Array};
21use datafusion_common::arrow::datatypes::DataType;
22use datafusion_common::arrow::datatypes::Field;
23use datafusion_common::Result;
24use datafusion_expr::{
25    Documentation, PartitionEvaluator, Signature, Volatility, WindowUDFImpl,
26};
27use datafusion_functions_window_common::field;
28use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
29use datafusion_macros::user_doc;
30use field::WindowUDFFieldArgs;
31use std::any::Any;
32use std::fmt::Debug;
33use std::iter;
34use std::ops::Range;
35use std::sync::Arc;
36
// Wires `CumeDist` into the crate's public API via `define_udwf_and_expr!`,
// a macro defined outside this file. The second argument is the function name
// and the string is its short description.
// NOTE(review): the exact items generated (presumably a `cume_dist` expression
// builder plus a UDWF accessor) depend on the macro definition — confirm there.
define_udwf_and_expr!(
    CumeDist,
    cume_dist,
    "Calculates the cumulative distribution of a value in a group of values."
);
42
/// Window UDF implementation of `cume_dist`.
///
/// For each row, `cume_dist` is the number of rows preceding the current row
/// or peer with it (the row itself included), divided by the total number of
/// rows. The output column is a non-nullable `Float64`.
#[user_doc(
    doc_section(label = "Ranking Functions"),
    description = "Relative rank of the current row: (number of rows preceding or peer with current row) / (total rows).",
    syntax_example = "cume_dist()"
)]
#[derive(Debug)]
pub struct CumeDist {
    // Nullary, immutable signature created in `CumeDist::new`.
    signature: Signature,
}
53
54impl CumeDist {
55    pub fn new() -> Self {
56        Self {
57            signature: Signature::nullary(Volatility::Immutable),
58        }
59    }
60}
61
62impl Default for CumeDist {
63    fn default() -> Self {
64        Self::new()
65    }
66}
67
68impl WindowUDFImpl for CumeDist {
69    /// Return a reference to Any that can be used for downcasting
70    fn as_any(&self) -> &dyn Any {
71        self
72    }
73
74    fn name(&self) -> &str {
75        "cume_dist"
76    }
77
78    fn signature(&self) -> &Signature {
79        &self.signature
80    }
81
82    fn partition_evaluator(
83        &self,
84        _partition_evaluator_args: PartitionEvaluatorArgs,
85    ) -> Result<Box<dyn PartitionEvaluator>> {
86        Ok(Box::<CumeDistEvaluator>::default())
87    }
88
89    fn field(&self, field_args: WindowUDFFieldArgs) -> Result<Field> {
90        Ok(Field::new(field_args.name(), DataType::Float64, false))
91    }
92
93    fn documentation(&self) -> Option<&Documentation> {
94        self.doc()
95    }
96}
97
/// Stateless partition evaluator for `cume_dist`.
///
/// It has no fields; every value is derived in `evaluate_all_with_rank` from
/// the row count and the rank ranges passed in.
#[derive(Debug, Default)]
pub(crate) struct CumeDistEvaluator;
100
101impl PartitionEvaluator for CumeDistEvaluator {
102    /// Computes the cumulative distribution for all rows in the partition
103    fn evaluate_all_with_rank(
104        &self,
105        num_rows: usize,
106        ranks_in_partition: &[Range<usize>],
107    ) -> Result<ArrayRef> {
108        let scalar = num_rows as f64;
109        let result = Float64Array::from_iter_values(
110            ranks_in_partition
111                .iter()
112                .scan(0_u64, |acc, range| {
113                    let len = range.end - range.start;
114                    *acc += len as u64;
115                    let value: f64 = (*acc as f64) / scalar;
116                    let result = iter::repeat(value).take(len);
117                    Some(result)
118                })
119                .flatten(),
120        );
121        Ok(Arc::new(result))
122    }
123
124    fn include_rank(&self) -> bool {
125        true
126    }
127}
128
#[cfg(test)]
mod tests {
    use super::*;
    use datafusion_common::cast::as_float64_array;

    /// Runs `CumeDistEvaluator` for a partition of `num_rows` rows split into
    /// the peer groups `ranks` (one range per group), and asserts the produced
    /// values equal `expected`.
    fn test_f64_result(
        num_rows: usize,
        ranks: &[Range<usize>],
        expected: &[f64],
    ) -> Result<()> {
        let evaluator = CumeDistEvaluator;
        let result = evaluator.evaluate_all_with_rank(num_rows, ranks)?;
        let result = as_float64_array(&result)?;
        let result = result.values().to_vec();
        assert_eq!(expected, result.as_slice());
        Ok(())
    }

    #[test]
    // Cases list one range per peer group, so single-range literals such as
    // `&[0..1]` are intentional, not a mistaken repeat-style initializer.
    #[allow(clippy::single_range_in_vec_init)]
    fn test_cume_dist() -> Result<()> {
        // Empty partition produces no values.
        test_f64_result(0, &[], &[])?;

        // A single row is its own only peer.
        test_f64_result(1, &[0..1], &[1.0])?;

        // All rows are peers, so each one sees the whole partition.
        test_f64_result(2, &[0..2], &[1.0, 1.0])?;

        // Two equal-sized peer groups.
        test_f64_result(4, &[0..2, 2..4], &[0.5, 0.5, 1.0, 1.0])?;

        // Unequal peer groups: the running total must carry across groups.
        test_f64_result(4, &[0..1, 1..3, 3..4], &[0.25, 0.75, 0.75, 1.0])?;

        // No ties: every row is its own peer group.
        test_f64_result(4, &[0..1, 1..2, 2..3, 3..4], &[0.25, 0.5, 0.75, 1.0])?;

        Ok(())
    }
}