use std::ops::{Add, Div, Mul, Sub};
use num::Zero;
use crate::array::{Array, BooleanArray, PrimitiveArray};
use crate::builder::{ArrayBuilder, PrimitiveArrayBuilder};
use crate::datatypes;
use crate::datatypes::ArrowNumericType;
use crate::error::{ArrowError, Result};
pub fn add<T>(left: &PrimitiveArray<T>, right: &PrimitiveArray<T>) -> Result<PrimitiveArray<T>>
where
T: datatypes::ArrowNumericType,
T::Native: Add<Output = T::Native>
+ Sub<Output = T::Native>
+ Mul<Output = T::Native>
+ Div<Output = T::Native>
+ Zero,
{
math_op(left, right, |a, b| Ok(a + b))
}
pub fn subtract<T>(left: &PrimitiveArray<T>, right: &PrimitiveArray<T>) -> Result<PrimitiveArray<T>>
where
T: datatypes::ArrowNumericType,
T::Native: Add<Output = T::Native>
+ Sub<Output = T::Native>
+ Mul<Output = T::Native>
+ Div<Output = T::Native>
+ Zero,
{
math_op(left, right, |a, b| Ok(a - b))
}
pub fn multiply<T>(left: &PrimitiveArray<T>, right: &PrimitiveArray<T>) -> Result<PrimitiveArray<T>>
where
T: datatypes::ArrowNumericType,
T::Native: Add<Output = T::Native>
+ Sub<Output = T::Native>
+ Mul<Output = T::Native>
+ Div<Output = T::Native>
+ Zero,
{
math_op(left, right, |a, b| Ok(a * b))
}
pub fn divide<T>(left: &PrimitiveArray<T>, right: &PrimitiveArray<T>) -> Result<PrimitiveArray<T>>
where
T: datatypes::ArrowNumericType,
T::Native: Add<Output = T::Native>
+ Sub<Output = T::Native>
+ Mul<Output = T::Native>
+ Div<Output = T::Native>
+ Zero,
{
math_op(left, right, |a, b| {
if b.is_zero() {
Err(ArrowError::DivideByZero)
} else {
Ok(a / b)
}
})
}
fn math_op<T, F>(
left: &PrimitiveArray<T>,
right: &PrimitiveArray<T>,
op: F,
) -> Result<PrimitiveArray<T>>
where
T: datatypes::ArrowNumericType,
F: Fn(T::Native, T::Native) -> Result<T::Native>,
{
if left.len() != right.len() {
return Err(ArrowError::ComputeError(
"Cannot perform math operation on arrays of different length".to_string(),
));
}
let mut b = PrimitiveArrayBuilder::<T>::new(left.len());
for i in 0..left.len() {
let index = i;
if left.is_null(i) || right.is_null(i) {
b.push_null()?;
} else {
b.push(op(left.value(index), right.value(index))?)?;
}
}
Ok(b.finish())
}
pub fn min<T>(array: &PrimitiveArray<T>) -> Option<T::Native>
where
T: ArrowNumericType,
{
min_max_helper(array, |a, b| a < b)
}
pub fn max<T>(array: &PrimitiveArray<T>) -> Option<T::Native>
where
T: ArrowNumericType,
{
min_max_helper(array, |a, b| a > b)
}
fn min_max_helper<T, F>(array: &PrimitiveArray<T>, cmp: F) -> Option<T::Native>
where
T: ArrowNumericType,
F: Fn(T::Native, T::Native) -> bool,
{
let mut n: Option<T::Native> = None;
let data = array.data();
for i in 0..data.len() {
if data.is_null(i) {
continue;
}
let m = array.value(i);
match n {
None => n = Some(m),
Some(nn) => {
if cmp(m, nn) {
n = Some(m)
}
}
}
}
n
}
pub fn eq<T>(left: &PrimitiveArray<T>, right: &PrimitiveArray<T>) -> Result<BooleanArray>
where
T: ArrowNumericType,
{
bool_op(left, right, |a, b| a == b)
}
pub fn neq<T>(left: &PrimitiveArray<T>, right: &PrimitiveArray<T>) -> Result<BooleanArray>
where
T: ArrowNumericType,
{
bool_op(left, right, |a, b| a != b)
}
pub fn lt<T>(left: &PrimitiveArray<T>, right: &PrimitiveArray<T>) -> Result<BooleanArray>
where
T: ArrowNumericType,
{
bool_op(left, right, |a, b| match (a, b) {
(None, _) => true,
(_, None) => false,
(Some(aa), Some(bb)) => aa < bb,
})
}
pub fn lt_eq<T>(left: &PrimitiveArray<T>, right: &PrimitiveArray<T>) -> Result<BooleanArray>
where
T: ArrowNumericType,
{
bool_op(left, right, |a, b| match (a, b) {
(None, _) => true,
(_, None) => false,
(Some(aa), Some(bb)) => aa <= bb,
})
}
pub fn gt<T>(left: &PrimitiveArray<T>, right: &PrimitiveArray<T>) -> Result<BooleanArray>
where
T: ArrowNumericType,
{
bool_op(left, right, |a, b| match (a, b) {
(None, _) => false,
(_, None) => true,
(Some(aa), Some(bb)) => aa > bb,
})
}
pub fn gt_eq<T>(left: &PrimitiveArray<T>, right: &PrimitiveArray<T>) -> Result<BooleanArray>
where
T: ArrowNumericType,
{
bool_op(left, right, |a, b| match (a, b) {
(None, _) => false,
(_, None) => true,
(Some(aa), Some(bb)) => aa >= bb,
})
}
fn bool_op<T, F>(left: &PrimitiveArray<T>, right: &PrimitiveArray<T>, op: F) -> Result<BooleanArray>
where
T: ArrowNumericType,
F: Fn(Option<T::Native>, Option<T::Native>) -> bool,
{
if left.len() != right.len() {
return Err(ArrowError::ComputeError(
"Cannot perform math operation on arrays of different length".to_string(),
));
}
let mut b = BooleanArray::builder(left.len());
for i in 0..left.len() {
let index = i;
let l = if left.is_null(i) {
None
} else {
Some(left.value(index))
};
let r = if right.is_null(i) {
None
} else {
Some(right.value(index))
};
b.push(op(l, r))?;
}
Ok(b.finish())
}
pub fn and(left: &BooleanArray, right: &BooleanArray) -> Result<BooleanArray> {
if left.len() != right.len() {
return Err(ArrowError::ComputeError(
"Cannot perform boolean operation on arrays of different length".to_string(),
));
}
let mut b = BooleanArray::builder(left.len());
for i in 0..left.len() {
if left.is_null(i) || right.is_null(i) {
b.push_null()?;
} else {
b.push(left.value(i) && right.value(i))?;
}
}
Ok(b.finish())
}
pub fn or(left: &BooleanArray, right: &BooleanArray) -> Result<BooleanArray> {
if left.len() != right.len() {
return Err(ArrowError::ComputeError(
"Cannot perform boolean operation on arrays of different length".to_string(),
));
}
let mut b = BooleanArray::builder(left.len());
for i in 0..left.len() {
if left.is_null(i) || right.is_null(i) {
b.push_null()?;
} else {
b.push(left.value(i) || right.value(i))?;
}
}
Ok(b.finish())
}
pub fn not(left: &BooleanArray) -> Result<BooleanArray> {
let mut b = BooleanArray::builder(left.len());
for i in 0..left.len() {
if left.is_null(i) {
b.push_null()?;
} else {
b.push(!left.value(i))?;
}
}
Ok(b.finish())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::array::{Float64Array, Int32Array};
#[test]
fn test_primitive_array_add() {
let a = Int32Array::from(vec![5, 6, 7, 8, 9]);
let b = Int32Array::from(vec![6, 7, 8, 9, 8]);
let c = add(&a, &b).unwrap();
assert_eq!(11, c.value(0));
assert_eq!(13, c.value(1));
assert_eq!(15, c.value(2));
assert_eq!(17, c.value(3));
assert_eq!(17, c.value(4));
}
#[test]
fn test_primitive_array_add_mismatched_length() {
let a = Int32Array::from(vec![5, 6, 7, 8, 9]);
let b = Int32Array::from(vec![6, 7, 8]);
let e = add(&a, &b)
.err()
.expect("should have failed due to different lengths");
assert_eq!(
"ComputeError(\"Cannot perform math operation on arrays of different length\")",
format!("{:?}", e)
);
}
#[test]
fn test_primitive_array_subtract() {
let a = Int32Array::from(vec![1, 2, 3, 4, 5]);
let b = Int32Array::from(vec![5, 4, 3, 2, 1]);
let c = subtract(&a, &b).unwrap();
assert_eq!(-4, c.value(0));
assert_eq!(-2, c.value(1));
assert_eq!(0, c.value(2));
assert_eq!(2, c.value(3));
assert_eq!(4, c.value(4));
}
#[test]
fn test_primitive_array_multiply() {
let a = Int32Array::from(vec![5, 6, 7, 8, 9]);
let b = Int32Array::from(vec![6, 7, 8, 9, 8]);
let c = multiply(&a, &b).unwrap();
assert_eq!(30, c.value(0));
assert_eq!(42, c.value(1));
assert_eq!(56, c.value(2));
assert_eq!(72, c.value(3));
assert_eq!(72, c.value(4));
}
#[test]
fn test_primitive_array_divide() {
let a = Int32Array::from(vec![15, 15, 8, 1, 9]);
let b = Int32Array::from(vec![5, 6, 8, 9, 1]);
let c = divide(&a, &b).unwrap();
assert_eq!(3, c.value(0));
assert_eq!(2, c.value(1));
assert_eq!(1, c.value(2));
assert_eq!(0, c.value(3));
assert_eq!(9, c.value(4));
}
#[test]
fn test_primitive_array_divide_by_zero() {
let a = Int32Array::from(vec![15]);
let b = Int32Array::from(vec![0]);
assert_eq!(
ArrowError::DivideByZero,
divide(&a, &b).err().expect("divide by zero should fail")
);
}
#[test]
fn test_primitive_array_divide_f64() {
let a = Float64Array::from(vec![15.0, 15.0, 8.0]);
let b = Float64Array::from(vec![5.0, 6.0, 8.0]);
let c = divide(&a, &b).unwrap();
assert_eq!(3.0, c.value(0));
assert_eq!(2.5, c.value(1));
assert_eq!(1.0, c.value(2));
}
#[test]
fn test_primitive_array_add_with_nulls() {
let a = Int32Array::from(vec![Some(5), None, Some(7), None]);
let b = Int32Array::from(vec![None, None, Some(6), Some(7)]);
let c = add(&a, &b).unwrap();
assert_eq!(true, c.is_null(0));
assert_eq!(true, c.is_null(1));
assert_eq!(false, c.is_null(2));
assert_eq!(true, c.is_null(3));
assert_eq!(13, c.value(2));
}
#[test]
fn test_primitive_array_eq() {
let a = Int32Array::from(vec![8, 8, 8, 8, 8]);
let b = Int32Array::from(vec![6, 7, 8, 9, 10]);
let c = eq(&a, &b).unwrap();
assert_eq!(false, c.value(0));
assert_eq!(false, c.value(1));
assert_eq!(true, c.value(2));
assert_eq!(false, c.value(3));
assert_eq!(false, c.value(4));
}
#[test]
fn test_primitive_array_neq() {
let a = Int32Array::from(vec![8, 8, 8, 8, 8]);
let b = Int32Array::from(vec![6, 7, 8, 9, 10]);
let c = neq(&a, &b).unwrap();
assert_eq!(true, c.value(0));
assert_eq!(true, c.value(1));
assert_eq!(false, c.value(2));
assert_eq!(true, c.value(3));
assert_eq!(true, c.value(4));
}
#[test]
fn test_primitive_array_lt() {
let a = Int32Array::from(vec![8, 8, 8, 8, 8]);
let b = Int32Array::from(vec![6, 7, 8, 9, 10]);
let c = lt(&a, &b).unwrap();
assert_eq!(false, c.value(0));
assert_eq!(false, c.value(1));
assert_eq!(false, c.value(2));
assert_eq!(true, c.value(3));
assert_eq!(true, c.value(4));
}
#[test]
fn test_primitive_array_lt_eq() {
let a = Int32Array::from(vec![8, 8, 8, 8, 8]);
let b = Int32Array::from(vec![6, 7, 8, 9, 10]);
let c = lt_eq(&a, &b).unwrap();
assert_eq!(false, c.value(0));
assert_eq!(false, c.value(1));
assert_eq!(true, c.value(2));
assert_eq!(true, c.value(3));
assert_eq!(true, c.value(4));
}
#[test]
fn test_primitive_array_gt() {
let a = Int32Array::from(vec![8, 8, 8, 8, 8]);
let b = Int32Array::from(vec![6, 7, 8, 9, 10]);
let c = gt(&a, &b).unwrap();
assert_eq!(true, c.value(0));
assert_eq!(true, c.value(1));
assert_eq!(false, c.value(2));
assert_eq!(false, c.value(3));
assert_eq!(false, c.value(4));
}
#[test]
fn test_primitive_array_gt_eq() {
let a = Int32Array::from(vec![8, 8, 8, 8, 8]);
let b = Int32Array::from(vec![6, 7, 8, 9, 10]);
let c = gt_eq(&a, &b).unwrap();
assert_eq!(true, c.value(0));
assert_eq!(true, c.value(1));
assert_eq!(true, c.value(2));
assert_eq!(false, c.value(3));
assert_eq!(false, c.value(4));
}
#[test]
fn test_buffer_array_min_max() {
let a = Int32Array::from(vec![5, 6, 7, 8, 9]);
assert_eq!(5, min(&a).unwrap());
assert_eq!(9, max(&a).unwrap());
}
#[test]
fn test_buffer_array_min_max_with_nulls() {
let a = Int32Array::from(vec![Some(5), None, None, Some(8), Some(9)]);
assert_eq!(5, min(&a).unwrap());
assert_eq!(9, max(&a).unwrap());
}
#[test]
fn test_bool_array_and() {
let a = BooleanArray::from(vec![false, false, true, true]);
let b = BooleanArray::from(vec![false, true, false, true]);
let c = and(&a, &b).unwrap();
assert_eq!(false, c.value(0));
assert_eq!(false, c.value(1));
assert_eq!(false, c.value(2));
assert_eq!(true, c.value(3));
}
#[test]
fn test_bool_array_or() {
let a = BooleanArray::from(vec![false, false, true, true]);
let b = BooleanArray::from(vec![false, true, false, true]);
let c = or(&a, &b).unwrap();
assert_eq!(false, c.value(0));
assert_eq!(true, c.value(1));
assert_eq!(true, c.value(2));
assert_eq!(true, c.value(3));
}
#[test]
fn test_bool_array_not() {
let a = BooleanArray::from(vec![false, false, true, true]);
let c = not(&a).unwrap();
assert_eq!(true, c.value(0));
assert_eq!(true, c.value(1));
assert_eq!(false, c.value(2));
assert_eq!(false, c.value(3));
}
}