use crate::core::{Result, Value};
use crate::functions::{
FunctionDataType, FunctionInfo, FunctionSignature, FunctionType, WindowFunction,
};
/// `RANK` window function: ranks the current row within its partition, leaving gaps after ties.
#[derive(Default)]
pub struct RankFunction;

impl WindowFunction for RankFunction {
    /// Name under which this function is reported.
    fn name(&self) -> &str {
        "RANK"
    }

    /// Metadata for `RANK`: its name, the `Window` function type, a description and a
    /// signature whose data type is `Integer`.
    fn info(&self) -> FunctionInfo {
        let signature = FunctionSignature::new(FunctionDataType::Integer, vec![], 0, 0);
        FunctionInfo::new(
            "RANK",
            FunctionType::Window,
            "Returns the rank of the current row within the partition, with gaps for ties",
            signature,
        )
    }

    /// Computes the rank of `partition[current_row]`.
    ///
    /// The rank is one plus the number of rows *before* `current_row` whose value compares
    /// strictly less than the current value (per `is_less_than`), so earlier rows tied with
    /// the current one do not raise it. `_order_by` is not consulted.
    ///
    /// When `current_row` is out of range (which includes an empty partition) the result
    /// falls back to `current_row + 1`.
    ///
    /// NOTE(review): only the rows preceding `current_row` are inspected, so this presumably
    /// expects the partition to arrive sorted ascending — confirm against the caller.
    fn process(
        &self,
        partition: &[Value],
        _order_by: &[Value],
        current_row: usize,
    ) -> Result<Value> {
        let current = match partition.get(current_row) {
            Some(value) => value,
            None => return Ok(Value::Integer((current_row + 1) as i64)),
        };

        let smaller_before = partition[..current_row]
            .iter()
            .filter(|earlier| is_less_than(earlier, current))
            .count();

        Ok(Value::Integer(smaller_before as i64 + 1))
    }

    /// Returns a fresh boxed instance of this stateless function.
    fn clone_box(&self) -> Box<dyn WindowFunction> {
        Box::new(Self)
    }
}
/// `DENSE_RANK` window function: ranks the current row within its partition without gaps.
#[derive(Default)]
pub struct DenseRankFunction;

impl WindowFunction for DenseRankFunction {
    /// Name under which this function is reported.
    fn name(&self) -> &str {
        "DENSE_RANK"
    }

    /// Metadata for `DENSE_RANK`: its name, the `Window` function type, a description and a
    /// signature whose data type is `Integer`.
    fn info(&self) -> FunctionInfo {
        let signature = FunctionSignature::new(FunctionDataType::Integer, vec![], 0, 0);
        FunctionInfo::new(
            "DENSE_RANK",
            FunctionType::Window,
            "Returns the rank of the current row within the partition, without gaps for ties",
            signature,
        )
    }

    /// Computes the dense rank of `partition[current_row]`.
    ///
    /// The result is one plus the number of *distinct* values (distinct per `values_equal`)
    /// found among the rows before `current_row` that compare strictly less than the current
    /// value (per `is_less_than`). `_order_by` is not consulted.
    ///
    /// When `current_row` is out of range (which includes an empty partition) the result
    /// falls back to `current_row + 1`.
    fn process(
        &self,
        partition: &[Value],
        _order_by: &[Value],
        current_row: usize,
    ) -> Result<Value> {
        let current = match partition.get(current_row) {
            Some(value) => value,
            None => return Ok(Value::Integer((current_row + 1) as i64)),
        };

        // One representative per distinct smaller value seen so far.
        let mut distinct_smaller: Vec<&Value> = Vec::new();
        for earlier in &partition[..current_row] {
            if !is_less_than(earlier, current) {
                continue;
            }
            let already_counted = distinct_smaller
                .iter()
                .any(|seen| values_equal(seen, earlier));
            if !already_counted {
                distinct_smaller.push(earlier);
            }
        }

        Ok(Value::Integer(distinct_smaller.len() as i64 + 1))
    }

