use std::fmt::{Display, Formatter, Result as FmtResult};
#[cfg(feature = "datetime")]
use crate::Buffer;
use crate::enums::shape_dim::ShapeDim;
use crate::enums::time_units::TimeUnit;
use crate::traits::concatenate::Concatenate;
use crate::traits::masked_array::MaskedArray;
use crate::traits::shape::Shape;
use crate::traits::type_unions::Integer;
use crate::utils::validate_null_mask_len;
use crate::{
Bitmask, Length, Offset, impl_arc_masked_array, impl_array_ref_deref, impl_masked_array,
};
use ::vec64::{Vec64, Vec64Alloc};
pub mod datetime_ops;
pub mod tz;
pub const UNIX_EPOCH_JULIAN_DAY: i64 = 2_440_588;
#[cfg(feature = "datetime")]
#[repr(C, align(64))]
#[derive(PartialEq, Clone, Debug, Default)]
pub struct DatetimeArray<T> {
pub data: Buffer<T>,
pub null_mask: Option<Bitmask>,
pub time_unit: TimeUnit,
}
impl<T: Integer> DatetimeArray<T> {
#[inline]
pub fn new(
data: impl Into<Buffer<T>>,
null_mask: Option<Bitmask>,
time_unit: Option<TimeUnit>,
) -> Self {
let data: Buffer<T> = data.into();
validate_null_mask_len(data.len(), &null_mask);
Self {
data: data.into(),
null_mask: null_mask,
time_unit: time_unit.unwrap_or_default(),
}
}
#[inline]
pub fn with_capacity(cap: usize, null_mask: bool, time_unit: Option<TimeUnit>) -> Self {
Self {
data: Vec64::with_capacity(cap).into(),
null_mask: if null_mask {
Some(Bitmask::with_capacity(cap))
} else {
None
},
time_unit: time_unit.unwrap_or_default(),
}
}
#[inline]
pub fn with_default_unit(time_unit: Option<TimeUnit>) -> Self {
Self {
data: Vec64::new().into(),
null_mask: None,
time_unit: time_unit.unwrap_or_default(),
}
}
#[inline]
pub fn from_slice(slice: &[T], time_unit: Option<TimeUnit>) -> Self {
Self {
data: Vec64(slice.to_vec_in(Vec64Alloc::default())).into(),
null_mask: None,
time_unit: time_unit.unwrap_or_default(),
}
}
#[inline]
pub fn value(&self, idx: usize) -> Option<T> {
if idx >= self.len() {
None
} else if self.is_null(idx) {
None
} else {
Some(self.data[idx])
}
}
#[inline]
pub fn values(&self) -> &Buffer<T> {
&self.data
}
#[inline]
pub fn from_vec64(
data: Vec64<T>,
null_mask: Option<Bitmask>,
time_unit: Option<TimeUnit>,
) -> Self {
Self {
data: data.into(),
null_mask,
time_unit: time_unit.unwrap_or_default(),
}
}
#[inline]
pub fn from_vec(data: Vec<T>, null_mask: Option<Bitmask>, time_unit: Option<TimeUnit>) -> Self {
Self::from_vec64(data.into(), null_mask, time_unit)
}
}
#[cfg(feature = "datetime_ops")]
impl DatetimeArray<i64> {
#[inline]
pub fn as_datetime(&self, idx: usize) -> Option<time::OffsetDateTime> {
use time::OffsetDateTime;
self.value(idx).and_then(|val| {
match self.time_unit {
TimeUnit::Seconds => OffsetDateTime::from_unix_timestamp(val).ok(),
TimeUnit::Milliseconds => {
OffsetDateTime::from_unix_timestamp_nanos((val as i128) * 1_000_000).ok()
}
TimeUnit::Microseconds => {
OffsetDateTime::from_unix_timestamp_nanos((val as i128) * 1_000).ok()
}
TimeUnit::Nanoseconds => {
OffsetDateTime::from_unix_timestamp_nanos(val as i128).ok()
}
TimeUnit::Days => {
time::Date::from_julian_day((val + UNIX_EPOCH_JULIAN_DAY) as i32) .ok()
.and_then(|date| date.with_hms(0, 0, 0).ok())
.map(|dt| dt.assume_utc())
}
}
})
}
#[inline]
pub fn tuple_dt(&self, idx: usize) -> Option<(i32, u32, u32, u32, u32, u32, u32, u32)> {
self.as_datetime(idx).map(|dt| {
(
dt.year(),
dt.month() as u32,
dt.day() as u32,
dt.hour() as u32,
dt.minute() as u32,
dt.second() as u32,
dt.nanosecond() / 1_000_000, dt.nanosecond(),
)
})
}
#[inline]
pub fn as_date(&self, idx: usize) -> Option<time::Date> {
self.as_datetime(idx).map(|dt| dt.date())
}
#[inline]
pub fn as_time(&self, idx: usize) -> Option<time::Time> {
self.as_datetime(idx).map(|dt| dt.time())
}
#[cfg(feature = "datetime")]
pub fn tz(&self, tz: &str) -> crate::FieldArray
where
Self: Into<crate::Array>,
{
use crate::FieldArray;
use crate::ffi::arrow_dtype::ArrowType;
let array: crate::Array = self.clone().into();
FieldArray::from_parts(
"datetime",
ArrowType::Timestamp(self.time_unit, Some(tz.to_string())),
Some(self.is_nullable()),
None,
array,
)
}
}
impl_masked_array!(DatetimeArray, Integer, Buffer<T>, T, time_unit);
impl_array_ref_deref!(DatetimeArray<T>);
impl_arc_masked_array!(
Inner = DatetimeArray<T>,
T = T,
Container = Buffer<T>,
LogicalType = T,
CopyType = T,
BufferT = T,
Variant = TemporalArray,
Bound = Integer,
);
#[cfg(feature = "datetime")]
impl<T> Display for DatetimeArray<T>
where
T: Integer + Display,
{
fn fmt(&self, f: &mut Formatter<'_>) -> FmtResult {
use crate::traits::print::MAX_PREVIEW;
let len = self.len();
let nulls = self.null_count();
writeln!(
f,
"DatetimeArray [{} values] (dtype: datetime[{:?}], nulls: {})",
len, self.time_unit, nulls
)?;
write!(f, "[")?;
#[cfg(feature = "datetime_ops")]
{
use time::OffsetDateTime;
for i in 0..usize::min(len, MAX_PREVIEW) {
if i > 0 {
write!(f, ", ")?;
}
match self.value(i) {
Some(val) => match self.time_unit {
TimeUnit::Seconds => {
let v = val.to_i64().unwrap();
match OffsetDateTime::from_unix_timestamp(v) {
Ok(dt) => write!(f, "{}", dt),
Err(_) => write!(f, "{}s", val),
}
}
TimeUnit::Milliseconds => {
let v = val.to_i64().unwrap();
match OffsetDateTime::from_unix_timestamp_nanos((v as i128) * 1_000_000)
{
Ok(dt) => write!(f, "{}", dt),
Err(_) => write!(f, "{}ms", val),
}
}
TimeUnit::Microseconds => {
let v = val.to_i64().unwrap();
match OffsetDateTime::from_unix_timestamp_nanos((v as i128) * 1_000) {
Ok(dt) => write!(f, "{}", dt),
Err(_) => write!(f, "{}µs", val),
}
}
TimeUnit::Nanoseconds => {
let v = val.to_i64().unwrap();
match OffsetDateTime::from_unix_timestamp_nanos(v as i128) {
Ok(dt) => write!(f, "{}", dt),
Err(_) => write!(f, "{}ns", val),
}
}
TimeUnit::Days => {
let days = val.to_i64().unwrap();
match time::Date::from_julian_day((days + UNIX_EPOCH_JULIAN_DAY) as i32)
{
Ok(date) => write!(f, "{}", date),
Err(_) => write!(f, "{}d", val),
}
}
},
None => write!(f, "null"),
}?;
}
}
#[cfg(not(feature = "datetime_ops"))]
{
let suffix = match self.time_unit {
TimeUnit::Seconds => "s",
TimeUnit::Milliseconds => "ms",
TimeUnit::Microseconds => "µs",
TimeUnit::Nanoseconds => "ns",
TimeUnit::Days => "d",
};
for i in 0..usize::min(len, MAX_PREVIEW) {
if i > 0 {
write!(f, ", ")?;
}
match self.value(i) {
Some(val) => write!(f, "{}{}", val, suffix)?,
None => write!(f, "null")?,
}
}
}
if len > MAX_PREVIEW {
write!(f, ", … ({} total)", len)?;
}
write!(f, "]")
}
}
impl<T: Integer> Shape for DatetimeArray<T> {
fn shape(&self) -> ShapeDim {
ShapeDim::Rank1(self.len())
}
}
impl<T: Integer> Concatenate for DatetimeArray<T> {
fn concat(
mut self,
other: Self,
) -> core::result::Result<Self, crate::enums::error::MinarrowError> {
if self.time_unit != other.time_unit {
return Err(crate::enums::error::MinarrowError::IncompatibleTypeError {
from: "DatetimeArray",
to: "DatetimeArray",
message: Some(format!(
"Cannot concatenate DatetimeArrays with different time units: {:?} and {:?}",
self.time_unit, other.time_unit
)),
});
}
self.append_array(&other);
Ok(self)
}
}
#[cfg(feature = "datetime")]
#[cfg(test)]
mod tests {
use super::*;
use crate::vec64;
#[test]
fn test_new_and_with_capacity() {
let arr = DatetimeArray::<i64>::default();
assert_eq!(arr.len(), 0);
assert!(arr.data.is_empty());
assert!(arr.null_mask.is_none());
let arr = DatetimeArray::<i64>::with_capacity(50, true, None);
assert_eq!(arr.len(), 0);
assert!(arr.data.capacity() >= 50);
assert!(arr.null_mask.is_some());
assert!(arr.null_mask.as_ref().unwrap().capacity() >= 50);
}
#[test]
fn test_push_and_value_no_null_mask() {
let mut arr = DatetimeArray::<i64>::with_capacity(2, false, None);
arr.push(123);
arr.push(456);
assert_eq!(arr.len(), 2);
assert_eq!(arr.value(0), Some(123));
assert_eq!(arr.value(1), Some(456));
assert!(!arr.is_null(0));
assert!(!arr.is_null(1));
}
#[test]
fn test_push_and_value_with_null_mask() {
let mut arr = DatetimeArray::<i64>::with_capacity(3, true, None);
arr.push(1234);
arr.push(5678);
arr.push_null();
assert_eq!(arr.len(), 3);
assert_eq!(arr.value(0), Some(1234));
assert_eq!(arr.value(1), Some(5678));
assert_eq!(arr.value(2), None);
assert!(arr.is_null(2));
}
#[test]
fn test_push_null_auto_mask() {
let mut arr = DatetimeArray::<i64>::default();
arr.push_null();
assert_eq!(arr.len(), 1);
assert!(arr.is_null(0));
assert!(arr.null_mask.is_some());
assert_eq!(arr.value(0), None);
}
#[test]
fn test_set_and_set_null() {
let mut arr = DatetimeArray::<i64>::with_capacity(3, true, None);
arr.push(1);
arr.push(2);
arr.push(3);
arr.set(1, 99);
assert_eq!(arr.value(1), Some(99));
arr.set_null(1);
assert_eq!(arr.value(1), None);
assert!(arr.is_null(1));
}
#[test]
fn test_values_method() {
let mut arr = DatetimeArray::<i32>::with_capacity(3, false, None);
arr.push(10);
arr.push(20);
arr.push(30);
assert_eq!(arr.values(), &vec64![10, 20, 30]);
}
#[test]
fn test_out_of_bounds_value() {
let mut arr = DatetimeArray::<i64>::with_capacity(1, true, None);
arr.push(9);
assert_eq!(arr.value(1), None);
assert_eq!(arr.value(100), None);
}
#[test]
fn test_is_empty() {
let arr = DatetimeArray::<i64>::default();
assert!(arr.is_empty());
let mut arr = DatetimeArray::<i64>::with_capacity(2, true, None);
arr.push(0);
assert!(!arr.is_empty());
}
#[test]
fn test_push_nulls_bulk() {
let mut arr = DatetimeArray::<i64>::with_capacity(4, true, None);
arr.push(100);
arr.push_nulls(5);
assert_eq!(arr.len(), 6);
assert_eq!(arr.value(0), Some(100));
for i in 1..6 {
assert_eq!(arr.value(i), None);
}
}
#[test]
fn test_masked_and_mutable_trait() {
let mut arr = DatetimeArray::<i64>::with_capacity(3, true, None);
arr.push(1);
arr.push(2);
arr.push(3);
assert_eq!(arr.get(0), Some(1));
assert_eq!(arr.get(2), Some(3));
arr.set_null(0);
assert_eq!(arr.get(0), None);
arr.set(2, 88);
assert_eq!(arr.get(2), Some(88));
}
#[test]
fn test_masked_trait_no_mask() {
let mut arr = DatetimeArray::<i64>::with_capacity(2, false, None);
arr.push(3);
arr.push(5);
assert_eq!(arr.get(0), Some(3));
assert!(!arr.is_null(1));
}
#[test]
fn test_unsigned_type() {
let mut arr = DatetimeArray::<u64>::with_capacity(3, true, None);
arr.push(1);
arr.push(2);
arr.push_null();
assert_eq!(arr.value(0), Some(1));
assert_eq!(arr.value(2), None);
}
#[cfg(feature = "datetime_ops")]
#[test]
fn test_as_datetime_and_tuple_dt() {
use time::OffsetDateTime;
let mut arr = DatetimeArray::<i64>::with_capacity(2, true, Some(TimeUnit::Milliseconds));
let ms = 1_700_000_000_000;
arr.push(ms);
arr.push_null();
let dt = arr.as_datetime(0).unwrap();
let expected = OffsetDateTime::from_unix_timestamp_nanos((ms as i128) * 1_000_000).unwrap();
assert_eq!(dt, expected);
let tuple = arr.tuple_dt(0).unwrap();
assert_eq!(tuple.0, dt.year());
assert_eq!(tuple.1, dt.month() as u32);
assert_eq!(tuple.2, dt.day() as u32);
assert_eq!(tuple.3, dt.hour() as u32);
assert_eq!(tuple.4, dt.minute() as u32);
assert_eq!(tuple.5, dt.second() as u32);
assert_eq!(tuple.6, dt.nanosecond() / 1_000_000);
assert_eq!(tuple.7, dt.nanosecond());
assert!(arr.as_datetime(1).is_none());
assert!(arr.tuple_dt(1).is_none());
}
#[cfg(feature = "datetime_ops")]
#[test]
fn test_as_date_and_as_time() {
use time::Month;
let mut arr = DatetimeArray::<i64>::with_capacity(1, false, Some(TimeUnit::Seconds));
let timestamp = 1_700_000_000; arr.push(timestamp);
let date = arr.as_date(0).unwrap();
assert_eq!(date.year(), 2023);
assert_eq!(date.month(), Month::November);
assert_eq!(date.day(), 14);
let time = arr.as_time(0).unwrap();
assert_eq!(time.hour(), 22);
assert_eq!(time.minute(), 13);
assert_eq!(time.second(), 20);
}
#[test]
fn test_datetime_array_slice() {
use crate::enums::time_units::TimeUnit;
let mut arr = DatetimeArray::<i64>::with_capacity(5, true, Some(TimeUnit::Milliseconds));
arr.push(1000);
arr.push(2000);
arr.push(3000);
arr.push_null(); arr.push(5000);
let sliced = arr.slice_clone(1, 3);
assert_eq!(sliced.len(), 3);
assert_eq!(sliced.value(0), Some(2000));
assert_eq!(sliced.value(1), Some(3000));
assert_eq!(sliced.value(2), None);
assert!(sliced.is_null(2));
assert_eq!(sliced.null_count(), 1);
assert_eq!(sliced.time_unit, TimeUnit::Milliseconds);
}
#[test]
fn test_batch_extend_from_iter_with_capacity() {
let mut arr = DatetimeArray::<i64>::with_default_unit(Some(TimeUnit::Seconds));
let data: Vec<i64> = (1_000_000_000..1_000_000_100).collect();
arr.extend_from_iter_with_capacity(data.into_iter(), 100);
assert_eq!(arr.len(), 100);
for i in 0..100 {
assert_eq!(arr.value(i), Some(1_000_000_000 + i as i64));
}
assert_eq!(arr.time_unit, TimeUnit::Seconds);
}
#[test]
fn test_batch_extend_from_slice() {
let mut arr = DatetimeArray::<i32>::with_capacity(10, true, Some(TimeUnit::Milliseconds));
arr.push(1000);
arr.push_null();
let data = &[2000i32, 3000, 4000];
arr.extend_from_slice(data);
assert_eq!(arr.len(), 5);
assert_eq!(arr.value(0), Some(1000));
assert_eq!(arr.value(1), None);
assert_eq!(arr.value(2), Some(2000));
assert_eq!(arr.value(3), Some(3000));
assert_eq!(arr.value(4), Some(4000));
assert!(arr.null_count() >= 1); assert_eq!(arr.time_unit, TimeUnit::Milliseconds);
}
#[test]
fn test_batch_fill_datetime() {
let timestamp = 1_700_000_000i64; let arr = DatetimeArray::<i64>::fill(timestamp, 200);
assert_eq!(arr.len(), 200);
assert_eq!(arr.null_count(), 0);
for i in 0..200 {
assert_eq!(arr.value(i), Some(timestamp));
}
assert!(matches!(
arr.time_unit,
TimeUnit::Nanoseconds
| TimeUnit::Milliseconds
| TimeUnit::Seconds
| TimeUnit::Microseconds
| TimeUnit::Days
));
}
#[test]
fn test_batch_operations_different_time_units() {
let mut arr_micro = DatetimeArray::<i64>::with_default_unit(Some(TimeUnit::Microseconds));
let micro_data = &[1_000_000i64, 2_000_000, 3_000_000];
arr_micro.extend_from_slice(micro_data);
assert_eq!(arr_micro.len(), 3);
assert_eq!(arr_micro.time_unit, TimeUnit::Microseconds);
for (i, &expected) in micro_data.iter().enumerate() {
assert_eq!(arr_micro.value(i), Some(expected));
}
}
#[test]
fn test_batch_fill_preserves_time_unit() {
let _arr = DatetimeArray::<i32>::with_default_unit(Some(TimeUnit::Milliseconds));
let filled = DatetimeArray::<i32>::fill(1000, 50);
assert!(matches!(
filled.time_unit,
TimeUnit::Nanoseconds
| TimeUnit::Milliseconds
| TimeUnit::Seconds
| TimeUnit::Microseconds
| TimeUnit::Days
));
assert_eq!(filled.len(), 50);
}
#[test]
fn test_batch_operations_large_timestamps() {
let mut arr = DatetimeArray::<i64>::with_default_unit(Some(TimeUnit::Nanoseconds));
let large_timestamps: Vec<i64> = (0..10).map(|i| 1_000_000_000_000_000_000 + i).collect();
arr.extend_from_iter_with_capacity(large_timestamps.into_iter(), 10);
assert_eq!(arr.len(), 10);
for i in 0..10 {
assert_eq!(arr.value(i), Some(1_000_000_000_000_000_000 + i as i64));
}
}
#[test]
fn test_datetime_array_concat() {
use crate::traits::concatenate::Concatenate;
let arr1 =
DatetimeArray::<i64>::from_slice(&[1000, 2000, 3000], Some(TimeUnit::Milliseconds));
let arr2 = DatetimeArray::<i64>::from_slice(&[4000, 5000], Some(TimeUnit::Milliseconds));
let result = arr1.concat(arr2).unwrap();
assert_eq!(result.len(), 5);
assert_eq!(result.value(0), Some(1000));
assert_eq!(result.value(1), Some(2000));
assert_eq!(result.value(2), Some(3000));
assert_eq!(result.value(3), Some(4000));
assert_eq!(result.value(4), Some(5000));
assert_eq!(result.time_unit, TimeUnit::Milliseconds);
}
#[test]
fn test_datetime_array_concat_with_nulls() {
use crate::traits::concatenate::Concatenate;
let mut arr1 = DatetimeArray::<i64>::with_capacity(3, true, Some(TimeUnit::Seconds));
arr1.push(100);
arr1.push_null();
arr1.push(300);
let mut arr2 = DatetimeArray::<i64>::with_capacity(2, true, Some(TimeUnit::Seconds));
arr2.push(400);
arr2.push_null();
let result = arr1.concat(arr2).unwrap();
assert_eq!(result.len(), 5);
assert_eq!(result.value(0), Some(100));
assert_eq!(result.value(1), None);
assert_eq!(result.value(2), Some(300));
assert_eq!(result.value(3), Some(400));
assert_eq!(result.value(4), None);
assert_eq!(result.null_count(), 2);
assert_eq!(result.time_unit, TimeUnit::Seconds);
}
#[test]
fn test_datetime_array_concat_incompatible_time_units() {
use crate::traits::concatenate::Concatenate;
let arr1 = DatetimeArray::<i64>::from_slice(&[1000, 2000], Some(TimeUnit::Milliseconds));
let arr2 = DatetimeArray::<i64>::from_slice(&[3, 4], Some(TimeUnit::Seconds));
let result = arr1.concat(arr2);
assert!(result.is_err());
assert!(matches!(
result.unwrap_err(),
crate::enums::error::MinarrowError::IncompatibleTypeError { .. }
));
}
}
#[cfg(test)]
#[cfg(feature = "parallel_proc")]
mod parallel_tests {
use rayon::prelude::*;
use super::*;
use crate::enums::time_units::TimeUnit;
#[test]
fn test_datetimearray_par_iter_all_valid_refs() {
let arr =
DatetimeArray::<i64>::from_slice(&[1000, 2000, 3000], Some(TimeUnit::Milliseconds));
let out: Vec<&i64> = arr.par_iter().collect();
let expected: Vec<i64> = vec![1000, 2000, 3000];
for (i, val) in out.iter().enumerate() {
assert_eq!(**val, expected[i]);
}
}
#[test]
fn test_datetimearray_par_iter_nulls() {
let mut arr = DatetimeArray::<i64>::with_default_unit(Some(TimeUnit::Milliseconds));
arr.data.extend_from_slice(&[10, 20, 30]);
let mut mask = Bitmask::new_set_all(3, false);
mask.set_bits_chunk(0, 0b0000_0011, 3);
arr.null_mask = Some(mask);
let out: Vec<&i64> = arr.par_iter().collect();
let expected: Vec<i64> = vec![10, 20, 30];
for (i, val) in out.iter().enumerate() {
assert_eq!(**val, expected[i]);
}
}
#[test]
fn test_datetimearray_par_iter_opt_all_valid() {
let arr = DatetimeArray::<i64>::from_slice(&[111, 222, 333], Some(TimeUnit::Seconds));
let out: Vec<Option<&i64>> = arr.par_iter_opt().collect();
assert_eq!(out.len(), 3);
assert_eq!(out[0], Some(&111));
assert_eq!(out[1], Some(&222));
assert_eq!(out[2], Some(&333));
}
#[test]
fn test_datetimearray_par_iter_opt_with_nulls() {
let mut arr = DatetimeArray::<i64>::with_default_unit(Some(TimeUnit::Microseconds));
arr.data.extend_from_slice(&[77, 88, 99, 111]);
let mut mask = Bitmask::new_set_all(4, false);
mask.set_bits_chunk(0, 0b0000_1011, 4);
arr.null_mask = Some(mask);
let out: Vec<Option<&i64>> = arr.par_iter_opt().collect();
assert_eq!(out.len(), 4);
assert_eq!(out[0], Some(&77));
assert_eq!(out[1], Some(&88));
assert_eq!(out[2], None);
assert_eq!(out[3], Some(&111));
}
#[cfg(feature = "datetime")]
#[test]
fn test_datetimearray_par_iter_range_unchecked() {
let arr = DatetimeArray::<i64>::from_slice(
&[100, 200, 300, 400],
Some(crate::enums::time_units::TimeUnit::Milliseconds),
);
let out: Vec<&i64> = unsafe { arr.par_iter_range_unchecked(1, 3).collect() };
assert_eq!(*out[0], 200);
assert_eq!(*out[1], 300);
}
}