use arrow::array::{
ArrayRef, BooleanArray, PrimitiveArray,
cast::AsArray,
types::{
TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
TimestampSecondType,
},
};
use arrow::buffer::{BooleanBuffer, ScalarBuffer};
use arrow::datatypes::{ArrowPrimitiveType, Date32Type, Int32Type, UInt32Type};
use arrow_schema::{DataType, TimeUnit};
use bytes::Bytes;
use num_traits::AsPrimitive;
use std::ops::Range;
use std::sync::Arc;
use super::LiquidArray;
use super::primitive_array::LiquidPrimitiveArray;
use super::{LiquidDataType, LiquidSqueezedArray};
use crate::liquid_array::LiquidPrimitiveType;
use crate::liquid_array::SqueezeIoHandler;
use crate::liquid_array::raw::BitPackedArray;
use crate::utils::get_bit_width;
/// Calendar component that can be extracted from a date or timestamp value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, serde::Serialize)]
pub enum Date32Field {
/// Calendar year, as computed by `ymd_from_epoch_days` (e.g. `1970` for day 0).
Year,
/// Month of the year, `1..=12`.
Month,
/// Day of the month, `1..=31`.
Day,
/// Day of the week, numbered from Sunday = 0 through Saturday = 6.
DayOfWeek,
}
/// In-memory representation of a date/timestamp column that keeps only one
/// [`Date32Field`] component per row, bit-packed as offsets from a reference value.
#[derive(Debug, Clone)]
pub struct SqueezedDate32Array {
/// Which calendar component the packed values represent.
field: Date32Field,
/// Per-row component stored as an unsigned offset from `reference_value`;
/// also carries the null mask of the source array.
bit_packed: BitPackedArray<UInt32Type>,
/// Smallest component seen at construction time (`0` when there was no valid value).
reference_value: i32,
/// Arrow data type of the source column (`Date32` or the source timestamp type).
original_data_type: DataType,
/// Location the full data is read back from; `None` until `with_backing` is called.
backing: Option<SqueezedBacking>,
}
/// Handle used to read the backing bytes of a squeezed array.
#[derive(Debug, Clone)]
struct SqueezedBacking {
/// I/O handler that performs the read.
io: Arc<dyn SqueezeIoHandler>,
/// Byte range passed to the I/O handler when reading.
disk_range: Range<u64>,
}
impl SqueezedDate32Array {
/// Extracts `field` from every value of a Date32 liquid array and bit-packs the result.
///
/// Components are stored as unsigned offsets from the smallest component seen, which is
/// kept as the reference value. Null rows keep their validity and store an offset of `0`.
/// An input without any valid value yields an all-null array with a reference of `0`.
pub fn from_liquid_date32<T: LiquidPrimitiveType>(
    array: &LiquidPrimitiveArray<T>,
    field: Date32Field,
) -> Self {
    let decoded: PrimitiveArray<Date32Type> =
        array.to_arrow_array().as_primitive::<Date32Type>().clone();
    let (_dt, values, nulls) = decoded.into_parts();
    let row_count = values.len();

    // Result shared by every "no valid value" exit.
    let all_null = || Self {
        field,
        bit_packed: BitPackedArray::new_null_array(row_count),
        reference_value: 0,
        original_data_type: DataType::Date32,
        backing: None,
    };

    // Cheap shortcut: the null mask already says that nothing is valid.
    if nulls.as_ref().is_some_and(|n| n.null_count() == row_count) {
        return all_null();
    }

    let is_null = |row: usize| nulls.as_ref().is_some_and(|n| n.is_null(row));

    // Smallest and largest component over the valid rows, if there are any.
    let bounds = values
        .iter()
        .enumerate()
        .filter(|&(row, _)| !is_null(row))
        .map(|(_, &days)| component_from_days(field, days))
        .fold(None, |acc: Option<(i32, i32)>, comp| match acc {
            None => Some((comp, comp)),
            Some((lo, hi)) => Some((lo.min(comp), hi.max(comp))),
        });
    let Some((min_component, max_component)) = bounds else {
        return all_null();
    };

    let max_offset = (max_component as i64 - min_component as i64) as u64;
    let bit_width = get_bit_width(max_offset);

    let offsets: ScalarBuffer<u32> = values
        .iter()
        .enumerate()
        .map(|(row, &days)| {
            if is_null(row) {
                0u32
            } else {
                (component_from_days(field, days) - min_component) as u32
            }
        })
        .collect();

    let unsigned_array = PrimitiveArray::<UInt32Type>::new(offsets, nulls);
    Self {
        field,
        bit_packed: BitPackedArray::from_primitive(unsigned_array, bit_width),
        reference_value: min_component,
        original_data_type: DataType::Date32,
        backing: None,
    }
}
/// Extracts `field` from every value of a timestamp liquid array and bit-packs the result.
///
/// Each timestamp is first reduced to whole days since the Unix epoch using the time unit
/// of `T`, then the requested component is taken from that day. Components are stored as
/// unsigned offsets from the smallest component seen; null rows store an offset of `0`.
///
/// # Panics
/// Panics with "timestamp data type" when `T::DATA_TYPE` is not a timestamp type.
pub fn from_liquid_timestamp<T: LiquidPrimitiveType>(
    array: &LiquidPrimitiveArray<T>,
    field: Date32Field,
) -> Self {
    let unit = timestamp_unit(&T::DATA_TYPE).expect("timestamp data type");
    let decoded: PrimitiveArray<T> = array.to_arrow_array().as_primitive::<T>().clone();
    let (_dt, values, nulls) = decoded.into_parts();
    let row_count = values.len();

    // Result shared by every "no valid value" exit.
    let all_null = || Self {
        field,
        bit_packed: BitPackedArray::new_null_array(row_count),
        reference_value: 0,
        original_data_type: T::DATA_TYPE.clone(),
        backing: None,
    };

    // Cheap shortcut: the null mask already says that nothing is valid.
    if nulls.as_ref().is_some_and(|n| n.null_count() == row_count) {
        return all_null();
    }

    let is_null = |row: usize| nulls.as_ref().is_some_and(|n| n.is_null(row));

    // Smallest and largest component over the valid rows, if there are any.
    let bounds = values
        .iter()
        .enumerate()
        .filter(|&(row, _)| !is_null(row))
        .map(|(_, &ticks)| {
            let days = timestamp_to_days_since_epoch(ticks.as_(), unit);
            component_from_days(field, days)
        })
        .fold(None, |acc: Option<(i32, i32)>, comp| match acc {
            None => Some((comp, comp)),
            Some((lo, hi)) => Some((lo.min(comp), hi.max(comp))),
        });
    let Some((min_component, max_component)) = bounds else {
        return all_null();
    };

    let max_offset = (max_component as i64 - min_component as i64) as u64;
    let bit_width = get_bit_width(max_offset);

    let offsets: ScalarBuffer<u32> = values
        .iter()
        .enumerate()
        .map(|(row, &ticks)| {
            if is_null(row) {
                0u32
            } else {
                let days = timestamp_to_days_since_epoch(ticks.as_(), unit);
                (component_from_days(field, days) - min_component) as u32
            }
        })
        .collect();

    let unsigned_array = PrimitiveArray::<UInt32Type>::new(offsets, nulls);
    Self {
        field,
        bit_packed: BitPackedArray::from_primitive(unsigned_array, bit_width),
        reference_value: min_component,
        original_data_type: T::DATA_TYPE.clone(),
        backing: None,
    }
}
pub(crate) fn with_backing(
mut self,
io: Arc<dyn SqueezeIoHandler>,
disk_range: Range<u64>,
) -> Self {
self.backing = Some(SqueezedBacking { io, disk_range });
self
}
async fn read_backing(&self) -> Bytes {
let backing = self
.backing
.as_ref()
.expect("SqueezedDate32Array backing not set");
backing
.io
.read(Some(backing.disk_range.clone()))
.await
.expect("read squeezed backing")
}
/// Number of rows, as reported by the bit-packed component array.
pub fn len(&self) -> usize {
self.bit_packed.len()
}
/// Returns `true` when the array holds no rows.
pub fn is_empty(&self) -> bool {
    let rows = self.bit_packed.len();
    rows == 0
}
/// Memory footprint in bytes: the bit-packed array plus the `i32` reference value.
pub fn get_array_memory_size(&self) -> usize {
    let packed_bytes = self.bit_packed.get_array_memory_size();
    let reference_bytes = std::mem::size_of::<i32>();
    packed_bytes + reference_bytes
}
/// The calendar component this array stores.
pub fn field(&self) -> Date32Field {
self.field
}
/// Materializes the stored component values as an Arrow array.
///
/// Timestamp sources produce a timestamp-typed array in the original time unit;
/// every other source type (including `Date32`) produces a `Date32`-typed array.
pub fn to_component_array(&self) -> ArrayRef {
    if let DataType::Timestamp(unit, _) = &self.original_data_type {
        return self.to_component_timestamp(*unit);
    }
    Arc::new(self.to_component_date32()) as ArrayRef
}
pub fn to_component_date32(&self) -> PrimitiveArray<Date32Type> {
let unsigned: PrimitiveArray<UInt32Type> = self.bit_packed.to_primitive();
let (_dt, values, nulls) = unsigned.into_parts();
let ref_v = self.reference_value;
let signed_values: ScalarBuffer<<Int32Type as ArrowPrimitiveType>::Native> =
ScalarBuffer::from_iter(values.iter().map(|&v| (v as i32).saturating_add(ref_v)));
PrimitiveArray::<Date32Type>::new(signed_values, nulls)
}
/// Materializes the stored component values as a timestamp-typed array in `unit`.
///
/// Each element is the raw component (offset plus reference value) widened to `i64`,
/// not a real point in time; the timestamp type only mirrors the source column's type.
/// Nulls are carried over unchanged.
fn to_component_timestamp(&self, unit: TimeUnit) -> ArrayRef {
    let unsigned: PrimitiveArray<UInt32Type> = self.bit_packed.to_primitive();
    let (_dt, values, nulls) = unsigned.into_parts();
    let ref_v = i64::from(self.reference_value);
    // Widen before adding so that offset + reference can never overflow.
    let signed_values: ScalarBuffer<i64> = values
        .iter()
        .map(|&v| i64::from(v as i32) + ref_v)
        .collect();
    // Exactly one arm runs, so each arm can take the buffers by value; no clone is needed.
    match unit {
        TimeUnit::Second => Arc::new(PrimitiveArray::<TimestampSecondType>::new(
            signed_values,
            nulls,
        )),
        TimeUnit::Millisecond => Arc::new(PrimitiveArray::<TimestampMillisecondType>::new(
            signed_values,
            nulls,
        )),
        TimeUnit::Microsecond => Arc::new(PrimitiveArray::<TimestampMicrosecondType>::new(
            signed_values,
            nulls,
        )),
        TimeUnit::Nanosecond => Arc::new(PrimitiveArray::<TimestampNanosecondType>::new(
            signed_values,
            nulls,
        )),
    }
}
pub fn to_arrow_date32_lossy(&self) -> PrimitiveArray<Date32Type> {
let unsigned: PrimitiveArray<UInt32Type> = self.bit_packed.to_primitive();
let (_dt, values, nulls) = unsigned.into_parts();
let ref_v = self.reference_value;
let days_values: ScalarBuffer<<Date32Type as ArrowPrimitiveType>::Native> =
ScalarBuffer::from_iter(values.iter().enumerate().map(|(i, &off)| {
if nulls.as_ref().is_some_and(|n| n.is_null(i)) {
0i32
} else {
match self.field {
Date32Field::Year => {
let y = ref_v + off as i32;
ymd_to_epoch_days(y, 1, 1)
}
Date32Field::Month => {
let m = (ref_v + off as i32) as u32;
ymd_to_epoch_days(1970, m, 1)
}
Date32Field::Day => {
let d = (ref_v + off as i32) as u32;
ymd_to_epoch_days(1970, 1, d)
}
Date32Field::DayOfWeek => {
let dow = ref_v + off as i32;
ymd_to_epoch_days(1970, 1, 4).saturating_add(dow)
}
}
}
}));
PrimitiveArray::<Date32Type>::new(days_values, nulls)
}
}
/// Converts a day count relative to 1970-01-01 into `(year, month, day)`.
///
/// Works on 400-year eras of 146_097 days, with the internal year starting on March 1st
/// so that the leap day is the last day of the year. `month` is `1..=12`, `day` is `1..=31`.
fn ymd_from_epoch_days(days_since_epoch: i32) -> (i32, u32, u32) {
    const DAYS_PER_ERA: i64 = 146_097;

    // Shift the origin from 1970-01-01 to 0000-03-01.
    let shifted = i64::from(days_since_epoch) + 719_468;
    // Floor division / non-negative remainder also cover days before the origin.
    let era = shifted.div_euclid(DAYS_PER_ERA);
    let day_of_era = shifted.rem_euclid(DAYS_PER_ERA);

    let year_of_era =
        (day_of_era - day_of_era / 1_460 + day_of_era / 36_524 - day_of_era / 146_096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);

    // Month index counted from March (0 = March, ..., 11 = February).
    let march_based_month = (5 * day_of_year + 2) / 153;
    let day = day_of_year - (153 * march_based_month + 2) / 5 + 1;
    let month = if march_based_month < 10 {
        march_based_month + 3
    } else {
        march_based_month - 9
    };

    // January and February belong to the following civil year.
    let mut year = year_of_era + era * 400;
    if month <= 2 {
        year += 1;
    }
    (year as i32, month as u32, day as u32)
}
/// Returns the requested calendar component for a day count relative to 1970-01-01.
fn component_from_days(field: Date32Field, days: i32) -> i32 {
    let civil = ymd_from_epoch_days(days);
    match field {
        Date32Field::Year => civil.0,
        Date32Field::Month => civil.1 as i32,
        Date32Field::Day => civil.2 as i32,
        Date32Field::DayOfWeek => day_of_week_sunday0(days),
    }
}
/// Returns the weekday of a day count relative to 1970-01-01, with Sunday = 0
/// through Saturday = 6.
///
/// 1970-01-01 was a Thursday, hence the offset of 4. The addition is done in `i64`
/// so that inputs close to `i32::MAX` cannot overflow.
fn day_of_week_sunday0(days_since_epoch: i32) -> i32 {
    // The remainder is always in 0..7, so narrowing back to i32 is lossless.
    (i64::from(days_since_epoch) + 4).rem_euclid(7) as i32
}
/// Returns the time unit of a timestamp data type, or `None` for any other type.
fn timestamp_unit(data_type: &DataType) -> Option<TimeUnit> {
    if let DataType::Timestamp(unit, _) = data_type {
        Some(*unit)
    } else {
        None
    }
}
/// Converts a timestamp in `unit` ticks since the Unix epoch into whole days since the epoch.
///
/// Uses floor division, so instants before the epoch map to the day that contains them
/// (for example, one tick before midnight of 1970-01-01 is day `-1`). Day counts that do
/// not fit in `i32` saturate at `i32::MIN` / `i32::MAX` instead of silently wrapping.
fn timestamp_to_days_since_epoch(value: i64, unit: TimeUnit) -> i32 {
    let ticks_per_day: i64 = match unit {
        TimeUnit::Second => 86_400,
        TimeUnit::Millisecond => 86_400_000,
        TimeUnit::Microsecond => 86_400_000_000,
        TimeUnit::Nanosecond => 86_400_000_000_000,
    };
    let days = value.div_euclid(ticks_per_day);
    // Second and millisecond inputs can exceed the i32 day range; clamp before narrowing.
    days.clamp(i64::from(i32::MIN), i64::from(i32::MAX)) as i32
}
/// Converts a calendar date into a day count relative to 1970-01-01.
///
/// Inverse of the era-based civil-from-days conversion: the year is treated as starting on
/// March 1st so that the leap day comes last. `month` is expected in `1..=12` and `day` in
/// `1..=31`; other values are not validated. Results outside the `i32` range saturate at
/// `i32::MIN` / `i32::MAX` instead of silently wrapping.
fn ymd_to_epoch_days(year: i32, month: u32, day: u32) -> i32 {
    // January and February count towards the previous March-based year.
    let y = i64::from(year) - if month <= 2 { 1 } else { 0 };
    // Floor division by 400, also for negative years.
    let era = if y >= 0 { y / 400 } else { (y - 399) / 400 };
    let yoe = y - era * 400;
    let m = i64::from(month);
    let d = i64::from(day);
    // Month index counted from March (0 = March, ..., 11 = February).
    let mp = m + if m > 2 { -3 } else { 9 };
    let doy = (153 * mp + 2) / 5 + d - 1;
    let doe = yoe * 365 + yoe / 4 - yoe / 100 + doy;
    let days = era * 146_097 + doe - 719_468;
    // Clamp before narrowing so extreme years cannot wrap around.
    days.clamp(i64::from(i32::MIN), i64::from(i32::MAX)) as i32
}
#[async_trait::async_trait]
impl LiquidSqueezedArray for SqueezedDate32Array {
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn get_array_memory_size(&self) -> usize {
self.get_array_memory_size()
}
fn len(&self) -> usize {
self.len()
}
async fn to_arrow_array(&self) -> ArrayRef {
let bytes = self.read_backing().await;
let liquid = crate::liquid_array::ipc::read_from_bytes(
bytes,
&crate::liquid_array::ipc::LiquidIPCContext::new(None),
);
liquid.to_arrow_array()
}
fn data_type(&self) -> LiquidDataType {
LiquidDataType::Integer
}
fn original_arrow_data_type(&self) -> DataType {
self.original_data_type.clone()
}
async fn filter(&self, selection: &BooleanBuffer) -> ArrayRef {
if selection.count_set_bits() == 0 {
return arrow::array::new_empty_array(&self.original_arrow_data_type());
}
let full = self.to_arrow_array().await;
let selection_array = BooleanArray::new(selection.clone(), None);
arrow::compute::filter(&full, &selection_array).unwrap()
}
async fn try_eval_predicate(
&self,
_predicate: &Arc<dyn datafusion::physical_plan::PhysicalExpr>,
_filter: &BooleanBuffer,
) -> Option<BooleanArray> {
None
}
}
#[cfg(test)]
mod tests {
use super::*;
use arrow::array::types::TimestampMicrosecondType;
use arrow::array::{Array, PrimitiveArray};
use std::sync::Arc;
/// Builds a `Date32` array from optional day counts.
fn dates(vals: &[Option<i32>]) -> PrimitiveArray<Date32Type> {
    let owned: Vec<Option<i32>> = vals.iter().copied().collect();
    PrimitiveArray::<Date32Type>::from(owned)
}
fn assert_prim_eq<T: ArrowPrimitiveType>(a: PrimitiveArray<T>, b: PrimitiveArray<T>) {
let a_ref: arrow::array::ArrayRef = Arc::new(a);
let b_ref: arrow::array::ArrayRef = Arc::new(b);
assert_eq!(a_ref.as_ref(), b_ref.as_ref());
}
/// Squeezes `input` on `field` and returns the extracted component values.
fn extract(field: Date32Field, input: Vec<Option<i32>>) -> PrimitiveArray<Date32Type> {
    let liquid = LiquidPrimitiveArray::<Date32Type>::from_arrow_array(dates(&input));
    SqueezedDate32Array::from_liquid_date32(&liquid, field).to_component_date32()
}
/// Squeezes `input` on `field` and returns the lossy `Date32` reconstruction.
fn lossy(field: Date32Field, input: Vec<Option<i32>>) -> PrimitiveArray<Date32Type> {
    let liquid = LiquidPrimitiveArray::<Date32Type>::from_arrow_array(dates(&input));
    SqueezedDate32Array::from_liquid_date32(&liquid, field).to_arrow_date32_lossy()
}
/// Each field is extracted correctly, including dates before the epoch and null rows.
#[test]
fn test_extraction_correctness() {
    // (field, input day counts, expected component values)
    let cases: [(Date32Field, Vec<Option<i32>>, Vec<Option<i32>>); 4] = [
        (
            Date32Field::Year,
            vec![
                Some(-1),
                Some(0),
                Some(ymd_to_epoch_days(1971, 7, 15)),
                None,
            ],
            vec![Some(1969), Some(1970), Some(1971), None],
        ),
        (
            Date32Field::Month,
            vec![
                Some(ymd_to_epoch_days(1970, 1, 31)),
                Some(ymd_to_epoch_days(1970, 2, 1)),
                Some(ymd_to_epoch_days(1970, 12, 31)),
                None,
            ],
            vec![Some(1), Some(2), Some(12), None],
        ),
        (
            Date32Field::Day,
            vec![
                Some(ymd_to_epoch_days(1970, 1, 1)),
                Some(ymd_to_epoch_days(1970, 1, 31)),
                Some(ymd_to_epoch_days(1970, 2, 1)),
                None,
            ],
            vec![Some(1), Some(31), Some(1), None],
        ),
        (
            Date32Field::DayOfWeek,
            vec![
                Some(ymd_to_epoch_days(1970, 1, 4)),
                Some(ymd_to_epoch_days(1970, 1, 5)),
                Some(ymd_to_epoch_days(1970, 1, 10)),
                None,
            ],
            vec![Some(0), Some(1), Some(6), None],
        ),
    ];

    for (field, input, expected) in cases {
        let expected = PrimitiveArray::<Date32Type>::from(expected);
        assert_prim_eq(extract(field, input), expected);
    }
}
/// The lossy reconstruction maps each component onto its documented placeholder date.
#[test]
fn test_lossy_reconstruction_mapping() {
    // (field, input day counts, expected reconstructed day counts)
    let cases: [(Date32Field, Vec<Option<i32>>, Vec<Option<i32>>); 4] = [
        (
            Date32Field::Year,
            vec![
                Some(ymd_to_epoch_days(1999, 12, 31)),
                Some(ymd_to_epoch_days(2000, 6, 1)),
                None,
            ],
            vec![
                Some(ymd_to_epoch_days(1999, 1, 1)),
                Some(ymd_to_epoch_days(2000, 1, 1)),
                None,
            ],
        ),
        (
            Date32Field::Month,
            vec![
                Some(ymd_to_epoch_days(1980, 3, 14)),
                Some(ymd_to_epoch_days(1977, 12, 5)),
                None,
            ],
            vec![
                Some(ymd_to_epoch_days(1970, 3, 1)),
                Some(ymd_to_epoch_days(1970, 12, 1)),
                None,
            ],
        ),
        (
            Date32Field::Day,
            vec![
                Some(ymd_to_epoch_days(1980, 3, 14)),
                Some(ymd_to_epoch_days(1977, 12, 5)),
                None,
            ],
            vec![
                Some(ymd_to_epoch_days(1970, 1, 14)),
                Some(ymd_to_epoch_days(1970, 1, 5)),
                None,
            ],
        ),
        (
            Date32Field::DayOfWeek,
            vec![
                Some(ymd_to_epoch_days(2020, 5, 17)),
                Some(ymd_to_epoch_days(2020, 5, 18)),
                None,
            ],
            vec![
                Some(ymd_to_epoch_days(1970, 1, 4)),
                Some(ymd_to_epoch_days(1970, 1, 5)),
                None,
            ],
        ),
    ];

    for (field, input, expected) in cases {
        let expected = PrimitiveArray::<Date32Type>::from(expected);
        assert_prim_eq(lossy(field, input), expected);
    }
}
/// Extracting from the lossy reconstruction gives the same components as extracting
/// from the original input.
#[test]
fn test_roundtrip_idempotence() {
    let input = vec![
        Some(ymd_to_epoch_days(1969, 12, 31)),
        Some(ymd_to_epoch_days(1970, 1, 1)),
        Some(ymd_to_epoch_days(1970, 1, 31)),
        Some(ymd_to_epoch_days(1970, 2, 1)),
        Some(ymd_to_epoch_days(1971, 7, 15)),
        None,
    ];
    let all_fields = [
        Date32Field::Year,
        Date32Field::Month,
        Date32Field::Day,
        Date32Field::DayOfWeek,
    ];

    for field in all_fields {
        let direct = extract(field, input.clone());
        let reconstructed = lossy(field, input.clone());
        let liquid_again = LiquidPrimitiveArray::<Date32Type>::from_arrow_array(reconstructed);
        let via_lossy =
            SqueezedDate32Array::from_liquid_date32(&liquid_again, field).to_component_date32();
        assert_prim_eq(direct, via_lossy);
    }
}
/// An all-null input stays all-null through both extraction and lossy reconstruction.
#[test]
fn test_all_nulls_behavior() {
    let input: Vec<Option<i32>> = vec![None, None, None];
    let all_null = || PrimitiveArray::<Date32Type>::from(vec![None, None, None]);
    let all_fields = [
        Date32Field::Year,
        Date32Field::Month,
        Date32Field::Day,
        Date32Field::DayOfWeek,
    ];

    for field in all_fields {
        assert_prim_eq(extract(field, input.clone()), all_null());
        assert_prim_eq(lossy(field, input.clone()), all_null());
    }
}
/// Year extraction from microsecond timestamps yields a timestamp-typed component array.
#[test]
fn test_timestamp_extraction() {
    // 2021-01-01T00:00:00Z and 2022-01-01T00:00:00Z in microseconds, then a null.
    let micros: Vec<Option<i64>> = vec![
        Some(1_609_459_200_000_000),
        Some(1_640_995_200_000_000),
        None,
    ];
    let source = PrimitiveArray::<TimestampMicrosecondType>::from(micros);
    let liquid = LiquidPrimitiveArray::<TimestampMicrosecondType>::from_arrow_array(source);
    let component = SqueezedDate32Array::from_liquid_timestamp(&liquid, Date32Field::Year)
        .to_component_array();
    let out = component
        .as_any()
        .downcast_ref::<PrimitiveArray<TimestampMicrosecondType>>()
        .expect("timestamp array");

    for (row, year) in [2021i64, 2022].into_iter().enumerate() {
        assert_eq!(out.value(row), year);
    }
    assert!(out.is_null(2));
}
}