use std::ops::{Add, Sub};
use num_traits::AsPrimitive;
use crate::{
array::PrimitiveArray,
compute::arity::{binary, unary},
datatypes::{DataType, TimeUnit},
error::{Error, Result},
scalar::{PrimitiveScalar, Scalar},
temporal_conversions,
types::{months_days_ns, NativeType},
};
fn create_scale(lhs: &DataType, rhs: &DataType) -> Result<f64> {
let scale = match (lhs, rhs) {
(DataType::Timestamp(timeunit_a, _), DataType::Duration(timeunit_b))
| (DataType::Time32(timeunit_a), DataType::Duration(timeunit_b))
| (DataType::Time64(timeunit_a), DataType::Duration(timeunit_b)) => {
temporal_conversions::timeunit_scale(*timeunit_a, *timeunit_b)
}
(DataType::Date32, DataType::Duration(timeunit)) => {
temporal_conversions::timeunit_scale(TimeUnit::Second, *timeunit)
/ temporal_conversions::SECONDS_IN_DAY as f64
}
(DataType::Date64, DataType::Duration(timeunit)) => {
temporal_conversions::timeunit_scale(TimeUnit::Millisecond, *timeunit)
}
_ => {
return Err(Error::InvalidArgumentError(
"Incorrect data type for the arguments".to_string(),
));
}
};
Ok(scale)
}
/// Adds a duration array to a temporal array, slot by slot.
///
/// Each duration value is converted to `f64`, multiplied by the factor that
/// `create_scale` yields for the two data types, cast to `T` and added to the
/// matching `time` value. The result keeps the data type of `time`.
///
/// # Panics
/// Panics when `create_scale` rejects the pair of data types, i.e. when
/// `duration` is not a `Duration` or `time` is not one of `Timestamp`,
/// `Time32`, `Time64`, `Date32` or `Date64`.
///
/// NOTE(review): `binary` may impose further requirements (such as equal
/// lengths) — confirm against its definition.
pub fn add_duration<T>(
    time: &PrimitiveArray<T>,
    duration: &PrimitiveArray<i64>,
) -> PrimitiveArray<T>
where
    f64: AsPrimitive<T>,
    T: NativeType + Add<T, Output = T>,
{
    let output_type = time.data_type();
    let factor = create_scale(output_type, duration.data_type()).unwrap();

    binary(
        time,
        duration,
        output_type.clone(),
        move |instant: T, delta: i64| {
            let scaled: T = (delta as f64 * factor).as_();
            instant + scaled
        },
    )
}
/// Adds one scalar duration to every value of a temporal array.
///
/// The duration is converted to `f64`, multiplied by the factor that
/// `create_scale` yields for the two data types, cast to `T` and added to each
/// `time` value. The result keeps the data type of `time`. When the scalar
/// holds no value, an all-null array of the same data type and length as
/// `time` is returned.
///
/// # Panics
/// Panics when `create_scale` rejects the pair of data types. The check runs
/// before the scalar is inspected, so it also applies to a null scalar.
pub fn add_duration_scalar<T>(
    time: &PrimitiveArray<T>,
    duration: &PrimitiveScalar<i64>,
) -> PrimitiveArray<T>
where
    f64: AsPrimitive<T>,
    T: NativeType + Add<T, Output = T>,
{
    let output_type = time.data_type();
    // Validate the types first, exactly as for the array variant.
    let factor = create_scale(output_type, duration.data_type()).unwrap();

    match *duration.value() {
        None => PrimitiveArray::<T>::new_null(output_type.clone(), time.len()),
        Some(delta) => unary(
            time,
            move |instant: T| {
                let scaled: T = (delta as f64 * factor).as_();
                instant + scaled
            },
            output_type.clone(),
        ),
    }
}
/// Subtracts a duration array from a temporal array, slot by slot.
///
/// Each duration value is converted to `f64`, multiplied by the factor that
/// `create_scale` yields for the two data types, cast to `T` and subtracted
/// from the matching `time` value. The result keeps the data type of `time`.
///
/// # Panics
/// Panics when `create_scale` rejects the pair of data types, i.e. when
/// `duration` is not a `Duration` or `time` is not one of `Timestamp`,
/// `Time32`, `Time64`, `Date32` or `Date64`.
///
/// NOTE(review): `binary` may impose further requirements (such as equal
/// lengths) — confirm against its definition.
pub fn subtract_duration<T>(
    time: &PrimitiveArray<T>,
    duration: &PrimitiveArray<i64>,
) -> PrimitiveArray<T>
where
    f64: AsPrimitive<T>,
    T: NativeType + Sub<T, Output = T>,
{
    let output_type = time.data_type();
    let factor = create_scale(output_type, duration.data_type()).unwrap();

    binary(
        time,
        duration,
        output_type.clone(),
        move |instant: T, delta: i64| {
            let scaled: T = (delta as f64 * factor).as_();
            instant - scaled
        },
    )
}
/// Subtracts one scalar duration from every value of a temporal array.
///
/// The duration is converted to `f64`, multiplied by the factor that
/// `create_scale` yields for the two data types, cast to `T` and subtracted
/// from each `time` value. The result keeps the data type of `time`. When the
/// scalar holds no value, an all-null array of the same data type and length
/// as `time` is returned.
///
/// # Panics
/// Panics when `create_scale` rejects the pair of data types. The check runs
/// before the scalar is inspected, so it also applies to a null scalar.
pub fn sub_duration_scalar<T>(
    time: &PrimitiveArray<T>,
    duration: &PrimitiveScalar<i64>,
) -> PrimitiveArray<T>
where
    f64: AsPrimitive<T>,
    T: NativeType + Sub<T, Output = T>,
{
    let output_type = time.data_type();
    // Validate the types first, exactly as for the array variant.
    let factor = create_scale(output_type, duration.data_type()).unwrap();

    match *duration.value() {
        None => PrimitiveArray::<T>::new_null(output_type.clone(), time.len()),
        Some(delta) => unary(
            time,
            move |instant: T| {
                let scaled: T = (delta as f64 * factor).as_();
                instant - scaled
            },
            output_type.clone(),
        ),
    }
}
/// Subtracts two timestamp arrays, producing a `Duration` array expressed in
/// the time unit of `lhs`.
///
/// Both arrays must be `Timestamp`s without a time zone. Values of `rhs` are
/// rescaled to the unit of `lhs` with `temporal_conversions::timeunit_scale`
/// before the subtraction.
///
/// # Errors
/// Returns [`Error::InvalidArgumentError`] when either array is not a
/// timestamp without time zone.
pub fn subtract_timestamps(
    lhs: &PrimitiveArray<i64>,
    rhs: &PrimitiveArray<i64>,
) -> Result<PrimitiveArray<i64>> {
    match (lhs.data_type(), rhs.data_type()) {
        (DataType::Timestamp(timeunit_a, None), DataType::Timestamp(timeunit_b, None)) => {
            let result_type = DataType::Duration(*timeunit_a);
            let scale = temporal_conversions::timeunit_scale(*timeunit_a, *timeunit_b);

            // A factor of exactly 1.0 (expected when both units are equal)
            // needs no rescaling. Staying in integer arithmetic avoids the
            // `i64 -> f64 -> i64` round trip, which silently loses precision
            // for magnitudes above 2^53 (e.g. present-day nanosecond
            // timestamps).
            if scale == 1.0 {
                Ok(binary(lhs, rhs, result_type, |a: i64, b: i64| a - b))
            } else {
                let op = move |a: i64, b: i64| a - (b as f64 * scale) as i64;
                Ok(binary(lhs, rhs, result_type, op))
            }
        }
        _ => Err(Error::InvalidArgumentError(
            "Incorrect data type for the arguments".to_string(),
        )),
    }
}
/// Subtracts a scalar timestamp from every value of a timestamp array,
/// producing a `Duration` array expressed in the time unit of `lhs`.
///
/// Both arguments must be `Timestamp`s without a time zone. The scalar is
/// rescaled to the unit of `lhs` with `temporal_conversions::timeunit_scale`
/// before the subtraction. When the scalar holds no value, an all-null
/// `Duration` array of the same length as `lhs` is returned.
///
/// # Errors
/// Returns [`Error::InvalidArgumentError`] when either argument is not a
/// timestamp without time zone.
pub fn sub_timestamps_scalar(
    lhs: &PrimitiveArray<i64>,
    rhs: &PrimitiveScalar<i64>,
) -> Result<PrimitiveArray<i64>> {
    let (scale, timeunit_a) =
        if let (DataType::Timestamp(timeunit_a, None), DataType::Timestamp(timeunit_b, None)) =
            (lhs.data_type(), rhs.data_type())
        {
            (
                temporal_conversions::timeunit_scale(*timeunit_a, *timeunit_b),
                *timeunit_a,
            )
        } else {
            return Err(Error::InvalidArgumentError(
                "sub_timestamps_scalar requires both arguments to be timestamps without timezone"
                    .to_string(),
            ));
        };

    // The result is a duration on both the null and the non-null path, so the
    // logical type of the output does not depend on the scalar's validity.
    let result_type = DataType::Duration(timeunit_a);

    let rhs = if let Some(value) = *rhs.value() {
        value
    } else {
        return Ok(PrimitiveArray::<i64>::new_null(result_type, lhs.len()));
    };

    // Convert the scalar once, outside the per-element closure. A factor of
    // exactly 1.0 (expected when both units are equal) needs no rescaling;
    // skipping the `i64 -> f64 -> i64` round trip keeps values above 2^53
    // (e.g. present-day nanosecond timestamps) exact.
    let rhs = if scale == 1.0 {
        rhs
    } else {
        (rhs as f64 * scale) as i64
    };

    Ok(unary(lhs, move |a: i64| a - rhs, result_type))
}
/// Adds an array of `months_days_ns` intervals to a timestamp array, slot by
/// slot. The result keeps the data type of `timestamp`.
///
/// * Timestamps without a time zone are handled by
///   `temporal_conversions::add_naive_interval`.
/// * Timestamps with a time zone string are handled by
///   `temporal_conversions::add_interval`. The string is first parsed with
///   `parse_offset`; if that fails and the `chrono-tz` feature is enabled, it
///   is parsed with `parse_offset_tz` instead.
///
/// # Errors
/// Returns [`Error::InvalidArgumentError`] when `timestamp` is not logically
/// a `Timestamp`, or when the time zone cannot be parsed by `parse_offset`
/// and the `chrono-tz` feature is disabled. With `chrono-tz` enabled, an
/// error from `parse_offset_tz` is propagated.
pub fn add_interval(
    timestamp: &PrimitiveArray<i64>,
    interval: &PrimitiveArray<months_days_ns>,
) -> Result<PrimitiveArray<i64>> {
    let output_type = timestamp.data_type();

    match output_type.to_logical_type() {
        DataType::Timestamp(unit, None) => {
            let unit = *unit;
            Ok(binary(
                timestamp,
                interval,
                output_type.clone(),
                |ts, delta| temporal_conversions::add_naive_interval(ts, unit, delta),
            ))
        }
        DataType::Timestamp(unit, Some(tz_name)) => {
            let unit = *unit;
            match temporal_conversions::parse_offset(tz_name) {
                Ok(offset) => Ok(binary(
                    timestamp,
                    interval,
                    output_type.clone(),
                    |ts, delta| temporal_conversions::add_interval(ts, unit, delta, &offset),
                )),
                // Not parseable by `parse_offset`: fall back to `parse_offset_tz`.
                #[cfg(feature = "chrono-tz")]
                Err(_) => {
                    let tz = temporal_conversions::parse_offset_tz(tz_name)?;
                    Ok(binary(
                        timestamp,
                        interval,
                        output_type.clone(),
                        |ts, delta| temporal_conversions::add_interval(ts, unit, delta, &tz),
                    ))
                }
                #[cfg(not(feature = "chrono-tz"))]
                _ => Err(Error::InvalidArgumentError(format!(
                    "timezone \"{}\" cannot be parsed (feature chrono-tz is not active)",
                    tz_name
                ))),
            }
        }
        _ => Err(Error::InvalidArgumentError(
            "Adding an interval is only supported for `DataType::Timestamp`".to_string(),
        )),
    }
}
/// Adds one scalar `months_days_ns` interval to every value of a timestamp
/// array. The result keeps the data type of `timestamp`.
///
/// When the scalar holds no value, an all-null array with the data type and
/// length of `timestamp` is returned; this check happens before the data type
/// of `timestamp` is inspected.
///
/// * Timestamps without a time zone are handled by
///   `temporal_conversions::add_naive_interval`.
/// * Timestamps with a time zone string are handled by
///   `temporal_conversions::add_interval`. The string is first parsed with
///   `parse_offset`; if that fails and the `chrono-tz` feature is enabled, it
///   is parsed with `parse_offset_tz` instead.
///
/// # Errors
/// Returns [`Error::InvalidArgumentError`] when `timestamp` is not logically
/// a `Timestamp`, or when the time zone cannot be parsed by `parse_offset`
/// and the `chrono-tz` feature is disabled. With `chrono-tz` enabled, an
/// error from `parse_offset_tz` is propagated.
pub fn add_interval_scalar(
    timestamp: &PrimitiveArray<i64>,
    interval: &PrimitiveScalar<months_days_ns>,
) -> Result<PrimitiveArray<i64>> {
    let output_type = timestamp.data_type();

    let delta = match *interval.value() {
        Some(delta) => delta,
        None => {
            return Ok(PrimitiveArray::<i64>::new_null(
                output_type.clone(),
                timestamp.len(),
            ))
        }
    };

    match output_type.to_logical_type() {
        DataType::Timestamp(unit, None) => {
            let unit = *unit;
            Ok(unary(
                timestamp,
                |ts| temporal_conversions::add_naive_interval(ts, unit, delta),
                output_type.clone(),
            ))
        }
        DataType::Timestamp(unit, Some(tz_name)) => {
            let unit = *unit;
            match temporal_conversions::parse_offset(tz_name) {
                Ok(offset) => Ok(unary(
                    timestamp,
                    |ts| temporal_conversions::add_interval(ts, unit, delta, &offset),
                    output_type.clone(),
                )),
                // Not parseable by `parse_offset`: fall back to `parse_offset_tz`.
                #[cfg(feature = "chrono-tz")]
                Err(_) => {
                    let tz = temporal_conversions::parse_offset_tz(tz_name)?;
                    Ok(unary(
                        timestamp,
                        |ts| temporal_conversions::add_interval(ts, unit, delta, &tz),
                        output_type.clone(),
                    ))
                }
                #[cfg(not(feature = "chrono-tz"))]
                _ => Err(Error::InvalidArgumentError(format!(
                    "timezone \"{}\" cannot be parsed (feature chrono-tz is not active)",
                    tz_name
                ))),
            }
        }
        _ => Err(Error::InvalidArgumentError(
            "Adding an interval is only supported for `DataType::Timestamp`".to_string(),
        )),
    }
}