use crate::core::{Result, Value};
use crate::functions::{
FunctionDataType, FunctionInfo, FunctionSignature, FunctionType, WindowFunction,
};
/// Window function state for `NTILE(n)`.
///
/// The derived `Default` yields a bucket count of `0`, which `process`
/// treats like any other non-positive count and answers with bucket `1`.
#[derive(Default)]
pub struct NtileFunction {
    /// Requested number of buckets (`n` in `NTILE(n)`); stored as given,
    /// without validation.
    num_buckets: i64,
}

impl NtileFunction {
    /// Creates an `NTILE` function that splits a partition into
    /// `num_buckets` buckets.
    ///
    /// The value is not validated here; non-positive counts are handled
    /// when rows are processed.
    pub fn new(num_buckets: i64) -> Self {
        NtileFunction { num_buckets }
    }
}
impl WindowFunction for NtileFunction {
fn name(&self) -> &str {
"NTILE"
}
fn info(&self) -> FunctionInfo {
FunctionInfo::new(
"NTILE",
FunctionType::Window,
"Divides the partition into n buckets and returns the bucket number",
FunctionSignature::new(
FunctionDataType::Integer,
vec![FunctionDataType::Integer],
1,
1,
),
)
}
fn process(
&self,
partition: &[Value],
_order_by: &[Value],
current_row: usize,
) -> Result<Value> {
let n = self.num_buckets;
if n <= 0 {
return Ok(Value::Integer(1));
}
let total_rows = partition.len().max(1) as i64;
let row_num = (current_row + 1) as i64;
let base_size = total_rows / n;
let remainder = total_rows % n;
let bucket = if row_num <= remainder * (base_size + 1) {
(row_num - 1) / (base_size + 1) + 1
} else {
let rows_in_larger_buckets = remainder * (base_size + 1);
let remaining_row = row_num - rows_in_larger_buckets;
if base_size > 0 {
remainder + (remaining_row - 1) / base_size + 1
} else {
n }
};
Ok(Value::Integer(bucket.min(n)))
}
fn clone_box(&self) -> Box<dyn WindowFunction> {
Box::new(NtileFunction::new(self.num_buckets))
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs `NTILE(num_buckets)` over a partition holding the integers
    /// `1..=expected.len()` and checks that row `i` lands in bucket
    /// `expected[i]`.
    fn assert_buckets(num_buckets: i64, expected: &[i64]) {
        let function = NtileFunction::new(num_buckets);
        let partition: Vec<Value> = (1..=expected.len() as i64).map(Value::Integer).collect();
        let order_by: Vec<Value> = Vec::new();

        for (row, &bucket) in expected.iter().enumerate() {
            assert_eq!(
                function.process(&partition, &order_by, row).unwrap(),
                Value::Integer(bucket),
                "row index {}",
                row
            );
        }
    }

    #[test]
    fn test_ntile_even_distribution() {
        // Six rows over three buckets: two rows per bucket.
        assert_buckets(3, &[1, 1, 2, 2, 3, 3]);
    }

    #[test]
    fn test_ntile_uneven_distribution() {
        // Five rows over three buckets: the leading buckets take the extra rows.
        assert_buckets(3, &[1, 1, 2, 2, 3]);
    }

    #[test]
    fn test_ntile_more_buckets_than_rows() {
        // Three rows over five buckets: one row per bucket, trailing buckets unused.
        assert_buckets(5, &[1, 2, 3]);
    }

    #[test]
    fn test_ntile_single_bucket() {
        assert_buckets(1, &[1, 1, 1]);
    }

    #[test]
    fn test_ntile_zero_buckets() {
        // A zero bucket count falls back to bucket 1.
        assert_buckets(0, &[1]);
    }

    #[test]
    fn test_ntile_negative_buckets() {
        // A negative bucket count falls back to bucket 1.
        assert_buckets(-5, &[1]);
    }
}