datafusion_functions_window/
ntile.rs1use crate::utils::{get_scalar_value_from_args, get_unsigned_integer};
21use arrow::datatypes::FieldRef;
22use datafusion_common::arrow::array::{ArrayRef, UInt64Array};
23use datafusion_common::arrow::datatypes::{DataType, Field};
24use datafusion_common::{Result, exec_datafusion_err, exec_err};
25use datafusion_expr::{
26 Documentation, LimitEffect, PartitionEvaluator, Signature, Volatility, WindowUDFImpl,
27};
28use datafusion_functions_window_common::field;
29use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
30use datafusion_macros::user_doc;
31use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
32use field::WindowUDFFieldArgs;
33use std::fmt::Debug;
34use std::sync::Arc;
35
// Declares the `Ntile` window UDF through the crate's `define_udwf_and_expr!`
// macro, passing the `ntile` / `ntile_udwf` identifiers, a single `arg`
// parameter and a one-line description.
// NOTE(review): presumably this generates the `ntile(arg)` expression helper
// and the `ntile_udwf()` accessor — confirm against the macro definition.
define_udwf_and_expr!(
    Ntile,
    ntile,
    [arg],
    ntile_udwf,
    "Integer ranging from 1 to the argument value, dividing the partition as equally as possible."
);
43
44#[user_doc(
45 doc_section(label = "Ranking Functions"),
46 description = "Integer ranging from 1 to the argument value, dividing the partition as equally as possible",
47 syntax_example = "ntile(expression)",
48 argument(
49 name = "expression",
50 description = "An integer describing the number groups the partition should be split into"
51 ),
52 sql_example = r#"
53```sql
54-- Example usage of the ntile window function:
55SELECT employee_id,
56 salary,
57 ntile(4) OVER (ORDER BY salary DESC) AS quartile
58FROM employees;
59
60+-------------+--------+----------+
61| employee_id | salary | quartile |
62+-------------+--------+----------+
63| 1 | 90000 | 1 |
64| 2 | 85000 | 1 |
65| 3 | 80000 | 2 |
66| 4 | 70000 | 2 |
67| 5 | 60000 | 3 |
68| 6 | 50000 | 3 |
69| 7 | 40000 | 4 |
70| 8 | 30000 | 4 |
71+-------------+--------+----------+
72```
73"#
74)]
75#[derive(Debug, PartialEq, Eq, Hash)]
76pub struct Ntile {
77 signature: Signature,
78}
79
80impl Ntile {
81 pub fn new() -> Self {
83 Self {
84 signature: Signature::uniform(
85 1,
86 vec![
87 DataType::UInt64,
88 DataType::UInt32,
89 DataType::UInt16,
90 DataType::UInt8,
91 DataType::Int64,
92 DataType::Int32,
93 DataType::Int16,
94 DataType::Int8,
95 ],
96 Volatility::Immutable,
97 ),
98 }
99 }
100}
101
102impl Default for Ntile {
103 fn default() -> Self {
104 Self::new()
105 }
106}
107
108impl WindowUDFImpl for Ntile {
109 fn name(&self) -> &str {
110 "ntile"
111 }
112
113 fn signature(&self) -> &Signature {
114 &self.signature
115 }
116
117 fn partition_evaluator(
118 &self,
119 partition_evaluator_args: PartitionEvaluatorArgs,
120 ) -> Result<Box<dyn PartitionEvaluator>> {
121 let scalar_n =
123 get_scalar_value_from_args(partition_evaluator_args.input_exprs(), 0)?
124 .ok_or_else(|| {
125 exec_datafusion_err!("NTILE requires a positive integer")
126 })?;
127
128 if scalar_n.is_null() {
129 return exec_err!("NTILE requires a positive integer, but finds NULL");
130 }
131
132 let n = get_unsigned_integer(&scalar_n)
136 .map_err(|_| exec_datafusion_err!("NTILE requires a positive integer"))?;
137
138 if n == 0 {
139 return exec_err!("NTILE requires a positive integer");
140 }
141
142 Ok(Box::new(NtileEvaluator { n }))
143 }
144 fn field(&self, field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
145 let nullable = false;
146
147 Ok(Field::new(field_args.name(), DataType::UInt64, nullable).into())
148 }
149
150 fn documentation(&self) -> Option<&Documentation> {
151 self.doc()
152 }
153
154 fn limit_effect(&self, _args: &[Arc<dyn PhysicalExpr>]) -> LimitEffect {
155 LimitEffect::Unknown
156 }
157}
158
159#[derive(Debug)]
160struct NtileEvaluator {
161 n: u64,
162}
163
164impl PartitionEvaluator for NtileEvaluator {
165 fn evaluate_all(
166 &mut self,
167 _values: &[ArrayRef],
168 num_rows: usize,
169 ) -> Result<ArrayRef> {
170 let num_rows = num_rows as u64;
176 let n = self.n;
177 let mut vec: Vec<u64> = Vec::with_capacity(num_rows as usize);
178 let base = num_rows / n;
179 let remainder = num_rows % n;
180 let large_bucket_size = base + 1;
181 let large_rows = remainder * large_bucket_size;
182 for i in 0..num_rows {
183 let bucket = if i < large_rows {
184 i / large_bucket_size + 1
185 } else {
186 remainder + (i - large_rows) / base + 1
189 };
190 vec.push(bucket);
191 }
192 Ok(Arc::new(UInt64Array::from(vec)))
193 }
194}