datafusion_functions_window/
ntile.rs1use crate::utils::{
21 get_scalar_value_from_args, get_signed_integer, get_unsigned_integer,
22};
23use arrow::datatypes::FieldRef;
24use datafusion_common::arrow::array::{ArrayRef, UInt64Array};
25use datafusion_common::arrow::datatypes::{DataType, Field};
26use datafusion_common::{exec_err, DataFusionError, Result};
27use datafusion_expr::{
28 Documentation, Expr, LimitEffect, PartitionEvaluator, Signature, Volatility,
29 WindowUDFImpl,
30};
31use datafusion_functions_window_common::field;
32use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
33use datafusion_macros::user_doc;
34use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
35use field::WindowUDFFieldArgs;
36use std::any::Any;
37use std::fmt::Debug;
38use std::sync::Arc;
39
40get_or_init_udwf!(
41 Ntile,
42 ntile,
43 "integer ranging from 1 to the argument value, dividing the partition as equally as possible"
44);
45
46pub fn ntile(arg: Expr) -> Expr {
47 ntile_udwf().call(vec![arg])
48}
49
50#[user_doc(
51 doc_section(label = "Ranking Functions"),
52 description = "Integer ranging from 1 to the argument value, dividing the partition as equally as possible",
53 syntax_example = "ntile(expression)",
54 argument(
55 name = "expression",
56 description = "An integer describing the number groups the partition should be split into"
57 ),
58 sql_example = r#"
59```sql
60-- Example usage of the ntile window function:
61SELECT employee_id,
62 salary,
63 ntile(4) OVER (ORDER BY salary DESC) AS quartile
64FROM employees;
65
66+-------------+--------+----------+
67| employee_id | salary | quartile |
68+-------------+--------+----------+
69| 1 | 90000 | 1 |
70| 2 | 85000 | 1 |
71| 3 | 80000 | 2 |
72| 4 | 70000 | 2 |
73| 5 | 60000 | 3 |
74| 6 | 50000 | 3 |
75| 7 | 40000 | 4 |
76| 8 | 30000 | 4 |
77+-------------+--------+----------+
78```
79"#
80)]
81#[derive(Debug, PartialEq, Eq, Hash)]
82pub struct Ntile {
83 signature: Signature,
84}
85
86impl Ntile {
87 pub fn new() -> Self {
89 Self {
90 signature: Signature::uniform(
91 1,
92 vec![
93 DataType::UInt64,
94 DataType::UInt32,
95 DataType::UInt16,
96 DataType::UInt8,
97 DataType::Int64,
98 DataType::Int32,
99 DataType::Int16,
100 DataType::Int8,
101 ],
102 Volatility::Immutable,
103 ),
104 }
105 }
106}
107
108impl Default for Ntile {
109 fn default() -> Self {
110 Self::new()
111 }
112}
113
114impl WindowUDFImpl for Ntile {
115 fn as_any(&self) -> &dyn Any {
116 self
117 }
118
119 fn name(&self) -> &str {
120 "ntile"
121 }
122
123 fn signature(&self) -> &Signature {
124 &self.signature
125 }
126
127 fn partition_evaluator(
128 &self,
129 partition_evaluator_args: PartitionEvaluatorArgs,
130 ) -> Result<Box<dyn PartitionEvaluator>> {
131 let scalar_n =
132 get_scalar_value_from_args(partition_evaluator_args.input_exprs(), 0)?
133 .ok_or_else(|| {
134 DataFusionError::Execution(
135 "NTILE requires a positive integer".to_string(),
136 )
137 })?;
138
139 if scalar_n.is_null() {
140 return exec_err!("NTILE requires a positive integer, but finds NULL");
141 }
142
143 if scalar_n.is_unsigned() {
144 let n = get_unsigned_integer(scalar_n)?;
145 Ok(Box::new(NtileEvaluator { n }))
146 } else {
147 let n: i64 = get_signed_integer(scalar_n)?;
148 if n <= 0 {
149 return exec_err!("NTILE requires a positive integer");
150 }
151 Ok(Box::new(NtileEvaluator { n: n as u64 }))
152 }
153 }
154 fn field(&self, field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
155 let nullable = false;
156
157 Ok(Field::new(field_args.name(), DataType::UInt64, nullable).into())
158 }
159
160 fn documentation(&self) -> Option<&Documentation> {
161 self.doc()
162 }
163
164 fn limit_effect(&self, _args: &[Arc<dyn PhysicalExpr>]) -> LimitEffect {
165 LimitEffect::Unknown
166 }
167}
168
/// Partition evaluator state for `ntile`.
#[derive(Debug)]
struct NtileEvaluator {
    /// Requested number of buckets.
    n: u64,
}
173
174impl PartitionEvaluator for NtileEvaluator {
175 fn evaluate_all(
176 &mut self,
177 _values: &[ArrayRef],
178 num_rows: usize,
179 ) -> Result<ArrayRef> {
180 let num_rows = num_rows as u64;
181 let mut vec: Vec<u64> = Vec::new();
182 let n = u64::min(self.n, num_rows);
183 for i in 0..num_rows {
184 let res = i * n / num_rows;
185 vec.push(res + 1)
186 }
187 Ok(Arc::new(UInt64Array::from(vec)))
188 }
189}