use std::hash::Hash;
use num_traits::{AsPrimitive, Float, ToPrimitive};
use crate::datatypes::IntervalUnit;
use crate::error::Result;
use crate::offset::{Offset, Offsets};
use crate::types::{days_ms, f16, months_days_ns};
use crate::{
array::*,
bitmap::Bitmap,
compute::arity::unary,
datatypes::{DataType, TimeUnit},
temporal_conversions::*,
types::NativeType,
};
use super::CastOptions;
/// Returns a [`BinaryArray`] where every element is the decimal text representation of the
/// corresponding value of `from`, as written by `lexical_core`.
///
/// Every slot of `from.values()` is formatted, including null ones; the validity bitmap of
/// `from` is cloned onto the result.
///
/// # Panics
/// Panics if the accumulated byte length cannot be represented by the offset type `O`.
pub fn primitive_to_binary<T: NativeType + lexical_core::ToLexical, O: Offset>(
    from: &PrimitiveArray<T>,
) -> BinaryArray<O> {
    let mut values: Vec<u8> = Vec::with_capacity(from.len());
    let mut offsets: Vec<O> = Vec::with_capacity(from.len() + 1);
    offsets.push(O::default());

    for x in from.values().iter() {
        // Make room for the longest possible rendering of a `T` after the bytes written so
        // far. The length of `values` is kept up to date below, so a reallocation here is
        // guaranteed to carry the already-written bytes over.
        values.reserve(T::FORMATTED_SIZE_DECIMAL);
        let start = values.len();

        // SAFETY: `start <= capacity`, so the pointer and the length describe the spare
        // capacity of `values`, which holds at least `T::FORMATTED_SIZE_DECIMAL` bytes as
        // required by `write_unchecked`.
        let written = unsafe {
            let spare = std::slice::from_raw_parts_mut(
                values.as_mut_ptr().add(start),
                values.capacity() - start,
            );
            lexical_core::write_unchecked(*x, spare).len()
        };

        // SAFETY: the `written` bytes directly after `start` were initialized just above and
        // lie within the reserved capacity.
        unsafe { values.set_len(start + written) };

        offsets.push(O::from_usize(values.len()).unwrap());
    }
    values.shrink_to_fit();

    // SAFETY: the offsets start at zero and never decrease.
    let offsets = unsafe { Offsets::new_unchecked(offsets) };
    BinaryArray::<O>::new(
        BinaryArray::<O>::default_data_type(),
        offsets.into(),
        values.into(),
        from.validity().cloned(),
    )
}
pub(super) fn primitive_to_binary_dyn<T, O>(from: &dyn Array) -> Result<Box<dyn Array>>
where
O: Offset,
T: NativeType + lexical_core::ToLexical,
{
let from = from.as_any().downcast_ref().unwrap();
Ok(Box::new(primitive_to_binary::<T, O>(from)))
}
/// Returns a [`BooleanArray`] whose slots are `true` exactly where the corresponding value of
/// `from` differs from `T::default()`.
///
/// The validity of `from` is cloned onto the result, which is tagged with `to_type`.
pub fn primitive_to_boolean<T: NativeType>(
    from: &PrimitiveArray<T>,
    to_type: DataType,
) -> BooleanArray {
    let bits = Bitmap::from_trusted_len_iter(
        from.values()
            .iter()
            .map(|value| *value != T::default()),
    );
    let validity = from.validity().cloned();
    BooleanArray::new(to_type, bits, validity)
}
pub(super) fn primitive_to_boolean_dyn<T>(
from: &dyn Array,
to_type: DataType,
) -> Result<Box<dyn Array>>
where
T: NativeType,
{
let from = from.as_any().downcast_ref().unwrap();
Ok(Box::new(primitive_to_boolean::<T>(from, to_type)))
}
/// Returns a [`Utf8Array`] where every element is the decimal text representation of the
/// corresponding value of `from`, as written by `lexical_core`.
///
/// Every slot of `from.values()` is formatted, including null ones; the validity bitmap of
/// `from` is cloned onto the result.
///
/// # Panics
/// Panics if the accumulated byte length cannot be represented by the offset type `O`.
pub fn primitive_to_utf8<T: NativeType + lexical_core::ToLexical, O: Offset>(
    from: &PrimitiveArray<T>,
) -> Utf8Array<O> {
    let mut values: Vec<u8> = Vec::with_capacity(from.len());
    let mut offsets: Vec<O> = Vec::with_capacity(from.len() + 1);
    offsets.push(O::default());

    for x in from.values().iter() {
        // Make room for the longest possible rendering of a `T` after the bytes written so
        // far. The length of `values` is kept up to date below, so a reallocation here is
        // guaranteed to carry the already-written bytes over.
        values.reserve(T::FORMATTED_SIZE_DECIMAL);
        let start = values.len();

        // SAFETY: `start <= capacity`, so the pointer and the length describe the spare
        // capacity of `values`, which holds at least `T::FORMATTED_SIZE_DECIMAL` bytes as
        // required by `write_unchecked`.
        let written = unsafe {
            let spare = std::slice::from_raw_parts_mut(
                values.as_mut_ptr().add(start),
                values.capacity() - start,
            );
            lexical_core::write_unchecked(*x, spare).len()
        };

        // SAFETY: the `written` bytes directly after `start` were initialized just above and
        // lie within the reserved capacity.
        unsafe { values.set_len(start + written) };

        offsets.push(O::from_usize(values.len()).unwrap());
    }
    values.shrink_to_fit();

    // SAFETY: the offsets start at zero and never decrease.
    let offsets = unsafe { Offsets::new_unchecked(offsets) };

    // SAFETY: skipping validation relies on `lexical_core` emitting valid UTF-8 for numbers,
    // with every offset falling on the boundary between two complete renderings.
    unsafe {
        Utf8Array::<O>::new_unchecked(
            Utf8Array::<O>::default_data_type(),
            offsets.into(),
            values.into(),
            from.validity().cloned(),
        )
    }
}
pub(super) fn primitive_to_utf8_dyn<T, O>(from: &dyn Array) -> Result<Box<dyn Array>>
where
O: Offset,
T: NativeType + lexical_core::ToLexical,
{
let from = from.as_any().downcast_ref().unwrap();
Ok(Box::new(primitive_to_utf8::<T, O>(from)))
}
/// Type-erased cast between two primitive types.
///
/// When `options.wrapped` is set the conversion goes through [`primitive_as_primitive`];
/// otherwise it goes through [`primitive_to_primitive`].
///
/// # Panics
/// Panics if `from` is not a `PrimitiveArray<I>`.
pub(super) fn primitive_to_primitive_dyn<I, O>(
    from: &dyn Array,
    to_type: &DataType,
    options: CastOptions,
) -> Result<Box<dyn Array>>
where
    I: NativeType + num_traits::NumCast + num_traits::AsPrimitive<O>,
    O: NativeType + num_traits::NumCast,
{
    let primitive = from.as_any().downcast_ref::<PrimitiveArray<I>>().unwrap();
    let converted: PrimitiveArray<O> = if options.wrapped {
        primitive_as_primitive::<I, O>(primitive, to_type)
    } else {
        primitive_to_primitive::<I, O>(primitive, to_type)
    };
    Ok(Box::new(converted))
}
/// Casts a [`PrimitiveArray`] to another primitive type using `num_traits::cast`.
///
/// Null slots stay null, and any value for which `num_traits::cast` returns `None` becomes
/// null as well. The result is tagged with a clone of `to_type`.
pub fn primitive_to_primitive<I, O>(
    from: &PrimitiveArray<I>,
    to_type: &DataType,
) -> PrimitiveArray<O>
where
    I: NativeType + num_traits::NumCast,
    O: NativeType + num_traits::NumCast,
{
    let converted = from.iter().map(|slot| match slot {
        Some(value) => num_traits::cast::cast::<I, O>(*value),
        None => None,
    });
    PrimitiveArray::<O>::from_trusted_len_iter(converted).to(to_type.clone())
}
/// Casts an integer [`PrimitiveArray`] to a decimal array with the given precision and scale.
///
/// Each value is widened to `i128` and multiplied by `10^to_scale`. A slot becomes null when
/// it was null in `from`, when the multiplication overflows, or when the scaled value falls
/// outside the accepted range.
///
/// NOTE(review): the accepted range is `±9^(to_precision + 1)` (saturating), not
/// `±(10^to_precision - 1)`; confirm this is intended before relying on it as a strict
/// precision check.
pub fn integer_to_decimal<T: NativeType + AsPrimitive<i128>>(
    from: &PrimitiveArray<T>,
    to_precision: usize,
    to_scale: usize,
) -> PrimitiveArray<i128> {
    let scale_factor = 10_i128.pow(to_scale as u32);
    let upper = 9_i128.saturating_pow(1 + to_precision as u32);
    let lower = 9_i128
        .saturating_pow(1 + to_precision as u32)
        .saturating_neg();

    let scaled = from.iter().map(|slot| {
        slot.and_then(|value| value.as_().checked_mul(scale_factor))
            .filter(|candidate| (lower..=upper).contains(candidate))
    });

    let target = DataType::Decimal(to_precision, to_scale);
    PrimitiveArray::<i128>::from_trusted_len_iter(scaled).to(target)
}
pub(super) fn integer_to_decimal_dyn<T>(
from: &dyn Array,
precision: usize,
scale: usize,
) -> Result<Box<dyn Array>>
where
T: NativeType + AsPrimitive<i128>,
{
let from = from.as_any().downcast_ref().unwrap();
Ok(Box::new(integer_to_decimal::<T>(from, precision, scale)))
}
/// Casts a floating-point [`PrimitiveArray`] to a decimal array with the given precision and
/// scale.
///
/// Each value is multiplied by `10^to_scale` and converted to `i128`. A slot becomes null
/// when it was null in `from`, when the scaled value cannot be represented as an `i128`
/// (for example NaN or an infinity), or when it falls outside the accepted range.
///
/// NOTE(review): the accepted range is `±9^(to_precision + 1)` (saturating), not
/// `±(10^to_precision - 1)`; confirm this is intended before relying on it as a strict
/// precision check.
pub fn float_to_decimal<T>(
    from: &PrimitiveArray<T>,
    to_precision: usize,
    to_scale: usize,
) -> PrimitiveArray<i128>
where
    T: NativeType + Float + ToPrimitive,
    f64: AsPrimitive<T>,
{
    let multiplier: T = (10_f64).powi(to_scale as i32).as_();

    let min_for_precision = 9_i128
        .saturating_pow(1 + to_precision as u32)
        .saturating_neg();
    let max_for_precision = 9_i128.saturating_pow(1 + to_precision as u32);

    let values = from.iter().map(|x| {
        // `to_i128` yields `None` for non-finite or out-of-range products; propagate that as
        // a null instead of panicking on it.
        x.and_then(|x| (*x * multiplier).to_i128())
            .filter(|x| *x <= max_for_precision && *x >= min_for_precision)
    });

    PrimitiveArray::<i128>::from_trusted_len_iter(values)
        .to(DataType::Decimal(to_precision, to_scale))
}
pub(super) fn float_to_decimal_dyn<T>(
from: &dyn Array,
precision: usize,
scale: usize,
) -> Result<Box<dyn Array>>
where
T: NativeType + Float + ToPrimitive,
f64: AsPrimitive<T>,
{
let from = from.as_any().downcast_ref().unwrap();
Ok(Box::new(float_to_decimal::<T>(from, precision, scale)))
}
/// Casts a [`PrimitiveArray`] to another primitive type by applying
/// `num_traits::AsPrimitive::as_` to every value.
///
/// The result is tagged with a clone of `to_type`.
pub fn primitive_as_primitive<I, O>(
    from: &PrimitiveArray<I>,
    to_type: &DataType,
) -> PrimitiveArray<O>
where
    I: NativeType + num_traits::AsPrimitive<O>,
    O: NativeType,
{
    let target = to_type.clone();
    unary(
        from,
        |value: I| -> O { num_traits::AsPrimitive::<O>::as_(value) },
        target,
    )
}
/// Builds a new [`PrimitiveArray`] from clones of the values and validity of `from`, tagged
/// with a clone of `to_type`.
pub fn primitive_to_same_primitive<T>(
    from: &PrimitiveArray<T>,
    to_type: &DataType,
) -> PrimitiveArray<T>
where
    T: NativeType,
{
    let data_type = to_type.clone();
    let values = from.values().clone();
    let validity = from.validity().cloned();
    PrimitiveArray::<T>::new(data_type, values, validity)
}
pub(super) fn primitive_to_same_primitive_dyn<T>(
from: &dyn Array,
to_type: &DataType,
) -> Result<Box<dyn Array>>
where
T: NativeType,
{
let from = from.as_any().downcast_ref().unwrap();
Ok(Box::new(primitive_to_same_primitive::<T>(from, to_type)))
}
pub(super) fn primitive_to_dictionary_dyn<T: NativeType + Eq + Hash, K: DictionaryKey>(
from: &dyn Array,
) -> Result<Box<dyn Array>> {
let from = from.as_any().downcast_ref().unwrap();
primitive_to_dictionary::<T, K>(from).map(|x| Box::new(x) as Box<dyn Array>)
}
/// Casts a [`PrimitiveArray`] to a [`DictionaryArray`] with keys of type `K`.
///
/// The dictionary values use the data type of `from`.
///
/// # Errors
/// Returns an error if creating the empty mutable dictionary fails or if extending it with
/// the values of `from` fails.
pub fn primitive_to_dictionary<T: NativeType + Eq + Hash, K: DictionaryKey>(
    from: &PrimitiveArray<T>,
) -> Result<DictionaryArray<K>> {
    let backing = MutablePrimitiveArray::<T>::from(from.data_type().clone());
    let mut builder = MutableDictionaryArray::<K, _>::try_empty(backing)?;

    builder.try_extend(from.iter().map(|slot| slot.copied()))?;

    Ok(builder.into())
}
/// Returns the multiplier associated with `unit`: `1` for seconds, and the `MILLISECONDS`,
/// `MICROSECONDS` or `NANOSECONDS` constant for the finer units.
const fn time_unit_multiple(unit: TimeUnit) -> i64 {
    match unit {
        TimeUnit::Nanosecond => NANOSECONDS,
        TimeUnit::Microsecond => MICROSECONDS,
        TimeUnit::Millisecond => MILLISECONDS,
        TimeUnit::Second => 1,
    }
}
/// Converts a `Date32` array to `Date64` by widening each value to `i64` and multiplying it
/// by `MILLISECONDS_IN_DAY`.
pub fn date32_to_date64(from: &PrimitiveArray<i32>) -> PrimitiveArray<i64> {
    let to_millis = |days: i32| -> i64 { days as i64 * MILLISECONDS_IN_DAY };
    unary(from, to_millis, DataType::Date64)
}
/// Converts a `Date64` array to `Date32` by dividing each value by `MILLISECONDS_IN_DAY`
/// (integer division) and narrowing the quotient to `i32` with `as`.
pub fn date64_to_date32(from: &PrimitiveArray<i64>) -> PrimitiveArray<i32> {
    let to_days = |millis: i64| -> i32 { (millis / MILLISECONDS_IN_DAY) as i32 };
    unary(from, to_days, DataType::Date32)
}
/// Converts a `Time32(Second)` array to `Time32(Millisecond)` by multiplying each value by
/// 1000.
pub fn time32s_to_time32ms(from: &PrimitiveArray<i32>) -> PrimitiveArray<i32> {
    let target = DataType::Time32(TimeUnit::Millisecond);
    unary(from, |seconds: i32| seconds * 1000, target)
}
/// Converts a `Time32(Millisecond)` array to `Time32(Second)` by dividing each value by 1000
/// (integer division).
pub fn time32ms_to_time32s(from: &PrimitiveArray<i32>) -> PrimitiveArray<i32> {
    let target = DataType::Time32(TimeUnit::Second);
    unary(from, |millis: i32| millis / 1000, target)
}
/// Converts a `Time64(Microsecond)` array to `Time64(Nanosecond)` by multiplying each value
/// by 1000.
pub fn time64us_to_time64ns(from: &PrimitiveArray<i64>) -> PrimitiveArray<i64> {
    let target = DataType::Time64(TimeUnit::Nanosecond);
    unary(from, |micros: i64| micros * 1000, target)
}
/// Converts a `Time64(Nanosecond)` array to `Time64(Microsecond)` by dividing each value by
/// 1000 (integer division).
pub fn time64ns_to_time64us(from: &PrimitiveArray<i64>) -> PrimitiveArray<i64> {
    let target = DataType::Time64(TimeUnit::Microsecond);
    unary(from, |nanos: i64| nanos / 1000, target)
}
/// Converts a timestamp array expressed in `from_unit` to `Date64`.
///
/// Values in a finer unit than milliseconds are divided (integer division), values in a
/// coarser unit are multiplied, and millisecond values are only re-tagged.
pub fn timestamp_to_date64(from: &PrimitiveArray<i64>, from_unit: TimeUnit) -> PrimitiveArray<i64> {
    let source_size = time_unit_multiple(from_unit);
    let target_size = MILLISECONDS;
    let target_type = DataType::Date64;

    match source_size.cmp(&target_size) {
        // Source is finer than milliseconds: scale down.
        std::cmp::Ordering::Greater => {
            let ratio = source_size / target_size;
            unary(from, |ts| ts / ratio, target_type)
        }
        std::cmp::Ordering::Equal => primitive_to_same_primitive(from, &target_type),
        // Source is coarser than milliseconds: scale up.
        std::cmp::Ordering::Less => {
            let ratio = target_size / source_size;
            unary(from, |ts| ts * ratio, target_type)
        }
    }
}
/// Converts a timestamp array expressed in `from_unit` to `Date32`.
///
/// Each value is divided by the number of `from_unit` ticks in a day (integer division) and
/// the quotient is narrowed to `i32` with `as`.
pub fn timestamp_to_date32(from: &PrimitiveArray<i64>, from_unit: TimeUnit) -> PrimitiveArray<i32> {
    let ticks_per_day = time_unit_multiple(from_unit) * SECONDS_IN_DAY;
    let to_days = |ts: i64| -> i32 { (ts / ticks_per_day) as i32 };
    unary(from, to_days, DataType::Date32)
}
/// Converts a `Time32` array in `from_unit` to a `Time64` array in `to_unit`.
///
/// Each value is widened to `i64` and multiplied by the integer ratio between the multiples
/// of `to_unit` and `from_unit`.
///
/// NOTE(review): presumably callers always pass a `to_unit` at least as fine as `from_unit`;
/// otherwise the integer ratio is zero and every value becomes zero. Verify against callers.
pub fn time32_to_time64(
    from: &PrimitiveArray<i32>,
    from_unit: TimeUnit,
    to_unit: TimeUnit,
) -> PrimitiveArray<i64> {
    let factor = time_unit_multiple(to_unit) / time_unit_multiple(from_unit);
    let target = DataType::Time64(to_unit);
    unary(from, |ticks: i32| ticks as i64 * factor, target)
}
/// Converts a `Time64` array in `from_unit` to a `Time32` array in `to_unit`.
///
/// Each value is divided by the integer ratio between the multiples of `from_unit` and
/// `to_unit`, and the quotient is narrowed to `i32` with `as`.
///
/// NOTE(review): presumably callers always pass a `from_unit` at least as fine as `to_unit`;
/// otherwise the integer ratio is zero and the division panics on a non-empty array. Verify
/// against callers.
pub fn time64_to_time32(
    from: &PrimitiveArray<i64>,
    from_unit: TimeUnit,
    to_unit: TimeUnit,
) -> PrimitiveArray<i32> {
    let ratio = time_unit_multiple(from_unit) / time_unit_multiple(to_unit);
    let target = DataType::Time32(to_unit);
    unary(from, |ticks: i64| (ticks / ratio) as i32, target)
}
/// Converts a timestamp array from `from_unit` to `to_unit`, tagging the result with a clone
/// of `tz`.
///
/// Values are multiplied when the target unit is finer than the source unit, and divided
/// (integer division) otherwise.
pub fn timestamp_to_timestamp(
    from: &PrimitiveArray<i64>,
    from_unit: TimeUnit,
    to_unit: TimeUnit,
    tz: &Option<String>,
) -> PrimitiveArray<i64> {
    let source_size = time_unit_multiple(from_unit);
    let target_size = time_unit_multiple(to_unit);
    let target_type = DataType::Timestamp(to_unit, tz.clone());

    if source_size < target_size {
        let ratio = target_size / source_size;
        unary(from, |ts| ts * ratio, target_type)
    } else {
        let ratio = source_size / target_size;
        unary(from, |ts| ts / ratio, target_type)
    }
}
/// Formats every timestamp of `from` as an RFC 3339 string in `timezone`.
///
/// `time_unit` selects which `timestamp_*_to_datetime` conversion turns the raw `i64` into a
/// naive UTC datetime; null slots stay null.
fn timestamp_to_utf8_impl<O: Offset, T: chrono::TimeZone>(
    from: &PrimitiveArray<i64>,
    time_unit: TimeUnit,
    timezone: T,
) -> Utf8Array<O>
where
    T::Offset: std::fmt::Display,
{
    // Shared by all units: attach the zone's offset to a naive UTC datetime and render it.
    let render = |naive: chrono::NaiveDateTime| -> String {
        let offset = timezone.offset_from_utc_datetime(&naive);
        chrono::DateTime::<T>::from_naive_utc_and_offset(naive, offset).to_rfc3339()
    };

    match time_unit {
        TimeUnit::Second => Utf8Array::from_trusted_len_iter(
            from.iter()
                .map(|slot| slot.map(|ts| render(timestamp_s_to_datetime(*ts)))),
        ),
        TimeUnit::Millisecond => Utf8Array::from_trusted_len_iter(
            from.iter()
                .map(|slot| slot.map(|ts| render(timestamp_ms_to_datetime(*ts)))),
        ),
        TimeUnit::Microsecond => Utf8Array::from_trusted_len_iter(
            from.iter()
                .map(|slot| slot.map(|ts| render(timestamp_us_to_datetime(*ts)))),
        ),
        TimeUnit::Nanosecond => Utf8Array::from_trusted_len_iter(
            from.iter()
                .map(|slot| slot.map(|ts| render(timestamp_ns_to_datetime(*ts)))),
        ),
    }
}
/// Formats timestamps using a timezone parsed from `timezone_str` by `parse_offset_tz`.
///
/// # Errors
/// Returns the error produced by `parse_offset_tz` when `timezone_str` cannot be parsed.
#[cfg(feature = "chrono-tz")]
#[cfg_attr(docsrs, doc(cfg(feature = "chrono-tz")))]
fn chrono_tz_timestamp_to_utf8<O: Offset>(
    from: &PrimitiveArray<i64>,
    time_unit: TimeUnit,
    timezone_str: &str,
) -> Result<Utf8Array<O>> {
    parse_offset_tz(timezone_str)
        .map(|tz| timestamp_to_utf8_impl::<O, chrono_tz::Tz>(from, time_unit, tz))
}
#[cfg(not(feature = "chrono-tz"))]
fn chrono_tz_timestamp_to_utf8<O: Offset>(
_: &PrimitiveArray<i64>,
_: TimeUnit,
timezone_str: &str,
) -> Result<Utf8Array<O>> {
use crate::error::Error;
Err(Error::InvalidArgumentError(format!(
"timezone \"{}\" cannot be parsed (feature chrono-tz is not active)",
timezone_str
)))
}
/// Formats every timestamp of `from` as an RFC 3339 string in the timezone described by
/// `timezone_str`.
///
/// `timezone_str` is first tried with `parse_offset`; if that fails, the call is delegated
/// to `chrono_tz_timestamp_to_utf8`.
///
/// # Errors
/// Returns whatever error `chrono_tz_timestamp_to_utf8` reports when `timezone_str` is not
/// accepted by `parse_offset` either.
pub fn timestamp_to_utf8<O: Offset>(
    from: &PrimitiveArray<i64>,
    time_unit: TimeUnit,
    timezone_str: &str,
) -> Result<Utf8Array<O>> {
    match parse_offset(timezone_str) {
        Ok(fixed_offset) => Ok(timestamp_to_utf8_impl::<O, chrono::FixedOffset>(
            from,
            time_unit,
            fixed_offset,
        )),
        Err(_) => chrono_tz_timestamp_to_utf8(from, time_unit, timezone_str),
    }
}
pub fn naive_timestamp_to_utf8<O: Offset>(
from: &PrimitiveArray<i64>,
time_unit: TimeUnit,
) -> Utf8Array<O> {
match time_unit {
TimeUnit::Nanosecond => {
let iter = from.iter().map(|x| {
x.copied()
.map(timestamp_ns_to_datetime)
.map(|x| x.to_string())
});
Utf8Array::from_trusted_len_iter(iter)
}
TimeUnit::Microsecond => {
let iter = from.iter().map(|x| {
x.copied()
.map(timestamp_us_to_datetime)
.map(|x| x.to_string())
});
Utf8Array::from_trusted_len_iter(iter)
}
TimeUnit::Millisecond => {
let iter = from.iter().map(|x| {
x.copied()
.map(timestamp_ms_to_datetime)
.map(|x| x.to_string())
});
Utf8Array::from_trusted_len_iter(iter)
}
TimeUnit::Second => {
let iter = from.iter().map(|x| {
x.copied()
.map(timestamp_s_to_datetime)
.map(|x| x.to_string())
});
Utf8Array::from_trusted_len_iter(iter)
}
}
}
/// Converts a single [`days_ms`] interval into a [`months_days_ns`] interval.
///
/// The month component is zero, the day component is copied, and the millisecond component
/// is converted to nanoseconds.
#[inline]
fn days_ms_to_months_days_ns_scalar(from: days_ms) -> months_days_ns {
    // One millisecond is 1_000_000 nanoseconds; a factor of 1000 would give microseconds.
    const NANOSECONDS_PER_MILLISECOND: i64 = 1_000_000;
    months_days_ns::new(
        0,
        from.days(),
        from.milliseconds() as i64 * NANOSECONDS_PER_MILLISECOND,
    )
}
/// Casts an array of [`days_ms`] intervals to an array of [`months_days_ns`] intervals by
/// applying `days_ms_to_months_days_ns_scalar` to every value.
pub fn days_ms_to_months_days_ns(from: &PrimitiveArray<days_ms>) -> PrimitiveArray<months_days_ns> {
    let target = DataType::Interval(IntervalUnit::MonthDayNano);
    unary(from, days_ms_to_months_days_ns_scalar, target)
}
/// Converts a month count into a [`months_days_ns`] interval whose day and nanosecond
/// components are zero.
#[inline]
fn months_to_months_days_ns_scalar(from: i32) -> months_days_ns {
    let months = from;
    months_days_ns::new(months, 0, 0)
}
/// Casts an array of month counts to an array of [`months_days_ns`] intervals by applying
/// `months_to_months_days_ns_scalar` to every value.
pub fn months_to_months_days_ns(from: &PrimitiveArray<i32>) -> PrimitiveArray<months_days_ns> {
    let target = DataType::Interval(IntervalUnit::MonthDayNano);
    unary(from, months_to_months_days_ns_scalar, target)
}
pub fn f16_to_f32(from: &PrimitiveArray<f16>) -> PrimitiveArray<f32> {
unary(from, |x| x.to_f32(), DataType::Float32)
}