    /// Returns a fresh boxed instance of this stateless function.
    fn clone_box(&self) -> Box<dyn WindowFunction> {
        Box::new(Self)
    }
}
/// Strict "less than" between two values, as used by the ranking functions in this file.
///
/// Returns `false` whenever either side reports `is_null()`. Integers and floats are
/// comparable with each other by casting the integer to `f64`; text, booleans
/// (`false` sorts before `true`) and timestamps compare only against their own kind.
/// Every other pairing yields `false`.
fn is_less_than(a: &Value, b: &Value) -> bool {
    let both_present = !a.is_null() && !b.is_null();
    both_present
        && match (a, b) {
            (Value::Integer(lhs), Value::Integer(rhs)) => lhs < rhs,
            (Value::Float(lhs), Value::Float(rhs)) => lhs < rhs,
            (Value::Integer(lhs), Value::Float(rhs)) => (*lhs as f64) < *rhs,
            (Value::Float(lhs), Value::Integer(rhs)) => *lhs < (*rhs as f64),
            (Value::Text(lhs), Value::Text(rhs)) => lhs < rhs,
            (Value::Boolean(lhs), Value::Boolean(rhs)) => !lhs && *rhs,
            (Value::Timestamp(lhs), Value::Timestamp(rhs)) => lhs < rhs,
            _ => false,
        }
}
/// Equality between two values, as used by the ranking functions in this file.
///
/// Two values that both report `is_null()` are considered equal. Otherwise integers and
/// floats are comparable with each other by casting the integer to `f64`; text, booleans
/// and timestamps compare only against their own kind. Every other pairing yields `false`.
fn values_equal(a: &Value, b: &Value) -> bool {
    (a.is_null() && b.is_null())
        || match (a, b) {
            (Value::Integer(lhs), Value::Integer(rhs)) => lhs == rhs,
            (Value::Float(lhs), Value::Float(rhs)) => lhs == rhs,
            // Mixed numeric comparison is symmetric, so both operand orders share one arm.
            (Value::Integer(int), Value::Float(float))
            | (Value::Float(float), Value::Integer(int)) => (*int as f64) == *float,
            (Value::Text(lhs), Value::Text(rhs)) => lhs == rhs,
            (Value::Boolean(lhs), Value::Boolean(rhs)) => lhs == rhs,
            (Value::Timestamp(lhs), Value::Timestamp(rhs)) => lhs == rhs,
            _ => false,
        }
}
/// `PERCENT_RANK` window function: the relative rank of the current row in its partition.
#[derive(Default)]
pub struct PercentRankFunction;

impl WindowFunction for PercentRankFunction {
    /// Name under which this function is reported.
    fn name(&self) -> &str {
        "PERCENT_RANK"
    }

    /// Metadata for `PERCENT_RANK`: its name, the `Window` function type, a description and
    /// a signature whose data type is `Float`.
    fn info(&self) -> FunctionInfo {
        let signature = FunctionSignature::new(FunctionDataType::Float, vec![], 0, 0);
        FunctionInfo::new(
            "PERCENT_RANK",
            FunctionType::Window,
            "Returns the relative rank of the current row: (rank - 1) / (total_rows - 1)",
            signature,
        )
    }

    /// Computes `(rank - 1) / (n - 1)` for `partition[current_row]`.
    ///
    /// `rank - 1` is the number of rows before `current_row` whose value compares strictly
    /// less than the current value (per `is_less_than`), and `n` is the partition length.
    /// `_order_by` is not consulted.
    ///
    /// Returns `0.0` when the partition has at most one row or `current_row` is out of
    /// range; the first case also avoids dividing by zero.
    fn process(
        &self,
        partition: &[Value],
        _order_by: &[Value],
        current_row: usize,
    ) -> Result<Value> {
        let total_rows = partition.len();
        if total_rows <= 1 || current_row >= total_rows {
            return Ok(Value::Float(0.0));
        }

        let current = &partition[current_row];
        let smaller_before = partition[..current_row]
            .iter()
            .filter(|earlier| is_less_than(earlier, current))
            .count();

        Ok(Value::Float(
            smaller_before as f64 / (total_rows - 1) as f64,
        ))
    }

    /// Returns a fresh boxed instance of this stateless function.
    fn clone_box(&self) -> Box<dyn WindowFunction> {
        Box::new(Self)
    }
}
/// `CUME_DIST` window function: the cumulative distribution of the current row's value.
#[derive(Default)]
pub struct CumeDistFunction;

impl WindowFunction for CumeDistFunction {
    /// Name under which this function is reported.
    fn name(&self) -> &str {
        "CUME_DIST"
    }

    /// Metadata for `CUME_DIST`: its name, the `Window` function type, a description and a
    /// signature whose data type is `Float`.
    fn info(&self) -> FunctionInfo {
        FunctionInfo::new(
            "CUME_DIST",
            FunctionType::Window,
            "Returns the cumulative distribution of a value: (rows <= current) / total_rows",
            FunctionSignature::new(FunctionDataType::Float, vec![], 0, 0),
        )
    }

    /// Computes `(rows <= current) / n` for `partition[current_row]`.
    ///
    /// Every row of the partition (not only the preceding ones) is counted when its value is
    /// less than (`is_less_than`) or equal to (`values_equal`) the current value; `n` is the
    /// partition length. `_order_by` is not consulted.
    ///
    /// The current row is always counted as its own peer, even when its value does not
    /// compare equal to itself (for example a NaN float, or a pairing the comparison helpers
    /// do not handle). This keeps the result within `1/n ..= 1` instead of collapsing to `0`.
    ///
    /// Returns `1.0` when the partition is empty or `current_row` is out of range.
    fn process(
        &self,
        partition: &[Value],
        _order_by: &[Value],
        current_row: usize,
    ) -> Result<Value> {
        let current = match partition.get(current_row) {
            Some(value) => value,
            None => return Ok(Value::Float(1.0)),
        };

        let at_or_below = partition
            .iter()
            .enumerate()
            .filter(|(row, value)| {
                // The index check guarantees the current row counts exactly once, whether or
                // not the value-based comparisons recognise it as equal to itself.
                *row == current_row
                    || is_less_than(value, current)
                    || values_equal(value, current)
            })
            .count();

        Ok(Value::Float(at_or_below as f64 / partition.len() as f64))
    }

    /// Returns a fresh boxed instance of this stateless function.
    fn clone_box(&self) -> Box<dyn WindowFunction> {
        Box::new(Self)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Wraps each integer in `Value::Integer`.
    fn ints(raw: &[i64]) -> Vec<Value> {
        raw.iter().copied().map(Value::Integer).collect()
    }

    /// Evaluates `func` at every row of `partition`, passing an empty ORDER BY list.
    fn evaluate_rows(func: &impl WindowFunction, partition: &[Value]) -> Vec<Value> {
        let order_by: Vec<Value> = Vec::new();
        (0..partition.len())
            .map(|row| func.process(partition, &order_by, row).unwrap())
            .collect()
    }

    /// Asserts that `actual` is a float within `0.0001` of `expected`.
    fn assert_float_near(actual: &Value, expected: f64) {
        match actual {
            Value::Float(v) => assert!((v - expected).abs() < 0.0001),
            _ => panic!("Expected float"),
        }
    }

    #[test]
    fn test_rank_unique_values() {
        let ranks = evaluate_rows(&RankFunction, &ints(&[10, 20, 30]));
        assert_eq!(ranks, ints(&[1, 2, 3]));
    }

    #[test]
    fn test_rank_with_ties() {
        // The tie on 10 shares rank 1 and the next value skips to rank 3.
        let ranks = evaluate_rows(&RankFunction, &ints(&[10, 10, 30]));
        assert_eq!(ranks, ints(&[1, 1, 3]));
    }

    #[test]
    fn test_dense_rank_unique_values() {
        let ranks = evaluate_rows(&DenseRankFunction, &ints(&[10, 20, 30]));
        assert_eq!(ranks, ints(&[1, 2, 3]));
    }

    #[test]
    fn test_dense_rank_with_ties() {
        // The tie on 10 shares rank 1 and the next value follows without a gap.
        let ranks = evaluate_rows(&DenseRankFunction, &ints(&[10, 10, 30]));
        assert_eq!(ranks, ints(&[1, 1, 2]));
    }

    #[test]
    fn test_rank_empty_partition() {
        let empty: Vec<Value> = Vec::new();
        let rank = RankFunction.process(&empty, &empty, 0).unwrap();
        assert_eq!(rank, Value::Integer(1));
    }

    #[test]
    fn test_dense_rank_empty_partition() {
        let empty: Vec<Value> = Vec::new();
        let rank = DenseRankFunction.process(&empty, &empty, 0).unwrap();
        assert_eq!(rank, Value::Integer(1));
    }

    #[test]
    fn test_rank_strings() {
        let partition = vec![
            Value::text("apple"),
            Value::text("banana"),
            Value::text("cherry"),
        ];
        let ranks = evaluate_rows(&RankFunction, &partition);
        assert_eq!(ranks, ints(&[1, 2, 3]));
    }

    #[test]
    fn test_percent_rank_unique_values() {
        let results = evaluate_rows(&PercentRankFunction, &ints(&[10, 20, 30, 40]));
        assert_eq!(results[0], Value::Float(0.0));
        assert_float_near(&results[1], 0.3333333333333333);
        assert_float_near(&results[2], 0.6666666666666666);
        assert_eq!(results[3], Value::Float(1.0));
    }

    #[test]
    fn test_percent_rank_with_ties() {
        let results = evaluate_rows(&PercentRankFunction, &ints(&[10, 10, 30]));
        let expected = vec![Value::Float(0.0), Value::Float(0.0), Value::Float(1.0)];
        assert_eq!(results, expected);
    }

    #[test]
    fn test_percent_rank_single_row() {
        let results = evaluate_rows(&PercentRankFunction, &ints(&[10]));
        assert_eq!(results, vec![Value::Float(0.0)]);
    }

    #[test]
    fn test_cume_dist_unique_values() {
        let results = evaluate_rows(&CumeDistFunction, &ints(&[10, 20, 30, 40]));
        let expected = vec![
            Value::Float(0.25),
            Value::Float(0.5),
            Value::Float(0.75),
            Value::Float(1.0),
        ];
        assert_eq!(results, expected);
    }

    #[test]
    fn test_cume_dist_with_ties() {
        let results = evaluate_rows(&CumeDistFunction, &ints(&[10, 10, 30]));
        assert_float_near(&results[0], 0.6666666666666666);
        assert_float_near(&results[1], 0.6666666666666666);
        assert_eq!(results[2], Value::Float(1.0));
    }

    #[test]
    fn test_cume_dist_single_row() {
        let results = evaluate_rows(&CumeDistFunction, &ints(&[10]));
        assert_eq!(results, vec![Value::Float(1.0)]);
    }
}